Data Platform/Systems/Airflow/Kubernetes

From Wikitech

This page covers our Airflow instances deployed to Kubernetes and what is specific to them. We assume that the Airflow instance is deployed alongside a dedicated CloudnativePG cluster running in the same namespace.



airflow
Attribute: Value
Owner: Data Platform SRE
Kubernetes Cluster: dse-k8s-eqiad
Kubernetes Namespace: airflow-test-k8s
Chart: https://gerrit.wikimedia.org/r/plugins/gitiles/operations/deployment-charts/+/refs/heads/master/charts/airflow
Helmfiles: https://gerrit.wikimedia.org/r/plugins/gitiles/operations/deployment-charts/+/refs/heads/master/helmfile.d/dse-k8s-services/airflow-test-k8s/helmfile.yaml
Docker image: https://gitlab.wikimedia.org/repos/data-engineering/airflow/
Internal service DNS: airflow-test-k8s.discovery.wmnet
Public service URL: https://airflow-test-k8s.wikimedia.org
Logs: https://logstash.wikimedia.org/app/dashboards#/view/7f6bc030-8166-11ef-9967-d396febc14fa?_g=h@c823129&_a=h@fa450d4
Metrics: https://grafana.wikimedia.org/d/aca34072-ad28-4f08-8468-9a28f7b45c52/airflow-on-kubernetes?orgId=1
Monitors: https://gerrit.wikimedia.org/r/plugins/gitiles/operations/alerts/+/master/team-data-platform/airflow-k8s.yaml
Application documentation: https://airflow.apache.org/docs/apache-airflow/stable/index.html
Paging: true
Deployment Phabricator ticket: https://phabricator.wikimedia.org/T362788

Note: replace airflow-test-k8s with other instance names where appropriate.

Architecture

Airflow is deployed on Kubernetes and talks to a PG cluster deployed in the same k8s namespace. Airflow task logs, PG WALs and backups are all uploaded to S3. PG data is written to Ceph persistent volumes.
Simplified architecture diagram of an airflow deployment

Components

Airflow is deployed via the airflow chart. The release is composed of 3 Deployment resources:

  • the webserver
  • the scheduler
  • the kerberos ticket renewer

Executor

While we migrate the Airflow instances to Kubernetes, the scheduler will be configured with LocalExecutor, meaning that tasks are executed as subprocesses of the scheduler process. However, the chart was developed with KubernetesExecutor in mind, meaning that any DAG task should be executed in a Kubernetes pod.

DAGs deployment

For the moment, any Airflow instance running on Kubernetes syncs the main branch of airflow-dags every 5 minutes using https://github.com/kubernetes/git-sync, meaning that any merged MR should be reflected in Airflow in about 5 minutes.

In the near future, the model will change from pull to push. When an MR is merged, the GitLab CI will send a POST request to blunderbuss, which will trigger a sync of the main branch into the volume mounted by the Airflow schedulers.

Even when that is in place, we'll still be able to use git-sync to synchronize feature branches in development instances.

Logging

The logs of the airflow components themselves are sent to our observability pipeline and are accessible through logstash. However, the DAG task logs themselves are uploaded to S3 after completion. Streaming the logs of an ongoing DAG task can be done from the web UI, and relies on the Kubernetes Logs API (when using KubernetesExecutor) or simply tails local logs (when using LocalExecutor).

Security

Webserver authentication and authorization

Access to the webserver is OIDC authenticated, and each user's role is derived from their LDAP groups. For example, SREs (members of the ops LDAP group) are automatically given the Admin role. The mapping can be customized per instance, so that we can define LDAP groups for per-instance admins and per-instance members.
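The group-to-role derivation described above can be sketched as a simple lookup. This is only an illustration, not the chart's actual configuration: the ops to Admin pair comes from the text, while the per-instance group names, the User role assignment and the Public fallback are assumptions.

```python
# Hypothetical mapping from LDAP group names to Airflow roles.
# Only "ops" -> "Admin" is taken from this page; the rest is illustrative.
ROLE_MAPPING = {
    "ops": "Admin",
    "airflow-test-k8s-admins": "Admin",  # hypothetical per-instance admin group
    "airflow-test-k8s-users": "User",    # hypothetical per-instance member group
}

# Assumption: users matching no mapped group fall back to this role.
DEFAULT_ROLE = "Public"


def resolve_roles(ldap_groups):
    """Return the sorted list of roles granted by a user's LDAP groups."""
    roles = {ROLE_MAPPING[group] for group in ldap_groups if group in ROLE_MAPPING}
    return sorted(roles) or [DEFAULT_ROLE]


if __name__ == "__main__":
    print(resolve_roles(["ops", "wmf"]))
    print(resolve_roles(["nda"]))
```

Keeping the mapping as plain data is what makes it easy to override per instance.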

API access

Access to the Airflow API will be Kerberos authenticated, meaning that:

  • services will be able to access the API by authenticating to Kerberos via their own Keytab
  • users will be able to access the API by authenticating to Kerberos via their password and kinit

Kerberos

We generate a keytab for each instance. It is stored as a base64-encoded secret and mounted only on the airflow-kerberos pod, which is in charge of obtaining and regularly renewing a TGT. That TGT is in turn mounted into every pod that needs to communicate with Kerberised systems (that is, the worker pods).
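As a minimal sketch of the base64 step only, the round trip between raw keytab bytes and the encoded secret value could look like the following. The byte string is a stand-in, not a real keytab, and the helper names are hypothetical; how the secret is actually produced and stored is not shown here.

```python
import base64


def encode_keytab(raw: bytes) -> str:
    """Encode raw keytab bytes as an ASCII base64 string, suitable for a secret value."""
    return base64.b64encode(raw).decode("ascii")


def decode_keytab(encoded: str) -> bytes:
    """Decode the base64 secret back to raw bytes, rejecting malformed input."""
    return base64.b64decode(encoded, validate=True)


if __name__ == "__main__":
    # Stand-in bytes for illustration; a real keytab is a binary file.
    fake_keytab = b"not-a-real-keytab"
    encoded = encode_keytab(fake_keytab)
    print(encoded)
    print(decode_keytab(encoded) == fake_keytab)
```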

Kubernetes RBAC

When using the KubernetesExecutor, the scheduler needs to be able to perform CRUD operations on Pods, and the webserver needs to be able to tail Pod logs. As the user normally used to deploy charts does not have permission to create Role and RoleBinding resources, we deploy the chart with a specific user/role that does, called deploy-airflow.

UNIX user impersonation

Each Airflow instance has a dedicated keytab, whose first principal is of the form <user>/airflow-<instance-name>.discovery.wmnet@WIKIMEDIA. This ensures that any interaction with HDFS, Spark, etc. will impersonate the <user> user.

For example, the first principal of airflow-test-k8s instance is analytics/airflow-test-k8s.discovery.wmnet@WIKIMEDIA, which enables impersonation of the analytics user in Hadoop.
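The naming scheme above can be sketched with two small helpers. The function names are hypothetical, and the sketch assumes that <instance-name> is the part of the instance name after the airflow- prefix (test-k8s for airflow-test-k8s), as the example suggests.

```python
REALM = "WIKIMEDIA"


def first_principal(user: str, instance_name: str) -> str:
    """Build the first principal for an instance, following the form given above."""
    return f"{user}/airflow-{instance_name}.discovery.wmnet@{REALM}"


def impersonated_user(principal: str) -> str:
    """Return the primary component of a principal, i.e. the impersonated user."""
    primary, _, _ = principal.partition("/")
    return primary


if __name__ == "__main__":
    principal = first_principal("analytics", "test-k8s")
    print(principal)
    print(impersonated_user(principal))
```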

Database access

The airflow chart was designed to run alongside a CloudnativePG cluster running in the same namespace. However, it can be configured to use an "external" PG database, such as an-db1001.eqiad.wmnet for transitioning purposes. The ultimate goal is to have each instance run alongside its own PG cluster.

When configured to use a CloudnativePG cluster, access to the DB goes through PGBouncer instead of hitting PG directly. As described in the Airflow documentation, this was done to mitigate the fact that:

Airflow is known - especially in high-performance setup - to open many connections to metadata database. This might cause problems for Postgres resource usage, because in Postgres, each connection creates a new process and it makes Postgres resource-hungry when a lot of connections are opened. Therefore we recommend to use PGBouncer as database proxy for all Postgres production installations. PGBouncer can handle connection pooling from multiple components, but also in case you have remote database with potentially unstable connectivity, it will make your DB connectivity much more resilient to temporary network problems.

Connections

Connections are managed via helm values, under .Values.config.airflow.connections. As such, they are managed by a LocalFilesystemBackend secret manager, and will not be visible in the web UI.
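To illustrate why such connections live outside the metadata database, here is a rough sketch of a file-backed lookup. It assumes the JSON layout that the Airflow documentation describes for LocalFilesystemBackend (an object keyed by connection ID); the connection ID and its fields are made up, the lookup helper is hypothetical, and how the chart renders the helm values into that file is not shown.

```python
import json

# Hypothetical connection definition; the ID, host and port are illustrative only.
connections = {
    "example_http": {"conn_type": "http", "host": "example.org", "port": 443},
}

# The secrets file content: one JSON object keyed by connection ID.
rendered = json.dumps(connections, indent=2, sort_keys=True)


def get_connection(conn_id: str, document: str):
    """Look up a connection definition in the rendered file content, or None."""
    return json.loads(document).get(conn_id)


if __name__ == "__main__":
    print(rendered)
    print(get_connection("example_http", rendered))
```

Because the definitions are read from a file rather than from the database, there is nothing for the web UI's connection list to display.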

Management DAGs

The Kubernetes Airflow instances come with built-in maintenance DAGs, performing actions such as:

  • removing task logs from S3 after they reach a certain age
  • expunging data that has reached a certain age from DB tables
  • removing obsolete Airflow DAG/Task lineage data from DataHub
  • ...

These DAGs are tagged with airflow_maintenance.

You can set the following Airflow variables in your release values, under config.airflow.variables, to configure the Airflow maintenance DAGs:

  • s3_log_retention_days (default value: 30): number of days of task logs to keep in S3
  • db_cleanup_tables: a comma-separated list of tables that will be regularly expunged of old data, to keep the database as lean as possible
  • db_cleanup_retention_days: if specified along with db_cleanup_tables, specifies the number of days after which data will be cleaned from these tables.
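How these variables might be interpreted can be sketched as follows. This is not the code of the maintenance DAGs: the helper names are hypothetical, the table names are only sample values, and the strict "older than the cutoff" comparison is an assumption.

```python
from datetime import datetime, timedelta, timezone


def parse_tables(value: str):
    """Split a comma-separated db_cleanup_tables value into clean table names."""
    return [name.strip() for name in value.split(",") if name.strip()]


def retention_cutoff(now: datetime, retention_days: int) -> datetime:
    """Return the timestamp before which data is considered expired."""
    return now - timedelta(days=retention_days)


def is_expired(timestamp: datetime, now: datetime, retention_days: int = 30) -> bool:
    """True if the timestamp is strictly older than the retention cutoff."""
    return timestamp < retention_cutoff(now, retention_days)


if __name__ == "__main__":
    now = datetime(2024, 10, 31, tzinfo=timezone.utc)
    print(parse_tables("task_instance, dag_run ,log"))
    print(is_expired(datetime(2024, 9, 1, tzinfo=timezone.utc), now))
```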

Operations

Moved to Data Platform/Systems/Airflow/Kubernetes/Operations

I'm getting paged

Pods are not running

If you're getting an alert or getting paged because the app isn't running, check whether something in the application logs (see the checklist section) could explain the crash. In case of a recurring crash, the pod will be in the CrashLoopBackOff state in Kubernetes. To check whether this is the case, ssh to the deployment server and run the following commands:

kube_env <namespace> dse-k8s-eqiad
kubectl get pods

Then you can tail the logs as needed. Feel free to refer to the log dashboard listed in the checklist.

If no pod at all is displayed, re-deploy the app by following the Kubernetes deployment instructions.
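When there are many pods, the check above can be scripted. The following is a sketch, assuming the output of kubectl get pods -o json is piped in or saved as text; the function name and the sample pod names are made up. It lists pods with at least one container waiting in CrashLoopBackOff.

```python
import json


def crashlooping_pods(pods_json: str):
    """Return names of pods with a container waiting in CrashLoopBackOff."""
    pods = json.loads(pods_json)
    names = []
    for pod in pods.get("items", []):
        statuses = pod.get("status", {}).get("containerStatuses", [])
        for status in statuses:
            # "waiting" is only present while the container is not running.
            waiting = status.get("state", {}).get("waiting") or {}
            if waiting.get("reason") == "CrashLoopBackOff":
                names.append(pod["metadata"]["name"])
                break
    return names


if __name__ == "__main__":
    # Made-up sample shaped like the relevant parts of a pod list.
    sample = json.dumps({"items": [
        {"metadata": {"name": "scheduler-abc"},
         "status": {"containerStatuses": [
             {"state": {"waiting": {"reason": "CrashLoopBackOff"}}}]}},
        {"metadata": {"name": "webserver-def"},
         "status": {"containerStatuses": [{"state": {"running": {}}}]}},
    ]})
    print(crashlooping_pods(sample))
```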

How to

Use the airflow CLI

brouberol@deploy2002:~$ kube_env airflow-test-k8s-deploy dse-k8s-eqiad
brouberol@deploy2002:~$ kubectl exec -it $(kubectl get pod -l app=airflow,component=webserver --no-headers -o custom-columns=":metadata.name") -c airflow-production -- airflow
Usage: airflow [-h] GROUP_OR_COMMAND ...

Positional Arguments:
  GROUP_OR_COMMAND

    Groups
      config         View configuration
      connections    Manage connections
      dags           Manage DAGs
      db             Database operations
      jobs           Manage jobs
      kubernetes     Tools to help run the KubernetesExecutor
      pools          Manage pools
      providers      Display providers
      roles          Manage roles
      tasks          Manage tasks
      users          Manage users
      variables      Manage variables

    Commands:
      cheat-sheet    Display cheat sheet
      dag-processor  Start a standalone Dag Processor instance
      info           Show information about current Airflow and environment
      kerberos       Start a kerberos ticket renewer
      plugins        Dump information about loaded plugins
      rotate-fernet-key
                     Rotate encrypted connection credentials and variables
      scheduler      Start a scheduler instance
      standalone     Run an all-in-one copy of Airflow
      sync-perm      Update permissions for existing roles and optionally DAGs
      triggerer      Start a triggerer instance
      version        Show the version
      webserver      Start a Airflow webserver instance

Options:
  -h, --help         show this help message and exit

airflow command error: the following arguments are required: GROUP_OR_COMMAND, see help above.
command terminated with exit code 2
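
The two-step invocation above (look up the webserver pod, then exec into it) can be wrapped in a small helper. This sketch only builds the argument lists and prints the resulting command; the selector and container name are copied from the session above, while the function names and the pod name are hypothetical. It assumes kube_env has already been run in the shell that will execute the command.

```python
import shlex

POD_SELECTOR = "app=airflow,component=webserver"
CONTAINER = "airflow-production"


def pod_lookup_argv():
    """Arguments for the kubectl call that prints the webserver pod name."""
    return ["kubectl", "get", "pod", "-l", POD_SELECTOR,
            "--no-headers", "-o", "custom-columns=:metadata.name"]


def airflow_cli_argv(pod_name, *airflow_args):
    """Arguments for running an airflow CLI command inside the given pod."""
    return ["kubectl", "exec", "-it", pod_name, "-c", CONTAINER,
            "--", "airflow", *airflow_args]


if __name__ == "__main__":
    # Placeholder pod name; in practice it comes from the lookup command.
    print(shlex.join(pod_lookup_argv()))
    print(shlex.join(airflow_cli_argv("example-webserver-pod", "version")))
```

Building argument lists rather than a single string avoids shell quoting issues if the lists are later passed to subprocess.run.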