ESG Reporting API: Integrace s CSRD Workflow
Il 1. ledna 2024 znamenal epochální zlom pro evropské společnosti: Corporate V platnost vstoupila směrnice o podávání zpráv o udržitelnosti (CSRD), která transformuje podávání zpráv ESG od dobrovolné praxe k zákonné povinnosti s přesnými technickými normami. Předmět společnosti musí now produce compliant reports Evropské standardy pro podávání zpráv o udržitelnosti (ESRS), s ověřitelnými údaji, označenými ve formátu XBRL a podrobenými externímu ujištění.
Výzva pro technické týmy je konkrétní: sbírat ESG data z desítek podnikových systémů (ERP, energetický management, HR, logistika), agregovat je podle taxonomií ESRS, vypočítat metriky jako jsou emise rozsahu 1-3 a intenzita vody, řídit schvalovací proces víceúrovňové a produkují výstup ve formátu iXBRL připraveném pro regulátor. To vše vyžaduje jeden Dedikovaná architektura API, nikoli sdílené excelové listy.
V tomto článku sestavíme kompletní systém: od datového modelu vyhovujícího ESRS až po REST API pro sběr a validaci dat, až po konsolidaci a schvalovací pracovní postup až po generování výstupu XBRL. Pro backend použijeme Python/FastAPI, pro strukturované úložiště PostgreSQL, a budeme analyzovat, jak integrovat SAP Sustainability a Oracle ESG jako upstream zdroje dat.
Co se naučíte
- Časová osa CSRD a perimetr aplikace: kdo musí co dělat a kdy
- ESRS E1-E5, S1-S4, G1: Struktura standardů a povinné metriky
- Dvojité posouzení významnosti: dopad + implementace finančního algoritmu
- Datový model PostgreSQL pro sběr dat ESG vyhovující ESRS
- REST/GraphQL API pro sběr, validaci a agregaci dat ESG
- Automatizace pracovních postupů pro víceúrovňové schvalování a konsolidaci
- XBRL/iXBRL tagování s Pythonem: automatické generování digitální zprávy
- Integrace ERP: SAP Sustainability Footprint Management, Oracle Fusion ESG
- Datový řád a auditní záznam pro externí ujištění
- Kompletní případová studie: Střední výrobní společnost s 250 zaměstnanci
Zelená řada softwarového inženýrství
Tento článek je osmý v obsáhlé sérii o Green Software Engineering, která pokrývá od měření emisí kódu po regulační hlášení ESG:
| # | Položka | Soustředit |
|---|---|---|
| 1 | Principy zeleného softwarového inženýrství | GSF, SCI spec, 8 základních principů |
| 2 | CodeCarbon: Kód měření emisí | Knihovna Pythonu, dashboard, integrace CI/CD |
| 3 | Climatiq API: Výpočty skleníkových plynů v backendu | REST API, rozsah 1-3, integrace FastAPI |
| 4 | Carbon Aware SDK | Přesouvání pracovní zátěže, intenzita mřížky, časový posun |
| 5 | Rozsah 3 Potrubí | Emise hodnotového řetězce, dodavatelé, LCA |
| 6 | Zelené vzory architektury | Bezserverové, událostmi řízené, udržitelné ukládání do mezipaměti |
| 7 | GreenOps: Udržitelné DevOps | Zelené CI/CD, cloud rightsizing, FinOps |
| 8 | ESG Reporting API: CSRD Workflow (tento článek) | ESRS, XBRL, významnost, integrace ERP |
| 9 | AI uhlíková stopa | Školení/inference LLM, udržitelné ML |
| 10 | Pokročilé modelování rozsahu | Metodiky protokolu GHG, konkrétní sektory |
CSRD: Nové paradigma podávání zpráv o udržitelnosti
CSRD (směrnice 2022/2464/EU) nahradila směrnici o nefinančním výkaznictví (NFRD), drastické rozšíření aplikačního perimetru a zvýšení požadované kvality reportů. Zatímco NFRD bylo cca 11 000 společností evropské země, CSRD je zahrnuje 50 000+, včetně poprvé kotovaných malých a středních podniků a evropských dceřiných společností skupiny mimo EU.
Časová osa aplikace
| Datum | Předměty | První zpráva |
|---|---|---|
| LEDEN 2024 (FY 2024) | Velké společnosti již podléhají NFRD (>500 zaměstnanců) | 2025 |
| LEDEN 2025 (FY 2025) | Velké společnosti z EU (> 250 zaměstnanců NEBO > 40 milionů EUR obrat NEBO > 20 milionů aktiv) | 2026 |
| LEDEN 2026 (FY 2026) | MSP kotované na regulovaných trzích (10–250 zaměstnanců) | 2027 |
| LEDEN 2028 (FY 2028) | Dceřiné společnosti EU skupin mimo EU (>150 milionů EUR obrat v EU) | 2029 |
Souhrnné zjednodušení 2025
V únoru 2025 zveřejnila Evropská komise souhrnný balíček, který navrhuje zjednodušení významné pro CSRD: snížení rozsahu malých a středních podniků, dvouletý odklad pro společnosti z druhé vlny a 3 a zjednodušení některých datových bodů ESRS. Však, velké vlny 1 společnosti zůstávají plně předmětem. Změny jsou v současné době stále v legislativním procesu zveřejnění tohoto článku. Vždy kontrolujte aktuální stav legislativy.
Struktura ESRS
Evropské standardy pro podávání zpráv o udržitelnosti jsou uspořádány do tří tematických oblastí, plus dva standardní příčné řezy:
| Norma | Plocha | Témata | Povinný |
|---|---|---|---|
| ESRS 1 | Příčné řezání | Obecné požadavky a zásady | Si |
| ESRS 2 | Příčné řezání | Obecné informace (řízení, strategie, významnost) | Si |
| ESRS E1 | Prostředí | Změna klimatu (GHG, energie, TCFD) | Pokud materiál |
| ESRS E2 | Prostředí | Znečištění (vzduch, voda, půda, látky) | Pokud materiál |
| ESRS E3 | Prostředí | Vodní a mořské zdroje | Pokud materiál |
| ESRS E4 | Prostředí | Biodiverzita a ekosystémy | Pokud materiál |
| ESRS E5 | Prostředí | Využívání zdrojů a oběhové hospodářství | Pokud materiál |
| ESRS S1 | Sociální | Vlastní pracovní síla (pracovní podmínky, D&I, zdraví) | Pokud materiál |
| ESRS S2 | Sociální | Pracovníci v hodnotovém řetězci | Pokud materiál |
| ESRS S3 | Sociální | Postižené komunity | Pokud materiál |
| ESRS S4 | Sociální | Spotřebitelé a koncoví uživatelé | Pokud materiál |
| ESRS G1 | Vládnutí | Obchodní chování (etika, boj proti korupci, lobbing) | Pokud materiál |
ESRS E1 Climate: Metriky, které potřebujete sbírat
ESRS E1 je obvykle nejdražší téma pro sběr dat. Vyžaduje zveřejnění na emise skleníkových plynů (rozsah 1, 2, 3), vědecky zaměřené cíle snížení (SBTi), energii spotřebované podle zdroje a klimatická rizika/příležitosti podle TCFD. Podívejme se na povinné datové body nejrelevantnější:
Emise skleníkových plynů – povinné metriky ESRS E1-6
| Datové body | Jednotka | Popis |
|---|---|---|
| Hrubý rozsah 1 emise skleníkových plynů | tCO2ekv | Přímé emise z vlastních nebo kontrolovaných zdrojů |
| Hrubý rozsah 2 emisí skleníkových plynů (podle místa) | tCO2ekv | Nepřímé emise z nákupu energie, způsob lokalizace |
| Hrubé emise skleníkových plynů v rozsahu 2 (tržní) | tCO2ekv | Nepřímé emise, tržní metoda (certifikáty) |
| Hrubý rozsah emisí skleníkových plynů 3 (15 kategorií) | tCO2ekv | Nepřímé emise v hodnotovém řetězci |
| Intenzita skleníkových plynů (výnosy) | tCO2ekv. / mil. EUR | Intenzita na jednotku obratu |
| Celková spotřeba energie | MWh | Celková spotřeba energie |
| Podíl obnovitelné energie | % | Procento energie z obnovitelných zdrojů |
| Odstraňování a skladování skleníkových plynů | tCO2ekv | Odstraňování uhlíku zalesňováním, CCS atd. |
Dvojité posouzení významnosti: Algoritmická implementace
Metodickým srdcem CSRD je dvojí významnost: každá společnost musí vyhodnotit, která témata ESG jsou významná, a to zvážením dvou současných perspektiv:
- Významnost dopadu: Společnost má významné dopady (pozitivní nebo negativní, aktuální nebo potenciální) o lidech a prostředí pro toto téma?
- Finanční významnost: Toto téma ESG generuje rizika nebo příležitosti významný finanční majetek pro firmu (krátkodobý, střednědobý, dlouhodobý)?
Téma je významné, pokud splňuje alespoň jedno ze dvou kritérií. Podívejme se, jak implementovat tento proces s vyhrazeným API:
# materiality_assessment.py
# Implementazione Double Materiality Assessment per CSRD ESRS 2-IRO
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional
import uuid
from datetime import datetime
class MaterialityDimension(str, Enum):
IMPACT = "impact"
FINANCIAL = "financial"
class ImpactType(str, Enum):
ACTUAL_NEGATIVE = "actual_negative"
ACTUAL_POSITIVE = "actual_positive"
POTENTIAL_NEGATIVE = "potential_negative"
POTENTIAL_POSITIVE = "potential_positive"
class TimeHorizon(str, Enum):
SHORT_TERM = "short_term" # 0-1 anno
MEDIUM_TERM = "medium_term" # 1-5 anni
LONG_TERM = "long_term" # oltre 5 anni
@dataclass
class ImpactMaterialityScore:
"""
Score impact materiality per un topic ESG.
Basato su: severity x likelihood (per impatti negativi)
o scale x likelihood (per impatti positivi)
"""
topic_id: str
impact_type: ImpactType
# Score 1-5 per ogni dimensione
scale: int # Quante persone/ecosistemi impattati
scope: int # Reversibilita' dell'impatto
irremediable: int # Difficolta' di rimediare
likelihood: int # Probabilità' che accada
@property
def severity(self) -> float:
"""Severity = media pesata di scale, scope, irremediable"""
return (self.scale * 0.4 + self.scope * 0.3 + self.irremediable * 0.3)
@property
def score(self) -> float:
"""Score finale 1-25 per ordinamento"""
return self.severity * self.likelihood
@property
def is_material(self) -> bool:
"""Soglia materialita': score >= 9 (severity >= 3, likelihood >= 3)"""
return self.score >= 9.0
@dataclass
class FinancialMaterialityScore:
"""
Score financial materiality per un topic ESG.
Basato su: magnitude x likelihood, per time horizon
"""
topic_id: str
is_risk: bool # True = rischio, False = opportunità'
magnitude: int # 1-5: impatto finanziario potenziale
likelihood: int # 1-5: probabilità'
time_horizon: TimeHorizon
# Fattori qualitativi
quantifiable: bool # Possiamo quantificarlo in EUR?
estimated_impact_eur: Optional[float] = None
@property
def score(self) -> float:
"""Score base con discount per time horizon"""
base = self.magnitude * self.likelihood
# Discount: short=1.0, medium=0.8, long=0.6
discount = {
TimeHorizon.SHORT_TERM: 1.0,
TimeHorizon.MEDIUM_TERM: 0.8,
TimeHorizon.LONG_TERM: 0.6
}[self.time_horizon]
return base * discount
@property
def is_material(self) -> bool:
return self.score >= 7.5 # magnitude >= 3, likelihood >= 3 con discount short
@dataclass
class TopicMaterialityResult:
topic_id: str
topic_name: str
esrs_standard: str
impact_scores: list[ImpactMaterialityScore] = field(default_factory=list)
financial_scores: list[FinancialMaterialityScore] = field(default_factory=list)
@property
def impact_material(self) -> bool:
return any(s.is_material for s in self.impact_scores)
@property
def financial_material(self) -> bool:
return any(s.is_material for s in self.financial_scores)
@property
def is_material(self) -> bool:
"""Double materiality: materiale se almeno una dimensione e' materiale"""
return self.impact_material or self.financial_material
@property
def materiality_level(self) -> str:
if self.impact_material and self.financial_material:
return "DOUBLE_MATERIAL"
elif self.impact_material:
return "IMPACT_MATERIAL"
elif self.financial_material:
return "FINANCIAL_MATERIAL"
else:
return "NOT_MATERIAL"
def run_materiality_assessment(
company_id: str,
topics: list[dict]
) -> list[TopicMaterialityResult]:
"""
Esegue il materiality assessment completo.
Input: lista topic con scores raccolti via stakeholder engagement.
Output: ranking per materialita'.
"""
results = []
for topic_data in topics:
result = TopicMaterialityResult(
topic_id=topic_data["topic_id"],
topic_name=topic_data["topic_name"],
esrs_standard=topic_data["esrs_standard"]
)
# Aggiungi impact scores
for impact in topic_data.get("impact_scores", []):
result.impact_scores.append(ImpactMaterialityScore(**impact))
# Aggiungi financial scores
for financial in topic_data.get("financial_scores", []):
result.financial_scores.append(FinancialMaterialityScore(**financial))
results.append(result)
# Ordina per materialita' discendente
return sorted(
results,
key=lambda r: (
r.is_material,
r.impact_material and r.financial_material,
max((s.score for s in r.impact_scores), default=0) +
max((s.score for s in r.financial_scores), default=0)
),
reverse=True
)
Datový model PostgreSQL pro CSRD
Systém vyhovující CSRD vyžaduje databázové schéma, které odráží strukturu ESRS, podporuje datovou linii a udržuje historii změn pro auditní záznam. Tady to je základní schéma:
-- Schema PostgreSQL per CSRD ESG Data Collection
-- Ottimizzato per ESRS data points con audit trail completo
-- Tabella aziende e perimetro di consolidamento
CREATE TABLE companies (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
legal_name VARCHAR(255) NOT NULL,
lei_code VARCHAR(20), -- Legal Entity Identifier
country_code CHAR(2) NOT NULL,
nace_code VARCHAR(10), -- Classificazione attivita'
fiscal_year_end DATE,
employees_fte DECIMAL(10,2),
revenue_eur DECIMAL(20,2),
total_assets_eur DECIMAL(20,2),
is_parent BOOLEAN DEFAULT FALSE,
parent_id UUID REFERENCES companies(id),
csrd_wave INTEGER, -- 1, 2, 3, 4
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Periodi di reporting
CREATE TABLE reporting_periods (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
company_id UUID NOT NULL REFERENCES companies(id),
year INTEGER NOT NULL,
period_type VARCHAR(20) DEFAULT 'ANNUAL',
start_date DATE NOT NULL,
end_date DATE NOT NULL,
status VARCHAR(30) DEFAULT 'DRAFT',
-- DRAFT, COLLECTING, REVIEWING, APPROVED, FILED
submitted_at TIMESTAMPTZ,
approved_by UUID,
UNIQUE (company_id, year, period_type)
);
-- Catalog ESRS data points (reference table)
CREATE TABLE esrs_data_points (
id VARCHAR(50) PRIMARY KEY,
-- es. "E1-6_GrossScope1", "S1-7_GenderPayGap"
esrs_standard VARCHAR(10) NOT NULL, -- E1, E2, S1, G1...
topic VARCHAR(100) NOT NULL,
sub_topic VARCHAR(100),
name VARCHAR(255) NOT NULL,
description TEXT,
unit_of_measure VARCHAR(50),
data_type VARCHAR(20), -- NUMERIC, BOOLEAN, TEXT, DATE, ENUM
is_mandatory BOOLEAN DEFAULT FALSE,
is_phase_in BOOLEAN DEFAULT FALSE, -- Permesso rinvio anni 1-3
ghg_protocol_scope VARCHAR(10), -- S1, S2_LB, S2_MB, S3
scope3_category INTEGER, -- 1-15 per Scope 3
tags TEXT[]
);
-- Raccolta dati ESG per periodo
CREATE TABLE esg_data_submissions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
company_id UUID NOT NULL REFERENCES companies(id),
reporting_period_id UUID NOT NULL REFERENCES reporting_periods(id),
data_point_id VARCHAR(50) NOT NULL REFERENCES esrs_data_points(id),
-- Valori (uno solo populated in base a data_type)
value_numeric DECIMAL(30, 6),
value_boolean BOOLEAN,
value_text TEXT,
value_date DATE,
-- Metadata qualità'
unit_of_measure VARCHAR(50),
estimation_method VARCHAR(100),
confidence_level VARCHAR(20), -- HIGH, MEDIUM, LOW
is_estimated BOOLEAN DEFAULT FALSE,
-- Data lineage
source_system VARCHAR(100), -- SAP, Oracle, Manual, API
source_document_ref VARCHAR(255),
collected_by UUID,
collection_timestamp TIMESTAMPTZ DEFAULT NOW(),
-- Workflow
status VARCHAR(30) DEFAULT 'DRAFT',
-- DRAFT, SUBMITTED, REVIEWED, APPROVED, REJECTED
reviewed_by UUID,
review_timestamp TIMESTAMPTZ,
review_comment TEXT,
approved_by UUID,
approval_timestamp TIMESTAMPTZ,
-- Versioning per audit
version INTEGER DEFAULT 1,
previous_version_id UUID REFERENCES esg_data_submissions(id),
CONSTRAINT unique_submission UNIQUE (
company_id, reporting_period_id, data_point_id, version
)
);
-- Indici per performance
CREATE INDEX idx_submissions_period ON esg_data_submissions(reporting_period_id);
CREATE INDEX idx_submissions_status ON esg_data_submissions(status);
CREATE INDEX idx_submissions_datapoint ON esg_data_submissions(data_point_id);
-- Audit log immutabile
CREATE TABLE audit_log (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
entity_type VARCHAR(50) NOT NULL,
entity_id UUID NOT NULL,
action VARCHAR(20) NOT NULL, -- CREATE, UPDATE, DELETE, APPROVE
actor_id UUID NOT NULL,
actor_email VARCHAR(255),
timestamp TIMESTAMPTZ DEFAULT NOW(),
ip_address INET,
old_value JSONB,
new_value JSONB,
change_reason TEXT
);
-- View aggregata emissioni GHG per periodo
CREATE OR REPLACE VIEW v_ghg_emissions_summary AS
SELECT
c.legal_name,
rp.year,
SUM(CASE WHEN dp.id LIKE 'E1%Scope1%' THEN eds.value_numeric ELSE 0 END) AS scope1_tco2eq,
SUM(CASE WHEN dp.id LIKE 'E1%Scope2%LB%' THEN eds.value_numeric ELSE 0 END) AS scope2_lb_tco2eq,
SUM(CASE WHEN dp.id LIKE 'E1%Scope2%MB%' THEN eds.value_numeric ELSE 0 END) AS scope2_mb_tco2eq,
SUM(CASE WHEN dp.id LIKE 'E1%Scope3%' THEN eds.value_numeric ELSE 0 END) AS scope3_tco2eq,
SUM(CASE WHEN dp.id = 'E1-5_TotalEnergy' THEN eds.value_numeric ELSE 0 END) AS total_energy_mwh,
SUM(CASE WHEN dp.id = 'E1-5_RenewableEnergy' THEN eds.value_numeric ELSE 0 END) AS renewable_mwh
FROM esg_data_submissions eds
JOIN companies c ON c.id = eds.company_id
JOIN reporting_periods rp ON rp.id = eds.reporting_period_id
JOIN esrs_data_points dp ON dp.id = eds.data_point_id
WHERE eds.status = 'APPROVED'
GROUP BY c.legal_name, rp.year;
Architektura API: Shromažďování a ověřování dat ESG
API architektura systému CSRD je rozdělena do tří hlavních vrstev: sběr dat z obchodní jednotky a zdrojové systémy, validace a obohacování podle pravidel ESRS, e agregace pro konsolidaci skupiny. Pojďme implementovat backend s FastAPI:
# main.py - FastAPI ESG Reporting API
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, validator
from typing import Optional, List
from uuid import UUID
from datetime import datetime, date
from decimal import Decimal
import asyncpg
import json
app = FastAPI(
title="CSRD ESG Reporting API",
description="API per raccolta e gestione dati ESG conformi ESRS",
version="1.0.0"
)
# === PYDANTIC MODELS ===
class ESGDataPointSubmission(BaseModel):
data_point_id: str = Field(..., description="ID ESRS data point (es. E1-6_GrossScope1)")
value_numeric: Optional[Decimal] = None
value_boolean: Optional[bool] = None
value_text: Optional[str] = None
value_date: Optional[date] = None
unit_of_measure: Optional[str] = None
estimation_method: Optional[str] = None
confidence_level: str = Field(default="MEDIUM", pattern="^(HIGH|MEDIUM|LOW)$")
is_estimated: bool = False
source_system: Optional[str] = None
source_document_ref: Optional[str] = None
@validator('value_numeric')
def validate_numeric(cls, v, values):
if v is not None and v < 0:
raise ValueError('Valori numerici GHG non possono essere negativi')
return v
class BulkESGSubmission(BaseModel):
reporting_period_id: UUID
submissions: List[ESGDataPointSubmission]
submission_note: Optional[str] = None
class DataPointValidationResult(BaseModel):
data_point_id: str
is_valid: bool
errors: List[str] = []
warnings: List[str] = []
class ValidationReport(BaseModel):
period_id: UUID
total_mandatory_points: int
submitted_points: int
approved_points: int
coverage_pct: float
validation_results: List[DataPointValidationResult]
overall_valid: bool
# === VALIDATION SERVICE ===
class ESRSValidationService:
"""
Validazione regole ESRS per data points.
Implementa controlli incrociati richiesti dallo standard.
"""
CONSISTENCY_RULES = [
{
"rule": "SCOPE1_PLUS_SCOPE2_CONSISTENCY",
"description": "Total GHG = Scope1 + Scope2 (market) deve essere coerente",
"severity": "ERROR"
},
{
"rule": "ENERGY_GHG_RATIO",
"description": "Intensità' GHG su energia deve essere plausibile",
"severity": "WARNING"
},
{
"rule": "RENEWABLE_WITHIN_TOTAL",
"description": "Energia rinnovabile non può' superare totale energia",
"severity": "ERROR"
},
{
"rule": "YOY_VARIANCE_THRESHOLD",
"description": "Variazione YoY > 30% richiede commento",
"severity": "WARNING"
}
]
async def validate_submission(
self,
submissions: List[ESGDataPointSubmission],
period_id: UUID,
db_pool
) -> ValidationReport:
errors_by_point = {}
warnings_by_point = {}
# 1. Validazione singoli data point
for sub in submissions:
errors = []
warnings = []
# Carica definizione data point da catalogo
dp_def = await self._get_data_point_def(sub.data_point_id, db_pool)
if not dp_def:
errors.append(f"Data point {sub.data_point_id} non trovato nel catalogo ESRS")
else:
# Verifica unita' di misura
if dp_def['unit_of_measure'] and sub.unit_of_measure:
if sub.unit_of_measure != dp_def['unit_of_measure']:
warnings.append(
f"Unit of measure {sub.unit_of_measure} diversa da "
f"attesa {dp_def['unit_of_measure']}"
)
# Verifica tipo dato
if dp_def['data_type'] == 'NUMERIC' and sub.value_numeric is None:
errors.append(f"Data point numerico richiede value_numeric")
errors_by_point[sub.data_point_id] = errors
warnings_by_point[sub.data_point_id] = warnings
# 2. Controlli incrociati
await self._run_cross_checks(submissions, errors_by_point, warnings_by_point)
# 3. Calcola copertura mandatory
mandatory_coverage = await self._calc_mandatory_coverage(period_id, submissions, db_pool)
results = [
DataPointValidationResult(
data_point_id=dp_id,
is_valid=len(errs) == 0,
errors=errs,
warnings=warnings_by_point.get(dp_id, [])
)
for dp_id, errs in errors_by_point.items()
]
overall_valid = all(r.is_valid for r in results)
return ValidationReport(
period_id=period_id,
total_mandatory_points=mandatory_coverage['total'],
submitted_points=mandatory_coverage['submitted'],
approved_points=mandatory_coverage['approved'],
coverage_pct=mandatory_coverage['coverage_pct'],
validation_results=results,
overall_valid=overall_valid
)
async def _run_cross_checks(self, submissions, errors_by_point, warnings_by_point):
"""Controlli incrociati tra data points correlati."""
values = {s.data_point_id: s.value_numeric for s in submissions if s.value_numeric}
# Energia rinnovabile <= totale energia
renewable = values.get('E1-5_RenewableEnergy')
total_energy = values.get('E1-5_TotalEnergy')
if renewable and total_energy and renewable > total_energy:
errors_by_point.setdefault('E1-5_RenewableEnergy', []).append(
"Energia rinnovabile non può' superare energia totale"
)
async def _get_data_point_def(self, dp_id: str, db_pool):
async with db_pool.acquire() as conn:
return await conn.fetchrow(
"SELECT * FROM esrs_data_points WHERE id = $1", dp_id
)
async def _calc_mandatory_coverage(self, period_id, submissions, db_pool):
async with db_pool.acquire() as conn:
total = await conn.fetchval(
"SELECT COUNT(*) FROM esrs_data_points WHERE is_mandatory = TRUE"
)
submitted = len([s for s in submissions if s.value_numeric is not None
or s.value_boolean is not None or s.value_text is not None])
return {
'total': total or 0,
'submitted': submitted,
'approved': 0,
'coverage_pct': (submitted / total * 100) if total else 0
}
# === ENDPOINTS ===
@app.post("/api/v1/periods/{period_id}/submissions/bulk")
async def bulk_submit_esg_data(
period_id: UUID,
payload: BulkESGSubmission,
background_tasks: BackgroundTasks,
db_pool = Depends(get_db_pool)
):
"""
Submission bulk di dati ESG per un periodo.
Accetta fino a 500 data points per chiamata.
"""
if len(payload.submissions) > 500:
raise HTTPException(400, "Max 500 data points per richiesta")
validator = ESRSValidationService()
validation_report = await validator.validate_submission(
payload.submissions, period_id, db_pool
)
if not validation_report.overall_valid:
raise HTTPException(422, {
"message": "Validation failed",
"validation_report": validation_report.dict()
})
# Salva in batch
async with db_pool.acquire() as conn:
async with conn.transaction():
for sub in payload.submissions:
await conn.execute("""
INSERT INTO esg_data_submissions
(company_id, reporting_period_id, data_point_id,
value_numeric, value_boolean, value_text,
unit_of_measure, estimation_method, confidence_level,
is_estimated, source_system, source_document_ref,
status)
SELECT
rp.company_id, $1, $2,
$3, $4, $5,
$6, $7, $8,
$9, $10, $11,
'SUBMITTED'
FROM reporting_periods rp WHERE rp.id = $1
ON CONFLICT (company_id, reporting_period_id, data_point_id, version)
DO UPDATE SET
value_numeric = EXCLUDED.value_numeric,
status = 'SUBMITTED',
collection_timestamp = NOW()
""",
period_id,
str(sub.data_point_id),
sub.value_numeric,
sub.value_boolean,
sub.value_text,
sub.unit_of_measure,
sub.estimation_method,
sub.confidence_level,
sub.is_estimated,
sub.source_system,
sub.source_document_ref
)
# Trigger notifica al reviewer in background
background_tasks.add_task(notify_reviewers, period_id, len(payload.submissions))
return {
"status": "submitted",
"period_id": str(period_id),
"submitted_count": len(payload.submissions),
"validation_report": validation_report.dict()
}
@app.get("/api/v1/periods/{period_id}/validation-report")
async def get_validation_report(
period_id: UUID,
db_pool = Depends(get_db_pool)
):
"""Restituisce report di completezza e validità per un periodo."""
async with db_pool.acquire() as conn:
# Aggregazione stato submissions per mandatory data points
rows = await conn.fetch("""
SELECT
dp.id as data_point_id,
dp.name,
dp.is_mandatory,
dp.unit_of_measure,
eds.status,
eds.value_numeric,
eds.confidence_level
FROM esrs_data_points dp
LEFT JOIN esg_data_submissions eds ON
eds.data_point_id = dp.id
AND eds.reporting_period_id = $1
WHERE dp.is_mandatory = TRUE
ORDER BY dp.esrs_standard, dp.id
""", period_id)
missing_mandatory = [r for r in rows if r['status'] is None]
approved = [r for r in rows if r['status'] == 'APPROVED']
return {
"period_id": str(period_id),
"total_mandatory": len(rows),
"approved": len(approved),
"missing": len(missing_mandatory),
"coverage_pct": round(len(approved) / len(rows) * 100, 1) if rows else 0,
"missing_data_points": [
{"id": r['data_point_id'], "name": r['name']}
for r in missing_mandatory
]
}
async def notify_reviewers(period_id: UUID, count: int):
"""Background task: notifica reviewer via email/webhook."""
print(f"[NOTIFY] Period {period_id}: {count} data points submitted for review")
async def get_db_pool():
"""Dependency injection per asyncpg pool."""
# In produzione: pool globale inizializzato al startup
pool = await asyncpg.create_pool(
"postgresql://user:password@localhost/esg_db",
min_size=5,
max_size=20
)
try:
yield pool
finally:
await pool.close()
Automatizace pracovního postupu: Víceúrovňové schválení
Podnikový systém CSRD vyžaduje strukturovaný pracovní postup, který spravuje sběr z více obchodních jednotek, kontrolní a schvalovací cykly a konečná konsolidace. Zde je implementace motoru workflow:
# workflow_engine.py
# Workflow CSRD multi-livello: collect -> review -> approve -> file
from enum import Enum
from dataclasses import dataclass
from typing import Callable, Optional
import asyncio
class WorkflowState(str, Enum):
DRAFT = "DRAFT"
DATA_COLLECTION = "DATA_COLLECTION"
INTERNAL_REVIEW = "INTERNAL_REVIEW"
CFO_APPROVAL = "CFO_APPROVAL"
EXTERNAL_ASSURANCE = "EXTERNAL_ASSURANCE"
BOARD_APPROVAL = "BOARD_APPROVAL"
FILED = "FILED"
REJECTED = "REJECTED"
class WorkflowTransition:
"""Definisce una transizione di stato valida con condizioni e azioni."""
def __init__(
self,
from_state: WorkflowState,
to_state: WorkflowState,
action: str,
condition: Optional[Callable] = None,
pre_actions: list = None,
post_actions: list = None
):
self.from_state = from_state
self.to_state = to_state
self.action = action
self.condition = condition
self.pre_actions = pre_actions or []
self.post_actions = post_actions or []
# Definizione FSM per workflow CSRD
CSRD_WORKFLOW_TRANSITIONS = [
WorkflowTransition(
from_state=WorkflowState.DRAFT,
to_state=WorkflowState.DATA_COLLECTION,
action="START_COLLECTION",
post_actions=["send_collection_invites", "set_deadlines"]
),
WorkflowTransition(
from_state=WorkflowState.DATA_COLLECTION,
to_state=WorkflowState.INTERNAL_REVIEW,
action="SUBMIT_FOR_REVIEW",
condition=lambda ctx: ctx.get("mandatory_coverage_pct", 0) >= 80,
pre_actions=["run_validation", "generate_completeness_report"]
),
WorkflowTransition(
from_state=WorkflowState.INTERNAL_REVIEW,
to_state=WorkflowState.DATA_COLLECTION,
action="REQUEST_CORRECTIONS",
post_actions=["notify_data_owners", "log_review_comments"]
),
WorkflowTransition(
from_state=WorkflowState.INTERNAL_REVIEW,
to_state=WorkflowState.CFO_APPROVAL,
action="APPROVE_INTERNAL_REVIEW",
condition=lambda ctx: ctx.get("reviewer_approved", False),
pre_actions=["generate_draft_report"]
),
WorkflowTransition(
from_state=WorkflowState.CFO_APPROVAL,
to_state=WorkflowState.EXTERNAL_ASSURANCE,
action="CFO_SIGN_OFF",
post_actions=["prepare_assurance_package", "notify_auditor"]
),
WorkflowTransition(
from_state=WorkflowState.EXTERNAL_ASSURANCE,
to_state=WorkflowState.BOARD_APPROVAL,
action="ASSURANCE_COMPLETE",
condition=lambda ctx: ctx.get("assurance_opinion") in ["UNQUALIFIED", "QUALIFIED"],
post_actions=["attach_assurance_report"]
),
WorkflowTransition(
from_state=WorkflowState.BOARD_APPROVAL,
to_state=WorkflowState.FILED,
action="BOARD_APPROVE_AND_FILE",
post_actions=["generate_ixbrl", "submit_to_oam", "publish_report"]
),
]
class CSRDWorkflowEngine:
def __init__(self, db_pool):
self.db_pool = db_pool
self.transitions = {
(t.from_state, t.action): t
for t in CSRD_WORKFLOW_TRANSITIONS
}
async def execute_transition(
self,
period_id: str,
action: str,
actor_id: str,
context: dict = None
) -> dict:
"""
Esegue una transizione di workflow per un periodo di reporting.
Restituisce il nuovo stato o errore se la transizione non e' valida.
"""
context = context or {}
async with self.db_pool.acquire() as conn:
# Carica stato corrente
period = await conn.fetchrow(
"SELECT status, company_id FROM reporting_periods WHERE id = $1",
period_id
)
if not period:
raise ValueError(f"Periodo {period_id} non trovato")
current_state = WorkflowState(period['status'])
transition_key = (current_state, action)
if transition_key not in self.transitions:
raise ValueError(
f"Transizione {action} non valida da stato {current_state.value}"
)
transition = self.transitions[transition_key]
# Verifica condizione (arricchisci context con dati DB se necessario)
if transition.condition:
enriched_ctx = await self._enrich_context(period_id, context, conn)
if not transition.condition(enriched_ctx):
raise ValueError(
f"Condizione per {action} non soddisfatta: "
f"mandatory coverage = {enriched_ctx.get('mandatory_coverage_pct', 0)}%"
)
# Esegui pre-actions
for pre_action in transition.pre_actions:
await self._execute_action(pre_action, period_id, context, conn)
# Aggiorna stato
await conn.execute(
"UPDATE reporting_periods SET status = $1, updated_at = NOW() WHERE id = $2",
transition.to_state.value,
period_id
)
# Audit log
await conn.execute("""
INSERT INTO audit_log (entity_type, entity_id, action, actor_id, new_value)
VALUES ('reporting_period', $1, $2, $3, $4)
""", period_id, action, actor_id,
json.dumps({"from": current_state.value, "to": transition.to_state.value}))
# Esegui post-actions in background (notifiche, generazione documenti)
for post_action in transition.post_actions:
asyncio.create_task(
self._execute_action_background(post_action, period_id, context)
)
return {
"period_id": period_id,
"previous_state": current_state.value,
"new_state": transition.to_state.value,
"action": action,
"actor_id": actor_id,
"timestamp": datetime.now().isoformat()
}
async def _enrich_context(self, period_id: str, context: dict, conn) -> dict:
"""Arricchisce context con dati calcolati dal DB."""
mandatory_total = await conn.fetchval(
"SELECT COUNT(*) FROM esrs_data_points WHERE is_mandatory = TRUE"
)
approved = await conn.fetchval("""
SELECT COUNT(*) FROM esg_data_submissions
WHERE reporting_period_id = $1 AND status = 'APPROVED'
""", period_id)
enriched = dict(context)
enriched['mandatory_coverage_pct'] = (
(approved / mandatory_total * 100) if mandatory_total else 0
)
return enriched
async def _execute_action(self, action_name: str, period_id: str, context: dict, conn):
"""Esegue un'azione sincrona nel workflow."""
action_map = {
"run_validation": self._action_run_validation,
"generate_completeness_report": self._action_gen_completeness,
"generate_draft_report": self._action_gen_draft,
}
if action_name in action_map:
await action_map[action_name](period_id, context, conn)
async def _action_run_validation(self, period_id, context, conn):
print(f"[WORKFLOW] Running validation for period {period_id}")
async def _action_gen_completeness(self, period_id, context, conn):
print(f"[WORKFLOW] Generating completeness report for {period_id}")
async def _action_gen_draft(self, period_id, context, conn):
print(f"[WORKFLOW] Generating draft CSRD report for {period_id}")
async def _execute_action_background(self, action_name: str, period_id: str, context: dict):
"""Esegue azione in background (notifiche, documenti)."""
print(f"[BACKGROUND] Executing {action_name} for period {period_id}")
XBRL/iXBRL Tagging: Digitální generování zpráv
CSRD požaduje, aby výroční zpráva byla vypracována ve formátu XHTML s tagováním iXBRL (Inline XBRL), který vkládá strukturovaná data přímo do čitelného HTML. EFRAG publikoval ESRS XBRL Taxonomy, která mapuje každý datový bod na jeho značku iXBRL. Implementujeme automatické generování:
# xbrl_generator.py
# Generazione iXBRL per CSRD secondo ESRS Taxonomy
from lxml import etree
from decimal import Decimal
from typing import Optional
import uuid
# ESRS XBRL Taxonomy namespace
ESRS_NS = "https://xbrl.efrag.org/taxonomy/draft-esrs/2022-11-22"
XBRLI_NS = "http://www.xbrl.org/2003/instance"
IX_NS = "http://www.xbrl.org/2013/inlineXBRL"
LINK_NS = "http://www.xbrl.org/2003/linkbase"
class ESRSXBRLGenerator:
"""
Genera documenti iXBRL conformi alla ESRS Taxonomy.
Output: XHTML con embedded XBRL facts.
"""
def __init__(self, company_data: dict, period_data: dict):
self.company = company_data
self.period = period_data
self.context_id = f"ctx_{uuid.uuid4().hex[:8]}"
self.facts = []
def add_fact(
self,
element_name: str,
value,
unit: Optional[str] = None,
decimals: int = 0,
scale: int = 0
):
"""Aggiunge un fact XBRL al report."""
self.facts.append({
"element": element_name,
"value": value,
"unit": unit,
"decimals": decimals,
"scale": scale
})
def generate_ixbrl(self) -> str:
"""
Genera il documento iXBRL completo.
Returns: stringa XHTML con tagging iXBRL embedded.
"""
# Namespace map per il documento
nsmap = {
"ix": IX_NS,
"xbrli": XBRLI_NS,
"esrs": ESRS_NS,
None: "http://www.w3.org/1999/xhtml"
}
# Root XHTML
html = etree.Element("html", nsmap=nsmap)
head = etree.SubElement(html, "head")
body = etree.SubElement(html, "body")
# Header XBRL nel head
ix_header = etree.SubElement(head, f"{{IX_NS}}header")
hidden = etree.SubElement(ix_header, f"{{IX_NS}}hidden")
# Context: entità' e periodo
resources = etree.SubElement(ix_header, f"{{IX_NS}}resources")
xbrli_context = etree.SubElement(resources, f"{{XBRLI_NS}}context")
xbrli_context.set("id", self.context_id)
entity = etree.SubElement(xbrli_context, f"{{XBRLI_NS}}entity")
identifier = etree.SubElement(entity, f"{{XBRLI_NS}}identifier")
identifier.set("scheme", "http://www.lei.info")
identifier.text = self.company.get("lei_code", "000000000000000000")
period = etree.SubElement(xbrli_context, f"{{XBRLI_NS}}period")
start_date = etree.SubElement(period, f"{{XBRLI_NS}}startDate")
start_date.text = self.period.get("start_date", "2024-01-01")
end_date = etree.SubElement(period, f"{{XBRLI_NS}}endDate")
end_date.text = self.period.get("end_date", "2024-12-31")
# Unit monetaria e tCO2eq
unit_co2 = etree.SubElement(resources, f"{{XBRLI_NS}}unit")
unit_co2.set("id", "tCO2eq")
measure_co2 = etree.SubElement(unit_co2, f"{{XBRLI_NS}}measure")
measure_co2.text = "esrs:metricTon"
# Body: report HTML con facts inline
h1 = etree.SubElement(body, "h1")
h1.text = f"CSRD Sustainability Report {self.period.get('year', 2024)}"
h2 = etree.SubElement(body, "h2")
h2.text = "E1 - Climate Change: GHG Emissions"
# Tabella emissioni con tagging iXBRL inline
table = etree.SubElement(body, "table")
thead = etree.SubElement(table, "thead")
tr_head = etree.SubElement(thead, "tr")
etree.SubElement(tr_head, "th").text = "Metric"
etree.SubElement(tr_head, "th").text = "Value"
etree.SubElement(tr_head, "th").text = "Unit"
tbody = etree.SubElement(table, "tbody")
# Aggiungi facts come righe di tabella con tag iXBRL
for fact in self.facts:
tr = etree.SubElement(tbody, "tr")
etree.SubElement(tr, "td").text = fact["element"].split(":")[-1]
td_value = etree.SubElement(tr, "td")
# Tag iXBRL inline per il valore
ix_nonfraction = etree.SubElement(
td_value,
f"{{IX_NS}}nonFraction"
)
ix_nonfraction.set("name", fact["element"])
ix_nonfraction.set("contextRef", self.context_id)
ix_nonfraction.set("unitRef", fact.get("unit", "tCO2eq"))
ix_nonfraction.set("decimals", str(fact.get("decimals", 0)))
ix_nonfraction.set("scale", str(fact.get("scale", 0)))
ix_nonfraction.set("format", "ixt:num-dot-decimal")
ix_nonfraction.text = str(fact["value"])
etree.SubElement(tr, "td").text = fact.get("unit", "tCO2eq")
# Serializza come XHTML
return etree.tostring(
html,
pretty_print=True,
xml_declaration=True,
encoding="UTF-8",
method="xml"
).decode("utf-8")
def generate_csrd_report_from_db(approved_submissions: list, company: dict, period: dict) -> str:
"""
Genera report iXBRL da submissions approvate nel DB.
"""
generator = ESRSXBRLGenerator(company, period)
# Mapping data_point_id -> XBRL element name
ESRS_XBRL_MAP = {
"E1-6_GrossScope1": "esrs:GrossScope1GHGEmissions",
"E1-6_GrossScope2LB": "esrs:GrossScope2GHGEmissionsLocationBased",
"E1-6_GrossScope2MB": "esrs:GrossScope2GHGEmissionsMarketBased",
"E1-6_GrossScope3Total": "esrs:GrossScope3GHGEmissions",
"E1-5_TotalEnergy": "esrs:TotalEnergyConsumption",
"E1-5_RenewableEnergy": "esrs:EnergyConsumptionFromRenewableSources",
"E1-5_NonRenewableEnergy": "esrs:EnergyConsumptionFromNonRenewableSources",
}
for sub in approved_submissions:
element = ESRS_XBRL_MAP.get(sub['data_point_id'])
if element and sub['value_numeric'] is not None:
generator.add_fact(
element_name=element,
value=float(sub['value_numeric']),
unit=sub.get('unit_of_measure', 'tCO2eq'),
decimals=0
)
return generator.generate_ixbrl()
Integrace ERP: SAP Sustainability a Oracle ESG
Většina dat ESG se již nachází v podnikových systémech ERP: spotřeba energie v modulu PM/PM, HR data v SuccessFactors, logistika v modulu TM. Realizujeme konektory pro dvě hlavní platformy podnikového trhu:
# erp_connectors.py
# Connettori per SAP Sustainability Footprint Management (SFM)
# e Oracle Fusion ESG
import httpx
import asyncio
from abc import ABC, abstractmethod
from typing import Optional
class ERPConnector(ABC):
"""Interfaccia base per connettori ERP."""
@abstractmethod
async def get_energy_consumption(self, year: int, entity_id: str) -> dict:
pass
@abstractmethod
async def get_ghg_emissions(self, year: int, entity_id: str) -> dict:
pass
@abstractmethod
async def get_workforce_metrics(self, year: int, entity_id: str) -> dict:
pass
class SAPSustainabilityConnector(ERPConnector):
"""
Connettore per SAP Sustainability Footprint Management (SFM).
Usa SAP OData API v4.
API base: /sap/opu/odata4/sap/api_sustainability_footprint/
"""
def __init__(
self,
base_url: str,
client_id: str,
client_secret: str,
token_url: str
):
self.base_url = base_url.rstrip("/")
self.client_id = client_id
self.client_secret = client_secret
self.token_url = token_url
self._token: Optional[str] = None
self._client = httpx.AsyncClient(timeout=30)
async def _get_token(self) -> str:
"""OAuth2 client credentials per SAP BTP."""
if self._token:
return self._token
response = await self._client.post(
self.token_url,
data={
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret,
}
)
response.raise_for_status()
self._token = response.json()["access_token"]
return self._token
async def _get(self, path: str, params: dict = None) -> dict:
token = await self._get_token()
response = await self._client.get(
f"{self.base_url}{path}",
headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
params=params
)
response.raise_for_status()
return response.json()
async def get_energy_consumption(self, year: int, entity_id: str) -> dict:
"""
Estrae consumo energetico da SAP SFM per anno e entità'.
OData endpoint: /EnergyConsumption
"""
data = await self._get(
"/EnergyConsumption",
params={
"$filter": f"Year eq {year} and EntityId eq '{entity_id}'",
"$select": "Year,EntityId,EnergyType,Quantity,Unit,Source"
}
)
result = {"total_mwh": 0, "renewable_mwh": 0, "non_renewable_mwh": 0, "by_source": []}
for record in data.get("value", []):
qty_mwh = self._convert_to_mwh(record["Quantity"], record["Unit"])
result["total_mwh"] += qty_mwh
result["by_source"].append({
"source": record["EnergyType"],
"mwh": qty_mwh,
"is_renewable": record["EnergyType"] in [
"SOLAR", "WIND", "HYDRO", "BIOMASS", "GEOTHERMAL"
]
})
if record["EnergyType"] in ["SOLAR", "WIND", "HYDRO", "BIOMASS", "GEOTHERMAL"]:
result["renewable_mwh"] += qty_mwh
else:
result["non_renewable_mwh"] += qty_mwh
return result
async def get_ghg_emissions(self, year: int, entity_id: str) -> dict:
"""Estrae emissioni GHG da SAP SFM, suddivise per scope."""
data = await self._get(
"/GHGEmission",
params={
"$filter": f"Year eq {year} and EntityId eq '{entity_id}'",
"$select": "Year,EntityId,GHGScope,Category,EmissionTCO2e"
}
)
result = {"scope1": 0, "scope2_lb": 0, "scope2_mb": 0, "scope3_by_category": {}}
for r in data.get("value", []):
scope = r["GHGScope"]
tco2e = float(r["EmissionTCO2e"])
if scope == "1":
result["scope1"] += tco2e
elif scope == "2" and r.get("Category") == "LB":
result["scope2_lb"] += tco2e
elif scope == "2" and r.get("Category") == "MB":
result["scope2_mb"] += tco2e
elif scope == "3":
cat = r.get("Category", "UNKNOWN")
result["scope3_by_category"][cat] = (
result["scope3_by_category"].get(cat, 0) + tco2e
)
result["scope3_total"] = sum(result["scope3_by_category"].values())
return result
async def get_workforce_metrics(self, year: int, entity_id: str) -> dict:
"""Estrae metriche workforce per ESRS S1 da SAP SuccessFactors."""
# SAP SuccessFactors OData API
data = await self._get(
"/WorkforceMetrics",
params={
"$filter": f"Year eq {year} and EntityId eq '{entity_id}'",
"$select": "Metric,Value,Gender,EmployeeType"
}
)
result = {
"headcount_total": 0,
"headcount_female": 0,
"headcount_male": 0,
"fte_total": 0,
"turnover_rate": 0,
"gender_pay_gap_pct": None
}
for r in data.get("value", []):
metric = r.get("Metric")
value = float(r.get("Value", 0))
if metric == "HEADCOUNT":
result["headcount_total"] += value
if r.get("Gender") == "F":
result["headcount_female"] += value
elif r.get("Gender") == "M":
result["headcount_male"] += value
elif metric == "FTE":
result["fte_total"] += value
elif metric == "TURNOVER_RATE":
result["turnover_rate"] = value
elif metric == "GENDER_PAY_GAP":
result["gender_pay_gap_pct"] = value
return result
def _convert_to_mwh(self, quantity: float, unit: str) -> float:
conversions = {"GJ": 0.2778, "MWh": 1.0, "kWh": 0.001, "TJ": 277.78}
return quantity * conversions.get(unit, 1.0)
class OracleESGConnector(ERPConnector):
"""
Connettore per Oracle Fusion Sustainability (Oracle ESG).
Usa Oracle REST API + BICC per bulk extraction.
"""
def __init__(self, base_url: str, username: str, password: str):
self.base_url = base_url.rstrip("/")
self.auth = (username, password)
self._client = httpx.AsyncClient(timeout=60)
async def _get(self, path: str, params: dict = None) -> dict:
response = await self._client.get(
f"{self.base_url}{path}",
auth=self.auth,
headers={"Accept": "application/json"},
params=params
)
response.raise_for_status()
return response.json()
async def get_energy_consumption(self, year: int, entity_id: str) -> dict:
data = await self._get(
"/fscmRestApi/resources/11.13.18.05/sustainabilityMetrics",
params={
"q": f"metricCategory=ENERGY;fiscalYear={year};legalEntityId={entity_id}",
"fields": "metricName,metricValue,uom,dataSource,lastUpdateDate"
}
)
total_mwh = 0
renewable_mwh = 0
RENEWABLE_METRICS = {"SOLAR_ENERGY", "WIND_ENERGY", "HYDRO_ENERGY", "PPA_RENEWABLE"}
for item in data.get("items", []):
mwh = self._to_mwh(item["metricValue"], item["uom"])
total_mwh += mwh
if item["metricName"] in RENEWABLE_METRICS:
renewable_mwh += mwh
return {
"total_mwh": total_mwh,
"renewable_mwh": renewable_mwh,
"non_renewable_mwh": total_mwh - renewable_mwh
}
async def get_ghg_emissions(self, year: int, entity_id: str) -> dict:
data = await self._get(
"/fscmRestApi/resources/11.13.18.05/sustainabilityMetrics",
params={
"q": f"metricCategory=GHG_EMISSIONS;fiscalYear={year};legalEntityId={entity_id}",
"fields": "metricName,ghgScope,metricValue,uom"
}
)
result = {"scope1": 0, "scope2_lb": 0, "scope2_mb": 0, "scope3_total": 0}
for item in data.get("items", []):
scope = item.get("ghgScope", "")
val = float(item.get("metricValue", 0))
if scope == "SCOPE_1":
result["scope1"] += val
elif scope == "SCOPE_2_LB":
result["scope2_lb"] += val
elif scope == "SCOPE_2_MB":
result["scope2_mb"] += val
elif scope.startswith("SCOPE_3"):
result["scope3_total"] += val
return result
async def get_workforce_metrics(self, year: int, entity_id: str) -> dict:
data = await self._get(
"/fscmRestApi/resources/11.13.18.05/sustainabilityMetrics",
params={
"q": f"metricCategory=WORKFORCE;fiscalYear={year};legalEntityId={entity_id}"
}
)
result = {"headcount_total": 0, "fte_total": 0, "gender_pay_gap_pct": None}
for item in data.get("items", []):
name = item.get("metricName", "")
val = float(item.get("metricValue", 0))
if name == "TOTAL_HEADCOUNT":
result["headcount_total"] = val
elif name == "TOTAL_FTE":
result["fte_total"] = val
elif name == "GENDER_PAY_GAP":
result["gender_pay_gap_pct"] = val
return result
def _to_mwh(self, value: float, uom: str) -> float:
factors = {"MWH": 1.0, "KWH": 0.001, "GJ": 0.2778, "MMBTU": 0.29307}
return value * factors.get(uom.upper(), 1.0)
class ERPDataAggregator:
"""
Aggrega dati da multipli connettori ERP e mappa su ESRS data points.
"""
def __init__(self, connectors: list[ERPConnector]):
self.connectors = connectors
async def collect_and_map_to_esrs(
self,
year: int,
entity_id: str,
period_id: str
) -> list[dict]:
"""
Raccoglie dati da tutti i connettori e produce submissions ESRS-ready.
"""
results = []
# Raccolta parallela da tutti i connettori
energy_tasks = [c.get_energy_consumption(year, entity_id) for c in self.connectors]
ghg_tasks = [c.get_ghg_emissions(year, entity_id) for c in self.connectors]
wf_tasks = [c.get_workforce_metrics(year, entity_id) for c in self.connectors]
energy_results = await asyncio.gather(*energy_tasks, return_exceptions=True)
ghg_results = await asyncio.gather(*ghg_tasks, return_exceptions=True)
wf_results = await asyncio.gather(*wf_tasks, return_exceptions=True)
# Aggregazione (media pesata o somma in base al tipo)
total_energy = sum(
r["total_mwh"] for r in energy_results
if isinstance(r, dict)
)
total_renewable = sum(
r["renewable_mwh"] for r in energy_results
if isinstance(r, dict)
)
total_scope1 = sum(
r.get("scope1", 0) for r in ghg_results
if isinstance(r, dict)
)
# Map su ESRS data points
if total_energy > 0:
results.append({
"data_point_id": "E1-5_TotalEnergy",
"value_numeric": total_energy,
"unit_of_measure": "MWh",
"source_system": "ERP_AGGREGATED",
"confidence_level": "HIGH"
})
if total_renewable > 0:
results.append({
"data_point_id": "E1-5_RenewableEnergy",
"value_numeric": total_renewable,
"unit_of_measure": "MWh",
"source_system": "ERP_AGGREGATED",
"confidence_level": "HIGH"
})
if total_scope1 > 0:
results.append({
"data_point_id": "E1-6_GrossScope1",
"value_numeric": total_scope1,
"unit_of_measure": "tCO2eq",
"source_system": "ERP_AGGREGATED",
"confidence_level": "HIGH"
})
return results
Data Lineage a Audit Trail pro externí ověření
CSRD požaduje omezené ujištění (a v perspektivě přiměřené ujištění) externím auditorem. To znamená, že každý údaj ve zprávě musí mít úplná sledovatelnost: odkud pochází, kdo jej vložil, kdo jej schválil, a jak se to počítalo. Pojďme implementovat sledovač datové linie:
# data_lineage.py
# Data lineage tracker per assurance CSRD
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import hashlib
import json
@dataclass
class LineageNode:
"""Nodo nel grafo di data lineage."""
node_id: str
node_type: str # SOURCE, TRANSFORM, SUBMISSION, APPROVAL
description: str
actor: Optional[str] = None
timestamp: Optional[datetime] = None
metadata: dict = field(default_factory=dict)
hash: Optional[str] = None
def compute_hash(self) -> str:
"""Hash del contenuto per rilevare manomissioni."""
content = json.dumps({
"node_id": self.node_id,
"node_type": self.node_type,
"description": self.description,
"actor": self.actor,
"timestamp": self.timestamp.isoformat() if self.timestamp else None,
"metadata": self.metadata
}, sort_keys=True)
self.hash = hashlib.sha256(content.encode()).hexdigest()
return self.hash
@dataclass
class LineageEdge:
"""Arco tra nodi nel grafo di lineage."""
from_node: str
to_node: str
relationship: str # DERIVED_FROM, APPROVED_BY, TRANSFORMED_BY
metadata: dict = field(default_factory=dict)
class DataLineageTracker:
"""
Traccia il percorso completo di ogni dato ESG dalla sorgente al report.
Struttura: DAG (Directed Acyclic Graph) di LineageNode.
"""
def __init__(self, db_pool):
self.db_pool = db_pool
async def record_extraction(
self,
data_point_id: str,
source_system: str,
source_query: str,
raw_value,
actor_id: str
) -> str:
"""Registra estrazione da sistema sorgente."""
node = LineageNode(
node_id=f"extract_{data_point_id}_{datetime.now().timestamp():.0f}",
node_type="SOURCE",
description=f"Extracted from {source_system}",
actor=actor_id,
timestamp=datetime.now(),
metadata={
"source_system": source_system,
"source_query": source_query,
"raw_value": str(raw_value),
"data_point_id": data_point_id
}
)
node.compute_hash()
await self._save_node(node)
return node.node_id
async def record_transformation(
self,
from_node_id: str,
data_point_id: str,
transformation_type: str,
input_value,
output_value,
formula: Optional[str],
actor_id: str
) -> str:
"""Registra una trasformazione (unit conversion, aggregation, etc.)."""
node = LineageNode(
node_id=f"transform_{data_point_id}_{datetime.now().timestamp():.0f}",
node_type="TRANSFORM",
description=f"{transformation_type} applied",
actor=actor_id,
timestamp=datetime.now(),
metadata={
"transformation_type": transformation_type,
"input_value": str(input_value),
"output_value": str(output_value),
"formula": formula,
"data_point_id": data_point_id
}
)
node.compute_hash()
await self._save_node(node)
edge = LineageEdge(
from_node=from_node_id,
to_node=node.node_id,
relationship="DERIVED_FROM"
)
await self._save_edge(edge)
return node.node_id
async def record_approval(
self,
submission_node_id: str,
data_point_id: str,
approver_id: str,
approval_comment: Optional[str]
) -> str:
"""Registra approvazione da parte del reviewer."""
node = LineageNode(
node_id=f"approval_{data_point_id}_{datetime.now().timestamp():.0f}",
node_type="APPROVAL",
description="Data point approved for reporting",
actor=approver_id,
timestamp=datetime.now(),
metadata={
"data_point_id": data_point_id,
"approval_comment": approval_comment,
"approver_id": approver_id
}
)
node.compute_hash()
await self._save_node(node)
edge = LineageEdge(
from_node=submission_node_id,
to_node=node.node_id,
relationship="APPROVED_BY"
)
await self._save_edge(edge)
return node.node_id
async def get_lineage_for_data_point(
self,
data_point_id: str,
period_id: str
) -> dict:
"""
Restituisce il grafo completo di lineage per un data point.
Usato dall'auditor per tracciare l'origine dei dati.
"""
async with self.db_pool.acquire() as conn:
nodes = await conn.fetch("""
SELECT * FROM lineage_nodes
WHERE metadata->>'data_point_id' = $1
ORDER BY timestamp ASC
""", data_point_id)
edges = await conn.fetch("""
SELECT le.* FROM lineage_edges le
JOIN lineage_nodes ln ON ln.node_id = le.from_node
WHERE ln.metadata->>'data_point_id' = $1
""", data_point_id)
return {
"data_point_id": data_point_id,
"period_id": period_id,
"nodes": [dict(n) for n in nodes],
"edges": [dict(e) for e in edges],
"integrity_check": await self._verify_chain_integrity(nodes)
}
async def _verify_chain_integrity(self, nodes) -> dict:
"""Verifica che gli hash non siano stati manomessi."""
for node in nodes:
expected_hash = node.compute_hash() if hasattr(node, 'compute_hash') else None
if expected_hash and node.get('hash') != expected_hash:
return {"valid": False, "tampered_node": node['node_id']}
return {"valid": True, "nodes_verified": len(nodes)}
async def _save_node(self, node: LineageNode):
async with self.db_pool.acquire() as conn:
await conn.execute("""
INSERT INTO lineage_nodes
(node_id, node_type, description, actor, timestamp, metadata, hash)
VALUES ($1, $2, $3, $4, $5, $6, $7)
""",
node.node_id, node.node_type, node.description,
node.actor, node.timestamp, json.dumps(node.metadata), node.hash)
async def _save_edge(self, edge: LineageEdge):
async with self.db_pool.acquire() as conn:
await conn.execute("""
INSERT INTO lineage_edges (from_node, to_node, relationship, metadata)
VALUES ($1, $2, $3, $4)
""",
edge.from_node, edge.to_node, edge.relationship,
json.dumps(edge.metadata))
Případová studie: Výrobní společnost s 250 zaměstnanci
Pojďme analyzovat skutečnou implementaci pro Společnost MetalTech S.r.l., výrobní společnost s 250 zaměstnanci, dvěma výrobními závody (Turín a Bari), podléhající vlně CSRD 2 (FY 2025, první zpráva v roce 2026). Zde je návod, jak strukturovali projekt:
Fáze 1: Analýza mezer a posouzení významnosti (1.–2. měsíce)
MetalTech identifikoval materiálová témata pro výrobní sektor prostřednictvím procesu dvojí významnosti:
- ESRS E1 (klima): DVOJNÁSOBNÁ materialita – značná spotřeba energie (tavicí pec, výrobní linky) a fyzické riziko extrémních událostí
- ESRS E2 (znečištění): Nárazový materiál – emise částic PM2,5 a VOC sloučeniny z barev
- ESRS E5 (cirkulární ekonomika): Materiál - kovový odpad (35% materiálu na vstupu), obaly, průmyslové odpadní vody
- ESRS S1 (pracovní síla): Materiál - 250 zaměstnanců, vysoká úrazovost v tomto odvětví rozdíly v odměňování žen a mužů
- ESRS G1 (obchodní chování): Materiál pro základní regulační požadavek
Fáze 2: Technická infrastruktura (2.–4. měsíce)
Technologický balíček přijatý společností MetalTech:
| Komponent | Technologie | Používání |
|---|---|---|
| Backendy API | FastAPI + asyncpg | Sběr a validace dat ESG |
| Databáze | PostgreSQL 16 + pgvector | Ukládání datových bodů ESRS + hledání podobnosti |
| Integrace ERP | SAP SFM OData API | Energie, emise, pracovní síla od SAP |
| Pracovní postup | Vlastní FSM + Redis | Víceúrovňový pracovní postup schvalování |
| generace XBRL | Python lxml + taxonomie ESRS | iXBRL výstup pro OAM/CONSOB |
| Data Lineage | Vlastní DAG na PostgreSQL | Auditní stopa pro ujištění |
| Oznámení | FastAPI BackgroundTasks + SMTP | Upozornění na konečný termín, žádost o kontrolu |
| Frontend | Angular 17 + Materiál | Řídicí panel sběru a monitorování |
Fáze 3: Sběr dat FY2025 (4.–14. měsíce)
# Esempio raccolta dati MetalTech FY2025
# Script orchestrazione completa per stabilimento Torino
import asyncio
from erp_connectors import SAPSustainabilityConnector, ERPDataAggregator
from workflow_engine import CSRDWorkflowEngine, WorkflowState
async def run_metaltech_fy2025_collection():
# Configurazione connettore SAP
sap = SAPSustainabilityConnector(
base_url="https://metaltech.hana.ondemand.com",
client_id="METALTECH_CSRD_CLIENT",
client_secret="***", # Da env vars
token_url="https://metaltech.authentication.eu12.hana.ondemand.com/oauth/token"
)
aggregator = ERPDataAggregator([sap])
# Raccolta dati stabilimento Torino (entity TO001)
print("[1/3] Raccolta dati da SAP per stabilimento Torino...")
submissions_to = await aggregator.collect_and_map_to_esrs(
year=2025,
entity_id="TO001",
period_id="period_fy2025"
)
# Raccolta dati stabilimento Bari (entity BA001)
print("[2/3] Raccolta dati da SAP per stabilimento Bari...")
submissions_ba = await aggregator.collect_and_map_to_esrs(
year=2025,
entity_id="BA001",
period_id="period_fy2025"
)
# Consolidamento gruppo: somma Scope 1+2, media intensità'
print("[3/3] Consolidamento dati di gruppo...")
consolidated = consolidate_group_data([submissions_to, submissions_ba])
# Output dati FY2025 MetalTech consolidati
for sub in consolidated:
print(f" {sub['data_point_id']}: {sub['value_numeric']:.2f} {sub['unit_of_measure']}")
return consolidated
def consolidate_group_data(entity_submissions: list) -> list:
"""Consolida submissions di multiple entità per il gruppo."""
by_data_point = {}
for entity_subs in entity_submissions:
for sub in entity_subs:
dp_id = sub['data_point_id']
if dp_id not in by_data_point:
by_data_point[dp_id] = {**sub, "value_numeric": 0}
# Per emissioni e energia: somma (non media)
by_data_point[dp_id]["value_numeric"] += sub['value_numeric']
by_data_point[dp_id]["source_system"] = "CONSOLIDATED_GROUP"
return list(by_data_point.values())
# Risultati tipici per azienda manifatturiera 250 dip.
METALTECH_FY2025_RESULTS = {
"E1-6_GrossScope1": {
"value": 2_847, # tCO2eq - principalmente gas naturale forno fusione
"confidence": "HIGH",
"source": "SAP SFM - Gas meter readings"
},
"E1-6_GrossScope2LB": {
"value": 1_523, # tCO2eq - elettricita' rete
"confidence": "HIGH",
"source": "SAP SFM - Electricity bills"
},
"E1-6_GrossScope2MB": {
"value": 980, # tCO2eq - market-based con PPA solare
"confidence": "HIGH",
"source": "SAP SFM + GO certificates"
},
"E1-5_TotalEnergy": {
"value": 18_450, # MWh totali
"confidence": "HIGH",
"source": "SAP SFM - Meters + bills"
},
"E1-5_RenewableEnergy": {
"value": 6_200, # MWh (33.6% via PPA + impianto solare 500kWp)
"confidence": "MEDIUM", # Stima PPA
"source": "SAP SFM + PPA contract"
},
"GHG_Intensity": {
"value": 42.3, # tCO2eq / M EUR fatturato
"confidence": "HIGH",
"source": "Calculated: (S1+S2MB) / revenue"
}
}
Výsledky a získané poznatky
MetalTech FY2025: Výsledky projektu CSRD
- Doba sběru dat: Od 6 týdnů (manuálně) do 4 dnů (automaticky)
- Povinné pokrytí ESRS: Bylo shromážděno a schváleno 94 % povinných datových bodů
- Kvalita dat: Důvěra VYSOKÁ u 78 % příspěvků (oproti 45 % v předchozím roce)
- Časy auditu: Snížení o 60 % díky automatickému vedení dat
- Náklady na externí zajištění: -25 % pro menší námahu při odběru vzorků
- Hlavní nález: Rozsah 3 kat. 1 (předchozí materiály) byla podhodnocena o 40 % ve srovnání s předchozími manuálními výpočty
Hlavní potíže a jak je překonat
Anti-vzory, kterým je třeba se vyhnout
- Nestavte vše od začátku: Trh nabízí SaaS CSRD řešení (Workiva, Watershed, Persefoni) pro malé a střední podniky. Vyhodnoťte build-vs-buy: stavba v domě má smysl pouze v případě, že máte vysokou složitost dat nebo jedinečné potřeby integrace.
- Neodkládejte posouzení významnosti: Je to předpoklad všeho. Bez něj nevíte, jaká data sbírat. Začněte ihned, i se zjednodušenou metodou.
- Neignorujte rozsah 3: Pro mnoho výrobních společností je rozsah 3 kat. 1 (suroviny) a kat. 11 (používání produktu) přesahují 70 % celkových emisí. Underreporting Scope 3 je nejběžnější greenwashing.
- Neberte XBRL jako poslední myšlenku: Vyžaduje značkování iXBRL obeznámeni s taxonomií ESRS. Naplánujte si alespoň 2-3 měsíce vývoje.
- Nezapomeňte na rozsah konsolidace: CSRD požaduje zahrnují všechny dceřiné společnosti >50 %. Vyloučit entity pro „jednoduchost“ je to riziko compliance.
GraphQL API pro pokročilé ESG dotazy
Pro analytické panely ESG a interaktivní nástroje pro vytváření sestav nabízí GraphQL API vynikající flexibilita oproti klasickému REST, umožňující specifické dotazy s agregacemi za letu:
# graphql_schema.py
# Schema GraphQL per ESG Analytics API con Strawberry
import strawberry
from strawberry.types import Info
from typing import Optional, List
from datetime import date
from decimal import Decimal
@strawberry.type
class GHGEmissionsSummary:
year: int
scope1_tco2eq: Decimal
scope2_lb_tco2eq: Decimal
scope2_mb_tco2eq: Decimal
scope3_tco2eq: Decimal
total_tco2eq: Decimal
ghg_intensity: Optional[Decimal]
yoy_change_pct: Optional[Decimal]
@strawberry.type
class EnergySummary:
year: int
total_mwh: Decimal
renewable_mwh: Decimal
non_renewable_mwh: Decimal
renewable_share_pct: Decimal
@strawberry.type
class ESGReportingStatus:
period_id: str
year: int
workflow_state: str
mandatory_coverage_pct: Decimal
approved_data_points: int
pending_review: int
missing_mandatory: int
@strawberry.type
class Query:
@strawberry.field
async def ghg_emissions(
self,
info: Info,
company_id: str,
years: List[int]
) -> List[GHGEmissionsSummary]:
"""Emissioni GHG per anni richiesti con calcolo YoY automatico."""
db = info.context["db_pool"]
async with db.acquire() as conn:
rows = await conn.fetch("""
SELECT
rp.year,
COALESCE(SUM(CASE WHEN dp.id LIKE '%Scope1%'
THEN eds.value_numeric ELSE 0 END), 0) as scope1,
COALESCE(SUM(CASE WHEN dp.id LIKE '%Scope2%LB%'
THEN eds.value_numeric ELSE 0 END), 0) as scope2_lb,
COALESCE(SUM(CASE WHEN dp.id LIKE '%Scope2%MB%'
THEN eds.value_numeric ELSE 0 END), 0) as scope2_mb,
COALESCE(SUM(CASE WHEN dp.id LIKE '%Scope3%'
THEN eds.value_numeric ELSE 0 END), 0) as scope3
FROM esg_data_submissions eds
JOIN reporting_periods rp ON rp.id = eds.reporting_period_id
JOIN esrs_data_points dp ON dp.id = eds.data_point_id
WHERE rp.company_id = $1
AND rp.year = ANY($2)
AND eds.status = 'APPROVED'
AND dp.esrs_standard = 'E1'
GROUP BY rp.year
ORDER BY rp.year
""", company_id, years)
results = []
prev_total = None
for row in rows:
total = row['scope1'] + row['scope2_mb'] + row['scope3']
yoy = None
if prev_total and prev_total > 0:
yoy = ((total - prev_total) / prev_total) * 100
results.append(GHGEmissionsSummary(
year=row['year'],
scope1_tco2eq=Decimal(str(row['scope1'])),
scope2_lb_tco2eq=Decimal(str(row['scope2_lb'])),
scope2_mb_tco2eq=Decimal(str(row['scope2_mb'])),
scope3_tco2eq=Decimal(str(row['scope3'])),
total_tco2eq=Decimal(str(total)),
ghg_intensity=None,
yoy_change_pct=Decimal(str(yoy)) if yoy else None
))
prev_total = total
return results
@strawberry.field
async def reporting_status(
self,
info: Info,
company_id: str,
year: int
) -> Optional[ESGReportingStatus]:
"""Stato del processo di reporting per un anno."""
db = info.context["db_pool"]
async with db.acquire() as conn:
period = await conn.fetchrow("""
SELECT rp.id, rp.year, rp.status,
COUNT(eds.id) FILTER (WHERE eds.status = 'APPROVED') as approved,
COUNT(eds.id) FILTER (WHERE eds.status IN ('SUBMITTED','REVIEWING')) as pending,
(SELECT COUNT(*) FROM esrs_data_points WHERE is_mandatory = TRUE) as mandatory_total
FROM reporting_periods rp
LEFT JOIN esg_data_submissions eds ON eds.reporting_period_id = rp.id
WHERE rp.company_id = $1 AND rp.year = $2
GROUP BY rp.id, rp.year, rp.status
""", company_id, year)
if not period:
return None
mandatory_total = period['mandatory_total'] or 1
approved = period['approved'] or 0
coverage = (approved / mandatory_total) * 100
return ESGReportingStatus(
period_id=str(period['id']),
year=period['year'],
workflow_state=period['status'],
mandatory_coverage_pct=Decimal(str(round(coverage, 1))),
approved_data_points=approved,
pending_review=period['pending'] or 0,
missing_mandatory=max(0, mandatory_total - approved)
)
schema = strawberry.Schema(query=Query)
# Query di esempio per il frontend
EXAMPLE_QUERY = """
query ESGDashboard($companyId: String!, $years: [Int!]!) {
ghgEmissions(companyId: $companyId, years: $years) {
year
scope1Tco2eq
scope2MbTco2eq
scope3Tco2eq
totalTco2eq
yoyChangePct
}
reportingStatus(companyId: $companyId, year: 2025) {
workflowState
mandatoryCoveragePct
approvedDataPoints
missingMandatory
}
}
"""
Úvahy o zákonu o AI a průniku s CSRD
Nově se objevujícím aspektem v letech 2025–2026 je průnik mezi CSRD a AI Act EU. Systémy AI používaná k výpočtu emisí, vytváření klimatických projekcí nebo automatizaci hlášení ESG může spadat do vysoce rizikových kategorií umělé inteligence podle zákona o AI, které vyžadují:
- Technická dokumentace systému AI používaného pro odhady emisí (metodická transparentnost ESRS)
- Lidský nadhled o automatizovaném rozhodování o významnosti
- Auditní stopy předpovědí umělé inteligence (již pokryto naší datovou řadou)
- Zákaz diskriminace v systémech AI HR používaných pro metriky ESRS S1
AI Act x CSRD Timeline
- Únor 2025: Platný zákon o AI – zakazuje zakázané praktiky
- Srpen 2025: Povinnosti GPAI (General Purpose AI) – transparentnost modelů základů AI
- Srpen 2026: High Risk AI Comprehensive Obligations – zahrnuje systémy AI pro HR a úvěrová rozhodnutí
- Srpen 2027: Plná aplikace AI Act pro všechny případy použití
Týmy vytvářející systémy ESG/CSRD již musí plánovat dodržování předpisů Zákon o umělé inteligenci pro součásti ML jejich zásobníku: modely odhadu emisí rozsahu 3, detekce anomálií na datech ESG, NLP pro posouzení dodavatele.
Závěry a další kroky
Vybudování systému v souladu s CSRD není jen cvičením shody: je příležitost vytvořit datovou architekturu udržitelnosti, která bude generovat hodnotu za roky. Komponenty, které jsme viděli — datový model ESRS, sběrné API s validace, FSM workflow, ERP integrace, XBRL generátor, datová linie – to vše je opakovaně použitelné a modulární.
Klíčové body, které si můžete vzít domů:
- Začněte od posouzení významnosti: bez dvojí materiality nevíte co sbírat. Je to proces, který vyžaduje skutečné zapojení zainteresovaných stran a další algoritmy.
- Automatizace sběru ERP: Existuje 60–70 % dat ESRS E1 a S1 již ve vašich systémech. Konektory OData pro SAP a Oracle jsou investice které se obnoví během týdnů.
- Investujte do datové řady hned teď: náklady na externí pojištění závisí přímo na sledovatelnosti dat. Dobrý systém auditních záznamů snižuje náklady na audit o 20–30 %.
- Neignorujte balíček Omnibus: zjednodušení navržená v roce 2025 by mohly zmírnit některé povinnosti pro malé a střední podniky. Sledovat legislativní proces, ale ne plánovat je před konečným schválením.
V dalším článku seriálu — AI uhlíková stopa — prozkoumáme, jak měřit a snižovat dopad modelů umělé inteligence na životní prostředí: od školení velkých jazykových modelů k odvození ve výrobě pomocí nástrojů jako CodeCarbon, ML CO2 Dopad a strategie pro udržitelné ML inženýrství.
Zdroje, kde se dozvíte více
- Taxonomie EFRAG ESRS: efrag.org - ESRS XBRL Taxonomie
- Oficiální text CSRD: EUR-Lex - Směrnice 2022/2464/EU
- Firemní standard protokolu GHG: ghgprotocol.org
- SAP Sustainability Footprint Management: sap.com/sustainability
- Knihovny Python XBRL: arelle (open source), python-xbrl
- CSRD SaaS nástroje: Workiva, Watershed, Persefoni, Sweep, Greenly
Green Software Engineering Series - Navigace
Předchozí článek: GreenOps: Udržitelné DevOps — CI/CD zelená, cloud Rightsizing, FinOps a metriky udržitelnosti pro týmy DevOps.
Další článek: AI uhlíková stopa — Změřit a snížit emise modelů umělé inteligence při školení a vyvozování s praktickými nástroji a měřítky sektoru.
Tato série je součástí cesty Obchod s daty a AI na federicocalo.dev, který také zahrnuje sérii MLOps for Business (ID 306-315) pro ty, kteří se chtějí ponořit hlouběji do hmoty při výrobě modelů AI s důrazem na výpočetní efektivitu a vládnutí.







