본문 바로가기

SK네트웍스 Family AI캠프 10기/Daily 회고

55일차. Fine Tuning - DeepSpeed & Accelerate & LLM 프로젝트

더보기

 

55일 차 회고.

 

 요즘 매주에 한 번씩은 아프게 되는 것 같다. 어제부터 목이 부은 것이 느껴져서 약국에서 약을 사 먹었는데 계속 아프면 수업이 끝나고 바로 병원에 들러야 할 것 같다.

 

 

 

 

1. DeepSpeed

 

 

1-1. DeepSpeed

 

DeepSpeed

  • Microsoft에서 개발한 오픈 소스 딥러닝 최적화 라이브러리
  • 자연어 처리(NLP)를 위한 대규모 언어 모델 학습의 효율성과 속도를 개선한다.
    • 모델 학습을 더 빠르고 비용 효율적으로 만들기 위해 설계되었다.
  • PyTorch 모델에 대한 몇 줄의 코드 변경만으로 속도와 확장성을 높일 수 있다.

 

주요 기능

  • Model Scale
    • 모델 분할
  • Speed
    • 효율적인 데이터 병렬 처리
  • Scalability
    • 모델 학습 가속
  • Usability
    • ZeRO 최적화

 

ZeRO(Zero Redundancy Optimizer)

  • ZeRO
    • 분산 학습 과정에서의 불필요한 메모리의 중복을 제거하여 같은 환경 내에서 대용량 모델을 학습한다.
    • 주요 최적화 단계
      • Optimizer State Partitioning($P_{os}$)
        • 메모리 4배 감소
      • Add Gradient Partitioning($P_{os+g}$)
        • 메모리 8배 감소
      • Add Parameter Partitioning($P_{os+g+p}$)
        • GPU 개수와 비례하여 메모리 감소
  • ZeRO-2
    • Model State Memory
    • Activation Memory
    • Fragmented Memory
  • ZeRO-3(ZeRO-Offload)
    • CPU 연산 제약
      • Optimizer 연산 및 Weight updates를 CPU가 수행한다.
    • 메모리 절감 최대화
    • ZeRO-Offload 스케줄링
    • 최적화된 CPU 실행

 

 

1-2. 예시 코드

 

Setup

!pip install deepspeed torchvision pillow matplotlib mpi4py
import argparse

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms

import deepspeed
from deepspeed.accelerator import get_accelerator
from deepspeed.moe.utils import split_params_into_different_moe_groups_for_optimizer
  • RunPod으로 실행할 경우, 오류가 뜬다. 따라서 다음을 Terminal에서 실행한다.
apt update
apt-get install libopenmpi-dev
apt --fix-broken install
apt install mpich
  • 그 후, Kernel을 재시작하여 다음 코드 또한 실행한 후에, 기존의 코드들을 재실행한다.
!pip install typing_extensions==4.7.1 --upgrade

 

Argument Parsing

def add_argument():
    parser = argparse.ArgumentParser(description="CIFAR")

    parser.add_argument(
        "-e",
        "--epochs",
        default=30,
        type=int,
        help="number of total epochs (default: 30)",
    )
    parser.add_argument(
        "--local_rank",
        type=int,
        default=-1,
        help="local rank passed from distributed launcher",
    )
    parser.add_argument(
        "--log-interval",
        type=int,
        default=2000,
        help="output logging information at a given interval",
    )

    parser.add_argument(
        "--dtype",
        default="fp16",
        type=str,
        choices=["bf16", "fp16", "fp32"],
        help="Datatype used for training",
    )

    parser.add_argument(
        "--stage",
        default=0,
        type=int,
        choices=[0, 1, 2, 3],
        help="Datatype used for training",
    )

    parser.add_argument(
        "--moe",
        default=False,
        action="store_true",
        help="use deepspeed mixture of experts (moe)",
    )
    parser.add_argument(
        "--ep-world-size", default=1, type=int, help="(moe) expert parallel world size"
    )
    parser.add_argument(
        "--num-experts",
        type=int,
        nargs="+",
        default=[
            1,
        ],
        help="number of experts list, MoE related.",
    )
    parser.add_argument(
        "--mlp-type",
        type=str,
        default="standard",
        help="Only applicable when num-experts > 1, accepts [standard, residual]",
    )
    parser.add_argument(
        "--top-k", default=1, type=int, help="(moe) gating top 1 and 2 supported"
    )
    parser.add_argument(
        "--min-capacity",
        default=0,
        type=int,
        help="(moe) minimum capacity of an expert regardless of the capacity_factor",
    )
    parser.add_argument(
        "--noisy-gate-policy",
        default=None,
        type=str,
        help="(moe) noisy gating (only supported with top-1). Valid values are None, RSample, and Jitter",
    )
    parser.add_argument(
        "--moe-param-group",
        default=False,
        action="store_true",
        help="(moe) create separate moe param groups, required when using ZeRO w. MoE",
    )

    parser = deepspeed.add_config_arguments(parser)

    args, unknown = parser.parse_known_args()

    return args

 

