노트북 환경 구축
1. 꼭 필요한 것은 아니지만.... 한글 데이터 처리를 위한 폰트 (나눔 글꼴의 예)
!sudo apt-get install -y fonts-nanum
!sudo fc-cache -fv
!rm ~/.cache/matplotlib -rf
데이터 불러오기
1. 인터넷에 공개 되어 있는 텍스트 파일 다운로드
urllib.request.urlretrieve("source_url", filename="local_file_name")
2. git에 올려져 있는 데이터 clone하기
!git clone git_repo_url.git
3. pandas를 이용한 csv 파일 불러오기
final_data = pd.read_csv("file_path", delimiter='구분자, 탭이면 \t, comma면 무시해도 됨', quoting=불러 올 컬럼 수)
데이터 정리
1. Null 정리
- null인 데이터 개수가 몇 개 되지 않으면 그 행을 제거
data.dropna(inplace=True)
2. 문자열 컬럼의 양끝의 불필요한 공백 제거(strip)
data['target_column'] = data[' target_column '].str.strip()
3. 중복 데이터 제거
그냥 제거하지 말고, 중복이 얼마나 되어 있는지 체크하고 지울만하면 지우도록 한다.
data[' target_column'].duplicated().sum()
전체 데이터 개수 대비 삭제 해도 될만한 수량이면 삭제, 큰 비중이라면 데이터 출처에 문의를 해봐야 할 듯.
처리 단계별로 dataframe을 유지하려면 inplace는False로 두고, 처리 된 결과를 새로운 dataframe으로 받도록 함.
data.drop_duplicates(subset=[' target_column'], inplace=True)
4. 불필요한 문자 제거
정규식을 이용한 처리.
- 조건에 맞는 문자열을 아무것도 없는 데이터로 교체하는 방법
data['target_column'].replace('[^가-힣 ]','', regex=True)
또는
data[' target_column'] = data[' target_column'].str.replace('[^가-힣 ]','', regex=True)
5. 분류 할 데이터의 비율 검토
- value_counts로 확인
data [' target_column'].value_counts()
데이터 분리
1. Feature/ Label 분리
- 단순하게 dataframe에서 특정 컬럼을 지정하여 분리하는 방법
features = data['feature_column']
labels = data['label']
- 특정 컬럼과 그 나머지로 분리하는 방법
. 특정 컬럼은 1번과 동일하게 컬럼명으로 얻고, 나머지는 drop을 한 결과를 받으면 됨.inplace에 주의
df.drop(column_name 또는 column_name의 list, axis=1, inplace=False)
2. train/ test 데이터 분리
from sklearn.model_selection import train_test_split
x_train, x_test, y_train, y_test = train_test_split(features, labels , test_size=0.2, random_state=41)
label의 비율을 유사하게 하려면 stratify 컬럼으로 label 지정
x_train, x_test, y_train, y_test = train_test_split(features, labels , test_size=0.2, stratify=labels, random_state=41)
문자열 데이터의 벡터화
1. TF-IDF를 이용한 단어의 index 부여(토큰화)
from sklearn.feature_extraction.text import TfidfVectorizer
tfidf = TfidfVectorizer()
x_train_v = tfidf.fit_transform( x_train )
x_test_v = tfidf.transform( x_test )
y_train_v = y_train.copy()
y_test_v = y_test.copy()
머신러닝 분류기
1. RandomForest분류기
from sklearn.ensemble import RandomForestClassifier
rfc = RandomForestClassifier()
rfc.fit(x_train_v, y_train_v)
print(f'RandomForestClassifier 성능 : { rfc.score(x_test_v, y_test_v) }')
2. SGD분류기
from sklearn.linear_model import SGDClassifier
sgdc = SGDClassifier()
sgdc.fit(x_train_v, y_train_v)
print(f'SGDClassifier 성능 : { sgdc.score(x_test_v, y_test_v) }')
3. 예측
predict = sgdc.predict(x_test_v[:1])
print(predict)
LSTM 방식
데이터 전처리는 동일하고, 토큰화부터 다름.
문자열 데이터의 벡터화
1. 빈도수 고려하지 않는 전체 단어의 토큰화
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
tokenizer = Tokenizer()
tokenizer.fit_on_texts(x_train)
토큰화 된 결과 조회
단어 : index
index : 단어
단어별 빈도
총 단어 개수 : len(tokenizer.index_word)
2. 문장을 숫자로 표현 : texts to sequences
x_train_seq = tokenizer.texts_to_sequences(x_train)
x_test_seq = tokenizer.texts_to_sequences(x_test)
3. 문장을 표현하는 배열의 길이를 통일하기 위해 빈 데이터를 추가 : Padding
- 최대 길이 확인
max_len = max(len(line) for line in x_train_seq)
- 최대 길이로 padding
x_train_pad = pad_sequences(x_train_seq, maxlen=max_len)
x_test_pad = pad_sequences(x_test_seq, maxlen= max_len )
LSTM 모델링
1. 기본 코드
from tensorflow.keras.layers import Dense, Flatten, Conv1D, MaxPool2D
from tensorflow.keras.layers import Embedding, Bidirectional, LSTM, SimpleRNN, GRU
from tensorflow.keras.models import Sequential
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
max_words = 47646 + 1 # 총 단어 갯수 + padding 0 번호
max_len = 42 # 최대 문장 길이
embedding_dim = 64 # embedding 차원
model.add(Embedding(max_words, embedding_dim, input_length=max_len))
model.add(LSTM(16, return_sequences=True))
model.add(LSTM(16, return_sequences=True))
model.add(Flatten())
model.add(Dense(128, activation='swish'))
model.add(Dense(32, activation='swish'))
model.add(Dense(2, activation='softmax'))
model.compile(loss = 'sparse_categorical_crossentropy',
optimizer = 'adam',
metrics = ['accuracy'])
model.summary()
# 조기종료 콜백함수 정의(EarlyStopping)
es = EarlyStopping(monitor='val_loss', patience=10, verbose=1)
# 체크포인트 저장(EarlyStopping)
checkpoint_path = 'tmp_checkpoint.keras'
cp = ModelCheckpoint(checkpoint_path, monitor='val_loss', verbose=1, save_best_only=True)
history = model.fit(x_train_pad, y_train, epochs=50, batch_size=512,
validation_data=(x_test_pad, y_test), verbose =1, callbacks=[es, cp])
2. 결과 검토
epochs = range(1, len(history.history['accuracy']) + 1)
plt.plot(epochs, history.history['accuracy'])
plt.plot(epochs, history.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'valid'], )
plt.show()
3. 예측
predict = model.predict(x_test_pad[:1])
BERT 로 분류하기
데이터 전처리는 동일
1. 토큰화
형태도 단위로 전체 데이터를 embedding 하는데 시간이 많이 걸리므로 일부만 사용(성능이 되면 더 쓰고..)
data = pd.concat([ data .iloc[:10000], data .iloc[-10000:]])
!pip install konlpy
from konlpy.tag import Okt
okt = Okt()
data ['target_column'] = data [' target_column '].map(lambda x: ' '.join(okt.morphs(x, stem = True)))
2. Train/Test 분할
from sklearn.model_selection import train_test_split
train_df, test_df = train_test_split( data , test_size=0.2, random_state=42)
train_texts = train_df[' target_column '].astype(str).tolist() # 문자열 데이터로 명시 후 리스트 화
train_labels = train_df['label'].tolist()
test_texts = test_df[' target_column '].astype(str).tolist()
test_labels = test_df['label'].tolist()
3. BERT 토크나이저
# !pip install transformers
from transformers import BertTokenizer, BertForSequenceClassification
model_name = 'monologg/kobert'
tokenizer = BertTokenizer.from_pretrained(model_name)
train_encodings = tokenizer(train_texts, truncation=True, padding=True)
test_encodings = tokenizer(test_texts, truncation=True, padding=True)
Data Loader
import torch
from torch.utils.data import DataLoader, Dataset
class CustomDataset(torch.utils.data.Dataset):
def __init__(self, encodings, labels):
self.encodings = encodings
self.labels = labels
def __getitem__(self, idx):
item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
item['labels'] = torch.tensor(self.labels[idx])
return item
def __len__(self):
return len(self.labels)
train_dataset = CustomDataset(train_encodings, train_labels)
test_dataset = CustomDataset(test_encodings, test_labels)
batch_size = 64 # 배치 사이즈는 직접 지정해야 합니다.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
BERT 모델 훈련
1. 불러오기 - 분류기로 분류 할 것이 2개.
# 0, 1로 분류하기 때문에 레이블은 2개로 지정합니다.
model = BertForSequenceClassification.from_pretrained('monologg/kobert', num_labels=2)
2. 훈련
from tqdm.auto import tqdm
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # GPU 사용이 가능한 경우 설정
start = time.time()
num_epochs = 5
learning_rate = 2e-5 #2e-5는 0.00002
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
criterion = torch.nn.CrossEntropyLoss()
model.to(device) # GPU 사용이 가능한 경우
for epoch in range(num_epochs):
model.train() # 훈련 모드 지정
total_loss = 0
for batch in tqdm(train_loader):
input_ids = batch['input_ids'].to(device)
attention_mask = batch['attention_mask'].to(device)
labels = batch['labels'].to(device)
optimizer.zero_grad()
outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
loss = outputs.loss
total_loss += loss.item()
loss.backward()
optimizer.step()
average_loss = total_loss / len(train_loader)
print(f"Epoch {epoch+1}/{num_epochs} - Average Loss: {average_loss:.4f}")
print(f'총학습시간: { end - start }')
3. 테스트
model.eval()
correct_predictions = 0
total_predictions = 0
with torch.no_grad():
for batch in test_loader:
input_ids = batch['input_ids'].to(device)
attention_mask = batch['attention_mask'].to(device)
labels = batch['labels'].to(device)
outputs = model(input_ids, attention_mask=attention_mask)
_, predicted_labels = torch.max(outputs.logits, dim=1)
correct_predictions += torch.sum(predicted_labels == labels).item()
total_predictions += labels.size(0)
accuracy = correct_predictions / total_predictions
print(f"Test Accuracy: {accuracy:.4f}")
4. 추론
input_text = '그냥 넷플릭스에서 보는게 좋았겠다.'
input_encoding = tokenizer.encode_plus(
input_text,
truncation=True,
padding=True,
return_tensors='pt'
)
input_ids = input_encoding['input_ids'].to(device)
attention_mask = input_encoding['attention_mask'].to(device)
model.eval()
with torch.no_grad():
outputs = model(input_ids, attention_mask=attention_mask)
_, predicted_labels = torch.max(outputs.logits, dim=1)
predicted_labels = predicted_labels.item()
print(predicted_labels)
댓글