본문 바로가기
카테고리 없음

자연어 처리 - 긍정/부정 평가 분류기

by 바른생활머시마 2024. 9. 19.
728x90

 

노트북 환경 구축

1. 꼭 필요한 것은 아니지만.... 한글 데이터 처리를 위한 폰트 (나눔 글꼴의 예)

!sudo apt-get install -y fonts-nanum
!sudo fc-cache -fv
!rm ~/.cache/matplotlib -rf

 

 

데이터 불러오기

1. 인터넷에 공개 되어 있는 텍스트 파일 다운로드

import urllib.request

urllib.request.urlretrieve("source_url", filename="local_file_name")

 

2. git에 올려져 있는 데이터 clone하기

!git clone git_repo_url.git

 

3. pandas를 이용한 csv 파일 불러오기

import pandas as pd
final_data = pd.read_csv("file_path", delimiter='구분자, 탭이면 \t, comma면 무시해도 됨', quoting=불러 올 컬럼 수)

 

 

데이터 정리

1. Null 정리

- null인 데이터 개수가 몇 개 되지 않으면 그 행을 제거

data.dropna(inplace=True)

 

2. 문자열 컬럼의 양끝의 불필요한 공백 제거(strip)

data['target_column'] = data[' target_column '].str.strip()

 

3. 중복 데이터 제거

그냥 제거하지 말고, 중복이 얼마나 되어 있는지 체크하고 지울만하면 지우도록 한다.

data[' target_column'].duplicated().sum()

 

전체 데이터 개수 대비 삭제 해도 될만한 수량이면 삭제, 큰 비중이라면 데이터 출처에 문의를 해봐야 할 듯.

처리 단계별로 dataframe을 유지하려면 inplace는False로 두고, 처리 된 결과를 새로운 dataframe으로 받도록 함.

data.drop_duplicates(subset=[' target_column'], inplace=True)

 

4. 불필요한 문자 제거 

정규식을 이용한 처리.

- 조건에 맞는 문자열을 아무것도 없는 데이터로 교체하는 방법

data['target_column'].replace('[^가-힣 ]','', regex=True)

또는

data[' target_column'] = data[' target_column'].str.replace('[^가-힣 ]','', regex=True)

 

5. 분류 할 데이터의 비율 검토

- value_counts로 확인

data [' target_column'].value_counts()

 

 

데이터 분리

1. Feature/ Label 분리

- 단순하게 dataframe에서 특정 컬럼을 지정하여 분리하는 방법

features = data['feature_column']
labels = data['label']

 

- 특정 컬럼과 그 나머지로 분리하는 방법

 . 특정 컬럼은 1번과 동일하게 컬럼명으로 얻고, 나머지는 drop을 한 결과를 받으면 됨.inplace에 주의

df.drop(column_name 또는 column_name의 list, axis=1, inplace=False)

 

2. train/ test 데이터 분리

from sklearn.model_selection import train_test_split

x_train, x_test, y_train, y_test = train_test_split(features, labels , test_size=0.2, random_state=41)

label의 비율을 유사하게 하려면 stratify 컬럼으로 label 지정

x_train, x_test, y_train, y_test = train_test_split(features, labels , test_size=0.2, stratify=labels, random_state=41)

 

 

문자열 데이터의 벡터화

1. TF-IDF를 이용한 단어의 index 부여(토큰화)

from sklearn.feature_extraction.text import TfidfVectorizer

tfidf = TfidfVectorizer()
x_train_v = tfidf.fit_transform( x_train )
x_test_v = tfidf.transform( x_test )

y_train_v = y_train.copy()
y_test_v = y_test.copy()

 

머신러닝 분류기

1. RandomForest분류기

from sklearn.ensemble import RandomForestClassifier

rfc = RandomForestClassifier()
rfc.fit(x_train_v, y_train_v)
print(f'RandomForestClassifier 성능 : { rfc.score(x_test_v, y_test_v) }')

 

2. SGD분류기

from sklearn.linear_model import SGDClassifier

sgdc = SGDClassifier()
sgdc.fit(x_train_v, y_train_v)
print(f'SGDClassifier 성능 : { sgdc.score(x_test_v, y_test_v) }')

 

3. 예측

predict = sgdc.predict(x_test_v[:1])
print(predict)

 

 

 

LSTM 방식

데이터 전처리는 동일하고, 토큰화부터 다름.

문자열 데이터의 벡터화

1. 빈도수 고려하지 않는 전체 단어의 토큰화

import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
 
tokenizer = Tokenizer()
tokenizer.fit_on_texts(x_train)

 토큰화 된 결과 조회

단어 : index

tokenizer.word_index

 

index : 단어

tokenizer.index_word

단어별 빈도

okenizer.word_counts

총 단어 개수 : len(tokenizer.index_word)

 

2. 문장을 숫자로 표현 : texts to sequences

x_train_seq = tokenizer.texts_to_sequences(x_train)
x_test_seq = tokenizer.texts_to_sequences(x_test)

 

3. 문장을 표현하는 배열의 길이를 통일하기 위해 빈 데이터를 추가 : Padding

- 최대 길이 확인

max_len = max(len(line) for line in x_train_seq)

 

- 최대 길이로 padding

x_train_pad = pad_sequences(x_train_seq, maxlen=max_len)
x_test_pad = pad_sequences(x_test_seq, maxlen= max_len )

 

 

LSTM 모델링

1. 기본 코드

