Jump to content

Data Platform/Systems/Airflow/Kubernetes/Operations

From Wikitech

Creating a new instance

In the following section, we're going to assume that we're creating a new airflow instance named `airflow-test-k8s`, deployed with a dedicated PG cluster named `postgresql-airflow-test-k8s`, deployed in the `dse-k8s-eqiad` Kubernetes environment.
  • The first thing you need to do is create Kubernetes read and deploy user credentials
  • Add a namespace (using the same name as the airflow instance) entry into deployment_charts/helmfile.d/admin_ng/values/dse-k8s.yaml
    namespaces:
      # ...
      airflow-test-k8s:
        deployClusterRole: deploy-airflow
        tlsExtraSANs:
          - airflow-test-k8s.wikimedia.org
    
  • Add the namespace under the tenantNamespaces list in deployment_charts/helmfile.d/admin_ng/values/dse-k8s-eqiad/cephfs-csi-rbd-values.yaml as well as deployment_charts/helmfile.d/admin_ng/values/dse-k8s-eqiad/cephfs-csi-rbd-values.yaml
  • Add the namespace to the watchedNamespaces list defined in deployment_charts/helmfile.d/admin_ng/values/dse-k8s-eqiad/cloudnative-pg.yaml
  • Then, create the public and internal DNS records for this instance
  • Define the airflow instance helmfile.yaml file and associated values (take example from deployment_charts/helmfile.d/dse-k8s-services/airflow-test-k8s)
  • Generate the S3 keypairs for both PG and Airflow
    brouberol@cephosd1001:~$ sudo radosgw-admin user create --uid=postgresql-airflow-test-k8s --display-name="postgresql-airflow-test-k8s"
    brouberol@cephosd1001:~$ sudo radosgw-admin user create --uid=airflow-test-k8s --display-name="airflow-test-k8s"
    # note: copy the `access_key` and `secret_key` from the JSON output, you will need it in the next step
    
  • Create the S3 buckets for both PG and Airflow
  • Register the service in our IDP server (into idp.yaml). After the patch was merged and puppet ran on the idp servers, copy the OIDC secret key generated for the airflow service.
    root@idp1004:# cat /etc/cas/services/airflow_test_k8s-*.json  | jq -r .clientSecret
    <OIDC secret key>
    
  • Issue a Kerberos keytab following the guide provided on Data Platform/Systems/Kerberos/Administration. The principal must match the username of the runs.as user defined in the image blubberfile. The hostname must match the internal service discovery domain of the airflow instance.
    # Change `analytics` to the UNIX user the airflow tasks will impersonate by default in Hadoop
    brouberol@krb1001:~$ sudo kadmin.local addprinc -randkey analytics/airflow-test-k8s.discovery.wmnet@WIKIMEDIA
    brouberol@krb1001:~$ sudo kadmin.local addprinc -randkey airflow/airflow-test-k8s.discovery.wmnet@WIKIMEDIA
    brouberol@krb1001:~$ sudo kadmin.local addprinc -randkey HTTP/airflow-test-k8s.discovery.wmnet@WIKIMEDIA
    brouberol@krb1001:~$ sudo kadmin.local ktadd -norandkey -k analytics.keytab \
        analytics/airflow-test-k8s.discovery.wmnet \
        airflow/airflow-test-k8s.discovery.wmnet@WIKIMEDIA \
        HTTP/airflow-test-k8s.discovery.wmnet@WIKIMEDIA
    

Created the base64 representation for the keytab

