Machine Learning/Airflow ML Pipelines
This documentation is a guide for Machine Learning (ML) engineers to develop, deploy, and manage end-to-end ML training pipelines on the WMF Airflow ML instance. It is based on the experience of productionizing the Tone-Check model training pipeline, but the patterns and best practices apply to any ML pipeline at WMF.
The goal of the Airflow ML platform is to provide a scalable and reproducible environment for training and evaluating models. It allows the ML team to focus on building job logic while leveraging a powerful orchestration platform managed by the Data Engineering Platform (DPE) SRE team.
Before reading this guide, please first read the WMF Airflow developer guide written by the Data Platform team.
Roles and Responsibilities
- DPE SRE Team: Manages the Airflow instances (airflow-ml, airflow-devenv), the Kubernetes cluster, and core data infrastructure like Hadoop. They are responsible for enabling features like the Airflow Triggerer, setting resource quotas, and managing PVCs.
- ML Team: Owns the pipeline logic. This includes writing the Spark and Python scripts for the jobs, defining the Airflow DAGs to orchestrate them, and packaging the code into artifacts.
Core Concepts and Architecture
Our ML pipelines operate across two primary, interconnected ecosystems: the Hadoop/YARN ecosystem for large-scale data processing and the Kubernetes ecosystem for containerized tasks like model training. Airflow serves as the central orchestrator that manages the flow of data and dependencies between these two environments.
Architectural Diagram and Data Flow
The following diagram illustrates the typical end-to-end workflow of an ML pipeline, using the Tone-Check pipeline as an example:

The flow is as follows:
- ETL Data Generation: The Airflow ML instance triggers the ETL Data Generation task in the pipeline using the SparkSubmitOperator.
- ETL on Hadoop: A Spark job is submitted to the YARN cluster. It extracts and transforms raw data from the Hive Data Lake.
- HDFS (Intermediate Storage): The Spark job writes its output (e.g. a processed training dataset) as Parquet files to HDFS.
- Copy HDFS to PVC: The WMFKubernetesPodOperator launches a k8s pod to copy the processed data from HDFS to a Persistent Volume Claim (PVC).
- PVC (Shared Filesystem): The PVC acts as a shared filesystem, making the data available to subsequent Kubernetes-based tasks.
- Model Training: The WMFKubernetesPodOperator launches a k8s pod that requests a GPU. It mounts the PVC, reads the training data, and runs the model training script.
- Model Artifact Storage: The training pod writes the final model artifact back to the PVC.
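The intermediate HDFS and PVC locations referenced throughout the tasks below are typically timestamped per DAG run so that reruns do not overwrite each other's data. A minimal sketch of that convention (the base paths and timestamp format shown here are illustrative assumptions, not the pipeline's actual values):

```python
from datetime import datetime, timezone

def timestamped_path(base: str, run_ts: datetime) -> str:
    """Build a per-run subdirectory like <base>/2025-04-01T00-00-00."""
    return f"{base}/{run_ts.strftime('%Y-%m-%dT%H-%M-%S')}"

# Hypothetical base paths; the real DAG derives these from its config.
ts = datetime(2025, 4, 1, tzinfo=timezone.utc)
hdfs_timestamped_data_path = timestamped_path("hdfs:///tmp/tone_check/data", ts)
pvc_timestamped_data_path = timestamped_path("/mnt/pvc/tone_check/data", ts)
print(hdfs_timestamped_data_path)  # hdfs:///tmp/tone_check/data/2025-04-01T00-00-00
```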
The Two-Repository Structure
To maintain a clean separation of concerns, pipeline development is split across two Git repositories:
- ml-pipelines: This is a monorepo that contains the job logic for all ML pipelines (see T396495#10927126). Each pipeline is a self-contained Python package or docker image with its own dependencies and entry points. This is where you write your Spark and Python scripts.
- airflow-dags: This repository contains the scheduling logic. Each pipeline has a DAG file that defines the tasks and their dependencies, but it does not contain the core application code itself.
Step-by-Step Guide: Creating a New ML Pipeline
This guide uses the tone-check pipeline as a concrete example. See another example here: T396495#11151194
Step 1: The Job Logic in ml-pipelines repo
The Monorepo Structure
The ml-pipelines repository is structured to host multiple, independent pipelines. Each pipeline lives in its own directory and is configured as either an installable Python package or docker image.
ml-pipelines/
├── .gitignore
├── .gitlab-ci.yml # Top-level CI for the monorepo that triggers pipeline-specific CI
├── .pipeline
│ └── training # Blubber files used to create job logic docker images using kokkuri
│ ├── example
│ │ ├── copy
│ │ │ └── blubber.yaml
│ │ └── train
│ │ └── blubber.yaml
│ └── tone_check
│ └── retrain
│ └── blubber.yaml
├── .pre-commit-config.yaml
├── README.md
├── requirements-test.txt
├── ruff.toml
├── tests
│ └── training
│ ├── ...
└── training
├── common_utils # Shared library for all pipelines
│ ├── helper_functions.py
│ └── __init__.py
└── tone_check # Tone-Check Pipeline
├── data_generation # Tone-Check Data Generation job logic that creates a conda artifact
│ ├── conda-environment.yaml
│ ├── .gitlab-ci.yml # Tone-Check Pipeline-specific CI
│ ├── pyproject.toml
│ ├── requirements-test.txt
│ └── src
│ └── tone_check
│ ├── generate_training_data.py
│ ├── __init__.py
│ ├── split_training_data.py
│ └── utils.py
├── __init__.py
├── README.md
└── retrain # Tone-Check Model Training job logic that creates docker image
├── .gitlab-ci.yml
├── __init__.py
├── README.md
├── requirements.txt
└── retrain.py
Packaging Your Code: Conda Artifacts vs. Docker Images
You will package your job logic differently depending on where it needs to run:
- For Spark Jobs (on YARN): Package your code as a versioned Conda artifact (e.g. tone_check-0.0.1.conda.tgz). Docker images cannot be used on our Hadoop YARN cluster. The ml-pipelines repo uses a standard CI template to build and publish this artifact automatically.
- For Kubernetes Pods: Package your code as a Docker image. The WMFKubernetesPodOperator will pull this image to run your task. This is ideal for GPU-based training.
Step 2: The DAG in airflow-dags repo
Create a single DAG file (e.g. tone_check_training_dag.py) in the ml/dags directory. This DAG will define all the tasks in your pipeline.
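Once the three tasks below are defined, the DAG chains them with Airflow's >> operator (generate_training_data >> copy_hdfs_to_pvc >> train_tone_check). Conceptually, >> just records a downstream edge on each task and returns the right-hand task so chains compose. A toy stand-in (not Airflow code) illustrates those semantics:

```python
class Task:
    """Minimal stand-in for an Airflow operator that records
    downstream dependencies via the >> operator."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other: "Task") -> "Task":
        self.downstream.append(other)
        return other  # returning `other` allows a >> b >> c chaining

gen = Task("generate_training_data")
copy = Task("copy_hdfs_to_pvc")
train = Task("train_tone_check")
gen >> copy >> train
print([t.task_id for t in gen.downstream])  # ['copy_hdfs_to_pvc']
```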
Task 1: The Data Generation/ETL Spark Job (SparkSubmitOperator)
This task runs your data generation script on the YARN cluster.
# Generate the full model-ready training dataset
generate_training_data = SparkSubmitOperator.for_virtualenv(
task_id="generate_training_data",
virtualenv_archive=artifact(conda_artifact_name),
entry_point="bin/tone_check_generate_training_data.py",
application_args=[
"--output-path",
model_ready_data_path,
"--wiki-db",
"enwiki",
"--snapshot",
"2025-04",
],
driver_cores=4,
driver_memory="8G",
executor_cores=4,
executor_memory="20G",
max_executors=64,
conf={
"spark.executor.memoryOverhead": "8G",
"spark.driver.memoryOverhead": "8G",
"spark.driver.maxResultSize": "16G",
"spark.sql.shuffle.partitions": 16384,
"spark.sql.autoBroadcastJoinThreshold": -1,
"spark.sql.adaptive.enabled": "true",
"spark.network.timeout": "1200s",
},
)
Task 2: The Data Transfer Job (WMFKubernetesPodOperator)
This task runs a simple pod to copy the output from HDFS to a shared PVC.
# Copy the split data from HDFS to the PVC
copy_hdfs_to_pvc = WMFKubernetesPodOperator(
task_id="copy_hdfs_to_pvc",
name="copy_hdfs_to_pvc",
image=f"{docker_image_uri}:{copy_docker_image_tag}",
cmds=["bash", "-c"],
arguments=[
textwrap.dedent(
f"""
set -e
set -x
echo "Creating local output directory on PVC if missing: {pvc_timestamped_data_path}"
mkdir -p {pvc_timestamped_data_path}
echo "Copying contents from HDFS '{hdfs_timestamped_data_path}' to PVC '{pvc_timestamped_data_path}'"
hdfs dfs -get {hdfs_timestamped_data_path}/* {pvc_timestamped_data_path}
echo "Verifying contents of PVC output directory after copy"
ls -lR {pvc_timestamped_data_path}
"""
)
],
volumes=[make_persistent_volume_claim_volume(pvc_name=pvc_name, read_only=False)],
volume_mounts=[make_volumemount(volume_name=pvc_name, mount_path=pvc_mount_path, read_only=False)],
pod_template_file=airflow_pod_template_path("kubernetes_pod_operator_hadoop_pod_template"),
security_context=V1PodSecurityContext(fs_group=900),
)
Task 3: The Model Training Job (WMFKubernetesPodOperator)
This task runs the actual training in a GPU-enabled pod.
# Run the model training job
train_tone_check = WMFKubernetesPodOperator(
task_id="train_tone_check",
name="train_tone_check",
image=f"{docker_image_uri}:{train_docker_image_tag}",
cmds=["bash", "-c"],
arguments=[
textwrap.dedent(
f"""
set -e
set -x
echo "Ensuring output model directory exists"
mkdir -p {output_model_path}
echo "Verifying input data exists on PVC"
ls -lR {pvc_timestamped_data_path}
echo "Starting model training"
python3 training/tone_check/retrain/retrain.py \
--train-data-path "{train_data_path}" \
--validation-data-path "{validation_data_path}" \
--output-model-path "{output_model_path}" \
--base-model-path "{base_model_path}"
echo "Verifying model output"
ls -l {output_model_path}
"""
)
],
volumes=[make_persistent_volume_claim_volume(pvc_name=pvc_name, read_only=False)],
volume_mounts=[make_volumemount(volume_name=pvc_name, mount_path=pvc_mount_path, read_only=False)],
node_selector={"amd.com/gpu.vram": "64G"},
container_resources={
"limits": {
"cpu": "4000m",
"memory": "8Gi",
"amd.com/gpu": 1,
},
"requests": {
"cpu": "2000m",
"memory": "8Gi",
},
},
security_context=V1PodSecurityContext(fs_group=900),
deferrable=True,
in_cluster=True,
)
Step 3: Configure Resources and CPU/GPU Access
Requesting CPU and Memory
You should specify resource requests and limits for every Kubernetes task:
- requests: The minimum amount of resources your pod needs. The scheduler will not start the pod until a node with these resources is available.
- limits: The maximum amount of resources your pod can use. If the pod exceeds its memory limit, it is terminated (OOMKilled); if it exceeds its CPU limit, it is throttled.
Debugging Memory Issues:
- If your pod is terminated with reason: OOMKilled, it means your script used more memory than its limit. You can run the job locally first to determine memory usage, e.g. T407212#11280133.
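One quick way to estimate a sensible memory limit during a local run is to read the process's peak resident set size with Python's standard library. This sketch is illustrative only; the 200 MiB allocation stands in for your job's actual working set:

```python
import resource
import sys

def peak_rss_mib() -> float:
    """Peak resident set size of this process, in MiB.
    Note: ru_maxrss is reported in KiB on Linux but in bytes on macOS."""
    rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    if sys.platform == "darwin":
        rss /= 1024  # bytes -> KiB
    return rss / 1024  # KiB -> MiB

# Simulate a memory-hungry step: hold roughly 200 MiB of data.
blob = bytearray(200 * 1024 * 1024)
print(f"peak RSS so far: {peak_rss_mib():.0f} MiB")
```

Add some headroom on top of the observed peak when setting the pod's memory limit, since Spark/PyTorch allocators and page caches can push usage above what a simple local run shows.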
Requesting a GPU
To ensure your pod is scheduled on a machine with a specific GPU, use a node_selector. This is the standard way to target specialized hardware.
# Run the model training job
train_tone_check = WMFKubernetesPodOperator(
...
node_selector={"amd.com/gpu.vram": "64G"}, # Targets a node with an AMD MI210 GPU e.g https://phabricator.wikimedia.org/T373806#11275873
container_resources={
"limits": {
"cpu": "4000m",
"memory": "8Gi",
"amd.com/gpu": 1,
},
"requests": {
"cpu": "2000m",
"memory": "8Gi",
},
},
security_context=V1PodSecurityContext(fs_group=900),
deferrable=True,
in_cluster=True,
)
Advanced Topics and Best Practices
Handling Long-Running or Waiting Tasks with Deferrable Operators
If a task needs to wait for an external resource (like a free GPU), a standard operator will occupy a valuable Airflow worker slot while it waits. This is inefficient and can clog the system.
The solution is to use deferrable operators (deferrable=True). A deferred task releases its worker slot and hands off the "waiting" job to a lightweight, asynchronous service called the Airflow Triggerer. Once the condition is met (e.g. the pod starts running and then completes), the Triggerer wakes the task up to continue. See T406958 for an example.
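The Triggerer can watch many deferred tasks at once because each trigger is an asyncio coroutine that yields control while waiting instead of blocking a thread. A toy sketch of that polling pattern (not actual Airflow trigger code; names are illustrative):

```python
import asyncio

async def wait_for(check, interval: float = 0.01) -> None:
    """Toy stand-in for an Airflow trigger: poll a condition
    cooperatively instead of blocking a worker thread."""
    while not check():
        await asyncio.sleep(interval)

async def demo() -> bool:
    state = {"pod_done": False}

    async def external_pod():
        # Simulates the external work (e.g. a training pod) finishing later.
        await asyncio.sleep(0.05)
        state["pod_done"] = True

    # While wait_for() sleeps, the event loop is free to run other
    # triggers; this is what lets one Triggerer watch many deferred tasks.
    await asyncio.gather(external_pod(), wait_for(lambda: state["pod_done"]))
    return state["pod_done"]

print(asyncio.run(demo()))  # True
```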
Development and Testing (airflow-devenv and CI/CD)
- Local Development: Use airflow-devenv to spin up a personal Airflow instance for testing your DAGs.
- CI/CD: The airflow-dags repository has a CI pipeline that runs linting (black) and automated tests (pytest) on every merge request.
- Updating Test Fixtures: The repo uses "fixture tests" to ensure changes to a DAG's generated commands are intentional. If these tests fail after you've changed a DAG, it's not a bug. You should regenerate the fixture files locally using the project's documented procedure (e.g. REBUILD_FIXTURES=yes pytest) and commit the updated fixtures as part of your merge request.
Current ML Pipelines in Production
| Pipeline DAG ID | Description | Model Card |
|---|---|---|
| tone_check_training_dag | An end-to-end pipeline that generates training data from Wikipedia history and then trains a model to detect “peacock” language in articles. | https://meta.wikimedia.org/wiki/Machine_learning_models/Proposed/Tone_Check |
| … | (add new pipelines here as they are created) | |
Getting Help
For issues related to the Airflow platform, Kubernetes, HDFS permissions, or cluster resources, please contact the DPE SRE team in their Slack channel or by filing a Phabricator task.