ESG Raporlama API'si: CSRD İş Akışı ile Entegrasyon
Il 1 Ocak 2024 Avrupalı şirketler için çığır açan bir dönüm noktası oldu: Kurumsal Sürdürülebilirlik Raporlama Direktifi (CSRD) yürürlüğe girerek ÇSY raporlamasını dönüştürüyor Gönüllü uygulamadan kesin teknik standartlarla yasal yükümlülüğe kadar. Konu şirketlerinin yapması gerekenler artık uyumlu raporlar üretin Avrupa Sürdürülebilirlik Raporlama Standartları (ESRS), doğrulanabilir verilerle, XBRL formatında etiketlenmiş ve dış güvenceye tabi tutulmuştur.
Teknik ekiplerin karşılaştığı zorluk somuttur: düzinelerce şirket sisteminden ESG verileri toplamak (ERP, enerji yönetimi, İK, lojistik), bunları ESRS sınıflandırmalarına göre toplayın, ölçümleri hesaplayın Kapsam 1-3 emisyonları ve su yoğunluğu gibi onay sürecini yönetmek çok düzeylidir ve düzenleyiciye hazır iXBRL formatında çıktı üretir. Bütün bunlar gerektirir bir Özel API mimarisi, paylaşılan Excel sayfaları değil.
Bu makalede ESRS uyumlu veri modelinden REST API'ye kadar eksiksiz bir sistem oluşturacağız. veri toplama ve doğrulamadan, birleştirme ve onaylama iş akışına, üretime kadar XBRL çıktısının. Arka uç için Python/FastAPI'yi, yapılandırılmış depolama için PostgreSQL'i kullanacağız. SAP Sustainability ve Oracle ESG'nin yukarı akış veri kaynakları olarak nasıl entegre edileceğini analiz edeceğiz.
Ne Öğreneceksiniz
- CSRD zaman çizelgesi ve uygulama çevresi: Kim neyi, ne zaman yapmalı?
- ESRS E1-E5, S1-S4, G1: Standartların yapısı ve zorunlu ölçümler
- Çift önemlilik değerlendirmesi: etki + finansal algoritmik uygulama
- ESRS uyumlu ESG veri toplama için PostgreSQL veri modeli
- ESG verilerinin toplanması, doğrulanması ve toplanması için REST/GraphQL API'si
- Çok düzeyli onay ve birleştirme için iş akışı otomasyonu
- Python ile XBRL/iXBRL etiketleme: dijital raporun otomatik oluşturulması
- ERP entegrasyonu: SAP Sürdürülebilirlik Ayak İzi Yönetimi, Oracle Fusion ESG
- Dış güvence için veri kökeni ve denetim takibi
- Tam örnek olay incelemesi: 250 çalışana sahip orta ölçekli imalat şirketi
Yeşil Yazılım Mühendisliği Serisi
Bu makale, Yeşil Yazılım Mühendisliği hakkındaki kapsamlı serinin sekizincisidir. Kural emisyonlarının ölçülmesinden düzenleyici ÇSY raporlamasına kadar:
| # | Öğe | Odak |
|---|---|---|
| 1 | Yeşil Yazılım Mühendisliği Prensipleri | GSF, SCI spesifikasyonu, 8 temel prensip |
| 2 | CodeCarbon: Kod Emisyonlarını Ölçme | Python kitaplığı, kontrol paneli, CI/CD entegrasyonu |
| 3 | Climatiq API: Arka Uçta Sera Gazı Hesaplamaları | REST API, Kapsam 1-3, FastAPI entegrasyonu |
| 4 | Karbon Bilinçli SDK | İş yükü değişimi, şebeke yoğunluğu, zaman değişimi |
| 5 | Kapsam 3 Boru Hattı | Değer zinciri emisyonları, tedarikçiler, LCA |
| 6 | Yeşil Mimari Desenleri | Sunucusuz, olay odaklı, sürdürülebilir önbelleğe alma |
| 7 | GreenOps: Sürdürülebilir DevOps | Yeşil CI/CD, bulutta doğru boyutlandırma, FinOps |
| 8 | ESG Raporlama API'si: CSRD İş Akışı (bu makale) | ESRS, XBRL, önemlilik, ERP entegrasyonu |
| 9 | Yapay Zeka Karbon Ayak İzi | Yüksek Lisans eğitimi/çıkarımı, sürdürülebilir makine öğrenimi |
| 10 | Gelişmiş Kapsam Modelleme | Sera Gazı Protokolü metodolojileri, belirli sektörler |
CSRD: Sürdürülebilirlik Raporlamasının Yeni Paradigması
CSRD (Direktif 2022/2464/EU), Finansal Olmayan Raporlama Direktifinin (NFRD) yerini almıştır. uygulama kapsamını önemli ölçüde genişletiyor ve raporların gerekli kalitesini yükseltiyor. NFRD yaklaşık iken 11.000 şirket Avrupa ülkeleri, CSRD onları içeriyor 50.000+ilk kez listelenen KOBİ'ler ve Avrupa'daki bağlı ortaklıkları da dahil olmak üzere AB dışı gruplar
Başvuru Zaman Çizelgesi
| Tarih | Konular | İlk Rapor |
|---|---|---|
| OCAK 2024 (2024 Mali Yılı) | Halihazırda NFRD'ye tabi olan büyük şirketler (>500 çalışan) | 2025 |
| OCAK 2025 (2025 Mali Yılı) | Büyük AB şirketleri (>250 çalışan VEYA >40 milyon Avro ciro VEYA >20 milyon varlık) | 2026 |
| OCAK 2026 (2026 Mali Yılı) | Düzenlenmiş piyasalarda listelenen KOBİ'ler (10-250 çalışan) | 2027 |
| OCAK 2028 (2028 Mali Yılı) | AB dışı grupların AB yan kuruluşları (>150 milyon Avro AB cirosu) | 2029 |
Çok Amaçlı Basitleştirme 2025
Şubat 2025'te Avrupa Komisyonu, basitleştirmeler öneren Omnibus Paketini yayınladı. CSRD için önemli: KOBİ kapsamının daraltılması, 2. dalga şirketler için 2 yıl erteleme ve 3 ve bazı ESRS veri noktalarının basitleştirilmesi. Fakat, büyük dalga 1 şirketleri kalıyor tamamen konu. Değişiklikler henüz yasama sürecinde bu makalenin yayınlanması. Mevzuatın güncel durumunu daima kontrol edin.
ESRS'nin yapısı
Avrupa Sürdürülebilirlik Raporlama Standartları üç tematik alanda düzenlenmiştir: artı iki standart çapraz kesim:
| Standart | Alan | Konular | Zorunlu |
|---|---|---|---|
| ESRS1 | Çapraz kesme | Genel gereksinimler ve ilkeler | Si |
| ESRS2 | Çapraz kesme | Genel bilgiler (yönetişim, strateji, önemlilik) | Si |
| ESRS E1 | Çevre | İklim değişikliği (GHG, enerji, TCFD) | Eğer maddi |
| ESRS E2 | Çevre | Kirlilik (hava, su, toprak, maddeler) | Eğer maddi |
| ESRS E3 | Çevre | Su ve deniz kaynakları | Eğer maddi |
| ESRS E4 | Çevre | Biyoçeşitlilik ve ekosistemler | Eğer maddi |
| ESRS E5 | Çevre | Kaynak kullanımı ve döngüsel ekonomi | Eğer maddi |
| ESRS S1 | Sosyal | Kendi iş gücü (çalışma koşulları, D&I, sağlık) | Eğer maddi |
| ESRS S2 | Sosyal | Değer zincirindeki işçiler | Eğer maddi |
| ESRS S3 | Sosyal | Etkilenen topluluklar | Eğer maddi |
| ESRS S4 | Sosyal | Tüketiciler ve son kullanıcılar | Eğer maddi |
| ESRS G1 | Yönetişim | İş davranışı (etik, yolsuzlukla mücadele, lobicilik) | Eğer maddi |
ESRS E1 İklimi: Toplamanız Gereken Metrikler
ESRS E1 genellikle veri toplama açısından en pahalı konudur. Açıklama gerektirir Sera gazı emisyonları (Kapsam 1, 2, 3), bilime uygun azaltım hedefleri (SBTi), enerji kaynağa göre tüketilenler ve TCFD'ye göre iklim riskleri/fırsatları. bakalım zorunlu veri noktaları en alakalı:
Sera Gazı Emisyonları - Zorunlu Ölçümler ESRS E1-6
| Veri Noktaları | Birim | Tanım |
|---|---|---|
| Brüt Kapsam 1 sera gazı emisyonları | tCO2eşd | Sahip olunan veya kontrol edilen kaynaklardan doğrudan emisyonlar |
| Brüt Kapsam 2 sera gazı emisyonları (lokasyon bazlı) | tCO2eşd | Enerji alımından kaynaklanan dolaylı emisyonlar, konum yöntemi |
| Brüt Kapsam 2 sera gazı emisyonları (piyasa bazlı) | tCO2eşd | Dolaylı emisyonlar, piyasa yöntemi (sertifikalar) |
| Brüt Kapsam 3 sera gazı emisyonları (15 kategori) | tCO2eşd | Değer zincirindeki dolaylı emisyonlar |
| Sera gazı yoğunluğu (gelir) | tCO2eq / M EUR | Birim ciro başına yoğunluk |
| Toplam enerji tüketimi | MWh | Toplam enerji tüketimi |
| Yenilenebilir enerji payı | % | Yenilenebilir kaynaklardan elde edilen enerjinin yüzdesi |
| Sera gazının uzaklaştırılması ve depolanması | tCO2eşd | Ağaçlandırma, CCS vb. yoluyla karbon giderimi |
Çifte Önemlilik Değerlendirmesi: Algoritmik Uygulama
CSRD'nin metodolojik kalbi, çifte önemlilik: her şirket Eş zamanlı iki bakış açısını dikkate alarak hangi ÇSY konularının öncelikli olduğunu değerlendirmelidir:
- Etki önemliliği: Şirketin önemli etkilerinin (olumlu veya olumsuz, o konu için insanlar ve çevre hakkında mevcut mu yoksa potansiyel mi?
- Finansal önemlilik: Bu ESG konusu riskler veya fırsatlar yaratıyor Şirket için önemli finansal varlıklar (kısa, orta, uzun vadeli)?
Bir konu, iki kriterden en az birini karşılıyorsa önceliklidir. Bakalım nasıl uygulanacaktır özel bir API ile bu işlem:
# materiality_assessment.py
# Implementazione Double Materiality Assessment per CSRD ESRS 2-IRO
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional
import uuid
from datetime import datetime
class MaterialityDimension(str, Enum):
IMPACT = "impact"
FINANCIAL = "financial"
class ImpactType(str, Enum):
ACTUAL_NEGATIVE = "actual_negative"
ACTUAL_POSITIVE = "actual_positive"
POTENTIAL_NEGATIVE = "potential_negative"
POTENTIAL_POSITIVE = "potential_positive"
class TimeHorizon(str, Enum):
SHORT_TERM = "short_term" # 0-1 anno
MEDIUM_TERM = "medium_term" # 1-5 anni
LONG_TERM = "long_term" # oltre 5 anni
@dataclass
class ImpactMaterialityScore:
"""
Score impact materiality per un topic ESG.
Basato su: severity x likelihood (per impatti negativi)
o scale x likelihood (per impatti positivi)
"""
topic_id: str
impact_type: ImpactType
# Score 1-5 per ogni dimensione
scale: int # Quante persone/ecosistemi impattati
scope: int # Reversibilita' dell'impatto
irremediable: int # Difficolta' di rimediare
likelihood: int # Probabilità' che accada
@property
def severity(self) -> float:
"""Severity = media pesata di scale, scope, irremediable"""
return (self.scale * 0.4 + self.scope * 0.3 + self.irremediable * 0.3)
@property
def score(self) -> float:
"""Score finale 1-25 per ordinamento"""
return self.severity * self.likelihood
@property
def is_material(self) -> bool:
"""Soglia materialita': score >= 9 (severity >= 3, likelihood >= 3)"""
return self.score >= 9.0
@dataclass
class FinancialMaterialityScore:
"""
Score financial materiality per un topic ESG.
Basato su: magnitude x likelihood, per time horizon
"""
topic_id: str
is_risk: bool # True = rischio, False = opportunità'
magnitude: int # 1-5: impatto finanziario potenziale
likelihood: int # 1-5: probabilità'
time_horizon: TimeHorizon
# Fattori qualitativi
quantifiable: bool # Possiamo quantificarlo in EUR?
estimated_impact_eur: Optional[float] = None
@property
def score(self) -> float:
"""Score base con discount per time horizon"""
base = self.magnitude * self.likelihood
# Discount: short=1.0, medium=0.8, long=0.6
discount = {
TimeHorizon.SHORT_TERM: 1.0,
TimeHorizon.MEDIUM_TERM: 0.8,
TimeHorizon.LONG_TERM: 0.6
}[self.time_horizon]
return base * discount
@property
def is_material(self) -> bool:
return self.score >= 7.5 # magnitude >= 3, likelihood >= 3 con discount short
@dataclass
class TopicMaterialityResult:
topic_id: str
topic_name: str
esrs_standard: str
impact_scores: list[ImpactMaterialityScore] = field(default_factory=list)
financial_scores: list[FinancialMaterialityScore] = field(default_factory=list)
@property
def impact_material(self) -> bool:
return any(s.is_material for s in self.impact_scores)
@property
def financial_material(self) -> bool:
return any(s.is_material for s in self.financial_scores)
@property
def is_material(self) -> bool:
"""Double materiality: materiale se almeno una dimensione e' materiale"""
return self.impact_material or self.financial_material
@property
def materiality_level(self) -> str:
if self.impact_material and self.financial_material:
return "DOUBLE_MATERIAL"
elif self.impact_material:
return "IMPACT_MATERIAL"
elif self.financial_material:
return "FINANCIAL_MATERIAL"
else:
return "NOT_MATERIAL"
def run_materiality_assessment(
company_id: str,
topics: list[dict]
) -> list[TopicMaterialityResult]:
"""
Esegue il materiality assessment completo.
Input: lista topic con scores raccolti via stakeholder engagement.
Output: ranking per materialita'.
"""
results = []
for topic_data in topics:
result = TopicMaterialityResult(
topic_id=topic_data["topic_id"],
topic_name=topic_data["topic_name"],
esrs_standard=topic_data["esrs_standard"]
)
# Aggiungi impact scores
for impact in topic_data.get("impact_scores", []):
result.impact_scores.append(ImpactMaterialityScore(**impact))
# Aggiungi financial scores
for financial in topic_data.get("financial_scores", []):
result.financial_scores.append(FinancialMaterialityScore(**financial))
results.append(result)
# Ordina per materialita' discendente
return sorted(
results,
key=lambda r: (
r.is_material,
r.impact_material and r.financial_material,
max((s.score for s in r.impact_scores), default=0) +
max((s.score for s in r.financial_scores), default=0)
),
reverse=True
)
CSRD için PostgreSQL Veri Modeli
CSRD uyumlu bir sistem, ESRS'nin yapısını yansıtan bir veritabanı şeması gerektirir. Veri kökenini destekleyin ve denetim takibi için değişiklik geçmişini koruyun. İşte burada çekirdek şeması:
-- Schema PostgreSQL per CSRD ESG Data Collection
-- Ottimizzato per ESRS data points con audit trail completo
-- Tabella aziende e perimetro di consolidamento
CREATE TABLE companies (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
legal_name VARCHAR(255) NOT NULL,
lei_code VARCHAR(20), -- Legal Entity Identifier
country_code CHAR(2) NOT NULL,
nace_code VARCHAR(10), -- Classificazione attivita'
fiscal_year_end DATE,
employees_fte DECIMAL(10,2),
revenue_eur DECIMAL(20,2),
total_assets_eur DECIMAL(20,2),
is_parent BOOLEAN DEFAULT FALSE,
parent_id UUID REFERENCES companies(id),
csrd_wave INTEGER, -- 1, 2, 3, 4
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Periodi di reporting
CREATE TABLE reporting_periods (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
company_id UUID NOT NULL REFERENCES companies(id),
year INTEGER NOT NULL,
period_type VARCHAR(20) DEFAULT 'ANNUAL',
start_date DATE NOT NULL,
end_date DATE NOT NULL,
status VARCHAR(30) DEFAULT 'DRAFT',
-- DRAFT, COLLECTING, REVIEWING, APPROVED, FILED
submitted_at TIMESTAMPTZ,
approved_by UUID,
UNIQUE (company_id, year, period_type)
);
-- Catalog ESRS data points (reference table)
CREATE TABLE esrs_data_points (
id VARCHAR(50) PRIMARY KEY,
-- es. "E1-6_GrossScope1", "S1-7_GenderPayGap"
esrs_standard VARCHAR(10) NOT NULL, -- E1, E2, S1, G1...
topic VARCHAR(100) NOT NULL,
sub_topic VARCHAR(100),
name VARCHAR(255) NOT NULL,
description TEXT,
unit_of_measure VARCHAR(50),
data_type VARCHAR(20), -- NUMERIC, BOOLEAN, TEXT, DATE, ENUM
is_mandatory BOOLEAN DEFAULT FALSE,
is_phase_in BOOLEAN DEFAULT FALSE, -- Permesso rinvio anni 1-3
ghg_protocol_scope VARCHAR(10), -- S1, S2_LB, S2_MB, S3
scope3_category INTEGER, -- 1-15 per Scope 3
tags TEXT[]
);
-- Raccolta dati ESG per periodo
CREATE TABLE esg_data_submissions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
company_id UUID NOT NULL REFERENCES companies(id),
reporting_period_id UUID NOT NULL REFERENCES reporting_periods(id),
data_point_id VARCHAR(50) NOT NULL REFERENCES esrs_data_points(id),
-- Valori (uno solo populated in base a data_type)
value_numeric DECIMAL(30, 6),
value_boolean BOOLEAN,
value_text TEXT,
value_date DATE,
-- Metadata qualità'
unit_of_measure VARCHAR(50),
estimation_method VARCHAR(100),
confidence_level VARCHAR(20), -- HIGH, MEDIUM, LOW
is_estimated BOOLEAN DEFAULT FALSE,
-- Data lineage
source_system VARCHAR(100), -- SAP, Oracle, Manual, API
source_document_ref VARCHAR(255),
collected_by UUID,
collection_timestamp TIMESTAMPTZ DEFAULT NOW(),
-- Workflow
status VARCHAR(30) DEFAULT 'DRAFT',
-- DRAFT, SUBMITTED, REVIEWED, APPROVED, REJECTED
reviewed_by UUID,
review_timestamp TIMESTAMPTZ,
review_comment TEXT,
approved_by UUID,
approval_timestamp TIMESTAMPTZ,
-- Versioning per audit
version INTEGER DEFAULT 1,
previous_version_id UUID REFERENCES esg_data_submissions(id),
CONSTRAINT unique_submission UNIQUE (
company_id, reporting_period_id, data_point_id, version
)
);
-- Indici per performance
CREATE INDEX idx_submissions_period ON esg_data_submissions(reporting_period_id);
CREATE INDEX idx_submissions_status ON esg_data_submissions(status);
CREATE INDEX idx_submissions_datapoint ON esg_data_submissions(data_point_id);
-- Audit log immutabile
CREATE TABLE audit_log (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
entity_type VARCHAR(50) NOT NULL,
entity_id UUID NOT NULL,
action VARCHAR(20) NOT NULL, -- CREATE, UPDATE, DELETE, APPROVE
actor_id UUID NOT NULL,
actor_email VARCHAR(255),
timestamp TIMESTAMPTZ DEFAULT NOW(),
ip_address INET,
old_value JSONB,
new_value JSONB,
change_reason TEXT
);
-- View aggregata emissioni GHG per periodo
CREATE OR REPLACE VIEW v_ghg_emissions_summary AS
SELECT
c.legal_name,
rp.year,
SUM(CASE WHEN dp.id LIKE 'E1%Scope1%' THEN eds.value_numeric ELSE 0 END) AS scope1_tco2eq,
SUM(CASE WHEN dp.id LIKE 'E1%Scope2%LB%' THEN eds.value_numeric ELSE 0 END) AS scope2_lb_tco2eq,
SUM(CASE WHEN dp.id LIKE 'E1%Scope2%MB%' THEN eds.value_numeric ELSE 0 END) AS scope2_mb_tco2eq,
SUM(CASE WHEN dp.id LIKE 'E1%Scope3%' THEN eds.value_numeric ELSE 0 END) AS scope3_tco2eq,
SUM(CASE WHEN dp.id = 'E1-5_TotalEnergy' THEN eds.value_numeric ELSE 0 END) AS total_energy_mwh,
SUM(CASE WHEN dp.id = 'E1-5_RenewableEnergy' THEN eds.value_numeric ELSE 0 END) AS renewable_mwh
FROM esg_data_submissions eds
JOIN companies c ON c.id = eds.company_id
JOIN reporting_periods rp ON rp.id = eds.reporting_period_id
JOIN esrs_data_points dp ON dp.id = eds.data_point_id
WHERE eds.status = 'APPROVED'
GROUP BY c.legal_name, rp.year;
API Mimarisi: ESG Veri Toplama ve Doğrulama
CSRD sisteminin API mimarisi üç ana katmana ayrılmıştır: iş birimleri ve kaynak sistemleri, ESRS kurallarına göre doğrulama ve zenginleştirme, e grup konsolidasyonu için toplama. FastAPI ile arka ucu uygulayalım:
# main.py - FastAPI ESG Reporting API
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, validator
from typing import Optional, List
from uuid import UUID
from datetime import datetime, date
from decimal import Decimal
import asyncpg
import json
app = FastAPI(
title="CSRD ESG Reporting API",
description="API per raccolta e gestione dati ESG conformi ESRS",
version="1.0.0"
)
# === PYDANTIC MODELS ===
class ESGDataPointSubmission(BaseModel):
data_point_id: str = Field(..., description="ID ESRS data point (es. E1-6_GrossScope1)")
value_numeric: Optional[Decimal] = None
value_boolean: Optional[bool] = None
value_text: Optional[str] = None
value_date: Optional[date] = None
unit_of_measure: Optional[str] = None
estimation_method: Optional[str] = None
confidence_level: str = Field(default="MEDIUM", pattern="^(HIGH|MEDIUM|LOW)$")
is_estimated: bool = False
source_system: Optional[str] = None
source_document_ref: Optional[str] = None
@validator('value_numeric')
def validate_numeric(cls, v, values):
if v is not None and v < 0:
raise ValueError('Valori numerici GHG non possono essere negativi')
return v
class BulkESGSubmission(BaseModel):
reporting_period_id: UUID
submissions: List[ESGDataPointSubmission]
submission_note: Optional[str] = None
class DataPointValidationResult(BaseModel):
data_point_id: str
is_valid: bool
errors: List[str] = []
warnings: List[str] = []
class ValidationReport(BaseModel):
period_id: UUID
total_mandatory_points: int
submitted_points: int
approved_points: int
coverage_pct: float
validation_results: List[DataPointValidationResult]
overall_valid: bool
# === VALIDATION SERVICE ===
class ESRSValidationService:
"""
Validazione regole ESRS per data points.
Implementa controlli incrociati richiesti dallo standard.
"""
CONSISTENCY_RULES = [
{
"rule": "SCOPE1_PLUS_SCOPE2_CONSISTENCY",
"description": "Total GHG = Scope1 + Scope2 (market) deve essere coerente",
"severity": "ERROR"
},
{
"rule": "ENERGY_GHG_RATIO",
"description": "Intensità' GHG su energia deve essere plausibile",
"severity": "WARNING"
},
{
"rule": "RENEWABLE_WITHIN_TOTAL",
"description": "Energia rinnovabile non può' superare totale energia",
"severity": "ERROR"
},
{
"rule": "YOY_VARIANCE_THRESHOLD",
"description": "Variazione YoY > 30% richiede commento",
"severity": "WARNING"
}
]
async def validate_submission(
self,
submissions: List[ESGDataPointSubmission],
period_id: UUID,
db_pool
) -> ValidationReport:
errors_by_point = {}
warnings_by_point = {}
# 1. Validazione singoli data point
for sub in submissions:
errors = []
warnings = []
# Carica definizione data point da catalogo
dp_def = await self._get_data_point_def(sub.data_point_id, db_pool)
if not dp_def:
errors.append(f"Data point {sub.data_point_id} non trovato nel catalogo ESRS")
else:
# Verifica unita' di misura
if dp_def['unit_of_measure'] and sub.unit_of_measure:
if sub.unit_of_measure != dp_def['unit_of_measure']:
warnings.append(
f"Unit of measure {sub.unit_of_measure} diversa da "
f"attesa {dp_def['unit_of_measure']}"
)
# Verifica tipo dato
if dp_def['data_type'] == 'NUMERIC' and sub.value_numeric is None:
errors.append(f"Data point numerico richiede value_numeric")
errors_by_point[sub.data_point_id] = errors
warnings_by_point[sub.data_point_id] = warnings
# 2. Controlli incrociati
await self._run_cross_checks(submissions, errors_by_point, warnings_by_point)
# 3. Calcola copertura mandatory
mandatory_coverage = await self._calc_mandatory_coverage(period_id, submissions, db_pool)
results = [
DataPointValidationResult(
data_point_id=dp_id,
is_valid=len(errs) == 0,
errors=errs,
warnings=warnings_by_point.get(dp_id, [])
)
for dp_id, errs in errors_by_point.items()
]
overall_valid = all(r.is_valid for r in results)
return ValidationReport(
period_id=period_id,
total_mandatory_points=mandatory_coverage['total'],
submitted_points=mandatory_coverage['submitted'],
approved_points=mandatory_coverage['approved'],
coverage_pct=mandatory_coverage['coverage_pct'],
validation_results=results,
overall_valid=overall_valid
)
async def _run_cross_checks(self, submissions, errors_by_point, warnings_by_point):
"""Controlli incrociati tra data points correlati."""
values = {s.data_point_id: s.value_numeric for s in submissions if s.value_numeric}
# Energia rinnovabile <= totale energia
renewable = values.get('E1-5_RenewableEnergy')
total_energy = values.get('E1-5_TotalEnergy')
if renewable and total_energy and renewable > total_energy:
errors_by_point.setdefault('E1-5_RenewableEnergy', []).append(
"Energia rinnovabile non può' superare energia totale"
)
async def _get_data_point_def(self, dp_id: str, db_pool):
async with db_pool.acquire() as conn:
return await conn.fetchrow(
"SELECT * FROM esrs_data_points WHERE id = $1", dp_id
)
async def _calc_mandatory_coverage(self, period_id, submissions, db_pool):
async with db_pool.acquire() as conn:
total = await conn.fetchval(
"SELECT COUNT(*) FROM esrs_data_points WHERE is_mandatory = TRUE"
)
submitted = len([s for s in submissions if s.value_numeric is not None
or s.value_boolean is not None or s.value_text is not None])
return {
'total': total or 0,
'submitted': submitted,
'approved': 0,
'coverage_pct': (submitted / total * 100) if total else 0
}
# === ENDPOINTS ===
@app.post("/api/v1/periods/{period_id}/submissions/bulk")
async def bulk_submit_esg_data(
period_id: UUID,
payload: BulkESGSubmission,
background_tasks: BackgroundTasks,
db_pool = Depends(get_db_pool)
):
"""
Submission bulk di dati ESG per un periodo.
Accetta fino a 500 data points per chiamata.
"""
if len(payload.submissions) > 500:
raise HTTPException(400, "Max 500 data points per richiesta")
validator = ESRSValidationService()
validation_report = await validator.validate_submission(
payload.submissions, period_id, db_pool
)
if not validation_report.overall_valid:
raise HTTPException(422, {
"message": "Validation failed",
"validation_report": validation_report.dict()
})
# Salva in batch
async with db_pool.acquire() as conn:
async with conn.transaction():
for sub in payload.submissions:
await conn.execute("""
INSERT INTO esg_data_submissions
(company_id, reporting_period_id, data_point_id,
value_numeric, value_boolean, value_text,
unit_of_measure, estimation_method, confidence_level,
is_estimated, source_system, source_document_ref,
status)
SELECT
rp.company_id, $1, $2,
$3, $4, $5,
$6, $7, $8,
$9, $10, $11,
'SUBMITTED'
FROM reporting_periods rp WHERE rp.id = $1
ON CONFLICT (company_id, reporting_period_id, data_point_id, version)
DO UPDATE SET
value_numeric = EXCLUDED.value_numeric,
status = 'SUBMITTED',
collection_timestamp = NOW()
""",
period_id,
str(sub.data_point_id),
sub.value_numeric,
sub.value_boolean,
sub.value_text,
sub.unit_of_measure,
sub.estimation_method,
sub.confidence_level,
sub.is_estimated,
sub.source_system,
sub.source_document_ref
)
# Trigger notifica al reviewer in background
background_tasks.add_task(notify_reviewers, period_id, len(payload.submissions))
return {
"status": "submitted",
"period_id": str(period_id),
"submitted_count": len(payload.submissions),
"validation_report": validation_report.dict()
}
@app.get("/api/v1/periods/{period_id}/validation-report")
async def get_validation_report(
period_id: UUID,
db_pool = Depends(get_db_pool)
):
"""Restituisce report di completezza e validità per un periodo."""
async with db_pool.acquire() as conn:
# Aggregazione stato submissions per mandatory data points
rows = await conn.fetch("""
SELECT
dp.id as data_point_id,
dp.name,
dp.is_mandatory,
dp.unit_of_measure,
eds.status,
eds.value_numeric,
eds.confidence_level
FROM esrs_data_points dp
LEFT JOIN esg_data_submissions eds ON
eds.data_point_id = dp.id
AND eds.reporting_period_id = $1
WHERE dp.is_mandatory = TRUE
ORDER BY dp.esrs_standard, dp.id
""", period_id)
missing_mandatory = [r for r in rows if r['status'] is None]
approved = [r for r in rows if r['status'] == 'APPROVED']
return {
"period_id": str(period_id),
"total_mandatory": len(rows),
"approved": len(approved),
"missing": len(missing_mandatory),
"coverage_pct": round(len(approved) / len(rows) * 100, 1) if rows else 0,
"missing_data_points": [
{"id": r['data_point_id'], "name": r['name']}
for r in missing_mandatory
]
}
async def notify_reviewers(period_id: UUID, count: int):
"""Background task: notifica reviewer via email/webhook."""
print(f"[NOTIFY] Period {period_id}: {count} data points submitted for review")
async def get_db_pool():
"""Dependency injection per asyncpg pool."""
# In produzione: pool globale inizializzato al startup
pool = await asyncpg.create_pool(
"postgresql://user:password@localhost/esg_db",
min_size=5,
max_size=20
)
try:
yield pool
finally:
await pool.close()
İş Akışı Otomasyonu: Çok Düzeyli Onay
Kurumsal bir CSRD sistemi, veri toplamayı yöneten yapılandırılmış bir iş akışı gerektirir. birden fazla iş birimi, inceleme ve onay döngüleri ve nihai konsolidasyon. İşte iş akışı motorunun uygulaması:
# workflow_engine.py
# Workflow CSRD multi-livello: collect -> review -> approve -> file
from enum import Enum
from dataclasses import dataclass
from typing import Callable, Optional
import asyncio
class WorkflowState(str, Enum):
DRAFT = "DRAFT"
DATA_COLLECTION = "DATA_COLLECTION"
INTERNAL_REVIEW = "INTERNAL_REVIEW"
CFO_APPROVAL = "CFO_APPROVAL"
EXTERNAL_ASSURANCE = "EXTERNAL_ASSURANCE"
BOARD_APPROVAL = "BOARD_APPROVAL"
FILED = "FILED"
REJECTED = "REJECTED"
class WorkflowTransition:
"""Definisce una transizione di stato valida con condizioni e azioni."""
def __init__(
self,
from_state: WorkflowState,
to_state: WorkflowState,
action: str,
condition: Optional[Callable] = None,
pre_actions: list = None,
post_actions: list = None
):
self.from_state = from_state
self.to_state = to_state
self.action = action
self.condition = condition
self.pre_actions = pre_actions or []
self.post_actions = post_actions or []
# Definizione FSM per workflow CSRD
CSRD_WORKFLOW_TRANSITIONS = [
WorkflowTransition(
from_state=WorkflowState.DRAFT,
to_state=WorkflowState.DATA_COLLECTION,
action="START_COLLECTION",
post_actions=["send_collection_invites", "set_deadlines"]
),
WorkflowTransition(
from_state=WorkflowState.DATA_COLLECTION,
to_state=WorkflowState.INTERNAL_REVIEW,
action="SUBMIT_FOR_REVIEW",
condition=lambda ctx: ctx.get("mandatory_coverage_pct", 0) >= 80,
pre_actions=["run_validation", "generate_completeness_report"]
),
WorkflowTransition(
from_state=WorkflowState.INTERNAL_REVIEW,
to_state=WorkflowState.DATA_COLLECTION,
action="REQUEST_CORRECTIONS",
post_actions=["notify_data_owners", "log_review_comments"]
),
WorkflowTransition(
from_state=WorkflowState.INTERNAL_REVIEW,
to_state=WorkflowState.CFO_APPROVAL,
action="APPROVE_INTERNAL_REVIEW",
condition=lambda ctx: ctx.get("reviewer_approved", False),
pre_actions=["generate_draft_report"]
),
WorkflowTransition(
from_state=WorkflowState.CFO_APPROVAL,
to_state=WorkflowState.EXTERNAL_ASSURANCE,
action="CFO_SIGN_OFF",
post_actions=["prepare_assurance_package", "notify_auditor"]
),
WorkflowTransition(
from_state=WorkflowState.EXTERNAL_ASSURANCE,
to_state=WorkflowState.BOARD_APPROVAL,
action="ASSURANCE_COMPLETE",
condition=lambda ctx: ctx.get("assurance_opinion") in ["UNQUALIFIED", "QUALIFIED"],
post_actions=["attach_assurance_report"]
),
WorkflowTransition(
from_state=WorkflowState.BOARD_APPROVAL,
to_state=WorkflowState.FILED,
action="BOARD_APPROVE_AND_FILE",
post_actions=["generate_ixbrl", "submit_to_oam", "publish_report"]
),
]
class CSRDWorkflowEngine:
def __init__(self, db_pool):
self.db_pool = db_pool
self.transitions = {
(t.from_state, t.action): t
for t in CSRD_WORKFLOW_TRANSITIONS
}
async def execute_transition(
self,
period_id: str,
action: str,
actor_id: str,
context: dict = None
) -> dict:
"""
Esegue una transizione di workflow per un periodo di reporting.
Restituisce il nuovo stato o errore se la transizione non e' valida.
"""
context = context or {}
async with self.db_pool.acquire() as conn:
# Carica stato corrente
period = await conn.fetchrow(
"SELECT status, company_id FROM reporting_periods WHERE id = $1",
period_id
)
if not period:
raise ValueError(f"Periodo {period_id} non trovato")
current_state = WorkflowState(period['status'])
transition_key = (current_state, action)
if transition_key not in self.transitions:
raise ValueError(
f"Transizione {action} non valida da stato {current_state.value}"
)
transition = self.transitions[transition_key]
# Verifica condizione (arricchisci context con dati DB se necessario)
if transition.condition:
enriched_ctx = await self._enrich_context(period_id, context, conn)
if not transition.condition(enriched_ctx):
raise ValueError(
f"Condizione per {action} non soddisfatta: "
f"mandatory coverage = {enriched_ctx.get('mandatory_coverage_pct', 0)}%"
)
# Esegui pre-actions
for pre_action in transition.pre_actions:
await self._execute_action(pre_action, period_id, context, conn)
# Aggiorna stato
await conn.execute(
"UPDATE reporting_periods SET status = $1, updated_at = NOW() WHERE id = $2",
transition.to_state.value,
period_id
)
# Audit log
await conn.execute("""
INSERT INTO audit_log (entity_type, entity_id, action, actor_id, new_value)
VALUES ('reporting_period', $1, $2, $3, $4)
""", period_id, action, actor_id,
json.dumps({"from": current_state.value, "to": transition.to_state.value}))
# Esegui post-actions in background (notifiche, generazione documenti)
for post_action in transition.post_actions:
asyncio.create_task(
self._execute_action_background(post_action, period_id, context)
)
return {
"period_id": period_id,
"previous_state": current_state.value,
"new_state": transition.to_state.value,
"action": action,
"actor_id": actor_id,
"timestamp": datetime.now().isoformat()
}
async def _enrich_context(self, period_id: str, context: dict, conn) -> dict:
"""Arricchisce context con dati calcolati dal DB."""
mandatory_total = await conn.fetchval(
"SELECT COUNT(*) FROM esrs_data_points WHERE is_mandatory = TRUE"
)
approved = await conn.fetchval("""
SELECT COUNT(*) FROM esg_data_submissions
WHERE reporting_period_id = $1 AND status = 'APPROVED'
""", period_id)
enriched = dict(context)
enriched['mandatory_coverage_pct'] = (
(approved / mandatory_total * 100) if mandatory_total else 0
)
return enriched
async def _execute_action(self, action_name: str, period_id: str, context: dict, conn):
"""Esegue un'azione sincrona nel workflow."""
action_map = {
"run_validation": self._action_run_validation,
"generate_completeness_report": self._action_gen_completeness,
"generate_draft_report": self._action_gen_draft,
}
if action_name in action_map:
await action_map[action_name](period_id, context, conn)
async def _action_run_validation(self, period_id, context, conn):
print(f"[WORKFLOW] Running validation for period {period_id}")
async def _action_gen_completeness(self, period_id, context, conn):
print(f"[WORKFLOW] Generating completeness report for {period_id}")
async def _action_gen_draft(self, period_id, context, conn):
print(f"[WORKFLOW] Generating draft CSRD report for {period_id}")
async def _execute_action_background(self, action_name: str, period_id: str, context: dict):
"""Esegue azione in background (notifiche, documenti)."""
print(f"[BACKGROUND] Executing {action_name} for period {period_id}")
XBRL/iXBRL Etiketleme: Dijital Rapor Oluşturma
CSRD, yıllık raporun şu formatta hazırlanmasını gerektirir: Etiketlemeli XHTML iXBRL (Satır içi XBRL), yapılandırılmış verileri doğrudan okunabilir HTML'ye gömer. EFRAG, her veri noktasını iXBRL etiketine eşleyen ESRS XBRL Taksonomisini yayınladı. Otomatik oluşturmayı uygulayalım:
# xbrl_generator.py
# Generazione iXBRL per CSRD secondo ESRS Taxonomy
from lxml import etree
from decimal import Decimal
from typing import Optional
import uuid
# ESRS XBRL Taxonomy namespace
ESRS_NS = "https://xbrl.efrag.org/taxonomy/draft-esrs/2022-11-22"
XBRLI_NS = "http://www.xbrl.org/2003/instance"
IX_NS = "http://www.xbrl.org/2013/inlineXBRL"
LINK_NS = "http://www.xbrl.org/2003/linkbase"
class ESRSXBRLGenerator:
"""
Genera documenti iXBRL conformi alla ESRS Taxonomy.
Output: XHTML con embedded XBRL facts.
"""
def __init__(self, company_data: dict, period_data: dict):
self.company = company_data
self.period = period_data
self.context_id = f"ctx_{uuid.uuid4().hex[:8]}"
self.facts = []
def add_fact(
self,
element_name: str,
value,
unit: Optional[str] = None,
decimals: int = 0,
scale: int = 0
):
"""Aggiunge un fact XBRL al report."""
self.facts.append({
"element": element_name,
"value": value,
"unit": unit,
"decimals": decimals,
"scale": scale
})
def generate_ixbrl(self) -> str:
"""
Genera il documento iXBRL completo.
Returns: stringa XHTML con tagging iXBRL embedded.
"""
# Namespace map per il documento
nsmap = {
"ix": IX_NS,
"xbrli": XBRLI_NS,
"esrs": ESRS_NS,
None: "http://www.w3.org/1999/xhtml"
}
# Root XHTML
html = etree.Element("html", nsmap=nsmap)
head = etree.SubElement(html, "head")
body = etree.SubElement(html, "body")
# Header XBRL nel head
ix_header = etree.SubElement(head, f"{{IX_NS}}header")
hidden = etree.SubElement(ix_header, f"{{IX_NS}}hidden")
# Context: entità' e periodo
resources = etree.SubElement(ix_header, f"{{IX_NS}}resources")
xbrli_context = etree.SubElement(resources, f"{{XBRLI_NS}}context")
xbrli_context.set("id", self.context_id)
entity = etree.SubElement(xbrli_context, f"{{XBRLI_NS}}entity")
identifier = etree.SubElement(entity, f"{{XBRLI_NS}}identifier")
identifier.set("scheme", "http://www.lei.info")
identifier.text = self.company.get("lei_code", "000000000000000000")
period = etree.SubElement(xbrli_context, f"{{XBRLI_NS}}period")
start_date = etree.SubElement(period, f"{{XBRLI_NS}}startDate")
start_date.text = self.period.get("start_date", "2024-01-01")
end_date = etree.SubElement(period, f"{{XBRLI_NS}}endDate")
end_date.text = self.period.get("end_date", "2024-12-31")
# Unit monetaria e tCO2eq
unit_co2 = etree.SubElement(resources, f"{{XBRLI_NS}}unit")
unit_co2.set("id", "tCO2eq")
measure_co2 = etree.SubElement(unit_co2, f"{{XBRLI_NS}}measure")
measure_co2.text = "esrs:metricTon"
# Body: report HTML con facts inline
h1 = etree.SubElement(body, "h1")
h1.text = f"CSRD Sustainability Report {self.period.get('year', 2024)}"
h2 = etree.SubElement(body, "h2")
h2.text = "E1 - Climate Change: GHG Emissions"
# Tabella emissioni con tagging iXBRL inline
table = etree.SubElement(body, "table")
thead = etree.SubElement(table, "thead")
tr_head = etree.SubElement(thead, "tr")
etree.SubElement(tr_head, "th").text = "Metric"
etree.SubElement(tr_head, "th").text = "Value"
etree.SubElement(tr_head, "th").text = "Unit"
tbody = etree.SubElement(table, "tbody")
# Aggiungi facts come righe di tabella con tag iXBRL
for fact in self.facts:
tr = etree.SubElement(tbody, "tr")
etree.SubElement(tr, "td").text = fact["element"].split(":")[-1]
td_value = etree.SubElement(tr, "td")
# Tag iXBRL inline per il valore
ix_nonfraction = etree.SubElement(
td_value,
f"{{IX_NS}}nonFraction"
)
ix_nonfraction.set("name", fact["element"])
ix_nonfraction.set("contextRef", self.context_id)
ix_nonfraction.set("unitRef", fact.get("unit", "tCO2eq"))
ix_nonfraction.set("decimals", str(fact.get("decimals", 0)))
ix_nonfraction.set("scale", str(fact.get("scale", 0)))
ix_nonfraction.set("format", "ixt:num-dot-decimal")
ix_nonfraction.text = str(fact["value"])
etree.SubElement(tr, "td").text = fact.get("unit", "tCO2eq")
# Serializza come XHTML
return etree.tostring(
html,
pretty_print=True,
xml_declaration=True,
encoding="UTF-8",
method="xml"
).decode("utf-8")
def generate_csrd_report_from_db(approved_submissions: list, company: dict, period: dict) -> str:
"""
Genera report iXBRL da submissions approvate nel DB.
"""
generator = ESRSXBRLGenerator(company, period)
# Mapping data_point_id -> XBRL element name
ESRS_XBRL_MAP = {
"E1-6_GrossScope1": "esrs:GrossScope1GHGEmissions",
"E1-6_GrossScope2LB": "esrs:GrossScope2GHGEmissionsLocationBased",
"E1-6_GrossScope2MB": "esrs:GrossScope2GHGEmissionsMarketBased",
"E1-6_GrossScope3Total": "esrs:GrossScope3GHGEmissions",
"E1-5_TotalEnergy": "esrs:TotalEnergyConsumption",
"E1-5_RenewableEnergy": "esrs:EnergyConsumptionFromRenewableSources",
"E1-5_NonRenewableEnergy": "esrs:EnergyConsumptionFromNonRenewableSources",
}
for sub in approved_submissions:
element = ESRS_XBRL_MAP.get(sub['data_point_id'])
if element and sub['value_numeric'] is not None:
generator.add_fact(
element_name=element,
value=float(sub['value_numeric']),
unit=sub.get('unit_of_measure', 'tCO2eq'),
decimals=0
)
return generator.generate_ixbrl()
ERP entegrasyonu: SAP Sürdürülebilirlik ve Oracle ESG
Çoğu ESG verisi halihazırda kurumsal ERP sistemlerinde bulunmaktadır: enerji tüketimi PM/PM modülünde, SuccessFactors'ta İK verileri, TM modülünde lojistik. Konektörleri uyguluyoruz kurumsal pazarın iki ana platformu için:
# erp_connectors.py
# Connettori per SAP Sustainability Footprint Management (SFM)
# e Oracle Fusion ESG
import httpx
import asyncio
from abc import ABC, abstractmethod
from typing import Optional
class ERPConnector(ABC):
"""Interfaccia base per connettori ERP."""
@abstractmethod
async def get_energy_consumption(self, year: int, entity_id: str) -> dict:
pass
@abstractmethod
async def get_ghg_emissions(self, year: int, entity_id: str) -> dict:
pass
@abstractmethod
async def get_workforce_metrics(self, year: int, entity_id: str) -> dict:
pass
class SAPSustainabilityConnector(ERPConnector):
"""
Connettore per SAP Sustainability Footprint Management (SFM).
Usa SAP OData API v4.
API base: /sap/opu/odata4/sap/api_sustainability_footprint/
"""
def __init__(
self,
base_url: str,
client_id: str,
client_secret: str,
token_url: str
):
self.base_url = base_url.rstrip("/")
self.client_id = client_id
self.client_secret = client_secret
self.token_url = token_url
self._token: Optional[str] = None
self._client = httpx.AsyncClient(timeout=30)
async def _get_token(self) -> str:
"""OAuth2 client credentials per SAP BTP."""
if self._token:
return self._token
response = await self._client.post(
self.token_url,
data={
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret,
}
)
response.raise_for_status()
self._token = response.json()["access_token"]
return self._token
async def _get(self, path: str, params: dict = None) -> dict:
token = await self._get_token()
response = await self._client.get(
f"{self.base_url}{path}",
headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
params=params
)
response.raise_for_status()
return response.json()
async def get_energy_consumption(self, year: int, entity_id: str) -> dict:
"""
Estrae consumo energetico da SAP SFM per anno e entità'.
OData endpoint: /EnergyConsumption
"""
data = await self._get(
"/EnergyConsumption",
params={
"$filter": f"Year eq {year} and EntityId eq '{entity_id}'",
"$select": "Year,EntityId,EnergyType,Quantity,Unit,Source"
}
)
result = {"total_mwh": 0, "renewable_mwh": 0, "non_renewable_mwh": 0, "by_source": []}
for record in data.get("value", []):
qty_mwh = self._convert_to_mwh(record["Quantity"], record["Unit"])
result["total_mwh"] += qty_mwh
result["by_source"].append({
"source": record["EnergyType"],
"mwh": qty_mwh,
"is_renewable": record["EnergyType"] in [
"SOLAR", "WIND", "HYDRO", "BIOMASS", "GEOTHERMAL"
]
})
if record["EnergyType"] in ["SOLAR", "WIND", "HYDRO", "BIOMASS", "GEOTHERMAL"]:
result["renewable_mwh"] += qty_mwh
else:
result["non_renewable_mwh"] += qty_mwh
return result
async def get_ghg_emissions(self, year: int, entity_id: str) -> dict:
"""Estrae emissioni GHG da SAP SFM, suddivise per scope."""
data = await self._get(
"/GHGEmission",
params={
"$filter": f"Year eq {year} and EntityId eq '{entity_id}'",
"$select": "Year,EntityId,GHGScope,Category,EmissionTCO2e"
}
)
result = {"scope1": 0, "scope2_lb": 0, "scope2_mb": 0, "scope3_by_category": {}}
for r in data.get("value", []):
scope = r["GHGScope"]
tco2e = float(r["EmissionTCO2e"])
if scope == "1":
result["scope1"] += tco2e
elif scope == "2" and r.get("Category") == "LB":
result["scope2_lb"] += tco2e
elif scope == "2" and r.get("Category") == "MB":
result["scope2_mb"] += tco2e
elif scope == "3":
cat = r.get("Category", "UNKNOWN")
result["scope3_by_category"][cat] = (
result["scope3_by_category"].get(cat, 0) + tco2e
)
result["scope3_total"] = sum(result["scope3_by_category"].values())
return result
async def get_workforce_metrics(self, year: int, entity_id: str) -> dict:
"""Estrae metriche workforce per ESRS S1 da SAP SuccessFactors."""
# SAP SuccessFactors OData API
data = await self._get(
"/WorkforceMetrics",
params={
"$filter": f"Year eq {year} and EntityId eq '{entity_id}'",
"$select": "Metric,Value,Gender,EmployeeType"
}
)
result = {
"headcount_total": 0,
"headcount_female": 0,
"headcount_male": 0,
"fte_total": 0,
"turnover_rate": 0,
"gender_pay_gap_pct": None
}
for r in data.get("value", []):
metric = r.get("Metric")
value = float(r.get("Value", 0))
if metric == "HEADCOUNT":
result["headcount_total"] += value
if r.get("Gender") == "F":
result["headcount_female"] += value
elif r.get("Gender") == "M":
result["headcount_male"] += value
elif metric == "FTE":
result["fte_total"] += value
elif metric == "TURNOVER_RATE":
result["turnover_rate"] = value
elif metric == "GENDER_PAY_GAP":
result["gender_pay_gap_pct"] = value
return result
def _convert_to_mwh(self, quantity: float, unit: str) -> float:
conversions = {"GJ": 0.2778, "MWh": 1.0, "kWh": 0.001, "TJ": 277.78}
return quantity * conversions.get(unit, 1.0)
class OracleESGConnector(ERPConnector):
"""
Connettore per Oracle Fusion Sustainability (Oracle ESG).
Usa Oracle REST API + BICC per bulk extraction.
"""
def __init__(self, base_url: str, username: str, password: str):
self.base_url = base_url.rstrip("/")
self.auth = (username, password)
self._client = httpx.AsyncClient(timeout=60)
async def _get(self, path: str, params: dict = None) -> dict:
response = await self._client.get(
f"{self.base_url}{path}",
auth=self.auth,
headers={"Accept": "application/json"},
params=params
)
response.raise_for_status()
return response.json()
async def get_energy_consumption(self, year: int, entity_id: str) -> dict:
data = await self._get(
"/fscmRestApi/resources/11.13.18.05/sustainabilityMetrics",
params={
"q": f"metricCategory=ENERGY;fiscalYear={year};legalEntityId={entity_id}",
"fields": "metricName,metricValue,uom,dataSource,lastUpdateDate"
}
)
total_mwh = 0
renewable_mwh = 0
RENEWABLE_METRICS = {"SOLAR_ENERGY", "WIND_ENERGY", "HYDRO_ENERGY", "PPA_RENEWABLE"}
for item in data.get("items", []):
mwh = self._to_mwh(item["metricValue"], item["uom"])
total_mwh += mwh
if item["metricName"] in RENEWABLE_METRICS:
renewable_mwh += mwh
return {
"total_mwh": total_mwh,
"renewable_mwh": renewable_mwh,
"non_renewable_mwh": total_mwh - renewable_mwh
}
async def get_ghg_emissions(self, year: int, entity_id: str) -> dict:
data = await self._get(
"/fscmRestApi/resources/11.13.18.05/sustainabilityMetrics",
params={
"q": f"metricCategory=GHG_EMISSIONS;fiscalYear={year};legalEntityId={entity_id}",
"fields": "metricName,ghgScope,metricValue,uom"
}
)
result = {"scope1": 0, "scope2_lb": 0, "scope2_mb": 0, "scope3_total": 0}
for item in data.get("items", []):
scope = item.get("ghgScope", "")
val = float(item.get("metricValue", 0))
if scope == "SCOPE_1":
result["scope1"] += val
elif scope == "SCOPE_2_LB":
result["scope2_lb"] += val
elif scope == "SCOPE_2_MB":
result["scope2_mb"] += val
elif scope.startswith("SCOPE_3"):
result["scope3_total"] += val
return result
async def get_workforce_metrics(self, year: int, entity_id: str) -> dict:
data = await self._get(
"/fscmRestApi/resources/11.13.18.05/sustainabilityMetrics",
params={
"q": f"metricCategory=WORKFORCE;fiscalYear={year};legalEntityId={entity_id}"
}
)
result = {"headcount_total": 0, "fte_total": 0, "gender_pay_gap_pct": None}
for item in data.get("items", []):
name = item.get("metricName", "")
val = float(item.get("metricValue", 0))
if name == "TOTAL_HEADCOUNT":
result["headcount_total"] = val
elif name == "TOTAL_FTE":
result["fte_total"] = val
elif name == "GENDER_PAY_GAP":
result["gender_pay_gap_pct"] = val
return result
def _to_mwh(self, value: float, uom: str) -> float:
factors = {"MWH": 1.0, "KWH": 0.001, "GJ": 0.2778, "MMBTU": 0.29307}
return value * factors.get(uom.upper(), 1.0)
class ERPDataAggregator:
"""
Aggrega dati da multipli connettori ERP e mappa su ESRS data points.
"""
def __init__(self, connectors: list[ERPConnector]):
self.connectors = connectors
async def collect_and_map_to_esrs(
self,
year: int,
entity_id: str,
period_id: str
) -> list[dict]:
"""
Raccoglie dati da tutti i connettori e produce submissions ESRS-ready.
"""
results = []
# Raccolta parallela da tutti i connettori
energy_tasks = [c.get_energy_consumption(year, entity_id) for c in self.connectors]
ghg_tasks = [c.get_ghg_emissions(year, entity_id) for c in self.connectors]
wf_tasks = [c.get_workforce_metrics(year, entity_id) for c in self.connectors]
energy_results = await asyncio.gather(*energy_tasks, return_exceptions=True)
ghg_results = await asyncio.gather(*ghg_tasks, return_exceptions=True)
wf_results = await asyncio.gather(*wf_tasks, return_exceptions=True)
# Aggregazione (media pesata o somma in base al tipo)
total_energy = sum(
r["total_mwh"] for r in energy_results
if isinstance(r, dict)
)
total_renewable = sum(
r["renewable_mwh"] for r in energy_results
if isinstance(r, dict)
)
total_scope1 = sum(
r.get("scope1", 0) for r in ghg_results
if isinstance(r, dict)
)
# Map su ESRS data points
if total_energy > 0:
results.append({
"data_point_id": "E1-5_TotalEnergy",
"value_numeric": total_energy,
"unit_of_measure": "MWh",
"source_system": "ERP_AGGREGATED",
"confidence_level": "HIGH"
})
if total_renewable > 0:
results.append({
"data_point_id": "E1-5_RenewableEnergy",
"value_numeric": total_renewable,
"unit_of_measure": "MWh",
"source_system": "ERP_AGGREGATED",
"confidence_level": "HIGH"
})
if total_scope1 > 0:
results.append({
"data_point_id": "E1-6_GrossScope1",
"value_numeric": total_scope1,
"unit_of_measure": "tCO2eq",
"source_system": "ERP_AGGREGATED",
"confidence_level": "HIGH"
})
return results
Dış Güvence için Veri Kaynağı ve Denetim Yolu
CSRD gerektirir sınırlı güvence (ve perspektif olarak makul güvence) harici bir denetçi tarafından. Bu, rapordaki her verinin olması gerektiği anlamına gelir. tam izlenebilirlik: nereden geldiği, kimin yerleştirdiği, kimin onayladığı, ve nasıl hesaplandığı. Veri kökeni izleyiciyi uygulayalım:
# data_lineage.py
# Data lineage tracker per assurance CSRD
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import hashlib
import json
@dataclass
class LineageNode:
"""Nodo nel grafo di data lineage."""
node_id: str
node_type: str # SOURCE, TRANSFORM, SUBMISSION, APPROVAL
description: str
actor: Optional[str] = None
timestamp: Optional[datetime] = None
metadata: dict = field(default_factory=dict)
hash: Optional[str] = None
def compute_hash(self) -> str:
"""Hash del contenuto per rilevare manomissioni."""
content = json.dumps({
"node_id": self.node_id,
"node_type": self.node_type,
"description": self.description,
"actor": self.actor,
"timestamp": self.timestamp.isoformat() if self.timestamp else None,
"metadata": self.metadata
}, sort_keys=True)
self.hash = hashlib.sha256(content.encode()).hexdigest()
return self.hash
@dataclass
class LineageEdge:
"""Arco tra nodi nel grafo di lineage."""
from_node: str
to_node: str
relationship: str # DERIVED_FROM, APPROVED_BY, TRANSFORMED_BY
metadata: dict = field(default_factory=dict)
class DataLineageTracker:
"""
Traccia il percorso completo di ogni dato ESG dalla sorgente al report.
Struttura: DAG (Directed Acyclic Graph) di LineageNode.
"""
def __init__(self, db_pool):
self.db_pool = db_pool
async def record_extraction(
self,
data_point_id: str,
source_system: str,
source_query: str,
raw_value,
actor_id: str
) -> str:
"""Registra estrazione da sistema sorgente."""
node = LineageNode(
node_id=f"extract_{data_point_id}_{datetime.now().timestamp():.0f}",
node_type="SOURCE",
description=f"Extracted from {source_system}",
actor=actor_id,
timestamp=datetime.now(),
metadata={
"source_system": source_system,
"source_query": source_query,
"raw_value": str(raw_value),
"data_point_id": data_point_id
}
)
node.compute_hash()
await self._save_node(node)
return node.node_id
async def record_transformation(
self,
from_node_id: str,
data_point_id: str,
transformation_type: str,
input_value,
output_value,
formula: Optional[str],
actor_id: str
) -> str:
"""Registra una trasformazione (unit conversion, aggregation, etc.)."""
node = LineageNode(
node_id=f"transform_{data_point_id}_{datetime.now().timestamp():.0f}",
node_type="TRANSFORM",
description=f"{transformation_type} applied",
actor=actor_id,
timestamp=datetime.now(),
metadata={
"transformation_type": transformation_type,
"input_value": str(input_value),
"output_value": str(output_value),
"formula": formula,
"data_point_id": data_point_id
}
)
node.compute_hash()
await self._save_node(node)
edge = LineageEdge(
from_node=from_node_id,
to_node=node.node_id,
relationship="DERIVED_FROM"
)
await self._save_edge(edge)
return node.node_id
async def record_approval(
self,
submission_node_id: str,
data_point_id: str,
approver_id: str,
approval_comment: Optional[str]
) -> str:
"""Registra approvazione da parte del reviewer."""
node = LineageNode(
node_id=f"approval_{data_point_id}_{datetime.now().timestamp():.0f}",
node_type="APPROVAL",
description="Data point approved for reporting",
actor=approver_id,
timestamp=datetime.now(),
metadata={
"data_point_id": data_point_id,
"approval_comment": approval_comment,
"approver_id": approver_id
}
)
node.compute_hash()
await self._save_node(node)
edge = LineageEdge(
from_node=submission_node_id,
to_node=node.node_id,
relationship="APPROVED_BY"
)
await self._save_edge(edge)
return node.node_id
async def get_lineage_for_data_point(
self,
data_point_id: str,
period_id: str
) -> dict:
"""
Restituisce il grafo completo di lineage per un data point.
Usato dall'auditor per tracciare l'origine dei dati.
"""
async with self.db_pool.acquire() as conn:
nodes = await conn.fetch("""
SELECT * FROM lineage_nodes
WHERE metadata->>'data_point_id' = $1
ORDER BY timestamp ASC
""", data_point_id)
edges = await conn.fetch("""
SELECT le.* FROM lineage_edges le
JOIN lineage_nodes ln ON ln.node_id = le.from_node
WHERE ln.metadata->>'data_point_id' = $1
""", data_point_id)
return {
"data_point_id": data_point_id,
"period_id": period_id,
"nodes": [dict(n) for n in nodes],
"edges": [dict(e) for e in edges],
"integrity_check": await self._verify_chain_integrity(nodes)
}
async def _verify_chain_integrity(self, nodes) -> dict:
"""Verifica che gli hash non siano stati manomessi."""
for node in nodes:
expected_hash = node.compute_hash() if hasattr(node, 'compute_hash') else None
if expected_hash and node.get('hash') != expected_hash:
return {"valid": False, "tampered_node": node['node_id']}
return {"valid": True, "nodes_verified": len(nodes)}
async def _save_node(self, node: LineageNode):
async with self.db_pool.acquire() as conn:
await conn.execute("""
INSERT INTO lineage_nodes
(node_id, node_type, description, actor, timestamp, metadata, hash)
VALUES ($1, $2, $3, $4, $5, $6, $7)
""",
node.node_id, node.node_type, node.description,
node.actor, node.timestamp, json.dumps(node.metadata), node.hash)
async def _save_edge(self, edge: LineageEdge):
async with self.db_pool.acquire() as conn:
await conn.execute("""
INSERT INTO lineage_edges (from_node, to_node, relationship, metadata)
VALUES ($1, $2, $3, $4)
""",
edge.from_node, edge.to_node, edge.relationship,
json.dumps(edge.metadata))
Örnek Olay: 250 Çalışanlı Üretim Şirketi
Gerçek uygulamayı analiz edelim MetalTech S.r.l., imalat şirketi 250 çalışanlı, iki üretim tesisi (Torino ve Bari), CSRD dalga 2'ye tabi (2025 Mali Yılı, 2026'daki ilk rapor). Projeyi şu şekilde yapılandırdılar:
Aşama 1: Boşluk Analizi ve Önemlilik Değerlendirmesi (1-2. Aylar)
MetalTech süreç boyunca imalat sektörü için öncelikli konuları belirledi çifte önemlilik:
- ESRS E1 (İklim): ÇİFT önemlilik - önemli miktarda enerji tüketimi (eritme fırını, üretim hatları) ve aşırı olaylardan kaynaklanan fiziksel risk
- ESRS E2 (Kirlilik): Darbe malzemesi - PM2.5 partikül emisyonları ve boyadan elde edilen VOC bileşikleri
- ESRS E5 (Döngüsel Ekonomi): Malzeme - metal atık (malzemenin %35'i) girişte), paketleme, endüstriyel atık su
- ESRS S1 (İşgücü): Malzeme - 250 çalışan, yüksek kaza oranı sektörde cinsiyetler arası ücret farkı
- ESRS G1 (İş Davranışı): Temel düzenleyici gerekliliklere ilişkin materyal
Aşama 2: Teknik Altyapı (2-4 Ay)
MetalTech tarafından benimsenen teknoloji yığını:
| Bileşen | Teknoloji | Kullanım |
|---|---|---|
| API Arka Uçları | FastAPI + eşzamansız sayfa | ESG veri toplama ve doğrulama |
| Veritabanları | PostgreSQL 16 + pgvektör | Depolama ESRS veri noktaları + benzerlik araması |
| ERP Entegrasyonu | SAP SFM OData API'si | SAP'den enerji, emisyonlar ve iş gücü |
| İş akışı | Özel FSM + Redis | Çok düzeyli onay iş akışı |
| XBRL Nesli | Python lxml + ESRS Taksonomisi | OAM/CONSOB için iXBRL çıkışı |
| Veri Kökeni | PostgreSQL'de Özel DAG | Güvence için denetim takibi |
| Bildirimler | FastAPI ArkaplanGörevleri + SMTP | Son tarih uyarısı, inceleme isteği |
| Başlangıç aşaması | Açısal 17 + Malzeme | Toplama ve izleme kontrol paneli |
Aşama 3: 2025 Mali Yılı Veri Toplama (4-14. Aylar)
# Esempio raccolta dati MetalTech FY2025
# Script orchestrazione completa per stabilimento Torino
import asyncio
from erp_connectors import SAPSustainabilityConnector, ERPDataAggregator
from workflow_engine import CSRDWorkflowEngine, WorkflowState
async def run_metaltech_fy2025_collection():
# Configurazione connettore SAP
sap = SAPSustainabilityConnector(
base_url="https://metaltech.hana.ondemand.com",
client_id="METALTECH_CSRD_CLIENT",
client_secret="***", # Da env vars
token_url="https://metaltech.authentication.eu12.hana.ondemand.com/oauth/token"
)
aggregator = ERPDataAggregator([sap])
# Raccolta dati stabilimento Torino (entity TO001)
print("[1/3] Raccolta dati da SAP per stabilimento Torino...")
submissions_to = await aggregator.collect_and_map_to_esrs(
year=2025,
entity_id="TO001",
period_id="period_fy2025"
)
# Raccolta dati stabilimento Bari (entity BA001)
print("[2/3] Raccolta dati da SAP per stabilimento Bari...")
submissions_ba = await aggregator.collect_and_map_to_esrs(
year=2025,
entity_id="BA001",
period_id="period_fy2025"
)
# Consolidamento gruppo: somma Scope 1+2, media intensità'
print("[3/3] Consolidamento dati di gruppo...")
consolidated = consolidate_group_data([submissions_to, submissions_ba])
# Output dati FY2025 MetalTech consolidati
for sub in consolidated:
print(f" {sub['data_point_id']}: {sub['value_numeric']:.2f} {sub['unit_of_measure']}")
return consolidated
def consolidate_group_data(entity_submissions: list) -> list:
"""Consolida submissions di multiple entità per il gruppo."""
by_data_point = {}
for entity_subs in entity_submissions:
for sub in entity_subs:
dp_id = sub['data_point_id']
if dp_id not in by_data_point:
by_data_point[dp_id] = {**sub, "value_numeric": 0}
# Per emissioni e energia: somma (non media)
by_data_point[dp_id]["value_numeric"] += sub['value_numeric']
by_data_point[dp_id]["source_system"] = "CONSOLIDATED_GROUP"
return list(by_data_point.values())
# Risultati tipici per azienda manifatturiera 250 dip.
METALTECH_FY2025_RESULTS = {
"E1-6_GrossScope1": {
"value": 2_847, # tCO2eq - principalmente gas naturale forno fusione
"confidence": "HIGH",
"source": "SAP SFM - Gas meter readings"
},
"E1-6_GrossScope2LB": {
"value": 1_523, # tCO2eq - elettricita' rete
"confidence": "HIGH",
"source": "SAP SFM - Electricity bills"
},
"E1-6_GrossScope2MB": {
"value": 980, # tCO2eq - market-based con PPA solare
"confidence": "HIGH",
"source": "SAP SFM + GO certificates"
},
"E1-5_TotalEnergy": {
"value": 18_450, # MWh totali
"confidence": "HIGH",
"source": "SAP SFM - Meters + bills"
},
"E1-5_RenewableEnergy": {
"value": 6_200, # MWh (33.6% via PPA + impianto solare 500kWp)
"confidence": "MEDIUM", # Stima PPA
"source": "SAP SFM + PPA contract"
},
"GHG_Intensity": {
"value": 42.3, # tCO2eq / M EUR fatturato
"confidence": "HIGH",
"source": "Calculated: (S1+S2MB) / revenue"
}
}
Sonuçlar ve Alınan Dersler
MetalTech FY2025: CSRD Projesinin Sonuçları
- Veri toplama süresi: 6 haftadan (manuel) 4 güne (otomatik)
- Zorunlu ESRS kapsamı: Toplanan ve onaylanan zorunlu veri noktalarının %94'ü
- Veri kalitesi: Başvuruların %78'ine güven YÜKSEK (önceki yılın %45'ine kıyasla)
- Denetim süreleri: Otomatik veri kökeni sayesinde %60 oranında azaltıldı
- Dış güvence maliyeti: Daha az örnekleme çabası için -%25
- Ana Bulgu: Kapsam 3 kedi. 1 (yukarı akış malzemeleri) %40 oranında eksik tahmin edildi önceki manuel hesaplamalarla karşılaştırıldığında
Temel Zorluklar ve Bunların Üstesinden Gelme Yolları
Kaçınılması Gereken Anti-Desenler
- Her şeyi sıfırdan inşa etmeyin: Piyasa SaaS CSRD çözümleri sunuyor KOBİ'ler için (Workiva, Watershed, Persefoni). Yap-satın al değerlendirmesini yapın: şirket içi inşa etme yalnızca yüksek veri karmaşıklığına veya benzersiz entegrasyon ihtiyaçlarınıza sahipseniz mantıklıdır.
- Önemlilik değerlendirmesini geciktirmeyin: Her şeyin ön şartıdır. Bu olmadan hangi verileri toplayacağınızı bilemezsiniz. Basitleştirilmiş bir yöntemle bile hemen başlayın.
- Kapsam 3'ü göz ardı etmeyin: Birçok imalat şirketi için Kapsam 3 kat. 1 (hammaddeler) ve kedi. 11 (ürün kullanımı) toplam emisyonların %70'ini aşıyor. Kapsam 3'ün eksik raporlanması en yaygın yeşil aklamadır.
- XBRL'ye son bir düşünce olarak bakmayın: iXBRL etiketleme gerektirir ESRS Taksonomisine aşina. En az 2-3 aylık bir gelişim planlayın.
- Konsolidasyon kapsamını unutmayın: CSRD gerektirir tüm bağlı ortaklıkları içerir >%50. "Basitlik" nedeniyle varlıkları hariç tutun bu bir uyum riskidir.
Gelişmiş ESG Sorguları için GraphQL API'si
ESG analitik kontrol panelleri ve etkileşimli raporlama araçları için GraphQL API'si şunları sunar: toplamalarla belirli sorgulara izin veren klasik REST'e göre üstün esneklik anında:
# graphql_schema.py
# Schema GraphQL per ESG Analytics API con Strawberry
import strawberry
from strawberry.types import Info
from typing import Optional, List
from datetime import date
from decimal import Decimal
@strawberry.type
class GHGEmissionsSummary:
year: int
scope1_tco2eq: Decimal
scope2_lb_tco2eq: Decimal
scope2_mb_tco2eq: Decimal
scope3_tco2eq: Decimal
total_tco2eq: Decimal
ghg_intensity: Optional[Decimal]
yoy_change_pct: Optional[Decimal]
@strawberry.type
class EnergySummary:
year: int
total_mwh: Decimal
renewable_mwh: Decimal
non_renewable_mwh: Decimal
renewable_share_pct: Decimal
@strawberry.type
class ESGReportingStatus:
period_id: str
year: int
workflow_state: str
mandatory_coverage_pct: Decimal
approved_data_points: int
pending_review: int
missing_mandatory: int
@strawberry.type
class Query:
@strawberry.field
async def ghg_emissions(
self,
info: Info,
company_id: str,
years: List[int]
) -> List[GHGEmissionsSummary]:
"""Emissioni GHG per anni richiesti con calcolo YoY automatico."""
db = info.context["db_pool"]
async with db.acquire() as conn:
rows = await conn.fetch("""
SELECT
rp.year,
COALESCE(SUM(CASE WHEN dp.id LIKE '%Scope1%'
THEN eds.value_numeric ELSE 0 END), 0) as scope1,
COALESCE(SUM(CASE WHEN dp.id LIKE '%Scope2%LB%'
THEN eds.value_numeric ELSE 0 END), 0) as scope2_lb,
COALESCE(SUM(CASE WHEN dp.id LIKE '%Scope2%MB%'
THEN eds.value_numeric ELSE 0 END), 0) as scope2_mb,
COALESCE(SUM(CASE WHEN dp.id LIKE '%Scope3%'
THEN eds.value_numeric ELSE 0 END), 0) as scope3
FROM esg_data_submissions eds
JOIN reporting_periods rp ON rp.id = eds.reporting_period_id
JOIN esrs_data_points dp ON dp.id = eds.data_point_id
WHERE rp.company_id = $1
AND rp.year = ANY($2)
AND eds.status = 'APPROVED'
AND dp.esrs_standard = 'E1'
GROUP BY rp.year
ORDER BY rp.year
""", company_id, years)
results = []
prev_total = None
for row in rows:
total = row['scope1'] + row['scope2_mb'] + row['scope3']
yoy = None
if prev_total and prev_total > 0:
yoy = ((total - prev_total) / prev_total) * 100
results.append(GHGEmissionsSummary(
year=row['year'],
scope1_tco2eq=Decimal(str(row['scope1'])),
scope2_lb_tco2eq=Decimal(str(row['scope2_lb'])),
scope2_mb_tco2eq=Decimal(str(row['scope2_mb'])),
scope3_tco2eq=Decimal(str(row['scope3'])),
total_tco2eq=Decimal(str(total)),
ghg_intensity=None,
yoy_change_pct=Decimal(str(yoy)) if yoy else None
))
prev_total = total
return results
@strawberry.field
async def reporting_status(
self,
info: Info,
company_id: str,
year: int
) -> Optional[ESGReportingStatus]:
"""Stato del processo di reporting per un anno."""
db = info.context["db_pool"]
async with db.acquire() as conn:
period = await conn.fetchrow("""
SELECT rp.id, rp.year, rp.status,
COUNT(eds.id) FILTER (WHERE eds.status = 'APPROVED') as approved,
COUNT(eds.id) FILTER (WHERE eds.status IN ('SUBMITTED','REVIEWING')) as pending,
(SELECT COUNT(*) FROM esrs_data_points WHERE is_mandatory = TRUE) as mandatory_total
FROM reporting_periods rp
LEFT JOIN esg_data_submissions eds ON eds.reporting_period_id = rp.id
WHERE rp.company_id = $1 AND rp.year = $2
GROUP BY rp.id, rp.year, rp.status
""", company_id, year)
if not period:
return None
mandatory_total = period['mandatory_total'] or 1
approved = period['approved'] or 0
coverage = (approved / mandatory_total) * 100
return ESGReportingStatus(
period_id=str(period['id']),
year=period['year'],
workflow_state=period['status'],
mandatory_coverage_pct=Decimal(str(round(coverage, 1))),
approved_data_points=approved,
pending_review=period['pending'] or 0,
missing_mandatory=max(0, mandatory_total - approved)
)
schema = strawberry.Schema(query=Query)
# Query di esempio per il frontend
EXAMPLE_QUERY = """
query ESGDashboard($companyId: String!, $years: [Int!]!) {
ghgEmissions(companyId: $companyId, years: $years) {
year
scope1Tco2eq
scope2MbTco2eq
scope3Tco2eq
totalTco2eq
yoyChangePct
}
reportingStatus(companyId: $companyId, year: 2025) {
workflowState
mandatoryCoveragePct
approvedDataPoints
missingMandatory
}
}
"""
Yapay Zeka Yasası ve CSRD ile Kesişime İlişkin Hususlar
2025-2026'da ortaya çıkan bir husus, CSRD ile AI Yasası AB arasındaki kesişmedir. Sistemler Yapay zeka, emisyonları hesaplamak, iklim tahminleri yapmak veya ÇSY raporlamasını otomatikleştirmek için kullanılıyor Yapay Zeka Yasasının yüksek riskli Yapay Zeka kategorilerine girebilir ve şunları gerektirir:
- Teknik dokümantasyon Emisyon tahminleri için kullanılan yapay zeka sisteminin (ESRS metodolojik şeffaflığı)
- İnsan gözetimi otomatik önemlilik kararları hakkında
- Denetim izleri Yapay zeka tahminlerinin oranı (zaten veri kökenimiz kapsamındadır)
- Ayrımcılık yapmama ESRS S1 ölçümleri için kullanılan AI İK sistemlerinde
AI Yasası x CSRD Zaman Çizelgesi
- Şubat 2025: Yürürlükteki AI Yasası - yasaklanmış uygulamaları yasaklıyor
- Ağustos 2025: GPAI (Genel Amaçlı Yapay Zeka) yükümlülükleri - Yapay Zeka temel modellerinin şeffaflığı
- Ağustos 2026: Yüksek Riskli Yapay Zeka Kapsamlı Yükümlülükleri - İK ve kredi kararları için yapay zeka sistemlerini içerir
- Ağustos 2027: Tüm kullanım durumları için tam AI Yasası uygulaması
ESG/CSRD sistemlerini oluşturan ekiplerin zaten uyumluluk için plan yapması gerekiyor Yığınlarının makine öğrenimi bileşenleri için Yapay Zeka Yasası: Kapsam 3 emisyon tahmin modelleri, ESG verilerinde anormallik tespiti, tedarikçi değerlendirmesi için NLP.
Sonuçlar ve Sonraki Adımlar
CSRD uyumlu bir sistem oluşturmak yalnızca bir uyumluluk çalışması değildir: yaratacak bir sürdürülebilirlik veri mimarisi yaratma fırsatı Yıllardır değer. Gördüğümüz bileşenler: ESRS veri modeli, toplama API'si doğrulama, FSM iş akışı, ERP entegrasyonu, XBRL oluşturucu, veri kökeni — bunların hepsi yeniden kullanılabilir ve modülerdir.
Eve götürülecek önemli noktalar:
- Önemlilik değerlendirmesinden başlayın: çifte önemlilik olmadan ne olduğunu bilemezsiniz topla. Bu, gerçek paydaş katılımını ve daha fazlasını gerektiren bir süreçtir. algoritmalar.
- ERP toplamayı otomatikleştirin: ESRS E1 ve S1 verilerinin %60-70'i mevcut zaten sistemlerinizde. SAP ve Oracle için OData bağlayıcıları yatırımdır haftalar içinde iyileşir.
- Şimdi veri kökenine yatırım yapın: dış sigorta maliyeti doğrudan veri izlenebilirliğine bağlıdır. İyi bir denetim takibi sistemi Denetim maliyetini %20-30 oranında azaltır.
- Omnibus Paketini göz ardı etmeyin: 2025'te önerilen basitleştirmeler KOBİ'lerin bazı yükümlülüklerini hafifletebilir. Yasama sürecini izleyin ancak Nihai onaydan önce bunları planlayın.
Serinin bir sonraki makalesinde - Yapay Zeka Karbon Ayak İzi — AI modellerinin çevresel etkisini nasıl ölçeceğimizi ve azaltacağımızı keşfedeceğiz: eğitimden CodeCarbon gibi araçlarla Büyük Dil Modellerinin üretimdeki çıkarımlara dönüştürülmesi, ML CO2 Sürdürülebilir ML mühendisliği için etki ve stratejiler.
Daha fazla bilgi edinmek için kaynaklar
- EFRAG ESRS Taksonomisi: efrag.org - ESRS XBRL Taksonomisi
- CSRD Resmi Metni: EUR-Lex - Direktif 2022/2464/AB
- Sera Gazı Protokolü Kurumsal Standardı: ghgprotocol.org
- SAP Sürdürülebilirlik Ayak İzi Yönetimi: sap.com/sustainability
- Python XBRL kitaplıkları: arelle (açık kaynak), python-xbrl
- CSRD SaaS Araçları: Workiva, Havza, Persefoni, Süpürme, Yeşillik
Yeşil Yazılım Mühendisliği Serisi - Navigasyon
Önceki makale: GreenOps: Sürdürülebilir DevOps — CI/CD yeşil, DevOps ekipleri için bulutun doğru boyutlandırılması, FinOps ve sürdürülebilirlik ölçümleri.
Sonraki makale: Yapay Zeka Karbon Ayak İzi — Ölç ve azalt pratik araçlar ve kıyaslamalarla birlikte eğitim ve çıkarımda yapay zeka modellerinin emisyonları sektörünün.
Bu dizi yolculuğun bir parçası Veri ve Yapay Zeka İşi seriyi de içeren federicocalo.dev'de İşletmeler için MLOps (IDs 306-315) kitlenin derinliklerine inmek isteyenler için hesaplama verimliliğine dikkat ederek yapay zeka modellerinin üretiminde ve yönetişim.







