API de raportare ESG: Integrare cu fluxul de lucru CSRD
Il 1 ianuarie 2024 a marcat un punct de cotitură epocal pentru companiile europene: Corporate Directiva de raportare a durabilității (CSRD) a intrat în vigoare, transformând raportarea ESG de la practica voluntară la obligația legală cu standarde tehnice precise. Companiile subiect trebuie acum produce rapoarte conforme Standardele europene de raportare de durabilitate (ESRS), cu date verificabile, etichetate în format XBRL și supuse asigurării externe.
Provocarea pentru echipele tehnice este concretă: colectați date ESG din zeci de sisteme ale companiei (ERP, managementul energiei, HR, logistică), agregați-le în funcție de taxonomiile ESRS, calculați metrica cum ar fi emisiile Scope 1-3 și intensitatea apei, gestionați procesul de aprobare multi-nivel și produc ieșiri în format iXBRL pregătit pentru regulator. Toate acestea necesită unul Arhitectură API dedicată, nu foile Excel partajate.
În acest articol vom construi un sistem complet: de la modelul de date compatibil cu ESRS la API-ul REST pentru colectarea și validarea datelor, la fluxul de lucru de consolidare și aprobare, până la generație a ieșirii XBRL. Vom folosi Python/FastAPI pentru backend, PostgreSQL pentru stocarea structurată, și vom analiza cum să integrăm SAP Sustainability și Oracle ESG ca surse de date upstream.
Ce vei învăța
- Cronologia CSRD și perimetrul aplicației: cine trebuie să facă ce și când
- ESRS E1-E5, S1-S4, G1: Structura standardelor și metrici obligatorii
- Evaluare dublă materialitate: impact + implementare algoritmică financiară
- Model de date PostgreSQL pentru colectarea de date ESG conform ESRS
- API REST/GraphQL pentru colectarea, validarea și agregarea datelor ESG
- Automatizarea fluxului de lucru pentru aprobare și consolidare pe mai multe niveluri
- Etichetarea XBRL/iXBRL cu Python: generarea automată a raportului digital
- Integrare ERP: SAP Sustainability Footprint Management, Oracle Fusion ESG
- Linia de date și pistă de audit pentru asigurare externă
- Studiu de caz complet: Companie de producție medie cu 250 de angajați
Seria Green Software Engineering
Acest articol este al optulea din seria cuprinzătoare despre Green Software Engineering, care acoperă de la măsurarea emisiilor de cod până la raportarea ESG de reglementare:
| # | Articol | Concentrează-te |
|---|---|---|
| 1 | Principii Green Software Engineering | GSF, SCI spec, 8 principii fundamentale |
| 2 | CodeCarbon: Măsurarea emisiilor de cod | Bibliotecă Python, tablou de bord, integrare CI/CD |
| 3 | Climatiq API: calcule GHG în backend | API REST, Scop 1-3, integrare FastAPI |
| 4 | SDK Carbon Aware | Schimbarea sarcinii de lucru, intensitatea grilei, schimbarea timpului |
| 5 | Conducta Scope 3 | Emisii lanțului valoric, furnizori, LCA |
| 6 | Modele de arhitectură verde | Memorare în cache durabilă fără server, bazată pe evenimente |
| 7 | GreenOps: DevOps sustenabil | Green CI/CD, cloud rightizing, FinOps |
| 8 | API de raportare ESG: flux de lucru CSRD (acest articol) | ESRS, XBRL, materialitate, integrare ERP |
| 9 | Amprenta de carbon AI | Training LLM/inferență, ML sustenabil |
| 10 | Modelare avansată a domeniului | Metodologii GHG Protocol, sectoare specifice |
CSRD: Noua paradigmă a raportării de durabilitate
CSRD (Directiva 2022/2464/UE) a înlocuit Directiva de raportare nefinanciară (NFRD), extinderea drastică a perimetrului aplicației și creșterea calității necesare a rapoartelor. În timp ce NFRD era de aproximativ cca 11.000 de companii Țările europene, CSRD le implică 50.000+, inclusiv pentru prima dată IMM-urile listate și filialele europene ale grupuri non-UE.
Cronologia aplicației
| Data | Subiecte | Primul Raport |
|---|---|---|
| IAN 2024 (FY 2024) | Companii mari deja supuse NFRD (>500 de angajați) | 2025 |
| IAN 2025 (AF 2025) | Companii mari din UE (>250 de angajați SAU cifră de afaceri >40 milioane EUR SAU >20 milioane active) | 2026 |
| IAN 2026 (FY 2026) | IMM-uri listate pe piețele reglementate (10-250 de angajați) | 2027 |
| IAN 2028 (FY 2028) | Filiale din UE ale unor grupuri din afara UE (cifra de afaceri UE >150 milioane EUR) | 2029 |
Simplificare Omnibus 2025
În februarie 2025, Comisia Europeană a publicat Pachetul Omnibus care propune simplificări semnificativ pentru CSRD: reducerea sferei IMM, amânare cu 2 ani pentru companiile din valul 2 și 3 și simplificarea unor puncte de date ESRS. Cu toate acestea, raman companiile de mare val 1 subiect pe deplin. Modificările sunt încă în proces legislativ la momentul de publicarea acestui articol. Verificați întotdeauna starea actualizată a legislației.
Structura ESRS
Standardele europene de raportare de durabilitate sunt organizate în trei domenii tematice, plus două transversale standard:
| Standard | Zonă | Subiecte | Obligatoriu |
|---|---|---|---|
| ESRS 1 | Tăiere transversală | Cerințe și principii generale | Si |
| ESRS 2 | Tăiere transversală | Informații generale (guvernare, strategie, semnificație) | Si |
| ESRS E1 | Mediu | Schimbările climatice (GES, energie, TCFD) | Dacă material |
| ESRS E2 | Mediu | Poluarea (aer, apa, sol, substante) | Dacă material |
| ESRS E3 | Mediu | Resursele de apă și marine | Dacă material |
| ESRS E4 | Mediu | Biodiversitate și ecosisteme | Dacă material |
| ESRS E5 | Mediu | Utilizarea resurselor și economia circulară | Dacă material |
| ESRS S1 | Social | Forța de muncă proprie (condiții de muncă, D&I, sănătate) | Dacă material |
| ESRS S2 | Social | Lucrătorii din lanțul valoric | Dacă material |
| ESRS S3 | Social | Comunitățile afectate | Dacă material |
| ESRS S4 | Social | Consumatorii și utilizatorii finali | Dacă material |
| ESRS G1 | Guvernare | Conduita în afaceri (etică, anticorupție, lobby) | Dacă material |
Clima ESRS E1: valorile pe care trebuie să le colectați
ESRS E1 este de obicei cel mai scump subiect pentru colectarea datelor. Necesită dezvăluire privind emisiile de GES (Scope 1, 2, 3), obiective de reducere aliniate la știință (SBTi), energie consumate după sursă și riscurile/oportunitățile climatice conform TCFD. Să vedem puncte de date obligatorii cele mai relevante:
Emisii de GES - Măsuri obligatorii ESRS E1-6
| Puncte de date | Unitate | Descriere |
|---|---|---|
| Emisii brute de GES din domeniul 1 | tCO2eq | Emisii directe din surse deținute sau controlate |
| Emisii brute de GES din domeniul de aplicare 2 (în funcție de locație) | tCO2eq | Emisii indirecte din achiziția de energie, metoda de localizare |
| Emisii brute de GES din domeniul de aplicare 2 (bazate pe piață) | tCO2eq | Emisii indirecte, metoda de piata (certificate) |
| Emisii brute de GES Scope 3 (15 categorii) | tCO2eq | Emisii indirecte în lanțul valoric |
| Intensitatea GES (venit) | tCO2eq / M EUR | Intensitatea pe unitatea de cifra de afaceri |
| Consumul total de energie | MWh | Consumul total de energie |
| Ponderea energiei regenerabile | % | Procentul de energie din surse regenerabile |
| Eliminarea și depozitarea GES | tCO2eq | Eliminarea carbonului prin împădurire, CCS etc. |
Evaluare dublă materialitate: implementare algoritmică
Inima metodologică a CSRD este dublă materialitate: fiecare companie trebuie să evalueze ce subiecte ESG sunt materiale, luând în considerare două perspective simultane:
- Materialitatea impactului: Compania are impacturi semnificative (pozitive sau negative, actual sau potențial) despre oameni și mediu pentru subiectul respectiv?
- Materialitatea financiară: Acel subiect ESG generează riscuri sau oportunități active financiare semnificative pentru companie (pe termen scurt, mediu, lung)?
Un subiect este material dacă îndeplinește cel puțin unul dintre cele două criterii. Să vedem cum să implementăm acest proces cu un API dedicat:
# materiality_assessment.py
# Implementazione Double Materiality Assessment per CSRD ESRS 2-IRO
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional
import uuid
from datetime import datetime
class MaterialityDimension(str, Enum):
IMPACT = "impact"
FINANCIAL = "financial"
class ImpactType(str, Enum):
ACTUAL_NEGATIVE = "actual_negative"
ACTUAL_POSITIVE = "actual_positive"
POTENTIAL_NEGATIVE = "potential_negative"
POTENTIAL_POSITIVE = "potential_positive"
class TimeHorizon(str, Enum):
SHORT_TERM = "short_term" # 0-1 anno
MEDIUM_TERM = "medium_term" # 1-5 anni
LONG_TERM = "long_term" # oltre 5 anni
@dataclass
class ImpactMaterialityScore:
"""
Score impact materiality per un topic ESG.
Basato su: severity x likelihood (per impatti negativi)
o scale x likelihood (per impatti positivi)
"""
topic_id: str
impact_type: ImpactType
# Score 1-5 per ogni dimensione
scale: int # Quante persone/ecosistemi impattati
scope: int # Reversibilita' dell'impatto
irremediable: int # Difficolta' di rimediare
likelihood: int # Probabilità' che accada
@property
def severity(self) -> float:
"""Severity = media pesata di scale, scope, irremediable"""
return (self.scale * 0.4 + self.scope * 0.3 + self.irremediable * 0.3)
@property
def score(self) -> float:
"""Score finale 1-25 per ordinamento"""
return self.severity * self.likelihood
@property
def is_material(self) -> bool:
"""Soglia materialita': score >= 9 (severity >= 3, likelihood >= 3)"""
return self.score >= 9.0
@dataclass
class FinancialMaterialityScore:
"""
Score financial materiality per un topic ESG.
Basato su: magnitude x likelihood, per time horizon
"""
topic_id: str
is_risk: bool # True = rischio, False = opportunità'
magnitude: int # 1-5: impatto finanziario potenziale
likelihood: int # 1-5: probabilità'
time_horizon: TimeHorizon
# Fattori qualitativi
quantifiable: bool # Possiamo quantificarlo in EUR?
estimated_impact_eur: Optional[float] = None
@property
def score(self) -> float:
"""Score base con discount per time horizon"""
base = self.magnitude * self.likelihood
# Discount: short=1.0, medium=0.8, long=0.6
discount = {
TimeHorizon.SHORT_TERM: 1.0,
TimeHorizon.MEDIUM_TERM: 0.8,
TimeHorizon.LONG_TERM: 0.6
}[self.time_horizon]
return base * discount
@property
def is_material(self) -> bool:
return self.score >= 7.5 # magnitude >= 3, likelihood >= 3 con discount short
@dataclass
class TopicMaterialityResult:
topic_id: str
topic_name: str
esrs_standard: str
impact_scores: list[ImpactMaterialityScore] = field(default_factory=list)
financial_scores: list[FinancialMaterialityScore] = field(default_factory=list)
@property
def impact_material(self) -> bool:
return any(s.is_material for s in self.impact_scores)
@property
def financial_material(self) -> bool:
return any(s.is_material for s in self.financial_scores)
@property
def is_material(self) -> bool:
"""Double materiality: materiale se almeno una dimensione e' materiale"""
return self.impact_material or self.financial_material
@property
def materiality_level(self) -> str:
if self.impact_material and self.financial_material:
return "DOUBLE_MATERIAL"
elif self.impact_material:
return "IMPACT_MATERIAL"
elif self.financial_material:
return "FINANCIAL_MATERIAL"
else:
return "NOT_MATERIAL"
def run_materiality_assessment(
company_id: str,
topics: list[dict]
) -> list[TopicMaterialityResult]:
"""
Esegue il materiality assessment completo.
Input: lista topic con scores raccolti via stakeholder engagement.
Output: ranking per materialita'.
"""
results = []
for topic_data in topics:
result = TopicMaterialityResult(
topic_id=topic_data["topic_id"],
topic_name=topic_data["topic_name"],
esrs_standard=topic_data["esrs_standard"]
)
# Aggiungi impact scores
for impact in topic_data.get("impact_scores", []):
result.impact_scores.append(ImpactMaterialityScore(**impact))
# Aggiungi financial scores
for financial in topic_data.get("financial_scores", []):
result.financial_scores.append(FinancialMaterialityScore(**financial))
results.append(result)
# Ordina per materialita' discendente
return sorted(
results,
key=lambda r: (
r.is_material,
r.impact_material and r.financial_material,
max((s.score for s in r.impact_scores), default=0) +
max((s.score for s in r.financial_scores), default=0)
),
reverse=True
)
Model de date PostgreSQL pentru CSRD
Un sistem compatibil cu CSRD necesită o schemă de bază de date care să reflecte structura ESRS, susține descendența datelor și menține istoricul modificărilor pentru pista de audit. Aici este schema de baza:
-- Schema PostgreSQL per CSRD ESG Data Collection
-- Ottimizzato per ESRS data points con audit trail completo
-- Tabella aziende e perimetro di consolidamento
CREATE TABLE companies (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
legal_name VARCHAR(255) NOT NULL,
lei_code VARCHAR(20), -- Legal Entity Identifier
country_code CHAR(2) NOT NULL,
nace_code VARCHAR(10), -- Classificazione attivita'
fiscal_year_end DATE,
employees_fte DECIMAL(10,2),
revenue_eur DECIMAL(20,2),
total_assets_eur DECIMAL(20,2),
is_parent BOOLEAN DEFAULT FALSE,
parent_id UUID REFERENCES companies(id),
csrd_wave INTEGER, -- 1, 2, 3, 4
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Periodi di reporting
CREATE TABLE reporting_periods (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
company_id UUID NOT NULL REFERENCES companies(id),
year INTEGER NOT NULL,
period_type VARCHAR(20) DEFAULT 'ANNUAL',
start_date DATE NOT NULL,
end_date DATE NOT NULL,
status VARCHAR(30) DEFAULT 'DRAFT',
-- DRAFT, COLLECTING, REVIEWING, APPROVED, FILED
submitted_at TIMESTAMPTZ,
approved_by UUID,
UNIQUE (company_id, year, period_type)
);
-- Catalog ESRS data points (reference table)
CREATE TABLE esrs_data_points (
id VARCHAR(50) PRIMARY KEY,
-- es. "E1-6_GrossScope1", "S1-7_GenderPayGap"
esrs_standard VARCHAR(10) NOT NULL, -- E1, E2, S1, G1...
topic VARCHAR(100) NOT NULL,
sub_topic VARCHAR(100),
name VARCHAR(255) NOT NULL,
description TEXT,
unit_of_measure VARCHAR(50),
data_type VARCHAR(20), -- NUMERIC, BOOLEAN, TEXT, DATE, ENUM
is_mandatory BOOLEAN DEFAULT FALSE,
is_phase_in BOOLEAN DEFAULT FALSE, -- Permesso rinvio anni 1-3
ghg_protocol_scope VARCHAR(10), -- S1, S2_LB, S2_MB, S3
scope3_category INTEGER, -- 1-15 per Scope 3
tags TEXT[]
);
-- Raccolta dati ESG per periodo
CREATE TABLE esg_data_submissions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
company_id UUID NOT NULL REFERENCES companies(id),
reporting_period_id UUID NOT NULL REFERENCES reporting_periods(id),
data_point_id VARCHAR(50) NOT NULL REFERENCES esrs_data_points(id),
-- Valori (uno solo populated in base a data_type)
value_numeric DECIMAL(30, 6),
value_boolean BOOLEAN,
value_text TEXT,
value_date DATE,
-- Metadata qualità'
unit_of_measure VARCHAR(50),
estimation_method VARCHAR(100),
confidence_level VARCHAR(20), -- HIGH, MEDIUM, LOW
is_estimated BOOLEAN DEFAULT FALSE,
-- Data lineage
source_system VARCHAR(100), -- SAP, Oracle, Manual, API
source_document_ref VARCHAR(255),
collected_by UUID,
collection_timestamp TIMESTAMPTZ DEFAULT NOW(),
-- Workflow
status VARCHAR(30) DEFAULT 'DRAFT',
-- DRAFT, SUBMITTED, REVIEWED, APPROVED, REJECTED
reviewed_by UUID,
review_timestamp TIMESTAMPTZ,
review_comment TEXT,
approved_by UUID,
approval_timestamp TIMESTAMPTZ,
-- Versioning per audit
version INTEGER DEFAULT 1,
previous_version_id UUID REFERENCES esg_data_submissions(id),
CONSTRAINT unique_submission UNIQUE (
company_id, reporting_period_id, data_point_id, version
)
);
-- Indici per performance
CREATE INDEX idx_submissions_period ON esg_data_submissions(reporting_period_id);
CREATE INDEX idx_submissions_status ON esg_data_submissions(status);
CREATE INDEX idx_submissions_datapoint ON esg_data_submissions(data_point_id);
-- Audit log immutabile
CREATE TABLE audit_log (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
entity_type VARCHAR(50) NOT NULL,
entity_id UUID NOT NULL,
action VARCHAR(20) NOT NULL, -- CREATE, UPDATE, DELETE, APPROVE
actor_id UUID NOT NULL,
actor_email VARCHAR(255),
timestamp TIMESTAMPTZ DEFAULT NOW(),
ip_address INET,
old_value JSONB,
new_value JSONB,
change_reason TEXT
);
-- View aggregata emissioni GHG per periodo
CREATE OR REPLACE VIEW v_ghg_emissions_summary AS
SELECT
c.legal_name,
rp.year,
SUM(CASE WHEN dp.id LIKE 'E1%Scope1%' THEN eds.value_numeric ELSE 0 END) AS scope1_tco2eq,
SUM(CASE WHEN dp.id LIKE 'E1%Scope2%LB%' THEN eds.value_numeric ELSE 0 END) AS scope2_lb_tco2eq,
SUM(CASE WHEN dp.id LIKE 'E1%Scope2%MB%' THEN eds.value_numeric ELSE 0 END) AS scope2_mb_tco2eq,
SUM(CASE WHEN dp.id LIKE 'E1%Scope3%' THEN eds.value_numeric ELSE 0 END) AS scope3_tco2eq,
SUM(CASE WHEN dp.id = 'E1-5_TotalEnergy' THEN eds.value_numeric ELSE 0 END) AS total_energy_mwh,
SUM(CASE WHEN dp.id = 'E1-5_RenewableEnergy' THEN eds.value_numeric ELSE 0 END) AS renewable_mwh
FROM esg_data_submissions eds
JOIN companies c ON c.id = eds.company_id
JOIN reporting_periods rp ON rp.id = eds.reporting_period_id
JOIN esrs_data_points dp ON dp.id = eds.data_point_id
WHERE eds.status = 'APPROVED'
GROUP BY c.legal_name, rp.year;
Arhitectura API: Colectarea și validarea datelor ESG
Arhitectura API a sistemului CSRD este împărțită în trei straturi principale: colectarea datelor de la unități de afaceri și sisteme sursă, validare și îmbogățire conform regulilor ESRS, e agregare pentru consolidarea grupului. Să implementăm backend-ul cu FastAPI:
# main.py - FastAPI ESG Reporting API
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, validator
from typing import Optional, List
from uuid import UUID
from datetime import datetime, date
from decimal import Decimal
import asyncpg
import json
app = FastAPI(
title="CSRD ESG Reporting API",
description="API per raccolta e gestione dati ESG conformi ESRS",
version="1.0.0"
)
# === PYDANTIC MODELS ===
class ESGDataPointSubmission(BaseModel):
data_point_id: str = Field(..., description="ID ESRS data point (es. E1-6_GrossScope1)")
value_numeric: Optional[Decimal] = None
value_boolean: Optional[bool] = None
value_text: Optional[str] = None
value_date: Optional[date] = None
unit_of_measure: Optional[str] = None
estimation_method: Optional[str] = None
confidence_level: str = Field(default="MEDIUM", pattern="^(HIGH|MEDIUM|LOW)$")
is_estimated: bool = False
source_system: Optional[str] = None
source_document_ref: Optional[str] = None
@validator('value_numeric')
def validate_numeric(cls, v, values):
if v is not None and v < 0:
raise ValueError('Valori numerici GHG non possono essere negativi')
return v
class BulkESGSubmission(BaseModel):
reporting_period_id: UUID
submissions: List[ESGDataPointSubmission]
submission_note: Optional[str] = None
class DataPointValidationResult(BaseModel):
data_point_id: str
is_valid: bool
errors: List[str] = []
warnings: List[str] = []
class ValidationReport(BaseModel):
period_id: UUID
total_mandatory_points: int
submitted_points: int
approved_points: int
coverage_pct: float
validation_results: List[DataPointValidationResult]
overall_valid: bool
# === VALIDATION SERVICE ===
class ESRSValidationService:
"""
Validazione regole ESRS per data points.
Implementa controlli incrociati richiesti dallo standard.
"""
CONSISTENCY_RULES = [
{
"rule": "SCOPE1_PLUS_SCOPE2_CONSISTENCY",
"description": "Total GHG = Scope1 + Scope2 (market) deve essere coerente",
"severity": "ERROR"
},
{
"rule": "ENERGY_GHG_RATIO",
"description": "Intensità' GHG su energia deve essere plausibile",
"severity": "WARNING"
},
{
"rule": "RENEWABLE_WITHIN_TOTAL",
"description": "Energia rinnovabile non può' superare totale energia",
"severity": "ERROR"
},
{
"rule": "YOY_VARIANCE_THRESHOLD",
"description": "Variazione YoY > 30% richiede commento",
"severity": "WARNING"
}
]
async def validate_submission(
self,
submissions: List[ESGDataPointSubmission],
period_id: UUID,
db_pool
) -> ValidationReport:
errors_by_point = {}
warnings_by_point = {}
# 1. Validazione singoli data point
for sub in submissions:
errors = []
warnings = []
# Carica definizione data point da catalogo
dp_def = await self._get_data_point_def(sub.data_point_id, db_pool)
if not dp_def:
errors.append(f"Data point {sub.data_point_id} non trovato nel catalogo ESRS")
else:
# Verifica unita' di misura
if dp_def['unit_of_measure'] and sub.unit_of_measure:
if sub.unit_of_measure != dp_def['unit_of_measure']:
warnings.append(
f"Unit of measure {sub.unit_of_measure} diversa da "
f"attesa {dp_def['unit_of_measure']}"
)
# Verifica tipo dato
if dp_def['data_type'] == 'NUMERIC' and sub.value_numeric is None:
errors.append(f"Data point numerico richiede value_numeric")
errors_by_point[sub.data_point_id] = errors
warnings_by_point[sub.data_point_id] = warnings
# 2. Controlli incrociati
await self._run_cross_checks(submissions, errors_by_point, warnings_by_point)
# 3. Calcola copertura mandatory
mandatory_coverage = await self._calc_mandatory_coverage(period_id, submissions, db_pool)
results = [
DataPointValidationResult(
data_point_id=dp_id,
is_valid=len(errs) == 0,
errors=errs,
warnings=warnings_by_point.get(dp_id, [])
)
for dp_id, errs in errors_by_point.items()
]
overall_valid = all(r.is_valid for r in results)
return ValidationReport(
period_id=period_id,
total_mandatory_points=mandatory_coverage['total'],
submitted_points=mandatory_coverage['submitted'],
approved_points=mandatory_coverage['approved'],
coverage_pct=mandatory_coverage['coverage_pct'],
validation_results=results,
overall_valid=overall_valid
)
async def _run_cross_checks(self, submissions, errors_by_point, warnings_by_point):
"""Controlli incrociati tra data points correlati."""
values = {s.data_point_id: s.value_numeric for s in submissions if s.value_numeric}
# Energia rinnovabile <= totale energia
renewable = values.get('E1-5_RenewableEnergy')
total_energy = values.get('E1-5_TotalEnergy')
if renewable and total_energy and renewable > total_energy:
errors_by_point.setdefault('E1-5_RenewableEnergy', []).append(
"Energia rinnovabile non può' superare energia totale"
)
async def _get_data_point_def(self, dp_id: str, db_pool):
async with db_pool.acquire() as conn:
return await conn.fetchrow(
"SELECT * FROM esrs_data_points WHERE id = $1", dp_id
)
async def _calc_mandatory_coverage(self, period_id, submissions, db_pool):
async with db_pool.acquire() as conn:
total = await conn.fetchval(
"SELECT COUNT(*) FROM esrs_data_points WHERE is_mandatory = TRUE"
)
submitted = len([s for s in submissions if s.value_numeric is not None
or s.value_boolean is not None or s.value_text is not None])
return {
'total': total or 0,
'submitted': submitted,
'approved': 0,
'coverage_pct': (submitted / total * 100) if total else 0
}
# === ENDPOINTS ===
@app.post("/api/v1/periods/{period_id}/submissions/bulk")
async def bulk_submit_esg_data(
period_id: UUID,
payload: BulkESGSubmission,
background_tasks: BackgroundTasks,
db_pool = Depends(get_db_pool)
):
"""
Submission bulk di dati ESG per un periodo.
Accetta fino a 500 data points per chiamata.
"""
if len(payload.submissions) > 500:
raise HTTPException(400, "Max 500 data points per richiesta")
validator = ESRSValidationService()
validation_report = await validator.validate_submission(
payload.submissions, period_id, db_pool
)
if not validation_report.overall_valid:
raise HTTPException(422, {
"message": "Validation failed",
"validation_report": validation_report.dict()
})
# Salva in batch
async with db_pool.acquire() as conn:
async with conn.transaction():
for sub in payload.submissions:
await conn.execute("""
INSERT INTO esg_data_submissions
(company_id, reporting_period_id, data_point_id,
value_numeric, value_boolean, value_text,
unit_of_measure, estimation_method, confidence_level,
is_estimated, source_system, source_document_ref,
status)
SELECT
rp.company_id, $1, $2,
$3, $4, $5,
$6, $7, $8,
$9, $10, $11,
'SUBMITTED'
FROM reporting_periods rp WHERE rp.id = $1
ON CONFLICT (company_id, reporting_period_id, data_point_id, version)
DO UPDATE SET
value_numeric = EXCLUDED.value_numeric,
status = 'SUBMITTED',
collection_timestamp = NOW()
""",
period_id,
str(sub.data_point_id),
sub.value_numeric,
sub.value_boolean,
sub.value_text,
sub.unit_of_measure,
sub.estimation_method,
sub.confidence_level,
sub.is_estimated,
sub.source_system,
sub.source_document_ref
)
# Trigger notifica al reviewer in background
background_tasks.add_task(notify_reviewers, period_id, len(payload.submissions))
return {
"status": "submitted",
"period_id": str(period_id),
"submitted_count": len(payload.submissions),
"validation_report": validation_report.dict()
}
@app.get("/api/v1/periods/{period_id}/validation-report")
async def get_validation_report(
period_id: UUID,
db_pool = Depends(get_db_pool)
):
"""Restituisce report di completezza e validità per un periodo."""
async with db_pool.acquire() as conn:
# Aggregazione stato submissions per mandatory data points
rows = await conn.fetch("""
SELECT
dp.id as data_point_id,
dp.name,
dp.is_mandatory,
dp.unit_of_measure,
eds.status,
eds.value_numeric,
eds.confidence_level
FROM esrs_data_points dp
LEFT JOIN esg_data_submissions eds ON
eds.data_point_id = dp.id
AND eds.reporting_period_id = $1
WHERE dp.is_mandatory = TRUE
ORDER BY dp.esrs_standard, dp.id
""", period_id)
missing_mandatory = [r for r in rows if r['status'] is None]
approved = [r for r in rows if r['status'] == 'APPROVED']
return {
"period_id": str(period_id),
"total_mandatory": len(rows),
"approved": len(approved),
"missing": len(missing_mandatory),
"coverage_pct": round(len(approved) / len(rows) * 100, 1) if rows else 0,
"missing_data_points": [
{"id": r['data_point_id'], "name": r['name']}
for r in missing_mandatory
]
}
async def notify_reviewers(period_id: UUID, count: int):
"""Background task: notifica reviewer via email/webhook."""
print(f"[NOTIFY] Period {period_id}: {count} data points submitted for review")
async def get_db_pool():
"""Dependency injection per asyncpg pool."""
# In produzione: pool globale inizializzato al startup
pool = await asyncpg.create_pool(
"postgresql://user:password@localhost/esg_db",
min_size=5,
max_size=20
)
try:
yield pool
finally:
await pool.close()
Automatizarea fluxului de lucru: aprobare pe mai multe niveluri
Un sistem CSRD de întreprindere necesită un flux de lucru structurat care gestionează colectarea de la unități de afaceri multiple, cicluri de revizuire și aprobare și consolidare finală. Iată implementarea motorului de flux de lucru:
# workflow_engine.py
# Workflow CSRD multi-livello: collect -> review -> approve -> file
from enum import Enum
from dataclasses import dataclass
from typing import Callable, Optional
import asyncio
class WorkflowState(str, Enum):
DRAFT = "DRAFT"
DATA_COLLECTION = "DATA_COLLECTION"
INTERNAL_REVIEW = "INTERNAL_REVIEW"
CFO_APPROVAL = "CFO_APPROVAL"
EXTERNAL_ASSURANCE = "EXTERNAL_ASSURANCE"
BOARD_APPROVAL = "BOARD_APPROVAL"
FILED = "FILED"
REJECTED = "REJECTED"
class WorkflowTransition:
"""Definisce una transizione di stato valida con condizioni e azioni."""
def __init__(
self,
from_state: WorkflowState,
to_state: WorkflowState,
action: str,
condition: Optional[Callable] = None,
pre_actions: list = None,
post_actions: list = None
):
self.from_state = from_state
self.to_state = to_state
self.action = action
self.condition = condition
self.pre_actions = pre_actions or []
self.post_actions = post_actions or []
# Definizione FSM per workflow CSRD
CSRD_WORKFLOW_TRANSITIONS = [
WorkflowTransition(
from_state=WorkflowState.DRAFT,
to_state=WorkflowState.DATA_COLLECTION,
action="START_COLLECTION",
post_actions=["send_collection_invites", "set_deadlines"]
),
WorkflowTransition(
from_state=WorkflowState.DATA_COLLECTION,
to_state=WorkflowState.INTERNAL_REVIEW,
action="SUBMIT_FOR_REVIEW",
condition=lambda ctx: ctx.get("mandatory_coverage_pct", 0) >= 80,
pre_actions=["run_validation", "generate_completeness_report"]
),
WorkflowTransition(
from_state=WorkflowState.INTERNAL_REVIEW,
to_state=WorkflowState.DATA_COLLECTION,
action="REQUEST_CORRECTIONS",
post_actions=["notify_data_owners", "log_review_comments"]
),
WorkflowTransition(
from_state=WorkflowState.INTERNAL_REVIEW,
to_state=WorkflowState.CFO_APPROVAL,
action="APPROVE_INTERNAL_REVIEW",
condition=lambda ctx: ctx.get("reviewer_approved", False),
pre_actions=["generate_draft_report"]
),
WorkflowTransition(
from_state=WorkflowState.CFO_APPROVAL,
to_state=WorkflowState.EXTERNAL_ASSURANCE,
action="CFO_SIGN_OFF",
post_actions=["prepare_assurance_package", "notify_auditor"]
),
WorkflowTransition(
from_state=WorkflowState.EXTERNAL_ASSURANCE,
to_state=WorkflowState.BOARD_APPROVAL,
action="ASSURANCE_COMPLETE",
condition=lambda ctx: ctx.get("assurance_opinion") in ["UNQUALIFIED", "QUALIFIED"],
post_actions=["attach_assurance_report"]
),
WorkflowTransition(
from_state=WorkflowState.BOARD_APPROVAL,
to_state=WorkflowState.FILED,
action="BOARD_APPROVE_AND_FILE",
post_actions=["generate_ixbrl", "submit_to_oam", "publish_report"]
),
]
class CSRDWorkflowEngine:
def __init__(self, db_pool):
self.db_pool = db_pool
self.transitions = {
(t.from_state, t.action): t
for t in CSRD_WORKFLOW_TRANSITIONS
}
async def execute_transition(
self,
period_id: str,
action: str,
actor_id: str,
context: dict = None
) -> dict:
"""
Esegue una transizione di workflow per un periodo di reporting.
Restituisce il nuovo stato o errore se la transizione non e' valida.
"""
context = context or {}
async with self.db_pool.acquire() as conn:
# Carica stato corrente
period = await conn.fetchrow(
"SELECT status, company_id FROM reporting_periods WHERE id = $1",
period_id
)
if not period:
raise ValueError(f"Periodo {period_id} non trovato")
current_state = WorkflowState(period['status'])
transition_key = (current_state, action)
if transition_key not in self.transitions:
raise ValueError(
f"Transizione {action} non valida da stato {current_state.value}"
)
transition = self.transitions[transition_key]
# Verifica condizione (arricchisci context con dati DB se necessario)
if transition.condition:
enriched_ctx = await self._enrich_context(period_id, context, conn)
if not transition.condition(enriched_ctx):
raise ValueError(
f"Condizione per {action} non soddisfatta: "
f"mandatory coverage = {enriched_ctx.get('mandatory_coverage_pct', 0)}%"
)
# Esegui pre-actions
for pre_action in transition.pre_actions:
await self._execute_action(pre_action, period_id, context, conn)
# Aggiorna stato
await conn.execute(
"UPDATE reporting_periods SET status = $1, updated_at = NOW() WHERE id = $2",
transition.to_state.value,
period_id
)
# Audit log
await conn.execute("""
INSERT INTO audit_log (entity_type, entity_id, action, actor_id, new_value)
VALUES ('reporting_period', $1, $2, $3, $4)
""", period_id, action, actor_id,
json.dumps({"from": current_state.value, "to": transition.to_state.value}))
# Esegui post-actions in background (notifiche, generazione documenti)
for post_action in transition.post_actions:
asyncio.create_task(
self._execute_action_background(post_action, period_id, context)
)
return {
"period_id": period_id,
"previous_state": current_state.value,
"new_state": transition.to_state.value,
"action": action,
"actor_id": actor_id,
"timestamp": datetime.now().isoformat()
}
async def _enrich_context(self, period_id: str, context: dict, conn) -> dict:
"""Arricchisce context con dati calcolati dal DB."""
mandatory_total = await conn.fetchval(
"SELECT COUNT(*) FROM esrs_data_points WHERE is_mandatory = TRUE"
)
approved = await conn.fetchval("""
SELECT COUNT(*) FROM esg_data_submissions
WHERE reporting_period_id = $1 AND status = 'APPROVED'
""", period_id)
enriched = dict(context)
enriched['mandatory_coverage_pct'] = (
(approved / mandatory_total * 100) if mandatory_total else 0
)
return enriched
async def _execute_action(self, action_name: str, period_id: str, context: dict, conn):
"""Esegue un'azione sincrona nel workflow."""
action_map = {
"run_validation": self._action_run_validation,
"generate_completeness_report": self._action_gen_completeness,
"generate_draft_report": self._action_gen_draft,
}
if action_name in action_map:
await action_map[action_name](period_id, context, conn)
async def _action_run_validation(self, period_id, context, conn):
print(f"[WORKFLOW] Running validation for period {period_id}")
async def _action_gen_completeness(self, period_id, context, conn):
print(f"[WORKFLOW] Generating completeness report for {period_id}")
async def _action_gen_draft(self, period_id, context, conn):
print(f"[WORKFLOW] Generating draft CSRD report for {period_id}")
async def _execute_action_background(self, action_name: str, period_id: str, context: dict):
"""Esegue azione in background (notifiche, documenti)."""
print(f"[BACKGROUND] Executing {action_name} for period {period_id}")
Etichetare XBRL/iXBRL: generare de rapoarte digitale
CSRD cere ca raportul anual să fie întocmit în format XHTML cu etichetare iXBRL (XBRL inline), care încorporează date structurate direct în HTML care poate fi citit. EFRAG a publicat taxonomia ESRS XBRL care mapează fiecare punct de date cu eticheta iXBRL. Să implementăm generarea automată:
# xbrl_generator.py
# Generazione iXBRL per CSRD secondo ESRS Taxonomy
from lxml import etree
from decimal import Decimal
from typing import Optional
import uuid
# ESRS XBRL Taxonomy namespace
ESRS_NS = "https://xbrl.efrag.org/taxonomy/draft-esrs/2022-11-22"
XBRLI_NS = "http://www.xbrl.org/2003/instance"
IX_NS = "http://www.xbrl.org/2013/inlineXBRL"
LINK_NS = "http://www.xbrl.org/2003/linkbase"
class ESRSXBRLGenerator:
"""
Genera documenti iXBRL conformi alla ESRS Taxonomy.
Output: XHTML con embedded XBRL facts.
"""
def __init__(self, company_data: dict, period_data: dict):
self.company = company_data
self.period = period_data
self.context_id = f"ctx_{uuid.uuid4().hex[:8]}"
self.facts = []
def add_fact(
self,
element_name: str,
value,
unit: Optional[str] = None,
decimals: int = 0,
scale: int = 0
):
"""Aggiunge un fact XBRL al report."""
self.facts.append({
"element": element_name,
"value": value,
"unit": unit,
"decimals": decimals,
"scale": scale
})
def generate_ixbrl(self) -> str:
"""
Genera il documento iXBRL completo.
Returns: stringa XHTML con tagging iXBRL embedded.
"""
# Namespace map per il documento
nsmap = {
"ix": IX_NS,
"xbrli": XBRLI_NS,
"esrs": ESRS_NS,
None: "http://www.w3.org/1999/xhtml"
}
# Root XHTML
html = etree.Element("html", nsmap=nsmap)
head = etree.SubElement(html, "head")
body = etree.SubElement(html, "body")
# Header XBRL nel head
ix_header = etree.SubElement(head, f"{{IX_NS}}header")
hidden = etree.SubElement(ix_header, f"{{IX_NS}}hidden")
# Context: entità' e periodo
resources = etree.SubElement(ix_header, f"{{IX_NS}}resources")
xbrli_context = etree.SubElement(resources, f"{{XBRLI_NS}}context")
xbrli_context.set("id", self.context_id)
entity = etree.SubElement(xbrli_context, f"{{XBRLI_NS}}entity")
identifier = etree.SubElement(entity, f"{{XBRLI_NS}}identifier")
identifier.set("scheme", "http://www.lei.info")
identifier.text = self.company.get("lei_code", "000000000000000000")
period = etree.SubElement(xbrli_context, f"{{XBRLI_NS}}period")
start_date = etree.SubElement(period, f"{{XBRLI_NS}}startDate")
start_date.text = self.period.get("start_date", "2024-01-01")
end_date = etree.SubElement(period, f"{{XBRLI_NS}}endDate")
end_date.text = self.period.get("end_date", "2024-12-31")
# Unit monetaria e tCO2eq
unit_co2 = etree.SubElement(resources, f"{{XBRLI_NS}}unit")
unit_co2.set("id", "tCO2eq")
measure_co2 = etree.SubElement(unit_co2, f"{{XBRLI_NS}}measure")
measure_co2.text = "esrs:metricTon"
# Body: report HTML con facts inline
h1 = etree.SubElement(body, "h1")
h1.text = f"CSRD Sustainability Report {self.period.get('year', 2024)}"
h2 = etree.SubElement(body, "h2")
h2.text = "E1 - Climate Change: GHG Emissions"
# Tabella emissioni con tagging iXBRL inline
table = etree.SubElement(body, "table")
thead = etree.SubElement(table, "thead")
tr_head = etree.SubElement(thead, "tr")
etree.SubElement(tr_head, "th").text = "Metric"
etree.SubElement(tr_head, "th").text = "Value"
etree.SubElement(tr_head, "th").text = "Unit"
tbody = etree.SubElement(table, "tbody")
# Aggiungi facts come righe di tabella con tag iXBRL
for fact in self.facts:
tr = etree.SubElement(tbody, "tr")
etree.SubElement(tr, "td").text = fact["element"].split(":")[-1]
td_value = etree.SubElement(tr, "td")
# Tag iXBRL inline per il valore
ix_nonfraction = etree.SubElement(
td_value,
f"{{IX_NS}}nonFraction"
)
ix_nonfraction.set("name", fact["element"])
ix_nonfraction.set("contextRef", self.context_id)
ix_nonfraction.set("unitRef", fact.get("unit", "tCO2eq"))
ix_nonfraction.set("decimals", str(fact.get("decimals", 0)))
ix_nonfraction.set("scale", str(fact.get("scale", 0)))
ix_nonfraction.set("format", "ixt:num-dot-decimal")
ix_nonfraction.text = str(fact["value"])
etree.SubElement(tr, "td").text = fact.get("unit", "tCO2eq")
# Serializza come XHTML
return etree.tostring(
html,
pretty_print=True,
xml_declaration=True,
encoding="UTF-8",
method="xml"
).decode("utf-8")
def generate_csrd_report_from_db(approved_submissions: list, company: dict, period: dict) -> str:
"""
Genera report iXBRL da submissions approvate nel DB.
"""
generator = ESRSXBRLGenerator(company, period)
# Mapping data_point_id -> XBRL element name
ESRS_XBRL_MAP = {
"E1-6_GrossScope1": "esrs:GrossScope1GHGEmissions",
"E1-6_GrossScope2LB": "esrs:GrossScope2GHGEmissionsLocationBased",
"E1-6_GrossScope2MB": "esrs:GrossScope2GHGEmissionsMarketBased",
"E1-6_GrossScope3Total": "esrs:GrossScope3GHGEmissions",
"E1-5_TotalEnergy": "esrs:TotalEnergyConsumption",
"E1-5_RenewableEnergy": "esrs:EnergyConsumptionFromRenewableSources",
"E1-5_NonRenewableEnergy": "esrs:EnergyConsumptionFromNonRenewableSources",
}
for sub in approved_submissions:
element = ESRS_XBRL_MAP.get(sub['data_point_id'])
if element and sub['value_numeric'] is not None:
generator.add_fact(
element_name=element,
value=float(sub['value_numeric']),
unit=sub.get('unit_of_measure', 'tCO2eq'),
decimals=0
)
return generator.generate_ixbrl()
Integrare ERP: SAP Sustainability și Oracle ESG
Majoritatea datelor ESG se află deja în sistemele ERP corporative: consumul de energie în modulul PM/PM, date HR în SuccessFactors, logistică în modulul TM. Implementăm conectori pentru cele două platforme principale ale pieței întreprinderilor:
# erp_connectors.py
# Connettori per SAP Sustainability Footprint Management (SFM)
# e Oracle Fusion ESG
import httpx
import asyncio
from abc import ABC, abstractmethod
from typing import Optional
class ERPConnector(ABC):
"""Interfaccia base per connettori ERP."""
@abstractmethod
async def get_energy_consumption(self, year: int, entity_id: str) -> dict:
pass
@abstractmethod
async def get_ghg_emissions(self, year: int, entity_id: str) -> dict:
pass
@abstractmethod
async def get_workforce_metrics(self, year: int, entity_id: str) -> dict:
pass
class SAPSustainabilityConnector(ERPConnector):
"""
Connettore per SAP Sustainability Footprint Management (SFM).
Usa SAP OData API v4.
API base: /sap/opu/odata4/sap/api_sustainability_footprint/
"""
def __init__(
self,
base_url: str,
client_id: str,
client_secret: str,
token_url: str
):
self.base_url = base_url.rstrip("/")
self.client_id = client_id
self.client_secret = client_secret
self.token_url = token_url
self._token: Optional[str] = None
self._client = httpx.AsyncClient(timeout=30)
async def _get_token(self) -> str:
"""OAuth2 client credentials per SAP BTP."""
if self._token:
return self._token
response = await self._client.post(
self.token_url,
data={
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret,
}
)
response.raise_for_status()
self._token = response.json()["access_token"]
return self._token
async def _get(self, path: str, params: dict = None) -> dict:
token = await self._get_token()
response = await self._client.get(
f"{self.base_url}{path}",
headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
params=params
)
response.raise_for_status()
return response.json()
async def get_energy_consumption(self, year: int, entity_id: str) -> dict:
"""
Estrae consumo energetico da SAP SFM per anno e entità'.
OData endpoint: /EnergyConsumption
"""
data = await self._get(
"/EnergyConsumption",
params={
"$filter": f"Year eq {year} and EntityId eq '{entity_id}'",
"$select": "Year,EntityId,EnergyType,Quantity,Unit,Source"
}
)
result = {"total_mwh": 0, "renewable_mwh": 0, "non_renewable_mwh": 0, "by_source": []}
for record in data.get("value", []):
qty_mwh = self._convert_to_mwh(record["Quantity"], record["Unit"])
result["total_mwh"] += qty_mwh
result["by_source"].append({
"source": record["EnergyType"],
"mwh": qty_mwh,
"is_renewable": record["EnergyType"] in [
"SOLAR", "WIND", "HYDRO", "BIOMASS", "GEOTHERMAL"
]
})
if record["EnergyType"] in ["SOLAR", "WIND", "HYDRO", "BIOMASS", "GEOTHERMAL"]:
result["renewable_mwh"] += qty_mwh
else:
result["non_renewable_mwh"] += qty_mwh
return result
async def get_ghg_emissions(self, year: int, entity_id: str) -> dict:
"""Estrae emissioni GHG da SAP SFM, suddivise per scope."""
data = await self._get(
"/GHGEmission",
params={
"$filter": f"Year eq {year} and EntityId eq '{entity_id}'",
"$select": "Year,EntityId,GHGScope,Category,EmissionTCO2e"
}
)
result = {"scope1": 0, "scope2_lb": 0, "scope2_mb": 0, "scope3_by_category": {}}
for r in data.get("value", []):
scope = r["GHGScope"]
tco2e = float(r["EmissionTCO2e"])
if scope == "1":
result["scope1"] += tco2e
elif scope == "2" and r.get("Category") == "LB":
result["scope2_lb"] += tco2e
elif scope == "2" and r.get("Category") == "MB":
result["scope2_mb"] += tco2e
elif scope == "3":
cat = r.get("Category", "UNKNOWN")
result["scope3_by_category"][cat] = (
result["scope3_by_category"].get(cat, 0) + tco2e
)
result["scope3_total"] = sum(result["scope3_by_category"].values())
return result
async def get_workforce_metrics(self, year: int, entity_id: str) -> dict:
"""Estrae metriche workforce per ESRS S1 da SAP SuccessFactors."""
# SAP SuccessFactors OData API
data = await self._get(
"/WorkforceMetrics",
params={
"$filter": f"Year eq {year} and EntityId eq '{entity_id}'",
"$select": "Metric,Value,Gender,EmployeeType"
}
)
result = {
"headcount_total": 0,
"headcount_female": 0,
"headcount_male": 0,
"fte_total": 0,
"turnover_rate": 0,
"gender_pay_gap_pct": None
}
for r in data.get("value", []):
metric = r.get("Metric")
value = float(r.get("Value", 0))
if metric == "HEADCOUNT":
result["headcount_total"] += value
if r.get("Gender") == "F":
result["headcount_female"] += value
elif r.get("Gender") == "M":
result["headcount_male"] += value
elif metric == "FTE":
result["fte_total"] += value
elif metric == "TURNOVER_RATE":
result["turnover_rate"] = value
elif metric == "GENDER_PAY_GAP":
result["gender_pay_gap_pct"] = value
return result
def _convert_to_mwh(self, quantity: float, unit: str) -> float:
conversions = {"GJ": 0.2778, "MWh": 1.0, "kWh": 0.001, "TJ": 277.78}
return quantity * conversions.get(unit, 1.0)
class OracleESGConnector(ERPConnector):
"""
Connettore per Oracle Fusion Sustainability (Oracle ESG).
Usa Oracle REST API + BICC per bulk extraction.
"""
def __init__(self, base_url: str, username: str, password: str):
self.base_url = base_url.rstrip("/")
self.auth = (username, password)
self._client = httpx.AsyncClient(timeout=60)
async def _get(self, path: str, params: dict = None) -> dict:
response = await self._client.get(
f"{self.base_url}{path}",
auth=self.auth,
headers={"Accept": "application/json"},
params=params
)
response.raise_for_status()
return response.json()
async def get_energy_consumption(self, year: int, entity_id: str) -> dict:
data = await self._get(
"/fscmRestApi/resources/11.13.18.05/sustainabilityMetrics",
params={
"q": f"metricCategory=ENERGY;fiscalYear={year};legalEntityId={entity_id}",
"fields": "metricName,metricValue,uom,dataSource,lastUpdateDate"
}
)
total_mwh = 0
renewable_mwh = 0
RENEWABLE_METRICS = {"SOLAR_ENERGY", "WIND_ENERGY", "HYDRO_ENERGY", "PPA_RENEWABLE"}
for item in data.get("items", []):
mwh = self._to_mwh(item["metricValue"], item["uom"])
total_mwh += mwh
if item["metricName"] in RENEWABLE_METRICS:
renewable_mwh += mwh
return {
"total_mwh": total_mwh,
"renewable_mwh": renewable_mwh,
"non_renewable_mwh": total_mwh - renewable_mwh
}
async def get_ghg_emissions(self, year: int, entity_id: str) -> dict:
data = await self._get(
"/fscmRestApi/resources/11.13.18.05/sustainabilityMetrics",
params={
"q": f"metricCategory=GHG_EMISSIONS;fiscalYear={year};legalEntityId={entity_id}",
"fields": "metricName,ghgScope,metricValue,uom"
}
)
result = {"scope1": 0, "scope2_lb": 0, "scope2_mb": 0, "scope3_total": 0}
for item in data.get("items", []):
scope = item.get("ghgScope", "")
val = float(item.get("metricValue", 0))
if scope == "SCOPE_1":
result["scope1"] += val
elif scope == "SCOPE_2_LB":
result["scope2_lb"] += val
elif scope == "SCOPE_2_MB":
result["scope2_mb"] += val
elif scope.startswith("SCOPE_3"):
result["scope3_total"] += val
return result
async def get_workforce_metrics(self, year: int, entity_id: str) -> dict:
data = await self._get(
"/fscmRestApi/resources/11.13.18.05/sustainabilityMetrics",
params={
"q": f"metricCategory=WORKFORCE;fiscalYear={year};legalEntityId={entity_id}"
}
)
result = {"headcount_total": 0, "fte_total": 0, "gender_pay_gap_pct": None}
for item in data.get("items", []):
name = item.get("metricName", "")
val = float(item.get("metricValue", 0))
if name == "TOTAL_HEADCOUNT":
result["headcount_total"] = val
elif name == "TOTAL_FTE":
result["fte_total"] = val
elif name == "GENDER_PAY_GAP":
result["gender_pay_gap_pct"] = val
return result
def _to_mwh(self, value: float, uom: str) -> float:
factors = {"MWH": 1.0, "KWH": 0.001, "GJ": 0.2778, "MMBTU": 0.29307}
return value * factors.get(uom.upper(), 1.0)
class ERPDataAggregator:
"""
Aggrega dati da multipli connettori ERP e mappa su ESRS data points.
"""
def __init__(self, connectors: list[ERPConnector]):
self.connectors = connectors
async def collect_and_map_to_esrs(
self,
year: int,
entity_id: str,
period_id: str
) -> list[dict]:
"""
Raccoglie dati da tutti i connettori e produce submissions ESRS-ready.
"""
results = []
# Raccolta parallela da tutti i connettori
energy_tasks = [c.get_energy_consumption(year, entity_id) for c in self.connectors]
ghg_tasks = [c.get_ghg_emissions(year, entity_id) for c in self.connectors]
wf_tasks = [c.get_workforce_metrics(year, entity_id) for c in self.connectors]
energy_results = await asyncio.gather(*energy_tasks, return_exceptions=True)
ghg_results = await asyncio.gather(*ghg_tasks, return_exceptions=True)
wf_results = await asyncio.gather(*wf_tasks, return_exceptions=True)
# Aggregazione (media pesata o somma in base al tipo)
total_energy = sum(
r["total_mwh"] for r in energy_results
if isinstance(r, dict)
)
total_renewable = sum(
r["renewable_mwh"] for r in energy_results
if isinstance(r, dict)
)
total_scope1 = sum(
r.get("scope1", 0) for r in ghg_results
if isinstance(r, dict)
)
# Map su ESRS data points
if total_energy > 0:
results.append({
"data_point_id": "E1-5_TotalEnergy",
"value_numeric": total_energy,
"unit_of_measure": "MWh",
"source_system": "ERP_AGGREGATED",
"confidence_level": "HIGH"
})
if total_renewable > 0:
results.append({
"data_point_id": "E1-5_RenewableEnergy",
"value_numeric": total_renewable,
"unit_of_measure": "MWh",
"source_system": "ERP_AGGREGATED",
"confidence_level": "HIGH"
})
if total_scope1 > 0:
results.append({
"data_point_id": "E1-6_GrossScope1",
"value_numeric": total_scope1,
"unit_of_measure": "tCO2eq",
"source_system": "ERP_AGGREGATED",
"confidence_level": "HIGH"
})
return results
Linia de date și pista de audit pentru asigurarea externă
CSRD cere asigurare limitată (și în perspectivă asigurare rezonabilă) de către un auditor extern. Aceasta înseamnă că fiecare dată din raport trebuie să aibă trasabilitate completă: de unde provine, cine a introdus-o, cine a aprobat-o, si cum a fost calculat. Să implementăm instrumentul de urmărire a descendenței datelor:
# data_lineage.py
# Data lineage tracker per assurance CSRD
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import hashlib
import json
@dataclass
class LineageNode:
"""Nodo nel grafo di data lineage."""
node_id: str
node_type: str # SOURCE, TRANSFORM, SUBMISSION, APPROVAL
description: str
actor: Optional[str] = None
timestamp: Optional[datetime] = None
metadata: dict = field(default_factory=dict)
hash: Optional[str] = None
def compute_hash(self) -> str:
"""Hash del contenuto per rilevare manomissioni."""
content = json.dumps({
"node_id": self.node_id,
"node_type": self.node_type,
"description": self.description,
"actor": self.actor,
"timestamp": self.timestamp.isoformat() if self.timestamp else None,
"metadata": self.metadata
}, sort_keys=True)
self.hash = hashlib.sha256(content.encode()).hexdigest()
return self.hash
@dataclass
class LineageEdge:
"""Arco tra nodi nel grafo di lineage."""
from_node: str
to_node: str
relationship: str # DERIVED_FROM, APPROVED_BY, TRANSFORMED_BY
metadata: dict = field(default_factory=dict)
class DataLineageTracker:
"""
Traccia il percorso completo di ogni dato ESG dalla sorgente al report.
Struttura: DAG (Directed Acyclic Graph) di LineageNode.
"""
def __init__(self, db_pool):
self.db_pool = db_pool
async def record_extraction(
self,
data_point_id: str,
source_system: str,
source_query: str,
raw_value,
actor_id: str
) -> str:
"""Registra estrazione da sistema sorgente."""
node = LineageNode(
node_id=f"extract_{data_point_id}_{datetime.now().timestamp():.0f}",
node_type="SOURCE",
description=f"Extracted from {source_system}",
actor=actor_id,
timestamp=datetime.now(),
metadata={
"source_system": source_system,
"source_query": source_query,
"raw_value": str(raw_value),
"data_point_id": data_point_id
}
)
node.compute_hash()
await self._save_node(node)
return node.node_id
async def record_transformation(
self,
from_node_id: str,
data_point_id: str,
transformation_type: str,
input_value,
output_value,
formula: Optional[str],
actor_id: str
) -> str:
"""Registra una trasformazione (unit conversion, aggregation, etc.)."""
node = LineageNode(
node_id=f"transform_{data_point_id}_{datetime.now().timestamp():.0f}",
node_type="TRANSFORM",
description=f"{transformation_type} applied",
actor=actor_id,
timestamp=datetime.now(),
metadata={
"transformation_type": transformation_type,
"input_value": str(input_value),
"output_value": str(output_value),
"formula": formula,
"data_point_id": data_point_id
}
)
node.compute_hash()
await self._save_node(node)
edge = LineageEdge(
from_node=from_node_id,
to_node=node.node_id,
relationship="DERIVED_FROM"
)
await self._save_edge(edge)
return node.node_id
async def record_approval(
self,
submission_node_id: str,
data_point_id: str,
approver_id: str,
approval_comment: Optional[str]
) -> str:
"""Registra approvazione da parte del reviewer."""
node = LineageNode(
node_id=f"approval_{data_point_id}_{datetime.now().timestamp():.0f}",
node_type="APPROVAL",
description="Data point approved for reporting",
actor=approver_id,
timestamp=datetime.now(),
metadata={
"data_point_id": data_point_id,
"approval_comment": approval_comment,
"approver_id": approver_id
}
)
node.compute_hash()
await self._save_node(node)
edge = LineageEdge(
from_node=submission_node_id,
to_node=node.node_id,
relationship="APPROVED_BY"
)
await self._save_edge(edge)
return node.node_id
async def get_lineage_for_data_point(
self,
data_point_id: str,
period_id: str
) -> dict:
"""
Restituisce il grafo completo di lineage per un data point.
Usato dall'auditor per tracciare l'origine dei dati.
"""
async with self.db_pool.acquire() as conn:
nodes = await conn.fetch("""
SELECT * FROM lineage_nodes
WHERE metadata->>'data_point_id' = $1
ORDER BY timestamp ASC
""", data_point_id)
edges = await conn.fetch("""
SELECT le.* FROM lineage_edges le
JOIN lineage_nodes ln ON ln.node_id = le.from_node
WHERE ln.metadata->>'data_point_id' = $1
""", data_point_id)
return {
"data_point_id": data_point_id,
"period_id": period_id,
"nodes": [dict(n) for n in nodes],
"edges": [dict(e) for e in edges],
"integrity_check": await self._verify_chain_integrity(nodes)
}
async def _verify_chain_integrity(self, nodes) -> dict:
"""Verifica che gli hash non siano stati manomessi."""
for node in nodes:
expected_hash = node.compute_hash() if hasattr(node, 'compute_hash') else None
if expected_hash and node.get('hash') != expected_hash:
return {"valid": False, "tampered_node": node['node_id']}
return {"valid": True, "nodes_verified": len(nodes)}
async def _save_node(self, node: LineageNode):
async with self.db_pool.acquire() as conn:
await conn.execute("""
INSERT INTO lineage_nodes
(node_id, node_type, description, actor, timestamp, metadata, hash)
VALUES ($1, $2, $3, $4, $5, $6, $7)
""",
node.node_id, node.node_type, node.description,
node.actor, node.timestamp, json.dumps(node.metadata), node.hash)
async def _save_edge(self, edge: LineageEdge):
async with self.db_pool.acquire() as conn:
await conn.execute("""
INSERT INTO lineage_edges (from_node, to_node, relationship, metadata)
VALUES ($1, $2, $3, $4)
""",
edge.from_node, edge.to_node, edge.relationship,
json.dumps(edge.metadata))
Studiu de caz: Companie de producție cu 250 de angajați
Să analizăm implementarea reală pentru MetalTech S.r.l., firma producatoare cu 250 de angajați, două fabrici de producție (Torino și Bari), supuse valului 2 CSRD (FY 2025, primul raport în 2026). Iată cum au structurat proiectul:
Faza 1: Analiza decalajului și evaluarea semnificației (Lunile 1-2)
MetalTech a identificat subiecte materiale pentru sectorul de producție prin intermediul procesului de dublă materialitate:
- ESRS E1 (clima): Materialitate DUBLA - consum semnificativ de energie (cuptor de topire, linii de producție) și riscul fizic de la evenimente extreme
- ESRS E2 (Poluare): Material de impact - emisii de particule PM2.5 și compuși VOC din vopsea
- ESRS E5 (Economie circulară): Material - deșeuri metalice (35% din material în intrare), ambalaje, ape uzate industriale
- ESRS S1 (forța de muncă): Material - 250 de angajati, rata de accidentare mare pe sector, diferența de remunerare între femei și bărbați
- ESRS G1 (Conduita în afaceri): Material pentru cerințele de reglementare de bază
Faza 2: Infrastructura tehnică (Lunile 2-4)
Stiva tehnologică adoptată de MetalTech:
| Componentă | Tehnologie | Utilizare |
|---|---|---|
| Backend-uri API | FastAPI + asyncpg | Colectarea și validarea datelor ESG |
| Baze de date | PostgreSQL 16 + pgvector | Puncte de date ESRS de stocare + căutare de similaritate |
| Integrare ERP | SAP SFM OData API | Energie, emisii, forță de muncă de la SAP |
| Fluxul de lucru | Personalizat FSM + Redis | Flux de lucru de aprobare pe mai multe niveluri |
| Generația XBRL | Python lxml + Taxonomie ESRS | Ieșire iXBRL pentru OAM/CONSOB |
| Linia de date | DAG personalizat pe PostgreSQL | Pista de audit pentru asigurare |
| Notificări | FastAPI BackgroundTasks + SMTP | Alertă de termen limită, cerere de revizuire |
| În față | Angular 17 + Material | Tabloul de bord de colectare și monitorizare |
Faza 3: Colectarea datelor FY2025 (Lunile 4-14)
# Esempio raccolta dati MetalTech FY2025
# Script orchestrazione completa per stabilimento Torino
import asyncio
from erp_connectors import SAPSustainabilityConnector, ERPDataAggregator
from workflow_engine import CSRDWorkflowEngine, WorkflowState
async def run_metaltech_fy2025_collection():
# Configurazione connettore SAP
sap = SAPSustainabilityConnector(
base_url="https://metaltech.hana.ondemand.com",
client_id="METALTECH_CSRD_CLIENT",
client_secret="***", # Da env vars
token_url="https://metaltech.authentication.eu12.hana.ondemand.com/oauth/token"
)
aggregator = ERPDataAggregator([sap])
# Raccolta dati stabilimento Torino (entity TO001)
print("[1/3] Raccolta dati da SAP per stabilimento Torino...")
submissions_to = await aggregator.collect_and_map_to_esrs(
year=2025,
entity_id="TO001",
period_id="period_fy2025"
)
# Raccolta dati stabilimento Bari (entity BA001)
print("[2/3] Raccolta dati da SAP per stabilimento Bari...")
submissions_ba = await aggregator.collect_and_map_to_esrs(
year=2025,
entity_id="BA001",
period_id="period_fy2025"
)
# Consolidamento gruppo: somma Scope 1+2, media intensità'
print("[3/3] Consolidamento dati di gruppo...")
consolidated = consolidate_group_data([submissions_to, submissions_ba])
# Output dati FY2025 MetalTech consolidati
for sub in consolidated:
print(f" {sub['data_point_id']}: {sub['value_numeric']:.2f} {sub['unit_of_measure']}")
return consolidated
def consolidate_group_data(entity_submissions: list) -> list:
"""Consolida submissions di multiple entità per il gruppo."""
by_data_point = {}
for entity_subs in entity_submissions:
for sub in entity_subs:
dp_id = sub['data_point_id']
if dp_id not in by_data_point:
by_data_point[dp_id] = {**sub, "value_numeric": 0}
# Per emissioni e energia: somma (non media)
by_data_point[dp_id]["value_numeric"] += sub['value_numeric']
by_data_point[dp_id]["source_system"] = "CONSOLIDATED_GROUP"
return list(by_data_point.values())
# Risultati tipici per azienda manifatturiera 250 dip.
METALTECH_FY2025_RESULTS = {
"E1-6_GrossScope1": {
"value": 2_847, # tCO2eq - principalmente gas naturale forno fusione
"confidence": "HIGH",
"source": "SAP SFM - Gas meter readings"
},
"E1-6_GrossScope2LB": {
"value": 1_523, # tCO2eq - elettricita' rete
"confidence": "HIGH",
"source": "SAP SFM - Electricity bills"
},
"E1-6_GrossScope2MB": {
"value": 980, # tCO2eq - market-based con PPA solare
"confidence": "HIGH",
"source": "SAP SFM + GO certificates"
},
"E1-5_TotalEnergy": {
"value": 18_450, # MWh totali
"confidence": "HIGH",
"source": "SAP SFM - Meters + bills"
},
"E1-5_RenewableEnergy": {
"value": 6_200, # MWh (33.6% via PPA + impianto solare 500kWp)
"confidence": "MEDIUM", # Stima PPA
"source": "SAP SFM + PPA contract"
},
"GHG_Intensity": {
"value": 42.3, # tCO2eq / M EUR fatturato
"confidence": "HIGH",
"source": "Calculated: (S1+S2MB) / revenue"
}
}
Rezultate și lecții învățate
MetalTech FY2025: Rezultatele proiectului CSRD
- Timp de colectare a datelor: De la 6 săptămâni (manual) până la 4 zile (automat)
- Acoperire obligatorie ESRS: 94% din punctele de date obligatorii colectate și aprobate
- Calitatea datelor: Încredere ridicată pentru 78% din trimiteri (față de 45% anul precedent)
- Timp de audit: Redusă cu 60% datorită generației automate de date
- Costul asigurării externe: -25% pentru mai puțin efort de prelevare
- Constatare principală: Scop 3 cat. 1 (materiale din amonte) a fost subestimat cu 40% comparativ cu calculele manuale anterioare
Principalele dificultăți și cum să le depășești
Anti-modele de evitat
- Nu construi totul de la zero: Piața oferă soluții SaaS CSRD (Workiva, Watershed, Persefoni) pentru IMM-uri. Evaluați build-vs-buy: construirea internă are sens doar dacă aveți o complexitate ridicată a datelor sau nevoi unice de integrare.
- Nu întârziați evaluarea semnificației: Este o condiție prealabilă pentru orice. Fără el nu știi ce date să colectezi. Începeți imediat, chiar și cu o metodă simplificată.
- Nu ignora Scopul 3: Pentru multe companii producătoare, Scope 3 cat. 1 (materii prime) și cat. 11 (utilizarea produsului) depășește 70% din emisiile totale. Subraportarea Scopul 3 este cea mai comună greenwashing.
- Nu tratați XBRL ca pe un ultim gând: Etichetarea iXBRL necesită familiarizat cu taxonomia ESRS. Planificați cel puțin 2-3 luni de dezvoltare.
- Nu uitați de domeniul de consolidare: CSRD cere include toate filialele >50%. Excludeți entitățile pentru „simplitate” este un risc de conformitate.
API-ul GraphQL pentru interogări ESG avansate
Pentru tablourile de bord de analiză ESG și instrumentele interactive de raportare, un API GraphQL oferă flexibilitate superioară față de REST clasic, permițând interogări specifice cu agregare din mers:
# graphql_schema.py
# Schema GraphQL per ESG Analytics API con Strawberry
import strawberry
from strawberry.types import Info
from typing import Optional, List
from datetime import date
from decimal import Decimal
@strawberry.type
class GHGEmissionsSummary:
year: int
scope1_tco2eq: Decimal
scope2_lb_tco2eq: Decimal
scope2_mb_tco2eq: Decimal
scope3_tco2eq: Decimal
total_tco2eq: Decimal
ghg_intensity: Optional[Decimal]
yoy_change_pct: Optional[Decimal]
@strawberry.type
class EnergySummary:
year: int
total_mwh: Decimal
renewable_mwh: Decimal
non_renewable_mwh: Decimal
renewable_share_pct: Decimal
@strawberry.type
class ESGReportingStatus:
period_id: str
year: int
workflow_state: str
mandatory_coverage_pct: Decimal
approved_data_points: int
pending_review: int
missing_mandatory: int
@strawberry.type
class Query:
@strawberry.field
async def ghg_emissions(
self,
info: Info,
company_id: str,
years: List[int]
) -> List[GHGEmissionsSummary]:
"""Emissioni GHG per anni richiesti con calcolo YoY automatico."""
db = info.context["db_pool"]
async with db.acquire() as conn:
rows = await conn.fetch("""
SELECT
rp.year,
COALESCE(SUM(CASE WHEN dp.id LIKE '%Scope1%'
THEN eds.value_numeric ELSE 0 END), 0) as scope1,
COALESCE(SUM(CASE WHEN dp.id LIKE '%Scope2%LB%'
THEN eds.value_numeric ELSE 0 END), 0) as scope2_lb,
COALESCE(SUM(CASE WHEN dp.id LIKE '%Scope2%MB%'
THEN eds.value_numeric ELSE 0 END), 0) as scope2_mb,
COALESCE(SUM(CASE WHEN dp.id LIKE '%Scope3%'
THEN eds.value_numeric ELSE 0 END), 0) as scope3
FROM esg_data_submissions eds
JOIN reporting_periods rp ON rp.id = eds.reporting_period_id
JOIN esrs_data_points dp ON dp.id = eds.data_point_id
WHERE rp.company_id = $1
AND rp.year = ANY($2)
AND eds.status = 'APPROVED'
AND dp.esrs_standard = 'E1'
GROUP BY rp.year
ORDER BY rp.year
""", company_id, years)
results = []
prev_total = None
for row in rows:
total = row['scope1'] + row['scope2_mb'] + row['scope3']
yoy = None
if prev_total and prev_total > 0:
yoy = ((total - prev_total) / prev_total) * 100
results.append(GHGEmissionsSummary(
year=row['year'],
scope1_tco2eq=Decimal(str(row['scope1'])),
scope2_lb_tco2eq=Decimal(str(row['scope2_lb'])),
scope2_mb_tco2eq=Decimal(str(row['scope2_mb'])),
scope3_tco2eq=Decimal(str(row['scope3'])),
total_tco2eq=Decimal(str(total)),
ghg_intensity=None,
yoy_change_pct=Decimal(str(yoy)) if yoy else None
))
prev_total = total
return results
@strawberry.field
async def reporting_status(
self,
info: Info,
company_id: str,
year: int
) -> Optional[ESGReportingStatus]:
"""Stato del processo di reporting per un anno."""
db = info.context["db_pool"]
async with db.acquire() as conn:
period = await conn.fetchrow("""
SELECT rp.id, rp.year, rp.status,
COUNT(eds.id) FILTER (WHERE eds.status = 'APPROVED') as approved,
COUNT(eds.id) FILTER (WHERE eds.status IN ('SUBMITTED','REVIEWING')) as pending,
(SELECT COUNT(*) FROM esrs_data_points WHERE is_mandatory = TRUE) as mandatory_total
FROM reporting_periods rp
LEFT JOIN esg_data_submissions eds ON eds.reporting_period_id = rp.id
WHERE rp.company_id = $1 AND rp.year = $2
GROUP BY rp.id, rp.year, rp.status
""", company_id, year)
if not period:
return None
mandatory_total = period['mandatory_total'] or 1
approved = period['approved'] or 0
coverage = (approved / mandatory_total) * 100
return ESGReportingStatus(
period_id=str(period['id']),
year=period['year'],
workflow_state=period['status'],
mandatory_coverage_pct=Decimal(str(round(coverage, 1))),
approved_data_points=approved,
pending_review=period['pending'] or 0,
missing_mandatory=max(0, mandatory_total - approved)
)
schema = strawberry.Schema(query=Query)
# Query di esempio per il frontend
EXAMPLE_QUERY = """
query ESGDashboard($companyId: String!, $years: [Int!]!) {
ghgEmissions(companyId: $companyId, years: $years) {
year
scope1Tco2eq
scope2MbTco2eq
scope3Tco2eq
totalTco2eq
yoyChangePct
}
reportingStatus(companyId: $companyId, year: 2025) {
workflowState
mandatoryCoveragePct
approvedDataPoints
missingMandatory
}
}
"""
Considerații privind Legea AI și intersecția cu CSRD
Un aspect emergent în 2025-2026 este intersecția dintre CSRD și AI Act EU. Sistemele IA folosită pentru a calcula emisiile, a face proiecții climatice sau pentru a automatiza raportarea ESG poate intra în categoriile AI cu risc ridicat din Legea AI, care necesită:
- Documentatie tehnica a sistemului AI utilizat pentru estimările emisiilor (transparența metodologică ESRS)
- Supravegherea umană asupra deciziilor automate de semnificație
- Piste de audit de predicții AI (acoperite deja de descendența noastră de date)
- Nediscriminare în sistemele AI HR utilizate pentru metricile ESRS S1
Cronologie AI Act x CSRD
- februarie 2025: Actul AI în vigoare - interzice practicile interzise
- august 2025: Obligații GPAI (General Purpose AI) - transparența modelelor de fundație AI
- august 2026: Obligații cuprinzătoare AI cu risc ridicat - include sisteme AI pentru decizii de HR și credit
- august 2027: Aplicație completă AI Act pentru toate cazurile de utilizare
Echipele care construiesc sisteme ESG/CSRD trebuie deja să planifice pentru conformitate AI Act pentru componentele ML ale stivei lor: modele de estimare a emisiilor Scope 3, detectarea anomaliilor pe datele ESG, NLP pentru evaluarea furnizorilor.
Concluzii și pașii următori
Construirea unui sistem compatibil cu CSRD nu este doar un exercițiu de conformitate: este o oportunitate de a crea o arhitectură de date de sustenabilitate care va genera valoare de ani de zile. Componentele pe care le-am văzut — modelul de date ESRS, API-ul de colectare cu validare, flux de lucru FSM, integrare ERP, generator XBRL, generație de date - toate sunt reutilizabile și modulare.
Puncte cheie de luat acasă:
- Începeți de la evaluarea semnificației: fără dublă materialitate nu știi ce aduna. Este un proces care necesită implicarea reală a părților interesate și nu numai algoritmi.
- Automatizați colectarea ERP: 60-70% din datele ESRS E1 și S1 există deja în sistemele dvs. Conectorii OData pentru SAP și Oracle sunt investiții care se recuperează în săptămâni.
- Investește acum în descendența datelor: costul asigurării externe depinde direct de trasabilitatea datelor. Un sistem bun de urmărire de audit reduce costul auditului cu 20-30%.
- Nu ignora Pachetul Omnibus: simplificările propuse în 2025 ar putea ușura unele obligații pentru IMM-uri. Monitorizează procesul legislativ, dar nu planificați-le înainte de aprobarea finală.
În următorul articol al seriei - Amprenta de carbon AI — vom explora cum să măsuram și să reducem impactul asupra mediului al modelelor AI: de la formare de modele de limbaj mari la inferențe în producție, cu instrumente precum CodeCarbon, Impactul ML CO2 și strategii pentru inginerie durabilă ML.
Resurse pentru a afla mai multe
- Taxonomia EFRAG ESRS: efrag.org - Taxonomie ESRS XBRL
- Text oficial CSRD: EUR-Lex - Directiva 2022/2464/UE
- Standard corporativ al protocolului GHG: ghgprotocol.org
- SAP Sustainability Footprint Management: sap.com/sustainability
- Biblioteci Python XBRL: arelle (sursă deschisă), python-xbrl
- Instrumente CSRD SaaS: Workiva, Watershed, Persefoni, Sweep, Greenly
Seria Green Software Engineering - Navigație
Articolul precedent: GreenOps: DevOps sustenabil — CI/CD verde, dimensionarea dreptului în cloud, FinOps și metrici de sustenabilitate pentru echipele DevOps.
Articolul următor: Amprenta de carbon AI — Măsurați și reduceți emisiile de modele AI în formare și inferență, cu instrumente practice și repere de sector.
Această serie face parte din călătorie Afaceri cu date și inteligență artificială pe federicocalo.dev, care include și serialul MLOps pentru afaceri (ID-uri 306-315) pentru cei care doresc să se adâncească în masă în producția de modele AI cu atenție la eficiența computațională și guvernare.







