4  Data systems

4.1 Preview

Machine learning systems live or die by their data. Models are often treated as the centerpiece of an ML stack, but in production the model is only as good as the data systems that feed it, govern it, and allow it to be understood over time.

In a toy machine learning system, data is easy: we find a clean, curated dataset, download it to a laptop, and design, train, and evaluate a model around it. In a production system, data is more complicated. We may pull from multiple internal and external sources, many teams need access to data at different stages of its lifecycle, models need regularly updated training data for retraining, and the data is often too large to process on a single machine.

This chapter describes the data infrastructure required to support large-scale, reliable, and compliant machine learning in real organizations. We focus on the types of data that are involved in an ML system, where that data comes from, how it is stored, how it flows through batch and real-time pipelines, how it becomes features and training sets, and how it is monitored and governed.

Warning

Code snippets in this chapter are examples for illustration. They may not run exactly as written in a given environment.

4.2 Types of data

ML systems involve many kinds of data, and not all data has the same role or requirements. Before discussing design decisions, it is useful to first understand the different types of data an ML system produces and consumes.

Application state is what the product needs in order to operate from moment to moment. This data is small, changes often, and must be strongly consistent and safe for concurrent access.

Training data is used to build and test models - images, labels, model outputs, and user corrections. It keeps growing as new training data is generated by the operational system, and it is read far more often than it is written. It should be saved in a way that supports:

  • being able to go back and see exactly what data a model was trained on (reproducibility)
  • being able to see where each label or prediction came from (lineage)
  • being able to enforce who is allowed to use which data and for how long (governance)

Analytics data is used to understand how the system is behaving and how well it is working - for example, how often people engage with content, how many errors occur, or whether the model treats different groups fairly. It is usually looked at over large amounts of data and long periods of time to find patterns and trends, rather than updated one record at a time.

Analytics data is more complex in ML systems than in traditional software systems, because the ML system is constantly changing its own behavior. In a normal application, logs describe how fixed code behaved. In an ML system, analytics must also track which model was running, what data it was trained on, and how its outputs changed over time. It has to link events (predictions, user edits, errors) back to model versions, training data, and deployments, and keep that history even after models and outputs are replaced. Without that, you cannot explain why the system behaved a certain way in the past, detect regressions, or audit bias and errors.

For example, imagine a company whose service includes an image captioning model. Given one specific image,

  • application state includes a unique identifier for the image and the user who uploaded it, the current caption shown on the image (which may have been generated by a model, updated by a newer model, or modified manually by the user), whether it has been deleted, and any moderation status. This is what the product uses to display content and decide what users are allowed to see.
  • training data includes the image itself, every caption a model has ever generated for it and which model version produced each caption, feedback from users or moderators, and the user’s consent and usage rights for that image. This data is used to build training sets and evaluation sets for new image captioning models.
  • analytics data includes the full history of what happened to an image over time, including actions that were later changed or removed. For example, if users complain about biased or offensive captions, regulators might ask to see exactly what the model produced for photos uploaded between March 3rd and March 10th, how users reacted, and what moderators did. Answering that question means looking through all past model outputs, user edits, and actions over that time period - not only the captions that happen to be stored in the system today.

It is also useful to categorize data along another dimension: how is it organized?

  • Structured and semi-structured data is data that fits naturally into rows and columns - things like user IDs, timestamps, labels, or model scores. At small scales this might live in CSV (row‑oriented) or Parquet (column‑oriented) flat files that you can load into memory. As the data grows or more people start using it, it usually moves into systems that can handle larger volumes and many users at once, such as databases or analytics systems.
  • Other data does not fit into rows and columns at all. This includes images, videos, audio files, PDF documents, and saved model files. These are large files that are usually stored and accessed by name or URL. To make them usable, systems usually need to keep extra metadata information elsewhere - for example, a table that says which file belongs to which user, or a registry that keeps track of which model file is the latest version.

These two types of data coexist and relate to one another within the system - here is an example that shows how raw and processed image files can be referenced within a structured schema:

upload_id user_id file_id object_store_raw object_store_processed upload_date
102 55 f_98a2 s3://raw/f_98a2.jpg s3://processed/f_98a2.jpg 2025-02-11
103 55 f_98a3 s3://raw/f_98a3.jpg s3://processed/f_98a3.jpg 2025-02-11

These two ways of looking at data - what it is used for, and how it is shaped - determine which kinds of storage systems and data repositories make sense, which we will return to later. First, though, we will look more closely at where training data comes from.

4.3 Acquiring training data

Application data and analytics data are relevant to many software systems, but training data is specific to ML systems. Furthermore, training data is a key factor that determines the value of an ML model and the system that surrounds it:

  • In a model-centric view of ML systems, the team collects whatever data happens to be available, and then modifies the model to try and improve its performance.
  • In a data-centric approach, the model design is mostly held constant, and instead, the focus is on improving the result by making the training dataset cleaner, more consistent, and more representative of the real world.

Model-centric loop:

---
config:
  theme: default
---
stateDiagram
  direction TB
  Train --> Error
  Error --> Tune
  Tune --> Train
  Train:Train model
  Error:Error analysis
  Tune:**Tune model**

Data-centric loop:

---
config:
  theme: default
---
stateDiagram
  direction TB
  Train --> Error
  Error --> Tune
  Tune --> Train
  Train:Train model
  Error:Error analysis
  Tune:**Tune data**

When the data is strong, many different models can perform well. When the data is weak, even very advanced models are likely to struggle. So, even in the model-centric approach, it is very important that the data be of very high quality and relevance to the task. In the data-centric approach, it is very important to have a strong process for iterating and improving on the data, and a robust data infrastructure to monitor and track these changes.

It is therefore worthwhile for us to discuss training data in more detail. Where does training data come from? What are the properties of “good” training data, and what makes data “bad” for training?

4.3.1 Sourcing training data

Training data can come from:

  • External sources: There may be a public or private third-party dataset that is relevant to the machine learning task you are interested in. For example, an image-recognition project might use an open dataset of labeled photos, or a speech-to-text system might start with hours of professionally transcribed audio. Sometimes this data is already in a machine-learning-ready format, but in many cases it arrives as raw files, logs, or documents that need substantial cleaning, filtering, and reformatting before they can be used for training.
  • Internal sources: When an ML team is part of an existing organization, it can often draw on operational data generated by real users and real systems. A streaming service might use viewing history and search logs to train a recommendation model, while a customer support team might use past chat transcripts to train a classifier that routes tickets. This data is usually highly relevant to the business, but it often contains noise, missing fields, and biases that must be addressed before it becomes useful training data.
  • Synthetic sources: Teams can also create data artificially, e.g. by simulating scenarios or by augmenting real examples. For instance, a speech system might add background noise to clean recordings, or an image system might rotate, crop, or distort photos to create new training examples. Synthetic data can greatly expand coverage of rare or difficult cases, but it needs to be grounded in real data and real-world conditions; otherwise, the model learns patterns that do not actually occur in practice.

Training data can also come from a mix of these sources - for example, you might combine internally/externally sourced unlabelled data with synthetic labels to get a labeled training set.

NoteGenerating synthetic labels for real data

When labeled training data is scarce or expensive to obtain, teams sometimes use models or heuristics to generate synthetic labels automatically. “Real” data with synthetic labels is typically much, much better than completely synthetic data, because the inputs come from the real production distribution (real noise, edge cases, and correlations), so the model learns to handle what it will actually see at inference time.

For example, suppose you want to train a classifier to estimate the quality of Git commit messages in your organization. You have an existing unlabeled dataset of commit messages, from your organization’s Git repository. You might use a high-quality foundation model to generate the quality labels, then use that labeled data to train your own model.

Why train your own model, if the high-quality foundation model can already generate good labels? Your organization might want to use a much smaller model, one that is less expensive (e.g. uses less compute, energy) than the model used to generate the data, and that has a faster inference time. This approach - transferring knowledge from a large, highly capable “teacher” model to a smaller and more efficient specialized “student” model - is a kind of knowledge distillation.

Or, instead of using a model to generate labels, you might use some imperfect heuristics to apply quality labels to the commit messages. For example:

  • if the message is very short, label it “low quality”
  • if the exact message appears multiple times in the dataset (e.g. exactly “fix the frontend”, “updates to data ingestion capabilities”, “did some stuff”), label it low quality
  • if the message includes a “why” phrase (“to prevent”, “because”, “so that”), label it high quality

You anticipate that these heuristic-labeled data will have noisy labels that are not always correct, but it is still useful because it gives you a large, cheap-to-create training set that captures many of your organization’s norms, and the model can learn the general patterns despite noise. This approach - applying imperfect, noisy, machine-generated labels to an unlabeled dataset - is known as weak supervision.

Synthetic labels are only as good as the process that generates them. If a model used for labeling is biased, inaccurate, or trained on unrepresentative data, those errors propagate into the new training set. A model trained on synthetically labeled data can then amplify and perpetuate those errors.

To use synthetic labels responsibly,

  • document the labeling process clearly, ideally with a reproducible pipeline
  • create a small data set with human-generated ground truth labels against which to compare and validate both the labeling process and the model itself
  • include evaluation of label quality in your pipeline

and keep checks in place to catch systematic errors and biases.

NoteGenerating synthetic data (features and labels)

Sometimes, we need to generate synthetic data including both features and labels - not only labels. This may be because we don’t have enough real data and can’t get more, or because the real data is private and it is considered inappropriate to use it directly for training.

To generate synthetic data, we would:

Start from real data seeds. We need to start with at least a small set of real examples, and we should evaluate their quality carefully.

As with any training data source, we should check for coverage and bias. But given that we are going to generate a large number of new samples using these “seeds”, any distortion in the seeds gets amplified, so we need to be even more careful than we would be for a real training set about things like marginals, correlations, class balance and rare events.

For example, imagine we have seed data from one class where one particular very high-performing student always submits assignments early, the day they are released. As a result, we might see a correlation between “submission_time” and high grades. A synthetic data generator trained on that seed set can amplify that relationship and produce synthetic data where early submission looks like a strong driver of performance, even though in the general population we also see high grades from students who submit later.

Use a generation pipeline that preserves required properties. There are a variety of techniques to generate new samples from these seeds. At a high level, these techniques learn the joint distribution of the real data, then sample new records from that learned distribution to create synthetic examples that match the overall structure.

For tabular data, frameworks like SynthCity implement some of these techniques1. For example, we can use SynthCity to generate new samples from a “real” data frame:

from synthcity.plugins import Plugins

plugin = Plugins().get("ctgan")
plugin.fit(real_df, outcome_column="outcome")

synthetic_df = plugin.generate(count=50_000).dataframe()

If we are concerned about leaking private data from the seed examples, we can use a generator with differential privacy, which adds carefully calibrated noise to protect individual records without substantially changing the result. For example, PATE-GAN is a generator with differential privacy guarantees:

from synthcity.plugins import Plugins

plugin = Plugins().get("pategan")
plugin.fit(real_df, outcome_column="outcome")
synthetic_df = plugin.generate(count=50_000).dataframe()

For unstructured data, like text or images, we might use a generative AI model in our synthetic data pipeline to generate more examples like our seeds.

Evaluate and clean the synthetic data. Once we have generated synthetic data, we need to evaluate its quality and clean it. Broadly speaking, there are a few ways to evaluate the synthetic data we have generated:

  • by checking the data itself for problems like near duplicates, low information samples, or label issues
  • by checking the statistical properties of the synthetic data relative to our real seed samples and our target domain
  • or by checking whether a model trained on this synthetic data is effective at making predictions on real samples

As a first pass, frameworks like cleanlab can identify potential data issues without having to manually specify any rules. For example, it can identify near-duplicate images or text samples, which is a common problem when generative AI models are used in a synthetic data pipeline - it is often necessary to filter out a substantial fraction of generated samples because of duplication.

For statistical similarity, we would want to compare our synthetic data and our real seed samples. Frameworks like Evidently can check for “drift” with respect to the original data. Here is a simple example that compares synthetic data to the real seed data, then asserts that the share of drifted columns stays below a limit:

from evidently import Report
from evidently.presets import DataDriftPreset

report = Report([DataDriftPreset()])
report.run(current_data=synthetic_df, reference_data=real_df)

result = report.as_dict()
share_drifted = result["metrics"][0]["result"]["share_of_drifted_columns"]
assert share_drifted <= 0.1

More generally, we also want to make sure the data is reasonable with respect to the target domain. For example, we have expectations like: age cannot be negative, days_since_last_login cannot exceed account_age_days, and when plan_status is cancelled, cancellation_timestamp must be present and occur after signup_timestamp. We can use whatever framework is in our data control plane (discussed later in this chapter) to validate and enforce this in our data pipeline.

The real test of a synthetic training set is whether it is actually useful for model training. The typical pattern is a train-on-synthetic, test-on-real check: train a model on synthetic data and evaluate it on a held-out real test set. If it collapses on the real test data, the synthetic data is not capturing the decision-relevant structure we need, and we can’t use it. (Note that even if privacy concerns prevent us from training on real data, the same concerns may not apply to the test set, since there is much less risk of private data leaking into the model via the test set.)

Ideally, all three of these stages - selecting seeds and evaluating them, generating synthetic data, and evaluating the synthetic data- happen as part of an automated pipeline, with quality gates so that low-quality data is not moved to the next stage.

synthetic_pipeline A Select real seed data B Seed quality checks A->B C Generate synthetic data B->C pass G1 Stop pipeline B->G1 fail hard B1 Drop bad seeds B:se->B1:sw fail E Synthetic quality checks C->E F Approved synthetic dataset E->F pass G2 Stop pipeline E->G2 fail hard E1 Drop bad samples E:se->E1:sw fail B1:nw->B:ne recheck E1:nw->E:ne recheck

4.3.3 Matching business context

Even perfectly legal use of high-quality data can be wrong for a particular ML system if it does not match the context in which the system will actually be used. Training data should reflect the people, behaviors, and environments the model will encounter in production. If it does not, the model will appear to work well in testing but fail in practice.

For example,

  • a speech-recognition system trained mostly on recordings from professional voice actors will struggle when real users speak casually, mumble, use slang, or talk in noisy environments.
  • a fraud-detection model trained on historical data from one country may perform poorly when deployed in another, where spending patterns and payment methods are different.
  • a customer support classifier trained on last year’s tickets may not recognize new products, new issues, or new ways that users describe their problems.

This is why data alignment is not just about technical quality, but also about the context. Teams need to ask whether the dataset represents the users they care about, the decisions the system will make, and the conditions under which it will operate. When evaluating alignment, three dimensions matter:

  • coverage (who/what is represented),
  • environment (where the data comes from),
  • and time (when it was collected).

A mismatch in any of these - coverage, environment, or time - can lead to models that look good on paper, but fail when deployed.

WarningReal-world example: survivorship bias as a context mismatch

Survivorship bias happens when your dataset includes only the entities that “survived” long enough to be observed, while the failures were removed from the data. A classic example is mutual funds. Suppose you build a model to pick “good funds” using a dataset of active mutual funds and their historical returns over a long time period. When you evaluate the model on the test set that you held out from the same data, it looks great: many of the funds in your dataset have strong performance.

But this can be an artifact of how the dataset was assembled. Funds that performed poorly often get shut down, merged away, or renamed, and they disappear from an “active funds” snapshot. If you only include the survivors, you are implicitly filtering out the failures. The model then appears to do well because the training and evaluation data over-represent winners and under-represent the full population that existed at the time decisions were made.7

In ML system design, the fix is not a modeling trick. It is a data design requirement: define the population as of each point in time, keep lifecycle events (creation, closure, deletion, migration), and ensure your training and evaluation sets include the entities that failed or disappeared. Otherwise, you will ship a model that looks strong offline but is calibrated to a world where failures do not exist.

Here’s a concrete example that came from a major private bank in India:8

Pranav, a data scientist with expertise in statistical modeling, was developing an algorithm aimed at producing a recommendation for the underwriters responsible for approving secured loans to small and medium-sized enterprises. Using the credit approval memos (CAMs) for all loan applications processed over the previous 10 years, he compared the borrowers’ financial health at the time of their application with their current financial status. Within a couple of months, Pranav had a software tool built around a highly accurate model, which the underwriting team implemented.

Unfortunately, after six months, it became clear that the delinquency rates on the loans were higher after the tool was implemented than before. Perplexed, senior managers assigned an experienced underwriter to work with Pranav to figure out what had gone wrong.

The epiphany came when the underwriter discovered that the input data came from CAMs. What the underwriter knew, but Pranav hadn’t, was that CAMs were prepared only for loans that had already been prescreened by experienced relationship managers and were very likely to be approved. Data from loan applications rejected at the prescreening stage was not used in the development of the model, which produced a huge selection bias. This bias led Pranav to miss the import of a critical decision parameter: bounced checks. Unsurprisingly, there were very few instances of bounced checks among the borrowers whom relationship managers had prescreened.

The technical fix in this case was easy: Pranav added data on loan applications rejected in prescreening, and the “bounced checks” parameter became an important element in his model. The tool began to work as intended.

The bigger problem for companies seeking to achieve business value from data science is how to discern such sources of bias upfront and ensure that they do not creep into models in the first place. This is challenging because laypeople - and sometimes analytics experts themselves - can’t easily tell how the “black box” of analytics generates output. And analytics experts who do understand the black box often do not recognize the biases embedded in the raw data they use.

The banks in our study avoid unrecognized bias by requiring that data scientists become more familiar with the sources of the data they use in their models. For instance, we saw one data scientist spend a month in a branch shadowing a relationship manager to identify the data needed to ensure that a model produced accurate results.

We also saw a project team composed of data scientists and business professionals use a formal bias-avoidance process, in which they identified potential predictor variables and their data sources and then scrutinized each for potential biases. The objective of this process was to question assumptions and otherwise “deodorize” the data - thus avoiding problems that can arise from the circumstances in which the data was created or gathered.

NoteBusiness alignment hypothetical: drone-based crop monitoring

For example, suppose you are working for a startup that uses drones to detect plant disease in farms from overhead photos. You find an open dataset of photographs of plants with and without disease, called PlantVillage, and you use it to train your model.

However, when your model is deployed on real drone photographs, it fails. You realize that in your training dataset, each leaf is isolated on a plain background. Here is a description of the data collection process, from the original paper about the dataset:

From field trials of crops infected with one disease, the technicians would collect leaves by removing them from the plant. The leaves were then placed against a paper sheet that provided a grey or black background. All images were taken outside under full light. Once the images were collected, they were edited by cropping away much of the background and orientating all leaves so that they tip-pointed upwards.9

In the drone footage, however, leaves are partially occluded, lighting changes constantly, and backgrounds include soil and other plants10. The “world” that the model has learned from - a clean, high-contrast visual world - does not exist in real fields.

PlantVillage leaf on plain background

In-the-field plant disease photo

To identify this mismatch before failing in the real world, you should have:

  • visually explored the data, including looking at individual samples (not just summary statistics), and have enough domain knowledge to judge whether it resembles deployment conditions,
  • understood the lineage of the data, how it was collected and how it was processed before you ingested it.
NoteBusiness alignment hypothetical: headshot authenticity

Suppose a career networking platform wants to enforce a “real photos only” policy, and needs a classifier to distinguish real headshots from AI-generated ones.

The engineers building this classifier create a labeled training set by combining multiple sources: “real” images from a well-known dataset used for face recognition, and “fake” images from an AI model that they prompted to generate headshots. They develop a model with excellent accuracy on the test set that they have held out from their data, but when deployed in production, the model fails!

The failure was not a modeling error, but rather, a data error. This Frankenstein dataset makes the task artificially easy, because the “real” images were cropped from candid, non-posed photographs. As a result, the model can learn candid vs studio cues (lighting, framing, background) rather than real vs synthetic. In production, the real photos are professional headshots, so those cues vanish and the model fails for the wrong reason.

Candid face photo.

Real professional headshot

Even if this issue is addressed (e.g. you collect real studio-style headshots) and the classifier works today, the “fake” distribution changes quickly. A detector trained on headshots generated by earlier models can fail when a newer generator produces more photorealistic studio portraits. Unless the dataset is refreshed frequently, the model will lag behind the current generation of synthetic images.

AI headshot from earlier model

AI headshot from newer model
WarningReal-world example: Frankenstein datasets in COVID-19 chest X-ray models

Early in the COVID-19 pandemic, many papers claimed that deep learning models could diagnose COVID-19 from chest radiographs. A later review found that none of the surveyed models were of potential clinical use due to methodological flaws and underlying biases.11

A major source of failure was the use of Frankenstein datasets. Researchers would combine an emerging COVID-19 X-ray dataset with one or more existing X-ray datasets (for example, datasets built for viral vs bacterial pneumonia). In many of these combinations, the “COVID” images and the “non-COVID” images came from different hospitals, different machines, different preprocessing pipelines, and sometimes different patient populations. The images were so distinctive to their source dataset that a model could learn to recognize the dataset itself, rather than the disease.

In some “COVID vs non-COVID” constructions, the non-COVID examples were taken from pediatric chest X-ray datasets. Models that appeared to learn COVID vs non-COVID pneumonia ended up with zero clinical value, because they were actually learning artifacts correlated with the label in the training data, for example, child vs adult body structure. In fact, researchers showed that similar performance could be achieved even when most of the lung region was masked out, indicating that the model was not learning the intended clinical signal.12

Example: dataset artifacts in COVID-19 chest X-ray collections

This example highlights the important of (1) looking at the data, and having enough domain expertise to interpret it - for example, to distinguish between adults and children in X-rays! (2) understanding the data lineage - including how it was collected and who it was collected from (3) sanity-checking results with domain experts - in this case, radiologists understood that differences between COVID and non-COVID pneumonia were not usually obvious in radiography, and could have flagged this as an unreasonable result.

4.3.4 Fairness and bias

Even if the data is from the right “world”, ML teams still need to consider whether the data fairly covers the important parts of that world, or whether there is a potential for bias due to representation issues. When certain groups or scenarios are missing or rare, models tend to work well for the majority and poorly for everyone else.

  • A voice assistant trained mostly on young, urban speakers may struggle with older users or regional accents.
  • A navigation system trained on dense city traffic may behave unpredictably on rural roads.
  • A recommendation system trained mostly on heavy users may ignore the needs of casual or new users.

In each case, the model is not intentionally biased, but it reflects the incomplete picture of the world that it was trained on.

WarningReal-world example: representation bias in skin lesion classifiers

In medical imaging, training data often comes from a limited set of hospitals or regions. Skin lesion classifiers provide a clear example of why this matters. Many widely used datasets (e.g., HAM10000) were built from patients in predominantly light-skinned populations. When models trained on those images are applied to patients with darker skin tones (e.g., from the Diverse Dermatology Images dataset), performance drops, because the data did not include enough examples of how diseases appear on darker skin and so the model never learned those visual patterns.13

HAM10000 skin-tone distribution proxy from corner sampling (darkest to lightest)

The figure above comes from a method that samples the corner pixels of each dermoscopy image (to avoid sampling the lesion itself), computes an average skin-color per image, and then orders those averages from darkest to lightest.14 This makes the under-representation of darker skin colors easier to see.

This is a form of selection bias: the dataset reflects who was easiest to collect data from, not who will ultimately be served by the system. In healthcare, that mismatch is especially dangerous, because worse model performance for underrepresented groups can directly translate into worse care.

Bias also enters through the labels and outcomes attached to the data. Even if a dataset includes all relevant groups, the way those groups are labeled can encode historical or social inequities. For example, a credit dataset might reflect discriminatory lending practices, where some applicants were more likely to be marked as “high risk” despite similar financial behavior. A content moderation dataset might reflect the cultural norms of a particular group of reviewers, causing some speech from some communities to be labeled as “toxic” or “unsafe” more often than others. These issues can lead to embarrassing product failures.

WarningReal-world example: hiring labels that encode bias

A widely reported example is Amazon’s attempt to build an automated recruiting system. The idea was to train a model to score job candidates by learning from historical resumes and hiring outcomes. But because the historical data reflected a workforce that was disproportionately male, the model learned patterns that correlated with gender and penalized resumes associated with women. The system was reportedly shut down after it could not be made to stop discriminating against women.15

This is a general lesson about labels in ML systems. In hiring, the “label” is often a past human decision (hire, reject, promote), not an objective truth about candidate quality. If those past decisions reflect unequal opportunity or bias, then training data labeled with those outcomes can teach a model to reproduce the same bias at scale, even if the model never sees an explicit gender field.

WarningReal-world example: bias encoded in proxy labels

In many ML problems, the true thing you care about cannot be directly measured or is not available in your data. Teams then use a proxy label: a measurable quantity that is supposed to be correlated with the real outcome. Proxies can be useful, but they are dangerous when the proxy does not mean the same thing for everyone, or when it reflects historical or structural inequities.

A well-studied example from healthcare demonstrates how bias can be encoded even in completely objective labels, when they are a proxy for the true measure of interest. An algorithm used to allocate care did not have any way to measure actual medical need; as a proxy, the target variable it was trained to predict was health care cost, under the premise that more severe conditions would be more expensive to treat.

But because historically marginalized groups often incur lower costs due to unequal access to care, the model systematically underestimated their health needs. Even though cost appeared to be a convenient proxy and the predictions seemed accurate on that metric, the underlying target encoded a biased relationship, leading to inequitable outcomes.16

These issues of coverage and labeling are not just technical concerns; they also raise concerns with respect to discrimination law and regulation. When models systematically perform worse for certain groups, or when they reproduce biased historical decisions, the results can violate civil rights and consumer protection laws even if no one intended harm. Because of this, regulators increasingly treat biased ML systems as a form of regulated decision-making. Organizations may be required to demonstrate that their training data is appropriate, that protected groups are not unfairly disadvantaged, and that models can be audited when harms occur.

4.3.5 Tracking data lineage

Given that many of the problems described above related to the way that the data is collected, or the way that the data is processed before we acquire it as a training set, lineage is extremely important. For any dataset used to train or evaluate a model, we must know who collected it, when, from what sources, and what transformations were applied. Without lineage, it is impossible to understand if the data is aligned with the business context, audit bias, or respond to consent revocations.

