AI Engineer's Guide to Data Engineering
Feature stores, data pipelines, streaming, and batch processing for AI systems
Why AI Engineers Need Data Engineering Skills
The most common bottleneck in AI projects isn't the model; it's the data. Models are only as good as the data flowing into them, and delivering clean, timely, correctly computed features to your models in production is a data engineering problem.
As an AI engineer, you don’t need to become a full-time data engineer, but you need to understand the fundamentals deeply enough to design systems that work and communicate effectively with data teams.
The Modern Data Stack for AI
Data Sources → Ingestion → Storage → Processing → Feature Store → Model
                   ↓
            Event Stream → Stream Processing → Real-time Features → Serving
Batch vs. Streaming
| Aspect | Batch | Streaming |
|---|---|---|
| Latency | Minutes to hours | Milliseconds to seconds |
| Processing | Complete datasets | Individual events |
| Tools | Spark, dbt, Airflow | Kafka, Flink, Spark Streaming |
| Use Cases | Training data, reports | Real-time features, alerts |
| Cost | Lower | Higher |
| Complexity | Lower | Higher |
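The contrast in the table can be made concrete with the same feature computed both ways. A minimal sketch (toy event list, plain Python; the field names are hypothetical): the batch version recomputes a 7-day click count from the full dataset on a schedule, while the streaming version maintains a running counter updated per event.

```python
from datetime import datetime, timedelta

events = [
    {"user_id": "u1", "ts": datetime(2025, 4, 1, 10)},
    {"user_id": "u1", "ts": datetime(2025, 4, 3, 9)},
    {"user_id": "u2", "ts": datetime(2025, 4, 6, 12)},
    {"user_id": "u1", "ts": datetime(2025, 4, 7, 8)},
]

# Batch: recompute from the complete dataset (simple, but hours stale)
def batch_count_7d(events, user_id, now):
    cutoff = now - timedelta(days=7)
    return sum(1 for e in events if e["user_id"] == user_id and e["ts"] >= cutoff)

# Streaming: update a running counter per event (fresh, but stateful;
# window eviction omitted here for brevity)
counters: dict[str, int] = {}

def on_event(event):
    counters[event["user_id"]] = counters.get(event["user_id"], 0) + 1

for e in events:
    on_event(e)

now = datetime(2025, 4, 8, 12)
print(batch_count_7d(events, "u1", now), counters["u1"])  # → 2 3
```

The divergence between the two results (the streaming counter never forgets) is exactly the kind of state management that makes streaming the higher-complexity column in the table.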
Feature Engineering and Feature Stores
What Is a Feature Store?
A feature store is a centralized system for managing, storing, and serving ML features. It solves the critical problem of training-serving skew: ensuring the features used during training are identical to those used during inference.
                    ┌─────────────────┐
                    │  Feature Store  │
                    ├─────────────────┤
Batch Pipeline  →   │  Offline Store  │ → Training Data
                    │ (data warehouse)│
                    ├─────────────────┤
Stream Pipeline →   │  Online Store   │ → Real-time Serving
                    │ (Redis/DynamoDB)│
                    ├─────────────────┤
                    │ Feature Registry│ → Discovery & Governance
                    │    (metadata)   │
                    └─────────────────┘
Feast (Open-Source Feature Store)
# feature_repo/features.py
from datetime import timedelta

from feast import Entity, FeatureStore, FeatureView, Field, FileSource
from feast.types import Float32, Int64, String

# Define data source
user_stats_source = FileSource(
    path="s3://data/user_stats.parquet",
    timestamp_field="event_timestamp",
)

# Define entity
user = Entity(
    name="user_id",
    join_keys=["user_id"],
    description="User identifier",
)

# Define feature view
user_stats_fv = FeatureView(
    name="user_stats",
    entities=[user],
    ttl=timedelta(days=1),
    schema=[
        Field(name="total_purchases", dtype=Int64),
        Field(name="avg_order_value", dtype=Float32),
        Field(name="days_since_last_purchase", dtype=Int64),
        Field(name="favorite_category", dtype=String),
    ],
    source=user_stats_source,
)

# Serving: get features for inference
store = FeatureStore(repo_path="feature_repo/")

# Online: single user, real-time
features = store.get_online_features(
    features=[
        "user_stats:total_purchases",
        "user_stats:avg_order_value",
        "user_stats:days_since_last_purchase",
    ],
    entity_rows=[{"user_id": "user_123"}],
).to_dict()

# Offline: batch for training
training_df = store.get_historical_features(
    entity_df=entity_df,  # DataFrame with user_id and event_timestamp columns
    features=[
        "user_stats:total_purchases",
        "user_stats:avg_order_value",
    ],
).to_df()
Feature Engineering Patterns
Temporal Features (most predictive, most error-prone):
from datetime import timedelta

def compute_temporal_features(events_df, reference_time):
    """Compute time-windowed aggregations."""
    windows = [1, 7, 30, 90]  # days
    features = {}
    for window in windows:
        cutoff = reference_time - timedelta(days=window)
        window_df = events_df[events_df["timestamp"] >= cutoff]
        features[f"count_{window}d"] = len(window_df)
        features[f"sum_{window}d"] = window_df["amount"].sum()
        features[f"avg_{window}d"] = window_df["amount"].mean()
        features[f"std_{window}d"] = window_df["amount"].std()
        features[f"unique_categories_{window}d"] = window_df["category"].nunique()
    # Ratio of recent to baseline activity (trend detection); 30 / 7 ≈ 4.28
    if features["count_7d"] > 0:
        features["trend_7d_30d"] = features["count_7d"] / max(features["count_30d"] / 4.28, 1)
    return features
Embedding Features:
import numpy as np

def compute_embedding_features(user_history: list[str], item_catalog: dict,
                               embedding_dim: int = 128):
    """Represent user as weighted average of interacted item embeddings."""
    item_embeddings = [item_catalog[item_id].embedding for item_id in user_history
                       if item_id in item_catalog]
    if not item_embeddings:
        return np.zeros(embedding_dim)
    # Weighted average: recent items count more (history assumed oldest-first)
    weights = np.exp(np.linspace(-1, 0, len(item_embeddings)))
    weights /= weights.sum()
    return np.average(item_embeddings, axis=0, weights=weights)
Cross Features:
def compute_cross_features(user_features: dict, item_features: dict):
    """Feature interactions that capture user-item affinity."""
    return {
        "user_age_x_item_category": f"{user_features['age_bucket']}_{item_features['category']}",
        "user_device_x_item_media": f"{user_features['device']}_{item_features['media_type']}",
        "price_vs_avg_spend": item_features["price"] / max(user_features["avg_spend"], 0.01),
        "category_match": int(item_features["category"] in user_features["favorite_categories"]),
    }
Data Pipelines
Apache Airflow (Batch Orchestration)
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "ml-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "ml_feature_pipeline",
    default_args=default_args,
    schedule_interval="@daily",
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:
    extract = PythonOperator(
        task_id="extract_raw_data",
        python_callable=extract_from_sources,
    )
    validate = PythonOperator(
        task_id="validate_data",
        python_callable=run_data_validation,
    )
    transform = PythonOperator(
        task_id="compute_features",
        python_callable=compute_features,
    )
    load = PythonOperator(
        task_id="load_to_feature_store",
        python_callable=load_features,
    )
    test = PythonOperator(
        task_id="test_feature_quality",
        python_callable=run_quality_tests,
    )

    extract >> validate >> transform >> load >> test
Apache Kafka (Event Streaming)
# Producer: send events
from datetime import datetime, timezone
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=["kafka:9092"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def emit_click_event(user_id: str, item_id: str, context: dict):
    event = {
        "event_type": "click",
        "user_id": user_id,
        "item_id": item_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "context": context,
    }
    producer.send("user-events", value=event)

# Consumer: process events for real-time features
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "user-events",
    bootstrap_servers=["kafka:9092"],
    group_id="feature-computation",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

for message in consumer:
    event = message.value
    if event["event_type"] == "click":
        update_realtime_features(event)
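The update_realtime_features call above is left undefined. One possible sketch, maintaining a rolling 5-minute click count per user with a plain dict of deques standing in for an online store like Redis (names and window size are illustrative assumptions):

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta

# Per-user deque of recent click timestamps (stand-in for Redis)
click_times: dict[str, deque] = defaultdict(deque)

WINDOW = timedelta(minutes=5)

def update_realtime_features(event: dict):
    ts = datetime.fromisoformat(event["timestamp"])
    q = click_times[event["user_id"]]
    q.append(ts)
    # Evict clicks older than the window, relative to the newest event
    while q and ts - q[0] > WINDOW:
        q.popleft()

def click_count_5min(user_id: str) -> int:
    return len(click_times[user_id])

base = datetime(2025, 4, 1, 12, 0, 0)
for offset in [0, 150, 400]:  # seconds after base
    update_realtime_features({
        "user_id": "u1",
        "timestamp": (base + timedelta(seconds=offset)).isoformat(),
    })
print(click_count_5min("u1"))  # → 2 (the first click aged out of the window)
```

In production this state would live in Redis with TTLs rather than process memory, so counts survive consumer restarts and are readable by the feature server.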
Apache Flink (Stream Processing)
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Real-time feature computation
t_env.execute_sql("""
    CREATE TABLE user_events (
        user_id STRING,
        item_id STRING,
        event_type STRING,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user-events',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json'
    )
""")

# Tumbling window aggregation
t_env.execute_sql("""
    SELECT
        user_id,
        COUNT(*) AS click_count_5min,
        COUNT(DISTINCT item_id) AS unique_items_5min,
        TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end
    FROM user_events
    WHERE event_type = 'click'
    GROUP BY user_id, TUMBLE(event_time, INTERVAL '5' MINUTE)
""")
Data Quality for ML
Data Validation
import great_expectations as gx

class DataQualityError(Exception):
    """Raised when incoming data fails validation."""

context = gx.get_context()

# Define expectations for training data
validator = context.sources.pandas_default.read_dataframe(training_df)
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_be_between("age", min_value=13, max_value=120)
validator.expect_column_values_to_be_in_set("country", ["US", "UK", "KR", "JP", "DE"])
validator.expect_column_mean_to_be_between("click_rate", min_value=0.01, max_value=0.15)
validator.expect_column_proportion_of_unique_values_to_be_between("user_id", min_value=0.95)

# Run validation
results = validator.validate()
if not results.success:
    raise DataQualityError(f"Data validation failed: {results}")
Training-Serving Skew Detection
The #1 bug in production ML: features computed differently in training vs. serving.
class SkewDetector:
    def __init__(self, reference_stats: dict):
        self.reference = reference_stats

    def check(self, serving_features: dict) -> list[str]:
        issues = []
        for feature_name, value in serving_features.items():
            ref = self.reference.get(feature_name)
            if ref is None:
                issues.append(f"Unknown feature: {feature_name}")
                continue
            # Check range (numeric features only)
            if isinstance(value, (int, float)) and (value < ref["min"] or value > ref["max"]):
                issues.append(f"{feature_name}={value} outside training range [{ref['min']}, {ref['max']}]")
            # Check type
            if type(value).__name__ != ref["dtype"]:
                issues.append(f"{feature_name} type mismatch: {type(value).__name__} vs {ref['dtype']}")
        # Check for missing features
        for expected in self.reference:
            if expected not in serving_features:
                issues.append(f"Missing feature: {expected}")
        return issues

# Usage: reference_stats is computed once from the training set
detector = SkewDetector({"age": {"min": 13, "max": 120, "dtype": "int"}})
print(detector.check({"age": 150}))  # ["age=150 outside training range [13, 120]"]
Common Data Quality Issues in ML
| Issue | Impact | Detection |
|---|---|---|
| Label leakage | Impossibly good metrics | Feature importance analysis |
| Time leakage | Overly optimistic evaluation | Check temporal ordering |
| Selection bias | Model doesn’t generalize | Compare train/prod distributions |
| Missing values | Training-serving skew | Monitor null rates |
| Stale features | Degraded accuracy | Monitor feature freshness |
| Duplicate data | Biased model | Dedup analysis |
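Three of the detection methods in the table fit in a few lines of pandas. A hedged sketch (toy DataFrame; column names are hypothetical): duplicate-row rate, per-column null rates, and a temporal-ordering check for time leakage.

```python
import pandas as pd

df = pd.DataFrame({
    "user_id": ["u1", "u2", "u2", "u3"],
    "feature_ts": pd.to_datetime(["2025-03-01", "2025-03-02", "2025-03-02", "2025-03-05"]),
    "label_ts": pd.to_datetime(["2025-03-02", "2025-03-03", "2025-03-03", "2025-03-04"]),
    "clicks_7d": [3, None, None, 1],
})

# Duplicate data: exact duplicate rows bias the model toward repeated examples
dup_rate = df.duplicated().mean()

# Missing values: track null rates per column and alert when they drift
# away from the rates seen in training
null_rates = df.isna().mean()

# Time leakage: every feature must be computed strictly before its label
leaks = (df["feature_ts"] >= df["label_ts"]).sum()

print(dup_rate, null_rates["clicks_7d"], leaks)  # → 0.25 0.5 1
```

In a real pipeline these would run as a validation task (like the Airflow `validate_data` step above) and fail the run when a threshold is breached.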
Data Formats for ML
Parquet (Columnar Storage)
import pyarrow as pa
import pyarrow.parquet as pq

# Write training data
table = pa.Table.from_pandas(df)
pq.write_table(
    table,
    "training_data.parquet",
    compression="snappy",
    row_group_size=100_000,
)

# Read only needed columns (huge speedup)
df = pq.read_table(
    "training_data.parquet",
    columns=["user_id", "feature_1", "feature_2", "label"],
).to_pandas()
Delta Lake / Iceberg (Table Formats)
# Delta Lake: ACID transactions + time travel for ML data
from delta.tables import DeltaTable  # provided by the delta-spark package

# Write with versioning (spark is an active SparkSession with Delta enabled)
df.write.format("delta").save("s3://data/features")

# Time travel: load training data as of a specific date
df_v2 = spark.read.format("delta") \
    .option("timestampAsOf", "2025-04-01") \
    .load("s3://data/features")

# Audit: what changed between versions?
dt = DeltaTable.forPath(spark, "s3://data/features")
history = dt.history()
Practical Architecture: End-to-End Example
User Actions (clicks, views, purchases)
    │
    ├── Kafka (event stream)
    │     │
    │     ├── Flink (real-time features)
    │     │     └── Redis (online feature store)
    │     │
    │     └── S3 (raw event archive)
    │           │
    │           └── Spark (daily batch job)
    │                 ├── Delta Lake (feature tables)
    │                 └── Feature Store (offline: for training)
    │
    ├── Training Pipeline (Airflow)
    │     ├── Read from Feature Store (point-in-time join)
    │     ├── Train model
    │     ├── Evaluate against baseline
    │     └── Register model (MLflow)
    │
    └── Serving Pipeline
          ├── Model Server (vLLM / TorchServe)
          ├── Feature Server (online features from Redis)
          └── Monitoring (Prometheus + Grafana)
Takeaways
- Feature stores aren’t optional in production ML—they prevent training-serving skew
- Start with batch pipelines, add streaming only when latency requirements demand it
- Data validation is as important as model testing—bad data silently kills accuracy
- Prefer Parquet (or another columnar format) for ML data storage; reading only the columns you need is often an order of magnitude faster than row-oriented formats
- Log everything: raw events, computed features, model predictions, and outcomes
- Point-in-time correct joins are essential—future data leaking into training is the most common ML bug
- Design for reproducibility: given a timestamp, you should be able to reconstruct exactly what features the model saw
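The point-in-time join mentioned in the takeaways can be sketched with pandas' merge_asof (toy data; in production, a feature store's get_historical_features does this for you):

```python
import pandas as pd

# Feature snapshots: value of clicks_7d as recomputed at each pipeline run
features = pd.DataFrame({
    "user_id": ["u1", "u1", "u1"],
    "ts": pd.to_datetime(["2025-03-01", "2025-03-10", "2025-03-20"]),
    "clicks_7d": [2, 5, 1],
}).sort_values("ts")

# Training labels, with the time each outcome was observed
labels = pd.DataFrame({
    "user_id": ["u1", "u1"],
    "ts": pd.to_datetime(["2025-03-05", "2025-03-15"]),
    "converted": [0, 1],
}).sort_values("ts")

# As-of join: each label row gets the latest feature value at or before
# its timestamp, so no future information leaks into training
train = pd.merge_asof(labels, features, on="ts", by="user_id")
print(train["clicks_7d"].tolist())  # → [2, 5]
```

Note the second label (2025-03-15) picks up the 2025-03-10 snapshot, not the later 2025-03-20 one; a naive latest-value join would silently leak the future.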