Karşılaştırma ve Optimizasyon: 48 GB GPU'dan 8 GB RTX'e
Bir modeliniz var. 80GB A100'de çalışır. Ancak bunu 24 GB'lık bir RTX 3090'a dağıtmanız gerekir. veya bir RTX 4060 8GB dizüstü bilgisayarda, hatta bir Raspberry Pi'de. Ne kadar olduğunu nereden biliyorsun? FP32'den INT4'e geçerken doğruluk kaybı mı yaşıyorsunuz? Flash Attention ile ne kadar hız kazanırsınız? Ölçmeye değer mi yoksa damıtmak daha mı iyi? Degrade denetim noktası oluşturma ne kadar bellek tasarrufu sağlar?
Sistematik kıyaslama olmadan bu sorular cevapsız kalır ve sonuçta sezgilere veya konfigürasyonlarla yayınlanmış kıyaslamalara dayalı olarak optimal olmayan seçimler yapmak sizinkinden farklı. Serinin bu son makalesinde bir çerçeve oluşturuyoruz ölçmek için kapsamlı kıyaslama her boyut performans: hafıza, gecikme, verim, doğruluk ve güç tüketimi.
Daha sonra seride görülen tüm teknikleri sistematik olarak uyguluyoruz - kuantizasyon, budama, damıtma, Flash Attention, degrade kontrol noktası belirleme, karma hassasiyet — ve 48 GB gerektiren bir modelden 8 GB ile çalışan bir modele nasıl geçileceğini gösteriyoruz, kalite açısından tam olarak neye ödediğinizi gösteren ölçümlerle.
Ne Öğreneceksiniz
- DL modelleri için sistematik kıyaslama çerçevesi
- VRAM'ı, gecikmeyi, verimi ve FLOP'ları doğru şekilde ölçün
- Karma Hassasiyet Eğitimi: FP16 vs BF16 vs FP32
- Flash Attention 2/3: Ne kadar tasarruf edersiniz ve ne zaman kullanırsınız?
- Gradient Checkpointing: bellek ve bilgi işlem dengesi
- Gradyan Birikimi: Neredeyse büyük parti boyutları
- Torch.compile ve çalışma zamanı optimizasyonları
- KV Önbelleği: LLM otoregresif çıkarımı için optimizasyon
- Sistematik karşılaştırma: tüm teknikler karşılaştırıldı
- Karar rehberliği: hangi senaryo için hangi optimizasyon
Sistematik Kıyaslama Çerçevesi
Optimize etmeden önce hassas bir şekilde ölçmeniz gerekir. Bir kıyaslama çerçevesi profesyonel ölçüm: en yüksek VRAM kullanımı, ortalama gecikme ve P95, verim (jeton/s veya img/s), FLOP'lar, enerji tüketimi ve belirli görevlerde doğruluk. Anahtar şu: tekrarlanabilirlik: Çalıştırmalar arasında %10 oranında değişen kıyaslamalar işe yaramaz.
import torch
import torch.nn as nn
import time
import numpy as np
from dataclasses import dataclass, asdict
from typing import Optional, Callable
import gc
# ============================================================
# DATACLASS PER RISULTATI BENCHMARK
# ============================================================
@dataclass
class BenchmarkResult:
"""Risultati completi di un benchmark."""
name: str
# Memoria
vram_allocated_mb: float
vram_reserved_mb: float
vram_peak_mb: float
# Velocita
latency_ms_mean: float
latency_ms_p50: float
latency_ms_p95: float
latency_ms_p99: float
throughput_per_sec: float
# Modello
params_total: int
params_trainable: int
model_size_mb: float
# Opzionali
accuracy: Optional[float] = None
flops_total: Optional[float] = None
power_watts: Optional[float] = None
def print_summary(self):
print(f"\n=== {self.name} ===")
print(f" VRAM: {self.vram_peak_mb:.0f} MB peak, {self.vram_allocated_mb:.0f} MB alloc")
print(f" Latenza: {self.latency_ms_mean:.1f}ms mean, "
f"{self.latency_ms_p95:.1f}ms p95, {self.latency_ms_p99:.1f}ms p99")
print(f" Throughput: {self.throughput_per_sec:.1f}/s")
print(f" Parametri: {self.params_total:,} ({self.model_size_mb:.1f} MB)")
if self.accuracy:
print(f" Accuratezza: {self.accuracy:.4f}")
# ============================================================
# CLASSE PRINCIPALE DI BENCHMARKING
# ============================================================
class DeepLearningBenchmark:
def __init__(self, device: str = "cuda"):
self.device = device
self.results = []
def _count_params(self, model: nn.Module) -> tuple:
total = sum(p.numel() for p in model.parameters())
trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
return total, trainable
def _model_size_mb(self, model: nn.Module) -> float:
total_bytes = sum(p.numel() * p.element_size() for p in model.parameters())
return total_bytes / (1024 ** 2)
def _reset_memory(self):
"""Reset GPU memory per benchmark pulito."""
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.reset_peak_memory_stats()
def benchmark_inference(
self,
name: str,
model: nn.Module,
input_fn: Callable[[], tuple],
n_warmup: int = 10,
n_runs: int = 100,
batch_size: int = 1
) -> BenchmarkResult:
"""
Benchmark completo di inferenza.
input_fn: funzione che restituisce input per il modello
"""
model = model.to(self.device).eval()
self._reset_memory()
# Warmup
with torch.no_grad():
for _ in range(n_warmup):
inputs = input_fn()
if isinstance(inputs, dict):
model(**{k: v.to(self.device) for k, v in inputs.items()})
else:
model(inputs.to(self.device))
# Misura memoria post-warmup
if torch.cuda.is_available():
mem_alloc = torch.cuda.memory_allocated() / (1024**2)
mem_reserved = torch.cuda.memory_reserved() / (1024**2)
# Benchmark vero
torch.cuda.synchronize() if torch.cuda.is_available() else None
latencies = []
for _ in range(n_runs):
inputs = input_fn()
t0 = time.perf_counter()
with torch.no_grad():
if isinstance(inputs, dict):
_ = model(**{k: v.to(self.device) for k, v in inputs.items()})
else:
_ = model(inputs.to(self.device))
torch.cuda.synchronize() if torch.cuda.is_available() else None
latencies.append((time.perf_counter() - t0) * 1000)
if torch.cuda.is_available():
mem_peak = torch.cuda.max_memory_allocated() / (1024**2)
else:
mem_alloc = mem_reserved = mem_peak = 0.0
latencies = np.array(latencies)
total_params, trainable_params = self._count_params(model)
result = BenchmarkResult(
name=name,
vram_allocated_mb=mem_alloc,
vram_reserved_mb=mem_reserved,
vram_peak_mb=mem_peak,
latency_ms_mean=float(np.mean(latencies)),
latency_ms_p50=float(np.percentile(latencies, 50)),
latency_ms_p95=float(np.percentile(latencies, 95)),
latency_ms_p99=float(np.percentile(latencies, 99)),
throughput_per_sec=1000 / np.mean(latencies) * batch_size,
params_total=total_params,
params_trainable=trainable_params,
model_size_mb=self._model_size_mb(model)
)
result.print_summary()
self.results.append(result)
return result
def benchmark_training_step(
self,
name: str,
model: nn.Module,
optimizer: torch.optim.Optimizer,
loss_fn: Callable,
input_fn: Callable,
n_steps: int = 50
) -> dict:
"""Benchmark di un singolo step di training."""
model = model.to(self.device).train()
self._reset_memory()
latencies = []
for step in range(n_steps):
inputs, labels = input_fn()
inputs = inputs.to(self.device)
labels = labels.to(self.device)
t0 = time.perf_counter()
optimizer.zero_grad()
outputs = model(inputs)
loss = loss_fn(outputs, labels)
loss.backward()
optimizer.step()
torch.cuda.synchronize() if torch.cuda.is_available() else None
latencies.append((time.perf_counter() - t0) * 1000)
return {
"name": name,
"vram_peak_mb": torch.cuda.max_memory_allocated() / (1024**2) if torch.cuda.is_available() else 0,
"step_ms_mean": float(np.mean(latencies[5:])), # Skip warmup
"step_ms_p95": float(np.percentile(latencies[5:], 95))
}
def compare_results(self) -> None:
"""Stampa tabella comparativa di tutti i risultati."""
if not self.results:
print("Nessun risultato disponibile.")
return
baseline = self.results[0]
print(f"\n{'Config':<30} {'VRAM (MB)':>12} {'Latency (ms)':>14} {'Throughput':>12} {'Speedup':>10}")
print("-" * 82)
for r in self.results:
speedup = baseline.latency_ms_mean / r.latency_ms_mean
print(f"{r.name:<30} {r.vram_peak_mb:>12.0f} {r.latency_ms_mean:>14.2f} "
f"{r.throughput_per_sec:>12.1f} {speedup:>10.2f}x")
# Uso:
bench = DeepLearningBenchmark(device="cuda" if torch.cuda.is_available() else "cpu")
print("Framework di benchmarking inizializzato")
Karma Hassasiyet: FP32 vs FP16 vs BF16
Il karma hassas eğitim ve etkinleştirilecek ilk optimizasyon: neredeyse
Yapılandırma için sıfır ek yük, 2 kat bellek tasarrufu, genellikle donanımda 2-3 kat hızlanma
Amper+. torch.autocast hangi işlemlerin gerçekleştirileceğini otomatik olarak yönetir
azaltılmış hassasiyette.
FP16 ve BF16 ile ikili format arasındaki temel fark: FP16'nın üs için 5 biti vardır ve mantis için 10 (6e-5 ila 6.5e4 aralığı), BF16'da ise üs için 8 bit ve üs için 7 bit bulunur. mantis (FP32 ile aynı aralık, 1,2e-38'den 3,4e38'e). BF16 ve sırasında çok daha kararlı çünkü büyük eğimlerde taşma/düşük akışa neden olmaz.
import torch
import torch.nn as nn
from torch.cuda.amp import GradScaler
# ============================================================
# CONFRONTO FP32 vs FP16 vs BF16
# ============================================================
def train_step_fp32(model, optimizer, imgs, labels, criterion):
"""Training step standard FP32."""
optimizer.zero_grad()
output = model(imgs)
loss = criterion(output, labels)
loss.backward()
optimizer.step()
return loss.item()
def train_step_fp16(model, optimizer, imgs, labels, criterion, scaler: GradScaler):
"""
Training step con AMP FP16.
GradScaler necessario: FP16 ha range limitato, loss scaling evita underflow.
"""
optimizer.zero_grad()
with torch.autocast(device_type="cuda", dtype=torch.float16):
output = model(imgs)
loss = criterion(output, labels)
# Scala la loss per evitare underflow in FP16
scaler.scale(loss).backward()
# Decomprime gradienti prima di clip
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
# Aggiorna pesi (salta se ci sono NaN/Inf nei gradienti)
scaler.step(optimizer)
scaler.update()
return loss.item()
def train_step_bf16(model, optimizer, imgs, labels, criterion):
"""
Training step con BF16.
BF16 NON richiede GradScaler: ha range dinamico uguale a FP32.
Disponibile su: A100, RTX 3000+, Apple M-series.
"""
optimizer.zero_grad()
with torch.autocast(device_type="cuda", dtype=torch.bfloat16):
output = model(imgs)
loss = criterion(output, labels)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
return loss.item()
# Benchmark comparativo
from torchvision import models
import time, gc
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def compare_precisions(model_fn=models.resnet50, n_steps=100,
batch_size=32, img_size=224):
"""Confronta FP32, FP16, BF16 per training e inferenza."""
criterion = nn.CrossEntropyLoss()
configs = [
("FP32", torch.float32, False),
("FP16", torch.float16, True), # Richiede GradScaler
("BF16", torch.bfloat16, False) # No GradScaler
]
results = {}
for name, dtype, use_scaler in configs:
model = model_fn(pretrained=False).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
scaler = GradScaler() if use_scaler else None
# Reset memory stats
torch.cuda.reset_peak_memory_stats() if torch.cuda.is_available() else None
gc.collect()
torch.cuda.empty_cache() if torch.cuda.is_available() else None
timings = []
for step in range(n_steps):
imgs = torch.randn(batch_size, 3, img_size, img_size, device=device)
labels = torch.randint(0, 1000, (batch_size,), device=device)
t0 = time.perf_counter()
with torch.autocast(device_type="cuda", dtype=dtype, enabled=(dtype != torch.float32)):
out = model(imgs)
loss = criterion(out, labels)
if scaler:
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
else:
optimizer.zero_grad()
loss.backward()
optimizer.step()
torch.cuda.synchronize() if torch.cuda.is_available() else None
timings.append((time.perf_counter() - t0) * 1000)
vram_peak = torch.cuda.max_memory_allocated() / (1024**2) if torch.cuda.is_available() else 0
results[name] = {
"vram_mb": round(vram_peak, 1),
"step_ms": round(np.mean(timings[10:]), 2),
"throughput_imgs_s": round(batch_size * 1000 / np.mean(timings[10:]), 1)
}
print(f"{name}: VRAM={vram_peak:.0f}MB, {np.mean(timings[10:]):.1f}ms/step, "
f"{batch_size*1000/np.mean(timings[10:]):.0f} img/s")
return results
# Risultati tipici ResNet-50 BS=32 su RTX 4090:
# FP32: VRAM=6200MB, 95ms/step, 336 img/s
# FP16: VRAM=3100MB, 41ms/step, 780 img/s (2x velocità, 50% VRAM)
# BF16: VRAM=3100MB, 38ms/step, 842 img/s (2.2x velocità, 50% VRAM)
Flash Dikkat: Kuralları Değiştiren Optimizasyon
Flaş Dikkati (Dao ve diğerleri, 2022) ve belki de en etkili optimizasyon Son yılların Transformers'ları için. Dikkat hesaplamasını yeniden formüle edin GÇ'ye bağlı farkındalık: Dikkat matrisinin tamamını HBM'de hayata geçirmek yerine (bellekte O(n^2) karmaşıklığı vardır), SRAM'de kalırken blok dikkatini hesaplar. Sonuç: Bellekte O(n^2) yerine O(n) karmaşıklığı, uzun dizilerde 2-4 kat hızlanma.
Flash Attention 2 (2023), GPU'daki paralelliği daha da geliştirerek FP16 FLOPS'un teorik kullanımının %72'si. Flash Attention 3 (2024) desteği eklendi FP8 ve Hopper'a özel optimizasyonlar için, FA2'ye kıyasla 2 kata kadar hızlanma.
import torch
import torch.nn as nn
import torch.nn.functional as F
import time, math
# ============================================================
# FLASH ATTENTION vs STANDARD ATTENTION: CONFRONTO
# ============================================================
def standard_attention(q, k, v, scale=None):
"""
Attention standard: materializza la matrice NxN completa in GPU memory.
Complessità memoria: O(N^2 * d_head)
"""
if scale is None:
scale = q.size(-1) ** -0.5
# [B, heads, N, N] - questa matrice può essere ENORME per seq lunghe!
attn = torch.softmax((q @ k.transpose(-2, -1)) * scale, dim=-1)
return attn @ v
def flash_attention_native(q, k, v):
"""
Flash Attention tramite PyTorch 2.0+ scaled_dot_product_attention.
Sceglie automaticamente l'implementazione ottimale:
- FlashAttention-2 se disponibile (CUDA Ampere+)
- Memory-efficient attention (xFormers) come fallback
- Standard attention come ultimo fallback
"""
# Automaticamente ottimizzato da PyTorch
return F.scaled_dot_product_attention(q, k, v, is_causal=False)
def benchmark_attention_implementations(
batch_size=4, n_heads=12, seq_lengths=[512, 1024, 2048, 4096, 8192],
d_head=64, device="cuda"
):
"""
Confronta Standard vs Flash Attention su diverse lunghezze di sequenza.
"""
print(f"{'Seq Len':>10} | {'Standard (ms)':>15} | {'Flash (ms)':>12} | "
f"{'Speedup':>10} | {'VRAM Std (MB)':>15} | {'VRAM Flash (MB)':>15}")
print("-" * 90)
for seq_len in seq_lengths:
q = torch.randn(batch_size, n_heads, seq_len, d_head, device=device, dtype=torch.float16)
k = torch.randn_like(q)
v = torch.randn_like(q)
# Warmup
for _ in range(5):
standard_attention(q, k, v)
flash_attention_native(q, k, v)
# Benchmark Standard
torch.cuda.reset_peak_memory_stats()
torch.cuda.synchronize()
t0 = time.perf_counter()
for _ in range(20):
out_std = standard_attention(q, k, v)
torch.cuda.synchronize()
std_ms = (time.perf_counter() - t0) / 20 * 1000
vram_std = torch.cuda.max_memory_allocated() / (1024**2)
# Benchmark Flash Attention
torch.cuda.reset_peak_memory_stats()
torch.cuda.synchronize()
t0 = time.perf_counter()
for _ in range(20):
out_flash = flash_attention_native(q, k, v)
torch.cuda.synchronize()
flash_ms = (time.perf_counter() - t0) / 20 * 1000
vram_flash = torch.cuda.max_memory_allocated() / (1024**2)
speedup = std_ms / flash_ms
print(f"{seq_len:>10} | {std_ms:>15.2f} | {flash_ms:>12.2f} | "
f"{speedup:>10.2f}x | {vram_std:>15.0f} | {vram_flash:>15.0f}")
# Risultati tipici su RTX 4090 (FP16, B=4, heads=12, d_head=64):
# Seq Len | Standard (ms) | Flash (ms) | Speedup | VRAM Std (MB) | VRAM Flash (MB)
# -----------------------------------------------------------------------------------
# 512 | 0.82 | 0.31 | 2.65x | 48 | 12
# 1024 | 2.45 | 0.58 | 4.22x | 192 | 24
# 2048 | 9.12 | 1.12 | 8.14x | 768 | 48
# 4096 | 35.80 | 2.21 | 16.20x | 3072 | 96
# 8192 | 144.20 | 4.38 | 32.92x | 12288 | 192
# Flash Attention scala LINEARMENTE: a seq=8192 usa 64x meno VRAM!
Gradyan Kontrol Noktalaması ve Gradyan Birikimi
VRAM eğitim sırasında darboğaz olduğunda, iki tamamlayıcı teknik donanımı yükseltmeden daha büyük grupları eğitmenize olanak tanır:
import torch
import torch.nn as nn
from torch.utils.checkpoint import checkpoint_sequential
import gc
# ============================================================
# GRADIENT CHECKPOINTING
# ============================================================
# Idea: invece di salvare tutte le attivazioni intermedie per il backward pass,
# le ricalcola al momento (tradeoff: +33% compute, -50-70% memoria)
class CheckpointedTransformerBlock(nn.Module):
"""Transformer block con gradient checkpointing."""
def __init__(self, d_model=768, n_heads=12):
super().__init__()
self.attn = nn.MultiheadAttention(d_model, n_heads, batch_first=True)
self.norm1 = nn.LayerNorm(d_model)
self.ff = nn.Sequential(
nn.Linear(d_model, d_model * 4), nn.GELU(),
nn.Linear(d_model * 4, d_model)
)
self.norm2 = nn.LayerNorm(d_model)
def _attn_block(self, x):
attn_out, _ = self.attn(x, x, x)
return self.norm1(x + attn_out)
def _ff_block(self, x):
return self.norm2(x + self.ff(x))
def forward(self, x):
# Gradient checkpointing: ogni sotto-modulo viene ricalcolato
# durante il backward invece di essere salvato
x = torch.utils.checkpoint.checkpoint(self._attn_block, x, use_reentrant=False)
x = torch.utils.checkpoint.checkpoint(self._ff_block, x, use_reentrant=False)
return x
def enable_gradient_checkpointing_hf(model):
"""Abilita gradient checkpointing su modelli HuggingFace."""
model.gradient_checkpointing_enable()
print(f"Gradient checkpointing abilitato su {type(model).__name__}")
# Benchmark Gradient Checkpointing
def compare_checkpointing(seq_len=2048, batch_size=8, d_model=768,
n_layers=12, n_heads=12, device="cuda"):
"""Confronta training con e senza gradient checkpointing."""
class SimpleTransformer(nn.Module):
def __init__(self, use_checkpoint=False):
super().__init__()
self.use_checkpoint = use_checkpoint
self.blocks = nn.ModuleList([
CheckpointedTransformerBlock(d_model, n_heads) if use_checkpoint
else CheckpointedTransformerBlock(d_model, n_heads)
for _ in range(n_layers)
])
self.head = nn.Linear(d_model, 1000)
def forward(self, x):
for block in self.blocks:
if self.use_checkpoint:
x = torch.utils.checkpoint.checkpoint(block, x, use_reentrant=False)
else:
x = block(x)
return self.head(x[:, 0])
results = {}
for use_ckpt in [False, True]:
name = "con checkpointing" if use_ckpt else "senza checkpointing"
gc.collect()
torch.cuda.empty_cache() if torch.cuda.is_available() else None
torch.cuda.reset_peak_memory_stats() if torch.cuda.is_available() else None
model = SimpleTransformer(use_checkpoint=use_ckpt).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
x = torch.randn(batch_size, seq_len, d_model, device=device)
labels = torch.randint(0, 1000, (batch_size,), device=device)
# Forward + backward
torch.cuda.synchronize() if torch.cuda.is_available() else None
t0 = time.perf_counter()
for _ in range(10):
optimizer.zero_grad()
out = model(x)
loss = nn.CrossEntropyLoss()(out, labels)
loss.backward()
optimizer.step()
torch.cuda.synchronize() if torch.cuda.is_available() else None
elapsed = (time.perf_counter() - t0) / 10 * 1000
vram = torch.cuda.max_memory_allocated() / (1024**2) if torch.cuda.is_available() else 0
results[name] = {"vram_mb": round(vram, 1), "step_ms": round(elapsed, 1)}
print(f"{name}: VRAM={vram:.0f}MB, Step={elapsed:.1f}ms")
return results
# Risultati tipici (Transformer 12 layer, seq=2048, BS=8, RTX 3090):
# Senza checkpointing: VRAM=18.4GB, Step=285ms
# Con checkpointing: VRAM= 7.8GB, Step=378ms (-58% VRAM, +33% compute)
# ============================================================
# GRADIENT ACCUMULATION
# ============================================================
def train_with_gradient_accumulation(
model, optimizer, train_loader, criterion,
accumulation_steps: int = 4,
device: str = "cuda"
):
"""
Gradient accumulation: simula batch_size * accumulation_steps
con la memoria di batch_size.
Utile quando il batch_size reale e troppo piccolo per convergenza ottimale.
"""
model = model.to(device).train()
optimizer.zero_grad()
for step, (imgs, labels) in enumerate(train_loader):
imgs, labels = imgs.to(device), labels.to(device)
# Forward pass
with torch.autocast(device_type="cuda", dtype=torch.bfloat16):
output = model(imgs)
# Dividi loss per accumulation steps (mantiene la scala corretta)
loss = criterion(output, labels) / accumulation_steps
loss.backward()
# Aggiorna i pesi ogni N step
if (step + 1) % accumulation_steps == 0:
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
optimizer.zero_grad()
effective_batch = imgs.size(0) * accumulation_steps
print(f"Step {(step+1)//accumulation_steps} | "
f"Effective batch: {effective_batch} | Loss: {loss.item()*accumulation_steps:.4f}")
torch.compile: Grafik Optimizasyonu
meşale.derleme (PyTorch 2.0+) modeli optimize edilmiş çekirdekler halinde derler Triton veya başka bir arka uç aracılığıyla. Ve uygulanacak en basit optimizasyon: yalnızca bir tane kod satırı çıkarımda 1,5-2,5 kat hızlanmaya yol açabilir.
import torch
from torchvision import models
import time, numpy as np
def benchmark_torch_compile():
device = "cuda" if torch.cuda.is_available() else "cpu"
# ============================================================
# MODALITA DI COMPILAZIONE
# ============================================================
# "default": Bilanciamento compile time / speedup
# "reduce-overhead": Minimizza overhead, ottimale per piccoli batch
# "max-autotune": Massima velocità (compile time molto più lungo, ~5-10 min)
# "inductor": Backend default (usa Triton su CUDA, C++ su CPU)
model_fp32 = models.resnet50(pretrained=False).to(device).eval()
# Compilazione eager (default)
model_compiled_default = torch.compile(
models.resnet50(pretrained=False).to(device).eval(),
mode="default"
)
# Compilazione per massima velocità
model_compiled_max = torch.compile(
models.resnet50(pretrained=False).to(device).eval(),
mode="max-autotune",
fullgraph=True # Evita graph breaks per massimo speedup
)
x = torch.randn(32, 3, 224, 224, device=device)
def time_model(model, x, n=100):
"""Benchmark con warmup."""
# Warmup (specialmente importante per torch.compile)
with torch.no_grad():
for _ in range(20):
model(x)
torch.cuda.synchronize() if torch.cuda.is_available() else None
latencies = []
with torch.no_grad():
for _ in range(n):
t0 = time.perf_counter()
model(x)
torch.cuda.synchronize() if torch.cuda.is_available() else None
latencies.append((time.perf_counter() - t0) * 1000)
return np.mean(latencies)
ms_eager = time_model(model_fp32, x)
ms_default = time_model(model_compiled_default, x)
# ms_max = time_model(model_compiled_max, x) # Richiede molto tempo di compile
print(f"Eager (FP32): {ms_eager:.2f} ms")
print(f"Compiled default: {ms_default:.2f} ms ({ms_eager/ms_default:.2f}x speedup)")
# Con BF16 + compile: effetto moltiplicativo
model_bf16_compiled = torch.compile(
models.resnet50(pretrained=False).to(device).eval(),
mode="default"
)
x_bf16 = x.to(torch.bfloat16)
model_bf16_compiled = model_bf16_compiled.to(torch.bfloat16)
ms_bf16_compiled = time_model(model_bf16_compiled, x_bf16)
print(f"BF16 + Compiled: {ms_bf16_compiled:.2f} ms ({ms_eager/ms_bf16_compiled:.2f}x speedup)")
# Risultati tipici RTX 4090:
# Eager FP32: 12.4 ms/step (BS=32)
# Compiled default: 7.8 ms/step (1.59x)
# BF16 + Compiled: 5.1 ms/step (2.43x)
benchmark_torch_compile()
KV Önbelleği: Yüksek Lisans Otoregresif Çıkarımı için Optimizasyon
Otoregresif modellerde, oluşturulan her bir jetonun tüm platformlarda ilgiyi beklemesi gerekir. önceki jetonlar. Optimizasyon olmadan anahtarlar (K) ve değerler (V) yeniden hesaplanır her adımda - n jetonluk bir dizi için O(n^2) karmaşıklığıyla. KV Önbelleği Her adımdan sonra her katmanın K ve V'sinden tasarruf ederek maliyeti düşürür O(n^2)'den O(n)'ye nesil.
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Optional, Tuple
# ============================================================
# TRANSFORMER CON KV CACHE
# ============================================================
class CachedMultiHeadAttention(nn.Module):
"""
Multi-head attention con KV cache per generazione autogressiva.
Il cache evita di ricalcolare K, V per token passati.
"""
def __init__(self, d_model: int, n_heads: int):
super().__init__()
self.n_heads = n_heads
self.d_head = d_model // n_heads
self.scale = self.d_head ** -0.5
self.q_proj = nn.Linear(d_model, d_model, bias=False)
self.k_proj = nn.Linear(d_model, d_model, bias=False)
self.v_proj = nn.Linear(d_model, d_model, bias=False)
self.out_proj = nn.Linear(d_model, d_model, bias=False)
def forward(
self,
x: torch.Tensor, # [B, seq_len, d_model]
kv_cache: Optional[Tuple] = None # (K_cache, V_cache) o None
) -> Tuple[torch.Tensor, Tuple]:
B, T, D = x.shape
# Proietta Q, K, V
q = self.q_proj(x).view(B, T, self.n_heads, self.d_head).transpose(1, 2)
k = self.k_proj(x).view(B, T, self.n_heads, self.d_head).transpose(1, 2)
v = self.v_proj(x).view(B, T, self.n_heads, self.d_head).transpose(1, 2)
# Concatena con cache esistente
if kv_cache is not None:
k_cache, v_cache = kv_cache
k = torch.cat([k_cache, k], dim=2) # [B, heads, T_total, d_head]
v = torch.cat([v_cache, v], dim=2)
# Attention (Flash Attention automatica con PyTorch 2.0+)
out = F.scaled_dot_product_attention(q, k, v, is_causal=(kv_cache is None))
out = out.transpose(1, 2).contiguous().view(B, T, D)
return self.out_proj(out), (k, v) # Ritorna output + nuovo cache
class CachedTransformerDecoder(nn.Module):
"""Decoder Transformer con KV cache per generazione efficiente."""
def __init__(self, vocab_size: int, d_model: int = 512,
n_heads: int = 8, n_layers: int = 6):
super().__init__()
self.embed = nn.Embedding(vocab_size, d_model)
self.pos_embed = nn.Embedding(2048, d_model)
self.layers = nn.ModuleList([
CachedMultiHeadAttention(d_model, n_heads)
for _ in range(n_layers)
])
self.norms = nn.ModuleList([nn.LayerNorm(d_model) for _ in range(n_layers)])
self.head = nn.Linear(d_model, vocab_size)
self.n_layers = n_layers
@torch.no_grad()
def generate(
self,
input_ids: torch.Tensor, # [B, seq_len]
max_new_tokens: int = 100,
temperature: float = 1.0
) -> torch.Tensor:
"""
Generazione autogressiva con KV cache.
Ogni step utilizza il cache dei token precedenti.
"""
B, T = input_ids.shape
device = input_ids.device
# Processa il prompt (prefill)
x = self.embed(input_ids)
positions = torch.arange(T, device=device).unsqueeze(0)
x = x + self.pos_embed(positions)
# Inizializza cache per ogni layer
kv_caches = [None] * self.n_layers
for i, (layer, norm) in enumerate(zip(self.layers, self.norms)):
x_norm = norm(x)
attn_out, kv_caches[i] = layer(x_norm, kv_caches[i])
x = x + attn_out
# Generazione token per token (usando il cache)
generated = []
for step in range(max_new_tokens):
# Solo l'ultimo token come query
last_token = input_ids[:, -1:] if step == 0 else new_token
x_new = self.embed(last_token)
pos = torch.tensor([[T + step]], device=device)
x_new = x_new + self.pos_embed(pos)
for i, (layer, norm) in enumerate(zip(self.layers, self.norms)):
x_norm = norm(x_new)
attn_out, kv_caches[i] = layer(x_norm, kv_caches[i])
x_new = x_new + attn_out
# Campiona prossimo token
logits = self.head(x_new[:, -1, :]) / temperature
new_token = torch.multinomial(torch.softmax(logits, -1), 1)
generated.append(new_token)
return torch.cat(generated, dim=1)
# Benchmark KV cache vs no cache
def benchmark_generation(model, vocab_size=32000, seq_len=128,
max_new=50, device="cuda"):
model = model.to(device).eval()
input_ids = torch.randint(0, vocab_size, (1, seq_len), device=device)
# Con KV cache (normale)
t0 = time.perf_counter()
with torch.no_grad():
output = model.generate(input_ids, max_new_tokens=max_new)
t_cached = (time.perf_counter() - t0) * 1000
tokens_per_sec = max_new / (t_cached / 1000)
print(f"Con KV Cache: {t_cached:.1f}ms totale, {tokens_per_sec:.1f} token/s")
Sistematik Karşılaştırma: 48 GB'tan 8 GB RTX'e
Seride görülen tüm optimizasyonları aşamalı olarak uygulayarak özetliyoruz doğruluk/bellek/hız dengesini gösteren temel bir modele.
Tam Karşılaştırma: RTX 3090 (24 GB) üzerinde Llama-3.1-8B
| Yapılandırma | VRAM | Verim | HellaSwag | Şaşkınlık | Notlar |
|---|---|---|---|---|---|
| BF16 temel çizgisi | 16,0 GB | 38 ton/sn | %82,1 | 6.14 | Referans kıyaslaması |
| + Flaş Dikkati 2 | 14,2 GB | 52 ton/sn | %82,1 | 6.14 | -%11 VRAM, +%37 hız |
| + torch.compile | 14,2 GB | 68 ton/sn | %82,1 | 6.14 | Flaş Dikkatinde +%31 |
| INT8 (bit ve bayt) | 8,5GB | 35 ton/sn | %81,8 | 6.21 | -%47 VRAM, -%0,3 acc |
| INT4 NF4 (bnb) | 4,9GB | 42 ton/sn | %81,2 | 6.47 | -%69 VRAM, -%0,9 acc |
| GPTQ INT4 | 4,8 GB | 55 ton/sn | %81,5 | 6.39 | -%70 VRAM, -%0,6 acc |
| AWQ INT4 | 4,7 GB | 52 ton/sn | %81,6 | 6.35 | -%71 VRAM, -%0,5 acc |
| GGUF Q4_K_M (CPU) | 0 VRAM (5 GB RAM) | 18 ton/sn | %81,3 | 6.42 | GPU'ya gerek yok |
RTX 3090 (24GB VRAM) için yaklaşık değerler. Toplu iş=1, sıra=512 ile ölçülen verim.
Karar Kılavuzu: Hangi Senaryo İçin Hangi Optimizasyon
# ALBERO DECISIONALE PER OTTIMIZZAZIONE DL
def recommend_optimization(
vram_available_gb: float,
task: str, # "training" | "inference" | "edge"
accuracy_critical: bool,
hardware: str # "server_gpu" | "consumer_gpu" | "cpu" | "edge"
) -> dict:
"""
Raccomanda le ottimizzazioni più appropriate per il proprio scenario.
"""
recommendations = []
priority = []
# === SEMPRE DA FARE (zero o quasi zero costo) ===
priority.append("1. Mixed Precision (BF16/FP16): abilita SEMPRE su GPU Ampere+")
priority.append("2. Flash Attention: abilita se seq_len > 512")
priority.append("3. torch.compile: abilita se PyTorch 2.0+, +30-50% speedup inference")
priority.append("4. KV Cache: abilita SEMPRE per LLM autoregressive generation")
if task == "training":
if vram_available_gb < 24:
priority.append("5. Gradient Checkpointing: -50% VRAM, +33% compute")
priority.append("6. Gradient Accumulation: simula batch più grandi")
if hardware in ["consumer_gpu", "edge"]:
priority.append("7. QLoRA: fine-tuning con INT4 + LoRA su GPU consumer")
if task in ["inference", "edge"]:
if not accuracy_critical:
if hardware == "server_gpu":
priority.append("5. GPTQ INT4: massimo throughput su GPU NVIDIA")
elif hardware in ["consumer_gpu", "cpu"]:
priority.append("5. AWQ INT4 o GGUF Q4_K_M: per hardware eterogeneo")
elif hardware == "edge":
priority.append("5. GGUF Q3_K_M o Q4_K_M: per Raspberry Pi / embedded")
else:
priority.append("5. INT8 (bitsandbytes): minima perdita di accuratezza")
if vram_available_gb < 16:
priority.append("6. ONNX Export: riduzione overhead runtime +20-40%")
priority.append("7. Considera distillazione verso modello più piccolo")
print("=== RACCOMANDAZIONI OTTIMIZZAZIONE ===")
for p in priority:
print(f" {p}")
return {"priorities": priority}
# Esempi:
print("--- Scenario 1: Fine-tuning su RTX 4080 (16GB) ---")
recommend_optimization(16, "training", True, "consumer_gpu")
print("\n--- Scenario 2: Inferenza su Raspberry Pi ---")
recommend_optimization(0, "inference", False, "edge")
print("\n--- Scenario 3: Produzione su A100 (80GB) ---")
recommend_optimization(80, "inference", True, "server_gpu")
Optimizasyon Özeti: Beklenen Etki
| Teknik | VRAM Tasarrufu | Hızlanma | Acc Kaybı | Karmaşıklık |
|---|---|---|---|---|
| Karışık Hassas BF16 | -%50 | 2-3x | 0% | Düşük (1 satır) |
| Flaş Dikkati 2 | -50-90% | 2-8x | 0% | Düşük (1 satır) |
| meşale.derleme | 0% | 1,5-2,5x | 0% | Düşük (1 satır) |
| KV Önbelleği | +VRAM | 10-50x gen | 0% | Düşük |
| Gradyan Kontrol Noktalaması | -50-70% | -0,7x | 0% | Düşük |
| INT8 Niceleme | -%50 | 0,9-1,1x | %0-0,5 | Düşük |
| INT4 GPTQ/AWQ | -75% | 1,3-1,8x | %0,5-1,5 | Ortalama |
| Damıtma | -70-90% | 5-20x | %5-15 | Yüksek |
| Yapılandırılmış budama | -30-70% | 2-5x | %2-10 | Yüksek |
Serinin Sonuçları
Bütün seriyi yaşadık Gelişmiş Derin Öğrenme ve Uç Dağıtımı: Transformers'taki dikkat mekanizmasından LoRA ile ince ayar yapmaya, GPTQ nicemlemesinden damıtmadan Vision Transformers'a, NAS'tan uç dağıtıma kadar yapılandırılmış budamaya kadar Raspberry Pi ve Jetson ile Ollama'dan bu son kıyaslamaya kadar.
Merkezi ve açık mesaj: Tek bir "en iyi" teknik yoktur. En uygun seçim her zaman bağlama bağlıdır; mevcut donanım, doğruluk gereksinimleri, hedef gecikme süresi, işletme maliyetleri. Ancak bu makalede sunulan sistematik kıyaslama çerçevesiyle, yapabilirsin ölçüm yerine tahmin etmekve bilinçli kararlar verin.
2026'nın trendi açık: modeller uç noktaya doğru ilerliyor. Gartner 2027 şunu öngörüyor: SLM, kullanımda bulut LLM'den 3 kat daha iyi performans gösteriyor. Bu serideki teknikler — kuantizasyon, damıtma, uç dağıtım, Ollama — bunlar akademik nişler değil: becerilerdir Önümüzdeki yıllarda yapay zeka ile çalışmak isteyen herkes için temel bir konu.
Dizi Özeti: Gelişmiş Derin Öğrenme
- Madde 1: Transformatörlerde Dikkat Mekanizması
- Madde 2: LoRA ve QLoRA ile ince ayar yapma
- Madde 3: Niceleme GPTQ, AWQ, INT8
- Madde 4: Bilginin Damıtılması
- Madde 5: Sinir Ağlarını Budamak
- Madde 6: Vizyon Transformatörü (ViT)
- Madde 7: Sinir Mimarisi Araştırması
- Madde 8: Uç Cihazlarda Derin Öğrenme
- Madde 9: Ollama ve Yüksek Lisans Tesisleri
- Madde 10 (bu): Kıyaslama ve Optimizasyon
İlgili seri: MLOps | Bilgisayarla Görme | Yapay Zeka Mühendisliği







