

Day 44. NLP Deep Learning - LSTM


 

Day 44 Retrospective.

 

At 4 p.m. today the preliminary ADsP scores were released. I got 64 points, which puts me in the expected-to-pass range. I have also been studying for the Big Data Analysis Engineer exam bit by bit, but the scope is wider than I expected, so I will have to keep at it. I also need to review over the weekend, but there are job postings that close on Monday, so I have to write cover letters for those as well.

 

 

 

 

1. LSTM

 

 

1-1. Time Series Analysis

 

Time Series Data

  • Data observed in sequence over time, and therefore influenced by time
    • Consecutive observations occurring one after another are correlated with each other (a small autocorrelation example follows this list).
  • Time series analysis aims to discover the regularities in the series, model them, and use the estimated model to forecast future values.
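
To make the correlation between consecutive observations concrete, here is a minimal sketch that computes the autocorrelation of a synthetic daily series with pandas; the random-walk series and every name in it are made up purely for illustration.

import numpy as np
import pandas as pd

# synthetic daily series: a random walk, so adjacent values are strongly related
rng = np.random.default_rng(42)
dates = pd.date_range('2024-01-01', periods=100, freq='D')
series = pd.Series(rng.normal(0, 1, size=100).cumsum(), index=dates)

# lag-1 autocorrelation: correlation between the series and itself shifted by one step
series.autocorr(lag=1)   # close to 1 for a random walk
series.autocorr(lag=7)   # generally weaker as the lag grows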

 

 

1-2. LSTM Dataset

 

Load Data

import numpy as np
import pandas as pd
from pandas_datareader import data as web

df = web.DataReader(
    name='000660',
    data_source='naver',
    start='2024-01-01',
    end='2024-12-31'
)
df.shape
# (244, 5)

df.info()
"""
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 244 entries, 2024-01-02 to 2024-12-30
Data columns (total 5 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   Open    244 non-null    object
 1   High    244 non-null    object
 2   Low     244 non-null    object
 3   Close   244 non-null    object
 4   Volume  244 non-null    object
dtypes: object(5)
memory usage: 11.4+ KB
"""
  • Data type conversion
df = df.astype(int)
df.info()
"""
<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 244 entries, 2024-01-02 to 2024-12-30
Data columns (total 5 columns):
 #   Column  Non-Null Count  Dtype
---  ------  --------------  -----
 0   Open    244 non-null    int64
 1   High    244 non-null    int64
 2   Low     244 non-null    int64
 3   Close   244 non-null    int64
 4   Volume  244 non-null    int64
dtypes: int64(5)
memory usage: 11.4 KB
"""
  • Scaling
data = df.to_numpy()
data.shape
# (244, 5)

data_min = data.min(axis=0)
data_max = data.max(axis=0)
data_size = data_max - data_min
scaled_data = (data - data_min) / data_size
  • Generating time series samples
input_size = 7
pred_size = 2

for i in range(input_size, len(scaled_data) - pred_size + 1):
    _feature = scaled_data[i - input_size : i]
    _target = scaled_data[i : i + pred_size, 3]
# for i in range(0, len(scaled_data) - input_size - pred_size + 1):
#     _feature = scaled_data[i : i + input_size]
#     _target = scaled_data[i + input_size : i + input_size + pred_size, 3]

_feature.shape, _target.shape
# ((7, 5), (2,))

 

Dataset

import torch
from torch.utils.data import Dataset

class StockDataset(Dataset):
    def __init__(self, df_stock:pd.DataFrame, input_size:int, pred_size:int, target_index:int) -> None:
        super().__init__()
        self.target_index = target_index
        self.__convert_time_series_data(self.__transform_min_max_scaling(df_stock), input_size, pred_size)
    
    def __transform_min_max_scaling(self, df_stock:pd.DataFrame):
        # column-wise min-max scaling; min/size are kept for the inverse transform
        np_stock = np.array(df_stock)
        self.stock_min = np_stock.min(axis=0)
        self.stock_size = np_stock.max(axis=0) - self.stock_min
        return (np_stock - self.stock_min) / self.stock_size
    
    def __convert_time_series_data(self, scaled_stock:np.array, input_size:int, pred_size:int):
        # sliding window: the previous input_size rows form the feature,
        # the next pred_size values of the target column form the label
        features, targets = [], []
        for i in range(input_size, len(scaled_stock) - pred_size + 1):
            feature = scaled_stock[i - input_size : i]
            features.append(feature)
            target = scaled_stock[i : i + pred_size, self.target_index]
            targets.append(target)
        
        self.features = np.array(features)
        self.targets = np.array(targets)
    
    def __len__(self):
        return self.features.shape[0]
    
    def __getitem__(self, index:int):
        return torch.Tensor(self.features[index]), torch.Tensor(self.targets[index])
    
    def transform_target(self, preds):
        # inverse min-max scaling, restoring the target column to its original scale
        return (preds * self.stock_size[self.target_index]) + self.stock_min[self.target_index]

dataset = StockDataset(df_stock=df, input_size=5, pred_size=2, target_index=3)
len(dataset)
# 238

feature_0, _ = dataset[0]
feature_237, _ = dataset[237]
feature_0.shape, feature_237.shape
# (torch.Size([5, 5]), torch.Size([5, 5]))

 

 

1-3. RNN (Recurrent Neural Networks)

 

Vanilla RNN

  • Well suited to processing short sequences.
  • Vanishing Gradient Problem
    • As the sequence gets longer, information from the early steps fails to reach the later steps.

 

LSTM (Long Short-Term Memory)

  • Cell State
    • A memory store that carries information along through time.
    • Only small linear interactions are applied to it, minimizing information loss.
  • Forget Gate
    • Decides how much of the past information to forget.
      • Output of 1: keep all of the information.
      • Output of 0: discard all of the information.
  • Input Gate
    • Decides how much of the current information to store.
  • Output Gate
    • Decides the output (hidden state) passed to the next step (a gate-level sketch follows this list).
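
As a rough illustration of how the gates interact, the sketch below computes a single LSTM step by hand with plain tensor operations; the weight dictionary and variable names are my own shorthand for illustration and do not mirror the internals of nn.LSTM.

import torch

# one hand-rolled LSTM step for a single sample (illustrative only)
input_size, hidden_size = 5, 8
x_t = torch.randn(input_size)          # current input
h_prev = torch.zeros(hidden_size)      # previous hidden state
c_prev = torch.zeros(hidden_size)      # previous cell state

# one weight matrix and bias per gate (f: forget, i: input, o: output, g: candidate)
W = {g: torch.randn(hidden_size, input_size + hidden_size) for g in 'fiog'}
b = {g: torch.zeros(hidden_size) for g in 'fiog'}
xh = torch.cat([x_t, h_prev])

f_t = torch.sigmoid(W['f'] @ xh + b['f'])   # forget gate: 0 = drop, 1 = keep
i_t = torch.sigmoid(W['i'] @ xh + b['i'])   # input gate: how much new info to store
g_t = torch.tanh(W['g'] @ xh + b['g'])      # candidate cell contents
o_t = torch.sigmoid(W['o'] @ xh + b['o'])   # output gate

c_t = f_t * c_prev + i_t * g_t              # new cell state: only small linear interactions
h_t = o_t * torch.tanh(c_t)                 # hidden state passed to the next step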

 

 

1-4. LSTM Model

 

batch_size = 20
seq_len = 10
features = 5

input_data = torch.zeros(batch_size, seq_len, features)
input_data.shape
# torch.Size([20, 10, 5])

 

Embedding Layer

  • A layer used when training on NLP (natural language) data.
  • It is omitted when the input is numeric time series data (see the sketch below).
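
For reference, a minimal sketch of what the skipped layer would do on text input; the vocabulary size, dimensions, and token ids are arbitrary values chosen for illustration.

import torch
from torch import nn

# token indices -> dense vectors; needed for text, skipped for numeric time series features
embedding = nn.Embedding(num_embeddings=1000, embedding_dim=16)
token_ids = torch.tensor([[3, 17, 250, 9]])   # [batch=1, seq_len=4]
embedded = embedding(token_ids)
embedded.shape
# torch.Size([1, 4, 16])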

 

LSTM Layer

import torch
from torch import nn

class LSTM_Layer(nn.Module):
    def __init__(self, input_size, n_hidden=64, n_layers=2, is_bidirectional=False) -> None:
        super().__init__()
        
        self.n_hidden = n_hidden
        self.n_layers = n_layers
        self.is_bidirectional = is_bidirectional
        
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=self.n_hidden,
            num_layers=self.n_layers,
            bidirectional=self.is_bidirectional
        )
    
    def forward(self, x):
        # nn.LSTM expects [seq_len, batch, features] by default, so swap the first two dims
        x_trans = x.transpose(0, 1)
        
        n_direction = 2 if self.is_bidirectional else 1
        # initial hidden/cell states, created on the same device as the input
        init_hidden = torch.zeros(n_direction * self.n_layers, x.shape[0], self.n_hidden, device=x.device)
        init_cell = torch.zeros(n_direction * self.n_layers, x.shape[0], self.n_hidden, device=x.device)
        
        out, (hidden_state, cell_state) = self.lstm(x_trans, (init_hidden, init_cell))
        
        # hidden state of the last layer: [batch, n_hidden]
        return hidden_state[-1]
  • Input: [batch, seq_len, features]
  • Output: [batch, n_hidden]
# Debugging

lstm_layer = LSTM_Layer(input_size=5)
lstm_out = lstm_layer(input_data)
lstm_out.shape
# torch.Size([20, 64])
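
As a design note, the transpose(0, 1) in forward() is only there because nn.LSTM consumes [seq_len, batch, features] by default. The variant below (my own sketch, not the class above) passes batch_first=True so the batch dimension can stay first.

# variant sketch: batch_first=True lets the LSTM consume [batch, seq_len, features] as-is
lstm_bf = nn.LSTM(input_size=5, hidden_size=64, num_layers=2, batch_first=True)
out_bf, (h_n, c_n) = lstm_bf(input_data)   # input_data: [20, 10, 5] from above
out_bf.shape, h_n.shape
# (torch.Size([20, 10, 64]), torch.Size([2, 20, 64]))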

 

FC Layer

class FC_Layer(nn.Module):
    def __init__(self, n_hidden, target_size, hidden_size=128) -> None:
        super().__init__()
        
        self.fc = nn.Sequential(
            nn.Linear(in_features=n_hidden, out_features=hidden_size),
            nn.ReLU(),
            nn.Linear(in_features=hidden_size, out_features=target_size)
        )
    
    def forward(self, x):
        return self.fc(x)
  • Input: [batch, n_hidden]
  • Output: [batch, target_size]
# Debugging

fc_layer = FC_Layer(n_hidden=64, target_size=2)
fc_out = fc_layer(lstm_out)
fc_out.shape
# torch.Size([20, 2])

 

LSTM Model

class LSTM_Model(nn.Module):
    def __init__(self, features_size, pred_size, n_hidden=128) -> None:
        super().__init__()
        self.lstm_layer = LSTM_Layer(input_size=features_size, n_hidden=n_hidden)
        self.fc_layer = FC_Layer(n_hidden=n_hidden, target_size=pred_size)
    
    def forward(self, x):
        lstm_out = self.lstm_layer(x)
        fc_out = self.fc_layer(lstm_out)
        return fc_out
# Debugging

!pip install torchinfo
import torchinfo

lstm_model = LSTM_Model(features_size=5, pred_size=2)
torchinfo.summary(
    model=lstm_model,
    input_size=(20, 10, 5),
    col_names=['input_size', 'output_size', 'num_params']
)
"""
===================================================================================================================
Layer (type:depth-idx)                   Input Shape               Output Shape              Param #
===================================================================================================================
LSTM_Model                               [20, 10, 5]               [20, 2]                   --
├─LSTM_Layer: 1-1                        [20, 10, 5]               [20, 128]                 --
│    └─LSTM: 2-1                         [10, 20, 5]               [10, 20, 128]             201,216
├─FC_Layer: 1-2                          [20, 128]                 [20, 2]                   --
│    └─Sequential: 2-2                   [20, 128]                 [20, 2]                   --
│    │    └─Linear: 3-1                  [20, 128]                 [20, 128]                 16,512
│    │    └─ReLU: 3-2                    [20, 128]                 [20, 128]                 --
│    │    └─Linear: 3-3                  [20, 128]                 [20, 2]                   258
===================================================================================================================
Total params: 217,986
Trainable params: 217,986
Non-trainable params: 0
Total mult-adds (Units.MEGABYTES): 40.58
===================================================================================================================
Input size (MB): 0.00
Forward/backward pass size (MB): 0.23
Params size (MB): 0.87
Estimated Total Size (MB): 1.10
===================================================================================================================
"""

 

Engine

  • Train Step
def train_step(model, dataloader, loss_fn, optimizer, device):
    model.train()
    
    train_loss = 0
    for X, y in dataloader:
        X, y = X.to(device), y.to(device)
        
        pred = model(X)
        
        loss = loss_fn(pred, y)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
    
    train_loss /= len(dataloader)
    
    return train_loss
  • Test Step
@torch.inference_mode()
def test_step(model, dataloader, loss_fn, device):
    model.eval()
    
    test_loss = 0
    target_list, pred_list = [], []
    
    for X, y in dataloader:
        X, y = X.to(device), y.to(device)
        
        pred = model(X)
        
        if y is not None:
            target_list.append(y.detach().to('cpu').numpy())
            
            loss = loss_fn(pred, y)
            test_loss += loss.item()
        
        _pred = pred.detach().to('cpu').numpy()
        pred_list.append(_pred)
    
    test_loss /= len(dataloader)
    
    targets = np.concatenate(target_list)
    preds = np.concatenate(pred_list)
    
    return test_loss, targets, preds
  • Early Stop
class EarlyStopper(object):
    def __init__(self, num_trials, save_path) -> None:
        self.num_trials = num_trials
        self.trial_counter = 0
        self.best_loss = np.inf
        self.save_path = save_path
    
    def is_continuable(self, model, loss):
        if loss < self.best_loss:
            self.best_loss = loss
            self.trial_counter = 0
            torch.save(model, self.save_path)
            return True
        elif self.trial_counter + 1 < self.num_trials:
            self.trial_counter += 1
            return True
        else:
            return False
    
    def get_best_model(self, device):
        return torch.load(self.save_path).to(device)

 

Training

  • K-Fold
from sklearn.model_selection import KFold

cv = KFold(n_splits=5, shuffle=True)
  • Training
from torch.utils.data import DataLoader
from sklearn.metrics import mean_absolute_percentage_error

device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = LSTM_Model(features_size=df.shape[1], pred_size=3)

optimizer = torch.optim.SGD(params=model.parameters())
early_stopper = EarlyStopper(num_trials=5, save_path='./trained_model.pth')
loss_fn = nn.MSELoss()

epochs = 10
batch_size=64

for i, (tri, val) in enumerate(cv.split(df)):
    dt_train = StockDataset(df.iloc[tri], input_size=5, pred_size=3, target_index=3)
    dt_test = StockDataset(df.iloc[val], input_size=5, pred_size=3, target_index=3)
    
    dl_train = DataLoader(dt_train, batch_size=batch_size, shuffle=True)
    dl_test = DataLoader(dt_test, batch_size=batch_size, shuffle=False)
    
    for epoch in range(epochs):
        train_loss = train_step(
            model=model,
            dataloader=dl_train,
            loss_fn=loss_fn,
            optimizer=optimizer,
            device=device
        )
        test_loss, test_target, test_pred = test_step(
            model=model,
            dataloader=dl_test,
            loss_fn=loss_fn,
            device=device
        )
        
        targets = dt_test.transform_target(test_target)
        preds = dt_test.transform_target(test_pred)
        score = mean_absolute_percentage_error(targets, preds)
        
        if not early_stopper.is_continuable(model, score):
            print(f'Cross Validation:{i} >> best score(loss): {early_stopper.best_loss}')
            break

"""
Cross Validation:0 >> best score(loss): 0.19433015543938503
Cross Validation:1 >> best score(loss): 0.19433015543938503
Cross Validation:3 >> best score(loss): 0.1824794301464131
Cross Validation:4 >> best score(loss): 0.1824794301464131
"""

 

Prediction

best_model = torch.load(early_stopper.save_path)

input = torch.Tensor(scaled_data[-5:, :]).unsqueeze(dim=0)
input.shape
# torch.Size([1, 5, 5])

pred = best_model(input)
pred = pred.squeeze()
pred.shape
# torch.Size([3])

# inverse scaling back to the original price scale
pred_np = pred.detach().numpy()
(pred_np * data_size[3]) + data_min[3]
# array([141901.32877231, 141419.08100247, 133277.66346186])