In practice, lineage is often documented through traditional codebooks and data dictionaries: supplementary documents that explain how a dataset was collected, what each field means, what values are valid, and what limitations or caveats apply. Reading these documents is not optional in production ML. It is how you find out whether a column is complete, whether a measurement is missing for part of the population, whether a value of 0 means “actual zero value” or “missing value”, and whether the dataset has known quirks that would otherwise look like signal.

As a concrete example, consider the NYC Taxi and Limousine Commission (TLC) Yellow Taxi Trip Data.17 The web page for this data hosts a data dictionary spreadsheet that documents both dataset-level context and column-level semantics.

The data dictionary includes a “dataset information” table:

Field Value
Dataset Name Yellow Taxi Trip Data
Dataset URL https://data.cityofnewyork.us/browse?Data-Collection_Data-Collection=TLC%20Trip%20Data&sortBy=alpha
Data Provided by
The name of the NYC agency providing this data to the public.
Taxi and Limousine Commission (TLC)
Each row is a…
The unit of analysis/level of aggregation of the dataset.
Taxi trip record
Publishing Frequency
How often changed data is published to this dataset. For an automatically updated dataset, this is the frequency of that automation.
Historical data
Data Change Frequency
How often the data underlying this dataset is changed.
As needed
Frequency Details
Additional details about the publishing or data change frequency, if needed.
As this is historical data, the dataset is only published once. The data will only be changed rarely if there are corrections needed.
Dataset Description
Overview of the information this dataset contains, including overall context and definitions of key terms. This field may include links to supporting datasets, agency websites, or external resources for additional context.
These records are generated from the trip record submissions made by yellow taxi Technology Service Providers (TSPs). Each row represents a single trip in a yellow taxi. The trip records include fields capturing pick-up and drop-off dates/times, pick-up and drop-off taxi zone locations, trip distances, itemized fares, rate types, payment types, and driver-reported passenger counts.
Why is this data collected?
Purpose behind the collection of this data, including any legal or policy requirements for this data by NYC Executive Order, Local Law, or other policy directive.
In partnership with the New York City Office of Technology and Innovation (OTI), TLC has published millions of trip records from both yellow medallion taxis and green Street Hail Livery (SHLs). Publicizing trip record data through an open platform permits instant access to records which previously were available only through a formal process (FOIL request.). Internally, TLC uses similar data to guide and evaluate policy decisions.
How is this data collected?
The methods used to create and update this dataset, including what cleaning or processing was involved prior to dataset publication.
If data collection includes interpreting physical information this field includes technical details.
If data collection includes fielding applications, requests, or complaints, this field includes details about the forms, applications, and processes used.
The data used in the attached datasets were collected and provided to the NYC Taxi and Limousine Commission (TLC) by technology providers authorized under the Taxicab & Livery Passenger Enhancement Programs (TPEP/LPEP). The trip data was not created by the TLC, and TLC makes no representations as to the accuracy of these data.
How can this data be used?
Examples of and/or links to projects or agency operations that have used this dataset.
Where relevant, includes links to online projects, agency websites, visualizations, maps, or dashboards.
What are some questions one might answer using this dataset?
TLC internally uses similar data to provide internal and external public dashboards, please see: https://www.nyc.gov/site/tlc/about/data-and-research.page. This data is heavily used as an example dataset for data engineering and data science tasks. For an excellent third-party set of visualizations, see https://toddwschneider.com/dashboards/nyc-taxi-ridehailing-uber-lyft-data/.
What are the unique characteristics or limitations of this dataset?
Unique characteristics of this dataset to be aware of, specifically, constraints or limitations to the use of the data.
As the trip data was provided by technology providers to TLC, there may be some noise. This may occur in the form of unexpected categories or numbers out of expected ranges in some columns.
Additional geospatial information
For any datasets with geospatial data, specify the coordinate reference system or projection used and other relevant details.
Please see our website under “Taxi Zone Maps and Lookup Tables”: https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page. There is a shapefile to associate LocationID zones to geographic coordinates.

If you were using this data for ML, this table already tells you some important things:

  • The provider explicitly warns that the data may contain noise and out-of-range values, so you know to check for and handle those.
  • Geospatial features require external lookup tables (taxi zones); now you know where to find them, and that you’ll need to join them.

The data dictionary also includes a “column information” table:

Column Description Codes / Values Notes
VendorID A code indicating the TPEP provider that provided the record. 1=Creative Mobile Technologies, LLC
2=VeriFone Inc.
tpep_pickup_datetime The date and time when the meter was engaged.
tpep_dropoff_datetime The date and time when the meter was disengaged.
passenger_count The number of passengers in the vehicle. This is a driver-entered value.
trip_distance The elapsed trip distance in miles reported by the taximeter.
RatecodeID The final rate code in effect at the end of the trip. 1= Standard rate
2=JFK
3=Newark
4=Nassau or Westchester
5=Negotiated fare
6=Group ride
99 = Null/unknown
store_and_fwd_flag This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, aka “store and forward,” because the vehicle did not have a connection to the server. Y= store and forward trip
N= not a store and forward trip
PULocationID TLC Taxi Zone in which the taximeter was engaged
DOLocationID TLC Taxi Zone in which the taximeter was disengaged
payment_type A numeric code signifying how the passenger paid for the trip. 0= Flex Fare trip
1= Credit card
2= Cash
3= No charge
4= Dispute
5= Unknown
6= Voided trip
fare_amount The time-and-distance fare calculated by the meter. For additional information on the following columns, see https://www.nyc.gov/site/tlc/passengers/taxi-fare.page
extra Miscellaneous extras and surcharges.
mta_tax Tax that is automatically triggered based on the metered rate in use.
tip_amount Tip amount - This field is automatically populated for credit card tips. Cash tips are not included.
tolls_amount Total amount of all tolls paid in trip.
improvement_surcharge Improvement surcharge assessed trips at the flag drop. The improvement surcharge began being levied in 2015.
total_amount The total amount charged to passengers. Does not include cash tips.
congestion_surcharge Total amount collected in trip for NYS congestion surcharge.
airport_fee For pick up only at LaGuardia and John F. Kennedy Airports.

This table is similarly essential. For example, suppose you were building a model to predict tipping behavior. You really need to know (from reading the data dictionary!) that tip_amount only records credit card tips. When payment_type is not credit card, tip_amount is not a valid observation of tipping and is encoded as 0. If you do not read the codebook, you might incorrectly conclude that many riders did not tip at all, and you would train and evaluate your model on a mislabeled target. If you read the codebook, you would know to either restrict the task to payment_type = 1 (credit card) or define a different target that does not assume missing tips are zero.

A slightly more modern approach is to ship datasets with “data cards” (also called dataset cards): short, structured documents that summarize what the dataset is, how it was collected, what it should and should not be used for, and what risks or limitations are known. The core idea is the same as a traditional codebook, but with a stronger emphasis on intended use, ethical considerations, and how the dataset behaves over time. When available, a data card can quickly tell you what questions to ask before you build features or train a model.

On Hugging Face, dataset cards are typically stored as a README.md that combines machine-readable metadata and human-readable documentation. The machine-readable portion is a YAML block at the top (front matter). The Hub can parse these fields (languages, licenses, tasks, splits, features) to power search, filtering, and dataset viewers. For example, the Hugging Face dataset card metadata documentation includes YAML like this:

---
language:
  - {lang_0}  # Example: fr
  - {lang_1}  # Example: en
license: {license}  # Example: apache-2.0 or any license from https://hf.co/docs/hub/repositories-licenses
tags:
  - {tag_0}  # Example: audio
annotations_creators:
  - {creator}  # Example: crowdsourced, found, expert-generated, machine-generated
pretty_name: {pretty_name}  # Example: SQuAD
task_categories:
  - {task_0}  # Example: question-answering
dataset_info:
  features:
    - name: {feature_name_0}    # Example: id
      dtype: {feature_dtype_0}  # Example: int32
  splits:
    - name: {split_name_0}                  # Example: train
      num_examples: {split_num_examples_0}  # Example for SQuAD: 87599
---

See the full metadata field list and examples here: Hugging Face dataset card metadata documentation.

The human-readable portion is ordinary Markdown with a consistent set of sections. The Hugging Face dataset card template is a good reference for what to include:

# Dataset Card for [Dataset Name]

## Dataset Description
- **Homepage:**
- **Repository:**
- **Paper:**
- **Leaderboard:**
- **Point of Contact:**

## Dataset Structure
### Data Instances
### Data Fields
### Data Splits

## Dataset Creation
### Source Data
### Annotations

## Considerations for Using the Data
### Discussion of Biases
### Other Known Limitations

Template source: Hugging Face dataset card template (README.md).

For guidance on what to write and why, see the Hugging Face dataset card creation guide: README_guide.md.

As one example, the dataset card for stanfordnlp/snli includes sections like Dataset Description, Dataset Structure, Dataset Creation, Considerations for Using the Data, and Additional Information. Two details are especially useful to notice if you were deciding whether to train on this dataset:

  • Labeling process: the card explains that a subset was re-annotated in a validation task with five total judgments (including the original label), reports agreement statistics, and describes who the annotators were and how they were paid.
  • Biases: the card’s bias discussion calls out evidence of stereotypes (including gender stereotypes) in the hypotheses and points to analyses of those patterns.

When an external data source is ingested, its codebook or data card explains how the data was collected and processed up until the point of ingestion. Beyond that point, it is the team’s responsibility to keep a record of where the data came from and what was done to it.

A simple way to think about this: every time a new dataset is created from an old one, it should leave a receipt behind. That receipt should be easy to find later, and it should let someone answer a few basic questions:

  • What did we start from? Which source dataset(s), and which exact copy of them?
  • What did we do? Which code ran, with what settings?
  • What did we produce? Where is the output, and what does it look like?
  • When did we do it, and why? What was the goal, and who ran it?

Ideally, teams automate this by having the data pipeline record these details for every run, and carrying this record through until data is “consumed” (e.g. by an ML training run).

Finally, note that this trail is not only about debugging or reproducibility: it is how governance stays intact after ingestion. If a data sample has rules like “only for research”, “delete after 30 days”, or “do not use for training”, those rules must be recorded alongside the data and carried forward into derived datasets, so the system can enforce access, auditing, deletion, and retention as data moves through pipelines.

When using external data sets, we often have to make tradeoffs related to the items mentioned in this section. For example, Kaggle data sets are easy to use and discover, but they often lack lineage. In many cases, they are generated synthetically following very poor practices, or are copied from other sources without attribution and have incorrect licensing information. Similar concerns apply to some HuggingFace datasets, especially community-contributed datasets that do not come well-known organizations or research groups. Open government data portals typically have well-documented data, but it may be unstructured data that needs substantial processing and joining multiple sources before it can be useful. Official, curated benchmark data sets distributed on HuggingFace are also easy to use, but they are often disconnected from the typical usage context. Other data sources might be better in terms of lineage/documentation and fit for the intended use, but are more difficult to discover.

4.3.6 System design questions

Some questions we should consider when acquiring data:

  • Is it legally and ethically acceptable for this use? Do the license, consent, and privacy rules allow this data to be used for the specific purpose and type of model you are building?
  • Is it aligned with the business and user context? Does this data actually reflect the people, behaviors, and situations your system is meant to support?
  • Is the data representative? Are important user groups, environments, or edge cases missing or underrepresented?
  • Is the labeling valid and consistent? Are labels defined clearly enough that different people (or systems) would assign the same label, and do the labels measure the real outcome of interest rather than a misleading proxy? If labels are synthetic or weakly supervised, is there a plan to validate them (e.g., a small human-labeled set)? Can systematic label bias be checked (for example, do error rates or label distributions differ across groups, settings, or time)?
  • Is the data fresh enough? Does it reflect current behavior, language, or conditions, or has the world already changed?
  • Can it be traced and audited? Do you know where it came from, how it was processed, and which models it has been used to train?

4.4 Storage primitives

Now, let’s return to the technical matter of: where does data live, and how does it move through the ML system?

At the lowest level, all data systems are built on two physical storage abstractions: block storage and object storage.

Block storage presents data as addressable blocks and is optimized for low-latency random reads and writes. If you have ever used a laptop disk, an external hard drive, or a USB thumb drive, you have used block storage. Raw block devices are not usable for files until you prepare them: you first

  1. partition the disk,
  2. create a filesystem on a partition (or on the whole device),
  3. and then mount that filesystem inside an operating system so it can store files and directories.

Block storage is the low-level substrate; filesystems are a structure we build on top of it.

NoteBlock storage and compute

When you create a compute instance on a cloud platform, you specify a disk image (with a bootable OS and a filesystem) to boot from. That image is typically written to an ephemeral block storage volume, and the instance boots from it. In this storage context, ephemeral means the volume is tied to the lifetime of the compute instance: when the instance is terminated, so is the volume, and the filesystem and any files you wrote to that disk are lost.

If you need more storage, or storage that persists beyond the lifetime of a single instance, you create a persistent block storage volume and attach it. There are two common ways to do this:

  • Create a volume from an image. The cloud provider writes a disk image to the new volume, so it is bootable and already has a filesystem. When you launch a compute instance, you specify that it should boot from this persistent volume. Any data written by the OS or applications persists after the instance is terminated. You can later launch a new instance that boots from the same volume and see the disk exactly as it was left.
  • Create an empty volume. This gives you raw block storage with no filesystem. You attach it to an instance that is already booted (often from its ephemeral disk), then partition/format it and mount it so it can store files. Even after the instance is terminated, you can access the files by attaching the volume to a new instance.

This helps keep costs down, because block storage is much cheaper than compute: if you are keeping an idle instance alive only to preserve its data between active sessions, you are usually better off storing that data on a persistent volume and running compute only when you need it.

Sometimes, you need a block storage volume tied to a service, not a particular compute instance. Managed services like Kubernetes can be configured to provision and attach persistent volumes on demand for the services they run, using the underlying cloud provider’s block storage.

Object storage is different: it does not expose a filesystem. Instead, it allows you to put (store) and get (retrieve) named objects (blobs) using an API. Objects live inside buckets, and the object name within a bucket is used as a key to uniquely identify the object. The storage system handles placement, replication, and durability behind the scenes. Familiar filesystem behaviors do not apply:

  • There are no true directories. “Folders” are just key prefixes, and listings are filtered by prefix rather than by a real directory tree. For example, naming an object images/12345.png does not create a real images/ folder; it just gives the object a key that happens to start with images/. You can then filter a bucket on objects whose key starts with images/, but you can’t rely on directory semantics like empty folders or per-directory permissions.
  • There are no in-place edits. You can’t modify an object, only replace it with another object that has the same name.

Databases, log systems, and operating systems traditionally use block storage underneath because they need low-latency random reads and writes, in-place updates, and full filesystem semantics (files, directories, permissions, and locking). Object storage, by contrast, is much less expensive when you just need to store large immutable blobs addressed by a key. It favors whole-object reads and writes, high durability, and low cost over fine-grained updates or filesystem-style access.

All major cloud computing platforms offer basic block storage and object storage services, although they may have different names. Block storage volumes are attached to compute instances and show up as disks; object storage is accessed through APIs and URLs.

Service OpenStack GCP AWS Azure
Block storage Cinder Persistent Disk Elastic Block Store (EBS) Managed Disks
Object storage Swift Cloud Storage (GCS) S3 Blob Storage
NoteSelf-hosted object storage

In our labs, we use MinIO as a self-hosted object storage service so the exercises stay cloud-provider-independent. MinIO is an S3-compatible object store that you can run yourself, which lets us work with object storage APIs and concepts without tying the labs to any specific provider. Other self-hosted alternatives are Garage and RustFS.

4.5 Data repositories

In ML systems, block storage and object storage are both very relevant. But raw storage (block or object) is only the foundation; recall that application state, training data, and analytics data all have specific and different requirements for consistency, history, and access:

  • application state needs fast and reliable (but small) reads and writes, including concurrent access from multiple processes.
  • training needs a validated and versioned snapshot of the data, so that results can be reproduced and audited. And, the system should preserve enough history to explain where labels, features, and examples came from, whether they were produced by humans or by earlier models - this property is called lineage.
  • analytics data needs a complete history of model outputs, user edits, and events, not only the latest values stored in the system, and it needs to support large-scale queries and compute over that data.

In a toy system, data is stored as files: but they can be overwritten, partially written, corrupted by concurrent processes, and there is no record of what changed or when. In practice, we need sort of data repository layer on top of the raw storage to organize, index, version, and serve data in ways that match those requirements.

4.5.1 Databases

A database is one such layer, and several database families are relevant to ML systems. A useful first distinction is between transactional databases and analytical databases.

  • Transactional systems are optimized for many small, latency‑sensitive operations (point lookups, inserts/updates/deletes), strong consistency, and concurrency control. The core workload is CRUD operations (Create, Read, Update, Delete) on small numbers of rows at a time.
  • Analytical systems are optimized for fewer but heavy queries that read and aggregate lots of data (often column-wise).

To illustrate the difference, consider two queries:

  • Transactional: “Update the caption for photo 12345 and record who changed it.” This touches a small number of rows, and needs to be atomic and consistent.
  • Analytical: “Compute the average number of comments per tag over the last 90 days.” This scans a single column across millions of rows and aggregates.

Relational databases, like PostgreSQL, are optimized for transactional workloads with frequent creates, updates, and deletes on rows with a predefined schema. They might be used store application state, including structured output of a classification or regression model.

Data warehouses store structured data in a schema optimized primarily for reads and large analytical queries. They are designed for aggregations, joins, and scans across millions or billions of rows. Columnar databases like ClickHouse are often used to implement a warehouse; they store data column-wise so that queries like “average number of comments per tag last week” can be answered efficiently by scanning only the relevant columns.

Document databases such as MongoDB store semi-structured records where each row can have a slightly different shape. They are useful when data evolves quickly or does not fit neatly into tables. For example, imagine tracking instructors who will be teaching courses in a department. A relational table might require a fixed set of columns for every instructor (net_id, name, degree):

id net_id name degree
u_7b3c ff524 Fraida Fund PhD ECE

A table in a document database would store the same record like this:

id document
u_7b3c {“net_id”:“ff524”,“name”:“Fraida Fund”,“degree”:“PhD ECE”}

If you later add a new field such as office_hours, in the relational database you must run a schema migration and decide how to backfill existing rows. In the document database, you can just start including the new field in new records when available, while older records can omit it. The trade-off is weaker relational constraints and more work to enforce consistency at the application layer.

Vector databases are optimized for storing and computing operations on embeddings, e.g. “find photos similar to this one” or “retrieve related documents for a prompt.” There are specialized vector databases like Qdrant, or some general-purpose databases also support vector search to a degree (for example, PostgreSQL with pgvector or ClickHouse with vector indexes), which can be sufficient at smaller scales or when you want to avoid a separate system.

Many ML stacks also rely on in-memory key-value stores such as Redis. These systems keep data in RAM, so reads and writes are extremely fast, but the data is not meant to be the long-term system of record. A typical pattern is: the authoritative data lives in a database or object store, and an operation over that data is maintained in the in-memory store for quick access. For example, an online ranking model might need the latest “per-user last 3-hour engagement rate” to generate output for a particular user; every engagement action by this user is logged to durable storage, like a database, but a streaming job updates this online feature and writes the latest value to an in-memory key-value store keyed by “user_id”. At request time, the model fetches that value in a few milliseconds instead of running a database query over raw events. If the in-memory store is restarted, it can be repopulated from the durable source of truth.

Cloud service providers offer many types of database services as managed services, or you can self-host open-source systems on a compute instance on-premise or in the cloud. The table below lists some common options by service category:

Database Open source
(self hosted)
GCP AWS Azure
Relational database PostgreSQL GCP Cloud SQL, GCP Cloud Spanner AWS RDS, AWS Aurora Azure SQL, Azure Database for PostgreSQL
Data warehouse ClickHouse GCP BigQuery AWS Redshift Azure Synapse Analytics
Document database PostgreSQL JSONB, MongoDB GCP Firestore AWS DocumentDB Azure Cosmos DB
Vector database PostgreSQL pgvector, Qdrant GCP Vertex AI Vector Search AWS OpenSearch Azure AI Search
In-memory data store Redis GCP Memorystore AWS ElastiCache Azure Cache for Redis

Besides for these examples, Snowflake is a popular commercial data warehouse that is neither open source, nor cloud-specific. Instead of being “the AWS warehouse” or “the GCP warehouse”, Snowflake is a separate managed service that can run in multiple clouds (AWS, GCP, or Azure).

4.5.2 Data lakes and lakehouses

So far we have described systems in which data is placed into a database before it is used: records fit a predefined schema, constraints are enforced on write, and updates happen through controlled transactions. This approach is called schema on write, and it works well when data is relatively structured and stable. But many machine-learning workloads start from the opposite position. Data arrives first, including unstructured data like images, text, and audio, and only later do we decide how it should be interpreted, labeled, filtered, or joined. Labels change, features are redefined, and new training objectives appear, so the raw data must remain accessible to support new and unforeseen uses.

A data lake is designed for this purpose: it stores raw and processed data as files in object storage, with minimal imposed structure, making it easy to capture and retain data in the system. This is often called schema on read: you store the data first, then apply a schema when you query it. For example, a lake might store raw JSON event logs as-is, and later we extract fields like user_id and event_type. A relational database following a schema on write pattern would require those fields to be defined and validated before a row can be inserted.

The downside of a lake is that it can become difficult to discover, manage, and reliably use data - the lake risks becoming a “data swamp.” To address this, we might add one or more “layers” above the object store, to provide structure, indexing, and queryability on top of a flat blob store. The “layer on top” will look different depending on the goal:

  • For raw media and datasets, it might be a metadata database that stores object URLs, labels, and permissions.
  • For analytics and training tables, it might be a catalog plus transaction log that makes Parquet files behave like tables.
  • For model artifacts, it might be a model registry that stores versioned pointers to objects and their lineage.

A data lakehouse adds a particular kind of such layer: one that adds history and ACID semantics, so that data in object storage can be managed and queried just like a structured data warehouse, while preserving the flexibility of a lake.

NoteACID, explained with examples

ACID is a set of guarantees that make a shared database (or data lakehouse) safe to use in multi-user systems:

  • Atomicity: a multi-step change is all or nothing. If a user posts a comment and you need to (1) insert the comment and (2) increment the comment count, you do not want a crash in the middle to insert the comment but fail to update the count (or vice versa).
  • Consistency: updates must obey rules and constraints. For example, if a schema requires that every upload_id is unique and every user_id referenced by an upload exists, the database prevents writes that would violate those rules.
  • Isolation: concurrent operations do not interfere in unsafe ways. If two requests try to update the same record at the same time, each one behaves as if it ran alone, preventing lost updates or partially mixed results.
  • Durability: once a write is committed, it persists even if the process crashes or the machine reboots.

Let us discuss the “layers” of a data lakehouse in some more depth.

lakehouse_layers object_store Object store note_bot Physical storage for raw and processed files in object storage. table_format Table format table_format->object_store note_mid Table layer tracks schema, snapshots, and file mappings. metadata Metadata / metastore metadata:s->table_format:n note_top Catalog and engines plan and coordinate reads and writes. query_compute Query / compute engines query_compute:s->table_format:n query_compute->metadata

