
SK네트웍스 Family AI캠프 10기 / Daily Retrospective

Day 46. Natural Language Deep Learning - Seq2Seq & Attention


 

Day 46 Retrospective.

 

I meant to work out, but I was so sleepy that I couldn't even finish studying and went to bed at 10. I'm very tired again today, so I think I should go to bed early.

 

 

 

 

1. Seq2Seq

 

 

1-1. Seq2Seq for Translation

 

Setup

!pip install -U torchtext==0.15.2

import numpy as np
import pandas as pd
import torch
import random
import os
from tqdm.auto import tqdm

SEED = 42

def reset_seeds(seed=42):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True

device = 'cuda' if torch.cuda.is_available() else 'cpu'

 

Load Data

from google.colab import drive
drive.mount('/content/drive')

data_path = ''
train = pd.read_csv(f'{data_path}translate_en_ko.csv')

 

Text Preprocessing

  • Cleaning
train['en'] = train['en'].str.replace('[^a-zA-Z0-9 .,!?\'\"]', '', regex=True).str.lower()
train['ko'] = train['ko'].str.replace('[^가-힣0-9 .,!?\'\"]', '', regex=True)
  • Tokenization & Vocabulary
# Korean

!pip install kiwipiepy
from kiwipiepy import Kiwi

kiwi = Kiwi()

# Tokenization
result = kiwi.tokenize(train['ko'])

src_data = []
for tokens in result:
    tokens = [t.form for t in tokens]
    src_data.append(tokens)

# Vocabulary
from torchtext.vocab import build_vocab_from_iterator

vocab_ko = build_vocab_from_iterator(src_data, specials=['<pad>', '<unk>'])
vocab_ko.set_default_index(vocab_ko['<unk>'])
len(vocab_ko)
# 3250

# Tokens -> indices
src_data = [vocab_ko(tokens) for tokens in src_data]
# English

from torchtext.data.utils import get_tokenizer

tokenizer = get_tokenizer('basic_english')

sos_token = '<sos>'
eos_token = '<eos>'

# Tokenization
tgt_data = []
for text in train['en']:
    tokens = [sos_token] + tokenizer(text) + [eos_token]
    tgt_data.append(tokens)

# Vocabulary
from torchtext.vocab import build_vocab_from_iterator

vocab_en = build_vocab_from_iterator(tgt_data, specials=['<pad>', '<unk>', sos_token, eos_token])
vocab_en.set_default_index(vocab_en['<unk>'])
len(vocab_en)
# 3129

# Tokens -> indices
tgt_data = [vocab_en(tokens) for tokens in tgt_data]

 

Dataset

from torch.utils.data import Dataset

class TranslateDataset(Dataset):
    def __init__(self, src, tgt):
        self.src = src
        self.tgt = tgt
    
    def __len__(self):
        return len(self.src)
    
    def __getitem__(self, idx):
        item = {}
        item['src'] = torch.tensor(self.src[idx])
        item['tgt'] = torch.tensor(self.tgt[idx])
        
        return item

dt = TranslateDataset(src_data, tgt_data)
len(dt)
# 5794

dt[0]['src'].shape, dt[0]['tgt'].shape
# (torch.Size([10]), torch.Size([10]))

dt[-1]['src'].shape, dt[-1]['tgt'].shape
# (torch.Size([5]), torch.Size([6]))
  • Padding
    • torch.nn.utils.rnn.pad_sequence
def collate_fn(samples):
    src = [sample['src'] for sample in samples]
    tgt = [sample['tgt'] for sample in samples]
    
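    # pad_sequence pads with 0 by default, which matches the <pad> index in both vocabularies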
    src = torch.nn.utils.rnn.pad_sequence(src, batch_first=True)
    tgt = torch.nn.utils.rnn.pad_sequence(tgt, batch_first=True)
    
    return {'src': src, 'tgt': tgt}

 

DataLoader

from torch.utils.data import DataLoader

dl = DataLoader(dataset=dt, batch_size=2, shuffle=False, collate_fn=collate_fn)
batch = next(iter(dl))

 

Model

  • Encoder
    • A model that encodes the Korean source sentence
      • Input: [batch, seq_len]
      • Output: [n_layers, batch, n_hidden]
class Encoder(torch.nn.Module):
    def __init__(self, vocab_ko_len, emb_dim, device):
        super().__init__()
        self.n_hidden = emb_dim * 2
        self.n_layers = 1
        self.device = device
        
        self.emb_layer = torch.nn.Embedding(
            num_embeddings=vocab_ko_len,
            embedding_dim=emb_dim
        )
        self.rnn_layer = torch.nn.LSTM(
            input_size=emb_dim,
            hidden_size=self.n_hidden,
            batch_first=True,
            num_layers=self.n_layers
        )
    
    def forward(self, x):
        emb_out = self.emb_layer(x)
        
        init_hidden = torch.zeros(self.n_layers, x.shape[0], self.n_hidden).to(self.device)
        init_cell = torch.zeros(self.n_layers, x.shape[0], self.n_hidden).to(self.device)
        _, (hn, cn) = self.rnn_layer(emb_out, (init_hidden, init_cell))
        
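        # Return only the final hidden and cell states; they act as the context passed to the decoder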
        return hn, cn

encoder = Encoder(
    vocab_ko_len=len(vocab_ko),
    emb_dim=64,
    device=device
).to(device)

batch['src'].shape
# torch.Size([2, 10])

hn, cn = encoder(batch['src'].to(device))
hn.shape, cn.shape
# (torch.Size([1, 2, 128]), torch.Size([1, 2, 128]))
  • Decoder
    • A model that predicts the English sentence
      • Input: [batch, seq_len]
      • Output
        • Prediction: [batch, target_size]
        • Hidden State: [n_layers, batch, n_hidden]
        • Cell State: [n_layers, batch, n_hidden]
class Decoder(torch.nn.Module):
    def __init__(self, vocab_en_len, emb_dim):
        super().__init__()
        
        self.emb_layer = torch.nn.Embedding(
            num_embeddings=vocab_en_len,
            embedding_dim=emb_dim
        )
        self.rnn_layer = torch.nn.LSTM(
            input_size=emb_dim,
            hidden_size=emb_dim*2,
            batch_first=True
        )
        self.fc_layer = torch.nn.Linear(
            in_features=emb_dim*2,
            out_features=vocab_en_len
        )
    
    def forward(self, x, encoder_hn, encoder_cn):
        emb_out = self.emb_layer(x)
        
        outputs, (out_hn, out_cn) = self.rnn_layer(emb_out, (encoder_hn, encoder_cn))
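        # The last layer's final hidden state is projected to English vocabulary logits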
        prediction = self.fc_layer(out_hn[-1])
        
        return prediction, out_hn, out_cn

decoder = Decoder(
    vocab_en_len=len(vocab_en),
    emb_dim=64
).to(device)

tgt = batch['tgt'][:, 0].view(-1, 1)
tgt.shape
# torch.Size([2, 1])

pred, hn, cn = decoder(tgt.to(device), hn, cn)
pred.shape, hn.shape, cn.shape
# (torch.Size([2, 3129]), torch.Size([1, 2, 128]), torch.Size([1, 2, 128]))
  • Seq2Seq Model
class Net(torch.nn.Module):
    def __init__(self, vocab_size_src, vocab_size_tgt, emb_dim=64, device='cpu'):
        super().__init__()
        
        self.vocab_size_tgt = vocab_size_tgt
        self.device = device
        
        self.encoder = Encoder(vocab_size_src, emb_dim, self.device)
        self.decoder = Decoder(vocab_size_tgt, emb_dim)
    
    def forward(self, src, tgt, hn=None, cn=None, teacher_forcing_ratio=0.5):
        batch_size = tgt.shape[0]
        tgt_len = tgt.shape[1]
        
        prediction = torch.zeros(batch_size, tgt_len, self.vocab_size_tgt).to(self.device)
        
        if hn is None:
            hn, cn = self.encoder(src)
        
        dec_input = tgt[:, 0].view(-1, 1)
        for t in range(1, tgt_len):
            output, hn, cn = self.decoder(dec_input, hn, cn)
            prediction[:, t] = output
            dec_input = output.argmax(1).view(-1, 1)
            
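            # Teacher forcing: with probability teacher_forcing_ratio, feed the ground-truth token instead of the model's own prediction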
            if random.random() < teacher_forcing_ratio:
                dec_input = tgt[:, t].view(-1, 1)
        
        return prediction, hn, cn

model = Net(
    vocab_size_src=len(vocab_ko),
    vocab_size_tgt=len(vocab_en)
)
pred, hn, cn = model(batch['src'], batch['tgt'])
pred.shape, hn.shape, cn.shape
# (torch.Size([2, 10, 3129]), torch.Size([1, 2, 128]), torch.Size([1, 2, 128]))

 

Engine

def train_loop(model, dataloader, loss_fn, optimizer, device):
    model.train()
    
    epoch_loss = 0
    for batch in tqdm(dataloader, desc='train loop', leave=False):
        src = batch['src'].to(device)
        tgt = batch['tgt'].to(device)
        pred, _, _ = model(src, tgt)
        
        num_class = pred.shape[-1]
        pred = pred.view(-1, num_class)
        tgt = tgt.flatten()
        
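        # Exclude <pad>/<unk>/<sos> (indices 0-2) from the loss; only real words and <eos> are scored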
        mask = tgt > 2
        tgt = tgt[mask]
        pred = pred[mask]
        loss = loss_fn(pred, tgt)
        
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1)
        optimizer.step()
        
        epoch_loss += loss.item()
    
    epoch_loss /= len(dataloader)
    
    return epoch_loss

 

Training

from tqdm.auto import tqdm

reset_seeds(SEED)

model = Net(
    vocab_size_src=len(vocab_ko),
    vocab_size_tgt=len(vocab_en),
    device=device
).to(device)
loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())
batch_size = 64

train_dt = TranslateDataset(
    src=src_data,
    tgt=tgt_data
)
train_dl = torch.utils.data.DataLoader(
    dataset=train_dt,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collate_fn
)

epochs = 300
train_loss_list = []

for _ in tqdm(range(epochs), desc='epochs'):
    train_loss = train_loop(
        model=model,
        dataloader=train_dl,
        loss_fn=loss_fn,
        optimizer=optimizer,
        device=device
    )
    train_loss_list.append(train_loss)
import seaborn as sns
import matplotlib.pyplot as plt

sns.lineplot(train_loss_list)

 

Prediction

@torch.no_grad()
def translate(model, text, vocab_src, vocab_tgt, tgt_max_len, device):
    model.eval()
    
    src = vocab_src([t.form for t in kiwi.tokenize(text)])
    src = torch.tensor(src).view(1, -1).to(device)
    
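    # Start from <sos> (index 2); index 0 (<pad>) is a placeholder because Net.forward predicts from position 1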
    tgt = [2, 0]
    tgt = torch.tensor(tgt).view(1, -1).to(device)
    
    hn = None
    cn = None
    for _ in range(tgt_max_len):
        pred, hn, cn = model(
            src=src,
            tgt=tgt,
            hn=hn,
            cn=cn
        )
        char_no = pred[-1, -1].argmax().item()
        
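        # Stop once <eos> (index 3) is generated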
        if char_no == 3:
            break
        
        print(vocab_tgt.lookup_token(char_no), end=' ')
        
        tgt = [char_no, 0]
        tgt = torch.tensor(tgt).view(1, -1).to(device)

text = train['ko'][42]
text
# '언어가 좋습니다.'

translate(
    model=model,
    text=text,
    vocab_src=vocab_ko,
    vocab_tgt=vocab_en,
    tgt_max_len=20,  # assumed maximum number of tokens to generate
    device=device
)
# i like languages .

target = train['en'][42]
target
# 'i like languades.'

 

 

 

2. Attention

 

 

2-1. Limitations of Seq2Seq

 

  • It is difficult to capture fine-grained contextual information across the entire input sequence.
  • Compressing all of the input sequence's information into a single context vector can cause information loss.
  • Due to the RNN structure, gradient vanishing/exploding can occur when processing long sequences.

 

 

2-2. Attention Mechanism

 

Assumption

  • The decoder's hidden state right before it outputs word X will be similar to the encoder's hidden state right after the encoder has read the part (word) of the input sequence most closely related to X.

 

How Attention Works

  • Compute the Attention Score as the dot-product similarity between the decoder's current Hidden State (Query) and each of the encoder's Hidden States (Keys).
      • Relation to cosine similarity: $A \cdot B = \|A\| \|B\| \cos(\theta)$
  • Apply a Softmax to the Attention Scores to obtain the Attention Distribution.
  • Multiply the Attention Distribution by the encoder's Hidden States (Keys/Values) and sum them to obtain the Attention Value (a weighted sum).
    • Through the Attention Value, the model learns which part (word) of the input sequence to focus on.
  • Concatenate the Attention Value with the decoder's current Hidden State (Query).
  • Pass the concatenated vector through a Dense Layer and apply a Softmax to compute the final output probability distribution (a minimal sketch follows this list).
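
The steps above map almost one-to-one onto a few tensor operations. Below is a minimal, standalone sketch of dot-product attention for a single decoder step, assuming batch-first tensors and illustrative sizes (batch=2, src_len=10, n_hidden=128); it is not the lecture code used in 2-3.

import torch
import torch.nn.functional as F

batch, src_len, n_hidden = 2, 10, 128

enc_outputs = torch.randn(batch, src_len, n_hidden)  # encoder hidden states (Keys/Values)
dec_hidden = torch.randn(batch, 1, n_hidden)         # current decoder hidden state (Query)

# 1. Attention Score: dot product between the Query and each Key -> [batch, 1, src_len]
attn_score = torch.bmm(dec_hidden, enc_outputs.transpose(1, 2))

# 2. Attention Distribution: softmax over the source positions
attn_dist = F.softmax(attn_score, dim=-1)

# 3. Attention Value: weighted sum of the encoder hidden states -> [batch, 1, n_hidden]
attn_value = torch.bmm(attn_dist, enc_outputs)

# 4. Concatenate the Attention Value with the decoder hidden state -> [batch, 1, 2 * n_hidden]
concat = torch.cat([attn_value, dec_hidden], dim=-1)

# 5. A Linear(2 * n_hidden -> vocab_size) layer followed by a softmax would give the output distribution.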

 

 

2-3. Seq2Seq and Attention for Translation

 

Setup

!pip install -U numpy==1.24.1 torchtext==0.15.2 kiwipiepy
from google.colab import drive

drive.mount('/content/drive')

data_path = ''
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt

n_step = 5
n_hidden = 128

 

Data

  • Create Data
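# 'S': decoder start symbol, 'E': end symbol, 'P': padding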
sentences = ['ich mochte ein bier P', 'S i want a beer', 'i want a beer E']
len(sentences)
# 3
  • Data Tokenization
word_list = ' '.join(sentences).split()
word_list = list(set(word_list))
len(word_list)
# 11
  • Data Dictionary
word_dict = {w: i for i, w in enumerate(word_list)}
number_dict = {i: w for i, w in enumerate(word_list)}
n_class = len(word_dict)
n_class
# 11

 

Dataset

def make_batch():
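    # np.eye(n_class) one-hot encodes each word index; the target keeps class indices for CrossEntropyLoss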
    input_batch = [np.eye(n_class)[[word_dict[n] for n in sentences[0].split()]]]
    output_batch = [np.eye(n_class)[[word_dict[n] for n in sentences[1].split()]]]
    target_batch = [[word_dict[n] for n in sentences[2].split()]]
    
    return torch.FloatTensor(input_batch), torch.FloatTensor(output_batch), torch.LongTensor(target_batch)
# Debugging

input, output, target = make_batch()
input.shape, output.shape, target.shape
# (torch.Size([1, 5, 11]), torch.Size([1, 5, 11]), torch.Size([1, 5]))

 

Model

class Attention(nn.Module):
    def __init__(self):
        super().__init__()
        
        self.enc_cell = nn.RNN(
            input_size=n_class,
            hidden_size=n_hidden,
            dropout=0.5
        )
        self.dec_cell = nn.RNN(
            input_size=n_class,
            hidden_size=n_hidden,
            dropout=0.5
        )
        
        self.attn = nn.Linear(
            in_features=n_hidden,
            out_features=n_hidden
        )
        
        self.fc = nn.Linear(
            in_features=n_hidden * 2,
            out_features=n_class
        )
    
    def forward(self, enc_inputs, hidden, dec_inputs):
        enc_inputs = enc_inputs.transpose(0, 1)
        dec_inputs = dec_inputs.transpose(0, 1)
        
        enc_outputs, enc_hidden = self.enc_cell(enc_inputs, hidden)
        
        trained_attn = []
        hidden = enc_hidden
        n_step = len(dec_inputs)
        response = torch.empty([n_step, 1, n_class])
        
        for i in range(n_step):
            dec_output, hidden = self.dec_cell(dec_inputs[i].unsqueeze(0), hidden)
            
            attn_weights = self.get_att_weight(dec_output, enc_outputs)
            trained_attn.append(attn_weights.squeeze().data.numpy())
            
            attn_values = attn_weights.bmm(enc_outputs.transpose(0, 1))
            
            dec_output = dec_output.squeeze(0)
            
            attn_values = attn_values.squeeze(1)
            cat_output = torch.cat((dec_output, attn_values), 1)
            
            response[i] = self.fc(cat_output)
        
        return response.transpose(0, 1).squeeze(0), trained_attn
    
    def get_att_weight(self, dec_output, enc_outputs):
        n_step = len(enc_outputs)
        attn_scores = torch.zeros(n_step)
        
        for i in range(n_step):
            attn_scores[i] = self.get_att_score(dec_output, enc_outputs[i])
        
        return F.softmax(attn_scores, dim=0).view(1, 1, -1)
    
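    # Luong-style "general" score: dot product of the decoder output with a linear projection of the encoder output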
    def get_att_score(self, dec_output, enc_output):
        score = self.attn(enc_output)
        
        return torch.dot(dec_output.view(-1), score.view(-1))

 

Training

hidden = torch.zeros(1, 1, n_hidden)

model = Attention()
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

input_batch, output_batch, target_batch = make_batch()

for epoch in range(2000):
    output, _ = model(
        enc_inputs=input_batch,
        hidden=hidden,
        dec_inputs=output_batch
    )
    
    loss = criterion(output, target_batch.squeeze(0))
    
    if (epoch + 1) % 400 == 0:
        print('Epoch: ', '%04d' % (epoch + 1), 'cost =', '{:.6f}'.format(loss))
    
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

"""
Epoch: 0400 cost = 0.000484
Epoch: 0800 cost = 0.000157
Epoch: 1200 cost = 0.000077
Epoch: 1600 cost = 0.000046
Epoch: 2000 cost = 0.000030
"""

 

Prediction

test_batch = [np.eye(n_class)[[word_dict[n] for n in 'SPPPP']]]
test_batch = torch.FloatTensor(test_batch)

predict, trained_attn = model(
    enc_inputs=input_batch,
    hidden=hidden,
    dec_inputs=test_batch
)
predict = predict.data.max(1, keepdim=True)[1]

print(sentences[0], '->', [number_dict[n.item()] for n in predict.squeeze()])
# ich mochte ein bier P -> ['i', 'want', 'a', 'beer', 'E']
fig = plt.figure(figsize=(5, 5))
ax = fig.add_subplot(1, 1, 1)
ax.matshow(trained_attn, cmap='viridis')
ax.set_xticklabels([''] + sentences[0].split(), fontdict={'fontsize': 14})
ax.set_yticklabels([''] + sentences[2].split(), fontdict={'fontsize': 14})
plt.show()