User:Milimetric/Notebook/PET code jam 2021 09

I had a lot of fun working with Clara and Petr last week as part of the Platform Engineering Team's (PET) Code Jam. At a high level, we explored how MediaWiki (MW) could publish atomic events. Changes to MW internals are very relevant for consuming data outside MW, so we should work together to build hook interfaces and modularize MW from the inside out. Establishing well-known guarantees in how MW performs state changes would enable eventual consistency even without solving some of the harder problems of real-time replication. A transactional outbox coupled with some changes that are already underway in MW would be a much more precise and low-impact solution than a general transactional outbox. For example, it could replicate only the corner cases that aren't easy to get otherwise from simple queries to the revision/page/user tables.

These are notes of our brainstorms as we examined potential solutions:

The Lost Events

We started by trying to find out why EventBus events get lost on the way to Kafka. The timeline is basically:

  1. MW starts a transaction wrapped around an edit
  2. MW executes the RevisionRecordInserted hook
  3. EventBus prepares data and schedules a deferred update in the hook handler
  4. MediaWiki commits the transaction that the edit was part of
  5. Deferred update POSTs to EventGate
  6. EventGate produces to Kafka

We found ordinary revisions that are in the revision table but not in Kafka. These were on the happy path, not inserted by some corner case that somehow doesn't fire the hook. We dug further and correlated some of them with 503 errors coming from step 5 above, out of the envoy proxies between MW and EventGate. Andrew is working with others to improve logging there so we can get more detail. But I also learned that MW starts a 120 second timer before step 1 and kills anything that is still running when that time passes, including deferred updates. This happens with no cleanup, and we found some of these kills in logstash correlated with the time of some of our missing revisions as well. Our first main conclusion was that if this is possible, we can never get 100% consistent events out of MW unless we do something inside the transaction.
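To make the gap concrete, here is a minimal sketch of the timeline above in Python. It is a toy model, not MW or EventBus code: `run_edit`, `healthy`, and `unavailable` are hypothetical names, plain lists stand in for the revision table and the Kafka topic, and a `ConnectionError` stands in for a 503 or a killed request.

```python
def run_edit(rev_id, db, kafka, post_to_eventgate):
    """Toy model of steps 1-6; every name here is hypothetical."""
    deferred = []

    # Steps 1-3: inside the transaction, the hook handler only schedules work.
    deferred.append(lambda: post_to_eventgate({"rev_id": rev_id}))

    # Step 4: the transaction commits, so the revision is now durable.
    db.append(rev_id)

    # Steps 5-6: deferred updates run after the commit. If the POST fails
    # (or the request is killed), nothing rolls the commit back.
    for update in deferred:
        try:
            kafka.append(update())
        except ConnectionError:
            pass  # the event is simply lost


def healthy(event):
    return event


def unavailable(event):
    raise ConnectionError("503 from the proxy")


db, kafka = [], []
run_edit(1, db, kafka, healthy)
run_edit(2, db, kafka, unavailable)
print(db)     # revisions that were committed
print(kafka)  # events that actually reached the stream
```

Revision 2 ends up in `db` but not in `kafka`, which is the shape of what we saw: the only way to close the gap is to do something before step 4.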

As a side note, these 503s seemed really weird at first since EventBus retries 3 times to send the event, each time with a 20 second(!!) timeout. Since EventGate runs on multiple Kubernetes pods it seemed very unlikely that this strategy would fail. The latest on that centers around this patch.

A Minimal Log For Reconciliation

So what if MW wrote a minimal log of events, for example just revision ids and timestamps, and consumers of EventBus events could periodically query that log to see if they're missing any revisions?

One sidetrack here is talking about how we might emit such a log in real time or really fast, and all the storage, replication, and general infrastructure that goes into that. There's a conversation with useful questions from the DBAs about Debezium and transactional outboxes, and it will serve as useful context in most of the ideas here. Another piece of useful context is that we used to store our job queue in MySQL and moved to Redis for performance reasons. Timo did a nice write-up of the History_of_job_queue_runners_at_WMF, and expanded a bit on the possible reasons for moving to Redis:

"My guess is that we want to be able to have multiple jobrunner processes consume jobs from the same table. We already had a scheduler process between the database and actual execution (so execution was multi-process and distributed over servers), but the scheduler that polled the database for unclaimed/retriable jobs was still singular. There was failover, but my guess is that the number of wikis and job types we had meant that when most queues are empty, there were a lot of selects to do before finding something, and for some jobs (like email dispatch) we wanted something more immediate. We actually ended up solving that a different way though (we raise a flag in memcached when queueing a job to tell the scheduler where to look first), and the mysql backend already segmented things by job types so we had a tight-loop scheduler that only looked for email jobs in a loop and nothing else, etc. But if scheduling truly is the bottleneck even for a single wiki/jobtype, then yeah, redis would have allowed that to be distributed by atomically "pop"ing things off the queue without needing to worry about what other threads see. Although this brought with it a huge amount of complexity around reliably re-inserting things. So our list-pop commands in redis were actually not native redis commands but Lua_EVAL commands (which is a feature in Redis queries) where we atomically popped something from a queue and re-inserted it at the same time in a different list of "currently running" things and then returned the popped job. And periodically we'd check the running list of things that have likely died or gone unfinished. Today we do more or less the same thing in changeprop/kafka, but less atomic I think, and involving some memcached/redis keys over top as well for additional hinting and coordination."

So putting that aside, what does this minimal log look like, what fields could we save to enable reconciliation? Consumers of EventBus events should be able to join to it and pick out matches and misses. We pretty quickly realized that this isn't trivial and the list of fields you'd have to include grows with corner cases that only MW knows about.
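As a rough illustration of the simplest version of this idea, the sketch below joins a log of (revision id, timestamp) pairs against the ids a consumer has already seen. The function name `reconcile` and the sample data are made up for illustration, and the sketch deliberately ignores the corner cases mentioned above (archives, restores, suppressions), which is exactly where it stops being trivial.

```python
def reconcile(mw_log, consumed_ids):
    """Split the minimal log into rows the consumer has and rows it lacks.

    mw_log: iterable of (rev_id, timestamp) pairs written by MW (hypothetical).
    consumed_ids: revision ids the consumer received via EventBus events.
    """
    seen = set(consumed_ids)
    matches = [row for row in mw_log if row[0] in seen]
    misses = [row for row in mw_log if row[0] not in seen]
    return matches, misses


# Made-up sample data using MW-style 14-character timestamps.
mw_log = [
    (101, "20210920100000"),
    (102, "20210920100005"),
    (103, "20210920100009"),
]
consumed = [101, 103]

matches, misses = reconcile(mw_log, consumed)
print(misses)  # revisions the consumer would need to fetch again
```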

Reality Check on MW consistency

We took a little detour here to observe that MW does not guarantee itself complete consistency on some of its internal state mutations. Namely, many changes are done via deferred updates and the jobqueue, neither of which is guaranteed to run. Petr's sense is that most operations outside of the core ones, like edits, are actually done this way. So if we fix those 503 errors and whatever else we find between EventBus and Kafka, then the consistency of those events should be basically the same as MW's own. And now we can focus on making only the absolutely necessary streams consistent, such as revision create.

Generalizing MW Hooks

We decided it would be fun to explore an idea Petr has mentioned in the past: what if we cleaned up and generalized MW hooks in such a way that MW core itself could rely on them to do all state changes? If we went a step further and published those hooks via something like a transactional outbox, then consumers of those events would be able to do anything MW can do. So we designed a little proof of concept system to play with and explore this idea.

  • hook_log would hold our data, with the schema hook_name, hook_payload (json), fired_at (timestamp), published_at (timestamp, null)
  • EventBus would write the hooks we care about for now, but a longer-term solution would probably involve HookContainer::run, standardizing what types of parameters these kinds of externalize-able hooks are allowed to have as their payload, and some mechanism for easily or automatically serializing those types to JSON
  • some maintenance script reads, updates, and deletes these rows as necessary, writing them out to a file. We would write to Kafka in a real setup, perhaps with something like Debezium
  • a service would join two streams, coming from two hooks, RevisionRecordInserted and UserGroupsChanged, to try to recreate a revision_create_event event the way that EventBus publishes it today
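The first three pieces of this proof of concept could look roughly like the sketch below. It is only a sketch under several assumptions: SQLite stands in for MySQL, the `id` column and the simplified `revision` table are additions for illustration, the functions `save_revision` and `publish_pending` are hypothetical, and a Python list plays the role of the output file (or Kafka).

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
# hook_log follows the schema described above; `id` is an added assumption
# so the publisher has a stable order to read rows in.
conn.execute("""
    CREATE TABLE hook_log (
        id INTEGER PRIMARY KEY,
        hook_name TEXT NOT NULL,
        hook_payload TEXT NOT NULL,   -- JSON
        fired_at TEXT NOT NULL,
        published_at TEXT             -- NULL until published
    )
""")
conn.execute("CREATE TABLE revision (rev_id INTEGER PRIMARY KEY, rev_timestamp TEXT)")


def save_revision(conn, rev_id, timestamp):
    """Write the revision and its hook record in one transaction."""
    with conn:  # commits both inserts, or rolls both back on an exception
        conn.execute("INSERT INTO revision VALUES (?, ?)", (rev_id, timestamp))
        conn.execute(
            "INSERT INTO hook_log (hook_name, hook_payload, fired_at) VALUES (?, ?, ?)",
            ("RevisionRecordInserted", json.dumps({"rev_id": rev_id}), timestamp),
        )


def publish_pending(conn, sink, now):
    """Stand-in for the maintenance script: publish unpublished rows in order."""
    rows = conn.execute(
        "SELECT id, hook_name, hook_payload FROM hook_log "
        "WHERE published_at IS NULL ORDER BY id"
    ).fetchall()
    for row_id, name, payload in rows:
        sink.append({"hook": name, "payload": json.loads(payload)})
        with conn:
            conn.execute(
                "UPDATE hook_log SET published_at = ? WHERE id = ?", (now, row_id)
            )
    return len(rows)


save_revision(conn, 1, "20210920100000")
sink = []
first_run = publish_pending(conn, sink, "20210920100100")
second_run = publish_pending(conn, sink, "20210920100200")
print(first_run, second_run, sink)
```

Because a row is marked as published only after it has been handed to the sink, a crash between the two steps would publish that row again on the next run, so consumers of such a log would have to tolerate duplicates.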

In talking this over we found some problems, but we came away thinking this idea had lots of upside even if some of the pieces don't quite fall into place yet. The main value would be bringing everyone into the same space to design a common interface for a common problem. It would help MW developers understand how data is used outside and give everyone else an appreciation for what is and isn't possible or feasible.

The main problem seems to be that writing this kind of volume of data to the database would put a lot of stress on replication. Since MySQL replication doesn't allow leader-side filtering, all this data would flow out to at least replicas one hop away from the leader. If we tried to write to a different database we would lose the transactional consistency. We'd love to talk about this with DBAs to check our assumptions.

Another conceptual problem is that we're duplicating a lot of data here. Re-working hooks in this way might have a lot of benefit for MW in general, but if all we want is to reconcile all changes to the revision table, we can divide and conquer a bit. First, the vast majority of changes to the revision table are append-only inserts, which can be easily identified by their timestamp. The reason we can't reconcile completely right now is that revisions get moved to the archive table without a trace, restored back into the revision table at arbitrary points in time, or updated in the past without updating any timestamps. Some of these changes are recorded in the logging table, but not all, as far as we're aware.

Update the Logging Table and Establish Strong Guarantees

We settled on an idea that would make it easy to reconcile with MW from a data model point of view, and postponed the related infrastructure and performance conversation. Basically, for consumers outside MW to reach eventual consistency with a consistent entity X, MW needs to guarantee this:

 When an instance of an entity type X is created, updated, or deleted, at time T, a record is written to a table with a way to identify X, the action performed, and the timestamp T.

Let's break this down for revision. When a revision is inserted, we have a record in the revision table with a current timestamp, unless it was inserted as part of a restore, in which case the records are inserted with timestamps somewhere in the past. For that case we'd need a record in the logging table. The same goes for when pages are deleted and their revision records archived, or when revision records are suppressed. If the logging table captured all of these, then consumers could query "get all rows in revision with timestamp > A, and all rows in logging with type 'revision/change' and timestamp > A". And MW would guarantee there are no corner cases that work some other way.
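The consumer side of that guarantee could be sketched as follows. This assumes SQLite in place of MySQL, heavily trimmed versions of the revision and logging tables, a made-up JSON shape for `log_params`, and the 'revision/change' log type proposed above, which does not exist today. `changes_since` is a hypothetical helper.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE revision (rev_id INTEGER PRIMARY KEY, rev_timestamp TEXT);
    CREATE TABLE logging (
        log_id INTEGER PRIMARY KEY,
        log_type TEXT,
        log_timestamp TEXT,
        log_params TEXT
    );
    -- Revision 3 was restored: it carries an old timestamp, so only the
    -- logging row reveals that something changed after the checkpoint.
    INSERT INTO revision VALUES
        (1, '20210901000000'),
        (2, '20210920120000'),
        (3, '20210801000000');
    INSERT INTO logging VALUES
        (10, 'revision/change', '20210921080000', '{"rev_id": 3, "action": "restore"}'),
        (11, 'block', '20210921090000', '{}');
""")


def changes_since(conn, checkpoint):
    """Return (new revision ids, logged revision changes) after checkpoint A."""
    new_revisions = [
        row[0]
        for row in conn.execute(
            "SELECT rev_id FROM revision WHERE rev_timestamp > ? ORDER BY rev_id",
            (checkpoint,),
        )
    ]
    logged_changes = conn.execute(
        "SELECT log_id, log_params FROM logging "
        "WHERE log_type = 'revision/change' AND log_timestamp > ? ORDER BY log_id",
        (checkpoint,),
    ).fetchall()
    return new_revisions, logged_changes


new_revisions, logged_changes = changes_since(conn, "20210915000000")
print(new_revisions)
print(logged_changes)
```

MW timestamps are fixed-width strings, so plain string comparison orders them correctly here.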

As a further improvement, we could use the work we did in the mediawiki_history dataset to update old logging table records to match this new approach. This would make bootstrapping much simpler.

To some extent this is already happening for User and Page, but some history is still lost: for example, when a user was in a particular bot group, or when a page was a redirect. If these are all consistently written to the logging table and no corner cases exist, then external services could compile the data for themselves and answer those questions that would otherwise require a full manual investigation of the logging table.

Crazy Idea that we hope Giuseppe Does Not See

So this was a thought that crossed our minds, but we can blame me in case the wrath of Jupiter sends lightning bolts. It's predicated on a few big "IF"s. So IF we find that writing to a transactional outbox type table would add an additional minimum of say 10ms to the total MW transaction time, and IF we find that PHP's librdkafka-based Kafka producer could publish to Kafka with >= 99.9% success in less than 10ms, then we could publish to Kafka from *inside* the MW transaction, timing out after at most 10ms, catching that error and writing to the DB instead, replicating those writes and publishing them to Kafka for eventual consistency. I feel like if those IFs don't even sound feasible we can ignore the idea for now. But eventually they'll probably be feasible, they're not breaking any laws of physics I'm aware of, so food for later thought maybe.
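Purely as food for thought, the control flow might look like the sketch below. It is Python rather than PHP and does not use any real Kafka client: `produce` is a hypothetical callable that raises `TimeoutError` when it cannot publish within the budget, SQLite stands in for the MW database, and the `outbox` table is an assumed name for the transactional outbox type table.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE revision (rev_id INTEGER PRIMARY KEY)")
conn.execute("CREATE TABLE outbox (id INTEGER PRIMARY KEY, payload TEXT NOT NULL)")


def save_with_inline_publish(conn, rev_id, produce, timeout_s=0.010):
    """Try to publish inside the transaction, fall back to the outbox table."""
    event = {"rev_id": rev_id}
    with conn:  # one transaction around the edit and any fallback write
        conn.execute("INSERT INTO revision (rev_id) VALUES (?)", (rev_id,))
        try:
            produce(event, timeout_s)  # hypothetical producer call
            return "kafka"
        except TimeoutError:
            # Too slow: record the event in the same transaction so a
            # separate process can publish it later.
            conn.execute(
                "INSERT INTO outbox (payload) VALUES (?)", (json.dumps(event),)
            )
            return "outbox"


published = []


def fast_producer(event, timeout_s):
    published.append(event)


def slow_producer(event, timeout_s):
    raise TimeoutError(f"no ack within {timeout_s}s")


first = save_with_inline_publish(conn, 1, fast_producer)
second = save_with_inline_publish(conn, 2, slow_producer)
outbox_rows = conn.execute("SELECT payload FROM outbox").fetchall()
print(first, second, outbox_rows)
```

One wrinkle the sketch makes visible: if the publish succeeds but the commit then fails, the event is out in Kafka for an edit that never happened, so this would need its own guarantee or a compensating event.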