root@krb1001:~# base64 analytics.keytab
  • Generate the secrets or both the PG cluster and the Airflow instance and add the to the private puppet repository, to /srv/git/private/hieradata/role/common/deployment_server/kubernetes.yaml
    dse-k8s:
      # ...
      postgresql-airflow-test-k8s:
        dse-k8s-eqiad:
          s3:
            accessKey: <PG S3 access key>
            secretKey: <PG S3 secret key>
    
      airflow-test-k8s:
        dse-k8s-eqiad:
          config:
            private:
              airflow__core__fernet_key: <random 64 characters>
              airflow__webserver__secret_key: <random 64 characters>
            airflow:
              aws_access_key_id: <Airflow S3 access key>
              aws_secret_access_key: <Airflow S3 secret key>
            oidc:
              client_secret: <OIDC secret key>
          kerberos:
            keytab: | # The base64 representation obtained in the previous step
              ABCDEFHIJKLMNOP
              ABCDEFHIJKLMNOP=
    
  • Register the PG bucket name and keys into /srv/git/private/hieradata/role/common/mariadb/misc/analytics/backup.yaml on the puppetserver host. This will make sure that the PG base backups and WALs are regularly backed up outside of our Ceph cluster.
    profile::ceph::backup::s3_local::sources:
      ...
      postgresql-airflow-test-k8s.dse-k8s-eqiad: # must match the PG bucket name 
        access_key: <PG S3 access key>
        secret_key: <PG S3 secret key>
    
  • Deploy the service (which should deploy both the PG cluster and the airflow instance)
  • Once the instance is running, enable the ATS redirection from the wikimedia.org subdomain to the kube ingress. After puppet has run on all the cache servers (wait a good 30 minutes), https://airflow-test-k8s.wikimedia.org should display the airflow web UI, and you should be able to connect via CAS.

Migrating an existing instance

This section addresses how to perform a piecemeal migration of the airflow instances listed in Data Platform/Systems/Airflow/Instances to Kubernetes. The result of this migration is an airflow instance (webserver, scheduler and kerberos) all running in Kubernetes, alongside with the database itself, without any data loss.

The migration is done in 4 steps:

  • Migrate the webserver to Kubernetes
  • Migrate the scheduler and kerberos components to Kubernetes
  • Deploy a CloudnativePG cluster in the name Kubernetes namespace than Airlow and import the data
  • Cleanup

At the time of the writing, we have already migrated airflow-analytics-test, and we'll assume that this documentation covers the case of the airflow-search instance.

At the time of the writing, we have already migrated airflow-analytics-test, and we'll assume that this documentation covers the case of the airflow-search instance.

Migrating the webserver to Kubernetes

To only deploy the webserver to Kubernetes, we need to deploy airflow in a way that makes sure it talks to an external database, and opts out of the scheduler and kerberos components.

Prep work
  • The first thing you need to do is create Kubernetes read and deploy user credentials
  • Add a namespace (using the same name as the airflow instance) entry into deployment_charts/helmfile.d/admin_ng/values/dse-k8s.yaml
    namespaces:
      # ...
      airflow-search:
        deployClusterRole: deploy-airflow
        tlsExtraSANs:
          - airflow-search.wikimedia.org
    
  • Add the airflow-search namespace under the tenantNamespaces list in deployment_charts/helmfile.d/admin_ng/values/dse-k8s-eqiad/cephfs-csi-rbd-values.yaml as well as deployment_charts/helmfile.d/admin_ng/values/dse-k8s-eqiad/cephfs-csi-rbd-values.yaml
  • Add the airflow-search namespace to the watchedNamespaces list defined in deployment_charts/helmfile.d/admin_ng/values/dse-k8s-eqiad/cloudnative-pg.yaml
  • Deploy admin_ng
  • Create the public and internal DNS records for this instance
  • Create the airflow-search-ops LDAP group
  • Register the service in our IDP server (into idp.yaml). After the patch was merged and puppet ran on the idp servers, copy the OIDC secret key generated for the airflow service.
    root@idp1004:# cat /etc/cas/services/airflow_search-*.json  | jq -r .clientSecret
    <OIDC secret key>
    
Defining a secret key shared between the scheduler and the webserver

To be able to have the webserver running on Kubernetes fetch tasks logs from the scheduler running on an Airflow host, they need to share the same secret key. This means that we need to commit this secret key in a location that will be taken into account by Puppet, as well as another that will be taken into account by our Helm tooling.

First, generate a random string (64 characters long is good). Then commit that string in the 2 following locations, on puppetserver:

# /srv/git/private/hieradata/role/common/analytics_cluster/airflow/search.yaml
# warn: adapt the file path for each airflow instance
profile::airflow::instances_secrets:
  search:
    ...
    secret_key: <secret key>

Run puppet on the airflow instance, and make sure each airflow service is restarted.

Keep the secret key handy, it will be used in the next section. Copy the db_password value as well, you will need it for the next step.

Defining the webserver configuration

