Skip to content

Training

src.training.helper

Modul für das Setup des MLFlow Loggings.

configure_mlflow_tracking(tracking_uri='https://mlflowplus.jafar.live')

Konfiguriert die Verbindung zu MLFlow.

Parameters: - tracking_uri: URI des Tracking Servers.

Source code in src\training\helper.py
def configure_mlflow_tracking(tracking_uri: str = "https://mlflowplus.jafar.live"):
    """
    Konfiguriert die Verbindung zu MLFlow.

    Parameters:
    - tracking_uri: URI des Tracking Servers.
    """

    # Lade Authentifizierungsdaten für den Server
    load_dotenv(".env", override=True)

    # Setze die MLFlow Uri für das Logging
    mlflow.set_tracking_uri(tracking_uri)

create_default_training_pipeline(model_name)

Erstellt eine Standard-Trainingspipeline bestehend aus Preprocessing, Vektorisierung und finalem Modell/Estimator. Die Pipeline-Steps heißen entsprechend 'text_cleaner', 'vectorizer' und 'model'.

Parameters:

Name Type Description Default
model_name str

Die Abkürzung des gewünschten Modells (z.B. 'LR', 'RF').

required

Returns:

Type Description
Pipeline

sklearn.pipeline.Pipeline: Eine Pipeline mit TextCleaner, TfidfVectorizer und

Pipeline

finalem Modell/Estimator.

Source code in src\training\helper.py
def create_default_training_pipeline(model_name: str) -> Pipeline:
    """
    Erstellt eine Standard-Trainingspipeline bestehend aus Preprocessing, Vektorisierung
    und finalem Modell/Estimator. Die Pipeline-Steps heißen entsprechend 'text_cleaner',
    'vectorizer' und 'model'.

    Args:
        model_name (str): Die Abkürzung des gewünschten Modells (z.B. 'LR', 'RF').

    Returns:
        sklearn.pipeline.Pipeline: Eine Pipeline mit TextCleaner, TfidfVectorizer und
        finalem Modell/Estimator.
    """
    # Validiere, ob der übergebene Modellname ein unterstütztes Modell ist
    validate_model_support(model_name)

    # Lade ein entsprechendes Modell-Objekt
    model = load_model_by_name(model_name)

    # Erstelle die Default-Pipeline mit den drei Schritten 'text_cleaner', 'vectorizer'
    # und 'model'
    training_pipeline = Pipeline(
        [
            (
                "text_cleaner",
                TextCleaner(
                    language="english",
                    to_lowercase=True,
                    whitespace_normalization=True,
                    remove_punctuation=True,
                    replace_digits_with="",
                    replace_html_with="",
                    replace_mailaddresses_with="",
                    apply_lemmatization=True,
                ),
            ),
            ("vectorizer", TfidfVectorizer(max_features=5000,
                                           ngram_range=(1, 2),
                                           min_df=2,
                                           max_df=0.85,
                                           sublinear_tf=True)),
            ("model", model),
        ]
    )

    return training_pipeline

get_minio_client()

Initialisiert und gibt einen MinIO-Client basierend auf den gesetzten Umgebungs- variablen zurück.

Die Funktion liest hierfür die folgenden Umgebungsvariablen aus: - 'MLFLOW_S3_ENDPOINT_URL' - 'AWS_ACCESS_KEY_ID' - 'AWS_SECRET_ACCESS_KEY'

Sie überprüft dafür, ob alle benötigten Variablen vorhanden sind, bildet daraus den MinIO-Endpunkt und gibt eine Instanz des entsprechenden MinIO-Clients her.

Raises:

Type Description
EnvironmentError

Wenn eine oder mehrere benötigte Umgebungsvariablen fehlen.

ConnectionError

Wenn die Verbindung zum MinIO-Server nicht hergestellt

Returns:

Name Type Description
Minio Minio

Eine Instanz des MinIO-Clients.

Source code in src\training\helper.py
def get_minio_client() -> Minio:
    """
    Initialisiert und gibt einen MinIO-Client basierend auf den gesetzten Umgebungs-
    variablen zurück.

    Die Funktion liest hierfür die folgenden Umgebungsvariablen aus:
    - 'MLFLOW_S3_ENDPOINT_URL'
    - 'AWS_ACCESS_KEY_ID'
    - 'AWS_SECRET_ACCESS_KEY'

    Sie überprüft dafür, ob alle benötigten Variablen vorhanden sind, bildet daraus den
    MinIO-Endpunkt und gibt eine Instanz des entsprechenden MinIO-Clients her.

    Raises:
        EnvironmentError: Wenn eine oder mehrere benötigte Umgebungsvariablen fehlen.
        ConnectionError: Wenn die Verbindung zum MinIO-Server nicht hergestellt
        werden kann.

    Returns:
        Minio: Eine Instanz des MinIO-Clients.
    """

    # Überprüfe, ob alle benötigten Environment Variablen vorhanden sind
    required_vars = [
        "MLFLOW_S3_ENDPOINT_URL",
        "AWS_ACCESS_KEY_ID",
        "AWS_SECRET_ACCESS_KEY",
    ]

    # Erstelle eine Liste der fehlenden Variablen
    missing_vars = [var for var in required_vars if not os.environ.get(var)]

    # Werfe einen Fehler, wenn erforderliche Variablen fehlen
    if missing_vars:
        raise EnvironmentError(
            f"Fehlende, nicht gesetzte Environment-Variablen: {', '.join(missing_vars)}"
        )

    # Lese die MinIO Konfiguration aus den Environment Variablen ein
    minio_endpoint_url = urlparse(os.environ.get("MLFLOW_S3_ENDPOINT_URL")).netloc
    aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
    aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY")

    # Initialisiere den MinIO Client
    try:
        client = Minio(
            minio_endpoint_url,
            access_key=aws_access_key_id,
            secret_key=aws_secret_access_key,
            secure=True
        )
    except Exception as e:
        # Werfe einen Fehler, wenn die Verbindung zum MinIO Server fehlschlägt
        raise ConnectionError(
            f"Verbindung zum MinIO Server konnte nicht hergestellt werden: \n {e}"
        )

    # Gib den MinIO Client zurück
    return client

get_model_name(pipeline)

Gibt den Namen des in der Pipeline verwendeten Modells zurück.

Parameters:

Name Type Description Default
pipeline Pipeline

Ein scikit-learn Pipeline Objekt mit einem Schritt 'model'.

required

Returns:

Name Type Description
str str

Modellname.

Raises:

Type Description
ValueError

Wenn das Modell nicht unterstützt wird.

Source code in src\training\helper.py
def get_model_name(pipeline: Pipeline) -> str:
    """
    Gibt den Namen des in der Pipeline verwendeten Modells zurück.

    Args:
        pipeline (Pipeline): Ein scikit-learn Pipeline Objekt mit einem Schritt 'model'.

    Returns:
        str: Modellname.

    Raises:
        ValueError: Wenn das Modell nicht unterstützt wird.
    """
    # Überprüfe, ob das Pipeline Objekt einen Schritt 'vectorizer' enthält
    if "model" not in pipeline.named_steps:
        raise ValueError(
            "Die übergebene Pipeline hat keinen Schritt mit dem Namen 'model'."
        )

    # Extrahiere das Modell aus der Pipeline
    model = pipeline.named_steps.get("model")

    # Überprüfe, ob das Modell eines von den erlaubten Modellen ist
    if isinstance(model, SVC):
        return "SVC"
    elif isinstance(model, LinearSVC):
        return "Linear_SVC"
    elif isinstance(model, KNeighborsClassifier):
        return "KNN"
    elif isinstance(model, LogisticRegression):
        return "LR"
    elif isinstance(model, MultinomialNB):
        return "MNB"
    elif isinstance(model, ComplementNB):
        return "CNB"
    elif isinstance(model, RandomForestClassifier):
        return "RF"
    elif isinstance(model, DecisionTreeClassifier):
        return "DT"
    else:
        raise ValueError(
            "Nicht unterstützter Modelltyp. Aktuell werden nur SVC, LinearSVC, "
            "KNN, LogisticRegression, ++MultinomialNB, ComplementNB, DecisionTree "
            "und RandomForestClassifier unterstützt."
        )

get_vectorizer_name(pipeline)

Gibt den Namen des in einer Pipeline verwendeten Vektorisierers zurück.

Parameters:

Name Type Description Default
pipeline Pipeline

Ein scikit-learn Pipeline Objekt mit einem Schritt mit dem

required

Returns:

Name Type Description
str str

Name des Vektorisierers.

Raises:

Type Description
ValueError

Wenn der Vektorisierer nicht unterstützt wird.

Source code in src\training\helper.py
def get_vectorizer_name(pipeline: Pipeline) -> str:
    """
    Gibt den Namen des in einer Pipeline verwendeten Vektorisierers zurück.

    Args:
        pipeline (Pipeline): Ein scikit-learn Pipeline Objekt mit einem Schritt mit dem
        Namen 'vectorizer'.

    Returns:
        str: Name des Vektorisierers.

    Raises:
        ValueError: Wenn der Vektorisierer nicht unterstützt wird.
    """
    # Überprüfe, ob das Pipeline Objekt einen Schritt 'vectorizer' enthält
    if "vectorizer" not in pipeline.named_steps:
        raise ValueError(
            "Die übergebene Pipeline hat keinen Schritt mit dem Namen 'vectorizer'."
        )

    # Lade den vectorizer aus der Pipeline
    vectorizer = pipeline.named_steps.get("vectorizer")

    # Überprüfe, ob der vectorizer von einem erlaubten Typ ist
    if isinstance(vectorizer, TfidfVectorizer):
        return "TF-IDF Vectorizer"
    raise ValueError(
        "Nicht unterstützter Vektorisierer. Aktuell wird nur der TF-IDF Vectorizer"
        " unterstützt."
    )

import_from_string(dotted_path)

Importiert ein Python-Objekt (Klasse oder Funktion) über einen Punktnotations- String, der auf das entsprechende Objekt zeigt.

Beispiel: 'sklearn.linear_model.LogisticRegression'

Parameters:

Name Type Description Default
dotted_path str

Der gesamte Pfad zum zu importierenden Modul/Objekt.

required

Returns:

Name Type Description
Any

