Cirrus Streaming Updater

The Cirrus Streaming Updater (SUP) updates the elasticsearch indexes for every mediawiki edit. The updater consists of two applications, a producer and a consumer. One producer runs per datacenter, reading events from various topics and generating a unified stream of updates that need to be applied to the search clusters. One consumer runs per cluster group being written to (eqiad, codfw, cloudelastic). The producer only reads events from its local datacenter; the consumer reads events from the producers in all datacenters.

The chain of events between a user clicking the 'save page' button and elasticsearch being updated is roughly as follows:

  • MW core approves of the edit and generates an event in the mediawiki.page-change stream.
  • The event is read by the streaming updater producer in the same datacenter, aggregated with related events, and results in a cirrussearch.update_pipeline.update event.
  • The consumer receives the event, fetches the content of the update from the mediawiki api, batches together many updates, and performs a bulk write request to the appropriate elasticsearch cluster (each consumer writes to a cluster group of three elasticsearch clusters), roughly as sketched below.
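
As a rough illustration only of what the consumer does (the real consumer is an Apache Flink application; the topic name, event fields, API parameters, and endpoints below are simplified assumptions, not the actual schema):

    # Hedged sketch: consume update events, fetch the built document from the
    # mediawiki api, and batch writes into elasticsearch bulk requests.
    import json

    import requests
    from kafka import KafkaConsumer                      # kafka-python
    from elasticsearch import Elasticsearch, helpers

    consumer = KafkaConsumer(
        "cirrussearch.update_pipeline.update",           # unified update stream
        bootstrap_servers="kafka-main.example:9092",     # placeholder broker
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    )
    es = Elasticsearch("https://search.example:9243")    # placeholder cluster endpoint

    def fetch_document(update):
        # Ask the mediawiki api for the freshly built search document for this page.
        resp = requests.get(
            "https://" + update["wiki"] + "/w/api.php",
            params={"action": "query", "prop": "cirrusbuilddoc",
                    "pageids": update["page_id"], "format": "json"},
            timeout=10,
        )
        # The real consumer extracts the built document from this response.
        return resp.json()

    actions = []
    for message in consumer:
        update = message.value
        actions.append({
            "_op_type": "index",
            "_index": update["wiki"] + "_content",       # assumed index naming
            "_id": update["page_id"],
            "_source": fetch_document(update),
        })
        if len(actions) >= 500:                          # many updates per bulk request
            helpers.bulk(es, actions)
            actions = []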

In addition to page change events, a significant source of updates is mediawiki.cirrussearch.page_rerender events. These events represent changes that did not change the revision_id but likely changed the rendered result of the page (for example, propagated template updates). Beyond these two higher-volume inputs, the updater also reads a set of topics related to the CirrusSearch Weighted Tags functionality. These come in over a variety of streams and are generally metadata generated by asynchronous processes such as ML models or batch data processing.
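
Purely to illustrate the shapes of these three input categories (every field name below is a hypothetical stand-in, not the real event schemas):

    # Hypothetical stand-ins for the three input categories -- NOT the real schemas.
    page_change = {"wiki_id": "enwiki", "page_id": 123, "rev_id": 456,
                   "change_kind": "edit"}                   # mediawiki.page-change
    page_rerender = {"wiki_id": "enwiki", "page_id": 123,   # same rev_id as before,
                     "reason": "template update"}           # but the rendering changed
    weighted_tag = {"wiki_id": "enwiki", "page_id": 123,    # async metadata, e.g. an
                    "tags": {"some.model.prediction": ["label|score"]}}  # ML model score
    # The producer folds all of these into the single
    # cirrussearch.update_pipeline.update stream consumed above.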

There is a DPE Deep Dive from 2024-09-03 "Search Update Pipeline - Feeding search with Flink stream processing" (Recording)

Backfilling/Reindexing

The streaming updater can also backfill existing indices for periods of time where updates, for whatever reason, were not written to the elasticsearch cluster. The backfill uses a custom helm release, which runs a second copy of the standard consumer with specific constraints on kafka offsets and wikis to process. See the "Backfill Batch" section of the cirrus-streaming-updater README.
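
The offset constraint amounts to translating the backfill time window into kafka offsets. A minimal sketch of that idea, assuming kafka-python and placeholder broker/topic names (the real backfill release sets this through helm values rather than code like this):

    # Sketch: map a backfill time window onto kafka offsets with offsets_for_times().
    from datetime import datetime, timezone

    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers="kafka-main.example:9092")  # placeholder
    tp = TopicPartition("eqiad.cirrussearch.update_pipeline.update", 0)    # placeholder
    consumer.assign([tp])

    start = datetime(2024, 6, 1, tzinfo=timezone.utc)
    end = datetime(2024, 6, 2, tzinfo=timezone.utc)

    # offsets_for_times() returns the earliest offset whose timestamp is at or after
    # the given millisecond timestamp; the backfill consumer reads only that range.
    start_offsets = consumer.offsets_for_times({tp: int(start.timestamp() * 1000)})
    end_offsets = consumer.offsets_for_times({tp: int(end.timestamp() * 1000)})
    print(start_offsets[tp].offset, end_offsets[tp].offset)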

In-place reindexing is similar, but uses different arguments. There is the top-level entrypoint used for backfilling (python -m cirrus_reindexer) and a second entrypoint (python -m cirrus_reindexer.reindex_all). See the Cirrus Reindex Orchestrator repo for more details.

San(e)itizing

San(e)itizing is a process to keep the CirrusSearch indices sane. Its primary purpose is to make sure pages that are out of date or missing in the search index will be (re-)indexed.

This process has a secondary purpose of ensuring all indexed pages have been rendered from wikitext within the last few months. It accomplishes this by indexing every n-th page it visits in such a way that after n loops over the dataset all pages will have been re-indexed.

This loop algorithm has been ported to the SUP consumer application as an additional, optional source of cirrussearch.update_pipeline.update events. It lives alongside the regular kafka source, but only produces events locally at a constant, low rate.
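
A minimal sketch of that loop-selection idea, purely for illustration (the real implementation lives in the SUP consumer and keeps its own bookkeeping):

    # "Every n-th page per loop": after n loops over the page_id space, every page
    # has received one unconditional re-render/re-index.
    def pages_to_reindex(page_ids, loop_number, n):
        # Yield the page ids that get a forced re-render on this loop.
        for page_id in page_ids:
            if page_id % n == loop_number % n:
                yield page_id

    # Example: with n = 16, page 35 is picked on loops 3, 19, 35, ...
    assert 35 in list(pages_to_reindex(range(100), loop_number=3, n=16))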

Troubleshooting the Streaming Updater

Troubleshooting usually starts from the kubernetes logs for the relevant datacenter, helmfile release, and kubernetes environment.
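
A minimal sketch of a helper for that, assuming kubectl is already pointed at the right cluster (on the deployment hosts this is typically done with kube_env) and that the namespace and label selector below are stand-ins:

    # Hedged sketch of a log helper; the namespace default and label selector are
    # assumptions, not the exact production values.
    import argparse
    import subprocess

    def main():
        parser = argparse.ArgumentParser(description="Tail SUP pod logs")
        parser.add_argument("--namespace", default="cirrus-streaming-updater")
        parser.add_argument("--release", required=True,
                            help="helmfile release, e.g. a producer or consumer release")
        parser.add_argument("--since", default="1h")
        args = parser.parse_args()

        subprocess.run([
            "kubectl", "logs",
            "-n", args.namespace,
            "-l", "release=" + args.release,
            "--since", args.since,
            "--all-containers",
        ], check=True)

    if __name__ == "__main__":
        main()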

Warning: Obsolete Documentation

The update process described below was replaced in early 2024. Nothing in this section is actively used anymore, but we keep the documentation around for historical purposes.

Realtime updates

The CirrusSearch extension updates the elasticsearch indexes for every mediawiki edit. The chain of events between a user clicking the 'save page' button and elasticsearch being updated is roughly as follows:

  • MW core approves of the edit and inserts the LinksUpdate object into DeferredUpdates
  • DeferredUpdates runs the LinksUpdate in the web request process, but after closing the connection to the user (so no extra delays).
  • When LinksUpdate completes it runs a LinksUpdateComplete hook which CirrusSearch listens for. In response to this hook CirrusSearch inserts CirrusSearch\Job\LinksUpdate for this page into the job queue (backed by Kafka in wmf prod).
  • The CirrusSearch\Job\LinksUpdate job runs CirrusSearch\Updater::updateFromTitle() to re-build the document that represents this page in elasticsearch. For each wikilink that was added or removed, this inserts a CirrusSearch\Job\IncomingLinkCount job into the job queue.
  • The CirrusSearch\Job\IncomingLinkCount job runs CirrusSearch\Updater::updateLinkedArticles() for the title that was added or removed.

Other processes that write to elasticsearch (such as page deletion) are similar. All writes to elasticsearch are funneled through the CirrusSearch\Updater class, but this class does not directly perform writes to the elasticsearch database. This class performs all the necessary calculations and then creates the CirrusSearch\Job\ElasticaWrite job to actually make the request to elasticsearch. When the job is run it creates CirrusSearch\DataSender which transforms the job parameters into the full request and issues it. This is done so that any updates that fail (network errors, cluster maintenance, etc) can be re-inserted into the job queue and executed at a later time without having to re-do all the heavy calculations of what actually needs to change.
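
Expressed as a hedged sketch in Python (the real code is PHP; the function and queue names here are only illustrative), the split between calculation and write looks like:

    # Illustrative only: the expensive document build happens once, and the cheap
    # ElasticaWrite-style job carries the finished payload, so a failed write can be
    # re-queued without redoing the calculation.
    def update_from_title(title, job_queue, build_document):
        doc = build_document(title)                  # expensive: parse, links, etc.
        job_queue.push({"type": "ElasticaWrite",     # cheap, serializable write job
                        "payload": doc})

    def run_elastica_write(job, send_to_elasticsearch, job_queue):
        try:
            send_to_elasticsearch(job["payload"])
        except ConnectionError:
            # Re-insert the same job for a later retry; the document is already built.
            job_queue.push(job)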

Batch updates from the database

CirrusSearch indices can also be populated from the database to bootstrap brand new clusters or to backfill existing indices for periods of time where updates, for whatever reason, were not written to the elasticsearch cluster. These updates are performed with the forceSearchIndex.php maintenance script, the usage of which is described in multiple parts of the #Administration section.

Batch updates use a custom job type, the CirrusSearch\Job\MassIndex job. The main script iterates over the entire page table and inserts jobs in batches of 10 titles. The MassIndex job kicks off CirrusSearch\Updater::updateFromPages() to perform the actual updates. This is the same process as CirrusSearch\Updater::updateFromTitle(); updateFromTitle() simply does a couple of extra checks around redirect handling that are unnecessary here.

Scheduled batch updates from analytics network

Jobs are scheduled in the WMF analytics network by the search platform airflow instance to gather various information produced there and ship it back to elasticsearch. The airflow jobs build one or more files per wiki containing elasticsearch bulk update statements, upload them to swift, and send a message over kafka indicating the availability of new information to import. The mjolnir-msearch-daemon running on search-loader instances in the production network receives the kafka messages, downloads the bulk updates from swift, and pipes them into the appropriate elasticsearch clusters. This includes information such as page popularity and ML predictions from various wmf projects (link recommendation, ores, more in the future).
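
A rough sketch of that hand-off, with placeholder topic, field, and endpoint names (not the real mjolnir code):

    # Placeholder sketch: a kafka message announces a new bulk file in swift; the
    # daemon downloads it and feeds the statements to the elasticsearch bulk API.
    import json

    import requests
    from kafka import KafkaConsumer
    from elasticsearch import Elasticsearch

    consumer = KafkaConsumer("mjolnir.bulk-import.availability",    # placeholder topic
                             bootstrap_servers="kafka-main.example:9092",
                             value_deserializer=lambda v: json.loads(v))
    es = Elasticsearch("https://search.example:9243")               # placeholder

    for message in consumer:
        notice = message.value           # e.g. {"swift_url": "...", "wiki": "enwiki"}
        bulk_body = requests.get(notice["swift_url"], timeout=60).text
        # The downloaded file already contains elasticsearch bulk update statements.
        es.bulk(body=bulk_body)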

Saneitizer (background repair process)

The saneitizer is a process to keep the CirrusSearch indices sane. Its primary purpose is to compare the revision_id held in cirrussearch against the primary wiki databases, to verify that cirrus pages are being properly updated. Pages that have a mismatched revision_id in cirrussearch are sent to the indexing pipeline to be reindexed.

The saneitizer has a secondary purpose of ensuring all indexed pages have been rendered from wikitext within the last few months. It accomplishes this by indexing every n-th page it visits in such a way that after n loops over the dataset all pages will have been re-indexed.

TODO fill in info on the saneitizer (leaving as stub for now)

Autocomplete indices

Autocomplete indices are built daily via a systemd timer on the mwmaint servers. Specifically, mediawiki_job_cirrus_build_completion_indices_eqiad.timer triggers mediawiki_job_cirrus_build_completion_indices_eqiad.service, which runs the bash script cirrus_build_completion_indices.sh, which in turn calls the CirrusSearch maintenance script UpdateSuggesterIndex.php.

Job queue

CirrusSearch uses the mediawiki job queue for all operations that write to the indices. The jobs can be roughly split into a few groups, as follows:

Primary update jobs

These are triggered by the actions of either users or administrators.

  • DeletePages - Removes titles from the search index when they have been deleted.
  • LinksUpdate - Updates a page after it has been edited.
  • MassIndex - Used by the forceSearchIndex.php maintenance script to distribute indexing load across the job runners.

Secondary update jobs

These are triggered by primary update jobs to update pages or indices other than the main document.

  • OtherIndex - Updates the commonswiki index with information about file uploads to all wikis to prevent showing the user duplicate uploads.
  • IncomingLinkCount - Triggers against the linked page when a link is added or removed from a page. Updates the list of links coming into a page from other pages on the same wiki. This is an expensive operation, and the live updates are disabled in wmf. Instead, the incoming link counts are calculated in batch by the incoming_links_weekly dag on the search-platform airflow instance and shipped as a batch update from the analytics network.

Backend write jobs

These are triggered by primary and secondary update jobs to represent an individual write request to the cluster. One job is inserted for every cluster to write to. In the backend the jobqueue is configured to partition the jobs by cluster into separate queues. This partitioning ensures that slowdowns indexing to one cluster do not cause similar slowdowns in the remaining clusters.

  • ElasticaWrite