MoE(Mixture of Experts)

def create_moe_param_groups(model):
    parameters = {"params": [p for p in model.parameters()], "name": "parameters"}
    return split_params_into_different_moe_groups_for_optimizer(parameters)

 

Configure Parameters

def get_ds_config(args):
    ds_config = {
        "train_batch_size": 16,
        "steps_per_print": 2000,
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 0.001,
                "betas": [0.8, 0.999],
                "eps": 1e-8,
                "weight_decay": 3e-7,
            },
        },
        "scheduler": {
            "type": "WarmupLR",
            "params": {
                "warmup_min_lr": 0,
                "warmup_max_lr": 0.001,
                "warmup_num_steps": 1000,
            },
        },
        "gradient_clipping": 1.0,
        "prescale_gradients": False,
        "bf16": {"enabled": args.dtype == "bf16"},
        "fp16": {
            "enabled": args.dtype == "fp16",
            "fp16_master_weights_and_grads": False,
            "loss_scale": 0,
            "loss_scale_window": 500,
            "hysteresis": 2,
            "min_loss_scale": 1,
            "initial_scale_power": 15,
        },
        "wall_clock_breakdown": False,
        "zero_optimization": {
            "stage": args.stage,
            "allgather_partitions": True,
            "reduce_scatter": True,
            "allgather_bucket_size": 50000000,
            "reduce_bucket_size": 50000000,
            "overlap_comm": True,
            "contiguous_gradients": True,
            "cpu_offload": False,
        }
    }

    return ds_config

 

Model

class Net(nn.Module):
    def __init__(self, args):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.moe = args.moe
        if self.moe:
            fc3 = nn.Linear(84, 84)
            self.moe_layer_list = []
            for n_e in args.num_experts:
                self.moe_layer_list.append(
                    deepspeed.moe.layer.MoE(
                        hidden_size=84,
                        expert=fc3,
                        num_experts=n_e,
                        ep_size=args.ep_world_size,
                        use_residual=args.mlp_type == "residual",
                        k=args.top_k,
                        min_capacity=args.min_capacity,
                        noisy_gate_policy=args.noisy_gate_policy,
                    )
                )
            self.moe_layer_list = nn.ModuleList(self.moe_layer_list)
            self.fc4 = nn.Linear(84, 10)
        else:
            self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        if self.moe:
            for layer in self.moe_layer_list:
                x, _, _ = layer(x)
            x = self.fc4(x)
        else:
            x = self.fc3(x)
        return x
def test(model_engine, testset, local_device, target_dtype, test_batch_size=4):
    classes = (
        "plane",
        "car",
        "bird",
        "cat",
        "deer",
        "dog",
        "frog",
        "horse",
        "ship",
        "truck",
    )

    testloader = torch.utils.data.DataLoader(
        testset, batch_size=test_batch_size, shuffle=False, num_workers=0
    )

    correct, total = 0, 0
    class_correct = list(0.0 for i in range(10))
    class_total = list(0.0 for i in range(10))

    model_engine.eval()
    with torch.no_grad():
        for data in testloader:
            images, labels = data
            if target_dtype != None:
                images = images.to(target_dtype)
            outputs = model_engine(images.to(local_device))
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels.to(local_device)).sum().item()

            batch_correct = (predicted == labels.to(local_device)).squeeze()
            for i in range(test_batch_size):
                label = labels[i]
                class_correct[label] += batch_correct[i].item()
                class_total[label] += 1

    if model_engine.local_rank == 0:
        print(
            f"Accuracy of the network on the {total} test images: {100 * correct / total : .0f} %"
        )

        for i in range(10):
            print(
                f"Accuracy of {classes[i] : >5s} : {100 * class_correct[i] / class_total[i] : 2.0f} %"
            )

 

