Kafka Job Queue

From Wikitech

Job queue based on Apache Kafka is a new backend transport and scheduler implementation for the MediaWiki JobQueue based on the EventBus extension and WMF's Event Platform, the Change-Propagation service and Apache Kafka.

Architecture overview

Architecture diagram
MediaWiki infrastructure, featuring "JobQueue job" flow

The MediaWiki (GitHub) EventBus extension (GitHub) serializes instances of JobSpecification interface in JSON format conforming to the mediawiki.job schema. The schema contains several properties common to all the events (meta, database, type etc) as well as an unstructured params property containing the parameters specific to each job type.

In order to push the events to Kafka, a special implementation of the JobQueue interface is provided, JobQueueEventBus. This implementation is push-only, so only push-related methods are implemented. Users should not try to pop from it. Also currently the JobQueueEventBus implementation doesn't provide methods to track the number of jobs in the queue and other queue statistics.

The events are validated by the EventGate service to verify they conform to the schema and produce to an Event Platform stream with a mediawiki.job.{job_type} naming convention. To support multi-DC Kafka, a 'stream' is made up of Kafka topics. A job Kafka topic will be prefixed with the datacenter the job was produced in, so in eqiad this will be eqiad.mediawiki.job.{job_type}.

On the other end of a Kafka queue, there's a dedicated installation of the ChangeProp service (GitHub). The service has a set of rules on which topics to subscribe to and what to do with the events coming in. For the job events, the service issues a POST request to job runners LVS endpoint to execute each job. Also, ChangePropagation is responsible for the following features:

  • Guaranteed retries. The job executor endpoint in MediaWiki responds with an HTTP error code in case the execution has failed. In that case ChangePropagation posts a retry event into a special topic and retries executing the job with exponentially growing delay up to a configurable number of times.
  • Commit management. ChangePropagation service makes sure that every event is somehow acted upon - either successfully executed or deduplicated, retried or acknowledged as completely failed. None of the events should be lost.
  • Concurrency limiting. Each rule has a configurable concurrency so that spikes in job rates can be smoothed.
  • Deduplication. The jobs could be deduplicated. The information for deduplication is stored in Redis for configurable period of time. Deduplication is done based on 3 possible strategies which all work together:
    • Deduplication based on ID. In case the exact same event was reread because of service failure or restart, the duplicate will not be executed.
    • Deduplication based on SHA1. In case the job parameters are the same as some previously executed event, and the similar job was already executed after the event was created - it's skipped.
    • Deduplication based on root event. For recursive events if we know that a particular set of events was superseded with a similar root event - it's skipped.

After the JSON serialized job is POSTed to the job executor (via /rpc/RunSingleJob.php endpoint), it's deserialized and executed. The result of the execution is reported via HTTP status codes back to ChangePropagation service. Each job is signed by MediaWiki with a secret key and the signature is verified on the receiving side to avoid executing unverified events.

Monitoring

Logs

For logs monitoring there is a consolidated Logstash JobQueue EventBus dashboard showing the logs from each part of the system:

  • EventBus logs for the Mediawiki extension that posts the jobs to Kafka.
  • cpjobqueue logs for the dedicated Change Propagation service deploy.
  • JobExecutor logs for the job execution side of the system.

Graphs

Grafana has a JobQueue EventBus dashboard as well with the following graphs:

  • Job insertion rates. Represents the rate of the job events per second inserted into Kafka per job type.
  • Normal job processing rate. Represents the number of events executed but change propagation per job type.
  • Normal job processing timing. Represents the average time it takes to execute a job per type.
  • Normal job processing delay in time. Represents the average time spent between enqueueing a job event and executing it.
  • Normal ROOT job processing delay in time. For recursive jobs represents the average, p75 and p99 delay between enqueueing a root job and processing an individual leaf job.
  • Retry job processing graphs. Where appropriate the same graphs exist for retry topics for each job type.
  • Deduplication. Represents the rate in which each job type was deduplicated based on either of the 3 deduplication conditions.
  • Change Propagation memory graphs. Allows to track memory usage and garbage collection patterns in the change propagation service.

Scripts

Apart from the dashboards, the queue could be manually examined via directly connecting to Kafka via kafkacat, e.g.

kafkacat -b kafka-main1003.eqiad.wmnet:9092 -t eqiad.mediawiki.job.ThumbnailRender -o end

Tips and tricks

A collection of tips that were learned while operating the similar system for RESTBase updates.

  • When restarting a service a huge spike in event processing speed will occur - that's normal.
  • Restating a single node will cause a rebalance of a topic if that node was responsible for processing it.

Miscellaneous

Debugging

You can push a null job to the jobqueue via eval.php:

> JobQueueGroup::singleton()->push( new NullJob( Title::newMainPage(), array() ) );

Multi-DC support

Operation in multi-datacenter environment

The queue implementation comes with built-in automatic multi-datacenter support. Jobs in each DC are posted to the local Kafka cluster and consumed by a local instances of change propagation service. The execution is dispatched to the master DC by sending requests to jobrunner.discovery.wmnet.

In both datacenters topic names for local events are prefixed with the datacenter name. The job topics are mirrored between datacenters via MirrorMaker, so in case of actual datacenter loss the backlog is not lost and can be re-executed. However currently there's no special software to reexecute the backlogs.

See also