Data Platform/Systems/Airflow/Kubernetes/Operations
Creating a new instance
- The first thing you need to do is create Kubernetes read and deploy user credentials
- Add a namespace (using the same name as the airflow instance) entry into deployment_charts/helmfile.d/admin_ng/values/dse-k8s.yaml:
namespaces:
# ...
airflow-test-k8s:
deployClusterRole: deploy-airflow
tlsExtraSANs:
- airflow-test-k8s.wikimedia.org
- Add the namespace under the tenantNamespaces list in deployment_charts/helmfile.d/admin_ng/values/dse-k8s-eqiad/cephfs-csi-rbd-values.yaml as well as deployment_charts/helmfile.d/admin_ng/values/dse-k8s-eqiad/cephfs-csi-rbd-values.yaml
- Add the namespace to the watchedNamespaces list defined in deployment_charts/helmfile.d/admin_ng/values/dse-k8s-eqiad/cloudnative-pg.yaml
- Then, create the public and internal DNS records for this instance
- Define the airflow instance helmfile.yaml file and associated values (take example from deployment_charts/helmfile.d/dse-k8s-services/airflow-test-k8s)
- Generate the S3 keypairs for both PG and Airflow
brouberol@cephosd1001:~$ sudo radosgw-admin user create --uid=postgresql-airflow-test-k8s --display-name="postgresql-airflow-test-k8s"
brouberol@cephosd1001:~$ sudo radosgw-admin user create --uid=airflow-test-k8s --display-name="airflow-test-k8s"
# note: copy the `access_key` and `secret_key` from the JSON output, you will need them in the next step
- Create the S3 buckets for both PG and Airflow
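A sketch of what this can look like, reusing the s3cmd invocation shown in the migration section below; the bucket names follow the postgresql-<instance>.dse-k8s-eqiad and logs.<instance>.dse-k8s-eqiad naming convention used elsewhere on this page:
# run each mb command with the access/secret keys of the matching user created in the previous step
brouberol@stat1008:~$ s3cmd --access_key=$access_key --secret_key=$secret_key --host=rgw.eqiad.dpe.anycast.wmnet --region=dpe --host-bucket=no mb s3://postgresql-airflow-test-k8s.dse-k8s-eqiad
brouberol@stat1008:~$ s3cmd --access_key=$access_key --secret_key=$secret_key --host=rgw.eqiad.dpe.anycast.wmnet --region=dpe --host-bucket=no mb s3://logs.airflow-test-k8s.dse-k8s-eqiad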
- Register the service in our IDP server (into idp.yaml). Once the patch has been merged and puppet has run on the idp servers, copy the OIDC secret key generated for the airflow service.
root@idp1004:# cat /etc/cas/services/airflow_test_k8s-*.json | jq -r .clientSecret
<OIDC secret key>
- Issue a Kerberos keytab following the guide provided on Data Platform/Systems/Kerberos/Administration. The principal must match the username of the runs.as user defined in the image blubberfile. The hostname must match the internal service discovery domain of the airflow instance.
# Change `analytics` to the UNIX user the airflow tasks will impersonate by default in Hadoop
brouberol@krb1001:~$ sudo kadmin.local addprinc -randkey analytics/airflow-test-k8s.discovery.wmnet@WIKIMEDIA
brouberol@krb1001:~$ sudo kadmin.local addprinc -randkey airflow/airflow-test-k8s.discovery.wmnet@WIKIMEDIA
brouberol@krb1001:~$ sudo kadmin.local addprinc -randkey HTTP/airflow-test-k8s.discovery.wmnet@WIKIMEDIA
brouberol@krb1001:~$ sudo kadmin.local ktadd -norandkey -k analytics.keytab \
analytics/airflow-test-k8s.discovery.wmnet \
airflow/airflow-test-k8s.discovery.wmnet@WIKIMEDIA \
HTTP/airflow-test-k8s.discovery.wmnet@WIKIMEDIA
- Create the base64 representation of the keytab
root@krb1001:~# base64 analytics.keytab
- Generate the secrets for both the PG cluster and the Airflow instance and add them to the private puppet repository, in /srv/git/private/hieradata/role/common/deployment_server/kubernetes.yaml
dse-k8s:
# ...
postgresql-airflow-test-k8s:
dse-k8s-eqiad:
s3:
accessKey: <PG S3 access key>
secretKey: <PG S3 secret key>
airflow-test-k8s:
dse-k8s-eqiad:
config:
private:
airflow__core__fernet_key: <random 64 characters>
airflow__webserver__secret_key: <random 64 characters>
airflow:
aws_access_key_id: <Airflow S3 access key>
aws_secret_access_key: <Airflow S3 secret key>
oidc:
client_secret: <OIDC secret key>
kerberos:
keytab: |
# The base64 representation obtained in the previous step
ABCDEFHIJKLMNOP
ABCDEFHIJKLMNOP=
- Register the PG bucket name and keys into /srv/git/private/hieradata/role/common/mariadb/misc/analytics/backup.yaml on the puppetserver host. This will make sure that the PG base backups and WALs are regularly backed up outside of our Ceph cluster.
profile::ceph::backup::s3_local::sources:
...
postgresql-airflow-test-k8s.dse-k8s-eqiad: # must match the PG bucket name
access_key: <PG S3 access key>
secret_key: <PG S3 secret key>
- Deploy the service (which should deploy both the PG cluster and the airflow instance)
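For example, from the deployment server (the hostname and checkout path below are illustrative; the helmfile environment matches the commands used later on this page):
brouberol@deploy1003:~$ cd /srv/deployment-charts/helmfile.d/dse-k8s-services/airflow-test-k8s
brouberol@deploy1003:/srv/deployment-charts/helmfile.d/dse-k8s-services/airflow-test-k8s$ helmfile -e dse-k8s-eqiad apply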
- Once the instance is running, enable the ATS redirection from the wikimedia.org subdomain to the kube ingress. After puppet has run on all the cache servers (wait a good 30 minutes), https://airflow-test-k8s.wikimedia.org should display the airflow web UI, and you should be able to connect via CAS.
Migrating an existing instance
This section addresses how to perform a piecemeal migration of the airflow instances listed in Data Platform/Systems/Airflow/Instances to Kubernetes. The result of this migration is an airflow instance (webserver, scheduler and kerberos) running entirely in Kubernetes, alongside the database itself, without any data loss.
The migration is done in 4 steps:
- Migrate the webserver to Kubernetes
- Migrate the scheduler and kerberos components to Kubernetes
- Deploy a CloudnativePG cluster in the same Kubernetes namespace as Airflow and import the data
- Cleanup
At the time of writing, we have already migrated airflow-analytics-test, and we'll assume that this documentation covers the case of the airflow-search instance.
Migrating the webserver to Kubernetes
To deploy only the webserver to Kubernetes, we need to deploy airflow in a way that talks to the existing external database and opts out of the scheduler and kerberos components.
Prep work
- The first thing you need to do is create Kubernetes read and deploy user credentials
- Add a namespace (using the same name as the airflow instance) entry into deployment_charts/helmfile.d/admin_ng/values/dse-k8s.yaml:
namespaces:
# ...
airflow-search:
deployClusterRole: deploy-airflow
tlsExtraSANs:
- airflow-search.wikimedia.org
- Add the airflow-search namespace under the tenantNamespaces list in deployment_charts/helmfile.d/admin_ng/values/dse-k8s-eqiad/cephfs-csi-rbd-values.yaml as well as deployment_charts/helmfile.d/admin_ng/values/dse-k8s-eqiad/cephfs-csi-rbd-values.yaml
- Add the airflow-search namespace to the watchedNamespaces list defined in deployment_charts/helmfile.d/admin_ng/values/dse-k8s-eqiad/cloudnative-pg.yaml
- Deploy admin_ng
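A sketch of what this can look like from the deployment server (the path and flags are assumptions based on the usual admin_ng workflow, not taken from this page):
brouberol@deploy1003:~$ cd /srv/deployment-charts/helmfile.d/admin_ng
brouberol@deploy1003:/srv/deployment-charts/helmfile.d/admin_ng$ helmfile -e dse-k8s-eqiad -i apply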
- Create the public and internal DNS records for this instance
- Create the airflow-search-ops LDAP group
- Register the service in our IDP server (into idp.yaml). Once the patch has been merged and puppet has run on the idp servers, copy the OIDC secret key generated for the airflow service.
root@idp1004:# cat /etc/cas/services/airflow_search-*.json | jq -r .clientSecret
<OIDC secret key>
Defining a secret key shared between the scheduler and the webserver
For the webserver running on Kubernetes to be able to fetch task logs from the scheduler running on an Airflow host, they need to share the same secret key. This means that we need to commit this secret key in a location that will be taken into account by Puppet, as well as another that will be taken into account by our Helm tooling.
First, generate a random string (64 characters long is good). Then commit that string in the two following locations, on puppetserver.
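One way to generate such a string (an illustrative command, not mandated by this page):
$ openssl rand -hex 32
The first of the two locations is the instance's secrets file in the private Puppet repository: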
# /srv/git/private/hieradata/role/common/analytics_cluster/airflow/search.yaml
# warn: adapt the file path for each airflow instance
profile::airflow::instances_secrets:
search:
...
secret_key: <secret key>
Run puppet on the airflow instance, and make sure each airflow service is restarted.
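For example, on the host currently running this instance (the run-puppet-agent wrapper is the standard WMF helper; adapt the host and instance name as needed):
brouberol@an-airflow1005:~$ sudo run-puppet-agent
brouberol@an-airflow1005:~$ sudo systemctl restart airflow-{webserver,kerberos,scheduler}@search.service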
Keep the secret key handy, as it will be used in the next section. Copy the db_password value as well; you will need it for the next step.
Defining the webserver configuration
Add the following block in /srv/git/private/hieradata/role/common/deployment_server/kubernetes.yaml, on puppetserver.
dse-k8s:
# ...
airflow-search:
dse-k8s-eqiad:
config:
private:
airflow__webserver__secret_key: <secret key>
airflow:
postgresqlPass: <PG password from the previous section>
oidc:
client_secret: <OIDC secret key>
kerberos:
keytab: |
<base64 representation of keytab>
Then, create a new airflow-search folder in deployment-charts/helmfile.d/dse-k8s-services (feel free to copy from deployment-charts/helmfile.d/dse-k8s-services/airflow-analytics-test). Your values-production.yaml file should look like this:
config:
airflow:
dags_folder: search
instance_name: search
dbHost: an-db1001.eqiad.wmnet
dbName: airflow_search
dbUser: airflow_search
auth:
role_mappings:
airflow-search-ops: [Op]
config:
logging:
remote_logging: false
oidc:
client_id: airflow_search
external_services:
postgresql: [analytics]
airflow: [search]
ingress:
gatewayHosts:
default: "airflow-search"
extraFQDNs:
- airflow-search.wikimedia.org
kerberos:
enabled: false
scheduler:
remote_host: an-airflow-1xxx.eqiad.wmnet # use the appropriate hostname from https://wikitech.wikimedia.org/wiki/Data_Platform/Systems/Airflow/Instances
enabled: false
postgresql:
cloudnative: false
Deploy airflow with helmfile.
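For example, from the deployment server (hostname illustrative), using the same environment and selector as the redeploy commands later on this page:
brouberol@deploy1003:~$ cd /srv/deployment-charts/helmfile.d/dse-k8s-services/airflow-search
brouberol@deploy1003:/srv/deployment-charts/helmfile.d/dse-k8s-services/airflow-search$ helmfile -e dse-k8s-eqiad --selector 'name=production' apply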
Setup the ATS redirection
Take example from that patch to set up the appropriate redirection, making the web UI visible to the public. Once merged, it should take about 30 minutes to fully take effect.
Migrate the scheduler and kerberos components to Kubernetes
- Create the kerberos principals and the base64 representation of the instance keytab
# Change `analytics` to the UNIX user the airflow tasks will impersonate by default in Hadoop
brouberol@krb1001:~$ sudo kadmin.local addprinc -randkey analytics-search/airflow-search.discovery.wmnet@WIKIMEDIA
brouberol@krb1001:~$ sudo kadmin.local addprinc -randkey airflow/airflow-search.discovery.wmnet@WIKIMEDIA
brouberol@krb1001:~$ sudo kadmin.local addprinc -randkey HTTP/airflow-search.discovery.wmnet@WIKIMEDIA
brouberol@krb1001:~$ sudo kadmin.local ktadd -norandkey -k analytics-search.keytab \
analytics-search/airflow-search.discovery.wmnet \
airflow/airflow-search.discovery.wmnet@WIKIMEDIA \
HTTP/airflow-search.discovery.wmnet@WIKIMEDIA
- Copy the base64 representation of the generated keytab
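This mirrors the base64 step shown for the test instance above; the keytab file name comes from the ktadd command:
brouberol@krb1001:~$ base64 analytics-search.keytab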
- Create the S3 user
brouberol@cephosd1001:~$ sudo radosgw-admin user create --uid=airflow-search --display-name="airflow-search"
# copy the access_key and secret_key
- Add the following values to the values block in /srv/git/private/hieradata/role/common/deployment_server/kubernetes.yaml, on puppetserver.
dse-k8s:
# ...
airflow-search:
dse-k8s-eqiad:
config:
private:
airflow__webserver__secret_key: <secret key>
airflow:
postgresqlPass: <PG password>
aws_access_key_id: <S3 access key> # add this!
aws_secret_access_key: <S3 secret key> # add this!
oidc:
client_secret: <OIDC secret key>
kerberos:
keytab: |
<base64 representation of keytab> # add this!
- Create the S3 bucket
brouberol@stat1008:~$ read access_key
<S3 access key>
brouberol@stat1008:~$ read secret_key
<S3 secret key>
brouberol@stat1008:~$ s3cmd --access_key=$access_key --secret_key=$secret_key --host=rgw.eqiad.dpe.anycast.wmnet --region=dpe --host-bucket=no mb s3://logs.airflow-search.dse-k8s-eqiad
- Sync all the scheduler and DAG task logs to S3
brouberol@an-airflow1005:~$ tmux
# in tmux
brouberol@an-airflow1005:~$ sudo apt-get install s3cmd
brouberol@an-airflow1005:~$ read access_key
<S3 access key>
brouberol@an-airflow1005:~$ read secret_key
<S3 secret key>
brouberol@an-airflow1005:~$ cd /srv/airflow-search/logs
brouberol@an-airflow1005:/srv/airflow-search/logs$ s3cmd \
--access_key=$access_key \
--secret_key=$secret_key \
--host=rgw.eqiad.dpe.anycast.wmnet \
--region=dpe \
--host-bucket=no \
sync -r ./* s3://logs.airflow-search.dse-k8s-eqiad/
... # this will take a long time. Feel free to detach the tmux session
- Once the logs are synchronized, stop all the airflow systemd services and sync the logs again, to account for the DAGs that might have run during the first sync. Make an announcement on Slack and IRC, as this will prevent any DAG from running for a while.
brouberol@an-airflow1005:~$ sudo puppet agent --disable "airflow scheduler migration to Kubernetes"
brouberol@an-airflow1005:~$ sudo systemctl stop airflow-{webserver,kerberos,scheduler}@*.service
brouberol@an-airflow1005:~$ cd /srv/airflow-search/logs
brouberol@an-airflow1005:/srv/airflow-search/logs$ s3cmd \
--access_key=$access_key \
--secret_key=$secret_key \
--host=rgw.eqiad.dpe.anycast.wmnet \
--region=dpe \
--host-bucket=no \
sync -r ./* s3://logs.airflow-search.dse-k8s-eqiad/
... # this time, this should be fairly short
- Deploy the following helmfile.d/dse-k8s-services/airflow-search/values-production.yaml configuration:
config:
airflow:
dags_folder: search
instance_name: search
dbHost: an-db1001.eqiad.wmnet
dbName: airflow_search
dbUser: airflow_search
auth:
role_mappings:
airflow-analytics-search-ops: [Op]
config:
core:
executor: KubernetesExecutor
kerberos:
principal: analytics-search/airflow-search.discovery.wmnet
oidc:
client_id: airflow_search
external_services:
postgresql: [analytics]
ingress:
gatewayHosts:
default: "airflow-search"
extraFQDNs:
- airflow-search.wikimedia.org
postgresql:
cloudnative: false
- Deploy with helmfile. You should see the airflow-kerberos and airflow-scheduler pods appear. Once the deployment goes through, connect to https://airflow-search.wikimedia.org and execute DAGs, to make sure they run correctly. If they do not, the fun begins: there's no real playbook, as there's no way to tell in advance what will go wrong. A network policy might be missing, or a patch might need to be submitted to airflow-dags. Roll the dice!
- Once everything is working, submit a puppet patch that comments out (or removes) the instance's profile::airflow::instances hiera data.
Deploy a CloudnativePG cluster in the same Kubernetes namespace as Airflow and import the data
- Create the postgresql-airflow-search S3 user. Copy the access and secret keys from the output.
brouberol@cephosd1001:~$ sudo radosgw-admin user create --uid=postgresql-airflow-search --display-name="postgresql-airflow-search"
- Create the S3 bucket in which the PG data will be stored.
brouberol@stat1008:~$ read access_key
REDACTED
brouberol@stat1008:~$ read secret_key
REDACTED
brouberol@stat1008:~$ s3cmd --access_key=$access_key --secret_key=$secret_key --host=rgw.eqiad.dpe.anycast.wmnet --region=dpe --host-bucket=no mb s3://postgresql-airflow-search.dse-k8s-eqiad
Bucket 's3://postgresql-airflow-search.dse-k8s-eqiad/' created
- Add the S3 keys to the private secret repository, into hieradata/role/common/deployment_server/kubernetes.yaml
...
postgresql-airflow-search:
dse-k8s-eqiad:
s3:
accessKey: <S3 access key>
secretKey: <S3 secret key>
cluster:
initdb:
import:
password: <PG password>
...
- Add the airflow-search namespace to the list of cloudnative-pg tenant namespaces, in helmfile.d/admin_ng/values/dse-k8s-eqiad/cloudnative-pg-values.yaml. Deploy admin_ng.
- Uncomment the sections related to postgresql/cloudnative-pg from the airflow app helmfile, and add a values-postgresql-airflow-search.yaml containing the following data:
cluster:
initdb:
import:
host: an-db1001.eqiad.wmnet
user: airflow_search
dbname: airflow_search
external_services:
postgresql: [analytics]
- Before deploying, scale the airflow webserver and scheduler deployments down to 0 replicas. This will cause downtime for Airflow, so make sure to reach out to the team beforehand.
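A sketch of one way to do this with kubectl, assuming you have credentials for the airflow-search namespace; check the actual deployment names first, as they are not spelled out on this page:
brouberol@deploy1003:~$ kubectl -n airflow-search get deployments
brouberol@deploy1003:~$ kubectl -n airflow-search scale deployment <webserver deployment> <scheduler deployment> --replicas=0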
- Deploy by running helmfile -e dse-k8s-eqiad --selector 'name=postgresql-airflow-search' apply to only deploy the cloudnative PG pods.
- Remove all PG-related secrets from the private puppet repository.
...
postgresql-airflow-search:
dse-k8s-eqiad:
s3:
accessKey: REDACTED
secretKey: REDACTED
- cluster:
- initdb:
- import:
- password: REDACTED
airflow-search:
dse-k8s-eqiad:
config:
private:
airflow__webserver__secret_key: REDACTED
airflow:
- postgresqlPass: REDACTED
aws_access_key_id: REDACTED
aws_secret_access_key: REDACTED
...
- Once all pods are healthy, empty the values-postgresql-airflow-search.yaml file of all values, leaving it completely empty. Redeploy with helmfile -e dse-k8s-eqiad --selector 'name=postgresql-airflow-search' apply, which shouldn't restart any pod.
- Apply this diff to the values-production.yaml file:
config:
airflow:
dags_folder: search
instance_name: search
- dbHost: an-db1001.eqiad.wmnet
- dbName: airflow_search
- dbUser: airflow_search
auth:
role_mappings:
airflow-analytics-search-ops: [Op]
@@ -16,18 +13,12 @@ config:
oidc:
client_id: airflow_search
-external_services:
- postgresql: [analytics]
-
ingress:
gatewayHosts:
default: "airflow-search"
extraFQDNs:
- airflow-search.wikimedia.org
-postgresql:
- cloudnative: false
- Run helmfile -e dse-k8s-eqiad --selector 'name=production' apply to redeploy airflow, which will now connect to the PGBouncer pods of the cloudnative PG cluster.
Configuring out-of-band backups
The PostgreSQL database cluster for this instance will already be configured with its own backup system that writes database backups and WAL archives to the S3 interface of the Ceph cluster.
However, we decided to implement out-of-band backups of each of the S3 buckets containing these database backups, so we added a new backup pipeline to our database backup replica system, which is db1208.
In this case, the file you need to modify when adding a new instance is in the private repo: hieradata/role/common/mariadb/misc/analytics/backup.yaml
Add your new bucket and its access credentials to the profile::ceph::backup::s3_local::sources hash structure, as shown.
profile::ceph::backup::s3_local::sources:
postgresql-airflow-test-k8s.dse-k8s-eqiad: # must match the PG bucket name
access_key: <PG S3 access key>
secret_key: <PG S3 secret key>
When merged, this will update the file /srv/postgresql_backups/rclone.conf on db1208, adding the backups of this database cluster to the daily sync process and therefore to Bacula.
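For reference, the generated rclone remote for such a bucket looks roughly like the following (a sketch based on rclone's S3 backend options, not copied from the actual file):
[postgresql-airflow-test-k8s.dse-k8s-eqiad]
type = s3
provider = Ceph
access_key_id = <PG S3 access key>
secret_access_key = <PG S3 secret key>
endpoint = rgw.eqiad.dpe.anycast.wmnet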
Upgrading Airflow
To upgrade Airflow, we first need to build a new docker image installing a more recent apache-airflow package version (example). Once the patch is merged, a publish:airflow job will be kicked off for each airflow image.
Then, use the CLI described here to automatically get the docker image tag of the newly published airflow image from the Gitlab jobs (or copy it manually from the Gitlab build job logs).
Now, deploy the new image to the airflow-test-k8s instance by changing the app.version field in deployment_charts/helmfile.d/dse-k8s-services/airflow-test-k8s/values-production.yaml, and redeploy the test instance. Any outstanding DB migrations will be applied automatically. If everything goes well, bump the airflow version under deployment_charts/helmfile.d/dse-k8s-services/_airflow_common_/values-dse-k8s-eqiad.yaml, and redeploy every instance, one after the other.
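As a sketch, the version bump in values-production.yaml amounts to changing the image tag (the placeholder below stands in for the real tag):
app:
version: <new image tag> # taken from the Gitlab publish:airflow job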