The bottom layer is the object store; this is the physical storage layer of the lakehouse. It stores data as immutable objects addressed by URLs (for example s3://bucket/path/file.parquet or s3://images/abc123.jpg). It provides scalable and cheap storage, but it has no concept of tables, rows, schemas, or versions - everything is just files.

Sitting just above the object store, the next layer is a table format, such as Apache Iceberg. This is a metadata system that sits just above the raw files in the object store, and represents collections of such files as logical tables. It records which data files belong to a table, what columns they contain, how rows are physically organized across files (to make queries fast), and which specific files make up each version of the table.

A change to a table never edits files in place; instead, it produces new files and then updates a pointer to say “this is the current version.” The underlying data files are never modified; only the metadata is updated.

(The following examples describe the way Apache Iceberg works; other table formats may work in slightly different ways.)

For example, suppose we have a table of file uploads like:

upload_id user_id file_id object_store_raw object_store_processed upload_date
102 55 f_98a2 s3://raw/f_98a2.jpg s3://processed/f_98a2.jpg 2025-02-11
103 55 f_98a3 s3://raw/f_98a3.jpg s3://processed/f_98a3.jpg 2025-02-11

as part of a data lakehouse. This table would itself be stored as one or more Parquet files in object storage. Initially it might look like:

s3://lake/uploads/data/
  date=2025-02-11/part-0001.parquet

That Parquet file contains the two rows shown above. The table format engine also writes a snapshot:

Snapshot 1:
  files = [part-0001.parquet]

which is a small metadata record that answers: Which files make up the table right now?

Now suppose two more uploads arrive:

upload_id user_id file_id object_store_raw object_store_processed upload_date
104 55 f_98a4 s3://raw/f_98a4.jpg s3://processed/f_98a4.jpg 2025-02-11
105 55 f_98a5 s3://raw/f_98a5.jpg s3://processed/f_98a5.jpg 2025-02-11

The table format engine does not edit part-0001.parquet. Instead, it writes a new Parquet file:

s3://lake/uploads/data/
  date=2025-02-11/part-0001.parquet   # old rows
  date=2025-02-11/part-0002.parquet   # new rows

and a new snapshot is written, and becomes the “current” snapshot:

Snapshot 2:
  files = [part-0001.parquet, part-0002.parquet]

If someone then changes or deletes data, the underlying data does not change. Instead, the table format engine writes

s3://lake/uploads/data/
  date=2025-02-11/part-0003.parquet  # (new replacement data)

and

Snapshot 3:
  files = [part-0001.parquet, part-0003.parquet]

but we can still say

SELECT * FROM uploads AT SNAPSHOT 2;

and get the table exactly as it was before the change.

The table format therefore provides reproducibility and time travel; the system retains a full history of table changes, and we can always read the table as it existed at an earlier point in time.

A table format such as Iceberg or Delta defines what a table is - snapshots, files, schemas, and versions - but it does not provide a global directory of tables. That role is played by the metastore, which is in the layer that sits above the table format. A metastore is a service that keeps track of what tables exist in the lakehouse and how to find them. It maps logical names such as prod.uploads to the underlying table metadata stored in object storage.

When a query engine sees a statement like

SELECT * FROM prod.uploads;

it asks the metastore:

What is prod.uploads, and where does its metadata live?

The metastore responds with something like:

{
  "table": "prod.uploads",
  "format": "iceberg",
  "metadata_location": "s3://lake/uploads/metadata/metadata.json",
  "owner": "ml-team",
  "schema_version": 7
}

The query engine can then read the Iceberg metadata from that location, which tells it which Parquet files make up the current snapshot of the table.

In this sense, the metastore is the directory service of the lakehouse, like an address book.

The metastore is also typically the layer that enforces governance: access controls and authentication live here, along with policies like row-level or column-level security, data classification tags, and audit logs of who read or changed what. The metastore can gate access to tables, mask columns with sensitive data, and enforce retention or deletion rules. This is how consent, licensing, and privacy requirements become concrete rules enforced on data pipelines and users.

The object store, table format, and metastore together define what data exists. The final layer of a lakehouse is what actually uses that data: the query and compute engines.

A query engine or compute engine is responsible for turning a user request (such as a SQL query or a data-processing job) into reads and writes of Parquet files in object storage.

When an engine receives a query like:

SELECT user_id, COUNT(*) 
FROM prod.uploads
WHERE upload_date = '2025-02-11'
GROUP BY user_id;

it performs a sequence of steps:

  1. It asks the metastore what prod.uploads is and where its metadata lives.
  2. It reads the table-format metadata (for example, Iceberg metadata files) from object storage to determine:
  • which Parquet files belong to the current snapshot,
  • how the table is partitioned,
  • and statistics about each file.
  1. It selects only the files that could contain relevant rows (for example, only files in date=2025-02-11).
  2. It reads those Parquet files in parallel from object storage and executes the query logic (filters, joins, aggregations) over the columns.

Different query and compute engines are optimized for different workloads. Trino and Dremio focus on interactive SQL, Spark and Flink handle large batch and streaming jobs, and DuckDB is lightweight and designed for fast data analysis on a single machine.

These query and compute engines are not lakehouse-specific - they can read from other types of data repositories, too. But in the context of a data lakehouse, all of them follow the same contract: they resolve table names through the metastore, use the table format to determine which files belong to a table and which snapshot is current, and then read and write Parquet files in object storage under the control of that metadata.

As with traditional databases, many cloud service providers offer managed services to support data lakehouses, and there are also open-source implementations that can be hosted on-premise:

Warning

Cloud service offerings in this change quickly. Product names, feature sets, and even the definition of what a provider considers a “lakehouse” can shift over time, so do not assume any list of services (including this one) will stay up to date.

Layer Open source (self-hosted) GCP AWS Azure Databricks
Object store MinIO Google Cloud Storage (GCS) AWS S3 Azure Blob Storage Customer’s object storage
Table format Apache Iceberg, Delta Lake, Apache Hudi Google BigLake (Iceberg) Apache Iceberg* Apache Iceberg,
Delta Lake*
Delta Lake
Metastore / catalog Hive Metastore, Nessie Dataplex Universal Catalog AWS Glue Data Catalog Microsoft Purview Unity Catalog
SQL & compute engines Spark, Trino, Flink, DuckDB, Dremio BigQuery, Dataproc (Spark, Trino) Athena (Trino), EMR (Spark), Redshift Spectrum Synapse, Fabric, Trino Managed Spark + Databricks SQL

* The table format layer itself is cloud-agnostic; the entries here indicate which open-source table formats are supported by each provider’s managed lakehouse services, i.e. the higher layers. Similarly, the query and compute engines listed are managed versions of popular open-source systems.

Databricks is a cloud-agnostic data platform that runs on GCP, AWS, or Azure; it bundles the Delta Lake table format, Unity Catalog metastore, and Spark + SQL for compute.

4.5.3 System design questions

Some questions to consider when designing a data system are:

  • What are the distinct types of data in the system (application state, training data, analytics), and what are their requirements for latency, concurrency, and history?
  • Which data must be updated in small, reliable CRUD transactions, and which data will be scanned and aggregated across large time windows?
  • How important is reproducible training, and how will it be supported (for example, keeping snapshots of datasets and being able to retrieve the exact version used for a given model)?
  • How will lineage be preserved from raw sources through derived datasets, features, and model artifacts, so that past outputs can be explained and audited?
  • Which data belongs as blobs in object storage, and what repository layer will organize and govern it (metadata DB, catalog, lakehouse table format, model registry)?
  • What governance rules must be enforced (access control, retention/deletion, licensing/consent), and where will those rules live so they follow the data through pipelines?

Considering the discussion above, we can see that an ML system’s data infrastructure will depend on its operational and scientific requirements.

For a small team moving fast on one or two datasets, where experiments are informal, history is rarely revisited, and one group controls the whole pipeline, an infrastructure like this:

  • application state in a relational database
  • training images or text stored as files in object storage + a simple metadata database on top

may be sufficient and avoids the overhead of more elaborate systems.

But as requirements grow - the arrival of messy data, labels being revised, multiple models trained from the same pool of examples by different teams, or the need to reproduce and explain past results - stronger guarantees become essential. To address this, we might add a data warehouse, so the infrastructure becomes:

  • application state in a relational database
  • analytics data, including a history of system events derived from logs or event streams, in a data warehouse
  • training images or text stored as files in object storage + a simple metadata database on top

As training and analytics mature, this split begins to break down. The same historical records - model outputs, user edits, moderation actions - are needed both when training models, and to answer analytical questions about system behavior. Maintaining a separate data lake for training files and a data warehouse for analytics means copying data between systems, keeping schemas in sync, and reconciling different versions of the truth. A data lakehouse addresses this by letting training and analytics share the same versioned, governed data on top of object storage. The infrastructure becomes:

  • application state in a relational database
  • raw images, text, and model artifacts in object storage
  • training and analytics datasets represented as versioned tables on that same object storage, with a catalog and query engines on top

This allows the organization to keep all raw data, all labels, and all model outputs in one place with time travel, lineage, and access control, while supporting both large analytical queries and reproducible model training.

4.6 Data pipelines

Eventually, we want our data to end up in a data repository, in a state where we can query it and use it for business purposes, e.g. analytics or model training. A data pipeline does exactly that: it gets data from a source, may modify it into a more useful form, and then stores it in a data repository in a ready-to-use form.

Conceptually, most pipelines follow the same flow:

  1. A source system produces data (a row is inserted, a user clicks a button, a file is uploaded).
  2. Ingestion captures that data reliably, with enough context to interpret it later.
  3. Storage and repository layers organize the captured data into queryable tables and files.
  4. Processing jobs transform raw inputs into cleaned datasets, aggregates, and features.
  5. Control and operations layers enforce quality, access, and reliability as data moves.

In many production systems, a pipeline exists to produce one or more data products. A data product is what downstream teams rely on: for example, a curated table, a documented view, or a feature dataset with a well-defined schema, clear semantics (what fields mean), and known expectations around freshness and quality.

However, not every pipeline output is a data product. Some pipelines mainly produce intermediate artifacts (raw ingested logs, staging tables), operational outputs (alerts, counters, caches), or one-off extracts. These outputs can support data products, but they are not necessarily data products themselves.

4.6.1 Data sources and ingestion

An ML system may have many sources of data, but two broad categories show up repeatedly:

  • First-party sources: data generated by our own product (user actions, uploads, edits, model predictions, moderation decisions).
  • External sources: data we pull from someone else (partner feeds, third-party APIs, web crawls, public datasets).

Ingestion is the step that turns those inputs into durable records; records that will not disappear if something crashes or restarts. A useful mental model is: ingestion is to a data system what a commit is to a codebase.

All of the following are examples of ingestion:

  • A user uploads an image file: the bytes land in object storage, and the upload event lands in an event log with a user_id, upload_id, timestamp, and the object URL.
  • A support agent changes a ticket status: the current status lives in the application database, but an append-only history of changes (who changed what, when) is captured for analytics and training.
  • A service calls an external API to do enrichment (for example, turning an address into latitude/longitude): the raw API responses are saved, and parsed fields are written into a table.

There are several possible ingestion patterns. The right one depends on what the data source lets us do (does it provide a full file export? does it provide an API? does it provide a live event stream?), and what we need from it (how fast do we need updates? how much history do we need?).

For example, suppose we want to ingest data about 311 Service Requests from NYC Open Data. 311 responds to thousands of calls requesting non-emergency city services every day; the dataset is updated daily to reflect new and updated requests from the last day.

In a full load, each day after the data is updated, we would download the entire dataset export and write it to object storage under a date-stamped path like s3://lake/raw/nyc_311_requests/date=2026-01-12/311_requests.csv. (Downstream stages in the pipeline, after ingestion, might work with the data to e.g. extract the new or updated rows, but that’s not at ingestion.) This approach is simple to reason about (we always have a complete copy), and it works well when the dataset is not too large. But, the obvious downside is cost and latency: we reread and rewrite the whole dataset even if only a small fraction of rows were added or changed that day.

In an incremental load, rather than downloading the full 311 dataset every day, we would use its API to fetch only records that are new since the last run. For example, a single record returned by the API for NYC 311 data looks like this (simplified, some fields omitted for brevity):

{
  ":id": "row-b6h6.ndd5~6nqn",
  ":created_at": "2026-01-11T02:31:37.691Z",
  ":updated_at": "2026-01-11T02:33:41.134Z",
  "unique_key": "67446372",
  "created_date": "2026-01-10T02:21:17.000",
  "agency": "NYPD",
  "complaint_type": "Noise - Residential",
  "descriptor": "Loud Talking",
  "incident_zip": "11237",
  "status": "In Progress"
}

Notice that there are two different notions of time:

  • created_date is when the 311 request happened in the world (the business event time).
  • :created_at and :updated_at are when the data portal created or last updated this record in the dataset.

To do an incremental load from this API, we typically keep a watermark, such as “the largest :updated_at we ingested last time”. On the next run, we query the API for only records where :updated_at is greater than that watermark. This returns both brand new records and corrections to older records (because corrections change :updated_at even if the original created_date was weeks ago).

Note that relying only on created_date would miss updates: a request created yesterday might be updated today with a new status or corrected location fields. If we only had created_date and no update date, we would need to include an extra lookback window when loading data, so that we re-fetch the last N hours or days of records by created_date on every run, where N is chosen based on how long after creation a 311 request can still be updated. For example, if requests are often updated for up to 7 days, we would use a 7-day lookback. Downstream stages in the pipeline, after ingestion, would then deduplicate by unique_key, keeping the record with the latest values.

For example, suppose we load data from an external source at the time indicated on the timeline in the diagram below. Records with created_at before this time are expected to be loaded. However, because time is continuous and publishing can lag, the pink record outlined with a dashed border is ambiguous: it may have a created_at earlier than our watermark timestamp, but still be missing from this load because the API was not serving it yet.

Incremental load using an updated_at watermark

If we use a lookback window, then the next load re-reads records with created_at between “watermark - lookback” and “watermark”, and also reads new records up to the new run time. This means we will capture that ambiguous record if it was missed earlier. This also helps us capture some updates even when we are filtering only by created_at: if a record was created recently enough to still fall inside the next run’s lookback window, we can ingest its newer version (like the blue record). But once a record is older than the lookback range, later updates (like the green record) are missed unless we also have an updated_at field.

Incremental load using a created_date lookback window plus deduplication

Whether we do a full load or an incremental load, the NYC 311 data is only updated once per day, so we cannot get new data more frequently than that. For some other types of data, though, we may be able to ingest a real-time event stream. For example, the MTA publishes real-time updates about subway and bus service through its API feeds (see MTA API). These feeds provide a continuous stream of updates (vehicle positions, service alerts) throughout the day. An ingestion service can poll the feed at regular intervals, and append each batch of updates to an event log in object storage (for example, partitioned by minute). Downstream stages in the pipeline can then (1) build a “current state” table used by an application or dashboard (latest arrivals by station), or (2) replay the same stream later to recompute features such as “average delay by line over the last week” or to debug an incident after the fact.

The examples above involved an external source, where we are the consumer of the data and do not control the producer. For internal sources, we control the producer. This means we have more flexibility, especially in a greenfield design, because we can decide up front how the application records changes and how the pipeline consumes them.

Internal data starts as application state in our own systems: a user signs up, an order is placed, a profile is updated, a model produces a prediction, a moderator changes a status. We then ingest a history of those changes into our analytics and training repositories so we can answer questions about system behavior over time and build training datasets without putting heavy load on the application database.

Some ingestion patterns for internal data include:

  • Export periodic snapshots: for example, if we retrain once per day, we may export a nightly snapshot of relatively slow-changing application data tables such as users or products into object storage, and then build daily training tables from those snapshots. This is a real and common approach when freshness requirements are low and simplicity is a higher priority.

  • Land internal file drops in object storage: for example, suppose our application stores user-uploaded media (images/audio) as files in object storage as part of its normal operation. Ingestion in this case means (1) noticing and recording that a new object exists at a specific URL/path, and (2) writing a small metadata record that makes the file queryable and governable (for example, file_id, object_url, uploaded_at, user_id, content_type, and permissions). Downstream jobs then use the metadata table to find the right files, join them to events and labels, and build training datasets without ever copying the raw bytes into a relational table.

  • Write events to an event log: for example, when our application processes a user action (sign-up, purchase, caption edit), it emits an event like user_signed_up or order_placed and appends it to the log. Downstream consumers use that log to build analytics tables and ML training datasets.

  • Write directly into lakehouse tables: for example, instead of exporting from the application database, we run an internal ingestion service (a small service we operate whose job is to collect events from our own systems, validate them, and write them into the data repository). The application database remains the system of record for serving the product, but the ingestion service writes append-only rows into governed lakehouse tables (for example, raw.user_events, raw.model_predictions) for analytics and training. Downstream jobs then treat the lakehouse as the first place to read analytics/training data from, rather than repeatedly querying the application database.

  • Change data capture (CDC): we use this when our application database (for example, PostgreSQL) is the system of record for application state, but we want a time-ordered history of changes for analytics and training. CDC reads the database’s internal change log and emits each insert/update/delete as a change event, effectively turning the application database into a kind of event log. We can then ingest that change stream into a data repository, without requiring every workflow that writes to the application database to also publish events to the event log.

CDC deserves special attention because it turns operational state into a time-ordered log of changes. Instead of repeatedly taking snapshots of “What does the table look like right now?”, or requiring every workflow that updates the table to also write to an event log, CDC plugs directly into the database and tells you “Here are the inserts, updates, and deletes that happened, in order.”

Many databases already record every change internally in an append-only write-ahead log (WAL). CDC tools read that log and emit change events downstream. Each event is a fact like “row with primary key X was inserted” or “row X was updated from A to B”. This is sometimes called log-based replication.

It is helpful to understand the shape of a CDC change event. The exact schema depends on the tool (e.g. Debezium), but the key fields are usually:

  • source_table: The table that changed (for example, deliveries), so you can route changes to the right downstream job.
  • op: The operation type (insert, update, delete), so downstream tables can apply the change correctly.
  • primary_key: The record identity (for example, delivery_id=d_9012), so updates apply to the right entity.
  • before: The previous values (often included for updates/deletes), which is useful for audits and for maintaining history.
  • after: The new values (often included for inserts/updates), which is what most derived tables use.
  • db_commit_time: When the database committed the change, which is what CDC preserves and orders.

The most important thing to notice is that CDC gives you “what changed” plus “when the database accepted it”. That is different from the business time inside the record (like an event_time from a client device), but both can matter.

CDC is relevant for ML pipelines for three practical reasons. First, it helps us stay time-correct: we can reconstruct what the system knew at a given moment and avoid data leakage. Second, it is efficient: instead of repeatedly copying tables just to find what changed, we can process only the new inserts/updates/deletes. Third, it lets batch and real-time jobs share the same underlying history of changes: we can build daily training tables from the change stream while also updating faster derived views from that same stream.

NoteCDC hypothetical: food delivery service

To make this concrete, suppose we run an on-demand food delivery service. We want to predict the delivery time at checkout, and we also want to update this prediction over the lifetime of the order to keep the user informed about when to expect delivery.

During operation, data from the live service will become training data used to re-train models, and analytics data used to evaluate model performance.

Our application database is the system of record for live operations. For example, we might have a transactional table deliveries keyed by delivery_id, which is updated by multiple internal workflows over the lifecycle of a delivery:

  • Checkout creates the delivery (address, items, promised delivery window).
  • Dispatch assigns a courier, and may later reassign if the courier cancels or is delayed.
  • The courier app reports status changes such as “arrived at restaurant”, “picked up”, and GPS pings.
  • Support tools can fix addresses, mark cancellations, or correct timestamps.

By the time a delivery is complete, we know many fields such as:

  • promised delivery time vs actual delivered time
  • time spent waiting at the restaurant, time in transit, reassignment count
  • final status (delivered, canceled, returned)

So, at the end of a delivery we might have a row like this in deliveries:

delivery_id created_at promised_by courier_id pickup_at dropoff_at reass_ct status
d_9012 2026-01-10 18:02 2026-01-10 18:45 c_77 2026-01-10 18:19 2026-01-10 18:53 0 delivered

However, when we make a prediction (at checkout time), we do not know who the courier will be and whether the courier will need to be reassigned, and we do not yet know the pickup time. Furthermore, besides for the application data related to the delivery itself, we will use other context at the time we make a prediction:

  • average delay for this restaurant over the last hour (signals kitchen backlog)
  • average delay in this neighborhood over the last 15 minutes (signals weather or traffic)
  • current load of available couriers vs open orders (signals dispatch pressure)

All of these features change over time. The courier availability we use at checkout is not the same as what we would use 10 minutes after pickup, and it is not the same as the final value at delivery time. This is why we need to capture what was known (including aggregate features) at each prediction time, not just the final delivery row.

The key data systems problem is that different fields become known at different times. We can’t train a model using a single “final” row per delivery that includes fields like picked_up_at and delivered_at; if we did, we would be giving the model information that would not have been available at prediction time (this is a kind of data leakage).

Now suppose we want predictions at multiple anchor points, for example:

  • at order creation
  • at courier assignment
  • 10 minutes after pickup
  • 15 minutes before the promised delivery time

To build a correct training set, we instead need one training example per delivery per anchor point, reflecting what was known at that time. For example, for one delivery_id, the training table might look like:

delivery_id anchor time courier_id pickup_at rest_delay_1h total_min
d_9012 order_created 2026-01-10 18:02 (unknown) (unknown) 6.2 51
d_9012 courier_assigned 2026-01-10 18:05 c_77 (unknown) 6.8 51
d_9012 after_pickup_10m 2026-01-10 18:29 c_77 2026-01-10 18:19 9.1 51

where total_min is the label, and the other fields show some features that are available at that anchor time. (Some features are omitted for brevity.)

CDC helps here because it captures the sequence of updates to the deliveries table as they happen. Instead of periodically exporting “the current state of deliveries”, CDC emits a change stream that includes events like “delivery created”, “courier assigned”, “status changed to picked up”, and “delivered_at written”. We can then reconstruct what the row looked like at each anchor point by replaying changes in commit order, and we can compute or join the time-based aggregates as of that same anchor time. This is exactly the kind of use case where a time-ordered history of internal application state is necessary to build correct ML training data.

In addition to reliably capturing events and changes, ingestion is also where governance enters the pipeline. Governance requirements are usually described in plain language (“Do not use data from minors for training”, “Delete a user’s data within 30 days”, “Only the fraud team can access raw card transaction logs”), but the pipeline can only follow those rules if the data carries the right metadata.

One common pattern is to attach governance metadata to each ingested record (or to each file/partition, for file-based ingestion). In practice, we usually implement this in one of two ways.

In one approach, we add the metadata as extra columns on our raw table. For example, a raw_events table has columns like consent_status and region next to the event payload. This is easy to query because everything is in one place.

In an alternative approach, we keep the metadata in a separate table such as ingestion_metadata, where each row describes one ingested record. We connect the metadata to the raw data by storing the same record_id in both tables. This is useful when we have many raw tables and want one consistent place for policy fields.

  • record_id: Stable ID for the ingested record so you can trace, audit, and delete it later.
  • source_system: Where the record came from (for example, app_db, pubsub_topic.user_events, partner_api.acme_v1) to support audits and incident response when a source is wrong.
  • ingested_at: When the pipeline captured the record (useful for freshness checks and debugging delays).
  • event_time: When the event happened in the real world (critical for building leak-free training sets and time-correct features).
  • data_category: High-level category used by policy (for example, personal, sensitive, non_personal) to drive access control and masking rules.
  • consent_status: Whether you are allowed to use the record for specific purposes (for example, granted, denied, unknown) so training pipelines can filter out data that is not permitted.
  • consent_version: Which terms/policy text the user agreed to, so you can answer “What did we think we were allowed to do at the time?”
  • allowed_purposes: A list like ["service", "analytics"] vs ["service", "analytics", "training"] so the same raw data can be used for some workflows but not others.
  • region: The jurisdiction that should control handling (for example, us, eu) so storage location, retention, and access can follow regulatory boundaries.
  • retention_until: The timestamp after which the record must be deleted or anonymized, so retention enforcement can be automated.
  • delete_request_id: Links a record to a user deletion request so you can prove deletion propagated through derived datasets.
  • owner_team: The accountable team for the dataset (who approves access, who is paged when quality breaks).

To summarize, ingestion is not just about moving data from one place to another. It is where we decide what the system will remember (full snapshots vs incremental updates vs streams), how we will handle late or corrected data (watermarks, lookback windows), and how we will keep the record time-correct so we can later build training data that reflects what was known at prediction time. It is also where we attach the metadata that makes downstream use safe: if consent, retention, region, and ownership are not recorded at ingestion, they are hard to reconstruct later.

Once data is ingested in a reliable and policy-aware way, the next question becomes how we process it.

4.6.2 Batch and real-time processing

You may have noticed that in some of our ingestion patterns, we ingest many rows in bulk, potentially hours or days after the business time when they occurred. In other ingestion patterns, we ingest one event at a time as it occurs in business time. These categories reflect two broad data paradigms that apply throughout the pipeline, not only at ingestion: batch and real-time.

  • Batch processing runs on a schedule. We let data accumulate, then run a job that reads a lot of it at once, transforms it, and writes a result table.
  • Real-time processing runs continuously (or very frequently). As new events arrive, we update derived results right away.

When data is ingested as a batch (daily snapshots, periodic bulk updates), we’ll typically process it as a batch. We don’t see individual events as they occur, so we won’t process them that way, either. If data is ingested as a stream (event stream or CDC change stream), we can process it in real-time (update outputs as events arrive) or in batch (periodically read a chunk of the stream and compute results). As a rule of thumb, if we only need updates every few hours or once per day, batch is usually the simplest choice. If the product needs fast feedback (user-facing updates, alerting, or continuously updated features), we need real-time processing.

Here are a few common patterns for processing data on the batch-to-real-time continuum in practice:

  • Batch only: we ingest data (often as files or daily API updates) and run scheduled jobs to produce the tables we need. This is common for processing external data sources that are updated infrequently, daily retraining, offline evaluation, and dashboards that do not need minute-by-minute updates.

  • Frequent batch (near real-time): we run batch jobs more often, such as every 5 or 15 minutes. This is sometimes called micro-batching. For example, we might recompute “average order preparation time by restaurant in the last hour” every 5 minutes instead of continuously updating it for every single event.

  • Stream for fast views, batch for historical rebuilds: we use real-time processing to keep a “current state” table fresh (latest ETAs, current system health), but we also write the raw events to a repository so we can recompute things later. For example, in the food delivery service, we might consume delivery_events and courier_locations to keep a table of “latest ETA per delivery” up to date for the app. In parallel, we write those same raw events to a lakehouse. If we later discover that we were parsing some location field incorrectly for a week, we can replay that week of raw events and rebuild the ETA history and related aggregates.

  • Stream plus batch training: we keep real-time features up to date for user-facing predictions, but we still build training datasets in batch. For example, a real-time job might update the latest ETA shown to a user whenever a courier location ping arrives. Separately, each night we run a batch job that joins the full event history with the final outcome (dropoff_at) to create labeled training examples (for example, one row per delivery per anchor point with total_min). This lets training stay reproducible and time-correct even though the user-facing product uses continuous updates.

  • Stream only (no batch): some systems process events in real-time and never schedule batch jobs. This is most realistic when the main goal is a live view of the world and we do not need historical rebuilds or model training. For example, a station arrival display might consume an MTA update stream and continuously maintain “latest arrivals by station” in a serving database.

Batch processing is usually implemented as scheduled jobs that read from a data repository (files or tables), run transformations, and write new files or tables. That is, the data repository is the “interface” for batch processing.

Conceptually, batch jobs pull data at rest from repositories: we read a chunk (often a time slice), compute, and write a new chunk. This works well, because batch jobs are relatively infrequent and are not very time sensitive. There are still some operational details to consider to make batch jobs work reliably. We’ll revisit some of these (scheduling, retries, monitoring, and rebuilding historical outputs) a little later.

In contrast, the data repository is usually a poor “interface” for real-time processing, because real-time services work with data as a stream: events flow continuously, and downstream services react continuously. If we try to treat a data repository like a stream, we usually end up polling for changes (“did anything new happen since the last query?”), which adds load and latency. Or we end up writing and reading many tiny updates, which is inefficient for repositories that are optimized for large, sequential reads and writes.

Aside from these operational concerns, the deeper difference is that many repositories are queried as “current state”, while streams preserve the full history of how that state evolved. For example, imagine a system that assigns seats on a flight. Suppose seat 12A is assigned to Alice at 10:01, then reassigned to Bob at 10:02, and then reassigned again to Carol at 10:03. If we query a seat assignment table at 10:05, it will usually only show the latest state (Carol). Without a history of changes, we cannot explain why Bob’s boarding pass printed at 10:02:30 says 12A, or debug what went wrong when Alice complains that she was sure she had a window seat. An event stream preserves the sequence of assignments, so we can reconstruct what was true at 10:02:30.

For real-time data, we need something more like a pipe that the stream can flow through. A producer (for example, an external real-time API feed, CDC reading a database’s change log, or a service that outputs business events) puts events into the pipe, and a consumer reads some or all of the flow and uses it to do work. The pipe acts as a buffer between them.

In real-time systems, a message broker is often used to provide that shared event pipe.

One benefit of the shared event pipe is that it decouples producers and consumers. We can have many producers and many consumers, and they do not have to all know about each other. They only need to know about the broker. For example, in a ride-sharing app, the trip service can publish a trip_started event without knowing whether the consumers are billing, safety monitoring, driver incentives, analytics, or all of the above. New consumers can be added without changing the trip service.

broker_decoupling p1 Producer A broker Message broker p1->broker publish p2 Producer B p2->broker publish p3 Producer C p3->broker publish c1 Consumer 1 broker->c1 deliver c2 Consumer 2 broker->c2 deliver c3 Consumer 3 broker->c3 deliver

Another benefit is that we can produce an event once and then many consumers can receive it in parallel. For example, when a bank transaction occurs, the same event can feed fraud detection, account balance updates, user notifications, and compliance logging at the same time.

broker_fanout producer Producer broker Message broker producer->broker publish once c1 Consumer 1 broker->c1 deliver c2 Consumer 2 broker->c2 deliver c3 Consumer 3 broker->c3 deliver

The broker also gives consumers flexibility to consume events at whatever schedule works for them. Consumers do not have to be ready at the exact moment an event is produced, because the broker can hold onto the event until they are ready to process it. For example, a search product might publish click events continuously, but a reporting job might only read them every 5 minutes to update a dashboard.

broker_flexible_timing producer Producer broker Message broker (keeps events) producer->broker publish now consumer Consumer broker->consumer consume later note Consumer reads later (for example, every 5 minutes).

If a consumer is temporarily unavailable, data is not lost. The consumer can come back and catch up by reading the events that the broker held onto. For example, if an email notification service is down for 10 minutes, users might get delayed emails, but the events will still be waiting and can be processed when the service recovers.

broker_catchup producer Producer broker Message broker (keeps events) producer->broker publish consumer Consumer broker->consumer X broker->consumer deliver later note If the consumer is temporarily down, the broker keeps events so it can catch up later.

Many brokers also keep events even for days or weeks, which is called retention. Retention is what makes replay possible. If we fix a bug in our parsing or feature logic, we can reread older events and rebuild a derived table instead of hoping the only copy was correct the first time.

One practical detail is that some consumers want every event (for example, a job that writes all raw events to the lakehouse), while others only need the latest state (for example, latest ETA per delivery). A broker helps with both: the first consumer reads the full stream, while the second consumer reads the stream and maintains a separate table of current values.

There are two major messaging patterns that a message broker might use:

In the publish-subscribe (pub/sub) pattern, we publish events like order_created (with data such as order_id=9012) to a named topic, and multiple consumers can subscribe to it. In a subscription, delivery is “at least once”: the broker will retry until a consumer acknowledges receipt. (This means a consumer might occasionally see the same event more than once, so consumers are usually designed to be safe to retry.) This pub/sub pattern is used very often for analytics and ML pipelines, because the same event is useful for many downstream uses.

pubsub_pattern cluster_broker Message broker p1 Producer A topic_orders Topic: order_events p1->topic_orders order_created (order_id=9012) p2 Producer B topic_clicks Topic: click_events p2->topic_clicks click (item_id=456) c1 Consumer 1 c2 Consumer 2 c3 Consumer 3 topic_orders->c1 order_created (order_id=9012) topic_orders->c2 order_created (order_id=9012) topic_clicks->c2 click (item_id=456) topic_clicks->c3 click (item_id=456)

In the work queue pattern, we enqueue messages representing tasks that are intended to be done once, and a pool of workers picks them up. For example, “resize this image”, “send this notification”, or “recompute the ETA for delivery d_9012”.

work_queue_pattern cluster_broker Message broker producer Producer queue_resize Queue: image_resize_tasks producer->queue_resize resize (image_id=123) producer->queue_resize resize (image_id=456) queue_email Queue: email_send_tasks producer->queue_email send_email (email_id=9001) c1 Consumer 1 c2 Consumer 2 c3 Consumer 3 note Each task goes to one consumer. queue_resize->c1 resize (image_id=123) queue_resize->c2 resize (image_id=456) queue_email->c3 send_email (email_id=9001)

NoteMessage broker hypothetical: food delivery service

Continuing the same food delivery example from the ingestion section, we can use a message broker to move order and courier events through the system in real-time.

In this kind of system, services publish events as orders move through their lifecycle (created, assigned, picked up, dropped off). A message broker gives those services a shared event pipe: instead of each downstream system polling the application database for changes, each downstream system reads the events it needs.

Here is an example of what an order_created event might look like in the food delivery service:

{
  "event_type": "order_created",
  "event_time": "2026-01-10T18:02:13Z",
  "delivery_id": "d_9012",
  "user_id": "u_104",
  "restaurant_id": "r_33",
  "delivery_zone": "brooklyn_04",
  "promised_by": "2026-01-10T18:45:00Z",
  "items_count": 4,
  "subtotal_cents": 3899
}

Events are usually organized into topics. A topic is like a channel: producers publish events to it, and consumers subscribe to receive them.

For example, in a food delivery service we might use topics like:

  • delivery_events: lifecycle events such as order_created, courier_assigned, picked_up, delivered, canceled.
  • courier_locations: periodic location updates from the courier app (for example, one event every few seconds while on an active delivery).
  • eta_predictions: the model’s own outputs (predicted delivery time, confidence, and which model version produced it), emitted whenever the ETA is computed or updated.
  • support_actions: changes made by support tools (address corrections, cancellations, refunds), so analytics and training pipelines can see interventions that did not come from the main application workflow.

Each topic typically has multiple consumers, because different parts of the system need the same facts for different reasons:

  • delivery_events
    • A lakehouse or warehouse writer records every event so we have a complete history for analytics and training.
    • A “current order state” updater maintains the latest status per delivery_id (so the app can show “courier assigned” without scanning the whole history).
    • A monitoring consumer watches for unusual rates of cancellations, long gaps between picked_up and delivered, or other signs something is broken.
  • courier_locations
    • A live ETA updater combines location updates with route and traffic information to refresh ETAs.
    • A map view service updates the courier dot on the user’s order tracking screen.
    • A safety or anomaly detector flags impossible jumps (for example, a courier “teleporting” across the city), which can indicate GPS bugs or fraud.
  • eta_predictions
    • The product database (or a dedicated serving store) stores the latest ETA shown to the user.
    • An analytics consumer logs what the model predicted at each point in time so we can later measure error and compare model versions.
    • A model monitoring consumer checks for changes in prediction behavior (for example, ETAs suddenly becoming much longer for one borough).
  • support_actions
    • A lakehouse or warehouse writer records these actions so training and analytics do not incorrectly assume all changes came from normal product flows.
    • A policy and audit consumer verifies that sensitive actions (refunds, cancellations, address edits) are attributable to an operator and have the right approvals.

This means the message broker does not replace the application database as the system of record. Instead, it provides a shared stream of operational facts that multiple systems can react to.

Some consumers maintain small, fast “current state” tables keyed by delivery_id (latest status, latest ETA, latest courier location) so user-facing product surfaces can update quickly.

In parallel, we can keep an append-only history of every event in a data repository so we can later build training tables and analytics that reflect what was known at each point in time (for example, at checkout vs after pickup), compute labels like the final delivery duration, and evaluate model performance without data leakage.

A message broker gets events from producers to consumers quickly and reliably. The next question is what those consumers do with the events. When a consumer reads events continuously and updates some derived output as each event arrives, we are doing stream processing. In other words, the broker is the pipe, and stream processing is the computation that runs over what flows through the pipe.

Stream processing shows up any time we need a result that is “always up to date” rather than “updated on a schedule”. The output might be stored in a normal database table when we want it to be queryable and durable, or in a fast key-value store when we want very low-latency reads. For example, in a transit app we might consume live MTA updates and keep “next arrivals by station” fresh in a serving store. In an online store, we might consume inventory and order events and keep “available stock by SKU” up to date so we do not oversell. In an incident response system, we might consume service health events and trigger alerts within seconds.

Stream processing is often used for two kinds of outputs. One output is a latest-value view keyed by an ID, like “latest status per delivery_id” or “latest location per courier”: when a new event arrives, we update the current value for that key. Another type of output is a rolling aggregate over a moving window, like “cancellations per minute” or “average pickup delay in the last 30 minutes”. These rolling metrics are useful both for product behavior (for example, showing a banner when delivery times are unusually high) and for monitoring (for example, paging on-call when something spikes).

For ML systems, stream processing is a common way to keep online features and user-facing predictions fresh. For example, in a fraud detection system, each card transaction event can update a user’s “recent spend in the last 10 minutes” feature and trigger an updated risk score that is written to a low-latency serving store. At the same time, we usually keep an append-only history of the raw events and the model outputs in a data repository, so we can later build training tables, compute labels (for example, whether the transaction was later confirmed as fraud), and evaluate model performance using what was known at the time.

Cloud service providers offer both message brokers and stream processing as managed services, or we can self-host open-source systems on compute (on-premise or in the cloud). The table below lists some options:

Category Open source (self-hosted) GCP AWS Azure
Message broker Apache Kafka, Redpanda Cloud Pub/Sub Kinesis Data Streams, MSK Event Hubs
Stream processing Apache Flink, Spark Structured Streaming Dataflow Managed Service for Apache Flink Stream Analytics

4.6.3 ETL and ELT

Once data is ingested, we still need to turn it into datasets that are easy to query and safe to use. The core question is where transformations happen: do we clean and join data before it enters our repository, or do we load raw data first and then transform it inside the repository?

One common approach is ETL, short for extract-transform-load. In ETL, we extract data from one or more source systems into a staging area that has enough capacity for the job. We run transformations on the raw data, and then load the transformed outputs into the repository that downstream users will use. The staging area is often temporary: we might keep raw extracts for a short time in case we need to debug or rerun the job, but the long-term datasets live in the repository.

ETL cluster_repos cluster_sources S1 Data source 1 (third party API) ST Staging area (raw extracts) S1->ST Extract S2 Data source 2 (operational DB) S2->ST Extract S3 Data source 3 (files) S3->ST Extract ST->ST Transform R1 Data repository 1 (lakehouse) ST->R1 Load R2 Data repository 2 (warehouse) ST->R2 Load

For example, suppose we are building a flight delay prediction system. Each day we ingest operational flight data (scheduled times, origin/destination, carrier), plus external context (weather observations, airport status). In an ETL design, we might:

  • Extract yesterday’s flight records from an internal database into a staging bucket (raw CSV or Parquet).
  • Extract yesterday’s weather observations from an external API into the same staging bucket.
  • Run a transformation job that parses timestamps, normalizes airport codes, deduplicates corrections, joins flights to weather snapshots, and computes aggregate delay features over time windows.
  • Load the transformed result into a warehouse table like analytics.flight_delay_features_daily.

By the time data lands in analytics.flight_delay_features_daily, it already matches our expected schema and quality checks.

Another common approach is ELT, short for extract-load-transform. In ELT, we load raw data into the repository first, and then run transformations inside the same compute environment that we will use for queries. The transformed data is loaded back into the repository.

ELT cluster_repo Data Repository cluster_sources S1 Data source 1 (third party API) Rraw Raw tables/files S1->Rraw Extract + Load S2 Data source 2 (operational DB) S2->Rraw Extract + Load S3 Data source 3 (files) S3->Rraw Extract + Load Rcurated Curated tables Rraw->Rcurated Transform

In this model, the repository becomes the main place where refinement happens: raw data stays available for debugging and backfills, and transformations become first-class, versioned data products.

ETL and ELT reflect a practical cost and flexibility tradeoff. Historically, when storage and warehouse compute were more expensive, teams often preferred ETL: we decided up front which fields we needed, transformed them into a well-defined schema, and loaded only those curated tables into the repository. This kept storage and query costs down, and it encouraged careful data modeling. The downside is that transformations can be lossy. If we did not keep the original raw payload and later realize we need an additional field (or a different parsing rule), we may not be able to recover it. Sometimes we can re-pull the raw data from the source, but sometimes we cannot (for example, if the source only keeps a short history or if the API has changed).

ELT becomes attractive when raw storage is cheap and we expect requirements to change. We can land the raw data first and decide later how to shape it. This makes it easier to add new columns, rebuild tables with new logic, or answer a new question without changing ingestion. The risk is that “land raw now, model later” turns into “land raw forever.” If raw data piles up as a single huge semi-structured table and we do not follow through on building structured, query-friendly outputs, query times and costs can grow quickly, and downstream users end up repeatedly re-parsing the same raw payloads. For example, we might ingest a high-volume API feed into a single raw table like raw.api_events with a payload_json column that stores the full response for each event. This is convenient at the start, because ingestion is simple and we have the full source data if we need to reinterpret it later. But as the table grows to terabytes, every downstream query that needs a few fields has to scan and parse that large semi-structured column, which adds cost and reduces velocity. In practice, ELT works best when we treat raw tables as an input layer, not an end state, and we actively maintain cleaned and curated tables that downstream workloads can use efficiently.

ELT is popular with modern warehouses and lakehouses because they can store large raw datasets cheaply and run scalable transformations close to the data. Many lakehouse deployments organize ELT transformations using a medallion architecture, where data is progressively transformed over three stages: bronze (raw), silver (cleaned/conformed), and gold (curated for analytics and ML).

medallion cluster_repo Data Repository cluster_sources S1 Data source 1 (third party API) Rbronze Bronze (raw tables/files) S1->Rbronze Extract + Load S2 Data source 2 (operational DB) S2->Rbronze Extract + Load S3 Data source 3 (files) S3->Rbronze Extract + Load Rsilver Silver (cleaned tables) Rbronze->Rsilver Transform Rgold Gold (curated tables) Rsilver->Rgold Transform

To make this concrete, here is what bronze, silver, and gold might look like for our flight delay prediction system:

  • Bronze (raw): tables that are close to the source, with minimal changes beyond parsing and metadata.
    • bronze.flights_api_responses: one row per API response or file drop, with the raw payload and fields like ingested_at and source_system.
    • bronze.weather_observations: raw weather rows as received from the provider, even if some fields are missing or change names over time.
    • bronze.airport_status_updates: raw airport status events (closures, delays, advisories), append-only.
  • Silver (cleaned): tables with consistent types, cleaned values, and stable keys that make joins reliable.
    • silver.flights_clean: one row per flight leg with typed timestamps, normalized airport codes, and a stable flight key.
    • silver.weather_clean: weather observations with consistent units, consistent station identifiers, and invalid readings filtered.
    • silver.flight_status_asof: a time-correct “as of” view keyed by (flight_id, as_of_time), where each row captures what the system knew about a flight at a specific anchor point (for example, 24 hours before departure, 1 hour before departure, or 10 minutes after departure).
  • Gold (curated): tables that are directly consumed by analytics, training, or product workflows.
    • gold.flight_delay_training_examples: one row per flight per anchor point, with features computed at that time and a label like arrival_delay_min.
    • gold.flight_delay_metrics_daily: aggregates used for monitoring and analysis, such as error by route, carrier, and airport.

Notice that gold tables are not just cleaner versions of bronze. They represent decisions about what a training example is (flight + anchor point), what a label is (delay minutes), and what joins and time semantics are valid. Keeping raw (bronze) and cleaned (silver) data alongside curated (gold) outputs makes those decisions explicit and makes it possible to rebuild and audit datasets when requirements change.

ETL and ELT are often described with batch jobs, but the same question shows up in real-time systems: when events arrive continuously, where do we do the parsing, validation, and joining work?

Continuing the flight delay example, suppose we want to update an arrival delay prediction as new information arrives (gate changes, late departures, updated weather, airport congestion). We might consume live flight status events and weather updates and keep the latest features and prediction in a low-latency serving store, so an application can query them for a specific flight.

In a real-time ETL-style design, we transform events before they become shared state. For example, a streaming consumer parses and validates each incoming update, normalizes airport codes, joins it with the most recent weather snapshot, and then writes a clean record into an in-memory key-value store keyed by flight_id (for example, with fields like latest_features and latest_prediction). Downstream services read the serving store without having to interpret raw events themselves.

In a real-time ELT-style design, we save the raw event stream first (for example, we append every flight status update into a raw table), and then we build other things from that raw history. A separate job reads the raw events and keeps an up-to-date view for product use, such as a table keyed by flight_id that stores the latest known gate, departure time, and predicted arrival delay.

Once we have decided when and where the “transformation” stage happens, we need to understand how to realize that transformation.

4.6.4 Query and compute engines

A query or compute engine is the infrastructure that executes transformations on our data.

In a “toy” context, you may have worked with libraries like pandas for data analysis. When you are working with a data set that easily fits in your available RAM, pandas is a simple compute engine that lets you compute over and transform your data. In a more realistic context, however, datasets often do not fit into memory. If we try to load everything into RAM at once, our process will crash or spend most of its time swapping data to disk. We need an engine that is capable of processing larger-than-memory datasets.

A useful first step is pyarrow.dataset, which gives us a scanner over parquet files and returns batches. This is often the simplest way to read larger-than-memory tabular data from object storage while keeping a clear batch interface:

import pyarrow.dataset as ds

dataset = ds.dataset("s3://ml-datasets/flights/", format="parquet")
scanner = dataset.scanner(columns=["carrier", "cancelled", "arr_delay_min"], batch_size=200_000)

for batch in scanner.to_batches():
    # batch is an Arrow RecordBatch; we can aggregate directly
    # or hand it to pandas/polars/NumPy for further processing.
    ...

Now let’s suppose we want to process tihs data in pandas. Although we can’t read the entire dataset into memory all at once, we can read and compute over data in chunks. This can work for some computations, like simple counts and averages, but it is not very efficient or general. Chunking forces us to re-implement a lot of what pandas normally does for us. For example, joining two large tables or computing rolling aggregates by key usually requires extra custom logic rather than a single pandas operation.

For example, suppose we want to compute “average arrival delay per carrier for non-canceled flights” from a large export. We can do it in batches using pyarrow.dataset for scanning and pandas for per-batch groupby logic:

import pyarrow.dataset as ds
import pandas as pd

sum_delay_by_carrier = {}
ct_by_carrier = {}

dataset = ds.dataset("s3://ml-datasets/flights/", format="parquet")
scanner = dataset.scanner(columns=["carrier", "cancelled", "arr_delay_min"], batch_size=500_000)

for batch in scanner.to_batches():
    # Convert each Arrow batch to pandas for familiar dataframe operations.
    chunk = batch.to_pandas()
    chunk = chunk[chunk["cancelled"] == False]
    chunk = chunk.dropna(subset=["carrier", "arr_delay_min"])

    # Per-carrier partial sums and counts for this chunk.
    sums = chunk.groupby("carrier")["arr_delay_min"].sum()
    cts = chunk.groupby("carrier")["arr_delay_min"].count()

    # Merge into the running totals.
    for carrier, s in sums.items():
        # dict.get(k, default) returns the value if k exists; otherwise (if we have not seen
        # this carrier yet) it returns the default (here, 0.0).
        sum_delay_by_carrier[carrier] = sum_delay_by_carrier.get(carrier, 0.0) + float(s)
    for carrier, c in cts.items():
        # Same idea: if we have not seen this carrier yet, start its running count at 0.
        ct_by_carrier[carrier] = ct_by_carrier.get(carrier, 0) + int(c)

carriers = list(ct_by_carrier.keys())
mean_arr_delay_min = [
    sum_delay_by_carrier[carrier] / ct_by_carrier[carrier] for carrier in carriers
]

result = (
    pd.DataFrame(
        {
            "carrier": carriers,
            "flight_ct": [ct_by_carrier[carrier] for carrier in carriers],
            "mean_arr_delay_min": mean_arr_delay_min,
        }
    )
    .sort_values("mean_arr_delay_min", ascending=False)
)

To do this, we need to maintain multiple running aggregates (sum and count) and be careful about missing values. And once we get into joins, time windows, and more complex reshaping, chunking often becomes a pile of special cases: buffering, repartitioning by key, writing intermediate files, and then doing a second pass. That is the point where a compute engine with a planner and optimizer becomes attractive.

Many modern compute engines take the same high-level approach: they build a logical plan (a graph of operations like reads, filters, joins, and aggregations), then optimize that plan to reduce work and data movement, and only then execute a physical plan that schedules concrete tasks on CPU cores, threads, or worker machines and streams data through the plan to produce results.

One of the most common optimizations is to avoid reading data that we will throw away anyway. There are two parts to this:

  • Filter pushdown: if we only want rows where cancelled = false, the engine tries to apply that filter as close to the data source as possible. Instead of reading every row and filtering later, it can skip entire file chunks or row groups that cannot match the filter.
  • Column pruning: if we only need carrier and arr_delay_min, the engine tries to read only those columns. In columnar formats like Parquet, this can be much cheaper than reading every column, because the engine can skip bytes on disk and reduce memory and CPU work.

You will sometimes see these described as “predicate pushdown” (predicate = filter condition) and “projection pushdown” (projection = selecting a subset of columns). The important idea is simple: do less work by reading less data.

Other common optimizations include:

  • Join strategy choice: if one table is small (for example, a lookup table of airport metadata), the engine might copy it to each worker and avoid a shuffle of a large table across the network. If both sides are large, it may choose a shuffle-based join.
  • Join ordering: if a query joins multiple tables, the engine might reorder joins so that it reduces the data early (join to a selective filter table first).
  • Rewriting aggregations: the engine might compute partial aggregates on each partition first, and only then merge those partial results. This reduces data movement because we shuffle fewer rows (aggregates) instead of raw events.
  • Removing redundant work: if the plan repeats a subcomputation, the engine might reuse the result instead of recomputing it.

For example, suppose we want “average arrival delay per carrier for non-canceled flights.” A good plan reads only the columns needed, filters out canceled flights immediately, and only then groups by carrier to compute aggregates. A poor plan reads all columns, materializes a large intermediate table, and only later filters and aggregates.

Here is what a simple logical plan graph might look like for the “average delay per carrier” computation, and how an optimizer might rewrite it:

---
config:
  theme: default
---
flowchart LR
  subgraph initial["Initial logical plan"]
    A["Read Parquet files<br/>flights_clean (yesterday)"]
    B["Filter rows<br/>cancelled = false"]
    C["Select columns<br/>carrier, arr_delay_min"]
    D["Group by carrier"]
    E["Aggregate<br/>mean(arr_delay_min), count(*)"]
    F["Write result table<br/>mean delay per carrier"]
    A --> B --> C --> D --> E --> F
  end

  subgraph optimized["Optimized logical plan"]
    A2["Read Parquet files<br/>with filter + column pruning<br/>(predicate + projection pushdown)"]
    D2["Group by carrier"]
    E2["Aggregate<br/>mean(arr_delay_min), count(*)"]
    F2["Write result table<br/>mean delay per carrier"]
    A2 --> D2 --> E2 --> F2
  end

  initial -.->|optimizer rewrite| optimized

Next, we will discuss some specific processing and compute engines.

One single-machine option is Polars. Polars is a single-machine dataframe library that supports lazy execution and a streaming mode, which can help with larger-than-memory workflows because it avoids materializing unnecessary intermediate results.

import polars as pl

lazy_flights = (
    pl.scan_parquet("s3://lake/silver/flights_clean/date=2026-01-12/*.parquet")
    .filter(pl.col("cancelled") == False)
    .select(["carrier", "arr_delay_min"])
    .group_by("carrier")
    .agg(
        pl.col("arr_delay_min").mean().alias("mean_arr_delay_min"),
        pl.len().alias("flight_ct"),
    )
)

# Quick sanity check on a partial dataset.
preview = lazy_flights.fetch(1000)

# Stream execution for larger-than-memory inputs.
result = lazy_flights.collect(streaming=True)

In this code, scan_parquet(...) creates a lazy query: in Polars, the scan_* functions (for example, scan_parquet and scan_csv) return a lazy representation of the data, and Polars does not immediately read the files. Instead, we build up a plan by chaining operations like .filter(...), .select(...), .group_by(...), and .agg(...). Before execution, Polars can optimize that plan (for example, it can avoid reading unused columns and apply filters as early as possible). When we call fetch(1000), Polars runs the plan on a small sample so we can sanity-check that the query looks right. When we call collect(streaming=True), Polars tries to execute the plan in a streaming way, pulling data through the pipeline in smaller batches instead of materializing every intermediate step in memory. This is most helpful for large scans and aggregations where we would otherwise create big temporary tables.

Another single-machine option is DuckDB, a SQL engine that is useful for querying local files (CSV, Parquet). Like other compute engines, DuckDB builds a plan for the query, optimizes it (for example, by pushing filters earlier and choosing join strategies), and then executes that plan.

If we have not seen SQL before, the basic idea is that we describe what we want:

  • SELECT chooses which columns or expressions we want.
  • FROM chooses a table (or file) to read from.
  • WHERE filters rows.
  • GROUP BY groups rows so we can compute aggregates like counts and averages.
  • AS names an output column, which is especially useful for aggregates like AVG(...) and COUNT(*).
SELECT
  carrier,
  AVG(arr_delay_min) AS mean_arr_delay_min,
  COUNT(*) AS flight_ct
FROM read_parquet('s3://lake/silver/flights_clean/date=2026-01-12/*.parquet')
WHERE cancelled = FALSE
GROUP BY carrier;

DuckDB can handle larger-than-memory data by spilling intermediate results to disk when needed, rather than requiring everything to fit in RAM.

Eventually, one machine is not enough. Sometimes we need more CPU or memory than a single machine can provide, or we need faster runtimes than one machine can achieve. That is where distributed processing comes in.

In distributed engines, one process plans work and many workers execute it. The input dataset is divided into partitions. The engine schedules tasks over those partitions. Workers read from a repository, compute, and write outputs. Some operations require moving data between workers (for example, joining or grouping by a key). This movement step is a shuffle, and it is often the most “expensive”.

One common distributed engine is Apache Spark. Spark is one of the most common engines for large table-centric transformations. At a high level, we can think in terms of three roles:

  • driver: the driver is the “brain” of a Spark application. It builds the computation plan (often from a SQL query or a DataFrame pipeline), applies optimizer rules, and then breaks the work into stages and tasks to run on executors. In many setups (client mode), it runs in the process that submits the job (for example, spark-submit on a laptop, a CI job, or a notebook server). In other setups (cluster mode), the driver is launched on the cluster, but it still plays the same coordinating role.
  • cluster manager: the cluster manager decides where the job can run by allocating CPU and memory. It starts executors on available worker machines, and it may restart them if a worker fails.
  • worker nodes: the worker nodes are the machines that do the work. Spark launches one or more executors on those machines; each executor has its own CPU and memory and may keep cached partitions in memory for reuse. If cached data or shuffle data does not fit, executors spill intermediate results to local disk.

spark_cluster cp Control plane dp Data plane driver Spark driver cm Cluster manager driver->cm request resources w1 Worker 1 Executor ------- Cache driver->w1 w2 Worker 2 Executor ------- Cache driver->w2 tasks w3 Worker 3 Executor ------- Cache driver->w3 cm->w1 cm->w2 start executors cm->w3 w1->w2 shuffle storage Data repository w1->storage w2->w3 shuffle w2->storage read/write w3->storage

When a job is submitted, the driver first requests resources from the cluster manager. The cluster manager finds available worker machines and starts executor processes there. Once executors are running, the driver sends them tasks to execute (for example, “scan this partition”, “apply this filter”, “compute partial aggregates for these keys”). As executors run tasks, they read input data from the data repository and write outputs back to it. If the job requires a shuffle (for example, a join or group-by), executors also exchange intermediate data with one another so that rows with the same key end up on the same worker for the next stage.

Spark does not require each worker to hold the whole dataset in RAM. Each task reads and processes one partition at a time. Memory becomes important when we cache intermediate results or when shuffles create large intermediate data. Spark can spill to disk, but excessive spilling is a common reason jobs become slow. In practice, Spark jobs get expensive when shuffles dominate: data has to move across the network so that matching keys can be joined or aggregated together.

A practical way to think about RAM per worker is that it needs to cover (1) the in-flight data structures for the tasks running on that worker right now, plus (2) any cached partitions we choose to keep hot, plus (3) some buffer so we do not spill constantly. When the working set does not fit, Spark writes intermediate data to local disk (spill files and shuffle files) and performance can drop sharply.

To make shuffles concrete, suppose we want to compute “average arrival delay per carrier for non-canceled flights” over a large flights_clean table stored as Parquet. Imagine the table is split into 9 partitions across 3 workers, and each partition has rows for many different carriers mixed together.

  1. Each worker reads its partitions and filters out canceled flights. At this stage, the work is local: each worker is just scanning files and running the filter on its own CPU.
  2. Each worker computes partial aggregates per carrier for the rows it currently has. Concretely, it can build a small in-memory map like {carrier -> (sum_delay_min, flight_ct)} for its own partitions.
  3. Now comes the shuffle: to produce the final answer, all rows (or partial aggregates) for the same carrier have to be brought together. Spark partitions the intermediate data by the group-by key (carrier) and sends those pieces over the network so that, for example, all carrier = "AA" partials land on one worker, all carrier = "DL" partials land on another, and so on.
  4. Each worker merges the partial aggregates it received (adds sums and counts) and computes mean_arr_delay_min = sum_delay_min / flight_ct, then writes the final result table.

The expensive part is step 3: we are no longer just reading and computing, we are moving data between machines. If the intermediate results are large (for example, because we are shuffling raw rows for a join, or because the group-by key has very high cardinality), Spark may also write shuffle data to local disk and read it back, which adds more I/O. That is why distributed jobs often feel fast when they are “just scanning and filtering”, and then slow down sharply when they hit a wide shuffle.

Here is an example of the same computation (“average arrival delay per carrier for non-canceled flights”) using Spark’s Python interface:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

flights = spark.read.parquet("s3://lake/silver/flights_clean/date=2026-01-12/*.parquet")

result_df = (
    flights.filter(F.col("cancelled") == False)
    .groupBy("carrier")
    .agg(
        F.avg("arr_delay_min").alias("mean_arr_delay_min"),
        F.count("*").alias("flight_ct"),
    )
)

# Persist the result as a table (for example, in the lakehouse).
result_df.write.mode("overwrite").parquet(
    "s3://lake/gold/mean_arrival_delay_by_carrier/date=2026-01-12/"
)

Spark also supports a SQL interface, often called Spark SQL. We can register a dataset as a temporary view and then query it with SQL:

flights.createOrReplaceTempView("flights_clean")

result_df = spark.sql(
    """
    SELECT
      carrier,
      AVG(arr_delay_min) AS mean_arr_delay_min,
      COUNT(*) AS flight_ct
    FROM flights_clean
    WHERE cancelled = FALSE
    GROUP BY carrier
    """
)

Another option is Dask. Dask builds a task graph and executes it in parallel. It can run on one machine (using multiple cores) or on a cluster. Dask is often used when we want a Python-first workflow and want a smoother path from “it runs on my laptop” to “it runs on a small cluster” without rewriting everything.

Compared to Spark, Dask tends to feel more like “pandas, but parallel”: we write computations in Python using familiar dataframe operations, and Dask schedules them across threads or workers. The tradeoff is that Dask generally has less of a SQL-style optimizer than Spark, so it can be easier to accidentally build a slow plan (especially around joins, group-bys, and other operations that require moving data between workers). In practice, Dask is a great fit for small-to-medium clusters and Python-heavy workflows, while Spark is often the default choice for very large, warehouse/lakehouse-style table transformations.

import dask.dataframe as dd

flights = dd.read_parquet(
    "s3://lake/silver/flights_clean/date=2026-01-12/*.parquet",
    columns=["carrier", "cancelled", "arr_delay_min"],
)

result_dd = (
    flights[flights["cancelled"] == False]
    .groupby("carrier")["arr_delay_min"]
    .agg(["mean", "count"])
    .rename(columns={"mean": "mean_arr_delay_min", "count": "flight_ct"})
    .reset_index()
)

# Trigger execution and bring the result back (for example, to write it out).
result = result_dd.compute()

So far, we have focused on batch-style engines that scan and transform “data at rest” in repositories. But in real-time systems, we often need to compute continuously as events arrive. That is where stream processing engines fit.

A stream processing engine consumes events from a message broker and maintains derived outputs as the stream evolves. Instead of “read yesterday’s files, write today’s table”, the engine runs continuously: each event updates some state and may emit new results.

For example, continuing the airline delay setting, suppose our system produces a stream of prediction updates as new information arrives (gate changes, late departures, updated weather, airport congestion). One kind of output is a “latest value” table keyed by flight, like:

flight_latest_prediction(flight_id, predicted_arrival_delay_min, updated_at)

Another kind of output is a rolling metric used as a feature, like “average actual arrival delay at the arrival airport over the last 30 minutes”, which uses a moving time window. In both cases, the stream processor keeps the necessary state (latest values and rolling aggregates) so queries stay fast and up to date.

Stream processing introduces a few new concerns that batch jobs can often ignore.

  • We need to be explicit about time: an event has a business timestamp (when it happened) and a processing timestamp (when we saw it), and late events can arrive out of order (i.e. not in order of business timestamp).
  • We also need to be explicit about delivery: events are typically delivered “at least once”, so consumers must be safe to retry.

To make this concrete, here is what it can look like in practice with Flink SQL. In this example, we read prediction updates from one broker topic, read flight arrival events from another, maintain a latest-value table keyed by flight_id, and compute a sliding 30-minute average actual arrival delay by arrival_airport.

We start by declaring source streams. This is how Flink knows how to read events and how to interpret their fields:

-- Stream source: prediction updates arriving from a broker topic.
CREATE TABLE flight_prediction_updates (
  flight_id STRING,
  arrival_airport STRING,
  predicted_arrival_delay_min DOUBLE,
  event_time TIMESTAMP,
  WATERMARK FOR event_time AS event_time - INTERVAL '30' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'flight_prediction_updates',
  'properties.bootstrap.servers' = 'broker:9092',
  'format' = 'json'
);
-- Stream source: flight arrival events (used for actual delay metrics).
CREATE TABLE flight_arrival_events (
  flight_id STRING,
  arrival_airport STRING,
  actual_arrival_delay_min DOUBLE,
  event_time TIMESTAMP,
  WATERMARK FOR event_time AS event_time - INTERVAL '30' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'flight_arrival_events',
  'properties.bootstrap.servers' = 'broker:9092',
  'format' = 'json'
);

The event_time column is the business timestamp we want to compute over. The WATERMARK is a watermark that says “assume events can arrive up to 30 seconds late”.

Without a watermark, the engine has no clear rule for when a time window is “done”. For example, if we are computing the 30-minute window from 10:00 to 10:30, should we finalize it at 10:30 exactly, or keep it open in case an event with event_time=10:05 arrives late at 10:31?

  • If we close windows immediately, late events get missed and the result is wrong.
  • If we never close windows, it may keep unbounded state because any old window might still get another event.

With a watermark, the engine waits for late events up to the allowed lateness, then it can close windows and free state. Events that arrive after the watermark are treated as “too late” and must follow a policy (for example, drop them, send them to a separate stream for inspection, or trigger a correction path).

Next, we declare where our derived results should go. For the latest-value view, we want one row per flight_id, updated as new events arrive:

-- Latest-value sink: one row per flight_id, continuously updated.
CREATE TABLE flight_latest_prediction (
  flight_id STRING,
  arrival_airport STRING,
  predicted_arrival_delay_min DOUBLE,
  updated_at TIMESTAMP,
  PRIMARY KEY (flight_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'flight_latest_prediction',
  'properties.bootstrap.servers' = 'broker:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

This sink is keyed by flight_id, so new events for the same flight overwrite the previous value. In this example, we send the computed latest values back into the message broker as a new topic (flight_latest_prediction). From there, downstream consumers can use it however they need. For example, a small consumer service might upsert the latest value into an in-memory key-value store like Redis, so an application can serve “latest prediction for flight flight_id” with very low latency.

For the rolling metric, we define a second sink that holds windowed aggregates:

-- Rolling-metric sink: sliding-window averages by arrival airport.
CREATE TABLE avg_actual_delay_by_airport_30m (
  arrival_airport STRING,
  window_start TIMESTAMP,
  window_end TIMESTAMP,
  avg_actual_arrival_delay_min DOUBLE,
  PRIMARY KEY (arrival_airport, window_start, window_end) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'avg_actual_delay_by_airport_30m',
  'properties.bootstrap.servers' = 'broker:9092',
  'format' = 'json'
);

Now we write continuous queries. First, maintain the latest prediction per flight. We do this by keeping only the row with the maximum event_time for each flight_id:

-- Maintain the latest prediction per flight.
INSERT INTO flight_latest_prediction
SELECT
  flight_id,
  arrival_airport,
  predicted_arrival_delay_min,
  event_time AS updated_at
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (PARTITION BY flight_id ORDER BY event_time DESC) AS rn
  FROM flight_prediction_updates
)
WHERE rn = 1;

This approach is robust to out-of-order events: if an older update arrives late, it gets a lower rank and does not overwrite the newest value.

Second, compute the sliding 30-minute average actual arrival delay per arrival airport:

-- Compute a sliding 30-minute average delay per arrival airport.
INSERT INTO avg_actual_delay_by_airport_30m
SELECT
  arrival_airport,
  window_start,
  window_end,
  AVG(actual_arrival_delay_min) AS avg_actual_arrival_delay_min
FROM TABLE(
  HOP(
    TABLE flight_arrival_events,
    DESCRIPTOR(event_time),
    INTERVAL '5' MINUTES,
    INTERVAL '30' MINUTES
  )
)
GROUP BY arrival_airport, window_start, window_end;

Here, HOP is Flink SQL’s sliding window function. It has two time parameters: the slide (also called the hop) and the window size. In our query, the window size is 30 minutes and the slide is 5 minutes. So Flink produces overlapping 30-minute windows starting every 5 minutes. For example, around 10am it will produce windows like 10:00 to 10:30, 10:05 to 10:35, and 10:10 to 10:40.

Each arrival event contributes to every window whose time range includes its event_time. The HOP(...) call produces window_start and window_end columns, and we group by those window boundaries. In other words, Flink is maintaining many small aggregates in parallel: one running aggregate per (arrival_airport, window_start, window_end). The watermark determines when each window is allowed to close. If an event arrives after the watermark for a window has passed, it is considered too late to be included in that window’s aggregate (unless we choose a different late-event policy).

The diagram below shows how a 30-minute window sliding every 5 minutes creates overlapping intervals:

Time  10:00 10:05 10:10 10:15 10:20 10:25 10:30 10:35 10:40
      |-----|-----|-----|-----|-----|-----|-----|-----|-----|
W1    [-----|-----|-----|-----|-----|-----]                    (10:00-10:30)
W2          [-----|-----|-----|-----|-----|-----]              (10:05-10:35)
W3                [-----|-----|-----|-----|-----|-----]        (10:10-10:40)
Events     *   *     *        *              *        *

Cloud providers offer managed versions of some engines, and open-source options can be self-hosted. The table below lists a few common choices:

Category Open source
(single machine)
Open source (distributed) GCP AWS Azure
SQL engine DuckDB Trino, Spark SQL BigQuery Redshift, Athena Synapse SQL
Batch processing engine pandas, Polars, Dask Apache Spark, Dask Dataproc EMR, Glue Synapse Spark
Stream processing engine - Apache Flink, Materialize Dataflow Managed Service for Apache Flink Stream Analytics

Ray deserves a special mention because it is widely used in ML systems but does not fit cleanly into the SQL/batch/stream categories above. Ray is a distributed processing engine that can run on a single machine or on a cluster, and we can self-host on-premises or in any cloud. Ray is often used for distributed training, hyperparameter search, and parallel inference services built around long-running actors. We’ll revisit Ray in a later lesson.

Once we choose an engine, we still need a way to define transformations, version them, test them, and make them understandable to other teams.

4.6.5 Transformation layers

The processing, compute, or query engine is infrastructure: it gives us a place to run work. The transformation layer is what we build on top of that infrastructure so it consistently produces the tables, views, and files we want other teams to rely on as data products.

One way to think about this is the difference between ad-hoc analysis and productionized data work. Ad-hoc SQL queries and notebook code can feel like “ClickOps” for data: it is quick to explore, but it is hard to reproduce, review, test, or depend on. A transformation layer is closer to “DataOps”: we treat transformations like production code, versioning them and building them for reproducibility, so their outputs can be trusted and reused.

In practice, a transformation layer answers questions like:

  • Where does a derived dataset come from (what inputs, what logic)?
  • What does it mean (schema, field definitions, units, and semantics)?
  • How do we change it safely (code review, backfills, and deprecations)?
  • How do we trust it (tests, monitoring, and ownership)?

There are two common styles for a transformation layer.

One style is SQL-first. We define datasets as SQL queries and let the engine run them. Continuing the flight delay example, we might define a “gold” table that downstream teams treat as a data product:

SELECT
  carrier,
  AVG(arr_delay_min) AS mean_arr_delay_min,
  COUNT(*) AS flight_ct
FROM silver.flights_clean
WHERE cancelled = FALSE
GROUP BY carrier

This query is short, but turning it into a reliable data product requires extra structure: where the result is written, how often it is rebuilt, how schema changes are handled, and what happens when upstream data is late or corrected.

That is why many teams use a build system on top of SQL. A common example is dbt. In dbt, each transformation is a data model (a SQL file), stored in git, code reviewed like application code, and built into a table or a database view. We can also choose to store the result as a materialized view (or a table) when we want faster queries and stable performance.

To make this concrete, here is a minimal SQL-first “transformation layer” layout (dbt-style) for one data product. In practice, a full-scale project would include more configuration files and data models, but the two core pieces are the SQL data model itself and a companion file that documents and tests it.

transformations/
  models/
    gold_mean_arrival_delay_by_carrier.sql
    gold_mean_arrival_delay_by_carrier.yml
  macros/
    expression_is_true.sql

First, the SQL data model defines what the dataset is:

-- transformations/models/gold_mean_arrival_delay_by_carrier.sql
SELECT
  carrier,
  AVG(arr_delay_min) AS mean_arr_delay_min,
  COUNT(*) AS flight_ct
FROM {{ ref('silver_flights_clean') }}
WHERE cancelled = FALSE
GROUP BY carrier

Second, the schema file documents the columns and adds tests that must pass before we treat the output as a reliable data product. For example, if gold.mean_arrival_delay_by_carrier is a data product, we might also define and enforce expectations like:

  • carrier is never null.
  • flight_ct is always positive.
  • The dataset is updated daily and reflects the latest available inputs.

That would look like this:

# transformations/models/gold_mean_arrival_delay_by_carrier.yml
models:
  - name: gold_mean_arrival_delay_by_carrier
    description: "Daily summary of arrival delay by carrier for non-canceled flights."
    meta:
      expected_update_frequency: "daily"
      max_expected_lag_hours: 24
    columns:
      - name: carrier
        description: "Airline carrier code (for example, AA, DL). One row per carrier."
        tests:
          - not_null
      - name: mean_arr_delay_min
        description: "Mean arrival delay in minutes for non-canceled flights in the input dataset."
      - name: flight_ct
        description: "Number of non-canceled flights included in the aggregate."
        tests:
          - not_null
          - expression_is_true:
              expression: "flight_ct > 0"

In this example, carrier is never null and flight_ct is always positive are enforced as tests that must pass. The “updated daily” expectation is expressed as metadata here (so it is documented and machine-readable), and we will see later how freshness expectations and test failures are enforced as gates in a pipeline (the quality rules live in the data control plane, and the pipeline runner lives in orchestration and operations).

For the flight_ct > 0 check, we used a custom generic test. In dbt, a generic test is just a SQL query that returns the rows that fail. Here is a minimal implementation:

-- transformations/macros/expression_is_true.sql
{% test expression_is_true(model, expression) %}
SELECT *
FROM {{ model }}
WHERE NOT ({{ expression }})
{% endtest %}

Another style is code-first. Instead of writing transformations primarily in SQL, we write them as code in Python (for example, Spark jobs or Dask programs). This can be a better fit when:

  • The transformation logic is complex (custom parsing, user-defined functions, or advanced ML preprocessing).
  • We need to reuse libraries (for example, a shared parser or geospatial code).
  • We want to share code between batch transformations and online inference pipelines.

As with the SQL style, in code-first style a transformation layer is not just the code that runs. It is the surrounding structure that turns that code into a dataset other people can safely depend on.

For example, suppose we want to produce a data product, gold.mean_arrival_delay_by_carrier. The Spark code from the previous section can compute it. We can package the same Spark logic as a dbt data model. dbt is often associated with SQL data models, but it also supports Python data models on some platforms. The idea is the same: we keep the transformation in git, build it into a named dataset, and attach documentation and tests.

Here is what a minimal dbt-style layout can look like for this transformation:

transformations_dbt/
  models/
    gold_mean_arrival_delay_by_carrier.py
    gold_mean_arrival_delay_by_carrier.yml

First, the Python data model expresses the transformation logic using the engine’s API (here, PySpark):

# transformations_dbt/models/gold_mean_arrival_delay_by_carrier.py
from pyspark.sql import functions as F


def model(dbt, session):
    # Build a table (or incremental table) from this data model.
    dbt.config(materialized="table")

    flights = dbt.ref("silver_flights_clean")

    return (
        flights.filter(F.col("cancelled") == False)
        .groupBy("carrier")
        .agg(
            F.avg("arr_delay_min").alias("mean_arr_delay_min"),
            F.count("*").alias("flight_ct"),
        )
    )

This looks similar to a normal Spark job, but there are two dbt-specific pieces:

  • def model(dbt, session): is the entry point dbt calls when it builds the data model. dbt passes in a handle (dbt) we use for configuration and references, and a session object (session) for the execution engine (here, Spark).
  • dbt.ref("silver_flights_clean") is how we declare the dependency on an upstream data model. It is the Python equivalent of { ref('silver_flights_clean') } in SQL. dbt uses these references to build a dependency graph (what must run before what) and to record lineage.

The rest is standard PySpark: filter out canceled flights, group by carrier, and compute aggregates. The important difference is where the result goes and how it is managed. Because we configured materialized="table", dbt will write the returned dataframe into a named dataset for this data model (for example, a table/view in the warehouse or lakehouse), and it will run the associated tests from the YAML file.

Second, the schema file documents the data model and declares tests that must pass before we treat the output as a reliable data product. This is the same as in the SQL case:

# transformations_dbt/models/gold_mean_arrival_delay_by_carrier.yml
models:
  - name: gold_mean_arrival_delay_by_carrier
    description: "Daily summary of arrival delay by carrier for non-canceled flights."
    meta:
      expected_update_frequency: "daily"
      max_expected_lag_hours: 24
    columns:
      - name: carrier
        description: "Airline carrier code (for example, AA, DL). One row per carrier."
        tests:
          - not_null
      - name: mean_arr_delay_min
        description: "Mean arrival delay in minutes for non-canceled flights in the input dataset."
      - name: flight_ct
        description: "Number of non-canceled flights included in the aggregate."
        tests:
          - not_null
          - expression_is_true:
              expression: "flight_ct > 0"

dbt’s core offering is open source and can be used on any cloud or self-hosted. SQLMesh is another open source project that plays a similar role. A managed alternative is Dataflow on GCP.

What if we are not using a framework like dbt or the others mentioned above? We can still build a real transformation layer, but we have to supply the missing structure ourselves: stable outputs, ownership, documentation, and checks that run automatically.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

flights = spark.read.parquet("s3://lake/silver/flights_clean/date=2026-01-12/*.parquet")

result_df = (
    flights.filter(F.col("cancelled") == False)
    .groupBy("carrier")
    .agg(
        F.avg("arr_delay_min").alias("mean_arr_delay_min"),
        F.count("*").alias("flight_ct"),
    )
)

# Write to a stable destination that downstream jobs can depend on.
result_df.write.mode("overwrite").parquet(
    "s3://lake/gold/mean_arrival_delay_by_carrier/date=2026-01-12/"
)

On its own, this is just code. To make it into a reliable data product, we typically keep the job code alongside a product definition and a small set of automated checks:

pipelines/
  jobs/
    mean_arrival_delay_by_carrier.py
  products/
    gold.mean_arrival_delay_by_carrier.yml
  checks/
    gold.mean_arrival_delay_by_carrier.sql

The job file contains the Spark logic (like the code above). The extra files are what turn a script into something other teams can safely use.

The data product file is a data contract. It makes the output explicit: what it is called, where it is written, who owns it, how often it updates, and what the columns mean.

# pipelines/products/gold.mean_arrival_delay_by_carrier.yml
name: gold.mean_arrival_delay_by_carrier
owner: data-platform@example.com
update_frequency: daily

inputs:
  - silver.flights_clean

output:
  path_template: s3://lake/gold/mean_arrival_delay_by_carrier/date={{ ds }}/
  partition_key: date

schema:
  - name: carrier
    type: string
    description: "Airline carrier code (for example, AA, DL). One row per carrier."
  - name: mean_arr_delay_min
    type: double
    description: "Mean arrival delay in minutes for non-canceled flights in the input dataset."
  - name: flight_ct
    type: bigint
    description: "Number of non-canceled flights included in the aggregate."

The checks file defines validations that run after the job writes the output. A simple way to implement checks is to write SQL queries that return failing rows. For example:

-- pipelines/checks/gold.mean_arrival_delay_by_carrier.sql
-- Each query below should return zero rows.

-- carrier is never null
SELECT * FROM gold.mean_arrival_delay_by_carrier WHERE carrier IS NULL;

-- flight_ct is always positive
SELECT * FROM gold.mean_arrival_delay_by_carrier WHERE flight_ct <= 0;

Finally, the pipeline runner (which we will discuss in orchestration and operations) wires this together: it runs the job on a schedule, runs the checks, and only marks the run successful if the checks pass. That is what lets downstream teams trust the output without reading job code or re-running ad-hoc queries.

This approach also supports safer changes over time. For example, suppose we decide we also want a tail metric like p95_arr_delay_min in addition to mean_arr_delay_min. We add it in the Spark aggregation, add the new column to gold.mean_arrival_delay_by_carrier.yml, and reviewers can see the schema change in the same pull request as the code change. Downstream teams can start using the new column when they are ready, without having to reverse engineer the job.

Or suppose we discover a bug: we accidentally included diverted flights that should have been excluded, which changed mean_arr_delay_min for some carriers. Because this output is a data product built through a transformation layer, we can respond in a disciplined way:

  • We fix the filter in one place (the transformation definition), instead of hunting for multiple copies of the same logic spread across scripts and notebooks.
  • We can then run a backfill for the affected date partitions (for example, re-run the last 30 days) and be clear about what we are rebuilding because the transformation has a stable, named output destination.
  • Because the transformation layer records what the output depends on, we can also identify what else is affected and rebuild in the right order.
  • And because it includes automated checks, we validate the corrected outputs before we treat them as the new truth.
  • We can also add a new check for this specific failure mode (for example, assert that no diverted flights appear in the input set for the aggregate) so the same bug does not silently reappear later.
  • Finally, because downstream teams depend on the output as a published data product, we can communicate the change (and, if needed, version it) instead of silently changing meaning with no audit trail.

And, suppose we need a breaking change. Maybe we want to rename mean_arr_delay_min to mean_arrival_delay_min for clarity, or we realize we should publish the metric in seconds instead of minutes. Instead of silently changing the meaning of an existing column, we publish a new version of the data product (for example, gold.mean_arrival_delay_by_carrier_v2) and keep the old one around for a deprecation window so downstream pipelines can migrate safely.

The kind of structure provided by a transformation layer has only become more important in the era of AI agents. If an agent (or a new engineer) needs to discover existing data products and build a downstream job, it needs more than a table name. It needs to know what the columns mean, what time period a table represents, how often it updates, and what checks must pass for the data to be considered valid. Transformation layers may also publish this information as documentation (for example, dbt can generate a browsable docs site from data model and column descriptions plus lineage), which makes discovery easier for both humans and automated systems. A transformation layer makes data semantics and expectations explicit and machine-readable, so it is easier to build correct downstream pipelines.

4.6.6 Data control plane

So far, we have focused on the data plane: ingestion, storage, and transformation jobs that move and transform bytes. A data control plane is the layer that directs those movements and ensures the result is reliable. It is where we define what is allowed, what is expected, and what we do when reality does not match those expectations.

In practice, the control plane is where we make a data product enforceable. A dataset can have a README that says “updated daily” and “no PII”, but downstream teams can only trust those claims if the pipeline actually checks them and blocks broken outputs from being treated as real. In the control plane, we use real systems and tools to enforce rules around the data.

Enforcement usually happens at a few points in the lifecycle:

  • At ingestion: we check that what arrived is not obviously broken before we treat it as a valid raw input. For example, we check that we didn’t get an HTTP 404 Error, the extract is not empty, that required keys exist. We also record metadata that makes policy and audits possible later: where the data came from, when we ingested it, and what governance tags apply. If these checks fail, we stop the pipeline early and keep the bad input from flowing into downstream tables.
  • After each transformation (when we publish a table): we check that the new output matches our promises. For example, we check the schema, that key columns are not missing, that joins did not drop most rows, that metrics are in plausible ranges, and that the new daily partition is actually present. If these checks fail, we can choose to e.g. keep previous version of the table, so downstream jobs keep reading the most recent version that passed checks.
  • When the data product is used: we enforce who is allowed to read what in a concrete way. For example, we might put raw tables that include names, phone numbers, and street addresses in a restricted schema (or bucket) that only a small set of accounts can read. Most analytics and ML jobs would only have permission to read curated tables that have record ids but not personal information. Consuming jobs can also do simple safety checks like “did we actually produce the newest daily data?” (for example, “does the date=2026-01-12 folder exist, and does it have rows?”) so we fail fast instead of training or analyzing on stale or missing data.

When a check fails, the control plane needs a clear “what happens next” policy. In practice, we usually choose from a small set of actions:

  • Stop and alert: we fail the run, notify an owner, and do not produce new downstream outputs. This is common when inputs look corrupted or incomplete.
  • Retry with backoff: if the failure looks transient (network errors, temporary 500s, rate limiting), we wait and retry a few times before failing. We also record that retries happened so we can see flaky sources.
  • Keep the previous version: if a daily table fails its checks today, we do not overwrite the last published data product. Downstream jobs keep reading the last version that passed checks.
  • Write to a “do not use” location (quarantine): sometimes we still want to keep a copy of the bad input for debugging, but we write it to a separate path or table that downstream jobs are not configured to read from.
  • Degrade intentionally: in some products, we may choose a safe fallback (for example, skip a non-critical enrichment) while clearly marking the output as partial, so we do not silently pretend it is complete.

Let us now discuss concretely what the control plane might require at each stage of the pipeline.

At ingestion time, we can land data in a few different places. Sometimes we land it into a data lake: files in object storage (or a raw lakehouse table) partitioned by date and source. Other times we ingest directly into a database table (for example, an operational database that feeds reporting, or a warehouse raw table that downstream SQL can query).

The failure modes are similar in both cases, but the mechanics are different. In a lake, there is usually no built-in schema enforcement: if we write the wrong file or an empty file, the storage system will happily accept it. In a database table, we can sometimes enforce basic constraints (types, NOT NULL, unique keys), but if we don’t have any checks, we might still ingest the wrong rows or only a partial slice if our extraction failed.

For example, if we pull from an API, we should treat HTTP errors as ingestion failures no matter where we plan to write the result: a 404 (wrong endpoint or resource), a 401/403 (auth), a 429 (rate limited), or a 5xx (source outage). In those cases we usually retry with backoff for a short period and then stop the run and alert if it does not recover. Even if the request succeeds, “empty” can still be a failure mode: we might accidentally query the wrong date or cursor and get zero rows. A practical check is to compare against simple expectations (row count above a minimum, or within a reasonable range of yesterday), and to fail the run if the result is implausible for that source.

We should also look for schema changes at ingestion. External sources can and do evolve. For example, in December 2025, NYC Open Data changed the schema of the 311 dataset (renamed some columns, added some new ones). A practical ingestion check is to compare the columns we received to what we expect: if a required column disappeared or was renamed, we should fail the run loudly (so we do not silently produce incorrect downstream tables). If a new optional column appears, we can often accept it while logging the change, then update our downstream transformations when we are ready.

Ingestion is also a natural point to attach metadata. For lineage, we record where the data came from and how it was produced, for example: the source name and endpoint, the time we fetched it, the ingestion job version (git SHA), and the storage location we wrote. That metadata might be stored as columns on the raw table (for example, ingested_at, ingestion_run_id, source_name), or as a separate ingestion log table keyed by the output path or partition. For governance, we attach tags that downstream policy can interpret (for example, whether the dataset contains direct identifiers, whether it is allowed for training use, and how long it may be retained). Governance requirements are usually described in plain language (“do not use data from minors for training”, “delete a user’s data within 30 days”, “only the fraud team can access raw card transaction logs”), but they only become enforceable if the data carries the right metadata. In practice, the control plane defines that metadata and the rules that interpret it.

Some examples of governance metadata fields we might attach at ingestion time:

  • data_category: what kind of data this is (for example, support_ticket, delivery_event, location_ping), which drives different access and retention rules.
  • pii_class: whether the record contains direct identifiers, quasi-identifiers, or no identifiers, which can decide what layers may store it and who can read it.
  • collection_purpose: why the data was collected (for example, service_operation, marketing, research), which can gate training use.
  • consent_status: whether the data subject consented to specific uses (for example, training_allowed=true/false).
  • region: where the data is governed (for example, us, eu), which can determine storage location and deletion timelines.
  • retention_expires_at: when the record must be deleted or anonymized, which allows automatic enforcement instead of manual cleanup.

In addition to metadata, many organizations implement governance by splitting storage into data zones. Different use cases and teams need different levels of access. Zones make least-privilege access easier to implement and audit, and they reduce the blast radius of mistakes (for example, someone accidentally granting broad access to a raw table).

For example, in our food delivery service, operational data can include direct identifiers (names, phone numbers), sensitive location traces (courier GPS pings), and free-form text (support notes) that may contain unexpected personal information. A common zone split is:

  • Restricted raw zone: raw operational tables that may include PII, such as raw.customers (name, phone), raw.delivery_addresses (street address), and raw.courier_location_pings (fine-grained lat/long).
  • Curated zone: cleaned and de-identified tables that most analytics and ML work can use, such as silver.deliveries_clean with delivery_id, restaurant_id, created_at, pickup_at, dropoff_at, and stable identifiers like customer_id and courier_id (but not names or phone numbers).
  • PII lookup zone: small reference tables that map stable ids to PII, such as restricted.customer_profile(customer_id, name, phone) and restricted.courier_profile(courier_id, name, phone), which only a small set of services and teams can access.

Whether customer PII must live in a separate zone depends on our risk tolerance, whether the domain is regulated by law, and our access model. Some stacks rely on column-level security inside one database (so the table exists once, but only specific roles can read sensitive columns). Others prefer physical separation (e.g. different buckets) because it is harder to misconfigure. Either way, the goal is the same: most downstream jobs should not need to touch PII at all, and when they do, it should be explicit and auditable.

A big piece of the control plane is quality and contract testing, both at ingestion and after we transform the data. A quality gate is a check that prevents bad or misleading data from quietly propagating downstream. For example, we might gate on:

  • Schema: expected columns and types exist (and alerts when an upstream field is renamed or removed).
  • Constraints: required fields are not missing, numeric or datetime fields are parsed correctly, counts are positive, numeric values fall within expected range, there are zero or few anomalous values.
  • Freshness: for example, the prior-day partition is present and was produced recently.
  • Source evolution: if an external source changes shape (a form of schema drift), we either adapt safely or fail loudly before we publish.

In practice, we specify these expectations in a few common ways:

Sometimes the expectations are enforced by the data repository itself. For example, a relational database can enforce NOT NULL constraints, unique indexes, foreign keys, and CHECK constraints. That can be a great first line of defense for operational tables. But most analytical repositories (especially file-based lakehouse tables) do not enforce constraints in the same way, and many expectations are not simple row-level constraints anyway (freshness, distribution shifts, cross-table consistency).

More commonly, we specify expectations as code and run them as part of the pipeline. In dbt, expectations are typically expressed as tests attached to data models. For example, we can attach not_null and unique tests to key columns. If we want a richer library of ready-made checks inside dbt, we could use dbt-expectations, which adds tests for things like row count ranges, regex validation of strings, and allowed value sets. These checks live right next to the data model definition in a dbt YAML file, so they are version controlled and code reviewed alongside the transformations.

Recall the NYC 311 call example - suppose we ingest a daily external snapshot into raw.nyc_311_requests and we want to catch “obviously wrong” pulls early. We might define expectations like:

  • Completeness: the table is not empty, and row count is above a minimum threshold (so we catch “downloaded only 1,000 rows” mistakes).
  • Schema: required columns exist (for example, unique_key, created_date, agency, complaint_type).
  • Content: unique_key is never null, and looks like an integer string; created_date parses as a timestamp.
  • Allowed values: status is in an expected set like Open, In Progress, Closed (so a source change shows up quickly).

That might look like this in dbt (using dbt-expectations):

models:
  - name: raw_nyc_311_requests
    tests:
      - dbt_expectations.expect_table_row_count_to_be_between:
          min_value: 1000000
    columns:
      - name: unique_key
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_match_regex:
              regex: "^[0-9]+$"
      - name: created_date
        tests:
          - not_null
      - name: status
        tests:
          - dbt_expectations.expect_column_values_to_be_in_set:
              value_set: ["Open", "In Progress", "Closed"]

There are other data quality tools - we could express the same checks in Soda as well. Like dbt, Soda uses a YAML-based language for writing data quality checks. For example:

checks for raw.nyc_311_requests:
  - row_count > 1000000

  - missing_count(unique_key) = 0
  - invalid_count(unique_key) = 0:
      valid_regex: "^[0-9]+$"

  - missing_count(created_date) = 0

  - invalid_count(status) = 0:
      valid_values:
        - Open
        - In Progress
        - Closed

We can also define expectations that are more about “not anomalous” than correctness. Here are a few other concrete checks that come up often in ML pipelines:

  • Key uniqueness: for a raw event log, we might require that the pair unique_key and :version is unique, so we can safely deduplicate updates.
  • Range checks: numeric fields stay in plausible ranges (for example, latitude is between -90 and 90, and longitude is between -180 and 180).
  • Distribution checks: we might require that a categorical column does not suddenly collapse to a single value (for example, 99% of rows have status="Open"), or that the distribution of complaint_type does not shift drastically without an alert.
  • Cross-table consistency: we can validate joins by checking that keys match the reference tables we expect.

For example, in our food delivery service, we might have a daily deliveries table silver.deliveries_clean(date=...) and a reference table silver.restaurants_dim that lists valid restaurants. A cross-table check can assert that every restaurant_id in deliveries exists in silver.restaurants_dim, and fail if we somehow have a delivery from a restaurant the platform doesn’t know about. The simple cross-table check (as SQL that should return zero rows) might look like:

-- Every restaurant_id in deliveries should exist in the restaurants dimension table.
SELECT d.delivery_id, d.restaurant_id
FROM silver.deliveries_clean d
LEFT JOIN silver.restaurants_dim r
  ON d.restaurant_id = r.restaurant_id
WHERE r.restaurant_id IS NULL;

Here is what this query is doing.

  • We start from silver.deliveries_clean (the table we are trying to validate).
  • We then join it to silver.restaurants_dim (the table of restaurants we consider valid) using a LEFT JOIN, which means we keep every delivery row even if there is no matching restaurant row.
  • If a delivery has a restaurant_id that does not exist in the dimension table, then all the r.* columns will be null for that delivery.
  • The final WHERE r.restaurant_id IS NULL filters down to exactly those broken rows.

If the pipeline is healthy, this query should return zero rows. If it returns any rows, it is a signal that something is inconsistent: we may have ingested deliveries with a malformed restaurant_id, missed ingesting a subset of restaurants, or changed the restaurant id format without updating both datasets.

Or, in our air travel delay prediction example, if gold.mean_arrival_delay_by_carrier normally has 10 to 50 rows (a small number of carriers), we can gate on that range. Similarly if mean_arr_delay_min is usually between -120 and +600 minutes, we can alert if it is outside that range, which could indicate a timestamp parsing bug or other error.

We might express the checks in dbt like:

models:
  - name: gold_mean_arrival_delay_by_carrier
    tests:
      - dbt_expectations.expect_table_row_count_to_be_between:
          min_value: 10
          max_value: 50
    columns:
      - name: carrier
        tests:
          - not_null
          - unique
      - name: flight_ct
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 1
      - name: mean_arr_delay_min
        tests:
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: -120
              max_value: 600

The same checks in SodaCL would look like:

checks for gold.mean_arrival_delay_by_carrier:
  - missing_count(carrier) = 0
  - duplicate_count(carrier) = 0
  - min(flight_ct) >= 1
  - row_count between 10 and 50
  - min(mean_arr_delay_min) >= -120
  - max(mean_arr_delay_min) <= 600

Freshness is usually expressed as an expectation too, even though it is not a property of individual rows. For example, “the prior-day partition exists by 7am” can be expressed as “the maximum event_date is at least yesterday” or “the output path for date={{ ds }} exists”. Many teams implement freshness checks as SQL queries or simple filesystem checks, because they are easy to understand and fast to run.

For example, suppose gold.mean_arrival_delay_by_carrier is rebuilt daily and written as date-stamped partitions like s3://lake/gold/mean_arrival_delay_by_carrier/date=2026-01-12/. If our contract says “updated daily by 7am”, a simple filesystem freshness check might be:

# Pseudocode: fail if the expected output for today is missing.
exists("s3://lake/gold/mean_arrival_delay_by_carrier/date=2026-01-12/")

Or, if the table is queryable in a warehouse/lakehouse, we might implement a SQL freshness check that looks for the latest partition:

-- Expect the newest event_date in the table to be at least yesterday.
SELECT
  MAX(event_date) AS max_event_date
FROM gold.mean_arrival_delay_by_carrier;

In a pipeline runner, we would turn that into a gate by comparing max_event_date to the expected date for the run, and failing if it is too old.

Once we have these checks, we still need to run them automatically and decide what to do when they fail. Operationalizing all of this - scheduling jobs, running gates, retrying failures, backfilling history, alerting owners, and publishing outputs only when checks pass - is the focus of orchestration and operations.

4.6.7 Orchestration and operations

The control plane defines rules and gates. Orchestration and operations is how we execute them reliably. It covers scheduling (what should run when), dependencies (what must finish before what), failure handling (what happens when something breaks), and observability (how we know what happened).

Most teams use a workflow orchestrator to do this. In this section, we will use Airflow, Argo Workflows, and Prefect as examples, but the underlying ideas are shared across tools.

At a high level, orchestration turns a pipeline into a DAG of steps. A typical daily batch pipeline for an external dataset like NYC 311 might look like:

nyc_311_orchestration_happy_path start Schedule trigger (daily 7am) fetch Fetch from API (NYC 311) start->fetch land_raw Land raw data (raw zone) fetch->land_raw ingest_checks Ingestion checks (schema, row count) land_raw->ingest_checks attach_meta Mark raw partition as valid (attach lineage + governance) ingest_checks->attach_meta notify_ingest Notify owner (ingestion failure) ingest_checks->notify_ingest fail transform Transform (clean + dedupe) attach_meta->transform transform_checks Output checks (keys, ranges) transform->transform_checks publish Publish new partition (gold table) transform_checks->publish notify_transform Notify owner (output-check failure) transform_checks->notify_transform fail record_lineage Record publish metadata (lineage + run info) publish->record_lineage

This is a simplified picture, but it captures a key operational behavior: checks run as gates, and a failure changes control flow. Instead of producing more tables on top of broken inputs, we stop early, alert an owner, and avoid publishing a new version.

Scheduling is the most visible part of orchestration, but it is not the hardest part. The harder part is handling failures and reprocessing.

One common mechanism is retries. Retries help with transient failures: rate limits, network timeouts, or a source outage that clears up after a few minutes. Retrying does not help with deterministic failures (a schema change, a logic bug, or a permission issue), and too many retries can hide real problems. A good default is a small number of retries with an increasing delay.

Another common mechanism is a backfill. An external dataset can be corrected days later, and internal pipelines can change logic (for example, we realize we need a different deduplication rule). Backfills are where orchestration meets the transformation layer: we need a repeatable way to rerun the same data models over older partitions and publish corrected outputs, without breaking downstream users.

Backfills also highlight a practical requirement: we want each run to be safe to rerun. This property is called idempotency. For example, if a job writes date=2026-01-12, it should either overwrite that partition atomically, or write to a temporary location and then swap it into place only after checks pass. Otherwise, partial reruns can leave behind a corrupt mix of old and new data.

Orchestration is also where we track time expectations. Many pipelines have requirements for freshness. Pipelines can fail, and teams still need a fast way to detect lateness and respond. A common pattern is to have a “freshness monitor” that checks whether the prior-day partition exists by a deadline and alerts the owner if it does not.

This is also where observability becomes concrete. When we run a pipeline every day for months, failures are not the only problem. Silent partial failures are often worse. For example, an API call might succeed but return far fewer rows than normal, or a transformation might accidentally drop most rows in a join. Observability is what lets us notice these problems quickly and debug them.

Concretely, orchestration systems usually give us:

  • Logs: per-step output, errors, and context like the run date and run id.
  • Metrics: durations, row counts, file sizes, retry counts, and freshness lag.
  • Alerts: notifications to an on-call rotation, Slack channel, or ticket queue when something is broken or late.
  • Dashboards: charts that show trends over time (freshness, volume, cost) so we can spot regressions.

Here are some concrete metrics and checks we might track for the NYC 311 pipeline in the DAG above:

  • Freshness: freshness_lag_hours for the newest raw and gold partitions, and an alert if it exceeds the SLA.
  • Row counts: raw_row_count and gold_row_count, with alerts if counts are implausible (for example, less than 50% of yesterday).
  • Schema changes: a list of added/removed/renamed columns detected at ingestion time, and an alert if a required column is missing.
  • Gate results: which checks failed (schema vs constraints vs ranges), so we can route the incident to the right owner.
  • Step durations: fetch_seconds, transform_seconds, checks_seconds, so we can see where time is spent.
  • Failure and retry behavior: retry_count per step and run_success per day, so we can tell flaky sources from broken code.

These signals are also useful for auditing and for the control plane. If we record the run id, the code version, the input partitions, the check results, and the publish decision for each run, we can later answer questions like: what data did we publish on January 12, which job version produced it, and which checks passed.

For example, suppose we get an alert that raw_row_count dropped to 10% of yesterday. Observability gives us a fast path to debugging: we can look at the fetch step logs and see whether we were rate limited (429), whether the query parameters were wrong (returned zero rows), or whether the source was down (5xx). We can also see whether the ingestion checks failed (so nothing downstream ran) or whether the checks passed but the transform later dropped rows (for example, a join key format changed). Without these signals, we often discover problems days later, after dashboards and training data have already been built on top of the bad run.

Different orchestrators expose these ideas in different ways: we will look at three different examples.

Airflow is Python-first and DAG-first. We define a DAG as code and schedule it. Airflow is a good fit for many batch pipelines and integrates well with data systems and dbt. For example, we can define a DAG that mirrors the NYC 311 pipeline above:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.email import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime

with DAG(
    dag_id="nyc_311_daily",
    start_date=datetime(2026, 1, 1),
    schedule="0 7 * * *",
    catchup=True,
) as dag:
    fetch = BashOperator(task_id="fetch", bash_command="python fetch_311.py --ds {{ ds }}")
    land_raw = BashOperator(task_id="land_raw", bash_command="python land_raw_311.py --ds {{ ds }}")
    ingest_checks = BashOperator(task_id="ingestion_checks", bash_command="python check_raw_311.py --ds {{ ds }}")
    mark_valid = BashOperator(task_id="mark_raw_valid", bash_command="python mark_raw_valid.py --ds {{ ds }}")
    transform = BashOperator(task_id="transform", bash_command="dbt build --select silver_nyc_311_requests_clean")
    output_checks = BashOperator(task_id="output_checks", bash_command="dbt test --select gold_nyc_311_requests_daily")
    publish = BashOperator(task_id="publish", bash_command="python publish_311_gold.py --ds {{ ds }}")
    record_meta = BashOperator(task_id="record_publish_metadata", bash_command="python record_lineage_311.py --ds {{ ds }}")

    notify = EmailOperator(
        task_id="notify_owner",
        to="data-oncall@example.com",
        subject="NYC 311 pipeline failed for {{ ds }}",
        html_content="Check Airflow logs for run {{ run_id }}.",
        trigger_rule=TriggerRule.ONE_FAILED,
    )

    fetch >> land_raw >> ingest_checks >> mark_valid >> transform >> output_checks >> publish >> record_meta
    [ingest_checks, output_checks] >> notify

This Airflow DAG says:

  • Schedule: run every day at 7am (schedule="0 7 * * *").
  • Steps: run fetch, land raw, run ingestion checks, mark the raw partition valid, transform, run output checks, publish, and record publish metadata.
  • Run date: { ds } is the run date (for example, 2026-01-12), passed into each step so it reads and writes the correct daily partition.
  • Operations: Airflow records run history and task logs, and it can retry tasks, mark runs as failed, and alert owners when something breaks.

Each step is a real program we own, stored in a repository and deployed like any other production code. For example, the fetch, land_raw, check_raw, and publish steps might be small Python entrypoints under a pipelines/ folder, while the transformations and tests live in a dbt project. (This is a simplified example - in a real deployment, we’d likely package the Python material more robustly.)

pipelines/
  dags/
    nyc_311_daily.py                 # Airflow DAG definition
  jobs/
    fetch_311.py                     # call API, write response to local temp
    land_raw_311.py                  # write raw partition to object storage (raw zone)
    check_raw_311.py                 # ingestion checks: schema + row count + sanity
    mark_raw_valid.py                # write "this partition is valid" metadata
    publish_311_gold.py              # publish the new partition (for example, atomic move)
    record_lineage_311.py            # record lineage + run metadata in a log table/catalog
  dbt/
    models/
      silver_nyc_311_requests_clean.sql
      gold_nyc_311_requests_daily.sql
    models.yml                       # data model docs + tests

Why split work this way? Ingestion steps often involve HTTP calls, pagination, rate limiting, file I/O, and writing to object storage or a raw table. Those are usually easiest to express as general-purpose code (Python). Transformations, on the other hand, are often best expressed as SQL data models that run close to the data, with standardized testing and documentation - and dbt is designed for that transformation layer.

In this example, we use BashOperator to execute steps of the pipeline directly on the host. In a more robust deployment, we might instead use DockerOperator or KubernetesPodOperator so each step runs in a more controlled, versioned environment (for example, a container image with pinned dependencies).

Argo Workflows is Kubernetes-native and always runs steps of the pipeline in pods! Instead of a Python process coordinating work, Argo schedules containers in a Kubernetes cluster. This is a good fit when our pipeline steps are already packaged as containers (for example, an ingestion container, a Spark submit container, or a dbt container) and we want the cluster to manage resources. For example:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: nyc-311-daily-
spec:
  arguments:
    parameters:
      - name: ds
        value: "2026-01-12"
  entrypoint: pipeline
  templates:
    - name: pipeline
      dag:
        tasks:
          - name: fetch
            template: fetch
            arguments:
              parameters:
                - name: ds
                  value: "{{workflow.parameters.ds}}"
          - name: land-raw
            dependencies: [fetch]
            template: land-raw
            arguments:
              parameters:
                - name: ds
                  value: "{{workflow.parameters.ds}}"
                - name: raw_payload_path
                  value: "{{tasks.fetch.outputs.parameters.raw_payload_path}}"
          - name: ingestion-checks
            dependencies: [land-raw]
            template: ingestion-checks
          - name: mark-raw-valid
            dependencies: [ingestion-checks]
            template: mark-raw-valid
          - name: transform
            dependencies: [mark-raw-valid]
            template: transform
          - name: output-checks
            dependencies: [transform]
            template: output-checks
          - name: publish
            dependencies: [output-checks]
            template: publish
          - name: record-publish-metadata
            dependencies: [publish]
            template: record-publish-metadata
          - name: notify-owner
            depends: "ingestion-checks.Failed || output-checks.Failed"
            template: notify-owner

    - name: fetch
      inputs:
        parameters:
          - name: ds
      container:
        image: our-ingestion-image:latest
        command: ["python", "fetch_311.py"]
        args: ["--ds", "{{inputs.parameters.ds}}", "--write-path-file", "/tmp/raw_payload_path.txt"]
      outputs:
        parameters:
          - name: raw_payload_path
            valueFrom:
              path: /tmp/raw_payload_path.txt
    - name: land-raw
      inputs:
        parameters:
          - name: ds
          - name: raw_payload_path
      container:
        image: our-ingestion-image:latest
        command: ["python", "land_raw_311.py"]
        args: ["--ds", "{{inputs.parameters.ds}}", "--raw-payload-path", "{{inputs.parameters.raw_payload_path}}"]
    - name: ingestion-checks
      container:
        image: our-ingestion-image:latest
        command: ["python", "check_raw_311.py"]
    - name: mark-raw-valid
      container:
        image: our-ingestion-image:latest
        command: ["python", "mark_raw_valid.py"]
    - name: transform
      container:
        image: our-dbt-image:latest
        command: ["dbt", "build", "--select", "silver_nyc_311_requests_clean"]
    - name: output-checks
      container:
        image: our-dbt-image:latest
        command: ["dbt", "test", "--select", "gold_nyc_311_requests_daily"]
    - name: publish
      container:
        image: our-ingestion-image:latest
        command: ["python", "publish_311_gold.py"]
    - name: record-publish-metadata
      container:
        image: our-ingestion-image:latest
        command: ["python", "record_lineage_311.py"]
    - name: notify-owner
      container:
        image: our-notify-image:latest
        command: ["python", "notify_owner.py"]

This workflow says:

  • Steps: define tasks that mirror the pipeline: fetch, land raw, checks, transform, publish, and record metadata.
  • Dependencies: the dependencies fields enforce the step order, and notify-owner runs only if a check fails.
  • Passing data between steps: the fetch step writes the raw payload to storage and outputs the resulting path as a parameter (raw_payload_path). The land-raw step reads that parameter and uses it to find the exact payload to land into the raw zone. This passes a small pointer between pods, not the full dataset.
  • Execution: each task runs as a container in Kubernetes, using the specified image and command.
  • Operations: Argo captures logs and status for each task run, and Kubernetes handles scheduling and resources.

Prefect is also Python-first, but it is more flexible about how and where it runs. Prefect is often used when we want a simple way to turn Python functions into a scheduled flow with retries and logs. For example:

from prefect import flow, task

@task(retries=3, retry_delay_seconds=60)
def fetch(ds: str):
    ...

@task
def land_raw(ds: str):
    ...

@task
def ingestion_checks(ds: str):
    ...

@task
def mark_raw_valid(ds: str):
    ...

@task
def transform(ds: str):
    ...

@task
def output_checks(ds: str):
    ...

@task
def publish(ds: str):
    ...

@task
def record_publish_metadata(ds: str):
    ...

@task
def notify_owner(ds: str):
    ...

@flow
def nyc_311_daily(ds: str):
    try:
        fetch(ds)
        land_raw(ds)
        ingestion_checks(ds)
        mark_raw_valid(ds)
        transform(ds)
        output_checks(ds)
        publish(ds)
        record_publish_metadata(ds)
    except Exception:
        notify_owner(ds)
        raise

This Prefect flow says:

  • Steps: treat each stage of the pipeline as a task, and run them in order inside a flow.
  • Retries: retry fetch up to 3 times with a 60 second delay (@task(retries=3, retry_delay_seconds=60)).
  • Run date: the ds parameter is passed into the flow so tasks can read and write the correct daily partition.
  • Failure handling: if any step raises an exception, we run notify_owner and then re-raise the error so the run is marked failed.
  • Execution: Prefect can run these tasks locally, in a container, or on a worker pool depending on how we deploy it.

In this chapter we are not trying to master any one orchestrator. The goal is to learn the common operational questions we must answer in any production data pipeline: what triggers the run, how we detect and handle failures, how we safely backfill and republish, and how we make the pipeline observable so humans can operate it.

4.7 Training data flows

Data pipelines move and transform data. Training data flows take the outputs of those pipelines and turn them into a dataset that a model can learn from, without accidentally using information that would not have been available at prediction time.

Training data flows have stricter constraints than general analytics pipelines:

  • causality: features must use only what was known at the time we would make a prediction.
  • data leakage: information that won’t be available at inference time cannot sneak into training features.
  • reproducibility: we need to be able to recreate the dataset for audits and model comparisons.
  • Feature correctness and label correctness: both inputs and ground truth need validation.

4.7.1 Example definition

We start by defining what a single training example represents. This definition becomes the interface between “data in repositories” and “data the model consumes”.

At minimum, we specify:

  • What we are predicting (target)
  • The entity we predict for (and its stable id)
  • The prediction time for each example (sometimes called an anchor time)
  • What is allowed as input at prediction time (what was known then)
  • How we compute the label later from outcomes

Revisiting some of the examples from this chapter:

  • 311 closure prediction: one example per request, anchored at created_date. Features can use fields known at request creation (complaint type, agency, borough, time of day). Labels use closed_date but only as ground truth, not as an input.
  • Flight delay prediction: multiple examples per flight, each at an anchor point (24 hours before departure, 1 hour before departure, 10 minutes after departure). Available features change by anchor point. Labels are computed from actual arrival time.
  • Food delivery ETA: multiple examples per delivery over the lifecycle (checkout, after courier assigned, after pickup). Features use what is known at each anchor, and labels come from eventual dropoff_at.

Specifying this early prevents a common failure mode: a dataset that mixes “what we know now” with “what we only learn later”, which trains a model that looks great offline but fails in production.

4.7.2 Candidate selection

Once we know what an example means, we select the candidates: which entities and time range we will include in the dataset. This determines the training population and removes obvious problems.

  • Time range: for example, 311 requests from the last 180 days, or flights from the last 12 months.
  • Eligibility: exclude rows that cannot be labeled (for example, flights with missing actual arrival time, or deliveries that were canceled before pickup).
  • Deduplication: handle repeated records and updates (for example, multiple versions of a 311 request).
  • Decontamination: remove data that should not be used for training (for example, internal QA traffic or records that violate consent rules).

Because candidate selection is not trivial, candidates often come from curated pipeline outputs rather than raw data. For example:

  • For 311, we might ingest the raw dataset daily, deduplicate by unique_key, and select only requests whose created_date is within the training window and whose fields pass basic sanity checks.
  • For flights, we might select only non-canceled flights and restrict to airports and carriers where we have sufficient coverage.
  • For food delivery, we might select deliveries that reached a terminal state (delivered or canceled) so the outcome is known, and exclude periods where logging was incomplete.

4.7.3 Labels

After selecting candidates, we attach labels: ground truth outcomes computed after the fact.

A label can arrive much later than prediction time, and it can be missing or corrected. Labels have their own pipeline, with validation and versioning.

One reason labeling is hard is that labels become known on very different timescales. In many systems, we deal with all of the following:

  • Labels known quickly: we learn the outcome within minutes or hours, so labels can be computed the same day (for example, food delivery duration after a delivery completes).
  • Labels known slowly: we only learn the outcome after days, weeks, or months (for example, did a 311 request get closed, did a user churn, did a loan default).
  • Labels we never know for sure: there is no authoritative ground truth in the data system, so we need human review, external evidence, or we accept noisy proxies (for example, headshot authenticity, fraud that was never detected, content that was never reported).

The label latency affects the whole training loop. If labels take weeks, we cannot retrain daily on fresh outcomes, and we need to be careful about what we treat as “negative” vs “not yet known”.

Some problems have no natural ground truth label. For example, a headshot authenticity classifier is trying to predict “real vs AI-generated”, but there is no automatic field in a database that tells us the truth. Or a recommender is trying to predict “would the user have liked this?”, but the user never saw many items so there is no direct outcome. In these cases, teams sometimes start with pseudo-labels: we take the current model output as the label, so the next training run at least sees new inputs. This can help with coverage, but it bakes in the current model’s mistakes and can cause the system to reinforce its own biases.

A more reliable approach is to set up a human labeling pipeline. For example, each day we might sample 1% of production requests, remove sensitive fields, and send them to trained reviewers. We might also prioritize cases that look ambiguous (low confidence) or high impact (a decision that triggers a user-facing action). Those human labels become a high-quality evaluation set and can also be joined back into training data. We will return to these strategies when we discuss evaluation and monitoring in a later chapter.

Some ML systems also use synthetic labels:

  • weak supervision: heuristic rules assign labels with noise.
  • Teacher models: a larger model generates labels for a smaller model.
  • Controlled generation: synthetic examples are created under known rules and labeled automatically.

Synthetic labels can be useful when human labels are scarce, but they require extra validation because errors in label generation can dominate model behavior.

Here is a concrete example of a synthetic labeling pipeline. Suppose we run a headshot authenticity classifier, but we do not have a reliable ground truth label in our databases. Each day we might:

  • Sample a fraction of new production headshots (for example, 5%), after applying any privacy filters needed for review.
  • Run a stronger teacher model offline and store its output (for example, p_ai_generated) as a synthetic label candidate.
  • Optionally add a human review step for a smaller subset (for example, 0.1%) to estimate how often the teacher is wrong and to create a stable evaluation set. Even this is not perfect truth - the humans are not necessarily always able to flag AI-generated headshots, either.
  • Write a label table keyed by the image id and the prediction time, so we can join labels back to the exact inputs used.

These labels are not really ground truth, but they are a way to expand coverage so the training set includes new inputs, while we build a longer-term labeling strategy (human labels, external evidence, or workflow-driven outcomes).

4.7.4 Example assembly

Candidates and labels are only part of a training example. We still need to assemble the model inputs by joining many signals into a single row (or a single example object for unstructured data). For example:

  • For food delivery ETA, we might join deliveries to courier location pings, restaurant backlog snapshots, and neighborhood traffic signals.
  • For flight delay, we might join flights to weather snapshots, airport congestion metrics, and rolling delay aggregates by carrier.
  • For 311 closure prediction, we might join requests to neighborhood-level historical closure rates, recent volume in the same zip code, and agency-level backlog signals.

Example construction is where we implement as-of joins. Note that we have to be careful about timelines here - instead of joining an entity to the latest snapshot, we join at a specific anchor_time to the most recent snapshot at or before that time. This keeps future information out of training inputs.

For problems with multiple anchor points, we often create multiple examples per entity. For example, a single flight_id might produce three training rows: one at t = scheduled_departure - 24h, one at t = scheduled_departure - 1h, and one at t = scheduled_departure + 10m. Each row has different available features.

Example construction usually includes feature computation. Some features can be computed directly from the joined columns (encode a complaint type, normalize a numeric field). Many of the highest value features in production systems are computed as time-aware aggregates first, and then joined onto each example at its anchor time.

Examples of aggregate features we might compute before the final join:

  • 311: counts and rates over time windows, such as “median closure time for this agency in the last 30 days”.
  • Flights: rolling aggregates like “average arrival delay at this airport over the last 30 minutes” and “carrier-wide average delay over the last day”.
  • Food delivery: “restaurant average delay over the last hour”, “courier supply vs demand in the last 15 minutes”, and “reassignment rate in the last day”.

When we compute windowed features, we define the window relative to the anchor time. For example, “last 30 minutes” means [anchor_time - 30 minutes, anchor_time), not “last 30 minutes from when the batch job runs”.

Example assembly is also where we handle fusion. For example, suppose we build a fake headshot detector. Our core input is an uploaded headshot image, but the image is not the only useful signal. We may also have:

  • Image metadata (for example, EXIF fields such as camera model or whether the file claims to be edited).
  • IP-level signals (for example, did this IP create many profiles in the last 24 hours).
  • Profile context (for example, is this a new profile, and what are the profile fields the user provided).
  • Profile behavior signals (for example, how quickly the profile performs actions after sign-up, or how often it is edited).

There are multiple ways to combine these modalities. In late fusion, we train an image-only model and a tabular model separately and combine their outputs. In mid-level fusion, we first compute an image representation and then join it to tabular context before the final predictor. In practice, we often treat the image embedding as its own dataset. For example, a batch job computes image_embeddings(image_id, encoder_version, embedding, computed_at) and stores it in the lakehouse. Example assembly then joins image_embeddings to the profile and behavior feature tables using image_id and uses the encoder_version as part of dataset versioning. If the encoder changes, we can backfill embeddings and rebuild the fused training dataset.

4.7.5 Splitting and leakage

Splitting a dataset is more than just randomly dividing into training and test sets. In production ML systems, splitting is part of the causal design.

We generally want splits that answer the question: how will the model behave on future data and future entities?

There are several common split strategies:

  • Time-based splits: train on older data, validate on more recent data, test on the most recent holdout window.
  • Entity-based splits: keep all rows for an entity in one split (for example, all rows for a delivery_id).
  • Group-based splits: split by a higher-level group such as restaurant, store, or user to measure generalization to new groups.

Time-based splits are often the default for operational prediction problems. For flight delay prediction, we might train on flights from January to October, validate on November, and test on December. This aligns evaluation with deployment: the model will be used on future flights.

Entity leakage is a common failure mode when we have multiple rows per entity (multiple anchor points) or near-duplicate records. For example, if we build multiple training rows per delivery_id (checkout, after assignment, after pickup), we keep all rows for delivery_id in the same split. Otherwise, the model can memorize aspects of a specific delivery across anchor points.

Time splits also need a decision about what “time” means. Many datasets have multiple timestamps: created_at, updated_at, event_time, and the label time. For splitting, we usually use the prediction time (the anchor time), because it matches when the model would run. For example, for food delivery ETA training examples anchored at checkout time, we split by checkout time. For flight delay examples anchored at “24 hours before departure”, we split by that anchor time, not by when the ingestion job ran.

Group-based splits are useful when we care about generalization to new groups. For example:

  • A split by restaurant ID evaluates whether a delivery ETA model generalizes to restaurants it has never seen.
  • A split by airport evaluates whether a flight delay model generalizes to new airports.
  • A split by zip code evaluates whether a 311 model generalizes to neighborhoods with different patterns.

These splits can be stricter than time splits. A model can do well on future dates but still fail badly on a new restaurant that has very different preparation times.

Leakage can also happen through feature computation, even if splits are correct. Common leakage patterns include:

  • Future outcomes as features: using closed_date or status="Closed" when predicting 311 closure time.
  • Post-anchor updates: using the final courier id or pickup time when training an ETA model at checkout time.
  • Aggregates computed with future data: computing “average delay at this airport over last 30 minutes” using arrivals after the anchor time.
  • Global normalization or encoding using the full dataset: computing statistics, imputing missing values, or target encoding using train and test together.
  • Label leakage through joins: joining a label table and accidentally exposing label columns to the model.

For the examples in this chapter, practical, concrete steps for leakage prevention would include:

  • 311: if the prediction is anchored at created_date, fields that change later (like status, closed_date, or corrected location fields) cannot be used as features unless they were known at creation. If we want to use status updates, we define a later anchor time and create a separate set of examples.
  • Flights: anchor points have different allowable fields. At 24 hours before departure, we cannot use anything that depends on the actual flight (wheels-on time, actual elapsed time). At 10 minutes after departure, we can use departure delay so far but not actual arrival delay.
  • Food delivery: at checkout, we cannot use pickup or dropoff timestamps, and we cannot use the final courier assignment. We can use current restaurant backlog and current courier supply signals computed before checkout.

Leakage can look subtle in real systems, and sometimes requires domain expertise. Here is a concrete medical example that shows how “future knowledge” can sneak in. Suppose we want to predict whether a patient will be diagnosed with high blood pressure at their next visit. You have a dataset of patient information including diagnosis codes, prescriptions, lab results, and visit notes. You think you can create a training set by excluding the high blood pressure diagnosis code from the data, and using its presence as a label. But, many patients start taking blood pressure medication after a diagnosis. If we include prescriptions in the dataset, and the prescription list is post-diagnosis, the model can learn a shortcut: patients who take medication for high blood pressure are likely to have a diagnosis of high blood pressure. But in deployment, that prescription information would only be available after diagnosis.

Some practical mechanism for avoiding data leakage:

  • Use time-aware joins (as-of joins) for reference tables and snapshots.
  • Compute windowed aggregates relative to anchor time, not relative to batch job time.
  • Apply preprocessing (normalization, vocab building, encoding) using training data only, then reuse the fitted preprocessors for validation and test.
  • Deduplicate before splitting, and keep duplicates in the same split if they remain.
  • Use a split strategy that matches the deployment goal. If the model will be deployed to new restaurants, a split by restaurant ID measures that generalization. If the model will be deployed to future time periods, a time-based split measures that generalization.

4.7.6 Versioning and tracking

Once we have examples, features, and splits, we freeze a dataset snapshot so training runs can be reproduced later.

A dataset snapshot includes:

  • The exact input partitions and tables used (for example, raw and silver partitions by date)
  • The transformation code version (git SHA) and configuration
  • The feature definition versions (or a feature store reference)
  • The split definition (time cutoffs, entity grouping rules)
  • The resulting files or tables, stored immutably (for example, a date-stamped path with a content hash)

There are several ways to implement this in practice:

  • File-oriented data versioning tools (for example, DVC) work well when training data is stored as files alongside code in a Git-style workflow. DVC computes content hashes for data artifacts, stores those hashes in small metadata files tracked by Git, and maps them to objects in local/remote cache storage (for example, S3). This lets a commit reproduce the exact dataset version without checking large binaries into Git.
  • Lakehouse table formats (for example, Delta Lake or Apache Iceberg) are often a strong default when data already lives in a lakehouse, because table snapshots and time travel are built into the storage layer.
  • Similarly, warehouse-native versioning works when we are already using a warehouse with built-in features like snapshot tables and time-travel queries.

The model training experiment tracking system should then link these dataset versions to training runs, model versions, and evaluation metrics.

4.7.7 Training input

Once a dataset exists, we still need to feed it efficiently into training runs. For small local experiments, we can copy data to local disk and read it from there. In production, however, training data is often stored in object storage (for example, S3), which complicates matters.

If we are training a model on a large tabular dataset, we may be using scikit-learn. This framework includes two general categories of training algorithms:

  • Incremental estimators (for example, SGDClassifier/SGDRegressor) that support partial_fit and can train on one batch at a time.
  • Non-incremental estimators that generally expect fit(X, y) on an in-memory array-like dataset.

The first category is designed to work with dataset that don’t fit in memory. For example, we can train linear/logistic models incrementally with SGD by reading chunks from a parquet file. We can combine this with one of the batching approaches discussed earlier, e.g. with pyarrow.datasets in this example:

import pyarrow.dataset as ds
from sklearn.linear_model import SGDClassifier

# PyArrow Dataset scans many parquet files (local or S3) as one dataset
# and yields record batches, so we do not load everything into memory.
dataset = ds.dataset("s3://ml-datasets/train/", format="parquet")

clf = SGDClassifier(loss="log_loss", random_state=42)   # logistic regression via SGD

classes = [0, 1]
first_batch = True
for record_batch in dataset.to_batches(columns=["x", "y"], batch_size=50_000):
    # Convert Arrow batch columns to NumPy arrays for scikit-learn.
    X = record_batch.column("x").to_numpy()
    y = record_batch.column("y").to_numpy()
    if first_batch:
        # SGDClassifier needs all possible label classes on the first
        # partial_fit call so it can initialize its class structure.
        clf.partial_fit(X, y, classes=classes)
        first_batch = False
    else:
        clf.partial_fit(X, y)

But, this approach won’t work for the non-incremental models. For other models, e.g. gradient boosted trees, we might instead use XGBoost integration with Dask or Spark.

What about training a model on unstructured data, such as images, audio, or text? In this case, especially in the case of image or audio data (which is relatively large per sample), the data is likely to be in object storage, and we cannot or should not copy it locally because it is very large. So, besides for concerns about what fits in memory and what doesn’t, we also need to be concerned about streaming data across a network.

As a baseline, suppose we were training a neural network on local training data using ImageFolder with split directories:

from torchvision import datasets
from torch.utils.data import DataLoader

data_root = "image_cls"

train_dataset = datasets.ImageFolder(f"{data_root}/training")
val_dataset = datasets.ImageFolder(f"{data_root}/validation")
eval_dataset = datasets.ImageFolder(f"{data_root}/evaluation")

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=4)
eval_loader = DataLoader(eval_dataset, batch_size=32, shuffle=False, num_workers=4)

If the same dataset is in object storage, we can build a drop-in replacement with the same train_loader/val_loader/eval_loader interface:

import io
import fsspec
from PIL import Image
from torch.utils.data import Dataset, DataLoader

# Credentials are usually read from environment variables, not hardcoded in source.
fs = fsspec.filesystem(
    "s3",
    key="s3_key",
    secret="s3_secret",
    client_kwargs={"endpoint_url": "s3_endpoint"},
)

class RemoteImageDataset(Dataset):
    def __init__(self, samples, fs):
        self.samples = samples  # list of {"url": "...", "label": int}
        self.fs = fs

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        sample = self.samples[idx]
        with self.fs.open(sample["url"], "rb") as f:
            img = Image.open(io.BytesIO(f.read())).convert("RGB")
        return img, int(sample["label"])


train_dataset = RemoteImageDataset(train_samples, fs=fs)
val_dataset = RemoteImageDataset(val_samples, fs=fs)
eval_dataset = RemoteImageDataset(eval_samples, fs=fs)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=4)
eval_loader = DataLoader(eval_dataset, batch_size=32, shuffle=False, num_workers=4)

