Data Engineering/Systems/Hadoop Event Ingestion Lifecycle
High level overview of how an event makes it into the analytics-hadoop cluster.
Event Ingestion Lifecycle
(There is a recording of a talk describing this system (as well as EventLogging))
- EventGate intake service (optional)
- Kafka
- HDFS raw data (via Gobblin MR job)
- Hive event database (via Refine Spark job)
- Hive event_sanitized database (via RefineSanitized Spark job) (optional)
- unsanitized Hive event table data deleted (after 90 days)
Most event streams in WMF Kafka are multi-datacenter. You can learn more about how we run multi-DC Kafka clusters.
An Event Platform stream can be made up of many individual Kafka topics. In the multi-DC case, a single stream is usually composed of its datacenter-prefixed Kafka topics. E.g. my_stream would be made up of the eqiad.my_stream and codfw.my_stream Kafka topics.
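The stream-to-topics mapping can be sketched in Python. This is purely illustrative; the datacenter list is an assumption for the example, not configuration read from anywhere real:

```python
# Illustrative sketch: composing a stream's multi-DC Kafka topic names.
# The datacenter list is an assumption for this example only.
DATACENTERS = ["eqiad", "codfw"]

def stream_topics(stream_name: str) -> list[str]:
    """Return the datacenter-prefixed Kafka topics that make up a stream."""
    return [f"{dc}.{stream_name}" for dc in DATACENTERS]

print(stream_topics("my_stream"))
# -> ['eqiad.my_stream', 'codfw.my_stream']
```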
In Hive (via Refine), we ingest these Kafka topics into tables in the event database, named after the stream. The datacenter Kafka topic prefix is added as a datacenter Hive partition key.
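As a hedged sketch, here is how a datacenter-prefixed topic might map to a table and partition in the event database. The partition key names (datacenter/year/month/day/hour) are assumptions about the partitioning scheme for illustration, not the exact Refine output schema:

```python
# Illustrative only: mapping a datacenter-prefixed Kafka topic to a Hive
# table name and partition spec. Key names are assumptions, not the real
# Refine schema.
def hive_partition(topic: str, year: int, month: int, day: int, hour: int) -> dict:
    # Split e.g. 'codfw.my_stream' into its datacenter prefix and stream name.
    datacenter, _, stream = topic.partition(".")
    return {
        "table": f"event.{stream}",
        "partition": {
            "datacenter": datacenter,  # derived from the Kafka topic prefix
            "year": year, "month": month, "day": day, "hour": hour,
        },
    }

print(hive_partition("codfw.my_stream", 2023, 6, 1, 0))
```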
Downstream jobs that use these Hive event tables need to know when an hourly partition is complete.
Problem: If there is no data in one of a stream's composite Kafka topics for a given hour, how can we differentiate between the following scenarios?
- There is a problem somewhere in the ingestion pipeline, and data failed to be ingested.
- There really was no data produced to the topic during this hour.
We solve this problem by producing artificial 'canary' events into every stream. As long as we successfully produce at least one canary event into each of a stream's composite topics every hour, there should be at least one event in every topic every hour. If a topic has no events at all for an hour, then something is broken. Otherwise, downstream jobs can trust that the hourly Hive partition will eventually be ingested, and wait until it is.
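The canary-based disambiguation above can be sketched as a small Python check. This is illustrative, not the real pipeline code; each event is assumed to be a dict carrying a meta.domain field:

```python
# Illustrative sketch of canary-based disambiguation, not real pipeline code.
def hour_status(events: list[dict]) -> str:
    """Classify one topic's hourly data, assuming at least one canary
    event is produced into every topic every hour."""
    if not events:
        return "broken"  # not even the canary arrived: ingestion is broken
    has_real = any(e["meta"]["domain"] != "canary" for e in events)
    return "ok" if has_real else "empty"  # canary only: genuinely no data

print(hour_status([]))                                # broken
print(hour_status([{"meta": {"domain": "canary"}}]))  # empty
```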
As of 2023-06, the ProduceCanaryEvents job runs as a systemd timer several times an hour. It:
- discovers, via EventStreamConfig, all event streams that have canary events enabled,
- uses the stream's schema examples to create an artificial canary event for the stream (with meta.domain set to 'canary'),
- POSTs these events to the stream's destination_event_service (EventGate) in each datacenter.
This means that all streams will contain artificial canary events, and consumers must intentionally filter them out (e.g. where meta.domain != 'canary').
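A consumer-side filter might look like this minimal Python sketch (the event shapes are invented for illustration):

```python
# Illustrative consumer-side filter that drops canary events.
events = [
    {"meta": {"domain": "canary"}},
    {"meta": {"domain": "en.wikipedia.org"}},
]
real_events = [e for e in events if e["meta"]["domain"] != "canary"]
print(real_events)  # only the en.wikipedia.org event remains
```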
NOTE: the systemd timer approach is brittle. Ideally, production of canary events will be handled by an Airflow job, so that we can track individual runs of the job for each stream.
- task T266798 - Enable canary events for all streams
- task T337055 - Send a critical alert to data-engineering if produce_canary_events isn't running correctly
- ✅ task T251609 - Automate ingestion and refinement into Hive of event data from Kafka using stream configs and canary/heartbeat events
- ✅ task T252585 - Refine event pipeline at this time refines data in hourly partitions without knowing if the partition is complete