AI Engineer's Guide to Data Engineering

Feature stores, data pipelines, streaming, and batch processing for AI systems

Why AI Engineers Need Data Engineering Skills

The most common bottleneck in AI projects isn’t the model—it’s the data. Models are only as good as the data flowing into them, and getting clean, timely, and correctly-featured data to your models in production is a data engineering problem.

As an AI engineer, you don’t need to become a full-time data engineer, but you need to understand the fundamentals deeply enough to design systems that work and communicate effectively with data teams.

The Modern Data Stack for AI

Data Sources → Ingestion → Storage → Processing → Feature Store → Model

              Event Stream → Stream Processing → Real-time Features → Serving

Batch vs. Streaming

Aspect | Batch | Streaming
------ | ----- | ---------
Latency | Minutes to hours | Milliseconds to seconds
Processing | Complete datasets | Individual events
Tools | Spark, dbt, Airflow | Kafka, Flink, Spark Streaming
Use cases | Training data, reports | Real-time features, alerts
Cost | Lower | Higher
Complexity | Lower | Higher
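
To make the contrast concrete, here is a minimal sketch that computes the same feature (a user's average order value) in both styles. The function and class names are illustrative and not taken from any library; a real system would use the tools listed in the table.

```python
from collections import defaultdict


def batch_avg_order_value(orders):
    """Batch style: scan the complete dataset, then emit one value per user."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for user_id, amount in orders:
        totals[user_id] += amount
        counts[user_id] += 1
    return {user_id: totals[user_id] / counts[user_id] for user_id in totals}


class StreamingAvgOrderValue:
    """Streaming style: keep a small running state and update it per event."""

    def __init__(self):
        self._count = defaultdict(int)
        self._mean = defaultdict(float)

    def update(self, user_id, amount):
        # Incremental mean: the full event history is never held in memory.
        self._count[user_id] += 1
        self._mean[user_id] += (amount - self._mean[user_id]) / self._count[user_id]
        return self._mean[user_id]

    def get(self, user_id):
        # Returns None for users that have not produced any event yet.
        return self._mean.get(user_id)


# Hypothetical sample data: (user_id, order amount)
orders = [("u1", 10.0), ("u2", 5.0), ("u1", 30.0)]

batch_result = batch_avg_order_value(orders)

stream = StreamingAvgOrderValue()
for user_id, amount in orders:
    stream.update(user_id, amount)
```

Both paths end at the same value, but the streaming version has a fresh answer after every event, at the cost of managing state. That trade-off is what the Cost and Complexity rows summarize.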

Feature Engineering and Feature Stores

What Is a Feature Store?

A feature store is a centralized system for managing, storing, and serving ML features. It solves the critical problem of training-serving skew: ensuring the features used during training are identical to those used during inference.

                    ┌─────────────────┐
                    │  Feature Store   │
                    ├─────────────────┤
   Batch Pipeline → │ Offline Store   │ → Training Data
                    │ (data warehouse)│
                    ├─────────────────┤
Stream Pipeline →  │ Online Store    │ → Real-time Serving
                    │ (Redis/DynamoDB)│
                    ├─────────────────┤
                    │ Feature Registry│ → Discovery & Governance
                    │ (metadata)      │
                    └─────────────────┘
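
The three boxes in the diagram can be sketched as a toy in-memory class. This is an illustration of the roles only, under the assumption that a list stands in for the warehouse and a dict for Redis/DynamoDB; `ToyFeatureStore` and its methods are hypothetical names, not a real feature store API.

```python
from datetime import datetime


class ToyFeatureStore:
    """Illustrative only: offline history, online latest values, and a registry."""

    def __init__(self):
        self.offline_rows = []  # full history, used to build training sets
        self.online = {}        # entity_id -> latest feature values, used for serving
        self.registry = {}      # feature name -> description (metadata)

    def register(self, name, description):
        self.registry[name] = description

    def write(self, entity_id, event_time, features):
        unknown = set(features) - set(self.registry)
        if unknown:
            raise KeyError(f"Unregistered features: {sorted(unknown)}")
        # Offline store: append every observation so history can be replayed.
        self.offline_rows.append(
            {"entity_id": entity_id, "event_time": event_time, **features}
        )
        # Online store: keep only the newest values per entity.
        current = self.online.get(entity_id)
        if current is None or event_time >= current["event_time"]:
            self.online[entity_id] = {"event_time": event_time, **features}

    def get_online(self, entity_id):
        return self.online.get(entity_id)

    def get_history(self, entity_id):
        return [row for row in self.offline_rows if row["entity_id"] == entity_id]


toy_store = ToyFeatureStore()
toy_store.register("total_purchases", "Lifetime purchase count")
toy_store.write("user_123", datetime(2025, 1, 1), {"total_purchases": 3})
toy_store.write("user_123", datetime(2025, 1, 2), {"total_purchases": 4})
```

Both stores are filled by the same write path, so training and serving read values produced by one definition of the feature.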

Feast (Open-Source Feature Store)

# feature_repo/features.py
from datetime import timedelta

# Recent Feast releases declare schemas with Field and feast.types;
# the older Feature/ValueType pair is not accepted by the schema= argument.
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64, String

# Define data source
user_stats_source = FileSource(
    path="s3://data/user_stats.parquet",
    timestamp_field="event_timestamp",
)

# Define entity (join_keys names the column used to look up features)
user = Entity(
    name="user_id",
    join_keys=["user_id"],
    description="User identifier",
)

# Define feature view
user_stats_fv = FeatureView(
    name="user_stats",
    entities=[user],
    ttl=timedelta(days=1),
    schema=[
        Field(name="total_purchases", dtype=Int64),
        Field(name="avg_order_value", dtype=Float32),
        Field(name="days_since_last_purchase", dtype=Int64),
        Field(name="favorite_category", dtype=String),
    ],
    source=user_stats_source,
)

# Serving: get features for inference
from feast import FeatureStore

store = FeatureStore(repo_path="feature_repo/")

# Online: single user, real-time
features = store.get_online_features(
    features=[
        "user_stats:total_purchases",
        "user_stats:avg_order_value",
        "user_stats:days_since_last_purchase",
    ],
    entity_rows=[{"user_id": "user_123"}],
).to_dict()

# Offline: batch for training
training_df = store.get_historical_features(
    entity_df=entity_df,  # DataFrame with user_id and timestamp
    features=[
        "user_stats:total_purchases",
        "user_stats:avg_order_value",
    ],
).to_df()
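
The offline call above takes an `entity_df` of entity keys and timestamps and returns, for each row, the feature values as they were known at that timestamp. The sketch below imitates that point-in-time join with pandas `merge_asof`. It is a conceptual illustration, not Feast's implementation: the sample data is made up, and it ignores the feature view's `ttl`.

```python
import pandas as pd

# Hypothetical feature history: one row per (user, time the value became known).
feature_history = pd.DataFrame(
    {
        "user_id": ["u1", "u1", "u2"],
        "event_timestamp": pd.to_datetime(["2025-01-01", "2025-01-10", "2025-01-05"]),
        "total_purchases": [1, 5, 2],
    }
)

# Hypothetical entity_df: who we want a training row for, and as of when.
entity_df = pd.DataFrame(
    {
        "user_id": ["u1", "u1", "u2"],
        "event_timestamp": pd.to_datetime(["2025-01-05", "2025-01-15", "2025-01-03"]),
        "label": [0, 1, 0],
    }
)

# For each entity row, take the latest feature row at or before its timestamp.
# merge_asof requires both frames to be sorted by the "on" column.
training_df = pd.merge_asof(
    entity_df.sort_values("event_timestamp"),
    feature_history.sort_values("event_timestamp"),
    on="event_timestamp",
    by="user_id",
    direction="backward",
)
```

The `u2` row gets no feature value because its only feature record (2025-01-05) was written after the requested timestamp (2025-01-03). A plain join on `user_id` would have leaked that future value into training.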

Feature Engineering Patterns

Temporal Features (often highly predictive, and easy to get wrong):

from datetime import timedelta


def compute_temporal_features(events_df, reference_time):
    """Compute time-windowed aggregations as of reference_time."""
    windows = [1, 7, 30, 90]  # days

    # Drop events at or after reference_time so no future data leaks in
    past_df = events_df[events_df["timestamp"] < reference_time]

    features = {}
    for window in windows:
        cutoff = reference_time - timedelta(days=window)
        window_df = past_df[past_df["timestamp"] >= cutoff]

        features[f"count_{window}d"] = len(window_df)
        features[f"sum_{window}d"] = window_df["amount"].sum()
        features[f"avg_{window}d"] = window_df["amount"].mean()  # NaN if the window is empty
        features[f"std_{window}d"] = window_df["amount"].std()  # NaN with fewer than two events
        features[f"unique_categories_{window}d"] = window_df["category"].nunique()

    # Trend: 7-day count relative to the average week of the last 30 days
    # (30 / 7 is about 4.29 weeks). Always set, so the output schema is stable.
    features["trend_7d_30d"] = features["count_7d"] / max(features["count_30d"] / (30 / 7), 1)

    return features

Embedding Features:

import numpy as np


def compute_embedding_features(user_history: list[str], item_catalog: dict, embedding_dim: int):
    """Represent user as a recency-weighted average of interacted item embeddings.

    Assumes user_history is ordered oldest to newest.
    """
    item_embeddings = [item_catalog[item_id].embedding for item_id in user_history
                       if item_id in item_catalog]
    if not item_embeddings:
        # No known items: fall back to a zero vector of the right size
        return np.zeros(embedding_dim)

    # Weighted average: recent items count more
    weights = np.exp(np.linspace(-1, 0, len(item_embeddings)))
    weights /= weights.sum()

    return np.average(item_embeddings, axis=0, weights=weights)

Cross Features:

def compute_cross_features(user_features: dict, item_features: dict):
    """Feature interactions that capture user-item affinity."""
    return {
        "user_age_x_item_category": f"{user_features['age_bucket']}_{item_features['category']}",
        "user_device_x_item_media": f"{user_features['device']}_{item_features['media_type']}",
        "price_vs_avg_spend": item_features["price"] / max(user_features["avg_spend"], 0.01),
        "category_match": int(item_features["category"] in user_features["favorite_categories"]),
    }

Data Pipelines

Apache Airflow (Batch Orchestration)

from airflow import DAG
from airflow.operators.python import PythonOperator
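from datetime import datetime, timedelta

# extract_from_sources, run_data_validation, compute_features, load_features,
# and run_quality_tests are assumed to be defined elsewhere in your project.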

default_args = {
    "owner": "ml-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "ml_feature_pipeline",
    default_args=default_args,
    schedule="@daily",  # Airflow 2.4+; older versions use schedule_interval
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:

    extract = PythonOperator(
        task_id="extract_raw_data",
        python_callable=extract_from_sources,
    )

    validate = PythonOperator(
        task_id="validate_data",
        python_callable=run_data_validation,
    )

    transform = PythonOperator(
        task_id="compute_features",
        python_callable=compute_features,
    )

    load = PythonOperator(
        task_id="load_to_feature_store",
        python_callable=load_features,
    )

    test = PythonOperator(
        task_id="test_feature_quality",
        python_callable=run_quality_tests,
    )

    extract >> validate >> transform >> load >> test

Apache Kafka (Event Streaming)

# Producer: send events
import json
from datetime import datetime, timezone

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["kafka:9092"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def emit_click_event(user_id: str, item_id: str, context: dict):
    event = {
        "event_type": "click",
        "user_id": user_id,
        "item_id": item_id,
        # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "context": context,
    }
    producer.send("user-events", value=event)

# Consumer: process events for real-time features
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "user-events",
    bootstrap_servers=["kafka:9092"],
    group_id="feature-computation",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

for message in consumer:
    event = message.value
    if event["event_type"] == "click":
        update_realtime_features(event)
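
The consumer loop calls `update_realtime_features`, which the article leaves undefined. One possible shape for it, as a sketch: keep a per-user sliding window of click timestamps and return the current count. The in-memory dict, the 5-minute window, and the returned field name are assumptions for illustration; a production handler would write to the online store instead, and this version assumes events for a user arrive in timestamp order.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)

# user_id -> timestamps of clicks still inside the window (oldest first)
_recent_clicks = defaultdict(deque)


def update_realtime_features(event: dict) -> dict:
    """Hypothetical handler: maintain a 5-minute sliding click count per user."""
    ts = datetime.fromisoformat(event["timestamp"])
    clicks = _recent_clicks[event["user_id"]]
    clicks.append(ts)

    # Evict clicks that are 5 minutes old or older relative to this event.
    while clicks and clicks[0] <= ts - WINDOW:
        clicks.popleft()

    return {"user_id": event["user_id"], "click_count_5min": len(clicks)}
```

Because the window slides with every event, the count is always current, unlike a fixed-interval aggregate that is only emitted when a window closes.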

Apache Flink (Stream Processing)

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Real-time feature computation
t_env.execute_sql("""
    CREATE TABLE user_events (
        user_id STRING,
        item_id STRING,
        event_type STRING,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user-events',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json'
    )
""")

# Tumbling window aggregation (fixed, non-overlapping 5-minute windows)
t_env.execute_sql("""
    SELECT
        user_id,
        COUNT(*) AS click_count_5min,
        COUNT(DISTINCT item_id) AS unique_items_5min,
        TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end
    FROM user_events
    WHERE event_type = 'click'
    GROUP BY user_id, TUMBLE(event_time, INTERVAL '5' MINUTE)
""")

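For readers less familiar with windowed SQL, the plain-Python sketch below reproduces what the `TUMBLE` query computes: each click is assigned to exactly one fixed 5-minute bucket aligned to the epoch, and counts are kept per user and bucket. It is a simplified model that ignores watermarks and late events, and the sample events are made up.

```python
from collections import defaultdict
from datetime import datetime, timezone

WINDOW_SECONDS = 5 * 60


def window_end(event_time: datetime) -> datetime:
    """End of the fixed 5-minute window containing event_time (epoch-aligned)."""
    epoch = int(event_time.timestamp())
    window_start = epoch - (epoch % WINDOW_SECONDS)
    return datetime.fromtimestamp(window_start + WINDOW_SECONDS, tz=timezone.utc)


def tumbling_click_features(events):
    """Count clicks and distinct items per (user_id, window_end)."""
    clicks = defaultdict(int)
    items = defaultdict(set)
    for event in events:
        if event["event_type"] != "click":
            continue
        key = (event["user_id"], window_end(event["event_time"]))
        clicks[key] += 1
        items[key].add(event["item_id"])
    return {
        key: {"click_count_5min": clicks[key], "unique_items_5min": len(items[key])}
        for key in clicks
    }


def _utc(minute):
    # Helper for the hypothetical sample data below.
    return datetime(2025, 1, 1, 0, minute, tzinfo=timezone.utc)


events = [
    {"user_id": "u1", "item_id": "a", "event_type": "click", "event_time": _utc(1)},
    {"user_id": "u1", "item_id": "a", "event_type": "click", "event_time": _utc(4)},
    {"user_id": "u1", "item_id": "b", "event_type": "view", "event_time": _utc(4)},
    {"user_id": "u1", "item_id": "b", "event_type": "click", "event_time": _utc(6)},
]

result = tumbling_click_features(events)
```

The clicks at 00:01 and 00:04 land in the window ending 00:05, and the click at 00:06 starts a new window ending 00:10. No event is counted twice, which is the difference from a sliding window.
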
Data Quality for ML

Data Validation

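import great_expectations as gx


class DataQualityError(Exception):
    """Raised when training data fails validation."""


# Note: context.sources.pandas_default is the pre-1.0 fluent API of Great Expectations;
# check the docs for your installed version, since later releases changed these entry points.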

context = gx.get_context()

# Define expectations for training data
validator = context.sources.pandas_default.read_dataframe(training_df)

validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_be_between("age", min_value=13, max_value=120)
validator.expect_column_values_to_be_in_set("country", ["US", "UK", "KR", "JP", "DE"])
validator.expect_column_mean_to_be_between("click_rate", min_value=0.01, max_value=0.15)
validator.expect_column_proportion_of_unique_values_to_be_between("user_id", min_value=0.95)

# Run validation
results = validator.validate()
if not results.success:
    raise DataQualityError(f"Data validation failed: {results}")

Training-Serving Skew Detection

One of the most common bugs in production ML is features being computed differently in training and in serving.

class SkewDetector:
    """Flag serving-time features that fall outside what training saw."""

    def __init__(self, reference_stats: dict):
        # reference_stats: feature name -> {"min": ..., "max": ..., "dtype": "float"}
        self.reference = reference_stats

    def check(self, serving_features: dict) -> list[str]:
        issues = []
        for feature_name, value in serving_features.items():
            ref = self.reference.get(feature_name)
            if ref is None:
                issues.append(f"Unknown feature: {feature_name}")
                continue

            # Check type first: a range comparison across mismatched types can raise TypeError
            if type(value).__name__ != ref["dtype"]:
                issues.append(f"{feature_name} type mismatch: {type(value).__name__} vs {ref['dtype']}")
                continue

            # Check range (numeric features only)
            if isinstance(value, (int, float)) and (value < ref["min"] or value > ref["max"]):
                issues.append(f"{feature_name}={value} outside training range [{ref['min']}, {ref['max']}]")

        # Check for missing features
        for expected in self.reference:
            if expected not in serving_features:
                issues.append(f"Missing feature: {expected}")

        return issues
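
The detector needs a `reference_stats` dict with `min`, `max`, and `dtype` per feature, which the class reads but the article does not build. A minimal way to derive it from training rows might look like this. `build_reference_stats` is a hypothetical helper, and the sketch assumes numeric features with a consistent Python type per feature.

```python
def build_reference_stats(training_rows: list[dict]) -> dict:
    """Summarize each feature as {"min", "max", "dtype"} from training rows."""
    stats = {}
    for row in training_rows:
        for name, value in row.items():
            if value is None:
                continue  # nulls are tracked separately, not as part of the range
            entry = stats.get(name)
            if entry is None:
                # The first non-null value fixes the expected dtype for this feature.
                stats[name] = {"min": value, "max": value, "dtype": type(value).__name__}
            else:
                entry["min"] = min(entry["min"], value)
                entry["max"] = max(entry["max"], value)
    return stats


# Hypothetical training rows
training_rows = [
    {"age": 30, "avg_spend": 12.5},
    {"age": 45, "avg_spend": 80.0},
    {"age": 22, "avg_spend": None},
]

reference_stats = build_reference_stats(training_rows)
```

Passing `reference_stats` to `SkewDetector` and calling `check({"age": 130, "avg_spend": 20.0})` would then report `age` as outside the training range.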

Common Data Quality Issues in ML

Issue | Impact | Detection
----- | ------ | ---------
Label leakage | Impossibly good metrics | Feature importance analysis
Time leakage | Overly optimistic evaluation | Check temporal ordering
Selection bias | Model doesn’t generalize | Compare train/prod distributions
Missing values | Training-serving skew | Monitor null rates
Stale features | Degraded accuracy | Monitor feature freshness
Duplicate data | Biased model | Dedup analysis
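
Two of the detection methods in the table are simple enough to sketch directly: checking temporal ordering and monitoring null rates. The helpers and the field names `feature_timestamp` and `prediction_timestamp` are hypothetical; they assume each training row records when its features were computed and when the prediction would have been made.

```python
from datetime import datetime


def find_time_leaks(rows):
    """Return rows whose features were computed after the prediction time."""
    return [row for row in rows if row["feature_timestamp"] > row["prediction_timestamp"]]


def null_rates(rows, feature_names):
    """Fraction of rows in which each feature is missing (None)."""
    total = len(rows)
    if total == 0:
        return {name: 0.0 for name in feature_names}
    return {
        name: sum(1 for row in rows if row.get(name) is None) / total
        for name in feature_names
    }


# Hypothetical training rows
rows = [
    {
        "feature_timestamp": datetime(2025, 1, 1),
        "prediction_timestamp": datetime(2025, 1, 2),
        "avg_spend": 10.0,
    },
    {
        "feature_timestamp": datetime(2025, 1, 5),  # computed after the prediction
        "prediction_timestamp": datetime(2025, 1, 3),
        "avg_spend": None,
    },
]

leaks = find_time_leaks(rows)
rates = null_rates(rows, ["avg_spend"])
```

Checks like these are cheap to run as a pipeline step before training, and the same null-rate function can be pointed at serving logs to compare against the training baseline.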

Data Formats for ML

Parquet (Columnar Storage)

import pyarrow as pa
import pyarrow.parquet as pq

# Write training data
table = pa.Table.from_pandas(df)
pq.write_table(
    table,
    "training_data.parquet",
    compression="snappy",
    row_group_size=100_000,
)

# Read only the needed columns (columnar layout skips the rest on disk)
df = pq.read_table(
    "training_data.parquet",
    columns=["user_id", "feature_1", "feature_2", "label"],
).to_pandas()

Delta Lake / Iceberg (Table Formats)

# Delta Lake: ACID transactions + time travel for ML data
from delta import DeltaTable

# Write with versioning (assumes a SparkSession `spark` configured for Delta Lake
# and a Spark DataFrame `df`)
df.write.format("delta").save("s3://data/features")

# Time travel: load training data as of a specific date
df_v2 = spark.read.format("delta") \
    .option("timestampAsOf", "2025-04-01") \
    .load("s3://data/features")

# Audit: what changed between versions?
dt = DeltaTable.forPath(spark, "s3://data/features")
history = dt.history()

Practical Architecture: End-to-End Example

User Actions (clicks, views, purchases)

    ├── Kafka (event stream)
    │   │
    │   ├── Flink (real-time features)
    │   │   └── Redis (online feature store)
    │   │
    │   └── S3 (raw event archive)
    │       │
    │       └── Spark (daily batch job)
    │           ├── Delta Lake (feature tables)
    │           └── Feature Store (offline: for training)

    ├── Training Pipeline (Airflow)
    │   ├── Read from Feature Store (point-in-time join)
    │   ├── Train model
    │   ├── Evaluate against baseline
    │   └── Register model (MLflow)

    └── Serving Pipeline
        ├── Model Server (vLLM / TorchServe)
        ├── Feature Server (online features from Redis)
        └── Monitoring (Prometheus + Grafana)

Takeaways

  1. Feature stores aren’t optional in production ML—they prevent training-serving skew
  2. Start with batch pipelines, add streaming only when latency requirements demand it
  3. Data validation is as important as model testing—bad data silently kills accuracy
  4. Default to Parquet for ML data storage: the columnar layout lets you read only the columns a model needs, which is often much faster than row-oriented formats
  5. Log everything: raw events, computed features, model predictions, and outcomes
  6. Point-in-time correct joins are essential: future data leaking into training is one of the most common ML bugs
  7. Design for reproducibility: given a timestamp, you should be able to reconstruct exactly what features the model saw