Event Platform/Stream Processing/Flink

From Wikitech

This page describes how we deploy Apache Flink stream processing applications in WMF Kubernetes clusters. This page assumes that you have a basic understanding of how Flink schedules jobs and what JobManagers and TaskManagers are. It also assumes you have a basic understanding of Kubernetes and Kubernetes Resources.

Flink job lifecycle

A Flink streaming job is generally meant to run indefinitely, but it sometimes has to be stopped for various maintenance operations:

  • job upgrades
  • Flink version upgrades
  • maintenance on its state (think of ALTER TABLE for an SQL RDBMS)

A stateful Flink job has to remember the location of the state it has written to object storage (at WMF we use OpenStack Swift as of 2023-05), so that when resuming operations after a failure or restart it can continue where it left off.

There are two kinds of state backup formats in Flink:

  • checkpoints: cheap, possibly incremental, generally taken at regular intervals (every 30 seconds for the WDQS streaming updater)
  • savepoints: self-contained, triggered on-demand

In short, checkpoints are designed to allow the pipeline to recover from failures, while savepoints are made for everything else (upgrades, forking the stream, state inspection/manipulation, ...).
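
For illustration, this kind of checkpoint/savepoint setup is typically expressed through the job's flinkConfiguration. The keys below are standard Flink options, but the backend choice, paths and interval are placeholders rather than the actual WMF values:

  flinkConfiguration:
    state.backend: rocksdb                                        # keyed state in RocksDB, checkpointed incrementally
    state.backend.incremental: "true"
    execution.checkpointing.interval: "30s"                       # cheap periodic checkpoints (cf. the WDQS updater's 30 seconds)
    state.checkpoints.dir: s3://example-bucket/flink/checkpoints  # object storage location (placeholder path)
    state.savepoints.dir: s3://example-bucket/flink/savepoints    # default target for on-demand savepoints (placeholder path)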

Flink application deployment

Native Kubernetes Application Deployment Mode

As of 2023-05, WMF supports running Flink in Kubernetes via Native Kubernetes in Application deployment mode using the flink-kubernetes-operator.

  • flink-kubernetes-operator is deployed and running in the target k8s cluster
  • A custom flink-app Helm chart is used to declare and deploy FlinkDeployments in application mode.
  • flink-kubernetes-operator + JobManagers interact with Kubernetes to create the needed resources (deployments, pods, etc.)

We maintain our own flink-kubernetes-operator and Flink Docker images, as well as Helm charts. Flink application Docker images should be built on top of our Flink Docker image, and should be deployed using the flink-app chart.

See the mw-page-content-change-enrich helmfile and docs for an example.
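
Under the hood, the flink-app chart produces a FlinkDeployment custom resource that flink-kubernetes-operator reconciles. A minimal, illustrative FlinkDeployment (the name, image, jar and resource values are placeholders, not the chart's actual output):

  apiVersion: flink.apache.org/v1beta1
  kind: FlinkDeployment
  metadata:
    name: example-flink-app                  # placeholder name
  spec:
    image: docker-registry.example/example-flink-app:latest   # built on top of the WMF Flink image
    flinkVersion: v1_16
    serviceAccount: flink                    # service account used to create JobManager/TaskManager resources
    flinkConfiguration:
      taskmanager.numberOfTaskSlots: "2"
    jobManager:
      resource: { cpu: 1, memory: "2048m" }
    taskManager:
      resource: { cpu: 1, memory: "2048m" }
    job:
      jarURI: local:///opt/flink/usrlib/example-job.jar   # jar baked into the image (application mode)
      parallelism: 2
      upgradeMode: savepoint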


All other modes of Flink deployment are deprecated.

Periodic savepoints

flink-kubernetes-operator can be configured to make JobManagers take periodic savepoints. Your job should configure this.
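
The periodic savepoint behaviour is driven by operator options set in the deployment's flinkConfiguration; a minimal sketch with example values:

  flinkConfiguration:
    # Ask flink-kubernetes-operator to trigger a savepoint periodically (the interval is an example value).
    kubernetes.operator.periodic.savepoint.interval: "6h"
    # Keep a bounded savepoint history; older savepoints are disposed of by the operator.
    kubernetes.operator.savepoint.history.max.count: "3"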

Recovery from savepoint

flink-kubernetes-operator supports 3 different upgrade modes. The chosen upgradeMode tells flink-kubernetes-operator how to reconcile Flink k8s resources and how to handle application restarts.

If running with JobManager HA, you can use upgradeMode: last-state. This allows JobManagers to restart from the latest checkpoint, and should result in faster restarts/recoveries than upgradeMode: savepoint.

upgradeMode: savepoint can be used even if not using JobManager HA. A savepoint will be taken whenever the application is restarted normally by flink-kubernetes-operator. If the JobManager is offline, it will not be possible to take an automatic savepoint before stopping the running application.
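
In the FlinkDeployment spec (or the equivalent flink-app chart values), the mode is a single field of the job; an illustrative sketch:

  job:
    jarURI: local:///opt/flink/usrlib/example-job.jar   # placeholder
    # stateless:  restart from empty state (not suitable for stateful jobs)
    # last-state: recover from the latest checkpoint via the HA metadata (requires JobManager HA)
    # savepoint:  the operator takes a savepoint on upgrade/suspend and restarts from it
    upgradeMode: last-state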

JobManager High Availability

Using Flink JobManager HA is mandatory, as we must ensure that the job is restarted properly if the JobManager dies unexpectedly. This means that maintenance operations performed on the k8s cluster don't require manual intervention on the Flink jobs themselves, as long as the Flink HA state is kept during the operation.

When a Flink job starts it can be started from an existing savepoint/checkpoint.

When operating, Flink can use its own retry mechanism. This is configurable, and permits a Flink job to die and restart a certain number of times. In these failure modes, as long as the JobManager remains alive, it is its responsibility to track in memory the path of the job's latest successful checkpoint.
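
For illustration, such a retry policy is expressed with Flink's standard restart-strategy options (the values below are examples, not the WMF defaults):

  flinkConfiguration:
    restart-strategy: fixed-delay               # retry a failed job a bounded number of times
    restart-strategy.fixed-delay.attempts: "10"
    restart-strategy.fixed-delay.delay: "30s"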

But when the JobManager itself dies and restarts it needs a way to retrieve the pointer to the latest successful checkpoint.

When running Flink in Kubernetes, this can be done in one of two ways: k8s ConfigMaps or Zookeeper. In either case, the "HA state" (a pointer to the latest successful checkpoint) will be stored. This HA state will survive the death of a JobManager, and can be used to automatically restart a full Flink job. Having this small HA state also allows for running standby JobManagers in High Availability mode.
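
A sketch of what Kubernetes-based HA looks like in the deployment spec (the storage path is a placeholder):

  flinkConfiguration:
    # Store the HA metadata (pointer to the latest successful checkpoint) in k8s ConfigMaps.
    high-availability: kubernetes
    high-availability.storageDir: s3://example-bucket/flink/ha   # placeholder object storage path
  jobManager:
    replicas: 2   # optional: standby JobManager for faster failover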

When starting in HA mode, Flink has a choice to make:

  • should it start from the initialSavepointPath?
  • should it start from the HA state (either in Zookeeper or k8s ConfigMap)?

Flink will simply take the most recent of the two.
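
For example, with an explicit initialSavepointPath in the job spec (the path is a placeholder):

  job:
    # Per the rule above, this savepoint is only used if it is more recent than the
    # checkpoint recorded in the HA state; otherwise the HA state wins.
    initialSavepointPath: s3://example-bucket/flink/savepoints/savepoint-abc123   # placeholder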

If using k8s ConfigMaps to store HA state

A Flink job will be able to restart itself from where it left off without manual intervention when:

  • the k8s cluster is restarted
  • the JobManager pod is lost and automatically restarted by k8s
  • one or all TaskManager pods are lost and automatically restarted by k8s

No manual intervention from serviceops is required to make a Flink job survive the loss of one or more of its k8s pods, as long as these pods are restarted at some point and the ConfigMaps are retained.

A Flink job will require manual intervention to restart from where it left off when:

  • the k8s cluster is restarted in a way that loses the ConfigMap changes.

In that case it will be required to save & restore flink-configmaps before restarting the Flink applications.

The following services are impacted:

Some intervention might be required to depool dependent services during the operation; for WDQS this means routing all the external traffic to one cluster (moving all traffic to eqiad if k8s@codfw is downtimed for a long period).

This implies that, whatever solution is chosen, the Flink cluster must have the rights to create/edit/watch/get/list a set of ConfigMaps in its k8s namespace.
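
For illustration, a namespaced Role granting those rights might look like this (names are placeholders, not the actual WMF RBAC definitions):

  apiVersion: rbac.authorization.k8s.io/v1
  kind: Role
  metadata:
    name: flink-ha-configmaps        # placeholder name
  rules:
    - apiGroups: [""]
      resources: ["configmaps"]
      # create/edit/watch/get/list as described above ("edit" maps to the update/patch verbs)
      verbs: ["create", "get", "list", "watch", "update", "patch"]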

If using Zookeeper to store HA state

TODO - https://phabricator.wikimedia.org/T331283

Deprecated deployment modes

Flink session cluster

A session cluster is an empty box to which one can submit jobs; Kubernetes' responsibility here is only to make sure that the Flink components are alive and to restart them when a problem occurs.

Responsibility for the job lifecycle is left to job owners, who have to write custom deploy scripts to submit, stop and upgrade their jobs.

With a session cluster, the set of HA ConfigMaps created is not known in advance, so the corresponding service account must be granted full read/write access to all ConfigMaps, without resourceNames restrictions.

Because the same cluster is reused for multiple jobs, its related resources (mainly HA ConfigMaps) will remain and have to be cleaned up manually (this needs kubectl delete configmap permissions).

Pods will run code that is not part of the container image, because the job is deployed outside the classic helm apply lifecycle; the job jar is stored in object storage (Swift).

Job upgrades are done via the REST API using custom scripts.

Flink cluster upgrades are strongly coupled with the jobs it is running: those jobs have to be stopped before upgrading the session cluster.

Jobs are not isolated from each other, so it might make sense to run multiple session clusters.

Upgrading to a new version of the Flink job:

  1. the job owner uses Flink's REST API to stop the job with a savepoint
  2. the job owner uses Flink's REST API to submit the updated jar and start the job from the savepoint taken

Upgrading to a new version of Flink (requires coordination between job owner and k8s deployer):

  1. the job owner uses Flink's REST API to stop the job with a savepoint
  2. the helm chart of the session cluster is updated to use a new Flink image and applied
  3. the job owner uses Flink's REST API to submit the updated jar and start the job from the savepoint taken

Flink application cluster

An application cluster embeds the job jar and its purpose is to run a single job; the lifecycle of the Flink job is thus tightly coupled with the lifecycle of the k8s resources.

JobManager as a k8s Deployment resource

A k8s Deployment will always restart its failed pods to match the expected number of replicas. This becomes problematic when the job owner wants to stop the job: we want the JobManager pod to end, but k8s will want to restart it using the same startup script (which we know is problematic as it would start from an old savepoint).

A hack is necessary to prevent the pod from restarting: a do-not-restart flag can be shared via a ConfigMap that the startup script will inspect, dying if the flag is set to true.
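
A minimal sketch of such a ConfigMap (the name and key are illustrative; the startup script is assumed to read this flag and exit when it is "true"):

  apiVersion: v1
  kind: ConfigMap
  metadata:
    name: flink-do-not-restart       # illustrative name
  data:
    do-not-restart: "true"           # the startup script exits immediately when this is "true"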

Upgrading to a new version of the Flink job:

  1. update the configmap to set the do-not-restart flag to true (via patch+helm apply or via kubectl edit configmap if allowed)
  2. stop the job with a savepoint using the flink REST API
  3. the JobManager will want to restart but will enter the CrashLoopBackOff state because of the do-not-restart flag
    1. possibly scale down the JobManager deployment to 0 so that it stops complaining (via patch+helm apply or via kubectl scale deploy if allowed)
  4. update the chart: new container image, the newly taken savepoint as the starting savepoint, and the do-not-restart flag set back to false (reset replicas to 1)
  5. apply the chart

(Upgrading to a new version of Flink is the same)

JobManager as a k8s Job resource

As suggested in the Flink documentation, the JobManager can be a k8s Job. Since k8s Jobs are by design meant to end, k8s will not automatically restart the pod if it ends successfully (completions=1, parallelism=1). With the OnFailure restart policy, k8s should restart the JobManager only when an error occurs, which is exactly what is needed here: do not retry if the job is told to stop cleanly. Unfortunately, k8s Job resources have other drawbacks when it comes to deployments with helm: these resources are immutable, so they must be deleted before applying changes via helm.
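
An illustrative sketch of a JobManager run as a k8s Job (names, image and entrypoint arguments are placeholders):

  apiVersion: batch/v1
  kind: Job
  metadata:
    name: flink-jobmanager           # placeholder name
  spec:
    completions: 1
    parallelism: 1
    template:
      spec:
        restartPolicy: OnFailure     # restart the JobManager pod only when it exits with an error
        containers:
          - name: jobmanager
            image: docker-registry.example/flink:placeholder
            # placeholder entrypoint for Flink's application ("standalone-job") mode
            args: ["standalone-job", "--job-classname", "org.example.MyJob"]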

Upgrading to a new version of the Flink job:

  1. stop the job with a savepoint
  2. the pod should end successfully, marking the k8s Job as "completed"
  3. delete the k8s Job (deployer needs kubectl delete job perms)
  4. update the chart: new container image and the newly taken savepoint as the starting savepoint
  5. apply the chart

(Upgrading to a new version of Flink is the same)

This approach is not battle-tested and is possibly dangerous if the job properly stops with a savepoint (emptying the HA ConfigMap) but still fails for some reason afterwards, returning a non-zero status code and causing the pod to restart. Introducing the do-not-restart flag hack might help to better guarantee that the Job's pod does not restart unexpectedly once the HA state has been emptied.