DeepSpeed

def main(args):
    deepspeed.init_distributed()

    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
    )

    if torch.distributed.get_rank() != 0:
        torch.distributed.barrier()

    trainset = torchvision.datasets.CIFAR10(
        root="./data", train=True, download=True, transform=transform
    )
    testset = torchvision.datasets.CIFAR10(
        root="./data", train=False, download=True, transform=transform
    )

    if torch.distributed.get_rank() == 0:
        torch.distributed.barrier()

    net = Net(args)

    parameters = filter(lambda p: p.requires_grad, net.parameters())

    if args.moe_param_group:
        parameters = create_moe_param_groups(net)

    ds_config = get_ds_config(args)
    model_engine, optimizer, trainloader, __ = deepspeed.initialize(
        args=args,
        model=net,
        model_parameters=parameters,
        training_data=trainset,
        config=ds_config,
    )

    local_device = get_accelerator().device_name(model_engine.local_rank)
    local_rank = model_engine.local_rank

    target_dtype = None
    if model_engine.bfloat16_enabled():
        target_dtype = torch.bfloat16
    elif model_engine.fp16_enabled():
        target_dtype = torch.half

    criterion = nn.CrossEntropyLoss()

    for epoch in range(args.epochs):
        running_loss = 0.0
        for i, data in enumerate(trainloader):
            inputs, labels = data[0].to(local_device), data[1].to(local_device)

            if target_dtype != None:
                inputs = inputs.to(target_dtype)

            outputs = model_engine(inputs)
            loss = criterion(outputs, labels)
            
            model_engine.backward(loss)
            model_engine.step()

            running_loss += loss.item()
            if local_rank == 0 and i % args.log_interval == (
                args.log_interval - 1
            ):  # Print every log_interval mini-batches.
                print(
                    f"[{epoch + 1 : d}, {i + 1 : 5d}] loss: {running_loss / args.log_interval : .3f}"
                )
                running_loss = 0.0
    print("Finished Training")

    test(model_engine, testset, local_device, target_dtype)

 

Training

if __name__ == "__main__":
    args = add_argument()
    main(args)

 

 

 

2. Accelerate

 

 

2-1. Accelerate

 

Accelerate

  • 딥러닝 모델을 훈련하는 데 있어 분산 훈련을 쉽게 설정하고 관리할 수 있는 라이브러리
  • 주로 Transformers 모델을 훈련할 때 사용한다.
  • 단 몇 줄의 코드로 분산 훈련 환경을 구성할 수 있다.

 

대규모 모델 학습 시 고려 요소

  • 배치 사이즈 선택
  • Gradient Accumulation
  • Gradient Checkpointing
  • Mixed Precision Training
  • Optimizer 선택
  • Data Preloading
  • DeepSpeed ZeRO
  • torch.compile
  • HF PEFT(Parameter Efficient Fine Tuning)

 

 

2-2. 예시 코드

 

Setup

!python -m pip install --upgrade pip
!pip install mpi4py-mpich
!pip install -U accelerate transformers evaluate datasets schedulefree huggingface_hub deepspeed scipy scikit-learn
import os

os.environ['HF_TOKEN'] = ''
def clean_memory():
    import gc
    import torch
    
    for key in list(globals().keys()):
        if key not in ['clean_memory', '__name__', '__doc__', '__package__', '__loader__', '__spec__',
                       '__builtin__', '__builtins__', '_ih', '_oh', '_dh', 'In', 'Out', 'get_ipython',
                       'exit', 'quit', '_', '__', '___', '_i', '_ii', '_iii', '_i1', 'gc', '_1', '_i2', '_i3']:
            del globals()[key]
    
    gc.collect()
    torch.cuda.empty_cache()