But this one-file-at-a-time pattern is usually very inefficient at scale, even if we increase num_workers to hide some latency. Each sample triggers a separate network request, object-store metadata lookup, and file open/close cycle. The trainer spends significant time waiting on remote I/O instead of computing, which lowers GPU utilization and increases end-to-end training time.

So when data is in object storage, we usually shard it before training.

For example, we might lay out sharded image data like this:

s3://ml-datasets/image_cls/
  training/
    shard-000000.tar
    shard-000001.tar
    ...
  validation/
    shard-000000.tar
    ...
  evaluation/
    shard-000000.tar
    ...

Each TAR shard contains many samples (.jpg plus a label file such as .cls or .json). A simple sharding script could write split-specific shards:

import webdataset as wds

def write_split(records, out_pattern, maxcount=10_000):
    with wds.ShardWriter(out_pattern, maxcount=maxcount) as sink:
        for i, r in enumerate(records):
            sink.write({
                "__key__": f"sample-{i:09d}",
                "jpg": open(r["image_path"], "rb").read(),
                "cls": str(r["label"]),
            })

write_split(train_records, "s3://ml-datasets/image_cls/training/shard-%06d.tar")
write_split(val_records, "s3://ml-datasets/image_cls/validation/shard-%06d.tar")
write_split(eval_records, "s3://ml-datasets/image_cls/evaluation/shard-%06d.tar")

