Diplom-Biologe | Senior IT-Consultant
|
SH |
Sascha Hess xenosystems.de - IT-Consulting & Data Management |
www.xenosystems.de |
|
|
NOTFALL-KIT – SQL SERVER 2026 |
|
|
SQL Server |
|
|
Datenquellen & ETL-Prozesse |
|
|
Zuverlässige Datenpipelines — von der Quelle bis ins Dashboard |
WAS SIE IN DIESEM KIT ERHALTEN:
|
|
1 |
10 ETL-Fallen Die häufigsten Fehler in Datenpipelines — mit Diagnose und Fix |
|
|
2 |
Quelltypenkatalog ERP, REST-API, CSV, Excel, Datenbank, Stream — Anbindungsrezepte |
|
|
3 |
ETL-Architekturmuster Full Load, Delta Load, CDC, Streaming — wann welche Strategie? |
|
|
4 |
Tool-Vergleich 2026 SSIS, ADF, Power Query, dbt, Python — was passt zu welchem KMU? |
|
|
5 |
30-Tage-Pipeline-Plan Von der ersten Datenquelle zur robusten, überwachten Datenpipeline |
HAFTUNGSAUSSCHLUSS
Alle Skripte, Architekturempfehlungen und Methoden wurden sorgfältig auf Basis langjähriger praktischer Erfahrung in der Datenintegration und BI-Entwicklung erarbeitet. Da jede Systemlandschaft individuell ist, übernimmt der Autor keinerlei Haftung für Datenverlust, Systemausfälle oder sonstige Schäden. Testen Sie alle Pipelines zunächst in einer Nicht-Produktionsumgebung und erstellen Sie vor jedem Eingriff vollständige Backups.
KEINE ERGEBNISGARANTIE
Genannte Performance-Werte und Implementierungsaufwände sind Erfahrungswerte aus realen Projekten und keine verbindliche Zusicherung. Tatsächliche Ergebnisse hängen von Datenvolumen, Systemarchitektur, Netzwerkinfrastruktur und internen Ressourcen ab.
VERSIONSHINWEIS
Die Inhalte beziehen sich auf SQL Server 2022/2025, Azure Data Factory (Q1 2026), SSIS 2019/2022, dbt Core 1.8 und Python 3.12, Stand März 2026. Alle genannten Plattformen werden regelmäßig aktualisiert — Syntax und Funktionsumfang können abweichen.
URHEBERRECHT
Dieses Dokument ist für den persönlichen oder betriebsinternen Gebrauch des Käufers lizenziert. Weiterverkauf, Weitergabe an Dritte und öffentliche Veröffentlichung sind ohne schriftliche Genehmigung nicht gestattet.
MARKENRECHTE
SQL Server, SSIS, Azure Data Factory, Power Query und Power BI sind eingetragene Marken der Microsoft Corporation. dbt ist ein Produkt von dbt Labs, Inc. Python ist ein Produkt der Python Software Foundation. Apache Kafka ist eine eingetragene Marke der Apache Software Foundation. Alle anderen genannten Produkt- und Unternehmensnamen sind Eigentum ihrer jeweiligen Inhaber.
Eine ausführliche Version dieses Haftungsausschlusses befindet sich am Ende dieses Dokuments.
01 Einleitung
Warum ETL die unsichtbare Grundlage jedes BI-Projekts ist
02 Die 10 ETL-Fallen
Häufigste Fehler in Datenpipelines — mit Diagnose und Fix
03 Quelltypenkatalog
ERP, REST-API, CSV, Excel, DB, Stream — Anbindungsrezepte
04 ETL-Architekturmuster
Full Load, Delta, CDC, Streaming — wann welche Strategie
05 Staging und Transformation
Rohdaten sichern, bereinigen, anreichern — Schicht für Schicht
06 Fehlerbehandlung & Monitoring
Robuste Pipelines die bei Problemen sofort eskalieren
07 Tool-Vergleich 2026
SSIS, ADF, Power Query, dbt, Python — ehrlicher Vergleich
08 Datenpipeline-Dokumentation
Lineage, Testfälle und Betriebshandbuch für jede Pipeline
09 Sicherheit & Compliance
Zugänge, Verschlüsselung, DSGVO in ETL-Prozessen
10 30-Tage-Pipeline-Plan
Von der ersten Quelle zur robusten, überwachten Pipeline
01
Ein Power-BI-Dashboard sieht hervorragend aus. Schicke Charts, klare KPI-Cards, professionelles Farbschema. Der Geschäftsführer öffnet es an einem Montagmorgen — und fragt sich, warum der Umsatz des Freitags nicht enthalten ist. Weil der ETL-Job Samstagmorgen um 03:00 Uhr abgebrochen ist. Wegen einer veränderten Spalte in der ERP-Quelltabelle. Die seit dem letzten Patch umbenannt wurde. Und niemand hat den Fehler bis Montag bemerkt, weil kein Alerting eingerichtet war.
ETL-Prozesse sind die unsichtbare Grundlage jedes BI-Systems. Wenn sie funktionieren, denkt niemand an sie. Wenn sie versagen, versagt das gesamte Dashboard.
Und doch werden ETL-Prozesse in mittelständischen Projekten systematisch unterschätzt. Das Budget für das Dashboard: hoch. Das Budget für die Pipeline, die das Dashboard mit Daten versorgt: minimal. Die Dokumentation des Dashboards: vorhanden. Die Dokumentation der Pipeline: "liegt im Kopf des Entwicklers."
Die Konsequenzen sind überall dieselben:
→ Datenqualitätsprobleme, die im ETL entstehen, werden erst im Dashboard bemerkt — weit vom Ursprung entfernt, schwer zu diagnostizieren.
→ Jede Änderung an Quellsystemen (ERP-Update, neue API-Version, geändertes CSV-Format) bricht die Pipeline — weil keine Tests und keine Schemavalidierung existieren.
→ Manuelle Korrektur-Workflows ersetzen automatisierte Prozesse. Weil der ETL es "nicht kann", zieht jemand monatlich eine Excel-Datei, bereinigt sie von Hand, und lädt sie manuell hoch. Zeitaufwand: 4 Stunden. Fehlerrisiko: hoch. Dokumentiert: nicht.
→ Skalierung ist unmöglich: Was für 100.000 Zeilen funktioniert, bricht bei 5 Millionen Zeilen zusammen — weil der Full-Load-Ansatz nicht durch Delta-Logik ersetzt wurde.
Dieses Kit ist das vollständige Praxis-Werkzeugkasten für ETL-Prozesse im KMU-Kontext — von der ersten Quellenanbindung über robuste Transformationslogik bis zu Monitoring, Dokumentation und DSGVO-konformer Datenhaltung.
|
|
WAS SIE IN DIESEM BUCH ERWARTEN DÜRFEN ■ 10 ETL-Fallen — Die häufigsten Fehler in Datenpipelines mit konkreten Diagnose-Skripten und Fixes. ■ Quelltypenkatalog — Anbindungsrezepte für ERP, REST-API, CSV, Excel, Datenbanken und Streams. ■ Architekturmuster — Full Load, Delta Load, CDC und Streaming — wann welche Strategie sinnvoll ist. ■ Tool-Vergleich — SSIS, ADF, Power Query, dbt und Python im ehrlichen KMU-Vergleich. ■ 30-Tage-Pipeline-Plan — Von der ersten Datenquelle zur robusten, überwachten, dokumentierten Pipeline. |
|
|
DIE GRUNDREGEL Rohdaten werden niemals überschrieben. Die Staging-Schicht speichert immer die originalen Quelldaten — unverändert, mit Ladedatum und Quellkennzeichen. Transformationen erfolgen ausschließlich in nachgelagerten Schichten. Diese Regel rettet Projekte, wenn Transformationslogik falsch war. |
02
Täglich werden 50 Millionen Zeilen aus dem ERP komplett neu geladen. Der ETL läuft 4 Stunden. Das Wartungsfenster ist 2 Stunden. Jeden Morgen sind Dashboards veraltet, weil der Job nicht rechtzeitig fertig wird.
LÖSUNG:
✓ Delta-Load oder CDC (Change Data Capture) für alle Tabellen mit > 100.000 Zeilen.
✓ Full Load nur für kleine Dimensionstabellen (< 10.000 Zeilen) oder bei expliziter Notwendigkeit.
✓ Faustregel: Full Load dauert > 30 Minuten → Delta-Strategie evaluieren.
|
-- Delta-Load: nur geänderte Datensätze seit letztem Run laden -- Vorbedingung: Quelltabelle hat Timestamp-Spalte (ModifiedDate) DECLARE @LastLoadDate DATETIME2 = (SELECT COALESCE(MAX(LetzterLadezeitpunkt), '2000-01-01') FROM dbo.ETL_Log WHERE JobName = 'ETL_Belege_Delta');
INSERT INTO dbo.Staging_Belege_Delta SELECT *, SYSDATETIME() AS StagingLadedatum FROM [ERP-Server].ERP_Prod.dbo.Belege WHERE ModifiedDate > @LastLoadDate; |
Transformationen und Ladevorgänge laufen direkt gegen die Zieltabelle. Bei einem Fehler in der Mitte des Jobs: teils neue, teils alte Daten — inkonsistenter Zustand. Kein Rollback möglich.
LÖSUNG:
✓ Immer drei Schichten: Staging (Rohdaten) → Integration (transformiert) → Presentation (BI-fertig).
✓ Staging ist append-only: Rohdaten landen unverändert in Staging — mit Ladedatum, Quellkennzeichen, Batch-ID.
✓ Zieltabelle wird erst am Ende eines erfolgreichen Jobs aktualisiert — nie während der Transformation.
|
-- Staging-Tabelle: Rohdaten mit ETL-Metadaten CREATE TABLE dbo.Staging_Belege ( -- Alle Felder aus der Quelle: BelegNr NVARCHAR(20) NOT NULL, BelegDatum DATE NOT NULL, KundenID NVARCHAR(20) NOT NULL, Betrag_Netto DECIMAL(14,2) NOT NULL, Belegart NVARCHAR(5) NOT NULL, -- ETL-Metadaten (nicht aus Quelle): ETL_Batch_ID UNIQUEIDENTIFIER NOT NULL DEFAULT NEWID(), ETL_Ladedatum DATETIME2 NOT NULL DEFAULT SYSDATETIME(), ETL_Quelle NVARCHAR(50) NOT NULL DEFAULT 'ERP_Prod', ETL_Status NVARCHAR(20) NOT NULL DEFAULT 'Geladen' -- KEIN Primary Key auf Staging — Duplikate sind erlaubt und erwünscht ); |
Die ERP-Quelltabelle ändert sich durch ein Update: eine Spalte wird umbenannt, ein Datentyp geändert. Der ETL-Job schlägt lautlos fehl oder — schlimmer — lädt ohne Fehler falsche Daten. Niemand bemerkt es für drei Tage.
LÖSUNG:
✓ Schema-Check als erster Schritt jedes ETL-Jobs: Existieren alle erwarteten Spalten? Stimmen die Datentypen?
✓ Bei Schema-Abweichung: Job sofort abbrechen und Alert senden — nie mit falschem Schema weiterlaufen.
|
-- Schema-Validierung vor ETL-Ausführung DECLARE @Fehler NVARCHAR(MAX) = '';
-- Prüfen ob Pflicht-Spalten existieren: SELECT @Fehler = @Fehler + CASE WHEN NOT EXISTS ( SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'Belege' AND COLUMN_NAME = 'BelegNr' AND DATA_TYPE = 'nvarchar' ) THEN 'FEHLER: Spalte BelegNr fehlt oder falscher Typ. ' ELSE '' END;
SELECT @Fehler = @Fehler + CASE WHEN NOT EXISTS ( SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'Belege' AND COLUMN_NAME = 'Betrag_Netto' AND DATA_TYPE IN ('decimal','numeric') ) THEN 'FEHLER: Spalte Betrag_Netto fehlt oder falscher Typ. ' ELSE '' END;
IF LEN(@Fehler) > 0 BEGIN RAISERROR(@Fehler, 16, 1); RETURN; -- ETL-Job abbrechen END |
Ein ETL-Job schlägt um 03:00 Uhr fehl. Niemand bemerkt es. Um 09:00 Uhr fragt der Vertriebsleiter warum sein Dashboard gestrige Zahlen zeigt. Stundenlange Fehlersuche beginnt — ohne Fehler-Log, ohne Spur.
LÖSUNG:
✓ Strukturiertes ETL-Log für jeden Job: Start, Ende, Zeilenzahl, Status, Fehlermeldung.
✓ E-Mail-Alert bei jedem fehlgeschlagenen Job — innerhalb von 5 Minuten nach Fehler.
✓ SQL Agent Job: Notifications bei Failure konfigurieren (Database Mail erforderlich).
|
-- ETL-Log-Tabelle: CREATE TABLE dbo.ETL_Log ( LogID INT IDENTITY PRIMARY KEY, JobName NVARCHAR(100) NOT NULL, StartZeitpunkt DATETIME2 NOT NULL, EndeZeitpunkt DATETIME2 NULL, Status NVARCHAR(20) NOT NULL DEFAULT 'Laufend', ZeilenGeladen INT NULL, ZeilenFehler INT NULL DEFAULT 0, Fehlermeldung NVARCHAR(MAX) NULL, BatchID UNIQUEIDENTIFIER NOT NULL DEFAULT NEWID(), LetzterLadezeitpunkt DATETIME2 NULL -- Delta-Load-Basis );
-- Job-Start protokollieren: DECLARE @LogID INT; INSERT INTO dbo.ETL_Log (JobName, StartZeitpunkt) VALUES ('ETL_Belege_Delta', SYSDATETIME()); SET @LogID = SCOPE_IDENTITY();
-- Job-Ende protokollieren (in CATCH-Block): UPDATE dbo.ETL_Log SET EndeZeitpunkt = SYSDATETIME(), Status = 'Fehler', Fehlermeldung = ERROR_MESSAGE() WHERE LogID = @LogID; |
CSV-Dateien aus dem ERP enthalten Umlaute. Werden sie mit falscher Codierung (Windows-1252 statt UTF-8) geladen, entstehen Sonderzeichen-Artefakte: "M¸ller" statt "Müller". Betrifft alle Systeme, die Kundennamen oder Adressen aus Dateien laden.
LÖSUNG:
✓ Codierung jeder Dateiquelle explizit dokumentieren und im ETL-Job konfigurieren.
✓ Für deutschsprachige ERP-Systeme: Windows-1252 oder ISO-8859-1 ist häufig, nicht UTF-8.
✓ Validierungsschritt nach dem Laden: Stichprobe auf Umlaute prüfen.
|
# Python: CSV mit korrekter Codierung laden import pandas as pd
# Codierung explizit angeben: df = pd.read_csv( 'export_kunden.csv', encoding='windows-1252', # oder 'utf-8-sig' für UTF-8 mit BOM sep=';', # deutsches CSV: Semikolon als Trennzeichen decimal=',', # deutsches Format: Komma als Dezimalzeichen thousands='.', # Punkt als Tausendertrennzeichen parse_dates=['Datum'], dayfirst=True # deutsches Datumsformat: TT.MM.JJJJ )
# Validierung: Umlaut-Check auf Stichprobe umlaut_check = df['KundeName'].str.contains( '[äöüÄÖÜß]', regex=True, na=False ).sum() print(f"Datensätze mit Umlauten: {umlaut_check} (Plausibilitätsprüfung)") |
Eine REST-API liefert Datumsangaben als "2026-03-15T00:00:00Z" (UTC). Das ERP speichert Daten in lokaler Zeit (MEZ/MESZ). Ohne explizite Zeitzonenbehandlung entstehen Datumsverschiebungen: Buchungen vom 31. März um 23:00 Uhr MEZ landen in der Analyse als 1. April.
LÖSUNG:
✓ Zeitzone jeder Datenquelle dokumentieren und im ETL explizit konvertieren.
✓ Alle Timestamps im Data Warehouse als UTC speichern — Konvertierung zur Darstellungszeit im BI-Tool.
✓ Datum und Uhrzeit immer getrennt halten; nie als reines Datum speichern, wenn Uhrzeit relevant ist.
|
-- Zeitzonenkonvertierung in SQL Server (2016+) -- UTC → MEZ/MESZ: SELECT BelegNr, BelegDatum_UTC, -- Konvertierung UTC nach Europe/Berlin (inkl. Sommerzeit): BelegDatum_UTC AT TIME ZONE 'UTC' AT TIME ZONE 'W. Europe Standard Time' AS BelegDatum_Lokal, -- Nur Datum (ohne Zeit) für Dimensionsanbindung: CAST(BelegDatum_UTC AT TIME ZONE 'UTC' AT TIME ZONE 'W. Europe Standard Time' AS DATE) AS BelegDatum FROM dbo.Staging_Belege_API; |
Der ETL-Job läuft zweimal (manuell neu gestartet nach Fehler). Alle Datensätze sind jetzt doppelt im Data Warehouse. Aggregationen liefern doppelte Werte. Reports zeigen Umsatz × 2.
LÖSUNG:
✓ ETL-Jobs müssen idempotent sein: mehrfaches Ausführen mit identischem Input → identisches Ergebnis.
✓ MERGE (Upsert) statt blindem INSERT für alle Zieltabellen.
✓ Batch-ID in Staging nutzen: vor erneutem Laden alte Batch-Daten löschen.
|
-- Idempotentes Laden via MERGE (Upsert): MERGE dbo.Dim_Kunden AS Ziel USING ( SELECT DISTINCT KundenID, KundeName, PLZ, Stadt FROM dbo.Staging_Kunden WHERE ETL_Batch_ID = @AktuellerBatch ) AS Quelle ON (Ziel.KundenID = Quelle.KundenID) WHEN MATCHED AND ( Ziel.KundeName <> Quelle.KundeName OR Ziel.PLZ <> Quelle.PLZ OR Ziel.Stadt <> Quelle.Stadt ) THEN UPDATE SET KundeName = Quelle.KundeName, PLZ = Quelle.PLZ, Stadt = Quelle.Stadt, GeaendertAm = SYSDATETIME() WHEN NOT MATCHED BY TARGET THEN INSERT (KundenID, KundeName, PLZ, Stadt, AngelgtAm) VALUES (Quelle.KundenID, Quelle.KundeName, Quelle.PLZ, Quelle.Stadt, SYSDATETIME()); |
Datenbankpasswörter, API-Keys und Verbindungsstrings stehen direkt im ETL-Skript oder SSIS-Paket. Wenn der Code in Git eingecheckt wird, sind Zugangsdaten öffentlich. Wenn der Datenbankserver umgezogen wird, müssen alle Scripts manuell angepasst werden.
LÖSUNG:
✓ Verbindungsstrings niemals im Code — immer in Konfigurationsdateien, Umgebungsvariablen oder Secret Stores.
✓ Für SSIS: Project Connection Manager mit Parametern. Für Python: .env-Dateien (nie in Git). Für ADF: Azure Key Vault.
|
# Python: Zugangsdaten aus Umgebungsvariablen (nie im Code!) import os from dotenv import load_dotenv # pip install python-dotenv import pyodbc
# .env-Datei laden (liegt NICHT in Git: in .gitignore eintragen!) load_dotenv()
conn_str = ( f"DRIVER={{ODBC Driver 18 for SQL Server}};" f"SERVER={os.getenv('DB_SERVER')};" f"DATABASE={os.getenv('DB_NAME')};" f"UID={os.getenv('DB_USER')};" f"PWD={os.getenv('DB_PASSWORD')};" f"Encrypt=yes;TrustServerCertificate=no;" ) conn = pyodbc.connect(conn_str)
# .env-Datei (Beispiel, NIEMALS committen): # DB_SERVER=srv-dw01.intern # DB_NAME=DWH_Prod # DB_USER=etl_service # DB_PASSWORD=SuperSecretPassword! |
Ein SSIS-Paket läuft auf dem Produktions-SQL-Server und liest direkt aus dem ERP. Während der ETL-Job läuft (02:00–05:00 Uhr), laufen auch Nachtbatch-Jobs des ERP. Beide konkurrieren um CPU und I/O. Morgens klagen Nutzer über Systemlangsamkeit.
LÖSUNG:
✓ ETL-Jobs laufen auf einem dedizierten Server oder einer dedizierten Instanz — nie auf dem OLTP-Server.
✓ Für SQL-Server-Umgebungen: Always-On mit Readable Secondary als ETL-Quelle nutzen.
✓ Zeitfenster-Planung: ETL startet erst, wenn Nachtbatch des ERP abgeschlossen ist — via Job-Abhängigkeiten.
Ein ETL-Job verarbeitet 10 Tabellen. Tabelle 7 schlägt fehl. Der gesamte Job wird neu gestartet — die ersten 6 Tabellen werden erneut geladen (Zeitverschwendung), Tabelle 7 schlägt wieder fehl (keine Ursachenanalyse), und der Entwickler verbringt den Vormittag mit dem gleichen Problem.
LÖSUNG:
✓ Checkpoint-Logik: Jede Tabelle/Entität wird einzeln protokolliert. Wiederanlauf beginnt ab dem letzten fehlgeschlagenen Schritt.
✓ Fehlerursache isolieren: Schlägt eine Tabelle fehl, werden die anderen trotzdem geladen.
✓ Retry-Logik für transiente Fehler (Netzwerkunterbrechung, Timeout): max. 3 Versuche, dann Alert.
03
|
-- Linked Server einrichten (einmalig, DBA-Aufgabe): EXEC sp_addlinkedserver @server = 'SRV-ERP01', @srvproduct = 'SQL Server';
EXEC sp_addlinkedsrvlogin @rmtsrvname = 'SRV-ERP01', @useself = 'FALSE', @rmtuser = 'etl_reader', -- Read-only Service Account! @rmtpassword = 'ReadOnlyPassword';
-- Delta-Abfrage über Linked Server: SELECT b.* FROM [SRV-ERP01].[ERP_Prod].[dbo].[Belege] b WHERE b.ModifiedDate > @LastLoadDate AND b.Belegart IN ('RE','GS','ST');
-- Alternativ: Direkte ODBC-Verbindung (besser für große Datenmengen): -- In SSIS: OLE DB Source mit Native Client -- In Python: pyodbc (siehe Falle 08 oben) -- In ADF: SQL Server Linked Service |
|
# Python: REST-API mit Paginierung und Fehlerbehandlung import requests, time, json import pandas as pd from datetime import datetime, timedelta
BASE_URL = "https://api.erp-system.de/v2" API_KEY = os.getenv("ERP_API_KEY") # Nie hardcoded! HEADERS = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
def fetch_orders(since_date: str, page: int = 1) -> dict: """Aufträge seit Datum laden, mit Retry bei Rate-Limit.""" for attempt in range(3): resp = requests.get( f"{BASE_URL}/orders", headers=HEADERS, params={"modified_since": since_date, "page": page, "page_size": 500}, timeout=30 ) if resp.status_code == 429: # Rate Limit time.sleep(int(resp.headers.get("Retry-After", 60))) continue resp.raise_for_status() return resp.json() raise RuntimeError("API nach 3 Versuchen nicht erreichbar")
# Alle Seiten laden (Paginierung): all_orders, page = [], 1 since = (datetime.utcnow() - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%SZ")
while True: data = fetch_orders(since, page) orders = data.get("items", []) if not orders: break all_orders.extend(orders) if not data.get("has_next_page"): break page += 1
df = pd.json_normalize(all_orders) # Verschachtelte JSON → flach print(f"Geladen: {len(df)} Aufträge seit {since}") |
|
# Python: CSV-Eingang robust laden — mit Validierung import pandas as pd import os, hashlib from pathlib import Path
EINGANG_ORDNER = Path("D:/ETL/Eingang/Kunden") ARCHIV_ORDNER = Path("D:/ETL/Archiv/Kunden")
def lade_csv_datei(pfad: Path) -> pd.DataFrame: """CSV laden, validieren, archivieren.""" # Prüfsumme für Audit-Trail md5 = hashlib.md5(pfad.read_bytes()).hexdigest()
df = pd.read_csv( pfad, encoding='windows-1252', sep=';', decimal=',', dtype={ # Typen explizit — nie raten lassen! 'KundenNr': str, 'PLZ': str, # PLZ als String: '01234' bleibt '01234' 'Umsatz': float }, parse_dates=['AnlagedatumText'], dayfirst=True, na_values=['', 'NULL', 'N/A', '-'] )
# Pflichtfeld-Validierung: fehlend = df[df['KundenNr'].isna() | df['Name'].isna()] if len(fehlend) > 0: raise ValueError( f"{len(fehlend)} Zeilen ohne KundenNr oder Name in {pfad.name}" )
# Datei archivieren (mit Timestamp): archiv_pfad = ARCHIV_ORDNER / f"{pfad.stem}_{pd.Timestamp.now():%Y%m%d_%H%M%S}{pfad.suffix}" pfad.rename(archiv_pfad)
print(f"Geladen: {len(df)} Zeilen | MD5: {md5} | Archiv: {archiv_pfad.name}") return df
# Alle wartenden Dateien verarbeiten: for csv_datei in EINGANG_ORDNER.glob("*.csv"): df = lade_csv_datei(csv_datei) # → weiter in Staging laden |
|
# Python: Excel-Quelldateien — häufig das schwierigste Format import pandas as pd from openpyxl import load_workbook
EXCEL_PFAD = "D:/Planung/Budget_2026.xlsx"
# Strukturiertes Excel laden (Tabelle beginnt in Zelle B3): df_budget = pd.read_excel( EXCEL_PFAD, sheet_name="Umsatzplanung", header=2, # Dritte Zeile ist Header (0-basiert = 2) usecols="B:H", # Nur Spalten B bis H dtype={ 'Monat': str, 'Planwert_EUR': float } )
# Häufige Excel-Probleme abfangen: # 1. Leere Zeilen am Ende entfernen: df_budget = df_budget.dropna(how='all')
# 2. Formatierte Zahlen bereinigen ("1.234.567,89" → 1234567.89): df_budget['Planwert_EUR'] = ( df_budget['Planwert_EUR'] .astype(str) .str.replace('.', '', regex=False) # Tausendertrennzeichen .str.replace(',', '.', regex=False) # Dezimalkomma → Punkt .astype(float) )
# 3. Monate normalisieren (Jan, Januar, 01 → einheitlich): monat_map = { 'Jan': 1, 'Feb': 2, 'Mrz': 3, 'Apr': 4, 'Mai': 5, 'Jun': 6, 'Jul': 7, 'Aug': 8, 'Sep': 9, 'Okt': 10, 'Nov': 11, 'Dez': 12, 'Januar': 1, 'Februar': 2 # ... etc. } df_budget['Monat_Nr'] = df_budget['Monat'].map(monat_map) |
|
# Python: OData-Endpunkt (Dynamics 365, SAP S/4HANA) import requests, os import pandas as pd
# Dynamics 365 Business Central OData v4: BC_URL = f"https://api.businesscentral.dynamics.com/v2.0/{os.getenv('BC_TENANT')}/production/api/v2.0" BC_TOKEN = os.getenv("BC_ACCESS_TOKEN") # OAuth2-Token aus Azure AD
def fetch_odata_all_pages(endpoint: str, filter_expr: str = None) -> list: """OData-Endpunkt mit Paginierung laden.""" url = f"{BC_URL}/{endpoint}" params = {"$top": 500} if filter_expr: params["$filter"] = filter_expr
items = [] while url: resp = requests.get( url, headers={"Authorization": f"Bearer {BC_TOKEN}"}, params=params if items == [] else None, # Params nur auf erste Seite timeout=30 ) resp.raise_for_status() data = resp.json() items.extend(data.get("value", [])) url = data.get("@odata.nextLink") # Nächste Seite (None = fertig) return items
# Kunden seit gestern laden: kunden = fetch_odata_all_pages( "customers", filter_expr="lastModifiedDateTime gt 2026-03-14T00:00:00Z" ) df_kunden = pd.json_normalize(kunden) print(f"Kunden geladen: {len(df_kunden)}") |
04
|
Strategie |
Wann geeignet |
Vorteile |
Nachteile |
|
**Full Load** |
Kleine Tabellen (< 100K Zeilen), keine Timestamp-Spalte, wöchentliche Frequenz |
Einfach, robust, kein Delta-State |
Langsam bei großen Mengen, hohe Last auf Quelle |
|
**Delta Load (Timestamp)** |
Tabellen mit ModifiedDate/UpdatedAt-Spalte, tägliche Frequenz |
Schnell, wenig Quelllast |
Benötigt zuverlässigen Timestamp in der Quelle |
|
**Delta Load (Watermark)** |
Tabellen mit steigender ID oder Sequenznummer |
Sehr einfach, kein Timestamp nötig |
Erfasst keine Updates auf alten Zeilen |
|
**CDC (Change Data Capture)** |
Hohe Datenvolumen, häufige Updates, stündliche Frequenz |
Erfasst alle Änderungen zuverlässig |
SQL-Server-Feature, Konfigurationsaufwand |
|
**Streaming** |
Echtzeit-Anforderungen, IoT, Transaktionen < 5 Min. alt |
Minimale Latenz |
Hohe Komplexität, Infrastrukturaufwand |
|
-- CDC auf Datenbank aktivieren (einmalig, DBA-Aufgabe): EXEC sys.sp_cdc_enable_db;
-- CDC für eine Tabelle aktivieren: EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'Belege', @role_name = NULL, -- NULL = keine Rollenbeschränkung @supports_net_changes = 1; -- Net Changes: nur letzter Zustand je Key
-- CDC-Daten abfragen (alle Änderungen seit letztem Run): DECLARE @von BINARY(10) = sys.fn_cdc_get_min_lsn('dbo_Belege'); DECLARE @bis BINARY(10) = sys.fn_cdc_get_max_lsn();
-- Net Changes (nur letzter Stand je Datensatz): SELECT __$operation, -- 1=Delete, 2=Insert, 4=Update (after) BelegNr, BelegDatum, KundenID, Betrag_Netto, Belegart FROM cdc.fn_cdc_get_net_changes_dbo_Belege(@von, @bis, 'all') ORDER BY __$seqval;
-- Achtung: CDC-Log wächst — Retention konfigurieren (Standard: 3 Tage) EXEC sys.sp_cdc_change_job @job_type = 'cleanup', @retention = 4320; -- Minuten = 3 Tage |
|
-- dbt-Modell: incrementales Laden (mart/fact_umsatz.sql) {{ config( materialized = 'incremental', unique_key = 'umsatz_id', on_schema_change = 'fail' ) }}
WITH quelle AS ( SELECT {{ dbt_utils.surrogate_key(['belegNr','belegDatum']) }} AS umsatz_id, belegNr, belegDatum, kundenID, betrag_netto, belegart, CURRENT_TIMESTAMP AS dbt_updated_at FROM {{ ref('stg_erp__belege') }} WHERE belegart IN ('RE','GS','ST') AND storniert = FALSE {% if is_incremental() %} -- Bei inkrementellem Run: nur neue/geänderte Datensätze AND dbt_updated_at > (SELECT MAX(dbt_updated_at) FROM {{ this }}) {% endif %} ) SELECT * FROM quelle |
05
|
DREI-SCHICHTEN-MODELL FÜR KMU-DATENPIPELINES: ══════════════════════════════════════════════════════════════ SCHICHT 1 — STAGING (Raw / Bronze) Zweck: Rohdaten unverändert sichern Inhalt: 1:1-Kopie der Quelle + ETL-Metadaten Schema: Flach, VARCHAR für alles (Typen noch nicht erzwungen) Aktualisierung: Append-only (alte Daten bleiben für Debugging) Aufbewahrung: 7–30 Tage (danach automatisch löschen)
↓ TRANSFORMATION: Typisierung, Bereinigung, Deduplizierung
SCHICHT 2 — INTEGRATION (Cleansed / Silver) Zweck: Bereinigte, typisierte, deduplizierte Daten Inhalt: Alle Quellen integriert, Business Keys vereinheitlicht Schema: Vollständig typisiert, FK-Constraints, NULL-Regeln Aktualisierung: Upsert (Merge), historisiert (SCD2 wo nötig) Aufbewahrung: Vollständig historisch
↓ TRANSFORMATION: Aggregation, KPI-Berechnung, Modellierung
SCHICHT 3 — PRESENTATION (Curated / Gold) Zweck: BI-fertige Daten — Star Schema, flache Tabellen Inhalt: Fact-Tabellen, Dimensions, Pre-aggregierte KPIs Schema: Denormalisiert, für Leseperformance optimiert Aktualisierung: Vollständiger Rebuild oder inkrementell Zugriff: BI-Tools (Power BI, Tableau, Qlik) lesen nur hier ══════════════════════════════════════════════════════════════ |
|
-- BEREINIGUNG: Leerzeichen trimmen, NULL-Behandlung, Standardisierung INSERT INTO dbo.Integration_Kunden SELECT TRIM(KundenNr) AS KundenNr, TRIM(UPPER(KundeName)) AS KundeName, -- Telefonformat vereinheitlichen: REGEXP_REPLACE(Telefon, '[^0-9+]', '') AS Telefon_Bereinigt, -- PLZ auf 5 Stellen auffüllen (österreichische Adressen: 4 Stellen): RIGHT('00000' + TRIM(PLZ), 5) AS PLZ_Normiert, -- NULL-Handling: fehlende E-Mail → Platzhalter: COALESCE(NULLIF(TRIM(Email), ''), 'unbekannt@placeholder.intern') AS Email, -- Datumsbereinigung: ungültige Daten abfangen: CASE WHEN TRY_CAST(AnlagedatumText AS DATE) IS NOT NULL THEN TRY_CAST(AnlagedatumText AS DATE) ELSE NULL END AS Anlagedatum, SYSDATETIME() AS ETL_IntegrationsDatum FROM dbo.Staging_Kunden WHERE ETL_Batch_ID = @AktuellerBatch AND TRIM(KundenNr) IS NOT NULL -- Pflichtfeld-Filter
-- DEDUPLIZIERUNG: Letzten gültigen Datensatz je Business Key behalten WITH Dedupliziert AS ( SELECT *, ROW_NUMBER() OVER ( PARTITION BY KundenNr ORDER BY ETL_Ladedatum DESC, -- neueste Ladung zuerst AnlagedatumText DESC ) AS RN FROM dbo.Staging_Kunden WHERE ETL_Batch_ID = @AktuellerBatch ) SELECT * FROM Dedupliziert WHERE RN = 1; |
|
-- DQ-Check: Vollständigkeit und Konsistenz vor Integration DECLARE @DQ_Fehler TABLE ( CheckName NVARCHAR(100), Anzahl_Zeilen INT, Beschreibung NVARCHAR(500) );
-- Check 1: Fehlende Pflichtfelder INSERT @DQ_Fehler SELECT 'PflichtfeldKundenNr', COUNT(*), 'Datensätze ohne KundenNr' FROM dbo.Staging_Kunden WHERE KundenNr IS NULL OR TRIM(KundenNr) = '' AND ETL_Batch_ID = @AktuellerBatch HAVING COUNT(*) > 0;
-- Check 2: Negative Beträge (nur wenn unzulässig) INSERT @DQ_Fehler SELECT 'NegativeBetrag', COUNT(*), 'Bestellzeilen mit negativem Einzelpreis' FROM dbo.Staging_Bestellzeilen WHERE Einzelpreis < 0 AND Belegart NOT IN ('GS', 'ST') -- Gutschriften/Stornos dürfen negativ sein AND ETL_Batch_ID = @AktuellerBatch HAVING COUNT(*) > 0;
-- Wenn Fehler: in Log schreiben und abbrechen IF EXISTS (SELECT 1 FROM @DQ_Fehler) BEGIN INSERT INTO dbo.DQ_Fehler_Log (BatchID, JobName, CheckName, Anzahl, Beschreibung, Zeitpunkt) SELECT @AktuellerBatch, 'ETL_Integration', CheckName, Anzahl_Zeilen, Beschreibung, SYSDATETIME() FROM @DQ_Fehler;
RAISERROR('DQ-Fehler gefunden — Integration abgebrochen. Siehe DQ_Fehler_Log.', 16, 1); END |
06
|
-- ETL-Status-Übersicht: alle Jobs der letzten 7 Tage SELECT JobName, MAX(CASE WHEN Status = 'Erfolgreich' THEN 1 ELSE 0 END) AS Heute_OK, COUNT(*) FILTER (WHERE Status = 'Erfolgreich' AND StartZeitpunkt >= DATEADD(DAY,-7,GETDATE())) AS Erfolge_7T, COUNT(*) FILTER (WHERE Status = 'Fehler' AND StartZeitpunkt >= DATEADD(DAY,-7,GETDATE())) AS Fehler_7T, MAX(EndeZeitpunkt - StartZeitpunkt) AS Max_Laufzeit, AVG(DATEDIFF(SECOND, StartZeitpunkt, EndeZeitpunkt)) AS Avg_Sek, MAX(ZeilenGeladen) AS Max_Zeilen, MAX(StartZeitpunkt) AS Letzter_Start FROM dbo.ETL_Log WHERE StartZeitpunkt >= DATEADD(DAY, -7, GETDATE()) GROUP BY JobName ORDER BY Fehler_7T DESC, Letzter_Start DESC;
-- Alle aktuell laufenden Jobs (Timeout-Erkennung): SELECT JobName, StartZeitpunkt, DATEDIFF(MINUTE, StartZeitpunkt, GETDATE()) AS Laufzeit_Minuten, CASE WHEN DATEDIFF(MINUTE, StartZeitpunkt, GETDATE()) > 120 THEN 'TIMEOUT-VERDACHT' ELSE 'Laufend' END AS Warnung FROM dbo.ETL_Log WHERE Status = 'Laufend' AND StartZeitpunkt > DATEADD(HOUR, -24, GETDATE()) ORDER BY StartZeitpunkt; |
|
-- SQL Agent Alert: bei fehlgeschlagenem Job E-Mail senden -- (Vorbedingung: Database Mail konfiguriert, SQL Agent läuft)
-- Im CATCH-Block des ETL-Skripts: EXEC msdb.dbo.sp_send_dbmail @profile_name = 'ETL-Alerts', @recipients = 'dba@firma.de; bi-team@firma.de', @subject = 'ETL-FEHLER: ' + @JobName + ' — ' + CONVERT(NVARCHAR, GETDATE(), 104), @body = 'ETL-Job fehlgeschlagen.' + CHAR(13) + 'Job: ' + @JobName + CHAR(13) + 'Start: ' + CONVERT(NVARCHAR, @StartZeit, 120) + CHAR(13) + 'Fehler: ' + ERROR_MESSAGE() + CHAR(13) + 'Zeile: ' + CAST(ERROR_LINE() AS NVARCHAR) + CHAR(13) + CHAR(13) + 'Bitte ETL_Log prüfen (LogID: ' + CAST(@LogID AS NVARCHAR) + ')', @importance = 'High'; |
|
# Python: Retry mit exponentiellem Backoff für API-Aufrufe import time, functools from typing import Callable, TypeVar
T = TypeVar('T')
def mit_retry(max_versuche: int = 3, basis_pause: float = 5.0): """Dekorator: Funktion bei Fehler bis zu max_versuche Mal wiederholen.""" def dekorator(fn: Callable[..., T]) -> Callable[..., T]: @functools.wraps(fn) def wrapper(*args, **kwargs) -> T: letzter_fehler = None for versuch in range(1, max_versuche + 1): try: return fn(*args, **kwargs) except (ConnectionError, TimeoutError) as e: letzter_fehler = e if versuch == max_versuche: break pause = basis_pause * (2 ** (versuch - 1)) # 5s, 10s, 20s print(f"Versuch {versuch} fehlgeschlagen: {e}. " f"Warte {pause:.0f}s...") time.sleep(pause) raise RuntimeError( f"Nach {max_versuche} Versuchen aufgegeben: {letzter_fehler}" ) return wrapper return dekorator
@mit_retry(max_versuche=3, basis_pause=5.0) def lade_api_seite(url: str, headers: dict) -> dict: """API-Aufruf mit automatischem Retry.""" response = requests.get(url, headers=headers, timeout=30) response.raise_for_status() return response.json() |
07
|
Kriterium |
SSIS |
Azure Data Factory |
Power Query |
dbt Core |
Python |
|
Zielgruppe |
DBA / ETL-Entwickler |
Cloud-Architekt |
BI-Entwickler |
Data Engineer |
Data Engineer |
|
Deployment |
On-Premise / Server |
Cloud (Azure) |
Power BI Service |
Any SQL DB |
Any |
|
Vorkenntnisse |
SQL + SSIS UI |
ADF + ARM/JSON |
M-Sprache |
SQL + YAML |
Python + SQL |
|
Einstiegshürde |
Mittel |
Mittel-Hoch |
Niedrig |
Niedrig-Mittel |
Mittel |
|
Kosten |
SQL Server Lizenz |
Pay-per-Use (~0–200 €/Monat) |
In Power BI enthalten |
Kostenlos |
Infrastruktur |
|
Skalierung |
Begrenzt (1 Server) |
Sehr gut (Cloud) |
Eingeschränkt |
Sehr gut |
Sehr gut |
|
Monitoring |
SQL Agent + Alert |
Azure Monitor |
Manuell |
Logs + Tests |
Individuell |
|
Versionierung/CI/CD |
Schlecht |
Gut (ARM/Git) |
Schlecht |
Ausgezeichnet (Git) |
Ausgezeichnet |
|
Empfehlung KMU |
On-Premise, SQL Server vorhanden |
Azure/M365-Stack, Cloud-Bereitschaft |
Power BI ohne separaten ETL |
SQL-affine Teams, DWH |
Maximale Flexibilität |
→ SSIS: Empfehlenswert wenn SQL Server bereits vorhanden ist, die IT-Abteilung SSIS kennt, und keine Cloud-Migration geplant ist. SSIS ist ausgereift, gut dokumentiert und kostet bei vorhandener SQL-Server-Lizenz nichts extra.
→ Azure Data Factory (ADF): Empfehlenswert wenn Microsoft Azure bereits genutzt wird, Cloud-native Skalierung gewünscht ist, und Daten aus vielen heterogenen Quellen (Azure Blob, REST-APIs, On-Premise) integriert werden müssen.
→ Power Query (Dataflows): Empfehlenswert für einfache ETL-Prozesse, die von BI-Entwicklern ohne SQL-Kenntnisse gebaut werden sollen. Grenzen: Transformationslogik schwer testbar, Debugging aufwendig, nicht geeignet für große Datenvolumen.
→ dbt Core: Empfehlenswert für den Transformationsschicht (Silver → Gold) wenn SQL-Kenntnisse vorhanden sind. dbt bringt Versionierung, Tests und Dokumentation nativ mit — ideal in Kombination mit SSIS oder ADF für Extraktion und Python für komplexe Transformation.
→ Python: Empfehlenswert wenn keine geeigneten Konnektoren existieren (exotische APIs, spezielle Dateiformate), komplexe Transformationslogik nötig ist, oder das Team Python-Kenntnisse mitbringt. Python eignet sich hervorragend als Orchestrierungsschicht.
|
EMPFOHLENER ETL-STACK JE KMU-TYP (2026): ══════════════════════════════════════════════════════════════ KMU MIT SQL SERVER ON-PREMISE (klassisch): Extraktion: SSIS (SQL Server Agent getriggert) Transformation: dbt Core (auf SQL Server) Laden: dbt + SSIS Monitoring: SQL Agent + Database Mail + Power BI Kosten: ~0 € Zusatzlizenz
KMU MIT MICROSOFT 365 / AZURE: Extraktion: Azure Data Factory (Managed IR) Transformation: dbt Core oder ADF Data Flow Laden: ADF → Azure SQL oder Synapse Monitoring: Azure Monitor + Logic Apps Alerts Kosten: ~50–300 €/Monat je nach Datenvolumen
KMU MIT HETEROGENEN QUELLEN (viele APIs, Dateien): Extraktion: Python (pandas, requests, pyodbc) Scheduling: Apache Airflow (Docker) oder Prefect Transformation: dbt Core Laden: sqlalchemy → SQL Server / PostgreSQL Monitoring: Airflow UI + E-Mail-Alerts Kosten: Infrastruktur (VM: ~30–80 €/Monat) ══════════════════════════════════════════════════════════════ |
08
|
PIPELINE-DOKUMENTATIONSBLATT: ══════════════════════════════════════════════════════════════ Pipeline-Name: ETL_Belege_Delta Pipeline-ID: ETL-2026-003 Version: 2.1 — Stand 2026-02-15 ────────────────────────────────────────────────────────────── ZWECK: Täglicher Delta-Load der ERP-Buchungsbelege (Umsatzrelevant) von MACH ERP (SRV-ERP01) in das Data Warehouse (SRV-DW01).
ZEITPLAN: Täglich 03:00 Uhr | SQL Agent Job: ETL_DW_Belege Abhängigkeit: startet erst nach Abschluss von ETL-002 (Stammdaten) Erwartete Laufzeit: 8–15 Min. | Timeout-Schwellenwert: 60 Min. ────────────────────────────────────────────────────────────── DATENFLUSS: Quelle: [SRV-ERP01].ERP_Prod.dbo.Belege Filter: ModifiedDate > LastLoadDate Belegart: RE, GS, ST | Storniert = 0 Staging: [SRV-DW01].DWH_Staging.dbo.Staging_Belege_Delta Transform: Typ-Konvertierung, Deduplizierung, Normalisierung Ziel: [SRV-DW01].DWH_Prod.dbo.Fact_Umsatz (MERGE) ────────────────────────────────────────────────────────────── ABHÄNGIGE SYSTEME: → Power BI Dataset "Umsatz KMU v4" (Refresh 06:00) → Report "Monats-Cockpit Vertrieb" → Report "CFO-Dashboard" ────────────────────────────────────────────────────────────── TESTFÄLLE: T1: Delta lädt nur geänderte Datensätze (kein Full Reload) T2: Stornos reduzieren Umsatz korrekt (negative Beträge) T3: Duplikate werden nicht in Zieltabelle übernommen T4: ETL-Log enthält Zeilenanzahl und Statuseintrag T5: Bei Schema-Fehler: Job abbrechend, Alert gesendet ────────────────────────────────────────────────────────────── BEKANNTE SONDERFÄLLE: ■ Monatsabschluss-Korrekturen (immer 5.–8. des Folgemonats): erhöhtes Datenvolumen — Laufzeit bis 25 Min. erwartet ■ ERP-Wartungsfenster: Samstag 22:00–24:00. Job kann fehlschlagen, wird Sonntag 03:00 automatisch wiederholt. ────────────────────────────────────────────────────────────── WIEDERHERSTELLUNG BEI AUSFALL: 1. ETL_Log auf Fehler prüfen 2. Staging_Belege_Delta bereinigen (DELETE WHERE Batch_ID = ...) 3. LastLoadDate im ETL_Log zurücksetzen 4. Job manuell starten Eskalation: DBA-Team → bi-team@firma.de ────────────────────────────────────────────────────────────── Technischer Owner: BI-Entwickler (IT-Abteilung) Fachlicher Owner: CFO / Controlling-Leitung Letzter Review: 2026-02-15 Nächster Review: 2026-08-15 ══════════════════════════════════════════════════════════════ |
09
|
-- Service Accounts für ETL: minimale Rechte (Principle of Least Privilege)
-- 1. Lese-Account für Quelldatenbank (ERP): CREATE LOGIN etl_reader_erp WITH PASSWORD = 'StrongPassword2026!'; USE ERP_Prod; CREATE USER etl_reader_erp FOR LOGIN etl_reader_erp; -- Nur SELECT auf relevante Tabellen: GRANT SELECT ON dbo.Belege TO etl_reader_erp; GRANT SELECT ON dbo.Kunden TO etl_reader_erp; GRANT SELECT ON dbo.Artikel TO etl_reader_erp; -- KEIN INSERT, UPDATE, DELETE — schreibender Zugriff verboten
-- 2. Schreib-Account für Staging (DWH): CREATE LOGIN etl_writer_staging WITH PASSWORD = 'AnotherStrongPwd!'; USE DWH_Staging; CREATE USER etl_writer_staging FOR LOGIN etl_writer_staging; -- Nur auf Staging-Schema: GRANT SELECT, INSERT, DELETE ON SCHEMA::stg TO etl_writer_staging; -- KEIN Zugriff auf Integration- oder Presentation-Schema
-- 3. Transformation-Account (Integration → Presentation): -- Separate Login, darf NUR auf Integration lesen und Presentation schreiben |
|
-- DSGVO-Anforderungen in ETL-Prozessen:
-- 1. Pseudonymisierung personenbezogener Daten in Staging: INSERT INTO dbo.Staging_Kunden_Pseudonymisiert SELECT -- Business Key bleibt erhalten für Joins: KundenNr, -- Personenbezogene Daten pseudonymisieren: CONVERT(NVARCHAR(64), HASHBYTES('SHA2_256', CAST(KundenNr AS NVARCHAR) + 'SaltValue2026!' ), 2) AS KundeName_Hash, -- Adresse auf PLZ-Ebene aggregieren (Geo-Anonymisierung): LEFT(PLZ, 3) + 'XX' AS PLZ_Aggregiert, -- Nicht benötigte Felder weglassen: -- Email, Telefon, Geburtsdatum → nur wenn für Analyse nötig Umsatz_Netto, ETL_Ladedatum FROM dbo.Staging_Kunden_Roh;
-- 2. Automatische Löschroutine: Staging-Daten nach 14 Tagen CREATE OR ALTER PROCEDURE dbo.sp_Staging_Cleanup AS BEGIN -- Staging älter als 14 Tage löschen: DELETE FROM dbo.Staging_Belege_Delta WHERE ETL_Ladedatum < DATEADD(DAY, -14, SYSDATETIME());
DELETE FROM dbo.Staging_Kunden WHERE ETL_Ladedatum < DATEADD(DAY, -14, SYSDATETIME());
-- Protokollieren: INSERT INTO dbo.ETL_Log (JobName, StartZeitpunkt, Status, Fehlermeldung) VALUES ('Staging_Cleanup', SYSDATETIME(), 'Erfolgreich', 'Staging bereinigt: ' + CAST(@@ROWCOUNT AS NVARCHAR) + ' Zeilen'); END; -- Job: täglich 06:00 nach ETL-Abschluss |
|
SECRETS-MANAGEMENT — EMPFEHLUNGEN JE TECHNOLOGIE: ────────────────────────────────────────────────────────────── SSIS: ■ Verschlüsselung mit ProtectionLevel = EncryptSensitiveWithUserKey ■ Verbindungsstrings in SQL Server: SSIS Catalog Environments ■ NIEMALS als EncryptAllWithUserKey: portierbar, aber unsicher
Azure Data Factory: ■ Azure Key Vault Linked Service für alle Secrets ■ Managed Identity statt Service Principal wo möglich ■ ADF kann auf Key Vault zugreifen ohne Passwörter im Code
Python: ■ python-dotenv für lokale Entwicklung (.env NICHT in Git) ■ Produktiv: Umgebungsvariablen des Betriebssystems ■ Windows: Windows Credential Manager (keyring-Bibliothek) ■ Azure: Azure Key Vault über azure-keyvault-secrets
SQL Server: ■ Always Encrypted für hochsensible Spalten (Gehalt, Diagnosen) ■ Transparent Data Encryption (TDE) für Datei-Verschlüsselung ■ Connection Strings immer über Windows-Authentifizierung (Service Account) statt SQL-Login wo möglich ────────────────────────────────────────────────────────────── |
10
Dieser Plan führt ein kleines BI-Team (1–3 Personen) in 30 Tagen von einer unstrukturierten, fehleranfälligen Datenbeschaffung zu einer robusten, dokumentierten, überwachten Datenpipeline für die drei wichtigsten Datenquellen.
|
|
VOR DEM START Identifizieren Sie die drei wichtigsten Datenquellen Ihres BI-Systems — die Quellen, deren Ausfall die größten Auswirkungen auf Ihre Dashboards hätte. Diese drei Quellen werden im 30-Tage-Plan vollständig aufgebaut. Erstellen Sie vollständige Backups aller betroffenen Systeme und richten Sie eine separate Entwicklungsumgebung ein — keine ETL-Entwicklung direkt in der Produktion. |
■ TAG 1–2: IST-ANALYSE DER BESTEHENDEN DATENPIPELINES
■ Alle bestehenden ETL-Jobs, Skripte und manuellen Prozesse inventarisieren
■ Für jeden Prozess: Quelldatentyp, Frequenz, Volumen, letzter bekannter Fehler
■ Manuelle Prozesse identifizieren: Wer zieht welche Daten manuell und wie oft?
■ Schmerzpunkte kategorisieren: Was ist der häufigste Fehlerauslöser?
■ TAG 3–4: TOOL-ENTSCHEIDUNG UND ARCHITEKTURDESIGN
■ Tool-Entscheidung treffen (Kapitel 7): SSIS / ADF / Python / dbt — passend zur Infrastruktur
■ Drei-Schichten-Modell (Kapitel 5) auf eigene Umgebung übertragen: Datenbank-Schemas anlegen
■ ETL-Log-Tabelle (Kapitel 2, Falle 04) und DQ-Fehler-Log anlegen
■ Service-Accounts mit minimalen Rechten einrichten (Kapitel 9)
■ TAG 5–7: ERSTE QUELLENANBINDUNG (Quelle 1)
■ Quellentyp identifizieren (Kapitel 3) und passendes Anbindungsrezept wählen
■ Schema-Validierungs-Skript (Kapitel 2, Falle 03) für Quelle 1 schreiben
■ Staging-Tabelle anlegen und ersten Full Load durchführen
■ ETL-Log-Einträge prüfen: Stimmen Zeilenzahlen? Gibt es Fehler?
■ TAG 8–10: TRANSFORMATIONSLOGIK IMPLEMENTIEREN
■ DQ-Checks (Kapitel 5) für Quelle 1 implementieren: Pflichtfelder, Wertebereich, Typ
■ Bereinigungs-Transformationen schreiben: Trimmen, NULL-Handling, Datumsbereinigung
■ Idempotenz sicherstellen: MERGE statt INSERT, Batch-ID-Logik
■ Ersten vollständigen Durchlauf Staging → Integration → Presentation testen
■ TAG 11–13: QUELLEN 2 UND 3 ANBINDEN
■ Quellenanbindung für Quelle 2 und 3 analog zu Woche 1 aufbauen
■ Delta-Load-Strategie für alle drei Quellen festlegen (Kapitel 4)
■ CDC aktivieren wenn SQL-Server-Quelle und hohes Volumen (Kapitel 4)
■ DQ-Checks für alle drei Quellen implementieren
■ TAG 14: CROSS-SOURCE-VALIDIERUNG
■ Bekannte Kennzahl in neuer Pipeline mit bekanntem Wert aus ERP vergleichen
■ Abweichungen dokumentieren und Ursache identifizieren (Definition, Filter, Datum)
■ Validierung durch fachlichen Owner (Controller): "Stimmt das mit dem überein, was ich kenne?"
■ Erst nach erfolgreicher Validierung: weiter zu Woche 3
■ TAG 15–17: FEHLERBEHANDLUNG IMPLEMENTIEREN
■ TRY/CATCH-Blöcke in alle SQL-ETL-Skripte einbauen
■ E-Mail-Alert bei Fehler konfigurieren (Database Mail + SQL Agent Notification)
■ Retry-Logik für API-Quellen implementieren (Kapitel 6)
■ Checkpoint-Logik: bei Teilfehlern Wiederanlauf ab letztem Fehler-Schritt
■ TAG 18–20: MONITORING-DASHBOARD AUFBAUEN
■ Power BI Dashboard für ETL-Status: alle Jobs, Laufzeiten, Fehlerquoten
■ Stündlicher Freshness-Check: wann waren Daten zuletzt aktuell?
■ Automatischen Timeout-Alert einrichten: Job läuft > 2× Durchschnittslaufzeit → Alert
■ Tägliche ETL-Status-E-Mail konfigurieren: Zusammenfassung aller Jobs (Erfolg/Fehler/Zeilen)
■ TAG 21: SECURITY-REVIEW
■ Alle Verbindungsstrings aus Code entfernen — in Konfiguration oder Secret Store verlagern
■ Service-Account-Berechtigungen prüfen: hat jeder Account wirklich nur die nötigen Rechte?
■ DSGVO-Check: welche Staging-Tabellen enthalten personenbezogene Daten? Löschfristen setzen
■ Pseudonymisierung für personenbezogene Analysedaten implementieren (Kapitel 9)
■ TAG 22–25: PIPELINE-DOKUMENTATION
■ Pipeline-Dokumentationsblatt (Kapitel 8) für alle drei Pipelines vollständig ausfüllen
■ Testfälle je Pipeline dokumentieren — fünf Tests pro Pipeline als Mindestanforderung
■ Wiederherstellungsanleitung schreiben: Was tun wenn Job X fehlschlägt?
■ Lineage-Dokument anlegen (Lineage-Kit Kapitel 3 als Vorlage): Quelle bis Dashboard
■ TAG 26–28: GOVERNANCE-KALENDER ETABLIEREN
■ Monatlichen ETL-Review-Termin: Fehleranalyse, Performance-Trends, anstehende Quelländerungen
■ Quartalsweisen Schema-Check: Haben sich Quellstrukturen geändert?
■ Jährlichen Tool-Review: Ist das gewählte Tool noch das richtige? Gibt es bessere Alternativen?
■ Changemanagement-Prozess: jede Quelländerung wird vorab kommuniziert und im Test evaluiert
■ TAG 29–30: PRODUKTIONS-DEPLOYMENT UND ÜBERGABE
■ Finale Abnahme durch fachlichen Owner: Zahlen stimmen, Freshness stimmt, Monitoring aktiv
■ Alte manuelle Prozesse offiziell abschalten — nach 2 Wochen parallelem Betrieb
■ Team-Übergabe: Jede Pipline wurde von mindestens zwei Personen durchgegangen
■ Ergebnis kommunizieren: "Unsere Datenpipelines sind jetzt dokumentiert, überwacht und robust." ■
|
|
ERGEBNIS NACH 30 TAGEN Ihr Ergebnis nach 30 Tagen: Drei vollständig dokumentierte, robuste Datenpipelines mit Schema-Validierung, DQ-Checks, Fehlerbehandlung, automatischem Alerting und ETL-Monitoring-Dashboard. Keine manuellen Datenbeschaffungsprozesse mehr für die drei kritischen Quellen. Vollständige Lineage-Dokumentation. Und eine Governance-Struktur, die Änderungen an Quellsystemen rechtzeitig erkennt. |
Die in diesem Dokument enthaltenen Skripte, Architekturempfehlungen und Methoden wurden nach bestem Wissen und Gewissen auf Basis langjähriger praktischer Erfahrung in der Datenintegration erstellt. Dennoch kann keine Gewähr für Vollständigkeit, Richtigkeit oder universelle Anwendbarkeit übernommen werden.
Der Autor übernimmt keinerlei Haftung für Datenverlust, Systemausfälle, Datenpannen, DSGVO-Verstöße oder sonstige Schäden, die aus der Anwendung der beschriebenen Methoden und Skripte entstehen. Testen Sie alle Skripte und Prozesse in einer Nicht-Produktionsumgebung, bevor Sie sie produktiv einsetzen.
Die in Kapitel 9 beschriebenen Pseudonymisierungs- und Löschroutinen sind Implementierungsbeispiele und ersetzen keine DSGVO-Compliance-Prüfung durch einen Datenschutzbeauftragten. Für verbindliche DSGVO-Einschätzungen ziehen Sie einen Fachanwalt oder zertifizierten Datenschutzbeauftragten hinzu.
SQL-Skripte wurden auf SQL Server 2022 und 2025 getestet. Python-Code wurde mit Python 3.12 und den angegebenen Bibliotheksversionen getestet. Syntax und API-Verhalten können sich mit Softwareupdates ändern.
Dieses Dokument und alle Inhalte sind urheberrechtlich geschützt. © 2026 Sascha Hess, xenosystems.de. Alle Rechte vorbehalten.
SQL Server, SSIS, Azure Data Factory, Power Query und Power BI sind eingetragene Marken der Microsoft Corporation. dbt ist ein Produkt von dbt Labs, Inc. Python ist ein Produkt der Python Software Foundation. Apache Kafka und Apache Airflow sind eingetragene Marken der Apache Software Foundation. Alle anderen genannten Produkt- und Unternehmensnamen sind Eigentum ihrer jeweiligen Inhaber.
Es gilt ausschließlich deutsches Recht. Gerichtsstand für alle Streitigkeiten ist, soweit gesetzlich zulässig, Weimar, Thüringen, Deutschland.
Sascha Hess ist Diplom-Biologe und IT-Professional mit über 20 Jahren Erfahrung in der Administration von ERP-, BI- und Datenbanksystemen sowie in der Entwicklung und dem Betrieb von Datenpipelines für mittelständische Unternehmen. Er hat mehr als 60 ETL-Prozesse konzipiert, implementiert und in den Produktionsbetrieb überführt — von einfachen CSV-Imports bis zu vollständigen CDC-basierten DWH-Pipelines.
Sein Ansatz verbindet naturwissenschaftliche Präzision mit hochgradiger IT-Spezialisierung. Schwerpunkte: SQL Server Performance-Tuning, ERP-Einführungen, Business Intelligence (Power BI, DeltaMaster), Data Governance, ETL-Architektur und IT-Interim-Management.
Web: www.xenosystems.de | E-Mail: info@xenosystems.de | Standort: Weimar, Thüringen / Remote
|
Service |
Beschreibung |
|
ETL-Architektur & Aufbau |
Konzeption und Implementierung robuster Datenpipelines: Quellenanbindung, Drei-Schichten-Architektur, DQ-Checks, Fehlerbehandlung, Monitoring und vollständige Dokumentation. Scope: 4–8 Wochen. |
|
ETL-Audit & Optimierung |
Analyse bestehender ETL-Prozesse: Fehlerquellen, Performance-Schwachstellen, fehlende Dokumentation, DSGVO-Risiken — mit priorisiertem Maßnahmenplan. Scope: 2–3 Tage. |
|
dbt-Einführung |
Einführung von dbt Core in Ihre SQL-Transformationsumgebung — Projektstruktur, Modelle, Tests, Dokumentation und CI/CD-Integration. Scope: 3–5 Tage. |
|
SQL Server DB Health Check |
Professioneller Audit Ihres SQL Servers — Managementreport, Risikobewertung, Performance-Analyse und Maßnahmenplan. Scope: 3–5 Tage. |
|
Interim IT-Management |
Übernahme der IT-Steuerung auf Zeit — Budgetplanung, Dienstleister-Management, strategische IT-Ausrichtung. |
Vollständiges Dokument
Strategische Wissens-Roadmap · Checklisten · Praxisbeispiele
49,90 €
Sichere Zahlung über PayPal · Sofort-Download nach Zahlungseingang