AI Engineer's Guide to MLOps and AI Infrastructure
Training pipelines, model serving, monitoring, and the infrastructure behind production AI
Why MLOps Is an AI Engineer’s Core Skill
You can train a great model in a notebook. But getting that model into production—reliably, at scale, with monitoring and retraining—requires MLOps. This is the discipline that bridges the gap between “it works on my laptop” and “it serves 10,000 requests per second with 99.9% uptime.”
The MLOps Lifecycle
Data Collection → Feature Engineering → Training → Evaluation → Deployment → Monitoring
       ↑                                                                         │
       └────────────────────────── Retraining Trigger ←──────────────────────────┘
Training Infrastructure
Experiment Tracking
Every training run should be tracked. No exceptions.
import mlflow

mlflow.set_experiment("ctr-prediction-v2")

with mlflow.start_run(run_name="dcn-v2-lr0.001"):
    # Log parameters
    mlflow.log_params({
        "model": "DCN-v2",
        "learning_rate": 0.001,
        "batch_size": 4096,
        "embedding_dim": 64,
        "num_cross_layers": 3,
    })

    # Train
    model = train_model(config)

    # Log metrics
    mlflow.log_metrics({
        "auc_roc": 0.7845,
        "log_loss": 0.4231,
        "train_time_minutes": 45,
    })

    # Log model artifact
    mlflow.pytorch.log_model(model, "model")

    # Log dataset info
    mlflow.log_input(
        mlflow.data.from_pandas(train_df, name="training_data"),
    )
Tools: MLflow, Weights & Biases, Neptune.ai, Comet ML
Distributed Training
When your model or data doesn’t fit on one GPU:
# PyTorch Distributed Data Parallel (DDP)
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

def train_ddp(rank, world_size):
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    model = MyModel().to(rank)
    model = DDP(model, device_ids=[rank])
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    dataloader = DataLoader(dataset, sampler=sampler, batch_size=batch_size)

    for epoch in range(num_epochs):
        sampler.set_epoch(epoch)  # Reshuffle shards each epoch
        for batch in dataloader:
            loss = model(batch)   # Assumes the model's forward returns the loss
            loss.backward()       # Gradients automatically synchronized across ranks
            optimizer.step()
            optimizer.zero_grad()

    dist.destroy_process_group()

# Launch: torchrun --nproc_per_node=4 train.py
When to use what:
| Strategy | Use Case | Framework |
|---|---|---|
| Data Parallel | Model fits on 1 GPU, data is large | PyTorch DDP |
| Model Parallel | Model doesn’t fit on 1 GPU | PyTorch FSDP, DeepSpeed |
| Pipeline Parallel | Very large models (100B+) | Megatron-LM, DeepSpeed |
| Fully Sharded (FSDP) | Large models on commodity GPUs | PyTorch FSDP |
GPU Management
# Monitor GPU usage
nvidia-smi --query-gpu=utilization.gpu,memory.used,memory.total --format=csv -l 1
# Common GPU issues:
# 1. OOM → Reduce batch size, use gradient accumulation, use mixed precision
# 2. Low utilization → Data loading bottleneck, increase num_workers
# 3. Memory fragmentation → Use memory-efficient attention, gradient checkpointing
# Mixed precision training (~2x speed, ~50% activation memory)
from torch.amp import autocast, GradScaler

scaler = GradScaler("cuda")
for inputs, targets in dataloader:
    optimizer.zero_grad()
    with autocast(device_type="cuda"):
        outputs = model(inputs)
        loss = criterion(outputs, targets)
    scaler.scale(loss).backward()  # Scale loss to avoid fp16 gradient underflow
    scaler.step(optimizer)         # Unscales gradients, then steps
    scaler.update()
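Gradient accumulation, listed above as an OOM remedy, works because averaging micro-batch gradients (weighted by micro-batch size) reproduces the full-batch gradient of a mean loss. A framework-agnostic sketch with a toy one-parameter model (all names here are illustrative):

```python
# Toy model: loss(w) = mean((w * x_i - y_i)^2), gradient = mean(2 * x_i * (w * x_i - y_i))
def grad(w, xs, ys):
    n = len(xs)
    return sum(2 * x * (w * x - y) for x, y in zip(xs, ys)) / n

xs = [1.0, 2.0, 3.0, 4.0]
ys = [2.0, 4.0, 6.0, 8.0]
w = 0.5

# Full-batch gradient (what we'd compute if memory allowed)
full = grad(w, xs, ys)

# Accumulate over micro-batches of 2, weighting by micro-batch size
micro = 0.0
for i in range(0, len(xs), 2):
    mb_x, mb_y = xs[i:i + 2], ys[i:i + 2]
    micro += grad(w, mb_x, mb_y) * len(mb_x) / len(xs)

assert abs(full - micro) < 1e-12  # Same gradient, a fraction of the memory
```

In frameworks, the same effect comes from calling `backward()` on each micro-batch (which sums gradients) and stepping the optimizer only every N micro-batches.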
Model Serving
Serving Patterns
Pattern 1: Online Inference (Real-time)
Client → API Gateway → Model Server → Response
                            ↓
                      Feature Store (real-time features)
Pattern 2: Batch Inference
Schedule → Data Pipeline → Model → Results to DB/Cache
Pattern 3: Streaming Inference
Event Stream (Kafka) → Model Service → Output Stream
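The core loop of the batch pattern is simple enough to sketch in plain Python: read inputs in fixed-size chunks so memory stays bounded, score each chunk, and write results to a sink. The chunk size, model, and sink below are stand-ins:

```python
def batch_inference(rows, model_fn, sink, chunk_size=1000):
    """Score rows in fixed-size chunks so memory stays bounded."""
    buffer = []
    for row in rows:
        buffer.append(row)
        if len(buffer) == chunk_size:
            sink.extend(model_fn(buffer))  # Write one chunk of scores
            buffer = []
    if buffer:                             # Flush the final partial chunk
        sink.extend(model_fn(buffer))

# Usage with a dummy "model" that doubles its input
results = []
batch_inference(range(10), lambda chunk: [2 * x for x in chunk], results, chunk_size=4)
# results now holds scores for all 10 rows, computed in chunks of 4, 4, and 2
```

In production, `rows` would stream from a warehouse query and `sink` would write to a database or cache; the chunking logic is the same.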
Model Serving Frameworks
vLLM (LLM serving):
from vllm import LLM, SamplingParams

llm = LLM(
    model="meta-llama/Llama-3.1-8B-Instruct",
    tensor_parallel_size=2,  # Use 2 GPUs
    max_model_len=8192,
)

sampling_params = SamplingParams(temperature=0.7, max_tokens=512)
outputs = llm.generate(prompts, sampling_params)
Key vLLM features:
- PagedAttention: Efficient KV-cache management
- Continuous batching: Don’t wait for batch to fill
- Tensor parallelism: Split model across GPUs
- Quantization: AWQ, GPTQ, SqueezeLLM support
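Continuous batching is worth a closer look: when any sequence in the batch finishes, its slot is refilled from the queue immediately instead of waiting for the whole batch to drain. A toy scheduler (not vLLM's actual implementation) illustrates the saving:

```python
from collections import deque

def continuous_batching(remaining_tokens, max_batch=2):
    """Simulate decode steps: each step, every active sequence emits one token,
    and finished sequences are replaced from the queue immediately."""
    queue = deque(remaining_tokens)  # Request IDs waiting for a slot
    active = []
    steps = 0
    while queue or active:
        while queue and len(active) < max_batch:   # Refill free slots right away
            active.append(queue.popleft())
        steps += 1                                  # One decode step for all active
        for seq in active:
            remaining_tokens[seq] -= 1
        active = [s for s in active if remaining_tokens[s] > 0]
    return steps

# Three requests needing 1, 5, and 2 tokens with batch size 2: the short
# requests finish and free their slots while the long one keeps decoding.
print(continuous_batching({0: 1, 1: 5, 2: 2}))  # → 5 steps (static batching: 7)
```

Static batching would run request 2 alone after the first batch fully drained; continuous batching slots it in as soon as request 0 finishes.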
TorchServe (General PyTorch):
# handler.py — TorchServe invokes these hooks in order for each request
class ModelHandler:
    def initialize(self, context):
        self.model = load_model(context.system_properties["model_dir"])

    def preprocess(self, data):
        return transform(data)

    def inference(self, data):
        with torch.no_grad():
            return self.model(data)

    def postprocess(self, data):
        return format_response(data)
Triton Inference Server (Multi-framework, high performance):
model_repository/
├── ensemble_model/
│   └── config.pbtxt
├── preprocessing/
│   ├── 1/model.py
│   └── config.pbtxt
├── main_model/
│   ├── 1/model.onnx
│   └── config.pbtxt
└── postprocessing/
    ├── 1/model.py
    └── config.pbtxt
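Each model directory above needs a `config.pbtxt`. A minimal sketch for the ONNX model (names, dims, and batch settings are illustrative and must match your actual model):

```
name: "main_model"
platform: "onnxruntime_onnx"
max_batch_size: 32
input [
  {
    name: "input"
    data_type: TYPE_FP32
    dims: [ 64 ]
  }
]
output [
  {
    name: "output"
    data_type: TYPE_FP32
    dims: [ 1 ]
  }
]
dynamic_batching {
  max_queue_delay_microseconds: 100
}
```

`dynamic_batching` tells Triton to merge concurrent requests into server-side batches, waiting at most the configured delay.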
Model Optimization for Serving
# 1. ONNX Export (cross-framework, optimized runtime)
import torch.onnx

torch.onnx.export(
    model,
    dummy_input,
    "model.onnx",
    opset_version=17,
    input_names=["input"],                          # Name referenced by dynamic_axes
    dynamic_axes={"input": {0: "batch_size"}},      # Allow variable batch size
)

# 2. Quantization (~4x smaller, 2-4x faster)
from torch.quantization import quantize_dynamic

quantized_model = quantize_dynamic(
    model,
    {torch.nn.Linear},    # Quantize only Linear layers
    dtype=torch.qint8,
)

# 3. TensorRT (NVIDIA GPU optimization)
import tensorrt as trt
# Compile the ONNX model to a TensorRT engine for maximum GPU throughput
| Optimization | Speedup | Quality Loss | When to Use |
|---|---|---|---|
| ONNX Runtime | 1.5-3x | None | Always for non-LLM models |
| Dynamic Quantization | 2-4x | Minimal | CPU serving |
| Static Quantization | 2-4x | Small | When calibration data available |
| TensorRT | 3-10x | None-Minimal | NVIDIA GPU serving |
| Distillation | Variable | Small | When you have a teacher model |
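The table's "minimal quality loss" claim for quantization comes down to arithmetic: int8 quantization maps floats to integers via a scale factor, and the round-trip error per weight is at most half the scale. A sketch of symmetric per-tensor quantization in plain Python:

```python
def quantize_int8(weights):
    """Symmetric int8 quantization: one scale per tensor."""
    scale = max(abs(w) for w in weights) / 127.0
    q = [round(w / scale) for w in weights]   # Integers in [-127, 127]
    return q, scale

def dequantize(q, scale):
    return [qi * scale for qi in q]

weights = [0.82, -1.27, 0.03, 0.55]
q, scale = quantize_int8(weights)
restored = dequantize(q, scale)

# Round-trip error is bounded by scale / 2 per weight
assert all(abs(w - r) <= scale / 2 for w, r in zip(weights, restored))
assert all(-127 <= qi <= 127 for qi in q)
```

Real schemes add per-channel scales and zero-points for asymmetric ranges, but the error bound is the same idea: smaller dynamic range per tensor means a smaller scale and less loss.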
CI/CD for ML
ML Pipeline with GitHub Actions
# .github/workflows/ml-pipeline.yml
name: ML Pipeline

on:
  push:
    paths: ['src/models/**', 'src/features/**', 'configs/**']

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Run unit tests
        run: pytest tests/unit/ -v
      - name: Run data validation
        run: python scripts/validate_data.py
      - name: Run model tests
        run: pytest tests/model/ -v

  train:
    needs: test
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v4
      - name: Train model
        run: python train.py --config configs/production.yaml
      - name: Evaluate model
        run: python evaluate.py --model outputs/model.pt
      - name: Check quality gates
        run: |
          python scripts/quality_gate.py \
            --min-auc 0.78 \
            --max-latency-p99 50ms \
            --max-model-size 500MB

  deploy:
    needs: train
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - name: Deploy canary (10% traffic)
        run: kubectl apply -f k8s/canary-deployment.yaml
      - name: Monitor canary (30 min)
        run: python scripts/monitor_canary.py --duration 1800
      - name: Promote to full deployment
        run: kubectl apply -f k8s/full-deployment.yaml
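The `quality_gate.py` script the workflow calls is not shown in this guide; one plausible shape for it is a set of threshold checks that exits non-zero on failure, which is what fails the CI job (metric names and thresholds here are illustrative):

```python
import sys

def check_quality_gates(metrics, min_auc=0.78, max_latency_p99_ms=50, max_model_size_mb=500):
    """Return a list of human-readable failures; empty means the gate passes."""
    failures = []
    if metrics["auc_roc"] < min_auc:
        failures.append(f"AUC {metrics['auc_roc']:.4f} < {min_auc}")
    if metrics["latency_p99_ms"] > max_latency_p99_ms:
        failures.append(f"p99 latency {metrics['latency_p99_ms']}ms > {max_latency_p99_ms}ms")
    if metrics["model_size_mb"] > max_model_size_mb:
        failures.append(f"model size {metrics['model_size_mb']}MB > {max_model_size_mb}MB")
    return failures

if __name__ == "__main__":
    # In the real script these would come from the evaluation artifacts
    metrics = {"auc_roc": 0.7845, "latency_p99_ms": 42, "model_size_mb": 310}
    failures = check_quality_gates(metrics)
    if failures:
        print("\n".join(failures))
        sys.exit(1)  # Non-zero exit fails the CI job
    print("All quality gates passed")
```

The key design point is that gates are explicit, versioned thresholds rather than human judgment at deploy time.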
Model Registry
# Register model with metadata
import mlflow

model_uri = f"runs:/{run_id}/model"
mlflow.register_model(
    model_uri=model_uri,
    name="ctr-prediction",
    tags={
        "dataset_version": "2025-04-10",
        "auc_roc": "0.7845",
        "training_time": "45min",
        "approved_by": "ml-team",
    },
)

# Promote model through stages
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
    name="ctr-prediction",
    version=5,
    stage="Production",
)
Monitoring and Observability
What to Monitor
import numpy as np
from scipy.stats import ks_2samp

class ModelMonitor:
    def __init__(self):
        self.metrics = PrometheusMetrics()

    def log_prediction(self, request, response, latency_ms):
        # Operational metrics
        self.metrics.latency.observe(latency_ms)
        self.metrics.requests.inc()
        # Model quality metrics
        self.metrics.prediction_distribution.observe(response.score)
        # Feature drift detection
        for feature_name, value in request.features.items():
            self.metrics.feature_value.labels(feature=feature_name).observe(value)

    def check_data_drift(self, current_features, reference_features):
        """Detect if the input distribution has shifted."""
        drift_report = {}
        for feature in current_features.columns:
            stat, p_value = ks_2samp(
                reference_features[feature],
                current_features[feature],
            )
            drift_report[feature] = {
                "statistic": stat,
                "p_value": p_value,
                "drifted": p_value < 0.05,
            }
        return drift_report

    def check_prediction_drift(self, current_predictions, reference_predictions):
        """Detect if model outputs have shifted."""
        current_mean = np.mean(current_predictions)
        reference_mean = np.mean(reference_predictions)
        relative_change = abs(current_mean - reference_mean) / reference_mean
        if relative_change > 0.1:  # >10% shift
            self.alert("Prediction drift detected", relative_change)
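The KS test above compares raw samples per feature; another widely used drift score is the Population Stability Index (PSI), computed on binned distributions. A minimal sketch (the 0.1/0.25 thresholds are conventional rules of thumb, not universal constants):

```python
import math

def psi(expected_fracs, actual_fracs, eps=1e-6):
    """PSI = sum over bins of (actual - expected) * ln(actual / expected).
    Rule of thumb: < 0.1 stable, 0.1-0.25 moderate shift, > 0.25 major shift."""
    total = 0.0
    for e, a in zip(expected_fracs, actual_fracs):
        e, a = max(e, eps), max(a, eps)   # Avoid log(0) on empty bins
        total += (a - e) * math.log(a / e)
    return total

# Bin fractions for a reference window vs. a current window
same = [0.25, 0.25, 0.25, 0.25]
shifted = [0.10, 0.20, 0.30, 0.40]

assert psi(same, same) == 0.0
assert psi(same, shifted) > 0.1   # Moderate shift on these toy bins
```

PSI is symmetric-ish, cheap to compute on histograms you are already exporting to monitoring, and does not require storing raw reference samples.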
Monitoring Dashboard Essentials
Real-time Panel:
├── Requests/sec (throughput)
├── Latency p50/p95/p99
├── Error rate
├── GPU utilization
└── Queue depth
Model Quality Panel:
├── Prediction score distribution (histogram)
├── Feature value distributions (vs baseline)
├── Data drift score per feature
├── Online metric (CTR, conversion) with confidence intervals
└── A/B test comparison
Alerts:
├── Latency p99 > 200ms for 5 minutes
├── Error rate > 1% for 2 minutes
├── Data drift detected (KS test p < 0.01)
├── Prediction distribution shift > 10%
└── GPU memory > 90%
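The alert conditions above translate directly into Prometheus alerting rules. A sketch for the latency alert (the metric and label names are assumptions about your instrumentation):

```yaml
groups:
  - name: model-serving-alerts
    rules:
      - alert: HighP99Latency
        expr: histogram_quantile(0.99, rate(request_latency_seconds_bucket[5m])) > 0.2
        for: 5m
        labels:
          severity: page
        annotations:
          summary: "Model serving p99 latency above 200ms for 5 minutes"
```

The `for: 5m` clause implements the "for 5 minutes" condition, suppressing alerts on transient spikes.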
Retraining Triggers
class RetrainingOrchestrator:
def should_retrain(self) -> tuple[bool, str]:
# Time-based
if self.days_since_last_training() > 7:
return True, "Scheduled weekly retraining"
# Performance-based
online_auc = self.get_online_auc(window="24h")
if online_auc < self.performance_threshold:
return True, f"Performance degraded: AUC={online_auc}"
# Drift-based
drift_score = self.get_drift_score()
if drift_score > self.drift_threshold:
return True, f"Data drift detected: score={drift_score}"
# Volume-based
new_samples = self.count_new_training_samples()
if new_samples > self.sample_threshold:
return True, f"New data available: {new_samples} samples"
return False, "No retraining needed"
Infrastructure Patterns
Kubernetes for ML
# Model serving deployment with GPU
apiVersion: apps/v1
kind: Deployment
metadata:
  name: model-serving
spec:
  replicas: 3
  selector:
    matchLabels:
      app: model-serving
  template:
    metadata:
      labels:
        app: model-serving
    spec:
      containers:
        - name: model-server
          image: model-server:v1.2
          resources:
            limits:
              nvidia.com/gpu: 1
              memory: "16Gi"
            requests:
              memory: "8Gi"
          ports:
            - containerPort: 8080
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
          env:
            - name: MODEL_PATH
              value: "s3://models/ctr-v5/model.pt"
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: model-serving-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: model-serving
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Pods
      pods:
        metric:
          name: gpu_utilization
        target:
          type: AverageValue
          averageValue: "70"
Cost Optimization
| Strategy | Savings | Trade-off |
|---|---|---|
| Spot/preemptible instances for training | 60-80% | May be interrupted |
| Model quantization | 50-75% GPU cost | Slight quality loss |
| Batch inference where possible | 80%+ vs real-time | Higher latency |
| Auto-scaling based on traffic | 30-50% | Cold start latency |
| Model distillation | 50-90% compute | Requires teacher model |
| Caching frequent predictions | Variable | Stale predictions |
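Caching frequent predictions (last row of the table) can be as simple as memoizing on the feature key. The sketch below uses a plain-dict cache with FIFO eviction so staleness can be managed explicitly, and counts model calls to show the saving (the wrapper and model here are illustrative):

```python
class CachedPredictor:
    """Wrap a model with a bounded prediction cache (illustrative sketch)."""
    def __init__(self, model_fn, max_size=10_000):
        self.model_fn = model_fn
        self.cache = {}
        self.max_size = max_size
        self.model_calls = 0

    def predict(self, key):
        if key in self.cache:
            return self.cache[key]       # Served from cache: possibly stale, but free
        self.model_calls += 1
        score = self.model_fn(key)
        if len(self.cache) >= self.max_size:
            self.cache.pop(next(iter(self.cache)))  # Evict oldest entry (FIFO)
        self.cache[key] = score
        return score

predictor = CachedPredictor(lambda k: hash(k) % 100 / 100)
for key in ["user:1", "user:2", "user:1", "user:1", "user:2"]:
    predictor.predict(key)
# 5 requests, but only 2 model invocations
assert predictor.model_calls == 2
```

In production you would add a TTL so cached scores expire, matching the "stale predictions" trade-off noted in the table.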
Takeaways
- Track every experiment—you will need to reproduce results months later
- Start with simple serving (FastAPI + single GPU) and add complexity as traffic demands
- Monitor data drift, not just model accuracy—drift is the leading cause of model degradation
- Automate retraining but keep humans in the loop for deployment approval
- Optimize serving costs with quantization and caching before scaling hardware
- Use feature stores—the investment pays off quickly when you have multiple models
- Version everything: data, features, models, and configs