Data Platform/Systems/Airflow/Kubernetes
This page relates specifically to our Airflow instances deployed to Kubernetes and what is specific to them. We assume that the Airflow instance is deployed alongside a dedicated CloudnativePG cluster running in the same namespace.
Note: replace airflow-test-k8s with other instance names where appropriate.
Architecture
Components
Airflow is deployed via the airflow chart. The release is composed of 3 Deployment resources:
- the webserver
- the scheduler
- the kerberos ticket renewer
Executor
While we migrate the airflow instances to Kubernetes, the scheduler is configured with LocalExecutor, meaning that tasks are executed as subprocesses of the scheduler process. However, the chart was developed with KubernetesExecutor in mind, meaning that each DAG task would eventually be executed in its own Kubernetes pod.
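As an illustration, the executor is selected through the instance's helm values. The key layout below is a sketch, an assumption about the chart's schema rather than a verbatim excerpt:
config:
  airflow:
    config:
      core:
        executor: LocalExecutor  # to be switched to KubernetesExecutor once tasks run in dedicated pods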
DAGs deployment
TODO
Logging
The logs of the airflow components themselves are sent to our observability pipeline and are accessible through logstash. However, the DAG task logs are uploaded to S3 after completion. Streaming the logs of an ongoing DAG task can be done from the web UI, and relies on the Kubernetes Logs API (when using KubernetesExecutor) or simply tails local logs (when using LocalExecutor).
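For reference, the Airflow settings involved look roughly like the sketch below. The option names are Airflow's own, but the connection name and their exact placement in the helm values are assumptions:
config:
  airflow:
    config:
      logging:
        remote_logging: "True"
        remote_base_log_folder: s3://logs.airflow-test-k8s.dse-k8s-eqiad
        remote_log_conn_id: aws_default  # assumed name of the S3 connection pointing at the Ceph radosgw endpoint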
Security
Webserver authentication and authorization
Access to the webserver is OIDC authenticated, and the user role is derived from their LDAP groups. For example, SREs (members of the ops LDAP group) are automatically given the Admin role. The mapping can be customized per instance, so that we can define LDAP groups for per-instance admins and per-instance members.
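A hypothetical sketch of what such a per-instance mapping could look like in the helm values. The key names and group names below are illustrative, not the chart's actual schema:
config:
  airflow:
    auth:
      role_mappings:
        ops: [Admin]
        airflow-test-k8s-admins: [Admin]  # hypothetical per-instance admin LDAP group
        airflow-test-k8s-users: [User]    # hypothetical per-instance member LDAP group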
API access
TODO
Kerberos
Create the keytab by following the guide on Data Platform/Systems/Kerberos/Administration:
stevemunene@krb1001:~/airflowk8$ vim list.txt
stevemunene@krb1001:~/airflowk8$ cat list.txt
airflow-test-k8s.discovery.wmnet,create_princ,analytics
airflow-test-k8s.discovery.wmnet,create_keytab,analytics
stevemunene@krb1001:~/airflowk8$ sudo generate_keytabs.py --realm WIKIMEDIA list.txt
analytics/airflow-test-k8s.discovery.wmnet@WIKIMEDIA
Entry for principal analytics/airflow-test-k8s.discovery.wmnet@WIKIMEDIA with kvno 1, encryption type aes256-cts-hmac-sha1-96 added to keytab WRFILE:/srv/kerberos/keytabs/airflow-test-k8s.discovery.wmnet/analytics/analytics.keytab.
Then generate the base64 representation of the keytab:
root@krb1001:~# base64 /srv/kerberos/keytabs/airflow-test-k8s.discovery.wmnet/analytics/analytics.keytab
Then commit it to the private repository on puppetserver1001.
Kubernetes RBAC
When using the KubernetesExecutor, the scheduler needs to be able to perform CRUD operations on Pods, and the webserver needs to be able to tail Pod logs. As the user used to deploy charts does not have permissions to create Role and RoleBinding resources, we deploy the chart with a specific user/role that can, called deploy-airflow.
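For illustration, the permissions this boils down to look like the following Role sketch. The resource name is an assumption; the actual Role and RoleBinding are managed via the deploy-airflow setup:
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: airflow  # illustrative name
  namespace: airflow-test-k8s
rules:
  - apiGroups: [""]
    resources: ["pods"]
    verbs: ["create", "get", "list", "watch", "patch", "delete"]  # scheduler: CRUD on task pods
  - apiGroups: [""]
    resources: ["pods/log"]
    verbs: ["get", "list"]  # webserver: tail task pod logs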
UNIX user impersonation
Each airflow instance is assigned a UNIX user in its dedicated image. When a task runs as a Pod, it runs as the user defined in the worker image referenced by .Values.app.executor_image.
| Instance | Image | Username | UID | GID |
|---|---|---|---|---|
| airflow-analytics | analytics | analytics | 906 | 906 |
| airflow-analytics-product | analytics-product | analytics-product | 910 | 910 |
| airflow-analytics-test | analytics | analytics | 906 | 906 |
| airflow-platform-eng | analytics-platform-eng | analytics-platform-eng | 913 | 913 |
| airflow-research | analytics-research | analytics-research | 912 | 912 |
| airflow-search | analytics-search | analytics-search | 911 | 911 |
| airflow-test-k8s | analytics | analytics | 906 | 906 |
| airflow-wmde | analytics-wmde | analytics-wmde | 927 | 927 |
Database access
The airflow chart was designed to run alongside a CloudnativePG cluster running in the same namespace. However, it can be configured to use an "external" PG database, such as an-db1001.eqiad.wmnet, for transitioning purposes. The ultimate goal is to have each instance run alongside its own PG cluster.
When configured to use a CloudnativePG cluster, access to the DB goes through PGBouncer instead of hitting PG directly. As described in the airflow documentation, this mitigates the fact that:
Airflow is known - especially in high-performance setup - to open many connections to metadata database. This might cause problems for Postgres resource usage, because in Postgres, each connection creates a new process and it makes Postgres resource-hungry when a lot of connections are opened. Therefore we recommend to use PGBouncer as database proxy for all Postgres production installations. PGBouncer can handle connection pooling from multiple components, but also in case you have remote database with potentially unstable connectivity, it will make your DB connectivity much more resilient to temporary network problems.
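Concretely, this means the metadata DB DSN points at the PGBouncer pooler service rather than at PostgreSQL itself. A minimal sketch, assuming a pooler service name and placeholder credentials (the real values are templated by the chart and CloudnativePG):
config:
  airflow:
    config:
      database:
        sql_alchemy_conn: postgresql://airflow:<password>@airflow-test-k8s-pooler-rw:5432/airflow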
Connections and variables
Connections and variables are managed via helm values, under .Values.config.airflow.connections and .Values.config.airflow.variables. As such, they are exposed to Airflow through a LocalFilesystemBackend secrets backend, and will not be visible in the web UI.
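A hypothetical example of what this looks like in an instance's values file; the per-connection field names shown here are assumptions about the chart's schema:
config:
  airflow:
    connections:
      my_service:                        # hypothetical connection id
        conn_type: http
        host: my-service.discovery.wmnet
    variables:
      my_variable: some-value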
Management DAGs
The goal is for each airflow instance to have a common set of management DAGs that perform routine actions such as:
- removing task logs from S3 after they reach a certain age
- expunging DB tables from data that has reached a certain age
- removing obsolete Airflow DAG/Task lineage data from DataHub
- ...
These DAGs are tagged with airflow_maintenance.
Operations
Creating a new instance
- The first thing you need to do is create Kubernetes read and deploy user credentials
- Add a namespace entry (using the same name as the airflow instance) into deployment_charts/helmfile.d/admin_ng/values/dse-k8s.yaml
namespaces:
  # ...
  airflow-test-k8s:
    deployClusterRole: deploy-airflow
    tlsExtraSANs:
      - airflow-test-k8s.wikimedia.org
- Register a new image under the airflow blubberfile, as well as build/publish jobs under the airflow gitlab-ci file. Make sure that the UID is reserved in Puppet.
- Then, create the public and internal DNS records for this instance
- Define the airflow instance helmfile.yaml file and associated values (take example from deployment_charts/helmfile.d/dse-k8s-services/airflow-test-k8s)
- Generate the S3 keypairs for both PG and Airflow
brouberol@cephosd1001:~$ sudo radosgw-admin user create --uid=postgresql-airflow-test-k8s --display-name="postgresql-airflow-test-k8s"
brouberol@cephosd1001:~$ sudo radosgw-admin user create --uid=airflow-test-k8s --display-name="airflow-test-k8s"
# note: copy the `access_key` and `secret_key` from the JSON output, you will need them in the next step
- Create the S3 buckets for both PG and Airflow
brouberol@stat1008:~$ read access_key
<PG S3 access key>
brouberol@stat1008:~$ read secret_key
<PG S3 secret key>
brouberol@stat1008:~$ s3cmd --access_key=$access_key --secret_key=$secret_key --host=rgw.eqiad.dpe.anycast.wmnet --bucket-location=dpe --host-bucket=n mb s3://postgresql.postgresql-airflow-test-k8s.dse-k8s-eqiad/
brouberol@stat1008:~$ read access_key
<Airflow S3 access key>
brouberol@stat1008:~$ read secret_key
<Airflow S3 secret key>
brouberol@stat1008:~$ s3cmd --access_key=$access_key --secret_key=$secret_key --host=rgw.eqiad.dpe.anycast.wmnet --bucket-location=dpe --host-bucket=n mb s3://logs.airflow-test-k8s.dse-k8s-eqiad/
- Register the service in our IDP server (into idp.yaml). After the patch is merged and puppet has run on the idp servers, copy the OIDC secret key generated for the airflow service.
root@idp1004:# cat /etc/cas/services/airflow_test_k8s-*.json | jq -r .clientSecret
<OIDC secret key>
- Generate the secrets for both the PG cluster and the Airflow instance and add them to the private puppet repository, in /srv/git/private/hieradata/role/common/deployment_server/kubernetes.yaml
dse-k8s:
  # ...
  postgresql-airflow-test-k8s:
    dse-k8s-eqiad:
      s3:
        accessKey: <PG S3 access key>
        secretKey: <PG S3 secret key>
  airflow-test-k8s:
    dse-k8s-eqiad:
      config:
        private:
          airflow__core__fernet_key: <random 64 characters>
          airflow__webserver__secret_key: <random 64 characters>
        airflow:
          aws_access_key_id: <Airflow S3 access key>
          aws_secret_access_key: <Airflow S3 secret key>
        oidc:
          client_secret: <OIDC secret key>
- Deploy the service (which should deploy both the PG cluster and the airflow instance)
- Once the instance is running, enable the ATS redirection from the wikimedia.org subdomain to the kube ingress. After puppet has run on all the cache servers (wait a good 30 minutes), https://airflow-test-k8s.wikimedia.org should display the airflow web UI, and you should be able to connect via CAS.
Creating a new user
Users are automatically created when they first log in via CAS. However, you might want to create a user for applications requiring API access to the Airflow instance. To do this, exec into the webserver and create the user via the airflow users create command.
brouberol@deploy1003:~/airflow-k8s-executor$ kubectl exec -it -c airflow-production $(kubectl get pod --no-headers -l app=airflow,component=webserver | awk '{ print $1 }') -- bash
runuser@airflow-webserver-7b79bf8db5-qmv5c:/opt/airflow$ airflow users create \
--username myapp \
--password randompassword \
--email=myapp@wikimedia.org \
--firstname myapp \
--lastname myapp \
--role User
Configuring out-of-band backups
The PostgreSQL database cluster for this instance will already be configured with its own backup system that writes database backups and WAL archives to the S3 interface of the Ceph cluster.
However, we decided to implement out-of-band backups of each of the S3 buckets containing these database backups, so we added a new backup pipeline to our database backup replica system, which is db1208.
When adding a new instance, the file you need to modify is in the private repo: hieradata/role/common/mariadb/misc/analytics/backup.yaml
Add your new bucket and its access credentials to the profile::ceph::backup::s3_local::sources hash, as shown below.
profile::ceph::backup::s3_local::sources:
  postgresql.airflow-test-k8s.dse-k8s-eqiad:
    access_key: <PG S3 access key>
    secret_key: <PG S3 secret key>
When merged, this will update the file /srv/postgresql_backups/rclone.conf on db1208, adding the backups of this database cluster to the daily sync process and therefore to Bacula.
Upgrading Airflow
To upgrade Airflow, we first need to rebuild a new docker image installing a more recent apache-airflow package version (example). Once the patch is merged, a publish:airflow job will be kicked off for each airflow image.
Then, use the CLI described here to automatically get the docker image tag of each newly published airflow image from the Gitlab jobs.
Now, deploy the new image to the airflow-test-k8s instance by changing the app.version field in deployment_charts/helmfile.d/dse-k8s-services/airflow-test-k8s/values-production.yaml, and redeploy the test instance. Any outstanding DB migrations will automatically be applied. If everything goes well, bump the airflow version under deployment_charts/helmfile.d/dse-k8s-services/_airflow_common_/values-dse-k8s-eqiad.yaml, and redeploy every instance, one after the other. A sketch of the change is shown below.
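For reference, the per-instance change boils down to bumping a single field (the version string below is a placeholder, not an actual tag):
# deployment_charts/helmfile.d/dse-k8s-services/airflow-test-k8s/values-production.yaml
app:
  version: <new airflow image tag>  # tag published by the publish:airflow job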