MLOps Best Practices: Production Machine Learning Infrastructure
Machine Learning Operations (MLOps) bridges the gap between model development and production deployment. This guide covers best practices for building reliable, scalable ML infrastructure.
What Is MLOps and Why Is It Essential?
MLOps applies DevOps principles to machine learning: version control for data and models, automated testing and deployment pipelines, reproducible training, and continuous monitoring of models once they are in production.
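As a concrete example of continuous integration applied to models, here is a sketch of a pytest-style quality gate that blocks promotion when a candidate model falls below an accuracy floor. The threshold, file paths, and column names are illustrative assumptions rather than part of any specific tool.

# test_model_quality.py: run in CI before a candidate model is promoted.
import joblib
import pandas as pd
from sklearn.metrics import accuracy_score

MIN_ACCURACY = 0.90  # placeholder threshold agreed with stakeholders

def test_model_meets_accuracy_floor():
    # Paths are illustrative; in CI these would point at pinned artifacts.
    model = joblib.load("models/candidate_model.joblib")
    holdout = pd.read_parquet("data/holdout.parquet")
    preds = model.predict(holdout.drop(columns=["label"]))
    assert accuracy_score(holdout["label"], preds) >= MIN_ACCURACY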
How Do You Structure an MLOps Pipeline?
End-to-End Pipeline Architecture
┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│     Data     │────>│   Training   │────>│    Model     │
│  Ingestion   │     │   Pipeline   │     │   Registry   │
└──────────────┘     └──────────────┘     └──────────────┘
                            │                    │
                            v                    v
                     ┌──────────────┐     ┌──────────────┐
                     │  Experiment  │     │  Deployment  │
                     │   Tracking   │     │   Pipeline   │
                     └──────────────┘     └──────────────┘
                                                 │
                                                 v
                                          ┌──────────────┐
                                          │  Serving &   │
                                          │  Monitoring  │
                                          └──────────────┘
Data Pipeline Implementation
from prefect import flow, task
import pandas as pd
import numpy as np
import great_expectations as ge
from great_expectations.core.batch import BatchRequest

@task
def ingest_data(source: str) -> pd.DataFrame:
    """Load raw data from source."""
    if source.startswith('s3://'):
        return pd.read_parquet(source)
    return pd.read_csv(source)

@task
def validate_data(df: pd.DataFrame, expectation_suite: str) -> pd.DataFrame:
    """Validate data against expectations."""
    context = ge.get_context()
    batch_request = BatchRequest(
        datasource_name="pandas_datasource",
        data_asset_name="input_data",
        batch_identifiers={"batch_id": "validation_batch"},
    )
    results = context.run_validation_operator(
        "action_list_operator",
        assets_to_validate=[batch_request],
        expectation_suite_name=expectation_suite,
    )
    if not results["success"]:
        raise ValueError("Data validation failed")
    return df

@task
def transform_features(df: pd.DataFrame) -> pd.DataFrame:
    """Apply feature engineering transformations."""
    df = df.copy()
    # Feature engineering
    df['log_amount'] = np.log1p(df['amount'])
    df['hour_of_day'] = pd.to_datetime(df['timestamp']).dt.hour
    df['is_weekend'] = pd.to_datetime(df['timestamp']).dt.dayofweek >= 5
    return df

@flow
def data_pipeline(source: str, expectation_suite: str) -> pd.DataFrame:
    """Complete data ingestion and transformation pipeline."""
    raw_data = ingest_data(source)
    validated_data = validate_data(raw_data, expectation_suite)
    features = transform_features(validated_data)
    return features
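Running the flow is just a function call; Prefect handles task orchestration, retries, and logging. A minimal usage sketch, with a hypothetical S3 path and expectation suite name:

if __name__ == "__main__":
    # Source path and suite name are placeholders; substitute your own.
    features = data_pipeline(
        source="s3://example-bucket/raw/transactions.parquet",
        expectation_suite="transactions_suite",
    )
    print(features.head())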
How Do You Implement Model Versioning?
MLflow Model Registry
import mlflow
import numpy as np
from mlflow.models import infer_signature
from mlflow.tracking import MlflowClient
from sklearn.metrics import accuracy_score
from xgboost import XGBClassifier

def train_and_register_model(
    X_train: np.ndarray,
    y_train: np.ndarray,
    X_val: np.ndarray,
    y_val: np.ndarray,
    model_name: str,
    hyperparameters: dict
) -> str:
    """Train a model, evaluate it on a validation split, and register it in MLflow."""
    with mlflow.start_run() as run:
        # Log hyperparameters
        mlflow.log_params(hyperparameters)
        # Train model
        model = XGBClassifier(**hyperparameters)
        model.fit(X_train, y_train)
        # Log metrics on the held-out validation set
        predictions = model.predict(X_val)
        accuracy = accuracy_score(y_val, predictions)
        mlflow.log_metric("accuracy", accuracy)
        # Log model
        mlflow.xgboost.log_model(
            model,
            "model",
            registered_model_name=model_name,
            signature=infer_signature(X_train, predictions),
            input_example=X_train[:5]
        )
        return run.info.run_id

def promote_model_to_production(model_name: str, version: int):
    """Promote a model version to production stage."""
    client = MlflowClient()
    # Archive current production model
    for mv in client.search_model_versions(f"name='{model_name}'"):
        if mv.current_stage == "Production":
            client.transition_model_version_stage(
                name=model_name,
                version=mv.version,
                stage="Archived"
            )
    # Promote new version
    client.transition_model_version_stage(
        name=model_name,
        version=version,
        stage="Production"
    )
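Wiring the two functions together looks like the sketch below. The model name, hyperparameters, and version number are placeholders, and in practice promotion would usually follow a metric review or an automated comparison against the current production model:

# X_train / X_val splits prepared elsewhere (e.g. by the data pipeline above).
run_id = train_and_register_model(
    X_train, y_train, X_val, y_val,
    model_name="fraud-detection",
    hyperparameters={"n_estimators": 300, "max_depth": 6, "learning_rate": 0.1},
)
# The registry assigns version numbers; 4 is only an example.
promote_model_to_production("fraud-detection", version=4)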
DVC for Data Versioning
# dvc.yaml
stages:
  preprocess:
    cmd: python src/preprocess.py
    deps:
      - src/preprocess.py
      - data/raw/
    outs:
      - data/processed/
    params:
      - preprocess.sample_rate
      - preprocess.features
  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/processed/
    outs:
      - models/
    params:
      - train.n_estimators
      - train.learning_rate
    metrics:
      - metrics.json:
          cache: false
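The train stage above expects src/train.py to read its hyperparameters from params.yaml, write the model under models/, and emit metrics.json, since those are the artifacts DVC tracks. A minimal sketch of such a script, assuming the preprocess stage produced data/processed/train.parquet with a label column:

import json
import os

import pandas as pd
import yaml
from xgboost import XGBClassifier

# Hyperparameters come from params.yaml so DVC can detect changes.
params = yaml.safe_load(open("params.yaml"))["train"]

# Assumed output of the preprocess stage.
data = pd.read_parquet("data/processed/train.parquet")
X, y = data.drop(columns=["label"]), data["label"]

model = XGBClassifier(
    n_estimators=params["n_estimators"],
    learning_rate=params["learning_rate"],
)
model.fit(X, y)

# Outputs tracked by dvc.yaml: the models/ directory and an uncached metrics file.
os.makedirs("models", exist_ok=True)
model.save_model("models/model.json")
with open("metrics.json", "w") as f:
    json.dump({"train_accuracy": float(model.score(X, y))}, f)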
How Do You Deploy Models to Production?
Kubernetes Model Serving
# kubernetes/model-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: fraud-detection-model
spec:
  replicas: 3
  selector:
    matchLabels:
      app: fraud-detection
  template:
    metadata:
      labels:
        app: fraud-detection
    spec:
      containers:
        - name: model-server
          image: fraud-model:v1.2.0
          ports:
            - containerPort: 8080
          resources:
            requests:
              memory: "2Gi"
              cpu: "1"
            limits:
              memory: "4Gi"
              cpu: "2"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: fraud-detection-service
spec:
  selector:
    app: fraud-detection
  ports:
    - port: 80
      targetPort: 8080
  type: ClusterIP
FastAPI Model Server
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import mlflow
import numpy as np

app = FastAPI()

# Load model on startup. The native XGBoost flavor is loaded (rather than the
# generic pyfunc wrapper) so that predict_proba is available; model metadata
# is fetched separately for version reporting.
MODEL_URI = "models:/fraud-detection/Production"
model = mlflow.xgboost.load_model(MODEL_URI)
model_info = mlflow.models.get_model_info(MODEL_URI)

class PredictionRequest(BaseModel):
    features: list[float]

class PredictionResponse(BaseModel):
    prediction: int
    probability: float
    model_version: str

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    try:
        features = np.array(request.features).reshape(1, -1)
        prediction = model.predict(features)[0]
        probability = model.predict_proba(features)[0].max()
        return PredictionResponse(
            prediction=int(prediction),
            probability=float(probability),
            model_version=model_info.run_id
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health():
    return {"status": "healthy"}

@app.get("/ready")
async def ready():
    # Readiness probe target referenced by the Kubernetes manifest above.
    return {"status": "ready"}
How Do You Monitor Models in Production?
Performance Monitoring
import pandas as pd
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, ClassificationPreset

def generate_monitoring_report(
    reference_data: pd.DataFrame,
    current_data: pd.DataFrame,
    predictions: pd.DataFrame
) -> Report:
    """Generate model monitoring report."""
    column_mapping = ColumnMapping(
        prediction='prediction',
        target='actual',
        numerical_features=['amount', 'hour', 'day_of_week'],
        categorical_features=['merchant_category', 'country']
    )
    report = Report(metrics=[
        DataDriftPreset(),
        ClassificationPreset()
    ])
    report.run(
        reference_data=reference_data,
        current_data=current_data.join(predictions),
        column_mapping=column_mapping
    )
    return report

def check_for_drift(report: Report) -> bool:
    """Check if data drift exceeds thresholds."""
    drift_detected = report.as_dict()['metrics'][0]['result']['dataset_drift']
    return drift_detected
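In practice these helpers run on a schedule, for example as another Prefect flow that compares the training data against the most recent window of production traffic. A usage sketch, with data loading and the retraining hook left as placeholders:

# reference_data, current_data, and predictions are loaded elsewhere.
report = generate_monitoring_report(reference_data, current_data, predictions)
report.save_html("drift_report.html")  # human-readable dashboard for review

if check_for_drift(report):
    # Hook for whatever alerting or retraining mechanism you use.
    print("Data drift detected; consider triggering retraining.")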
Alerting Setup
from prometheus_client import Counter, Histogram, Gauge

# Metrics
prediction_counter = Counter(
    'model_predictions_total',
    'Total number of predictions',
    ['model_name', 'version', 'prediction']
)

prediction_latency = Histogram(
    'model_prediction_latency_seconds',
    'Prediction latency in seconds',
    ['model_name']
)

data_drift_score = Gauge(
    'model_data_drift_score',
    'Current data drift score',
    ['model_name', 'feature']
)

def log_prediction(model_name: str, version: str, prediction: int, latency: float):
    """Log prediction metrics."""
    prediction_counter.labels(
        model_name=model_name,
        version=version,
        prediction=str(prediction)
    ).inc()
    prediction_latency.labels(model_name=model_name).observe(latency)
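For Prometheus to scrape these metrics, the serving process must expose them over HTTP. The sketch below uses prometheus_client's built-in exporter on an assumed port 8001 and shows how a prediction call would be timed and recorded:

import time

from prometheus_client import start_http_server

# Expose /metrics for Prometheus to scrape; the port is an example.
start_http_server(8001)

start = time.perf_counter()
prediction = 1  # stand-in for model.predict(...)
log_prediction(
    model_name="fraud-detection",
    version="4",
    prediction=prediction,
    latency=time.perf_counter() - start,
)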
What Are the Key MLOps Tools?
Tool Stack Comparison
| Category | Open Source | Managed / Commercial |
|----------|-------------|----------------------|
| Experiment Tracking | MLflow | Weights & Biases, Comet, Neptune |
| Orchestration | Prefect, Airflow, Kubeflow Pipelines | Vertex AI Pipelines, SageMaker Pipelines |
| Feature Store | Feast | Tecton, Databricks Feature Store |
| Model Serving | Seldon Core, BentoML | SageMaker, Vertex AI |
| Monitoring | Evidently, whylogs | Arize, Fiddler |
MLOps is essential for turning model development effort into production value. By putting these practices in place (versioned data and models, automated pipelines, deployment with health checks, and continuous monitoring), you can build reliable ML systems that deliver consistent business impact.