User:Chlod/Tips for EventStreams consumers

From Wikitech

This page documents oddities I encountered with EventStreams while writing wikimedia-streams, which may help other developers building libraries that consume events from EventStreams. This documentation is meant for consumers, not administrators. Some major oversimplifications may be present. In addition, I'm not claiming to be a Kafka or SSE expert; I may get some parts wrong.

Sequential consumption

EventStreams allows historical consumption with the use of the since parameter. This allows a consumer to stream past events for whatever reason, be it catching up during downtimes or, in the case of continuous integration for the wikimedia-streams library, receiving data from the past to avoid waiting for new events to happen live on wikis (especially with rare events).

When since is not provided, events are sent in real time. Unless you have a flux capacitor, the events will arrive more or less sequentially at the consumer. This is not the case for historical consumption.

When consuming events from the past, you may get events with a timestamp earlier than the latest event you received. For this reason, don't expect that events are sequential. Note that the following statement from the original EventStreams page still holds true: "Kafka guarantees that all events after the returned message offset will be after the given timestamp."

If you'd like to test this yourself, you can check out this sample code, which should throw an error within a few seconds.
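To illustrate, here is a minimal sketch of how such a check might look. It assumes each event carries an ISO 8601 timestamp in `meta.dt`, as MediaWiki events on EventStreams do; the function and sample data are otherwise hypothetical.

```javascript
// Sketch: find events whose timestamp is earlier than an event already seen.
// With historical consumption (the `since` parameter), such violations are
// expected and should not be treated as errors.
function findOutOfOrder(events) {
  const violations = [];
  let lastTs = -Infinity;
  for (const event of events) {
    const ts = Date.parse(event.meta.dt);
    if (ts < lastTs) {
      violations.push(event);
    } else {
      lastTs = ts;
    }
  }
  return violations;
}

// Hypothetical sample: the third event is earlier than the second.
const sample = [
  { meta: { dt: "2023-05-01T00:00:00Z" } },
  { meta: { dt: "2023-05-01T00:00:05Z" } },
  { meta: { dt: "2023-05-01T00:00:03Z" } }
];
console.log(findOutOfOrder(sample).length); // 1
```

When consuming live (no since parameter), a check like this should rarely, if ever, trip; when replaying history, it will trip regularly, which is exactly the behavior described above.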

Cross-datacenter events

As mentioned above, since allows for historical consumption of data. This does, however, also cause events from different data centers to appear. EventStreams currently (2023-05) consumes from eqiad and codfw, and you can expect to see data from both datacenters (with topics eqiad.<stream name> and codfw.<stream name>, respectively) when consuming past events.

Because of this, you'll get a timestamp and/or offset for each topic in the SSE message's ID. This, in turn, is what you'll use for the Last-Event-ID header to continue consuming events after a disconnect. KafkaSSE, the underlying software that runs EventStreams, favors the timestamp over the offset, as this supports events coming from either datacenter. This also means you cannot expect that passing the same Last-Event-ID for old data will return the same first few events. Sometimes it does, but sometimes you'll get different events, seconds apart from what you're expecting.

If you plan to write tests to confirm that Last-Event-ID is being used properly, compare timestamps for similarity rather than checking whether the event, its ID, its offset, or its data is the same. KafkaSSE's documentation suggests another possible workaround: modify the ID provided by KafkaSSE to use the last offset you received, so that you target a specific event rather than any event close to a given timestamp (in this case, the one provided by KafkaSSE).
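The offset-based workaround might be sketched like this. It assumes the Last-Event-ID is a JSON array of per-topic-partition objects containing topic, partition, timestamp, and offset fields, which is the shape described above; verify against KafkaSSE's actual output before relying on it.

```javascript
// Sketch: rewrite a KafkaSSE Last-Event-ID so it targets exact offsets
// instead of timestamps. Dropping the timestamp field is intended to make
// the server resume at the exact offset rather than at the nearest message
// after a given timestamp (assumption based on KafkaSSE's documented
// workaround; confirm with the KafkaSSE docs).
function preferOffsets(lastEventId) {
  const assignments = JSON.parse(lastEventId);
  return JSON.stringify(
    assignments.map(({ topic, partition, offset }) => ({ topic, partition, offset }))
  );
}

// Hypothetical ID covering both datacenters' topics:
const id = JSON.stringify([
  { topic: "eqiad.mediawiki.recentchange", partition: 0, timestamp: 1683000000000, offset: 1234 },
  { topic: "codfw.mediawiki.recentchange", partition: 0, timestamp: 1683000000000, offset: 5678 }
]);
const rewritten = preferOffsets(id);
```

You would then send the rewritten value as the Last-Event-ID header on reconnect.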

Zombie connections

An existing bug causes connections to EventStreams to stay open even when the connection no longer receives data. Depending on the implementation (as is the case with eventsource), the connection can stay open long past the usual automatic disconnect time of 15 minutes. The fix is to trigger a reconnect yourself every 15 minutes (or less). For streams like mediawiki.recentchange, where you'd expect an event every second or so, you can check whether event throughput suddenly drops to zero and reconnect accordingly. For slower streams, like mediawiki.revision-visibility-change, rely on the provided Last-Event-ID to consume any events you might have missed. This does come with a worst-case delay of 15 minutes, unless you reconnect at a shorter interval.
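One way to implement the throughput check is a watchdog timer that is reset on every incoming event and forces a reconnect if it ever fires. This is a minimal sketch; the reconnect callback is a placeholder for whatever your SSE client provides (for example, closing and recreating an EventSource).

```javascript
// Sketch: a watchdog that forces a reconnect when no events arrive
// within the given window. Call poke() on every received event.
class StreamWatchdog {
  constructor(reconnect, timeoutMs = 15 * 60 * 1000) {
    this.reconnect = reconnect; // placeholder: your reconnect logic
    this.timeoutMs = timeoutMs;
    this.timer = null;
  }

  // Reset the countdown; called whenever an event (or heartbeat) arrives.
  poke() {
    if (this.timer) clearTimeout(this.timer);
    this.timer = setTimeout(() => this.reconnect(), this.timeoutMs);
  }

  // Stop watching, e.g. on intentional shutdown.
  stop() {
    if (this.timer) clearTimeout(this.timer);
    this.timer = null;
  }
}

// Hypothetical usage: reconnect if nothing arrives for 5 minutes,
// which keeps the worst-case delay well under the 15-minute bound.
const watchdog = new StreamWatchdog(() => {
  // close and reopen your EventSource here
}, 5 * 60 * 1000);
watchdog.poke(); // start the countdown when the connection opens
```

Choosing an interval shorter than 15 minutes trades slightly more reconnect churn for a tighter bound on missed-event latency, per the trade-off described above.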

Be respectful!

As usual, provide a proper user agent when making connections to the streams, and keep connections limited. You will hit a 429 Too Many Requests error very quickly if you make multiple simultaneous connections to EventStreams. When writing integration tests, see if you can run tests in serial rather than in parallel. This also prevents tests from randomly failing should you hit a 429 error while tests are being run.
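As a sketch, building the headers in one place makes it easy to keep a descriptive user agent on every connection. The header names are standard; the user-agent string and helper are hypothetical, and whether you can pass custom headers at all depends on your SSE client (the eventsource npm package accepts a headers option; browser EventSource does not).

```javascript
// Sketch: assemble request headers for an EventStreams connection.
// The User-Agent value is a made-up example; include your tool's name,
// version, and a way to contact you.
function buildHeaders(lastEventId) {
  const headers = {
    "User-Agent": "my-stream-consumer/1.0 (https://example.org; contact@example.org)"
  };
  if (lastEventId) {
    // Resume from where the previous connection left off.
    headers["Last-Event-ID"] = lastEventId;
  }
  return headers;
}
```

Centralizing this also means your integration tests and your production consumer identify themselves the same way, which helps operators reach you if your consumer misbehaves.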