Data Platform/Systems/Refine
Refine (Airflow-based, current)
Refine is the production Spark job that evolves Hive schemas and refines "event" streams (JSON files in hourly Hive partitions) into Parquet-backed Hive tables in the event database. Most jobs have been migrated from systemd timers to "Airflow" and now run with a revised, simpler Scala implementation.
Scope
- Refines "events only" (not arbitrary datasets) sourced from hourly JSON Hive partitions in /wmf/data/raw/event/ and /wmf/data/raw/eventlogging_legacy/.
- Evolves/creates the destination Hive table and writes Parquet to /wmf/data/event/ (database: event).
- Uses the "latest schema version" from the schema registry to evolve the Hive table and select columns from the JSON.
- Everything is in place to support "Apache Iceberg" tables as output (rollout is prepared/compatible).
Scala entry points
There are two main entry points, plus a convenience bundle:
- org.wikimedia.analytics.refinery.job.refine.cli.EvolveHiveTable
- org.wikimedia.analytics.refinery.job.refine.cli.RefineHiveDataset
- org.wikimedia.analytics.refinery.job.refine.cli.EvolveAndRefineToHiveTable (bundle used in Airflow to reduce the number of individual jobs: one Airflow task → one k8s pod → one Skein app → one Spark app)
Exception: the "netflow" job still uses the "legacy" Refine entry point org.wikimedia.analytics.refinery.job.refine.Refine (see Legacy Refine).
Scheduling
- Refine workloads are orchestrated by "Airflow" (DAG: refine_to_hive_hourly).
- The DAG maps over streams; each mapped task launches a k8s pod running a Skein app that starts Spark.
Data readiness / lateness handling
- Refine waits for raw data to be present for 2 consecutive hours before refining. This gives Gobblin time to ingest late events (e.g., hour H events arriving during hour H+1).
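The readiness rule can be sketched as a simple two-partition existence check (an illustrative sketch only; the real check lives in the Airflow sensors):

```shell
# Hedged sketch: hour H is considered ready only when raw data exists for
# both H and H+1. check_cmd is parameterized so the same logic can target
# HDFS ("hdfs dfs -test -e") or a local filesystem ("test -e").
hour_ready() {
  local check_cmd=$1 base=$2 hour=$3 next_hour=$4
  $check_cmd "$base/$hour" && $check_cmd "$base/$next_hour"
}

# Local demonstration with fake raw partitions:
base=$(mktemp -d)
mkdir -p "$base/hour=05" "$base/hour=06"
hour_ready 'test -e' "$base" hour=05 hour=06 && echo "hour=05 ready"
hour_ready 'test -e' "$base" hour=06 hour=07 || echo "hour=06 not ready yet"
```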
Stream requirements
To be refined, a stream must have:
- A schema declared in schemas-event-primary/secondary
- A declaration in the "Event Stream Configurator (ESC)":
- enable analytics_hive_ingestion
- enable analytics_hadoop_ingestion
- "activate canary events" for the stream
Refine fetches the list of streams at the start of the DAG run:
- it refines streams where consumer analytics_hive_ingestion is declared and enabled,
- it skips streams without canary events,
- and it respects Airflow variables streams_allow_list and streams_disallow_list.
ESC references:
Deployment
- Deploy the new "refinery-source" version.
- Update the "airflow-dags" artifact reference to that version.
- Deploy "airflow-dags".
Logs
- Spark "driver logs are collected in Airflow" (verbosity reduced for readability) and usually contain what you need.
- Full logs are still available via YARN:
sudo -u hdfs yarn logs -applicationId <application_id>
- If you need very verbose driver logs, adjust/skip the variable spark_driver_log4j_properties_file.
Refine job sizing
To match resources to stream volume we use a T-shirt sizing:
- small: local Spark job within the Skein container/k8s pod (default for new streams)
- medium: regular distributed Spark job via Skein
- large: distributed Spark job with more resources
We set it on EventStreamConfig in consumers.analytics_hive_ingestion.spark_job_ingestion_scale
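A hypothetical ESC stream declaration fragment showing where the scale sits; the consumer names and spark_job_ingestion_scale come from this page, the other field names are illustrative and may differ from the real schema:

```yaml
# Hypothetical ESC declaration fragment; exact field names may differ.
my_new.stream:
  canary_events_enabled: true            # "activate canary events"
  consumers:
    analytics_hive_ingestion:
      enabled: true
      spark_job_ingestion_scale: small   # small | medium | large
    analytics_hadoop_ingestion:
      enabled: true
```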
Tips:
- Start with "small", then scale up if the job takes too long or crashes with OOM.
- Estimate from the raw data volume in HDFS and the Kafka Grafana dashboards, by comparison with existing streams:
- List the streams in EventStreamConfig and find some examples already sized medium or large.
- On the Grafana dashboard, look at the topic size of your stream and of those examples.
- Pick the T-shirt size of the stream whose topic size is closest to yours. If your stream has high hourly variability, compare peak topic sizes instead of averages.
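The comparison above could be sketched as a threshold helper; the cutoffs below are invented for illustration only and are not official sizing guidance:

```shell
# Map an hourly raw volume (MiB) to a starting T-shirt size.
# Thresholds are made up for illustration; real sizing should be done by
# comparing with existing streams as described above.
suggest_scale() {
  local mib_per_hour=$1
  if   [ "$mib_per_hour" -lt 100 ];  then echo small
  elif [ "$mib_per_hour" -lt 2048 ]; then echo medium
  else echo large
  fi
}
suggest_scale 10     # small
suggest_scale 500    # medium
suggest_scale 5000   # large
```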
Markers / flags
- The legacy _REFINE_FAILED and ignore_failure_flag mechanisms are "not used" anymore.
- We add a _PROCESSED marker in the "raw" folders "before" Refine starts. Gobblin uses this to estimate the number of late-event files; hence the marker is set pre-refinement.
Data locations
- Destination DB: event
- Destination HDFS: /wmf/data/event/
- Sources (raw JSON):
- /wmf/data/raw/event/
- /wmf/data/raw/eventlogging_legacy/ (single-DC legacy streams; names are CamelCase at source, downcased in HDFS/Hive)
- Current DCs: eqiad and codfw
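The CamelCase-to-lowercase renaming of legacy stream names is a plain downcasing, e.g.:

```shell
# Legacy EventLogging stream names are CamelCase at the source,
# but downcased in HDFS and Hive:
echo CentralNoticeBannerHistory | tr '[:upper:]' '[:lower:]'
# → centralnoticebannerhistory
```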
Failures
Sensor failures
Airflow sensors fail when the expected raw partition is missing. Common causes:
- Gobblin delays/failures
- Missing canary events or delays/failures in the canary DAG
Resolution:
- Fix the upstream issue and "rerun the sensor".
- In the unlikely case the absence is expected, you may mark the precise sensor tasks as "success".
Refine job failures
- Schema evolution incompatibilities (a new JSON schema version cannot be applied directly by EvolveHiveTable)
- Infrastructure issues (network/HDFS outages, etc.)
Mitigations:
- Diagnose in Airflow driver logs or YARN full logs.
- If a schema is marked "inactive", add dependent streams to streams_disallow_list and then disable analytics_hive_ingestion in ESC.
Manually disabling streams
- Add a stream to Airflow variable streams_disallow_list to temporarily disable refinement.
Reruns / clearing in Airflow
General advice: when rerunning the large Refine DAG, be precise about which "task instances" you clear; avoid clearing the whole DAG (~600 tasks).
CLI example ("be careful"):
airflow tasks clear \
--dag-regex refine_to_hive_hourly \
--task-regex evolve_and_refine_to_hive_hourly \
--start-date "2023-08-01T05:00:00" \
--end-date "2023-08-01T08:00:00" \
--only-failed
This clears "all mapped tasks for all streams" in that time window. If you need per-stream precision, first identify the stream's map_index, then use a targeted script (see https://gitlab.wikimedia.org/-/snippets/245).
Manual invocation (new Refine)
Example (adjust artifact version/paths as needed):
Run it on a stat box for a single hour:
spark3-submit --master yarn \
--conf write.spark.accept-any-schema=true \
--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:quieter_spark_log4j.properties" \
--name refine_test \
--class org.wikimedia.analytics.refinery.job.refine.cli.EvolveAndRefineToHiveTable \
--deploy-mode client \
hdfs:///wmf/cache/artifacts/airflow/analytics/refinery-job-0.2.69-shaded.jar \
--table my_db.centralnoticebannerhistory \
--schema_uri /analytics/legacy/centralnoticebannerhistory/latest \
--location hdfs://analytics-hadoop/wmf/data/event/centralnoticebannerhistory \
--partition_columns year:LONG,month:LONG,day:LONG,hour:LONG \
--transform_functions org.wikimedia.analytics.refinery.job.refine.filter_allowed_domains,org.wikimedia.analytics.refinery.job.refine.remove_canary_events,org.wikimedia.analytics.refinery.job.refine.deduplicate,org.wikimedia.analytics.refinery.job.refine.geocode_ip,org.wikimedia.analytics.refinery.job.refine.parse_user_agent,org.wikimedia.analytics.refinery.job.refine.add_is_wmf_domain,org.wikimedia.analytics.refinery.job.refine.add_normalized_host,org.wikimedia.analytics.refinery.job.refine.normalizeFieldNamesAndWidenTypes \
--input_paths /wmf/data/raw/eventlogging_legacy/eventlogging_CentralNoticeBannerHistory/year=2025/month=08/day=21/hour=00 \
--partition_paths year=2025/month=8/day=21/hour=0 \
--spark_job_scale medium
To inspect raw events for a stream that failed to refine, read the topic directly from Kafka:
kafkacat -b kafka-jumbo1002.eqiad.wmnet:9092 -t codfw.mediawiki.revision-score
Troubleshooting (quick tips)
- Airflow task logs include stream name and the partition window; use them to scope the issue.
- The destination partitions can be checked from Hive or HDFS (hdfs dfs -ls).
- Source data can also be invalid; validate via Hive or HDFS listings.
- If you can't locate the failing app in the UI, re-run the minimal set of tasks to reproduce and capture the new applicationId, then fetch YARN logs.
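When checking destination partitions by hand, remember that month/day/hour under /wmf/data/event/ are not zero-padded (compare the partition_paths argument in the manual invocation above). An illustrative helper:

```shell
# Illustrative helper: build the destination partition path for a table/hour.
# 10# forces base 10 so zero-padded inputs like "08" aren't read as octal.
event_partition_path() {
  local table=$1 y=$2 m=$3 d=$4 h=$5
  printf '/wmf/data/event/%s/year=%d/month=%d/day=%d/hour=%d\n' \
    "$table" "$((10#$y))" "$((10#$m))" "$((10#$d))" "$((10#$h))"
}
event_partition_path centralnoticebannerhistory 2025 08 21 00
# → /wmf/data/event/centralnoticebannerhistory/year=2025/month=8/day=21/hour=0
```

Then list the partition with `hdfs dfs -ls "$(event_partition_path centralnoticebannerhistory 2025 08 21 00)"`.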
New stream declaration
When a new stream is declared in EventStreamConfig, two intertwined processes may begin (if enabled):
- Canary event generation
- Stream refinement
Typically, to trigger a refinement job for a given hour, raw/input data must be available for both that hour and the following hour. This ensures that the streaming pipeline is healthy before refining any data.
For a newly declared stream, however, there is no guarantee that canary jobs will populate data in time. As a result, the first few hourly partitions may contain little or no data. To avoid blocking Refine for new streams:
- For the first three hourly partitions, we refine whatever data is available (no crash if there is no data).
- This refinement occurs 20 minutes after the end of the hour, regardless of data completeness.
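The bootstrap behaviour above can be sketched as a small decision helper (illustrative logic only, not the production code):

```shell
# For a brand-new stream: refine the first three hourly partitions with
# whatever data exists; afterwards apply the usual readiness rule, which
# also requires raw data for the following hour.
should_refine() {
  local partition_index=$1 next_hour_present=$2
  if [ "$partition_index" -lt 3 ]; then
    echo yes    # grace period: refine even with partial or no data
  elif [ "$next_hour_present" = true ]; then
    echo yes
  else
    echo no    # wait for the next hour's raw data
  fi
}
should_refine 1 false   # yes (grace period)
should_refine 5 false   # no
should_refine 5 true    # yes
```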
Legacy Refine (systemd-based)
This section documents the previous Refine job and workflows. It remains relevant for streams that still use the legacy pipeline (e.g., "netflow").
Overview
The legacy Spark job (org.wikimedia.analytics.refinery.job.refine.Refine) "refined" arbitrary datasets into Parquet-backed Hive tables. It inferred table/partition names by regex-matching input paths.
Administration
- Various legacy Refine jobs were scheduled via systemd/cron on the analytics coordinator (e.g., an-launcher1003.eqiad.wmnet) as user analytics, logging to /var/log/refinery.
- Jobs run on YARN; local log files are minimal. Fetch logs via:
- sudo -u hdfs yarn logs -applicationId <application_id>
Failed jobs
Causes included schema registry inaccessibility, conflicting types, and other unexpected errors. A failure wrote a _REFINE_FAILED flag file and sent an email alert. Limited automatic casting could occur (e.g., int→float), otherwise conflicting rows could be nullified.
Check if the schema is inactive
If the schema owners mark it "inactive", disable refinement. Add the schema to table_exclude_regex in Puppet (example patch linked in history).
Rerunning jobs
A failing hour wrote _REFINE_FAILED in the destination partition, e.g.:
- Source: /wmf/data/raw/eventlogging/eventlogging_MobileWikiAppiOSFeed/hourly/2019/12/04/20
- Dest: /wmf/data/event/MobileWikiAppiOSFeed/year=2019/month=12/day=4/hour=20
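That source-to-destination mapping can be illustrated with the same style of capture groups used by input_path_regex in the manual invocation below; note that zero padding is dropped on the destination side:

```shell
# Sketch: derive the legacy destination partition path from a raw source path
# via regex capture groups (table, year, month, day, hour).
raw='/wmf/data/raw/eventlogging/eventlogging_MobileWikiAppiOSFeed/hourly/2019/12/04/20'
if [[ $raw =~ eventlogging_([^/]+)/hourly/([0-9]+)/([0-9]+)/([0-9]+)/([0-9]+)$ ]]; then
  dest=$(printf '/wmf/data/event/%s/year=%d/month=%d/day=%d/hour=%d' \
    "${BASH_REMATCH[1]}" "$((10#${BASH_REMATCH[2]}))" "$((10#${BASH_REMATCH[3]}))" \
    "$((10#${BASH_REMATCH[4]}))" "$((10#${BASH_REMATCH[5]}))")
fi
echo "$dest"
# → /wmf/data/event/MobileWikiAppiOSFeed/year=2019/month=12/day=4/hour=20
```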
To rerun:
- Remove the flag file "or"
- Run with --ignore_failure_flag=true to re-attempt failed partitions within --since.
To go beyond the default --since, provide both --since and --until (ISO-8601).
To rerun a job that previously "succeeded", remove the _REFINED flag (e.g., with hdfs dfs -rm).
Wrapper scripts (e.g., /usr/local/bin/refine_eventlogging_legacy) load properties from /etc/refinery/refine; CLI flags override any property.
Examples:
sudo -u analytics kerberos-run-command analytics /usr/local/bin/refine_eventlogging_legacy --ignore_failure_flag=true --table_include_regex=MobileWikiAppShareAFact --since=96 --verbose
sudo -u analytics kerberos-run-command analytics \
/usr/local/bin/refine_eventlogging_legacy \
--ignore_failure_flag=true \
--since=2019-06-26T21:00:00 \
--until=2019-06-26T22:00:00 \
--table_include_regex="FeaturePolicyViolation|ResourceTiming|UploadWizardExceptionFlowEvent|MobileWikiAppOnThisDay|MultimediaViewerAttribution|MobileWikiAppSavedPages|WikidataCompletionSearchClicks|ClickTiming|QuickSurveysResponses|EchoMail|MobileWikiAppFeed|GuidedTourGuiderImpression" \
--verbose
If the main job is currently running, these commands will no-op (they should warn and exit). Check status and retry later:
- /usr/bin/yarn application -status <applicationId>
Malformed / corrupt records
Legacy Refine could be run with a permissive JSON mode to drop unparseable records:
sudo -u analytics kerberos-run-command analytics \
/usr/local/bin/refine_eventlogging_legacy \
--table_include_regex='SearchSatisfaction' \
--since='2020-05-29T21:00:00' \
--until=2 \
--ignore_failure_flag=true \
--dataframereader_options='mode:DROPMALFORMED'
See Spark DataFrameReader#json "mode" docs for details.
Running locally / yarn client
Because Spark SQL required direct Hive DDL connections, legacy Refine in local or yarn-client mode needed explicit Hive client jars and proxy options. Example:
/usr/bin/spark2-submit \
--name otto_test_refine_eventlogging_0 \
--class org.wikimedia.analytics.refinery.job.refine.Refine \
--master yarn \
--deploy-mode client \
--conf spark.driver.extraClassPath=/usr/lib/hadoop-mapreduce/hadoop-mapreduce-client-common.jar:/srv/deployment/analytics/refinery/artifacts/hive-jdbc-1.1.0-cdh5.10.0.jar:/srv/deployment/analytics/refinery/artifacts/hive-service-1.1.0-cdh5.10.0.jar \
--driver-java-options='-Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080' \
/srv/deployment/analytics/refinery/artifacts/refinery-job.jar \
--output_database=otto_json_refine_test \
--hive_server_url=an-coord1001.eqiad.wmnet:10000 \
--input_path=/wmf/data/raw/eventlogging \
--input_path_regex='eventlogging_(.+)/hourly/(\d+)/(\d+)/(\d+)/(\d+)' \
--input_path_regex_capture_groups='table,year,month,day,hour' \
--output_path=/user/otto/external/eventlogging13 \
--table_include_regex='^ChangesListHighlights$' \
--transform_functions=org.wikimedia.analytics.refinery.job.refine.deduplicate_eventlogging,org.wikimedia.analytics.refinery.job.refine.geocode_ip \
--schema_base_uri=eventlogging \
--since=12 --until=10
Legacy job catalog
Existing legacy jobs were documented in Puppet (e.g., refine.pp).
Legacy troubleshooting
- Alert emails included the job name (some wrappers, like EventLoggingSanitization) and failing partitions.
- Refine periodically re-refined and could fix failed partitions automatically.
- Find source/destination HDFS paths and Hive tables in refine.pp or data_purge.pp.
- Check presence of destination partitions (Hive or hdfs dfs -ls).
- Validate source data similarly.
- To find YARN logs: locate the applicationId in yarn.wikimedia.org (by job name and datetime) and fetch logs as above.
- If you can't find the applicationId, reproduce by re-running the job manually. On an-coord1001.eqiad.wmnet use:
- systemctl list-timers → find the job → systemctl status <service> → extract the run script/command; add --since/--until as needed.