Das importierte Python-Objekt (z.B. Klasse oder Funktion).

Source code in src\training\helper.py
def import_from_string(dotted_path: str):
    """
    Importiert ein Python-Objekt (Klasse oder Funktion) über einen Punktnotations-
    String, der auf das entsprechende Objekt zeigt.

    Beispiel: 'sklearn.linear_model.LogisticRegression'

    Args:
        dotted_path (str): Der gesamte Pfad zum zu importierenden Modul/Objekt.

    Returns:
        Any: Das importierte Python-Objekt (z.B. Klasse oder Funktion).
    """
    # Aufteilen des Pfads in den Modulpfad und den Objektpfad
    module_path, object_name = dotted_path.rsplit(".", 1)

    # Importieren des Moduls und Zugriff auf das gewünschte Objekt
    module = importlib.import_module(module_path)
    return getattr(module, object_name)

is_model_supported(model_name)

Prüft, ob ein bestimmter Modellname in der Konfigurationsdatei als unterstützter Modelltyp definiert ist.

Parameters:

Name Type Description Default
model_name str

Der Name des Modells, z.B. 'SVM' oder 'LR'.

required

Returns:

Name Type Description
bool bool

True, wenn das Modell unterstützt wird, sonst False.

Source code in src\training\helper.py
def is_model_supported(model_name: str) -> bool:
    """
    Prüft, ob ein bestimmter Modellname in der Konfigurationsdatei als unterstützter
    Modelltyp definiert ist.

    Args:
        model_name (str): Der Name des Modells, z.B. 'SVM' oder 'LR'.

    Returns:
        bool: True, wenn das Modell unterstützt wird, sonst False.
    """
    # Lade die Konfigurationsdatei
    config = load_config("config/training_config.yaml")

    # Prüfen, ob der Modellname in der Konfigurationsdatei enthalten ist
    return model_name in config["models"]

load_default_param_grid(model_name)

Lädt das Standard-Hyperparametergrid für ein Modell aus der YAML-Konfiguration.

Parameters:

Name Type Description Default
model_name str

Abkürzung des Namens des Modells.

required

Returns:

Name Type Description
dict dict

Dictionary mit Hyperparametern für z.B. GridSearchCV.

Source code in src\training\helper.py
def load_default_param_grid(model_name: str) -> dict:
    """
    Lädt das Standard-Hyperparametergrid für ein Modell aus der YAML-Konfiguration.

    Args:
        model_name (str): Abkürzung des Namens des Modells.

    Returns:
        dict: Dictionary mit Hyperparametern für z.B. GridSearchCV.
    """
    # Validiere, ob der übergebene Modellname ein unterstütztes Modell ist
    validate_model_support(model_name)

    # Lade das Default Parametergrid für das entsprechende Modell
    default_param_grids = load_config("config/training_config.yaml")[
        "default_param_grids"
    ]

    return default_param_grids[model_name]

load_model_by_name(model_name, **kwargs)

Lädt ein Modellobjekt anhand des Modellnamens und initialisiert es mit optionalen Parametern.

Parameters:

Name Type Description Default
model_name str

Abkürzung des Modells, z.B. 'LR'.

required
**kwargs

Zusätzliche Initialisierungsparameter für das Modell, falls gewünscht.

{}

Returns:

Type Description

sklearn.base.BaseEstimator: Initialisiertes Modellobjekt.

Source code in src\training\helper.py
def load_model_by_name(model_name: str, **kwargs):
    """
    Lädt ein Modellobjekt anhand des Modellnamens und initialisiert es mit optionalen
    Parametern.

    Args:
        model_name (str): Abkürzung des Modells, z.B. 'LR'.

        **kwargs: Zusätzliche Initialisierungsparameter für das Modell, falls gewünscht.

    Returns:
        sklearn.base.BaseEstimator: Initialisiertes Modellobjekt.
    """
    # Validiere, ob der übergebene Modellname ein unterstütztes Modell ist
    validate_model_support(model_name)

    # Lade das Modell-Klassen Mapping und initialisiere das Modell
    model_mapping = load_model_mapping()
    model = model_mapping[model_name]

    return model(**kwargs)

load_model_mapping()

Lädt das Abkürzung-Klassenpfad Modell-Mapping aus der YAML-Konfiguration und gibt ein Dictionary zurück, das die Modellnamen auf die Python-Objekte mappt.

Returns:

Name Type Description
dict dict

Mapping von Modellnamen auf entsprechende Python-Klassen.

Source code in src\training\helper.py
def load_model_mapping() -> dict:
    """
    Lädt das Abkürzung-Klassenpfad Modell-Mapping aus der YAML-Konfiguration und gibt
    ein Dictionary zurück, das die Modellnamen auf die Python-Objekte mappt.

    Returns:
        dict: Mapping von Modellnamen auf entsprechende Python-Klassen.
    """
    # Lade das Mapping aus der training_config.yaml Datei
    raw_mapping = load_config("config/training_config.yaml")["model_mapping"]

    # Ändere die String-Repräsentationen in echte Python-Objekte um
    resolved_mapping = {
        key: import_from_string(import_path) for key, import_path in raw_mapping.items()
    }

    return resolved_mapping

load_prefixed_default_param_grid(model_name)

Lädt das Hyperparametergrid und fügt jedem Hyperparameter den Präfix 'model__' hinzu. Erforderlich für die 'Pipeline' Kompatibilität mit u.a. mit 'GridSearchCV'.

Parameters:

Name Type Description Default
model_name str

Abkürzung des Modells.

required

Returns:

Name Type Description
dict dict

Voreingestelltes Default-Parametergrid, bereit für die Benutzung mit

dict

einer sklearn Pipeline.

Source code in src\training\helper.py
def load_prefixed_default_param_grid(model_name: str) -> dict:
    """
    Lädt das Hyperparametergrid und fügt jedem Hyperparameter den Präfix
    'model__' hinzu. Erforderlich für die 'Pipeline' Kompatibilität mit u.a.
    mit 'GridSearchCV'.

    Args:
        model_name (str): Abkürzung des Modells.

    Returns:
        dict: Voreingestelltes Default-Parametergrid, bereit für die Benutzung mit
        einer sklearn Pipeline.
    """
    # Lade das Default-Parametergrid für das entsprechende Modell aus der config Datei
    raw_grid = load_default_param_grid(model_name)

    # Füge für jeden Parameter den Präfix 'model__' hinzu
    prefixed_grid = {f"model__{k}": v for k, v in raw_grid.items()}

    return prefixed_grid

parse_classification_report_to_dataframes(report)

Erstellt aus einem sklearn Klassifikationsreport einen pandas Dataframe. Der Dataframe enthält den gesamten Klassifikationsreport bis auf die Accuracy-Zeile.

Die erzeugten DataFrames sind zusätzlich wie folgend formatiert: - Großgeschriebene Spaltenüberschriften - Prozentformatierung mit zwei Nachkommastellen - Keine 'support'-Spalte - Großgeschriebene Namen im zusammenfassenden DataFrame - Leere Zeichenketten anstelle von None/NaN-Werten

Parameters:

Name Type Description Default
report dict

Der Klassifikationsreport als verschachteltes Dictionary aus 'sklearn.metrics.classification_report(output_dict=True)'.

required

Returns:

Type Description
DataFrame
  • df (pd.DataFrame): Klassifikationsreport als Dataframe.
Source code in src\training\helper.py
def parse_classification_report_to_dataframes(
            report: dict
        ) -> pd.DataFrame:
    """
    Erstellt aus einem sklearn Klassifikationsreport einen pandas Dataframe. Der
    Dataframe enthält den gesamten Klassifikationsreport bis auf die Accuracy-Zeile.

    Die erzeugten DataFrames sind zusätzlich wie folgend formatiert:
    - Großgeschriebene Spaltenüberschriften
    - Prozentformatierung mit zwei Nachkommastellen
    - Keine 'support'-Spalte
    - Großgeschriebene Namen im zusammenfassenden DataFrame
    - Leere Zeichenketten anstelle von None/NaN-Werten

    Args:
        report (dict): Der Klassifikationsreport als verschachteltes Dictionary aus
                       'sklearn.metrics.classification_report(output_dict=True)'.

    Returns:
        - df (pd.DataFrame): Klassifikationsreport als Dataframe.
    """

    # Initialisierung der individuellen Dictionaries
    metrics = report
    if metrics.get("accuracy"):
        metrics.pop("accuracy")

    # Übertragen der Inhalte des Klassifikationsreports in die beiden Dictionaries
    filtered_metrics = {}
    for key, value in metrics.items():
        if (
            isinstance(value, dict) and
            all(k in value for k in ['precision', 'recall', 'f1-score'])
        ):
            # Filtere gültige Klassennamen mit vollständigen Metriken
            filtered_metrics[key] = value

    # Erstelle den DataFrame für Metriken pro Klasse
    df = pd.DataFrame.from_dict(filtered_metrics, orient='index').reset_index()
    df = df.rename(columns={
        'index': 'Name',
        'precision': 'Precision (%)',
        'recall': 'Recall (%)',
        'f1-score': 'F1-Score (%)'
    })
    df = df[['Name', 'Precision (%)', 'Recall (%)', 'F1-Score (%)']]

    # Umbenennung der Average Row Namen
    rename_map = {
        'macro avg': 'Arithmetischer Durchschnitt',
        'weighted avg': 'Gewichteter Durchschnitt'
    }

    df['Name'] = df['Name'].replace(rename_map)

    # Konvertiere die Metriken in Prozentwerte mit zwei Dezimalstellen
    for col in ['Precision (%)', 'Recall (%)', 'F1-Score (%)']:
        df_col = df[col]
        df[col] = df_col.apply(lambda x: round(x * 100, 2) if pd.notnull(x) else np.nan)

    # Gebe den Dataframe zurück
    return df

style_df(df)

Eine einheitliche Styling-Funktion für pd.Dataframes, welche im Dashboard angezeigt werden sollen. Die Werte in dem übergebenen Dataframe werden gemäß ihres Wertes nach der colormap 'Flare' farblich codiert. Außerdem werden alle Einträge im Dataframe aufg die zweite Nachkommastelle gerundet.

Parameters:

Name Type Description Default
df DataFrame

