MediaWiki Event Enrichment


mediawiki-event-enrichment is a repository of Apache Flink (pyflink) streaming jobs that enrich MediaWiki state change event streams.

These streams address the “comprehensiveness” problem described in T291120 MediaWiki Event Carried State Transfer - Problem Statement.

Why

We need a way to get real time updates of comprehensive MediaWiki state outside of MediaWiki.

We want to design MediaWiki event streams that can be used to fully externalize MediaWiki state, without involving MediaWiki on the consumer side.  That is, we want MediaWiki state to be carried by events to any downstream consumer.

See: Event Notification vs. Event-Carried State Transfer, by Arvind Balachandran.

Enrichment Jobs

mw-page-content-change-enrich

We had hoped that MediaWiki entity-based changelog streams would be enough to externalize all MediaWiki state. The MediaWiki revision table itself is really just an event log of changes to pages. However, this is not technically true, as past revisions can be updated: on page deletes, revision records are 'archived', and they can later be merged back into existing pages, updating the revision record's page_id. Modeling this as a page changelog event would be very difficult.

Instead, the page state change event data model supports use cases that only care about externalized current state. That is, we will not try to capture modifications to MediaWiki's past in this stream. These streams will be useful for Wikimedia Enterprise, Dumps, Search updates, cache invalidation, etc., but not for keeping a comprehensive history of all state changes to pages.

The page_content_change enrichment job consumes the mediawiki.page_change stream and emits the mediawiki.page_content_change stream. Both of these streams conform to the mediawiki/page/change event schema. (Design work in T308017.)
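To get a quick feel for the input data, you can peek at live page_change events. A minimal sketch, assuming the mediawiki.page_change.v1 stream is exposed via the public EventStreams service:

# Stream a few live page_change events via EventStreams (SSE), then stop.
curl -s https://stream.wikimedia.org/v2/stream/mediawiki.page_change.v1 | head -n 20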

mw-page-content-change-enrich is deployed in wikikube kubernetes using the flink-app helm chart via flink-kubernetes-operator.

mediawiki-event-enrichment is built using Wikimedia Event Utilities java and python libraries.

mediawiki.page_content_change semantics

Because the streaming enrichment job that creates mediawiki.page_content_change runs asynchronously, by the time it processes an input mediawiki.page_change event, MediaWiki state may have changed in such a way that the data needed is no longer available.

For example, a page may be created and then immediately deleted (by a bot). The mw-page-content-change-enrich streaming job will receive the page create event before the page delete. When it receives the page create event, it will ask the MediaWiki API for content. MediaWiki will return an error, because the page no longer exists. The subsequent page delete event should soon be processed, so the eventual state of the page will be consistent, but a question remains as to what the mw-page-content-change-enrich job should do with the page create event. https://phabricator.wikimedia.org/T309699 should resolve this issue.

In all cases in the table below, an event was consumed from page_change. The failure scenario describes the outcome of the enrichment call to the MW Action API (query action).

Failure scenario | HTTP error code | Retry | Enriched event produced | Event forwarded to error topic | When can this happen | Behavior
--- | --- | --- | --- | --- | --- | ---
Connection timeout | 408 | Yes | No | Yes | The request took more than <timeout> to complete | Retry up to threshold, then forward to error topic
Read timeout | | Yes | No | Yes | The request was canceled by the client because it exceeded <timeout>; a ReadTimeout exception was raised | Retry up to threshold, then forward to error topic
No content | 204 | No | No | Yes | | An exception is raised when trying to parse the response body
Further action needs to be taken in order to complete the request | 3xx | No | No | Yes | | An exception is raised on 3xx statuses
Other request errors | 4xx | No | No | Yes | | An exception is raised on 4xx statuses
Internal error | 5xx | Yes | No | Yes | The Action API is momentarily unavailable | An exception is raised on 5xx statuses, except 500, 502, 503 and 504, which are retried before failing
Missing content: badrevids in response | 200 | Yes | No | Yes | The response body can be parsed but contains a badrevids payload | mw enrichment raises an exception; retry up to threshold, then forward to error topic
Missing content: database replica lag | 200 | Yes | No | Yes | Content could not be retrieved because database replica lag exceeded the maxlag parameter (https://www.mediawiki.org/wiki/Manual:Maxlag_parameter) | Retry up to threshold, then forward to error topic
Missing content

The Action API cannot return a response for a `rev_id` that should exist.

Failure scenario | New event is generated
--- | ---
Page deletion: the page is deleted before we had a chance to read its content. This can happen within seconds, though more typically minutes to hours. |
Content suppression: the content is suppressed before we had a chance to read it. This can happen within seconds, though more typically minutes to hours. | No
MySQL replica lag: we might request a revision from a MariaDB replica that is lagging behind. |
A maintenance SQL script is executed for content errors, resulting in DB mutations. | No
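For illustration, both "missing content" shapes can be reproduced with a direct Action API query. A hedged sketch (the parameters the enrichment job actually sends may differ; <REV_ID> is a placeholder):

# If <REV_ID> has been deleted or never existed, it is listed under
# query.badrevids in the response instead of query.pages.
curl -s "https://en.wikipedia.org/w/api.php?action=query&format=json&revids=<REV_ID>&prop=revisions&rvslots=main&rvprop=content"

# A negative maxlag forces the maxlag error response, which is what the job
# sees when replica lag exceeds the configured threshold.
curl -s "https://en.wikipedia.org/w/api.php?action=query&format=json&maxlag=-1"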

Monitoring

Flink App grafana dashboard

Logs are shipped to logstash in the ECS index. They are most easily viewed using this logstash dashboard.

Flink UI

The Flink JobManager UI can be accessed via an ssh tunnel to the leader JobManager.

Get the Pod IP of the JobManager:

$ ssh deployment.eqiad.wmnet
$ kube_env <k8s-namespace> <k8s-cluster>
# E.g. kube_env mw-page-content-change-enrich eqiad
$ kubectl get pods -o wide | grep -v taskmanager
flink-app-main-897448f4-vnsfh    2/2     Running   0          8h    10.64.75.96   kubestage1004.eqiad.wmnet   <none>           <none>
# JobManager Pod IP is: 10.64.75.96
$ exit

# Back on your local workstation, open an ssh tunnel:
$ ssh -t  -N -L8081:<FLINK_JOB_MANAGER_POD_IP>:8081 deploy1002.eqiad.wmnet

Now navigate to http://localhost:8081 to access the Flink JobManager UI.
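The same tunnel also exposes the Flink REST API, which is handy for quick checks without opening the UI, e.g.:

# List running jobs and their status via the JobManager REST API
curl -s http://localhost:8081/jobs/overview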

Upgrades and Deployment

Development and release instructions for mediawiki-event-enrichment can be found in the README.

Once a new docker image has been built and published, we can bump the image version in the relevant helmfile values.yaml file. For mw-page-content-change-enrich, this is the appropriate values.yaml file.

Once merged, for the most part, the usual kubernetes helmfile deployment process can be followed. But, because this is a streaming Flink application, special care may need to be taken to handle the application restart.
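As a rough sketch, assuming the standard /srv/deployment-charts layout on the deployment host, the apply step looks something like:

# On the deployment host, after the image version bump has been merged:
cd /srv/deployment-charts/helmfile.d/services/mw-page-content-change-enrich
helmfile -e eqiad -i apply   # repeat per environment (staging, codfw) as appropriate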

See

MW enrichment runs active/active single compute, and there are no downstream applications to 'depool'.

If mw enrichment jobs are off in the active MW DC for a very long time, we will have issues. If they are off in the active MW DC for a short amount of time, we will just have late events. If they are off in the inactive MW DC, there shouldn't be any real issues (unless somehow page changes are processed by MW in the inactive DC).

Local development and debugging with Kafka

eventutilities-python and thus mediawiki-event-enrichment run tests using Event Platform streams with data in local file sources and sinks. Local development can then be as easy as running the tests, or running the python main entrypoint with the relevant configuration.
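Concretely, that can look something like the following (a sketch: pytest is assumed as the test runner, and config-local.yaml is a hypothetical config file; see the README for the authoritative commands):

cd ./mediawiki-event-enrichment
# Run the test suite (uses file-based sources and sinks)
pytest
# Or run the job entrypoint directly with a local config (hypothetical file name)
python ./mediawiki_event_enrichment/page_content_change.py --config ./config-local.yaml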

However, as of 2023-07, there are no tests or automation for running the Flink job using Kafka sources and sinks. Usually this is fine, as the same SerDes are used for file sources and sinks. However, we have encountered issues where the Kafka source was improperly configured. The easiest way to debug these types of issues is to run Flink locally with Kafka sources and sinks.

Running Kafka

You can run Kafka locally in a myriad of ways, and any will suffice. Since we run k8s at WMF, our deployment-charts repo includes a kafka-dev Helm chart. For our purposes we'll use minikube locally, and our kafka-dev chart to run Kafka.

Install docker, minikube, and helm, if you don't already have them.

Start minikube:

minikube start

Clone the operations/deployment-charts repo and install the kafka-dev helm chart. Since we'll be connecting to Kafka from a client external to the k8s cluster, follow the instructions in the kafka-dev README. You may need to change the value of kafka_advertised_hosts depending on how you are running minikube.

git clone ssh://gerrit.wikimedia.org:29418/operations/deployment-charts
cd ./deployment-charts
helm install kafka-dev ./charts/kafka-dev

You can inspect the status of the pods with kubectl get pods and kubectl logs -f <pod_id>.

Kafka should now be running and accessible from your host machine at <kafka_advertised_host>:30092, default of 127.0.0.1:30092.


You should also install kcat (formerly kafkacat), so you can inspect, produce, and consume to your local Kafka cluster from the CLI.
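With kcat installed, a quick metadata check confirms the broker is reachable from your host:

# List brokers and topics in the local kafka-dev cluster
kcat -L -b 127.0.0.1:30092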

Running Flink using local Kafka

Since we'll be running Flink locally, we'll need a few more things:

A conda or virtualenv with apache-flink (pyflink) installed. With an active (~Python 3.9) python environment, install Flink, as well as the other mediawiki-event-enrichment (test) requirements:

pip install apache-flink==1.17.1  # change this to the appropriate Flink version
# Install other requirements (like eventutilities-python), including
# those needed to run tests locally.
cd ./mediawiki-event-enrichment
pip install -r ./requirements-test.txt

You can now configure stream_manager sources and sinks to use Kafka, using this address as the bootstrap_servers value. Here's an example config file:

stream_manager:
  job_name: mw-page-content-change-enrich

  stream_config_uri: https://meta.wikimedia.org/w/api.php

  schema_uris:
    # Load schemas from local file repo:  CHANGE THIS PATH!
    - file:///Users/otto/Projects/wm/eventplatform/mediawiki-event-enrichment/event-schemas/primary/jsonschema

  # Configure sources and sinks.
  # NOTE: Make sure to set kafka options in environment specific values files.
  source:
    stream: mediawiki.page_change.v1:1.1.0
    connector: kafka
    options:
      topics: [eqiad.mediawiki.page_change.v1]
      consumer_group: mw_page_content_change_enrich-000
      bootstrap_servers: 127.0.0.1:30092

  sink:
    stream: rc1.mediawiki.page_content_change:1.1.0
    connector: kafka
    options:
      delivery_guarantee: AT_LEAST_ONCE
      bootstrap_servers: 127.0.0.1:30092
      kafka_topic_prefix: eqiad.

For whatever reason, Flink Kafka sources cannot auto create topics, so we'll need to create any source topics manually. The easiest way is to produce a blank line to the topic with kcat:

# Produce a blank line to the topic
echo ' ' | kcat -P -b 127.0.0.1:30092 -t eqiad.mediawiki.page_change.v1
# Verify that the topic exists
kcat -L -b 127.0.0.1:30092 -t eqiad.mediawiki.page_change.v1


Assuming we saved our above config file in config-local-kafka.yaml, the following command will run page_content_change.py in a local Flink cluster:

# Unfortunately, pip install apache-flink does not put `flink` on your PATH.
# This will put pyflink/bin on your PATH:
PATH=$PATH:$(python -c 'import pyflink, os; print(os.path.join(os.path.dirname(pyflink.__file__), "bin"))')

flink run -t local \
  -py ./mediawiki_event_enrichment/page_content_change.py \
  --config ./config-local-kafka.yaml

By default, Flink logs will be in the python env's pyflink/log directory. You can tail them like:

tail -f $(python -c 'import pyflink, os; print(os.path.join(os.path.dirname(pyflink.__file__), "log"))')/*

Replaying events through Kafka

Now that your Kafka and your Flink app are running, you'll need to produce actual data to the source topic(s) to get it to do anything. We can use kcat to both produce and consume data in Kafka.

Since our config-local-kafka.yaml is using defaults for the MediaWiki API, we'll be sending our enrichment HTTP requests to the production MW API. We should use real data!

We can get a sample of real data from production Kafka clusters. Log in to an analytics client (AKA stat box) and run the following command to get the last 10 page_change events.

stat1005 $ kafkacat -C -b kafka-jumbo1007.eqiad.wmnet:9092 -o -10 -c 10 -t eqiad.mediawiki.page_change.v1

Copy and paste these events to a local file called page_change_events.json.
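Each event must be on its own line (newline-delimited JSON). A quick sanity check with jq, if you have it installed:

# Every line should parse as JSON; jq will error on any malformed event
jq -c . page_change_events.json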

In one terminal, start consuming from your output sink topic, e.g.

kcat -u -C -b 127.0.0.1:30092 -t eqiad.rc1.mediawiki.page_content_change

Then, pipe the page_change_events.json file into the source topic in your local kafka cluster:

kcat -u -P -b 127.0.0.1:30092 -t eqiad.mediawiki.page_change.v1 < page_change_events.json

You can run this produce command as many times as you like to have those page_change_events.json enriched by your local pipeline.