Add the following block in /srv/git/private/hieradata/role/common/deployment_server/kubernetes.yaml, on puppetserver.

dse-k8s:
  # ...
  airflow-search:
    dse-k8s-eqiad:
      config:
        private:
          airflow__webserver__secret_key: <secret key>
        airflow:
          postgresqlPass: <PG password from the previous section>
        oidc:
          client_secret: <OIDC secret key>
      kerberos:
        keytab: |
          <base64 representation of keytab>

Then, create a new airflow-search folder in deployment-charts/helmfile.d/dse-k8s-services (feel free to copy from deployment-charts/helmfile.d/dse-k8s-services/airflow-analytics-test ). Your values-production.yaml file should look like this:

config:
  airflow:
    dags_folder: search
    instance_name: search
    dbHost: an-db1001.eqiad.wmnet
    dbName: airflow_search
    dbUser: airflow_search
    auth:
      role_mappings:
        airflow-search-ops: [Op]
    config:
      logging:
        remote_logging: false
  oidc:
    client_id: airflow_search

external_services:
  postgresql: [analytics]
  airflow: [search]

ingress:
  gatewayHosts:
    default: "airflow-search"
    extraFQDNs:
    - airflow-search.wikimedia.org

kerberos:
  enabled: false

scheduler:
  remote_host: an-airflow-1xxx.eqiad.wmnet  # use the appropriate hostname from https://wikitech.wikimedia.org/wiki/Data_Platform/Systems/Airflow/Instances
  enabled: false

postgresql:
  cloudnative: false

Deploy airflow with helmfile.

Setup the ATS direction

Take example from that patch to setup the appropriate redirection, to make the webUI visible to the public. Once merged, it should take about 30 minutes to fully take effect.

Migrate the scheduler and kerberos components to Kubernetes

  • Create the kerberos principals and the base64 representation of the instance keytab
