
Cirrus Streaming Updater

The Cirrus Streaming Updater (SUP) updates the elasticsearch indexes for every mediawiki edit. The updater consists of two applications, a producer and a consumer. One producer runs per datacenter, reading events from various topics and generating a unified stream of updates that need to be applied to the search clusters. One consumer runs per cluster group to write to (eqiad, codfw, cloudelastic). The producer only reads events from its local datacenter; the consumer reads events from all producers in all datacenters.

The chain of events between a user clicking the 'save page' button and elasticsearch being updated is roughly as follows:

  • MW core approves of the edit and generates an event in the mediawiki.page-change stream.
  • The event is read by the streaming updater producer in the same datacenter, aggregated with related events, and results in a cirrussearch.update_pipeline.update event.
  • The consumer receives the event, fetches the content of the update from the mediawiki api, batches together many updates, and performs a bulk write request to the appropriate elasticsearch cluster (each consumer writes to a cluster group of three elasticsearch clusters).
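For quick spot checks it can help to look at the intermediate update stream directly. A minimal sketch using kafkacat, assuming a kafka-main broker address and the eqiad-prefixed topic name (substitute the broker placeholder and the datacenter prefix for your setup):

kafkacat -C -b KAFKA_MAIN_BROKER:9092 -t eqiad.cirrussearch.update_pipeline.update -o -5 -e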

In addition to page change events, a significant source of updates is mediawiki.cirrussearch.page_rerender events. These events represent changes that did not change the revision_id but likely changed the rendered result of the page (e.g. propagated template updates). While page change and page rerender are the highest-volume inputs, the updater also reads a set of topics related to the CirrusSearch Weighted Tags functionality. These come in over a variety of streams and are generally metadata generated by async processes such as ML models or batch data processing.

There is a DPE Deep Dive from 2024-09-03, "Search Update Pipeline - Feeding search with Flink stream processing" (Recording).

Backfilling/Reindexing

The streaming updater can also backfill existing indices for periods of time where updates, for whatever reason, were not written to the elasticsearch cluster. The backfill uses a custom helm release, which runs a second copy of the standard consumer with specific constraints on kafka offsets and wikis to process. See the "Backfill Batch" section of the cirrus-streaming-updater README.
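The exact release name and configuration keys are documented in that README; as a hypothetical sketch only (the selector below is illustrative, not the real release name), a backfill is just another helmfile apply against the dedicated backfill release:

cd /srv/deployment-charts/helmfile.d/services/cirrus-streaming-updater
# hypothetical selector: take the real release name from the README's "Backfill Batch" section
helmfile apply -e eqiad -i --selector name=consumer-search-backfill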

In-place reindexing is similar, but uses different arguments. There is a top-level entrypoint (python -m cirrus_reindexer) for backfilling, and another one (python -m cirrus_reindexer.reindex_all). See the Cirrus Reindex Orchestrator repo for more details.
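Both entrypoints are ordinary Python modules; assuming they expose standard argparse help (an assumption, the repo is authoritative), their options can be listed with:

python -m cirrus_reindexer --help
python -m cirrus_reindexer.reindex_all --help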

Deployment

The flink jobs run in 3 different environments:

  • wikikube staging
    • producer: consuming topics from all DCs and filtering on a few wikis: testwiki, frwiki, itwiki and officewiki
    • consumer-search: writing to relforge
  • wikikube eqiad
    • producer: consuming all eqiad-prefixed topics
    • consumer-search: writing to the main search cluster in eqiad
    • consumer-cloudelastic: writing to cloudelastic
  • wikikube codfw
    • producer: consuming all codfw-prefixed topics
    • consumer-search: writing to the main search cluster in codfw
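The releases actually defined for a given environment can be listed from the chart directory on the deployment host, for example:

cd /srv/deployment-charts/helmfile.d/services/cirrus-streaming-updater
helmfile -e eqiad list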

Prior to deploying a new version of the job, you must ensure that it works properly in staging.

It is important to deploy a new version only if the job is currently working. If it is down, you must investigate the reasons and bring the job back up before attempting an upgrade.

The deployment procedure is quite standard and can be done using helmfile:

cd /srv/deployment-charts/helmfile.d/services/cirrus-streaming-updater
helmfile apply -e staging -i
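The interactive apply already shows a diff before prompting; to review the full diff without being prompted to apply it, helmfile can print it on its own:

helmfile -e staging diff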

Please verify the diff and abort if you see an unexpected change. It may happen that someone else has tuned some settings; if in doubt, please ask. Diffs you may see that can be safely ignored:

  • restartNonce: XXX
  • savepointNonce: XXX

Let the job run for a while and monitor its output with:

 kubectl get flinkdeployments.flink.apache.org

A healthy deployment should show:

NAME                        JOB STATUS   LIFECYCLE STATE
flink-app-consumer-search   RUNNING      STABLE
flink-app-producer          RUNNING      STABLE

The jobmanager logs can be followed with:

kube_env cirrus-streaming-updater staging
kubectl logs -l component=jobmanager -c flink-main-container -f | jq -r .message
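If a deployment does not reach RUNNING/STABLE, the flink operator's view of the resource usually explains why; it can be inspected using the resource names from the output above:

kubectl describe flinkdeployments.flink.apache.org flink-app-producer
kubectl describe flinkdeployments.flink.apache.org flink-app-consumer-search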

If everything looks fine, you can proceed to the next environments using the same procedure, but always check that the jobs are running properly before updating.

Individual jobs can be updated using a selector:

helmfile apply -e staging -i --selector name=producer

You can override some values as well:

helmfile apply -e staging -i --selector name=producer --set app.config_files.app\\.config\\.yaml.kafka-source-start-time=2025-09-18T16:00:00Z

If you do so, please make sure to re-deploy the job afterwards (without the override) so that the next deployer does not see unexpected differences.
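For example, after a one-off override of kafka-source-start-time on the producer, a plain apply of the same release brings it back in line with the committed values:

helmfile apply -e staging -i --selector name=producer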

It is never normal to have to drop the flink state. If you run into this, please take a moment to understand why the job is failing to load its state; it is likely that the new version you are pushing introduces a bug.

Note that dropping the state may cause some data loss:

  • producer: the 10-minute deduplication window is kept in the state, and can contain a huge number of events if the job was backlogged.
  • consumer: contains all in-flight requests to MW and the saneitizer state

San(e)itizing

San(e)itizing is a process to keep the CirrusSearch indices sane. Its primary purpose is to make sure pages that are out of date or missing in the search index will be (re-)indexed.

This process has a secondary purpose of ensuring all indexed pages have been rendered from wikitext within the last few months. It accomplishes this by indexing every n-th page it visits in such a way that after n loops over the dataset all pages will have been re-indexed.

This loop algorithm has been ported to the SUP consumer application as an additional, optional source of cirrussearch.update_pipeline.update events. It lives alongside the regular kafka source, but only produces events locally at a constant, low rate.

Troubleshooting the Streaming Updater

This script outputs the kubernetes logs based on datacenter, helmfile release, and kubernetes environment.
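A minimal sketch of such a helper, pieced together from the kube_env and kubectl commands shown earlier on this page (the script name, argument handling, and the release label selector are assumptions, not the original script):

#!/bin/bash
# sup-logs.sh (hypothetical name): follow SUP flink logs for a given k8s environment and release.
# Usage: sup-logs.sh <k8s-env> <release> [component]
#   e.g. sup-logs.sh staging producer jobmanager
set -euo pipefail

K8S_ENV="${1:?kubernetes environment, e.g. staging|eqiad|codfw}"
RELEASE="${2:?helmfile release, e.g. producer|consumer-search|consumer-cloudelastic}"
COMPONENT="${3:-jobmanager}"

# Point kubectl at the right cluster and namespace; assumes the kube_env helper
# from the deployment hosts is available in this shell.
kube_env cirrus-streaming-updater "$K8S_ENV"

# Follow the flink container logs; the release label is an assumption about the chart's labels.
kubectl logs -l "component=${COMPONENT},release=${RELEASE}" \
  -c flink-main-container -f | jq -r .message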

Warning: Obsolete Documentation

The update process described below was replaced in early 2024. Nothing in this section is actively used anymore, but we keep the documentation around for historical purposes.

Realtime updates

The CirrusSearch extension updates the elasticsearch indexes for each and every mediawiki edit. The chain of events between a user clicking the 'save page' button and elasticsearch being updated is roughly as follows:

  • MW core approves of the edit and inserts the LinksUpdate object into DeferredUpdates
  • DeferredUpdates runs the LinksUpdate in the web request process, but after closing the connection to the user (so no extra delays).
  • When LinksUpdate completes it runs a LinksUpdateComplete hook which CirrusSearch listens for. In response to this hook CirrusSearch inserts CirrusSearch\Job\LinksUpdate for this page into the job queue (backed by Kafka in wmf prod).
  • The CirrusSearch\Job\LinksUpdate job runs CirrusSearch\Updater::updateFromTitle() to re-build the document that represents this page in elasticsearch. For each wikilink that was added or removed this inserts CirrusSearch\Job\IncomingLinkCount to the job queue.
  • The CirrusSearch\Job\IncomingLinkCount job runs CirrusSearch\Updater::updateLinkedArticles() for the title that was added or removed.

Other processes that write to elasticsearch (such as page deletion) are similar. All writes to elasticsearch are funneled through the CirrusSearch\Updater class, but this class does not directly perform writes to the elasticsearch database. This class performs all the necessary calculations and then creates the CirrusSearch\Job\ElasticaWrite job to actually make the request to elasticsearch. When the job is run it creates CirrusSearch\DataSender which transforms the job parameters into the full request and issues it. This is done so that any updates that fail (network errors, cluster maintenance, etc) can be re-inserted into the job queue and executed at a later time without having to re-do all the heavy calculations of what actually needs to change.

Batch updates from the database

CirrusSearch indices can also be populated from the database to bootstrap brand new clusters or to backfill existing indices for periods of time where updates, for whatever reason, were not written to the elasticsearch cluster. These updates are performed with the forceSearchIndex.php maintenance script, the usage of which is described in multiple parts of the #Administration section.
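(Historical) On the maintenance hosts these runs went through the usual mwscript wrapper; the path below is recalled from the extension layout rather than taken from this page, so treat it as an assumption and consult the script's help for the actual range options:

# list the script's options (id/date ranges, cluster selection, ...)
mwscript extensions/CirrusSearch/maintenance/ForceSearchIndex.php --wiki=testwiki --help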

Batch updates use a custom job type, the CirrusSearch\Job\MassIndex job. The main script iterates the entire page table and inserts jobs in batches of 10 titles. The MassIndex job kicks off CirrusSearch\Updater::updateFromPages() to perform the actual updates. This is the same process as CirrusSearch\Updater::updateFromTitle(); updateFromTitle simply does a couple of extra checks around redirect handling that are unnecessary here.

Scheduled batch updates from analytics network

Jobs are scheduled in the WMF analytics network by the search platform airflow instance to collect various data generated there and ship it back to elasticsearch. The airflow jobs build one or more files per wiki containing elasticsearch bulk update statements, upload them to swift, and send a message over kafka indicating the availability of new information to import. The mjolnir-msearch-daemon running on search-loader instances in the production network receives the kafka messages, downloads the bulk updates from swift, and pipes them into the appropriate elasticsearch clusters. This includes information such as page popularity and ml predictions from various wmf projects (link recommendation, ores, more in the future).

Saneitizer (background repair process)

The saneitizer is a process to keep the CirrusSearch indices sane. Its primary purpose is to compare the revision_id held in cirrussearch and the primary wiki databases, to verify that cirrus pages are being properly updated. Pages that have a mismatched revision_id in cirrussearch are sent to the indexing pipeline to be reindexed.

The saneitizer has a secondary purpose of ensuring all indexed pages have been rendered from wikitext within the last few months. It accomplishes this by indexing every n-th page it visits in such a way that after n loops over the dataset all pages will have been re-indexed.

TODO fill in info on the saneitizer (leaving as stub for now)

Autocomplete indices

Autocomplete indices are built daily via a systemd timer on mwmaint servers. Specifically, mediawiki_job_cirrus_build_completion_indices_eqiad.timer calls mediawiki_job_cirrus_build_completion_indices_eqiad.service, which runs a bash script, cirrus_build_completion_indices.sh, which in turn calls the CirrusSearch maintenance script UpdateSuggesterIndex.php.
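To see when the timer last fired and when it will fire next, the standard systemd tooling on the mwmaint host is enough (the unit name is the one given above; other datacenters presumably follow the same naming pattern):

systemctl list-timers 'mediawiki_job_cirrus_build_completion_indices_*'
journalctl -u mediawiki_job_cirrus_build_completion_indices_eqiad.service -n 50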

Job queue

CirrusSearch uses the mediawiki job queue for all operations that write to the indices. The jobs can be roughly split into a few groups, as follows:

Primary update jobs

These are triggered by the actions of either users or administrators.

  • DeletePages - Removes titles from the search index when they have been deleted.
  • LinksUpdate - Updates a page after it has been edited.
  • MassIndex - Used by the forceSearchIndex.php maintenance script to distribute indexing load across the job runners.

Secondary update jobs

These are triggered by primary update jobs to update pages or indices other than the main document.

  • OtherIndex - Updates the commonswiki index with information about file uploads to all wikis to prevent showing the user duplicate uploads.
  • IncomingLinkCount - Triggers against the linked page when a link is added or removed from a page. Updates the list of links coming into a page from other pages on the same wiki. This is an expensive operation, and the live updates are disabled in wmf. Instead the incoming link counts are calculated in a batch by the incoming_links_weekly dag on the search-platform airflow instance and shipped as a batch update from the analytics network.

Backend write jobs

These are triggered by primary and secondary update jobs to represent an individual write request to the cluster. One job is inserted for every cluster to write to. In the backend the jobqueue is configured to partition the jobs by cluster into separate queues. This partitioning ensures that slowdowns indexing to one cluster do not cause similar slowdowns in the remaining clusters.

  • ElasticaWrite