
Day 43. NLP Deep Learning - RNN


 

Day 43 Retrospective.

 

My stomach hurt so much after eating that I couldn't focus properly after lunchtime. From now on, I should carry medicine with me.


1. RNN

 

 

1-1. RNN Model

import torch
from torch import nn
# Debugging
batch_size = 10
seq_len = 300

input = torch.zeros(batch_size, seq_len, dtype=torch.long)
input.shape
# torch.Size([10, 300])

 

Embedding Layer

class Embedding_Layer(nn.Module):
    def __init__(self, vocab_len, emb_dim=128) -> None:
        super().__init__()
        
        self.emb = nn.Embedding(num_embeddings=vocab_len, embedding_dim=emb_dim)
        
    def forward(self, x):
        return self.emb(x)
  • Input: [batch, seq_len]
  • Output: [batch, seq_len, emb_dim]
# Debugging
embedding_layer = Embedding_Layer(500)
emb_out = embedding_layer(input)
emb_out.shape
# torch.Size([10, 300, 128])
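
The vocabulary built later reserves index 0 for <pad>, so passing padding_idx=0 would keep the pad row frozen at zero. An optional sketch, not part of the original model:

# Optional: same embedding with padding_idx, assuming <pad> maps to index 0.
# Padded positions then contribute zero vectors and receive no gradient.
emb_pad = nn.Embedding(num_embeddings=500, embedding_dim=128, padding_idx=0)
emb_pad(input).shape
# torch.Size([10, 300, 128])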

 

RNN Layer

class RNN_Layer(nn.Module):
    def __init__(self, emb_dim, n_layer=2, n_hidden=3, is_bidirection=True) -> None:
        super().__init__()
        
        self.emb_dim = emb_dim
        self.n_layer = n_layer
        self.n_hidden = n_hidden
        self.is_bidirection = is_bidirection
        
        self.rnn = nn.RNN(
            input_size=self.emb_dim,
            hidden_size=self.n_hidden,
            num_layers=self.n_layer,
            bidirectional=self.is_bidirection
        )
    
    def forward(self, x):
        # nn.RNN defaults to [seq_len, batch, emb_dim] input, so swap the first two dims
        x_trans = x.transpose(0, 1)
        
        n_direction = 2 if self.is_bidirection else 1
        init_state = torch.zeros(
            n_direction * self.n_layer, x.shape[0], self.n_hidden,
            device=x.device
        )
        
        out, hidden_state = self.rnn(x_trans, init_state)
        
        # hidden_state: [n_layer * n_direction, batch, n_hidden]; keep the last slice
        return hidden_state[-1]
  • Input: [batch, seq_len, emb_dim]
  • Output: [batch, n_hidden]
# Debugging
rnn_layer = RNN_Layer(emb_dim=128)
rnn_out = rnn_layer(emb_out)
rnn_out.shape
# torch.Size([10, 3])
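
Before settling on hidden_state[-1], it helps to see everything nn.RNN returns. A quick shape check reusing the debug tensors above:

# out: per-timestep features of the top layer, both directions concatenated.
# hidden_state: final state per (layer, direction) pair; index -1 is the
# top layer's backward direction, i.e. shape [batch, n_hidden].
out, hidden_state = rnn_layer.rnn(emb_out.transpose(0, 1), torch.zeros(4, 10, 3))
out.shape
# torch.Size([300, 10, 6])
hidden_state.shape
# torch.Size([4, 10, 3])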

 

FC Layer

class FC_Layer(nn.Module):
    def __init__(self, n_hidden, target_size, hidden_size=128) -> None:
        super().__init__()
        
        self.fc = nn.Sequential(
            nn.Linear(in_features=n_hidden, out_features=hidden_size),
            nn.ReLU(),
            nn.Linear(in_features=hidden_size, out_features=target_size)
        )
    
    def forward(self, x):
        return self.fc(x)
  • Input: [batch, n_hidden]
  • Output: [batch, target_size]
# Debugging
fc_layer = FC_Layer(n_hidden=3, target_size=1)
fc_out = fc_layer(rnn_out)
fc_out.shape
# torch.Size([10, 1])

 

RNN Model

class RNN_Model(nn.Module):
    def __init__(self, vocab_len, target_size, emb_dim=128, n_hidden=32) -> None:
        super().__init__()
        self.embedding_layer = Embedding_Layer(vocab_len=vocab_len, emb_dim=emb_dim)
        self.rnn_layer = RNN_Layer(emb_dim=emb_dim, n_hidden=n_hidden)
        self.fc_layer = FC_Layer(n_hidden=n_hidden, target_size=target_size)
    
    def forward(self, x):
        emb_out = self.embedding_layer(x)
        rnn_out = self.rnn_layer(emb_out)
        fc_out = self.fc_layer(rnn_out)
        return fc_out
# Debugging
!pip install torchinfo
import torchinfo

rnn_model = RNN_Model(vocab_len=500, target_size=1)
torchinfo.summary(
    model=rnn_model,
    input_size=(10, 300),
    dtypes=[torch.long],
    col_names=['input_size', 'output_size', 'num_params']
)
"""
===================================================================================================================
Layer (type:depth-idx)                   Input Shape               Output Shape              Param #
===================================================================================================================
RNN_Model                                [10, 300]                 [10, 1]                   --
├─Embedding_Layer: 1-1                   [10, 300]                 [10, 300, 128]            --
│    └─Embedding: 2-1                    [10, 300]                 [10, 300, 128]            64,000
├─RNN_Layer: 1-2                         [10, 300, 128]            [10, 32]                  --
│    └─RNN: 2-2                          [300, 10, 128]            [300, 10, 64]             16,640
├─FC_Layer: 1-3                          [10, 32]                  [10, 1]                   --
│    └─Sequential: 2-3                   [10, 32]                  [10, 1]                   --
│    │    └─Linear: 3-1                  [10, 32]                  [10, 128]                 4,224
│    │    └─ReLU: 3-2                    [10, 128]                 [10, 128]                 --
│    │    └─Linear: 3-3                  [10, 128]                 [10, 1]                   129
===================================================================================================================
Total params: 84,993
Trainable params: 84,993
Non-trainable params: 0
Total mult-adds (Units.MEGABYTES): 50.60
===================================================================================================================
Input size (MB): 0.02
Forward/backward pass size (MB): 4.62
Params size (MB): 0.34
Estimated Total Size (MB): 4.98
===================================================================================================================
"""

 

 

1-2. RNN with HPO

 

Load Data

from google.colab import drive
drive.mount('/content/data')
import numpy as np
import pandas as pd

data_path = ''
data = pd.read_csv(data_path + 'ratings_train.txt', sep='\t')
data = data[:100000]
data.shape
# (100000, 3)

 

Cleaning

data.dropna(inplace=True)

data['document'] = data['document'].map(
    lambda x: x.replace('..', '').replace('ㅡㅡ', '').replace('ㅠ.ㅠ', '')\
    .replace('ㅋ', '').replace('ㅎ', '').replace('~', '').replace('^^', '')
)
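
The chained replace calls could also be collapsed into a single regex pass. An alternative sketch using the same literal substrings:

import re

# One compiled pattern instead of seven chained str.replace calls.
noise = re.compile(r'\.\.|ㅡㅡ|ㅠ\.ㅠ|ㅋ|ㅎ|~|\^\^')
data['document'] = data['document'].map(lambda x: noise.sub('', x))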

 

Tokenization

!pip install kiwipiepy
  • Create the tokenizer
from kiwipiepy import Kiwi

kiwi = Kiwi()
  • Stemming / Stopword
from kiwipiepy.utils import Stopwords

stopwords = Stopwords()

def tokenizer(text):
    tokens = kiwi.tokenize(text, stopwords=stopwords)
    
    return [t.form for t in tokens if t.tag[0] in 'NJMV']
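
A quick smoke test of the tokenizer; the sentence is hypothetical, and the returned morphemes depend on the Kiwi model and the default stopword list:

# Hypothetical input; exact output varies by kiwipiepy version.
print(tokenizer('영화가 정말 재미있다'))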
  • Vocabulary
!pip install -U torchtext==0.15.2
from torchtext.vocab import build_vocab_from_iterator
from tqdm.auto import tqdm

def yield_tokens(data, tokenizer):
    for text in tqdm(data):
        yield tokenizer(text)

gen = yield_tokens(data['document'], tokenizer)
vocab = build_vocab_from_iterator(gen, specials=['<pad>', '<unk>'])
vocab.set_default_index(vocab['<unk>'])
len(vocab)
# 35735
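
The specials land at the front of the vocabulary, and unseen tokens fall back to the default index. A quick check:

vocab(['<pad>', '<unk>'])
# [0, 1]
vocab['이런토큰은없다']  # hypothetical token, assuming it never appeared in the corpus
# 1 (the <unk> index, via set_default_index)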

 

Dataset Class

import torch
from torch.utils.data import Dataset

class ReviewDataset(Dataset):
    def __init__(self, vocab, tokenizer, features, max_len=None, targets=None) -> None:
        super().__init__()
        
        self.max_len = max_len
        self.targets = np.array(targets).reshape(-1, 1) if targets is not None else None
        self.do_tokenize(vocab, tokenizer, features)
    
    def do_tokenize(self, vocab, tokenizer, features):
        _features = [vocab(tokenizer(text)) for text in tqdm(features)]
        
        if self.max_len is None:
            self.max_len = max(len(token) for token in _features)
        
        # pad short sequences with <pad> (index 0), truncate long ones to max_len
        self.features = np.array([
            token + vocab(['<pad>']) * (self.max_len - len(token))
            if len(token) <= self.max_len else token[:self.max_len]
            for token in tqdm(_features)
        ])
    
    def __len__(self):
        return self.features.shape[0]
    
    def __getitem__(self, index:int):
        feature = torch.LongTensor(self.features[index])
        target = None
        
        if self.targets is not None:
            target = torch.Tensor(self.targets[index])
        
        return feature, target

dt = ReviewDataset(vocab=vocab, tokenizer=tokenizer, features=data['document'].tolist(), targets=data['label'].tolist())
len(dt)
# 100000
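
Pulling one sample out confirms the shapes and dtypes the model will consume:

feature, target = dt[0]
feature.shape, feature.dtype
# (torch.Size([max_len]), torch.int64), where max_len was inferred from the data
target.shape, target.dtype
# (torch.Size([1]), torch.float32)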

 

Engine

  • Train Step
from sklearn.metrics import accuracy_score

def train_step(model, dataloader, loss_fn, optimizer, device):
    model.train()
    
    train_loss, train_acc = 0, 0
    
    sig = torch.nn.Sigmoid()
    for X, y in tqdm(dataloader, desc='train step', leave=False):
        X, y = X.to(device), y.to(device)
        
        pred = model(X)
        
        loss = loss_fn(pred, y)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
        
        pred_ = sig(pred)
        pred_ = pred_.to('cpu').detach().numpy()
        pred_ = (pred_ > 0.5).astype(int)
        train_acc += accuracy_score(y.to('cpu').numpy(), pred_)
    
    train_loss /= len(dataloader)
    train_acc /= len(dataloader)
    
    return train_loss, train_acc
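
Note that the Sigmoid above is only for the accuracy metric: BCEWithLogitsLoss consumes raw logits because it fuses the sigmoid internally. A quick equivalence check:

# BCEWithLogitsLoss(logits, y) equals BCELoss(sigmoid(logits), y), more stably.
logits, labels = torch.randn(4, 1), torch.ones(4, 1)
torch.allclose(nn.BCEWithLogitsLoss()(logits, labels),
               nn.BCELoss()(torch.sigmoid(logits), labels))
# True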
  • Test Step
@torch.inference_mode()
def test_step(model, dataloader, loss_fn, device):
    model.eval()
    
    test_loss, test_acc = 0, 0
    pred_list = []
    
    sig = torch.nn.Sigmoid()
    for X, y in tqdm(dataloader, desc='test step', leave=False):
        X, y = X.to(device), y.to(device)
        
        pred = model(X)
        
        pred_ = sig(pred)
        pred_ = pred_.to('cpu').numpy()
        pred_list.append(pred_)
        
        if y is not None:
            loss = loss_fn(pred, y)
            test_loss += loss.item()
            # threshold probabilities before scoring accuracy, as in train_step
            test_acc += accuracy_score(y.to('cpu').numpy(), (pred_ > 0.5).astype(int))
    
    test_loss /= len(dataloader)
    test_acc /= len(dataloader)
    
    return test_loss, test_acc
  • Early Stop
class EarlyStopper(object):
    def __init__(self, num_trials, save_path) -> None:
        self.num_trials = num_trials
        self.trial_counter = 0
        self.best_loss = np.inf
        self.save_path = save_path
    
    def is_continuable(self, model, loss):
        if loss < self.best_loss:
            self.best_loss = loss
            self.trial_counter = 0
            torch.save(model, self.save_path)
            return True
        elif self.trial_counter + 1 < self.num_trials:
            self.trial_counter += 1
            return True
        else:
            return False
    
    def get_best_model(self, device):
        return torch.load(self.save_path).to(device)
  • Plot Loss
import matplotlib.pyplot as plt

def plot_loss_curves(results):
    loss = results['train_loss']
    test_loss = results['test_loss']
    
    accuracy = results['train_acc']
    test_accuracy = results['test_acc']
    
    epochs = range(len(results['train_loss']))
    
    plt.figure(figsize=(15, 7))
    
    plt.subplot(1, 2, 1)
    plt.plot(epochs, loss, label='Train Loss')
    plt.plot(epochs, test_loss, label='Test Loss')
    plt.title('Loss')
    plt.xlabel('Epochs')
    plt.legend()
    
    plt.subplot(1, 2, 2)
    plt.plot(epochs, accuracy, label='Train Accuracy')
    plt.plot(epochs, test_accuracy, label='Test Accuracy')
    plt.title('Accuracy')
    plt.xlabel('Epochs')
    plt.legend()
  • Main
def main(
        model:torch.nn.Module,
        train_dataloader:torch.utils.data.DataLoader,
        test_dataloader:torch.utils.data.DataLoader,
        optimizer:torch.optim.Optimizer,
        scheduler,
        early_stopper,
        loss_fn:torch.nn.Module,
        device:str,
        epochs:int=5
):
    results = {
        'train_loss': [],
        'train_acc': [],
        'test_loss': [],
        'test_acc': []
    }
    
    model.to(device)
    
    for epoch in tqdm(range(epochs)):
        train_loss, train_acc = train_step(
            model=model,
            dataloader=train_dataloader,
            loss_fn=loss_fn,
            optimizer=optimizer,
            device=device
        )
        test_loss, test_acc = test_step(
            model=model,
            dataloader=test_dataloader,
            loss_fn=loss_fn,
            device=device
        )
        
        scheduler.step()
        
        print(
            f'Epoch: {epoch + 1} | '
            f'train_loss: {train_loss:.4f} | '
            f'train_acc: {train_acc:.4f} | '
            f'test_loss: {test_loss:.4f} | '
            f'test_acc: {test_acc:.4f}'
        )
        
        results['train_loss'].append(train_loss)
        results['train_acc'].append(train_acc)
        results['test_loss'].append(test_loss)
        results['test_acc'].append(test_acc)
        
        if not early_stopper.is_continuable(model, test_loss):
            print(f'validation: best loss: {early_stopper.best_loss}')
            break
    
    return results

 

Training

  • KFold
from sklearn.model_selection import StratifiedKFold

cv = StratifiedKFold(n_splits=5, shuffle=True)
  • Training
from torch.utils.data import DataLoader

device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = RNN_Model(vocab_len=len(vocab), target_size=1).to(device)

optimizer = torch.optim.SGD(params=model.parameters(), lr=100)  # CyclicLR resets this lr to base_lr
scheduler = torch.optim.lr_scheduler.CyclicLR(optimizer, base_lr=0.001, max_lr=0.1, step_size_up=5, mode='exp_range', gamma=0.85)
early_stopper = EarlyStopper(num_trials=5, save_path='./trained_model.pth')
loss_fn = nn.BCEWithLogitsLoss()
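
With mode='exp_range', the cycle amplitude decays by a factor of gamma per step, so later cycles oscillate in an ever-narrower band. The schedule can be previewed on a throwaway optimizer with a dummy parameter before committing to it:

# Sketch: preview the exp_range schedule without touching the real optimizer.
_opt = torch.optim.SGD([torch.zeros(1, requires_grad=True)], lr=0.001)
_sched = torch.optim.lr_scheduler.CyclicLR(
    _opt, base_lr=0.001, max_lr=0.1, step_size_up=5, mode='exp_range', gamma=0.85
)
for _ in range(5):
    _opt.step()
    _sched.step()
    print(_sched.get_last_lr())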

for tr_index, val_index in cv.split(data, data['label']):
    label_train = data['label'].iloc[tr_index]
    label_valid = data['label'].iloc[val_index]
    
    feature_train = data['document'].iloc[tr_index]
    feature_valid = data['document'].iloc[val_index]
    
    dt_train = ReviewDataset(vocab=vocab, tokenizer=tokenizer, features=feature_train.tolist(), targets=label_train.tolist())
    dt_test = ReviewDataset(vocab=vocab, tokenizer=tokenizer, features=feature_valid.tolist(), targets=label_valid.tolist())
    
    dl_train = DataLoader(dt_train, batch_size=256, shuffle=True)
    dl_test = DataLoader(dt_test, batch_size=256, shuffle=False)
    
    result = main(
        model=model,
        train_dataloader=dl_train,
        test_dataloader=dl_test,
        optimizer=optimizer,
        scheduler=scheduler,
        early_stopper=early_stopper,
        loss_fn=loss_fn,
        device=device,
        epochs=50
    )
plot_loss_curves(result)