Benchmark a optimalizace: od 48GB GPU do 8GB RTX
Máš modelku. Běží na 80GB A100. Ale musíte to nasadit na 24GB RTX 3090, nebo na notebooku RTX 4060 8GB, nebo dokonce na Raspberry Pi. Jak víš kolik Ztrácíte přesnost přechodem z FP32 na INT4? Jakou rychlost získáte s Flash Attention? Vyplatí se to kvantifikovat nebo je lepší destilovat? Kolik paměti ušetří kontrola přechodu?
Bez systematického benchmarkingu zůstanou tyto otázky nezodpovězeny – a skončí provádění suboptimálních voleb na základě intuice nebo publikovaných benchmarků s konfiguracemi odlišný od vašeho vlastního. V tomto posledním článku série vytváříme rámec komplexní benchmarking na míru každá velikost výkon: paměť, latence, propustnost, přesnost a spotřeba energie.
Poté systematicky aplikujeme všechny techniky uvedené v sérii — kvantování, prořezávání, destilace, Flash Attention, přechodový kontrolní bod, smíšená přesnost — a ukážeme, jak přejít od modelu, který vyžaduje 48 GB, k modelu, který běží s 8 GB, s metrikami, které přesně ukazují, za co platíte z hlediska kvality.
Co se naučíte
- Systematický benchmarkingový rámec pro DL modely
- Změřte přesně VRAM, latenci, propustnost a FLOP
- Mixed Precision Training: FP16 vs BF16 vs FP32
- Flash Attention 2/3: kolik ušetříte a kdy to použít
- Gradient Checkpointing: kompromis mezi pamětí a počítačem
- Akumulace gradientu: Prakticky velké velikosti dávek
- Torch.compile a optimalizace běhu
- KV Cache: Optimalizace pro LLM autoregresivní inference
- Systematické srovnání: všechny techniky byly porovnány
- Pokyny pro rozhodování: která optimalizace pro který scénář
Systematic Benchmarking Framework
Před optimalizací je potřeba přesně změřit. Srovnávací rámec profesionální měření: špičkové využití VRAM, průměrná latence a P95, propustnost (token/s nebo img/s), FLOPs, spotřeba energie a přesnost na konkrétní úkoly. Klíčem je reprodukovatelnost: Benchmarky, které se mezi běhy liší o 10 %, jsou k ničemu.
import torch
import torch.nn as nn
import time
import numpy as np
from dataclasses import dataclass, asdict
from typing import Optional, Callable
import gc
# ============================================================
# DATACLASS PER RISULTATI BENCHMARK
# ============================================================
@dataclass
class BenchmarkResult:
"""Risultati completi di un benchmark."""
name: str
# Memoria
vram_allocated_mb: float
vram_reserved_mb: float
vram_peak_mb: float
# Velocita
latency_ms_mean: float
latency_ms_p50: float
latency_ms_p95: float
latency_ms_p99: float
throughput_per_sec: float
# Modello
params_total: int
params_trainable: int
model_size_mb: float
# Opzionali
accuracy: Optional[float] = None
flops_total: Optional[float] = None
power_watts: Optional[float] = None
def print_summary(self):
print(f"\n=== {self.name} ===")
print(f" VRAM: {self.vram_peak_mb:.0f} MB peak, {self.vram_allocated_mb:.0f} MB alloc")
print(f" Latenza: {self.latency_ms_mean:.1f}ms mean, "
f"{self.latency_ms_p95:.1f}ms p95, {self.latency_ms_p99:.1f}ms p99")
print(f" Throughput: {self.throughput_per_sec:.1f}/s")
print(f" Parametri: {self.params_total:,} ({self.model_size_mb:.1f} MB)")
if self.accuracy:
print(f" Accuratezza: {self.accuracy:.4f}")
# ============================================================
# CLASSE PRINCIPALE DI BENCHMARKING
# ============================================================
class DeepLearningBenchmark:
def __init__(self, device: str = "cuda"):
self.device = device
self.results = []
def _count_params(self, model: nn.Module) -> tuple:
total = sum(p.numel() for p in model.parameters())
trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
return total, trainable
def _model_size_mb(self, model: nn.Module) -> float:
total_bytes = sum(p.numel() * p.element_size() for p in model.parameters())
return total_bytes / (1024 ** 2)
def _reset_memory(self):
"""Reset GPU memory per benchmark pulito."""
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.reset_peak_memory_stats()
def benchmark_inference(
self,
name: str,
model: nn.Module,
input_fn: Callable[[], tuple],
n_warmup: int = 10,
n_runs: int = 100,
batch_size: int = 1
) -> BenchmarkResult:
"""
Benchmark completo di inferenza.
input_fn: funzione che restituisce input per il modello
"""
model = model.to(self.device).eval()
self._reset_memory()
# Warmup
with torch.no_grad():
for _ in range(n_warmup):
inputs = input_fn()
if isinstance(inputs, dict):
model(**{k: v.to(self.device) for k, v in inputs.items()})
else:
model(inputs.to(self.device))
# Misura memoria post-warmup
if torch.cuda.is_available():
mem_alloc = torch.cuda.memory_allocated() / (1024**2)
mem_reserved = torch.cuda.memory_reserved() / (1024**2)
# Benchmark vero
torch.cuda.synchronize() if torch.cuda.is_available() else None
latencies = []
for _ in range(n_runs):
inputs = input_fn()
t0 = time.perf_counter()
with torch.no_grad():
if isinstance(inputs, dict):
_ = model(**{k: v.to(self.device) for k, v in inputs.items()})
else:
_ = model(inputs.to(self.device))
torch.cuda.synchronize() if torch.cuda.is_available() else None
latencies.append((time.perf_counter() - t0) * 1000)
if torch.cuda.is_available():
mem_peak = torch.cuda.max_memory_allocated() / (1024**2)
else:
mem_alloc = mem_reserved = mem_peak = 0.0
latencies = np.array(latencies)
total_params, trainable_params = self._count_params(model)
result = BenchmarkResult(
name=name,
vram_allocated_mb=mem_alloc,
vram_reserved_mb=mem_reserved,
vram_peak_mb=mem_peak,
latency_ms_mean=float(np.mean(latencies)),
latency_ms_p50=float(np.percentile(latencies, 50)),
latency_ms_p95=float(np.percentile(latencies, 95)),
latency_ms_p99=float(np.percentile(latencies, 99)),
throughput_per_sec=1000 / np.mean(latencies) * batch_size,
params_total=total_params,
params_trainable=trainable_params,
model_size_mb=self._model_size_mb(model)
)
result.print_summary()
self.results.append(result)
return result
def benchmark_training_step(
self,
name: str,
model: nn.Module,
optimizer: torch.optim.Optimizer,
loss_fn: Callable,
input_fn: Callable,
n_steps: int = 50
) -> dict:
"""Benchmark di un singolo step di training."""
model = model.to(self.device).train()
self._reset_memory()
latencies = []
for step in range(n_steps):
inputs, labels = input_fn()
inputs = inputs.to(self.device)
labels = labels.to(self.device)
t0 = time.perf_counter()
optimizer.zero_grad()
outputs = model(inputs)
loss = loss_fn(outputs, labels)
loss.backward()
optimizer.step()
torch.cuda.synchronize() if torch.cuda.is_available() else None
latencies.append((time.perf_counter() - t0) * 1000)
return {
"name": name,
"vram_peak_mb": torch.cuda.max_memory_allocated() / (1024**2) if torch.cuda.is_available() else 0,
"step_ms_mean": float(np.mean(latencies[5:])), # Skip warmup
"step_ms_p95": float(np.percentile(latencies[5:], 95))
}
def compare_results(self) -> None:
"""Stampa tabella comparativa di tutti i risultati."""
if not self.results:
print("Nessun risultato disponibile.")
return
baseline = self.results[0]
print(f"\n{'Config':<30} {'VRAM (MB)':>12} {'Latency (ms)':>14} {'Throughput':>12} {'Speedup':>10}")
print("-" * 82)
for r in self.results:
speedup = baseline.latency_ms_mean / r.latency_ms_mean
print(f"{r.name:<30} {r.vram_peak_mb:>12.0f} {r.latency_ms_mean:>14.2f} "
f"{r.throughput_per_sec:>12.1f} {speedup:>10.2f}x")
# Uso:
bench = DeepLearningBenchmark(device="cuda" if torch.cuda.is_available() else "cpu")
print("Framework di benchmarking inizializzato")
Smíšená přesnost: FP32 vs FP16 vs BF16
Il smíšený trénink přesnosti a první optimalizace, která umožní: téměř
nulová režie na konfiguraci, 2x úspora paměti, často 2-3x zrychlení hardwaru
Ampér+. torch.autocast automaticky řídí, které operace se mají provést
ve snížené přesnosti.
Klíčový rozdíl mezi FP16 a BF16 a binárním formátem: FP16 má 5 bitů pro exponent a 10 pro mantisu (rozsah 6e-5 až 6,5e4), zatímco BF16 má 8 bitů pro exponent a 7 pro mantisa (stejný rozsah jako FP32, od 1,2e-38 do 3,4e38). BF16 a mnohem stabilnější během trénink, protože nezpůsobuje přetečení/podtečení s velkými gradienty.
import torch
import torch.nn as nn
from torch.cuda.amp import GradScaler
# ============================================================
# CONFRONTO FP32 vs FP16 vs BF16
# ============================================================
def train_step_fp32(model, optimizer, imgs, labels, criterion):
"""Training step standard FP32."""
optimizer.zero_grad()
output = model(imgs)
loss = criterion(output, labels)
loss.backward()
optimizer.step()
return loss.item()
def train_step_fp16(model, optimizer, imgs, labels, criterion, scaler: GradScaler):
"""
Training step con AMP FP16.
GradScaler necessario: FP16 ha range limitato, loss scaling evita underflow.
"""
optimizer.zero_grad()
with torch.autocast(device_type="cuda", dtype=torch.float16):
output = model(imgs)
loss = criterion(output, labels)
# Scala la loss per evitare underflow in FP16
scaler.scale(loss).backward()
# Decomprime gradienti prima di clip
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
# Aggiorna pesi (salta se ci sono NaN/Inf nei gradienti)
scaler.step(optimizer)
scaler.update()
return loss.item()
def train_step_bf16(model, optimizer, imgs, labels, criterion):
"""
Training step con BF16.
BF16 NON richiede GradScaler: ha range dinamico uguale a FP32.
Disponibile su: A100, RTX 3000+, Apple M-series.
"""
optimizer.zero_grad()
with torch.autocast(device_type="cuda", dtype=torch.bfloat16):
output = model(imgs)
loss = criterion(output, labels)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
return loss.item()
# Benchmark comparativo
from torchvision import models
import time, gc
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def compare_precisions(model_fn=models.resnet50, n_steps=100,
batch_size=32, img_size=224):
"""Confronta FP32, FP16, BF16 per training e inferenza."""
criterion = nn.CrossEntropyLoss()
configs = [
("FP32", torch.float32, False),
("FP16", torch.float16, True), # Richiede GradScaler
("BF16", torch.bfloat16, False) # No GradScaler
]
results = {}
for name, dtype, use_scaler in configs:
model = model_fn(pretrained=False).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
scaler = GradScaler() if use_scaler else None
# Reset memory stats
torch.cuda.reset_peak_memory_stats() if torch.cuda.is_available() else None
gc.collect()
torch.cuda.empty_cache() if torch.cuda.is_available() else None
timings = []
for step in range(n_steps):
imgs = torch.randn(batch_size, 3, img_size, img_size, device=device)
labels = torch.randint(0, 1000, (batch_size,), device=device)
t0 = time.perf_counter()
with torch.autocast(device_type="cuda", dtype=dtype, enabled=(dtype != torch.float32)):
out = model(imgs)
loss = criterion(out, labels)
if scaler:
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
else:
optimizer.zero_grad()
loss.backward()
optimizer.step()
torch.cuda.synchronize() if torch.cuda.is_available() else None
timings.append((time.perf_counter() - t0) * 1000)
vram_peak = torch.cuda.max_memory_allocated() / (1024**2) if torch.cuda.is_available() else 0
results[name] = {
"vram_mb": round(vram_peak, 1),
"step_ms": round(np.mean(timings[10:]), 2),
"throughput_imgs_s": round(batch_size * 1000 / np.mean(timings[10:]), 1)
}
print(f"{name}: VRAM={vram_peak:.0f}MB, {np.mean(timings[10:]):.1f}ms/step, "
f"{batch_size*1000/np.mean(timings[10:]):.0f} img/s")
return results
# Risultati tipici ResNet-50 BS=32 su RTX 4090:
# FP32: VRAM=6200MB, 95ms/step, 336 img/s
# FP16: VRAM=3100MB, 41ms/step, 780 img/s (2x velocità, 50% VRAM)
# BF16: VRAM=3100MB, 38ms/step, 842 img/s (2.2x velocità, 50% VRAM)
Flash Attention: Optimalizace, která mění pravidla
Blesková pozornost (Dao et al., 2022) a možná nejefektivnější optimalizace pro Transformers posledních let. Přeformulujte výpočet pozornosti tak, aby byl IO-bound vědomi: místo zhmotnění kompletní matice pozornosti v HBM (který má v paměti O(n^2) složitost), počítá blokovou pozornost, zatímco zůstává v SRAM. Výsledek: O(n) složitost v paměti místo O(n^2), 2-4x zrychlení na dlouhých sekvencích.
Flash Attention 2 (2023) dále zlepšuje paralelismus na GPU 72 % teoretického využití FP16 FLOPS. Flash Attention 3 (2024) přidává podporu pro optimalizace specifické pro FP8 a Hopper se zrychlením až 2x ve srovnání s FA2.
import torch
import torch.nn as nn
import torch.nn.functional as F
import time, math
# ============================================================
# FLASH ATTENTION vs STANDARD ATTENTION: CONFRONTO
# ============================================================
def standard_attention(q, k, v, scale=None):
"""
Attention standard: materializza la matrice NxN completa in GPU memory.
Complessità memoria: O(N^2 * d_head)
"""
if scale is None:
scale = q.size(-1) ** -0.5
# [B, heads, N, N] - questa matrice può essere ENORME per seq lunghe!
attn = torch.softmax((q @ k.transpose(-2, -1)) * scale, dim=-1)
return attn @ v
def flash_attention_native(q, k, v):
"""
Flash Attention tramite PyTorch 2.0+ scaled_dot_product_attention.
Sceglie automaticamente l'implementazione ottimale:
- FlashAttention-2 se disponibile (CUDA Ampere+)
- Memory-efficient attention (xFormers) come fallback
- Standard attention come ultimo fallback
"""
# Automaticamente ottimizzato da PyTorch
return F.scaled_dot_product_attention(q, k, v, is_causal=False)
def benchmark_attention_implementations(
batch_size=4, n_heads=12, seq_lengths=[512, 1024, 2048, 4096, 8192],
d_head=64, device="cuda"
):
"""
Confronta Standard vs Flash Attention su diverse lunghezze di sequenza.
"""
print(f"{'Seq Len':>10} | {'Standard (ms)':>15} | {'Flash (ms)':>12} | "
f"{'Speedup':>10} | {'VRAM Std (MB)':>15} | {'VRAM Flash (MB)':>15}")
print("-" * 90)
for seq_len in seq_lengths:
q = torch.randn(batch_size, n_heads, seq_len, d_head, device=device, dtype=torch.float16)
k = torch.randn_like(q)
v = torch.randn_like(q)
# Warmup
for _ in range(5):
standard_attention(q, k, v)
flash_attention_native(q, k, v)
# Benchmark Standard
torch.cuda.reset_peak_memory_stats()
torch.cuda.synchronize()
t0 = time.perf_counter()
for _ in range(20):
out_std = standard_attention(q, k, v)
torch.cuda.synchronize()
std_ms = (time.perf_counter() - t0) / 20 * 1000
vram_std = torch.cuda.max_memory_allocated() / (1024**2)
# Benchmark Flash Attention
torch.cuda.reset_peak_memory_stats()
torch.cuda.synchronize()
t0 = time.perf_counter()
for _ in range(20):
out_flash = flash_attention_native(q, k, v)
torch.cuda.synchronize()
flash_ms = (time.perf_counter() - t0) / 20 * 1000
vram_flash = torch.cuda.max_memory_allocated() / (1024**2)
speedup = std_ms / flash_ms
print(f"{seq_len:>10} | {std_ms:>15.2f} | {flash_ms:>12.2f} | "
f"{speedup:>10.2f}x | {vram_std:>15.0f} | {vram_flash:>15.0f}")
# Risultati tipici su RTX 4090 (FP16, B=4, heads=12, d_head=64):
# Seq Len | Standard (ms) | Flash (ms) | Speedup | VRAM Std (MB) | VRAM Flash (MB)
# -----------------------------------------------------------------------------------
# 512 | 0.82 | 0.31 | 2.65x | 48 | 12
# 1024 | 2.45 | 0.58 | 4.22x | 192 | 24
# 2048 | 9.12 | 1.12 | 8.14x | 768 | 48
# 4096 | 35.80 | 2.21 | 16.20x | 3072 | 96
# 8192 | 144.20 | 4.38 | 32.92x | 12288 | 192
# Flash Attention scala LINEARMENTE: a seq=8192 usa 64x meno VRAM!
Kontrola přechodu a akumulace přechodu
Když je VRAM překážkou během tréninku, dvě doplňkové techniky umožňují trénovat větší dávky bez upgradu hardwaru:
import torch
import torch.nn as nn
from torch.utils.checkpoint import checkpoint_sequential
import gc
# ============================================================
# GRADIENT CHECKPOINTING
# ============================================================
# Idea: invece di salvare tutte le attivazioni intermedie per il backward pass,
# le ricalcola al momento (tradeoff: +33% compute, -50-70% memoria)
class CheckpointedTransformerBlock(nn.Module):
"""Transformer block con gradient checkpointing."""
def __init__(self, d_model=768, n_heads=12):
super().__init__()
self.attn = nn.MultiheadAttention(d_model, n_heads, batch_first=True)
self.norm1 = nn.LayerNorm(d_model)
self.ff = nn.Sequential(
nn.Linear(d_model, d_model * 4), nn.GELU(),
nn.Linear(d_model * 4, d_model)
)
self.norm2 = nn.LayerNorm(d_model)
def _attn_block(self, x):
attn_out, _ = self.attn(x, x, x)
return self.norm1(x + attn_out)
def _ff_block(self, x):
return self.norm2(x + self.ff(x))
def forward(self, x):
# Gradient checkpointing: ogni sotto-modulo viene ricalcolato
# durante il backward invece di essere salvato
x = torch.utils.checkpoint.checkpoint(self._attn_block, x, use_reentrant=False)
x = torch.utils.checkpoint.checkpoint(self._ff_block, x, use_reentrant=False)
return x
def enable_gradient_checkpointing_hf(model):
"""Abilita gradient checkpointing su modelli HuggingFace."""
model.gradient_checkpointing_enable()
print(f"Gradient checkpointing abilitato su {type(model).__name__}")
# Benchmark Gradient Checkpointing
def compare_checkpointing(seq_len=2048, batch_size=8, d_model=768,
n_layers=12, n_heads=12, device="cuda"):
"""Confronta training con e senza gradient checkpointing."""
class SimpleTransformer(nn.Module):
def __init__(self, use_checkpoint=False):
super().__init__()
self.use_checkpoint = use_checkpoint
self.blocks = nn.ModuleList([
CheckpointedTransformerBlock(d_model, n_heads) if use_checkpoint
else CheckpointedTransformerBlock(d_model, n_heads)
for _ in range(n_layers)
])
self.head = nn.Linear(d_model, 1000)
def forward(self, x):
for block in self.blocks:
if self.use_checkpoint:
x = torch.utils.checkpoint.checkpoint(block, x, use_reentrant=False)
else:
x = block(x)
return self.head(x[:, 0])
results = {}
for use_ckpt in [False, True]:
name = "con checkpointing" if use_ckpt else "senza checkpointing"
gc.collect()
torch.cuda.empty_cache() if torch.cuda.is_available() else None
torch.cuda.reset_peak_memory_stats() if torch.cuda.is_available() else None
model = SimpleTransformer(use_checkpoint=use_ckpt).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
x = torch.randn(batch_size, seq_len, d_model, device=device)
labels = torch.randint(0, 1000, (batch_size,), device=device)
# Forward + backward
torch.cuda.synchronize() if torch.cuda.is_available() else None
t0 = time.perf_counter()
for _ in range(10):
optimizer.zero_grad()
out = model(x)
loss = nn.CrossEntropyLoss()(out, labels)
loss.backward()
optimizer.step()
torch.cuda.synchronize() if torch.cuda.is_available() else None
elapsed = (time.perf_counter() - t0) / 10 * 1000
vram = torch.cuda.max_memory_allocated() / (1024**2) if torch.cuda.is_available() else 0
results[name] = {"vram_mb": round(vram, 1), "step_ms": round(elapsed, 1)}
print(f"{name}: VRAM={vram:.0f}MB, Step={elapsed:.1f}ms")
return results
# Risultati tipici (Transformer 12 layer, seq=2048, BS=8, RTX 3090):
# Senza checkpointing: VRAM=18.4GB, Step=285ms
# Con checkpointing: VRAM= 7.8GB, Step=378ms (-58% VRAM, +33% compute)
# ============================================================
# GRADIENT ACCUMULATION
# ============================================================
def train_with_gradient_accumulation(
model, optimizer, train_loader, criterion,
accumulation_steps: int = 4,
device: str = "cuda"
):
"""
Gradient accumulation: simula batch_size * accumulation_steps
con la memoria di batch_size.
Utile quando il batch_size reale e troppo piccolo per convergenza ottimale.
"""
model = model.to(device).train()
optimizer.zero_grad()
for step, (imgs, labels) in enumerate(train_loader):
imgs, labels = imgs.to(device), labels.to(device)
# Forward pass
with torch.autocast(device_type="cuda", dtype=torch.bfloat16):
output = model(imgs)
# Dividi loss per accumulation steps (mantiene la scala corretta)
loss = criterion(output, labels) / accumulation_steps
loss.backward()
# Aggiorna i pesi ogni N step
if (step + 1) % accumulation_steps == 0:
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
optimizer.zero_grad()
effective_batch = imgs.size(0) * accumulation_steps
print(f"Step {(step+1)//accumulation_steps} | "
f"Effective batch: {effective_batch} | Loss: {loss.item()*accumulation_steps:.4f}")
torch.compile: Optimalizace grafů
pochodeň.kompilovat (PyTorch 2.0+) zkompiluje model do optimalizovaných jader přes Triton nebo jiný backend. A ta nejjednodušší optimalizace, kterou lze použít: stačí jedna řádek kódu může vést k 1,5-2,5x zrychlení při odvození.
import torch
from torchvision import models
import time, numpy as np
def benchmark_torch_compile():
device = "cuda" if torch.cuda.is_available() else "cpu"
# ============================================================
# MODALITA DI COMPILAZIONE
# ============================================================
# "default": Bilanciamento compile time / speedup
# "reduce-overhead": Minimizza overhead, ottimale per piccoli batch
# "max-autotune": Massima velocità (compile time molto più lungo, ~5-10 min)
# "inductor": Backend default (usa Triton su CUDA, C++ su CPU)
model_fp32 = models.resnet50(pretrained=False).to(device).eval()
# Compilazione eager (default)
model_compiled_default = torch.compile(
models.resnet50(pretrained=False).to(device).eval(),
mode="default"
)
# Compilazione per massima velocità
model_compiled_max = torch.compile(
models.resnet50(pretrained=False).to(device).eval(),
mode="max-autotune",
fullgraph=True # Evita graph breaks per massimo speedup
)
x = torch.randn(32, 3, 224, 224, device=device)
def time_model(model, x, n=100):
"""Benchmark con warmup."""
# Warmup (specialmente importante per torch.compile)
with torch.no_grad():
for _ in range(20):
model(x)
torch.cuda.synchronize() if torch.cuda.is_available() else None
latencies = []
with torch.no_grad():
for _ in range(n):
t0 = time.perf_counter()
model(x)
torch.cuda.synchronize() if torch.cuda.is_available() else None
latencies.append((time.perf_counter() - t0) * 1000)
return np.mean(latencies)
ms_eager = time_model(model_fp32, x)
ms_default = time_model(model_compiled_default, x)
# ms_max = time_model(model_compiled_max, x) # Richiede molto tempo di compile
print(f"Eager (FP32): {ms_eager:.2f} ms")
print(f"Compiled default: {ms_default:.2f} ms ({ms_eager/ms_default:.2f}x speedup)")
# Con BF16 + compile: effetto moltiplicativo
model_bf16_compiled = torch.compile(
models.resnet50(pretrained=False).to(device).eval(),
mode="default"
)
x_bf16 = x.to(torch.bfloat16)
model_bf16_compiled = model_bf16_compiled.to(torch.bfloat16)
ms_bf16_compiled = time_model(model_bf16_compiled, x_bf16)
print(f"BF16 + Compiled: {ms_bf16_compiled:.2f} ms ({ms_eager/ms_bf16_compiled:.2f}x speedup)")
# Risultati tipici RTX 4090:
# Eager FP32: 12.4 ms/step (BS=32)
# Compiled default: 7.8 ms/step (1.59x)
# BF16 + Compiled: 5.1 ms/step (2.43x)
benchmark_torch_compile()
KV Cache: Optimalizace pro LLM Autoregresivní inference
V autoregresivních modelech musí každý vygenerovaný token čekat na pozornost všech předchozí tokeny. Bez optimalizací se klíče (K) a hodnoty (V) přepočítají v každém kroku — se složitostí O(n^2) pro sekvenci n tokenů. The KV cache šetří K a V každé vrstvy po každém kroku, což snižuje náklady generace od O(n^2) do O(n).
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Optional, Tuple
# ============================================================
# TRANSFORMER CON KV CACHE
# ============================================================
class CachedMultiHeadAttention(nn.Module):
"""
Multi-head attention con KV cache per generazione autogressiva.
Il cache evita di ricalcolare K, V per token passati.
"""
def __init__(self, d_model: int, n_heads: int):
super().__init__()
self.n_heads = n_heads
self.d_head = d_model // n_heads
self.scale = self.d_head ** -0.5
self.q_proj = nn.Linear(d_model, d_model, bias=False)
self.k_proj = nn.Linear(d_model, d_model, bias=False)
self.v_proj = nn.Linear(d_model, d_model, bias=False)
self.out_proj = nn.Linear(d_model, d_model, bias=False)
def forward(
self,
x: torch.Tensor, # [B, seq_len, d_model]
kv_cache: Optional[Tuple] = None # (K_cache, V_cache) o None
) -> Tuple[torch.Tensor, Tuple]:
B, T, D = x.shape
# Proietta Q, K, V
q = self.q_proj(x).view(B, T, self.n_heads, self.d_head).transpose(1, 2)
k = self.k_proj(x).view(B, T, self.n_heads, self.d_head).transpose(1, 2)
v = self.v_proj(x).view(B, T, self.n_heads, self.d_head).transpose(1, 2)
# Concatena con cache esistente
if kv_cache is not None:
k_cache, v_cache = kv_cache
k = torch.cat([k_cache, k], dim=2) # [B, heads, T_total, d_head]
v = torch.cat([v_cache, v], dim=2)
# Attention (Flash Attention automatica con PyTorch 2.0+)
out = F.scaled_dot_product_attention(q, k, v, is_causal=(kv_cache is None))
out = out.transpose(1, 2).contiguous().view(B, T, D)
return self.out_proj(out), (k, v) # Ritorna output + nuovo cache
class CachedTransformerDecoder(nn.Module):
"""Decoder Transformer con KV cache per generazione efficiente."""
def __init__(self, vocab_size: int, d_model: int = 512,
n_heads: int = 8, n_layers: int = 6):
super().__init__()
self.embed = nn.Embedding(vocab_size, d_model)
self.pos_embed = nn.Embedding(2048, d_model)
self.layers = nn.ModuleList([
CachedMultiHeadAttention(d_model, n_heads)
for _ in range(n_layers)
])
self.norms = nn.ModuleList([nn.LayerNorm(d_model) for _ in range(n_layers)])
self.head = nn.Linear(d_model, vocab_size)
self.n_layers = n_layers
@torch.no_grad()
def generate(
self,
input_ids: torch.Tensor, # [B, seq_len]
max_new_tokens: int = 100,
temperature: float = 1.0
) -> torch.Tensor:
"""
Generazione autogressiva con KV cache.
Ogni step utilizza il cache dei token precedenti.
"""
B, T = input_ids.shape
device = input_ids.device
# Processa il prompt (prefill)
x = self.embed(input_ids)
positions = torch.arange(T, device=device).unsqueeze(0)
x = x + self.pos_embed(positions)
# Inizializza cache per ogni layer
kv_caches = [None] * self.n_layers
for i, (layer, norm) in enumerate(zip(self.layers, self.norms)):
x_norm = norm(x)
attn_out, kv_caches[i] = layer(x_norm, kv_caches[i])
x = x + attn_out
# Generazione token per token (usando il cache)
generated = []
for step in range(max_new_tokens):
# Solo l'ultimo token come query
last_token = input_ids[:, -1:] if step == 0 else new_token
x_new = self.embed(last_token)
pos = torch.tensor([[T + step]], device=device)
x_new = x_new + self.pos_embed(pos)
for i, (layer, norm) in enumerate(zip(self.layers, self.norms)):
x_norm = norm(x_new)
attn_out, kv_caches[i] = layer(x_norm, kv_caches[i])
x_new = x_new + attn_out
# Campiona prossimo token
logits = self.head(x_new[:, -1, :]) / temperature
new_token = torch.multinomial(torch.softmax(logits, -1), 1)
generated.append(new_token)
return torch.cat(generated, dim=1)
# Benchmark KV cache vs no cache
def benchmark_generation(model, vocab_size=32000, seq_len=128,
max_new=50, device="cuda"):
model = model.to(device).eval()
input_ids = torch.randint(0, vocab_size, (1, seq_len), device=device)
# Con KV cache (normale)
t0 = time.perf_counter()
with torch.no_grad():
output = model.generate(input_ids, max_new_tokens=max_new)
t_cached = (time.perf_counter() - t0) * 1000
tokens_per_sec = max_new / (t_cached / 1000)
print(f"Con KV Cache: {t_cached:.1f}ms totale, {tokens_per_sec:.1f} token/s")
Systematické srovnání: od 48 GB do 8 GB RTX
Shrneme všechny optimalizace zaznamenané v sérii tak, že je budeme postupně aplikovat na základní model, ukazující kompromis přesnost/paměť/rychlost.
Úplné srovnání: Llama-3.1-8B na RTX 3090 (24 GB)
| Konfigurace | VRAM | Propustnost | HellaSwag | Zmatek | Poznámky |
|---|---|---|---|---|---|
| Základní linie BF16 | 16,0 GB | 38 t/s | 82,1 % | 6.14 | Referenční benchmark |
| + Pozor na blesk 2 | 14,2 GB | 52 t/s | 82,1 % | 6.14 | -11% VRAM, +37% rychlost |
| + pochodeň.kompilovat | 14,2 GB | 68 t/s | 82,1 % | 6.14 | +31 % na Flash Attention |
| INT8 (bit-sandbajty) | 8,5 GB | 35 t/s | 81,8 % | 6.21 | -47 % VRAM, -0,3 % přísl |
| INT4 NF4 (bnb) | 4,9 GB | 42 t/s | 81,2 % | 6.47 | -69% VRAM, -0,9% přísl |
| GPTQ INT4 | 4,8 GB | 55 t/s | 81,5 % | 6.39 | -70 % VRAM, -0,6 % přísl |
| AWQ INT4 | 4,7 GB | 52 t/s | 81,6 % | 6.35 | -71 % VRAM, -0,5 % přísl |
| GGUF Q4_K_M (CPU) | 0 VRAM (5 GB RAM) | 18 t/s | 81,3 % | 6.42 | Není potřeba GPU |
Přibližné hodnoty na RTX 3090 (24GB VRAM). Propustnost měřená s dávkou=1, seq=512.
Průvodce rozhodováním: Jaká optimalizace pro který scénář
# ALBERO DECISIONALE PER OTTIMIZZAZIONE DL
def recommend_optimization(
vram_available_gb: float,
task: str, # "training" | "inference" | "edge"
accuracy_critical: bool,
hardware: str # "server_gpu" | "consumer_gpu" | "cpu" | "edge"
) -> dict:
"""
Raccomanda le ottimizzazioni più appropriate per il proprio scenario.
"""
recommendations = []
priority = []
# === SEMPRE DA FARE (zero o quasi zero costo) ===
priority.append("1. Mixed Precision (BF16/FP16): abilita SEMPRE su GPU Ampere+")
priority.append("2. Flash Attention: abilita se seq_len > 512")
priority.append("3. torch.compile: abilita se PyTorch 2.0+, +30-50% speedup inference")
priority.append("4. KV Cache: abilita SEMPRE per LLM autoregressive generation")
if task == "training":
if vram_available_gb < 24:
priority.append("5. Gradient Checkpointing: -50% VRAM, +33% compute")
priority.append("6. Gradient Accumulation: simula batch più grandi")
if hardware in ["consumer_gpu", "edge"]:
priority.append("7. QLoRA: fine-tuning con INT4 + LoRA su GPU consumer")
if task in ["inference", "edge"]:
if not accuracy_critical:
if hardware == "server_gpu":
priority.append("5. GPTQ INT4: massimo throughput su GPU NVIDIA")
elif hardware in ["consumer_gpu", "cpu"]:
priority.append("5. AWQ INT4 o GGUF Q4_K_M: per hardware eterogeneo")
elif hardware == "edge":
priority.append("5. GGUF Q3_K_M o Q4_K_M: per Raspberry Pi / embedded")
else:
priority.append("5. INT8 (bitsandbytes): minima perdita di accuratezza")
if vram_available_gb < 16:
priority.append("6. ONNX Export: riduzione overhead runtime +20-40%")
priority.append("7. Considera distillazione verso modello più piccolo")
print("=== RACCOMANDAZIONI OTTIMIZZAZIONE ===")
for p in priority:
print(f" {p}")
return {"priorities": priority}
# Esempi:
print("--- Scenario 1: Fine-tuning su RTX 4080 (16GB) ---")
recommend_optimization(16, "training", True, "consumer_gpu")
print("\n--- Scenario 2: Inferenza su Raspberry Pi ---")
recommend_optimization(0, "inference", False, "edge")
print("\n--- Scenario 3: Produzione su A100 (80GB) ---")
recommend_optimization(80, "inference", True, "server_gpu")
Shrnutí optimalizace: Očekávaný dopad
| Technika | Úspora VRAM | Zrychlení | Acc Loss | Složitost |
|---|---|---|---|---|
| Mixed Precision BF16 | -50 % | 2-3x | 0% | Nízká (1 řádek) |
| Pozor na blesk 2 | -50-90% | 2-8x | 0% | Nízká (1 řádek) |
| pochodeň.kompilovat | 0% | 1,5-2,5x | 0% | Nízká (1 řádek) |
| KV cache | +VRAM | 10-50x gen | 0% | Nízký |
| Přechodový kontrolní bod | -50-70% | -0,7x | 0% | Nízký |
| INT8 Kvantování | -50 % | 0,9-1,1x | 0–0,5 % | Nízký |
| INT4 GPTQ/AWQ | -75 % | 1,3-1,8x | 0,5–1,5 % | Průměrný |
| Destilace | -70-90% | 5-20x | 5–15 % | Vysoký |
| Strukturované prořezávání | -30-70% | 2-5x | 2–10 % | Vysoký |
Závěry seriálu
Prošli jsme celou sérii Pokročilé Deep Learning a Edge Deployment: od mechanismu pozornosti v Transformers po jemné doladění pomocí LoRA, od kvantizace GPTQ po strukturované prořezávání, od destilace po Vision Transformers, od NAS po nasazení na okrajích s Raspberry Pi a Jetson, od Ollamy až po tento konečný benchmark.
Ústřední a jasné poselství: neexistuje jediná „nejlepší“ technika. Optimální volba vždy záleží na kontextu — dostupném hardwaru, požadavcích na přesnost, cílové latenci, provozní náklady. Ale se systematickým rámcem benchmarkingu uvedeným v tomto článku můžete opatření místo hádata dělat informovaná rozhodnutí.
Trend pro rok 2026 je jasný: modely se pohybují směrem k okraji. Gartner 2027 předpovídá, že i SLM při používání 3x překonává cloudový LLM. Techniky v této sérii — kvantování, destilace, okrajové nasazení, Ollama – to nejsou akademické výklenky: jsou to dovednosti zásadní pro každého, kdo chce v nadcházejících letech pracovat s AI.
Shrnutí série: Pokročilé hluboké učení
- Článek 1: Mechanismus pozornosti v transformátorech
- Článek 2: Doladění pomocí LoRA a QLoRA
- Článek 3: Kvantování GPTQ, AWQ, INT8
- Článek 4: Destilace znalostí
- Článek 5: Prořezávání neuronových sítí
- Článek 6: Vision Transformer (ViT)
- Článek 7: Hledání neuronové architektury
- Článek 8: Deep Learning on Edge Devices
- Článek 9: Prostory Ollama a LLM
- Článek 10 (tento): Srovnávání a optimalizace
Související série: MLOps | Počítačové vidění | AI inženýrství