Der Dataframe, auf den das Styling angewendet werden soll.

required

Returns:

Name Type Description
df DataFrame

Den Dataframe mit angewendetem Styling.

Source code in src\training\helper.py
def style_df(df: pd.DataFrame):
    """
    Eine einheitliche Styling-Funktion für pd.Dataframes, welche im Dashboard angezeigt
    werden sollen. Die Werte in dem übergebenen Dataframe werden gemäß ihres Wertes
    nach der colormap 'Flare' farblich codiert. Außerdem werden alle Einträge im
    Dataframe aufg die zweite Nachkommastelle gerundet.

    Args:
        df (pd.DataFrame): Der Dataframe, auf den das Styling angewendet werden soll.

    Returns:
        df (pd.DataFrame): Den Dataframe mit angewendetem Styling.
    """
    def bold_last_two_rows(row):
        if row.name >= len(df) - 2:
            return ['font-weight: bold'] * len(row)
        else:
            return [''] * len(row)

    columns = ['Precision (%)', 'Recall (%)', 'F1-Score (%)']
    flare_cmap = sns.color_palette("flare", as_cmap=True)

    return (
        df.style
        .format({
            'Precision (%)': '{:.2f}',
            'Recall (%)': '{:.2f}',
            'F1-Score (%)': '{:.2f}'
        }, na_rep="")
        .background_gradient(cmap=flare_cmap, subset=columns)
        .apply(bold_last_two_rows, axis=1)
    )

validate_model_support(model_name)

Validiert, ob ein Modell mit dem übergebenen Modellnamen unterstützt wird. Falls nicht, wird eine Exception geworfen.

Parameters:

Name Type Description Default
model_name str

Der Modellname, der überprüft werden soll.

required

Raises:

Type Description
ValueError

Falls das Modell nicht in der Konfiguration definiert ist.

Source code in src\training\helper.py
def validate_model_support(model_name: str):
    """
    Validiert, ob ein Modell mit dem übergebenen Modellnamen unterstützt wird. Falls
    nicht, wird eine Exception geworfen.

    Args:
        model_name (str): Der Modellname, der überprüft werden soll.

    Raises:
        ValueError: Falls das Modell nicht in der Konfiguration definiert ist.
    """
    if not is_model_supported(model_name):
        config = load_config("config/training_config.yaml")
        raise ValueError(
            f"Nicht unterstützter Modelltyp: '{model_name}'.\n"
            f"Derzeit unterstützte Modelltypen sind: \n"
            + "\n".join(f"- {m}" for m in config["models"])
        )

src.training.logging.Logger

Verwaltet das Tracking und Logging von Experimenten mit MLflow.

