Rurociąg emisji Zakres 3 Łańcuch wartości: od surowych danych do ścieżki audytu
Il 70-90% całkowitej emisji firmy technologicznej lub oprogramowania ukrywa się poza własną firmą granice: w serwerach chmurowych zakupionych jako usługa, w laptopach pracowników, w lotach biznesowych, w kodzie które klienci uruchamiają na swoich urządzeniach. To są emisje Zakres 3i w większości część organizacji cyfrowych stanowi najbardziej złożony problem pomiarowy, jaki istnieje Obszar ESG.
W przeciwieństwie do Zakresu 1 (spalamy paliwo) i Zakresu 2 (kupujemy energię elektryczną), Zakres 3 wymaga gromadzenia danych od setek dostawców, stosowania heterogenicznych wskaźników emisji, zarządzać bardzo wysokim stopniem niepewności i tworzyć ścieżkę audytu możliwą do sprawdzenia przez audytorów zewnętrznych. Z CSRD/ESRS E1 co sprawia, że raportowanie w zakresie 3 jest obowiązkowe dla dużych przedsiębiorstw, począwszy od 2025–2026, a w kontekście MŚP do roku 2028 kwestia ta nie jest już akademicka: inżynieria.
W tym artykule budujemy taki kompletny rurociąg do obliczenia emisji z zakresu 3 łańcuch wartości: od architektury ETL do gromadzenia danych o dostawcach, po integrację z platformami takimi jak CDP i EcoVadis, od obliczeń opartych na aktywności i wydatkach, aż po Airflow DAG do automatyzacji i do niezmiennej ścieżki audytu dla weryfikatorów. Każda sekcja zawiera działający kod Pythona i najlepsze praktyki przetestowane pod kątem działania w rzeczywistych kontekstach.
Czego się nauczysz
- 15 kategorii protokołu GHG z zakresu 3 i które z nich są istotne dla firm zajmujących się oprogramowaniem/SaaS
- Architektura ETL/ELT do gromadzenia danych o dostawcach: kwestionariusze, CDP, EcoVadis i bezpośrednie API
- Oparte na działaniach a na wydatkach: formuły, dokładność i kiedy zastosować które podejście
- Potok Pythona z Apache Airflow: DAG do zautomatyzowanych i skalowalnych obliczeń w zakresie 3
- Punktacja jakości danych i propagacja niepewności statystycznej w szacunkach
- Niezmienna ścieżka audytu z łańcuchem skrótów SHA-256 dla identyfikowalności zewnętrznego weryfikatora
- Wizualizacja diagramu Sankeya dla kategorii priorytetów łańcucha wartości i mapy cieplnej
- Wymagania CSRD/ESRS E1: co musisz ujawnić i z jaką szczegółowością
- Kompletne studium przypadku: firma SaaS z 50 dostawcami, kompleksowe obliczenia w zakresie 3
- Integracja z EcoVadis Carbon Data Network i Climatiq API dla wskaźników emisji
Seria Green Software — 10 artykułów
| # | Przedmiot | Temat |
|---|---|---|
| 1 | Zasady Fundacji Zielonego Oprogramowania | Efektywność węglowa, GSF, SCI |
| 2 | CodeCarbon: Pomiar kodu | Pomiar, dashboard, optymalizacja |
| 3 | Climatiq API: Obliczenia emisji dwutlenku węgla | REST API, protokół GHG, zakres 1-3 |
| 4 | Pakiet SDK uwzględniający emisję dwutlenku węgla | Zmiana czasu, zmiana lokalizacji |
| 5 | Zakres 1-2-3: Modelowanie danych ESG | Struktura danych, obliczenia, agregacja |
| 6 | GreenOps: Kubernetes świadomy emisji dwutlenku węgla | Planowanie, skalowanie, monitorowanie |
| 7 | Rurociąg emisji Zakres 3 Łańcuch wartości | Ten artykuł |
| 8 | API raportowania ESG: CSRD | API, przepływ pracy, zgodność |
| 9 | Zrównoważone wzorce architektoniczne | Przechowywanie, buforowanie, wsadowanie |
| 10 | AI i węgiel: szkolenie ML | Szkolenia ML, optymalizacja, Green AI |
15 kategorii protokołu dotyczącego gazów cieplarnianych, zakres 3
Il Standard korporacyjnego łańcucha wartości protokołu GHG (zakres 3). to ramy międzynarodowe odniesienie opublikowane w 2011 r. i obecnie w trakcie rewizji, a aktualizacje spodziewane są w 2026 r. To dzieli pośrednie emisje łańcucha wartości w 15 różnych kategorii, zorganizowane w dwóch makrogrupy: pod prąd (czynności przed produkcją/świadczeniem usług) e w dół rzeki (działania po sprzedaży klientowi).
15 Kategorie Zakresu 3: Wydobycie i Downstream
| Kot. | Nazwa | Przepływ | Znaczenie SaaS/Tech |
|---|---|---|---|
| 1 | Zakupione towary i usługi | Pod prąd | WYSOKI: sprzęt serwerowy, licencje na oprogramowanie, usługi doradcze |
| 2 | Dobra kapitałowe | Pod prąd | MEDIA: sprzęt data center, laptopy, telefony firmowe |
| 3 | Działalność związana z paliwami i energią | Pod prąd | ŚREDNIA: emisje z zakupionej produkcji energii (wydobycie, zakres 2) |
| 4 | Transport i dystrybucja wyższego szczebla | Pod prąd | NISKI: dostawy sprzętu do biur i centrów danych |
| 5 | Odpady powstające w trakcie działalności | Pod prąd | NISKA: WEEE, papier, odpady biurowe |
| 6 | Podróże służbowe | Pod prąd | WYSOKI: loty, hotele, pociągi dla rozproszonych zespołów |
| 7 | Dojazdy pracowników | Pod prąd | WYSOKI: podróże z domu do biura, szczególnie w przypadku zespołów hybrydowych |
| 8 | Aktywa leasingowane na rynku wyższego szczebla | Pod prąd | MEDIA: biura do wynajęcia (jeżeli nie ujęte w Zakresie 1/2) |
| 9 | Dalszy transport i dystrybucja | W dół rzeki | NISKI: dystrybucja oprogramowania na nośnikach fizycznych (rzadko) |
| 10 | Obróbka sprzedanych produktów | W dół rzeki | Nie dotyczy: Nie dotyczy czystego oprogramowania |
| 11 | Wykorzystanie sprzedanych produktów | W dół rzeki | BARDZO WYSOKI: Energia zużywana przez klientów korzystających z SaaS |
| 12 | Obróbka po zakończeniu życia sprzedanych produktów | W dół rzeki | NISKI: koniec życia urządzeń użytkownika |
| 13 | Aktywa leasingowane na niższym szczeblu łańcucha dostaw | W dół rzeki | MEDIA: Sprzęt wynajmowany klientom |
| 14 | Franczyzy | W dół rzeki | Nie dotyczy: Nie dotyczy |
| 15 | Inwestycje | W dół rzeki | WYSOKI: portfel korporacyjny, inwestycje kapitałowe w start-upy |
W przypadku firmy zajmującej się usługami SaaS lub oprogramowaniem najbardziej odpowiednie kategorie to zazwyczaj: Kot. 1 (zakupione towary i usługi, często największy przedmiot), Kot. 6 (podróże służbowe), Kot. 7 (dojazdy pracowników) e Kot. 11 (wykorzystanie sprzedanych produktów). Tam podwójna materialność prośba z CSRD wymaga określenia, które kategorie są istotne zarówno z punktu widzenia wpływu ryzyko środowiskowe i finansowe dla firmy.
Częsty błąd: pomijanie kota. 11 dla SaaS
Wiele producentów oprogramowania wyklucza kategorię 11 („Wykorzystanie sprzedanych produktów”), zakładając, że nie ma ona zastosowania. W rzeczywistości każde wywołanie API, każde zapytanie, każdy wat zużywany przez klientów na uruchomienie Twojego oprogramowanie to Scope 3 Cat. 11 emisji, za które odpowiadasz. Dla SaaS z milionami użytkowników, może to być kategoria dominująca. Metoda obliczeniowa wykorzystuje oprogramowanie do pomiaru intensywności emisji dwutlenku węgla (SCI). pomnożona przez dostarczone jednostki funkcjonalne.
Architektura potoku gromadzenia danych
Gromadzenie wiarygodnych danych z całego łańcucha wartości to wąskie gardło numer jeden dla każdej firmy Projekt z zakresu 3. Rurociąg musi obsługiwać heterogeniczne źródła: ręczne kwestionariusze, zewnętrzne platformy ESG części, bezpośrednie API z dostawcami, pliki CSV wysyłane e-mailem, wewnętrzne dane ERP. Następująca architektura przyjąć wzór Trójwarstwowy ETL (Brązowy/Srebrny/Złoty) inspirowany domem nad jeziorem.
Architektura rurociągów Zakres 3: Brąz / Srebro / Złoto
| Warstwy | Treść | Technologia | Zakres |
|---|---|---|---|
| Brąz (surowy) | Niezmienne, surowe dane od dostawców | S3/GCS, Jezioro Delta | Ścieżka audytu, powtórka, źródło prawdy |
| Srebrny (standaryzowany) | Dane znormalizowane według jednostek i walut | dbt, Spark, Pandy | Obliczanie emisji, połączenie ze współczynnikami emisji |
| Złoto (raportowanie) | Łączne emisje według kategorii gazów cieplarnianych | PostgreSQL-a, BigQuery | Dashboardy, raporty CSRD, weryfikatory |
Warstwa brązowa jest niezbędna: wszystkie otrzymane dane są zapisywane tak jak jest ze znacznikiem czasu spożycia, skrót SHA-256 treści i metadanych źródłowych. To gwarantuje taką możliwość do ponownego przetworzenia całego rurociągu w przypadku zmiany współczynników emisji lub metodologii, bez strat oryginalne dane.
# models/scope3_pipeline.py
# Struttura dati per la pipeline Scope 3
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional
import hashlib
import json
class DataSource(Enum):
SUPPLIER_QUESTIONNAIRE = "supplier_questionnaire"
CDP_API = "cdp_api"
ECOVADIS_API = "ecovadis_api"
ERP_EXPORT = "erp_export"
MANUAL_UPLOAD = "manual_upload"
CLIMATIQ_API = "climatiq_api"
class CalculationMethod(Enum):
ACTIVITY_BASED = "activity_based"
SPEND_BASED = "spend_based"
HYBRID = "hybrid"
SUPPLIER_SPECIFIC = "supplier_specific"
class DataQualityTier(Enum):
TIER_1 = "primary_data" # Dati primari dal supplier
TIER_2 = "secondary_sector" # Fattori settoriali
TIER_3 = "spend_estimated" # Stima basata su spesa
@dataclass
class RawSupplierData:
"""Layer Bronze: dato grezzo immutabile"""
supplier_id: str
source: DataSource
raw_payload: dict
received_at: datetime
content_hash: str = field(init=False)
def __post_init__(self):
payload_str = json.dumps(self.raw_payload, sort_keys=True)
self.content_hash = hashlib.sha256(
payload_str.encode()
).hexdigest()
@dataclass
class StandardizedActivity:
"""Layer Silver: attività normalizzata"""
activity_id: str
supplier_id: str
scope3_category: int # 1-15
activity_type: str # es. "freight_transport"
quantity: float
unit: str # es. "tonne.km"
reporting_period_start: datetime
reporting_period_end: datetime
source: DataSource
quality_tier: DataQualityTier
emission_factor_id: Optional[str] = None
uncertainty_pct: float = 0.0
raw_data_hash: str = "" # Ref al Bronze layer
@dataclass
class EmissionResult:
"""Layer Gold: emissione calcolata"""
result_id: str
activity_id: str
scope3_category: int
co2e_tonnes: float
calculation_method: CalculationMethod
emission_factor_source: str # es. "climatiq:IPCC_2021"
emission_factor_value: float
quality_tier: DataQualityTier
uncertainty_pct: float
calculated_at: datetime
pipeline_version: str
audit_hash: str = field(init=False)
def __post_init__(self):
audit_data = {
"result_id": self.result_id,
"activity_id": self.activity_id,
"co2e_tonnes": self.co2e_tonnes,
"emission_factor_source": self.emission_factor_source,
"calculated_at": self.calculated_at.isoformat(),
"pipeline_version": self.pipeline_version,
}
self.audit_hash = hashlib.sha256(
json.dumps(audit_data, sort_keys=True).encode()
).hexdigest()
Integracja danych dostawców: CDP, EcoVadis i Direct API
Gromadzenie danych od dostawców odbywa się wieloma kanałami o różnych poziomach jakości i zupełnie inna automatyzacja. The Projekt ujawniania informacji o emisji dwutlenku węgla (CDP) zbiera dane z ponad 24 000 firm i udostępnia interfejs API umożliwiający dostęp do zweryfikowanych raportów. EcoVadis uruchomiła w 2025 r. sieć danych dotyczących emisji gazów cieplarnianych, w ramach której ponad 48 000 reporterów dotyczących gazów cieplarnianych udostępnia dane w sposób standaryzowane. Wreszcie wielu dużych dostawców udostępnia zastrzeżone interfejsy API do bezpośredniego udostępniania swoich śladów.
# collectors/supplier_collector.py
# Integrazione con fonti dati supplier
import httpx
import asyncio
from typing import AsyncGenerator
from datetime import datetime
from models.scope3_pipeline import RawSupplierData, DataSource
class ClimatiqEmissionFactors:
"""Client per Climatiq API - emission factors database"""
BASE_URL = "https://beta3.api.climatiq.io"
def __init__(self, api_key: str):
self.api_key = api_key
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
async def get_emission_factor(
self,
activity_id: str,
year: int = 2024,
region: str = "IT"
) -> dict:
"""Recupera fattore di emissione per attività specifica"""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.BASE_URL}/estimate",
headers=self.headers,
json={
"emission_factor": {
"activity_id": activity_id,
"data_version": "^21",
"year": year,
"region": region
},
"parameters": {
"money": 1.0,
"money_unit": "eur"
}
}
)
response.raise_for_status()
return response.json()
async def batch_estimate(
self,
activities: list[dict]
) -> list[dict]:
"""Stima batch per multiple attività - ottimizza le API call"""
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.BASE_URL}/batch",
headers=self.headers,
json={"batch": activities},
timeout=30.0
)
response.raise_for_status()
return response.json().get("results", [])
class EcoVadisCollector:
"""Raccoglie dati Scope 3 dalla piattaforma EcoVadis"""
def __init__(self, api_key: str, base_url: str):
self.api_key = api_key
self.base_url = base_url
async def fetch_supplier_carbon_data(
self,
supplier_ecovadis_id: str,
reporting_year: int
) -> RawSupplierData:
"""Recupera dati carbonio per un supplier dalla Carbon Data Network"""
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.base_url}/v1/suppliers/{supplier_ecovadis_id}/carbon",
headers={"X-API-Key": self.api_key},
params={"year": reporting_year},
timeout=15.0
)
if response.status_code == 404:
# Supplier non ha condiviso dati primari
return self._create_no_data_record(supplier_ecovadis_id)
response.raise_for_status()
payload = response.json()
return RawSupplierData(
supplier_id=supplier_ecovadis_id,
source=DataSource.ECOVADIS_API,
raw_payload=payload,
received_at=datetime.utcnow()
)
def _create_no_data_record(self, supplier_id: str) -> RawSupplierData:
return RawSupplierData(
supplier_id=supplier_id,
source=DataSource.ECOVADIS_API,
raw_payload={"status": "no_data", "supplier_id": supplier_id},
received_at=datetime.utcnow()
)
class CDPCollector:
"""Raccoglie dati da CDP (Carbon Disclosure Project)"""
CDP_API_URL = "https://api.cdp.net/v1"
def __init__(self, api_token: str):
self.api_token = api_token
async def search_supplier(
self,
company_name: str,
year: int = 2024
) -> RawSupplierData | None:
"""Cerca un supplier nel database CDP e recupera dati GHG"""
async with httpx.AsyncClient() as client:
# Ricerca azienda
search_resp = await client.get(
f"{self.CDP_API_URL}/companies/search",
headers={"Authorization": f"Bearer {self.api_token}"},
params={"q": company_name, "year": year}
)
if not search_resp.json().get("results"):
return None
company_id = search_resp.json()["results"][0]["id"]
# Recupera dati GHG disclosure
ghg_resp = await client.get(
f"{self.CDP_API_URL}/companies/{company_id}/ghg-emissions",
headers={"Authorization": f"Bearer {self.api_token}"},
params={"year": year}
)
if ghg_resp.status_code != 200:
return None
return RawSupplierData(
supplier_id=company_name,
source=DataSource.CDP_API,
raw_payload=ghg_resp.json(),
received_at=datetime.utcnow()
)
class BronzeLayerStorage:
"""Salvataggio immutabile nel layer Bronze"""
def __init__(self, storage_client, bucket: str):
self.storage = storage_client
self.bucket = bucket
async def store(self, raw_data: RawSupplierData) -> str:
"""Salva dato grezzo con path deterministico basato su hash"""
path = (
f"scope3/bronze/"
f"{raw_data.received_at.year}/"
f"{raw_data.received_at.month:02d}/"
f"{raw_data.supplier_id}/"
f"{raw_data.content_hash}.json"
)
await self.storage.upload_json(
bucket=self.bucket,
path=path,
data={
"supplier_id": raw_data.supplier_id,
"source": raw_data.source.value,
"received_at": raw_data.received_at.isoformat(),
"content_hash": raw_data.content_hash,
"payload": raw_data.raw_payload
}
)
return path
Oparte na aktywności a wydatki: wybór właściwej metody
Protokół GHG definiuje cztery metody obliczeń dla Zakresu 3, które w praktyce tak ograniczyć się do dwóch podstawowych podejść: oparte na działaniu e oparte na wydatkach. Wybór zależy od dostępności danych i istotności kategorii oraz dojrzałość relacji z dostawcą.
Porównanie metodologiczne: oparte na aktywności i na wydatkach
| Rozmiar | Oparte na aktywności | Oparte na wydatkach |
|---|---|---|
| Formuła | Ilość × współczynnik emisji (jednostki/kg CO2e) | Wydatki (EUR) × współczynnik EEIO (kg CO2e/EUR) |
| Dokładność | Wysoki (±5-15% z danymi pierwotnymi) | Niski-Średni (±50-100%) |
| Żądano danych | Wielkości fizyczne (kg, km, kWh, t) | Tylko faktury księgowe (EUR, USD) |
| Źródło EF | Climatiq, IPCC, DEFRA, ecoinvent | USEEIO, EXIOBASE, WIOD |
| Kiedy go używać | Kategorie materiałów, duzi dostawcy | Kick-off, mali dostawcy, Kat. <1% |
| Zbiórka | Wysoki: wymaga współpracy z dostawcami | Niski: Dane już w ERP/SAP |
| Akceptowalność CSRD | Ulubiony w kategoriach materiałowych | Zaakceptowany jako początkowy pełnomocnik |
Optymalna strategia to jedno podejście progresywna hybryda: zacznijmy od w oparciu o wydatki, aby uzyskać szybki punkt odniesienia dla całego łańcucha wartości, następnie jest on stopniowo migrowany w kierunku działań opartych na zidentyfikowanych kategoriach materialnych. Protokół GHG definiuje trzy poziomy jakości danych (poziom 1, 2, 3), które dokładnie odpowiadają temu postępowi.
# calculators/emission_calculator.py
# Calcolo emissioni activity-based e spend-based
from dataclasses import dataclass
from typing import Optional
import math
# ============================================================
# EMISSION FACTORS DATABASE (simplified)
# In produzione: usa Climatiq API o database ecoinvent
# ============================================================
EMISSION_FACTORS: dict[str, dict] = {
# Cat. 1: Purchased goods & services
"cloud_compute_kwh": {
"value": 0.233, # kg CO2e/kWh (IT grid mix 2024)
"unit": "kWh",
"source": "IEA 2024",
"uncertainty_pct": 10.0
},
"hardware_laptop": {
"value": 350.0, # kg CO2e/unit (embodied carbon)
"unit": "unit",
"source": "Dell 2024 PCF",
"uncertainty_pct": 20.0
},
# Cat. 6: Business travel
"flight_economy_short": {
"value": 0.255, # kg CO2e/passenger.km
"unit": "passenger.km",
"source": "DEFRA 2024",
"uncertainty_pct": 15.0
},
"flight_economy_long": {
"value": 0.195,
"unit": "passenger.km",
"source": "DEFRA 2024",
"uncertainty_pct": 15.0
},
# Cat. 7: Employee commuting
"car_average": {
"value": 0.170, # kg CO2e/km
"unit": "km",
"source": "DEFRA 2024",
"uncertainty_pct": 12.0
},
"public_transport_it": {
"value": 0.048,
"unit": "passenger.km",
"source": "Ispra 2024",
"uncertainty_pct": 18.0
},
}
# EEIO Spend-based factors (EXIOBASE 3.8)
# kg CO2e per EUR di spesa per categoria merceologica
EEIO_FACTORS: dict[str, float] = {
"it_services": 0.312, # IT e telecomunicazioni
"professional_services": 0.198, # Consulenza, legale, etc.
"office_supplies": 0.445,
"cloud_hosting": 0.287,
"marketing": 0.231,
"utilities": 0.892,
"hr_services": 0.167,
"travel_accommodation": 0.521,
}
def calculate_activity_based(
activity_type: str,
quantity: float,
custom_ef: Optional[float] = None
) -> tuple[float, float]:
"""
Calcola emissioni con metodo activity-based.
Returns:
(co2e_kg, uncertainty_pct)
"""
if custom_ef is not None:
return quantity * custom_ef, 30.0 # alta incertezza EF custom
ef_data = EMISSION_FACTORS.get(activity_type)
if not ef_data:
raise ValueError(f"Emission factor non trovato: {activity_type}")
co2e_kg = quantity * ef_data["value"]
uncertainty = ef_data["uncertainty_pct"]
return co2e_kg, uncertainty
def calculate_spend_based(
spend_eur: float,
procurement_category: str,
inflation_correction: float = 1.0
) -> tuple[float, float]:
"""
Calcola emissioni con metodo spend-based (EEIO).
Args:
spend_eur: importo in EUR
procurement_category: categoria merceologica EEIO
inflation_correction: fattore per correggere inflazione vs anno base EEIO
Returns:
(co2e_kg, uncertainty_pct)
"""
eeio_factor = EEIO_FACTORS.get(procurement_category)
if not eeio_factor:
raise ValueError(f"EEIO factor non trovato: {procurement_category}")
# Corregge per inflazione (EEIO factors spesso in EUR 2015)
adjusted_spend = spend_eur / inflation_correction
co2e_kg = adjusted_spend * eeio_factor
# Lo spend-based ha incertezza intrinsecamente alta
uncertainty = 75.0
return co2e_kg, uncertainty
def propagate_uncertainty(
values: list[float],
uncertainties_pct: list[float]
) -> float:
"""
Propagazione incertezza quadratica (somma in quadratura).
Valida quando le incertezze sono indipendenti.
Returns:
uncertainty_pct sul totale
"""
weighted_variance_sum = sum(
(v * u/100) ** 2
for v, u in zip(values, uncertainties_pct)
)
total = sum(values)
if total == 0:
return 0.0
combined_std = math.sqrt(weighted_variance_sum)
return (combined_std / total) * 100
def calculate_category_total(
activities: list[dict]
) -> dict:
"""
Calcola totale categoria Scope 3 con propagazione incertezza.
activities: lista di {method, value_kg, uncertainty_pct}
"""
if not activities:
return {"total_co2e_kg": 0.0, "uncertainty_pct": 0.0}
values = [a["value_kg"] for a in activities]
uncertainties = [a["uncertainty_pct"] for a in activities]
total_co2e = sum(values)
combined_uncertainty = propagate_uncertainty(values, uncertainties)
# Qualità aggregata: peggiore del gruppo determina il tier
quality_tiers = [a.get("quality_tier", "TIER_3") for a in activities]
dominant_tier = min(quality_tiers) # TIER_1 < TIER_2 < TIER_3 lexicograficamente
return {
"total_co2e_kg": total_co2e,
"total_co2e_tonnes": total_co2e / 1000,
"uncertainty_pct": combined_uncertainty,
"uncertainty_kg": total_co2e * combined_uncertainty / 100,
"dominant_quality_tier": dominant_tier,
"activity_count": len(activities)
}
Przepływ powietrza w rurociągu: DAG do zautomatyzowanych obliczeń w zakresie 3
Orkiestracja potoku zakresu 3 wymaga dobrze zorganizowanego DAG, który zarządza pełny cykl życia: gromadzenie danych, standaryzacja, obliczanie emisji, kontrola jakości i publikowanie w warstwie Gold. DAG musi być idempotentny (wykonywalny kilka razy bez skutków ubocznych) e resetowalne w przypadku częściowa awaria.
# dags/scope3_pipeline_dag.py
# Apache Airflow DAG per pipeline emissioni Scope 3
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task, task_group
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.models import Variable
import json
import logging
logger = logging.getLogger(__name__)
# ============================================================
# CONFIGURAZIONE DAG
# ============================================================
SCOPE3_DAG_CONFIG = {
"reporting_year": 2024,
"companies": [
{"id": "S001", "name": "AWS", "tier": "TIER_1", "source": "ecovadis"},
{"id": "S002", "name": "Microsoft Azure", "tier": "TIER_1", "source": "cdp"},
{"id": "S003", "name": "Supplier_XYZ", "tier": "TIER_2", "source": "questionnaire"},
# ... altri supplier
],
"categories_enabled": [1, 2, 3, 6, 7, 11, 15],
"quality_threshold_pct": 80.0,
"alert_email": "esg-team@company.com"
}
default_args = {
"owner": "esg-team",
"depends_on_past": False,
"email_on_failure": True,
"email": [SCOPE3_DAG_CONFIG["alert_email"]],
"retries": 3,
"retry_delay": timedelta(minutes=5),
}
with DAG(
dag_id="scope3_emissions_pipeline",
default_args=default_args,
description="Pipeline calcolo emissioni Scope 3 value chain",
schedule_interval="@quarterly", # Esecuzione trimestrale
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["emissions", "scope3", "esg", "ghg-protocol"],
max_active_runs=1, # Serializza: mai due calcoli in parallelo
) as dag:
# ============================================================
# FASE 1: RACCOLTA DATI SUPPLIER (in parallelo per supplier)
# ============================================================
@task_group(group_id="data_collection")
def collect_supplier_data():
@task(task_id="fetch_ecovadis_suppliers")
def fetch_ecovadis() -> list[dict]:
"""Raccoglie dati da EcoVadis Carbon Data Network"""
from collectors.supplier_collector import EcoVadisCollector
import asyncio
api_key = Variable.get("ECOVADIS_API_KEY", deserialize_json=False)
collector = EcoVadisCollector(api_key, "https://api.ecovadis.com")
suppliers_ecovadis = [
s for s in SCOPE3_DAG_CONFIG["companies"]
if s["source"] == "ecovadis"
]
results = []
for supplier in suppliers_ecovadis:
raw = asyncio.run(
collector.fetch_supplier_carbon_data(
supplier["id"],
SCOPE3_DAG_CONFIG["reporting_year"]
)
)
results.append({
"supplier_id": raw.supplier_id,
"content_hash": raw.content_hash,
"status": "fetched",
"has_data": raw.raw_payload.get("status") != "no_data"
})
logger.info(f"EcoVadis - Supplier {supplier['id']}: fetched")
return results
@task(task_id="fetch_cdp_suppliers")
def fetch_cdp() -> list[dict]:
"""Raccoglie dati verificati da CDP"""
from collectors.supplier_collector import CDPCollector
import asyncio
api_token = Variable.get("CDP_API_TOKEN")
collector = CDPCollector(api_token)
suppliers_cdp = [
s for s in SCOPE3_DAG_CONFIG["companies"]
if s["source"] == "cdp"
]
results = []
for supplier in suppliers_cdp:
raw = asyncio.run(
collector.search_supplier(
supplier["name"],
SCOPE3_DAG_CONFIG["reporting_year"]
)
)
if raw:
results.append({
"supplier_id": supplier["id"],
"content_hash": raw.content_hash,
"status": "fetched",
"has_data": True
})
else:
results.append({
"supplier_id": supplier["id"],
"status": "not_found",
"has_data": False
})
return results
@task(task_id="process_manual_questionnaires")
def process_questionnaires() -> list[dict]:
"""Processa questionari manuali caricati in S3"""
# In produzione: legge da bucket S3 o SharePoint
# Qui restituiamo dati di esempio
return [{
"supplier_id": "S003",
"status": "processed",
"has_data": True,
"scope3_cat1_tco2e": 45.2,
"scope3_cat6_tco2e": 12.8
}]
ev = fetch_ecovadis()
cdp = fetch_cdp()
q = process_questionnaires()
return [ev, cdp, q]
# ============================================================
# FASE 2: STANDARDIZZAZIONE E CALCOLO EMISSIONI
# ============================================================
@task(task_id="standardize_activities")
def standardize_activities(collection_results: list) -> list[dict]:
"""Normalizza tutti i dati in unità fisiche standard"""
from normalizers.activity_normalizer import ActivityNormalizer
normalizer = ActivityNormalizer()
standardized = []
for batch in collection_results:
for result in batch:
if result.get("has_data"):
activities = normalizer.normalize(result)
standardized.extend(activities)
logger.info(f"Standardizzate {len(standardized)} attività")
return standardized
@task(task_id="calculate_emissions")
def calculate_emissions(activities: list[dict]) -> list[dict]:
"""Calcola emissioni CO2e per ogni attività standardizzata"""
from calculators.emission_calculator import (
calculate_activity_based,
calculate_spend_based
)
from models.scope3_pipeline import CalculationMethod
results = []
for activity in activities:
if activity["method"] == "activity_based":
co2e_kg, uncertainty = calculate_activity_based(
activity["activity_type"],
activity["quantity"]
)
method = CalculationMethod.ACTIVITY_BASED
else:
co2e_kg, uncertainty = calculate_spend_based(
activity["spend_eur"],
activity["procurement_category"]
)
method = CalculationMethod.SPEND_BASED
results.append({
**activity,
"co2e_kg": co2e_kg,
"co2e_tonnes": co2e_kg / 1000,
"uncertainty_pct": uncertainty,
"calculation_method": method.value,
"calculated_at": datetime.utcnow().isoformat()
})
return results
# ============================================================
# FASE 3: DATA QUALITY CHECK
# ============================================================
@task(task_id="data_quality_check")
def data_quality_check(results: list[dict]) -> dict:
"""Verifica qualità dati e genera score per categoria"""
from quality.data_quality_scorer import DataQualityScorer
scorer = DataQualityScorer()
quality_report = scorer.score_results(results)
if quality_report["overall_score"] < SCOPE3_DAG_CONFIG["quality_threshold_pct"]:
logger.warning(
f"Quality score sotto soglia: {quality_report['overall_score']}%"
)
return quality_report
# ============================================================
# FASE 4: AUDIT TRAIL E PUBBLICAZIONE GOLD LAYER
# ============================================================
@task(task_id="create_audit_trail")
def create_audit_trail(
results: list[dict],
quality_report: dict
) -> str:
"""Crea audit trail immutabile con hash chain"""
from audit.hash_chain import HashChain
chain = HashChain()
chain_id = chain.create_chain(
calculation_results=results,
quality_report=quality_report,
pipeline_version="2.1.0",
methodology="GHG_Protocol_Scope3_2011",
reporting_standard="CSRD_ESRS_E1"
)
logger.info(f"Audit trail creato: {chain_id}")
return chain_id
@task(task_id="publish_gold_layer")
def publish_gold_layer(
results: list[dict],
audit_chain_id: str
) -> None:
"""Pubblica dati aggregati nel Gold layer (PostgreSQL)"""
hook = PostgresHook(postgres_conn_id="emissions_db")
for result in results:
hook.run(
"""
INSERT INTO scope3_emissions_gold (
supplier_id, scope3_category, co2e_tonnes,
calculation_method, uncertainty_pct,
quality_tier, audit_chain_id,
reporting_year, published_at
) VALUES (
%(supplier_id)s, %(scope3_category)s, %(co2e_tonnes)s,
%(calculation_method)s, %(uncertainty_pct)s,
%(quality_tier)s, %(audit_chain_id)s,
%(reporting_year)s, NOW()
)
ON CONFLICT (supplier_id, scope3_category, reporting_year)
DO UPDATE SET
co2e_tonnes = EXCLUDED.co2e_tonnes,
updated_at = NOW()
""",
parameters={
**result,
"audit_chain_id": audit_chain_id,
"reporting_year": SCOPE3_DAG_CONFIG["reporting_year"]
}
)
logger.info(f"Pubblicati {len(results)} record nel Gold layer")
# ============================================================
# WIRING DEL DAG
# ============================================================
collection_results = collect_supplier_data()
standardized = standardize_activities(collection_results)
emission_results = calculate_emissions(standardized)
quality = data_quality_check(emission_results)
chain_id = create_audit_trail(emission_results, quality)
publish_gold_layer(emission_results, chain_id)
Ocena jakości danych i propagacja niepewności
Standard dotyczący zakresu 3 protokołu GHG wyraźnie uznaje tę kategorię. 1-15 nie nigdy nie wiadomo z całkowitą pewnością. Sprawozdawczość dotycząca jakości musi zawierać kosztorys zniepewność ilościowa powiązane z każdą kategorią. IPCC dokonało sformalizowania metodę propagacji niepewności opisaną w Wytycznych dobrych praktyk.
# quality/data_quality_scorer.py
# Scoring qualità dati Scope 3
from dataclasses import dataclass
from enum import Enum
from typing import Optional
import math
from datetime import datetime, timedelta
class QualityDimension(Enum):
COMPLETENESS = "completeness"
ACCURACY = "accuracy"
TIMELINESS = "timeliness"
VERIFICATION = "verification"
GRANULARITY = "granularity"
@dataclass
class QualityScore:
dimension: QualityDimension
score: float # 0-100
weight: float # peso nel calcolo aggregato
notes: str = ""
def score_supplier_data_quality(
supplier: dict,
reference_date: datetime = None
) -> dict[str, float]:
"""
Calcola score qualità multi-dimensionale per i dati di un supplier.
Basato su GHG Protocol Data Quality Guidance.
"""
if reference_date is None:
reference_date = datetime.utcnow()
scores = []
# 1. COMPLETENESS: quante delle categorie richieste sono presenti?
required_fields = [
"scope3_cat1_tco2e", "scope3_cat6_tco2e", "scope3_cat7_tco2e"
]
present = sum(1 for f in required_fields if supplier.get(f) is not None)
completeness_score = (present / len(required_fields)) * 100
scores.append(QualityScore(
dimension=QualityDimension.COMPLETENESS,
score=completeness_score,
weight=0.30
))
# 2. TIMELINESS: quanto sono recenti i dati?
data_year = supplier.get("reporting_year", 2020)
current_year = reference_date.year
age_years = current_year - data_year
if age_years <= 1:
timeliness_score = 100.0
elif age_years == 2:
timeliness_score = 75.0
elif age_years == 3:
timeliness_score = 50.0
else:
timeliness_score = 20.0
scores.append(QualityScore(
dimension=QualityDimension.TIMELINESS,
score=timeliness_score,
weight=0.20
))
# 3. VERIFICATION: i dati sono stati verificati da terze parti?
verification_level = supplier.get("verification", "none")
verification_score = {
"independent_assured": 100.0, # GHG verificato da auditor indipendente
"limited_assurance": 80.0, # Limited assurance
"internal_reviewed": 60.0, # Solo review interna
"supplier_declared": 40.0, # Auto-dichiarazione
"estimated": 20.0, # Stima spend-based
"none": 0.0
}.get(verification_level, 20.0)
scores.append(QualityScore(
dimension=QualityDimension.VERIFICATION,
score=verification_score,
weight=0.30
))
# 4. GRANULARITY: attività-specifico o aggregato?
data_type = supplier.get("data_type", "aggregated")
granularity_score = {
"site_specific": 100.0, # Dati per sito produttivo
"product_specific": 90.0, # PCF per prodotto/servizio
"supplier_specific": 70.0, # Dato totale supplier
"sector_average": 40.0, # Media settoriale
"aggregated": 20.0
}.get(data_type, 20.0)
scores.append(QualityScore(
dimension=QualityDimension.GRANULARITY,
score=granularity_score,
weight=0.20
))
# Calcolo score aggregato ponderato
overall = sum(s.score * s.weight for s in scores)
# Mappa score a Tier GHG Protocol
if overall >= 80:
tier = "TIER_1"
uncertainty_band_pct = 15.0
elif overall >= 50:
tier = "TIER_2"
uncertainty_band_pct = 40.0
else:
tier = "TIER_3"
uncertainty_band_pct = 75.0
return {
"overall_score": round(overall, 1),
"tier": tier,
"uncertainty_band_pct": uncertainty_band_pct,
"dimension_scores": {
s.dimension.value: round(s.score, 1)
for s in scores
}
}
def monte_carlo_uncertainty(
base_estimate_tco2e: float,
uncertainty_pct: float,
n_simulations: int = 10_000
) -> dict:
"""
Stima intervallo di confidenza con simulazione Monte Carlo.
Per reporting CSRD si raccomanda almeno 1.000 simulazioni.
"""
import random
# Distribuzione log-normale (emissioni non possono essere negative)
sigma = math.log(1 + (uncertainty_pct / 100) ** 2) ** 0.5
mu = math.log(base_estimate_tco2e) - sigma ** 2 / 2
simulated = [
math.exp(random.gauss(mu, sigma))
for _ in range(n_simulations)
]
simulated_sorted = sorted(simulated)
p05 = simulated_sorted[int(n_simulations * 0.05)]
p50 = simulated_sorted[int(n_simulations * 0.50)]
p95 = simulated_sorted[int(n_simulations * 0.95)]
return {
"base_estimate_tco2e": base_estimate_tco2e,
"p05_tco2e": round(p05, 2),
"p50_tco2e": round(p50, 2),
"p95_tco2e": round(p95, 2),
"confidence_interval_90pct": {
"lower": round(p05, 2),
"upper": round(p95, 2)
},
"coefficient_of_variation": round(
(p95 - p05) / (2 * p50) * 100, 1
)
}
Niezmienny ślad audytu z łańcuchem mieszającym
La identyfikowalność od końca do końca jest to jedno z najważniejszych wymagań weryfikowalne raportowanie w zakresie 3. Audytorzy zewnętrzni muszą być w stanie prześledzić każdą liczbę w raporcie aż do pierwotnego źródła danych, przechodząc przez wszystkie etapy transformacji. A łańcuchy haszowe inspirowany technologią blockchain (ale bez złożoności rozproszone) gwarantuje niezmienność ścieżki audytu.
# audit/hash_chain.py
# Audit trail immutabile per emissioni Scope 3
import hashlib
import json
import uuid
from datetime import datetime
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class AuditRecord:
"""Singolo record nell'audit chain"""
def __init__(
self,
record_type: str,
payload: dict,
previous_hash: str,
chain_id: str,
sequence: int
):
self.record_id = str(uuid.uuid4())
self.record_type = record_type
self.payload = payload
self.previous_hash = previous_hash
self.chain_id = chain_id
self.sequence = sequence
self.created_at = datetime.utcnow().isoformat()
self.record_hash = self._compute_hash()
def _compute_hash(self) -> str:
"""SHA-256 hash di tutti i campi del record (eccetto il hash stesso)"""
data = {
"record_id": self.record_id,
"record_type": self.record_type,
"chain_id": self.chain_id,
"sequence": self.sequence,
"previous_hash": self.previous_hash,
"created_at": self.created_at,
"payload_hash": hashlib.sha256(
json.dumps(self.payload, sort_keys=True, default=str).encode()
).hexdigest()
}
return hashlib.sha256(
json.dumps(data, sort_keys=True).encode()
).hexdigest()
def to_dict(self) -> dict:
return {
"record_id": self.record_id,
"record_type": self.record_type,
"chain_id": self.chain_id,
"sequence": self.sequence,
"previous_hash": self.previous_hash,
"record_hash": self.record_hash,
"created_at": self.created_at,
"payload": self.payload
}
class HashChain:
"""
Hash chain per audit trail immutabile emissioni Scope 3.
Ogni record contiene l'hash del record precedente,
rendendo impossibile modificare un record senza invalidare
tutti i record successivi.
"""
GENESIS_HASH = "0" * 64 # Hash del primo record della chain
def __init__(self, db_client=None):
self.db = db_client
self.records: list[AuditRecord] = []
def create_chain(
self,
calculation_results: list[dict],
quality_report: dict,
pipeline_version: str,
methodology: str,
reporting_standard: str
) -> str:
"""Crea una nuova chain per un calcolo Scope 3 completo"""
chain_id = str(uuid.uuid4())
previous_hash = self.GENESIS_HASH
# Record 1: Metadati della pipeline
pipeline_record = AuditRecord(
record_type="PIPELINE_METADATA",
payload={
"version": pipeline_version,
"methodology": methodology,
"reporting_standard": reporting_standard,
"calculation_timestamp": datetime.utcnow().isoformat(),
"total_activities": len(calculation_results)
},
previous_hash=previous_hash,
chain_id=chain_id,
sequence=0
)
self.records.append(pipeline_record)
previous_hash = pipeline_record.record_hash
# Record 2: Quality report
quality_record = AuditRecord(
record_type="QUALITY_ASSESSMENT",
payload=quality_report,
previous_hash=previous_hash,
chain_id=chain_id,
sequence=1
)
self.records.append(quality_record)
previous_hash = quality_record.record_hash
# Record 3..N: Singoli risultati di emissione
for i, result in enumerate(calculation_results):
emission_record = AuditRecord(
record_type="EMISSION_CALCULATION",
payload={
"supplier_id": result.get("supplier_id"),
"scope3_category": result.get("scope3_category"),
"co2e_tonnes": result.get("co2e_tonnes"),
"calculation_method": result.get("calculation_method"),
"emission_factor_source": result.get("emission_factor_source"),
"uncertainty_pct": result.get("uncertainty_pct"),
"quality_tier": result.get("quality_tier")
},
previous_hash=previous_hash,
chain_id=chain_id,
sequence=2 + i
)
self.records.append(emission_record)
previous_hash = emission_record.record_hash
# Persist su DB (o storage immutabile)
if self.db:
self._persist_chain(chain_id)
logger.info(
f"Chain {chain_id} creata con {len(self.records)} record. "
f"Final hash: {previous_hash[:16]}..."
)
return chain_id
def verify_chain_integrity(self, chain_id: str) -> bool:
"""
Verifica che nessun record sia stato alterato.
Percorre la chain ricomputando ogni hash.
"""
records = self._load_chain(chain_id)
if not records:
return False
expected_previous = self.GENESIS_HASH
for record_dict in records:
# Ricomputa hash
record = AuditRecord(
record_type=record_dict["record_type"],
payload=record_dict["payload"],
previous_hash=record_dict["previous_hash"],
chain_id=record_dict["chain_id"],
sequence=record_dict["sequence"]
)
if record_dict["previous_hash"] != expected_previous:
logger.error(
f"Chain corrotta al record {record_dict['sequence']}: "
f"previous_hash non corrisponde"
)
return False
expected_previous = record.record_hash
return True
def _persist_chain(self, chain_id: str) -> None:
"""Salva tutti i record della chain nel DB"""
for record in self.records:
self.db.insert("scope3_audit_chain", record.to_dict())
def _load_chain(self, chain_id: str) -> list[dict]:
"""Carica i record della chain dal DB in ordine di sequenza"""
if not self.db:
return [r.to_dict() for r in self.records]
return self.db.query(
"SELECT * FROM scope3_audit_chain WHERE chain_id = %s ORDER BY sequence",
[chain_id]
)
Wizualizacja: diagram Sankeya i kategorie mapy cieplnej
Dobrze zbudowany potok Zakresu 3 musi także generować wizualizacje renderujące dane zrozumiałe dla zainteresowanych stron technicznych i nietechnicznych. The diagram Sankeya to jest idealne narzędzie do pokazania przepływów emisji w łańcuchu wartości, podczas gdy a mapa cieplna pozwala szybko zidentyfikować najbardziej istotne kategorie oraz te o niższej jakości danych.
# visualizations/scope3_charts.py
# Generazione Sankey diagram e heatmap Scope 3
import plotly.graph_objects as go
import plotly.express as px
import pandas as pd
from typing import Optional
def create_scope3_sankey(
emission_data: list[dict],
title: str = "Scope 3 Value Chain Emissions"
) -> go.Figure:
"""
Crea Sankey diagram per visualizzare flussi emissioni Scope 3.
Struttura: Supplier -> Categoria S3 -> Totale Scope 3
"""
# Raccoglie nodi unici
suppliers = list(set(d["supplier_id"] for d in emission_data))
categories = list(set(f"Cat. {d['scope3_category']}" for d in emission_data))
all_nodes = suppliers + categories + ["Scope 3 Total"]
node_index = {node: i for i, node in enumerate(all_nodes)}
# Costruisce link source->target->value
source_indices = []
target_indices = []
values = []
link_labels = []
for record in emission_data:
supplier = record["supplier_id"]
category = f"Cat. {record['scope3_category']}"
tco2e = record["co2e_tonnes"]
# Supplier -> Categoria
source_indices.append(node_index[supplier])
target_indices.append(node_index[category])
values.append(tco2e)
link_labels.append(f"{tco2e:.1f} tCO2e")
# Categoria -> Total
for cat in categories:
cat_total = sum(
d["co2e_tonnes"]
for d in emission_data
if f"Cat. {d['scope3_category']}" == cat
)
source_indices.append(node_index[cat])
target_indices.append(node_index["Scope 3 Total"])
values.append(cat_total)
link_labels.append(f"{cat_total:.1f} tCO2e")
# Colori nodi
node_colors = (
["#2196F3"] * len(suppliers) + # Blu per supplier
["#FF9800"] * len(categories) + # Arancione per categorie
["#4CAF50"] # Verde per totale
)
fig = go.Figure(go.Sankey(
arrangement="snap",
node=dict(
pad=20,
thickness=20,
line=dict(color="white", width=0.5),
label=all_nodes,
color=node_colors,
hovertemplate="{label}
tCO2e: {value:.1f}<extra></extra>"
),
link=dict(
source=source_indices,
target=target_indices,
value=values,
label=link_labels,
color="rgba(100,100,100,0.3)"
)
))
fig.update_layout(
title_text=title,
font_size=12,
height=600,
paper_bgcolor="rgba(0,0,0,0)",
plot_bgcolor="rgba(0,0,0,0)"
)
return fig
def create_category_heatmap(
emission_data: list[dict]
) -> go.Figure:
"""
Heatmap: asse X = categoria Scope 3, asse Y = qualità dato.
Colore = tCO2e. Aiuta a prioritizzare effort raccolta dati.
"""
df = pd.DataFrame(emission_data)
# Aggrega per categoria e tier qualità
pivot = df.pivot_table(
values="co2e_tonnes",
index="quality_tier",
columns="scope3_category",
aggfunc="sum",
fill_value=0
)
# Ordina tier (TIER_1 migliore in alto)
tier_order = ["TIER_1", "TIER_2", "TIER_3"]
pivot = pivot.reindex(
[t for t in tier_order if t in pivot.index]
)
fig = go.Figure(go.Heatmap(
z=pivot.values,
x=[f"Cat. {c}" for c in pivot.columns],
y=list(pivot.index),
colorscale="RdYlGn_r", # Rosso = alta emissione (critico)
text=pivot.values.round(1),
texttemplate="%{text} t",
textfont={"size": 11},
hovertemplate="Categoria: %{x}
Tier: %{y}
%{z:.1f} tCO2e<extra></extra>",
colorbar=dict(title="tCO2e")
))
fig.update_layout(
title="Heatmap Scope 3: Emissioni per Categoria e Qualità Dato",
xaxis_title="Categoria GHG Protocol",
yaxis_title="Tier Qualità Dato",
height=350,
margin=dict(l=80, r=20, t=60, b=60)
)
return fig
def generate_scope3_dashboard_html(
emission_data: list[dict],
output_path: str
) -> None:
"""Genera report HTML standalone con tutti i grafici"""
sankey = create_scope3_sankey(emission_data)
heatmap = create_category_heatmap(emission_data)
total_tco2e = sum(d["co2e_tonnes"] for d in emission_data)
by_category = {}
for d in emission_data:
cat = d["scope3_category"]
by_category[cat] = by_category.get(cat, 0) + d["co2e_tonnes"]
top_category = max(by_category, key=by_category.get)
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<title>Scope 3 Emissions Report</title>
<meta charset="utf-8">
</head>
<body>
<h1>Scope 3 Value Chain Emissions Report</h1>
<p>Totale: <strong>{total_tco2e:.1f} tCO2e</strong></p>
<p>Categoria più materiale: Cat. {top_category}
({by_category[top_category]:.1f} tCO2e)</p>
{sankey.to_html(full_html=False)}
{heatmap.to_html(full_html=False)}
</body>
</html>
"""
with open(output_path, "w") as f:
f.write(html_content)
CSRD/ESRS E1 Wymagania dotyczące zakresu raportowania 3
La Dyrektywa w sprawie raportowania zrównoważonego rozwoju korporacyjnego (CSRD) i związaną z nim normę ESRS E1 (zmiany klimatyczne) przekształcają raportowanie w zakresie 3 z dobrowolnego na obowiązkowe dla tysięcy europejskich przedsiębiorstw. Harmonogram realizacji jest rozłożony i już trwa.
Wymagany harmonogram CSRD Zakres 3
| FY | Zgłoś, wejdę | Przedmioty | Notatki |
|---|---|---|---|
| 2024 | Początek 2025 r | Duże JIP już objęte NFRO (>500 pracowników) | Pierwsza fala: ~12 000 przedsiębiorstw z UE |
| 2025 | Początek 2026 r | Wszystkie duże firmy (>250 działów lub >40 mln EUR) | ~50 000 firm z UE |
| 2026 | Początek 2027 r | Notowane MŚP | Uproszczony standard ESRS |
| 2028 | Początek 2029 r | Spółki spoza UE posiadające spółki zależne w UE | Znaczący wpływ globalny |
ESRS E1 wymaga w szczególności dla emisji z zakresu 3:
- Ujawnienie wszystkich kategorii materiałów: należy określić istotność poprzez analizę podwójnej istotności (wpływ + ryzyko finansowe). W większości firm technologicznych co najmniej 4-6 kategorii jest istotnych.
- Podział według kategorii: wartości nie można podawać jako pojedynczej sumy kruszywo; każda kategoria materiału musi mieć własne dane w tCO2e.
- Jasna metodologia: dla każdej kategorii należy zadeklarować metodę obliczeń (oparte na działaniach, wydatkach, specyficzne dla dostawcy), źródło współczynników emisji i poziom dokładności jakość danych.
- Brak siatki z kredytami węglowymi: Należy zgłosić emisję brutto oddzielnie od zakupionego odszkodowania lub kompensacji emisji dwutlenku węgla.
- Obowiązkowe ubezpieczenie: początkowo ograniczona pewność, której celem jest w przyszłości uzyskać wystarczającą pewność. Ścieżka audytu opisana w tym artykule tego właśnie będą wymagać recenzenci.
- Cel i plan przejścia: firmy muszą deklarować cele redukcja dostosowana do 1,5°C (najlepiej zatwierdzona przez SBTi) z pośrednimi kamieniami milowymi.
Uwaga: Zakres 3 i podwójna istotność
ESRS E1 nie wymaga raportowania wszystkich 15 kategorii z zakresu 3, a jedynie te zidentyfikowane jak przybory w analizie podwójnej materialności. Jednak proces określenie istotności musi być udokumentowane i podlegać audytowi. Wyklucz jeden kategoria „ze względu na brak danych” nie jest dopuszczalnym uzasadnieniem: należy ją wykazać że kategoria ta nie jest istotna dla konkretnej działalności.
Studium przypadku: Firma SaaS z 50 dostawcami
Przełóżmy wszystko na konkretny przykład: włoski SaaS średniej wielkości zatrudniający 150 pracowników, przychody 15 mln EUR, infrastruktura na AWS i 50 aktywnych dostawców. Kierownictwo zdecydowało rozpocznij obliczenia Zakresu 3 przed CSRD i miej 3-miesięczny okres na realizację dane weryfikowalne przez audytora.
Profil firmy: SaaS Italia S.r.l.
| Parametr | Wartość |
|---|---|
| Pracownicy | 150 (70% inteligentnej pracy) |
| Lokalizacje | Siedziba główna w Mediolanie + biuro w Rzymie |
| Infrastruktura | AWS eu-west-1 (podstawowy), GCP europe-west1 (zapasowy) |
| Aktywni dostawcy | 50 (8 dużych, 42 małych/średnich) |
| Koszt zakupu | ~4,2 mln EUR/rok |
| Coroczne loty | ~380 lotów (konferencje + klienci) |
Faza 1 – Analiza istotności (tydzień 1-2): Zespół ESG ma przeprowadził szybką analizę w celu zidentyfikowania kategorii materiałów z zakresu 3. Korzystając z danych z wydatków z ERP (SAP) jako początkowego wskaźnika zastępczego z czynnikami EEIO, uzyskali to oszacowanie „przesiew”:
Zakres 3 Kontrola istotności — SaaS Italia S.r.l.
| Kot. | Opis | Szacunki oparte na wydatkach (tCO2e) | % całości | Decyzja |
|---|---|---|---|---|
| 1 | Zakupione towary i usługi (chmura, SW) | 342 | 54% | MATERIAŁ → Oparte na działaniu |
| 6 | Podróże służbowe | 98 | 15% | MATERIAŁ → Oparte na działaniu |
| 7 | Dojazdy pracowników | 87 | 14% | MATERIAŁ → Badania pracownicze |
| 11 | Wykorzystanie sprzedanych produktów | 76 | 12% | MATERIAŁ → Pomiar NART |
| 2 | Dobra kapitałowe (laptopy, sprzęt) | 28 | 4% | MATERIAŁ → Dostawca PCF |
| Inny | Kot. 3, 5, 8, 15 | 6 | 1% | Niematerialne → Oparte na wydatkach |
Faza 2 – Zbieranie danych (Tydzień 2-8):
- Kot. 1 (Chmura): Narzędzie do pomiaru śladu węglowego klienta AWS i ślad węglowy GCP dostarczają miesięcznych danych dotyczących emisji na konto. Dane wyodrębnione poprzez API i załadowane do Bronze warstwy. Jakość: TIER 1 (specyficzna dla dostawcy, zweryfikowana przez AWS).
- Kot. 1 (oprogramowanie i usługi): Skontaktowano się z 8 dużymi dostawcami (>50 tys. EUR/rok). z ustrukturyzowanym kwestionariuszem. 5 odpowiedziało danymi pierwotnymi (m.in. Microsoft ERP, Slack, Salesforce’a). 3 nie mają danych → oparte na wydatkach z EEIO.
- Kot. 6 (Podróże służbowe): dane pochodzące z biura podróży (Carlson Wagonlit) poprzez API: 380 lotów z trasą i klasą. Kalkulacja oparta na aktywności za pomocą DEFRA 2024.
- Kot. 7 (dojazdy): anonimowa ankieta skierowana do wszystkich 150 pracowników (wskaźnik odpowiedzi 82%). Środki transportu, średnia odległość, dni w tygodniu w biurze.
- Kot. 11 (Korzystanie ze sprzedanych produktów): Obliczono SCI (ang. Software Carbon Intensity). z CodeCarbon w zakresie infrastruktury produkcyjnej. Pomnożona przez liczbę aktywnych sesji/miesiąc.
# case_study/saas_italia_scope3.py
# Calcolo completo Scope 3 per SaaS Italia S.r.l.
from calculators.emission_calculator import (
calculate_activity_based,
calculate_spend_based,
calculate_category_total
)
def calculate_cat1_cloud() -> dict:
"""Cat. 1: Emissioni cloud AWS + GCP (dati primari vendor)"""
# Dati estratti dall'AWS Customer Carbon Footprint API
aws_kwh_year = 187_500 # kWh totali 2024
gcp_kwh_year = 12_300
aws_ef = 0.233 # kg CO2e/kWh IT grid (AWS eu-west-1)
gcp_ef = 0.198 # kg CO2e/kWh GCP europe-west1
aws_co2, aws_unc = calculate_activity_based("cloud_compute_kwh", aws_kwh_year, aws_ef)
gcp_co2, gcp_unc = calculate_activity_based("cloud_compute_kwh", gcp_kwh_year, gcp_ef)
activities = [
{"value_kg": aws_co2, "uncertainty_pct": 8.0, "quality_tier": "TIER_1"},
{"value_kg": gcp_co2, "uncertainty_pct": 10.0, "quality_tier": "TIER_1"},
]
result = calculate_category_total(activities)
result["category"] = 1
result["sub_category"] = "cloud_infrastructure"
return result
def calculate_cat6_business_travel() -> dict:
"""Cat. 6: Business travel (dati agenzia viaggi)"""
# 380 voli totali anno 2024
# 60% corto raggio (<1500km), 40% lungo raggio
short_haul_pkm = 380 * 0.6 * 850 # 850km avg corto raggio
long_haul_pkm = 380 * 0.4 * 3200 # 3200km avg lungo raggio
short_co2, short_unc = calculate_activity_based(
"flight_economy_short", short_haul_pkm
)
long_co2, long_unc = calculate_activity_based(
"flight_economy_long", long_haul_pkm
)
# Radiative forcing factor x1.9 per quota alta
rf_factor = 1.9
short_co2 *= rf_factor
long_co2 *= rf_factor
activities = [
{"value_kg": short_co2, "uncertainty_pct": 20.0, "quality_tier": "TIER_2"},
{"value_kg": long_co2, "uncertainty_pct": 20.0, "quality_tier": "TIER_2"},
]
result = calculate_category_total(activities)
result["category"] = 6
return result
def calculate_cat7_commuting() -> dict:
"""Cat. 7: Employee commuting (survey 123/150 dipendenti)"""
# Risultati survey (valori medi per dipendente/anno)
commuters = {
"car_solo": {"count": 38, "km_day": 28, "days_year": 120},
"car_shared": {"count": 12, "km_day": 22, "days_year": 110},
"public_transport": {"count": 52, "km_day": 35, "days_year": 140},
"cycling_walking": {"count": 21, "km_day": 4, "days_year": 150},
}
activities = []
# Auto privata
car_pkm = (
commuters["car_solo"]["count"] *
commuters["car_solo"]["km_day"] *
commuters["car_solo"]["days_year"]
)
co2_car, unc = calculate_activity_based("car_average", car_pkm)
activities.append({"value_kg": co2_car, "uncertainty_pct": 15.0, "quality_tier": "TIER_2"})
# Trasporto pubblico
pt_pkm = (
commuters["public_transport"]["count"] *
commuters["public_transport"]["km_day"] *
commuters["public_transport"]["days_year"]
)
co2_pt, unc = calculate_activity_based("public_transport_it", pt_pkm)
activities.append({"value_kg": co2_pt, "uncertainty_pct": 20.0, "quality_tier": "TIER_2"})
# Ciclismo/piedi: zero emissioni dirette
activities.append({"value_kg": 0.0, "uncertainty_pct": 0.0, "quality_tier": "TIER_1"})
result = calculate_category_total(activities)
result["category"] = 7
result["survey_response_rate"] = 82.0
return result
def calculate_cat11_use_of_products() -> dict:
"""Cat. 11: Energia consumata dai clienti usando il SaaS"""
# SCI = 0.045 gCO2e per ogni API call (misurato con CodeCarbon)
sci_gco2e_per_call = 0.045
avg_calls_per_month = 48_500_000 # 48.5M calls/mese (dati produzione)
months = 12
total_calls = avg_calls_per_month * months
co2e_grams = total_calls * sci_gco2e_per_call
co2e_kg = co2e_grams / 1000
activities = [
{"value_kg": co2e_kg, "uncertainty_pct": 25.0, "quality_tier": "TIER_2"}
]
result = calculate_category_total(activities)
result["category"] = 11
result["metric"] = "API calls"
result["total_calls"] = total_calls
return result
def run_full_scope3_calculation() -> dict:
"""Esegue il calcolo completo Scope 3 per SaaS Italia S.r.l."""
results = {
"cat_1_cloud": calculate_cat1_cloud(),
"cat_6_travel": calculate_cat6_business_travel(),
"cat_7_commuting": calculate_cat7_commuting(),
"cat_11_use": calculate_cat11_use_of_products(),
}
# Categoria residuale (spend-based per tutto il resto)
residual_spend_eur = 210_000 # ~5% della spesa totale
residual_co2_kg, res_unc = calculate_spend_based(
residual_spend_eur, "it_services"
)
results["cat_residual"] = {
"total_co2e_tonnes": residual_co2_kg / 1000,
"uncertainty_pct": res_unc,
"category": "other",
"dominant_quality_tier": "TIER_3"
}
# Totale Scope 3
total_tco2e = sum(
v["total_co2e_tonnes"] for v in results.values()
)
from calculators.emission_calculator import propagate_uncertainty
all_values = [v["total_co2e_tonnes"] for v in results.values()]
all_uncertainties = [v["uncertainty_pct"] for v in results.values()]
overall_uncertainty = propagate_uncertainty(all_values, all_uncertainties)
return {
"company": "SaaS Italia S.r.l.",
"reporting_year": 2024,
"methodology": "GHG Protocol Corporate Value Chain Standard",
"scope3_total_tco2e": round(total_tco2e, 1),
"overall_uncertainty_pct": round(overall_uncertainty, 1),
"categories": results,
"notes": "Cat. 11 include radiative forcing factor per aviation"
}
if __name__ == "__main__":
import json
report = run_full_scope3_calculation()
print("=" * 50)
print(f"SCOPE 3 TOTALE: {report['scope3_total_tco2e']} tCO2e")
print(f"Incertezza: +/- {report['overall_uncertainty_pct']}%")
print("=" * 50)
for name, cat in report["categories"].items():
tco2e = cat.get("total_co2e_tonnes", 0)
unc = cat.get("uncertainty_pct", 0)
pct = tco2e / report["scope3_total_tco2e"] * 100
print(f" {name:25s} {tco2e:6.1f} tCO2e ({pct:.0f}%) ±{unc:.0f}%")
Wynik kalkulacji dla SaaS Italia S.r.l. produkuje:
Wyniki końcowe zakresu 3 — SaaS Italia S.r.l. (rok 2024)
| Kategoria | tCO2e | % Całkowity | Niepewność | Schody |
|---|---|---|---|---|
| Kot. 1 – Chmura i usługi | 43,7 | 36% | ±9% | POZIOM 1 |
| Kot. 6 – Podróże służbowe | 33,5 | 28% | ±20% | POZIOM 2 |
| Kot. 7 – Dojazdy | 23.8 | 20% | ±17% | POZIOM 2 |
| Kot. 11 – Korzystanie z Produktów | 26.2 | 22% | ±25% | POZIOM 2 |
| Pozostały kot | 6.5 | 5% | ±75% | POZIOM 3 |
| CAŁKOWITY ZAKRES 3 | 133,7 | 100% | ±13% | POZIOM 2 |
Dodanie Zakresu 1 (~8 tCO2e z kotła HQ) i Zakresu 2 (~12 tCO2e z energii elektrycznej biur), otrzymujesz całkowity ślad ~154 tCO2e na rok 2024, z czego 87% to Zakres 3. Dokładnie typowy schemat firm zajmujących się oprogramowaniem.
Najlepsze praktyki i antywzorce w rurociągu zakresu 3
Lista kontrolna wdrożenia rurociągu w zakresie 3
| Obszar | Najlepsze praktyki | Anty-wzorce, których należy unikać |
|---|---|---|
| Dane | Warstwa brązowa niezmienna dla wszystkich otrzymanych danych | Zastąp surowe dane poprawionymi wersjami |
| Obliczenie | Wersja zastosowanych współczynników emisji | Użyj EF bez podawania roku i źródła |
| Niepewność | Zawsze propaguj niepewność w każdej kategorii | Podaj tylko dokładną wartość bez zakresu |
| Jakość | Wyraźny i udokumentowany wynik jakości | Mieszaj TIER 1 i TIER 3 bez rozróżnienia |
| Audyty | Łańcuch mieszający dla każdego obliczenia, weryfikowalny poza łańcuchem | Raport programu Excel nie jest wersjonowany i nie można go prześledzić |
| Dostawca | Nadaj priorytet 20 najlepszym dostawcom według wydatków/emisji | Traktuj wszystkich 50 dostawców tak samo |
| Aktualizacja | Przedstawiono roczny plan poprawy jakości | Zaakceptuj metodę wydatkową jako rozwiązanie stałe |
| Miotły | Wyraźnie dokumentuj uzasadnione wyłączenia | Wyklucz kategorie bez formalnego uzasadnienia |
Plan progresywnego doskonalenia (mapa drogowa dotycząca dojrzałości danych)
Protokół dotyczący gazów cieplarnianych wyraźnie zachęca do podejścia postępowego: lepiej to mieć obecnie niskiej jakości dane oparte na wydatkach, które nie mają żadnego charakteru. Celem jest poprawa co roku poziom jakości kategorii materiałowych:
- Rok 1 (wartość bazowa): 100% oparte na wydatkach, błąd ±75%, TIER 3
- Rok 2: 10 najlepszych dostawców opartych na aktywności, ±40%, TIER 2
- Rok 3: 20 najlepszych dostawców ze zweryfikowanymi danymi pierwotnymi, ±20%, TIER 2
- Rok 4+: Integracja EcoVadis/CDP dla wszystkich dostawców, ±10%, TIER 1
Tę postępującą poprawę można udokumentować w raporcie CSRD jako: „ewolucji metodologii” i została pozytywnie oceniona przez recenzentów.
Schemat bazy danych dla warstwy złota
Warstwa Gold wymaga schematu zaprojektowanego do obsługi szybkich zapytań agregujących w przypadku raportowania CSRD, utrzymanie identyfikowalności w łańcuchu audytów.
-- schema/scope3_gold.sql
-- Schema PostgreSQL per il Gold Layer Scope 3
-- Tabella principale: emissioni aggregate per categoria
CREATE TABLE scope3_emissions_gold (
id BIGSERIAL PRIMARY KEY,
company_id VARCHAR(50) NOT NULL,
reporting_year INTEGER NOT NULL,
scope3_category INTEGER NOT NULL CHECK (scope3_category BETWEEN 1 AND 15),
supplier_id VARCHAR(100),
-- Valori emissioni
co2e_tonnes DECIMAL(12, 3) NOT NULL,
co2_tonnes DECIMAL(12, 3),
ch4_tonnes_co2e DECIMAL(12, 3),
n2o_tonnes_co2e DECIMAL(12, 3),
-- Metodologia e qualità
calculation_method VARCHAR(30) NOT NULL, -- activity_based, spend_based, etc.
emission_factor_source VARCHAR(100) NOT NULL,
emission_factor_value DECIMAL(10, 6),
quality_tier VARCHAR(10) NOT NULL, -- TIER_1, TIER_2, TIER_3
uncertainty_pct DECIMAL(5, 1) NOT NULL,
uncertainty_tonnes DECIMAL(12, 3) GENERATED ALWAYS AS
(co2e_tonnes * uncertainty_pct / 100) STORED,
-- Tracciabilità
audit_chain_id UUID NOT NULL REFERENCES scope3_audit_chain(chain_id),
pipeline_version VARCHAR(20) NOT NULL,
reporting_standard VARCHAR(50) DEFAULT 'GHG_Protocol_Scope3_2011',
-- Timestamps
published_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Unicità per reporting period
CONSTRAINT uq_emission_period
UNIQUE (company_id, reporting_year, scope3_category, supplier_id)
);
-- Indici per performance query CSRD report
CREATE INDEX idx_scope3_company_year
ON scope3_emissions_gold (company_id, reporting_year);
CREATE INDEX idx_scope3_category
ON scope3_emissions_gold (scope3_category);
CREATE INDEX idx_scope3_quality
ON scope3_emissions_gold (quality_tier, uncertainty_pct);
-- View per report aggregato CSRD
CREATE VIEW v_scope3_csrd_report AS
SELECT
company_id,
reporting_year,
scope3_category,
SUM(co2e_tonnes) AS total_co2e_tonnes,
-- Propagazione incertezza quadratica
SQRT(SUM(POWER(co2e_tonnes * uncertainty_pct / 100, 2))) /
NULLIF(SUM(co2e_tonnes), 0) * 100 AS combined_uncertainty_pct,
-- Qualità aggregata (tier peggiore nella categoria)
MIN(quality_tier) AS data_quality_tier,
-- Metodo più usato
MODE() WITHIN GROUP (ORDER BY calculation_method) AS primary_method,
COUNT(DISTINCT supplier_id) AS supplier_count,
MAX(updated_at) AS last_updated
FROM scope3_emissions_gold
GROUP BY company_id, reporting_year, scope3_category
ORDER BY company_id, reporting_year, scope3_category;
-- Tabella audit chain
CREATE TABLE scope3_audit_chain (
chain_id UUID PRIMARY KEY,
record_id UUID NOT NULL DEFAULT gen_random_uuid(),
sequence INTEGER NOT NULL,
record_type VARCHAR(50) NOT NULL,
previous_hash CHAR(64) NOT NULL,
record_hash CHAR(64) NOT NULL UNIQUE,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT uq_chain_sequence UNIQUE (chain_id, sequence)
);
-- Indice per verifica integrità chain
CREATE INDEX idx_audit_chain_id_seq
ON scope3_audit_chain (chain_id, sequence);
Wnioski i dalsze kroki
Budowa solidnego rurociągu dla emisji z zakresu 3 nie jest zadaniem akademickim: to jestkrytyczna infrastruktura danych które staną się obowiązkowe tysięcy europejskich firm do roku 2028. Kluczowe zasady poznaliśmy w tym artykule mają zastosowanie niezależnie od wielkości firmy:
- Niezmienność surowych danych: Warstwa brązu z gwarancjami skrótu SHA-256 że każdy recenzent może zawsze prześledzić dane do pierwotnego źródła, nawet po latach.
- Postęp metodologiczny: Zacznij od wydatków i migruj w kierunku podejście oparte na aktywności w odniesieniu do kategorii materiałów jest podejściem zalecanym w Protokole dotyczącym GHG samo w sobie, a nie na skróty.
- Kwantyfikacja niepewności: Raportuj emisje bez przerwy zaufania to informacja niekompletna. Kwadratowa propagacja niepewności jest proste do wdrożenia i ma fundamentalne znaczenie dla wiarygodności raportu.
- Weryfikowalna ścieżka audytu: Łańcuch mieszający umożliwia zewnętrzny weryfikator w celu matematycznego potwierdzenia, że żadne dane nie zostały zmienione po obliczeniach.
- Integracja ekosystemów: platformy takie jak EcoVadis Carbon Data Network i CDP znacznie zmniejsza obciążenie związane z gromadzeniem danych, szczególnie w przypadku dużych łańcuchów dostaw.
Studium przypadku SaaS Italia S.r.l. pokazuje, że nawet średniej wielkości firma może sporządzić raport zgodny z Zakresem 3 CSRD w ciągu 3 miesięcy, współpracując z zespołem 2-3 osób dane pierwotne dla kategorii materiałów i oparte na wydatkach dla pozostałości. Kluczem jest priorytetyzacja: nie szukaj wszędzie doskonałości, ale skoncentruj swój wysiłek gdzie emisja jest najwyższa.
Przydatne zasoby
- Standard protokołu GHG, zakres 3: ghgprotocol.org/corporate-value-chain-scope-3-standard
- API Climatiqa (baza danych współczynników emisji): climatiq.io
- Sieć danych EcoVadis dotycząca emisji dwutlenku węgla: ecovadis.com/solutions/carbon
- ESRS E1 Zmiany klimatyczne (oficjalny tekst UE): EFRAG ESRS E1
- EXIOBAZA 3.8 (Czynniki oparte na wydatkach EEIO): exiobase.eu
Następny artykuł z serii
W następnym artykule API raportowania ESG: Integracja z przepływem pracy CSRD zbudujemy warstwę API REST na podstawie danych Scope 3 obliczonych w tym artykule, wdrażając punkty końcowe zgodne z formatami wymaganymi przez dyrektywę europejską i integrujące przepływ pracy zatwierdzenie raportu z podpisem cyfrowym audytora.
Zobaczymy również, jak eksponować dane w formacie XBRL/iXBRL do złożenia do ESEF (European Single Electronic Format), obowiązkowego formatu raportów CSRD notowana na europejskiej giełdzie papierów wartościowych.







