Hodnotový řetězec emisního potrubí Rozsah 3: Od nezpracovaných dat po auditní záznam
Il 70-90 % celkových emisí technologické nebo softwarové společnosti skrývá mimo své vlastní hranice: v cloudových serverech zakoupených jako služba, v laptopech zaměstnanců, v obchodních letech, v kódu které zákazníci provozují na svých zařízeních. To jsou emise Rozsah 3a z velké části část digitálních organizací představuje nejsložitější problém měření, který existuje Oblast ESG.
Na rozdíl od rozsahu 1 (spalujeme palivo) a rozsahu 2 (kupujeme elektřinu), Rozsah 3 vyžaduje shromažďování údajů od stovek dodavatelů za použití heterogenních emisních faktorů, zvládat velmi vysoké stupně nejistoty a vytvářet auditní stopu ověřitelnou externími auditory. s CSRD/ESRS E1 což činí ohlašování podle rozsahu 3 povinným pro velké společnosti počínaje 2025–2026 a s malými a středními podniky v hledáčku do roku 2028 už tato otázka není akademická: strojírenství.
V tomto článku jeden postavíme kompletní potrubí pro výpočet emisí rozsahu 3 hodnotový řetězec: od architektury ETL pro sběr dodavatelských dat až po integraci s platformami, jako je CDP a EcoVadis, od výpočtu založeného na aktivitě vs. útraty až po Airflow DAG pro automatizaci a na neměnnou auditní stopu pro ověřovatele. Každá sekce obsahuje funkční kód Pythonu a osvědčené postupy funkční testován v reálných kontextech.
Co se naučíte
- 15 kategorií GHG Protocol Scope 3 a které z nich jsou relevantní pro softwarové/SaaS společnosti
- Architektura ETL/ELT pro sběr dodavatelských dat: dotazníky, CDP, EcoVadis a přímá API
- Na základě aktivity vs. založené na výdajích: vzorce, přesnost a kdy jaký přístup použít
- Python pipeline s Apache Airflow: DAG pro automatizovaný a škálovatelný výpočet Scope 3
- Bodování kvality dat a šíření statistické nejistoty v odhadech
- Neměnný auditní záznam s hašovacím řetězcem SHA-256 pro sledovatelnost externího ověřovatele
- Vizualizace Sankey diagramu pro prioritní kategorie hodnotového řetězce a teplotní mapy
- Požadavky CSRD/ESRS E1: co musíte zveřejnit a s jakou podrobností
- Kompletní případová studie: SaaS společnost s 50 dodavateli, end-to-end kalkulace rozsahu 3
- Integrace s EcoVadis Carbon Data Network a Climatiq API pro emisní faktory
Green Software Series – 10 článků
| # | Položka | Podrobit |
|---|---|---|
| 1 | Principy Green Software Foundation | Uhlíková účinnost, GSF, SCI |
| 2 | CodeCarbon: Měření kódu | Měření, dashboard, optimalizace |
| 3 | Climatiq API: Carbon Calculations | REST API, protokol GHG, rozsah 1-3 |
| 4 | Carbon Aware SDK | Posun času, posun místa |
| 5 | Rozsah 1-2-3: Modelování dat ESG | Struktura dat, výpočty, agregace |
| 6 | GreenOps: Carbon-Aware Kubernetes | Plánování, škálování, monitorování |
| 7 | Hodnotový řetězec emisního potrubí Rozsah 3 | Tento článek |
| 8 | ESG Reporting API: CSRD | API, workflow, compliance |
| 9 | Udržitelné architektonické vzory | Ukládání, ukládání do mezipaměti, dávka |
| 10 | AI a Carbon: Školení ML | Školení ML, optimalizace, zelená AI |
15 GHG Protocol Rozsah 3 kategorie
Il GHG Protocol Corporate Value Chain (rozsah 3) Standard je to mezinárodní rámec odkaz zveřejněný v roce 2011 a v současné době je revidován s aktualizací očekávanou v roce 2026. Rozděluje se nepřímé emise hodnotového řetězce v 15 různých kategorií, organizované ve dvou makroskupiny: proti proudu (činnosti před poskytnutím výroby/služby) e po proudu (činnosti po prodeji zákazníkovi).
15 Rozsah 3 Kategorie: Proti proudu a po proudu
| Kočka. | Jméno | Tok | SaaS/Tech Relevance |
|---|---|---|---|
| 1 | Zakoupené zboží a služby | Proti proudu | VYSOKÉ: serverový hardware, softwarové licence, poradenské služby |
| 2 | Kapitálové statky | Proti proudu | MÉDIA: vybavení datových center, notebooky, firemní telefony |
| 3 | Činnosti související s palivy a energií | Proti proudu | PRŮMĚR: emise z nakoupené výroby energie (předřazený rozsah 2) |
| 4 | Upstream doprava a distribuce | Proti proudu | NÍZKÁ: dodávky hardwaru do kanceláří a datových center |
| 5 | Odpad vznikající při provozu | Proti proudu | NÍZKÁ: OEEZ, papír, kancelářský odpad |
| 6 | Služební cesty | Proti proudu | VYSOKÉ: lety, hotely, vlaky pro distribuované týmy |
| 7 | Dojíždění zaměstnanců | Proti proudu | VYSOKÉ: cestování z domova do kanceláře, zejména pro hybridní týmy |
| 8 | Upstream pronajatá aktiva | Proti proudu | MÉDIA: kanceláře k pronájmu (pokud nejsou zahrnuty v rozsahu 1/2) |
| 9 | Následná doprava a distribuce | Po proudu | NÍZKÁ: distribuce softwaru na fyzickém médiu (vzácné) |
| 10 | Zpracování prodávaných výrobků | Po proudu | N/A: Nevztahuje se na čistý software |
| 11 | Použití prodávaných produktů | Po proudu | VELMI VYSOKÉ: Energie spotřebovaná zákazníky používajícími SaaS |
| 12 | Ošetření prodávaných výrobků po skončení životnosti | Po proudu | NÍZKÁ: uživatelská zařízení na konci životnosti |
| 13 | Následný pronajatý majetek | Po proudu | MÉDIA: Hardware pronajatý zákazníkům |
| 14 | Franšízy | Po proudu | N/A: Nelze použít |
| 15 | Investice | Po proudu | VYSOKÉ: firemní portfolio, kapitálové investice do startupů |
Pro SaaS nebo společnost zabývající se vývojem softwaru jsou nejrelevantnější kategorie obvykle: Kočka. 1 (nakoupené zboží a služby, často největší položka), Kočka. 6 (služební cesty), Kočka. 7 (dojíždění zaměstnanců) e Kočka. 11 (použití prodaných výrobků). Tam dvojí významnost žádost z CSRD vyžaduje identifikaci, které kategorie jsou významné jak z hlediska dopadu environmentální a finanční riziko pro společnost.
Častá chyba: Vynechání kat. 11 pro SaaS
Mnoho softwarových společností vylučuje kategorii 11 („Použití prodaných produktů“) za předpokladu, že se na ni nevztahuje. Ve skutečnosti každé volání API, každý dotaz, každý watt spotřebovaný zákazníky na spuštění toho vašeho software je Scope 3 Cat. 11 emisí, za které zodpovídáte. Pro SaaS s miliony uživatelů, může to být dominantní kategorie. Metoda výpočtu používá software uhlíkové intenzity (SCI). vynásobené dodanými funkčními jednotkami.
Architektura potrubí sběru dat
Shromažďování spolehlivých dat z celého hodnotového řetězce je překážkou číslo jedna pro jakýkoli podnik Rozsah 3 projekt. Potrubí musí spravovat heterogenní zdroje: manuální dotazníky, platformy ESG třetích stran části, přímá rozhraní API s dodavateli, soubory CSV zaslané e-mailem, interní data ERP. Následující architektura přijmout vzor Třívrstvý ETL (Bronz/Silver/Gold) inspirovaný Lakehouse.
Pipeline Architecture Scope 3: Bronz / Silver / Gold
| Vrstvy | Obsah | Technologie | Rozsah |
|---|---|---|---|
| bronz (surový) | Neměnná nezpracovaná data od dodavatelů | S3/GCS, jezero Delta | Audit trail, replay, zdroj pravdy |
| Stříbro (standardizované) | Data normalizovaná podle jednotky a měny | dbt, Spark, Pandy | Výpočet emisí, spojení s emisními faktory |
| Zlato (přehledy) | Souhrnné emise podle kategorie GHG | PostgreSQL, BigQuery | Dashboardy, zprávy CSRD, ověřovatelé |
Bronzová vrstva je nezbytná: všechna přijatá data jsou uložena tak jak je s časovým razítkem zpracování, hash SHA-256 obsahu a zdrojových metadat. Tím je zaručena možnost přepracovat celé potrubí, pokud se změní emisní faktory nebo metodika, bez ztráty původní údaje.
# models/scope3_pipeline.py
# Struttura dati per la pipeline Scope 3
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional
import hashlib
import json
class DataSource(Enum):
SUPPLIER_QUESTIONNAIRE = "supplier_questionnaire"
CDP_API = "cdp_api"
ECOVADIS_API = "ecovadis_api"
ERP_EXPORT = "erp_export"
MANUAL_UPLOAD = "manual_upload"
CLIMATIQ_API = "climatiq_api"
class CalculationMethod(Enum):
ACTIVITY_BASED = "activity_based"
SPEND_BASED = "spend_based"
HYBRID = "hybrid"
SUPPLIER_SPECIFIC = "supplier_specific"
class DataQualityTier(Enum):
TIER_1 = "primary_data" # Dati primari dal supplier
TIER_2 = "secondary_sector" # Fattori settoriali
TIER_3 = "spend_estimated" # Stima basata su spesa
@dataclass
class RawSupplierData:
"""Layer Bronze: dato grezzo immutabile"""
supplier_id: str
source: DataSource
raw_payload: dict
received_at: datetime
content_hash: str = field(init=False)
def __post_init__(self):
payload_str = json.dumps(self.raw_payload, sort_keys=True)
self.content_hash = hashlib.sha256(
payload_str.encode()
).hexdigest()
@dataclass
class StandardizedActivity:
"""Layer Silver: attività normalizzata"""
activity_id: str
supplier_id: str
scope3_category: int # 1-15
activity_type: str # es. "freight_transport"
quantity: float
unit: str # es. "tonne.km"
reporting_period_start: datetime
reporting_period_end: datetime
source: DataSource
quality_tier: DataQualityTier
emission_factor_id: Optional[str] = None
uncertainty_pct: float = 0.0
raw_data_hash: str = "" # Ref al Bronze layer
@dataclass
class EmissionResult:
"""Layer Gold: emissione calcolata"""
result_id: str
activity_id: str
scope3_category: int
co2e_tonnes: float
calculation_method: CalculationMethod
emission_factor_source: str # es. "climatiq:IPCC_2021"
emission_factor_value: float
quality_tier: DataQualityTier
uncertainty_pct: float
calculated_at: datetime
pipeline_version: str
audit_hash: str = field(init=False)
def __post_init__(self):
audit_data = {
"result_id": self.result_id,
"activity_id": self.activity_id,
"co2e_tonnes": self.co2e_tonnes,
"emission_factor_source": self.emission_factor_source,
"calculated_at": self.calculated_at.isoformat(),
"pipeline_version": self.pipeline_version,
}
self.audit_hash = hashlib.sha256(
json.dumps(audit_data, sort_keys=True).encode()
).hexdigest()
Integrace dodavatelských dat: CDP, EcoVadis a Direct API
Sběr dat od dodavatelů probíhá prostřednictvím několika kanálů s úrovní kvality a kvality velmi odlišná automatizace. The Carbon Disclosure Project (CDP) sbírá data z více než 24 000 společností a vystavuje API pro přístup k ověřeným reportům. EcoVadis v roce 2025 spustila síť Carbon Data Network s více než 48 000 reportéry o skleníkových plynech, které sdílejí data určitým způsobem standardizované. Konečně, mnoho velkých dodavatelů nabízí proprietární API pro přímé sdílení svých stop.
# collectors/supplier_collector.py
# Integrazione con fonti dati supplier
import httpx
import asyncio
from typing import AsyncGenerator
from datetime import datetime
from models.scope3_pipeline import RawSupplierData, DataSource
class ClimatiqEmissionFactors:
"""Client per Climatiq API - emission factors database"""
BASE_URL = "https://beta3.api.climatiq.io"
def __init__(self, api_key: str):
self.api_key = api_key
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
async def get_emission_factor(
self,
activity_id: str,
year: int = 2024,
region: str = "IT"
) -> dict:
"""Recupera fattore di emissione per attività specifica"""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.BASE_URL}/estimate",
headers=self.headers,
json={
"emission_factor": {
"activity_id": activity_id,
"data_version": "^21",
"year": year,
"region": region
},
"parameters": {
"money": 1.0,
"money_unit": "eur"
}
}
)
response.raise_for_status()
return response.json()
async def batch_estimate(
self,
activities: list[dict]
) -> list[dict]:
"""Stima batch per multiple attività - ottimizza le API call"""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.BASE_URL}/batch",
headers=self.headers,
json={"batch": activities},
timeout=30.0
)
response.raise_for_status()
return response.json().get("results", [])
class EcoVadisCollector:
"""Raccoglie dati Scope 3 dalla piattaforma EcoVadis"""
def __init__(self, api_key: str, base_url: str):
self.api_key = api_key
self.base_url = base_url
async def fetch_supplier_carbon_data(
self,
supplier_ecovadis_id: str,
reporting_year: int
) -> RawSupplierData:
"""Recupera dati carbonio per un supplier dalla Carbon Data Network"""
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.base_url}/v1/suppliers/{supplier_ecovadis_id}/carbon",
headers={"X-API-Key": self.api_key},
params={"year": reporting_year},
timeout=15.0
)
if response.status_code == 404:
# Supplier non ha condiviso dati primari
return self._create_no_data_record(supplier_ecovadis_id)
response.raise_for_status()
payload = response.json()
return RawSupplierData(
supplier_id=supplier_ecovadis_id,
source=DataSource.ECOVADIS_API,
raw_payload=payload,
received_at=datetime.utcnow()
)
def _create_no_data_record(self, supplier_id: str) -> RawSupplierData:
return RawSupplierData(
supplier_id=supplier_id,
source=DataSource.ECOVADIS_API,
raw_payload={"status": "no_data", "supplier_id": supplier_id},
received_at=datetime.utcnow()
)
class CDPCollector:
"""Raccoglie dati da CDP (Carbon Disclosure Project)"""
CDP_API_URL = "https://api.cdp.net/v1"
def __init__(self, api_token: str):
self.api_token = api_token
async def search_supplier(
self,
company_name: str,
year: int = 2024
) -> RawSupplierData | None:
"""Cerca un supplier nel database CDP e recupera dati GHG"""
async with httpx.AsyncClient() as client:
# Ricerca azienda
search_resp = await client.get(
f"{self.CDP_API_URL}/companies/search",
headers={"Authorization": f"Bearer {self.api_token}"},
params={"q": company_name, "year": year}
)
if not search_resp.json().get("results"):
return None
company_id = search_resp.json()["results"][0]["id"]
# Recupera dati GHG disclosure
ghg_resp = await client.get(
f"{self.CDP_API_URL}/companies/{company_id}/ghg-emissions",
headers={"Authorization": f"Bearer {self.api_token}"},
params={"year": year}
)
if ghg_resp.status_code != 200:
return None
return RawSupplierData(
supplier_id=company_name,
source=DataSource.CDP_API,
raw_payload=ghg_resp.json(),
received_at=datetime.utcnow()
)
class BronzeLayerStorage:
"""Salvataggio immutabile nel layer Bronze"""
def __init__(self, storage_client, bucket: str):
self.storage = storage_client
self.bucket = bucket
async def store(self, raw_data: RawSupplierData) -> str:
"""Salva dato grezzo con path deterministico basato su hash"""
path = (
f"scope3/bronze/"
f"{raw_data.received_at.year}/"
f"{raw_data.received_at.month:02d}/"
f"{raw_data.supplier_id}/"
f"{raw_data.content_hash}.json"
)
await self.storage.upload_json(
bucket=self.bucket,
path=path,
data={
"supplier_id": raw_data.supplier_id,
"source": raw_data.source.value,
"received_at": raw_data.received_at.isoformat(),
"content_hash": raw_data.content_hash,
"payload": raw_data.raw_payload
}
)
return path
Na základě aktivity vs. Na základě útraty: Výběr správné metody
Protokol GHG definuje čtyři metody výpočtu pro rozsah 3, což v praxi ano zredukovat na dva základní přístupy: na základě činnosti e založené na výdajích. Výběr závisí na dostupnosti dat a závažnosti kategorie a vyspělosti vztahu s dodavatelem.
Metodologické srovnání: Activity-Based vs Spend-Based
| Velikost | Na základě aktivity | Na základě útraty |
|---|---|---|
| Vzorec | Množství × emisní faktor (jednotky/kg CO2e) | Výdaje (EUR) × faktor EEIO (kg CO2e/EUR) |
| Přesnost | Vysoká (±5–15 % s primárními údaji) | Nízký-Střední (±50-100%) |
| Údaje jsou požadovány | Fyzikální veličiny (kg, km, kWh, t) | Pouze účetní faktury (EUR, USD) |
| Zdroj EF | Climatiq, IPCC, DEFRA, ecoinvent | USEEIO, EXIOBASE, WIOD |
| Kdy jej použít | Materiálové kategorie, velcí dodavatelé | Výkop, malí dodavatelé, Kat. <1 % |
| Snaha o sběr | Vysoká: vyžaduje spolupráci dodavatele | Nízká: Data jsou již v ERP/SAP |
| Přijatelnost CSRD | Oblíbené pro kategorie materiálů | Přijímáno jako počáteční proxy |
Optimální strategie je jeden přístup progresivní hybrid: Začněme s založené na výdajích, aby měl rychlý základ pro celý hodnotový řetězec, pak je postupně migrován směrem k činnosti založené na identifikovaných kategoriích materiálů. Protokol GHG definuje tři úrovně kvality dat (Tier 1, 2, 3), které přesně odpovídají tomuto postupu.
# calculators/emission_calculator.py
# Calcolo emissioni activity-based e spend-based
from dataclasses import dataclass
from typing import Optional
import math
# ============================================================
# EMISSION FACTORS DATABASE (simplified)
# In produzione: usa Climatiq API o database ecoinvent
# ============================================================
EMISSION_FACTORS: dict[str, dict] = {
# Cat. 1: Purchased goods & services
"cloud_compute_kwh": {
"value": 0.233, # kg CO2e/kWh (IT grid mix 2024)
"unit": "kWh",
"source": "IEA 2024",
"uncertainty_pct": 10.0
},
"hardware_laptop": {
"value": 350.0, # kg CO2e/unit (embodied carbon)
"unit": "unit",
"source": "Dell 2024 PCF",
"uncertainty_pct": 20.0
},
# Cat. 6: Business travel
"flight_economy_short": {
"value": 0.255, # kg CO2e/passenger.km
"unit": "passenger.km",
"source": "DEFRA 2024",
"uncertainty_pct": 15.0
},
"flight_economy_long": {
"value": 0.195,
"unit": "passenger.km",
"source": "DEFRA 2024",
"uncertainty_pct": 15.0
},
# Cat. 7: Employee commuting
"car_average": {
"value": 0.170, # kg CO2e/km
"unit": "km",
"source": "DEFRA 2024",
"uncertainty_pct": 12.0
},
"public_transport_it": {
"value": 0.048,
"unit": "passenger.km",
"source": "Ispra 2024",
"uncertainty_pct": 18.0
},
}
# EEIO Spend-based factors (EXIOBASE 3.8)
# kg CO2e per EUR di spesa per categoria merceologica
EEIO_FACTORS: dict[str, float] = {
"it_services": 0.312, # IT e telecomunicazioni
"professional_services": 0.198, # Consulenza, legale, etc.
"office_supplies": 0.445,
"cloud_hosting": 0.287,
"marketing": 0.231,
"utilities": 0.892,
"hr_services": 0.167,
"travel_accommodation": 0.521,
}
def calculate_activity_based(
activity_type: str,
quantity: float,
custom_ef: Optional[float] = None
) -> tuple[float, float]:
"""
Calcola emissioni con metodo activity-based.
Returns:
(co2e_kg, uncertainty_pct)
"""
if custom_ef is not None:
return quantity * custom_ef, 30.0 # alta incertezza EF custom
ef_data = EMISSION_FACTORS.get(activity_type)
if not ef_data:
raise ValueError(f"Emission factor non trovato: {activity_type}")
co2e_kg = quantity * ef_data["value"]
uncertainty = ef_data["uncertainty_pct"]
return co2e_kg, uncertainty
def calculate_spend_based(
spend_eur: float,
procurement_category: str,
inflation_correction: float = 1.0
) -> tuple[float, float]:
"""
Calcola emissioni con metodo spend-based (EEIO).
Args:
spend_eur: importo in EUR
procurement_category: categoria merceologica EEIO
inflation_correction: fattore per correggere inflazione vs anno base EEIO
Returns:
(co2e_kg, uncertainty_pct)
"""
eeio_factor = EEIO_FACTORS.get(procurement_category)
if not eeio_factor:
raise ValueError(f"EEIO factor non trovato: {procurement_category}")
# Corregge per inflazione (EEIO factors spesso in EUR 2015)
adjusted_spend = spend_eur / inflation_correction
co2e_kg = adjusted_spend * eeio_factor
# Lo spend-based ha incertezza intrinsecamente alta
uncertainty = 75.0
return co2e_kg, uncertainty
def propagate_uncertainty(
values: list[float],
uncertainties_pct: list[float]
) -> float:
"""
Propagazione incertezza quadratica (somma in quadratura).
Valida quando le incertezze sono indipendenti.
Returns:
uncertainty_pct sul totale
"""
weighted_variance_sum = sum(
(v * u/100) ** 2
for v, u in zip(values, uncertainties_pct)
)
total = sum(values)
if total == 0:
return 0.0
combined_std = math.sqrt(weighted_variance_sum)
return (combined_std / total) * 100
def calculate_category_total(
activities: list[dict]
) -> dict:
"""
Calcola totale categoria Scope 3 con propagazione incertezza.
activities: lista di {method, value_kg, uncertainty_pct}
"""
if not activities:
return {"total_co2e_kg": 0.0, "uncertainty_pct": 0.0}
values = [a["value_kg"] for a in activities]
uncertainties = [a["uncertainty_pct"] for a in activities]
total_co2e = sum(values)
combined_uncertainty = propagate_uncertainty(values, uncertainties)
# Qualità aggregata: peggiore del gruppo determina il tier
quality_tiers = [a.get("quality_tier", "TIER_3") for a in activities]
dominant_tier = min(quality_tiers) # TIER_1 < TIER_2 < TIER_3 lexicograficamente
return {
"total_co2e_kg": total_co2e,
"total_co2e_tonnes": total_co2e / 1000,
"uncertainty_pct": combined_uncertainty,
"uncertainty_kg": total_co2e * combined_uncertainty / 100,
"dominant_quality_tier": dominant_tier,
"activity_count": len(activities)
}
Průtok vzduchu potrubím: DAG pro automatický výpočet rozsahu 3
Orchestrování kanálu Scope 3 vyžaduje dobře strukturovaný DAG, který spravuje kompletní životní cyklus: sběr dat, standardizace, výpočet emisí, kontrola kvality a publikování ve zlaté vrstvě. DAG musí být idempotentní (spustitelný několikrát bez vedlejších účinků) e resetovatelný v případě částečné selhání.
# dags/scope3_pipeline_dag.py
# Apache Airflow DAG per pipeline emissioni Scope 3
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task, task_group
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.models import Variable
import json
import logging
logger = logging.getLogger(__name__)
# ============================================================
# CONFIGURAZIONE DAG
# ============================================================
SCOPE3_DAG_CONFIG = {
"reporting_year": 2024,
"companies": [
{"id": "S001", "name": "AWS", "tier": "TIER_1", "source": "ecovadis"},
{"id": "S002", "name": "Microsoft Azure", "tier": "TIER_1", "source": "cdp"},
{"id": "S003", "name": "Supplier_XYZ", "tier": "TIER_2", "source": "questionnaire"},
# ... altri supplier
],
"categories_enabled": [1, 2, 3, 6, 7, 11, 15],
"quality_threshold_pct": 80.0,
"alert_email": "esg-team@company.com"
}
default_args = {
"owner": "esg-team",
"depends_on_past": False,
"email_on_failure": True,
"email": [SCOPE3_DAG_CONFIG["alert_email"]],
"retries": 3,
"retry_delay": timedelta(minutes=5),
}
with DAG(
dag_id="scope3_emissions_pipeline",
default_args=default_args,
description="Pipeline calcolo emissioni Scope 3 value chain",
schedule_interval="@quarterly", # Esecuzione trimestrale
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["emissions", "scope3", "esg", "ghg-protocol"],
max_active_runs=1, # Serializza: mai due calcoli in parallelo
) as dag:
# ============================================================
# FASE 1: RACCOLTA DATI SUPPLIER (in parallelo per supplier)
# ============================================================
@task_group(group_id="data_collection")
def collect_supplier_data():
@task(task_id="fetch_ecovadis_suppliers")
def fetch_ecovadis() -> list[dict]:
"""Raccoglie dati da EcoVadis Carbon Data Network"""
from collectors.supplier_collector import EcoVadisCollector
import asyncio
api_key = Variable.get("ECOVADIS_API_KEY", deserialize_json=False)
collector = EcoVadisCollector(api_key, "https://api.ecovadis.com")
suppliers_ecovadis = [
s for s in SCOPE3_DAG_CONFIG["companies"]
if s["source"] == "ecovadis"
]
results = []
for supplier in suppliers_ecovadis:
raw = asyncio.run(
collector.fetch_supplier_carbon_data(
supplier["id"],
SCOPE3_DAG_CONFIG["reporting_year"]
)
)
results.append({
"supplier_id": raw.supplier_id,
"content_hash": raw.content_hash,
"status": "fetched",
"has_data": raw.raw_payload.get("status") != "no_data"
})
logger.info(f"EcoVadis - Supplier {supplier['id']}: fetched")
return results
@task(task_id="fetch_cdp_suppliers")
def fetch_cdp() -> list[dict]:
"""Raccoglie dati verificati da CDP"""
from collectors.supplier_collector import CDPCollector
import asyncio
api_token = Variable.get("CDP_API_TOKEN")
collector = CDPCollector(api_token)
suppliers_cdp = [
s for s in SCOPE3_DAG_CONFIG["companies"]
if s["source"] == "cdp"
]
results = []
for supplier in suppliers_cdp:
raw = asyncio.run(
collector.search_supplier(
supplier["name"],
SCOPE3_DAG_CONFIG["reporting_year"]
)
)
if raw:
results.append({
"supplier_id": supplier["id"],
"content_hash": raw.content_hash,
"status": "fetched",
"has_data": True
})
else:
results.append({
"supplier_id": supplier["id"],
"status": "not_found",
"has_data": False
})
return results
@task(task_id="process_manual_questionnaires")
def process_questionnaires() -> list[dict]:
"""Processa questionari manuali caricati in S3"""
# In produzione: legge da bucket S3 o SharePoint
# Qui restituiamo dati di esempio
return [{
"supplier_id": "S003",
"status": "processed",
"has_data": True,
"scope3_cat1_tco2e": 45.2,
"scope3_cat6_tco2e": 12.8
}]
ev = fetch_ecovadis()
cdp = fetch_cdp()
q = process_questionnaires()
return [ev, cdp, q]
# ============================================================
# FASE 2: STANDARDIZZAZIONE E CALCOLO EMISSIONI
# ============================================================
@task(task_id="standardize_activities")
def standardize_activities(collection_results: list) -> list[dict]:
"""Normalizza tutti i dati in unità fisiche standard"""
from normalizers.activity_normalizer import ActivityNormalizer
normalizer = ActivityNormalizer()
standardized = []
for batch in collection_results:
for result in batch:
if result.get("has_data"):
activities = normalizer.normalize(result)
standardized.extend(activities)
logger.info(f"Standardizzate {len(standardized)} attività")
return standardized
@task(task_id="calculate_emissions")
def calculate_emissions(activities: list[dict]) -> list[dict]:
"""Calcola emissioni CO2e per ogni attività standardizzata"""
from calculators.emission_calculator import (
calculate_activity_based,
calculate_spend_based
)
from models.scope3_pipeline import CalculationMethod
results = []
for activity in activities:
if activity["method"] == "activity_based":
co2e_kg, uncertainty = calculate_activity_based(
activity["activity_type"],
activity["quantity"]
)
method = CalculationMethod.ACTIVITY_BASED
else:
co2e_kg, uncertainty = calculate_spend_based(
activity["spend_eur"],
activity["procurement_category"]
)
method = CalculationMethod.SPEND_BASED
results.append({
**activity,
"co2e_kg": co2e_kg,
"co2e_tonnes": co2e_kg / 1000,
"uncertainty_pct": uncertainty,
"calculation_method": method.value,
"calculated_at": datetime.utcnow().isoformat()
})
return results
# ============================================================
# FASE 3: DATA QUALITY CHECK
# ============================================================
@task(task_id="data_quality_check")
def data_quality_check(results: list[dict]) -> dict:
"""Verifica qualità dati e genera score per categoria"""
from quality.data_quality_scorer import DataQualityScorer
scorer = DataQualityScorer()
quality_report = scorer.score_results(results)
if quality_report["overall_score"] < SCOPE3_DAG_CONFIG["quality_threshold_pct"]:
logger.warning(
f"Quality score sotto soglia: {quality_report['overall_score']}%"
)
return quality_report
# ============================================================
# FASE 4: AUDIT TRAIL E PUBBLICAZIONE GOLD LAYER
# ============================================================
@task(task_id="create_audit_trail")
def create_audit_trail(
results: list[dict],
quality_report: dict
) -> str:
"""Crea audit trail immutabile con hash chain"""
from audit.hash_chain import HashChain
chain = HashChain()
chain_id = chain.create_chain(
calculation_results=results,
quality_report=quality_report,
pipeline_version="2.1.0",
methodology="GHG_Protocol_Scope3_2011",
reporting_standard="CSRD_ESRS_E1"
)
logger.info(f"Audit trail creato: {chain_id}")
return chain_id
@task(task_id="publish_gold_layer")
def publish_gold_layer(
results: list[dict],
audit_chain_id: str
) -> None:
"""Pubblica dati aggregati nel Gold layer (PostgreSQL)"""
hook = PostgresHook(postgres_conn_id="emissions_db")
for result in results:
hook.run(
"""
INSERT INTO scope3_emissions_gold (
supplier_id, scope3_category, co2e_tonnes,
calculation_method, uncertainty_pct,
quality_tier, audit_chain_id,
reporting_year, published_at
) VALUES (
%(supplier_id)s, %(scope3_category)s, %(co2e_tonnes)s,
%(calculation_method)s, %(uncertainty_pct)s,
%(quality_tier)s, %(audit_chain_id)s,
%(reporting_year)s, NOW()
)
ON CONFLICT (supplier_id, scope3_category, reporting_year)
DO UPDATE SET
co2e_tonnes = EXCLUDED.co2e_tonnes,
updated_at = NOW()
""",
parameters={
**result,
"audit_chain_id": audit_chain_id,
"reporting_year": SCOPE3_DAG_CONFIG["reporting_year"]
}
)
logger.info(f"Pubblicati {len(results)} record nel Gold layer")
# ============================================================
# WIRING DEL DAG
# ============================================================
collection_results = collect_supplier_data()
standardized = standardize_activities(collection_results)
emission_results = calculate_emissions(standardized)
quality = data_quality_check(emission_results)
chain_id = create_audit_trail(emission_results, quality)
publish_gold_layer(emission_results, chain_id)
Hodnocení kvality dat a šíření nejistoty
GHG Protocol Scope 3 Standard výslovně uznává, že kat. 1-15 nejsou nikdy neznámé s naprostou jistotou. Kvalitní reporting musí obsahovat odhad zkvantitativní nejistota spojené s každou kategorií. IPCC se formalizoval metodu šíření nejistoty ve svých pokynech pro správnou praxi.
# quality/data_quality_scorer.py
# Scoring qualità dati Scope 3
from dataclasses import dataclass
from enum import Enum
from typing import Optional
import math
from datetime import datetime, timedelta
class QualityDimension(Enum):
COMPLETENESS = "completeness"
ACCURACY = "accuracy"
TIMELINESS = "timeliness"
VERIFICATION = "verification"
GRANULARITY = "granularity"
@dataclass
class QualityScore:
dimension: QualityDimension
score: float # 0-100
weight: float # peso nel calcolo aggregato
notes: str = ""
def score_supplier_data_quality(
supplier: dict,
reference_date: datetime = None
) -> dict[str, float]:
"""
Calcola score qualità multi-dimensionale per i dati di un supplier.
Basato su GHG Protocol Data Quality Guidance.
"""
if reference_date is None:
reference_date = datetime.utcnow()
scores = []
# 1. COMPLETENESS: quante delle categorie richieste sono presenti?
required_fields = [
"scope3_cat1_tco2e", "scope3_cat6_tco2e", "scope3_cat7_tco2e"
]
present = sum(1 for f in required_fields if supplier.get(f) is not None)
completeness_score = (present / len(required_fields)) * 100
scores.append(QualityScore(
dimension=QualityDimension.COMPLETENESS,
score=completeness_score,
weight=0.30
))
# 2. TIMELINESS: quanto sono recenti i dati?
data_year = supplier.get("reporting_year", 2020)
current_year = reference_date.year
age_years = current_year - data_year
if age_years <= 1:
timeliness_score = 100.0
elif age_years == 2:
timeliness_score = 75.0
elif age_years == 3:
timeliness_score = 50.0
else:
timeliness_score = 20.0
scores.append(QualityScore(
dimension=QualityDimension.TIMELINESS,
score=timeliness_score,
weight=0.20
))
# 3. VERIFICATION: i dati sono stati verificati da terze parti?
verification_level = supplier.get("verification", "none")
verification_score = {
"independent_assured": 100.0, # GHG verificato da auditor indipendente
"limited_assurance": 80.0, # Limited assurance
"internal_reviewed": 60.0, # Solo review interna
"supplier_declared": 40.0, # Auto-dichiarazione
"estimated": 20.0, # Stima spend-based
"none": 0.0
}.get(verification_level, 20.0)
scores.append(QualityScore(
dimension=QualityDimension.VERIFICATION,
score=verification_score,
weight=0.30
))
# 4. GRANULARITY: attività-specifico o aggregato?
data_type = supplier.get("data_type", "aggregated")
granularity_score = {
"site_specific": 100.0, # Dati per sito produttivo
"product_specific": 90.0, # PCF per prodotto/servizio
"supplier_specific": 70.0, # Dato totale supplier
"sector_average": 40.0, # Media settoriale
"aggregated": 20.0
}.get(data_type, 20.0)
scores.append(QualityScore(
dimension=QualityDimension.GRANULARITY,
score=granularity_score,
weight=0.20
))
# Calcolo score aggregato ponderato
overall = sum(s.score * s.weight for s in scores)
# Mappa score a Tier GHG Protocol
if overall >= 80:
tier = "TIER_1"
uncertainty_band_pct = 15.0
elif overall >= 50:
tier = "TIER_2"
uncertainty_band_pct = 40.0
else:
tier = "TIER_3"
uncertainty_band_pct = 75.0
return {
"overall_score": round(overall, 1),
"tier": tier,
"uncertainty_band_pct": uncertainty_band_pct,
"dimension_scores": {
s.dimension.value: round(s.score, 1)
for s in scores
}
}
def monte_carlo_uncertainty(
base_estimate_tco2e: float,
uncertainty_pct: float,
n_simulations: int = 10_000
) -> dict:
"""
Stima intervallo di confidenza con simulazione Monte Carlo.
Per reporting CSRD si raccomanda almeno 1.000 simulazioni.
"""
import random
# Distribuzione log-normale (emissioni non possono essere negative)
sigma = math.log(1 + (uncertainty_pct / 100) ** 2) ** 0.5
mu = math.log(base_estimate_tco2e) - sigma ** 2 / 2
simulated = [
math.exp(random.gauss(mu, sigma))
for _ in range(n_simulations)
]
simulated_sorted = sorted(simulated)
p05 = simulated_sorted[int(n_simulations * 0.05)]
p50 = simulated_sorted[int(n_simulations * 0.50)]
p95 = simulated_sorted[int(n_simulations * 0.95)]
return {
"base_estimate_tco2e": base_estimate_tco2e,
"p05_tco2e": round(p05, 2),
"p50_tco2e": round(p50, 2),
"p95_tco2e": round(p95, 2),
"confidence_interval_90pct": {
"lower": round(p05, 2),
"upper": round(p95, 2)
},
"coefficient_of_variation": round(
(p95 - p05) / (2 * p50) * 100, 1
)
}
Immutable Audit Trail s Hash Chain
La end-to-end sledovatelnost je to jeden z nejkritičtějších požadavků ověřitelné hlášení rozsahu 3. Externí auditoři musí být schopni zpětně dohledat každé číslo ve zprávě finální až po primární zdroj dat, procházející všemi transformačními kroky. A hash řetězce inspirováno technologií blockchain (ale bez složitosti distribuovaný) zaručuje neměnnost auditní stopy.
# audit/hash_chain.py
# Audit trail immutabile per emissioni Scope 3
import hashlib
import json
import uuid
from datetime import datetime
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class AuditRecord:
"""Singolo record nell'audit chain"""
def __init__(
self,
record_type: str,
payload: dict,
previous_hash: str,
chain_id: str,
sequence: int
):
self.record_id = str(uuid.uuid4())
self.record_type = record_type
self.payload = payload
self.previous_hash = previous_hash
self.chain_id = chain_id
self.sequence = sequence
self.created_at = datetime.utcnow().isoformat()
self.record_hash = self._compute_hash()
def _compute_hash(self) -> str:
"""SHA-256 hash di tutti i campi del record (eccetto il hash stesso)"""
data = {
"record_id": self.record_id,
"record_type": self.record_type,
"chain_id": self.chain_id,
"sequence": self.sequence,
"previous_hash": self.previous_hash,
"created_at": self.created_at,
"payload_hash": hashlib.sha256(
json.dumps(self.payload, sort_keys=True, default=str).encode()
).hexdigest()
}
return hashlib.sha256(
json.dumps(data, sort_keys=True).encode()
).hexdigest()
def to_dict(self) -> dict:
return {
"record_id": self.record_id,
"record_type": self.record_type,
"chain_id": self.chain_id,
"sequence": self.sequence,
"previous_hash": self.previous_hash,
"record_hash": self.record_hash,
"created_at": self.created_at,
"payload": self.payload
}
class HashChain:
"""
Hash chain per audit trail immutabile emissioni Scope 3.
Ogni record contiene l'hash del record precedente,
rendendo impossibile modificare un record senza invalidare
tutti i record successivi.
"""
GENESIS_HASH = "0" * 64 # Hash del primo record della chain
def __init__(self, db_client=None):
self.db = db_client
self.records: list[AuditRecord] = []
def create_chain(
self,
calculation_results: list[dict],
quality_report: dict,
pipeline_version: str,
methodology: str,
reporting_standard: str
) -> str:
"""Crea una nuova chain per un calcolo Scope 3 completo"""
chain_id = str(uuid.uuid4())
previous_hash = self.GENESIS_HASH
# Record 1: Metadati della pipeline
pipeline_record = AuditRecord(
record_type="PIPELINE_METADATA",
payload={
"version": pipeline_version,
"methodology": methodology,
"reporting_standard": reporting_standard,
"calculation_timestamp": datetime.utcnow().isoformat(),
"total_activities": len(calculation_results)
},
previous_hash=previous_hash,
chain_id=chain_id,
sequence=0
)
self.records.append(pipeline_record)
previous_hash = pipeline_record.record_hash
# Record 2: Quality report
quality_record = AuditRecord(
record_type="QUALITY_ASSESSMENT",
payload=quality_report,
previous_hash=previous_hash,
chain_id=chain_id,
sequence=1
)
self.records.append(quality_record)
previous_hash = quality_record.record_hash
# Record 3..N: Singoli risultati di emissione
for i, result in enumerate(calculation_results):
emission_record = AuditRecord(
record_type="EMISSION_CALCULATION",
payload={
"supplier_id": result.get("supplier_id"),
"scope3_category": result.get("scope3_category"),
"co2e_tonnes": result.get("co2e_tonnes"),
"calculation_method": result.get("calculation_method"),
"emission_factor_source": result.get("emission_factor_source"),
"uncertainty_pct": result.get("uncertainty_pct"),
"quality_tier": result.get("quality_tier")
},
previous_hash=previous_hash,
chain_id=chain_id,
sequence=2 + i
)
self.records.append(emission_record)
previous_hash = emission_record.record_hash
# Persist su DB (o storage immutabile)
if self.db:
self._persist_chain(chain_id)
logger.info(
f"Chain {chain_id} creata con {len(self.records)} record. "
f"Final hash: {previous_hash[:16]}..."
)
return chain_id
def verify_chain_integrity(self, chain_id: str) -> bool:
"""
Verifica che nessun record sia stato alterato.
Percorre la chain ricomputando ogni hash.
"""
records = self._load_chain(chain_id)
if not records:
return False
expected_previous = self.GENESIS_HASH
for record_dict in records:
# Ricomputa hash
record = AuditRecord(
record_type=record_dict["record_type"],
payload=record_dict["payload"],
previous_hash=record_dict["previous_hash"],
chain_id=record_dict["chain_id"],
sequence=record_dict["sequence"]
)
if record_dict["previous_hash"] != expected_previous:
logger.error(
f"Chain corrotta al record {record_dict['sequence']}: "
f"previous_hash non corrisponde"
)
return False
expected_previous = record.record_hash
return True
def _persist_chain(self, chain_id: str) -> None:
"""Salva tutti i record della chain nel DB"""
for record in self.records:
self.db.insert("scope3_audit_chain", record.to_dict())
def _load_chain(self, chain_id: str) -> list[dict]:
"""Carica i record della chain dal DB in ordine di sequenza"""
if not self.db:
return [r.to_dict() for r in self.records]
return self.db.query(
"SELECT * FROM scope3_audit_chain WHERE chain_id = %s ORDER BY sequence",
[chain_id]
)
Vizualizace: Sankey Diagram a kategorie Heatmap
Dobře sestavený kanál Scope 3 musí také produkovat vizualizace, které vykreslují data srozumitelné pro technické i netechnické zúčastněné strany. The Sankeyho diagram to je ideální nástroj pro zobrazení toků emisí v hodnotovém řetězci, zatímco a teplotní mapa umožňuje rychle identifikovat většinu kategorií materiálů a ty s nižší kvalitou dat.
# visualizations/scope3_charts.py
# Generazione Sankey diagram e heatmap Scope 3
import plotly.graph_objects as go
import plotly.express as px
import pandas as pd
from typing import Optional
def create_scope3_sankey(
emission_data: list[dict],
title: str = "Scope 3 Value Chain Emissions"
) -> go.Figure:
"""
Crea Sankey diagram per visualizzare flussi emissioni Scope 3.
Struttura: Supplier -> Categoria S3 -> Totale Scope 3
"""
# Raccoglie nodi unici
suppliers = list(set(d["supplier_id"] for d in emission_data))
categories = list(set(f"Cat. {d['scope3_category']}" for d in emission_data))
all_nodes = suppliers + categories + ["Scope 3 Total"]
node_index = {node: i for i, node in enumerate(all_nodes)}
# Costruisce link source->target->value
source_indices = []
target_indices = []
values = []
link_labels = []
for record in emission_data:
supplier = record["supplier_id"]
category = f"Cat. {record['scope3_category']}"
tco2e = record["co2e_tonnes"]
# Supplier -> Categoria
source_indices.append(node_index[supplier])
target_indices.append(node_index[category])
values.append(tco2e)
link_labels.append(f"{tco2e:.1f} tCO2e")
# Categoria -> Total
for cat in categories:
cat_total = sum(
d["co2e_tonnes"]
for d in emission_data
if f"Cat. {d['scope3_category']}" == cat
)
source_indices.append(node_index[cat])
target_indices.append(node_index["Scope 3 Total"])
values.append(cat_total)
link_labels.append(f"{cat_total:.1f} tCO2e")
# Colori nodi
node_colors = (
["#2196F3"] * len(suppliers) + # Blu per supplier
["#FF9800"] * len(categories) + # Arancione per categorie
["#4CAF50"] # Verde per totale
)
fig = go.Figure(go.Sankey(
arrangement="snap",
node=dict(
pad=20,
thickness=20,
line=dict(color="white", width=0.5),
label=all_nodes,
color=node_colors,
hovertemplate="{label}
tCO2e: {value:.1f}<extra></extra>"
),
link=dict(
source=source_indices,
target=target_indices,
value=values,
label=link_labels,
color="rgba(100,100,100,0.3)"
)
))
fig.update_layout(
title_text=title,
font_size=12,
height=600,
paper_bgcolor="rgba(0,0,0,0)",
plot_bgcolor="rgba(0,0,0,0)"
)
return fig
def create_category_heatmap(
emission_data: list[dict]
) -> go.Figure:
"""
Heatmap: asse X = categoria Scope 3, asse Y = qualità dato.
Colore = tCO2e. Aiuta a prioritizzare effort raccolta dati.
"""
df = pd.DataFrame(emission_data)
# Aggrega per categoria e tier qualità
pivot = df.pivot_table(
values="co2e_tonnes",
index="quality_tier",
columns="scope3_category",
aggfunc="sum",
fill_value=0
)
# Ordina tier (TIER_1 migliore in alto)
tier_order = ["TIER_1", "TIER_2", "TIER_3"]
pivot = pivot.reindex(
[t for t in tier_order if t in pivot.index]
)
fig = go.Figure(go.Heatmap(
z=pivot.values,
x=[f"Cat. {c}" for c in pivot.columns],
y=list(pivot.index),
colorscale="RdYlGn_r", # Rosso = alta emissione (critico)
text=pivot.values.round(1),
texttemplate="%{text} t",
textfont={"size": 11},
hovertemplate="Categoria: %{x}
Tier: %{y}
%{z:.1f} tCO2e<extra></extra>",
colorbar=dict(title="tCO2e")
))
fig.update_layout(
title="Heatmap Scope 3: Emissioni per Categoria e Qualità Dato",
xaxis_title="Categoria GHG Protocol",
yaxis_title="Tier Qualità Dato",
height=350,
margin=dict(l=80, r=20, t=60, b=60)
)
return fig
def generate_scope3_dashboard_html(
emission_data: list[dict],
output_path: str
) -> None:
"""Genera report HTML standalone con tutti i grafici"""
sankey = create_scope3_sankey(emission_data)
heatmap = create_category_heatmap(emission_data)
total_tco2e = sum(d["co2e_tonnes"] for d in emission_data)
by_category = {}
for d in emission_data:
cat = d["scope3_category"]
by_category[cat] = by_category.get(cat, 0) + d["co2e_tonnes"]
top_category = max(by_category, key=by_category.get)
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<title>Scope 3 Emissions Report</title>
<meta charset="utf-8">
</head>
<body>
<h1>Scope 3 Value Chain Emissions Report</h1>
<p>Totale: <strong>{total_tco2e:.1f} tCO2e</strong></p>
<p>Categoria più materiale: Cat. {top_category}
({by_category[top_category]:.1f} tCO2e)</p>
{sankey.to_html(full_html=False)}
{heatmap.to_html(full_html=False)}
</body>
</html>
"""
with open(output_path, "w") as f:
f.write(html_content)
Požadavky CSRD/ESRS E1 na rozsah vykazování 3
La Směrnice Corporate Sustainability Reporting Directive (CSRD) a související norma ESRS E1 (změna klimatu) převádějí hlášení podle rozsahu 3 z dobrovolného na dobrovolné povinné pro tisíce evropských společností. Harmonogram implementace je rozložený a již probíhá.
Je vyžadován rozsah 3 CSRD
| FY | Hlášení Půjdu dovnitř | Předměty | Poznámky |
|---|---|---|---|
| 2024 | Začátek roku 2025 | Velké PIE již podléhají NFRD (> 500 zaměstnanců) | První vlna: ~12 000 společností z EU |
| 2025 | Začátek roku 2026 | Všechny velké společnosti (>250 oddělení nebo >40 milionů EUR) | ~50 000 společností z EU |
| 2026 | Začátek roku 2027 | Kotované malé a střední podniky | Zjednodušený standard ESRS |
| 2028 | Začátek roku 2029 | Společnosti mimo EU s dceřinými společnostmi v EU | Významný globální dopad |
ESRS E1 konkrétně vyžaduje pro emise rozsahu 3:
- Zveřejnění všech kategorií materiálů: je třeba určit významnost prostřednictvím analýzy dvojí významnosti (dopad + finanční riziko). Z velké části z technologických společností je alespoň 4-6 kategorií materiálních.
- Rozdělení podle kategorií: hodnoty nelze vykázat jako jeden součet agregát; každá kategorie materiálů musí mít své vlastní údaje v tCO2e.
- Explicitní metodologie: pro každou kategorii musí být uvedena metoda výpočtu (na základě činností, výdajů, specifických pro dodavatele), zdroj emisních faktorů a úroveň kvalita dat.
- Žádná síť s uhlíkovými kredity: Musí být uvedeny hrubé emise odděleně od jakékoli zakoupené kompenzace nebo uhlíkové kompenzace.
- Povinné pojištění: zpočátku omezené ujištění s cílem v budoucnu přejít k přiměřené jistotě. Auditní stopa popsaná v tomto článku přesně to budou recenzenti požadovat.
- Cíl a plán přechodu: společnosti musí deklarovat cíle snížení v souladu o 1,5 °C (nejlépe ověřeno SBTi) s dílčími milníky.
Pozor: Rozsah 3 a Dvojitá významnost
ESRS E1 nevyžaduje hlášení všech 15 kategorií rozsahu 3, ale pouze těch identifikovaných jak materiálů v analýze dvojí významnosti. Nicméně, proces stanovení významnosti musí být zdokumentováno a kontrolovatelné. Vyloučit jednu kategorie „kvůli nedostatku údajů“ není přijatelné odůvodnění: musí být prokázáno že tato kategorie není pro konkrétní firmu významná.
Případová studie: Společnost SaaS s 50 dodavateli
Pojďme si vše převést na konkrétní příklad: italský SaaS střední velikosti se 150 zaměstnanci, tržby 15 mil. EUR, infrastruktura na AWS a 50 aktivních dodavatelů. Vedení rozhodlo zahajte výpočet rozsahu 3 před CSRD a získejte 3měsíční okno na dodání údaje ověřitelné vaším auditorem.
Profil společnosti: SaaS Italia S.r.l.
| Parametr | Hodnota |
|---|---|
| Zaměstnanci | 150 (70 % chytré práce) |
| Místa | Velitelství Milána + kancelář v Římě |
| Infrastruktura | AWS eu-west-1 (primární), GCP europe-west1 (záložní) |
| Aktivní dodavatelé | 50 (8 velkých, 42 malých/středních) |
| Výdaj na pořízení | ~4,2 mil. EUR/rok |
| Roční lety | ~380 letů (konference + zákazníci) |
Fáze 1 – Analýza významnosti (1.–2. týden): Tým ESG má provedla rychlou analýzu k identifikaci kategorií materiálů rozsahu 3. Použití dat z výdaje z ERP (SAP) jako počáteční proxy s faktory EEIO, získali tento odhad "screening":
Rozsah 3 Prověřování závažnosti — SaaS Italia S.r.l.
| Kočka. | Popis | Odhad na základě útraty (tCO2e) | % z celkem | Rozhodnutí |
|---|---|---|---|---|
| 1 | Nakoupené zboží a služby (cloud, SW) | 342 | 54 % | MATERIÁL → Podle aktivity |
| 6 | Služební cesty | 98 | 15 % | MATERIÁL → Podle aktivity |
| 7 | Dojíždění zaměstnanců | 87 | 14 % | MATERIÁL → Průzkumy zaměstnanců |
| 11 | Použití prodávaných produktů | 76 | 12 % | MATERIÁL → Měření LYŽE |
| 2 | Kapitálové zboží (notebooky, hardware) | 28 | 4% | MATERIÁL → Dodavatel PCF |
| Ostatní | Kočka. 3, 5, 8, 15 | 6 | 1% | Nehmotné → Na základě útraty |
Fáze 2 – Sběr dat (2.–8. týden):
- Kočka. 1 (Cloud): AWS Customer Carbon Footprint Tool a GCP Carbon Footprint poskytují údaje o měsíčních vydáních na účet. Data extrahovaná přes API a načtená do Bronze vrstvy. Kvalita: TIER 1 (specifická pro dodavatele, ověřeno AWS).
- Kočka. 1 (Software a služby): Kontaktováno 8 velkých dodavatelů (>50 000 EUR/rok). se strukturovaným dotazníkem. 5 odpovědělo primárními daty (včetně Microsoft ERP, Slack, Salesforce). 3 nemají žádná data → na základě útraty s EEIO.
- Kočka. 6 (služební cesty): údaje získané z cestovní kanceláře (Carlson Wagonlit) přes API: 380 letů s trasou a třídou. Výpočet na základě aktivity s DEFRA 2024.
- Kočka. 7 (Dojíždění): anonymní průzkum všem 150 zaměstnancům (82% míra odpovědí). Dopravní prostředek, průměrná vzdálenost, dny v týdnu v kanceláři.
- Kočka. 11 (Použití prodaných produktů): Vypočteno SCI (Software Carbon Intensity). s CodeCarbon na výrobní infrastruktuře. Vynásobeno počtem aktivních relací/měsíc.
# case_study/saas_italia_scope3.py
# Calcolo completo Scope 3 per SaaS Italia S.r.l.
from calculators.emission_calculator import (
calculate_activity_based,
calculate_spend_based,
calculate_category_total
)
def calculate_cat1_cloud() -> dict:
"""Cat. 1: Emissioni cloud AWS + GCP (dati primari vendor)"""
# Dati estratti dall'AWS Customer Carbon Footprint API
aws_kwh_year = 187_500 # kWh totali 2024
gcp_kwh_year = 12_300
aws_ef = 0.233 # kg CO2e/kWh IT grid (AWS eu-west-1)
gcp_ef = 0.198 # kg CO2e/kWh GCP europe-west1
aws_co2, aws_unc = calculate_activity_based("cloud_compute_kwh", aws_kwh_year, aws_ef)
gcp_co2, gcp_unc = calculate_activity_based("cloud_compute_kwh", gcp_kwh_year, gcp_ef)
activities = [
{"value_kg": aws_co2, "uncertainty_pct": 8.0, "quality_tier": "TIER_1"},
{"value_kg": gcp_co2, "uncertainty_pct": 10.0, "quality_tier": "TIER_1"},
]
result = calculate_category_total(activities)
result["category"] = 1
result["sub_category"] = "cloud_infrastructure"
return result
def calculate_cat6_business_travel() -> dict:
"""Cat. 6: Business travel (dati agenzia viaggi)"""
# 380 voli totali anno 2024
# 60% corto raggio (<1500km), 40% lungo raggio
short_haul_pkm = 380 * 0.6 * 850 # 850km avg corto raggio
long_haul_pkm = 380 * 0.4 * 3200 # 3200km avg lungo raggio
short_co2, short_unc = calculate_activity_based(
"flight_economy_short", short_haul_pkm
)
long_co2, long_unc = calculate_activity_based(
"flight_economy_long", long_haul_pkm
)
# Radiative forcing factor x1.9 per quota alta
rf_factor = 1.9
short_co2 *= rf_factor
long_co2 *= rf_factor
activities = [
{"value_kg": short_co2, "uncertainty_pct": 20.0, "quality_tier": "TIER_2"},
{"value_kg": long_co2, "uncertainty_pct": 20.0, "quality_tier": "TIER_2"},
]
result = calculate_category_total(activities)
result["category"] = 6
return result
def calculate_cat7_commuting() -> dict:
"""Cat. 7: Employee commuting (survey 123/150 dipendenti)"""
# Risultati survey (valori medi per dipendente/anno)
commuters = {
"car_solo": {"count": 38, "km_day": 28, "days_year": 120},
"car_shared": {"count": 12, "km_day": 22, "days_year": 110},
"public_transport": {"count": 52, "km_day": 35, "days_year": 140},
"cycling_walking": {"count": 21, "km_day": 4, "days_year": 150},
}
activities = []
# Auto privata
car_pkm = (
commuters["car_solo"]["count"] *
commuters["car_solo"]["km_day"] *
commuters["car_solo"]["days_year"]
)
co2_car, unc = calculate_activity_based("car_average", car_pkm)
activities.append({"value_kg": co2_car, "uncertainty_pct": 15.0, "quality_tier": "TIER_2"})
# Trasporto pubblico
pt_pkm = (
commuters["public_transport"]["count"] *
commuters["public_transport"]["km_day"] *
commuters["public_transport"]["days_year"]
)
co2_pt, unc = calculate_activity_based("public_transport_it", pt_pkm)
activities.append({"value_kg": co2_pt, "uncertainty_pct": 20.0, "quality_tier": "TIER_2"})
# Ciclismo/piedi: zero emissioni dirette
activities.append({"value_kg": 0.0, "uncertainty_pct": 0.0, "quality_tier": "TIER_1"})
result = calculate_category_total(activities)
result["category"] = 7
result["survey_response_rate"] = 82.0
return result
def calculate_cat11_use_of_products() -> dict:
"""Cat. 11: Energia consumata dai clienti usando il SaaS"""
# SCI = 0.045 gCO2e per ogni API call (misurato con CodeCarbon)
sci_gco2e_per_call = 0.045
avg_calls_per_month = 48_500_000 # 48.5M calls/mese (dati produzione)
months = 12
total_calls = avg_calls_per_month * months
co2e_grams = total_calls * sci_gco2e_per_call
co2e_kg = co2e_grams / 1000
activities = [
{"value_kg": co2e_kg, "uncertainty_pct": 25.0, "quality_tier": "TIER_2"}
]
result = calculate_category_total(activities)
result["category"] = 11
result["metric"] = "API calls"
result["total_calls"] = total_calls
return result
def run_full_scope3_calculation() -> dict:
"""Esegue il calcolo completo Scope 3 per SaaS Italia S.r.l."""
results = {
"cat_1_cloud": calculate_cat1_cloud(),
"cat_6_travel": calculate_cat6_business_travel(),
"cat_7_commuting": calculate_cat7_commuting(),
"cat_11_use": calculate_cat11_use_of_products(),
}
# Categoria residuale (spend-based per tutto il resto)
residual_spend_eur = 210_000 # ~5% della spesa totale
residual_co2_kg, res_unc = calculate_spend_based(
residual_spend_eur, "it_services"
)
results["cat_residual"] = {
"total_co2e_tonnes": residual_co2_kg / 1000,
"uncertainty_pct": res_unc,
"category": "other",
"dominant_quality_tier": "TIER_3"
}
# Totale Scope 3
total_tco2e = sum(
v["total_co2e_tonnes"] for v in results.values()
)
from calculators.emission_calculator import propagate_uncertainty
all_values = [v["total_co2e_tonnes"] for v in results.values()]
all_uncertainties = [v["uncertainty_pct"] for v in results.values()]
overall_uncertainty = propagate_uncertainty(all_values, all_uncertainties)
return {
"company": "SaaS Italia S.r.l.",
"reporting_year": 2024,
"methodology": "GHG Protocol Corporate Value Chain Standard",
"scope3_total_tco2e": round(total_tco2e, 1),
"overall_uncertainty_pct": round(overall_uncertainty, 1),
"categories": results,
"notes": "Cat. 11 include radiative forcing factor per aviation"
}
if __name__ == "__main__":
import json
report = run_full_scope3_calculation()
print("=" * 50)
print(f"SCOPE 3 TOTALE: {report['scope3_total_tco2e']} tCO2e")
print(f"Incertezza: +/- {report['overall_uncertainty_pct']}%")
print("=" * 50)
for name, cat in report["categories"].items():
tco2e = cat.get("total_co2e_tonnes", 0)
unc = cat.get("uncertainty_pct", 0)
pct = tco2e / report["scope3_total_tco2e"] * 100
print(f" {name:25s} {tco2e:6.1f} tCO2e ({pct:.0f}%) ±{unc:.0f}%")
Výsledek výpočtu pro SaaS Italia S.r.l. produkuje:
Rozsah 3 Konečné výsledky — SaaS Italia S.r.l. (FY 2024)
| Kategorie | tCO2e | % celkem | Nejistota | Úrovně |
|---|---|---|---|---|
| Kočka. 1 – Cloud a služby | 43,7 | 36 % | ±9 % | STUPEŇ 1 |
| Kočka. 6 – Služební cesty | 33.5 | 28 % | ±20 % | STUPEŇ 2 |
| Kočka. 7 – Dojíždění | 23.8 | 20 % | ±17 % | STUPEŇ 2 |
| Kočka. 11 – Používání produktů | 26.2 | 22 % | ±25 % | STUPEŇ 2 |
| Zbytková Kat | 6.5 | 5% | ±75 % | STUPEŇ 3 |
| CELKOVÝ ROZSAH 3 | 133,7 | 100 % | ±13 % | STUPEŇ 2 |
Přidání rozsahu 1 (~8 tCO2e z kotle HQ) a rozsahu 2 (~12 tCO2e z elektřiny úřadů), získáte a celková stopa ~154 t CO2e za rok 2024, z toho 87 % je rozsah 3. Přesně typický vzor softwarových společností.
Osvědčené postupy a anti-vzorce v potrubí Scope 3
Rozsah 3 Kontrolní seznam pro implementaci potrubí
| Plocha | Nejlepší postupy | Anti-vzory, kterým je třeba se vyhnout |
|---|---|---|
| Data | Bronzová vrstva neměnná pro všechna přijatá data | Přepište nezpracovaná data opravenými verzemi |
| Výpočet | Verze použitých emisních faktorů | Použijte EF bez uvedení roku a zdroje |
| Nejistota | Vždy propagujte nejistotu napříč každou kategorií | Hlásit pouze přesnou hodnotu bez rozsahu |
| Kvalitní | Explicitní a zdokumentované skóre kvality | Smíchejte TIER 1 a TIER 3 bez rozdílu |
| Audity | Hash chain pro každý výpočet, ověřitelný off-chain | Zpráva Excel nemá verzi a nelze ji sledovat |
| Dodavatel | Upřednostněte 20 nejlepších dodavatelů podle výdajů/emisí | Zacházejte se všemi 50 dodavateli stejně |
| Aktualizovat | Vydán roční plán zlepšování kvality | Přijměte řešení založené na výdajích jako trvalé řešení |
| Košťata | Explicitně zdokumentujte odůvodněné výjimky | Vyloučit kategorie bez formálního odůvodnění |
Plán progresivního zlepšování (plán zralosti dat)
Protokol o skleníkových plynech výslovně podporuje progresivní přístup: je lepší mít dnes nekvalitní data založená na výdajích, která nemají nic. Cílem je zlepšit se každý rok úroveň kvality kategorií materiálů:
- Rok 1 (základ): 100% založené na útratě, chyba ±75 %, TIER 3
- Rok 2: 10 nejlepších dodavatelů podle aktivity, ±40 %, TIER 2
- Rok 3: 20 nejlepších dodavatelů s ověřenými primárními daty, ±20 %, TIER 2
- Rok 4+: Integrace EcoVadis/CDP pro všechny dodavatele, ±10 %, TIER 1
Toto progresivní zlepšení lze dokumentovat ve zprávě CSRD as "metodology evolution" a je recenzenty hodnocena kladně.
Schéma databáze pro zlatou vrstvu
Zlatá vrstva vyžaduje schéma navržené pro podporu rychlých agregačních dotazů pro podávání zpráv o CSRD, zachování sledovatelnosti v řetězci auditů.
-- schema/scope3_gold.sql
-- Schema PostgreSQL per il Gold Layer Scope 3
-- Tabella principale: emissioni aggregate per categoria
CREATE TABLE scope3_emissions_gold (
id BIGSERIAL PRIMARY KEY,
company_id VARCHAR(50) NOT NULL,
reporting_year INTEGER NOT NULL,
scope3_category INTEGER NOT NULL CHECK (scope3_category BETWEEN 1 AND 15),
supplier_id VARCHAR(100),
-- Valori emissioni
co2e_tonnes DECIMAL(12, 3) NOT NULL,
co2_tonnes DECIMAL(12, 3),
ch4_tonnes_co2e DECIMAL(12, 3),
n2o_tonnes_co2e DECIMAL(12, 3),
-- Metodologia e qualità
calculation_method VARCHAR(30) NOT NULL, -- activity_based, spend_based, etc.
emission_factor_source VARCHAR(100) NOT NULL,
emission_factor_value DECIMAL(10, 6),
quality_tier VARCHAR(10) NOT NULL, -- TIER_1, TIER_2, TIER_3
uncertainty_pct DECIMAL(5, 1) NOT NULL,
uncertainty_tonnes DECIMAL(12, 3) GENERATED ALWAYS AS
(co2e_tonnes * uncertainty_pct / 100) STORED,
-- Tracciabilità
audit_chain_id UUID NOT NULL REFERENCES scope3_audit_chain(chain_id),
pipeline_version VARCHAR(20) NOT NULL,
reporting_standard VARCHAR(50) DEFAULT 'GHG_Protocol_Scope3_2011',
-- Timestamps
published_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Unicità per reporting period
CONSTRAINT uq_emission_period
UNIQUE (company_id, reporting_year, scope3_category, supplier_id)
);
-- Indici per performance query CSRD report
CREATE INDEX idx_scope3_company_year
ON scope3_emissions_gold (company_id, reporting_year);
CREATE INDEX idx_scope3_category
ON scope3_emissions_gold (scope3_category);
CREATE INDEX idx_scope3_quality
ON scope3_emissions_gold (quality_tier, uncertainty_pct);
-- View per report aggregato CSRD
CREATE VIEW v_scope3_csrd_report AS
SELECT
company_id,
reporting_year,
scope3_category,
SUM(co2e_tonnes) AS total_co2e_tonnes,
-- Propagazione incertezza quadratica
SQRT(SUM(POWER(co2e_tonnes * uncertainty_pct / 100, 2))) /
NULLIF(SUM(co2e_tonnes), 0) * 100 AS combined_uncertainty_pct,
-- Qualità aggregata (tier peggiore nella categoria)
MIN(quality_tier) AS data_quality_tier,
-- Metodo più usato
MODE() WITHIN GROUP (ORDER BY calculation_method) AS primary_method,
COUNT(DISTINCT supplier_id) AS supplier_count,
MAX(updated_at) AS last_updated
FROM scope3_emissions_gold
GROUP BY company_id, reporting_year, scope3_category
ORDER BY company_id, reporting_year, scope3_category;
-- Tabella audit chain
CREATE TABLE scope3_audit_chain (
chain_id UUID PRIMARY KEY,
record_id UUID NOT NULL DEFAULT gen_random_uuid(),
sequence INTEGER NOT NULL,
record_type VARCHAR(50) NOT NULL,
previous_hash CHAR(64) NOT NULL,
record_hash CHAR(64) NOT NULL UNIQUE,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT uq_chain_sequence UNIQUE (chain_id, sequence)
);
-- Indice per verifica integrità chain
CREATE INDEX idx_audit_chain_id_seq
ON scope3_audit_chain (chain_id, sequence);
Závěry a další kroky
Vybudování robustního potrubí pro emise z rozsahu 3 není akademické cvičení: je to akritická datová infrastruktura který se stane povinným pro tisíce evropských společností do roku 2028. Klíčové principy jsme viděli v tomto článku platí bez ohledu na velikost společnosti:
- Neměnnost hrubých dat: Bronzová vrstva se zárukami hash SHA-256 že každý recenzent může vždy vysledovat data zpět k původnímu zdroji, a to i po letech.
- Metodologická progresivita: Začněte s útratou a přejděte k Protokol o skleníkových plynech doporučuje přístup založený na aktivitě pro kategorie materiálů sám, nikoli zkratka.
- Kvantifikace nejistoty: Hlásit emise bez intervalu důvěra je neúplná informace. Kvadratické šíření nejistoty snadno se implementuje a je zásadní pro důvěryhodnost zprávy.
- Ověřitelná auditní stopa: Řetězec hash umožňuje externí ověřovatel matematicky potvrdit, že žádná data nebyla po výpočtu změněna.
- Ekosystémová integrace: platformy jako EcoVadis Carbon Data Network a CDP výrazně snižuje zátěž při sběru dat, zejména u velkých dodavatelských řetězců.
Případová studie SaaS Italia S.r.l. ukazuje, že to dokáže i středně velká firma vypracovat zprávu v souladu s rozsahem 3 CSRD za 3 měsíce s týmem 2–3 lidí, primární data pro kategorie materiálu a založená na výdajích pro zbytek. Klíčem je prioritizace: nehledejte dokonalost všude, ale soustřeďte své úsilí kde jsou emise nejvyšší.
Užitečné zdroje
- GHG Protocol Rozsah 3 Standard: ghgprotocol.org/corporate-value-chain-scope-3-standard
- Climatiq API (databáze emisních faktorů): climatiq.io
- Uhlíková datová síť EcoVadis: ecovadis.com/solutions/carbon
- ESRS E1 Změna klimatu (oficiální text EU): EFRAG ESRS E1
- EXIOBÁZE 3.8 (faktory založené na výdajích EEIO): exiobase.eu
Další článek v seriálu
V dalším článku ESG Reporting API: Integrace s CSRD Workflow vytvoříme vrstvu REST API na vrcholu dat rozsahu 3 vypočítaných v tomto článku, implementujeme koncové body vyhovující formátům požadovaným evropskou směrnicí a integrující pracovní postup schválení zprávy s digitálním podpisem auditora.
Uvidíme také, jak vystavit data ve formátu XBRL/iXBRL pro podání na ESEF (evropský jednotný elektronický formát), povinný formát zpráv CSRD kotované na evropské burze cenných papírů.