Source code in src\training\logging.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
class Logger:
    """Verwaltet das Tracking und Logging von Experimenten mit MLflow."""

    def __init__(self, pipeline: Pipeline = None, experiment_name: str = None):
        """
        Initialisiert den Logger mit Pipeline-Metadaten und setzt das MLflow-Experiment.

        Args:
            pipeline (Pipeline): Ein sklearn-Pipeline-Objekt.
            experiment_name (str, optional): Name des MLflow-Experiments. Wenn None,
                wird der Name vom Vectorizer abgeleitet.
        """
        # Lade die Umgebungsvariablen
        load_dotenv(".env")

        # Initialisiere die Instanzvariablen
        self.active_run = None
        self.active_child_run = None
        self.trained_pipeline = None

        # Setze den Experimentnamen
        if experiment_name:
            self.experiment_name = experiment_name
        else:
            self.experiment_name = get_vectorizer_name(pipeline)

        # Setze den Run-Namen. Dieser wird zunächst aus dem Modellnamen abgeleitet
        if pipeline:
            self.untrained_pipeline = pipeline
            self.run_name = get_model_name(pipeline)

        # Setze das MLflow Experiment
        mlflow.set_experiment(self.experiment_name)

    def start_run(self, run_name: str = None):
        """
        Startet einen neuen MLflow-Run und loggt die Pipeline-Parameter.

        Args:
            run_name (str): Name des MLflow-Runs.
        """
        # Falls schon ein Run gestartet wurde
        if self.active_run:
            pass
        # Falls noch kein Run gestartet wurde und ein custom Run-Name übergeben wurde
        elif run_name:
            self.run_name = run_name
            self.active_run = mlflow.start_run(run_name=run_name)
        # Falls noch kein Run gestartet wurde und kein custom Run-Name übergeben wurde.
        # Default Name: Modell der Pipeline
        else:
            self.active_run = mlflow.start_run(run_name=self.run_name)

        #self._log_pipeline_parameters(self.untrained_pipeline)

    def start_nested_run(self, run_name: str):
        """
        Startet einen verschachtelten MLflow-Run unterhalb des bereits aktivierten Runs.

        Args:
            run_name (str): Name des verschachtelten Runs.
        """
        self.active_child_run = mlflow.start_run(run_name=run_name, nested=True)

    def end_run(self):
        """
        Beendet den aktuellen MLflow-Run. Sofern ein verschachtelter MLflow-(Child-)Run
        aktiv ist, wird zuerst dieser beendet.
        """
        mlflow.end_run()

    def end_nested_run(self):
        """
        Beendet den aktuell aktiven, verschachtelten MLflow-(Child-)Run.
        """
        if self.active_child_run:
            mlflow.end_run()
        else:
            raise RuntimeError("Kein verschachtelter MLflow-Run aktiv.")

    def _log_f1_score(self, f1_score):
        """
        Loggt den F1-Score als Metrik in MLflow.

        Args:
            f1_score (float): Zu loggender F1-Score.
        """
        mlflow.log_metric("F1-Score", f1_score)

    def _log_accuracy_score(self, acc):
        """
        Loggt den Accuracy-Score als Metrik in MLflow.

        Args:
            acc (float): Zu loggender Accuracy-Score.
        """
        mlflow.log_metric("Accuracy", acc)

    def _log_precision_score(self, prec):
        """
        Loggt die Precision als Metrik in MLflow.

        Args:
            prec (float): Zu loggender Präzisions-Score.
        """
        mlflow.log_metric("Precision", prec)

    def _log_recall_score(self, rec):
        """
        Loggt den Recall-Score als Metrik in MLflow.

        Args:
            rec (float): Zu loggender Recall-Score.
        """
        mlflow.log_metric("Recall", rec)

    def log_param_grid(self, param_grid: dict):
        """
        Loggt das Hyperparameter-Grid eines GridSearch.

        Args:
            param_grid (dict): Parametergrid des GridSearch.
        """
        mlflow.log_param("Grid Search Parametergrid", param_grid)

    def log_duration(self, start_time: float, end_time: float):
        """
        Loggt die Trainingsdauer in Sekunden.

        Args:
            start_time (float): Startzeit in Sekunden.
            end_time (float): Endzeit in Sekunden.
        """
        duration = round(end_time - start_time, 1)
        mlflow.log_param("Trainingsdauer in Sekunden", duration)

    def log_gridsearch_hyperparameters(self, grid_search: GridSearchCV):
        """
        Loggt die besten Hyperparameter einer durchgeführten GridSearchCV.

        Args:
            grid_search (GridSearchCV): Ausgeführtes GridSearchCV-Objekt.
        """
        # Überprüfe, ob das übergebene GridSearchCV Objekt gefitted wurde
        if not hasattr(grid_search, "best_params_"):
            raise AttributeError(
                "Übergebenes GridSearchCV Objekt wurde nicht gefitted."
            )

        # Logge jeden einzelnen der besten Hyperparameter als Parameter
        for param_name, param_value in grid_search.best_params_.items():
            mlflow.log_param(f"Best Parameter Value - {param_name}", param_value)

    def log_fold_count(self, num_folds: int):
        """
        Loggt die Anzahl der Cross-Validation-Folds.

        Args:
            num_folds (int): Anzahl der Folds.
        """
        mlflow.log_param("Anzahl äußerer Folds", num_folds)

    def get_experiment_name(self):
        """
        Gibt den Namen des Experiments zurück, zu dem dieser Logger gehört.

        Returns:
            str: Name des MLflow-Experiments.
        """
        return self.experiment_name

    def get_run_id(self):
        if self.active_child_run:
            return self.active_child_run.info.run_id
        elif self.active_run:
            return self.active_run.info.run_id
        else:
            raise ValueError("Kein aktiver MLflow-Run gestartet.")

    def log_mean_f1(self, mean_f1: float):
        """
        Loggt den durchschnittlichen F1-Score.

        Args:
            mean_f1 (float): Durchschnittlicher F1-Score.
        """
        mlflow.log_param("Durchschnittlicher F1-Score", mean_f1)

    def log_std_f1(self, std_f1: float):
        """
        Loggt die Standardabweichung des durschschnittlichen F1-Scores.

        Args:
            std_f1 (float): Standardabweichung des durchschnittlichen F1-Scores.
        """
        mlflow.log_param("Standardabweichung durchschnittlicher F1-Score", std_f1)

    def log_best_f1(self, fold_id, f1_score):
        """
        Loggt den Fold mit dem höchsten F1-Score, der bei einer Kreuzvalidierung
        erreicht wurde.

        Args:
            f1_score (float): Der höchste F1-Score.
        """
        mlflow.log_param("Bester Fold", fold_id)
        mlflow.log_param("Höchster F1-Score eines Folds", f1_score)

    def log_scores(
        self, f1_macro: float, prec_macro: float, rec_macro: float, acc: float
    ):
        """
        Loggt alle relevanten Klassifikationsmetriken in MLflow.

        Args:
            f1_macro (float): Makro-F1-Score.
            prec_macro (float): Makro-Precision.
            rec_macro (float): Makro-Recall.
            acc (float): Genauigkeit (Accuracy).
        """
        # Überprüfe, dass alle Metriken übergeben wurden
        for name, value in locals().items():
            if value is None:
                raise ValueError("Eine der notwendigen Metriken wurde nicht übergeben.")

        # Logge die Metriken in MLflow
        self._log_f1_score(f1_macro)
        self._log_precision_score(prec_macro)
        self._log_recall_score(rec_macro)
        self._log_accuracy_score(acc)

    def _log_pipeline_parameters(self, pipeline: Pipeline):
        """
        Loggt die Parameter jedes Schritts der Pipeline in MLflow.

        Args:
            pipeline (Pipeline): Ein scikit-learn Pipeline Objekt.
        """
        # Überprüfe, ob die Pipeline Schritte enthält
        if not len(pipeline.steps) > 0:
            raise ValueError("Die übergebene Pipeline hat keine Schritte definiert.")

        # Logge die Schritte und deren Parameter
        for step_name, step_obj in pipeline.named_steps.items():
            params = step_obj.get_params()
            for param_name, param_value in params.items():
                # Wandle alle Parameternamen in Strings um
                key = f"{step_name} - {param_name}"
                mlflow.log_param(key, str(param_value))

        if 'undersampler' in pipeline.named_steps.keys():
            undersampler = pipeline.named_steps['undersampler']
            mlflow.log_param('undersampler - type', type(undersampler).__name__)

        if 'oversampler' in pipeline.named_steps.keys():
            oversampler = pipeline.named_steps['oversampler']
            mlflow.log_param('oversampler - type', type(oversampler).__name__)

        if 'model' in pipeline.named_steps.keys():
            model = pipeline.named_steps['model']
            mlflow.log_param('model - type', type(model).__name__)

    def log_confusion_matrix(
        self,
        y_true,
        y_pred,
        labels=None,
        artifact_name="Konfusionsmatrix.png",
    ):
        """
        Erstellt und loggt eine Konfusionsmatrix als .png Datei.

        Args:
            y_true (array-like): Wahre Labels.

            y_pred (array-like): Vorhergesagte Labels.

            labels (list, optional): Labelnamen. Falls None werden diese aus y_true
            extrahiert.

            artifact_name (str, optional): Dateiname der zu loggenden Bilddatei.
        """
        # Extrahiere alle vorkommenden Labels
        if not labels:
            labels = y_true.unique()

        # Berechne die Konfusionsmatrix
        cm = confusion_matrix(y_true, y_pred, labels=labels)

        # Erstellen der pyplot figure mit passenden Farb-Settings für das Streamlit
        # Dashboard
        streamlit_backgound_color = "#0E1117" # Streamlit Dark Mode Hintergrund
        fig, ax = plt.subplots(figsize=(8, 6))
        fig.patch.set_facecolor(streamlit_backgound_color)
        ax.set_facecolor(streamlit_backgound_color)
        cmap = sns.color_palette("flare", as_cmap=True)

        # Erstellen der Konfusionsmatrix-Heatmap
        heatmap = sns.heatmap(
            cm,
            annot=True,
            fmt="d",
            cmap=cmap,
            xticklabels=labels,
            yticklabels=labels,
            ax=ax,
            cbar=True
        )

        # Konfiguration der Achsenbeschriftung
        label_color = "#D5E5F4" # High Contrast Farbe passend zum Hintergrund
        ax.set_xlabel("Vorhergesagte Klasse", color=label_color)
        ax.set_ylabel("Tatsächliche Klasse", color=label_color)
        ax.tick_params(colors=label_color)

        # Konfiguration der Farbe der Spine-Labels
        for spine in ax.spines.values():
            spine.set_color(label_color)

        # Konfiguration der Farbe der Labels der Colorbar der Heatmap
        cbar = heatmap.collections[0].colorbar
        cbar.ax.tick_params(labelcolor=label_color)

        # Anpassung des pyplot layouts sodass die nichts am Rand abgeschnitten ist
        plt.tight_layout()

        # Logge die figure als .png in MLflow
        mlflow.log_figure(fig, artifact_file=artifact_name)
        print("Die Konfusionsmatrix wurde erfolgreich unter"
              f" '{artifact_name}' gespeichert.")

        # Schließt die Figure um den Speicher nicht unnötig zu belegen
        plt.close(fig)

        # Logge die figure als .pkl in MLflow
        pickle_file = "Konfusionsmatrix.pkl"
        with open(pickle_file, "wb") as f:
            pickle.dump(fig, f)
        mlflow.log_artifact(pickle_file)

        return fig

    def log_model(self, model, X_train=None, create_signature: bool=False):
        """
        Protokolliert ein trainiertes ML-Modell als Artefakt in einem MLflow-Run.

        Args:
            model: Das trainierte Modellobjekt (muss von mlflow.sklearn unterstützt
            werden).
        """
        self.trained_pipeline = model
        self._log_pipeline_parameters(self.trained_pipeline)

        model_info = None
        if X_train is not None and create_signature:
            model_info = mlflow.sklearn.log_model(
                model,
                artifact_path=f"{get_model_name(model)}_pipeline",
                signature=mlflow.models.infer_signature(
                    X_train, model.predict(X_train)
                ),
            )
        else:
            model_info = mlflow.sklearn.log_model(
                model, artifact_path=f"{get_model_name(model)}_pipeline"
            )

        self._log_model_uri(model_info.model_uri)

        run_id = self.get_run_id()
        print(
            f"Das trainierte Modell wurde erfolgreich unter der Run-ID '{run_id}'"
            " gespeichert.\n"
        )

    def log_classification_report(
            self,
            y_test,
            y_pred,
            artifact_name: str = "Klassifikationsreport.json"
    ) -> dict:
        """
        Erstellt einen Klassifikationsreport im Dictionary Format und logged ihn als
        MLflow-Artefakt. Der Dictionary-Klassifikationsreport wird anschließend
        zurückgegeben.

        Args:
            y_test (array-like): Wahre Zielwerte.

            y_pred (array-like): Vorhergesagte Zielwerte.

            artifact_name (str, optional): Dateiname für das gespeicherte Artefakt.
                Default ist "Klassifikationsreport.json".

        Returns:
            dict: Der Klassifikationsreport als Dictionary.
        """

        # Erstelle den Klassifikationsreport als Dictionary
        cr_dict = classification_report(y_test,
                                        y_pred,
                                        zero_division=0,
                                        output_dict=True)

        # Schreibe den Inhalt in einen In-Memory Text-Puffer
        buffer = io.StringIO()
        json.dump(cr_dict, buffer)

        # Konvertiere den Text-Puffer in einen Byte-Puffer
        buffer.seek(0)
        byte_buffer = io.BytesIO(buffer.getvalue().encode('utf-8'))

        # Erstelle eine temporäre Datei, die später gelogged werden soll.
        # Speichere den Inhalt des Byte-Puffers in der Datei.
        with tempfile.NamedTemporaryFile(delete=False, mode='wb') as temp_file:
            temp_file.write(byte_buffer.getvalue())

            # Merken des Dateinamens
            temp_file_name = temp_file.name

        # Falls Datei mit Zielnamen bereits existiert, lösche sie
        if os.path.exists(artifact_name):
            os.remove(artifact_name)

        # Gebe der temporären Datei den gewünschten Dateinamen
        shutil.move(temp_file_name, artifact_name)

        # Logge die Datei als Mlflow Artefakt
        try:
            mlflow.log_artifact(artifact_name)
            print("Der Klassifikationsreport wurde erfolgreich unter "
                  f"'{artifact_name}' gespeichert.")
        except Exception as e:
            # Fehler beim Logging
            print(f"Fehler beim loggen des Artifakts {artifact_name} in Mlflow: {e}")
        finally:
            # Löschen der lokalen, temporären Datei nach dem Logging
            os.remove(artifact_name)

        return cr_dict

    def log_dataset_name(self, dataset_name: str):
        """
        Logged den Namen der Datensatz-Datei, die für ein Training verwendet wurde.

        Args:
            dataset_name (str): Der Name der Datensatz-Datei.
        """
        mlflow.log_param("dataset - name", dataset_name)

    def log_resampler(self, resampler):
        """
        Loggt die Parameter eines Resamplers.

        Args:
            resampler: Ein imbalanced-learn Resampler Objekt.
        """
        # Überprüfe, ob die der Resampler die Funktion get_params() besitzt
        if not isinstance(resampler, BaseSampler):
            raise ValueError("Das übergebene Objekt ist kein imblearn Resampler.")

        # Logge den Resampler Typ
        type = None
        if isinstance(resampler, BaseOverSampler):
            type = "oversampler"
            mlflow.log_param(type, str(type(resampler).__name__))
        elif isinstance(resampler, BaseUnderSampler):
            type = "undersampler"
            mlflow.log_param(type, str(type(resampler).__name__))

        # Logge die Parameter des Resamplers
        params = resampler.get_params()
        for param_name, param_value in params.items():
                # Wandle alle Parameternamen in Strings um
                if type:
                    key = f"{type} - {param_name}"
                    mlflow.log_param(key, str(param_value))
                else:
                    raise ValueError("Das übergebene Objekt ist weder ein Oversampler"
                                     " noch ein Undersampler.")

    def _log_model_uri(self, model_uri: str):
        mlflow.log_param("model_uri", model_uri)

    def log_hyperopt_space_description(self, space_description: dict):
        mlflow.log_param("Hyperopt_Space", space_description)

    def log_hyperopt_result(self, best: dict):
        mlflow.log_param("Hyperopt_FMin_Result:", best)

    def log_train_f1_score(self, train_score: float):
        mlflow.log_metric("F1-Score - Training", train_score)

    def log_dataset_language(self, lang: str):
        if lang == "en":
            mlflow.log_param("sprache", "English")
        elif lang =="de":
            mlflow.log_param("sprache", "Deutsch")
        else:
            raise ValueError(
                f"Die Sprache {lang} des Datensatzes wird nicht unterstützt."
            )

__init__(pipeline=None, experiment_name=None)

Initialisiert den Logger mit Pipeline-Metadaten und setzt das MLflow-Experiment.

Parameters:

Name Type Description Default
pipeline Pipeline

Ein sklearn-Pipeline-Objekt.

None
experiment_name str

Name des MLflow-Experiments. Wenn None, wird der Name vom Vectorizer abgeleitet.

