Data Platform/Systems/Airflow

Please note that we are currently in the process of migrating Airflow to Kubernetes (task T362788) - See Data Platform/Systems/Airflow/Kubernetes for more details. Many of the instructions on this page refer to the pre-Kubernetes system and will need to be updated.

Apache Airflow is a workflow job scheduler. Developers declare job workflows as DAGs (directed acyclic graphs) using Airflow's Python API.
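
For example, a minimal DAG can be declared in a few lines of Python. The DAG id, schedule, and command below are purely illustrative and do not correspond to any real WMF job.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

# Purely illustrative DAG: the id, schedule and command are made up.
with DAG(
    dag_id="example_hourly_job",
    start_date=datetime(2024, 1, 1),
    schedule="@hourly",
    catchup=False,
    default_args={"retries": 1, "retry_delay": timedelta(minutes=5)},
) as dag:
    say_hello = BashOperator(
        task_id="say_hello",
        bash_command="echo 'hello from Airflow'",
    )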

This page documents the Data Engineering managed Airflow instances in the Analytics Cluster. As of November 2024, we are running Airflow 2.10.3 (docs).

If you wish to develop DAGs with Airflow, you can find more information on the Airflow Developer guide page.

Airflow setup and conventions

The Data Engineering team maintains several Airflow instances. Usually, these instances are team-specific. Teams have full control over their Airflow instance. Data Platform Engineering manages the tooling needed to deploy and run these instances.

The current production instances are all host-based, meaning that they run on either bare-metal or individual virtual machines. The Airflow instances on Kubernetes run as distributed applications on the dse-k8s cluster in eqiad.

The host-based instances all live within the Analytics Cluster VLAN, and have access to Hadoop and other Analytics Cluster related tools. It is expected that the Airflow instances themselves do not perform real computation tasks; instead they should submit jobs to the Hadoop cluster. Airflow is used for the pipelining and scheduling of these jobs.
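
For example, a DAG task that hands work off to the Hadoop cluster might submit a Spark application to YARN rather than computing anything locally. The sketch below uses the upstream Spark provider operator; whether that provider is installed, and the application path, connection id and resource settings, are all assumptions for illustration (the production DAGs may use their own wrappers from airflow-dags).

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Illustrative only: the application path, connection id and resources are
# assumptions, not values used by any real instance.
submit_to_yarn = SparkSubmitOperator(
    task_id="submit_to_yarn",
    application="hdfs:///user/example/jobs/my_job.py",  # hypothetical path
    conn_id="spark_default",
    executor_memory="4g",
    num_executors=8,
)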

The Kubernetes-based instances will also support running jobs on the Hadoop cluster using exactly the same tooling, but will additionally be able to support different execution models, owing to the use of the Kubernetes Executor for Airflow.

Authentication

Host-based instances

All of our host-based Airflow instances are currently accessed via SSH tunnels, so management of DAG runs and tasks requires production shell access and membership of a specific group. This access is controlled by SRE, which means that our authentication and access control mechanism is external to Airflow itself. For this reason we allow full access rights to any user of the Airflow web interface.

Where to locate DAG run notes

A recent bug (see also: task T352534) means that we can no longer add notes to DAG runs unless we are logged in.

For this reason, we have created a simple user account called admin on each instance and assigned the password admin to it. You can log into this account if you wish to add or edit notes associated with tasks. We expect to review this configuration in the near future, as we improve the Airflow service and user experience.

Kubernetes instances

The instances running on Kubernetes use an authentication mechanism that is integrated with Airflow and backed by our CAS-SSO system. Users authenticate with their MediaWiki developer account, and an LDAP group mapping determines the level of access permitted. Membership of the wmf or nda groups is required for read-only access. Each instance then has a specific LDAP group that maps to operator-level access for that instance. Members of the ops group are granted admin rights on the instances.

Metadata Database

Host-based instances

The current Airflow metadata database backend is PostgreSQL. The architecture is as follows:

  • 1 instance of PostgreSQL on an-db1001
  • 1 DB per Airflow instance
  • 1 user account per database

Some of the reasons for this design are:

  • Prevents unwanted queries across Airflow instance databases, or unwanted access to data (isolation).
  • Easier configuration with 1 database per instance (authorization, backups,...).

Kubernetes instances

Each of these instances has its own PostgreSQL cluster that is deployed with the instance by the CloudnativePG operator. There are two PostgreSQL database instances in each cluster, operating in a high-availability mode, with automatic failover. The storage layer for these databases is the Ceph storage cluster operated by the Data Platform Engineering team.

These CloudnativePG clusters also include a set of three pgbouncer pods, running as a connection pooler.

Airflow DAGs

To develop best practices around Airflow, we use a single shared git repository for Airflow DAGs for all instances: data-engineering/airflow-dags. Airflow instance (and team) specific DAGs live in subdirectories of this repository, e.g. in <instance_name>/dags.

Host-based DAG deployment

Each Airflow instance has its own scap deployment of data-engineering/airflow-dags. See Scap#Other_software_deployments for instructions on how to use scap to deploy.

Your airflow instance's airflow-dags scap deployment directory is located at /srv/deployment/airflow-dags/<instance_name> on the deployment server as well as on your airflow host. To deploy:

ssh deployment.eqiad.wmnet
cd /srv/deployment/airflow-dags/<instance_name>
git pull # or checkout, do whatever you need to make this git clone ready for deployment
scap deploy

Kubernetes based DAG deployment

We are intending to use a continuous-deployment model for this, although the precise mechanism has yet to be decided. See task T368033 for the current work.

Skein

We run Skein as a way to schedule Python Spark jobs on YARN from Airflow-scheduled jobs. Skein is deployed on all Airflow hosts.
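
For context, submitting an application through Skein's Python client looks roughly like the sketch below. The application name, script, and resources are invented for illustration; only the 'production' queue name comes from the YARN section later on this page.

import skein

# Hypothetical application spec; only the queue name reflects WMF conventions.
spec = skein.ApplicationSpec(
    name="example-skein-app",
    queue="production",
    master=skein.Master(
        resources=skein.Resources(memory="1 GiB", vcores=1),
        script="echo 'hello from YARN'",
    ),
)

# The client secures its connection to the Skein driver with the x509
# certificate described below.
with skein.Client() as client:
    app_id = client.submit(spec)
    print(app_id)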

In order for Airflow to authenticate against the Skein server, an x509 certificate is automatically generated in the Airflow home directory, under .skein/skein.crt, with a validity period of one year. If that certificate expires, Airflow can no longer schedule Spark jobs on YARN and we face an outage (see https://phabricator.wikimedia.org/T344617).

We monitor the expiry date of these certificates (https://grafana-rw.wikimedia.org/d/980N6H7Iz/skein-certificate-expiry?orgId=1), and we have a weekly systemd job in charge of renewing the certificate, to make sure that we never face such an outage.
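
For reference, the expiry check can also be done programmatically. The sketch below is not the actual monitoring implementation; the certificate path is the platform_eng one used as an example in the runbook that follows.

from datetime import datetime
from pathlib import Path

from cryptography import x509

# Hypothetical check, not the real monitoring code: read the certificate's
# notAfter date and report how many days remain.
cert_path = Path("/srv/airflow-platform_eng/.skein/skein.crt")
cert = x509.load_pem_x509_certificate(cert_path.read_bytes())
remaining = cert.not_valid_after - datetime.utcnow()
print(f"{cert_path} expires in {remaining.days} days")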

I'm getting paged for a Skein certificate about to expire

SSH onto the host associated with the alert and go to the Airflow home directory. Hint: the alert will include a cert label whose value is the absolute path of the certificate in question. Assuming that label has a value of /srv/airflow-platform_eng/.skein/skein.crt, run:

sudo su - analytics-platform-eng
export HOME=/srv/airflow-platform_eng/
source /usr/lib/airflow/bin/activate
skein config gencerts --force

You can then check the new expiry date of the certificate

openssl x509 -in ~/.skein/skein.crt -dates |head -n 2
notBefore=Aug 21 15:39:26 2023 GMT
notAfter=Aug 20 15:39:26 2024 GMT

(See original phab task for details: https://phabricator.wikimedia.org/T344617#9106681)

See also

Airflow Instances

Kept up to date at: Data_Engineering/Systems/Airflow/Instances#List_of_instances

Airflow on Kubernetes

Kept up to date at Data Platform/Systems/Airflow/Kubernetes

Airflow Upgrades

The Airflow upgrade procedure is documented at: Data_engineering/Systems/Airflow/Upgrading

Administration

Overview of Data Engineering's Airflow deployments

Data Engineering maintains a Debian package for Airflow at operations/debs/airflow/. This package ships a premade Conda environment with all dependencies needed to run Airflow, which is installed to /usr/lib/airflow.

The airflow::instance Puppet define is used to set up and run Airflow instances. This define can be used multiple times on the same host to declare multiple airflow instances. The instance specific configs are installed in /srv/airflow-<instance_name>, and templated systemd units are set up for services airflow-scheduler@<instance_name> and airflow-webserver@<instance_name>.

The profile::airflow Puppet class uses the profile::airflow::instances hiera variable to declare airflow::instances. This allows each airflow::instance to be fully specified via hiera. By default, profile::airflow applies Data Engineering conventions to each airflow::instance.

These defaults include setting up instance specific scap::targets of the data-engineering/airflow-dags repository. (There is still some manual setup needed for this, see the instructions below on how to configure this for new instances.) The Airflow instance's dags_folder will be automatically set to one of the instance specific subdirectories in the airflow-dags repository. (You can override this in hiera if you need.)

Creating a new Airflow Instance

The information below is outdated, since any new instances will be deployed to Kubernetes.

In this example, we'll be creating a new Airflow instance named 'test'.

Prepare airflow-dags for deployment to the new instance

Create the instance specific dags folder

By convention, all Airflow team instances use the same DAGs repository: data-engineering/airflow-dags. Instance specific DAGs are located in the <instance-name>/dags directory. Unless you override defaults in puppet/hiera, this will be used as airflow's dags_folder.

Create this directory and commit the changes before proceeding. In our example, this directory would be test/dags, since 'test' is our instance name.

Create the instance specific scap repository

Scap requires configuration that is declared for each of its deployments. Because we use the same source DAGs repository for all airflow instances, we can't just add the scap.cfg file to the main airflow-dags repository. Instead, we use separately managed 'scap repositories' in which the deployment configuration is declared.

Create a new repository in gitlab with the name data-engineering/airflow-dags-scap-<instance_name>. For our example, we'll be creating data-engineering/airflow-dags-scap-test.

You'll need to create two files in this repository:

Create scap/scap.cfg with the following content:

[global]
git_repo: data-engineering/airflow-dags
ssh_user: test_user # this user must exist on the airflow host, and it must be in deploy_airflow.trusted_groups (see below)
dsh_targets: targets

And create a scap/targets file with the list of hostnames that will be deployed to. Likely this will be only your airflow host.

hostname1001.eqiad.wmnet

Create a scap deployment source

Scap is used to deploy the data-engineering/airflow-dags repository to airflow instances. Declaration of scap::target will be taken care of for you by profile::airflow, but you will need to declare the scap::source for the deployment server.

Edit hieradata/role/common/deployment_server/kubernetes.yaml and add a new entry to scap::sources:

scap::sources:
  airflow-dags/test:
    repository: data-engineering/airflow-dags
    # This is the name of the scap repository we created in the previous step.
    scap_repository: data-engineering/airflow-dags-scap-test
    origin: gitlab

You'll also need to make sure that real users will be able to deploy. They must be in a POSIX group that has access to the deployment server, as well as in a group listed in this hiera config:

  # Shared deploy ssh key for Data Engineering maintained
  # Airflow instances. For now, all admins of Airflow instances
  # can deploy any Airflow instance.
  deploy_airflow:
    trusted_groups:
      - analytics-deployers
      # ...

Merge any changes and run puppet on the deployment server.

Create the Airflow PostgreSQL Database

Add a reference to the instance name in puppet/hieradata/role/common/analytics_cluster/postgresql.yaml under the profile::analytics::postgresql::databases key.

Add the corresponding password in the private repo in the file: /srv/private/hieradata/role/common/analytics_cluster/postgresql.yaml

Configure the Airflow instance in Puppet

Add the profile::airflow class to your node's role in Puppet and configure the Airflow instance(s) in your role's hiera.

Let's assume we're adding this instance in a role class role::airflow::test.

class role::airflow::test {
    include ::profile::airflow
    # profile::kerberos::keytabs is needed if your Airflow
    # instance needs to authenticate with Kerberos.
    # You'll need to create and configure the keytab for the Airflow instance's
    # $service_user we'll set below.
    include ::profile::kerberos::keytabs
}


Then, in hieradata/role/common/airflow/test.yaml:

# Set up airflow instances.
profile::airflow::instances:
  # airflow@test instance.
  test:
    # Since we set security: kerberos a keytab must be deployed for the service_user.
    service_user: test_user
    service_group: test_group
    # Set this to true if you want to enable alerting for your airflow instance.
    monitoring_enabled: false
    # Configuration for /srv/airflow-test/airflow.cfg
    # Any airflow::instance configs can go here. See:
    # https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html
    # NOTE: unless your airflow instance does special things, the defaults
    # set in profile::airflow should be sufficient for setting up a
    # WMF Data Engineering managed airflow::instance.
    #airflow_config:
    #  core:

# Make sure the keytab for test_user is deployed via profile::kerberos::keytabs
profile::kerberos::keytabs::keytabs_metadata:
  - role: 'test_user'
    owner: 'test_user'
    group: 'test_group'
    filename: 'test_user.keytab'

See Create_a_keytab_for_a_service for instructions on creating keytabs.

Note that we didn't set db_user or db_password. These are secrets and should be set in the operations puppet private repository in the hiera variable profile::airflow::instances_secrets. So, in puppet private in the hieradata/role/common/airflow/test.yaml file:

# Set up airflow instances.
profile::airflow::instances_secrets:
  # airflow@test instance.
  test:
    db_user: airflow_test
    db_password: password_here

profile::airflow::instances_secrets will be merged with profile::airflow::instances by the profile::airflow class, and the parameters to airflow::instance will be available for use in the sql_alchemy_conn as an ERB template.
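
As a purely hypothetical illustration of how those merged values end up in the rendered configuration, the resulting SQLAlchemy URL would look something like the following; the host FQDN and database name are assumptions, not the real settings.

# Hypothetical rendering of sql_alchemy_conn for the 'test' instance.
db_user = "airflow_test"
db_password = "password_here"
sql_alchemy_conn = f"postgresql://{db_user}:{db_password}@an-db1001.eqiad.wmnet/airflow_test"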

Once this is merged and applied, the node with the role::airflow::test will run the systemd services airflow-scheduler@test, airflow-webserver@test, airflow-kerberos@test, as well as some 'control' systemd services airflow@test and airflow that can be used to manage the Airflow test instance.

Create the airflow tables by running

 sudo -u test_user airflow-test db upgrade

The airflow services were probably already started by the earlier puppet run. Restart them now that the airflow tables are created properly.

 sudo systemctl restart airflow@test.service

Add service user to the Yarn production queue

Since all YARN applications (Spark jobs, Skein apps, etc.) are submitted by the service user running the Airflow instance, we need to grant this user permissions in one of YARN's queues, defined in yarn_capacity_scheduler.pp. All Airflow instance users should be allowed to run on the "production" queue. Example adding "test_user" below:

...
  # this allows test_user to submit applications to the 'production' queue
  'yarn.scheduler.capacity.root.production.acl_submit_applications' => 'test_user,existingUser1,existingUser2',
...
  # this redirects applications submitted by test_user to the 'production' queue if no queue was specified.
  'yarn.scheduler.capacity.queue-mappings' => 'u:test_user:production,u:existingUser1:production,u:existingUser2:production',
...

Incident reports & known issues

Add incident reports and known issues to the following table. Please add a short description of the issue and a link to a more detailed one: either a wiki page or a Phabricator task. Thanks! :]

Date         Incident / Issue description                Link
2022-07-26   This is an example incident description.    example.link