Data Platform/Systems/Airflow/Developer guide
For documentation about Apache Airflow, please refer to https://airflow.apache.org/docs/. This section is not intended to summarize or replace Airflow docs, but rather bring out some basic concepts and common gotchas.
Airflow basics
Glossary
Term | Definition |
---|---|
DAG | Directed Acyclic Graph. In Airflow it’s used as a synonym of job. |
DAG run (or dag_run) | The execution of a DAG for a particular time interval. |
Task | Unit (node) of an Airflow DAG. It’s usually a Sensor, an Operator or a TaskGroup. |
Sensor | Task that waits for a particular asset to be present in a specified location. For instance, it waits for a Hive partition to exist, or an HDFS path to exist. Until a Sensor confirms the presence of its target, the subsequent tasks in the DAG are not triggered. |
Operator | Task that applies some workflow action, like computing a dataset, transforming some data, uploading a dataset to a data store, or creating a success file. |
TaskGroup | Group of tasks that can be treated as a task itself. Also, you can see a TaskGroup as an encapsulated sub-DAG. |
Hive partition | A division of a Hive table. In our setup, Hive tables are usually partitioned by time, so that at each new time interval (e.g. each hour/day/month/…) we add a new partition to the Hive table, containing the data that corresponds to that time interval. Partitions allow pruning the data that a query/job needs to read, thus making it more efficient. Partitions are usually named after the time period they represent, like: `database.table_name/year=2022/month=3/day=5/hour=14`. |
Artifact | Any external asset needed for the Airflow job to run, usually code dependencies like Maven JARs, Python virtualenvs, HQL queries, etc. |
SLA | Service Level Agreement. In practice, you set an SLA time on a Task. If the task instance has not finished after that time has elapsed, Airflow triggers an email alert. |
DAG interpretation time vs execution time
Although Airflow DAGs are written in a single Python file, they are run in two stages: DAG interpretation stage and Task execution stage.
DAG interpretation stage
The DAG files are read and interpreted by Airflow about every minute. So, if you deploy a code change to your production instance (or testing instance), Airflow should pick it up quickly without the need for a restart. However, this stage does not execute your tasks; it simply parses the DAG and updates Airflow's internal representation of it. This means your Sensors and Operators will not be triggered yet; also, callables and templates will not be executed until the next stage. The DAG interpretation stage happens on the Airflow machine, and happens every minute for each DAG. Thus, it should be very efficient: we should, for example, avoid connecting to a database or querying HDFS within DAG files.
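As a rough sketch of the difference (the HTTP call below is hypothetical), keep expensive work out of the module level of your DAG file and let Airflow resolve dynamic values only when tasks run:
# BAD: executed at DAG interpretation time, on the Airflow machine, about once per minute.
# import requests
# latest_snapshot = requests.get("https://example.org/latest-snapshot").text  # hypothetical call

# GOOD: templated values are rendered at task execution time only.
latest_snapshot = "{{ ds }}"  # resolved when the task runs, not when the DAG file is parsed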
Task execution stage
When Airflow determines that a DAG should be triggered for a particular date, it will launch a "dag_run" and kick off the task execution stage. In this stage the DAG is not re-interpreted any more; only the Sensors and Operators you defined in your DAG will be executed. Sensors run on the Airflow machine, but they are mostly lightweight. Operators are also triggered from the Airflow machine, but ultimately (if done properly) they are executed on the Hadoop cluster. Before Sensors or Operators run, templated values will be resolved.
Data dependencies
Sensors are Airflow's way of defining data dependencies. Sometimes we just want to trigger a data processing job on a time-based schedule, e.g. a job that deletes data from a given place every hour, sharp. In this case, we don't need Sensors. However, most data processing jobs need to wait for input data to be present in the data store prior to doing any computation. Airflow Sensors define those dependencies and wait until the specified data is present (or until other conditions are met!). In our case, Sensors usually wait for one or more Hive partitions, or for one or more files/directories in a file system.
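For illustration, such a dependency could look like the following with Airflow's native NamedHivePartitionSensor (the table name is made up; the wmf_airflow_common helpers described further below offer more convenient ways to build partition lists):
from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor

wait_for_source = NamedHivePartitionSensor(
    task_id="wait_for_source_dataset",
    # Wait for the hourly Hive partition that corresponds to this dag_run.
    partition_names=[
        "some_db.some_table/year={{ execution_date.year }}"
        "/month={{ execution_date.month }}"
        "/day={{ execution_date.day }}"
        "/hour={{ execution_date.hour }}"
    ],
    poke_interval=60 * 5,  # check every 5 minutes
    mode="reschedule",  # free the worker slot between checks
)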
DAG runs
DAG Run dates
In Airflow a “dag_run” is the execution of a DAG for a particular time interval. DAG runs have a start datetime and an end datetime. Those delimit the period for which data will be processed/computed. The Airflow macro execution_date
is very confusing, because of its misleading name. It is not the date a particular DAG is executed, but rather the start datetime of a dag_run. Note that the start datetime of a dag_run that processes data for 2022-05-12 is going to be the same (2022-05-12), regardless of when you execute the DAG. Thus, DAGs should always process/compute data starting at execution_date
and ending at execution_date + granularity
, where granularity is the DAG’s schedule_interval, namely: @hourly
, @daily
, @monthly
, etc.
For example, when we talk about the daily job of 2022-04-01:
- it handles the data from 2022-04-01-00:00 to 2022-04-01-23:59.999
- it creates a new partition in Hive which may be named 2022-04-01
- its execution_date is going to be 2022-04-01-00:00
- the dag_run could be created and enqueued within Airflow at 2022-04-02-00:12
- the task instance may then run at 2022-04-02-00:15 and complete at 2022-04-02-00:32
Concurrent DAG Runs
By default, our Airflow instances are configured to run up to 3 dag_runs per DAG at a time. We have chosen to make this the default despite an issue with Spark failing when concurrent jobs write to the same table (even if in different partitions) - see this task.
If your job fails due to concurrent runs (if it happens, it's most probably because you're back-filling data), one solution is to add the following parameter to your Spark job (either in SQL or as a Spark configuration parameter): spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version = 2
This can be done in an HQL query as in:
SET spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version = 2;
INSERT INTO ...
It can also be done using Spark parameters in Airflow, when using the SparkSqlOperator or the SparkSubmitOperator, by overriding the conf parameter as in:
myOp = SparkSqlOperator(
    ...
    conf={
        "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": 2,
        ...
    },
    ...
)
A great way to change this parameter without having to deploy code is to pass configuration values as Airflow variables (see the Variable properties section below).
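For example, a sketch of how that could look, assuming a var_props object as described in the Variable properties section below and a hypothetical property key:
myOp = SparkSqlOperator(
    task_id="do_hql",
    sql="path_to_hql_file.hql",
    conf={
        "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version":
            var_props.get("fileoutputcommitter_algorithm_version", 2),
    },
)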
Important note: Changing the fileoutputcommitter algorithm comes at some cost: while it allows the job not to fail when concurrently writing to the same table, it can also lead to consistency issues in the output folders of your Spark jobs (see this task for more details).
Writing a DAG using our Airflow setup
Our Airflow setup (instances, repository and libraries) offers you a set of tools for writing your DAGs. Those are designed to make your DAG code shorter and easier to read and to test.
Importing code
For your DAG, you’ll need to import Airflow native libraries, but you can also import libraries that we have developed to be shared by all WMF Airflow users. Finally, you can also develop your own helper libraries that are specific to your project.
Airflow native libraries
The Conda environment where Airflow runs (both your production instance and the development instance) has airflow installed in it. So, to import from airflow native libraries you should just use regular imports, like:
from airflow import DAG
from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor
WMF Airflow common
WMF Airflow common (wmf_airflow_common) is a library that we have developed to provide tools tailored to WMF’s Airflow needs. It currently lives within the airflow-dags repository, but we are considering factoring it out in the future. In the meantime, because of the way the Airflow instances’ PYTHONPATH is configured, you can import libraries from it like so:
from wmf_airflow_common.operators.url import URLTouchOperator
from wmf_airflow_common.partitions_builder import partition_names_by_granularity
Instance level imports
Each Airflow instance has its own folder in the airflow-dags repository. For example, the Analytics Airflow instance, which runs on an-launcher1002.eqiad.wmnet, has an analytics folder at the root of the airflow-dags repository. All DAGs defined within that folder will be executed in (and only in) the Analytics Airflow instance. If you want to create a library that is specific to your instance, you can add your code to your instance folder within airflow-dags, for example:
airflow-dags
|-- your_instance_folder
|   |-- dags
|   |-- config
|   \-- your_library_code
And then, because of the way the Airflow instances’ PYTHONPATH is configured, you can import libraries from it like:
from your_instance_folder.your_library_code import your_object
# Example for the analytics instance.
from analytics.config import dag_config
Configuration and defaults
One of the goals of our Airflow setup is to reduce the amount of boilerplate code needed to write our jobs. Our Airflow setup offers a set of WMF defaults for the most used parameters (usually well suited for our Hadoop cluster).
WMF common DAG default args
Defaults are usually passed to your DAG via the default_args parameter (a Python dictionary). All its key-value pairs will be passed to all Sensors and Operators of the corresponding DAG. For example, this will set all the WMF Airflow common defaults on your DAG:
from wmf_airflow_common.config import dag_default_args
with DAG(
    default_args=dag_default_args.get(),
    …
) as dag:
    …
Instance level configuration
WMF Airflow common defaults are good in many cases, but there are values that need to be specified per instance, for example the user that will run all jobs (in Airflow naming: the owner), or the email address that will receive all alerts. For that, we advise adding instance-level configuration on top of the WMF defaults, like so:
# dag_config.py
from wmf_airflow_common.config import dag_default_args
instance_default_args = {
    "owner": "your_user",
    "email": "your_email",
}
# This returns a new dictionary with instance defaults merged on top of wmf defaults.
default_args = dag_default_args.get(instance_default_args)
To prevent having to write this for every DAG, this is usually defined in a separate file that can be reused by all DAGs. By convention we use a file named dag_config.py within a config directory inside the instance folder. Thus, when defining a DAG, we do:
from your_instance.config import dag_config
with DAG(
    default_args=dag_config.default_args,
    …
) as dag:
    …
Feel free to add any other instance-level properties to the dag_config.py file, so that you can access them from all DAGs. For example: hadoop_name_node, hdfs_temp_directory, etc.
DAG level configuration
If you want to override or add any new configuration to any Sensor or Operator within your DAG, just pass the configuration directly to the Sensor or Operator in question. This will override the defaults passed to the DAG’s default_args parameter. For example:
from your_instance.config import dag_config
with DAG(
    default_args=dag_config.default_args,
    …
) as dag:
    sensor = SomeSensor(
        some_property=this_value_overrides_the_defaults,
        …
    )
    …
Variable properties
The VariableProperties module allows you to override DAG configuration at run-time. This is useful when testing your DAG or when back-filling your production DAG. By properties we mean any configuration value. DAGs are ultimately a collection of job configuration properties. The VariableProperties code has some nice documentation that can help as well: https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/wmf_airflow_common/config/variable_properties.py
Which properties should be overridable?
Why override properties at run-time? A DAG specifies a list of properties that point to the production environment. When testing your DAG, you don’t want Airflow to use those production configs. Instead, you want to use temporary testing configurations. Here’s a list of common properties you’ll want to make overridable:
Property | Why make it overridable |
---|---|
start_date | You might want to test a DAG and start it at a different date than the one intended for production. You also might want to back-fill starting, e.g., at an earlier date. |
default_args | You might want to override default_args values when executing a DAG; for instance, you might want to pass your username as the owner when testing. |
hql_path/artifact_path | When testing, the query/artifact that you are using might not be deployed to the production cluster yet. So you might want to override the production configuration with a temporary run-time value. |
destination_table/destination_path | When testing, you don’t want to write on top of production data, but rather to a temp location, like your home folder. |
In the end, you can make any property overridable, it’s up to you to decide.
How to make them overridable?
VariableProperties works with Airflow Variables (hence the name). It will look for property overrides in the Airflow Variable that you specify. If it finds an override, it will apply it; otherwise, it will apply the property production value. Usually you would specify an Airflow DAG configuration like this:
some_property = some_production_value
another_property = another_production_value
To make these properties overridable you must do:
var_props = VariableProperties("your_variable_name")
some_property = var_props.get("some_property_key", some_production_value)
another_property = var_props.get("another_property_key", another_production_value)
How to override them at run-time?
To override properties at run-time you have to create an Airflow Variable. Its name must be the same you specified when creating the VariableProperties object in your DAG code. Its value must be a JSON string containing a map, e.g. {"some_property_key": "some_value", "another_property_key": "another_value"}. You can create an Airflow Variable in two ways: using the Airflow UI, or defining an environment variable in the command line. The latter is useful if you’re testing your DAG using the command line (airflow dags test <DAG_ID> <EXECUTION_DATE> …).
Create an Airflow Variable using UI
Click on Admin > Variables. Add a new record: Key is the variable name (same as you specified in the DAG code), Val is the value (the JSON string).
Create an Airflow Variable using environment variables
Run export AIRFLOW_VAR_<your_variable_name>='<JSON string>' in your command line. The name of the environment variable must be the Variable name you chose in the DAG code, with the AIRFLOW_VAR_ prefix. The value must be the mentioned JSON string, e.g. export AIRFLOW_VAR_myjob_config='{"start_date": "2022-04-20"}'.
Handy methods
Most properties are strings or numbers. These are handled fine by Airflow Variables. However, some Airflow properties are expected to be datetimes, timedeltas or dictionaries. For those particular cases, you can use the following getters:
var_props.get_datetime("start_date", datetime(2022, 4, 1))
var_props.get_timedelta("delay", timedelta(days=2))
var_props.get_merged("default_args", default_args)
These methods will ensure that the default value passed as a second parameter is of the expected type, and also that the JSON override specified in the Variable matches the expected type. For datetime and timedelta JSON values, use ISO 8601 format, e.g. 2022-04-01T00:30:15 for datetimes and P3M2D for timedeltas. On the other hand, get_merged will merge the dictionary parsed from the JSON string on top of the default value dictionary passed as a second parameter.
Artifacts
Our Airflow setup tries to make it easy to pull and use code dependencies from DAGs. It expects those dependencies to be packaged into Maven JAR files or Python virtual envs. This section assumes you already have an artifact available in a public registry, like WMF’s Archiva or a GitLab package registry. For instructions on how to put together a GitLab Python project that generates artifacts, see Analytics/Systems/Airflow/Developer guide/Python Job Repos. To use the artifact from a DAG, you need two things:
Define your artifact in artifacts.yaml
Add a snippet that identifies your artifact and tells Airflow where to find it to the artifacts.yaml file in your instance’s config folder:
artifacts:

  # No source declared, so this uses the default URL-based source: the id is a URL to a package in GitLab.
  example-job-project-0.15.0.conda.tgz:
    id: https://gitlab.wikimedia.org/repos/data-engineering/example-job-project/-/package_files/487/download

  # From wmf_archiva_releases, for which the id should be a Maven coordinate.
  refinery-job-0.1.23-shaded.jar:
    id: org.wikimedia.analytics.refinery.job:refinery-job:jar:shaded:0.1.23
    source: wmf_archiva_releases
The first key is the artifact name; you will use it to reference the artifact from your DAG. In the case of Maven JAR files, the id is the Maven coordinate uniquely identifying the artifact. The source is the artifact registry where your artifact lives. Read airflow-dags/wmf_airflow_common/config/artifact_config.yaml to see all configured sources. For more detailed documentation of the artifact configuration, see: https://gitlab.wikimedia.org/repos/data-engineering/workflow_utils/-/tree/main/#artifact-module
(Note that the artifact will be cached by its name, so you should make sure the artifact name has the appropriate file extension, e.g. .jar or .tgz.)
Reference your artifact from your DAG
To reference your artifact from a DAG you can use the wmf_airflow_common.artifact.ArtifactRegistry module. To initialize an ArtifactRegistry object you need to do:
artifact_registry = ArtifactRegistry.for_wmf_airflow_instance("your_instance_name")
The resulting artifact registry object provides a method that will return the location (url) of the artifact specified by the artifact_name, like so:
artifact_registry.artifact_url("refinery-job-0.1.23-shaded.jar")
So that you don’t have to create an artifact_registry object in each DAG, the dag_config.py file in your instance’s config folder already does it. It also prepares some syntactic sugar to shorten your code. Ultimately, in your DAG code, whenever you want to get the path to one of the artifacts you specified in the artifacts.yaml file, you should do:
from your_instance.config.dag_config import artifact
path_to_artifact = artifact("artifact_name")
refinery_job_jar_path = artifact("refinery-job-0.1.23-shaded.jar")
Deployment
Once your artifact is declared in your instance's artifacts.yaml, you will need to deploy airflow-dags to your Airflow instance. Scap will copy the artifact from its source to the configured cache locations, usually in HDFS. Once there, the artifact can be used from its cached location.
If you want to force a sync, you can use the artifact-cache CLI included in the Airflow conda environment via workflow_utils. E.g.
/usr/lib/airflow/bin/artifact-cache status \
/srv/deployment/airflow-dags/<your_instance_name>/wmf_airflow_common/config/artifact_config.yaml \
/srv/deployment/airflow-dags/<your_instance_name>/<your_instance_name>/config/artifacts.yaml
How it works
Whenever Airflow is deployed to one of our instances, scap will launch a script that reads the artifacts.yaml file, pulls all artifacts from their sources and caches them by name in a predefined location (usually HDFS). When your DAG uses ArtifactRegistry (or artifact('...')) to reference an artifact, Airflow uses the same logic the scap deployment used to determine the cached location of the artifact and returns that. This way the developer doesn’t need to handle artifact paths in HDFS.
Filters
Airflow allows you to use Jinja2 to templatize some fields when writing a DAG. It also provides a set of macros you can use to generate dynamic DAG parameters. The most used macro is probably execution_date. If your DAG has a @weekly schedule_interval, or it has several heterogeneous data sources, you might need to modify that execution_date to adapt it to your needs. For that, you can use a set of filters provided by the wmf_airflow_common.templates module. You can use them like so:
"{{ execution_date | to_ds_month }}" # returns the execution_date formatted like YYYY-MM
"{{ execution_date | start_of_current_week }}" # snaps the execution date to the start of its week
"{{ execution_date | end_of_next_day | add_hours(2) }}" # snaps the execution date to the end of next day and adds 2 hours
See a full list of all available custom filters here: https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/wmf_airflow_common/templates/time_filters.py
Don’t forget
For the filters to work, you need to pass them explicitly to your DAG, like this:
from wmf_airflow_common.templates.time_filters import filters
with DAG(
    …
    user_defined_filters=filters,
) as dag:
    …
Conventions & good practices
The following paragraphs can be used as a good-practices guide for writing DAGs. However, the advice here is very Data-Engineering centric: it is the way Data Engineering has decided/agreed to write our DAGs. It is what we think is best now, but it might not be what you need for your use case. Also, we might change it in the future. See if it makes sense for you!
Task ids
Task ids (in Operators and Sensors) should describe actions, e.g. process_certain_dataset or wait_for_dataset_partitions, if possible starting with a verb and following with an object. It is a good idea to be explicit about which data is the object of the action. The task id should be unique within the DAG. Also, use snake_case (all lowercase with underscores as word separators).
DAG ids
Use snake_case for DAG ids, too (all lowercase with underscores as word separators). Also, DAG ids should be unique within the Airflow instance. Naming DAGs is hard, and it’s difficult to find a formula that fits all cases, but here are some ideas of what a DAG id could contain: the name of the resulting dataset, the granularity of the job (if there are several jobs with different granularities for the same dataset), and the tool that the DAG uses to process or store the resulting dataset (e.g. druid or anomaly_detection). Avoid very generic words like data, table or dag, which don’t give much information. Finally, it’s a good idea to match the DAG file name and DAG id, so if the DAG file is blah_blah_dag.py the id should be blah_blah (without the _dag.py suffix).
DAG file names
DAG file names should be the same as DAG ids, except they have a _dag.py suffix. It’s a good idea to include any directory levels the DAG file lives in as part of the file name. For instance, if the DAG lives in dags/druid/monthly/ and we want to name it blah_blah, then a good file name would be druid_monthly_blah_blah_dag.py. Including the directory tree namespace in the DAG file name (and in the DAG id) helps identify the DAG in the UI and when receiving email alerts.
Variable properties
The name of the variable used for a VariableProperties object should match the DAG id with an extra _config suffix. For example, if the DAG id is druid_monthly_navtiming, the variable for VariableProperties should be named druid_monthly_navtiming_config.
Tags
Use snake_case for tags as well. It’s difficult to define which tags are good to add to a DAG, but some general ideas: don’t use concepts that are already displayed elsewhere, e.g. the name of the generated dataset, which should be in the DAG id; don’t use concepts that are going to be shared by most DAGs, like data, computation, etc.; and don’t use concepts that are most likely only going to be used by a single DAG, like wikidata_aggregation_for_metrics_project_2024.
In the analytics Airflow instance we follow this convention:
- Add a tag for the DAG's schedule, e.g.: hourly, daily, weekly, monthly, etc.
- Add tags to indicate the datastore of each of the data sources, using the from_ prefix, like: from_hive, from_hdfs, from_druid, etc.
- Add tags to indicate the datastore of each of the generated datasets, using the to_ prefix, e.g.: to_iceberg, to_cassandra, to_hive, etc.
- Add a tag for each dataset required to compute the job, using the requires_ prefix, for example: requires_wmf_webrequest, requires_event_navigationtiming, etc.
- Add a tag for each technology used to compute the job, using the uses_ prefix, like: uses_hql, uses_spark, uses_archiver, etc.
If you're developing for another instance different from analytics, feel free to ignore this convention!
Schedule interval
Use the predefined schedule intervals whenever possible (use @daily instead of 0 0 * * *). For weekly DAGs, note the preset @weekly starts the DAG on Sunday. If you want to start your DAG on Monday, use 0 0 * * 1. If possible, all your weekly DAGs should start on the same day (either Sunday or Monday).
SLAs
Specify SLAs at the DAG level using default_args={..., "sla": timedelta(days=3), ...} (change days=3 to your preferred timedelta). There's a bug in Airflow 2.1 that fails to register SLAs whenever only part of the DAG's elements have an SLA defined, so we cannot use SLAs at the task level for now. When we migrate to Airflow 2.3, we'll be able to specify them at the task level (passing the parameter sla=timedelta(days=3) to the corresponding Operator). Note that (as opposed to Oozie) Airflow SLA deltas start counting when the dag_run is triggered, which is after the whole logical period has passed. So, an Oozie SLA of 30 hours for a daily job translates to an Airflow SLA of 6 hours (30 - 24).
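For instance, a minimal sketch of a DAG-level SLA merged into the instance defaults (assuming a dag_config module as described in the Instance level configuration section; the dag_id and dates are placeholders):
from datetime import datetime, timedelta

from airflow import DAG

from your_instance.config import dag_config

with DAG(
    dag_id="example_with_sla",  # placeholder
    start_date=datetime(2022, 4, 1),
    schedule_interval="@daily",
    # All tasks inherit a 6-hour SLA on top of the instance defaults.
    default_args={**dag_config.default_args, "sla": timedelta(hours=6)},
) as dag:
    ...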
Timeouts
For now, we advise not to use timeout parameters such as dagrun_timeout, execution_timeout or timeout (on Sensors). The reason is we already have two measures in place that perform similar actions to the timeout parameters: SLAs, which alert whenever a DAG is late; and mode=reschedule Sensors, which prevent Airflow slots from being blocked by waiting Sensors. In the interest of reducing cognitive load and keeping DAGs as simple as possible, let's not use them, unless there's a specific reason to do so. See: https://phabricator.wikimedia.org/T317549
Artifact versioning
In some cases, multiple versions of the same artifact are going to be specified in your artifacts.yaml file. To prevent having lots of different artifact versions (one for each new DAG), let’s use this convention: whenever we are modifying an existing DAG or adding a new one, let’s try to update their artifact versions to the latest one already available in the artifacts.yaml config. If the latest available version of the artifact does not implement the necessary code, only then let’s add a new entry in the artifacts.yaml file.
DAG docs
Airflow DAGs accept a doc_md parameter. Whatever you pass through that parameter will show up in the Airflow UI in a convenient collapsible widget. The idea is to reuse the Python docstring (triple-quoted comment) at the top of the DAG file and pass it to the DAG using:
with DAG(
    doc_md=__doc__,
    …
) as dag:
    …
You can use Markdown for a neat presentation in the Airflow UI. NOTE: Please, list all overridable properties (for VariableProperties) of your DAG in the top docstring.
Alert email
It is very important to configure an alert email for your DAG so that you get notified of failures. This should be a mailing list for your team rather than a personal email to ensure continuity if you're not here.
If your mailing list is a wikimedia.org Google Group, you will need to ensure that "noreply@wikimedia.org" can send to the group. It's not considered part of the "entire organization", so you can either manually add it as a member or change the group settings so that "anyone on the web" can post.
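A sketch of how the alert address is typically routed through default_args (the mailing list address is hypothetical):
instance_default_args = {
    "owner": "your_user",
    "email": "your-team-alerts@lists.wikimedia.org",  # hypothetical team mailing list
    "email_on_failure": True,
}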
Start date
The start_date should match the start of the schedule_interval. For instance, if the schedule_interval is @daily, the start_date should always point to the start of a day. If the schedule_interval is @monthly, the start_date should point to the start of a month, etc.
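For example, a minimal sketch for a daily DAG (the dag_id and date are placeholders):
from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="example_daily",
    schedule_interval="@daily",
    start_date=datetime(2022, 4, 1),  # start of a day, matching the @daily interval
) as dag:
    ...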
Sensor poke_interval
It is very important to specify the poke_interval parameter for all Sensors. The poke_interval is the frequency at which Airflow checks whether the Sensor’s target is available. A too small poke_interval on a large enough set of Sensors can easily saturate Airflow. The value you set depends on the urgency of the job, and also on its granularity. If you don’t know which value to use, you can use the following rule of thumb:
Granularity | Suggested poke_interval |
---|---|
Hourly | poke_interval=timedelta(minutes=5).total_seconds() |
Daily | poke_interval=timedelta(minutes=20).total_seconds() |
Weekly | poke_interval=timedelta(hours=1).total_seconds() |
Monthly | poke_interval=timedelta(hours=3).total_seconds() |
Sensor timeout
Sensors have another important parameter: timeout. It tells Airflow when it should stop trying to check for the Sensor’s target availability. When the schedule_interval of your DAG is significantly greater than the time the Sensors will normally wait, we should set the Sensor’s timeout. For example, if our DAG is @monthly and we expect our Sensors to wait for just one or two days, we should set a timeout to prevent Airflow from checking the target for weeks. The value of the timeout should be between the DAG’s SLA and its schedule_interval. Use seconds to specify a Sensor’s timeout, e.g. timeout=timedelta(days=3).total_seconds().
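Putting both Sensor parameters together, a sketch for a monthly DAG might look like this (the table and partition list are placeholders; in practice the partitions would be built with the wmf_airflow_common helpers):
from datetime import timedelta

from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor

sensor = NamedHivePartitionSensor(
    task_id="wait_for_some_dataset",
    partition_names=["some_db.some_table/snapshot=2022-04-01"],  # placeholder partition
    poke_interval=timedelta(hours=3).total_seconds(),  # monthly job: check every 3 hours
    timeout=timedelta(days=3).total_seconds(),  # stop checking after 3 days
    mode="reschedule",
)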
Config and context
As opposed to Oozie (which completely separates configuration properties from DAG structure), Airflow’s proposal is to put both configuration properties and DAG structure in the same file. This reduces the boilerplate a lot and makes code shorter, provided common structures are factored out into custom Operators, Sensors and TaskGroups. Another advantage is that configuration properties can be side by side with their context. Consider avoiding this style:
property_1 = value_1
property_2 = value_2
property_3 = value_3
property_4 = value_4
…
with DAG(
    …
) as dag:
    op1 = Operator(
        property_1=property_1,
        property_2=property_2,
    )
    op2 = Operator(
        property_3=property_3,
        property_4=property_4,
    )
Which separates the configuration from its context, and consider using this style:
with DAG(
    …
) as dag:
    op1 = Operator(
        property_1=value_1,
        property_2=value_2,
    )
    op2 = Operator(
        property_3=value_3,
        property_4=value_4,
    )
An exception to this rule can be when a property is used in more than one place; then, by defining it at the top, we make the code more DRY.
TaskGroup factories vs. DAG factories
In some cases we might want to factor out a common sub-DAG into a factory that can be reused in multiple jobs. At first we tried implementing DAG factories that would accept a collection of DAG parameters and return a finished “closed-box” DAG. This approach is OK but has some disadvantages: Since the resulting DAG is closed, it can never fulfill all corner cases of various jobs and it can not be adapted to their special needs. Instead, we agreed that implementing TaskGroup factories was a better approach. TaskGroups are sub-DAGs that can be treated as tasks (you can define dependency relations between them and other tasks or TaskGroups). This way factories become much more flexible and powerful.
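As a minimal sketch of the shape such a factory can take (the tasks inside are placeholders; a real factory would use our custom Sensors and Operators):
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup


def compute_and_flag(group_id: str, dataset: str) -> TaskGroup:
    """Return a TaskGroup that computes `dataset` and writes a success flag.

    Must be called inside a `with DAG(...)` block, like any other task.
    """
    with TaskGroup(group_id=group_id) as group:
        compute = BashOperator(
            task_id="compute_dataset",
            bash_command=f"echo computing {dataset}",
        )
        write_flag = BashOperator(
            task_id="write_success_flag",
            bash_command=f"echo flagging {dataset}",
        )
        compute >> write_flag
    return group


# Inside a DAG, the returned group can be wired up like a single task:
# wait_for_input >> compute_and_flag("process_foo", "foo") >> send_notification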
Linting & auto-formatting
After writing your code, you should:
- format it with black & isort
- then make sure it passes flake8 and mypy checks
# Setup your local environment if you haven't already:
./dev_launcher.sh build
# Auto format your code with:
./dev_launcher.sh format
# Run linting checks:
./dev_launcher.sh lint
Alternatively, you could use the pre-commit tool to check your code just before you locally commit.
wmf_airflow_common Sensors and Operators
The WMF Airflow common library provides a set of custom Sensors and Operators tailored to our Hadoop setup. Here’s a quick overview; for more detail, see the custom Sensors and Operators code. If WMF’s custom sensors do not implement what you need, it means there’s already a native Airflow sensor you can use; see them here. TODO: Move this Sensors and Operators documentation to the airflow-dags repository, using some automatic documentation generator, like Sphinx.
Sensors
RangeHivePartitionSensor
This Sensor dynamically generates a range of Hive partitions to be checked. Its main advantage is that it renders the list of partitions at run-time, after Airflow macros (like execution_date) have been resolved. This allows, for instance, generating a set of hourly partitions for a given weekly interval. Be careful though: this Sensor does not implement the necessary interfaces to be considered a SmartSensor by Airflow, so it’s less efficient than regular Airflow Sensors. If you can, you should use Airflow’s NamedHivePartitionSensor instead.
sensor = RangeHivePartitionSensor(
    task_id="wait_for_some_dataset",
    table_name="some_db.some_table",
    from_timestamp="2022-02-10",
    to_timestamp="2022-02-13",
    granularity="@hourly",
)
The resulting list of partitions would be:
some_db.some_table/year=2022/month=2/day=10/hour=0
some_db.some_table/year=2022/month=2/day=10/hour=1
some_db.some_table/year=2022/month=2/day=10/hour=2
…
some_db.some_table/year=2022/month=2/day=10/hour=23
some_db.some_table/year=2022/month=2/day=11/hour=0
some_db.some_table/year=2022/month=2/day=11/hour=1
…
some_db.some_table/year=2022/month=2/day=12/hour=22
some_db.some_table/year=2022/month=2/day=12/hour=23
daily_partitions
This is not a Sensor, it’s just a helper, but it will always be used in combination with a Sensor. It returns a list of Hive partitions for a day of data; the partitions can be @hourly or @daily.
sensor = NamedHivePartitionSensor(
    task_id="wait_for_some_dataset",
    partition_names=daily_partitions(
        table="some_db.some_table",
        granularity="@hourly",
    ),
)
The result of the call to daily_partitions would be:
some_db.some_table/year={{execution_date.year}}/month={{execution_date.month}}/day={{execution_date.day}}/hour=0
some_db.some_table/year={{execution_date.year}}/month={{execution_date.month}}/day={{execution_date.day}}/hour=1
some_db.some_table/year={{execution_date.year}}/month={{execution_date.month}}/day={{execution_date.day}}/hour=2
…
some_db.some_table/year={{execution_date.year}}/month={{execution_date.month}}/day={{execution_date.day}}/hour=22
some_db.some_table/year={{execution_date.year}}/month={{execution_date.month}}/day={{execution_date.day}}/hour=23
Note that the resulting partitions are templated and use Airflow’s execution_date macro. The generation of the mentioned partitions happens at DAG interpretation time, which is more efficient than with the RangeHivePartitionSensor.
RESTExternalTaskSensor
This sensor enables DAG developers to rely on DAG/task group/task status changes occurring in another Airflow instance. Sharing its sensing heuristic with Airflow's original ExternalTaskSensor, this implementation enables us to cross the boundaries of a single instance and reach DAGs in any of WMF's Airflow instances.
Python usage
from wmf_airflow_common.sensors.rest_external_task import RestExternalTaskSensor
sensor = RestExternalTaskSensor(
    external_instance_uri: str,  # URL to the other instance's REST API, e.g. http://an-launcher1002.eqiad.wmnet:8600
    external_dag_id: str,  # The dag_id that contains the task you want to wait for. (templated)
    external_task_id: str | None,  # The task_id that contains the task you want to wait for. (templated)
    external_task_ids: Collection[str] | None,  # The list of task_ids that you want to wait for. (templated) If None (default value) the sensor waits for the DAG. Either external_task_id or external_task_ids can be passed to ExternalTaskSensor, but not both.
    external_task_group_id: str | None,  # The task_group_id that contains the task you want to wait for. (templated)
    allowed_states: Iterable[str] | None,  # Iterable of allowed states, default is ['success']
    skipped_states: Iterable[str] | None,  # Iterable of states to make this task mark as skipped, default is None
    failed_states: Iterable[str] | None,  # Iterable of failed or dis-allowed states, default is None
    execution_delta: datetime.timedelta | None,  # time difference with the previous execution to look at, the default is the same logical date as the current task or DAG. For yesterday, use [positive!] datetime.timedelta(days=1). Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.
    execution_date_fn: Callable | None,  # function that receives the current execution's logical date as the first positional argument and optionally any number of keyword arguments available in the context dictionary, and returns the desired logical dates to query. Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.
    check_existence: bool,  # Set to True to check if the external task exists (when external_task_id is not None) or check if the DAG to wait for exists (when external_task_id is None), and immediately cease waiting if the external task or DAG does not exist (default value: False).
    poll_interval: float,  # polling period in seconds to check for the status
    deferrable: bool,  # Run sensor in deferrable mode
)
Of all possible arguments, only external_instance_uri and external_dag_id are required. If your DAG should depend on a specific task id or a task group id, that can be provided too, but only one at a time.
Examples:
from wmf_airflow_common.sensors.rest_external_task import RestExternalTaskSensor
# Valid
sensor = RestExternalTaskSensor(
    external_instance_uri="http://an-launcher1002.eqiad.wmnet:8600",
    external_dag_id="automoderator_monitoring_snapshot_daily",
)

# Valid
sensor = RestExternalTaskSensor(
    external_instance_uri="http://an-launcher1002.eqiad.wmnet:8600",
    external_dag_id="automoderator_monitoring_snapshot_daily",
    external_task_id="purge_automoderator_daily_snapshots",
)

# Invalid, task_id and task_group_id both provided
sensor = RestExternalTaskSensor(
    external_instance_uri="http://an-launcher1002.eqiad.wmnet:8600",
    external_dag_id="automoderator_monitoring_snapshot_daily",
    external_task_id="purge_automoderator_daily_snapshots",
    external_task_group_id="fetch_snapshots",
)
Datasets config option
Apart from directly instantiating the sensor in Python code, we also provide the option of declaring the DAG/task/task_group producing a specific dataset as configured in datasets.yaml. Once a dataset is configured with a producer DAG, retrieving a sensor becomes much easier:
iceberg_wmf_dumps_wikitext_raw:
  datastore: iceberg
  table_name: wmf_dumps.wikitext_raw_rc2
  produced_by:
    airflow:
      instance: analytics
      dag_id: table_maintenance_iceberg_daily
sensor = dataset("iceberg_wmf_dumps_wikitext_raw").get_sensor_for(dag)
For a list of supported Airflow instances and the labels by which they are referred to in datasets.yaml, please go to instance_properties.py
Aligning the schedules of two DAGs
Depending on another DAG's state unfortunately involves awareness of the different schedules the two DAGs are running on, and potentially applying the time delta between the two. If both DAGs are simply declared to be running @daily, they will both run each day with the same execution_date of 12:00AM midnight, and no additional alignment would be necessary. If one of the DAGs is running @daily but is set to run at 5AM in the morning, then in order for the external task sensor to find that DAG's run when it executes at midnight, it needs to be aware of the time delta and apply it.
That's what the optional execution_delta:datetime.timedelta|None and execution_date_fn:Callable arguments are for.
In the scenario described above, if DAG A runs at midnight and DAG B runs at 5AM, and DAG B needs to sense the completeness of DAG A, it would construct its sensor as such:
from wmf_airflow_common.sensors.rest_external_task import RestExternalTaskSensor
sensor = RestExternalTaskSensor(
    external_instance_uri="http://an-launcher1002.eqiad.wmnet:8600",
    external_dag_id="automoderator_monitoring_snapshot_daily",
    execution_delta=timedelta(hours=-5),
)

# or, using dataset config:
sensor = dataset("dataset_from_dag_A").get_sensor_for(dag, execution_delta=timedelta(hours=-5))
URLSensor
This sensor waits for a given URL to exist. For instance, it waits for a file to exist in a file system.
hdfs_sensor = URLSensor(
    task_id="wait_for_some_dataset",
    url="/wmf/data/wmf/some/dataset/snapshot={{ ds }}/_SUCCESS",
)
If execution_date was 2022-01-01, this sensor would wait for the following URL to be present:
/wmf/data/wmf/some/dataset/snapshot=2022-01-01/_SUCCESS
Wiki partitions
A success file is written when a Spark job successfully completes. Larger datasets (often also partitioned by wiki_db) are generated using multiple batched Spark jobs, e.g. writing data to a wiki_db partition directly, in which case there are multiple success files (possibly in sub-directories). _PARTITIONED is a WMF flag file indicating that a specific snapshot is complete (i.e. all batched jobs have successfully written their data).
So if a dataset uses a _PARTITIONED file (e.g. many wmf_raw.mediawiki_* tables), that is the file to wait for, not the _SUCCESS file mentioned above.
hdfs_sensor = URLSensor(
    task_id="wait_for_some_dataset",
    url="/wmf/data/wmf/some/dataset/snapshot={{ ds }}/_PARTITIONED",
)
Data retention
A naive data retention strategy for Hive tables is implemented in util.drop_older_than. This method returns a BashOperator task that executes refinery's drop_older_than script. This capability is experimental and available only in instances that bundle the refinery repo.
Example:
# Delete webrequest_frontend partitions older than seven days,
# relative to the current dag instance execution time.
drop_webrequest_frontend_raw = drop_older_than(
    operator_config={"task_id": "drop_webrequest_frontend_raw"},
    database="wmf_staging_raw",
    older_than_days=7,
    tables=["webrequest"],
    checksum="dbc64f19c5acc0c02e01f20fde10f67b",
)
Documentation can be found in the method docstring.
Operators
Spark Operators
As of Sep 26, 2022, any Spark operator will run on top of Spark3. If you need your job to run on Spark2, please add the following configuration. You can add this at the DAG level, or at the Spark Operator level:
env_vars={"SPARK_CONF_DIR": "/etc/spark2/conf"}
spark_binary="/usr/bin/spark2-submit"
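For example, a sketch applying those two settings at the operator level (the artifact and Java class names are placeholders):
run_legacy_job = SparkSubmitOperator(
    task_id="run_legacy_spark2_job",
    application=artifact("refinery-job-0.1.23-shaded.jar"),
    java_class="org.example.SomeLegacyJob",  # placeholder class name
    env_vars={"SPARK_CONF_DIR": "/etc/spark2/conf"},
    spark_binary="/usr/bin/spark2-submit",
)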
SparkSQLOperator
This operator executes a query in SparkSQL using a subclass of org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver. It allows us to pass in query_parameters to correctly populate the application arguments.
task_name = SparkSqlOperator(
    task_id="task_name",
    sql="hql_directory/path_to_hql_file.hql",
    query_parameters={
        "some_parameter_key": some_parameter_value,
        "another_parameter_key": another_parameter_value,
    },
    sla=timedelta(hours=6),
)
The SparkSqlOperator can take HTTPS URLs on the sql parameter. This is very convenient, since we can combine this with GitLab's raw rendering API to construct operators like so:
SparkSqlOperator(
    task_id="do_hql",
    # To run an HQL file, simply use a GitLab raw URI that points to it.
    # See how to build such a URI here:
    # https://docs.gitlab.com/ee/api/repository_files.html#get-raw-file-from-repository
    # We strongly recommend you use an immutable URI (i.e. one that includes a SHA or a tag) for reproducibility.
    sql=var_props.get(
        "hql_gitlab_raw_path",
        "https://gitlab.wikimedia.org/api/v4/projects/1261/repository/files/test%2Ftest_hql.hql/raw?ref=0e4d2a9",
    ),
    query_parameters={
        "destination_directory": f"/tmp/xcollazo_test_generic_artifact_deployment_dag/{{{{ts_nodash}}}}",
        "snapshot": "2023-01-02",
    },
    launcher="skein",
)
The operator above pulls the HQL file from GitLab and executes it. This effectively allows users that are interested in running SQL to keep it in their respective repositories without the need to do any artifact creation or declaration, and without depending on Data Eng's release cadence for the refinery component.
Do please note that, for reproducibility, you should use an immutable URI (i.e. one that includes a git SHA or a tag). This way, we can correlate a particular SQL file to a particular commit on the airflow-dags repository, making debugging issues more tractable.
SparkSubmitOperator
This custom operator uses the wmf_airflow_common SparkSubmitHook to extend the functionality of the Airflow SparkSubmitOperator, providing us with more features, such as the ability to launch Spark jobs via skein. For this operator, we must set the application parameter to load the artifact that contains the Spark job to be run, and also specify the Java class.
task_name = SparkSubmitOperator(
    task_id="task_name",
    application=artifact("artifact_name"),
    java_class="custom_java_class_name",
    application_args={"--argument_name": argument_value},
    sla=timedelta(days=7),
)
URLTouchOperator
This operator is useful for writing SUCCESS flags or any other files. It is the bridge between jobs that are still on Oozie and jobs that have been migrated to Airflow. Note that this operator only works with URLs that are supported by fsspec on writable file systems.
success = URLTouchOperator(
    task_id="task_name",
    url=(dag_config.hadoop_name_node + destination_path + "partition/_SUCCESS"),
)
BashOperator for Python scripts
Experimental support for executing Python scripts is provided via a wrapper (decorator) in wmf_airflow_common.operator.python.python_script_executor. Example:
@python_script_executor("/path/to/script.py")
def my_task(param1: str, param2: int):
    return ["--param1", param1, "--param2", str(param2)]

task = my_task(
    operator_config={"task_id": "my_task"},
    param1="value",
    param2=42,
)
Usage documentation can be found in the method docstring.
Testing
To build anything, you first need Kerberos development files. Install this with:
# Ubuntu
sudo apt install libkrb5-dev
Unit tests
Setting up the environment
First install Miniconda on your system. Then, in your DAG folder, create your virtual environment with the following:
./dev_launcher.sh build
If the installation process starts downloading multiple versions of some software, it probably means a previous step has failed, and you should troubleshoot why. On Debian, I had to install dependencies for SASL (see https://pypi.org/project/sasl/0.1.3/) and Kerberos (https://github.com/apple/ccs-pykerberos/issues/66#issuecomment-838285162).
Before running the tests, make sure your PYTHONPATH environment variable is correctly set:
export PYTHONPATH=.:./wmf_airflow_common/plugins
And now tests can be run with:
./dev_launcher.sh test
You could also run the tests with Docker:
./dev_launcher.sh docker_test
Definition test
This is a test to check that the DAG definition is properly read by the Airflow scheduler.
We can do it directly in Python with python path/to/your/file_dag.py, or by writing a unit test. See an example here: https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/tests/analytics/aqs/aqs_hourly_dag_test.py
Testing the DAG itself
As we try to keep the logic outside of our DAGs, the tests are mainly sanity checks. Three notable use cases (a minimal sketch follows the list):
- Check the interpolations of the dates generated with Jinja
- Check the generation of the partitions lists / the sensors
- Check the global number of tasks in the DAG
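A minimal sketch of such a sanity check, in the spirit of the definition test linked above (the DAG folder, dag_id and expected task count are placeholders):
from airflow.models import DagBag


def test_example_dag_loads_and_has_expected_shape():
    # Parse only the folder containing the DAG under test.
    dagbag = DagBag(dag_folder="analytics/dags/aqs", include_examples=False)
    assert dagbag.import_errors == {}

    dag = dagbag.get_dag("aqs_hourly")  # placeholder dag_id
    assert dag is not None
    assert len(dag.tasks) == 5  # placeholder expected task count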
Testing the libraries
In our project, libraries live in wmf_airflow_common. This is where we define custom operators and other helpers. Those files are important to unit test.
On-Disk Fixtures
There exist generic tests that run against all DAGs in the repository: they render the skein specification that will be submitted to the cluster and write it to a file in the repository. These files help authors and reviewers verify that changes to DAGs and/or shared code result in the expected changes to the jobs submitted to the cluster. When one of these fixtures fails in the test suite, the expected course of action is to delete all fixtures (find tests -name \*.expected -exec rm '{}' \;) and re-run the pytest suite with the environment variable REBUILD_FIXTURES=yes set. This will rebuild the fixtures and allow the developer to review the differences in git diff. If all changes are as expected, the changed fixture files should be committed with the code changes that changed them.
These fixtures can make some commits a bit verbose when attempting to spelunk through history for particular changes. The fixtures can be ignored from git log with some special syntax. Only the portion after the -- is important, and it can apply equally to -p, -Sfoo and other git log modes: git log -1 -p -- . ':(exclude,glob)tests/*/fixtures/**'
Developing on macOS
Instead of running ./dev_launcher.sh build, you will need to use a Docker container and the Docker-based commands documented in https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags#running-tests
Contrary to the README, you do not need to install Homebrew or the listed dependencies, since the environment will be inside the Docker container.
To generate the on-disk test fixtures, delete the fixtures as usual via find tests -name \*.expected -exec rm '{}' \; and then run the Docker-based pytest command with REBUILD_FIXTURES=yes:
docker run \
--platform linux/x86_64 \
--memory="8g" \
--cpus="5.0" \
--rm -it \
--volume .:/opt/airflow-dags \
--workdir /opt/airflow-dags \
--env PYTHONPATH=".:./wmf_airflow_common/plugins" \
--entrypoint bash \
conda_env_test \
-c -p "source /srv/app/miniconda/bin/activate && conda activate airflow && REBUILD_FIXTURES=yes pytest"
You may get the following failures, which can be ignored:
tests/wmf_airflow_common/config/dag_properties_test.py:77: AssertionError
=============================================== short test summary info ================================================
FAILED tests/wmf_airflow_common/config/dag_properties_test.py::test_dag_properties_with_no_prior_variable - AssertionError: expected call not found.
FAILED tests/wmf_airflow_common/config/dag_properties_test.py::test_dag_properties_with_prior_variable_overrides - AssertionError: expected call not found.
FAILED tests/wmf_airflow_common/config/dag_properties_test.py::test_dag_properties_with_partial_variable_overrides - AssertionError: expected call not found.
FAILED tests/wmf_airflow_common/config/dag_properties_test.py::test_dag_properties_with_special_type_overrides - AssertionError: expected call not found.
===================================== 4 failed, 1328 passed, 10 skipped in 28.42s ======================================
CI
The GitLab CI pipeline is described here: https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/.gitlab-ci.yml
- Build container images: in this optional first step, we build container images with Kokkuri and store them in the Rel-Eng image registry. The GitLab runner later uses those images to execute the tests and the linting checks.
- Unit tests: pytest is run against the code. You will find most of the configuration in pyproject.toml
- Linting
  - flake8 checks for syntax errors and style errors (PEP8)
  - mypy checks for type errors
  - isort standardizes the import statements
  - black standardizes our code style
- Conda environment debianized build: builds the Debian package containing the conda environment with all the project dependencies. The package is stored in the GitLab registry and deployed to our clusters manually.
Development instance
We may run a custom dev instance in the cluster (e.g. on stat1004 or stat1007). Handy for testing your code against the production cluster.
Limits
- A single process performs both the worker's and the scheduler's job.
Setup
Git clone your DAG repo on a stat box.
Then run ./run_dev_instance.sh.
It’s going to create a virtual environment and a SQLite DB, and launch the Airflow processes.
Open an SSH tunnel to access the UI as described in the script logs.
When running this script with sudo, please be aware that the home directory set with the -m parameter should not be a /tmp/$USER path. The reason is that the sudo user will own the home directory, and this path is also used as a Spark scratch directory; if that scratch directory is owned by a different user, Spark will fail to run, as seen in task T380494.
Input/output override procedure
Make sure to unpause your DAG only when the output of the script has been overridden by the following procedure. Use custom variable properties to configure your DAG for testing. E.g. variables to override:
- start_date
- end_date
- output_dir / output_table
- tmp_dir
Analytics test Cluster
Limits:
- The lack of data
- The barrier to production (also, you can’t break production)
- The small size of the cluster
- Some datasets are subsets of the original with different partition names
How to deploy
As simple as a scap deploy from the deployment machine
- https://wikitech.wikimedia.org/wiki/Analytics/Systems/Airflow
- https://wikitech.wikimedia.org/wiki/Analytics/Systems/Airflow/Instances
You may deploy a custom branch:
cd /srv/deployment/airflow-dags/analytics_test
git fetch
scap deploy -r origin/my_branch
Use cases
- Canary for production
- Deploying custom branch
Airflow Debian package upgrade process
Update the conda environment
- Edit conda-environment.yml to set the versions you want to use
- Create the lock file with generate_conda_environment_lock_yml.sh
- Check that it can properly generate a conda environment with generate_conda_environment_lock_yml.sh
- Locally use & test the conda environment with: ./dev_launcher.sh build && ./dev_launcher.sh test
Build the Debian package
- Once you are satisfied with your conda environment, verify and update the version reference in .gitlab-ci.yml and debian/Dockerfile
- Then commit and push to GitLab
- To trigger the package creation, manually click on the publish pipeline in the GitLab CI interface.
- The publish pipeline is going to store the .deb file in the GitLab registry.
Deploy on the test cluster
- Send your new version to apt.wikimedia.org (requires SRE rights)
- Connect to the test instance and pause all the DAGs. Wait for the end of any remaining executions.
- Stop the Airflow services
- Manually (or via Puppet) update the machine holding the analytics_test instance (an-test-client1001) to install the latest version of the Airflow package
- Upgrade the PG DB to the latest version of Airflow:
  sudo -u analytics \
    AIRFLOW_HOME=/srv/airflow-analytics_test \
    /usr/lib/airflow/bin/airflow db upgrade
- Restart the Airflow services
- Check that everything works as expected
Deploy in production
The process is the same as for the test cluster, except that you have a growing list of Airflow instances to update:
cd puppet
ack airflow::instances hieradata/
Test deb package in a stat box
# On a stat box
ssh stat1007.eqiad.wmnet
# Download the .deb (don't forget to use the proxy)
wget -O airflow-2.6.1-py3.10-20230619_amd64.deb https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/package_files/1298/download
# Extract the archive
dpkg -X airflow-2.6.1-py3.10-20230619_amd64.deb airflow-2.6.1-py3.10-20230619_amd64
# Update all shebangs paths
./airflow-2.6.1-py3.10-20230619_amd64/usr/lib/airflow/bin/conda-unpack
# activate the unpacked environment and do your stuff
source ./airflow-2.6.1-py3.10-20230619_amd64/usr/lib/airflow/etc/profile.d/conda.sh
conda activate base
# Or launch a dev instance (See more information in run_dev_instance.sh)
git clone https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags.git
cd airflow-dags
sudo -u analytics-privatedata ./run_dev_instance.sh -m /tmp/my_home \
-b ~/airflow-2.6.1-py3.10-20230619_amd64/usr/lib/airflow/ analytics_test