None
Source code in src\training\logging.py
def __init__(self, pipeline: Pipeline = None, experiment_name: str = None):
    """
    Initialisiert den Logger mit Pipeline-Metadaten und setzt das MLflow-Experiment.

    Args:
        pipeline (Pipeline): Ein sklearn-Pipeline-Objekt.
        experiment_name (str, optional): Name des MLflow-Experiments. Wenn None,
            wird der Name vom Vectorizer abgeleitet.
    """
    # Lade die Umgebungsvariablen
    load_dotenv(".env")

    # Initialisiere die Instanzvariablen
    self.active_run = None
    self.active_child_run = None
    self.trained_pipeline = None

    # Setze den Experimentnamen
    if experiment_name:
        self.experiment_name = experiment_name
    else:
        self.experiment_name = get_vectorizer_name(pipeline)

    # Setze den Run-Namen. Dieser wird zunächst aus dem Modellnamen abgeleitet
    if pipeline:
        self.untrained_pipeline = pipeline
        self.run_name = get_model_name(pipeline)

    # Setze das MLflow Experiment
    mlflow.set_experiment(self.experiment_name)

end_nested_run()

Beendet den aktuell aktiven, verschachtelten MLflow-(Child-)Run.

Source code in src\training\logging.py
def end_nested_run(self):
    """
    Beendet den aktuell aktiven, verschachtelten MLflow-(Child-)Run.
    """
    if self.active_child_run:
        mlflow.end_run()
    else:
        raise RuntimeError("Kein verschachtelter MLflow-Run aktiv.")

end_run()

Beendet den aktuellen MLflow-Run. Sofern ein verschachtelter MLflow-(Child-)Run aktiv ist, wird zuerst dieser beendet.

Source code in src\training\logging.py
def end_run(self):
    """
    Beendet den aktuellen MLflow-Run. Sofern ein verschachtelter MLflow-(Child-)Run
    aktiv ist, wird zuerst dieser beendet.
    """
    mlflow.end_run()

get_experiment_name()

Gibt den Namen des Experiments zurück, zu dem dieser Logger gehört.

Returns:

Name Type Description
str

Name des MLflow-Experiments.

Source code in src\training\logging.py
def get_experiment_name(self):
    """
    Gibt den Namen des Experiments zurück, zu dem dieser Logger gehört.

    Returns:
        str: Name des MLflow-Experiments.
    """
    return self.experiment_name

log_best_f1(fold_id, f1_score)

Loggt den Fold mit dem höchsten F1-Score, der bei einer Kreuzvalidierung erreicht wurde.

Parameters:

Name Type Description Default
f1_score float

Der höchste F1-Score.

required
Source code in src\training\logging.py
def log_best_f1(self, fold_id, f1_score):
    """
    Loggt den Fold mit dem höchsten F1-Score, der bei einer Kreuzvalidierung
    erreicht wurde.

    Args:
        f1_score (float): Der höchste F1-Score.
    """
    mlflow.log_param("Bester Fold", fold_id)
    mlflow.log_param("Höchster F1-Score eines Folds", f1_score)

log_classification_report(y_test, y_pred, artifact_name='Klassifikationsreport.json')

Erstellt einen Klassifikationsreport im Dictionary Format und logged ihn als MLflow-Artefakt. Der Dictionary-Klassifikationsreport wird anschließend zurückgegeben.

Parameters:

Name Type Description Default
y_test array - like

Wahre Zielwerte.

required
y_pred array - like

Vorhergesagte Zielwerte.

required
artifact_name str

Dateiname für das gespeicherte Artefakt. Default ist "Klassifikationsreport.json".

'Klassifikationsreport.json'

Returns:

Name Type Description
dict dict

Der Klassifikationsreport als Dictionary.

Source code in src\training\logging.py
def log_classification_report(
        self,
        y_test,
        y_pred,
        artifact_name: str = "Klassifikationsreport.json"
) -> dict:
    """
    Erstellt einen Klassifikationsreport im Dictionary Format und logged ihn als
    MLflow-Artefakt. Der Dictionary-Klassifikationsreport wird anschließend
    zurückgegeben.

    Args:
        y_test (array-like): Wahre Zielwerte.

        y_pred (array-like): Vorhergesagte Zielwerte.

        artifact_name (str, optional): Dateiname für das gespeicherte Artefakt.
            Default ist "Klassifikationsreport.json".

    Returns:
        dict: Der Klassifikationsreport als Dictionary.
    """

    # Erstelle den Klassifikationsreport als Dictionary
    cr_dict = classification_report(y_test,
                                    y_pred,
                                    zero_division=0,
                                    output_dict=True)

    # Schreibe den Inhalt in einen In-Memory Text-Puffer
    buffer = io.StringIO()
    json.dump(cr_dict, buffer)

    # Konvertiere den Text-Puffer in einen Byte-Puffer
    buffer.seek(0)
    byte_buffer = io.BytesIO(buffer.getvalue().encode('utf-8'))

    # Erstelle eine temporäre Datei, die später gelogged werden soll.
    # Speichere den Inhalt des Byte-Puffers in der Datei.
    with tempfile.NamedTemporaryFile(delete=False, mode='wb') as temp_file:
        temp_file.write(byte_buffer.getvalue())

        # Merken des Dateinamens
        temp_file_name = temp_file.name

    # Falls Datei mit Zielnamen bereits existiert, lösche sie
    if os.path.exists(artifact_name):
        os.remove(artifact_name)

    # Gebe der temporären Datei den gewünschten Dateinamen
    shutil.move(temp_file_name, artifact_name)

    # Logge die Datei als Mlflow Artefakt
    try:
        mlflow.log_artifact(artifact_name)
        print("Der Klassifikationsreport wurde erfolgreich unter "
              f"'{artifact_name}' gespeichert.")
    except Exception as e:
        # Fehler beim Logging
        print(f"Fehler beim loggen des Artifakts {artifact_name} in Mlflow: {e}")
    finally:
        # Löschen der lokalen, temporären Datei nach dem Logging
        os.remove(artifact_name)

    return cr_dict

log_confusion_matrix(y_true, y_pred, labels=None, artifact_name='Konfusionsmatrix.png')

Erstellt und loggt eine Konfusionsmatrix als .png Datei.

Parameters:

Name Type Description Default
y_true array - like

Wahre Labels.

required
y_pred array - like

Vorhergesagte Labels.

required
labels list

Labelnamen. Falls None werden diese aus y_true

None
artifact_name str

Dateiname der zu loggenden Bilddatei.

'Konfusionsmatrix.png'
Source code in src\training\logging.py
def log_confusion_matrix(
    self,
    y_true,
    y_pred,
    labels=None,
    artifact_name="Konfusionsmatrix.png",
):
    """
    Erstellt und loggt eine Konfusionsmatrix als .png Datei.

    Args:
        y_true (array-like): Wahre Labels.

        y_pred (array-like): Vorhergesagte Labels.

        labels (list, optional): Labelnamen. Falls None werden diese aus y_true
        extrahiert.

        artifact_name (str, optional): Dateiname der zu loggenden Bilddatei.
    """
    # Extrahiere alle vorkommenden Labels
    if not labels:
        labels = y_true.unique()

    # Berechne die Konfusionsmatrix
    cm = confusion_matrix(y_true, y_pred, labels=labels)

    # Erstellen der pyplot figure mit passenden Farb-Settings für das Streamlit
    # Dashboard
    streamlit_backgound_color = "#0E1117" # Streamlit Dark Mode Hintergrund
    fig, ax = plt.subplots(figsize=(8, 6))
    fig.patch.set_facecolor(streamlit_backgound_color)
    ax.set_facecolor(streamlit_backgound_color)
    cmap = sns.color_palette("flare", as_cmap=True)

    # Erstellen der Konfusionsmatrix-Heatmap
    heatmap = sns.heatmap(
        cm,
        annot=True,
        fmt="d",
        cmap=cmap,
        xticklabels=labels,
        yticklabels=labels,
        ax=ax,
        cbar=True
    )

    # Konfiguration der Achsenbeschriftung
    label_color = "#D5E5F4" # High Contrast Farbe passend zum Hintergrund
    ax.set_xlabel("Vorhergesagte Klasse", color=label_color)
    ax.set_ylabel("Tatsächliche Klasse", color=label_color)
    ax.tick_params(colors=label_color)

    # Konfiguration der Farbe der Spine-Labels
    for spine in ax.spines.values():
        spine.set_color(label_color)

    # Konfiguration der Farbe der Labels der Colorbar der Heatmap
    cbar = heatmap.collections[0].colorbar
    cbar.ax.tick_params(labelcolor=label_color)

    # Anpassung des pyplot layouts sodass die nichts am Rand abgeschnitten ist
    plt.tight_layout()

    # Logge die figure als .png in MLflow
    mlflow.log_figure(fig, artifact_file=artifact_name)
    print("Die Konfusionsmatrix wurde erfolgreich unter"
          f" '{artifact_name}' gespeichert.")

    # Schließt die Figure um den Speicher nicht unnötig zu belegen
    plt.close(fig)

    # Logge die figure als .pkl in MLflow
    pickle_file = "Konfusionsmatrix.pkl"
    with open(pickle_file, "wb") as f:
        pickle.dump(fig, f)
    mlflow.log_artifact(pickle_file)

    return fig

log_dataset_name(dataset_name)

Logged den Namen der Datensatz-Datei, die für ein Training verwendet wurde.

Parameters:

Name Type Description Default
dataset_name str

Der Name der Datensatz-Datei.

required
Source code in src\training\logging.py
def log_dataset_name(self, dataset_name: str):
    """
    Logged den Namen der Datensatz-Datei, die für ein Training verwendet wurde.

    Args:
        dataset_name (str): Der Name der Datensatz-Datei.
    """
    mlflow.log_param("dataset - name", dataset_name)

log_duration(start_time, end_time)

Loggt die Trainingsdauer in Sekunden.

Parameters:

Name Type Description Default
start_time float

Startzeit in Sekunden.

required
end_time float

Endzeit in Sekunden.

required
Source code in src\training\logging.py
def log_duration(self, start_time: float, end_time: float):
    """
    Loggt die Trainingsdauer in Sekunden.

    Args:
        start_time (float): Startzeit in Sekunden.
        end_time (float): Endzeit in Sekunden.
    """
    duration = round(end_time - start_time, 1)
    mlflow.log_param("Trainingsdauer in Sekunden", duration)

log_fold_count(num_folds)

Loggt die Anzahl der Cross-Validation-Folds.

Parameters:

Name Type Description Default
num_folds int

Anzahl der Folds.

