Event Platform/Stream Configuration

From Wikitech

Stream configuration refers to configuration that distributed producers or consumers of a stream might want, e.g. the sampling rate or the schema title of the events that are allowed in the stream. Stream configuration was originally a requested feature of Event Platform for Product engineers, so they could more easily vary some event stream producer setting without having to do code deploys. It has since become a critical part of Event Platform, used by multiple services.

Declaring streams

To produce events to WMF's Event Platform, you must declare the stream in Stream Configuration in mediawiki-config/wmf-config/ext-EventStreamConfig.php A minimum stream config entry would look like:

// Declare a stream named "domain_namespace.my_event_stream_name":
'domain_namespace.my_event_stream_name' => [
    // schema_title must match the title of the JSONSchema.
	'schema_title' => 'domain_namespace/my/event/schema_uri',
],

This declares a stream named "domain_namespace.my_event_stream_name", which accepts events with a JSONSchema title matches "domain_namespace/my/event/schema_uri".

Each key in the stream's config object is a stream config setting. Some top level stream settings are handled by the platform (e.g. schema_title), while other's are settings that producer and consumer clients should respect. A stream's final settings will be merged with wgEventStreamsDefaultSettings.

See also the EventStreamConfig README.

Stream Configuration deployment

  1. Declare your stream as described above, and get the mediawiki-config patch reviewed.
  2. Schedule a mediawiki-config Backport Deployment, starting from step 2 in those instructions.
  3. Verify that your stream is available in the EventStreamConfig API. See: Querying stream config below.
  4. If this is a brand new stream that will be produced through an destination_event_service eventgate that only requests stream configs at start up, you'll need to ask for the relevant eventgate cluster to be restarted.

Renaming streams

There may be cases when you would like to rename a previously declared stream. To do so, you'll have to declare a new stream with the same settings, and then remove the old stream declaration. If a stream's producer or consumers are actively using this stream, you'll have to manage this in several migration steps. If the old stream is not actively being used, you can just declare and delete the old stream in the same patch. The following instructions assume your stream is actively being used.

  1. Follow the steps in Stream Configuration Deployment above to declare and deploy your new stream.
  2. Deploy relevant application (producer and/or consumer) changes to use the new stream.
  3. Once all users of the stream reference the new stream, and there are no active uses of the old stream, delete the old stream from wgEventStreams, and and have the mediawiki-config change deployed.

Querying stream config

EventStreamConfig exposes an HTTP API, canonically hosted on metawiki. To get a list of all streams declared in WMF production:

curl 'https://meta.wikimedia.org/w/api.php?action=streamconfigs'

In beta, you can query metawiki deployed in beta instead:

curl 'https://meta.wikimedia.beta.wmflabs.org/w/api.php?action=streamconfigs'

Because beta config inherits from production, most stream configs will be the same. Beta specific overrides are supported via mediawiki-config.

Stream versioning

Streams are not dissimilar to an API. They

  • Have a single producer
  • Have many consumers
  • Declare a contract (schema)

Sometimes, API maintainers need to make breaking changes. A standard approach is to version the API, and bump the major version number when a breaking change is needed. In this way, multiple major versions of the API can be maintained simultaneously, allowing the API maintainers to provide a deprecation period.

Streams sometimes have this need as well, so we accomplish this in a similar way: include the major version in the stream name. We do this purely by convention. Each unique stream name is distinct stream, so bumping the version in a stream name effectively declares a new stream.

Stream versioning is opt in; you only need to do this if you intend to maintain your stream as an API for other consumers to use.


Versioned streams should be declared with a major version suffix, like: mediawiki.page_change.v1. All events produced to this v1 stream should have a major schema version of 1.

While developing a new stream, you may choose to suffix with a temporary 'version', e.g. mediawiki.page_change.rc0 or mediawiki.page_change.dev1. You can do this to indicate that the stream should not be used by other consumers, and that its event in the stream may change schemas at will.

If you produce different events with incompatible schemas to a stream , you may break automatic downstream consumers, e.g. consumers.analytics_hadoop_ingestion. Consider setting this to false while you are in development mode.

Upgrading the stream version is equivalent to declaring a new stream with the bumped major version, e.g. mediawiki.page_change.v2. How you coordinate this upgrade is up to you. You should consider:

  • Maintaining both streams during an announced deprecation period
  • Backfilling your new stream with historical data in the new schema format
  • Coordinating decommissioning the stream with known consumers.

More context can be found in task T332212.

Common Settings Documentation

In lieu of a better place, we'll try to document some of the common stream config settings here.

stream

wgEventStreams is keyed by stream name. The stream name is also available as the stream setting in API results.

schema_title

This much match exactly the title of the event JSONSchema that is allowed in this stream.

message_key_fields

A map of key names to message value fields. This is used by EventGate to extract message values into a Kafka key, which will be used for partitioning of data in Kafka. E.g.

message_key_fields:
  wiki_id: "wiki_id"
  page_id: "page.page.id"

The above would instruct EventGate to pull the value of wiki_id and page.page_id from the message and set a Kafka JSON encoded key of

{"wiki_id": "<value of message's wiki_id>", "page_id": "<value of message's page.page_id>"}

destination_event_service

This refers to the name of the EventGate HTTP event intake service the stream should be produced through. Producer clients use this to figure out where to send the stream. The EventGate services also use this to determine if a stream is allowed to be produced through them.

While this setting in technically only needed if the producer is using EventGate, it must be set if canary_events_enabled, as the canary event producer uses EventGate.

NOTE: This should one day be moved into a producers config subobject: https://phabricator.wikimedia.org/T321557

canary_events_enabled

This aides in monitoring ingestion pipelines for event streams. If this is true (the default if not set), artificial canary events will periodically be produced into the stream. The canary events are created from the first event example in the schema, but with meta.dt at a current timestamp, and with meta.domain: "canary". Consumers of streams with canary_events_enabled: true should filter out all events where meta.domain == "canary".

consumers and producers

These sub object config settings should be used to configure specific clients that produce or consume this stream. The keys in this subobject should be the name of the client. Clients look up their configuration from the API by this name.

As of 2021-09, this is only used for the Analytics Hadoop ingestion pipeline. See also https://phabricator.wikimedia.org/T273235.

EventStreamConfig

EventStreamConfig is a MediaWiki extension that implements PHP and HTTP API for requesting stream configuration. Streams configuration entries are declared in the $wgEventStreams global list in mediawiki-config/wmf-config/ext-EventStreamConfig.php.

This centralized EventStreamConfig is used by several services to automate discovery and configuration of stream producer and consumer clients:

  • EventGate service clusters uses stream config to restrict which types of events are allowed in which streams via tha schema_title setting.
  • The MediaWiki EventLogging extension uses stream config to vary things like event stream sampling rate.
  • The Analytics Cluster uses stream config to automate ingestion of streams into Hive.
  • EventStreams uses stream config to discover streams and auto-generate OpenAPI docs.


Because this API is a MediaWiki extension deployed to all (most?) Wikimedia Foundation wikis, it can be requested from any wiki. Because the configuration of the streams is in mediawiki-config, specific per wiki settings can be provided.

It is expected that 'global' configuration be requested from meta.wikimedia.org in production. You can then override things like sample rate per wiki by configuring the override for that wiki, and then requested the config you need from that wiki's action API URL.