# Change `analytics` to the UNIX user the airflow tasks will impersonate by default in Hadoop
brouberol@krb1001:~$ sudo kadmin.local addprinc -randkey analytics-search/airflow-search.discovery.wmnet@WIKIMEDIA
brouberol@krb1001:~$ sudo kadmin.local addprinc -randkey airflow/airflow-search.discovery.wmnet@WIKIMEDIA
brouberol@krb1001:~$ sudo kadmin.local addprinc -randkey HTTP/airflow-search.discovery.wmnet@WIKIMEDIA
brouberol@krb1001:~$ sudo kadmin.local ktadd -norandkey -k analytics-search.keytab \
    analytics-search/airflow-search.discovery.wmnet \
    airflow/airflow-search.discovery.wmnet@WIKIMEDIA \
    HTTP/airflow-search.discovery.wmnet@WIKIMEDIA
  • Copy the base64 representation of the generated keytab
  • Create the S3 user
    brouberol@cephosd1001:~$ sudo radosgw-admin user create --uid=airflow-search --display-name="airflow-search"
    # copy the access_key and secret_key
    
  • Add the following values to the values block in /srv/git/private/hieradata/role/common/deployment_server/kubernetes.yaml, on puppetserver.
    dse-k8s:
      # ...
      airflow-search:
        dse-k8s-eqiad:
          config:
            private:
              airflow__webserver__secret_key: <secret key>
            airflow:
              postgresqlPass: <PG password>
              aws_access_key_id: <S3 access key> # add this!
              aws_secret_access_key: <S3 secret key> # add this!
            oidc:
              client_secret: <OIDC secret key>
          kerberos:
            keytab: |
              <base64 representation of keytab> #  add this!
    
  • Create the S3 bucket
    brouberol@stat1008:~$ read access_key
    <S3 access_key>
    brouberol@stat1008:~$ read secret_key
    <S3 secret key>
    brouberol@stat1008:~$ s3cmd --access_key=$access_key --secret_key=$secret_key --host=rgw.eqiad.dpe.anycast.wmnet --region=dpe --host-bucket=no mb s3://logs.airflow-search.dse-k8s-eqiad
    
  • Sync all the scheduler and DAG task logs to S3
    brouberol@an-airflow1005:~$ tmux
    # in tmux
    brouberol@an-airflow1005:~$ sudo apt-get install s3cmd
    brouberol@an-airflow1005:~$ read access_key
    <S3 access_key>
    brouberol@an-airflow1005:~$ read secret_key
    <S3 secret_key>
    brouberol@an-airflow1005:~$ cd /srv/airflow-search/logs
    brouberol@an-airflow1005:/srv/airflow-search/logs$ s3cmd \
        --access_key=$access_key \
        --secret_key=$secret_key \
        --host=rgw.eqiad.dpe.anycast.wmnet \
        --region=dpe \
        --host-bucket=no \
        sync -r ./* s3://logs.airflow-search.dse-k8s-eqiad/
    ... # this will take a long time. Feel free to detach the tmux session
    
  • Once the logs are synchronized, stop all the airflow systemd services and sync the logs again, to account for the dags that might have run during the first sync. Make an announcement of Slack and IRC as this will prevent any DAG to run for a time.
    brouberol@an-airflow1005:~$ sudo puppet agent --disable "airflow scheduler migration to Kubernetes"
    brouberol@an-airflow1005:~$ sudo systemctl stop airflow-{webserver,kerberos,scheduler}@*.service
    brouberol@an-airflow1005:~$ cd /srv/airflow-search/logs
    brouberol@an-airflow1005:/srv/airflow-search/logs$ s3cmd \
        --access_key=$access_key \
        --secret_key=$secret_key \
        --host=rgw.eqiad.dpe.anycast.wmnet \
        --region=dpe \
        --host-bucket=no \
        sync -r ./* s3://logs.airflow-search.dse-k8s-eqiad/
    ... # this time, this should be fairly short
    
  • Deploy the following helmfile.d/dse-k8s-services/airlfow-search/values-production.yaml configuration:
    config:
      airflow:
        dags_folder: search
        instance_name: search
        dbHost: an-db1001.eqiad.wmnet
        dbName: airflow_search
        dbUser: airflow_search
        auth:
          role_mappings:
            airflow-analytics-search-ops: [Op]
        config:
          core:
            executor: KubernetesExecutor
          kerberos:
            principal: analytics-search/airflow-search.discovery.wmnet
      oidc:
        client_id: airflow_search
    
    external_services:
      postgresql: [analytics]
    
    ingress:
      gatewayHosts:
        default: "airflow-search"
        extraFQDNs:
        - airflow-search.wikimedia.org
    
    postgresql:
      cloudnative: false
    
  • Deploy with helmfile. You should see the airflow-kerberos and airflow-scheduler pods appear. Once the deployment goes through, connect to https://airflow-search.wikimedia.org and execute DAGs, to make sure they run correctly. If they do not, well, the fun begins. There's no real playbook there, as there's no way to tell what will err in advance. Whether a network policy might be missing, or a patch might need to be submitted to airflow-dags.. Roll the dice!
  • Once everything is working, submit a puppet patch that comments (or removes) the instance profile::airflow::instances hiera data.

Deploy a CloudnativePG cluster in the name Kubernetes namespace than Airlow and import the data

Same as the previous section, this migration instruction guide will assume that we're migrating the airflow-search instance. Replace names accordingly.
  • Create the postgresql-airflow-search S3 user. Copy the access and secret keys from the output.
    brouberol@cephosd1001:~$ sudo radosgw-admin user create --uid=postgresql-airflow-search --display-name="postgresql-airflow-search"
    
  • Create the S3 bucket in which the PG data will be stored.
    brouberol@stat1008:~$ read access_key
    REDACTED
    brouberol@stat1008:~$ read secret_key
    REDACTED
    brouberol@stat1008:~$ s3cmd --access_key=$access_key --secret_key=$secret_key --host=rgw.eqiad.dpe.anycast.wmnet --region=dpe --host-bucket=no mb s3://postgresql-airflow-search.dse-k8s-eqiad
    Bucket 's3://postgresql-airflow-search.dse-k8s-eqiad/' created
    
  • Add the S3 keys to the private secret repository, into hieradata/role/common/deployment_server/kubernetes.yaml
    ...
        postgresql-airflow-search:
          dse-k8s-eqiad:
            s3:
              accessKey: <S3 access key>
              secretKey: <S3 secret key>
            cluster:
              initdb:
                import:
                  password: <PG password>
    ...
    
  • Add the airflow-search namespace to the list of cloudnative-pg tenant namespaces, in helmfile.d/admin_ng/values/dse-k8s-eqiad/cloudnative-pg-values.yaml. Deploy admin_ng.
  • Uncomment the sections related to postgresql/cloudnative-pg from the airflow app helmfile, and add a values-postgresql-airflow-search.yaml containing the following data
    cluster:
      initdb:
        import:
          host: an-db1001.eqiad.wmnet
          user: airflow_search
          dbname: airflow_search
          
    external_services:
      postgresql: [analytics]
    
  • Before deploying, edit the airflow webserver and scheduler deployment to 0 replicas. This will induce a downtime for Airflow, so make sure to reach out to the team beforehand.
  • Deploy by running helmfile -e dse-k8s-eqiad --selector 'name=postgresql-airflow-search' apply to only deploy the cloudnative PG pods.
  • Remove all PG-related secrets from the private puppet repository.
    ...
        postgresql-airflow-search:
          dse-k8s-eqiad:
            s3:
              accessKey: REDACTED
              secretKey: REDACTED
    -       cluster:
    -         initdb:
    -           import:
    -             password: REDACTED
    
        airflow-search:
          dse-k8s-eqiad:
            config:
              private:
                airflow__webserver__secret_key: REDACTED
              airflow:
    -           postgresqlPass: REDACTED
                aws_access_key_id: REDACTED
                aws_secret_access_key: REDACTED
    ...
    
  • Once all pods are healthy, empty the values-postgresql-airflow-search.yaml file of all values, leaving it completely empty. Redeploy with helmfile -e dse-k8s-eqiad --selector 'name=postgresql-airflow-search' apply , which shouldn't restart any pod.
  • Apply this diff to the values-production.yaml file
     config:
       airflow:
         dags_folder: search
         instance_name: search
    -    dbHost: an-db1001.eqiad.wmnet
    -    dbName: airflow_search
    -    dbUser: airflow_search
         auth:
           role_mappings:
             airflow-analytics-search-ops: [Op]
    @@ -16,18 +13,12 @@ config:
       oidc:
         client_id: airflow_search
    
    -external_services:
    -  postgresql: [analytics]
    -
     ingress:
       gatewayHosts:
         default: "airflow-search"
         extraFQDNs:
         - airflow-search.wikimedia.org
    
    -postgresql:
    -  cloudnative: false
    
  • Run helmfile -e dse-k8s-eqiad --selector 'name=production' apply to redeploy airflow, that will now connect to the PGBouncer pods of the cloudnative PG cluster.

Configuring out-of-band backups

The PostgreSQL database cluster for this instance will already be configured with its own backup system that writes database backups and WAL archives to the S3 interface of the Ceph cluster.

However, we decided to implement out-of-band backups of each of the S3 buckets containing these database backups, so we added a new backup pipeline to our database backup replica system, which is db1208.

In this case the file you need to modify when you add a new instance is in the private repo and is named: hieradata/role/common/mariadb/misc/analytics/backup.yaml

Add your new bucket and its access credentials to the profile::ceph::backup::s3_local::sources hash structure, as shown.

profile::ceph::backup::s3_local::sources:
  postgresql.airflow-test-k8s.dse-k8s-eqiad:
    access_key: <Airflow S3 access key>
    secret_key: <Airflow S3 secret key>

When merged, this will update the file /srv/postgresql_backups/rclone.conf on db1208, adding the backups of this database cluster to the daily sync process and therefore to Bacula.

Upgrading Airflow

To upgrade Airflow, we first need to rebuild a new docker image installing on a more recent apache-airflow package version (example). Once the patch is merged, a publish:airflow job will be kicked off for each airflow image.

Then, use the CLI described here to automatically get the docker image tag the newly published airflow image from the Gitlab jobs (or copy it manually from the Gitlab build job logs).

Now, deploy the new image to the airflow-test-k8s instance, by changing the app.version field in deployment_charts/helmfile.d/dse-k8s-services/airflow-test-k8s/values-production.yaml, and redeploy the test instance. Any outstanding DB migrations will automatically be applied. If everything goes well, bump the airflow version under deployment_charts/helmfile.d/dse-k8s-services/_airflow_common_/values-dse-k8s-eqiad.yaml, and redeploy every instance, one after the other.