required
Source code in src\training\logging.py
def log_fold_count(self, num_folds: int):
    """
    Loggt die Anzahl der Cross-Validation-Folds.

    Args:
        num_folds (int): Anzahl der Folds.
    """
    mlflow.log_param("Anzahl äußerer Folds", num_folds)

log_gridsearch_hyperparameters(grid_search)

Loggt die besten Hyperparameter einer durchgeführten GridSearchCV.

Parameters:

Name Type Description Default
grid_search GridSearchCV

Ausgeführtes GridSearchCV-Objekt.

required
Source code in src\training\logging.py
def log_gridsearch_hyperparameters(self, grid_search: GridSearchCV):
    """
    Loggt die besten Hyperparameter einer durchgeführten GridSearchCV.

    Args:
        grid_search (GridSearchCV): Ausgeführtes GridSearchCV-Objekt.
    """
    # Überprüfe, ob das übergebene GridSearchCV Objekt gefitted wurde
    if not hasattr(grid_search, "best_params_"):
        raise AttributeError(
            "Übergebenes GridSearchCV Objekt wurde nicht gefitted."
        )

    # Logge jeden einzelnen der besten Hyperparameter als Parameter
    for param_name, param_value in grid_search.best_params_.items():
        mlflow.log_param(f"Best Parameter Value - {param_name}", param_value)

log_mean_f1(mean_f1)

Loggt den durchschnittlichen F1-Score.

Parameters:

Name Type Description Default
mean_f1 float

Durchschnittlicher F1-Score.

required
Source code in src\training\logging.py
def log_mean_f1(self, mean_f1: float):
    """
    Loggt den durchschnittlichen F1-Score.

    Args:
        mean_f1 (float): Durchschnittlicher F1-Score.
    """
    mlflow.log_param("Durchschnittlicher F1-Score", mean_f1)

log_model(model, X_train=None, create_signature=False)

Protokolliert ein trainiertes ML-Modell als Artefakt in einem MLflow-Run.

Parameters:

Name Type Description Default
model

