EventBus

From Wikitech

If you need to send or collect data between or from different services, applications or features (for example, propagating change events from MediaWiki to a service), you should consider using EventBus.

The term 'EventBus' does not refer to one specific technology. Rather, it refers to the WMF's use of various technologies to distribute streams of schema-conforming messages, or events.

More concretely, it is a common repository of JSON schema, and a RESTful interface on top of Kafka that performs schema validation. 'EventBus service' refers to the RESTful service that accepts and validates events, and produces to Kafka.

Administration

See the EventBus/Administration page for administration tips and documentation.

Background

Over time the Wikimedia architecture has become more and more services-oriented, making propagation of state changes between disparate systems increasingly important. In light of this, and the lack of a standardized approach, requirements for a general-purpose publish-subscribe event system were drafted[1][2]. The systems currently in use at Wikimedia were evaluated against these requirements, and of those, EventLogging ultimately proved the most interesting.

Initially designed for analytics, EventLogging has been in use at Wikimedia for a number of years, collecting data via an extension and a client-side JavaScript library. Received events are validated against schemas maintained in an on-wiki repository. A Python framework allows for composition of arbitrary stream topologies using a wide variety of protocol/transport options. In production at Wikimedia, the pipeline for analytics was initially composed of UDP and ZeroMQ transports, but in the summer of 2015, Kafka support was added, and nearly all EventLogging processes were ported to it.

After considerable discussion of requirements and available options, including an RFC meeting, a decision to use EventLogging with Kafka was made. Refactoring was performed to make the code-base more general (less specific to Mediawiki), and an HTTP REST API for event production was implemented.

EventLogging Analytics vs EventBus

EventLogging Analytics and EventBus are similar systems (the EventBus HTTP service is implemented in the EventLogging codebase!), but they serve slightly different purposes. If your team wants to generate MediaWiki-specific analytics data for its own use (not shared across teams) and for a relatively short period of time (less than 2 years), the EventLogging Analytics platform should work well for you. There, schemas are stored and evolved on wiki. Events are automatically collected, validated, and stored in MySQL and Hive, and can be sent from client-side browsers or apps. However, this system does not have strong reliability guarantees (as it handles analytics data), and the schemas are not designed with maintainability and shareability in mind. This leads to duplicated effort between teams, and frustration as future developers attempt to adapt data designed for a single analytics purpose to another.

EventBus schemas are reviewed more closely than EventLogging Analytics schemas, and use a different, non-MediaWiki-specific format. As such, they are more consistent and, hopefully, more future-proof than EventLogging Analytics schemas usually are. In production, EventBus runs in multiple data centers and has production-level service reliability. If you want production-level event data that can be used by multiple teams and services, you'll want to use EventBus.

(Ideally, these services would not be as different as they are. In the (probably distant) future, we'd like to modify EventLogging Analytics so that it looks a little more like EventBus, and possibly display EventBus schemas on wiki. This work is not prioritized.)

Event Schemas

In a system where the writers of data may not be the same as the readers, the format of the data is important. Data is not static, and developers will make changes to formats over time. Without a predefined and shared schema, a writer may make a format change without notifying a reader and break the reader's code. A schema acts as a contract between writers and readers, decoupling the two. When a schema is changed in a backwards compatible way, developers can change their schema and produce new data without breaking a downstream reader's code.[3]

Event schemas are currently shared via the mediawiki-event-schemas repository. For now, the EventBus HTTP service only supports JSON Schemas (Avro was considered, but was found to be difficult to work with). JSON Schemas do not support any fancy backwards compatible schema evolution on their own. However, so long as new fields are only ever added when evolving schemas, then changes can be considered to be backwards compatible.
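
The "additions only" rule described above can be checked mechanically. Below is a hypothetical helper (not part of EventLogging; the schema fragments are invented for illustration) sketching the idea: a new schema revision is backwards compatible if it keeps every existing property and does not add new required fields.

```python
def is_backwards_compatible(old, new):
    """Return True if `new` only adds optional properties to `old`."""
    old_props = set(old.get('properties', {}))
    new_props = set(new.get('properties', {}))
    old_required = set(old.get('required', []))
    new_required = set(new.get('required', []))
    # Every old property must still exist, and nothing new may be required.
    return old_props <= new_props and new_required <= old_required

v1 = {'properties': {'meta': {}, 'widget_id': {}}, 'required': ['meta']}
# v2 only adds an optional field: compatible.
v2 = {'properties': {'meta': {}, 'widget_id': {}, 'widget_color': {}},
      'required': ['meta']}
# v3 removes widget_id: incompatible.
v3 = {'properties': {'meta': {}}, 'required': ['meta']}

print(is_backwards_compatible(v1, v2))  # True
print(is_backwards_compatible(v1, v3))  # False
```

A real check would also need to compare property types and nested objects; this sketch only captures the property-name and required-field rules.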

Note: As of this writing (January 2016), there is no official review process for schemas, and there is nothing to prevent you from making backwards-incompatible changes. In the future, continuous integration work is planned that will help with this. In the meantime, please get someone from Services and/or Analytics to review any proposed schema changes.

Metadata

All events share in common a sub-object named meta, that encapsulates a number of standard fields. The meta sub-object should conform to the following schema:

meta:
type: object
properties:
  topic:
    type: string
    description: the queue topic name this message belongs to
  schema_uri:
    type: string
    description: >
      The URI identifying the jsonschema for this event.  This may be just
      a short uri containing only the name and revision at the end of the
      URI path.  e.g. schema_name/12345 is acceptable.  This field
      is not required.
  uri:
    type: string
    format: uri
    description: the unique URI identifying the event
  request_id:
    type: string
    pattern: '^[a-fA-F0-9]{8}(-[a-fA-F0-9]{4}){3}-[a-fA-F0-9]{12}$'
    description: the unique UUID v1 ID of the event derived from the X-Request-Id header
  id:
    type: string
    pattern: '^[a-fA-F0-9]{8}(-[a-fA-F0-9]{4}){3}-[a-fA-F0-9]{12}$'
    description: >
      the UUID of this event; should match the dt field if using UUID1
  dt:
    type: string
    format: date-time
    description: the time stamp of the event, in ISO8601 format
  domain:
    type: string
    description: the domain the event pertains to
required:
  - topic
  - uri
  - id
  - dt
  - domain

Note: Future iterations may make use of JSON Pointers ($refs in JSON Schema parlance) to include this metadata schema; however, for now it should be manually included in every schema.

Note: As a convenience, JSON Schema should be represented using YAML (as in the example above), and will be transparently converted to JSON by the EventLogging service.

RESTful Service (Production)

eventlogging-service is a generic daemon that receives events via HTTP and produces them to EventLogging outputs. eventlogging-service-eventbus is an instance of eventlogging-service specifically for receiving events and producing them to Kafka topics using a strict topic -> schema mapping. That is, it has a topic configuration that specifies what type of schema may be produced to a particular Kafka topic. This provides guarantees of what downstream consumers can expect from a topic; consumers can be sure that every event in a particular topic will conform to a backwards compatible schema.

By default, topics in Kafka only have a single partition. If a topic in the WMF production environment needs more partitions for throughput reasons, an Ops admin can increase the number of partitions. Messages are produced to random partitions within a topic.

Producers

MediaWiki

When equipped with the EventBus extension, MediaWiki can emit events for page edits (including creation), moves, deletes, undeletes, and changes to revision visibility, or anything else you might want to add.

ChangeProp

ChangeProp uses EventBus events to expire RESTBase cached articles, as well as various other updates.

Usage Example for MediaWiki Developers

For our example, we want to emit a cool new event from MediaWiki, perhaps a mediawiki/widget/twiddle event. We'll assume that there is a MediaWiki hook called WidgetTwiddle that is fired when the widget is twiddled.

First, we'll design our new schema and add it to the mediawiki/event-schemas repository, and also configure a topic to accept events of that schema in the eventbus-topics.yaml config file.

Schema Design

Clone mediawiki/event-schemas from https://gerrit.wikimedia.org/r/#/admin/projects/mediawiki/event-schemas . You'll be creating a new JSONSchema inside the jsonschema/mediawiki directory. Create a new file: jsonschema/mediawiki/widget/twiddle/1.yaml.

Edit this file to create your new schema. Look around at the other schemas that already exist for good ideas on how to organize yours. In order to encourage consistency, these event schemas are more tightly reviewed than the EventLogging ones you might be used to. At the very least, you'll need to add the meta schema object. Our schema will look like this:

title: mediawiki/widget/twiddle
description: >
  Represents a widget twiddle event.
type: object
properties:
  ### Meta data object.  All event schemas should have this.
  meta:
    type: object
    properties:
      topic:
        description: The queue topic name this message belongs to.
        type: string
      schema_uri:
        description: >
          The URI identifying the jsonschema for this event.  This may be just
          a short uri containing only the name and revision at the end of the
          URI path.  e.g. schema_name/12345 is acceptable.  This field
          is not required.
        type: string
      uri:
        description: The unique URI identifying the event.
        type: string
        format: uri
      request_id:
        description: The unique UUID v1 ID of the event derived from the X-Request-Id header.
        type: string
        pattern: '^[a-fA-F0-9]{8}(-[a-fA-F0-9]{4}){3}-[a-fA-F0-9]{12}$'
      id:
        description: The unique ID of this event; should match the dt field.
        type: string
        pattern: '^[a-fA-F0-9]{8}(-[a-fA-F0-9]{4}){3}-[a-fA-F0-9]{12}$'
      dt:
        description: The time stamp of the event, in ISO8601 format.
        type: string
        format: date-time
      domain:
        description: The domain the event pertains to.
        type: string
    required:
      - topic
      - uri
      - id
      - dt
      - domain

  ### Mediawiki entity fields.  All Mediawiki entity events should have these.
  database:
    description: The name of the wiki database this event belongs to.
    type: string

  performer:
    description: Represents the user that twiddled this widget.
    type: object
    properties:
      user_id:
        description: >
          The user id that twiddled the widget.  This is optional, and
          will not be present for anonymous users.
        type: integer
      user_text:
        description: The text representation of the user that twiddled the widget.
        type: string
      user_groups:
        description: A list of the groups this user belongs to.  E.g. bot, sysop etc.
        type: array
        items:
          type: string
      user_is_bot:
        description: >
          True if this user is considered to be a bot.  This is checked
          via the $user->isBot() method, which considers both user_groups
          and user permissions.
        type: boolean
    required:
      - user_text
      - user_groups
      - user_is_bot

  page_id:
    description: The page ID of the page this revision belongs to.
    type: integer
    minimum: 1

  page_title:
    description: The normalized title of the page this revision belongs to.
    type: string

  page_namespace:
    description: The namespace of the page this revision belongs to.
    type: integer

  widget_id:
    description: The ID of the widget being twiddled.
    type: integer
    minimum: 1

  widget_name:
    description: The name of the widget being twiddled.
    type: string

  widget_action:
    description: The twiddle action
    type: string

required:
  - meta
  - database
  - performer
  - widget_id
  - widget_name
  - widget_action

meta.topic specifies what topic your event should go to. This will probably be something close to the schema name. Your schema reviewers will help you pick this. We'll talk more about this in the next section.

meta.schema_uri is a relative URI naming your schema. Ours will be mediawiki/widget/twiddle.

meta.uri should be a unique URI representing the entity that the event is about. This does not have to map to a reachable http URI. In our case, this might be something like /widget/12345.

meta.request_id and meta.id will be generated for us by the EventBus MediaWiki extension.

meta.dt should represent the timestamp of the event in UTC ISO-8601 format, e.g. 2017-06-07T08:43:15+00:00. If this is not provided, it will be generated for us.

meta.domain is usually the wiki URI domain, e.g. en.wikipedia.org.
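
Putting these fields together, here is a minimal sketch in Python of building a complete meta object by hand (in practice the EventBus MediaWiki extension fills in id, request_id, and dt for you; the values below reuse the widget-twiddle example):

```python
import re
import uuid
from datetime import datetime, timezone

meta = {
    'topic': 'mediawiki.widget-twiddle',
    'schema_uri': 'mediawiki/widget/twiddle/1',
    'uri': '/widget/12345',                       # entity URI; need not be reachable
    'id': str(uuid.uuid1()),                      # time-based UUID v1
    'dt': datetime.now(timezone.utc).isoformat(), # ISO-8601 UTC timestamp
    'domain': 'en.wikipedia.org',
}

# The id must match the UUID pattern from the schema above.
uuid_pattern = r'^[a-fA-F0-9]{8}(-[a-fA-F0-9]{4}){3}-[a-fA-F0-9]{12}$'
print(bool(re.match(uuid_pattern, meta['id'])))  # True
```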

The meta schema object is required for all schemas. We try to keep common MediaWiki specific fields consistent across schemas, so take a look at other schemas to see if a MediaWiki data field you need for your schema has been named before. Some conventions are documented at MediaWiki Specific Event Schemas.

Fields are optional by default. You should name as many required fields as you can, to help enforce consistency between different users of the schema.

Topic Config

If you are familiar with EventLogging, you know about schemas already. In EventLogging, schemas have a 'topic', but it is mapped directly to the schema name; you don't control what the topic is. This means that EventLogging schemas are not reusable. Often this is fine, but some schemas should be reusable: MediaWiki and other services in production use a resource/change schema that represents a change in any resource, not just MediaWiki-based ones. Topics allow us to send events of the same schema to different queues. The word 'topic' comes from the Apache Kafka world, and is used by the EventBus service to know which Kafka topic an event should be sent to, but the concept applies generally. You can think of a topic as a queue name.

The EventBus service controls which schemas are allowed to be produced to which topics via the topics config file. This mostly just maps topics to schema names. We'll add a mapping for our new schema:

mediawiki.widget-twiddle:
    schema_name: mediawiki/widget/twiddle
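
The effect of this mapping can be sketched as a simple lookup. This is a simplification: the real service loads eventbus-topics.yaml and validates the full event body against the JSON schema, not just the schema name.

```python
# Hypothetical, simplified version of the EventBus topic -> schema check.
topic_config = {
    'mediawiki.widget-twiddle': {'schema_name': 'mediawiki/widget/twiddle'},
}

def allowed(topic, schema_uri):
    """True if events of this schema may be produced to this topic."""
    cfg = topic_config.get(topic)
    # schema_uri may carry a trailing revision, e.g. .../1
    return cfg is not None and schema_uri.startswith(cfg['schema_name'])

print(allowed('mediawiki.widget-twiddle', 'mediawiki/widget/twiddle/1'))  # True
print(allowed('mediawiki.widget-twiddle', 'mediawiki/page/move/1'))       # False
```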

Push your changes for review, and add someone in Services and/or Analytics (perhaps Mobrovac and/or Ottomata) as reviewers.

Event Production from MediaWiki

Clone the EventBus extension from https://gerrit.wikimedia.org/r/#/admin/projects/mediawiki/extensions/EventBus. We'll add a new handler function for the WidgetTwiddle hook.

Edit the extension.json config file and add an entry that maps your hook to the new function we are going to create:

"WidgetTwiddle": [
	"EventBusHooks::onWidgetTwiddle"
]

Now edit the EventBus.hooks.php file. We'll add a new static function called onWidgetTwiddle.

	/**
	 * Fires when a Widget is twiddled.
	 *
	 * @param Title $title title corresponding to the article the widget was on.
	 * @param Widget $widget object
	 * @param string $widgetAction
	 */
	public static function onWidgetTwiddle( Title $title, Widget $widget, $widgetAction ) {
		global $wgDBname;
		$events = [];

		$performer = RequestContext::getMain()->getUser();

		// Create a mediawiki widget twiddle event.
		$attrs = [
			// Common Mediawiki entity fields
			'database'           => $wgDBname,
			'performer'          => EventBus::createPerformerAttrs( $performer ),

			// page entity fields
			'page_id'            => $title->getArticleID(),
			'page_title'         => $title->getPrefixedDBkey(),
			'page_namespace'     => $title->getNamespace(),
			'widget_id'          => $widget->getId(),
			'widget_name'        => $widget->getName(),
			'widget_action'      => $widgetAction
		];

		$events[] = EventBus::createEvent(
			// This will end up being the meta.uri
			$widget->getWidgetURI(),
			// The topic this event should be produced to.
			'mediawiki.widget-twiddle',
			// The rest of the event attributes.
			$attrs
		);

		DeferredUpdates::addCallableUpdate( function() use ( $events ) {
			EventBus::getInstance()->send( $events );
		} );
	}

Push this change for review. Once it and the changes to the mediawiki/event-schemas repository have been merged and deployed, your events will start flowing into Kafka.

Events are consumed into Hadoop, but querying them in Hive is not yet easy. (T162610 will help with this.) If you need these events in MySQL, like EventLogging, ask the Analytics team to help.

EventBus events can also be exposed publicly via EventStreams. If you think this would be useful, ask the Analytics team.

Direct Consumption from Kafka

You may consume your events using a Kafka client of your choice. Shown here are examples using kafkacat and pykafka.

kafkacat

kafkacat -C -b deployment-kafka04.eqiad.wmflabs:9092 -t mediawiki.widget-twiddle

pykafka

from pykafka import KafkaClient
client = KafkaClient(hosts='kafka1001.eqiad.wmnet:9092,kafka1002.eqiad.wmnet:9092')
topic = client.topics['mediawiki.widget-twiddle']
consumer = topic.get_simple_consumer()
for widget_twiddle_event in consumer:
	print(widget_twiddle_event.value)

MediaWiki Vagrant Development Environment

The EventBus service and MediaWiki extension can be installed by enabling the eventbus role in Vagrant:

 vagrant roles enable eventbus
 vagrant provision

The EventLogging server side codebase will be deployed to /vagrant/srv/eventlogging. All EventBus events will be written out to /vagrant/logs/eventbus.log. Try it out! tail -f /vagrant/logs/eventbus.log and then edit a page. You should see a mediawiki/revision/create event.

Debugging and developing the EventBus service in the EventLogging codebase:

To activate eventlogging's virtualenv:

. /vagrant/srv/eventlogging/virtualenv/bin/activate

To start an extra eventbus process in Vagrant for debugging the eventlogging codebase:

 . /vagrant/srv/eventlogging/virtualenv/bin/activate
 eventlogging-service --port <PORT> --schemas-path /vagrant/srv/event-schemas/jsonschema --topic-config /vagrant/srv/event-schemas/config/eventbus-topics.yaml 'kafka:///localhost:9092?async=False&topic=datacenter1.{meta[topic]}' stdout://


To restart the main mediawiki vagrant eventbus service process:

 service eventlogging-service-eventbus restart

Process logs are in /var/log/syslog. Events will be produced to Kafka and also output to /vagrant/logs/eventbus.log.


Testing might involve deploying a different version of eventlogging:

(virtualenv) >python setup.py build
(virtualenv) >python setup.py install

Posting events directly to EventBus service

 cat event.json
 {"comment": "Created page with \"woooo\"", "database": "wiki", "meta": {"domain": "dev.wiki.local.wmftest.net", "dt": "2017-06-07T19:53:08+00:00", "id": "ed98038c-4bba-11e7-b02e-0800278dc04d", "request_id": "0c4dc11e-0eed-4cba-94b6-071a48ae3189", "topic": "mediawiki.revision-create", "uri": "http://dev.wiki.local.wmftest.net:8080/wiki/Test1"}, "page_id": 3, "page_is_redirect": false, "page_namespace": 0, "page_title": "Test1", "performer": {"user_groups": ["*"], "user_is_bot": false, "user_text": "10.0.2.2"}, "rev_content_format": "wikitext", "rev_content_model": "wikitext", "rev_id": 4, "rev_len": 5, "rev_minor_edit": false, "rev_sha1": "oclx0411bem1x8t7qw1nk5rooeifcfp", "rev_timestamp": "2017-06-07T19:53:07Z"}


POST the event:

     curl -H 'Content-Type: application/json' -X POST http://localhost:8085/v1/events -d @event.json
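
The same POST can be made from Python using only the standard library, assuming as above a local EventBus service listening on port 8085. The event body here is abbreviated for illustration; a real request must include all fields required by the schema, as in the full event.json above.

```python
import json
import urllib.request

# Abbreviated event body (illustrative only; a real event needs all
# required fields from its schema).
event = {
    'meta': {
        'topic': 'mediawiki.revision-create',
        'uri': 'http://dev.wiki.local.wmftest.net:8080/wiki/Test1',
        'id': 'ed98038c-4bba-11e7-b02e-0800278dc04d',
        'dt': '2017-06-07T19:53:08+00:00',
        'domain': 'dev.wiki.local.wmftest.net',
    },
    'database': 'wiki',
}

req = urllib.request.Request(
    'http://localhost:8085/v1/events',
    data=json.dumps(event).encode('utf-8'),
    headers={'Content-Type': 'application/json'},
    method='POST',
)
# urllib.request.urlopen(req)  # uncomment with a running service
```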

References

  1. https://phabricator.wikimedia.org/T84923, Reliable publish / subscribe event bus
  2. https://meta.wikimedia.org/wiki/Research:MediaWiki_events:_a_generalized_public_event_datasource, Research:MediaWiki events: a generalized public event datasource
  3. http://www.confluent.io/blog/stream-data-platform-2, Putting Apache Kafka To Use: A Practical Guide to Building a Stream Data Platform (Part 2)