
MLOps Best Practices: Production Machine Learning Infrastructure

Build robust MLOps infrastructure for production machine learning. Learn about model versioning, deployment pipelines, monitoring, and continuous training strategies.

Agochar

December 20, 2024


Machine Learning Operations (MLOps) bridges the gap between model development and production deployment. This guide covers best practices for building reliable, scalable ML infrastructure.

What Is MLOps and Why Is It Essential?

MLOps applies DevOps principles to machine learning:

  • Reproducibility: Every experiment can be reproduced (see the seeding sketch after this list)
  • Automation: Pipelines reduce manual intervention
  • Monitoring: Track model performance in production
  • Governance: Audit trails and access controls
  • Scalability: Handle growing data and model complexity
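
In practice, reproducibility starts with pinning every source of randomness and fingerprinting the run configuration so that identical experiments can be recognized later. A minimal sketch (the helper name and config format are illustrative, not from any particular library):

    import hashlib
    import json
    import random

    import numpy as np

    def make_run_reproducible(config: dict, seed: int = 42) -> str:
        """Pin randomness and return a fingerprint of the run configuration."""
        random.seed(seed)
        np.random.seed(seed)
        # Seed any DL framework you use as well, e.g. torch.manual_seed(seed)

        # Hash the full config so identical runs can be detected later
        return hashlib.sha256(
            json.dumps(config, sort_keys=True).encode()
        ).hexdigest()[:12]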

How Do You Structure an MLOps Pipeline?

End-to-End Pipeline Architecture

    ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
    │    Data     │────>│  Training   │────>│    Model    │
    │  Ingestion  │     │  Pipeline   │     │  Registry   │
    └─────────────┘     └─────────────┘     └─────────────┘
                               │                   │
                               v                   v
                        ┌─────────────┐     ┌─────────────┐
                        │ Experiment  │     │ Deployment  │
                        │  Tracking   │     │  Pipeline   │
                        └─────────────┘     └─────────────┘
                                                   │
                                                   v
                                            ┌─────────────┐
                                            │  Serving &  │
                                            │ Monitoring  │
                                            └─────────────┘

Data Pipeline Implementation

    import great_expectations as ge
    import numpy as np
    import pandas as pd
    from great_expectations.core.batch import RuntimeBatchRequest
    from prefect import flow, task

    @task
    def ingest_data(source: str) -> pd.DataFrame:
        """Load raw data from source."""
        if source.startswith('s3://'):
            return pd.read_parquet(source)
        return pd.read_csv(source)

    @task
    def validate_data(df: pd.DataFrame, expectation_suite: str) -> pd.DataFrame:
        """Validate the dataframe against a Great Expectations suite."""
        context = ge.get_context()

        # Hand the in-memory dataframe to GE as a runtime batch; assumes a
        # pandas datasource with a runtime data connector is configured
        batch_request = RuntimeBatchRequest(
            datasource_name="pandas_datasource",
            data_connector_name="runtime_data_connector",
            data_asset_name="input_data",
            runtime_parameters={"batch_data": df},
            batch_identifiers={"batch_id": "validation_batch"},
        )

        validator = context.get_validator(
            batch_request=batch_request,
            expectation_suite_name=expectation_suite,
        )
        results = validator.validate()

        if not results.success:
            raise ValueError("Data validation failed")

        return df

    @task
    def transform_features(df: pd.DataFrame) -> pd.DataFrame:
        """Apply feature engineering transformations."""
        df = df.copy()

        # Feature engineering
        df['log_amount'] = np.log1p(df['amount'])
        df['hour_of_day'] = pd.to_datetime(df['timestamp']).dt.hour
        df['is_weekend'] = pd.to_datetime(df['timestamp']).dt.dayofweek >= 5

        return df

    @flow
    def data_pipeline(source: str, expectation_suite: str) -> pd.DataFrame:
        """Complete data ingestion and transformation pipeline."""
        raw_data = ingest_data(source)
        validated_data = validate_data(raw_data, expectation_suite)
        features = transform_features(validated_data)
        return features
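
Running the pipeline is then a single flow call, which Prefect records as a tracked run (the source path and suite name below are placeholders):

    if __name__ == "__main__":
        features = data_pipeline(
            source="s3://my-bucket/transactions/raw/",  # hypothetical path
            expectation_suite="transactions_suite",     # hypothetical suite
        )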

How Do You Implement Model Versioning?

MLflow Model Registry

    import mlflow
    import numpy as np
    from mlflow.models import infer_signature
    from mlflow.tracking import MlflowClient
    from sklearn.metrics import accuracy_score
    from xgboost import XGBClassifier

    def train_and_register_model(
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_val: np.ndarray,
        y_val: np.ndarray,
        model_name: str,
        hyperparameters: dict
    ) -> str:
        """Train a model, log the run, and register it in MLflow."""

        with mlflow.start_run() as run:
            # Log hyperparameters
            mlflow.log_params(hyperparameters)

            # Train model
            model = XGBClassifier(**hyperparameters)
            model.fit(X_train, y_train)

            # Log validation metrics
            predictions = model.predict(X_val)
            accuracy = accuracy_score(y_val, predictions)
            mlflow.log_metric("accuracy", accuracy)

            # Log and register the model with its input/output signature
            mlflow.xgboost.log_model(
                model,
                "model",
                registered_model_name=model_name,
                signature=infer_signature(X_val, predictions),
                input_example=X_train[:5]
            )

            return run.info.run_id
    
    def promote_model_to_production(model_name: str, version: int):
        """Promote a model version to production stage."""
        client = MlflowClient()
    
        # Archive current production model
        for mv in client.search_model_versions(f"name='{model_name}'"):
            if mv.current_stage == "Production":
                client.transition_model_version_stage(
                    name=model_name,
                    version=mv.version,
                    stage="Archived"
                )
    
        # Promote new version
        client.transition_model_version_stage(
            name=model_name,
            version=version,
            stage="Production"
        )
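
Note that recent MLflow releases (2.9+) deprecate registry stages in favor of model aliases; the same promotion can be written as a single alias reassignment:

    def promote_model_with_alias(model_name: str, version: int):
        """Point the 'production' alias at a new version (MLflow 2.9+ style)."""
        client = MlflowClient()
        # Reassigning an alias automatically removes it from the old version,
        # so no explicit archiving step is needed
        client.set_registered_model_alias(model_name, "production", version)
        # Consumers then load via the URI: models:/<model_name>@production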

DVC for Data Versioning

    # dvc.yaml
    stages:
      preprocess:
        cmd: python src/preprocess.py
        deps:
          - src/preprocess.py
          - data/raw/
        outs:
          - data/processed/
        params:
          - preprocess.sample_rate
          - preprocess.features
    
      train:
        cmd: python src/train.py
        deps:
          - src/train.py
          - data/processed/
        outs:
          - models/
        params:
          - train.n_estimators
          - train.learning_rate
        metrics:
          - metrics.json:
              cache: false
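
The `params` entries in each stage refer to keys in a `params.yaml` file next to `dvc.yaml`; a matching example (the values are illustrative):

    # params.yaml
    preprocess:
      sample_rate: 0.1
      features:
        - amount
        - hour_of_day
        - is_weekend

    train:
      n_estimators: 200
      learning_rate: 0.05

After editing a parameter, `dvc repro` re-runs only the stages whose dependencies or params actually changed.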

How Do You Deploy Models to Production?

Kubernetes Model Serving

    # kubernetes/model-deployment.yaml
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: fraud-detection-model
    spec:
      replicas: 3
      selector:
        matchLabels:
          app: fraud-detection
      template:
        metadata:
          labels:
            app: fraud-detection
        spec:
          containers:
          - name: model-server
            image: fraud-model:v1.2.0
            ports:
            - containerPort: 8080
            resources:
              requests:
                memory: "2Gi"
                cpu: "1"
              limits:
                memory: "4Gi"
                cpu: "2"
            livenessProbe:
              httpGet:
                path: /health
                port: 8080
              initialDelaySeconds: 30
              periodSeconds: 10
            readinessProbe:
              httpGet:
                path: /ready
                port: 8080
              initialDelaySeconds: 5
              periodSeconds: 5
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: fraud-detection-service
    spec:
      selector:
        app: fraud-detection
      ports:
      - port: 80
        targetPort: 8080
      type: ClusterIP
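
To absorb variable prediction traffic, the Deployment can be paired with a HorizontalPodAutoscaler; a minimal sketch (the 70% CPU target is an assumption to tune per workload):

    # kubernetes/model-hpa.yaml
    apiVersion: autoscaling/v2
    kind: HorizontalPodAutoscaler
    metadata:
      name: fraud-detection-hpa
    spec:
      scaleTargetRef:
        apiVersion: apps/v1
        kind: Deployment
        name: fraud-detection-model
      minReplicas: 3
      maxReplicas: 10
      metrics:
      - type: Resource
        resource:
          name: cpu
          target:
            type: Utilization
            averageUtilization: 70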

FastAPI Model Server

    from fastapi import FastAPI, HTTPException
    from pydantic import BaseModel
    import mlflow
    import numpy as np

    app = FastAPI()

    MODEL_URI = "models:/fraud-detection/Production"

    # Load the native XGBoost flavor on startup: the generic pyfunc wrapper
    # exposes only predict(), so predict_proba() needs the native model
    model = mlflow.xgboost.load_model(MODEL_URI)
    model_info = mlflow.models.get_model_info(MODEL_URI)

    class PredictionRequest(BaseModel):
        features: list[float]

    class PredictionResponse(BaseModel):
        prediction: int
        probability: float
        model_version: str

    @app.post("/predict", response_model=PredictionResponse)
    async def predict(request: PredictionRequest):
        try:
            features = np.array(request.features).reshape(1, -1)
            prediction = model.predict(features)[0]
            probability = model.predict_proba(features)[0].max()

            return PredictionResponse(
                prediction=int(prediction),
                probability=float(probability),
                model_version=model_info.run_id
            )
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))

    @app.get("/health")
    async def health():
        return {"status": "healthy"}

    @app.get("/ready")
    async def ready():
        # Matches the readinessProbe path in the Deployment above
        return {"status": "ready"}

How Do You Monitor Models in Production?

Performance Monitoring

    import pandas as pd
    from evidently import ColumnMapping
    from evidently.report import Report
    from evidently.metric_preset import DataDriftPreset, ClassificationPreset
    
    def generate_monitoring_report(
        reference_data: pd.DataFrame,
        current_data: pd.DataFrame,
        predictions: pd.DataFrame
    ) -> Report:
        """Generate model monitoring report."""
    
        column_mapping = ColumnMapping(
            prediction='prediction',
            target='actual',
            numerical_features=['amount', 'hour', 'day_of_week'],
            categorical_features=['merchant_category', 'country']
        )
    
        report = Report(metrics=[
            DataDriftPreset(),
            ClassificationPreset()
        ])
    
        report.run(
            reference_data=reference_data,
            current_data=current_data.join(predictions),
            column_mapping=column_mapping
        )
    
        return report
    
    def check_for_drift(report: Report) -> bool:
        """Check if dataset-level drift was detected."""
        # The first metric emitted by DataDriftPreset is the dataset-level
        # drift test; its result carries a boolean 'dataset_drift' flag
        drift_detected = report.as_dict()['metrics'][0]['result']['dataset_drift']
        return drift_detected
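
Drift checks pay off most when they run on a schedule and can trigger retraining automatically, closing the continuous-training loop. A sketch in the Prefect style used earlier (the paths and the `retrain_and_register` hook are hypothetical):

    from prefect import flow
    import pandas as pd

    @flow
    def continuous_training_check(
        reference_path: str,
        current_path: str,
        predictions_path: str,
    ):
        """Scheduled drift check that kicks off retraining when drift appears."""
        reference = pd.read_parquet(reference_path)
        current = pd.read_parquet(current_path)
        predictions = pd.read_parquet(predictions_path)  # logged online predictions

        report = generate_monitoring_report(reference, current, predictions)
        if check_for_drift(report):
            # Hypothetical hook into the training/registration code shown earlier
            retrain_and_register(current_path)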

Alerting Setup

    from prometheus_client import Counter, Histogram, Gauge
    
    # Metrics
    prediction_counter = Counter(
        'model_predictions_total',
        'Total number of predictions',
        ['model_name', 'version', 'prediction']
    )
    
    prediction_latency = Histogram(
        'model_prediction_latency_seconds',
        'Prediction latency in seconds',
        ['model_name']
    )
    
    data_drift_score = Gauge(
        'model_data_drift_score',
        'Current data drift score',
        ['model_name', 'feature']
    )
    
    def log_prediction(model_name: str, version: str, prediction: int, latency: float):
        """Log prediction metrics."""
        prediction_counter.labels(
            model_name=model_name,
            version=version,
            prediction=str(prediction)
        ).inc()
    
        prediction_latency.labels(model_name=model_name).observe(latency)
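
These metrics only help if the serving path emits them; a sketch of calling `log_prediction` around the model call, reusing `model` and `model_info` from the FastAPI server above:

    import time

    def predict_with_metrics(features: np.ndarray) -> int:
        """Run a prediction and record Prometheus metrics for it."""
        start = time.perf_counter()
        prediction = int(model.predict(features)[0])
        latency = time.perf_counter() - start

        log_prediction(
            model_name="fraud-detection",
            version=model_info.run_id,
            prediction=prediction,
            latency=latency,
        )
        return prediction

Exposing the metrics to Prometheus is then one line in the server, for example mounting `prometheus_client.make_asgi_app()` under `/metrics`.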

What Are the Key MLOps Tools?

Tool Stack Comparison

| Category | Open Source | Managed |
|----------|-------------|---------|
| Experiment Tracking | MLflow | W&B, Comet, Neptune |
| Orchestration | Prefect, Airflow, Kubeflow Pipelines | Prefect Cloud, MWAA |
| Feature Store | Feast | Tecton, Databricks |
| Model Serving | Seldon, BentoML | SageMaker, Vertex AI |
| Monitoring | Evidently, Whylogs | Arize, Fiddler |

MLOps is essential for turning model development into sustained production value. By applying these practices, versioned data and models, automated pipelines, and continuous monitoring, you can build reliable ML systems that deliver consistent business impact.