And we can load those shards as another drop-in replacement:

import webdataset as wds
from torch.utils.data import DataLoader

train_dataset = (
    wds.WebDataset("s3://ml-datasets/image_cls/training/shard-{000000..000255}.tar")
    .shuffle(10_000)
    .decode("pil")
    .to_tuple("jpg", "cls")
    .map_tuple(lambda x: x, int) # converts label to integer
)
val_dataset = (
    wds.WebDataset("s3://ml-datasets/image_cls/validation/shard-{000000..000031}.tar")
    .decode("pil")
    .to_tuple("jpg", "cls")
    .map_tuple(lambda x: x, int)
)
eval_dataset = (
    wds.WebDataset("s3://ml-datasets/image_cls/evaluation/shard-{000000..000031}.tar")
    .decode("pil")
    .to_tuple("jpg", "cls")
    .map_tuple(lambda x: x, int)
)

train_loader = DataLoader(train_dataset, batch_size=32, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=32, num_workers=4)
eval_loader = DataLoader(eval_dataset, batch_size=32, num_workers=4)

In training, .shuffle(10_000) gives buffered sample-level shuffling over a 10,000-sample window. Validation and evaluation stay unshuffled for deterministic metrics.

A practical rule is:

  • Prefer large shard files over many tiny files.
  • Keep preprocessing parallel (num_workers) so compute does not wait on I/O.
  • When you get new training samples, write new shard files for them; periodically you may compact/re-shard when you want to re-balance time and class within shards.

