Benchmark e Ottimizzazione: da 48GB GPU a 8GB RTX
Hai un modello. Funziona su una A100 da 80 GB. Ma devi deployarlo su una RTX 3090 da 24 GB, o su una RTX 4060 laptop da 8 GB, o persino su un Raspberry Pi. Come fai a sapere quanta accuratezza perdi passando da FP32 a INT4? Quanto guadagni in velocità con Flash Attention? Vale la pena quantizzare o e meglio distillare? Quanta memoria risparmia il gradient checkpointing?
Senza benchmark sistematici, queste domande rimangono senza risposta — e si finisce per fare scelte subottimali basate su intuizioni o benchmark pubblicati con configurazioni diverse dalla propria. In questo articolo finale della serie, costruiamo un framework di benchmarking completo per misurare ogni dimensione della performance: memoria, latenza, throughput, accuratezza e consumo energetico.
Poi applichiamo sistematicamente tutte le tecniche viste nella serie — quantizzazione, pruning, distillazione, Flash Attention, gradient checkpointing, mixed precision — e mostriamo come passare da un modello che richiede 48 GB a uno che gira in 8 GB, con le metriche che dimostrano esattamente cosa si paga in termini di qualità.
Cosa Imparerai
- Framework di benchmarking sistematico per modelli DL
- Misurare VRAM, latenza, throughput e FLOPs con precisione
- Mixed Precision Training: FP16 vs BF16 vs FP32
- Flash Attention 2/3: quanto risparmia e quando usarla
- Gradient Checkpointing: memoria vs compute trade-off
- Gradient Accumulation: batch size virtualmente grandi
- Torch.compile e ottimizzazioni runtime
- KV Cache: ottimizzazione per LLM autoregressive inference
- Confronto sistematico: tutte le tecniche a confronto
- Guida decisionale: quale ottimizzazione per quale scenario
Framework di Benchmarking Sistematico
Prima di ottimizzare, bisogna misurare con precisione. Un framework di benchmarking professionale misura: peak VRAM usage, latency media e P95, throughput (token/s o img/s), FLOPs, consumo energetico e accuratezza su task specifici. La chiave e la riproducibilità: benchmark che variano del 10% tra run sono inutili.
import torch
import torch.nn as nn
import time
import numpy as np
from dataclasses import dataclass, asdict
from typing import Optional, Callable
import gc
# ============================================================
# DATACLASS PER RISULTATI BENCHMARK
# ============================================================
@dataclass
class BenchmarkResult:
"""Risultati completi di un benchmark."""
name: str
# Memoria
vram_allocated_mb: float
vram_reserved_mb: float
vram_peak_mb: float
# Velocita
latency_ms_mean: float
latency_ms_p50: float
latency_ms_p95: float
latency_ms_p99: float
throughput_per_sec: float
# Modello
params_total: int
params_trainable: int
model_size_mb: float
# Opzionali
accuracy: Optional[float] = None
flops_total: Optional[float] = None
power_watts: Optional[float] = None
def print_summary(self):
print(f"\n=== {self.name} ===")
print(f" VRAM: {self.vram_peak_mb:.0f} MB peak, {self.vram_allocated_mb:.0f} MB alloc")
print(f" Latenza: {self.latency_ms_mean:.1f}ms mean, "
f"{self.latency_ms_p95:.1f}ms p95, {self.latency_ms_p99:.1f}ms p99")
print(f" Throughput: {self.throughput_per_sec:.1f}/s")
print(f" Parametri: {self.params_total:,} ({self.model_size_mb:.1f} MB)")
if self.accuracy:
print(f" Accuratezza: {self.accuracy:.4f}")
# ============================================================
# CLASSE PRINCIPALE DI BENCHMARKING
# ============================================================
class DeepLearningBenchmark:
def __init__(self, device: str = "cuda"):
self.device = device
self.results = []
def _count_params(self, model: nn.Module) -> tuple:
total = sum(p.numel() for p in model.parameters())
trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
return total, trainable
def _model_size_mb(self, model: nn.Module) -> float:
total_bytes = sum(p.numel() * p.element_size() for p in model.parameters())
return total_bytes / (1024 ** 2)
def _reset_memory(self):
"""Reset GPU memory per benchmark pulito."""
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.reset_peak_memory_stats()
def benchmark_inference(
self,
name: str,
model: nn.Module,
input_fn: Callable[[], tuple],
n_warmup: int = 10,
n_runs: int = 100,
batch_size: int = 1
) -> BenchmarkResult:
"""
Benchmark completo di inferenza.
input_fn: funzione che restituisce input per il modello
"""
model = model.to(self.device).eval()
self._reset_memory()
# Warmup
with torch.no_grad():
for _ in range(n_warmup):
inputs = input_fn()
if isinstance(inputs, dict):
model(**{k: v.to(self.device) for k, v in inputs.items()})
else:
model(inputs.to(self.device))
# Misura memoria post-warmup
if torch.cuda.is_available():
mem_alloc = torch.cuda.memory_allocated() / (1024**2)
mem_reserved = torch.cuda.memory_reserved() / (1024**2)
# Benchmark vero
torch.cuda.synchronize() if torch.cuda.is_available() else None
latencies = []
for _ in range(n_runs):
inputs = input_fn()
t0 = time.perf_counter()
with torch.no_grad():
if isinstance(inputs, dict):
_ = model(**{k: v.to(self.device) for k, v in inputs.items()})
else:
_ = model(inputs.to(self.device))
torch.cuda.synchronize() if torch.cuda.is_available() else None
latencies.append((time.perf_counter() - t0) * 1000)
if torch.cuda.is_available():
mem_peak = torch.cuda.max_memory_allocated() / (1024**2)
else:
mem_alloc = mem_reserved = mem_peak = 0.0
latencies = np.array(latencies)
total_params, trainable_params = self._count_params(model)
result = BenchmarkResult(
name=name,
vram_allocated_mb=mem_alloc,
vram_reserved_mb=mem_reserved,
vram_peak_mb=mem_peak,
latency_ms_mean=float(np.mean(latencies)),
latency_ms_p50=float(np.percentile(latencies, 50)),
latency_ms_p95=float(np.percentile(latencies, 95)),
latency_ms_p99=float(np.percentile(latencies, 99)),
throughput_per_sec=1000 / np.mean(latencies) * batch_size,
params_total=total_params,
params_trainable=trainable_params,
model_size_mb=self._model_size_mb(model)
)
result.print_summary()
self.results.append(result)
return result
def benchmark_training_step(
self,
name: str,
model: nn.Module,
optimizer: torch.optim.Optimizer,
loss_fn: Callable,
input_fn: Callable,
n_steps: int = 50
) -> dict:
"""Benchmark di un singolo step di training."""
model = model.to(self.device).train()
self._reset_memory()
latencies = []
for step in range(n_steps):
inputs, labels = input_fn()
inputs = inputs.to(self.device)
labels = labels.to(self.device)
t0 = time.perf_counter()
optimizer.zero_grad()
outputs = model(inputs)
loss = loss_fn(outputs, labels)
loss.backward()
optimizer.step()
torch.cuda.synchronize() if torch.cuda.is_available() else None
latencies.append((time.perf_counter() - t0) * 1000)
return {
"name": name,
"vram_peak_mb": torch.cuda.max_memory_allocated() / (1024**2) if torch.cuda.is_available() else 0,
"step_ms_mean": float(np.mean(latencies[5:])), # Skip warmup
"step_ms_p95": float(np.percentile(latencies[5:], 95))
}
def compare_results(self) -> None:
"""Stampa tabella comparativa di tutti i risultati."""
if not self.results:
print("Nessun risultato disponibile.")
return
baseline = self.results[0]
print(f"\n{'Config':<30} {'VRAM (MB)':>12} {'Latency (ms)':>14} {'Throughput':>12} {'Speedup':>10}")
print("-" * 82)
for r in self.results:
speedup = baseline.latency_ms_mean / r.latency_ms_mean
print(f"{r.name:<30} {r.vram_peak_mb:>12.0f} {r.latency_ms_mean:>14.2f} "
f"{r.throughput_per_sec:>12.1f} {speedup:>10.2f}x")
# Uso:
bench = DeepLearningBenchmark(device="cuda" if torch.cuda.is_available() else "cpu")
print("Framework di benchmarking inizializzato")
Mixed Precision: FP32 vs FP16 vs BF16
Il mixed precision training e la prima ottimizzazione da abilitare: quasi
zero overhead da configurare, 2x risparmio di memoria, spesso 2-3x speedup su hardware
Ampere+. torch.autocast gestisce automaticamente quali operazioni eseguire
in precision ridotta.
La differenza chiave tra FP16 e BF16 e il formato binario: FP16 ha 5 bit per l'esponente e 10 per la mantissa (range 6e-5 a 6.5e4), mentre BF16 ha 8 bit per l'esponente e 7 per la mantissa (stesso range di FP32, da 1.2e-38 a 3.4e38). BF16 e molto più stabile durante il training perchè non causa overflow/underflow con gradient grandi.
import torch
import torch.nn as nn
from torch.cuda.amp import GradScaler
# ============================================================
# CONFRONTO FP32 vs FP16 vs BF16
# ============================================================
def train_step_fp32(model, optimizer, imgs, labels, criterion):
"""Training step standard FP32."""
optimizer.zero_grad()
output = model(imgs)
loss = criterion(output, labels)
loss.backward()
optimizer.step()
return loss.item()
def train_step_fp16(model, optimizer, imgs, labels, criterion, scaler: GradScaler):
"""
Training step con AMP FP16.
GradScaler necessario: FP16 ha range limitato, loss scaling evita underflow.
"""
optimizer.zero_grad()
with torch.autocast(device_type="cuda", dtype=torch.float16):
output = model(imgs)
loss = criterion(output, labels)
# Scala la loss per evitare underflow in FP16
scaler.scale(loss).backward()
# Decomprime gradienti prima di clip
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
# Aggiorna pesi (salta se ci sono NaN/Inf nei gradienti)
scaler.step(optimizer)
scaler.update()
return loss.item()
def train_step_bf16(model, optimizer, imgs, labels, criterion):
"""
Training step con BF16.
BF16 NON richiede GradScaler: ha range dinamico uguale a FP32.
Disponibile su: A100, RTX 3000+, Apple M-series.
"""
optimizer.zero_grad()
with torch.autocast(device_type="cuda", dtype=torch.bfloat16):
output = model(imgs)
loss = criterion(output, labels)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
return loss.item()
# Benchmark comparativo
from torchvision import models
import time, gc
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def compare_precisions(model_fn=models.resnet50, n_steps=100,
batch_size=32, img_size=224):
"""Confronta FP32, FP16, BF16 per training e inferenza."""
criterion = nn.CrossEntropyLoss()
configs = [
("FP32", torch.float32, False),
("FP16", torch.float16, True), # Richiede GradScaler
("BF16", torch.bfloat16, False) # No GradScaler
]
results = {}
for name, dtype, use_scaler in configs:
model = model_fn(pretrained=False).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
scaler = GradScaler() if use_scaler else None
# Reset memory stats
torch.cuda.reset_peak_memory_stats() if torch.cuda.is_available() else None
gc.collect()
torch.cuda.empty_cache() if torch.cuda.is_available() else None
timings = []
for step in range(n_steps):
imgs = torch.randn(batch_size, 3, img_size, img_size, device=device)
labels = torch.randint(0, 1000, (batch_size,), device=device)
t0 = time.perf_counter()
with torch.autocast(device_type="cuda", dtype=dtype, enabled=(dtype != torch.float32)):
out = model(imgs)
loss = criterion(out, labels)
if scaler:
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
else:
optimizer.zero_grad()
loss.backward()
optimizer.step()
torch.cuda.synchronize() if torch.cuda.is_available() else None
timings.append((time.perf_counter() - t0) * 1000)
vram_peak = torch.cuda.max_memory_allocated() / (1024**2) if torch.cuda.is_available() else 0
results[name] = {
"vram_mb": round(vram_peak, 1),
"step_ms": round(np.mean(timings[10:]), 2),
"throughput_imgs_s": round(batch_size * 1000 / np.mean(timings[10:]), 1)
}
print(f"{name}: VRAM={vram_peak:.0f}MB, {np.mean(timings[10:]):.1f}ms/step, "
f"{batch_size*1000/np.mean(timings[10:]):.0f} img/s")
return results
# Risultati tipici ResNet-50 BS=32 su RTX 4090:
# FP32: VRAM=6200MB, 95ms/step, 336 img/s
# FP16: VRAM=3100MB, 41ms/step, 780 img/s (2x velocità, 50% VRAM)
# BF16: VRAM=3100MB, 38ms/step, 842 img/s (2.2x velocità, 50% VRAM)
Flash Attention: L'Ottimizzazione che Cambia le Regole
Flash Attention (Dao et al., 2022) e forse l'ottimizzazione più impattante per i Transformer degli ultimi anni. Riformula il calcolo dell'attention per essere IO-bound aware: invece di materializzare la matrice di attention completa in HBM (che ha complessità O(n^2) in memoria), calcola l'attention a blocchi rimanendo in SRAM. Il risultato: complessità O(n) in memoria invece di O(n^2), velocità 2-4x su sequenze lunghe.
Flash Attention 2 (2023) migliora ulteriormente il parallelismo su GPU, raggiungendo il 72% dell'utilizzo teorico delle FP16 FLOPS. Flash Attention 3 (2024) aggiunge supporto per FP8 e Hopper-specific optimizations, con speedup fino a 2x rispetto a FA2.
import torch
import torch.nn as nn
import torch.nn.functional as F
import time, math
# ============================================================
# FLASH ATTENTION vs STANDARD ATTENTION: CONFRONTO
# ============================================================
def standard_attention(q, k, v, scale=None):
"""
Attention standard: materializza la matrice NxN completa in GPU memory.
Complessità memoria: O(N^2 * d_head)
"""
if scale is None:
scale = q.size(-1) ** -0.5
# [B, heads, N, N] - questa matrice può essere ENORME per seq lunghe!
attn = torch.softmax((q @ k.transpose(-2, -1)) * scale, dim=-1)
return attn @ v
def flash_attention_native(q, k, v):
"""
Flash Attention tramite PyTorch 2.0+ scaled_dot_product_attention.
Sceglie automaticamente l'implementazione ottimale:
- FlashAttention-2 se disponibile (CUDA Ampere+)
- Memory-efficient attention (xFormers) come fallback
- Standard attention come ultimo fallback
"""
# Automaticamente ottimizzato da PyTorch
return F.scaled_dot_product_attention(q, k, v, is_causal=False)
def benchmark_attention_implementations(
batch_size=4, n_heads=12, seq_lengths=[512, 1024, 2048, 4096, 8192],
d_head=64, device="cuda"
):
"""
Confronta Standard vs Flash Attention su diverse lunghezze di sequenza.
"""
print(f"{'Seq Len':>10} | {'Standard (ms)':>15} | {'Flash (ms)':>12} | "
f"{'Speedup':>10} | {'VRAM Std (MB)':>15} | {'VRAM Flash (MB)':>15}")
print("-" * 90)
for seq_len in seq_lengths:
q = torch.randn(batch_size, n_heads, seq_len, d_head, device=device, dtype=torch.float16)
k = torch.randn_like(q)
v = torch.randn_like(q)
# Warmup
for _ in range(5):
standard_attention(q, k, v)
flash_attention_native(q, k, v)
# Benchmark Standard
torch.cuda.reset_peak_memory_stats()
torch.cuda.synchronize()
t0 = time.perf_counter()
for _ in range(20):
out_std = standard_attention(q, k, v)
torch.cuda.synchronize()
std_ms = (time.perf_counter() - t0) / 20 * 1000
vram_std = torch.cuda.max_memory_allocated() / (1024**2)
# Benchmark Flash Attention
torch.cuda.reset_peak_memory_stats()
torch.cuda.synchronize()
t0 = time.perf_counter()
for _ in range(20):
out_flash = flash_attention_native(q, k, v)
torch.cuda.synchronize()
flash_ms = (time.perf_counter() - t0) / 20 * 1000
vram_flash = torch.cuda.max_memory_allocated() / (1024**2)
speedup = std_ms / flash_ms
print(f"{seq_len:>10} | {std_ms:>15.2f} | {flash_ms:>12.2f} | "
f"{speedup:>10.2f}x | {vram_std:>15.0f} | {vram_flash:>15.0f}")
# Risultati tipici su RTX 4090 (FP16, B=4, heads=12, d_head=64):
# Seq Len | Standard (ms) | Flash (ms) | Speedup | VRAM Std (MB) | VRAM Flash (MB)
# -----------------------------------------------------------------------------------
# 512 | 0.82 | 0.31 | 2.65x | 48 | 12
# 1024 | 2.45 | 0.58 | 4.22x | 192 | 24
# 2048 | 9.12 | 1.12 | 8.14x | 768 | 48
# 4096 | 35.80 | 2.21 | 16.20x | 3072 | 96
# 8192 | 144.20 | 4.38 | 32.92x | 12288 | 192
# Flash Attention scala LINEARMENTE: a seq=8192 usa 64x meno VRAM!
Gradient Checkpointing e Gradient Accumulation
Quando la VRAM e il collo di bottiglia durante il training, due tecniche complementari permettono di allenare batch più grandi senza aggiornare l'hardware:
import torch
import torch.nn as nn
from torch.utils.checkpoint import checkpoint_sequential
import gc
# ============================================================
# GRADIENT CHECKPOINTING
# ============================================================
# Idea: invece di salvare tutte le attivazioni intermedie per il backward pass,
# le ricalcola al momento (tradeoff: +33% compute, -50-70% memoria)
class CheckpointedTransformerBlock(nn.Module):
"""Transformer block con gradient checkpointing."""
def __init__(self, d_model=768, n_heads=12):
super().__init__()
self.attn = nn.MultiheadAttention(d_model, n_heads, batch_first=True)
self.norm1 = nn.LayerNorm(d_model)
self.ff = nn.Sequential(
nn.Linear(d_model, d_model * 4), nn.GELU(),
nn.Linear(d_model * 4, d_model)
)
self.norm2 = nn.LayerNorm(d_model)
def _attn_block(self, x):
attn_out, _ = self.attn(x, x, x)
return self.norm1(x + attn_out)
def _ff_block(self, x):
return self.norm2(x + self.ff(x))
def forward(self, x):
# Gradient checkpointing: ogni sotto-modulo viene ricalcolato
# durante il backward invece di essere salvato
x = torch.utils.checkpoint.checkpoint(self._attn_block, x, use_reentrant=False)
x = torch.utils.checkpoint.checkpoint(self._ff_block, x, use_reentrant=False)
return x
def enable_gradient_checkpointing_hf(model):
"""Abilita gradient checkpointing su modelli HuggingFace."""
model.gradient_checkpointing_enable()
print(f"Gradient checkpointing abilitato su {type(model).__name__}")
# Benchmark Gradient Checkpointing
def compare_checkpointing(seq_len=2048, batch_size=8, d_model=768,
n_layers=12, n_heads=12, device="cuda"):
"""Confronta training con e senza gradient checkpointing."""
class SimpleTransformer(nn.Module):
def __init__(self, use_checkpoint=False):
super().__init__()
self.use_checkpoint = use_checkpoint
self.blocks = nn.ModuleList([
CheckpointedTransformerBlock(d_model, n_heads) if use_checkpoint
else CheckpointedTransformerBlock(d_model, n_heads)
for _ in range(n_layers)
])
self.head = nn.Linear(d_model, 1000)
def forward(self, x):
for block in self.blocks:
if self.use_checkpoint:
x = torch.utils.checkpoint.checkpoint(block, x, use_reentrant=False)
else:
x = block(x)
return self.head(x[:, 0])
results = {}
for use_ckpt in [False, True]:
name = "con checkpointing" if use_ckpt else "senza checkpointing"
gc.collect()
torch.cuda.empty_cache() if torch.cuda.is_available() else None
torch.cuda.reset_peak_memory_stats() if torch.cuda.is_available() else None
model = SimpleTransformer(use_checkpoint=use_ckpt).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
x = torch.randn(batch_size, seq_len, d_model, device=device)
labels = torch.randint(0, 1000, (batch_size,), device=device)
# Forward + backward
torch.cuda.synchronize() if torch.cuda.is_available() else None
t0 = time.perf_counter()
for _ in range(10):
optimizer.zero_grad()
out = model(x)
loss = nn.CrossEntropyLoss()(out, labels)
loss.backward()
optimizer.step()
torch.cuda.synchronize() if torch.cuda.is_available() else None
elapsed = (time.perf_counter() - t0) / 10 * 1000
vram = torch.cuda.max_memory_allocated() / (1024**2) if torch.cuda.is_available() else 0
results[name] = {"vram_mb": round(vram, 1), "step_ms": round(elapsed, 1)}
print(f"{name}: VRAM={vram:.0f}MB, Step={elapsed:.1f}ms")
return results
# Risultati tipici (Transformer 12 layer, seq=2048, BS=8, RTX 3090):
# Senza checkpointing: VRAM=18.4GB, Step=285ms
# Con checkpointing: VRAM= 7.8GB, Step=378ms (-58% VRAM, +33% compute)
# ============================================================
# GRADIENT ACCUMULATION
# ============================================================
def train_with_gradient_accumulation(
model, optimizer, train_loader, criterion,
accumulation_steps: int = 4,
device: str = "cuda"
):
"""
Gradient accumulation: simula batch_size * accumulation_steps
con la memoria di batch_size.
Utile quando il batch_size reale e troppo piccolo per convergenza ottimale.
"""
model = model.to(device).train()
optimizer.zero_grad()
for step, (imgs, labels) in enumerate(train_loader):
imgs, labels = imgs.to(device), labels.to(device)
# Forward pass
with torch.autocast(device_type="cuda", dtype=torch.bfloat16):
output = model(imgs)
# Dividi loss per accumulation steps (mantiene la scala corretta)
loss = criterion(output, labels) / accumulation_steps
loss.backward()
# Aggiorna i pesi ogni N step
if (step + 1) % accumulation_steps == 0:
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
optimizer.zero_grad()
effective_batch = imgs.size(0) * accumulation_steps
print(f"Step {(step+1)//accumulation_steps} | "
f"Effective batch: {effective_batch} | Loss: {loss.item()*accumulation_steps:.4f}")
torch.compile: Ottimizzazione del Grafo
torch.compile (PyTorch 2.0+) compila il modello in kernel ottimizzati tramite Triton o altri backend. E l'ottimizzazione più semplice da applicare: una sola riga di codice può portare a 1.5-2.5x speedup sull'inferenza.
import torch
from torchvision import models
import time, numpy as np
def benchmark_torch_compile():
device = "cuda" if torch.cuda.is_available() else "cpu"
# ============================================================
# MODALITA DI COMPILAZIONE
# ============================================================
# "default": Bilanciamento compile time / speedup
# "reduce-overhead": Minimizza overhead, ottimale per piccoli batch
# "max-autotune": Massima velocità (compile time molto più lungo, ~5-10 min)
# "inductor": Backend default (usa Triton su CUDA, C++ su CPU)
model_fp32 = models.resnet50(pretrained=False).to(device).eval()
# Compilazione eager (default)
model_compiled_default = torch.compile(
models.resnet50(pretrained=False).to(device).eval(),
mode="default"
)
# Compilazione per massima velocità
model_compiled_max = torch.compile(
models.resnet50(pretrained=False).to(device).eval(),
mode="max-autotune",
fullgraph=True # Evita graph breaks per massimo speedup
)
x = torch.randn(32, 3, 224, 224, device=device)
def time_model(model, x, n=100):
"""Benchmark con warmup."""
# Warmup (specialmente importante per torch.compile)
with torch.no_grad():
for _ in range(20):
model(x)
torch.cuda.synchronize() if torch.cuda.is_available() else None
latencies = []
with torch.no_grad():
for _ in range(n):
t0 = time.perf_counter()
model(x)
torch.cuda.synchronize() if torch.cuda.is_available() else None
latencies.append((time.perf_counter() - t0) * 1000)
return np.mean(latencies)
ms_eager = time_model(model_fp32, x)
ms_default = time_model(model_compiled_default, x)
# ms_max = time_model(model_compiled_max, x) # Richiede molto tempo di compile
print(f"Eager (FP32): {ms_eager:.2f} ms")
print(f"Compiled default: {ms_default:.2f} ms ({ms_eager/ms_default:.2f}x speedup)")
# Con BF16 + compile: effetto moltiplicativo
model_bf16_compiled = torch.compile(
models.resnet50(pretrained=False).to(device).eval(),
mode="default"
)
x_bf16 = x.to(torch.bfloat16)
model_bf16_compiled = model_bf16_compiled.to(torch.bfloat16)
ms_bf16_compiled = time_model(model_bf16_compiled, x_bf16)
print(f"BF16 + Compiled: {ms_bf16_compiled:.2f} ms ({ms_eager/ms_bf16_compiled:.2f}x speedup)")
# Risultati tipici RTX 4090:
# Eager FP32: 12.4 ms/step (BS=32)
# Compiled default: 7.8 ms/step (1.59x)
# BF16 + Compiled: 5.1 ms/step (2.43x)
benchmark_torch_compile()
KV Cache: Ottimizzazione per LLM Autoregressive Inference
Nei modelli autoregressive, ogni token generato deve attendere l'attenzione su tutti i token precedenti. Senza ottimizzazioni, le chiavi (K) e i valori (V) vengono ricalcolati ad ogni step — con complessità O(n^2) per una sequenza di n token. Il KV Cache salva K e V di ogni layer dopo ogni step, riducendo il costo di generazione da O(n^2) a O(n).
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Optional, Tuple
# ============================================================
# TRANSFORMER CON KV CACHE
# ============================================================
class CachedMultiHeadAttention(nn.Module):
"""
Multi-head attention con KV cache per generazione autogressiva.
Il cache evita di ricalcolare K, V per token passati.
"""
def __init__(self, d_model: int, n_heads: int):
super().__init__()
self.n_heads = n_heads
self.d_head = d_model // n_heads
self.scale = self.d_head ** -0.5
self.q_proj = nn.Linear(d_model, d_model, bias=False)
self.k_proj = nn.Linear(d_model, d_model, bias=False)
self.v_proj = nn.Linear(d_model, d_model, bias=False)
self.out_proj = nn.Linear(d_model, d_model, bias=False)
def forward(
self,
x: torch.Tensor, # [B, seq_len, d_model]
kv_cache: Optional[Tuple] = None # (K_cache, V_cache) o None
) -> Tuple[torch.Tensor, Tuple]:
B, T, D = x.shape
# Proietta Q, K, V
q = self.q_proj(x).view(B, T, self.n_heads, self.d_head).transpose(1, 2)
k = self.k_proj(x).view(B, T, self.n_heads, self.d_head).transpose(1, 2)
v = self.v_proj(x).view(B, T, self.n_heads, self.d_head).transpose(1, 2)
# Concatena con cache esistente
if kv_cache is not None:
k_cache, v_cache = kv_cache
k = torch.cat([k_cache, k], dim=2) # [B, heads, T_total, d_head]
v = torch.cat([v_cache, v], dim=2)
# Attention (Flash Attention automatica con PyTorch 2.0+)
out = F.scaled_dot_product_attention(q, k, v, is_causal=(kv_cache is None))
out = out.transpose(1, 2).contiguous().view(B, T, D)
return self.out_proj(out), (k, v) # Ritorna output + nuovo cache
class CachedTransformerDecoder(nn.Module):
"""Decoder Transformer con KV cache per generazione efficiente."""
def __init__(self, vocab_size: int, d_model: int = 512,
n_heads: int = 8, n_layers: int = 6):
super().__init__()
self.embed = nn.Embedding(vocab_size, d_model)
self.pos_embed = nn.Embedding(2048, d_model)
self.layers = nn.ModuleList([
CachedMultiHeadAttention(d_model, n_heads)
for _ in range(n_layers)
])
self.norms = nn.ModuleList([nn.LayerNorm(d_model) for _ in range(n_layers)])
self.head = nn.Linear(d_model, vocab_size)
self.n_layers = n_layers
@torch.no_grad()
def generate(
self,
input_ids: torch.Tensor, # [B, seq_len]
max_new_tokens: int = 100,
temperature: float = 1.0
) -> torch.Tensor:
"""
Generazione autogressiva con KV cache.
Ogni step utilizza il cache dei token precedenti.
"""
B, T = input_ids.shape
device = input_ids.device
# Processa il prompt (prefill)
x = self.embed(input_ids)
positions = torch.arange(T, device=device).unsqueeze(0)
x = x + self.pos_embed(positions)
# Inizializza cache per ogni layer
kv_caches = [None] * self.n_layers
for i, (layer, norm) in enumerate(zip(self.layers, self.norms)):
x_norm = norm(x)
attn_out, kv_caches[i] = layer(x_norm, kv_caches[i])
x = x + attn_out
# Generazione token per token (usando il cache)
generated = []
for step in range(max_new_tokens):
# Solo l'ultimo token come query
last_token = input_ids[:, -1:] if step == 0 else new_token
x_new = self.embed(last_token)
pos = torch.tensor([[T + step]], device=device)
x_new = x_new + self.pos_embed(pos)
for i, (layer, norm) in enumerate(zip(self.layers, self.norms)):
x_norm = norm(x_new)
attn_out, kv_caches[i] = layer(x_norm, kv_caches[i])
x_new = x_new + attn_out
# Campiona prossimo token
logits = self.head(x_new[:, -1, :]) / temperature
new_token = torch.multinomial(torch.softmax(logits, -1), 1)
generated.append(new_token)
return torch.cat(generated, dim=1)
# Benchmark KV cache vs no cache
def benchmark_generation(model, vocab_size=32000, seq_len=128,
max_new=50, device="cuda"):
model = model.to(device).eval()
input_ids = torch.randint(0, vocab_size, (1, seq_len), device=device)
# Con KV cache (normale)
t0 = time.perf_counter()
with torch.no_grad():
output = model.generate(input_ids, max_new_tokens=max_new)
t_cached = (time.perf_counter() - t0) * 1000
tokens_per_sec = max_new / (t_cached / 1000)
print(f"Con KV Cache: {t_cached:.1f}ms totale, {tokens_per_sec:.1f} token/s")
Confronto Sistematico: da 48GB a 8GB RTX
Riepiloghiamo tutte le ottimizzazioni viste nella serie applicandole progressivamente a un modello base, mostrando il trade-off accuratezza/memoria/velocità.
Confronto Completo: Llama-3.1-8B su RTX 3090 (24GB)
| Configurazione | VRAM | Throughput | HellaSwag | Perplexity | Note |
|---|---|---|---|---|---|
| BF16 baseline | 16.0 GB | 38 t/s | 82.1% | 6.14 | Benchmark di riferimento |
| + Flash Attention 2 | 14.2 GB | 52 t/s | 82.1% | 6.14 | -11% VRAM, +37% velocità |
| + torch.compile | 14.2 GB | 68 t/s | 82.1% | 6.14 | +31% su Flash Attention |
| INT8 (bitsandbytes) | 8.5 GB | 35 t/s | 81.8% | 6.21 | -47% VRAM, -0.3% acc |
| INT4 NF4 (bnb) | 4.9 GB | 42 t/s | 81.2% | 6.47 | -69% VRAM, -0.9% acc |
| GPTQ INT4 | 4.8 GB | 55 t/s | 81.5% | 6.39 | -70% VRAM, -0.6% acc |
| AWQ INT4 | 4.7 GB | 52 t/s | 81.6% | 6.35 | -71% VRAM, -0.5% acc |
| GGUF Q4_K_M (CPU) | 0 VRAM (5 GB RAM) | 18 t/s | 81.3% | 6.42 | Nessuna GPU richiesta |
Valori indicativi su RTX 3090 (24GB VRAM). Throughput misurato con batch=1, seq=512.
Guida Decisionale: Quale Ottimizzazione per Quale Scenario
# ALBERO DECISIONALE PER OTTIMIZZAZIONE DL
def recommend_optimization(
vram_available_gb: float,
task: str, # "training" | "inference" | "edge"
accuracy_critical: bool,
hardware: str # "server_gpu" | "consumer_gpu" | "cpu" | "edge"
) -> dict:
"""
Raccomanda le ottimizzazioni più appropriate per il proprio scenario.
"""
recommendations = []
priority = []
# === SEMPRE DA FARE (zero o quasi zero costo) ===
priority.append("1. Mixed Precision (BF16/FP16): abilita SEMPRE su GPU Ampere+")
priority.append("2. Flash Attention: abilita se seq_len > 512")
priority.append("3. torch.compile: abilita se PyTorch 2.0+, +30-50% speedup inference")
priority.append("4. KV Cache: abilita SEMPRE per LLM autoregressive generation")
if task == "training":
if vram_available_gb < 24:
priority.append("5. Gradient Checkpointing: -50% VRAM, +33% compute")
priority.append("6. Gradient Accumulation: simula batch più grandi")
if hardware in ["consumer_gpu", "edge"]:
priority.append("7. QLoRA: fine-tuning con INT4 + LoRA su GPU consumer")
if task in ["inference", "edge"]:
if not accuracy_critical:
if hardware == "server_gpu":
priority.append("5. GPTQ INT4: massimo throughput su GPU NVIDIA")
elif hardware in ["consumer_gpu", "cpu"]:
priority.append("5. AWQ INT4 o GGUF Q4_K_M: per hardware eterogeneo")
elif hardware == "edge":
priority.append("5. GGUF Q3_K_M o Q4_K_M: per Raspberry Pi / embedded")
else:
priority.append("5. INT8 (bitsandbytes): minima perdita di accuratezza")
if vram_available_gb < 16:
priority.append("6. ONNX Export: riduzione overhead runtime +20-40%")
priority.append("7. Considera distillazione verso modello più piccolo")
print("=== RACCOMANDAZIONI OTTIMIZZAZIONE ===")
for p in priority:
print(f" {p}")
return {"priorities": priority}
# Esempi:
print("--- Scenario 1: Fine-tuning su RTX 4080 (16GB) ---")
recommend_optimization(16, "training", True, "consumer_gpu")
print("\n--- Scenario 2: Inferenza su Raspberry Pi ---")
recommend_optimization(0, "inference", False, "edge")
print("\n--- Scenario 3: Produzione su A100 (80GB) ---")
recommend_optimization(80, "inference", True, "server_gpu")
Riepilogo Ottimizzazioni: Impatto Atteso
| Tecnica | VRAM Saving | Speedup | Acc Loss | Complessità |
|---|---|---|---|---|
| Mixed Precision BF16 | -50% | 2-3x | 0% | Bassa (1 riga) |
| Flash Attention 2 | -50-90% | 2-8x | 0% | Bassa (1 riga) |
| torch.compile | 0% | 1.5-2.5x | 0% | Bassa (1 riga) |
| KV Cache | +VRAM | 10-50x gen | 0% | Bassa |
| Gradient Checkpointing | -50-70% | -0.7x | 0% | Bassa |
| INT8 Quantization | -50% | 0.9-1.1x | 0-0.5% | Bassa |
| INT4 GPTQ/AWQ | -75% | 1.3-1.8x | 0.5-1.5% | Media |
| Distillazione | -70-90% | 5-20x | 5-15% | Alta |
| Pruning strutturato | -30-70% | 2-5x | 2-10% | Alta |
Conclusioni della Serie
Abbiamo percorso l'intera serie Deep Learning Avanzato e Edge Deployment: dall'attention mechanism nei Transformer al fine-tuning con LoRA, dalla quantizzazione GPTQ al pruning strutturato, dalla distillazione ai Vision Transformer, dal NAS all'edge deployment con Raspberry Pi e Jetson, da Ollama a questo benchmark finale.
Il messaggio centrale e chiaro: non esiste un'unica tecnica "migliore". La scelta ottimale dipende sempre dal contesto — hardware disponibile, requisiti di accuratezza, latenza target, costi operativi. Ma con il framework di benchmarking sistematico presentato in questo articolo, puoi misurare invece di indovinare, e prendere decisioni informate.
Il trend del 2026 e chiaro: i modelli si spostano verso l'edge. Gartner 2027 prevede che i SLM superino i LLM cloud 3x in utilizzo. Le tecniche in questa serie — quantizzazione, distillazione, edge deployment, Ollama — non sono nicchie accademiche: sono le competenze fondamentali per chiunque voglia lavorare con AI nei prossimi anni.
Riepilogo della Serie: Deep Learning Avanzato
- Articolo 1: Attention Mechanism nei Transformer
- Articolo 2: Fine-tuning con LoRA e QLoRA
- Articolo 3: Quantizzazione GPTQ, AWQ, INT8
- Articolo 4: Knowledge Distillation
- Articolo 5: Pruning Reti Neurali
- Articolo 6: Vision Transformer (ViT)
- Articolo 7: Neural Architecture Search
- Articolo 8: Deep Learning su Edge Devices
- Articolo 9: Ollama e LLM Locali
- Articolo 10 (questo): Benchmark e Ottimizzazione
Serie correlate: MLOps | Computer Vision | AI Engineering