Das trainierte Modellobjekt (muss von mlflow.sklearn unterstützt

required
Source code in src\training\logging.py
def log_model(self, model, X_train=None, create_signature: bool=False):
    """
    Protokolliert ein trainiertes ML-Modell als Artefakt in einem MLflow-Run.

    Args:
        model: Das trainierte Modellobjekt (muss von mlflow.sklearn unterstützt
        werden).
    """
    self.trained_pipeline = model
    self._log_pipeline_parameters(self.trained_pipeline)

    model_info = None
    if X_train is not None and create_signature:
        model_info = mlflow.sklearn.log_model(
            model,
            artifact_path=f"{get_model_name(model)}_pipeline",
            signature=mlflow.models.infer_signature(
                X_train, model.predict(X_train)
            ),
        )
    else:
        model_info = mlflow.sklearn.log_model(
            model, artifact_path=f"{get_model_name(model)}_pipeline"
        )

    self._log_model_uri(model_info.model_uri)

    run_id = self.get_run_id()
    print(
        f"Das trainierte Modell wurde erfolgreich unter der Run-ID '{run_id}'"
        " gespeichert.\n"
    )

log_param_grid(param_grid)

Loggt das Hyperparameter-Grid eines GridSearch.

Parameters:

Name Type Description Default
param_grid dict

Parametergrid des GridSearch.

required
Source code in src\training\logging.py
def log_param_grid(self, param_grid: dict):
    """
    Loggt das Hyperparameter-Grid eines GridSearch.

    Args:
        param_grid (dict): Parametergrid des GridSearch.
    """
    mlflow.log_param("Grid Search Parametergrid", param_grid)

log_resampler(resampler)

Loggt die Parameter eines Resamplers.

Parameters:

Name Type Description Default
resampler

Ein imbalanced-learn Resampler Objekt.

required
Source code in src\training\logging.py
def log_resampler(self, resampler):
    """
    Loggt die Parameter eines Resamplers.

    Args:
        resampler: Ein imbalanced-learn Resampler Objekt.
    """
    # Überprüfe, ob die der Resampler die Funktion get_params() besitzt
    if not isinstance(resampler, BaseSampler):
        raise ValueError("Das übergebene Objekt ist kein imblearn Resampler.")

    # Logge den Resampler Typ
    type = None
    if isinstance(resampler, BaseOverSampler):
        type = "oversampler"
        mlflow.log_param(type, str(type(resampler).__name__))
    elif isinstance(resampler, BaseUnderSampler):
        type = "undersampler"
        mlflow.log_param(type, str(type(resampler).__name__))

    # Logge die Parameter des Resamplers
    params = resampler.get_params()
    for param_name, param_value in params.items():
            # Wandle alle Parameternamen in Strings um
            if type:
                key = f"{type} - {param_name}"
                mlflow.log_param(key, str(param_value))
            else:
                raise ValueError("Das übergebene Objekt ist weder ein Oversampler"
                                 " noch ein Undersampler.")

log_scores(f1_macro, prec_macro, rec_macro, acc)

Loggt alle relevanten Klassifikationsmetriken in MLflow.

Parameters:

Name Type Description Default
f1_macro float

Makro-F1-Score.

required
prec_macro float

Makro-Precision.

required
rec_macro float

Makro-Recall.

required
acc float

Genauigkeit (Accuracy).

required
Source code in src\training\logging.py
def log_scores(
    self, f1_macro: float, prec_macro: float, rec_macro: float, acc: float
):
    """
    Loggt alle relevanten Klassifikationsmetriken in MLflow.

    Args:
        f1_macro (float): Makro-F1-Score.
        prec_macro (float): Makro-Precision.
        rec_macro (float): Makro-Recall.
        acc (float): Genauigkeit (Accuracy).
    """
    # Überprüfe, dass alle Metriken übergeben wurden
    for name, value in locals().items():
        if value is None:
            raise ValueError("Eine der notwendigen Metriken wurde nicht übergeben.")

    # Logge die Metriken in MLflow
    self._log_f1_score(f1_macro)
    self._log_precision_score(prec_macro)
    self._log_recall_score(rec_macro)
    self._log_accuracy_score(acc)

log_std_f1(std_f1)

Loggt die Standardabweichung des durschschnittlichen F1-Scores.

Parameters:

Name Type Description Default
std_f1 float

Standardabweichung des durchschnittlichen F1-Scores.

required
Source code in src\training\logging.py
def log_std_f1(self, std_f1: float):
    """
    Loggt die Standardabweichung des durschschnittlichen F1-Scores.

    Args:
        std_f1 (float): Standardabweichung des durchschnittlichen F1-Scores.
    """
    mlflow.log_param("Standardabweichung durchschnittlicher F1-Score", std_f1)

start_nested_run(run_name)

Startet einen verschachtelten MLflow-Run unterhalb des bereits aktivierten Runs.

Parameters:

Name Type Description Default
run_name str

Name des verschachtelten Runs.

required
Source code in src\training\logging.py
def start_nested_run(self, run_name: str):
    """
    Startet einen verschachtelten MLflow-Run unterhalb des bereits aktivierten Runs.

    Args:
        run_name (str): Name des verschachtelten Runs.
    """
    self.active_child_run = mlflow.start_run(run_name=run_name, nested=True)

start_run(run_name=None)

Startet einen neuen MLflow-Run und loggt die Pipeline-Parameter.

Parameters:

Name Type Description Default
run_name str

Name des MLflow-Runs.

None
Source code in src\training\logging.py
def start_run(self, run_name: str = None):
    """
    Startet einen neuen MLflow-Run und loggt die Pipeline-Parameter.

    Args:
        run_name (str): Name des MLflow-Runs.
    """
    # Falls schon ein Run gestartet wurde
    if self.active_run:
        pass
    # Falls noch kein Run gestartet wurde und ein custom Run-Name übergeben wurde
    elif run_name:
        self.run_name = run_name
        self.active_run = mlflow.start_run(run_name=run_name)
    # Falls noch kein Run gestartet wurde und kein custom Run-Name übergeben wurde.
    # Default Name: Modell der Pipeline
    else:
        self.active_run = mlflow.start_run(run_name=self.run_name)

src.training.persistence

Modul für die Persistenz während des Machine Learning Prozesses. Enthält Funktionen zum Speichern und Laden von Pipeline Objekten und/oder Klassifizierern/Estimatoren.

extract_final_estimator(filepath)

Lädt ein sklearn Pipeline Objekt aus einer lokalen Datei und extrahiert den finalen Estimator.

Parameters:

Name Type Description Default
filepath str

Pfad zur Pipeline-Datei (z.B. 'models/model.pkl').

required

Returns:

Name Type Description
estimator BaseEstimator

Der finale Estimator der Pipeline (letzter Schritt).

Raises:

Type Description
ValueError

Falls das geladene Objekt keine Pipeline ist oder der letzte Schritt kein Estimator mit predict/predict_proba ist.

Source code in src\training\persistence.py
def extract_final_estimator(filepath: str):
    """
    Lädt ein sklearn Pipeline Objekt aus einer lokalen Datei und extrahiert
    den finalen Estimator.

    Args:
        filepath (str): Pfad zur Pipeline-Datei (z.B. 'models/model.pkl').

    Returns:
        estimator (BaseEstimator): Der finale Estimator der Pipeline (letzter Schritt).

    Raises:
        ValueError: Falls das geladene Objekt keine Pipeline ist oder
            der letzte Schritt kein Estimator mit predict/predict_proba ist.
    """

    # Pipeline laden (interne Funktion)
    pipeline = load_pipeline_locally(filepath)

    # Überprüfe, ob die Pipeline mindestens einen Schritt besitzt
    if not hasattr(pipeline, "steps") or not pipeline.steps:
        raise ValueError("Das Pipeline Objekt hat keine Zwischenschritte definiert.")

    # Extrahiere den letzten Schritt aus der Pipeline (Name, Komponente)
    final_step_name, final_component = pipeline.steps[-1]

    # Überprüfe, ob der finale Schritt ein Estimator mit predict oder predict_proba ist
    if not (
        hasattr(final_component, "predict") or hasattr(final_component, "predict_proba")
    ):
        raise ValueError(
            f"Der letzte Pipeline Schritt '{final_step_name}' ist kein Estimator."
        )

    print(
        f" -> Estimator '{final_step_name}' vom Typ '{type(final_component).__name__}'"
        f" erfolgreich aus der Datei '{filepath}' extrahiert."
    )

    return final_component

load_pipeline(filepath)

Lädt eine mit joblib gespeicherte Pipeline, welche sich unter dem angegebenen Pfad auf dem MinIO Server befindet.

Parameters:

Name Type Description Default
filepath str

Pfad im Format 'bucketname/objectpath', z.B. 'models/model.pkl'.

required

Returns:

Name Type Description
Pipeline

Das gespeicherte sklearn Pipeline Objekt.

Source code in src\training\persistence.py
def load_pipeline(filepath: str):
    """
    Lädt eine mit joblib gespeicherte Pipeline, welche sich unter dem angegebenen Pfad
    auf dem MinIO Server befindet.

    Args:
        filepath (str): Pfad im Format 'bucketname/objectpath', z.B. 'models/model.pkl'.

    Returns:
        Pipeline: Das gespeicherte sklearn Pipeline Objekt.
    """

    # Überprüfe, ob alle benötigten Environment Variablen vorhanden sind
    client = get_minio_client()

    # Trenne Bucket-Name und Objekt-Pfad aus dem Dateipfad
    if "/" not in filepath:
        raise ValueError(
            f"Ungültiges Format für filepath: '{filepath}'. \n"
            "Erwartet: 'bucketname/objectpath'."
        )

    bucket, object_path = filepath.split("/", 1)

    # Pipeline-Datei vom MinIO Bucket abrufen
    try:
        response = client.get_object(bucket, object_path)
        try:
            data = response.read()
        finally:
            response.close()
            response.release_conn()
    except S3Error as e:
        if e.code == "NoSuchKey":
            raise FileNotFoundError(
                f"Datei '{object_path}' konnte im Bucket '{bucket}' nicht gefunden werden."
            )
        elif e.code == "NoSuchBucket":
            raise FileNotFoundError(f"Bucket '{bucket}' konnte nicht gefunden werden.")
        else:
            raise RuntimeError(f"Fehler beim Zugriff auf MinIO: {e}")
    except InvalidResponseError as e:
        raise RuntimeError(f"Fehlerhafte Antwort vom MinIO Server: {e}")
    except Exception as e:
        raise RuntimeError(f"Unbekannter Fehler beim Laden der Datei vom MinIO Server: {e}")

    # Pipeline mit joblib aus Bytes laden
    try:
        pipeline = joblib.load(BytesIO(data))
    except Exception as e:
        raise RuntimeError(f"Fehler beim Laden der Pipeline aus den Daten: {e}")

    print(
        f" -> Pipeline mit joblib aus dem Bucket '{bucket}' und dem Dateipfad "
        f"'{object_path}' erfolgreich geladen"
    )

    return pipeline

load_pipeline_locally(filepath)

Lädt ein sklearn Pipeline Objekt aus einer lokalen Datei, die ein Pipeline Objekt enthält.

Parameters:

Name Type Description Default
filepath str

Pfad zur gespeicherten Pipeline-Datei (z.B. 'models/model.pkl').

required

Returns:

Name Type Description
Pipeline Pipeline

Das geladene sklearn Pipeline Objekt.

Raises:

Type Description
FileNotFoundError

Falls die Datei nicht gefunden wurde.

ValueError

Falls das geladene Objekt keine sklearn Pipeline ist.

Source code in src\training\persistence.py
@st.cache_resource
def load_pipeline_locally(filepath: str) -> Pipeline:
    """
    Lädt ein sklearn Pipeline Objekt aus einer lokalen Datei, die ein Pipeline Objekt
    enthält.

    Args:
        filepath (str): Pfad zur gespeicherten Pipeline-Datei (z.B. 'models/model.pkl').

    Returns:
        Pipeline: Das geladene sklearn Pipeline Objekt.

    Raises:
        FileNotFoundError: Falls die Datei nicht gefunden wurde.
        ValueError: Falls das geladene Objekt keine sklearn Pipeline ist.
    """

    # Lade das gespeicherte Pipeline Objekt
    pipeline = joblib.load(filepath)

    # Überprüfe, ob die angegebene Datei unter dem angegebenen Pfad existiert
    if not os.path.isfile(filepath):
        raise FileNotFoundError(
            f"Die angegebene Datei konnte unter dem Pfad '{filepath}' nicht gefunden"
            " werden."
        )

    # Verifiziere, dass das geladene Objekt eine Pipeline ist
    if not isinstance(pipeline, Pipeline):
        raise ValueError("Das zu ladende Objekt ist kein sklearn Pipeline Objekt.")

    print(f" -> Pipeline Objekt aus {filepath} erfolgreich geladen.")
    return pipeline

save_pipeline(pipeline, filepath)

Speichert eine Pipeline mit joblib in einem MinIO Bucket.

Parameters:

Name Type Description Default
pipeline object

Das zu speichernde Pipeline-Objekt (z.B. sklearn Pipeline).

required
filepath str

Speicherpfad im Format 'bucketname/objectpath', z.B.

required
Source code in src\training\persistence.py
def save_pipeline(pipeline, filepath: str):
    """
    Speichert eine Pipeline mit joblib in einem MinIO Bucket.

    Args:
        pipeline (object): Das zu speichernde Pipeline-Objekt (z.B. sklearn Pipeline).
        filepath (str): Speicherpfad im Format 'bucketname/objectpath', z.B.
        'models/pipeline.pkl'.
    """

    # Überprüfe, ob alle benötigten Environment Variablen vorhanden sind
    required_vars = [
        "MLFLOW_S3_ENDPOINT_URL",
        "AWS_ACCESS_KEY_ID",
        "AWS_SECRET_ACCESS_KEY",
    ]
    missing_vars = [var for var in required_vars if not os.environ.get(var)]

    if missing_vars:
        raise EnvironmentError(
            f"Fehlende, nicht gesetzte Environment-Variablen: {', '.join(missing_vars)}"
        )

    # Lese die MinIO Konfiguration aus den Environment Variablen ein
    minio_endpoint_url = urlparse(os.environ.get("MLFLOW_S3_ENDPOINT_URL")).netloc
    aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
    aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY")

    # Trenne Bucket-Name und Objekt-Pfad aus dem Dateipfad
    if "/" not in filepath:
        raise ValueError(
            f"Ungültiges Format für filepath: '{filepath}'. \n"
            "Erwartet: 'bucketname/objectpath'."
        )

    bucket, object_path = filepath.split("/", 1)

    # MinIO Client initialisieren
    try:
        client = Minio(
            minio_endpoint_url,
            access_key=aws_access_key_id,
            secret_key=aws_secret_access_key,
            secure=True,  # hier immer True
        )
    except Exception as e:
        raise ConnectionError(
            f"Verbindung zum MinIO Server konnte nicht hergestellt werden: \n {e}"
        )

    # Serialisiere die Pipeline mit joblib
    bytes_io = BytesIO()
    try:
        joblib.dump(pipeline, bytes_io, compress=0)  # optional: compress=0 for portability
    except Exception as e:
        raise ValueError(f"Serialisierung mit joblib fehlgeschlagen: \n{e}")

    bytes_data = bytes_io.getvalue()

    # Pipeline-Bytes in MinIO hochladen
    try:
        client.put_object(
            bucket_name=bucket,
            object_name=object_path,
            data=BytesIO(bytes_data),
            length=len(bytes_data),
        )
    except S3Error as e:
        raise IOError(f"Hochladen in den MinIO Bucket fehlgeschlagen: \n{e}")

    print(f" -> Pipeline mit joblib in Bucket '{bucket}' unter '{object_path}' gespeichert")

save_pipeline_locally(pipeline, filepath)

Speichert ein Pipeline Objekt lokal unter dem angegebenen Pfad.

Parameters:

Name Type Description Default
pipeline Pipeline

Das sklearn Pipeline Objekt, das gespeichert werden soll.

required
filepath str

Der Pfad inklusive Dateiname, unter dem die Pipeline gespeichert

required

Raises:

Type Description
ValueError

Falls das übergebene Pipeline Objekt kein sklearn Pipeline Objekt

FileNotFoundError

Falls der angegebene Ordnerpfad nicht existiert.

Source code in src\training\persistence.py
def save_pipeline_locally(pipeline: Pipeline, filepath: str):
    """
    Speichert ein Pipeline Objekt lokal unter dem angegebenen Pfad.

    Args:
        pipeline (Pipeline): Das sklearn Pipeline Objekt, das gespeichert werden soll.

        filepath (str): Der Pfad inklusive Dateiname, unter dem die Pipeline gespeichert
        werden soll (z.B. 'models/model.pkl').

    Raises:
        ValueError: Falls das übergebene Pipeline Objekt kein sklearn Pipeline Objekt
        ist.

        FileNotFoundError: Falls der angegebene Ordnerpfad nicht existiert.
    """

    # Überprüfe, ob das übergebene Objekt eine sklearn Pipeline ist
    if not isinstance(pipeline, Pipeline):
        raise ValueError("Das zu speichernde Objekt ist kein sklearn Pipeline Objekt.")

    # Extrahiere den Ordnerpfad aus dem vollständigen Dateipfad
    directory = os.path.dirname(filepath)

    # Überprüfe, ob der angegebene Ordner existiert
    if directory and not os.path.exists(directory):
        raise FileNotFoundError(
            f"Der angegebene Ordner '{directory}' konnte nicht gefunden werden."
        )

    # Speichere das Pipeline Objekt unter dem angegebenen Pfad
    joblib.dump(pipeline, filepath)
    print(f" -> Pipeline erfolgreich gespeichert unter '{filepath}'")

src.training.train

Modul für das Training von sklearn Estimatoren. Enthält Funktionen für Nested Cross Validation, Hyperparameter-Tuning, Training sowie Evaluierung von Modellen.

evaluate_model(pipeline, X_test, y_test, logger, labelEncoder=None)

Evaluiert ein trainiertes Scikit-Learn-Pipeline-Modell auf einem Testdatensatz.

Diese Funktion berechnet mehrere Klassifikationsmetriken und gibt sowohl numerische als auch textuelle Auswertungen auf der Konsole aus. Zusätzlich werden die Ergebnisse über einen Logger gespeichert.

Parameters:

Name Type Description Default
pipeline Pipeline

Ein bereits trainiertes Scikit-Learn-Pipeline-Modell.

required
X_test pd.Series oder pd.DataFrame

Testdaten (Features).

required
y_test Series

Wahre Klassenlabels für den Testdatensatz.

required
logger Logger

Logger-Instanz zur Speicherung der Evaluationsergebnisse.

required
labelEncoder LabelEncoder

Optional. Wird für die Integration von Resampling Methoden benötigt.

None

Returns:

Name Type Description
cm

Die Konfusionsmatrix als pyplot figure Objekt.

cr

Den Klassifikationsreport als dict.

Source code in src\training\train.py
def evaluate_model(
        pipeline,
        X_test, y_test,
        logger: Logger,
        labelEncoder: LabelEncoder=None
    ):
    """
    Evaluiert ein trainiertes Scikit-Learn-Pipeline-Modell auf einem Testdatensatz.

    Diese Funktion berechnet mehrere Klassifikationsmetriken und gibt sowohl
    numerische als auch textuelle Auswertungen auf der Konsole aus. Zusätzlich
    werden die Ergebnisse über einen Logger gespeichert.

    Args:
        pipeline (Pipeline): Ein bereits trainiertes Scikit-Learn-Pipeline-Modell.

        X_test (pd.Series oder pd.DataFrame): Testdaten (Features).

        y_test (pd.Series): Wahre Klassenlabels für den Testdatensatz.

        logger (Logger): Logger-Instanz zur Speicherung der Evaluationsergebnisse.

        labelEncoder (LabelEncoder): Optional. Wird für die Integration von Resampling
            Methoden benötigt.

    Returns:
        cm: Die Konfusionsmatrix als pyplot figure Objekt.

        cr: Den Klassifikationsreport als dict.
    """

    print("[INFO] Beginne Evaluierung des Modells auf dem Testdatensatz")

    # Führe die Vorhersage auf den Testdaten durch
    y_pred = pipeline.predict(X_test)

    # Berechne die Evaluationsmetriken (macro: Mittelwert über alle Klassen)
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred, average="macro", zero_division=0)
    recall = recall_score(y_test, y_pred, average="macro", zero_division=0)
    f1 = f1_score(y_test, y_pred, average="macro", zero_division=0)

    # Ausgabe der Evaluationsergebnisse
    print("       -> Evaluationsergebnisse des Modells:")
    print(" " * 7 + "-" * 37)
    print(" " * 10 + f"Accuracy : {accuracy:.4f}")
    print(" " * 10 + f"Precision: {precision:.4f}")
    print(" " * 10 + f"Recall   : {recall:.4f}")
    print(" " * 10 + f"F1 Score : {f1:.4f}")

    # Logge die berechneten Metriken und die Konfusionsmatrix
    print("\n[INFO] Speichere die Ergebnisse:")
    print("-" * 32)
    logger.log_scores(f1, precision, recall, accuracy)

    # Falls codierte Daten übergeben wurden: Gebe die Evaluierungen mit originalen
    # Label-Namen aus
    if labelEncoder:
        cm = logger.log_confusion_matrix(
                pd.Series(labelEncoder.inverse_transform(y_test)),
                pd.Series(labelEncoder.inverse_transform(y_pred))
             )

        cr = logger.log_classification_report(
                pd.Series(labelEncoder.inverse_transform(y_test)),
                pd.Series(labelEncoder.inverse_transform(y_pred))
             )
    else:
        cm = logger.log_confusion_matrix(y_test, y_pred)
        cr = logger.log_classification_report(y_test, y_pred)

    return cm, cr

