Test porównawczy i optymalizacja: od procesora graficznego 48 GB do 8 GB RTX
Masz modelkę. Działa na A100 o pojemności 80 GB. Ale musisz go wdrożyć na 24 GB RTX 3090, lub na laptopie RTX 4060 8 GB, a nawet na Raspberry Pi. Skąd wiesz ile Czy tracisz dokładność przy przejściu z FP32 na INT4? Jaką prędkość zyskujesz dzięki Flash Attention? Czy warto to określać ilościowo, czy lepiej destylować? Ile pamięci oszczędza punkt kontrolny gradientu?
Bez systematycznej analizy porównawczej pytania te pozostają bez odpowiedzi – i kończą się dokonywanie nieoptymalnych wyborów w oparciu o intuicję lub opublikowane testy porównawcze z konfiguracjami inny niż twój. W ostatnim artykule z tej serii budujemy ramy kompleksowy benchmarking na miarę każdy rozmiar wydajności: pamięć, opóźnienia, przepustowość, dokładność i zużycie energii.
Następnie systematycznie stosujemy wszystkie techniki widoczne w serii — kwantyzację, przycinanie, destylacja, Flash Attention, kontrola gradientu, precyzja mieszana — i pokazujemy, jak przejść z modelu wymagającego 48 GB na taki, który działa w 8 GB, ze wskaźnikami, które dokładnie pokazują, za co płacisz pod względem jakości.
Czego się nauczysz
- Systematyczne ramy benchmarkingu dla modeli DL
- Dokładnie zmierz pamięć VRAM, opóźnienie, przepustowość i liczbę FLOPów
- Mieszany trening precyzyjny: FP16 vs BF16 vs FP32
- Flash Uwaga 2/3: ile oszczędzasz i kiedy z tego korzystać
- Gradientowy punkt kontrolny: kompromis między pamięcią a obliczeniami
- Akumulacja gradientowa: Praktycznie duże partie
- Optymalizacje Torch.compile i środowiska wykonawczego
- Pamięć podręczna KV: Optymalizacja pod kątem wnioskowania autoregresyjnego LLM
- Porównanie systematyczne: porównanie wszystkich technik
- Wskazówki dotyczące decyzji: jaka optymalizacja dla jakiego scenariusza
Systematyczne ramy benchmarkingu
Przed optymalizacją musisz dokładnie zmierzyć. Ramy benchmarkingu profesjonalny pomiar: szczytowe wykorzystanie VRAM, średnie opóźnienie i P95, przepustowość (token/s lub img/s), FLOPy, zużycie energii i dokładność wykonania określonych zadań. Kluczem jest odtwarzalność: Benchmarki różniące się o 10% pomiędzy uruchomieniami są bezużyteczne.
import torch
import torch.nn as nn
import time
import numpy as np
from dataclasses import dataclass, asdict
from typing import Optional, Callable
import gc
# ============================================================
# DATACLASS PER RISULTATI BENCHMARK
# ============================================================
@dataclass
class BenchmarkResult:
"""Risultati completi di un benchmark."""
name: str
# Memoria
vram_allocated_mb: float
vram_reserved_mb: float
vram_peak_mb: float
# Velocita
latency_ms_mean: float
latency_ms_p50: float
latency_ms_p95: float
latency_ms_p99: float
throughput_per_sec: float
# Modello
params_total: int
params_trainable: int
model_size_mb: float
# Opzionali
accuracy: Optional[float] = None
flops_total: Optional[float] = None
power_watts: Optional[float] = None
def print_summary(self):
print(f"\n=== {self.name} ===")
print(f" VRAM: {self.vram_peak_mb:.0f} MB peak, {self.vram_allocated_mb:.0f} MB alloc")
print(f" Latenza: {self.latency_ms_mean:.1f}ms mean, "
f"{self.latency_ms_p95:.1f}ms p95, {self.latency_ms_p99:.1f}ms p99")
print(f" Throughput: {self.throughput_per_sec:.1f}/s")
print(f" Parametri: {self.params_total:,} ({self.model_size_mb:.1f} MB)")
if self.accuracy:
print(f" Accuratezza: {self.accuracy:.4f}")
# ============================================================
# CLASSE PRINCIPALE DI BENCHMARKING
# ============================================================
class DeepLearningBenchmark:
def __init__(self, device: str = "cuda"):
self.device = device
self.results = []
def _count_params(self, model: nn.Module) -> tuple:
total = sum(p.numel() for p in model.parameters())
trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
return total, trainable
def _model_size_mb(self, model: nn.Module) -> float:
total_bytes = sum(p.numel() * p.element_size() for p in model.parameters())
return total_bytes / (1024 ** 2)
def _reset_memory(self):
"""Reset GPU memory per benchmark pulito."""
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.reset_peak_memory_stats()
def benchmark_inference(
self,
name: str,
model: nn.Module,
input_fn: Callable[[], tuple],
n_warmup: int = 10,
n_runs: int = 100,
batch_size: int = 1
) -> BenchmarkResult:
"""
Benchmark completo di inferenza.
input_fn: funzione che restituisce input per il modello
"""
model = model.to(self.device).eval()
self._reset_memory()
# Warmup
with torch.no_grad():
for _ in range(n_warmup):
inputs = input_fn()
if isinstance(inputs, dict):
model(**{k: v.to(self.device) for k, v in inputs.items()})
else:
model(inputs.to(self.device))
# Misura memoria post-warmup
if torch.cuda.is_available():
mem_alloc = torch.cuda.memory_allocated() / (1024**2)
mem_reserved = torch.cuda.memory_reserved() / (1024**2)
# Benchmark vero
torch.cuda.synchronize() if torch.cuda.is_available() else None
latencies = []
for _ in range(n_runs):
inputs = input_fn()
t0 = time.perf_counter()
with torch.no_grad():
if isinstance(inputs, dict):
_ = model(**{k: v.to(self.device) for k, v in inputs.items()})
else:
_ = model(inputs.to(self.device))
torch.cuda.synchronize() if torch.cuda.is_available() else None
latencies.append((time.perf_counter() - t0) * 1000)
if torch.cuda.is_available():
mem_peak = torch.cuda.max_memory_allocated() / (1024**2)
else:
mem_alloc = mem_reserved = mem_peak = 0.0
latencies = np.array(latencies)
total_params, trainable_params = self._count_params(model)
result = BenchmarkResult(
name=name,
vram_allocated_mb=mem_alloc,
vram_reserved_mb=mem_reserved,
vram_peak_mb=mem_peak,
latency_ms_mean=float(np.mean(latencies)),
latency_ms_p50=float(np.percentile(latencies, 50)),
latency_ms_p95=float(np.percentile(latencies, 95)),
latency_ms_p99=float(np.percentile(latencies, 99)),
throughput_per_sec=1000 / np.mean(latencies) * batch_size,
params_total=total_params,
params_trainable=trainable_params,
model_size_mb=self._model_size_mb(model)
)
result.print_summary()
self.results.append(result)
return result
def benchmark_training_step(
self,
name: str,
model: nn.Module,
optimizer: torch.optim.Optimizer,
loss_fn: Callable,
input_fn: Callable,
n_steps: int = 50
) -> dict:
"""Benchmark di un singolo step di training."""
model = model.to(self.device).train()
self._reset_memory()
latencies = []
for step in range(n_steps):
inputs, labels = input_fn()
inputs = inputs.to(self.device)
labels = labels.to(self.device)
t0 = time.perf_counter()
optimizer.zero_grad()
outputs = model(inputs)
loss = loss_fn(outputs, labels)
loss.backward()
optimizer.step()
torch.cuda.synchronize() if torch.cuda.is_available() else None
latencies.append((time.perf_counter() - t0) * 1000)
return {
"name": name,
"vram_peak_mb": torch.cuda.max_memory_allocated() / (1024**2) if torch.cuda.is_available() else 0,
"step_ms_mean": float(np.mean(latencies[5:])), # Skip warmup
"step_ms_p95": float(np.percentile(latencies[5:], 95))
}
def compare_results(self) -> None:
"""Stampa tabella comparativa di tutti i risultati."""
if not self.results:
print("Nessun risultato disponibile.")
return
baseline = self.results[0]
print(f"\n{'Config':<30} {'VRAM (MB)':>12} {'Latency (ms)':>14} {'Throughput':>12} {'Speedup':>10}")
print("-" * 82)
for r in self.results:
speedup = baseline.latency_ms_mean / r.latency_ms_mean
print(f"{r.name:<30} {r.vram_peak_mb:>12.0f} {r.latency_ms_mean:>14.2f} "
f"{r.throughput_per_sec:>12.1f} {speedup:>10.2f}x")
# Uso:
bench = DeepLearningBenchmark(device="cuda" if torch.cuda.is_available() else "cpu")
print("Framework di benchmarking inizializzato")
Mieszana precyzja: FP32 vs FP16 vs BF16
Il trening o mieszanej precyzji i pierwsza optymalizacja umożliwiająca: prawie
zerowy narzut na konfigurację, 2x oszczędność pamięci, często 2-3x przyspieszenie sprzętu
Amper+. torch.autocast automatycznie zarządza, jakie operacje należy wykonać
w zmniejszonej precyzji.
Kluczowa różnica między FP16 i BF16 a formatem binarnym: FP16 ma 5 bitów na wykładnik i 10 dla mantysy (zakres od 6e-5 do 6.5e4), podczas gdy BF16 ma 8 bitów na wykładnik i 7 na mantysa (ten sam zakres co FP32, od 1,2e-38 do 3,4e38). BF16 i znacznie bardziej stabilny podczas treningu, ponieważ nie powoduje przepełnienia/niedomiaru przy dużych nachyleniach.
import torch
import torch.nn as nn
from torch.cuda.amp import GradScaler
# ============================================================
# CONFRONTO FP32 vs FP16 vs BF16
# ============================================================
def train_step_fp32(model, optimizer, imgs, labels, criterion):
"""Training step standard FP32."""
optimizer.zero_grad()
output = model(imgs)
loss = criterion(output, labels)
loss.backward()
optimizer.step()
return loss.item()
def train_step_fp16(model, optimizer, imgs, labels, criterion, scaler: GradScaler):
"""
Training step con AMP FP16.
GradScaler necessario: FP16 ha range limitato, loss scaling evita underflow.
"""
optimizer.zero_grad()
with torch.autocast(device_type="cuda", dtype=torch.float16):
output = model(imgs)
loss = criterion(output, labels)
# Scala la loss per evitare underflow in FP16
scaler.scale(loss).backward()
# Decomprime gradienti prima di clip
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
# Aggiorna pesi (salta se ci sono NaN/Inf nei gradienti)
scaler.step(optimizer)
scaler.update()
return loss.item()
def train_step_bf16(model, optimizer, imgs, labels, criterion):
"""
Training step con BF16.
BF16 NON richiede GradScaler: ha range dinamico uguale a FP32.
Disponibile su: A100, RTX 3000+, Apple M-series.
"""
optimizer.zero_grad()
with torch.autocast(device_type="cuda", dtype=torch.bfloat16):
output = model(imgs)
loss = criterion(output, labels)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
return loss.item()
# Benchmark comparativo
from torchvision import models
import time, gc
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def compare_precisions(model_fn=models.resnet50, n_steps=100,
batch_size=32, img_size=224):
"""Confronta FP32, FP16, BF16 per training e inferenza."""
criterion = nn.CrossEntropyLoss()
configs = [
("FP32", torch.float32, False),
("FP16", torch.float16, True), # Richiede GradScaler
("BF16", torch.bfloat16, False) # No GradScaler
]
results = {}
for name, dtype, use_scaler in configs:
model = model_fn(pretrained=False).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
scaler = GradScaler() if use_scaler else None
# Reset memory stats
torch.cuda.reset_peak_memory_stats() if torch.cuda.is_available() else None
gc.collect()
torch.cuda.empty_cache() if torch.cuda.is_available() else None
timings = []
for step in range(n_steps):
imgs = torch.randn(batch_size, 3, img_size, img_size, device=device)
labels = torch.randint(0, 1000, (batch_size,), device=device)
t0 = time.perf_counter()
with torch.autocast(device_type="cuda", dtype=dtype, enabled=(dtype != torch.float32)):
out = model(imgs)
loss = criterion(out, labels)
if scaler:
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
else:
optimizer.zero_grad()
loss.backward()
optimizer.step()
torch.cuda.synchronize() if torch.cuda.is_available() else None
timings.append((time.perf_counter() - t0) * 1000)
vram_peak = torch.cuda.max_memory_allocated() / (1024**2) if torch.cuda.is_available() else 0
results[name] = {
"vram_mb": round(vram_peak, 1),
"step_ms": round(np.mean(timings[10:]), 2),
"throughput_imgs_s": round(batch_size * 1000 / np.mean(timings[10:]), 1)
}
print(f"{name}: VRAM={vram_peak:.0f}MB, {np.mean(timings[10:]):.1f}ms/step, "
f"{batch_size*1000/np.mean(timings[10:]):.0f} img/s")
return results
# Risultati tipici ResNet-50 BS=32 su RTX 4090:
# FP32: VRAM=6200MB, 95ms/step, 336 img/s
# FP16: VRAM=3100MB, 41ms/step, 780 img/s (2x velocità, 50% VRAM)
# BF16: VRAM=3100MB, 38ms/step, 842 img/s (2.2x velocità, 50% VRAM)
Uwaga Flash: Optymalizacja, która zmienia zasady
Błyskawiczna uwaga (Dao i in., 2022) i być może najskuteczniejszą optymalizację dla Transformersów ostatnich lat. Zmień sposób obliczania uwagi na Świadomość powiązań IO: zamiast materializować pełną matrycę uwagi w HBM (który ma złożoność O(n^2) w pamięci), oblicza uwagę blokową pozostając w SRAM. Wynik: złożoność pamięci O(n) zamiast O(n^2), przyspieszenie 2-4x w przypadku długich sekwencji.
Flash Attention 2 (2023) dodatkowo poprawia równoległość procesora graficznego, osiągając 72% teoretycznego wykorzystania FLOPS FP16. Flash Attention 3 (2024) dodaje obsługę dla optymalizacji specyficznych dla FP8 i Hoppera, z przyspieszeniem do 2x w porównaniu do FA2.
import torch
import torch.nn as nn
import torch.nn.functional as F
import time, math
# ============================================================
# FLASH ATTENTION vs STANDARD ATTENTION: CONFRONTO
# ============================================================
def standard_attention(q, k, v, scale=None):
"""
Attention standard: materializza la matrice NxN completa in GPU memory.
Complessità memoria: O(N^2 * d_head)
"""
if scale is None:
scale = q.size(-1) ** -0.5
# [B, heads, N, N] - questa matrice può essere ENORME per seq lunghe!
attn = torch.softmax((q @ k.transpose(-2, -1)) * scale, dim=-1)
return attn @ v
def flash_attention_native(q, k, v):
"""
Flash Attention tramite PyTorch 2.0+ scaled_dot_product_attention.
Sceglie automaticamente l'implementazione ottimale:
- FlashAttention-2 se disponibile (CUDA Ampere+)
- Memory-efficient attention (xFormers) come fallback
- Standard attention come ultimo fallback
"""
# Automaticamente ottimizzato da PyTorch
return F.scaled_dot_product_attention(q, k, v, is_causal=False)
def benchmark_attention_implementations(
batch_size=4, n_heads=12, seq_lengths=[512, 1024, 2048, 4096, 8192],
d_head=64, device="cuda"
):
"""
Confronta Standard vs Flash Attention su diverse lunghezze di sequenza.
"""
print(f"{'Seq Len':>10} | {'Standard (ms)':>15} | {'Flash (ms)':>12} | "
f"{'Speedup':>10} | {'VRAM Std (MB)':>15} | {'VRAM Flash (MB)':>15}")
print("-" * 90)
for seq_len in seq_lengths:
q = torch.randn(batch_size, n_heads, seq_len, d_head, device=device, dtype=torch.float16)
k = torch.randn_like(q)
v = torch.randn_like(q)
# Warmup
for _ in range(5):
standard_attention(q, k, v)
flash_attention_native(q, k, v)
# Benchmark Standard
torch.cuda.reset_peak_memory_stats()
torch.cuda.synchronize()
t0 = time.perf_counter()
for _ in range(20):
out_std = standard_attention(q, k, v)
torch.cuda.synchronize()
std_ms = (time.perf_counter() - t0) / 20 * 1000
vram_std = torch.cuda.max_memory_allocated() / (1024**2)
# Benchmark Flash Attention
torch.cuda.reset_peak_memory_stats()
torch.cuda.synchronize()
t0 = time.perf_counter()
for _ in range(20):
out_flash = flash_attention_native(q, k, v)
torch.cuda.synchronize()
flash_ms = (time.perf_counter() - t0) / 20 * 1000
vram_flash = torch.cuda.max_memory_allocated() / (1024**2)
speedup = std_ms / flash_ms
print(f"{seq_len:>10} | {std_ms:>15.2f} | {flash_ms:>12.2f} | "
f"{speedup:>10.2f}x | {vram_std:>15.0f} | {vram_flash:>15.0f}")
# Risultati tipici su RTX 4090 (FP16, B=4, heads=12, d_head=64):
# Seq Len | Standard (ms) | Flash (ms) | Speedup | VRAM Std (MB) | VRAM Flash (MB)
# -----------------------------------------------------------------------------------
# 512 | 0.82 | 0.31 | 2.65x | 48 | 12
# 1024 | 2.45 | 0.58 | 4.22x | 192 | 24
# 2048 | 9.12 | 1.12 | 8.14x | 768 | 48
# 4096 | 35.80 | 2.21 | 16.20x | 3072 | 96
# 8192 | 144.20 | 4.38 | 32.92x | 12288 | 192
# Flash Attention scala LINEARMENTE: a seq=8192 usa 64x meno VRAM!
Gradientowe punkty kontrolne i akumulacja gradientów
Gdy pamięć VRAM stanowi wąskie gardło podczas treningu, stosuje się dwie uzupełniające się techniki pozwalają trenować większe partie bez modernizacji sprzętu:
import torch
import torch.nn as nn
from torch.utils.checkpoint import checkpoint_sequential
import gc
# ============================================================
# GRADIENT CHECKPOINTING
# ============================================================
# Idea: invece di salvare tutte le attivazioni intermedie per il backward pass,
# le ricalcola al momento (tradeoff: +33% compute, -50-70% memoria)
class CheckpointedTransformerBlock(nn.Module):
"""Transformer block con gradient checkpointing."""
def __init__(self, d_model=768, n_heads=12):
super().__init__()
self.attn = nn.MultiheadAttention(d_model, n_heads, batch_first=True)
self.norm1 = nn.LayerNorm(d_model)
self.ff = nn.Sequential(
nn.Linear(d_model, d_model * 4), nn.GELU(),
nn.Linear(d_model * 4, d_model)
)
self.norm2 = nn.LayerNorm(d_model)
def _attn_block(self, x):
attn_out, _ = self.attn(x, x, x)
return self.norm1(x + attn_out)
def _ff_block(self, x):
return self.norm2(x + self.ff(x))
def forward(self, x):
# Gradient checkpointing: ogni sotto-modulo viene ricalcolato
# durante il backward invece di essere salvato
x = torch.utils.checkpoint.checkpoint(self._attn_block, x, use_reentrant=False)
x = torch.utils.checkpoint.checkpoint(self._ff_block, x, use_reentrant=False)
return x
def enable_gradient_checkpointing_hf(model):
"""Abilita gradient checkpointing su modelli HuggingFace."""
model.gradient_checkpointing_enable()
print(f"Gradient checkpointing abilitato su {type(model).__name__}")
# Benchmark Gradient Checkpointing
def compare_checkpointing(seq_len=2048, batch_size=8, d_model=768,
n_layers=12, n_heads=12, device="cuda"):
"""Confronta training con e senza gradient checkpointing."""
class SimpleTransformer(nn.Module):
def __init__(self, use_checkpoint=False):
super().__init__()
self.use_checkpoint = use_checkpoint
self.blocks = nn.ModuleList([
CheckpointedTransformerBlock(d_model, n_heads) if use_checkpoint
else CheckpointedTransformerBlock(d_model, n_heads)
for _ in range(n_layers)
])
self.head = nn.Linear(d_model, 1000)
def forward(self, x):
for block in self.blocks:
if self.use_checkpoint:
x = torch.utils.checkpoint.checkpoint(block, x, use_reentrant=False)
else:
x = block(x)
return self.head(x[:, 0])
results = {}
for use_ckpt in [False, True]:
name = "con checkpointing" if use_ckpt else "senza checkpointing"
gc.collect()
torch.cuda.empty_cache() if torch.cuda.is_available() else None
torch.cuda.reset_peak_memory_stats() if torch.cuda.is_available() else None
model = SimpleTransformer(use_checkpoint=use_ckpt).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
x = torch.randn(batch_size, seq_len, d_model, device=device)
labels = torch.randint(0, 1000, (batch_size,), device=device)
# Forward + backward
torch.cuda.synchronize() if torch.cuda.is_available() else None
t0 = time.perf_counter()
for _ in range(10):
optimizer.zero_grad()
out = model(x)
loss = nn.CrossEntropyLoss()(out, labels)
loss.backward()
optimizer.step()
torch.cuda.synchronize() if torch.cuda.is_available() else None
elapsed = (time.perf_counter() - t0) / 10 * 1000
vram = torch.cuda.max_memory_allocated() / (1024**2) if torch.cuda.is_available() else 0
results[name] = {"vram_mb": round(vram, 1), "step_ms": round(elapsed, 1)}
print(f"{name}: VRAM={vram:.0f}MB, Step={elapsed:.1f}ms")
return results
# Risultati tipici (Transformer 12 layer, seq=2048, BS=8, RTX 3090):
# Senza checkpointing: VRAM=18.4GB, Step=285ms
# Con checkpointing: VRAM= 7.8GB, Step=378ms (-58% VRAM, +33% compute)
# ============================================================
# GRADIENT ACCUMULATION
# ============================================================
def train_with_gradient_accumulation(
model, optimizer, train_loader, criterion,
accumulation_steps: int = 4,
device: str = "cuda"
):
"""
Gradient accumulation: simula batch_size * accumulation_steps
con la memoria di batch_size.
Utile quando il batch_size reale e troppo piccolo per convergenza ottimale.
"""
model = model.to(device).train()
optimizer.zero_grad()
for step, (imgs, labels) in enumerate(train_loader):
imgs, labels = imgs.to(device), labels.to(device)
# Forward pass
with torch.autocast(device_type="cuda", dtype=torch.bfloat16):
output = model(imgs)
# Dividi loss per accumulation steps (mantiene la scala corretta)
loss = criterion(output, labels) / accumulation_steps
loss.backward()
# Aggiorna i pesi ogni N step
if (step + 1) % accumulation_steps == 0:
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
optimizer.zero_grad()
effective_batch = imgs.size(0) * accumulation_steps
print(f"Step {(step+1)//accumulation_steps} | "
f"Effective batch: {effective_batch} | Loss: {loss.item()*accumulation_steps:.4f}")
torch.compile: Optymalizacja wykresu
Torch.compile (PyTorch 2.0+) kompiluje model w zoptymalizowane jądra poprzez Triton lub inny backend. I najprostsza optymalizacja do zastosowania: tylko jedna linia kodu może prowadzić do 1,5-2,5-krotnego przyspieszenia wnioskowania.
import torch
from torchvision import models
import time, numpy as np
def benchmark_torch_compile():
device = "cuda" if torch.cuda.is_available() else "cpu"
# ============================================================
# MODALITA DI COMPILAZIONE
# ============================================================
# "default": Bilanciamento compile time / speedup
# "reduce-overhead": Minimizza overhead, ottimale per piccoli batch
# "max-autotune": Massima velocità (compile time molto più lungo, ~5-10 min)
# "inductor": Backend default (usa Triton su CUDA, C++ su CPU)
model_fp32 = models.resnet50(pretrained=False).to(device).eval()
# Compilazione eager (default)
model_compiled_default = torch.compile(
models.resnet50(pretrained=False).to(device).eval(),
mode="default"
)
# Compilazione per massima velocità
model_compiled_max = torch.compile(
models.resnet50(pretrained=False).to(device).eval(),
mode="max-autotune",
fullgraph=True # Evita graph breaks per massimo speedup
)
x = torch.randn(32, 3, 224, 224, device=device)
def time_model(model, x, n=100):
"""Benchmark con warmup."""
# Warmup (specialmente importante per torch.compile)
with torch.no_grad():
for _ in range(20):
model(x)
torch.cuda.synchronize() if torch.cuda.is_available() else None
latencies = []
with torch.no_grad():
for _ in range(n):
t0 = time.perf_counter()
model(x)
torch.cuda.synchronize() if torch.cuda.is_available() else None
latencies.append((time.perf_counter() - t0) * 1000)
return np.mean(latencies)
ms_eager = time_model(model_fp32, x)
ms_default = time_model(model_compiled_default, x)
# ms_max = time_model(model_compiled_max, x) # Richiede molto tempo di compile
print(f"Eager (FP32): {ms_eager:.2f} ms")
print(f"Compiled default: {ms_default:.2f} ms ({ms_eager/ms_default:.2f}x speedup)")
# Con BF16 + compile: effetto moltiplicativo
model_bf16_compiled = torch.compile(
models.resnet50(pretrained=False).to(device).eval(),
mode="default"
)
x_bf16 = x.to(torch.bfloat16)
model_bf16_compiled = model_bf16_compiled.to(torch.bfloat16)
ms_bf16_compiled = time_model(model_bf16_compiled, x_bf16)
print(f"BF16 + Compiled: {ms_bf16_compiled:.2f} ms ({ms_eager/ms_bf16_compiled:.2f}x speedup)")
# Risultati tipici RTX 4090:
# Eager FP32: 12.4 ms/step (BS=32)
# Compiled default: 7.8 ms/step (1.59x)
# BF16 + Compiled: 5.1 ms/step (2.43x)
benchmark_torch_compile()
Pamięć podręczna KV: optymalizacja pod kątem wnioskowania autoregresyjnego LLM
W modelach autoregresyjnych każdy wygenerowany token musi poczekać na uwagę wszystkich poprzednie tokeny. Bez optymalizacji klucze (K) i wartości (V) są przeliczane na każdym kroku — ze złożonością O(n^2) dla sekwencji n żetonów. The Pamięć podręczna KV oszczędza K i V każdej warstwy po każdym etapie, zmniejszając koszty generacji od O(n^2) do O(n).
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Optional, Tuple
# ============================================================
# TRANSFORMER CON KV CACHE
# ============================================================
class CachedMultiHeadAttention(nn.Module):
"""
Multi-head attention con KV cache per generazione autogressiva.
Il cache evita di ricalcolare K, V per token passati.
"""
def __init__(self, d_model: int, n_heads: int):
super().__init__()
self.n_heads = n_heads
self.d_head = d_model // n_heads
self.scale = self.d_head ** -0.5
self.q_proj = nn.Linear(d_model, d_model, bias=False)
self.k_proj = nn.Linear(d_model, d_model, bias=False)
self.v_proj = nn.Linear(d_model, d_model, bias=False)
self.out_proj = nn.Linear(d_model, d_model, bias=False)
def forward(
self,
x: torch.Tensor, # [B, seq_len, d_model]
kv_cache: Optional[Tuple] = None # (K_cache, V_cache) o None
) -> Tuple[torch.Tensor, Tuple]:
B, T, D = x.shape
# Proietta Q, K, V
q = self.q_proj(x).view(B, T, self.n_heads, self.d_head).transpose(1, 2)
k = self.k_proj(x).view(B, T, self.n_heads, self.d_head).transpose(1, 2)
v = self.v_proj(x).view(B, T, self.n_heads, self.d_head).transpose(1, 2)
# Concatena con cache esistente
if kv_cache is not None:
k_cache, v_cache = kv_cache
k = torch.cat([k_cache, k], dim=2) # [B, heads, T_total, d_head]
v = torch.cat([v_cache, v], dim=2)
# Attention (Flash Attention automatica con PyTorch 2.0+)
out = F.scaled_dot_product_attention(q, k, v, is_causal=(kv_cache is None))
out = out.transpose(1, 2).contiguous().view(B, T, D)
return self.out_proj(out), (k, v) # Ritorna output + nuovo cache
class CachedTransformerDecoder(nn.Module):
"""Decoder Transformer con KV cache per generazione efficiente."""
def __init__(self, vocab_size: int, d_model: int = 512,
n_heads: int = 8, n_layers: int = 6):
super().__init__()
self.embed = nn.Embedding(vocab_size, d_model)
self.pos_embed = nn.Embedding(2048, d_model)
self.layers = nn.ModuleList([
CachedMultiHeadAttention(d_model, n_heads)
for _ in range(n_layers)
])
self.norms = nn.ModuleList([nn.LayerNorm(d_model) for _ in range(n_layers)])
self.head = nn.Linear(d_model, vocab_size)
self.n_layers = n_layers
@torch.no_grad()
def generate(
self,
input_ids: torch.Tensor, # [B, seq_len]
max_new_tokens: int = 100,
temperature: float = 1.0
) -> torch.Tensor:
"""
Generazione autogressiva con KV cache.
Ogni step utilizza il cache dei token precedenti.
"""
B, T = input_ids.shape
device = input_ids.device
# Processa il prompt (prefill)
x = self.embed(input_ids)
positions = torch.arange(T, device=device).unsqueeze(0)
x = x + self.pos_embed(positions)
# Inizializza cache per ogni layer
kv_caches = [None] * self.n_layers
for i, (layer, norm) in enumerate(zip(self.layers, self.norms)):
x_norm = norm(x)
attn_out, kv_caches[i] = layer(x_norm, kv_caches[i])
x = x + attn_out
# Generazione token per token (usando il cache)
generated = []
for step in range(max_new_tokens):
# Solo l'ultimo token come query
last_token = input_ids[:, -1:] if step == 0 else new_token
x_new = self.embed(last_token)
pos = torch.tensor([[T + step]], device=device)
x_new = x_new + self.pos_embed(pos)
for i, (layer, norm) in enumerate(zip(self.layers, self.norms)):
x_norm = norm(x_new)
attn_out, kv_caches[i] = layer(x_norm, kv_caches[i])
x_new = x_new + attn_out
# Campiona prossimo token
logits = self.head(x_new[:, -1, :]) / temperature
new_token = torch.multinomial(torch.softmax(logits, -1), 1)
generated.append(new_token)
return torch.cat(generated, dim=1)
# Benchmark KV cache vs no cache
def benchmark_generation(model, vocab_size=32000, seq_len=128,
max_new=50, device="cuda"):
model = model.to(device).eval()
input_ids = torch.randint(0, vocab_size, (1, seq_len), device=device)
# Con KV cache (normale)
t0 = time.perf_counter()
with torch.no_grad():
output = model.generate(input_ids, max_new_tokens=max_new)
t_cached = (time.perf_counter() - t0) * 1000
tokens_per_sec = max_new / (t_cached / 1000)
print(f"Con KV Cache: {t_cached:.1f}ms totale, {tokens_per_sec:.1f} token/s")
Porównanie systematyczne: od 48 GB do 8 GB RTX
Podsumowujemy wszystkie optymalizacje widoczne w serii, stosując je stopniowo do modelu podstawowego, pokazującego kompromis w zakresie dokładności/pamięci/szybkości.
Pełne porównanie: Lama-3.1-8B na RTX 3090 (24 GB)
| Konfiguracja | VRAM | Przepustowość | HellaSwag | Zakłopotanie | Notatki |
|---|---|---|---|---|---|
| Wartość podstawowa BF16 | 16,0 GB | 38 t/s | 82,1% | 6.14 | Punkt odniesienia |
| + Błysk Uwaga 2 | 14,2 GB | 52 t/s | 82,1% | 6.14 | -11% VRAM, +37% szybkości |
| + torch.compile | 14,2 GB | 68 t/s | 82,1% | 6.14 | +31% do Błyskowej Uwaga |
| INT8 (bity i bajty) | 8,5 GB | 35 t/s | 81,8% | 6.21 | -47% VRAM, -0,3% wg |
| INT4 NF4 (bnb) | 4,9 GB | 42 t/s | 81,2% | 6,47 | -69% VRAM, -0,9% wg |
| GPTQ INT4 | 4,8 GB | 55 t/s | 81,5% | 6.39 | -70% VRAM, -0,6% wg |
| AWQ INT4 | 4,7 GB | 52 t/s | 81,6% | 6.35 | -71% VRAM, -0,5% wg |
| GGUF Q4_K_M (procesor) | 0 VRAM (5 GB RAM) | 18 t/s | 81,3% | 6.42 | Nie wymaga procesora graficznego |
Przybliżone wartości na RTX 3090 (24 GB VRAM). Wydajność mierzona dla partii=1, seq=512.
Przewodnik po decyzjach: jaka optymalizacja dla jakiego scenariusza
# ALBERO DECISIONALE PER OTTIMIZZAZIONE DL
def recommend_optimization(
vram_available_gb: float,
task: str, # "training" | "inference" | "edge"
accuracy_critical: bool,
hardware: str # "server_gpu" | "consumer_gpu" | "cpu" | "edge"
) -> dict:
"""
Raccomanda le ottimizzazioni più appropriate per il proprio scenario.
"""
recommendations = []
priority = []
# === SEMPRE DA FARE (zero o quasi zero costo) ===
priority.append("1. Mixed Precision (BF16/FP16): abilita SEMPRE su GPU Ampere+")
priority.append("2. Flash Attention: abilita se seq_len > 512")
priority.append("3. torch.compile: abilita se PyTorch 2.0+, +30-50% speedup inference")
priority.append("4. KV Cache: abilita SEMPRE per LLM autoregressive generation")
if task == "training":
if vram_available_gb < 24:
priority.append("5. Gradient Checkpointing: -50% VRAM, +33% compute")
priority.append("6. Gradient Accumulation: simula batch più grandi")
if hardware in ["consumer_gpu", "edge"]:
priority.append("7. QLoRA: fine-tuning con INT4 + LoRA su GPU consumer")
if task in ["inference", "edge"]:
if not accuracy_critical:
if hardware == "server_gpu":
priority.append("5. GPTQ INT4: massimo throughput su GPU NVIDIA")
elif hardware in ["consumer_gpu", "cpu"]:
priority.append("5. AWQ INT4 o GGUF Q4_K_M: per hardware eterogeneo")
elif hardware == "edge":
priority.append("5. GGUF Q3_K_M o Q4_K_M: per Raspberry Pi / embedded")
else:
priority.append("5. INT8 (bitsandbytes): minima perdita di accuratezza")
if vram_available_gb < 16:
priority.append("6. ONNX Export: riduzione overhead runtime +20-40%")
priority.append("7. Considera distillazione verso modello più piccolo")
print("=== RACCOMANDAZIONI OTTIMIZZAZIONE ===")
for p in priority:
print(f" {p}")
return {"priorities": priority}
# Esempi:
print("--- Scenario 1: Fine-tuning su RTX 4080 (16GB) ---")
recommend_optimization(16, "training", True, "consumer_gpu")
print("\n--- Scenario 2: Inferenza su Raspberry Pi ---")
recommend_optimization(0, "inference", False, "edge")
print("\n--- Scenario 3: Produzione su A100 (80GB) ---")
recommend_optimization(80, "inference", True, "server_gpu")
Podsumowanie optymalizacji: oczekiwany wpływ
| Technika | Oszczędzanie VRAM-u | Przyspieszenie | Utrata Ac | Złożoność |
|---|---|---|---|---|
| Mieszana precyzja BF16 | -50% | 2-3x | 0% | Niski (1 linia) |
| Błysk Uwaga 2 | -50-90% | 2-8x | 0% | Niski (1 linia) |
| Torch.compile | 0% | 1,5-2,5x | 0% | Niski (1 linia) |
| Pamięć podręczna KV | + VRAM | 10-50x gen | 0% | Niski |
| Gradientowe punkty kontrolne | -50-70% | -0,7x | 0% | Niski |
| Kwantyzacja INT8 | -50% | 0,9-1,1x | 0-0,5% | Niski |
| INT4 GPTQ/AWQ | -75% | 1,3-1,8x | 0,5-1,5% | Przeciętny |
| Destylacja | -70-90% | 5-20x | 5-15% | Wysoki |
| Przycinanie strukturalne | -30-70% | 2-5x | 2-10% | Wysoki |
Wnioski z serii
Przeszliśmy całą serię Zaawansowane głębokie uczenie się i wdrażanie brzegowe: od mechanizmu uwagi w Transformersach po dostrajanie za pomocą LoRA, od kwantyzacji GPTQ po uporządkowane przycinanie, od destylacji po transformatory wizyjne, od NAS po wdrożenie brzegowe z Raspberry Pi i Jetson, od Ollama do tego końcowego testu porównawczego.
Główny i jasny przekaz: nie ma jednej „najlepszej” techniki. Optymalny wybór zawsze zależy od kontekstu — dostępnego sprzętu, wymagań dotyczących dokładności, docelowego opóźnienia, koszty operacyjne. Jednak dzięki systematycznym ramom benchmarkingu przedstawionym w tym artykule możesz mierzyć zamiast zgadywaći podejmować świadome decyzje.
Trend na rok 2026 jest jasny: modele zmierzają w stronę krawędzi. Gartner 2027 przewiduje, że m.in SLM przewyższa chmurę LLM 3 razy w użyciu. Techniki z tej serii — kwantyzacja, destylacja, wdrażanie rozwiązań brzegowych, Ollama — to nie nisze akademickie: to umiejętności fundamentalne dla każdego, kto chce pracować ze sztuczną inteligencją w nadchodzących latach.
Podsumowanie serii: Zaawansowane głębokie uczenie się
- Artykuł 1: Mechanizm uwagi w transformatorach
- Artykuł 2: Dostrajanie z LoRA i QLoRA
- Artykuł 3: Kwantyzacja GPTQ, AWQ, INT8
- Artykuł 4: Destylacja wiedzy
- Artykuł 5: Przycinanie sieci neuronowych
- Artykuł 6: Transformator wizyjny (ViT)
- Artykuł 7: Wyszukiwanie architektury neuronowej
- Artykuł 8: Głębokie uczenie się na urządzeniach brzegowych
- Artykuł 9: Pomieszczenia Ollama i LLM
- Artykuł 10 (ten): Analiza porównawcza i optymalizacja
Powiązane serie: MLOps | Wizja komputerowa | Inżynieria AI







