Data Engineering/Systems/Hadoop Event Ingestion Lifecycle


High-level overview of how an event makes it into the analytics-hadoop cluster.

Event Ingestion Lifecycle

(There is a recording of a talk describing this system, as well as EventLogging.)

Events enter the pipeline from producer code (EventBus, EventLogging, Flink, etc.).


Canary Events

Why?

Most event streams in WMF Kafka are multi-datacenter. You can learn more about how we run multi-DC Kafka clusters.

An Event Platform stream can be made up of several individual Kafka topics. In the multi-DC case, a single stream is usually made up of its datacenter-prefixed Kafka topics. E.g. my_stream would be made up of the eqiad.my_stream and codfw.my_stream Kafka topics.
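A minimal sketch of this naming convention (the datacenter list and helper function here are illustrative, not actual Event Platform code):

```python
# Sketch: map a stream name to its datacenter-prefixed composite Kafka topics.
DATACENTERS = ["eqiad", "codfw"]  # assumed list of datacenter prefixes

def composite_topics(stream_name: str, datacenters=DATACENTERS) -> list:
    """Return the Kafka topics that make up a multi-DC stream."""
    return [f"{dc}.{stream_name}" for dc in datacenters]

print(composite_topics("my_stream"))
# ['eqiad.my_stream', 'codfw.my_stream']
```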

In Hive (via Refine), we ingest these Kafka topics into tables in the event database named after the stream. The datacenter Kafka topic prefix is added as a datacenter Hive partition key.
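As a hedged example of how that partitioning shows up to consumers (the table name and the year/month/day/hour partition columns are assumed here), a PySpark query against a Refined event table might group by the datacenter partition key like this:

```python
# Example query against an hourly partition of event.my_stream (names assumed),
# showing the datacenter partition key added by Refine.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("event-partition-example")
    .enableHiveSupport()
    .getOrCreate()
)

spark.sql("""
    SELECT datacenter, COUNT(*) AS event_count
    FROM event.my_stream
    WHERE year = 2024 AND month = 1 AND day = 1 AND hour = 0
    GROUP BY datacenter
""").show()
# Expect one row per composite topic's datacenter (e.g. eqiad, codfw).
```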

Downstream jobs that use these Hive event tables need to know when an hourly partition has completed.

Problem

If there is no data in one of a stream's composite Kafka topics for a given hour, how can we differentiate between the following scenarios?

  1. There is a problem somewhere in the ingestion pipeline, and data failed to be ingested.
  2. There really was no data produced to the topic during this hour.

Solution

We solve this problem by producing artificial 'canary' events into every stream. As long as we successfully produce at least one canary event into each of a stream's composite topics every hour, we can be sure that there should be at least one event in every topic every hour. If a topic has no events for an hour, then something is broken. Downstream jobs can then assume that the hourly Hive partition will be ingested later, and wait until it is.
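The reasoning can be summarized as a small sketch (this is not the actual Refine code, just an illustration of the decision a downstream job can make for one topic-hour):

```python
# Sketch: because canary events guarantee at least one event per composite
# topic per hour, an empty topic-hour can only mean the pipeline is broken.
def hourly_partition_status(event_count: int) -> str:
    """Interpret the number of events found for one topic-hour."""
    if event_count == 0:
        # Canary events should always be present, so zero events means
        # ingestion failed; wait for the partition to be (re)ingested.
        return "ingestion problem: wait and retry later"
    # At least the canary event arrived, so the hour is complete.
    # (The hour may still contain only canary events, i.e. no real data.)
    return "complete"
```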

ProduceCanaryEvents

As of 2023-06, the ProduceCanaryEvents job runs as a systemd timer several times an hour. It …
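The job's internals aren't reproduced here, but conceptually it produces at least one canary event into each composite topic of each stream every hour. A hypothetical sketch of that idea (the real job is driven by stream configuration and may use a different producer path; the broker address, topic names, and event fields below are assumptions):

```python
# Illustrative only: produce one canary event into each composite topic.
import json
from kafka import KafkaProducer  # kafka-python

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",  # assumed broker address
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

canary_event = {
    "meta": {
        "stream": "my_stream",
        "domain": "canary",  # canary events are identified by meta.domain == 'canary'
    },
}

for topic in ["eqiad.my_stream", "codfw.my_stream"]:
    producer.send(topic, value=canary_event)

producer.flush()
```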

This means that all streams will have artificial canary events in them, and consumers must intentionally filter these events out (e.g. where meta.domain != 'canary').
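For example, a consumer reading the Refined table with PySpark (table name assumed) could apply that predicate like so:

```python
# Filter canary events out when reading a Refined event table.
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

real_events = (
    spark.table("event.my_stream")
         # Matches the predicate above; note that rows with a NULL meta.domain
         # would also be excluded by this comparison.
         .where("meta.domain != 'canary'")
)
```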

NOTE: the systemd timer approach is brittle. Ideally, production of canary events will be handled by an Airflow job, so that we can track individual runs of the job for each stream.

See also

  • task T266798 - Enable canary events for all streams
  • task T337055 - Send a critical alert to data-engineering if produce_canary_events isn't running correctly
  • ✅ task T251609 - Automate ingestion and refinement into Hive of event data from Kafka using stream configs and canary/heartbeat events
  • ✅ task T252585 - Refine event pipeline at this time refines data in hourly partitions without knowing if the partition is complete