perform_nested_cross_validation(X, y, estimator, param_grid, logger=None, outer_splits=5, inner_splits=5)

Führt eine Nested Cross Validation durch zur Auswahl des besten Modells auf dem übergebenen Datensatz.

Parameters:

Name Type Description Default
X DataFrame

Eingabedaten.

required
y Series

Zielvariablen.

required
estimator BaseEstimator

Modellpipeline.

required
param_grid dict

Hyperparameterkonfiguration.

required
logger Logger

Logger zur Protokollierung.

None
outer_splits int

Anzahl äußerer Folds. Default sind 5.

5
inner_splits int

Anzahl innerer Folds. Default sind 5.

5

Returns:

Name Type Description
tuple

Das beste Modell und eine Liste der F1-Scores pro Fold.

Source code in src\training\train.py
def perform_nested_cross_validation(
    X,
    y,
    estimator: BaseEstimator,
    param_grid,
    logger: Logger = None,
    outer_splits: int = 5,
    inner_splits: int = 5,
):
    """
    Führt eine Nested Cross Validation durch zur Auswahl des besten Modells auf dem
    übergebenen Datensatz.

    Args:
        X (pd.DataFrame): Eingabedaten.

        y (pd.Series): Zielvariablen.

        estimator (BaseEstimator): Modellpipeline.

        param_grid (dict): Hyperparameterkonfiguration.

        logger (Logger, optional): Logger zur Protokollierung.

        outer_splits (int, optional): Anzahl äußerer Folds. Default sind 5.

        inner_splits (int, optional): Anzahl innerer Folds. Default sind 5.

    Returns:
        tuple: Das beste Modell und eine Liste der F1-Scores pro Fold.
    """

    # Starte das logging, falls ein Logger Objekt übergeben wurde
    if logger:
        run_name = logger.run_name
        logger.start_run(run_name)
        print(
            f"-> Beginne Logging in Experiment '{logger.get_experiment_name()}' als"
            f" Run '{run_name}'"
        )
        logger.log_fold_count(outer_splits)

    # Initialisiere die Bewertungsparameter
    scores = []
    best_score = 0
    best_estimator = None
    best_fold = None

    # Teile den initialen Datensatz stratifiziert in #outer_splits Subdatensätze
    folds = StratifiedKFold(n_splits=outer_splits, shuffle=True, random_state=42)

    # Führe den äußeren Loop der Nested Cross Validation durch
    for fold_id, (train_id, test_id) in enumerate(folds.split(X, y)):
        X_train, X_test = X.iloc[train_id], X.iloc[test_id]
        y_train, y_test = y.iloc[train_id], y.iloc[test_id]

        model, f1 = _process_cv_fold(
            fold_id,
            X_train,
            y_train,
            X_test,
            y_test,
            estimator,
            param_grid,
            inner_splits,
            logger,
        )

        scores.append(f1)

        # Speichere das beste Modell und dessen Score
        if f1 > best_score:
            best_score = f1
            best_estimator = model
            best_fold = fold_id

    # Berechne die durchschnittlichen Werte, die für alle Folds erreicht wurden
    mean_f1 = np.mean(scores)
    std_f1 = np.std(scores)

    # Logge die Ergebnisse, sofern ein Logger-Objekt übergeben wurde
    if logger:
        logger.log_mean_f1(mean_f1)
        logger.log_std_f1(std_f1)
        logger.log_best_f1(best_fold, best_score)
        logger.end_run()
        print(
            f"  -> Logging in Experiment {logger.get_experiment_name()}, Run {run_name}"
            " ist beendet."
        )

    print(" -> Training beendet.")
    print(f" -> Nested CV abgeschlossen. Durchschnittlicher F1-Score: {mean_f1:.4f}")
    print(f" -> Standardabweichung: {std_f1:.4f}")

    return best_estimator, scores

tune_and_train_model(pipeline, X_train, y_train, param_grid, logger=None)

Führt ein Hyperparameter-Tuning und darauffolgendes Modelltraining für das übergebene Pipeline-Objekt auf dem übergebenen Datensatz durch.

Parameters:

Name Type Description Default
pipeline Pipeline

Pipeline mit Vorverarbeitung und einem finalen

required
X_train pd.Series oder pd.DataFrame

Trainingsmerkmale.

required
y_train Series

Trainingslabels.

required
param_grid dict

Dictionary mit den zu testenden Hyperparametern.

required
logger Logger

logger-Objekt für das logging in MLflow.

None

Returns:

Name Type Description
BaseEstimator

Das Modell mit optimalen Hyperparametern auf dem Datensatz.

Source code in src\training\train.py
def tune_and_train_model(
    pipeline: Pipeline, X_train, y_train, param_grid: dict, logger: Logger = None
):
    """
    Führt ein Hyperparameter-Tuning und darauffolgendes Modelltraining für
    das übergebene Pipeline-Objekt auf dem übergebenen Datensatz durch.

    Args:
        pipeline (Pipeline): Pipeline mit Vorverarbeitung und einem finalen
        Estimator-Schritt.

        X_train (pd.Series oder pd.DataFrame): Trainingsmerkmale.

        y_train (pd.Series): Trainingslabels.

        param_grid (dict): Dictionary mit den zu testenden Hyperparametern.

        logger (Logger): logger-Objekt für das logging in MLflow.

    Returns:
        BaseEstimator: Das Modell mit optimalen Hyperparametern auf dem Datensatz.
    """

    # Starte das logging in MLflow, falls ein Logger-Objekt übergeben wurde
    if logger:
        logger.start_run(get_model_name(pipeline))

    print("[INFO] Beginne Hyperparameter-Tuning auf den Trainingsdaten")

    # Zerteile den Datensatz stratifiziert in fünf Sub-Datensätze
    cv_strategy = StratifiedKFold(n_splits=4, shuffle=True, random_state=42)

    # Starte den Grid Search über das übergebene Parametergrid
    print("       -> Beginne Grid Search:\n" + " " * 7 + "-" * 23)

    grid_search = GridSearchCV(
        estimator=pipeline,
        param_grid=param_grid,
        scoring="f1_macro",
        cv=cv_strategy,
        refit=True,
        n_jobs=-1,
        verbose=3,
    )

    grid_search.fit(X_train, y_train)
    best_model = grid_search.best_estimator_
    logger.log_gridsearch_hyperparameters(grid_search)


    # Gebe die wichtigsten Ergebnisse des Grid Search aus
    print(
        "\n[INFO] Die beste Hyperparameter-Kombination für das Modell: \n" +
        " " * 10 + f"{grid_search.best_params_}"
    )
    print("[INFO] Das beste Kreuzvalidierungsergebnis:\n" + " " * 10 + f"{grid_search.best_score_}\n")

    return best_model