Data Platform/Systems/Airflow/Developer guide

From Wikitech

For documentation about Apache Airflow, please refer to https://airflow.apache.org/docs/. This section is not intended to summarize or replace Airflow docs, but rather bring out some basic concepts and common gotchas.

Airflow basics

Glossary

Term: Definition
DAG: Directed Acyclic Graph. In Airflow it’s used as a synonym of job.
DAG run (or dag_run): The execution of a DAG for a particular time interval.
Task: Unit (node) of an Airflow DAG. It’s usually a Sensor, an Operator or a TaskGroup.
Sensor: Task that waits for a particular asset to be present in the specified location. For instance, it waits for a Hive partition to exist, or an HDFS path to exist. Until a Sensor confirms the presence of its target, the subsequent tasks in the DAG are not triggered.
Operator: Task that applies some workflow action, like computing a dataset, transforming some data, uploading a dataset to a data store, or creating a success file.
TaskGroup: Group of tasks that can be treated as a task itself. You can also see a TaskGroup as an encapsulated sub-DAG.
Hive partition: A division of a Hive table. In our setup, Hive tables are usually partitioned by time, so that at each new time interval (i.e. each hour/day/month/…) we add a new partition to the Hive table, containing the data that corresponds to that time interval. Partitions make it possible to prune the data that a query/job needs to read, thus making it more efficient. Partitions are usually named after the time period they represent, like: `database.table_name/year=2022/month=3/day=5/hour=14`.
Artifact: Any external asset needed for the Airflow job to run, usually code dependencies like Maven JARs, Python virtualenvs, HQL queries, etc.
SLA: Service Level Agreement. In practice, you set an SLA time on a Task. If the task instance has not finished after that time has elapsed, Airflow triggers an email alert.

DAG interpretation time vs execution time

Although Airflow DAGs are written in a single Python file, they are run in two stages: DAG interpretation stage and Task execution stage.

DAG interpretation stage

The DAG files are read and interpreted by Airflow about every minute. So, if you deploy a code change to your production instance (or testing instance), Airflow should pick it up quickly without the need for a restart. However, this stage does not execute your tasks; it simply parses the DAG and updates Airflow's internal representation of it. This means your Sensors and Operators will not be triggered yet; also, callables and templates will not be executed until the next stage. The DAG interpretation stage happens on the Airflow machine, and happens every minute for each DAG. Thus, it should be very efficient. For example, we should avoid connecting to a database or querying HDFS within DAG files.
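
The cost of doing work at interpretation time can be illustrated with a minimal sketch in plain Python (it does not use Airflow itself). Everything at the top level of a DAG file runs on every parse, while the body of a callable only runs when a task calls it. The names expensive_lookup, parse_dag_file_badly and parse_dag_file_well are hypothetical stand-ins, not part of Airflow or wmf_airflow_common.

```python
calls = {"count": 0}


def expensive_lookup():
    """Hypothetical stand-in for a database or HDFS query."""
    calls["count"] += 1
    return ["partition_a", "partition_b"]


def parse_dag_file_badly():
    # Anti-pattern: the lookup runs each time the file is parsed.
    partitions = expensive_lookup()
    return {"task": lambda: partitions}


def parse_dag_file_well():
    # The lookup is wrapped in a callable, so parsing does no work.
    return {"task": lambda: expensive_lookup()}


# Simulate three parses of each variant.
for _ in range(3):
    parse_dag_file_badly()
bad_calls = calls["count"]

calls["count"] = 0
for _ in range(3):
    dag = parse_dag_file_well()
good_calls_after_parsing = calls["count"]

# Only the task execution stage triggers the lookup.
dag["task"]()
good_calls_after_run = calls["count"]
```

In this sketch the first variant performs one lookup per parse, while the second performs none until the task actually runs.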

Task execution stage

When Airflow determines that a DAG should be triggered for a particular date, it will launch a "dag_run" and kick off the task execution stage. In this stage the DAG is no longer re-interpreted; only the Sensors and Operators you defined in your DAG will be executed. Sensors run from the Airflow machine, but they are mostly lightweight. Operators are also triggered from the Airflow machine, but ultimately (if done properly) they are executed in the Hadoop cluster. Before Sensors or Operators run, templated values will be resolved.

Data dependencies

Sensors are Airflow's way of defining data dependencies. Sometimes, we just want to trigger a data processing job on a fixed schedule, e.g. a job that deletes data from a given place. It can run every hour, on the hour. In this case, we don't need Sensors. However, most data processing jobs need to wait for input data to be present in the data store prior to doing any computation. Airflow Sensors define those dependencies, and wait until the specified data is present (or other conditions are met). In our case, Sensors usually wait for 1 or more Hive partitions, or for 1 or more files/directories in a file system.

DAG runs

DAG Run dates

In Airflow a “dag_run” is the execution of a DAG for a particular time interval. DAG runs have a start datetime and an end datetime. Those delimit the period for which data will be processed/computed. The Airflow macro execution_date is very confusing, because of its misleading name. It is not the date a particular DAG is executed, but rather the start datetime of a dag_run. Note that the start datetime of a dag_run that processes data for 2022-05-12 is going to be the same (2022-05-12), regardless of when you execute the DAG. Thus, DAGs should always process/compute data starting at execution_date and ending at execution_date + granularity, where granularity is the DAG’s schedule_interval, namely: @hourly, @daily, @monthly, etc.

e.g. when we talk about the daily job of 2022-04-01

  • it handles the data from 2022-04-01-00:00 to 2022-04-01-23:59.999
  • it creates a new partition in hive which may be named 2022-04-01
  • its execution_date is going to be 2022-04-01-00:00
  • the dag run could be created and enqueued within Airflow at 2022-04-02-00:12
  • then the task instance may run at 2022-04-02-00:15, and complete at 2022-04-02-00:32
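
The relation between execution_date and the processed period can be sketched with the standard library alone. The helper data_interval below is hypothetical (it is not an Airflow or wmf_airflow_common function) and only covers three presets; it assumes the execution_date is already aligned to the start of its period.

```python
from datetime import datetime, timedelta


def data_interval(execution_date, schedule_interval):
    """Return (start, end) of the period a dag_run should process."""
    if schedule_interval == "@hourly":
        end = execution_date + timedelta(hours=1)
    elif schedule_interval == "@daily":
        end = execution_date + timedelta(days=1)
    elif schedule_interval == "@monthly":
        # Months vary in length, so roll over to the same day of the
        # next month (assumes execution_date is the first of a month).
        year = execution_date.year + execution_date.month // 12
        month = execution_date.month % 12 + 1
        end = execution_date.replace(year=year, month=month)
    else:
        raise ValueError(f"Unsupported schedule_interval: {schedule_interval}")
    return execution_date, end


# The daily job of 2022-04-01 covers that whole day (end is exclusive),
# no matter when the dag_run is actually created or executed.
start, end = data_interval(datetime(2022, 4, 1), "@daily")
december_start, december_end = data_interval(datetime(2022, 12, 1), "@monthly")
```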

Concurrent DAG Runs

By default, our Airflow instances are configured to run 3 instances of a dag_run per DAG at a time. We have chosen to make this rule the default despite an issue with Spark failing when concurrent jobs write to the same table (even if in different partitions) - see this task.

If your job fails due to concurrent runs (if it happens, it's most probably because you're back-filling data), one solution to prevent the issue is to add a parameter to your Spark job (whether in SQL or as a Spark configuration parameter): spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version = 2

This can be done in an HQL query as in:

SET spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version = 2;

INSERT INTO ...

It can also be done using Spark parameters in Airflow, when using the SparkSqlOperator or the SparkSubmitOperator, by overriding the conf parameter as in:

myOp = SparkSqlOperator(
    # ... other arguments
    conf={
        "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": 2,
        # ... other Spark settings
    },
    # ... other arguments
)

A great way to be able to change this parameter without having to deploy code is to pass configuration values as airflow variables (see the Airflow developer guide's Variable Properties section)

Important note: Changing the fileoutputcommitter comes at some cost: while allowing the job to not fail while concurrently writing to the same table, it also leads to possible consistency issues in the output folders of your spark jobs (see this task for more details).

Writing a DAG using our Airflow setup

Our Airflow setup (instances, repository and libraries) offers you a set of tools for writing your DAGs. Those are designed to make your DAG code shorter and easier to read and to test.

Importing code

For your DAG, you’ll need to import Airflow native libraries, but also you can import libraries that we have developed to be shared by all WMF Airflow users. Finally, you can also develop your own helper libraries that are specific to your project.

Airflow native libraries

The Conda environment where Airflow runs (both your production instance and the development instance) has airflow installed in it. So, to import from airflow native libraries you should just use regular imports, like:

from airflow import DAG
from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor

WMF Airflow common

WMF Airflow common (wmf_airflow_common) is a library that we have developed to provide tools that are tailored to WMF’s Airflow needs. It currently lives within the airflow-dags repository, but we are considering factoring it out in the future. In the meantime, because of the way the Airflow instances’ PYTHONPATH is configured, you can import libraries from it like so:

from wmf_airflow_common.operators.url import URLTouchOperator
from wmf_airflow_common.partitions_builder import partition_names_by_granularity

Instance level imports

Each Airflow instance has its own folder in the airflow-dags repository. For example, the Analytics Airflow instance, which runs in an-launcher1002.eqiad.wmnet, has an analytics folder at the root of the airflow-dags repository. All DAGs defined within that folder will be executed in (and only in) the Analytics Airflow instance. If you want to create a library that is specific to your instance, you can add your code to your instance folder within airflow-dags, for example:

airflow-dags
    |-- your_instance_folder
    |       |-- dags
    |       |-- config
    |       \-- your_library_code

And then, because of the way the Airflow instances’ PYTHONPATH is configured, you can import libraries from it like:

from your_instance_folder.your_library_code import your_object 
# Example for the analytics instance.
from analytics.config import dag_config

Configuration and defaults

One of the goals of our Airflow setup is to reduce the amount of boilerplate code needed to write our jobs. Our Airflow setup offers a set of WMF defaults for the most used parameters (usually well suited for our Hadoop cluster).

WMF common DAG default args

Defaults are usually passed to your DAG via the default_args parameter (a Python dictionary). All its key-value pairs will be passed to all sensors and all operators of the corresponding DAG. For example, this will set all the WMF Airflow common defaults to your DAG:

from wmf_airflow_common.config import dag_default_args

with DAG(
    default_args=dag_default_args.get(),
    # ... other DAG arguments
) as dag:
    ...  # your tasks

Instance level configuration

WMF Airflow common defaults are good in many cases, but there are values that need to be specified per instance. For example, the user that will run all jobs (in Airflow naming: the owner), or the email address that will receive all alerts. To do that, we advise adding instance level configuration to the WMF defaults, like so:

# dag_config.py
from wmf_airflow_common.config import dag_default_args

instance_default_args = {
    "owner": "your_user",
    "email": "your_email",
}

# This returns a new dictionary with instance defaults merged on top of wmf defaults.
default_args = dag_default_args.get(instance_default_args)

To prevent having to write this for every DAG, this is usually defined in a separate file that can be reused by all DAGs. By convention we use a file named dag_config.py within a config directory within the instance folder. Thus, when defining a DAG, we do:

from your_instance.config import dag_config

with DAG(
    default_args=dag_config.default_args,
    # ... other DAG arguments
) as dag:
    ...  # your tasks

Feel free to add any other instance-level properties to the dag_config.py file, so that you can access them from all DAGs. For example: hadoop_name_node, hdfs_temp_directory, etc.

DAG level configuration

If you want to override or add any new configuration to any Sensor or Operator within your DAG, just pass the configuration directly to the Sensor or Operator in question. This will override the defaults passed to the DAG’s default_args parameter. For example:

from your_instance.config import dag_config

with DAG(
    default_args=dag_config.default_args,
    # ... other DAG arguments
) as dag:
    sensor = SomeSensor(
        some_property=this_value_overrides_the_defaults,
        # ... other Sensor arguments
    )
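
The three configuration levels described above (WMF defaults, instance defaults, and arguments passed directly to a task) can be pictured as successive dictionary merges. This is only a sketch with plain dictionaries: the default values shown are hypothetical, and the merge helper is a stand-in for what dag_default_args.get(...) and Airflow's default_args handling do, not their actual implementation.

```python
# Hypothetical values; the real WMF defaults live in wmf_airflow_common.
wmf_defaults = {"owner": "airflow", "retries": 5, "email": None}
instance_defaults = {"owner": "your_user", "email": "your_email"}


def merge(base, overrides):
    """Return a new dict with overrides applied on top of base."""
    return {**base, **overrides}


# Instance defaults are merged on top of the WMF defaults.
default_args = merge(wmf_defaults, instance_defaults)

# Arguments passed directly to a Sensor or Operator win over default_args.
task_kwargs = {"retries": 1}
effective_task_args = merge(default_args, task_kwargs)
```

Each merge returns a new dictionary, so the lower-precedence defaults are never mutated.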
    

Variable properties

The VariableProperties module allows you to override DAG configuration at run-time. This is useful when testing your DAG or when back-filling your production DAG. By properties we mean any configuration value. DAGs are ultimately a collection of job configuration properties. The VariableProperties code has some nice documentation that can help as well: https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/wmf_airflow_common/config/variable_properties.py

Which properties should be overridable?

Why override properties at run-time? A DAG specifies a list of properties that point to the production environment. When testing your DAG, you don’t want Airflow to use those production configs. Instead, you want to use temporary testing configurations. Here’s a list of common properties you’ll want to make overridable:

start_date: You might want to test a DAG and start it at a different date than the one intended for production. You might also want to back-fill starting at an earlier date.
default_args: You might want to override default_args values when executing a DAG; for instance, you may want to pass your username as the owner when testing.
hql_path/artifact_path: When testing, the query/artifact that you are using might not be deployed to the production cluster yet. So you might want to override the production configuration with a temporary run-time value.
destination_table/destination_path: When testing, you don’t want to write on top of production data, but rather to a temp location, like your home folder.

In the end, you can make any property overridable, it’s up to you to decide.

How to make them overridable?

VariableProperties works with Airflow Variables (hence the name). It will look for property overrides in the Airflow Variable that you specify. If it finds an override, it will apply it; otherwise, it will apply the property production value. Usually you would specify an Airflow DAG configuration like this:

some_property = some_production_value
another_property = another_production_value

To make these properties overridable you must do:

var_props = VariableProperties(your_variable_name) 
some_property = var_props.get(some_property_key, some_production_value) 
another_property = var_props.get(another_property_key, another_production_value)

How to override them at run-time?

To override properties at run-time you have to create an Airflow Variable. Its name must be the same one you specified when creating the VariableProperties object in your DAG code. Its value must be a JSON string containing a map, e.g. {"some_property_key": "some_value", "another_property_key": "another_value"}. You can create an Airflow Variable in 2 ways: using the Airflow UI, or defining an environment variable in the command line. The latter is useful if you’re testing your DAG using the command line (airflow dags test <DAG_ID> <EXECUTION_DATE> …).

Create an Airflow Variable using UI

Click on Admin > Variables. Add a new record. Key is the variable name (same as you specified in the DAG code), Val is the value (JSON string).

Create an Airflow Variable using environment variables

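Run export AIRFLOW_VAR_<your_variable_name>=<JSON string> in your command line. The name of the environment variable must be the Variable name you chose in the DAG code, with the AIRFLOW_VAR_ prefix. Note that Airflow looks up the upper-cased Variable name in the environment, so write the name in upper case. The value must be the JSON string mentioned above, quoted so that the shell does not interpret it, e.g. export AIRFLOW_VAR_MYJOB_CONFIG='{"start_date":"2022-04-20"}'.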
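
The override lookup can be sketched as follows. SimpleVariableProperties is a simplified, hypothetical stand-in written for illustration; it is not the real VariableProperties class and only reads overrides from the environment. It assumes the environment variable name is AIRFLOW_VAR_ followed by the upper-cased Variable name, and the table and path values are made up.

```python
import json
import os


class SimpleVariableProperties:
    """Simplified stand-in for VariableProperties (not the real class)."""

    def __init__(self, variable_name):
        # Assumption: overrides live in AIRFLOW_VAR_<UPPER-CASED NAME>.
        raw = os.environ.get("AIRFLOW_VAR_" + variable_name.upper(), "{}")
        self.overrides = json.loads(raw)

    def get(self, key, default):
        # Use the override when present, else the production value.
        return self.overrides.get(key, default)


# Equivalent to running `export AIRFLOW_VAR_MYJOB_CONFIG=...` in a shell.
os.environ["AIRFLOW_VAR_MYJOB_CONFIG"] = '{"destination_table": "tmp.test_table"}'

var_props = SimpleVariableProperties("myjob_config")
destination_table = var_props.get("destination_table", "wmf.prod_table")
hql_path = var_props.get("hql_path", "/prod/query.hql")
```

Only destination_table is overridden here; hql_path falls back to its production value because the JSON map has no entry for it.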

Handy methods

Most properties are strings or numbers. These are handled fine by Airflow Variables. However, some Airflow properties are expected to be datetimes, timedeltas or dictionaries. For those particular cases, you can use the following getters:

var_props.get_datetime("start_date", datetime(2022, 4, 1)) 
var_props.get_timedelta("delay", timedelta(days=2)) 
var_props.get_merged("default_args", default_args)

These methods will ensure that the default value passed as a second parameter is of the expected type, and also that the JSON override specified in the Variable matches the expected type. For datetime and timedelta JSON values, use ISO8601 format, e.g. 2022-04-01T00:30:15 for datetimes and P3M2D for timedeltas. On the other hand, get_merged will merge the dictionary parsed from the JSON string on top of the default value dictionary passed as a second parameter.
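
As a rough illustration of what such typed getters do, here is a sketch using only the standard library. The functions get_datetime and get_merged below are hypothetical free functions operating on an already-parsed overrides dictionary; the real methods belong to VariableProperties and may validate differently. Duration parsing is left out because the standard library has no ISO8601 duration parser.

```python
import json
from datetime import datetime


def get_datetime(overrides, key, default):
    """Return the override parsed as a datetime, else the default."""
    if not isinstance(default, datetime):
        raise ValueError("Default value must be a datetime")
    if key not in overrides:
        return default
    return datetime.fromisoformat(overrides[key])


def get_merged(overrides, key, default):
    """Merge the override dictionary on top of the default dictionary."""
    if not isinstance(default, dict):
        raise ValueError("Default value must be a dict")
    return {**default, **overrides.get(key, {})}


overrides = json.loads(
    '{"start_date": "2022-04-01T00:30:15", "default_args": {"owner": "me"}}'
)
start_date = get_datetime(overrides, "start_date", datetime(2022, 1, 1))
default_args = get_merged(
    overrides, "default_args", {"owner": "prod_user", "retries": 3}
)
```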

Artifacts

Our Airflow setup tries to make it easy to pull and use code dependencies from DAGs. It expects those dependencies to be packaged into Maven JAR files or Python virtual envs. This section assumes you already have an artifact available in a public registry, like WMF’s Archiva or a GitLab package registry. For instructions on how to put together a GitLab Python project that generates artifacts, see Analytics/Systems/Airflow/Developer guide/Python Job Repos. To use the artifact from a DAG, you need 2 things:

Define your artifact in artifacts.yaml

Add a snippet that identifies your artifact and tells Airflow where to find it to the artifacts.yaml file in your instance’s config folder:

artifacts:
  # No source declared, so this uses the default URL based source: id is a URL to a package in GitLab.
  example-job-project-0.15.0.conda.tgz:
    id: https://gitlab.wikimedia.org/repos/data-engineering/example-job-project/-/package_files/487/download

  # From wmf_archiva_releases, for which the id should be a maven coordinate.
  refinery-job-0.1.23-shaded.jar:
    id: org.wikimedia.analytics.refinery.job:refinery-job:jar:shaded:0.1.23
    source: wmf_archiva_releases

The first key is the artifact name; you will use it to reference the artifact from your DAG. In the case of Maven JAR files the id is the Maven coordinate uniquely identifying the artifact. The source is the artifact registry where your artifact lives. Read airflow-dags/wmf_airflow_common/config/artifact_config.yaml to see all configured sources. For more detailed documentation of the artifact configuration, see: https://gitlab.wikimedia.org/repos/data-engineering/workflow_utils/-/tree/main/#artifact-module

(Note that the artifact will be cached by its name, so you should probably make sure the artifact name has the appropriate file extension, e.g. .jar or .tgz.)
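
To show how the Maven coordinate in the example above relates to the artifact name, here is a small sketch. The helper artifact_name_from_coordinate is hypothetical and assumes a five-part coordinate of the form groupId:artifactId:packaging:classifier:version; the artifact name in artifacts.yaml is chosen by you, and this merely follows Maven's usual file naming.

```python
def artifact_name_from_coordinate(coordinate):
    """Build a file name from groupId:artifactId:packaging:classifier:version."""
    group_id, artifact_id, packaging, classifier, version = coordinate.split(":")
    # Maven file naming: <artifactId>-<version>-<classifier>.<packaging>
    return f"{artifact_id}-{version}-{classifier}.{packaging}"


name = artifact_name_from_coordinate(
    "org.wikimedia.analytics.refinery.job:refinery-job:jar:shaded:0.1.23"
)
```

The resulting name ends in .jar, which matches the advice above about keeping the file extension in the artifact name.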

Reference your artifact from your DAG

To reference your artifact from a DAG you can use the wmf_airflow_common.artifact.ArtifactRegistry module. To initialize an ArtifactRegistry object you need to do:

artifact_registry = ArtifactRegistry.for_wmf_airflow_instance("your_instance_name")

The resulting artifact registry object provides a method that will return the location (url) of the artifact specified by the artifact_name, like so:

artifact_registry.artifact_url("refinery-job-0.1.23-shaded.jar")

So that you don’t have to create an artifact_registry object in each DAG, the dag_config.py file in your instance’s config folder already does it. It also prepares some syntactic sugar to shorten your code. Ultimately, in your DAG code, whenever you want to get the path to one of the artifacts you specified in the artifacts.yaml file, you should do:

from your_instance.config.dag_config import artifact

path_to_artifact = artifact("artifact_name")
refinery_job_jar_path = artifact("refinery-job-0.1.23-shaded.jar")

Deployment

Once your artifact is declared in your instance's artifacts.yaml, you will need to deploy airflow-dags to your Airflow instance. Scap will copy the artifact from its source to the configured cache locations, usually in HDFS. Once there, the artifact can be used from its cached location.

If you want to force a sync, you can use the artifact-cache CLI included in the Airflow conda environment via workflow_utils. E.g.

/usr/lib/airflow/bin/artifact-cache status \
  /srv/deployment/airflow-dags/<your_instance_name>/wmf_airflow_common/config/artifact_config.yaml \
  /srv/deployment/airflow-dags/<your_instance_name>/<your_instance_name>/config/artifacts.yaml

How it works

Whenever Airflow is deployed to one of our instances, scap will launch a script that reads the artifacts.yaml file, pulls all artifacts from their sources and caches them by name in a predefined location (usually HDFS). When your DAG uses ArtifactRegistry (or artifact(‘...’)) to reference an artifact, Airflow will use the same logic scap deployment used to determine the cached location of the artifact and return that. This way the developer doesn’t need to handle artifact paths in HDFS.

Filters

Airflow allows you to use Jinja2 to templatize some fields when writing a DAG. It also provides a set of macros you can use to generate dynamic DAG parameters. The most used macro is probably execution_date. If your DAG has a @weekly schedule_interval, or it has several heterogeneous data sources, you might need to modify that execution_date to adapt to your needs. For that, you can use a set of filters provided by the wmf_airflow_common.templates module. You can use them like so:

"{{ execution_date | to_ds_month }}"  # returns the execution_date formatted like YYYY-MM 
"{{ execution_date | start_of_current_week }}"  # snaps the execution date to the start of its week 
"{{ execution_date | end_of_next_day | add_hours(2) }}"  # snaps the execution date to the end of next day and adds 2 hours

See a full list of all available custom filters here: https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/wmf_airflow_common/templates/time_filters.py
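
Jinja2 filters are ordinary Python functions: "{{ value | some_filter(arg) }}" calls some_filter(value, arg). The sketch below re-implements two of the filters named above as plain functions to show the idea. These are illustrative stand-ins based only on the descriptions in this section; the real implementations in time_filters.py may differ.

```python
from datetime import datetime, timedelta


def to_ds_month(dt):
    """Format a datetime as YYYY-MM."""
    return dt.strftime("%Y-%m")


def add_hours(dt, hours):
    """Shift a datetime by a number of hours."""
    return dt + timedelta(hours=hours)


# Stand-in for the dictionary passed to the DAG as user_defined_filters.
example_filters = {"to_ds_month": to_ds_month, "add_hours": add_hours}

execution_date = datetime(2022, 4, 1)

# Equivalent of "{{ execution_date | to_ds_month }}".
month_label = example_filters["to_ds_month"](execution_date)

# Equivalent of "{{ execution_date | add_hours(2) }}".
shifted = example_filters["add_hours"](execution_date, 2)
```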

Don’t forget

For the filters to work, you need to pass them explicitly to your DAG, like this:

from wmf_airflow_common.templates.time_filters import filters

with DAG(
    # ... other DAG arguments
    user_defined_filters=filters,
) as dag:
    ...  # your tasks

Conventions & good practices

The following paragraphs can be used as a good-practices guide to write DAGs. However, the advice here is very Data-Engineering centric. This is the way we D.E. have decided/agreed to write our DAGs. It is what we think is best now, but it might not be what you need for your use case. Also, we might change it in the future. See if it makes sense for you! :-)

Task ids

Task ids (in Operators and Sensors) should describe actions, e.g. process_certain_dataset or wait_for_dataset_partitions, if possible starting with a verb and following with an object. It is a good idea to be explicit about which data is the object of the action. The task id should be unique within the DAG. Also, use snake_case (all lowercase with underscores as word separators).

DAG ids

Use snake_case for DAG ids, too (all lowercase with underscores as word separators). Also, DAG names should be unique within the Airflow instance. Naming DAGs is hard, and it’s difficult to find a formula that fits all cases. But here are some ideas of what a DAG id could contain: the name of the resulting dataset, the granularity of the job (if there are several jobs with different granularities for the same dataset), and the tool that the DAG uses to process or store the resulting dataset (e.g. druid or anomaly_detection). Avoid very generic words like data, table or dag, which don’t give much information. Finally, it’s a good idea to match the DAG file name and DAG id, so if the DAG file is blah_blah_dag.py the id should be blah_blah (without the _dag.py suffix).

DAG file names

DAG file names should be the same as DAG ids, except they will have a _dag.py suffix. It’s a good idea to include any directory levels the DAG file might live in into the file name. For instance, if the DAG lives in dags/druid/monthly/ and we want to name it blah_blah, then a good file name would be druid_monthly_blah_blah_dag.py. Including the directory tree namespace in the DAG name (and in the DAG id) helps identify the DAG in the UI, and when receiving email alerts.

Variable properties

The name of the variable used for a VariableProperties object should match the DAG id with an extra _config suffix. For example, if the DAG id is druid_monthly_navtiming, the variable for VariableProperties should be named druid_monthly_navtiming_config.
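
The naming conventions for DAG ids, DAG file names and VariableProperties variables can be expressed as one small helper. dag_names is a hypothetical function written for this guide, not part of wmf_airflow_common; it assumes the top-level folder is called dags and is not part of the namespace.

```python
from pathlib import PurePosixPath


def dag_names(dag_dir, base_name):
    """Derive DAG id, file name and Variable name from the conventions."""
    # Directory levels below the top-level "dags" folder act as a namespace.
    namespace = [p for p in PurePosixPath(dag_dir).parts if p != "dags"]
    dag_id = "_".join(namespace + [base_name])
    return {
        "dag_id": dag_id,
        "file_name": f"{dag_id}_dag.py",
        "variable_name": f"{dag_id}_config",
    }


names = dag_names("dags/druid/monthly/", "blah_blah")
```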

Tags

Use snake_case for tags as well. It’s very difficult to define which tags are good to add to a DAG. But some general ideas are: don’t use concepts that are already displayed elsewhere, e.g. the name of the generated dataset, which should be in the DAG id; don’t use concepts that are going to be shared by most DAGs, like data, computation, etc.; and don’t use concepts that are most likely only going to be used by a single DAG, like wikidata_aggregation_for_metrics_project_2024.

In the analytics Airflow instance we follow this convention:

  • Add a tag for the DAG's schedule, e.g. hourly, daily, weekly, monthly, etc.
  • Add tags to indicate the datastore of each of the data sources, and use the from_ prefix, like: from_hive, from_hdfs, from_druid, etc.
  • Add tags to indicate the datastore of each of the generated datasets, and use the to_ prefix, e.g. to_iceberg, to_cassandra, to_hive, etc.
  • Add a tag for each dataset required to compute the job, and use the requires_ prefix, for example: requires_wmf_webrequest, requires_event_navigationtiming, etc.
  • Add a tag for each technology used to compute the job, and use the uses_ prefix, like: uses_hql, uses_spark, uses_archiver, etc.

If you're developing for another instance different from analytics, feel free to ignore this convention!
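
For the analytics convention, the tag list can be assembled mechanically from a few facts about the job. build_tags is a hypothetical helper sketched here for illustration; it is not provided by wmf_airflow_common.

```python
def build_tags(schedule, sources, sinks, required_datasets, technologies):
    """Assemble DAG tags following the analytics instance convention."""
    tags = [schedule]
    tags += [f"from_{store}" for store in sources]
    tags += [f"to_{store}" for store in sinks]
    tags += [f"requires_{dataset}" for dataset in required_datasets]
    tags += [f"uses_{tech}" for tech in technologies]
    return tags


tags = build_tags(
    schedule="daily",
    sources=["hive"],
    sinks=["hive"],
    required_datasets=["wmf_webrequest"],
    technologies=["hql", "spark"],
)
```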

Schedule interval

Use the predefined schedule intervals whenever possible (use @daily instead of 0 0 * * *). For weekly DAGs, note the preset @weekly starts the DAGs on Sunday. If you want to start your DAG on Monday, use 0 0 * * 1. If possible, all your weekly DAGs should start on the same day (either Sunday or Monday).

SLAs

Specify SLAs at the DAG level using default_args={..., "sla": timedelta(days=3), ...} (change days=3 to your preferred timedelta). There's a bug in Airflow 2.1 that fails to register SLAs whenever only some of the DAG elements have an SLA defined, so we cannot use SLAs at the task level for now. When we migrate to Airflow 2.3, we'll be able to specify them at the task level (passing the parameter sla=timedelta(days=3) to the corresponding Operator). Note that (unlike in Oozie) Airflow SLA deltas start counting when the dag_run is triggered, which is after the whole logical period has passed. So, an Oozie SLA of 30 hours for a daily job translates to an Airflow SLA of 6 hours (30 - 24).
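
The Oozie-to-Airflow SLA translation is a simple subtraction of the schedule period. The helper airflow_sla_from_oozie is a hypothetical name used only for this sketch.

```python
from datetime import timedelta


def airflow_sla_from_oozie(oozie_sla, schedule_period):
    """Convert an Oozie SLA into the equivalent Airflow SLA.

    Airflow starts counting once the logical period has passed, so the
    length of that period is subtracted from the Oozie SLA.
    """
    airflow_sla = oozie_sla - schedule_period
    if airflow_sla <= timedelta(0):
        raise ValueError("The Oozie SLA must be longer than the schedule period")
    return airflow_sla


# An Oozie SLA of 30 hours on a daily job becomes 6 hours in Airflow.
sla = airflow_sla_from_oozie(timedelta(hours=30), timedelta(days=1))
default_args = {"sla": sla}
```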

Timeouts

For now, we advise not to use timeout parameters such as dagrun_timeout, execution_timeout or timeout (Sensors). The reason is that we already have two measures in place that perform similar actions to the timeout parameters. Those are SLAs, which alert whenever a DAG is late; and mode=reschedule Sensors, which prevent Airflow slots from being blocked by waiting Sensors. In the interest of reducing cognitive load and keeping DAGs as simple as possible, let's not use them, unless there's a specific reason to do so. See: https://phabricator.wikimedia.org/T317549

Artifact versioning

In some cases, multiple versions of the same artifact are going to be specified in your artifacts.yaml file. To prevent having lots of different artifact versions (one for each new DAG), let’s use this convention: Whenever we are modifying an existing DAG or adding a new one, let’s try to update their artifact versions to the latest one already available in the artifacts.yaml config. If the latest available version of the artifact does not implement the necessary code, only then let’s add a new entry in the artifacts.yaml file.

DAG docs

Airflow DAGs accept a doc_md parameter. Whatever you pass through that parameter will show up in the Airflow UI in a convenient collapsible widget. The idea is to reuse the Python docstring (triple quote comment) at the top of the DAG file, and pass it to the DAG using:

with DAG(
    doc_md=__doc__,
    # ... other DAG arguments
) as dag:
    ...  # your tasks

You can use Markdown for a neat presentation in the Airflow UI. NOTE: Please, list all overridable properties (for VariableProperties) of your DAG in the top docstring.

Alert email

It is very important to configure an alert email for your DAG so that you get notified of failures. This should be a mailing list for your team rather than a personal email to ensure continuity if you're not here.

If your mailing list is a wikimedia.org Google Group, you will need to ensure that "noreply@wikimedia.org" can send to the group. It's not considered part of the "entire organization", so you can either manually add it as a member or change the group settings so that "anyone on the web" can post.

Start date

The start_date should match the start of the schedule_interval. For instance, if the schedule_interval is @daily, the start_date should always point to the start of a day. If the schedule_interval is @monthly, the start_date should point to the start of a month, etc.
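
A start_date can be checked against this rule with a few comparisons. is_aligned is a hypothetical helper that only covers the @hourly, @daily and @monthly presets.

```python
from datetime import datetime


def is_aligned(start_date, schedule_interval):
    """Check that start_date falls on the start of a schedule period."""
    on_the_hour = (
        start_date.minute,
        start_date.second,
        start_date.microsecond,
    ) == (0, 0, 0)
    at_midnight = on_the_hour and start_date.hour == 0
    if schedule_interval == "@hourly":
        return on_the_hour
    if schedule_interval == "@daily":
        return at_midnight
    if schedule_interval == "@monthly":
        return at_midnight and start_date.day == 1
    raise ValueError(f"Unsupported schedule_interval: {schedule_interval}")


good_daily = is_aligned(datetime(2022, 4, 1), "@daily")
bad_daily = is_aligned(datetime(2022, 4, 1, 6, 30), "@daily")
bad_monthly = is_aligned(datetime(2022, 4, 15), "@monthly")
```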

Sensor poke_interval

It is very important to specify the parameter poke_interval for all Sensors. The poke_interval is the frequency at which Airflow checks whether the Sensor’s target is available. A poke_interval that is too small, on a large enough set of Sensors, can easily saturate Airflow. The value you set depends on the urgency of the job, and also on its granularity. If you don’t know which value to use, you can use the following rule of thumb:

Hourly: poke_interval=timedelta(minutes=5).total_seconds()
Daily: poke_interval=timedelta(minutes=20).total_seconds()
Weekly: poke_interval=timedelta(hours=1).total_seconds()
Monthly: poke_interval=timedelta(hours=3).total_seconds()

Sensor timeout

Sensors have another important parameter: timeout. It tells Airflow when it should stop trying to check for the Sensor’s target availability. When the schedule_interval of your DAG is significantly greater than the time the Sensors will normally wait, we should specify the Sensor’s timeout. For example, if our DAG is @monthly, and we expect our Sensors to wait for just one or two days, we should set a timeout to prevent Airflow from checking the target for weeks. The value of the timeout should be between the DAG’s SLA and its schedule_interval. Use seconds to specify a Sensor’s timeout, e.g. timeout=timedelta(days=3).total_seconds().
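
The poke_interval rule of thumb and the timeout guideline can be combined in one helper that produces Sensor keyword arguments. sensor_timing is a hypothetical function, and the SLA of 2 days and schedule period of 28 days in the usage example are made-up values for a monthly DAG.

```python
from datetime import timedelta

# Rule-of-thumb poke intervals, in seconds, keyed by DAG granularity.
POKE_INTERVALS = {
    "@hourly": timedelta(minutes=5).total_seconds(),
    "@daily": timedelta(minutes=20).total_seconds(),
    "@weekly": timedelta(hours=1).total_seconds(),
    "@monthly": timedelta(hours=3).total_seconds(),
}


def sensor_timing(granularity, timeout, sla, schedule_period):
    """Return poke_interval and timeout kwargs, both in seconds."""
    if not sla <= timeout <= schedule_period:
        raise ValueError(
            "timeout should lie between the SLA and the schedule period"
        )
    return {
        "poke_interval": POKE_INTERVALS[granularity],
        "timeout": timeout.total_seconds(),
    }


kwargs = sensor_timing(
    "@monthly",
    timeout=timedelta(days=3),
    sla=timedelta(days=2),
    schedule_period=timedelta(days=28),
)
```

The resulting dictionary could then be unpacked into a Sensor constructor with **kwargs.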

Config and context

As opposed to Oozie (which completely separates configuration properties from DAG structure), Airflow’s proposal is to put both configuration properties and DAG structure in the same file. This reduces the boilerplate a lot and makes code shorter, provided common structures are factored out into custom Operators, Sensors and TaskGroups. Another advantage is that configuration properties can be side by side with their context. Consider avoiding this style:

property_1 = value_1 
property_2 = value_2 
property_3 = value_3 
property_4 = value_4
# ... more properties

with DAG(
    # ... DAG arguments
) as dag:
    op1 = Operator(
        property_1=property_1, 
        property_2=property_2,
    ) 
    op2 = Operator(
        property_3=property_3,
        property_4=property_4,
    )

This style separates the configuration from its context. Consider using this style instead:

with DAG(
    # ... DAG arguments
) as dag:
    op1 = Operator(
        property_1=value_1, 
        property_2=value_2,
    ) 
    op2 = Operator(
        property_3=value_3,
        property_4=value_4, 
    )

An exception to this rule is when a property is used in more than one place. In that case, defining it at the top makes the code more DRY.

TaskGroup factories vs. DAG factories

In some cases we might want to factor out a common sub-DAG into a factory that can be reused in multiple jobs. At first we tried implementing DAG factories that would accept a collection of DAG parameters and return a finished “closed-box” DAG. This approach is OK but has some disadvantages: since the resulting DAG is closed, it can never cover all corner cases of the various jobs, and it cannot be adapted to their special needs. Instead, we agreed that implementing TaskGroup factories was a better approach. TaskGroups are sub-DAGs that can be treated as tasks (you can define dependency relations between them and other tasks or TaskGroups). This way factories become much more flexible and powerful.

Linting & auto-formatting

After writing your code, you should:

  • format it with black & isort
  • then make sure it passes flake8 and mypy checks

# Set up your local environment if you haven't already:
./dev_launcher.sh build

# Auto format your code with:
./dev_launcher.sh format

# Run linting checks:
./dev_launcher.sh lint

Alternatively, you could use the pre-commit tool to check your code just before you locally commit.

wmf_airflow_common Sensors and Operators

The WMF Airflow common library provides a set of custom Sensors and Operators tailored to our Hadoop setup. Here’s a quick overview. For more detail, see the custom Sensors and Operators code. If WMF’s custom Sensors do not implement what you need, it means there’s already a native Airflow Sensor you can use; see them here. TODO: Move this Sensors and Operators documentation to the airflow-dags repository, using an automatic documentation generator like Sphinx.

Sensors

RangeHivePartitionSensor

https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/wmf_airflow_common/sensors/hive.py#L6

This Sensor dynamically generates a range of Hive partitions to be checked. Its main advantage is that it renders the list of partitions at run-time, after Airflow macros (like execution_date) have been resolved. This makes it possible, for instance, to generate a set of hourly partitions for a given weekly interval. Be careful though: this Sensor does not implement the interfaces required to be considered a SmartSensor by Airflow, so it is less efficient than regular Airflow Sensors. If you can, use Airflow’s NamedHivePartitionSensor instead.

sensor = RangeHivePartitionSensor( 
    task_id="wait_for_some_dataset", 
    table_name="some_db.some_table", 
    from_timestamp="2022-02-10", 
    to_timestamp="2022-02-13", 
    granularity="@hourly",
)

The resulting list of partitions would be:

some_db.some_table/year=2022/month=2/day=10/hour=0
some_db.some_table/year=2022/month=2/day=10/hour=1
some_db.some_table/year=2022/month=2/day=10/hour=2
…
some_db.some_table/year=2022/month=2/day=10/hour=23
some_db.some_table/year=2022/month=2/day=11/hour=0
some_db.some_table/year=2022/month=2/day=11/hour=1
…
some_db.some_table/year=2022/month=2/day=12/hour=22
some_db.some_table/year=2022/month=2/day=12/hour=23
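To make the expansion above concrete, here is a minimal pure-Python sketch of how such a list could be produced. It is not the RangeHivePartitionSensor implementation: the function name hourly_partitions is hypothetical, and the end of the range is treated as exclusive only because the listing above stops at day=12/hour=23.

```python
from datetime import datetime, timedelta


def hourly_partitions(table: str, start: datetime, end: datetime) -> list[str]:
    """List hourly partition names for the half-open interval [start, end)."""
    partitions = []
    current = start
    while current < end:
        partitions.append(
            f"{table}/year={current.year}/month={current.month}"
            f"/day={current.day}/hour={current.hour}"
        )
        current += timedelta(hours=1)
    return partitions


partitions = hourly_partitions(
    "some_db.some_table", datetime(2022, 2, 10), datetime(2022, 2, 13)
)
print(len(partitions))  # 3 days x 24 hours
print(partitions[0])
print(partitions[-1])
```

The real Sensor performs this expansion at run-time, once the templated timestamps are known, which is why it can cover intervals that depend on execution_date.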

daily_partitions

https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/wmf_airflow_common/partitions_builder.py#L40

This is not a Sensor but a helper that is always used in combination with a Sensor. It returns a list of Hive partitions for one day of data. The partitions can be @hourly or @daily.

sensor = NamedHivePartitionSensor(
    task_id="wait_for_some_dataset", 
    partition_names=daily_partitions( 
        table="some_db.some_table", 
        granularity="@hourly",
    ),
)

The results of the call to daily_partitions would be:

some_db.some_table/year={{execution_date.year}}/month={{execution_date.month}}/day={{execution_date.day}}/hour=0
some_db.some_table/year={{execution_date.year}}/month={{execution_date.month}}/day={{execution_date.day}}/hour=1
some_db.some_table/year={{execution_date.year}}/month={{execution_date.month}}/day={{execution_date.day}}/hour=2
…
some_db.some_table/year={{execution_date.year}}/month={{execution_date.month}}/day={{execution_date.day}}/hour=22
some_db.some_table/year={{execution_date.year}}/month={{execution_date.month}}/day={{execution_date.day}}/hour=23

Note that the resulting partitions are templated and use Airflow’s execution_date macro. These partitions are generated at DAG interpretation time, which is more efficient than with RangeHivePartitionSensor.
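The following sketch illustrates why this can happen at DAG interpretation time: the helper only builds strings and leaves the Jinja macros unresolved for Airflow to render later. It is an illustration, not the code of daily_partitions. The function name is hypothetical, and returning a single day-level partition for @daily is an assumption.

```python
# Jinja placeholders are kept verbatim; Airflow resolves them at execution time.
DAY_TEMPLATE = (
    "/year={{execution_date.year}}"
    "/month={{execution_date.month}}"
    "/day={{execution_date.day}}"
)


def templated_daily_partitions(table: str, granularity: str) -> list[str]:
    """Build templated partition names for one day of data (hypothetical helper)."""
    day = table + DAY_TEMPLATE
    if granularity == "@daily":
        # Assumption: a daily dataset has one partition per day.
        return [day]
    if granularity == "@hourly":
        return [f"{day}/hour={hour}" for hour in range(24)]
    raise ValueError(f"Unsupported granularity: {granularity}")


hourly = templated_daily_partitions("some_db.some_table", "@hourly")
print(len(hourly))
print(hourly[0])
```

Because no date arithmetic is needed, the list is fixed when the DAG file is parsed, and only the placeholders change from one DAG run to the next.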

URLSensor

https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/wmf_airflow_common/sensors/url.py#L25

This sensor waits for a given URL to exist. For instance, it waits for a file to exist in a file system.

hdfs_sensor = URLSensor(
    task_id="wait_for_some_dataset", 
    url="/wmf/data/wmf/some/dataset/snapshot={{ ds }}/_SUCCESS",
)

If execution_date was 2022-01-01, this sensor would wait for the following URL to be present:

/wmf/data/wmf/some/dataset/snapshot=2022-01-01/_SUCCESS
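As a rough illustration of what "waiting for a URL" means, the sketch below polls a local path until it exists or a timeout expires. It is not the URLSensor implementation: wait_for_path is a hypothetical helper, a plain string replacement stands in for Jinja rendering of {{ ds }}, and a temporary local directory stands in for HDFS.

```python
import os
import tempfile
import time


def wait_for_path(path: str, poke_interval: float = 0.1, timeout: float = 1.0) -> bool:
    """Return True once `path` exists, or False if `timeout` seconds elapse first."""
    deadline = time.monotonic() + timeout
    while not os.path.exists(path):
        if time.monotonic() >= deadline:
            return False
        time.sleep(poke_interval)
    return True


with tempfile.TemporaryDirectory() as root:
    template = os.path.join(root, "snapshot={{ ds }}", "_SUCCESS")
    # Stand-in for Airflow's template rendering of the ds macro.
    path = template.replace("{{ ds }}", "2022-01-01")

    found_before = wait_for_path(path, timeout=0.2)  # flag not written yet
    os.makedirs(os.path.dirname(path))
    open(path, "w").close()  # an upstream job writes the _SUCCESS flag
    found_after = wait_for_path(path, timeout=0.2)

print(found_before, found_after)
```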

Operators

Spark Operators

As of Sep 26, 2022, any Spark operator will run on top of Spark3. If you need your job to run on Spark2, please add the following configuration. You can add this at the DAG level, or at the Spark Operator level:

env_vars={"SPARK_CONF_DIR": "/etc/spark2/conf"}
spark_binary="/usr/bin/spark2-submit"

SparkSqlOperator

https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/bc37201a02b6a34d97a054a07f95495613ff01a3/wmf_airflow_common/operators/spark.py#L339

This operator executes a query in SparkSQL using a subclass of org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver. It allows us to pass in query_parameters to correctly populate the application arguments.

task_name = SparkSqlOperator(
    task_id="task_name", 
    sql="hql_directory/path_to_hql_file.hql", 
    query_parameters={ 
        "some_parameter_key": some_parameter_value, 
        "another_parameter_key": another_parameter_value
    }, 
    sla=timedelta(hours=6), 
)

The SparkSqlOperator accepts HTTPS URLs in the sql parameter. This is very convenient, since we can combine it with GitLab's raw file API to construct operators like so:

SparkSqlOperator(
    task_id="do_hql", 
    # To run an HQL file, simply use a GitLab's raw URI that points to it. 
    # See how to build such a URI here: 
    # https://docs.gitlab.com/ee/api/repository_files.html#get-raw-file-from-repository 
    # We strongly recommend you use an immutable URI (i.e. one that includes an SHA or a tag) for reproducibility. 
    sql=var_props.get( 
        "hql_gitlab_raw_path", 
        "https://gitlab.wikimedia.org/api/v4/projects/1261/repository/files/test%2Ftest_hql.hql/raw?ref=0e4d2a9"
    ), 
    query_parameters={ 
        "destination_directory": f"/tmp/xcollazo_test_generic_artifact_deployment_dag/{{{{ts_nodash}}}}", 
        "snapshot": "2023-01-02", 
    }, 
    launcher="skein", 
)

The operator above pulls the HQL file from GitLab and executes it. This effectively allows users that are interested in running SQL to keep it in their respective repositories without the need to do any artifact creation or declaration, and without depending on Data Eng's release cadence for the refinery component.

Please note that, for reproducibility, you should use an immutable URI (i.e. one that includes a git SHA or a tag). This way, we can correlate a particular SQL file with a particular commit on the airflow-dags repository, which makes debugging issues more tractable.
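Building such a URI by hand is error-prone because the file path must be URL-encoded, slashes included. The helper below is a small sketch of how the URI could be assembled. The function name gitlab_raw_url is hypothetical, while the URL layout follows the example used in the operator above.

```python
from urllib.parse import quote, urlencode


def gitlab_raw_url(host: str, project_id: int, file_path: str, ref: str) -> str:
    """Build a GitLab raw-file API URL pinned to a given ref (SHA or tag)."""
    # safe="" forces "/" to be encoded as %2F, as the API expects.
    encoded_path = quote(file_path, safe="")
    query = urlencode({"ref": ref})
    return (
        f"https://{host}/api/v4/projects/{project_id}"
        f"/repository/files/{encoded_path}/raw?{query}"
    )


url = gitlab_raw_url("gitlab.wikimedia.org", 1261, "test/test_hql.hql", "0e4d2a9")
print(url)
```

Passing a commit SHA or a tag as ref, rather than a branch name, gives the immutable URI recommended above.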

SparkSubmitOperator

https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/18182aa041ccae4290f8f35a8001eebcdb71fe44/wmf_airflow_common/operators/spark.py#L16

This custom operator uses the wmf_airflow_common SparkSubmitHook to extend the functionality of the Airflow SparkSubmitOperator, providing more features such as the ability to launch Spark jobs via skein. For this operator, we must set the application parameter to load the artifact that contains the Spark job to be run, and also specify the Java class.

task_name = SparkSubmitOperator(
    task_id="task_name", 
    application=artifact("artifact_name"), 
    java_class="custom_java_class_name", 
    application_args={ "--argument_name": argument_value }, 
    sla=timedelta(days=7)
)

URLTouchOperator

This operator is useful for writing SUCCESS flags or any other files. It is the bridge between jobs that are still on Oozie and jobs that have been migrated to Airflow. Note that this operator only works with URLs that are supported by fsspec on writable file systems.

success = URLTouchOperator(
    task_id="task_name",
    url=(dag_config.hadoop_name_node + destination_path + "partition/_SUCCESS"),
)

Testing

To build anything, you first need the Kerberos development files. Install them with:

# Ubuntu
sudo apt install libkrb5-dev

Unit tests

Setting up the environment

First install Miniconda on your system. Then, in your DAG folder, create your virtual environment with the following:

./dev_launcher.sh build

If the installation process starts downloading multiple versions of some software, it probably means a previous step has failed, and you should troubleshoot why. On Debian, I had to install dependencies for SASL (see https://pypi.org/project/sasl/0.1.3/) and Kerberos (https://github.com/apple/ccs-pykerberos/issues/66#issuecomment-838285162).

Before running the tests, make sure you have your PYTHONPATH environment variable correctly set:

export PYTHONPATH=.:./wmf_airflow_common/plugins

And now tests can be run with:

./dev_launcher.sh test

You could also run the tests with Docker:

./dev_launcher.sh docker_test

Definition test

This test checks that the DAG definition is properly read by the Airflow scheduler. We can run it directly in Python with python path/to/your/file_dag.py, or write a unit test. See an example here: https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/tests/analytics/aqs/aqs_hourly_dag_test.py

Testing the DAG itself

As we try to keep the logic outside of our DAGs, the tests are mainly sanity checks. Three notable use cases:

  • Check the interpolations of the dates generated with Jinja
  • Check the generation of the partitions lists / the sensors
  • Check the global number of tasks in the DAG

Testing the libraries

In our project, libraries live in wmf_airflow_common. This is where we define custom operators and other helpers. Those files are important to unit test.

On-Disk Fixtures

Generic tests run against all DAGs in the repository: they render the skein specification that will be submitted to the cluster and write it to a file in the repository. These files help authors and reviewers verify that changes to DAGs and/or shared code result in the expected changes to the jobs submitted to the cluster. When one of these fixtures fails in the test suite, the expected course of action is to delete all fixtures (find tests -name \*.expected -exec rm '{}' \;) and re-run the pytest suite with the environment variable REBUILD_FIXTURES=yes set. This rebuilds the fixtures and lets the developer review the differences in git diff. If all changes are as expected, the changed fixture files should be committed together with the code changes that caused them.
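The general pattern behind such fixture tests can be sketched as follows. This is not the repository's implementation: check_fixture and the file names are hypothetical, and only the REBUILD_FIXTURES=yes convention and the .expected suffix come from the description above.

```python
import os
import tempfile
from pathlib import Path
from typing import Optional


def check_fixture(rendered: str, expected_file: Path, rebuild: Optional[bool] = None) -> None:
    """Compare rendered output with its on-disk fixture, or (re)create the fixture."""
    if rebuild is None:
        rebuild = os.environ.get("REBUILD_FIXTURES") == "yes"
    if rebuild and not expected_file.exists():
        # Fixture was deleted and a rebuild was requested: write the new content.
        expected_file.parent.mkdir(parents=True, exist_ok=True)
        expected_file.write_text(rendered)
        return
    assert expected_file.read_text() == rendered, f"{expected_file} is out of date"


with tempfile.TemporaryDirectory() as tmp:
    fixture = Path(tmp) / "fixtures" / "some_dag.expected"
    check_fixture("rendered spec v1", fixture, rebuild=True)   # creates the fixture
    check_fixture("rendered spec v1", fixture, rebuild=False)  # unchanged: passes
    try:
        check_fixture("rendered spec v2", fixture, rebuild=False)
        drift_detected = False
    except AssertionError:
        drift_detected = True  # the rendered spec changed: the test fails

print(drift_detected)
```

Storing the rendered output in the repository means every change to what gets submitted to the cluster shows up as a reviewable diff.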

These fixtures can make some commits a bit verbose when you spelunk through history for particular changes. They can be excluded from git log output with a pathspec. Only the portion after the -- matters, and it applies equally to -p, -Sfoo and other git log modes: git log -1 -p -- . ':(exclude,glob)tests/*/fixtures/**'

Developing on macOS

Instead of running ./dev_launcher.sh build you will need to use a Docker container and the Docker-based commands documented in https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags#running-tests

Contrary to the README, you do not need to install Homebrew or the listed dependencies – since the environment will be inside the Docker container.

To generate the on-disk test fixtures, you would delete the fixtures as usual via find tests -name \*.expected -exec rm '{}' \;

and then run the Docker-based pytest command with REBUILD_FIXTURES=yes:

docker run \
  --platform linux/x86_64 \
  --memory="8g" \
  --cpus="5.0" \
  --rm -it \
  --volume .:/opt/airflow-dags \
  --workdir /opt/airflow-dags \
  --env PYTHONPATH=".:./wmf_airflow_common/plugins" \
  --entrypoint bash \
  conda_env_test \
  -c -p "source /srv/app/miniconda/bin/activate && conda activate airflow && REBUILD_FIXTURES=yes pytest"

You may get the following failures, which can be ignored:

tests/wmf_airflow_common/config/dag_properties_test.py:77: AssertionError
=============================================== short test summary info ================================================
FAILED tests/wmf_airflow_common/config/dag_properties_test.py::test_dag_properties_with_no_prior_variable - AssertionError: expected call not found.
FAILED tests/wmf_airflow_common/config/dag_properties_test.py::test_dag_properties_with_prior_variable_overrides - AssertionError: expected call not found.
FAILED tests/wmf_airflow_common/config/dag_properties_test.py::test_dag_properties_with_partial_variable_overrides - AssertionError: expected call not found.
FAILED tests/wmf_airflow_common/config/dag_properties_test.py::test_dag_properties_with_special_type_overrides - AssertionError: expected call not found.
===================================== 4 failed, 1328 passed, 10 skipped in 28.42s ======================================

CI

The GitLab CI pipeline is described here: https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/.gitlab-ci.yml

  • Build container images: in this optional first step, container images are built with Kokkuri and stored in the Rel-Eng image registry. The GitLab runner later uses those images to execute the tests or the linting checks.
  • Unit tests: pytest is run against the code. You will find most of the configuration in pyproject.toml.
  • Linting
    • flake8 checks for syntax errors and style errors (PEP8)
    • mypy checks for type errors
    • isort standardizes the import statements
    • black standardizes our code style
  • Conda environment debianized build: it builds the Debian package containing the conda environment with all the project dependencies. The package is stored in the GitLab registry and deployed to our clusters manually.

Development instance

We may run a custom dev instance in the cluster (e.g., on stat1004 or stat1007). This is handy for testing your code in the production environment.

Limits

  • A single process does the work of both the worker and the scheduler.

Setup

Clone your DAG repository on a stat box, then run ./run_dev_instance.sh. It creates a virtual environment and a SQLite DB, and launches the Airflow processes. Open an SSH tunnel to access the UI as described in the script logs.

Input/output override procedure

Unpause your DAG only after its output has been overridden with the following procedure. Use custom variable properties to configure your DAG for testing. Variables to override include:

  • start_date
  • end_date
  • output_dir / output_table
  • tmp_dir
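The idea of overriding properties for a test run can be sketched in plain Python as below. This does not reproduce the repository's variable properties mechanism: resolve_properties, DEFAULTS and all values are hypothetical, and the JSON string stands in for whatever the Airflow Variable of the dev instance would hold.

```python
import json
from datetime import datetime

# Hypothetical production defaults of a DAG.
DEFAULTS = {
    "start_date": datetime(2023, 1, 1),
    "output_table": "some_db.some_table",
    "tmp_dir": "/tmp/some_job",
}


def resolve_properties(defaults: dict, overrides_json: str) -> dict:
    """Return a copy of `defaults` with the JSON-encoded overrides applied."""
    overrides = json.loads(overrides_json) if overrides_json else {}
    unknown = set(overrides) - set(defaults)
    if unknown:
        raise KeyError(f"Unknown properties: {sorted(unknown)}")
    resolved = dict(defaults)
    for key, value in overrides.items():
        # Dates arrive as ISO strings in JSON; convert them back to datetime.
        if isinstance(defaults[key], datetime):
            value = datetime.fromisoformat(value)
        resolved[key] = value
    return resolved


props = resolve_properties(
    DEFAULTS,
    '{"start_date": "2023-06-01", "output_table": "my_user.some_table_test"}',
)
print(props["start_date"], props["output_table"], props["tmp_dir"])
```

Rejecting unknown keys is a design choice of this sketch: a typo in an override name then fails loudly instead of silently writing to the production output.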

Analytics test Cluster

Limits:

  • The lack of data
  • The barrier to production (also, you can’t break production)
  • The small size of the cluster
  • Some datasets are subsets of the original with different partition names

How to deploy

Deploying is as simple as running scap deploy from the deployment machine.

You may deploy a custom branch:

cd /srv/deployment/airflow-dags/analytics_test
git fetch
scap deploy -r origin/my_branch

Use cases

  • Canary for production
  • Deploying custom branch

Airflow Debian package upgrade process

Update the conda environment

  • Edit conda-environment.yml to set the versions you want to use
  • Create the lock file with generate_conda_environment_lock_yml.sh
  • Check that it can properly generate a conda environment with generate_conda_environment_lock_yml.sh
  • Locally use & test the conda environment with: ./dev_launcher.sh build && ./dev_launcher.sh test

Build the Debian package

  • Once you are satisfied with your Conda environment, verify and update the version reference in .gitlab-ci.yml and debian/Dockerfile
  • Then commit and push to GitLab
  • To trigger the package creation, manually click on the publish pipeline in the GitLab CI interface.
  • The publish pipeline stores the .deb file in the GitLab registry.

Deploy on the test cluster

  • Send your new version to apt.wikimedia.org (requires SRE rights)
  • Connect to the test instance and pause all the DAGs. Wait for any remaining executions to finish.
  • Stop the Airflow services
  • Update the machine holding the analytics_test instance (an-test-client1001), manually or via Puppet, to install the latest version of the Airflow package
  • Upgrade the PostgreSQL database to the latest version of Airflow:
    sudo -u analytics \
        AIRFLOW_HOME=/srv/airflow-analytics_test \
        /usr/lib/airflow/bin/airflow db upgrade
    
  • Restart the Airflow services
  • Check that everything works as expected

Deploy in production

The process is the same as for the test cluster, except that you have a growing list of Airflow instances to update:

cd puppet
ack airflow::instances hieradata/

Test the deb package on a stat box

# On a stat box
ssh stat1007.eqiad.wmnet
# Download the .deb (don't forget to use the proxy)
wget -O  airflow-2.6.1-py3.10-20230619_amd64.deb https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/package_files/1298/download
# Extract the archive
dpkg -X airflow-2.6.1-py3.10-20230619_amd64.deb airflow-2.6.1-py3.10-20230619_amd64
# Update all shebangs paths
./airflow-2.6.1-py3.10-20230619_amd64/usr/lib/airflow/bin/conda-unpack
# activate the unpacked environment and do your stuff
source ./airflow-2.6.1-py3.10-20230619_amd64/usr/lib/airflow/etc/profile.d/conda.sh
conda activate base
# Or launch a dev instance (See more information in run_dev_instance.sh)
git clone https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags.git
cd airflow-dags
sudo -u analytics-privatedata ./run_dev_instance.sh -m /tmp/my_home \
  -b ~/airflow-2.6.1-py3.10-20230619_amd64/usr/lib/airflow/ analytics_test