For image-heavy pipelines, common sharding choices include TAR shards (often with WebDataset, as shown above) and preprocessed record shards. As another option, if you are training a model with PyTorch Lightning and want to stay in the Lightning “ecosystem”, LitData uses its own optimization/sharding step, then streams those optimized chunks.

We would first run sharding as a separate preparation job:

from PIL import Image
from litdata import optimize

def encode_sample(sample):
    return {
        "image": Image.open(sample["image_path"]).convert("RGB"),
        "label": int(sample["label"]),
    }

# Build LitData chunks for each split.
optimize(fn=encode_sample, inputs=train_records, output_dir="s3://ml-datasets/image_cls/training_opt", chunk_bytes="64MB", num_workers=4)
optimize(fn=encode_sample, inputs=val_records, output_dir="s3://ml-datasets/image_cls/validation_opt", chunk_bytes="64MB", num_workers=4)
optimize(fn=encode_sample, inputs=eval_records, output_dir="s3://ml-datasets/image_cls/evaluation_opt", chunk_bytes="64MB", num_workers=4)

Then training reads those prebuilt chunks:

from litdata import StreamingDataset, StreamingDataLoader

train_dataset = StreamingDataset(input_dir="s3://ml-datasets/image_cls/training_opt", shuffle=True)
val_dataset = StreamingDataset(input_dir="s3://ml-datasets/image_cls/validation_opt", shuffle=False)
eval_dataset = StreamingDataset(input_dir="s3://ml-datasets/image_cls/evaluation_opt", shuffle=False)

