02 - CI/CD가 포함된 ML 파이프라인: GitHub Actions + Docker
시리즈의 첫 번째 기사에서는 ML 프로젝트의 85%가 프로덕션에 도달하지 못하는 이유를 확인했습니다. MLOps가 이 문제를 어떻게 해결하는지 알아보세요. 모놀리식 노트북을 파이프라인으로 전환했습니다. 모듈식이며 구성 가능합니다. 이제 다음 단계를 밟아야 할 때입니다. 전체를 자동화하다 CI/CD가 포함된 파이프라인, 코드, 데이터 또는 구성이 변경될 수 있도록 모델 훈련, 검증 및 배포를 자동으로 트리거합니다.
이 기사에서는 다음을 사용하여 기계 학습을 위한 완전한 CI/CD 파이프라인을 구축합니다. GitHub 작업 오케스트레이터로서 도커 런타임으로 실행. 우리는 이론에만 국한되지 않고 분류자를 사용하여 실제 프로젝트를 만들 것입니다. 다단계 Dockerfile, YAML 워크플로, 데이터 검증, 모델 레지스트리로 완성된 감정 자동 배포.
무엇을 배울 것인가
- ML용 CI/CD가 기존 소프트웨어 CI/CD와 다른 이유
- 엔드투엔드 ML 파이프라인의 아키텍처를 설계하는 방법
- 학습 및 제공에 최적화된 다단계 Dockerfile을 만드는 방법
- ML을 위한 완전한 GitHub Actions 워크플로를 작성하는 방법
- 데이터 검증, 모델 레지스트리 및 자동 배포를 통합하는 방법
- 파이프라인에서 DVC를 사용하여 데이터 버전 관리를 관리하는 방법
- ML 관련 테스트(단위, 통합, 연기) 구현 방법
- 배포 후 모델을 모니터링하는 방법
- 캐싱 및 자체 호스팅 실행기를 사용하여 비용을 최적화하는 방법
- 팀에 적합한 CI/CD 도구를 선택하는 방법
ML용 CI/CD와 다른 이유
당신이 전통적인 소프트웨어 개발의 세계에서 왔다면, 단지 적용만 한다고 생각할 수도 있습니다. ML 프로젝트와 동일한 CI/CD 방식을 적용합니다. 실제로 머신러닝은 복잡성을 야기합니다. 특별한 접근 방식이 필요한 고유한 것입니다. 근본적인 차이점은 소프트웨어에서 전통적으로 CI/CD는 하나의 아티팩트(코드)만 관리하는 반면 ML에서는 여러 개의 아티팩트를 관리해야 합니다. 코드, 데이터, 모델 세 가지를 동시에.
ML의 세 가지 아티팩트
기존 소프트웨어에서는 코드가 변경되지 않으면 출력도 변경되지 않습니다. ML에서는 동일한 코드, 데이터 변경으로 인해 다른 모델이 생성됩니다. 이는 다음을 의미합니다. CI/CD 파이프라인은 세 가지 독립적인 차원을 추적하고 검증해야 합니다.
| 크기 | 기존 CI/CD | ML용 CI/CD |
|---|---|---|
| 암호 | Git 푸시가 빌드 + 테스트를 트리거합니다. | Git 푸시 트리거 교육 + 평가 |
| 데이터 | 해당 없음 | 새로운 데이터가 재교육을 유발합니다 |
| 모델 | 해당 없음 | 새 모델에는 검증 + 프로모션이 필요합니다. |
| 구성 | 기능 플래그, 환경 변수 | 하이퍼파라미터, 기능 세트, 측정항목 임계값 |
| 환경 | OS + 라이브러리 | OS + 라이브러리 + GPU 드라이버 + CUDA 버전 |
| 확인 | 합격/불합격 테스트 | 임계값 초과/미달 측정항목 + 생산 중인 모델과의 비교 |
| 전개 | 배포 또는 롤백 | 점진적 배포 + A/B 테스트 + 드리프트 모니터링 |
지속적인 훈련: 핵심 개념
ML용 CI/CD는 기존 소프트웨어에는 존재하지 않는 개념을 도입합니다. 는 지속적인 훈련(CT). 지속적인 통합과 지속적인 통합뿐만 아니라 배포, CT는 다음과 같은 경우 모델이 자동으로 재학습되도록 보장합니다.
- 새로운 데이터 도착: 데이터세트가 새로운 관측값으로 업데이트됩니다.
- 코드를 변경하세요: 전처리 또는 알고리즘이 변경되었습니다.
- 측정항목이 저하됩니다. 모니터링을 통해 데이터 드리프트 또는 성능 저하 감지
- 타이머가 만료됩니다: 예정된 재교육(예: 매주)이 활성화되었습니다.
일반적인 오류: CT가 없는 CI/CD
많은 팀이 ML 코드용 CI/CD를 구현하지만 지속적인 학습을 잊어버립니다. 그 결과, 한 번 배포된 후 다시는 업데이트되지 않는 모델이 탄생했습니다. 프로덕션 데이터가 다양해짐에 따라 시간이 지남에 따라 자동으로 저하됨 훈련 데이터에서. CT가 없는 파이프라인은 유지 관리가 없는 자동차와 같습니다. 고장날 때까지 작동합니다.
ML 파이프라인 아키텍처
코드를 작성하기 전에 파이프라인의 전체 아키텍처를 설계합니다. 모든 단계 특정 입력과 출력이 있으며 한 단계에 오류가 발생하면 후속 단계가 차단됩니다. 이 "빠른 실패" 접근 방식은 검증된 모델만 생산에 도달하도록 보장합니다.
+------------------+ +------------------+ +------------------+
| DATA INGESTION |---->| PREPROCESSING |---->| TRAINING |
| | | | | |
| - Pull dati DVC | | - Pulizia | | - Train modello |
| - Validazione | | - Feature eng. | | - Log metriche |
| - Schema check | | - Split train/ | | - Log parametri |
| | | test/val | | - Salva artefatti|
+------------------+ +------------------+ +------------------+
| |
| (trigger: dati |
| nuovi/schedule) v
| +------------------+
| | EVALUATION |
| | |
| | - Metriche |
| | - Confronto con |
| | produzione |
| | - Gate: soglie |
| +------------------+
| |
| (se metriche > threshold)
| v
+------------------+ +------------------+ +------------------+
| MONITORING |<----| SMOKE TEST |<----| DEPLOYMENT |
| | | | | |
| - Health check | | - Test endpoint | | - Push registry |
| - Drift detect. | | - Predizione | | - Stage/Prod |
| - Alert | | di prova | | - Rollback ready |
| - Trigger retrain| | - Latenza check | | |
+------------------+ +------------------+ +------------------+
각 블록은 GitHub Actions 워크플로의 단계에 해당합니다. 지금 보자 Docker를 사용한 컨테이너화부터 시작하여 각 단계를 구현하는 방법을 자세히 설명합니다.
머신러닝용 도커
Docker는 ML에서 가장 실망스러운 문제 중 하나를 해결합니다. "내 컴퓨터에서 작동합니다". 훈련 및 제공 환경을 컨테이너화함으로써 코드가 다음을 생성하도록 보장합니다. 데이터 과학자의 노트북, CI/CD 실행기 등 어디에서 실행하든 동일한 결과가 나타납니다. 그리고 생산 중입니다. ML의 경우 Docker에는 특별한 주의가 필요합니다. 규모가 매우 크고(과학 라이브러리 + GPU 드라이버) 빌드가 느려질 수 있습니다.
ML용 기본 이미지
이미지 기반의 선택은 크기와 호환성을 위해 중요합니다. 옵션은 다음과 같습니다. 주요 항목과 사용 시기.
| 기본 이미지 | 크기 | 사용 | 그것을 선택하는 경우 |
|---|---|---|---|
| 파이썬:3.11-슬림 | ~120MB | 훈련/서빙 CPU | Scikit-learn 모델, XGBoost, 가벼운 서비스 제공 |
| 파이썬:3.11-책벌레 | ~900MB | 빌드 도구를 사용한 교육 | C/C++ 컴파일이 필요한 종속성 |
| 엔비디아/쿠다:12.1-런타임 | ~3.5GB | GPU 추론 | 딥러닝 모델 제공 |
| 엔비디아/쿠다:12.1-devel | ~5.2GB | GPU 훈련 | CUDA를 사용한 PyTorch/TensorFlow 교육 |
| 파이토치/파이토치:2.1.0-cuda12.1 | ~6GB | PyTorch 훈련/제공 | 수동 CUDA 설정을 피하고 싶은 PyTorch 프로젝트 |
학습 및 제공을 위한 Dockerfile 다단계
다단계 패턴은 ML의 기본입니다. 두 개의 별도 단계를 사용하여 다음을 얻을 수 있습니다. 완전한 빌드 환경(컴파일러 및 빌드 도구 포함) 및 최종 이미지 필요한 런타임만 포함되어 간소화되었습니다. 이렇게 하면 이미지 크기가 줄어듭니다 최종적으로는 60%까지.
# ============================================
# Stage 1: Builder - installa dipendenze
# ============================================
FROM python:3.11-slim AS builder
WORKDIR /build
# Installa build tools necessari per compilare dipendenze native
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
gcc \
g++ \
&& rm -rf /var/lib/apt/lists/*
# Copia e installa dipendenze in un virtual environment
COPY requirements.txt .
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r requirements.txt
# ============================================
# Stage 2: Training - esegue il training
# ============================================
FROM python:3.11-slim AS trainer
WORKDIR /app
# Copia virtual environment dallo stage builder
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Copia codice sorgente
COPY src/ ./src/
COPY config/ ./config/
COPY train.py .
COPY evaluate.py .
# Entrypoint per training
ENTRYPOINT ["python", "train.py"]
# ============================================
# Stage 3: Serving - API di produzione
# ============================================
FROM python:3.11-slim AS serving
WORKDIR /app
# Utente non-root per sicurezza
RUN useradd --create-home appuser
# Copia virtual environment dallo stage builder
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Copia solo il codice necessario per serving
COPY src/serving/ ./src/serving/
COPY src/preprocessing/ ./src/preprocessing/
# Healthcheck endpoint
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"
# Switch a utente non-root
USER appuser
# Porta di default
EXPOSE 8000
# Entrypoint per serving
ENTRYPOINT ["uvicorn", "src.serving.app:app", "--host", "0.0.0.0", "--port", "8000"]
ML이 다단계인 이유는 무엇입니까?
- 안전: 제공 이미지에 컴파일러나 빌드 도구가 포함되어 있지 않습니다.
- 크기: 제공 단계가 훨씬 가볍습니다(~300MB 대 ~1.2GB).
- 은닉처: 종속성은 레이어 캐시를 활용하여 코드보다 덜 자주 변경됩니다.
- 유연성: 훈련 단계만 구축하거나 서비스 단계만 구축할 수 있습니다.
레이어 캐시 최적화
Dockerfile의 COPY 문의 순서는 캐시에 매우 중요합니다. Python 종속성
거의 변경되지 않고 소스 코드가 자주 변경됩니다. 먼저 복사해서
requirements.txt 그런 다음 코드를 사용하면 종속성을 다시 설치하지 않아도 됩니다.
코드가 변경될 때마다.
# Dati e modelli (gestiti da DVC, non da Docker)
data/
models/
*.pkl
*.h5
*.pt
# Ambiente di sviluppo
.venv/
__pycache__/
*.pyc
.pytest_cache/
.mypy_cache/
# Git e CI
.git/
.github/
.dvc/cache/
# IDE e editor
.vscode/
.idea/
*.swp
# Documentazione
docs/
*.md
LICENSE
GPU를 지원하는 Docker
딥 러닝 모델을 훈련하려면 컨테이너에서 GPU 지원이 필요합니다. 도커는 지원합니다 NVIDIA 컨테이너 툴킷을 통한 NVIDIA GPU. 설치에는 NVIDIA 드라이버가 필요합니다 호스트 및 설치된 툴킷에 있습니다.
# Base image con CUDA runtime
FROM nvidia/cuda:12.1.1-runtime-ubuntu22.04 AS gpu-trainer
WORKDIR /app
# Installa Python e dipendenze di sistema
RUN apt-get update && apt-get install -y --no-install-recommends \
python3.11 \
python3.11-venv \
python3-pip \
&& rm -rf /var/lib/apt/lists/*
# Virtual environment
RUN python3.11 -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Dipendenze PyTorch con CUDA
COPY requirements-gpu.txt .
RUN pip install --no-cache-dir -r requirements-gpu.txt
COPY src/ ./src/
COPY train.py .
# Variabili ambiente per CUDA
ENV NVIDIA_VISIBLE_DEVICES=all
ENV NVIDIA_DRIVER_CAPABILITIES=compute,utility
ENTRYPOINT ["python", "train.py"]
# Build dell'immagine GPU
docker build -f Dockerfile.gpu -t ml-trainer:gpu .
# Esecuzione con accesso GPU
docker run --gpus all \
-v $(pwd)/data:/app/data \
-v $(pwd)/models:/app/models \
ml-trainer:gpu \
--config config/training.yaml
기계 학습을 위한 GitHub 작업
GitHub Actions는 자동화된 워크플로를 실행하는 GitHub에 통합된 CI/CD 서비스입니다. 이벤트(푸시, 풀 요청, 일정, 수동 디스패치)에 대한 응답입니다. ML의 경우 다음을 제공합니다. 중요한 장점: Git 저장소와의 기본 통합, 액션이 포함된 마켓플레이스 사전 정의, 자격 증명 비밀 관리 및 월 최대 2,000분 무료 제공 공개 저장소용.
ML 워크플로 구조
ML용 GitHub Actions 워크플로에는 여러 일치 작업이라는 특정 구조가 있습니다. 작업과 실행 조건 간의 명시적인 종속성을 통해 파이프라인 단계로 모델 측정항목을 기반으로 합니다.
name: ML Pipeline - Train, Evaluate, Deploy
on:
# Trigger su push al branch main (codice o config)
push:
branches: [main]
paths:
- 'src/**'
- 'config/**'
- 'requirements.txt'
- 'train.py'
- 'evaluate.py'
# Trigger schedulato per retraining periodico
schedule:
- cron: '0 6 * * 1' # Ogni lunedi alle 6:00 UTC
# Trigger manuale con parametri
workflow_dispatch:
inputs:
force_deploy:
description: 'Forza il deployment anche se le metriche non migliorano'
required: false
default: 'false'
type: choice
options:
- 'true'
- 'false'
training_config:
description: 'File di configurazione per il training'
required: false
default: 'config/training.yaml'
env:
PYTHON_VERSION: '3.11'
DOCKER_REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}/ml-model
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
jobs:
# ============================================
# Job 1: Data Validation
# ============================================
data-validation:
name: Validate Data Quality
runs-on: ubuntu-latest
outputs:
data_valid: ${{ steps.validate.outputs.valid }}
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Setup DVC
uses: iterative/setup-dvc@v2
- name: Pull data from DVC
run: dvc pull
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
- name: Validate data quality
id: validate
run: |
python -m src.data.validate_data \
--data-path data/raw/reviews.csv \
--schema-path config/data_schema.yaml
echo "valid=true" >> $GITHUB_OUTPUT
# ============================================
# Job 2: Model Training
# ============================================
training:
name: Train Model
needs: data-validation
if: needs.data-validation.outputs.data_valid == 'true'
runs-on: ubuntu-latest
outputs:
model_version: ${{ steps.train.outputs.model_version }}
run_id: ${{ steps.train.outputs.run_id }}
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Setup DVC and pull data
run: |
pip install dvc[s3]
dvc pull
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
- name: Train model
id: train
run: |
python train.py \
--config ${{ github.event.inputs.training_config || 'config/training.yaml' }} \
--output-dir models/
echo "model_version=$(cat models/version.txt)" >> $GITHUB_OUTPUT
echo "run_id=$(cat models/run_id.txt)" >> $GITHUB_OUTPUT
env:
MLFLOW_TRACKING_URI: ${{ env.MLFLOW_TRACKING_URI }}
MLFLOW_TRACKING_USERNAME: ${{ secrets.MLFLOW_USERNAME }}
MLFLOW_TRACKING_PASSWORD: ${{ secrets.MLFLOW_PASSWORD }}
- name: Upload model artifact
uses: actions/upload-artifact@v4
with:
name: trained-model
path: models/
retention-days: 30
# ============================================
# Job 3: Model Evaluation
# ============================================
evaluation:
name: Evaluate Model
needs: training
runs-on: ubuntu-latest
outputs:
metrics_pass: ${{ steps.evaluate.outputs.metrics_pass }}
accuracy: ${{ steps.evaluate.outputs.accuracy }}
f1_score: ${{ steps.evaluate.outputs.f1_score }}
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
cache: 'pip'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Download model artifact
uses: actions/download-artifact@v4
with:
name: trained-model
path: models/
- name: Setup DVC and pull test data
run: |
pip install dvc[s3]
dvc pull data/test/
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
- name: Evaluate model
id: evaluate
run: |
python evaluate.py \
--model-path models/model.pkl \
--test-data data/test/reviews_test.csv \
--thresholds config/thresholds.yaml \
--output metrics/report.json
echo "accuracy=$(python -c 'import json; print(json.load(open(\"metrics/report.json\"))[\"accuracy\"])')" >> $GITHUB_OUTPUT
echo "f1_score=$(python -c 'import json; print(json.load(open(\"metrics/report.json\"))[\"f1_score\"])')" >> $GITHUB_OUTPUT
echo "metrics_pass=$(python -c 'import json; print(json.load(open(\"metrics/report.json\"))[\"pass\"])')" >> $GITHUB_OUTPUT
- name: Post metrics as PR comment
if: github.event_name == 'pull_request'
uses: actions/github-script@v7
with:
script: |
const fs = require('fs');
const metrics = JSON.parse(fs.readFileSync('metrics/report.json'));
const body = `## Model Evaluation Results
| Metric | Value | Threshold | Status |
|--------|-------|-----------|--------|
| Accuracy | ${metrics.accuracy.toFixed(4)} | ${metrics.thresholds.accuracy} | ${metrics.accuracy >= metrics.thresholds.accuracy ? 'PASS' : 'FAIL'} |
| F1 Score | ${metrics.f1_score.toFixed(4)} | ${metrics.thresholds.f1_score} | ${metrics.f1_score >= metrics.thresholds.f1_score ? 'PASS' : 'FAIL'} |
| AUC-ROC | ${metrics.auc_roc.toFixed(4)} | ${metrics.thresholds.auc_roc} | ${metrics.auc_roc >= metrics.thresholds.auc_roc ? 'PASS' : 'FAIL'} |`;
github.rest.issues.createComment({
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: context.issue.number,
body: body
});
- name: Upload evaluation report
uses: actions/upload-artifact@v4
with:
name: evaluation-report
path: metrics/
# ============================================
# Job 4: Build and Push Docker Image
# ============================================
build-image:
name: Build Docker Image
needs: [evaluation]
if: |
needs.evaluation.outputs.metrics_pass == 'true' ||
github.event.inputs.force_deploy == 'true'
runs-on: ubuntu-latest
outputs:
image_tag: ${{ steps.meta.outputs.tags }}
steps:
- uses: actions/checkout@v4
- name: Download model artifact
uses: actions/download-artifact@v4
with:
name: trained-model
path: models/
- name: Docker meta
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.DOCKER_REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=sha,prefix=
type=raw,value=latest
- name: Login to GitHub Container Registry
uses: docker/login-action@v3
with:
registry: ${{ env.DOCKER_REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push serving image
uses: docker/build-push-action@v5
with:
context: .
target: serving
push: true
tags: ${{ steps.meta.outputs.tags }}
cache-from: type=gha
cache-to: type=gha,mode=max
# ============================================
# Job 5: Deploy to Staging + Smoke Test
# ============================================
deploy:
name: Deploy and Smoke Test
needs: [build-image, training]
runs-on: ubuntu-latest
environment: production
steps:
- uses: actions/checkout@v4
- name: Deploy to staging
run: |
echo "Deploying model version ${{ needs.training.outputs.model_version }}"
# Qui il comando di deploy reale (kubectl, docker-compose, etc.)
- name: Smoke test
run: |
# Attendi che il servizio sia pronto
for i in $(seq 1 30); do
if curl -sf http://staging:8000/health; then
echo "Service is ready"
break
fi
echo "Waiting for service... attempt $i"
sleep 5
done
# Test di predizione
RESPONSE=$(curl -sf -X POST http://staging:8000/predict \
-H "Content-Type: application/json" \
-d '{"text": "This product is amazing, I love it!"}')
echo "Prediction response: $RESPONSE"
# Verifica che la risposta sia valida
echo "$RESPONSE" | python -c "
import sys, json
data = json.load(sys.stdin)
assert 'prediction' in data, 'Missing prediction field'
assert 'confidence' in data, 'Missing confidence field'
assert data['confidence'] > 0.5, 'Low confidence'
print('Smoke test PASSED')
"
파이프라인의 비밀과 보안
YAML 워크플로에 자격 증명을 직접 입력하지 마세요. 항상 GitHub 비밀을 사용하세요 AWS 키, MLflow 토큰, Docker 레지스트리 자격 증명 및 기타 민감한 정보. 비밀 구성 설정 > 비밀 및 변수 > 작업 GitHub 저장소에 있습니다.
예시 프로젝트: 감정 분류자
구체적인 프로젝트로 모든 것을 하나로 묶어 봅시다. 감정 분류기를 구축하겠습니다. 완전한 CI/CD 파이프라인을 갖춘 제품 검토용. 이 프로젝트에서는 scikit-learn을 사용합니다. 단순하지만 아키텍처는 PyTorch 또는 TensorFlow 모델에 동일하게 적용됩니다.
프로젝트 구조
sentiment-classifier/
src/
data/
__init__.py
preprocessing.py # Pulizia e trasformazione testi
validate_data.py # Validazione schema e qualità
models/
__init__.py
trainer.py # Training del classificatore
serving/
__init__.py
app.py # FastAPI application
schemas.py # Pydantic schemas
monitoring/
__init__.py
health.py # Health checks
tests/
test_preprocessing.py # Unit test preprocessing
test_trainer.py # Unit test training
test_api.py # Integration test API
config/
training.yaml # Configurazione training
thresholds.yaml # Soglie metriche
data_schema.yaml # Schema validazione dati
train.py # Entrypoint training
evaluate.py # Entrypoint evaluation
Dockerfile # Multi-stage build
requirements.txt # Dipendenze Python
.github/
workflows/
ml-pipeline.yml # Pipeline CI/CD
.dvc/ # Configurazione DVC
dvc.yaml # Pipeline DVC
dvc.lock # Lock file DVC
훈련 스크립트
"""Script principale di training per il classificatore di sentiment."""
import argparse
import yaml
import mlflow
import mlflow.sklearn
from pathlib import Path
from datetime import datetime
from src.data.preprocessing import load_and_preprocess_data, split_dataset
from src.models.trainer import create_pipeline, train_model
def parse_args():
"""Parse degli argomenti da riga di comando."""
parser = argparse.ArgumentParser(
description="Train sentiment classifier"
)
parser.add_argument(
"--config",
type=str,
default="config/training.yaml",
help="Path al file di configurazione"
)
parser.add_argument(
"--output-dir",
type=str,
default="models/",
help="Directory per salvare il modello"
)
parser.add_argument(
"--experiment-name",
type=str,
default="sentiment-classifier",
help="Nome dell'esperimento MLflow"
)
return parser.parse_args()
def main():
"""Esegue la pipeline di training completa."""
args = parse_args()
# 1. Carica configurazione
with open(args.config) as f:
config = yaml.safe_load(f)
# 2. Setup MLflow
mlflow.set_experiment(args.experiment_name)
with mlflow.start_run(run_name=f"train-{datetime.now().strftime('%Y%m%d-%H%M%S')}") as run:
# 3. Log dei parametri
mlflow.log_params(config["model"])
mlflow.log_param("data_path", config["data"]["train_path"])
mlflow.log_param("test_size", config["data"]["test_size"])
# 4. Caricamento e preprocessing dati
print("[1/5] Caricamento e preprocessing dati...")
X, y = load_and_preprocess_data(config["data"]["train_path"])
# 5. Split dataset
print("[2/5] Split train/validation...")
X_train, X_val, y_train, y_val = split_dataset(
X, y,
test_size=config["data"]["test_size"],
random_state=config["data"]["random_state"]
)
mlflow.log_param("train_size", len(X_train))
mlflow.log_param("val_size", len(X_val))
# 6. Crea pipeline di preprocessing + modello
print("[3/5] Creazione pipeline ML...")
pipeline = create_pipeline(config["model"])
# 7. Training
print("[4/5] Training in corso...")
trained_pipeline = train_model(pipeline, X_train, y_train)
# 8. Valutazione su validation set
print("[5/5] Valutazione...")
from src.models.trainer import evaluate_model
metrics = evaluate_model(trained_pipeline, X_val, y_val)
# 9. Log metriche in MLflow
mlflow.log_metrics(metrics)
# 10. Salva modello
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
import joblib
model_path = output_dir / "model.pkl"
joblib.dump(trained_pipeline, model_path)
# Log modello in MLflow con signature
from mlflow.models.signature import infer_signature
signature = infer_signature(X_train[:5], trained_pipeline.predict(X_train[:5]))
mlflow.sklearn.log_model(
trained_pipeline,
"model",
signature=signature,
registered_model_name="sentiment-classifier"
)
# Salva versione e run_id per il CI/CD
(output_dir / "version.txt").write_text(run.info.run_id[:8])
(output_dir / "run_id.txt").write_text(run.info.run_id)
print(f"\nTraining completato!")
print(f" Run ID: {run.info.run_id}")
for name, value in metrics.items():
print(f" {name}: {value:.4f}")
if __name__ == "__main__":
main()
전처리 모듈
"""Modulo per il preprocessing dei dati testuali."""
import re
import pandas as pd
from typing import Tuple
from sklearn.model_selection import train_test_split
def clean_text(text: str) -> str:
"""Pulisce un singolo testo rimuovendo HTML, caratteri speciali e spazi extra."""
if not isinstance(text, str):
return ""
# Rimuovi tag HTML
text = re.sub(r'<[^>]+>', '', text)
# Rimuovi URL
text = re.sub(r'http\S+|www\.\S+', '', text)
# Rimuovi caratteri speciali (mantieni lettere, numeri, spazi)
text = re.sub(r'[^a-zA-Z0-9\s]', ' ', text)
# Normalizza spazi
text = re.sub(r'\s+', ' ', text).strip()
return text.lower()
def load_and_preprocess_data(data_path: str) -> Tuple[pd.Series, pd.Series]:
"""Carica e preprocess il dataset di recensioni."""
df = pd.read_csv(data_path)
# Validazione colonne richieste
required_cols = ["review_text", "sentiment"]
missing = [c for c in required_cols if c not in df.columns]
if missing:
raise ValueError(f"Colonne mancanti nel dataset: {missing}")
# Rimuovi righe con valori mancanti
df = df.dropna(subset=required_cols)
# Pulisci testi
df["clean_text"] = df["review_text"].apply(clean_text)
# Rimuovi testi vuoti dopo pulizia
df = df[df["clean_text"].str.len() > 0]
return df["clean_text"], df["sentiment"]
def split_dataset(
X: pd.Series,
y: pd.Series,
test_size: float = 0.2,
random_state: int = 42
) -> Tuple[pd.Series, pd.Series, pd.Series, pd.Series]:
"""Split stratificato del dataset."""
return train_test_split(
X, y,
test_size=test_size,
random_state=random_state,
stratify=y
)
교육 모듈
"""Modulo per la creazione, il training e la valutazione del modello."""
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
accuracy_score, f1_score, precision_score,
recall_score, roc_auc_score
)
from typing import Dict, Any
import pandas as pd
MODELS = {
"logistic_regression": LogisticRegression,
"random_forest": RandomForestClassifier,
}
def create_pipeline(model_config: Dict[str, Any]) -> Pipeline:
"""Crea una pipeline sklearn con TF-IDF + classificatore."""
algorithm = model_config.get("algorithm", "logistic_regression")
model_class = MODELS.get(algorithm)
if model_class is None:
raise ValueError(
f"Algoritmo non supportato: {algorithm}. "
f"Supportati: {list(MODELS.keys())}"
)
# Parametri specifici del modello
model_params = {
k: v for k, v in model_config.items()
if k not in ("algorithm", "tfidf")
}
# Parametri TF-IDF
tfidf_params = model_config.get("tfidf", {})
return Pipeline([
("tfidf", TfidfVectorizer(
max_features=tfidf_params.get("max_features", 10000),
ngram_range=tuple(tfidf_params.get("ngram_range", [1, 2])),
min_df=tfidf_params.get("min_df", 2),
max_df=tfidf_params.get("max_df", 0.95),
)),
("classifier", model_class(**model_params)),
])
def train_model(
pipeline: Pipeline,
X_train: pd.Series,
y_train: pd.Series
) -> Pipeline:
"""Addestra la pipeline sul training set."""
pipeline.fit(X_train, y_train)
return pipeline
def evaluate_model(
pipeline: Pipeline,
X_test: pd.Series,
y_test: pd.Series
) -> Dict[str, float]:
"""Valuta il modello e restituisce tutte le metriche."""
y_pred = pipeline.predict(X_test)
metrics = {
"accuracy": accuracy_score(y_test, y_pred),
"f1_score": f1_score(y_test, y_pred, average="weighted"),
"precision": precision_score(y_test, y_pred, average="weighted"),
"recall": recall_score(y_test, y_pred, average="weighted"),
}
# AUC-ROC solo per classificazione binaria
if len(set(y_test)) == 2:
y_proba = pipeline.predict_proba(X_test)[:, 1]
metrics["auc_roc"] = roc_auc_score(y_test, y_proba)
return metrics
평가 스크립트
"""Script di valutazione del modello con confronto soglie."""
import argparse
import json
import yaml
import joblib
import pandas as pd
from pathlib import Path
from src.data.preprocessing import clean_text
from src.models.trainer import evaluate_model
def parse_args():
parser = argparse.ArgumentParser(description="Evaluate trained model")
parser.add_argument("--model-path", required=True, help="Path al modello .pkl")
parser.add_argument("--test-data", required=True, help="Path ai dati di test")
parser.add_argument("--thresholds", required=True, help="Path al file soglie YAML")
parser.add_argument("--output", required=True, help="Path per il report JSON")
return parser.parse_args()
def main():
args = parse_args()
# 1. Carica modello e dati
pipeline = joblib.load(args.model_path)
df = pd.read_csv(args.test_data)
df["clean_text"] = df["review_text"].apply(clean_text)
X_test = df["clean_text"]
y_test = df["sentiment"]
# 2. Valuta
metrics = evaluate_model(pipeline, X_test, y_test)
# 3. Confronta con soglie
with open(args.thresholds) as f:
thresholds = yaml.safe_load(f)["thresholds"]
all_pass = True
results = {}
for metric_name, threshold_value in thresholds.items():
actual = metrics.get(metric_name, 0.0)
passed = actual >= threshold_value
if not passed:
all_pass = False
results[metric_name] = {
"value": actual,
"threshold": threshold_value,
"pass": passed
}
# 4. Genera report
report = {
**metrics,
"thresholds": thresholds,
"details": results,
"pass": all_pass
}
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w") as f:
json.dump(report, f, indent=2)
# 5. Stampa risultati
print("\n=== Evaluation Report ===")
for name, detail in results.items():
status = "PASS" if detail["pass"] else "FAIL"
print(f" {name}: {detail['value']:.4f} (threshold: {detail['threshold']}) [{status}]")
print(f"\nOverall: {'PASS' if all_pass else 'FAIL'}")
# Exit code non-zero se le metriche non passano
if not all_pass:
print("\nWARNING: Le metriche non raggiungono le soglie minime!")
# Non usiamo exit(1) perchè il workflow legge l'output
if __name__ == "__main__":
main()
훈련 및 임계값 구성
# Configurazione pipeline di training
data:
train_path: "data/raw/reviews.csv"
test_size: 0.2
random_state: 42
model:
algorithm: "logistic_regression"
max_iter: 1000
C: 1.0
random_state: 42
tfidf:
max_features: 15000
ngram_range: [1, 2]
min_df: 3
max_df: 0.9
mlflow:
experiment_name: "sentiment-classifier"
registered_model_name: "sentiment-classifier"
# Soglie minime per approvare il deployment
thresholds:
accuracy: 0.85
f1_score: 0.83
precision: 0.80
recall: 0.80
auc_roc: 0.90
# Confronto con modello in produzione
comparison:
# Il nuovo modello deve migliorare almeno dello 0.5%
min_improvement: 0.005
# Metriche su cui e richiesto il miglioramento
compare_metrics:
- f1_score
- auc_roc
파일 요구 사항
# Core ML
scikit-learn==1.4.0
pandas==2.2.0
numpy==1.26.3
# NLP preprocessing
nltk==3.8.1
# Experiment tracking
mlflow==2.10.0
# Model serving
fastapi==0.109.0
uvicorn==0.27.0
pydantic==2.5.3
# Data validation
pandera==0.18.0
great-expectations==0.18.8
# Data versioning
dvc[s3]==3.42.0
# Configuration
pyyaml==6.0.1
python-dotenv==1.0.1
# Serialization
joblib==1.3.2
# Testing
pytest==7.4.4
httpx==0.26.0
파이프라인의 데이터 검증
모델을 훈련하기 전에 데이터가 유효한지 확인해야 합니다. 손상된 데이터에 대해 학습된 모델은 예측할 수 없는 결과를 생성합니다. 유효성 검사 날짜 파이프라인의 첫 번째 관문: 데이터가 검사에 실패하면 훈련 떠나지 않습니다.
"""Validazione della qualità dei dati con Pandera."""
import argparse
import sys
import yaml
import pandas as pd
import pandera as pa
from pandera import Column, Check, DataFrameSchema
def build_schema(schema_config: dict) -> DataFrameSchema:
"""Costruisce uno schema Pandera dalla configurazione YAML."""
columns = {}
for col_name, col_spec in schema_config["columns"].items():
checks = []
if "min_length" in col_spec:
checks.append(Check.str_length(min_value=col_spec["min_length"]))
if "max_length" in col_spec:
checks.append(Check.str_length(max_value=col_spec["max_length"]))
if "allowed_values" in col_spec:
checks.append(Check.isin(col_spec["allowed_values"]))
if "min_value" in col_spec:
checks.append(Check.greater_than_or_equal_to(col_spec["min_value"]))
if "max_value" in col_spec:
checks.append(Check.less_than_or_equal_to(col_spec["max_value"]))
columns[col_name] = Column(
dtype=col_spec.get("dtype", "object"),
nullable=col_spec.get("nullable", False),
checks=checks if checks else None
)
return DataFrameSchema(
columns=columns,
coerce=True,
strict=schema_config.get("strict", False)
)
def validate_data(data_path: str, schema_path: str) -> bool:
"""Valida il dataset contro lo schema definito."""
# Carica schema
with open(schema_path) as f:
schema_config = yaml.safe_load(f)
schema = build_schema(schema_config)
# Carica dati
df = pd.read_csv(data_path)
# Controlla dimensione minima
min_rows = schema_config.get("min_rows", 100)
if len(df) < min_rows:
print(f"FAIL: Dataset ha {len(df)} righe, minimo richiesto: {min_rows}")
return False
# Controlla duplicati
max_duplicates_pct = schema_config.get("max_duplicates_pct", 0.05)
duplicates_pct = df.duplicated().mean()
if duplicates_pct > max_duplicates_pct:
print(f"FAIL: {duplicates_pct:.1%} duplicati (max: {max_duplicates_pct:.1%})")
return False
# Valida schema
try:
schema.validate(df, lazy=True)
print(f"PASS: Dataset valido ({len(df)} righe, {len(df.columns)} colonne)")
return True
except pa.errors.SchemaErrors as e:
print(f"FAIL: Schema validation errors:")
print(e.failure_cases.head(20))
return False
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--data-path", required=True)
parser.add_argument("--schema-path", required=True)
args = parser.parse_args()
valid = validate_data(args.data_path, args.schema_path)
sys.exit(0 if valid else 1)
# Schema per il dataset di recensioni
min_rows: 1000
max_duplicates_pct: 0.05
strict: false
columns:
review_text:
dtype: "object"
nullable: false
min_length: 10
max_length: 5000
sentiment:
dtype: "int64"
nullable: false
allowed_values: [0, 1]
rating:
dtype: "float64"
nullable: true
min_value: 1.0
max_value: 5.0
review_date:
dtype: "object"
nullable: true
파이프라인에서 DVC를 사용한 데이터 버전 관리
데이터가 Git에 비해 너무 크지만 버전을 관리하고 동기화해야 합니다.
CI/CD 파이프라인에서. DVC(데이터 버전 관리) 이 문제를 해결합니다.
데이터를 원격 스토리지(S3, GCS, Azure Blob)에 저장하고 Git에서만 추적
메타데이터(해시, 크기). GitHub Actions는 다음을 사용할 수 있습니다. dvc pull 에 대한
현재 커밋과 관련된 데이터 버전을 정확하게 다운로드합니다.
stages:
prepare:
cmd: python -m src.data.preprocessing --config config/training.yaml
deps:
- src/data/preprocessing.py
- config/training.yaml
- data/raw/reviews.csv
outs:
- data/processed/train.csv
- data/processed/test.csv
train:
cmd: python train.py --config config/training.yaml
deps:
- train.py
- src/models/trainer.py
- data/processed/train.csv
- config/training.yaml
outs:
- models/model.pkl
metrics:
- metrics/train_metrics.json:
cache: false
evaluate:
cmd: >-
python evaluate.py
--model-path models/model.pkl
--test-data data/processed/test.csv
--thresholds config/thresholds.yaml
--output metrics/eval_metrics.json
deps:
- evaluate.py
- models/model.pkl
- data/processed/test.csv
- config/thresholds.yaml
metrics:
- metrics/eval_metrics.json:
cache: false
# Inizializza DVC
dvc init
# Configura storage remoto (S3 in questo esempio)
dvc remote add -d myremote s3://my-ml-data-bucket/sentiment-classifier
dvc remote modify myremote region eu-west-1
# Traccia il dataset
dvc add data/raw/reviews.csv
# Committa i file DVC in Git
git add data/raw/reviews.csv.dvc data/raw/.gitignore dvc.yaml dvc.lock
git commit -m "feat: add DVC tracking for training data"
# Push dei dati su S3
dvc push
DVC + GitHub 작업: 작동 방식
- Git 트랙 파일
.dvc(메타데이터: SHA256 해시, 크기) - 실제 데이터는 S3(또는 GCS, Azure Blob, Google Drive)에 있습니다.
- GitHub Actions 워크플로가 실행됩니다.
dvc pull데이터를 다운로드하려면 - S3 자격 증명은 GitHub 비밀을 통해 전달됩니다.
- 각 Git 커밋은 데이터의 정확한 버전에 해당합니다.
MLflow를 사용한 모델 레지스트리
모델 레지스트리(Model Registry)는 모델이 출시된 후 모델의 수명주기를 관리하는 구성 요소입니다. 훈련. 훈련된 각 모델은 이름, 버전 및 상태(스테이징, 프로덕션, 보관됨). CI/CD 파이프라인은 레지스트리와 상호 작용합니다. 검증 임계값을 초과하는 모델을 홍보합니다.
"""Script per la promozione del modello nel registry MLflow."""
import mlflow
from mlflow.tracking import MlflowClient
def promote_model(
model_name: str,
run_id: str,
target_stage: str = "Production"
) -> None:
"""Promuove un modello a Production nel registry."""
client = MlflowClient()
# Cerca la versione del modello associata al run
model_versions = client.search_model_versions(
f"name='{model_name}'"
)
target_version = None
for mv in model_versions:
if mv.run_id == run_id:
target_version = mv.version
break
if target_version is None:
raise ValueError(
f"Nessun modello trovato per run_id={run_id}"
)
# Archivia il modello attualmente in Production
for mv in model_versions:
if mv.current_stage == "Production":
client.transition_model_version_stage(
name=model_name,
version=mv.version,
stage="Archived",
archive_existing_versions=False
)
print(f"Archiviato modello v{mv.version} (precedente Production)")
# Promuovi il nuovo modello
client.transition_model_version_stage(
name=model_name,
version=target_version,
stage=target_stage
)
print(f"Promosso modello v{target_version} a {target_stage}")
def load_production_model(model_name: str):
"""Carica il modello attualmente in Production."""
model_uri = f"models:/{model_name}/Production"
return mlflow.sklearn.load_model(model_uri)
모델 서명 및 입력 예
모델 서명은 모델 입력 및 출력 패턴을 문서화합니다. 이는 문서화 및 자동 검증 역할을 합니다. 다른 스키마를 사용하여 입력을 전달하려고 하면 MLflow에서 명확한 오류가 발생합니다.
import mlflow
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema, ColSpec
# Definisci la signature esplicita
input_schema = Schema([
ColSpec("string", "text")
])
output_schema = Schema([
ColSpec("long", "prediction")
])
signature = ModelSignature(
inputs=input_schema,
outputs=output_schema
)
# Esempio di input per documentazione
input_example = {
"text": "This product is excellent, highly recommended!"
}
# Registra il modello con signature e esempio
mlflow.sklearn.log_model(
sk_model=trained_pipeline,
artifact_path="model",
signature=signature,
input_example=input_example,
registered_model_name="sentiment-classifier"
)
ML 파이프라인에서 테스트
ML 테스트는 기존 소프트웨어 테스트보다 더 복잡합니다. 확인하는 것만으로는 충분하지 않습니다. 코드가 "작동"하는지 확인하려면 데이터 품질, 데이터의 정확성을 테스트해야 합니다. 전처리, 훈련 안정성, API 동작 제공. 는 CI/CD 파이프라인은 세 가지 수준의 테스트를 수행합니다.
전처리를 위한 단위 테스트
"""Unit test per il modulo di preprocessing."""
import pytest
import pandas as pd
from src.data.preprocessing import clean_text, load_and_preprocess_data
class TestCleanText:
"""Test per la funzione clean_text."""
def test_removes_html_tags(self):
assert clean_text("<p>Hello</p>") == "hello"
def test_removes_urls(self):
assert clean_text("Visit http://example.com for info") == "visit for info"
def test_removes_special_characters(self):
assert clean_text("Hello!!! World???") == "hello world"
def test_normalizes_whitespace(self):
assert clean_text("Hello World") == "hello world"
def test_lowercases(self):
assert clean_text("HELLO WORLD") == "hello world"
def test_empty_string(self):
assert clean_text("") == ""
def test_none_input(self):
assert clean_text(None) == ""
def test_numeric_preserved(self):
assert clean_text("Rating 5 out of 10") == "rating 5 out of 10"
class TestLoadAndPreprocess:
"""Test per il caricamento e preprocessing dei dati."""
def test_missing_columns_raises(self, tmp_path):
"""Verifica che colonne mancanti generino un errore."""
df = pd.DataFrame({"wrong_col": ["text"]})
csv_path = tmp_path / "test.csv"
df.to_csv(csv_path, index=False)
with pytest.raises(ValueError, match="Colonne mancanti"):
load_and_preprocess_data(str(csv_path))
def test_drops_na_rows(self, tmp_path):
"""Verifica che le righe con NA vengano rimosse."""
df = pd.DataFrame({
"review_text": ["Good product", None, "Bad product"],
"sentiment": [1, 0, 0]
})
csv_path = tmp_path / "test.csv"
df.to_csv(csv_path, index=False)
X, y = load_and_preprocess_data(str(csv_path))
assert len(X) == 2
def test_output_types(self, tmp_path):
"""Verifica i tipi di output."""
df = pd.DataFrame({
"review_text": ["Great product", "Terrible service"],
"sentiment": [1, 0]
})
csv_path = tmp_path / "test.csv"
df.to_csv(csv_path, index=False)
X, y = load_and_preprocess_data(str(csv_path))
assert isinstance(X, pd.Series)
assert isinstance(y, pd.Series)
파이프라인 통합 테스트
"""Integration test per il training e la valutazione del modello."""
import pytest
import pandas as pd
from src.models.trainer import create_pipeline, train_model, evaluate_model
@pytest.fixture
def sample_data():
"""Crea un dataset di test sintetico."""
texts = [
"amazing product love it", "terrible waste of money",
"great quality recommended", "horrible experience never again",
"excellent value for price", "poor quality disappointed",
"best purchase ever made", "worst product i bought",
"fantastic results happy", "awful terrible regret buying",
] * 10 # Ripetuto per avere abbastanza dati
sentiments = [1, 0, 1, 0, 1, 0, 1, 0, 1, 0] * 10
return pd.Series(texts), pd.Series(sentiments)
class TestPipeline:
"""Test per la pipeline di training."""
def test_create_pipeline_logistic(self):
"""Verifica creazione pipeline con LogisticRegression."""
config = {"algorithm": "logistic_regression", "max_iter": 100}
pipeline = create_pipeline(config)
assert len(pipeline.steps) == 2
def test_create_pipeline_invalid_algorithm(self):
"""Verifica errore con algoritmo non supportato."""
with pytest.raises(ValueError, match="non supportato"):
create_pipeline({"algorithm": "invalid_algo"})
def test_train_and_evaluate(self, sample_data):
"""Test end-to-end: training + evaluation."""
X, y = sample_data
config = {
"algorithm": "logistic_regression",
"max_iter": 200,
"random_state": 42
}
pipeline = create_pipeline(config)
trained = train_model(pipeline, X[:80], y[:80])
metrics = evaluate_model(trained, X[80:], y[80:])
assert "accuracy" in metrics
assert "f1_score" in metrics
assert 0.0 <= metrics["accuracy"] <= 1.0
assert 0.0 <= metrics["f1_score"] <= 1.0
def test_model_deterministic(self, sample_data):
"""Verifica che il training sia deterministico con seed fisso."""
X, y = sample_data
config = {
"algorithm": "logistic_regression",
"max_iter": 200,
"random_state": 42
}
p1 = train_model(create_pipeline(config), X[:80], y[:80])
p2 = train_model(create_pipeline(config), X[:80], y[:80])
m1 = evaluate_model(p1, X[80:], y[80:])
m2 = evaluate_model(p2, X[80:], y[80:])
assert m1["accuracy"] == m2["accuracy"]
서빙을 위한 연기 테스트
"""Smoke test per l'API di serving FastAPI."""
import pytest
from httpx import AsyncClient, ASGITransport
from src.serving.app import app
@pytest.fixture
def client():
"""Client HTTP per testare l'API."""
transport = ASGITransport(app=app)
return AsyncClient(transport=transport, base_url="http://test")
@pytest.mark.asyncio
async def test_health_endpoint(client):
"""Verifica che l'endpoint /health risponda 200."""
response = await client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "healthy"
@pytest.mark.asyncio
async def test_predict_positive(client):
"""Verifica predizione per testo positivo."""
response = await client.post(
"/predict",
json={"text": "This product is amazing, I love it!"}
)
assert response.status_code == 200
data = response.json()
assert "prediction" in data
assert "confidence" in data
assert data["confidence"] > 0.0
@pytest.mark.asyncio
async def test_predict_empty_text(client):
"""Verifica errore con testo vuoto."""
response = await client.post(
"/predict",
json={"text": ""}
)
assert response.status_code == 422
@pytest.mark.asyncio
async def test_predict_batch(client):
"""Verifica predizione batch."""
response = await client.post(
"/predict/batch",
json={"texts": [
"Great product",
"Terrible experience"
]}
)
assert response.status_code == 200
data = response.json()
assert len(data["predictions"]) == 2
# Aggiungere questo job prima del training nel workflow
tests:
name: Run Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
cache: 'pip'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install pytest-asyncio pytest-cov
- name: Run unit tests
run: pytest tests/ -v --cov=src --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v4
with:
file: coverage.xml
FastAPI로 API 제공
학습된 모델은 REST API를 통해 액세스할 수 있어야 합니다. FastAPI 및 선택 Python에서 ML을 제공하는 데 이상적이며 빠르며 다음을 통한 자동 입력 검증 기능이 있습니다. Pydantic은 OpenAPI 문서를 자동으로 생성합니다.
"""API di serving per il classificatore di sentiment."""
import os
import time
import joblib
from pathlib import Path
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from src.serving.schemas import (
PredictionRequest, PredictionResponse,
BatchPredictionRequest, BatchPredictionResponse,
HealthResponse
)
from src.data.preprocessing import clean_text
# Variabili globali per il modello
model_pipeline = None
model_version = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Carica il modello all'avvio dell'applicazione."""
global model_pipeline, model_version
model_path = os.getenv("MODEL_PATH", "models/model.pkl")
if not Path(model_path).exists():
raise RuntimeError(f"Modello non trovato: {model_path}")
model_pipeline = joblib.load(model_path)
model_version = os.getenv("MODEL_VERSION", "unknown")
print(f"Modello caricato: v{model_version}")
yield
model_pipeline = None
app = FastAPI(
title="Sentiment Classifier API",
version="1.0.0",
lifespan=lifespan
)
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""Health check dell'API."""
return HealthResponse(
status="healthy" if model_pipeline is not None else "unhealthy",
model_version=model_version or "not loaded"
)
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
"""Predizione singola."""
if model_pipeline is None:
raise HTTPException(status_code=503, detail="Model not loaded")
start = time.time()
cleaned = clean_text(request.text)
if not cleaned:
raise HTTPException(status_code=422, detail="Text is empty after cleaning")
prediction = model_pipeline.predict([cleaned])[0]
probabilities = model_pipeline.predict_proba([cleaned])[0]
confidence = float(max(probabilities))
latency_ms = (time.time() - start) * 1000
return PredictionResponse(
prediction=int(prediction),
confidence=confidence,
label="positive" if prediction == 1 else "negative",
latency_ms=round(latency_ms, 2)
)
@app.post("/predict/batch", response_model=BatchPredictionResponse)
async def predict_batch(request: BatchPredictionRequest):
"""Predizione batch su più testi."""
if model_pipeline is None:
raise HTTPException(status_code=503, detail="Model not loaded")
start = time.time()
cleaned_texts = [clean_text(t) for t in request.texts]
predictions = model_pipeline.predict(cleaned_texts)
probabilities = model_pipeline.predict_proba(cleaned_texts)
latency_ms = (time.time() - start) * 1000
results = []
for i, text in enumerate(request.texts):
results.append(PredictionResponse(
prediction=int(predictions[i]),
confidence=float(max(probabilities[i])),
label="positive" if predictions[i] == 1 else "negative",
latency_ms=0
))
return BatchPredictionResponse(
predictions=results,
total_latency_ms=round(latency_ms, 2)
)
"""Pydantic schemas per l'API di serving."""
from pydantic import BaseModel, Field
from typing import List
class PredictionRequest(BaseModel):
text: str = Field(..., min_length=1, max_length=5000,
description="Testo della recensione")
class PredictionResponse(BaseModel):
prediction: int = Field(..., description="0=negativo, 1=positivo")
confidence: float = Field(..., ge=0.0, le=1.0,
description="Confidenza della predizione")
label: str = Field(..., description="Label leggibile")
latency_ms: float = Field(..., description="Latenza in millisecondi")
class BatchPredictionRequest(BaseModel):
texts: List[str] = Field(..., min_length=1, max_length=100,
description="Lista di testi")
class BatchPredictionResponse(BaseModel):
predictions: List[PredictionResponse]
total_latency_ms: float
class HealthResponse(BaseModel):
status: str
model_version: str
배포 후 모니터링
배포는 파이프라인의 끝이 아니라 가장 중요한 단계의 시작입니다. 생산 중 모니터링. ML 모델은 다음과 같은 경우 자동으로 성능이 저하될 수 있습니다. 실제 데이터는 훈련 데이터와 다릅니다. 파이프라인에는 상태 확인이 포함되어야 합니다. 연속, 예측 로깅 및 자동 재교육 트리거.
"""Monitoring post-deployment per il modello ML."""
import time
import logging
from datetime import datetime, timedelta
from collections import deque
from typing import Dict, Optional
from dataclasses import dataclass, field
@dataclass
class PredictionLog:
"""Log di una singola predizione."""
timestamp: datetime
input_text: str
prediction: int
confidence: float
latency_ms: float
class ModelMonitor:
"""Monitora le performance del modello in produzione."""
def __init__(
self,
window_size: int = 1000,
min_confidence_threshold: float = 0.6,
max_latency_ms: float = 500.0,
drift_check_interval: int = 100
):
self.window_size = window_size
self.min_confidence = min_confidence_threshold
self.max_latency = max_latency_ms
self.drift_check_interval = drift_check_interval
self.predictions: deque = deque(maxlen=window_size)
self.alert_callbacks = []
self.prediction_count = 0
self.logger = logging.getLogger("model_monitor")
def log_prediction(self, log: PredictionLog) -> None:
"""Registra una predizione e verifica le metriche."""
self.predictions.append(log)
self.prediction_count += 1
# Check latenza
if log.latency_ms > self.max_latency:
self._alert(
"HIGH_LATENCY",
f"Latenza {log.latency_ms:.0f}ms supera soglia {self.max_latency}ms"
)
# Check confidenza bassa
if log.confidence < self.min_confidence:
self._alert(
"LOW_CONFIDENCE",
f"Confidenza {log.confidence:.2f} sotto soglia {self.min_confidence}"
)
# Check periodico per drift
if self.prediction_count % self.drift_check_interval == 0:
self._check_distribution_drift()
def get_metrics(self) -> Dict:
"""Restituisce le metriche correnti della finestra."""
if not self.predictions:
return {"status": "no_data"}
recent = list(self.predictions)
confidences = [p.confidence for p in recent]
latencies = [p.latency_ms for p in recent]
predictions = [p.prediction for p in recent]
positive_rate = sum(1 for p in predictions if p == 1) / len(predictions)
return {
"total_predictions": self.prediction_count,
"window_size": len(recent),
"avg_confidence": sum(confidences) / len(confidences),
"min_confidence": min(confidences),
"avg_latency_ms": sum(latencies) / len(latencies),
"p95_latency_ms": sorted(latencies)[int(len(latencies) * 0.95)],
"positive_rate": positive_rate,
"low_confidence_pct": sum(
1 for c in confidences if c < self.min_confidence
) / len(confidences),
}
def _check_distribution_drift(self) -> None:
"""Verifica se la distribuzione delle predizioni e cambiata."""
if len(self.predictions) < self.window_size:
return
recent = list(self.predictions)
half = len(recent) // 2
first_half = [p.prediction for p in recent[:half]]
second_half = [p.prediction for p in recent[half:]]
rate_first = sum(first_half) / len(first_half)
rate_second = sum(second_half) / len(second_half)
# Se la distribuzione cambia più del 15%, segnala drift
if abs(rate_first - rate_second) > 0.15:
self._alert(
"DISTRIBUTION_DRIFT",
f"Positive rate cambiato: {rate_first:.2f} -> {rate_second:.2f}"
)
def _alert(self, alert_type: str, message: str) -> None:
"""Invia un alert."""
self.logger.warning(f"[{alert_type}] {message}")
for callback in self.alert_callbacks:
callback(alert_type, message)
모니터링할 주요 지표
- 지연 시간(p50, p95, p99): API 응답 시간
- 처리량: 초당 예측 수
- 예측 분포: 긍정/부정 비율의 변화
- 중간 신뢰도: 감소는 모델이 "불확실"함을 나타냅니다.
- 오류율: HTTP 오류율 5xx
- 날짜 드리프트: 생산 데이터와 훈련 데이터의 차이
비용 및 최적화
GitHub Actions는 공개 저장소의 경우 월 2,000분, 저장소의 경우 월 500분을 무료로 제공합니다. 개인 저장소(무료 플랜). ML 훈련은 이러한 시간을 빠르게 소비할 수 있습니다. 최적화하는 방법은 다음과 같습니다.
캐싱 전략
# Cache delle dipendenze Python
- name: Cache pip packages
uses: actions/cache@v4
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('requirements.txt') }}
restore-keys: |
${{ runner.os }}-pip-
# Cache del dataset DVC (evita download ripetuti)
- name: Cache DVC data
uses: actions/cache@v4
with:
path: |
data/
.dvc/cache/
key: dvc-${{ hashFiles('data/*.dvc', 'dvc.lock') }}
restore-keys: |
dvc-
# Cache della Docker layer
- name: Build with cache
uses: docker/build-push-action@v5
with:
context: .
cache-from: type=gha
cache-to: type=gha,mode=max
GPU를 갖춘 자체 호스팅 실행기
GPU 교육의 경우 GitHub에서 호스팅하는 실행기만으로는 충분하지 않습니다(GPU가 없음). 해결책 GPU가 있는 머신의 자체 호스팅 실행기입니다. 이는 또한 분당 비용을 제거합니다. GitHub Actions로.
# 1. Sulla macchina con GPU, scarica il runner
mkdir actions-runner && cd actions-runner
curl -o actions-runner.tar.gz -L \
https://github.com/actions/runner/releases/download/v2.311.0/actions-runner-linux-x64-2.311.0.tar.gz
tar xzf actions-runner.tar.gz
# 2. Configura il runner
./config.sh --url https://github.com/YOUR_ORG/YOUR_REPO \
--token YOUR_TOKEN \
--labels gpu,cuda12,ml-training
# 3. Installa come servizio
sudo ./svc.sh install
sudo ./svc.sh start
training:
name: Train Model (GPU)
runs-on: [self-hosted, gpu, cuda12]
# Il job viene eseguito sulla macchina con GPU
steps:
- uses: actions/checkout@v4
- name: Train with GPU
run: |
nvidia-smi # Verifica GPU disponibile
python train.py --config config/training-gpu.yaml --device cuda
시나리오에 대한 비용 추정
| 대본 | 분/월 | GitHub Actions 비용 | 총 비용 |
|---|---|---|---|
| 프로토타입(수동 학습, scikit-learn) | ~200 | 무료(무료 플랜) | ~0 EUR/월 |
| PMI(주간 훈련, 중간 모델) | ~800 | ~12 EUR/월 | ~50 EUR/월(S3 사용) |
| 확장(일일 학습, 딥 러닝) | ~3,000 | ~48 EUR/월 | ~200 EUR/월(클라우드 GPU 사용) |
| 엔터프라이즈(다중 모델, 지속적인 교육) | ~10,000+ | 자체 호스팅 실행기 | ~500+ EUR/월 |
ML용 CI/CD 도구 비교
GitHub Actions가 유일한 옵션은 아닙니다. 각 도구는 그에 따라 특정한 장점을 가지고 있습니다. 맥락의. 다음은 선택에 도움이 되는 실제 비교입니다.
| 특성 | GitHub 작업 | GitLab CI | 젠킨스 | 아르고 워크플로우 |
|---|---|---|---|---|
| 설정 | 제로(통합) | 제로(통합) | 전용 서버 | Kubernetes 클러스터 |
| GPU 지원 | 자체 호스팅 실행기 | 자체 호스팅 실행기 | NVIDIA 플러그인 | 네이티브(K8s GPU) |
| 비용(소규모 팀) | 무료/낮음 | 무료/낮음 | 서버 비용 | K8s 클러스터 비용 |
| 병행 | 양호(매트릭스) | 좋은 | 최적 | 훌륭함(DAG) |
| 코드로서의 파이프라인 | YAML | YAML | 그루비/YAML | YAML/파이썬 SDK |
| ML 생태계 | 광대한 시장 | 좋은 | 플러그인 | 클라우드 네이티브 |
| 학습 곡선 | 낮은 | 낮은 | 평균 | 높은 |
| 다음에 이상적입니다. | 소규모/중간 팀, GitHub 레포 | 자체 관리되는 GitLab 팀 | 기업, 온프레미스 | 팀 K8, 복잡한 파이프라인 |
어느 것을 선택할까요?
- 시작 및 프로토타입: GitHub Actions - 제로 설정, 기본 통합, 공개 저장소에 무료
- GitLab 팀: GitLab CI - 우수한 레지스트리 컨테이너인 GitLab과의 기본 통합
- 기업 온프레미스: Jenkins - 최대의 유연성, 성숙한 플러그인 생태계
- K8을 갖춘 클라우드 네이티브 팀: Argo Workflows - DAG 파이프라인, 기본 확장, 복잡한 ML에 적합
연간 5,000 EUR 미만으로 설정 완료
완전한 ML CI/CD 파이프라인을 구현하려는 SMB의 경우 가능합니다. 오픈 소스 도구 및 클라우드 서비스를 사용하여 연간 5,000 EUR 미만 유지 무료 또는 저가 계층이 있습니다. 권장되는 스택은 다음과 같습니다.
| 요소 | 도구 | 비용/년 | 메모 |
|---|---|---|---|
| 리포지토리 + CI/CD | GitHub(팀) | ~400유로 | 3,000분/월 작업 포함 |
| 데이터 저장 | AWS S3 | ~120유로 | ~500GB 데이터 세트, 전송 포함 |
| 실험 추적 | MLflow(자체 호스팅) | ~0유로 | 클라우드 VM에 배포된 오픈 소스 |
| 모델 레지스트리 | MLflow 모델 레지스트리 | ~0유로 | MLflow에 포함됨 |
| 컨테이너 레지스트리 | GitHub 컨테이너 레지스트리 | ~0유로 | GitHub에 포함됨 |
| 호스팅 모델 | 클라우드 VM(e2-medium) | ~500유로 | FastAPI + MLflow 서버 제공용 |
| 데이터 버전 관리 | DVC | ~0유로 | 위에서 이미 계산된 오픈 소스, 스토리지 |
| 모니터링 | 프로메테우스 + 그라파나 | ~0유로 | 동일한 VM의 오픈 소스 |
| 데이터 검증 | 판데라 / 큰 기대 | ~0유로 | 오픈 소스 |
| GPU 훈련(수시) | Cloud GPU 스팟 인스턴스 | ~600유로 | ~50시간/월 T4 스팟 |
추정 총액: ~1,620 EUR/년 - 예산이 5,000유로 미만, 스케일링을 위한 여유가 있습니다. 가장 큰 비용은 GPU 교육입니다. 모델을 사용하는 경우 클래식(scikit-learn, XGBoost) GPU 비용은 0이 됩니다.
숨겨진 비용을 조심하세요
도구 비용은 일부일뿐입니다. 가장 중요한 비용과 팀 시간: 숙련된 엔지니어의 경우 초기 파이프라인을 설정하는 데 약 2~4주가 걸립니다. 지속적인 유지 관리에는 주당 약 2~4시간이 소요됩니다. 비용도 계산해 보세요 대규모 데이터 세트로 인해 빠르게 증가할 수 있는 클라우드 서비스 간 데이터 전송(송신).
결론 및 다음 단계
이 기사에서 우리는 기계 학습을 위한 완전한 CI/CD 파이프라인을 구축했습니다. 다단계 Dockerfile부터 GitHub Actions 워크플로까지 데이터 검증, 교육, 평가 및 자동 배포. 기억해야 할 주요 개념은 다음과 같습니다.
- ML용 CI/CD는 세 가지 아티팩트를 처리합니다. (코드, 데이터, 모델) 및 지속적인 학습 소개
- 다단계 도커 최적화된 이미지를 위한 별도의 빌드, 학습 및 제공
- GitHub 작업 종속 및 조건부 작업으로 전체 파이프라인을 조정합니다.
- 데이터 검증 첫 번째 관문: 손상된 데이터는 사용할 수 없는 모델을 생성합니다.
- 모델 레지스트리 템플릿 릴리스 및 프로모션을 관리합니다.
- ML 테스트 단위, 통합, 연기 테스트의 세 가지 수준을 다룹니다.
- 모니터링 배포 후 드리프트 및 성능 저하를 감지하는 데 중요합니다.
- 비용은 관리 가능합니다. 오픈 소스 도구를 사용하면 연간 2,000 EUR 미만으로 유지됩니다.
우리가 구축한 파이프라인은 성숙도 모델의 레벨 2 구글 MLOps: 교육 및 배포의 완전한 자동화 통합 검증 및 모니터링. 이를 토대로 다음 글에서 우리는 더 깊이 탐구할 것입니다 MLflow 고급 실험 추적을 위해 모델 레지스트리 그리고 유물 관리.
시리즈 로드맵
- 제1조: MLOps: 실험부터 생산까지(완료)
- 제2조: CI/CD가 포함된 ML 파이프라인: GitHub Actions + Docker(이 문서)
- 제3조: MLflow 심층 분석 - 실험 추적 및 모델 레지스트리
- 제4조: DVC - ML용 데이터 버전 관리
- 제5조: FastAPI 및 Docker를 사용한 확장 가능한 모델 제공
- 제6조: ML용 Kubernetes: 오케스트레이션 및 확장
- 제7조: 고급 모니터링: 데이터 드리프트와 AI
- 제8조: 프로덕션의 ML 모델에 대한 A/B 테스트
- 제9조: 거버넌스, 규정 준수 및 책임 있는 ML
- 제10조: 사례 연구: 엔드투엔드 MLOps 파이프라인