from tensorflow.keras.layers import Dense, Flatten, Conv1D, MaxPool2D
from tensorflow.keras.layers import Embedding, Bidirectional, LSTM, SimpleRNN, GRU
from tensorflow.keras.models import Sequential
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
 
max_words = 47646 + 1    # 총 단어 갯수 + padding 0 번호
max_len = 42             # 최대 문장 길이
embedding_dim = 64      # embedding 차원
 
model = Sequential()
 
model.add(Embedding(max_words, embedding_dim, input_length=max_len))
 
model.add(LSTM(16, return_sequences=True))
model.add(LSTM(16, return_sequences=True))
model.add(Flatten())
model.add(Dense(128, activation='swish'))
model.add(Dense(32, activation='swish'))
model.add(Dense(2, activation='softmax'))

model.compile(loss = 'sparse_categorical_crossentropy',
              optimizer = 'adam',
              metrics = ['accuracy'])
model.summary()
 
# 조기종료 콜백함수 정의(EarlyStopping)
es = EarlyStopping(monitor='val_loss', patience=10, verbose=1)

# 체크포인트 저장(EarlyStopping)
checkpoint_path = 'tmp_checkpoint.keras'
cp = ModelCheckpoint(checkpoint_path, monitor='val_loss', verbose=1, save_best_only=True)
 
history = model.fit(x_train_pad, y_train, epochs=50, batch_size=512,
                      validation_data=(x_test_pad, y_test), verbose =1, callbacks=[es, cp])

 

2. 결과 검토

epochs = range(1, len(history.history['accuracy']) + 1)
plt.plot(epochs, history.history['accuracy'])
plt.plot(epochs, history.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'valid'], )
plt.show()

 

3. 예측

predict = model.predict(x_test_pad[:1])

 

 

 

BERT 로 분류하기

데이터 전처리는 동일

 

1. 토큰화

형태도 단위로 전체 데이터를 embedding 하는데 시간이 많이 걸리므로 일부만 사용(성능이 되면 더 쓰고..)

data = pd.concat([ data  .iloc[:10000], data  .iloc[-10000:]])

 

!pip install konlpy

from konlpy.tag import Okt
okt = Okt()
data ['target_column'] = data [' target_column '].map(lambda x: ' '.join(okt.morphs(x, stem = True)))

 

 

2. Train/Test 분할

from sklearn.model_selection import train_test_split

train_df, test_df = train_test_split( data  , test_size=0.2, random_state=42)

train_texts = train_df[' target_column  '].astype(str).tolist() # 문자열 데이터로 명시 후 리스트 화
train_labels = train_df['label'].tolist()
test_texts = test_df[' target_column  '].astype(str).tolist()
test_labels = test_df['label'].tolist()

 

3. BERT 토크나이저

# !pip install transformers

from transformers import BertTokenizer, BertForSequenceClassification

model_name = 'monologg/kobert'
tokenizer = BertTokenizer.from_pretrained(model_name)

train_encodings = tokenizer(train_texts, truncation=True, padding=True)
test_encodings = tokenizer(test_texts, truncation=True, padding=True)

 

Data Loader

import torch
from torch.utils.data import DataLoader, Dataset

class CustomDataset(torch.utils.data.Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        return len(self.labels)

train_dataset = CustomDataset(train_encodings, train_labels)
test_dataset = CustomDataset(test_encodings, test_labels)
batch_size = 64  # 배치 사이즈는 직접 지정해야 합니다.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

 

 

BERT 모델 훈련

1. 불러오기 - 분류기로 분류 할 것이 2개.

# 0, 1로 분류하기 때문에 레이블은 2개로 지정합니다.
model = BertForSequenceClassification.from_pretrained('monologg/kobert', num_labels=2)

 

 

2. 훈련

from tqdm.auto import tqdm

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  # GPU 사용이 가능한 경우 설정

start = time.time()

num_epochs = 5
learning_rate = 2e-5 #2e-5는 0.00002
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
criterion = torch.nn.CrossEntropyLoss()
model.to(device) # GPU 사용이 가능한 경우

for epoch in range(num_epochs):
    model.train() # 훈련 모드 지정
    total_loss = 0

    for batch in tqdm(train_loader):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        optimizer.zero_grad()
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        total_loss += loss.item()

        loss.backward()
        optimizer.step()

    average_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1}/{num_epochs} - Average Loss: {average_loss:.4f}")


print(f'총학습시간: { end - start }')

 

3. 테스트

model.eval()
correct_predictions = 0
total_predictions = 0

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        outputs = model(input_ids, attention_mask=attention_mask)
        _, predicted_labels = torch.max(outputs.logits, dim=1)

        correct_predictions += torch.sum(predicted_labels == labels).item()
        total_predictions += labels.size(0)

accuracy = correct_predictions / total_predictions
print(f"Test Accuracy: {accuracy:.4f}")

 

4. 추론

input_text = '그냥 넷플릭스에서 보는게 좋았겠다.'
input_encoding = tokenizer.encode_plus(
    input_text,
    truncation=True,
    padding=True,
    return_tensors='pt'
)

input_ids = input_encoding['input_ids'].to(device)
attention_mask = input_encoding['attention_mask'].to(device)

model.eval()
with torch.no_grad():
    outputs = model(input_ids, attention_mask=attention_mask)
    _, predicted_labels = torch.max(outputs.logits, dim=1)
predicted_labels = predicted_labels.item()

print(predicted_labels)

 

728x90
반응형

댓글