train_loader = StreamingDataLoader(train_dataset, batch_size=32, num_workers=4)
val_loader = StreamingDataLoader(val_dataset, batch_size=32, num_workers=4)
eval_loader = StreamingDataLoader(eval_dataset, batch_size=32, num_workers=4)

Both approaches solve the same core issue: we will read training data in larger sequential chunks instead of one remote object per sample.

4.8 Inference data flows

Training data flows are mostly about building a stable dataset before training starts. Inference data flows are different: we receive a request now, make a decision now, and still need to keep enough records to evaluate and retrain later.

So inference data flows produce two things at the same time:

  • a low-latency response for product behavior
  • a durable record for analytics, monitoring, and future training

4.8.1 Request intake

Inference requests usually arrive from API calls, internal streams, or scheduled batch scoring. The key difference from training is that many requests are latency-sensitive and cannot wait for heavy data processing.

For example:

  • API-triggered: run inference for one entity when a user opens a page.
  • Stream-triggered: run inference on each event update (for example, location pings).
  • Batch-triggered: run inference for many entities on a schedule for monitoring or precomputation.

As in training, when we receive a request, we still need to construct an example. The difference is that the example is request-time, not batch-time. In training, we can afford multi-step dataset builds. In inference, assembly is on the critical path. We still validate, normalize, enrich, and construct features, but we must do it quickly and with clear fallbacks.

So we decide how features are produced and served:

  • In on-demand feature computation, we compute features at request time (for example, query the current courier location and compute distance to the restaurant). This can be simple, but it can add latency and load.
  • In materialized feature computation, we precompute features and store them so training and inference can read them quickly. For example, we might maintain a rolling “average delay at airport over last 30 minutes” table, or a “restaurant average delay over the last hour” table, updated as new events arrive.

