Por Jose R. Zapata
Última actualización: 12/May/2025
Ejemplo de Experiment tracking
En el siguiente ejemplo se realizará un experimento de clasificación, donde se realizarán los procesos de entrenamiento y evaluación de un modelo, usando buenas prácticas con pipelines de Scikit-learn y el registro del modelo, sus parámetros y métricas usando MLflow como herramienta de experiment tracking.
📚 Importar librerías
from pathlib import Path
import mlflow
import mlflow.sklearn
import pandas as pd
from joblib import dump
from mlflow.models import infer_signature
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.metrics import f1_score, precision_score, recall_score, roc_auc_score
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder
💾 Cargar datos
# Load the Titanic dataset from OpenML as CSV.
# The file marks missing values with "?", so map them to NaN on read.
url_data = "https://www.openml.org/data/get_csv/16826755/phpMYEkMl"
dataset = pd.read_csv(url_data, low_memory=False, na_values="?")
👷 Preparación de datos
# Columns kept for modelling: predictors plus the "survived" target.
selected_features = [
    "pclass",
    "sex",
    "age",
    "sibsp",
    "parch",
    "fare",
    "embarked",
    "survived",
]
# Subset view of the raw data restricted to the selected columns.
# NOTE(review): the rest of the script keeps working on `dataset`, not on
# this subset — confirm whether `dataset_features` is still needed.
dataset_features = dataset.loc[:, selected_features]
Definición de tipos de datos
# Numeric columns, split by storage type (continuous vs small counts).
cols_numeric_float = ["age", "fare"]
cols_numeric_int = ["sibsp", "parch"]
cols_numeric = [*cols_numeric_float, *cols_numeric_int]
# Categorical columns: nominal vs ordinal (pclass has a natural order).
cols_categoric = ["sex", "embarked"]
cols_categoric_ord = ["pclass"]
cols_categorical = [*cols_categoric, *cols_categoric_ord]
Variables categóricas
# Cast nominal categoricals to pandas "category" dtype.
dataset[cols_categoric] = dataset[cols_categoric].astype("category")
# pclass is ordinal: class 3 < class 2 < class 1, hence ordered categories [3, 2, 1].
dataset["pclass"] = pd.Categorical(dataset["pclass"], categories=[3, 2, 1], ordered=True)
Variables numéricas
# Continuous columns as float (they may contain NaN).
dataset[cols_numeric_float] = dataset[cols_numeric_float].astype("float")
# Small count columns compacted to int8.
# NOTE(review): astype("int8") fails on NaN — assumes sibsp/parch are complete; confirm.
dataset[cols_numeric_int] = dataset[cols_numeric_int].astype("int8")
Variable Objetivo (target)
# Name of the target column; cast to a compact integer dtype for modelling.
target = "survived"
dataset[target] = dataset[target].astype("int8")
Los valores duplicados generan sesgos en el conjunto de datos y el modelo se entrenará con los mismos datos, lo que provocará sobreajuste y problemas de fuga de datos (data leakage). Es necesario eliminarlos.
# Remove exact duplicate rows to avoid bias and train/test data leakage.
dataset = dataset.drop_duplicates()
👨🏭 Feature Engineering
# Numeric features: impute missing values with the column median
# (robust to outliers such as extreme fares).
numeric_pipe = Pipeline(
    steps=[
        ("imputer", SimpleImputer(strategy="median")),
    ]
)
# Nominal categorical features: impute with the mode, then one-hot encode.
# handle_unknown="ignore" prevents transform-time crashes when a CV fold or
# new data contains a category not seen during fit.
categorical_pipe = Pipeline(
    steps=[
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("onehot", OneHotEncoder(handle_unknown="ignore")),
    ]
)
# Ordinal categorical features (pclass): impute with the mode, then map to
# integer codes. BUGFIX: the step was mislabelled "onehot" although it holds
# an OrdinalEncoder; renamed to "ordinal".
categorical_ord_pipe = Pipeline(
    steps=[
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("ordinal", OrdinalEncoder()),
    ]
)
# Route each column group through its pipeline; columns not listed are
# dropped (ColumnTransformer default remainder="drop").
preprocessor = ColumnTransformer(
    transformers=[
        ("numeric", numeric_pipe, cols_numeric),
        ("categoric", categorical_pipe, cols_categoric),
        ("categoric ordinal", categorical_ord_pipe, cols_categoric_ord),
    ]
)
Train / Test split
# Predictor matrix and target vector.
# NOTE(review): built from the full `dataset`, not `dataset_features`; the
# extra columns are later dropped by the ColumnTransformer (remainder="drop").
X_features = dataset.drop(target, axis="columns")
Y_target = dataset[target]
# Stratified 80/20 split keeps the class ratio equal in both sets;
# fixed random_state makes the split reproducible.
x_train, x_test, y_train, y_test = train_test_split(
    X_features, Y_target, stratify=Y_target, test_size=0.2, random_state=42
)
Crear pipeline del modelo
Pipeline de preprocesamiento de datos + Modelo
# Single pipeline of preprocessing + classifier, so preprocessing is fitted
# only on training folds during cross-validation (no data leakage).
# FIX: seed the forest (random_state=42, matching train_test_split) so the
# tracked experiment is reproducible.
data_model_pipeline = Pipeline(
    steps=[
        ("preprocessor", preprocessor),
        ("model", RandomForestClassifier(random_state=42)),
    ]
)
Ajuste de Hiperparámetros
Se seleccionan los mejores hiperparámetros para el modelo seleccionado en el paso anterior.
Random Forest
# Optimize for recall (minimize missed positives).
score = "recall"
# Search grid for the pipeline's "model" step; the "model__" prefix routes
# each parameter to the RandomForestClassifier inside the pipeline.
hyperparameters = {
    "model__max_depth": [4, 5, 7, 9, 10],
    "model__max_features": [2, 3, 4, 5, 6, 7, 8, 9],
    "model__criterion": ["gini", "entropy"],
}
# Exhaustive search with 5-fold CV; n_jobs=8 fits candidates in parallel.
grid_search = GridSearchCV(
    data_model_pipeline,
    hyperparameters,
    cv=5,
    scoring=score,
    n_jobs=8,
)
grid_search.fit(x_train, y_train)
# Best pipeline, refitted on the full training set (GridSearchCV refit=True default).
best_data_model_pipeline = grid_search.best_estimator_
# Hyperparameter combination that achieved the best CV recall.
best_model_params = grid_search.best_params_
Evaluación del modelo
# Evaluate the tuned pipeline on the held-out test set.
y_pred = best_data_model_pipeline.predict(x_test)
recall = recall_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
# BUGFIX: ROC-AUC must be computed from continuous scores, not hard 0/1
# labels; use the positive-class probability from predict_proba.
y_score = best_data_model_pipeline.predict_proba(x_test)[:, 1]
auc = roc_auc_score(y_test, y_score)
print(f"recall: {recall}")
print(f"precision: {precision}")
print(f"f1: {f1}")
print(f"auc: {auc}")
recall: 0.73
precision: 0.8295454545454546
f1: 0.776595744680851
auc: 0.8187037037037037
Track del Experimento con MLflow
# Point MLflow at a local "mlruns" directory.
# BUGFIX: mlflow.set_tracking_uri() returns None, so the original
# `mlflow.set_tracking_uri("mlruns").as_uri()` raised AttributeError.
# Build the file URI from a Path first, then pass it in.
mlflow.set_tracking_uri(Path("mlruns").resolve().as_uri())
exp = mlflow.set_experiment(experiment_name="titanic_models")
with mlflow.start_run() as run:
    # Infer the model signature (input schema -> output schema) from training data.
    signature = infer_signature(x_train, best_data_model_pipeline.predict(x_train))
    # Free-form tag describing this run.
    mlflow.set_tag("Training Info", "First model: Random Forest")
    # Log the fitted preprocessing+model pipeline as an MLflow model artifact
    # and register it in the model registry.
    mlflow.sklearn.log_model(
        sk_model=best_data_model_pipeline,
        artifact_path="titanic_model",
        signature=signature,
        input_example=x_train.head(),
        registered_model_name="first_model",
    )
    # Log the test-set evaluation metrics.
    mlflow.log_metrics({
        "recall": recall,
        "precision": precision,
        "f1": f1,
        "auc": auc,
    })
    # Log the winning hyperparameters from the grid search.
    mlflow.log_params(best_model_params)
Grabar modelo con MLflow
# "models" directory two levels above the current working directory.
# NOTE(review): parents[1] depends on where the script/notebook is run from — confirm.
DATA_MODEL = Path.cwd().resolve().parents[1] / "models"
# Save the fitted pipeline to disk in MLflow format.
# NOTE(review): save_model raises if the target directory already exists.
mlflow.sklearn.save_model(
    sk_model=best_data_model_pipeline,
    path=DATA_MODEL / "titanic_model_1",
    signature=signature,
    input_example=x_train.head(),
)
Cargar Modelo con MLflow
# Reload the saved model and sanity-check predictions on the test set.
load_model = mlflow.sklearn.load_model(model_uri=DATA_MODEL / "titanic_model_1")
predictions = load_model.predict(x_test)
# Show the first 12 predictions (notebook cell output).
predictions[:12]
array([0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0], dtype=int8)