Lanțul valoric al gazoductului de emisii Scope 3: de la datele brute la pista de audit
Il 70-90% din totalul emisiilor a unei companii de tehnologie sau software se ascunde în afara propriei sale limite: în serverele cloud achiziționate ca serviciu, în laptopurile angajaților, în zborurile de afaceri, în cod pe care clienții rulează pe dispozitivele lor. Acestea sunt emisiile Domeniul de aplicare 3, și în cea mai mare parte parte a organizațiilor digitale reprezintă cea mai complexă problemă de măsurare care există în zona ESG.
Spre deosebire de Scope 1 (ardem combustibilul) și Scope 2 (cumpărăm electricitatea), cel Scopul 3 necesită colectarea de date de la sute de furnizori, aplicarea factorilor de emisie eterogene, gestionează grade foarte ridicate de incertitudine și produce o pistă de audit verificabilă de către auditorii externi. Cu CSRD/ESRS E1 ceea ce face ca raportarea Scope 3 să fie obligatorie pentru companiile mari începând de la 2025-2026, iar cu IMM-urile în miză până în 2028, întrebarea nu mai este academică: este inginerie.
În acest articol construim unul conductă completă pentru calcularea emisiilor Scope 3 lanțul valoric: de la arhitectura ETL pentru colectarea datelor furnizorilor, până la integrarea cu platforme precum CDP și EcoVadis, de la calculul bazat pe activitate versus calculul bazat pe cheltuieli, până la DAG Airflow pentru automatizare și la pista de audit imuabilă pentru verificatori. Fiecare secțiune include cod Python funcțional și cele mai bune practici operațional testat în contexte reale.
Ce vei învăța
- Cele 15 categorii GHG Protocol Scope 3 și care sunt relevante pentru companiile software/SaaS
- Arhitectură ETL/ELT pentru colectarea datelor furnizorilor: chestionare, CDP, EcoVadis și API-uri directe
- Bazat pe activitate vs pe baza cheltuielilor: formule, acuratețe și când să utilizați ce abordare
- Conducta Python cu Apache Airflow: DAG pentru calcule automate și scalabile Scope 3
- Scorul calității datelor și propagarea incertitudinii statistice în estimări
- Pista de audit imuabilă cu lanț hash SHA-256 pentru trasabilitatea verificatorului extern
- Vizualizare diagramă Sankey pentru lanțul valoric și categoriile prioritare de hărți termice
- Cerințe CSRD/ESRS E1: ce trebuie să dezvăluiți și cu ce granularitate
- Studiu de caz complet: Companie SaaS cu 50 de furnizori, calcul end-to-end Scope 3
- Integrare cu EcoVadis Carbon Data Network și Climatiq API pentru factorii de emisie
Green Software Series — 10 articole
| # | Articol | Subiect |
|---|---|---|
| 1 | Principiile Green Software Foundation | Eficiența carbonului, GSF, SCI |
| 2 | CodeCarbon: Măsurarea codului | Măsurare, tablou de bord, optimizare |
| 3 | Climatiq API: Calcule de carbon | REST API, GHG Protocol, Scop 1-3 |
| 4 | SDK Carbon Aware | Schimbarea timpului, schimbarea locației |
| 5 | Domeniul 1-2-3: Modelarea datelor ESG | Structura datelor, calcule, agregare |
| 6 | GreenOps: Kubernetes Carbon-Aware | Programare, scalare, monitorizare |
| 7 | Lanțul valoric Scope 3 al conductei de emisii | Acest articol |
| 8 | API de raportare ESG: CSRD | API, flux de lucru, conformitate |
| 9 | Modele arhitecturale durabile | Stocare, stocare în cache, lot |
| 10 | AI și Carbon: ML Training | Antrenament ML, optimizare, Green AI |
Cele 15 categorii de protocol GHG Scope 3
Il Standard pentru lanțul valoric corporativ al protocolului GHG (Scope 3). este cadrul internațional referință publicată în 2011 și în prezent în curs de revizuire cu actualizări așteptate în 2026. Se divide emisiile indirecte ale lanțului valoric în 15 categorii distincte, organizat în două macrogrupuri: în amonte (activități înainte de producție/prestare servicii) e în aval (activitati dupa vanzarea catre client).
15 Domeniul de aplicare 3 Categorii: în amonte și în aval
| Pisică. | Nume | Flux | Relevanța SaaS/Tech |
|---|---|---|---|
| 1 | Bunuri și servicii achiziționate | în amonte | HIGH: server hardware, licențe software, servicii de consultanță |
| 2 | Bunuri de capital | în amonte | MEDIA: echipamente pentru centre de date, laptopuri, telefoane de companie |
| 3 | Activități legate de combustibil și energie | în amonte | MEDIE: emisii din producția de energie achiziționată (în amonte Scope 2) |
| 4 | Transport și distribuție în amonte | în amonte | LOW: livrări de hardware către birouri și centre de date |
| 5 | Deșeuri generate în operațiuni | în amonte | SCĂZUT: DEEE, hârtie, deșeuri de birou |
| 6 | Călătorii de afaceri | în amonte | HIGH: zboruri, hoteluri, trenuri pentru echipe distribuite |
| 7 | Naveta angajaților | în amonte | HIGH: călătorii acasă la birou, în special pentru echipele hibride |
| 8 | Activele închiriate în amonte | în amonte | MEDIA: birouri de închiriat (dacă nu sunt contabilizate în Scop 1/2) |
| 9 | Transport și distribuție în aval | în aval | LOW: distribuție de software pe suport fizic (rar) |
| 10 | Prelucrarea produselor vândute | în aval | N/A: Nu se aplică pentru software pur |
| 11 | Utilizarea produselor vândute | în aval | FOARTE MARE: Energie consumată de clienții care utilizează SaaS |
| 12 | Tratarea la sfârșitul vieții produselor vândute | în aval | LOW: dispozitivele utilizatorului la sfârșitul vieții |
| 13 | Activele închiriate în aval | în aval | MEDIA: Hardware închiriat clienților |
| 14 | Francize | în aval | N/A: Nu se aplică |
| 15 | Investiții | în aval | HIGH: portofoliu corporativ, investiții de capital în startup-uri |
Pentru o companie SaaS sau de dezvoltare de software, cele mai relevante categorii sunt de obicei: Pisică. 1 (bunuri și servicii achiziționate, adesea cel mai mare articol), Pisică. 6 (călătorii de afaceri), Pisică. 7 (naveta angajatului) e Pisică. 11 (utilizarea produselor vândute). Acolo dublă materialitate cerere din CSRD impune identificarea categoriilor care sunt materiale atât din punct de vedere al impactului riscul de mediu și financiar pentru companie.
Greșeală comună: Omiterea Cat. 11 pentru SaaS
Multe companii de software exclud Categoria 11 („Utilizarea produselor vândute”), presupunând că nu se aplică. În realitate, fiecare apel API, fiecare interogare, fiecare watt consumat de clienți pentru a-l rula pe al tău software-ul este Scope 3 Cat. 11 emisii care sunt responsabilitatea dumneavoastră. Pentru un SaaS cu milioane de utilizatori, aceasta poate fi categoria dominantă. Metoda de calcul folosește software pentru intensitatea carbonului (SCI). înmulțit cu unitățile funcționale furnizate.
Arhitectura conductei de colectare a datelor
Colectarea de date fiabile de pe întreg lanțul valoric este blocajul numărul unu pentru orice afacere Proiect Scope 3. Conducta trebuie să gestioneze surse eterogene: chestionare manuale, platforme ESG terțe piese, API-uri directe cu furnizorii, fișiere CSV trimise prin e-mail, date ERP interne. Următoarea arhitectură adopta un model ETL cu trei straturi (Bronz/Argint/Aur) inspirat de Lakehouse.
Pipeline Architecture Scop 3: Bronz / Argint / Aur
| Straturi | Conţinut | Tehnologie | Domeniul de aplicare |
|---|---|---|---|
| Bronz (brut) | Date brute imuabile de la furnizori | S3/GCS, Lacul Delta | Pista de audit, reluare, sursă de adevăr |
| Argint (standardizat) | Date normalizate pe unitate și monedă | dbt, Spark, Pandas | Calculul emisiilor, combinați cu factorii de emisie |
| Aur (Raportare) | Emisii agregate pe categorii de GES | PostgreSQL, BigQuery | Tablouri de bord, rapoarte CSRD, verificatoare |
Stratul Bronz este esențial: toate datele primite sunt salvate așa cum este cu marca temporală de ingerare, hash SHA-256 al conținutului și metadatelor sursă. Aceasta garantează posibilitatea să reproceseze întreaga conductă dacă factorii de emisie sau metodologia se schimbă, fără a pierde datele originale.
# models/scope3_pipeline.py
# Struttura dati per la pipeline Scope 3
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional
import hashlib
import json
class DataSource(Enum):
SUPPLIER_QUESTIONNAIRE = "supplier_questionnaire"
CDP_API = "cdp_api"
ECOVADIS_API = "ecovadis_api"
ERP_EXPORT = "erp_export"
MANUAL_UPLOAD = "manual_upload"
CLIMATIQ_API = "climatiq_api"
class CalculationMethod(Enum):
ACTIVITY_BASED = "activity_based"
SPEND_BASED = "spend_based"
HYBRID = "hybrid"
SUPPLIER_SPECIFIC = "supplier_specific"
class DataQualityTier(Enum):
TIER_1 = "primary_data" # Dati primari dal supplier
TIER_2 = "secondary_sector" # Fattori settoriali
TIER_3 = "spend_estimated" # Stima basata su spesa
@dataclass
class RawSupplierData:
"""Layer Bronze: dato grezzo immutabile"""
supplier_id: str
source: DataSource
raw_payload: dict
received_at: datetime
content_hash: str = field(init=False)
def __post_init__(self):
payload_str = json.dumps(self.raw_payload, sort_keys=True)
self.content_hash = hashlib.sha256(
payload_str.encode()
).hexdigest()
@dataclass
class StandardizedActivity:
"""Layer Silver: attività normalizzata"""
activity_id: str
supplier_id: str
scope3_category: int # 1-15
activity_type: str # es. "freight_transport"
quantity: float
unit: str # es. "tonne.km"
reporting_period_start: datetime
reporting_period_end: datetime
source: DataSource
quality_tier: DataQualityTier
emission_factor_id: Optional[str] = None
uncertainty_pct: float = 0.0
raw_data_hash: str = "" # Ref al Bronze layer
@dataclass
class EmissionResult:
"""Layer Gold: emissione calcolata"""
result_id: str
activity_id: str
scope3_category: int
co2e_tonnes: float
calculation_method: CalculationMethod
emission_factor_source: str # es. "climatiq:IPCC_2021"
emission_factor_value: float
quality_tier: DataQualityTier
uncertainty_pct: float
calculated_at: datetime
pipeline_version: str
audit_hash: str = field(init=False)
def __post_init__(self):
audit_data = {
"result_id": self.result_id,
"activity_id": self.activity_id,
"co2e_tonnes": self.co2e_tonnes,
"emission_factor_source": self.emission_factor_source,
"calculated_at": self.calculated_at.isoformat(),
"pipeline_version": self.pipeline_version,
}
self.audit_hash = hashlib.sha256(
json.dumps(audit_data, sort_keys=True).encode()
).hexdigest()
Integrarea datelor furnizorilor: CDP, EcoVadis și Direct API
Colectarea datelor de la furnizori are loc prin mai multe canale cu niveluri de calitate și automatizare foarte diferită. The Carbon Disclosure Project (CDP) colectează date de la peste 24.000 de companii și expune un API pentru a accesa rapoartele verificate. EcoVadis a lansat Carbon Data Network în 2025, cu peste 48.000 de reporteri GHG care împărtășesc date într-un fel standardizate. În cele din urmă, mulți furnizori mari expun API-uri proprietare pentru partajare directă a urmelor sale.
# collectors/supplier_collector.py
# Integrazione con fonti dati supplier
import httpx
import asyncio
from typing import AsyncGenerator
from datetime import datetime
from models.scope3_pipeline import RawSupplierData, DataSource
class ClimatiqEmissionFactors:
"""Client per Climatiq API - emission factors database"""
BASE_URL = "https://beta3.api.climatiq.io"
def __init__(self, api_key: str):
self.api_key = api_key
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
async def get_emission_factor(
self,
activity_id: str,
year: int = 2024,
region: str = "IT"
) -> dict:
"""Recupera fattore di emissione per attività specifica"""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.BASE_URL}/estimate",
headers=self.headers,
json={
"emission_factor": {
"activity_id": activity_id,
"data_version": "^21",
"year": year,
"region": region
},
"parameters": {
"money": 1.0,
"money_unit": "eur"
}
}
)
response.raise_for_status()
return response.json()
async def batch_estimate(
self,
activities: list[dict]
) -> list[dict]:
"""Stima batch per multiple attività - ottimizza le API call"""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.BASE_URL}/batch",
headers=self.headers,
json={"batch": activities},
timeout=30.0
)
response.raise_for_status()
return response.json().get("results", [])
class EcoVadisCollector:
"""Raccoglie dati Scope 3 dalla piattaforma EcoVadis"""
def __init__(self, api_key: str, base_url: str):
self.api_key = api_key
self.base_url = base_url
async def fetch_supplier_carbon_data(
self,
supplier_ecovadis_id: str,
reporting_year: int
) -> RawSupplierData:
"""Recupera dati carbonio per un supplier dalla Carbon Data Network"""
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.base_url}/v1/suppliers/{supplier_ecovadis_id}/carbon",
headers={"X-API-Key": self.api_key},
params={"year": reporting_year},
timeout=15.0
)
if response.status_code == 404:
# Supplier non ha condiviso dati primari
return self._create_no_data_record(supplier_ecovadis_id)
response.raise_for_status()
payload = response.json()
return RawSupplierData(
supplier_id=supplier_ecovadis_id,
source=DataSource.ECOVADIS_API,
raw_payload=payload,
received_at=datetime.utcnow()
)
def _create_no_data_record(self, supplier_id: str) -> RawSupplierData:
return RawSupplierData(
supplier_id=supplier_id,
source=DataSource.ECOVADIS_API,
raw_payload={"status": "no_data", "supplier_id": supplier_id},
received_at=datetime.utcnow()
)
class CDPCollector:
"""Raccoglie dati da CDP (Carbon Disclosure Project)"""
CDP_API_URL = "https://api.cdp.net/v1"
def __init__(self, api_token: str):
self.api_token = api_token
async def search_supplier(
self,
company_name: str,
year: int = 2024
) -> RawSupplierData | None:
"""Cerca un supplier nel database CDP e recupera dati GHG"""
async with httpx.AsyncClient() as client:
# Ricerca azienda
search_resp = await client.get(
f"{self.CDP_API_URL}/companies/search",
headers={"Authorization": f"Bearer {self.api_token}"},
params={"q": company_name, "year": year}
)
if not search_resp.json().get("results"):
return None
company_id = search_resp.json()["results"][0]["id"]
# Recupera dati GHG disclosure
ghg_resp = await client.get(
f"{self.CDP_API_URL}/companies/{company_id}/ghg-emissions",
headers={"Authorization": f"Bearer {self.api_token}"},
params={"year": year}
)
if ghg_resp.status_code != 200:
return None
return RawSupplierData(
supplier_id=company_name,
source=DataSource.CDP_API,
raw_payload=ghg_resp.json(),
received_at=datetime.utcnow()
)
class BronzeLayerStorage:
"""Salvataggio immutabile nel layer Bronze"""
def __init__(self, storage_client, bucket: str):
self.storage = storage_client
self.bucket = bucket
async def store(self, raw_data: RawSupplierData) -> str:
"""Salva dato grezzo con path deterministico basato su hash"""
path = (
f"scope3/bronze/"
f"{raw_data.received_at.year}/"
f"{raw_data.received_at.month:02d}/"
f"{raw_data.supplier_id}/"
f"{raw_data.content_hash}.json"
)
await self.storage.upload_json(
bucket=self.bucket,
path=path,
data={
"supplier_id": raw_data.supplier_id,
"source": raw_data.source.value,
"received_at": raw_data.received_at.isoformat(),
"content_hash": raw_data.content_hash,
"payload": raw_data.raw_payload
}
)
return path
Bazat pe activitate vs bazat pe cheltuieli: alegerea metodei potrivite
Protocolul GHG definește patru metode de calcul pentru Scope 3, care în practică da reduceți la două abordări fundamentale: bazate pe activitate e bazat pe cheltuieli. Alegerea depinde de disponibilitatea datelor și de materialitatea categoriei și maturitatea relației cu furnizorul.
Comparație metodologică: bazată pe activitate vs bazată pe cheltuieli
| Dimensiune | Bazat pe activitate | Bazat pe cheltuieli |
|---|---|---|
| Formula | Cantitate × factor de emisie (unități/kg CO2e) | Cheltuieli (EUR) × factor EEIO (kg CO2e/EUR) |
| Precizie | Ridicat (±5-15% cu datele primare) | Scăzut-Mediu (±50-100%) |
| Date solicitate | Cantități fizice (kg, km, kWh, t) | Numai facturi contabile (EUR, USD) |
| Sursa EF | Climatiq, IPCC, DEFRA, ecoinvent | USEEIO, EXIOBASE, WIOD |
| Când să-l folosești | Categorii de materiale, furnizori mari | Kick-off, furnizori mici, Cat. <1% |
| Efort de colectare | Ridicat: necesită colaborarea furnizorilor | Scăzut: date deja în ERP/SAP |
| Acceptabilitatea CSRD | Favorit pentru categoriile de materiale | Acceptat ca proxy inițial |
Strategia optimă este o abordare hibrid progresiv: să începem cu bazat pe cheltuieli pentru a avea o linie de bază rapidă pe întreg lanțul valoric, apoi este migrat progresiv spre activitate bazată pe categoriile materiale identificate. Protocolul GHG definește trei niveluri de calitate a datelor (Tier 1, 2, 3) care corespund exact acestei progresii.
# calculators/emission_calculator.py
# Calcolo emissioni activity-based e spend-based
from dataclasses import dataclass
from typing import Optional
import math
# ============================================================
# EMISSION FACTORS DATABASE (simplified)
# In produzione: usa Climatiq API o database ecoinvent
# ============================================================
EMISSION_FACTORS: dict[str, dict] = {
# Cat. 1: Purchased goods & services
"cloud_compute_kwh": {
"value": 0.233, # kg CO2e/kWh (IT grid mix 2024)
"unit": "kWh",
"source": "IEA 2024",
"uncertainty_pct": 10.0
},
"hardware_laptop": {
"value": 350.0, # kg CO2e/unit (embodied carbon)
"unit": "unit",
"source": "Dell 2024 PCF",
"uncertainty_pct": 20.0
},
# Cat. 6: Business travel
"flight_economy_short": {
"value": 0.255, # kg CO2e/passenger.km
"unit": "passenger.km",
"source": "DEFRA 2024",
"uncertainty_pct": 15.0
},
"flight_economy_long": {
"value": 0.195,
"unit": "passenger.km",
"source": "DEFRA 2024",
"uncertainty_pct": 15.0
},
# Cat. 7: Employee commuting
"car_average": {
"value": 0.170, # kg CO2e/km
"unit": "km",
"source": "DEFRA 2024",
"uncertainty_pct": 12.0
},
"public_transport_it": {
"value": 0.048,
"unit": "passenger.km",
"source": "Ispra 2024",
"uncertainty_pct": 18.0
},
}
# EEIO Spend-based factors (EXIOBASE 3.8)
# kg CO2e per EUR di spesa per categoria merceologica
EEIO_FACTORS: dict[str, float] = {
"it_services": 0.312, # IT e telecomunicazioni
"professional_services": 0.198, # Consulenza, legale, etc.
"office_supplies": 0.445,
"cloud_hosting": 0.287,
"marketing": 0.231,
"utilities": 0.892,
"hr_services": 0.167,
"travel_accommodation": 0.521,
}
def calculate_activity_based(
activity_type: str,
quantity: float,
custom_ef: Optional[float] = None
) -> tuple[float, float]:
"""
Calcola emissioni con metodo activity-based.
Returns:
(co2e_kg, uncertainty_pct)
"""
if custom_ef is not None:
return quantity * custom_ef, 30.0 # alta incertezza EF custom
ef_data = EMISSION_FACTORS.get(activity_type)
if not ef_data:
raise ValueError(f"Emission factor non trovato: {activity_type}")
co2e_kg = quantity * ef_data["value"]
uncertainty = ef_data["uncertainty_pct"]
return co2e_kg, uncertainty
def calculate_spend_based(
spend_eur: float,
procurement_category: str,
inflation_correction: float = 1.0
) -> tuple[float, float]:
"""
Calcola emissioni con metodo spend-based (EEIO).
Args:
spend_eur: importo in EUR
procurement_category: categoria merceologica EEIO
inflation_correction: fattore per correggere inflazione vs anno base EEIO
Returns:
(co2e_kg, uncertainty_pct)
"""
eeio_factor = EEIO_FACTORS.get(procurement_category)
if not eeio_factor:
raise ValueError(f"EEIO factor non trovato: {procurement_category}")
# Corregge per inflazione (EEIO factors spesso in EUR 2015)
adjusted_spend = spend_eur / inflation_correction
co2e_kg = adjusted_spend * eeio_factor
# Lo spend-based ha incertezza intrinsecamente alta
uncertainty = 75.0
return co2e_kg, uncertainty
def propagate_uncertainty(
values: list[float],
uncertainties_pct: list[float]
) -> float:
"""
Propagazione incertezza quadratica (somma in quadratura).
Valida quando le incertezze sono indipendenti.
Returns:
uncertainty_pct sul totale
"""
weighted_variance_sum = sum(
(v * u/100) ** 2
for v, u in zip(values, uncertainties_pct)
)
total = sum(values)
if total == 0:
return 0.0
combined_std = math.sqrt(weighted_variance_sum)
return (combined_std / total) * 100
def calculate_category_total(
activities: list[dict]
) -> dict:
"""
Calcola totale categoria Scope 3 con propagazione incertezza.
activities: lista di {method, value_kg, uncertainty_pct}
"""
if not activities:
return {"total_co2e_kg": 0.0, "uncertainty_pct": 0.0}
values = [a["value_kg"] for a in activities]
uncertainties = [a["uncertainty_pct"] for a in activities]
total_co2e = sum(values)
combined_uncertainty = propagate_uncertainty(values, uncertainties)
# Qualità aggregata: peggiore del gruppo determina il tier
quality_tiers = [a.get("quality_tier", "TIER_3") for a in activities]
dominant_tier = min(quality_tiers) # TIER_1 < TIER_2 < TIER_3 lexicograficamente
return {
"total_co2e_kg": total_co2e,
"total_co2e_tonnes": total_co2e / 1000,
"uncertainty_pct": combined_uncertainty,
"uncertainty_kg": total_co2e * combined_uncertainty / 100,
"dominant_quality_tier": dominant_tier,
"activity_count": len(activities)
}
Flux de aer în conductă: DAG pentru calculul automatizat pentru Scope 3
Orchestrarea pipelinei Scope 3 necesită un DAG bine structurat care să gestioneze ciclu de viață complet: colectarea datelor, standardizare, calculul emisiilor, verificarea calității și publicarea în stratul Gold. DAG-ul trebuie să fie idempotent (executabil de mai multe ori fără efecte secundare) e resetabil în caz de eșec parțial.
# dags/scope3_pipeline_dag.py
# Apache Airflow DAG per pipeline emissioni Scope 3
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task, task_group
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.models import Variable
import json
import logging
logger = logging.getLogger(__name__)
# ============================================================
# CONFIGURAZIONE DAG
# ============================================================
SCOPE3_DAG_CONFIG = {
"reporting_year": 2024,
"companies": [
{"id": "S001", "name": "AWS", "tier": "TIER_1", "source": "ecovadis"},
{"id": "S002", "name": "Microsoft Azure", "tier": "TIER_1", "source": "cdp"},
{"id": "S003", "name": "Supplier_XYZ", "tier": "TIER_2", "source": "questionnaire"},
# ... altri supplier
],
"categories_enabled": [1, 2, 3, 6, 7, 11, 15],
"quality_threshold_pct": 80.0,
"alert_email": "esg-team@company.com"
}
default_args = {
"owner": "esg-team",
"depends_on_past": False,
"email_on_failure": True,
"email": [SCOPE3_DAG_CONFIG["alert_email"]],
"retries": 3,
"retry_delay": timedelta(minutes=5),
}
with DAG(
dag_id="scope3_emissions_pipeline",
default_args=default_args,
description="Pipeline calcolo emissioni Scope 3 value chain",
schedule_interval="@quarterly", # Esecuzione trimestrale
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["emissions", "scope3", "esg", "ghg-protocol"],
max_active_runs=1, # Serializza: mai due calcoli in parallelo
) as dag:
# ============================================================
# FASE 1: RACCOLTA DATI SUPPLIER (in parallelo per supplier)
# ============================================================
@task_group(group_id="data_collection")
def collect_supplier_data():
@task(task_id="fetch_ecovadis_suppliers")
def fetch_ecovadis() -> list[dict]:
"""Raccoglie dati da EcoVadis Carbon Data Network"""
from collectors.supplier_collector import EcoVadisCollector
import asyncio
api_key = Variable.get("ECOVADIS_API_KEY", deserialize_json=False)
collector = EcoVadisCollector(api_key, "https://api.ecovadis.com")
suppliers_ecovadis = [
s for s in SCOPE3_DAG_CONFIG["companies"]
if s["source"] == "ecovadis"
]
results = []
for supplier in suppliers_ecovadis:
raw = asyncio.run(
collector.fetch_supplier_carbon_data(
supplier["id"],
SCOPE3_DAG_CONFIG["reporting_year"]
)
)
results.append({
"supplier_id": raw.supplier_id,
"content_hash": raw.content_hash,
"status": "fetched",
"has_data": raw.raw_payload.get("status") != "no_data"
})
logger.info(f"EcoVadis - Supplier {supplier['id']}: fetched")
return results
@task(task_id="fetch_cdp_suppliers")
def fetch_cdp() -> list[dict]:
"""Raccoglie dati verificati da CDP"""
from collectors.supplier_collector import CDPCollector
import asyncio
api_token = Variable.get("CDP_API_TOKEN")
collector = CDPCollector(api_token)
suppliers_cdp = [
s for s in SCOPE3_DAG_CONFIG["companies"]
if s["source"] == "cdp"
]
results = []
for supplier in suppliers_cdp:
raw = asyncio.run(
collector.search_supplier(
supplier["name"],
SCOPE3_DAG_CONFIG["reporting_year"]
)
)
if raw:
results.append({
"supplier_id": supplier["id"],
"content_hash": raw.content_hash,
"status": "fetched",
"has_data": True
})
else:
results.append({
"supplier_id": supplier["id"],
"status": "not_found",
"has_data": False
})
return results
@task(task_id="process_manual_questionnaires")
def process_questionnaires() -> list[dict]:
"""Processa questionari manuali caricati in S3"""
# In produzione: legge da bucket S3 o SharePoint
# Qui restituiamo dati di esempio
return [{
"supplier_id": "S003",
"status": "processed",
"has_data": True,
"scope3_cat1_tco2e": 45.2,
"scope3_cat6_tco2e": 12.8
}]
ev = fetch_ecovadis()
cdp = fetch_cdp()
q = process_questionnaires()
return [ev, cdp, q]
# ============================================================
# FASE 2: STANDARDIZZAZIONE E CALCOLO EMISSIONI
# ============================================================
@task(task_id="standardize_activities")
def standardize_activities(collection_results: list) -> list[dict]:
"""Normalizza tutti i dati in unità fisiche standard"""
from normalizers.activity_normalizer import ActivityNormalizer
normalizer = ActivityNormalizer()
standardized = []
for batch in collection_results:
for result in batch:
if result.get("has_data"):
activities = normalizer.normalize(result)
standardized.extend(activities)
logger.info(f"Standardizzate {len(standardized)} attività")
return standardized
@task(task_id="calculate_emissions")
def calculate_emissions(activities: list[dict]) -> list[dict]:
"""Calcola emissioni CO2e per ogni attività standardizzata"""
from calculators.emission_calculator import (
calculate_activity_based,
calculate_spend_based
)
from models.scope3_pipeline import CalculationMethod
results = []
for activity in activities:
if activity["method"] == "activity_based":
co2e_kg, uncertainty = calculate_activity_based(
activity["activity_type"],
activity["quantity"]
)
method = CalculationMethod.ACTIVITY_BASED
else:
co2e_kg, uncertainty = calculate_spend_based(
activity["spend_eur"],
activity["procurement_category"]
)
method = CalculationMethod.SPEND_BASED
results.append({
**activity,
"co2e_kg": co2e_kg,
"co2e_tonnes": co2e_kg / 1000,
"uncertainty_pct": uncertainty,
"calculation_method": method.value,
"calculated_at": datetime.utcnow().isoformat()
})
return results
# ============================================================
# FASE 3: DATA QUALITY CHECK
# ============================================================
@task(task_id="data_quality_check")
def data_quality_check(results: list[dict]) -> dict:
"""Verifica qualità dati e genera score per categoria"""
from quality.data_quality_scorer import DataQualityScorer
scorer = DataQualityScorer()
quality_report = scorer.score_results(results)
if quality_report["overall_score"] < SCOPE3_DAG_CONFIG["quality_threshold_pct"]:
logger.warning(
f"Quality score sotto soglia: {quality_report['overall_score']}%"
)
return quality_report
# ============================================================
# FASE 4: AUDIT TRAIL E PUBBLICAZIONE GOLD LAYER
# ============================================================
@task(task_id="create_audit_trail")
def create_audit_trail(
results: list[dict],
quality_report: dict
) -> str:
"""Crea audit trail immutabile con hash chain"""
from audit.hash_chain import HashChain
chain = HashChain()
chain_id = chain.create_chain(
calculation_results=results,
quality_report=quality_report,
pipeline_version="2.1.0",
methodology="GHG_Protocol_Scope3_2011",
reporting_standard="CSRD_ESRS_E1"
)
logger.info(f"Audit trail creato: {chain_id}")
return chain_id
@task(task_id="publish_gold_layer")
def publish_gold_layer(
results: list[dict],
audit_chain_id: str
) -> None:
"""Pubblica dati aggregati nel Gold layer (PostgreSQL)"""
hook = PostgresHook(postgres_conn_id="emissions_db")
for result in results:
hook.run(
"""
INSERT INTO scope3_emissions_gold (
supplier_id, scope3_category, co2e_tonnes,
calculation_method, uncertainty_pct,
quality_tier, audit_chain_id,
reporting_year, published_at
) VALUES (
%(supplier_id)s, %(scope3_category)s, %(co2e_tonnes)s,
%(calculation_method)s, %(uncertainty_pct)s,
%(quality_tier)s, %(audit_chain_id)s,
%(reporting_year)s, NOW()
)
ON CONFLICT (supplier_id, scope3_category, reporting_year)
DO UPDATE SET
co2e_tonnes = EXCLUDED.co2e_tonnes,
updated_at = NOW()
""",
parameters={
**result,
"audit_chain_id": audit_chain_id,
"reporting_year": SCOPE3_DAG_CONFIG["reporting_year"]
}
)
logger.info(f"Pubblicati {len(results)} record nel Gold layer")
# ============================================================
# WIRING DEL DAG
# ============================================================
collection_results = collect_supplier_data()
standardized = standardize_activities(collection_results)
emission_results = calculate_emissions(standardized)
quality = data_quality_check(emission_results)
chain_id = create_audit_trail(emission_results, quality)
publish_gold_layer(emission_results, chain_id)
Scorul calității datelor și propagarea incertitudinii
Standardul GHG Protocol Scope 3 recunoaște în mod explicit că cat. 1-15 nu sunt niciodată cunoscut cu certitudine absolută. Raportarea calității trebuie să includă o estimare alincertitudinea cantitativă asociat fiecărei categorii. IPCC s-a oficializat metoda de propagare a incertitudinii în Ghidul său de bune practici.
# quality/data_quality_scorer.py
# Scoring qualità dati Scope 3
from dataclasses import dataclass
from enum import Enum
from typing import Optional
import math
from datetime import datetime, timedelta
class QualityDimension(Enum):
COMPLETENESS = "completeness"
ACCURACY = "accuracy"
TIMELINESS = "timeliness"
VERIFICATION = "verification"
GRANULARITY = "granularity"
@dataclass
class QualityScore:
dimension: QualityDimension
score: float # 0-100
weight: float # peso nel calcolo aggregato
notes: str = ""
def score_supplier_data_quality(
supplier: dict,
reference_date: datetime = None
) -> dict[str, float]:
"""
Calcola score qualità multi-dimensionale per i dati di un supplier.
Basato su GHG Protocol Data Quality Guidance.
"""
if reference_date is None:
reference_date = datetime.utcnow()
scores = []
# 1. COMPLETENESS: quante delle categorie richieste sono presenti?
required_fields = [
"scope3_cat1_tco2e", "scope3_cat6_tco2e", "scope3_cat7_tco2e"
]
present = sum(1 for f in required_fields if supplier.get(f) is not None)
completeness_score = (present / len(required_fields)) * 100
scores.append(QualityScore(
dimension=QualityDimension.COMPLETENESS,
score=completeness_score,
weight=0.30
))
# 2. TIMELINESS: quanto sono recenti i dati?
data_year = supplier.get("reporting_year", 2020)
current_year = reference_date.year
age_years = current_year - data_year
if age_years <= 1:
timeliness_score = 100.0
elif age_years == 2:
timeliness_score = 75.0
elif age_years == 3:
timeliness_score = 50.0
else:
timeliness_score = 20.0
scores.append(QualityScore(
dimension=QualityDimension.TIMELINESS,
score=timeliness_score,
weight=0.20
))
# 3. VERIFICATION: i dati sono stati verificati da terze parti?
verification_level = supplier.get("verification", "none")
verification_score = {
"independent_assured": 100.0, # GHG verificato da auditor indipendente
"limited_assurance": 80.0, # Limited assurance
"internal_reviewed": 60.0, # Solo review interna
"supplier_declared": 40.0, # Auto-dichiarazione
"estimated": 20.0, # Stima spend-based
"none": 0.0
}.get(verification_level, 20.0)
scores.append(QualityScore(
dimension=QualityDimension.VERIFICATION,
score=verification_score,
weight=0.30
))
# 4. GRANULARITY: attività-specifico o aggregato?
data_type = supplier.get("data_type", "aggregated")
granularity_score = {
"site_specific": 100.0, # Dati per sito produttivo
"product_specific": 90.0, # PCF per prodotto/servizio
"supplier_specific": 70.0, # Dato totale supplier
"sector_average": 40.0, # Media settoriale
"aggregated": 20.0
}.get(data_type, 20.0)
scores.append(QualityScore(
dimension=QualityDimension.GRANULARITY,
score=granularity_score,
weight=0.20
))
# Calcolo score aggregato ponderato
overall = sum(s.score * s.weight for s in scores)
# Mappa score a Tier GHG Protocol
if overall >= 80:
tier = "TIER_1"
uncertainty_band_pct = 15.0
elif overall >= 50:
tier = "TIER_2"
uncertainty_band_pct = 40.0
else:
tier = "TIER_3"
uncertainty_band_pct = 75.0
return {
"overall_score": round(overall, 1),
"tier": tier,
"uncertainty_band_pct": uncertainty_band_pct,
"dimension_scores": {
s.dimension.value: round(s.score, 1)
for s in scores
}
}
def monte_carlo_uncertainty(
base_estimate_tco2e: float,
uncertainty_pct: float,
n_simulations: int = 10_000
) -> dict:
"""
Stima intervallo di confidenza con simulazione Monte Carlo.
Per reporting CSRD si raccomanda almeno 1.000 simulazioni.
"""
import random
# Distribuzione log-normale (emissioni non possono essere negative)
sigma = math.log(1 + (uncertainty_pct / 100) ** 2) ** 0.5
mu = math.log(base_estimate_tco2e) - sigma ** 2 / 2
simulated = [
math.exp(random.gauss(mu, sigma))
for _ in range(n_simulations)
]
simulated_sorted = sorted(simulated)
p05 = simulated_sorted[int(n_simulations * 0.05)]
p50 = simulated_sorted[int(n_simulations * 0.50)]
p95 = simulated_sorted[int(n_simulations * 0.95)]
return {
"base_estimate_tco2e": base_estimate_tco2e,
"p05_tco2e": round(p05, 2),
"p50_tco2e": round(p50, 2),
"p95_tco2e": round(p95, 2),
"confidence_interval_90pct": {
"lower": round(p05, 2),
"upper": round(p95, 2)
},
"coefficient_of_variation": round(
(p95 - p05) / (2 * p50) * 100, 1
)
}
Pista de audit imuabilă cu lanț hash
La trasabilitate de la capăt la capăt este una dintre cele mai critice cerinţe pentru raportare verificabilă Scope 3. Auditorii externi trebuie să poată urmări fiecare număr din raport final până la sursa primară a datelor, trecând prin toate etapele de transformare. O lanțuri de hash inspirat din tehnologia blockchain (dar fără complexitate distribuit) garantează imuabilitatea pistei de audit.
# audit/hash_chain.py
# Audit trail immutabile per emissioni Scope 3
import hashlib
import json
import uuid
from datetime import datetime
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class AuditRecord:
"""Singolo record nell'audit chain"""
def __init__(
self,
record_type: str,
payload: dict,
previous_hash: str,
chain_id: str,
sequence: int
):
self.record_id = str(uuid.uuid4())
self.record_type = record_type
self.payload = payload
self.previous_hash = previous_hash
self.chain_id = chain_id
self.sequence = sequence
self.created_at = datetime.utcnow().isoformat()
self.record_hash = self._compute_hash()
def _compute_hash(self) -> str:
"""SHA-256 hash di tutti i campi del record (eccetto il hash stesso)"""
data = {
"record_id": self.record_id,
"record_type": self.record_type,
"chain_id": self.chain_id,
"sequence": self.sequence,
"previous_hash": self.previous_hash,
"created_at": self.created_at,
"payload_hash": hashlib.sha256(
json.dumps(self.payload, sort_keys=True, default=str).encode()
).hexdigest()
}
return hashlib.sha256(
json.dumps(data, sort_keys=True).encode()
).hexdigest()
def to_dict(self) -> dict:
return {
"record_id": self.record_id,
"record_type": self.record_type,
"chain_id": self.chain_id,
"sequence": self.sequence,
"previous_hash": self.previous_hash,
"record_hash": self.record_hash,
"created_at": self.created_at,
"payload": self.payload
}
class HashChain:
"""
Hash chain per audit trail immutabile emissioni Scope 3.
Ogni record contiene l'hash del record precedente,
rendendo impossibile modificare un record senza invalidare
tutti i record successivi.
"""
GENESIS_HASH = "0" * 64 # Hash del primo record della chain
def __init__(self, db_client=None):
self.db = db_client
self.records: list[AuditRecord] = []
def create_chain(
self,
calculation_results: list[dict],
quality_report: dict,
pipeline_version: str,
methodology: str,
reporting_standard: str
) -> str:
"""Crea una nuova chain per un calcolo Scope 3 completo"""
chain_id = str(uuid.uuid4())
previous_hash = self.GENESIS_HASH
# Record 1: Metadati della pipeline
pipeline_record = AuditRecord(
record_type="PIPELINE_METADATA",
payload={
"version": pipeline_version,
"methodology": methodology,
"reporting_standard": reporting_standard,
"calculation_timestamp": datetime.utcnow().isoformat(),
"total_activities": len(calculation_results)
},
previous_hash=previous_hash,
chain_id=chain_id,
sequence=0
)
self.records.append(pipeline_record)
previous_hash = pipeline_record.record_hash
# Record 2: Quality report
quality_record = AuditRecord(
record_type="QUALITY_ASSESSMENT",
payload=quality_report,
previous_hash=previous_hash,
chain_id=chain_id,
sequence=1
)
self.records.append(quality_record)
previous_hash = quality_record.record_hash
# Record 3..N: Singoli risultati di emissione
for i, result in enumerate(calculation_results):
emission_record = AuditRecord(
record_type="EMISSION_CALCULATION",
payload={
"supplier_id": result.get("supplier_id"),
"scope3_category": result.get("scope3_category"),
"co2e_tonnes": result.get("co2e_tonnes"),
"calculation_method": result.get("calculation_method"),
"emission_factor_source": result.get("emission_factor_source"),
"uncertainty_pct": result.get("uncertainty_pct"),
"quality_tier": result.get("quality_tier")
},
previous_hash=previous_hash,
chain_id=chain_id,
sequence=2 + i
)
self.records.append(emission_record)
previous_hash = emission_record.record_hash
# Persist su DB (o storage immutabile)
if self.db:
self._persist_chain(chain_id)
logger.info(
f"Chain {chain_id} creata con {len(self.records)} record. "
f"Final hash: {previous_hash[:16]}..."
)
return chain_id
def verify_chain_integrity(self, chain_id: str) -> bool:
"""
Verifica che nessun record sia stato alterato.
Percorre la chain ricomputando ogni hash.
"""
records = self._load_chain(chain_id)
if not records:
return False
expected_previous = self.GENESIS_HASH
for record_dict in records:
# Ricomputa hash
record = AuditRecord(
record_type=record_dict["record_type"],
payload=record_dict["payload"],
previous_hash=record_dict["previous_hash"],
chain_id=record_dict["chain_id"],
sequence=record_dict["sequence"]
)
if record_dict["previous_hash"] != expected_previous:
logger.error(
f"Chain corrotta al record {record_dict['sequence']}: "
f"previous_hash non corrisponde"
)
return False
expected_previous = record.record_hash
return True
def _persist_chain(self, chain_id: str) -> None:
"""Salva tutti i record della chain nel DB"""
for record in self.records:
self.db.insert("scope3_audit_chain", record.to_dict())
def _load_chain(self, chain_id: str) -> list[dict]:
"""Carica i record della chain dal DB in ordine di sequenza"""
if not self.db:
return [r.to_dict() for r in self.records]
return self.db.query(
"SELECT * FROM scope3_audit_chain WHERE chain_id = %s ORDER BY sequence",
[chain_id]
)
Vizualizare: Diagramă Sankey și categorii de hărți termice
O conductă Scope 3 bine construită trebuie, de asemenea, să producă vizualizări care redau datele de înțeles pentru părțile interesate tehnice și non-tehnice. The Diagrama Sankey este instrumentul ideal pentru a arăta fluxurile de emisii de-a lungul lanțului valoric, în timp ce a hartă termică vă permite să identificați rapid cele mai multe categorii materiale și cele cu o calitate mai scăzută a datelor.
# visualizations/scope3_charts.py
# Generazione Sankey diagram e heatmap Scope 3
import plotly.graph_objects as go
import plotly.express as px
import pandas as pd
from typing import Optional
def create_scope3_sankey(
emission_data: list[dict],
title: str = "Scope 3 Value Chain Emissions"
) -> go.Figure:
"""
Crea Sankey diagram per visualizzare flussi emissioni Scope 3.
Struttura: Supplier -> Categoria S3 -> Totale Scope 3
"""
# Raccoglie nodi unici
suppliers = list(set(d["supplier_id"] for d in emission_data))
categories = list(set(f"Cat. {d['scope3_category']}" for d in emission_data))
all_nodes = suppliers + categories + ["Scope 3 Total"]
node_index = {node: i for i, node in enumerate(all_nodes)}
# Costruisce link source->target->value
source_indices = []
target_indices = []
values = []
link_labels = []
for record in emission_data:
supplier = record["supplier_id"]
category = f"Cat. {record['scope3_category']}"
tco2e = record["co2e_tonnes"]
# Supplier -> Categoria
source_indices.append(node_index[supplier])
target_indices.append(node_index[category])
values.append(tco2e)
link_labels.append(f"{tco2e:.1f} tCO2e")
# Categoria -> Total
for cat in categories:
cat_total = sum(
d["co2e_tonnes"]
for d in emission_data
if f"Cat. {d['scope3_category']}" == cat
)
source_indices.append(node_index[cat])
target_indices.append(node_index["Scope 3 Total"])
values.append(cat_total)
link_labels.append(f"{cat_total:.1f} tCO2e")
# Colori nodi
node_colors = (
["#2196F3"] * len(suppliers) + # Blu per supplier
["#FF9800"] * len(categories) + # Arancione per categorie
["#4CAF50"] # Verde per totale
)
fig = go.Figure(go.Sankey(
arrangement="snap",
node=dict(
pad=20,
thickness=20,
line=dict(color="white", width=0.5),
label=all_nodes,
color=node_colors,
hovertemplate="{label}
tCO2e: {value:.1f}<extra></extra>"
),
link=dict(
source=source_indices,
target=target_indices,
value=values,
label=link_labels,
color="rgba(100,100,100,0.3)"
)
))
fig.update_layout(
title_text=title,
font_size=12,
height=600,
paper_bgcolor="rgba(0,0,0,0)",
plot_bgcolor="rgba(0,0,0,0)"
)
return fig
def create_category_heatmap(
emission_data: list[dict]
) -> go.Figure:
"""
Heatmap: asse X = categoria Scope 3, asse Y = qualità dato.
Colore = tCO2e. Aiuta a prioritizzare effort raccolta dati.
"""
df = pd.DataFrame(emission_data)
# Aggrega per categoria e tier qualità
pivot = df.pivot_table(
values="co2e_tonnes",
index="quality_tier",
columns="scope3_category",
aggfunc="sum",
fill_value=0
)
# Ordina tier (TIER_1 migliore in alto)
tier_order = ["TIER_1", "TIER_2", "TIER_3"]
pivot = pivot.reindex(
[t for t in tier_order if t in pivot.index]
)
fig = go.Figure(go.Heatmap(
z=pivot.values,
x=[f"Cat. {c}" for c in pivot.columns],
y=list(pivot.index),
colorscale="RdYlGn_r", # Rosso = alta emissione (critico)
text=pivot.values.round(1),
texttemplate="%{text} t",
textfont={"size": 11},
hovertemplate="Categoria: %{x}
Tier: %{y}
%{z:.1f} tCO2e<extra></extra>",
colorbar=dict(title="tCO2e")
))
fig.update_layout(
title="Heatmap Scope 3: Emissioni per Categoria e Qualità Dato",
xaxis_title="Categoria GHG Protocol",
yaxis_title="Tier Qualità Dato",
height=350,
margin=dict(l=80, r=20, t=60, b=60)
)
return fig
def generate_scope3_dashboard_html(
emission_data: list[dict],
output_path: str
) -> None:
"""Genera report HTML standalone con tutti i grafici"""
sankey = create_scope3_sankey(emission_data)
heatmap = create_category_heatmap(emission_data)
total_tco2e = sum(d["co2e_tonnes"] for d in emission_data)
by_category = {}
for d in emission_data:
cat = d["scope3_category"]
by_category[cat] = by_category.get(cat, 0) + d["co2e_tonnes"]
top_category = max(by_category, key=by_category.get)
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<title>Scope 3 Emissions Report</title>
<meta charset="utf-8">
</head>
<body>
<h1>Scope 3 Value Chain Emissions Report</h1>
<p>Totale: <strong>{total_tco2e:.1f} tCO2e</strong></p>
<p>Categoria più materiale: Cat. {top_category}
({by_category[top_category]:.1f} tCO2e)</p>
{sankey.to_html(full_html=False)}
{heatmap.to_html(full_html=False)}
</body>
</html>
"""
with open(output_path, "w") as f:
f.write(html_content)
Cerințe CSRD/ESRS E1 pentru domeniul de raportare 3
La Directiva privind raportarea durabilității corporative (CSRD) și standardul aferent ESRS E1 (schimbări climatice) ele transformă raportarea Scope 3 din voluntară în obligatoriu pentru mii de companii europene. Calendarul de implementare este eșalonat și deja în curs.
CSRD Scope 3 Cronologie obligatorie
| FY | Raportați că voi intra | Subiecte | Note |
|---|---|---|---|
| 2024 | La începutul anului 2025 | PIE mari supuse deja NFRD (>500 de angajați) | Primul val: ~12.000 de companii din UE |
| 2025 | La începutul anului 2026 | Toate companiile mari (>250 departament sau >40 milioane EUR) | ~50.000 de companii din UE |
| 2026 | La începutul anului 2027 | IMM-uri listate | Standard ESRS simplificat |
| 2028 | La începutul anului 2029 | Companii din afara UE cu filiale din UE | Impact global semnificativ |
ESRS E1 cere în mod special pentru emisiile Scope 3:
- Dezvăluirea tuturor categoriilor de materiale: trebuie determinată materialitatea prin analiza de dubla materialitate (impact + risc financiar). În cea mai mare parte dintre companiile de tehnologie, cel puțin 4-6 categorii sunt materiale.
- Defalcare pe categorii: valorile nu pot fi raportate ca un singur total agregat; fiecare categorie de materiale trebuie să aibă propriile sale date în tCO2e.
- Metodologie explicită: pentru fiecare categorie trebuie declarata metoda de calcul (pe bază de activitate, pe bază de cheltuieli, specific furnizor), sursa factorilor de emisie și nivelul calitatea datelor.
- Fără compensare cu credite de carbon: Emisiile brute trebuie raportate separat de orice compensație sau compensare de carbon achiziționată.
- Asigurare obligatorie: asigurare limitata initial, cu obiectivul de trece la asigurarea rezonabilă în viitor. Pista de audit descrisă în acest articol exact asta vor cere recenzenții.
- Țintă și plan de tranziție: firmele trebuie să declare obiective ale reducerea aliniată la 1,5°C (de preferință validată de SBTi) cu repere intermediare.
Atenție: Scop 3 și dublă materialitate
ESRS E1 nu necesită raportarea tuturor celor 15 categorii Scope 3, ci doar a celor identificate cum materiale în analiza materialităţii duale. Cu toate acestea, procesul de determinarea semnificației trebuie să fie documentată și auditabilă. Excludeți unul categoria „din lipsă de date” nu este o justificare acceptabilă: trebuie demonstrată că acea categorie nu este semnificativă pentru afacerea specifică.
Studiu de caz: Companie SaaS cu 50 de furnizori
Să traducem totul într-un exemplu concret: un SaaS mediu italian cu 150 de angajați, venituri 15 milioane EUR, infrastructură pe AWS și 50 de furnizori activi. Conducerea a decis să începeți calculul Scope 3 înainte de CSRD și aveți o fereastră de 3 luni pentru livrare date verificabile de auditorul dumneavoastră.
Profil companie: SaaS Italia S.r.l.
| Parametru | Valoare |
|---|---|
| Angajatii | 150 (70% lucru inteligent) |
| Locații | Sediul central din Milano + biroul Roma |
| Infrastructură | AWS eu-west-1 (principal), GCP europe-west1 (de rezervă) |
| Furnizori activi | 50 (8 mari, 42 mici/medii) |
| Cheltuieli de achiziții | ~4,2 milioane EUR/an |
| Zboruri anuale | ~380 de zboruri (conferințe + clienți) |
Faza 1 – Analiza de semnificație (Săptămâna 1-2): Echipa ESG are a efectuat o analiză rapidă pentru a identifica categoriile de materiale Scope 3. Utilizarea datelor de la cheltuielile din ERP (SAP) ca proxy inițial cu factori EEIO, au obținut această estimare de "screening":
Analiza materialității Scope 3 — SaaS Italia S.r.l.
| Pisică. | Descriere | Estimare bazată pe cheltuieli (tCO2e) | % din total | Decizie |
|---|---|---|---|---|
| 1 | Bunuri și servicii achiziționate (cloud, SW) | 342 | 54% | MATERIAL → Bazat pe activitate |
| 6 | Călătorii de afaceri | 98 | 15% | MATERIAL → Bazat pe activitate |
| 7 | Naveta angajaților | 87 | 14% | MATERIAL → Sondajele angajaților |
| 11 | Utilizarea produselor vândute | 76 | 12% | MATERIAL → Măsurare SKI |
| 2 | Bunuri de capital (laptop-uri, hardware) | 28 | 4% | MATERIAL → Furnizor PCF |
| Alte | Pisică. 3, 5, 8, 15 | 6 | 1% | Nematerial → Bazat pe cheltuieli |
Faza 2 – Colectarea datelor (Săptămâna 2-8):
- Pisică. 1 (Nor): AWS Customer Carbon Footprint Tool și GCP Carbon Footprint furnizează date lunare de emitere per cont. Date extrase prin API și încărcate în Bronze straturi. Calitate: TIER 1 (specific furnizorului, verificat AWS).
- Pisică. 1 (Software și servicii): 8 furnizori mari (>50K EUR/an) contactați cu un chestionar structurat. 5 au răspuns cu date primare (inclusiv Microsoft ERP, Slack, Salesforce). 3 nu au date → bazate pe cheltuieli cu EEIO.
- Pisică. 6 (Călătorii de afaceri): date extrase de la agenția de turism (Carlson Wagonlit) prin API: 380 de zboruri cu rutare și clasă. Calcul bazat pe activitate cu DEFRA 2024.
- Pisică. 7 (naveta): sondaj anonim pentru toți cei 150 de angajați (rată de răspuns de 82%). Mijloace de transport, distanta medie, zile pe saptamana la birou.
- Pisică. 11 (Utilizarea produselor vândute): SCI (Software Carbon Intensity) calculat cu CodeCarbon pe infrastructura de producție. Înmulțit cu numărul de sesiuni active/lună.
# case_study/saas_italia_scope3.py
# Calcolo completo Scope 3 per SaaS Italia S.r.l.
from calculators.emission_calculator import (
calculate_activity_based,
calculate_spend_based,
calculate_category_total
)
def calculate_cat1_cloud() -> dict:
"""Cat. 1: Emissioni cloud AWS + GCP (dati primari vendor)"""
# Dati estratti dall'AWS Customer Carbon Footprint API
aws_kwh_year = 187_500 # kWh totali 2024
gcp_kwh_year = 12_300
aws_ef = 0.233 # kg CO2e/kWh IT grid (AWS eu-west-1)
gcp_ef = 0.198 # kg CO2e/kWh GCP europe-west1
aws_co2, aws_unc = calculate_activity_based("cloud_compute_kwh", aws_kwh_year, aws_ef)
gcp_co2, gcp_unc = calculate_activity_based("cloud_compute_kwh", gcp_kwh_year, gcp_ef)
activities = [
{"value_kg": aws_co2, "uncertainty_pct": 8.0, "quality_tier": "TIER_1"},
{"value_kg": gcp_co2, "uncertainty_pct": 10.0, "quality_tier": "TIER_1"},
]
result = calculate_category_total(activities)
result["category"] = 1
result["sub_category"] = "cloud_infrastructure"
return result
def calculate_cat6_business_travel() -> dict:
"""Cat. 6: Business travel (dati agenzia viaggi)"""
# 380 voli totali anno 2024
# 60% corto raggio (<1500km), 40% lungo raggio
short_haul_pkm = 380 * 0.6 * 850 # 850km avg corto raggio
long_haul_pkm = 380 * 0.4 * 3200 # 3200km avg lungo raggio
short_co2, short_unc = calculate_activity_based(
"flight_economy_short", short_haul_pkm
)
long_co2, long_unc = calculate_activity_based(
"flight_economy_long", long_haul_pkm
)
# Radiative forcing factor x1.9 per quota alta
rf_factor = 1.9
short_co2 *= rf_factor
long_co2 *= rf_factor
activities = [
{"value_kg": short_co2, "uncertainty_pct": 20.0, "quality_tier": "TIER_2"},
{"value_kg": long_co2, "uncertainty_pct": 20.0, "quality_tier": "TIER_2"},
]
result = calculate_category_total(activities)
result["category"] = 6
return result
def calculate_cat7_commuting() -> dict:
"""Cat. 7: Employee commuting (survey 123/150 dipendenti)"""
# Risultati survey (valori medi per dipendente/anno)
commuters = {
"car_solo": {"count": 38, "km_day": 28, "days_year": 120},
"car_shared": {"count": 12, "km_day": 22, "days_year": 110},
"public_transport": {"count": 52, "km_day": 35, "days_year": 140},
"cycling_walking": {"count": 21, "km_day": 4, "days_year": 150},
}
activities = []
# Auto privata
car_pkm = (
commuters["car_solo"]["count"] *
commuters["car_solo"]["km_day"] *
commuters["car_solo"]["days_year"]
)
co2_car, unc = calculate_activity_based("car_average", car_pkm)
activities.append({"value_kg": co2_car, "uncertainty_pct": 15.0, "quality_tier": "TIER_2"})
# Trasporto pubblico
pt_pkm = (
commuters["public_transport"]["count"] *
commuters["public_transport"]["km_day"] *
commuters["public_transport"]["days_year"]
)
co2_pt, unc = calculate_activity_based("public_transport_it", pt_pkm)
activities.append({"value_kg": co2_pt, "uncertainty_pct": 20.0, "quality_tier": "TIER_2"})
# Ciclismo/piedi: zero emissioni dirette
activities.append({"value_kg": 0.0, "uncertainty_pct": 0.0, "quality_tier": "TIER_1"})
result = calculate_category_total(activities)
result["category"] = 7
result["survey_response_rate"] = 82.0
return result
def calculate_cat11_use_of_products() -> dict:
"""Cat. 11: Energia consumata dai clienti usando il SaaS"""
# SCI = 0.045 gCO2e per ogni API call (misurato con CodeCarbon)
sci_gco2e_per_call = 0.045
avg_calls_per_month = 48_500_000 # 48.5M calls/mese (dati produzione)
months = 12
total_calls = avg_calls_per_month * months
co2e_grams = total_calls * sci_gco2e_per_call
co2e_kg = co2e_grams / 1000
activities = [
{"value_kg": co2e_kg, "uncertainty_pct": 25.0, "quality_tier": "TIER_2"}
]
result = calculate_category_total(activities)
result["category"] = 11
result["metric"] = "API calls"
result["total_calls"] = total_calls
return result
def run_full_scope3_calculation() -> dict:
"""Esegue il calcolo completo Scope 3 per SaaS Italia S.r.l."""
results = {
"cat_1_cloud": calculate_cat1_cloud(),
"cat_6_travel": calculate_cat6_business_travel(),
"cat_7_commuting": calculate_cat7_commuting(),
"cat_11_use": calculate_cat11_use_of_products(),
}
# Categoria residuale (spend-based per tutto il resto)
residual_spend_eur = 210_000 # ~5% della spesa totale
residual_co2_kg, res_unc = calculate_spend_based(
residual_spend_eur, "it_services"
)
results["cat_residual"] = {
"total_co2e_tonnes": residual_co2_kg / 1000,
"uncertainty_pct": res_unc,
"category": "other",
"dominant_quality_tier": "TIER_3"
}
# Totale Scope 3
total_tco2e = sum(
v["total_co2e_tonnes"] for v in results.values()
)
from calculators.emission_calculator import propagate_uncertainty
all_values = [v["total_co2e_tonnes"] for v in results.values()]
all_uncertainties = [v["uncertainty_pct"] for v in results.values()]
overall_uncertainty = propagate_uncertainty(all_values, all_uncertainties)
return {
"company": "SaaS Italia S.r.l.",
"reporting_year": 2024,
"methodology": "GHG Protocol Corporate Value Chain Standard",
"scope3_total_tco2e": round(total_tco2e, 1),
"overall_uncertainty_pct": round(overall_uncertainty, 1),
"categories": results,
"notes": "Cat. 11 include radiative forcing factor per aviation"
}
if __name__ == "__main__":
import json
report = run_full_scope3_calculation()
print("=" * 50)
print(f"SCOPE 3 TOTALE: {report['scope3_total_tco2e']} tCO2e")
print(f"Incertezza: +/- {report['overall_uncertainty_pct']}%")
print("=" * 50)
for name, cat in report["categories"].items():
tco2e = cat.get("total_co2e_tonnes", 0)
unc = cat.get("uncertainty_pct", 0)
pct = tco2e / report["scope3_total_tco2e"] * 100
print(f" {name:25s} {tco2e:6.1f} tCO2e ({pct:.0f}%) ±{unc:.0f}%")
Rezultatul calculului pentru SaaS Italia S.r.l. produce:
Rezultate finale Scope 3 — SaaS Italia S.r.l. (AF 2024)
| Categorie | tCO2e | % Total | Incertitudine | Niveluri |
|---|---|---|---|---|
| Pisică. 1 – Cloud și servicii | 43.7 | 36% | ±9% | NIVELUL 1 |
| Pisică. 6 – Călătorii de afaceri | 33.5 | 28% | ±20% | NIVELUL 2 |
| Pisică. 7 – Naveta | 23.8 | 20% | ±17% | NIVELUL 2 |
| Pisică. 11 – Utilizarea Produselor | 26.2 | 22% | ±25% | NIVELUL 2 |
| Cat rezidual | 6.5 | 5% | ±75% | NIVELUL 3 |
| DOMENIU TOTAL 3 | 133,7 | 100% | ±13% | NIVELUL 2 |
Adăugarea domeniului 1 (~8 tCO2e de la cazanul HQ) și domeniul 2 (~12 tCO2e de la electricitate al birourilor), obțineți un amprenta totală de ~154 tCO2e pentru 2024, din care 87% sunt Scope 3. Exact tiparul tipic al companiilor de software.
Cele mai bune practici și anti-modele în pipeline Scope 3
Scopul 3 Lista de verificare a implementării conductei
| Zonă | Cele mai bune practici | Anti-modele de evitat |
|---|---|---|
| Date | Stratul de bronz imuabil pentru fiecare dată primită | Suprascrieți datele brute cu versiuni corectate |
| Calcul | Versiune factorii de emisie utilizați | Utilizați EF fără a indica anul și sursa |
| Incertitudine | Propagați întotdeauna incertitudinea în fiecare categorie | Raportați numai valoarea exactă fără interval |
| Calitate | Scorul de calitate explicit și documentat | Amestecați TIER 1 și TIER 3 fără distincție |
| Audituri | Lanț hash pentru fiecare calcul, în afara lanțului verificabil | Raportul Excel nu este versiune și nu poate fi urmărit |
| Furnizor | Prioritizează primii 20 de furnizori în funcție de cheltuieli/emisii | Tratează toți cei 50 de furnizori la fel |
| Actualizare | Planul anual de îmbunătățire a calității oferit | Acceptați bazat pe cheltuieli ca soluție permanentă |
| Mături | Documentați în mod explicit excluderile motivate | Excludeți categorii fără justificare formală |
Plan de îmbunătățire progresivă (foaia de parcurs pentru maturitatea datelor)
Protocolul GHG încurajează în mod explicit o abordare progresivă: este mai bine să ai date de calitate scăzută bazate pe cheltuieli astăzi, care nu au nimic. Scopul este de a îmbunătăți în fiecare an nivelul de calitate al categoriilor de materiale:
- Anul 1 (linie de bază): 100% bazat pe cheltuieli, eroare ±75%, TIER 3
- Anul 2: Top 10 furnizor cu activitate, ±40%, TIER 2
- Anul 3: primii 20 furnizori cu date primare verificate, ±20%, TIER 2
- Anul 4+: Integrare EcoVadis/CDP pentru toți furnizorii, ±10%, TIER 1
Această îmbunătățire progresivă poate fi documentată în raportul CSRD ca „evoluția metodologiei” și este evaluată pozitiv de către recenzori.
Schema bazei de date pentru stratul de aur
Stratul Gold necesită o schemă concepută pentru a suporta interogări agregate rapide pentru raportarea CSRD, menținerea trasabilității către lanțul de audit.
-- schema/scope3_gold.sql
-- Schema PostgreSQL per il Gold Layer Scope 3
-- Tabella principale: emissioni aggregate per categoria
CREATE TABLE scope3_emissions_gold (
id BIGSERIAL PRIMARY KEY,
company_id VARCHAR(50) NOT NULL,
reporting_year INTEGER NOT NULL,
scope3_category INTEGER NOT NULL CHECK (scope3_category BETWEEN 1 AND 15),
supplier_id VARCHAR(100),
-- Valori emissioni
co2e_tonnes DECIMAL(12, 3) NOT NULL,
co2_tonnes DECIMAL(12, 3),
ch4_tonnes_co2e DECIMAL(12, 3),
n2o_tonnes_co2e DECIMAL(12, 3),
-- Metodologia e qualità
calculation_method VARCHAR(30) NOT NULL, -- activity_based, spend_based, etc.
emission_factor_source VARCHAR(100) NOT NULL,
emission_factor_value DECIMAL(10, 6),
quality_tier VARCHAR(10) NOT NULL, -- TIER_1, TIER_2, TIER_3
uncertainty_pct DECIMAL(5, 1) NOT NULL,
uncertainty_tonnes DECIMAL(12, 3) GENERATED ALWAYS AS
(co2e_tonnes * uncertainty_pct / 100) STORED,
-- Tracciabilità
audit_chain_id UUID NOT NULL REFERENCES scope3_audit_chain(chain_id),
pipeline_version VARCHAR(20) NOT NULL,
reporting_standard VARCHAR(50) DEFAULT 'GHG_Protocol_Scope3_2011',
-- Timestamps
published_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Unicità per reporting period
CONSTRAINT uq_emission_period
UNIQUE (company_id, reporting_year, scope3_category, supplier_id)
);
-- Indici per performance query CSRD report
CREATE INDEX idx_scope3_company_year
ON scope3_emissions_gold (company_id, reporting_year);
CREATE INDEX idx_scope3_category
ON scope3_emissions_gold (scope3_category);
CREATE INDEX idx_scope3_quality
ON scope3_emissions_gold (quality_tier, uncertainty_pct);
-- View per report aggregato CSRD
CREATE VIEW v_scope3_csrd_report AS
SELECT
company_id,
reporting_year,
scope3_category,
SUM(co2e_tonnes) AS total_co2e_tonnes,
-- Propagazione incertezza quadratica
SQRT(SUM(POWER(co2e_tonnes * uncertainty_pct / 100, 2))) /
NULLIF(SUM(co2e_tonnes), 0) * 100 AS combined_uncertainty_pct,
-- Qualità aggregata (tier peggiore nella categoria)
MIN(quality_tier) AS data_quality_tier,
-- Metodo più usato
MODE() WITHIN GROUP (ORDER BY calculation_method) AS primary_method,
COUNT(DISTINCT supplier_id) AS supplier_count,
MAX(updated_at) AS last_updated
FROM scope3_emissions_gold
GROUP BY company_id, reporting_year, scope3_category
ORDER BY company_id, reporting_year, scope3_category;
-- Tabella audit chain
CREATE TABLE scope3_audit_chain (
chain_id UUID PRIMARY KEY,
record_id UUID NOT NULL DEFAULT gen_random_uuid(),
sequence INTEGER NOT NULL,
record_type VARCHAR(50) NOT NULL,
previous_hash CHAR(64) NOT NULL,
record_hash CHAR(64) NOT NULL UNIQUE,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT uq_chain_sequence UNIQUE (chain_id, sequence)
);
-- Indice per verifica integrità chain
CREATE INDEX idx_audit_chain_id_seq
ON scope3_audit_chain (chain_id, sequence);
Concluzii și pașii următori
Construirea unei conducte solide pentru emisiile din Scopul 3 nu este un exercițiu academic: este oinfrastructura critică de date care va deveni obligatoriu pt mii de companii europene până în 2028. Principiile cheie pe care le-am văzut în acest articol sunt aplicabile indiferent de dimensiunea companiei:
- Imutabilitatea datelor brute: Strat de bronz cu garanții de hash SHA-256 că fiecare recenzent poate urmări întotdeauna datele înapoi la sursa originală, chiar și ani mai târziu.
- Progresivitatea metodologică: Începeți cu pe baza cheltuielilor și migrați către bazată pe activitate pentru categoriile de materiale este abordarea recomandată de Protocolul GHG în sine, nu o scurtătură.
- Cuantificarea incertitudinii: Raportați emisiile fără interval de încredere este o informație incompletă. Propagarea pătratică a incertitudinii este simplu de implementat și fundamental pentru credibilitatea raportului.
- Pista de audit verificabilă: Lanțul hash permite un verificator extern pentru a confirma matematic că nicio dată nu a fost modificată după calcul.
- Integrarea ecosistemelor: platforme precum EcoVadis Carbon Data Network și CDP reduce foarte mult sarcina colectării datelor, în special pentru lanțurile mari de aprovizionare.
Studiul de caz al SaaS Italia S.r.l. arată că chiar şi o întreprindere mijlocie poate realizarea unui raport conform Scope 3 CSRD în 3 luni cu o echipă de 2-3 persoane, combinând date primare pentru categoriile de materiale și bazate pe cheltuieli pentru cele reziduale. Cheia este prioritizarea: nu căuta perfecțiunea peste tot, ci concentrează-ți efortul unde emisiile sunt cele mai mari.
Resurse utile
- Protocolul GHG Scopul 3 Standard: ghgprotocol.org/corporate-value-chain-scope-3-standard
- Climatiq API (baza de date cu factori de emisie): climatiq.io
- EcoVadis Carbon Data Network: ecovadis.com/solutions/carbon
- ESRS E1 Schimbările climatice (text oficial UE): EFRAG ESRS E1
- EXIOBAZA 3.8 (factori bazați pe cheltuieli EEIO): exiobase.eu
Următorul articol din serie
În articolul următor API de raportare ESG: Integrare cu fluxul de lucru CSRD vom construi un strat API REST pe deasupra datelor Scope 3 calculate în acest articol, implementând terminale conforme cu formatele cerute de directiva europeană și care integrează fluxul de lucru aprobarea raportului cu semnătura digitală a auditorului.
Vom vedea, de asemenea, cum să expunem datele în format XBRL/iXBRL pentru depunere la ESEF (European Single Electronic Format), formatul obligatoriu pentru rapoartele CSRD listata la bursa europeana de valori.