For many products, the highest-value signals are real-time features, including streaming aggregates such as:

  • “views in the last 5 minutes”
  • “average delay at this airport in the last 30 minutes”
  • “events from this account in the last hour”

Inference commonly reads from a low-latency state store maintained by streaming jobs, instead of on-demand computation of aggregates on each request.

Feature freshness is an important design choice. Materialized features usually reduce latency and make training/inference definitions easier to keep aligned, but they also add compute and storage work for features that are not necessarily going to be used i.e. if no request arrives that needs them.

4.8.2 Return now, store for later

Inference typically has two output paths.

  1. Return the prediction immediately for use by the application
  2. Store the prediction event asynchronously as an immutable event for analytics and retraining

For example:

  • In delivery ETA, we return “17 minutes” to the app now, then store the full inference event for later analysis.
  • In flight delay prediction, we return “high delay risk” now, then store what features and model version produced that result.

The stored prediction enables retraining, evaluation, and debugging from real production behavior. A stored prediction event includes:

  • request identity (request_id, entity id)
  • prediction context (anchor_time, prediction_time)
  • prediction output
  • feature snapshot (or stable references)
  • version fields (model version, feature version, code version)
  • timestamps (request received, prediction produced)

Anchor time creates a common complication. The same entity can be predicted multiple times at different anchors, so we must store the anchor explicitly. For example, a single delivery_id might have predictions at checkout, after courier assignment, and after pickup. If we store only delivery_id and output, later evaluation will mix different decision points and produce misleading metrics.

Versioning needs to be part of each stored prediction event. If we do not store model, feature, and code versions with each record, we cannot later explain whether a behavior change came from a model update, a feature-definition change, or a serving-code change. We might need versions later to support replay: re-running inference on historical requests with a chosen model/feature version to compare outputs under controlled conditions. We might also need versions to support backfill, where we write those recomputed predictions into historical analytics or serving tables after we change logic.

Inference records can contain sensitive identifiers and user context, so we might also need to enforce a retention policy here. We keep only what we need, separate high-risk raw data from analytics tables, and enforce explicit retention windows.

4.8.3 Labels

Inference gives us predictions now, but labels arrive later, sometimes much later. So we must store enough information to join outcomes back to the right prediction event.

4.8.4 Feature stores

As the number of features grows, teams often build a shared place to define and reuse them. A feature store is one approach: it standardizes feature computation, keeps a catalog of feature definitions, and can serve features for both offline training and online inference.

feature_store_flow cluster_feature_store Feature store batch_source Batch source batch_transform Batch transform batch_source->batch_transform offline_store Offline data store batch_transform->offline_store fs_layout Store Serve Catalog offline_store->fs_layout stream_source Stream source stream_transform Stream transform stream_source->stream_transform online_store Online data store stream_transform->online_store online_store->fs_layout model_training Model training inference Inference fs_layout->model_training fs_layout->inference

Feature stores are often motivated by an offline/online mismatch. For example, if we compute “average arrival delay at this airport over the last 30 minutes” one way in a batch job for training, but compute it a different way in production, the model will see a different input distribution at inference time than it saw during training.

Feature stores are most useful when the same feature is needed in multiple places. For example:

  • A rolling airport delay feature can be used for flight delay prediction and for operational dashboards.
  • A restaurant backlog feature can be used in a delivery ETA model and a dispatch optimization system.

4.9 System design

When designing a data system for an ML product, we are making choices about data sources, ingestion, storage, processing, transformation, governance, and operations. Those choices then determine what kinds of models we can train, what kinds of product behavior we can support, and how safely we can evolve the system over time.

Most of the implementation work in this chapter is typically owned by data engineers (often with data platform or analytics engineers). But ML engineers and data scientists still need to understand these choices, because model quality and production behavior depend on them.

Design tasks:

  1. Define the data products we need (tables, feature sets, and logs) and who depends on them.
  2. Evaluate external and internal data sources for quality, coverage, timeliness, and compliance (consent, licensing, retention, privacy).
  3. Choose ingestion patterns per source (full loads, incremental loads, event streams, CDC) and define what “durable records” means for each.
  4. Design the data repository architecture (operational databases, warehouses, lakes/lakehouses, object storage, caches) and what is the system of record for each type of data.
  5. Choose processing and compute engines for batch and (if needed) for streaming, based on scale and latency requirements.
  6. Define schemas for structured data and metadata conventions for unstructured data (lineage, governance tags, version fields).
  7. Design the transformation layer (owned code, tests, documentation, stable destinations) that turns raw inputs into usable data products.
  8. Design the control plane (quality checks, access controls, schema change handling, freshness checks) and what happens when checks fail.
  9. Design orchestration and operations (scheduling, dependencies, retries/failure handling, observability, backfills, publishing).
  10. Design training data flows (labels, example assembly, splitting and leakage prevention, dataset versioning, training input pipelines).
  11. Design inference data flows (request records, feature access, logging, outcome capture, versioning, retention) so inference outputs become future training and analytics data.

Questions to ask (about the problem):

  • Product and success criteria
    • What decision or product behavior will the model drive, and what happens when it is wrong?
    • What is the latency budget for the user experience (seconds, minutes, hours), and what is the freshness budget for data (real time, hourly, daily)?
    • What are the required outputs: predictions only, explanations, confidence scores, or human review queues?
  • Sources and constraints
    • What are our sources, and which ones do we control (internal) vs consume (external)?
    • For each source, what does it actually provide: file exports, an API, or an event stream?
    • How often does the source update, and what is the best we can do given that schedule?
    • If the source is external, what are the failure modes: rate limits, outages, changing schemas, missing days?
    • What are the terms and conditions on the data (license, consent, retention, regional restrictions), and who is responsible for interpreting them?
  • Scale and shape
    • Is the data primarily structured, unstructured, or mixed?
    • What scale do we expect (rows per day, events per second, total growth rate)?
    • What are the “wide table” risks (large payloads, many columns, frequent schema change)?
  • Training data acquisition
    • Where will training data come from: external datasets, internal product logs, or synthetic generation/augmentation?
    • If we use an external dataset, how well does it match our production setting (users, geography, time period, hardware, language), and what evidence do we have?
    • Do we have coverage of rare and high-impact cases (edge cases), or do we need targeted collection?
    • If labels are scarce, what is our plan: human labeling, synthetic labels from a teacher model, or weak supervision rules?
    • If we use synthetic labels, how will we estimate their error rate (for example, by spot-checking with humans) and reduce feedback loops?
    • What is the label, how long does it take to become known (label latency), and how often is it missing or corrected?
    • What are the anchor points, and what information is allowed at each anchor time?
  • Reproducibility and trust
    • What does “durable” mean for each source: what must we store so we can replay, audit, and rebuild derived tables later?
    • How will we prevent data leakage (time-aware joins, windowed aggregates relative to anchor time, split strategy)?
    • How will we version datasets and link them to training runs so we can reproduce results?

Design decisions to make (how we will build it):

  • Ingestion and durability
    • Which ingestion pattern will we use per source (full load, incremental load, event stream, CDC)?
    • What is the stable identifier for each record, and how will we deduplicate updates and corrections?
  • Repositories and storage
    • Where will the raw record of truth live (database table, warehouse table, lakehouse table, object storage files)?
    • Do we need multiple repositories (for example, a warehouse for analytics and a lakehouse for large raw files), and how will we keep them consistent?
    • What is the schema strategy: strict schema, flexible schema, or both (raw payload plus curated columns)?
    • How will we partition data over time so batch jobs can read and write efficiently?
  • Processing and compute
    • What compute engines will we use for batch and (if needed) streaming, and where will compute run relative to storage?
  • Transformation layer
    • What are the stable outputs downstream users will depend on (table names, schemas, partitioning)?
    • Who owns each output and responds when it breaks?
    • How will we document field meanings so new engineers (and future automation) can use the data safely?
    • How will we change the data product safely: add columns, fix bugs, and handle breaking changes?
  • Data control plane
    • What checks will run at ingestion and after each transformation, and what happens when they fail?
    • How will we encode lineage and governance metadata so downstream systems can apply policy automatically?
    • What access boundaries do we need (least-privilege access) so a mistake has a limited blast radius?
  • Orchestration and operations
    • What are the dependencies between steps, and what should run in parallel vs in sequence?
    • What is the expected schedule (daily, hourly, event-driven), and how will we handle missed runs?
    • How will we backfill history when logic changes or late data arrives?
    • What observability do we need: logs, metrics, alerts, and dashboards per pipeline stage?
  • Training and inference data flows
    • How will we feed training data efficiently (shards, streaming, caching) at the scale we need?
    • What will we log for every prediction so we can audit and rebuild later (request id, entity id, prediction time, features used, model version)?
    • Where will inference records live for analytics and retraining, and what do we keep in the operational database?
    • How will we join outcomes back to predictions, and what retention policy will limit replay and backfills?

4.10 Key terms

  • AI agents: A system that can plan and execute multi-step tasks, including reading documentation and writing code, rather than only answering a single question.
  • alerts: Automatic notifications triggered when a rule is violated, such as a failed run or a dataset being late.
  • analytical: Workloads optimized for large scans and aggregations over many rows.
  • analytics data: Data used to understand system behavior over time, including engagement, errors, and fairness.
  • anchor points: A specific time in an entity’s lifecycle when we make a prediction, such as at order creation or 10 minutes after pickup.
  • application state: The product data needed for the system to operate moment to moment.
  • as-of joins: Joining a record at a given time to the most recent reference data at or before that time, rather than the latest overall.
  • backfill: Recomputing historical outputs (for example, daily partitions) after logic changes or when late or corrected inputs arrive.
  • batch processing: Processing data on a schedule by running jobs over a chunk of accumulated data (for example, every hour or every day).
  • bias: Systematic error or unfairness that arises when data or modeling choices cause worse performance for some groups or scenarios.
  • blast radius: How much damage a mistake can cause, such as accidentally granting broad access to a sensitive dataset.
  • block storage: Storage that exposes addressable blocks for low-latency random reads/writes, typically used by databases and logs.
  • buckets: Top-level containers that define a namespace and policy boundary for objects.
  • business time: The time when an event happened in the real world (for example, when a user clicked a button or a delivery was picked up).
  • causality: The order of events in time: what was known at a given moment, and what was only known later.
  • change data capture (CDC): A technique that captures inserts/updates/deletes from a transactional database as an ordered stream of changes, often by reading the database’s write-ahead log (WAL).
  • class balance: The proportion of positive vs negative labels, such as outcome 1 vs outcome 0.
  • cluster manager: The system responsible for starting Spark executors on worker machines and managing cluster resources. Examples include Spark standalone, YARN, and Kubernetes.
  • compute engine: A system that runs data-processing programs (batch or streaming) that read and write data files (for example, Spark or Flink).
  • consumer: A service that reads events and updates some output (a table, a dashboard, a notification, or a model feature).
  • correlations: How two or more variables change together, like a feature and a related feature moving in the same direction.
  • DAG: A directed acyclic graph: a set of steps with one-way dependencies, where a step runs only after its prerequisites finish successfully.
  • data contract: A published promise about a dataset: its schema, meanings, update frequency, and what changes are allowed without breaking downstream users.
  • data control plane: The set of rules, metadata, and automated gates that decide whether data is valid, visible, and usable as it moves through the pipeline.
  • data engineers: An engineer focused on building and operating reliable data ingestion, transformation, storage, and serving pipelines.
  • data lake: Object-storage-based repositories of raw and processed files with a metadata layer.
  • data lakehouse: Lakes with added schema and transactional semantics to behave more like warehouses.
  • data leakage: Using information at training time that would not have been available at prediction time, causing overly optimistic offline results.
  • data pipeline: The set of processes that move data from where it is produced to where it can be reliably queried and used.
  • data plane: The part of the stack that executes data movement and transformations: reading, writing, and computing over data.
  • data product: A dataset designed, owned, and maintained so other teams can depend on it, with clear meaning, quality expectations, and update behavior.
  • data warehouses: Analytical stores optimized for large read-heavy queries and aggregations.
  • data zones: A logical area of storage with its own access rules and expectations, such as a restricted zone for sensitive data and a broader zone for de-identified data.
  • data-centric: An ML approach where improvement comes primarily from changing the data while keeping the model fixed.
  • database view: A named SQL query saved in a database so it can be queried like a table, without storing a separate copy of the data.
  • dbt: A tool that treats SQL queries as version-controlled data models and builds them into tables/views with tests and documentation.
  • dependencies: The relationships between steps that determine execution order (step B can only run after step A succeeds).
  • differential privacy: A privacy guarantee that limits how much any single person’s data can change the output, so the synthetic data does not reveal whether a person was in the seed set.
  • document databases: Databases that store semi-structured records with flexible schema (e.g., MongoDB).
  • driver: The coordinating Spark process that plans the job, optimizes the computation, schedules tasks, and tracks progress.
  • durable: Data that will not disappear if a process crashes or a machine restarts.
  • ELT: A pipeline pattern where we extract and load raw data into the repository first, and then transform it inside the repository.
  • enrichment: Adding fields to an existing record by calling an external service or joining reference data.
  • entity: The thing we make predictions about, usually identified by a stable id.
  • ephemeral: Storage that is not preserved once the compute instance it is attached to ends.
  • ETL: A pipeline pattern where we extract data, transform it outside the target repository, and then load the transformed result.
  • event log: An append-only record of events (facts) produced by a system, kept in order so it can be replayed later.
  • event stream: A continuous sequence of events or updates that can be consumed as they arrive, rather than a once-per-day file or table.
  • executors: A Spark process that runs tasks on a worker machine and can cache intermediate data.
  • experiment tracking: Tracking training runs, parameters, datasets, and results so we can compare experiments and reproduce outcomes.
  • failure handling: What we do when a step fails: retry, alert, stop early, keep the previous output, or quarantine bad inputs.
  • feature store: A system that stores feature definitions and computed feature values so training and inference can use consistent features.
  • frankenstein dataset: A dataset that combines samples from multiple sources.
  • freshness: A check that a dataset is up to date, for example that the prior-day partition exists by an agreed deadline.
  • full load: Copying an entire dataset or table at once, rather than only changes since the last run.
  • fusion: Combining signals from multiple modalities (for example, images plus tabular context) to make one prediction.
  • governance: Policies and controls that govern data access, use, and retention.
  • greenfield: A new system built from scratch, where we can choose the architecture rather than inheriting legacy constraints.
  • idempotency: A property where running the same step multiple times produces the same final result (or at least does not corrupt outputs).
  • immutable: Data that is written once and not modified in place; updates create new versions.
  • in-memory: Datastores that keep data in RAM for very low-latency access, often used as caches or online feature stores.
  • incremental load: Copying only what changed since the last successful ingestion run.
  • ingestion: The process of capturing data from a source system and writing it into storage that can be read and replayed later, usually with metadata and an audit trail.
  • key-value stores: Datastores that store and retrieve values by key, optimized for fast lookups.
  • knowledge distillation: Training a smaller student model to match the outputs of a larger teacher model, so the student approximates the teacher at lower cost.
  • label: The ground truth value we want the model to predict, produced from real outcomes after the prediction time.
  • label latency: How long it takes for labels to become known after the prediction time.
  • late fusion: A design where we combine per-modality predictions at the end, for example by a weighted score or a small classifier on the scores.
  • lazy execution: A style where the system builds and optimizes a plan of operations first, and only runs it when you explicitly trigger execution.
  • least-privilege access: Granting each person or system only the minimum access they need to do their job.
  • lineage: The ability to trace data, labels, or predictions back to their sources and transformations.
  • log-based replication: Copying changes by reading a database’s internal change log rather than periodically querying tables.
  • logs: Text output from each step that shows what happened (and why it failed), often captured with timestamps and saved for later debugging.
  • lookback window: A small time buffer that re-reads recent data to catch late arrivals or corrections.
  • marginals: Single-variable distributions, for example the distribution of age by itself.
  • materialized: Precomputing features and storing them so they can be read quickly later.
  • materialized view: A precomputed, stored result of a query that is refreshed on a schedule or when inputs change.
  • medallion architecture: A common lakehouse convention that organizes data into raw, cleaned, and curated layers: bronze (raw), silver (cleaned/conformed), gold (curated outputs).
  • message broker: A system that accepts events from producers and delivers them to consumers, often with retention so events can be replayed.
  • metastore: A catalog service that maps human-readable table names to their physical locations, formats, and access rules.
  • metrics: Numeric measurements we record over time, such as runtime, row counts, and retry counts, which we can chart and alert on.
  • micro-batching: Running small batch jobs very frequently so outputs update in near real-time (for example, every few minutes).
  • mid-level fusion: A design where we combine intermediate representations (for example, embeddings) from each modality and train a final predictor on the combined representation.
  • model-centric: An ML approach where improvement comes primarily from changing the model.
  • object storage: Storage for large blobs addressed by key, optimized for durability and throughput rather than in-place updates.
  • observability: Making pipeline runs visible with logs, metrics, and alerts so we can see what happened and debug failures.
  • offline/online mismatch: When the feature logic used to train a model is not the same as the feature logic used to run the model in production, causing performance gaps and hard-to-debug failures.
  • on-demand: Computing a value only when it is requested, rather than storing it ahead of time.
  • prediction time: The moment in time when we would make a prediction.
  • prioritize: Selecting examples for labeling based on uncertainty or risk, rather than sampling uniformly at random.
  • producer: A service that emits events describing what happened.
  • proxy label: A measurable stand-in target used when the true outcome is not directly measurable or available.
  • pseudo-labels: Using an existing model’s prediction as a temporary label for new inputs.
  • publish-subscribe (pub/sub): Publish-subscribe: producers publish messages to a topic and subscribers receive them, often with automatic fan-out to multiple consumers.
  • quality gate: An automated validation that must pass before a dataset is published or treated as successful.
  • query engine: A system that executes queries (often SQL) by reading table metadata and scanning data files (for example, Trino or Dremio).
  • rare events: Low-frequency but high-impact patterns, like extreme values or rare outcomes.
  • real-time processing: Processing data soon after it arrives, so outputs update continuously or within seconds/minutes.
  • relational databases: Transactional, schema-based databases optimized for frequent creates, updates, and deletes (e.g., PostgreSQL).
  • reproducibility: The ability to reproduce a model training run with the same data, code, and settings.
  • requirements before data: The principle that intended use, impacted users, unacceptable harms, and constraints should be specified before collecting or reusing data.
  • retention: How long a messaging system keeps past events available to be read again.
  • retries: Automatically running a step again after it fails, often with a limit on how many attempts we will make.
  • scheduling: Deciding when a pipeline run should start, such as daily at 7am or when a new file arrives.
  • schema: A defined structure for data fields and their types in a table or dataset.
  • schema drift: When upstream schemas or field meanings change over time, potentially breaking downstream assumptions.
  • schema on read: A pattern where data is stored first and a schema is applied later, at query time.
  • schema on write: A pattern where data must match a schema at ingestion time before it is stored.
  • selection bias: Bias introduced when the data you collect is not representative of the population you care about because of how examples were selected or who was easiest to observe.
  • shard: Grouping many training examples into larger files so reads are sequential and efficient.
  • shuffle: Redistributing data across workers so that rows with the same key end up on the same worker (for example, for a join or group-by).
  • snapshot: A point-in-time, consistent view of a dataset.
  • Spark SQL: Spark’s SQL interface for querying tables and DataFrames using SQL.
  • staging area: Intermediate storage used to hold raw inputs before transformation and loading.
  • stream processing: Processing events continuously as they arrive, updating derived outputs (tables, counters, features) incrementally instead of waiting for a scheduled batch job.
  • stream processing engine: A system that continuously processes events as they arrive, often keeping state so it can maintain running results like latest values and rolling aggregates.
  • streaming mode: A mode where the engine tries to run the plan by pulling data through in smaller batches instead of materializing large intermediate tables in memory.
  • subscription: A named consumer attachment to a topic that tracks what has been received so the system can deliver new messages and support catch-up.
  • table format: A metadata and transaction layer that represents object-store files as versioned tables (for example, Delta Lake or Apache Iceberg).
  • target: The quantity a model predicts, such as a class label or a number.
  • time travel: Reading data as it existed at an earlier point in time by using historical snapshots/versions.
  • topic: A named stream/category of events inside a broker, such as ‘orders’ or ‘mta_updates’.
  • training data: Data used to build and test models: inputs, labels, model outputs, and corrections.
  • transactional: Workloads optimized for many small, latency-sensitive reads/writes with strong consistency.
  • transformation layer: A set of conventions and tools for defining, versioning, testing, and documenting transformations so they produce reliable, reusable datasets.
  • vector databases: Databases optimized for storing embeddings and approximate nearest-neighbor search.
  • watermark: A saved timestamp used as a checkpoint so the next run knows where to resume.
  • weak supervision: A labeling approach where supervision comes from noisy heuristics, rules, or other weak labelers rather than ground-truth human labels.
  • window: A time range over which we aggregate events, such as ‘last 15 minutes’ or ‘last hour’.
  • work queue: A messaging pattern where each message is processed by one worker, usually used for background jobs.
  • workflow orchestrator: A system that schedules and runs pipelines as explicit steps with dependencies, retries, and logs.
  • write-ahead log (WAL): An append-only log of database changes used for crash recovery; CDC tools can read it to stream changes out of the database.

  1. Zhaozhi Qian, Bogdan-Constantin Cebere, and Mihaela van der Schaar. 2023. “Synthcity: facilitating innovative use cases of synthetic data in different data modalities.” arXiv:2301.07573. https://arxiv.org/abs/2301.07573↩︎

  2. Benj Edwards, “Artist finds private medical record photos in popular AI training data set,” Ars Technica, Sep 19, 2022, https://arstechnica.com/information-technology/2022/09/artist-finds-private-medical-record-photos-in-popular-ai-training-data-set/↩︎

  3. Geoffrey A. Fowler. May 9, 2019. “Millions of people uploaded photos to the Ever app. Then the company used them to develop facial recognition technology.” NBC News. https://www.nbcnews.com/tech/security/millions-people-uploaded-photos-ever-app-then-company-used-them-n1003371↩︎

  4. Federal Trade Commission. January 11, 2021. “California Company Settles FTC Allegations It Deceived Consumers About Use of Facial Recognition in Photos.” Press release. https://www.ftc.gov/news-events/news/press-releases/2021/01/california-company-settles-ftc-allegations-it-deceived-consumers-about-use-facial-recognition-photo↩︎

  5. J. Cox, “70,000 OkCupid Users Just Had Their Data Published,” Vice (2016), https://www.vice.com/en/article/70000-okcupid-users-just-had-their-data-published/↩︎

  6. Aja Romano, “Researchers just released the profile data of 70,000 OkCupid users,” Vox, May 12, 2016, https://www.vox.com/2016/5/12/11666116/70000-okcupid-users-data-release↩︎

  7. S. J. Brown et al., “Survivorship bias in performance studies,” Review of Financial Studies (1992), https://doi.org/10.1093/rfs/5.4.553↩︎

  8. Mayur P. Joshi, Ning Su, Robert D. Austin, and Anand K. Sundaram, “Why So Many Data Science Projects Fail to Deliver,” MIT Sloan Management Review 62(3) (Spring 2021), 85-89. https://www.proquest.com/scholarly-journals/why-so-many-data-science-projects-fail-deliver/docview/2954927612/se-2↩︎

  9. David P. Hughes and Marcel Salathé. 2015. An open access repository of images on plant health to enable the development of mobile disease diagnostics through machine learning and crowdsourcing. CoRR abs/1511.08060. http://arxiv.org/abs/1511.08060↩︎

  10. Davinder Singh, Naman Jain, Pranjali Jain, Pratik Kayal, Sudhakar Kumawat, and Nipun Batra. 2020. PlantDoc: A Dataset for Visual Plant Disease Detection. In Proceedings of the 7th ACM IKDD CoDS and 25th COMAD (CoDS COMAD 2020). Association for Computing Machinery, New York, NY, USA, 249–253. https://doi.org/10.1145/3371158.3371196↩︎

  11. Michael Roberts et al., “Common pitfalls and recommendations for using machine learning to detect and prognosticate for COVID-19 using chest radiographs and CT scans,” Nature Machine Intelligence, Mar 15, 2021, https://doi.org/10.1038/s42256-021-00307-0↩︎

  12. Gianluca Maguolo and Loris Nanni, “A critic evaluation of methods for COVID-19 automatic detection from X-ray images,” Information Fusion 76 (2021), 1-7, https://doi.org/10.1016/j.inffus.2021.04.008↩︎

  13. Roxana Daneshjou et al., “Disparities in dermatology AI performance on a diverse, curated clinical image set,” Science Advances 8(32) (2022), https://doi.org/10.1126/sciadv.abq6147↩︎

  14. Andres Morales-Forero, Lili Rueda Jaime, Sebastian Ramiro Gil-Quinones, Marlon Y. Barrera Montanez, Samuel Bassetto, and Eric Coatanea. “An insight into racial bias in dermoscopy repositories: A HAM10000 data set analysis.” JEADV Clinical Practice 3(3):836-843 (2024). https://doi.org/10.1002/jvc2.477. URL: https://onlinelibrary.wiley.com/doi/abs/10.1002/jvc2.477↩︎

  15. David Meyer, “Amazon reportedly killed an AI recruitment system because it couldn’t stop the tool from discriminating against women,” Fortune (via Yahoo Finance), Oct 10, 2018, https://finance.yahoo.com/news/amazon-reportedly-killed-ai-recruitment-100042269.html↩︎

  16. Z. Obermeyer et al., “Dissecting racial bias in an algorithm used to manage the health of populations,” Science (2019), https://doi.org/10.1126/science.aax2342↩︎

  17. New York City Taxi and Limousine Commission (TLC). “2023 Yellow Taxi Trip Data.” NYC Open Data. Accessed Jan 9, 2026. https://data.cityofnewyork.us/Transportation/2023-Yellow-Taxi-Trip-Data/4b4i-vvec/about_data↩︎