
AI Engineer's Guide to MLOps and AI Infrastructure

Training pipelines, model serving, monitoring, and the infrastructure behind production AI

Why MLOps Is an AI Engineer’s Core Skill

You can train a great model in a notebook. But getting that model into production—reliably, at scale, with monitoring and retraining—requires MLOps. This is the discipline that bridges the gap between “it works on my laptop” and “it serves 10,000 requests per second with 99.9% uptime.”

The MLOps Lifecycle

Data Collection → Feature Engineering → Training → Evaluation → Deployment → Monitoring
      ↑                                                                          │
      └──────────────────── Retraining Trigger ←──────────────────────────────────┘
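This loop can be sketched as a tiny orchestrator: run the stages in order, and go back to the start whenever a retraining trigger fires. The stage names and the trigger below are illustrative, not a real framework.

```python
# Illustrative orchestrator for the lifecycle loop above — not a real framework.
def run_pipeline(stages, should_retrain, max_iterations=3):
    """Run each (name, fn) stage in order; loop back while retraining is triggered."""
    history = []
    for _ in range(max_iterations):
        for name, stage in stages:
            stage()
            history.append(name)
        if not should_retrain():
            break
    return history
```

In practice this role is played by a workflow orchestrator such as Airflow, Kubeflow Pipelines, or Dagster, with the retraining trigger fed by monitoring.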

Training Infrastructure

Experiment Tracking

Every training run should be tracked. No exceptions.

import mlflow

mlflow.set_experiment("ctr-prediction-v2")

with mlflow.start_run(run_name="dcn-v2-lr0.001"):
    # Log parameters
    mlflow.log_params({
        "model": "DCN-v2",
        "learning_rate": 0.001,
        "batch_size": 4096,
        "embedding_dim": 64,
        "num_cross_layers": 3,
    })

    # Train
    model = train_model(config)

    # Log metrics
    mlflow.log_metrics({
        "auc_roc": 0.7845,
        "log_loss": 0.4231,
        "train_time_minutes": 45,
    })

    # Log model artifact
    mlflow.pytorch.log_model(model, "model")

    # Log dataset info
    mlflow.log_input(
        mlflow.data.from_pandas(train_df, name="training_data"),
    )

Tools: MLflow, Weights & Biases, Neptune.ai, Comet ML

Distributed Training

When your model or data doesn’t fit on one GPU:

# PyTorch Distributed Data Parallel (DDP)
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler

def train_ddp(rank, world_size):
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

    model = MyModel().to(rank)
    model = DDP(model, device_ids=[rank])

    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    dataloader = DataLoader(dataset, sampler=sampler, batch_size=batch_size)

    for epoch in range(num_epochs):
        sampler.set_epoch(epoch)  # reshuffle with a different seed each epoch

        for batch in dataloader:
            loss = model(batch)
            loss.backward()  # gradients automatically synchronized across ranks
            optimizer.step()
            optimizer.zero_grad()

    dist.destroy_process_group()

# Launch: torchrun --nproc_per_node=4 train.py

When to use what:

| Strategy | Use Case | Framework |
| --- | --- | --- |
| Data Parallel | Model fits on 1 GPU, data is large | PyTorch DDP |
| Model Parallel | Model doesn't fit on 1 GPU | PyTorch FSDP, DeepSpeed |
| Pipeline Parallel | Very large models (100B+) | Megatron-LM, DeepSpeed |
| Fully Sharded (FSDP) | Large models on commodity GPUs | PyTorch FSDP |

GPU Management

# Monitor GPU usage
nvidia-smi --query-gpu=utilization.gpu,memory.used,memory.total --format=csv -l 1

# Common GPU issues:
# 1. OOM → Reduce batch size, use gradient accumulation, use mixed precision
# 2. Low utilization → Data loading bottleneck, increase num_workers
# 3. Memory fragmentation → Use memory-efficient attention, gradient checkpointing

# Mixed precision training (~2x speed, ~50% memory)
from torch.amp import autocast, GradScaler

scaler = GradScaler()

for batch in dataloader:
    optimizer.zero_grad()
    with autocast(device_type='cuda'):
        output = model(batch)
        loss = criterion(output, target)

    scaler.scale(loss).backward()
    scaler.step(optimizer)
    scaler.update()
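Gradient accumulation, mentioned above as an OOM fix, simulates a large effective batch by summing gradients over several small batches before stepping. A minimal sketch; `model`, `dataloader`, and `criterion` are placeholders for your own:

```python
import torch

def train_with_accumulation(model, dataloader, optimizer, criterion, accum_steps=4):
    """Effective batch size = dataloader batch size * accum_steps."""
    model.train()
    optimizer.zero_grad()
    for i, (inputs, targets) in enumerate(dataloader):
        loss = criterion(model(inputs), targets) / accum_steps  # scale so the summed
        loss.backward()                                         # gradients match one big batch
        if (i + 1) % accum_steps == 0:
            optimizer.step()
            optimizer.zero_grad()
```

This combines naturally with the mixed-precision loop above: wrap the forward pass in `autocast` and step through the `GradScaler` every `accum_steps` batches.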

Model Serving

Serving Patterns

Pattern 1: Online Inference (Real-time)

Client → API Gateway → Model Server → Response
                            │
                    Feature Store (real-time features)

Pattern 2: Batch Inference

Schedule → Data Pipeline → Model → Results to DB/Cache

Pattern 3: Streaming Inference

Event Stream (Kafka) → Model Service → Output Stream
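Pattern 2 is usually the cheapest to operate when freshness requirements allow it. A minimal sketch, using sqlite3 as a stand-in for the results store and a trivial placeholder `score` function in place of a real model:

```python
import sqlite3

def score(features):
    """Placeholder model: mean of the feature values."""
    return sum(features) / len(features)

def run_batch_inference(conn):
    """Score all rows that have no prediction yet; return how many were scored."""
    rows = conn.execute("SELECT id, f1, f2 FROM events WHERE score IS NULL").fetchall()
    for row_id, f1, f2 in rows:
        conn.execute("UPDATE events SET score = ? WHERE id = ?", (score([f1, f2]), row_id))
    conn.commit()
    return len(rows)
```

A scheduler (cron, Airflow) calls `run_batch_inference` on a cadence; downstream services then read precomputed scores instead of invoking the model at request time.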

Model Serving Frameworks

vLLM (LLM serving):

from vllm import LLM, SamplingParams

llm = LLM(
    model="meta-llama/Llama-3.1-8B-Instruct",
    tensor_parallel_size=2,  # Use 2 GPUs
    max_model_len=8192,
)

sampling_params = SamplingParams(temperature=0.7, max_tokens=512)
outputs = llm.generate(prompts, sampling_params)

Key vLLM features:

  • PagedAttention: Efficient KV-cache management
  • Continuous batching: Don’t wait for batch to fill
  • Tensor parallelism: Split model across GPUs
  • Quantization: AWQ, GPTQ, SqueezeLLM support

TorchServe (General PyTorch):

# handler.py — custom TorchServe handler
# (load_model, transform, format_response are placeholders for your own code)
import torch
from ts.torch_handler.base_handler import BaseHandler

class ModelHandler(BaseHandler):
    def initialize(self, context):
        self.model = load_model(context.system_properties["model_dir"])

    def preprocess(self, data):
        return transform(data)

    def inference(self, data):
        with torch.no_grad():
            return self.model(data)

    def postprocess(self, data):
        return format_response(data)

Triton Inference Server (Multi-framework, high performance):

model_repository/
├── ensemble_model/
│   └── config.pbtxt
├── preprocessing/
│   ├── 1/model.py
│   └── config.pbtxt
├── main_model/
│   ├── 1/model.onnx
│   └── config.pbtxt
└── postprocessing/
    ├── 1/model.py
    └── config.pbtxt

Model Optimization for Serving

# 1. ONNX Export (cross-framework, optimized runtime)
import torch.onnx

torch.onnx.export(
    model,
    dummy_input,
    "model.onnx",
    opset_version=17,
    input_names=["input"],   # names referenced by dynamic_axes
    output_names=["output"],
    dynamic_axes={"input": {0: "batch_size"}},
)

# 2. Quantization (4x smaller, 2-4x faster)
from torch.ao.quantization import quantize_dynamic

quantized_model = quantize_dynamic(
    model,
    {torch.nn.Linear},
    dtype=torch.qint8
)

# 3. TensorRT (NVIDIA GPU optimization)
import tensorrt as trt
# Compile ONNX model to TensorRT engine for maximum GPU throughput

| Optimization | Speedup | Quality Loss | When to Use |
| --- | --- | --- | --- |
| ONNX Runtime | 1.5-3x | None | Always for non-LLM models |
| Dynamic Quantization | 2-4x | Minimal | CPU serving |
| Static Quantization | 2-4x | Small | When calibration data available |
| TensorRT | 3-10x | None-Minimal | NVIDIA GPU serving |
| Distillation | Variable | Small | When you have a teacher model |
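Published speedups vary by model and hardware, so measure before and after any optimization. A stdlib-only latency benchmark sketch (`predict` is a placeholder for your serving call):

```python
import time

def benchmark(predict, payload, n=200, warmup=20):
    """Return p50/p95/p99 latency in milliseconds for predict(payload)."""
    for _ in range(warmup):            # warm caches/JIT before measuring
        predict(payload)
    latencies = []
    for _ in range(n):
        start = time.perf_counter()
        predict(payload)
        latencies.append((time.perf_counter() - start) * 1000)
    latencies.sort()
    return {f"p{p}": latencies[int(n * p / 100) - 1] for p in (50, 95, 99)}
```

Run it against both the baseline and the optimized model with production-shaped payloads; tail latency (p99), not the mean, is what your SLOs care about.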

CI/CD for ML

ML Pipeline with GitHub Actions

# .github/workflows/ml-pipeline.yml
name: ML Pipeline

on:
  push:
    paths: ['src/models/**', 'src/features/**', 'configs/**']

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Run unit tests
        run: pytest tests/unit/ -v
      - name: Run data validation
        run: python scripts/validate_data.py
      - name: Run model tests
        run: pytest tests/model/ -v

  train:
    needs: test
    runs-on: [self-hosted, gpu]
    steps:
      - name: Train model
        run: python train.py --config configs/production.yaml
      - name: Evaluate model
        run: python evaluate.py --model outputs/model.pt
      - name: Check quality gates
        run: |
          python scripts/quality_gate.py \
            --min-auc 0.78 \
            --max-latency-p99 50ms \
            --max-model-size 500MB

  deploy:
    needs: train
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - name: Deploy canary (10% traffic)
        run: kubectl apply -f k8s/canary-deployment.yaml
      - name: Monitor canary (30 min)
        run: python scripts/monitor_canary.py --duration 1800
      - name: Promote to full deployment
        run: kubectl apply -f k8s/full-deployment.yaml
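The quality-gate step can be a small script that compares evaluation metrics to thresholds and exits non-zero on failure, which fails the CI job. A sketch with illustrative metric names and thresholds:

```python
import sys

def check_quality_gates(metrics, gates):
    """gates maps metric name -> ('min'|'max', threshold); returns failure messages."""
    failures = []
    for name, (direction, threshold) in gates.items():
        value = metrics[name]
        ok = value >= threshold if direction == "min" else value <= threshold
        if not ok:
            failures.append(f"{name}={value} violates {direction} {threshold}")
    return failures

def main():
    metrics = {"auc_roc": 0.781, "latency_p99_ms": 42, "model_size_mb": 310}
    gates = {
        "auc_roc": ("min", 0.78),
        "latency_p99_ms": ("max", 50),
        "model_size_mb": ("max", 500),
    }
    failures = check_quality_gates(metrics, gates)
    for f in failures:
        print("GATE FAILED:", f)
    sys.exit(1 if failures else 0)  # non-zero exit fails the CI job
```

In a real pipeline, `metrics` would be loaded from the evaluation step's output file rather than hard-coded.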

Model Registry

# Register model with metadata
import mlflow

model_uri = f"runs:/{run_id}/model"

mlflow.register_model(
    model_uri=model_uri,
    name="ctr-prediction",
    tags={
        "dataset_version": "2025-04-10",
        "auc_roc": "0.7845",
        "training_time": "45min",
        "approved_by": "ml-team",
    }
)

# Promote model through stages
# (newer MLflow versions deprecate stages in favor of model version aliases)
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
    name="ctr-prediction",
    version=5,
    stage="Production",
)

Monitoring and Observability

What to Monitor

import numpy as np

class ModelMonitor:
    def __init__(self):
        self.metrics = PrometheusMetrics()  # your Prometheus client wrapper

    def log_prediction(self, request, response, latency_ms):
        # Operational metrics
        self.metrics.latency.observe(latency_ms)
        self.metrics.requests.inc()

        # Model quality metrics
        self.metrics.prediction_distribution.observe(response.score)

        # Feature drift detection
        for feature_name, value in request.features.items():
            self.metrics.feature_value.labels(feature=feature_name).observe(value)

    def check_data_drift(self, current_features, reference_features):
        """Detect if input distribution has shifted"""
        from scipy.stats import ks_2samp

        drift_report = {}
        for feature in current_features.columns:
            stat, p_value = ks_2samp(
                reference_features[feature],
                current_features[feature]
            )
            drift_report[feature] = {
                "statistic": stat,
                "p_value": p_value,
                "drifted": p_value < 0.05,
            }
        return drift_report

    def check_prediction_drift(self, current_predictions, reference_predictions):
        """Detect if model outputs have shifted"""
        current_mean = np.mean(current_predictions)
        reference_mean = np.mean(reference_predictions)
        relative_change = abs(current_mean - reference_mean) / reference_mean

        if relative_change > 0.1:  # >10% shift
            self.alert("Prediction drift detected", relative_change)

Monitoring Dashboard Essentials

Real-time Panel:
├── Requests/sec (throughput)
├── Latency p50/p95/p99
├── Error rate
├── GPU utilization
└── Queue depth

Model Quality Panel:
├── Prediction score distribution (histogram)
├── Feature value distributions (vs baseline)
├── Data drift score per feature
├── Online metric (CTR, conversion) with confidence intervals
└── A/B test comparison

Alerts:
├── Latency p99 > 200ms for 5 minutes
├── Error rate > 1% for 2 minutes
├── Data drift detected (KS test p < 0.01)
├── Prediction distribution shift > 10%
└── GPU memory > 90%

Retraining Triggers

class RetrainingOrchestrator:
    def should_retrain(self) -> tuple[bool, str]:
        # Time-based
        if self.days_since_last_training() > 7:
            return True, "Scheduled weekly retraining"

        # Performance-based
        online_auc = self.get_online_auc(window="24h")
        if online_auc < self.performance_threshold:
            return True, f"Performance degraded: AUC={online_auc}"

        # Drift-based
        drift_score = self.get_drift_score()
        if drift_score > self.drift_threshold:
            return True, f"Data drift detected: score={drift_score}"

        # Volume-based
        new_samples = self.count_new_training_samples()
        if new_samples > self.sample_threshold:
            return True, f"New data available: {new_samples} samples"

        return False, "No retraining needed"

Infrastructure Patterns

Kubernetes for ML

# Model serving deployment with GPU
apiVersion: apps/v1
kind: Deployment
metadata:
  name: model-serving
spec:
  replicas: 3
  selector:
    matchLabels:
      app: model-serving
  template:
    metadata:
      labels:
        app: model-serving
    spec:
      containers:
        - name: model-server
          image: model-server:v1.2
          resources:
            limits:
              nvidia.com/gpu: 1
              memory: "16Gi"
            requests:
              memory: "8Gi"
          ports:
            - containerPort: 8080
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
          env:
            - name: MODEL_PATH
              value: "s3://models/ctr-v5/model.pt"
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: model-serving-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: model-serving
  minReplicas: 2
  maxReplicas: 10
  metrics:
    # custom pod metric: requires a metrics adapter (e.g., DCGM exporter + Prometheus adapter)
    - type: Pods
      pods:
        metric:
          name: gpu_utilization
        target:
          type: AverageValue
          averageValue: "70"

Cost Optimization

| Strategy | Savings | Trade-off |
| --- | --- | --- |
| Spot/preemptible instances for training | 60-80% | May be interrupted |
| Model quantization | 50-75% GPU cost | Slight quality loss |
| Batch inference where possible | 80%+ vs real-time | Higher latency |
| Auto-scaling based on traffic | 30-50% | Cold start latency |
| Model distillation | 50-90% compute | Requires teacher model |
| Caching frequent predictions | Variable | Stale predictions |
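The caching row's trade-off shows up directly in code: the longer the TTL, the cheaper the serving and the staler the predictions. A minimal TTL cache sketch (`predict_fn` stands in for your model call):

```python
import time

class PredictionCache:
    """Cache predictions keyed on request features; recompute after ttl_seconds."""

    def __init__(self, predict_fn, ttl_seconds=60.0):
        self.predict_fn = predict_fn
        self.ttl = ttl_seconds
        self._store = {}  # key -> (expires_at, prediction)

    def predict(self, features: tuple):
        now = time.monotonic()
        hit = self._store.get(features)
        if hit is not None and hit[0] > now:
            return hit[1]                      # fresh cached prediction
        value = self.predict_fn(features)      # cache miss or expired: recompute
        self._store[features] = (now + self.ttl, value)
        return value
```

In production this would be backed by Redis or a similar shared store with eviction, but the TTL knob and the hit/miss logic are the same.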

Takeaways

  1. Track every experiment—you will need to reproduce results months later
  2. Start with simple serving (FastAPI + single GPU) and add complexity as traffic demands
  3. Monitor data drift, not just model accuracy—drift is the leading cause of model degradation
  4. Automate retraining but keep humans in the loop for deployment approval
  5. Optimize serving costs with quantization and caching before scaling hardware
  6. Use feature stores—the investment pays off quickly when you have multiple models
  7. Version everything: data, features, models, and configs