clean_memory()

 

Configuration

!accelerate config default

# !mv /root/.cache/huggingface/accelerate/default_config.yaml /content/
!mv /root/.cache/huggingface/accelerate/default_config.yaml /workspace/
# default_config.yaml

{
  "compute_environment": "LOCAL_MACHINE",
  "debug": false,
  "distributed_type": "NO",
  "downcast_bf16": false,
  "enable_cpu_affinity": false,
  "machine_rank": 0,
  "main_training_function": "main",
  "mixed_precision": "no",
  "num_machines": 1,
  "num_processes": 1,
  "rdzv_backend": "static",
  "same_network": false,
  "tpu_use_cluster": false,
  "tpu_use_sudo": false,
  "use_cpu": false,
  
  # ZeRO Stage-3 with CPU Offload DeepSpeed Plugin Example
  "deepspeed_config": {
    "gradient_accumulation_steps": 1,
    "gradient_clipping": 1.0,
    "offload_optimizer_device": cpu,
    "offload_param_device": cpu,
    "zero3_init_flag": true,
    "zero3_save_16bit_model": true,
    "zero_stage": 3
  }
}
# !mv /content/default_config.yaml /root/.cache/huggingface/accelerate/
!mv /workspace/default_config.yaml /root/.cache/huggingface/accelerate/

!accelerate config update
import argparse

import evaluate

from datasets import load_dataset

import torch
from torch.optim import AdamW
from torch.utils.data import DataLoader
from transformers import AutoModelForSequenceClassification, AutoTokenizer, get_linear_schedule_with_warmup, set_seed

from accelerate import Accelerator, DistributedType

MAX_GPU_BATCH_SIZE = 16
EVAL_BATCH_SIZE = 32

 

 

 

3. LLM 프로젝트

 

 

3-1. Flow Chart

 

Main

 

Database

 

 

3-2. File Structure

 

LLM_PROJECT
├─ database
│   ├─ db
│   └─ docker-compose.yml
├─ etl
│   └─ README.md
├─ training
│   └─ README.md
├─ web
│   ├─ .venv
│   ├─ common
│   │   ├─ database
│   │   │   └─ connector.py
│   │   ├─ display
│   │   │   ├─ constant.py
│   │   │   ├─ history.py
│   │   │   ├─ input.py
│   │   │   ├─ output.py
│   │   │   └─ utils.py
│   │   ├─ model
│   │   │   ├─ groq.py
│   │   │   ├─ ollama.py
│   │   │   ├─ openai.py
│   │   │   ├─ provider_class.py
│   │   │   └─ provider.py
│   │   ├─ service
│   │   │   └─ film_service.py
│   │   └─ pages
│   │       └─ db.py
│   ├─ .env
│   ├─ chatbot.py
│   ├─ requirements.txt
│   └─ test.ipynb
├─ .gitignore
└─ README.md

 

 

3-3. GGUF

 

.gguf - gemma3

  • .gguf 파일과 Modelfile을 통해 모델을 생성한다.
ollama create gemma3-q8 -f Modelfile

 

일반 파일 - gemma3

  • 일반 파일을 .gguf 파일로 convert한 후, 위의 방법을 사용한다.
import os

os.environ['HF_TOKEN'] = ''
from IPython.display import clear_output

!git clone https://github.com/ggerganov/llama.cpp.git
%cd llama.cpp

!apt update
!apt install -y build-essential cmake git libssl-dev libcurl4-openssl-dev

clear_output()

!make

%cd /content
!pip install huggingface_hub sentencepiece safetensors

from huggingface_hub import snapshot_download

model_id = 'google/gemma-3-1b-it'
snapshot_download(
    repo_id=model_id,
    local_dir='gemma',
    local_dir_use_symlinks=False,
    revision='main'
)
!python /content/llama.cpp/convert_hf_to_gguf.py /content/gemma/ --outfile gemma3-1b.gguf