Data Platform/Systems/Airflow/Kubernetes

This page relates specifically to our Airflow instances deployed to Kubernetes and to what is particular about them. We assume that each Airflow instance is deployed alongside a dedicated CloudnativePG cluster running in the same namespace.



airflow
Owner: Data Platform SRE
Kubernetes Cluster: dse-k8s-eqiad
Kubernetes Namespace: airflow-test-k8s
Chart: https://gerrit.wikimedia.org/r/plugins/gitiles/operations/deployment-charts/+/refs/heads/master/charts/airflow
Helmfiles: https://gerrit.wikimedia.org/r/plugins/gitiles/operations/deployment-charts/+/refs/heads/master/helmfile.d/dse-k8s-services/airflow-test-k8s/helmfile.yaml
Docker image: https://gitlab.wikimedia.org/repos/data-engineering/airflow/
Internal service DNS: airflow-test-k8s.discovery.wmnet
Public service URL: https://airflow-test-k8s.wikimedia.org
Logs: https://logstash.wikimedia.org/app/dashboards#/view/7f6bc030-8166-11ef-9967-d396febc14fa?_g=h@c823129&_a=h@fa450d4
Metrics: https://grafana.wikimedia.org/d/aca34072-ad28-4f08-8468-9a28f7b45c52/airflow-on-kubernetes?orgId=1
Monitors: TODO
Application documentation: https://airflow.apache.org/docs/apache-airflow/stable/index.html
Paging: true
Deployment Phabricator ticket: https://phabricator.wikimedia.org/T362788

Note: replace airflow-test-k8s with the name of the relevant instance where appropriate.

Architecture

Airflow is deployed on Kubernetes and talks to a PG cluster deployed in the same k8s namespace. Airflow task logs, PG WALs and backups are all uploaded to S3, while PG data is written to Ceph persistent volumes.
Simplified architecture diagram of an airflow deployment

Components

Airflow is deployed via the airflow chart. The release is composed of 3 Deployment resources:

  • the webserver
  • the scheduler
  • the kerberos ticket renewer

Executor

While we migrate the airflow instances to Kubernetes, the scheduler is configured with the LocalExecutor, meaning that tasks are executed as subprocesses of the scheduler process. However, the chart was developed with the KubernetesExecutor in mind, in which case each DAG task is executed in its own Kubernetes pod.
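
As a quick sanity check, the executor an instance is actually running with can be read back from the scheduler. This is a minimal sketch: the deploy/airflow-scheduler resource name is an assumption, while the airflow-production container name matches the one used elsewhere on this page.

# Print the executor configured on the running scheduler (Deployment name is an assumption)
kubectl exec deploy/airflow-scheduler -c airflow-production -- \
    airflow config get-value core executor
# LocalExecutor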

DAGs deployment

TODO

Logging

The logs of the airflow components themselves are sent to our observability pipeline and are accessible through Logstash. DAG task logs, on the other hand, are uploaded to S3 after task completion. Streaming the logs of an ongoing DAG task can be done from the web UI; it relies on the Kubernetes Logs API (when using the KubernetesExecutor) or simply tails local log files (when using the LocalExecutor).
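
Expressed as Airflow environment variables, the remote task logging setup amounts to something like the following sketch (the connection id and exact values are illustrative assumptions; the real values are owned by the chart):

# Ship task logs to the instance's S3 bucket once tasks complete
AIRFLOW__LOGGING__REMOTE_LOGGING=True
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER=s3://logs.airflow-test-k8s.dse-k8s-eqiad/
AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID=aws_default  # connection id is an assumption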

Security

Webserver authentication and authorization

Access to the webserver is OIDC authenticated, and the user role is derived from their LDAP groups. For example, SREs (members of the ops LDAP group) are automatically given the Admin role. The mapping can be customized per instance, so that we can define LDAP groups for per-instance admins and per-instance members.
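
To check which role a user was given after their first login, you can list users from the webserver pod (the pod selection mirrors the command shown in the Creating a new user section below):

kubectl exec -it -c airflow-production \
    $(kubectl get pod --no-headers -l app=airflow,component=webserver | awk '{ print $1 }') \
    -- airflow users list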

API access

TODO

Kerberos

Create the keytab by following the guide provided on Data Platform/Systems/Kerberos/Administration:

stevemunene@krb1001:~/airflowk8$ vim list.txt
stevemunene@krb1001:~/airflowk8$ cat list.txt 
airflow-test-k8s.discovery.wmnet,create_princ,analytics
airflow-test-k8s.discovery.wmnet,create_keytab,analytics

stevemunene@krb1001:~/airflowk8$ sudo generate_keytabs.py --realm WIKIMEDIA list.txt
analytics/airflow-test-k8s.discovery.wmnet@WIKIMEDIA
Entry for principal analytics/airflow-test-k8s.discovery.wmnet@WIKIMEDIA with kvno 1, encryption type aes256-cts-hmac-sha1-96 added to keytab WRFILE:/srv/kerberos/keytabs/airflow-test-k8s.discovery.wmnet/analytics/analytics.keytab.

Then, create the base64 representation of the keytab:

root@krb1001:~# base64 /srv/kerberos/keytabs/airflow-test-k8s.discovery.wmnet/analytics/analytics.keytab

Then, commit the base64-encoded keytab to the private puppet repository on puppetserver1001.

Kubernetes RBAC

When using the KubernetesExecutor, the scheduler needs to be able to perform CRUD operations on Pods, and the webserver needs to be able to tail Pod logs. As the user usually used to deploy charts does not have permission to create Role and RoleBinding resources, we deploy the chart with a specific user/role that can, called deploy-airflow.
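
A quick way to verify that this RBAC setup is effective is to impersonate the scheduler's ServiceAccount with kubectl auth can-i (the ServiceAccount name used here is an assumption):

# Both commands should print "yes" when the KubernetesExecutor RBAC is in place
kubectl auth can-i create pods -n airflow-test-k8s \
    --as=system:serviceaccount:airflow-test-k8s:airflow
kubectl auth can-i get pods --subresource=log -n airflow-test-k8s \
    --as=system:serviceaccount:airflow-test-k8s:airflow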

UNIX user impersonation

Each airflow instance is assigned a UNIX user in its dedicated image. When a task runs as a Pod, it runs as the user defined in the worker image specified by .Values.app.executor_image (see the sketch after the table below).

Instance                  | Image                  | Username               | UID | GID
airflow-analytics         | analytics              | analytics              | 906 | 906
airflow-analytics-product | analytics-product      | analytics-product      | 910 | 910
airflow-analytics-test    | analytics              | analytics              | 906 | 906
airflow-platform-eng      | analytics-platform-eng | analytics-platform-eng | 913 | 913
airflow-research          | analytics-research     | analytics-research     | 912 | 912
airflow-search            | analytics-search       | analytics-search       | 911 | 911
airflow-test-k8s          | analytics              | analytics              | 906 | 906
airflow-wmde              | analytics-wmde         | analytics-wmde         | 927 | 927
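
To confirm the UNIX identity an instance is running with, running id in one of its pods is enough. This is a sketch: the component=scheduler label is an assumption, while the app=airflow label and the airflow-production container name are the same as in the Creating a new user section below.

kubectl exec -it -c airflow-production \
    $(kubectl get pod --no-headers -l app=airflow,component=scheduler | awk '{ print $1 }') \
    -- id
# expected for an instance using the analytics image (per the table above):
# uid=906(analytics) gid=906(analytics) groups=906(analytics)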

Database access

The airflow chart was designed to run alongside a CloudnativePG cluster running in the same namespace. However, it can be configured to use an "external" PG database, such as an-db1001.eqiad.wmnet for transitioning purposes. The ultimate goal is to have each instance run alongside its own PG cluster.

When configured to use a Cloudnative PG cluster, access to the DB goes through PGBouncer instead of hitting PG directly. As described in the Airflow documentation, this is done to mitigate the fact that:

Airflow is known - especially in high-performance setup - to open many connections to metadata database. This might cause problems for Postgres resource usage, because in Postgres, each connection creates a new process and it makes Postgres resource-hungry when a lot of connections are opened. Therefore we recommend to use PGBouncer as database proxy for all Postgres production installations. PGBouncer can handle connection pooling from multiple components, but also in case you have remote database with potentially unstable connectivity, it will make your DB connectivity much more resilient to temporary network problems.
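
A minimal way to see this indirection is to read the metadata DB URI back from a running Airflow component; it should point at the PGBouncer pooler service rather than at the PG primary. The hostname shown in the example output is an assumption based on common CloudnativePG pooler naming, not the actual service name.

# On Airflow >= 2.3 this option lives in the [database] section (formerly [core])
airflow config get-value database sql_alchemy_conn
# postgresql://airflow:***@postgresql-airflow-test-k8s-pooler-rw.airflow-test-k8s.svc.cluster.local:5432/airflow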

Connections and variables

Connections and variables are managed via helm values, under .Values.config.airflow.connections and .Values.config.airflow.variables. As such, they are served by a LocalFilesystemBackend secrets backend, and will not be visible in the web UI.
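
Even though they do not appear in the web UI, entries served by the secrets backend can still be resolved from any Airflow pod with the CLI (the connection and variable names below are hypothetical):

# Resolve a connection and a variable through the LocalFilesystemBackend
airflow connections get some_connection_id
airflow variables get some_variable_name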

Management DAGs

The goal is for each airflow instance to have a common set of management DAGs that perform routine actions such as:

  • removing task logs from S3 after they reach a certain age
  • expunging data that has reached a certain age from the DB tables
  • removing obsolete Airflow DAG/Task lineage data from DataHub
  • ...

These DAGs are tagged with airflow_maintenance.

Operations

Creating a new instance

In the following section, we're going to assume that we're creating a new airflow instance named `airflow-test-k8s`, deployed with a dedicated PG cluster named `postgresql-airflow-test-k8s`, deployed in the `dse-k8s-eqiad` Kubernetes environment.
  • The first thing you need to do is create Kubernetes read and deploy user credentials
  • Add a namespace (using the same name as the airflow instance) entry into deployment_charts/helmfile.d/admin_ng/values/dse-k8s.yaml
    namespaces:
      # ...
      airflow-test-k8s:
        deployClusterRole: deploy-airflow
        tlsExtraSANs:
          - airflow-test-k8s.wikimedia.org
    
  • Register a new image under the airflow blubberfile, as well as build/publish jobs under the airflow gitlab-ci file. Make sure that the UID is reserved in Puppet.
  • Then, create the public and internal DNS records for this instance
  • Define the airflow instance helmfile.yaml file and associated values (use deployment_charts/helmfile.d/dse-k8s-services/airflow-test-k8s as an example)
  • Generate the S3 keypairs for both PG and Airflow
    brouberol@cephosd1001:~$ sudo radosgw-admin user create --uid=postgresql-airflow-test-k8s --display-name="postgresql-airflow-test-k8s"
    brouberol@cephosd1001:~$ sudo radosgw-admin user create --uid=airflow-test-k8s --display-name="airflow-test-k8s"
    # note: copy the `access_key` and `secret_key` from the JSON output, you will need them in the next step
    
  • Create the S3 buckets for both PG and Airflow
    brouberol@stat1008:~$ read access_key
    <PG S3 access key>
    brouberol@stat1008:~$ read secret_key
    <PG S3 secret key>
    brouberol@stat1008:~$ s3cmd --access_key=$access_key --secret_key=$secret_key --host=rgw.eqiad.dpe.anycast.wmnet --bucket-location=dpe --host-bucket=n mb s3://postgresql.postgresql-airflow-test-k8s.dse-k8s-eqiad/
    brouberol@stat1008:~$ read access_key
    <Airflow S3 access key>
    brouberol@stat1008:~$ read secret_key
    <Airflow S3 secret key>
    brouberol@stat1008:~$ s3cmd --access_key=$access_key --secret_key=$secret_key --host=rgw.eqiad.dpe.anycast.wmnet --bucket-location=dpe --host-bucket=n mb s3://logs.airflow-test-k8s.dse-k8s-eqiad/
    
  • Register the service in our IDP server (in idp.yaml). After the patch is merged and puppet has run on the idp servers, copy the OIDC secret key generated for the airflow service.
    root@idp1004:# cat /etc/cas/services/airflow_test_k8s-*.json  | jq -r .clientSecret
    <OIDC secret key>
    
  • Generate the secrets for both the PG cluster and the Airflow instance and add them to the private puppet repository, in /srv/git/private/hieradata/role/common/deployment_server/kubernetes.yaml
    dse-k8s:
        # ...
        postgresql-airflow-test-k8s:
          dse-k8s-eqiad:
            s3:
              accessKey: <PG S3 access key>
              secretKey: <PG S3 secret key>
    
        airflow-test-k8s:
          dse-k8s-eqiad:
            config:
              private:
                airflow__core__fernet_key: <random 64 characters>
                airflow__webserver__secret_key: <random 64 characters>
              airflow:
                aws_access_key_id: <Airflow S3 access key>
                aws_secret_access_key: <Airflow S3 secret key>
              oidc:
                client_secret: <OIDC secret key>
    
  • Deploy the service (which should deploy both the PG cluster and the airflow instance; see the deployment sketch after this list)
  • Once the instance is running, enable the ATS redirection from the wikimedia.org subdomain to the kube ingress. After puppet has run on all the cache servers (wait a good 30 minutes), https://airflow-test-k8s.wikimedia.org should display the airflow web UI, and you should be able to connect via CAS.
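
For the deployment step above, a sketch of the usual workflow from the deployment server looks like this (the checkout path and environment name follow the standard helmfile layout and are assumptions):

brouberol@deploy1003:~$ cd /srv/deployment-charts/helmfile.d/dse-k8s-services/airflow-test-k8s
brouberol@deploy1003:/srv/deployment-charts/helmfile.d/dse-k8s-services/airflow-test-k8s$ helmfile -e dse-k8s-eqiad -i apply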

Creating a new user

Users are automatically created when they first log in via CAS. However, you might want to create a user for applications requiring API access to the Airflow instance. To do this, exec into the webserver and create the user via the airflow users create command.

brouberol@deploy1003:~/airflow-k8s-executor$ kubectl exec -it -c airflow-production $(kubectl get pod --no-headers -l app=airflow,component=webserver  | awk '{ print $1 }') -- bash
runuser@airflow-webserver-7b79bf8db5-qmv5c:/opt/airflow$ airflow users create \
    --username myapp \
    --password randompassword \
    --email=myapp@wikimedia.org \
    --firstname myapp \
    --lastname myapp \
    --role User
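
Such a user can then, for example, call the stable REST API with basic authentication, assuming the basic auth backend is enabled on the instance (this is an assumption, see the API access section above):

# List DAGs via the stable REST API, authenticating as the user created above
curl -s -u myapp:randompassword https://airflow-test-k8s.wikimedia.org/api/v1/dags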

Configuring out-of-band backups

The PostgreSQL database cluster for this instance will already be configured with its own backup system that writes database backups and WAL archives to the S3 interface of the Ceph cluster.

However, we decided to implement out-of-band backups of each of the S3 buckets containing these database backups, so we added a new backup pipeline to our database backup replica system, which is db1208.

When adding a new instance, the file you need to modify is in the private repo and is named hieradata/role/common/mariadb/misc/analytics/backup.yaml.

Add your new bucket and its access credentials to the profile::ceph::backup::s3_local::sources hash structure, as shown below.

profile::ceph::backup::s3_local::sources:
  postgresql.airflow-test-k8s.dse-k8s-eqiad:
    access_key: <PG S3 access key>
    secret_key: <PG S3 secret key>

When merged, this will update the file /srv/postgresql_backups/rclone.conf on db1208, adding the backups of this database cluster to the daily sync process and therefore to Bacula.
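
To spot-check that base backups and WAL archives are actually landing in the source bucket, the same s3cmd invocation style used earlier on this page works (the credentials are the PG S3 keypair referenced above, and the bucket name is the one listed in the config above):

brouberol@stat1008:~$ s3cmd --access_key=$access_key --secret_key=$secret_key --host=rgw.eqiad.dpe.anycast.wmnet --bucket-location=dpe --host-bucket=n ls s3://postgresql.airflow-test-k8s.dse-k8s-eqiad/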

Upgrading Airflow

To upgrade Airflow, we first need to rebuild the docker image, installing a more recent apache-airflow package version (example). Once the patch is merged, a publish:airflow job will be kicked off for each airflow image.

Then, use the CLI described here to automatically get the docker image tag of each newly published airflow image from the Gitlab jobs.

Now, deploy the new image to the airflow-test-k8s instance, by changing the app.version field in deployment_charts/helmfile.d/dse-k8s-services/airflow-test-k8s/values-production.yaml, and redeploy the test instance. Any outstanding DB migrations will automatically be applied. If everything goes well, bump the airflow version under deployment_charts/helmfile.d/dse-k8s-services/_airflow_common_/values-dse-k8s-eqiad.yaml, and redeploy every instance, one after the other.
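
After a redeploy, a quick way to confirm that the new version is live and that all DB migrations have been applied is the following sketch (pod selection mirrors the command in the Creating a new user section):

webserver_pod=$(kubectl get pod --no-headers -l app=airflow,component=webserver | awk '{ print $1 }')
kubectl exec -it -c airflow-production $webserver_pod -- airflow version
kubectl exec -it -c airflow-production $webserver_pod -- airflow db check-migrations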