Data Platform/Systems/Refine
Refine (Airflow-based, current)
Refine is the production Spark job that evolves Hive schemas and refines "event" streams (JSON files in hourly Hive partitions) into Parquet-backed Hive tables in the event database. Most jobs have been migrated from systemd timers to "Airflow" and now run with a revised, simpler Scala implementation.
Scope
- Refines "events only" (not arbitrary datasets) sourced from hourly JSON Hive partitions in /wmf/data/raw/event/ and /wmf/data/raw/eventlogging_legacy/.
- Evolves/creates the destination Hive table and writes Parquet to /wmf/data/event/ (database: event).
- Uses the "latest schema version" from the schema registry to evolve the Hive table and select columns from the JSON.
- Everything is in place to support "Apache Iceberg" tables as output (rollout is prepared/compatible).
Scala entry points
There are two main entry points, plus a convenience bundle:
- org.wikimedia.analytics.refinery.job.refine.cli.EvolveHiveTable
- org.wikimedia.analytics.refinery.job.refine.cli.RefineHiveDataset
- org.wikimedia.analytics.refinery.job.refine.cli.EvolveAndRefineToHiveTable (bundle used in Airflow to reduce the number of individual jobs: one Airflow task ⇒ one k8s pod ⇒ one Skein app ⇒ one Spark app)
Exception: the "netflow" job still uses the "legacy" Refine entry point org.wikimedia.analytics.refinery.job.refine.Refine (see Legacy Refine).
Scheduling
- Refine workloads are orchestrated by "Airflow" (DAG: refine_to_hive_hourly).
- The DAG maps over streams; each mapped task launches a k8s pod running a Skein app that starts Spark.
Data readiness / lateness handling
- Refine waits for raw data to be present for 2 consecutive hours before refining. This gives Gobblin time to ingest late events (e.g., hour H events arriving during hour H+1).
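For example, hour 00 of a given day is refined only once raw data has also landed for hour 01. A quick manual check of that readiness (the stream directory below is just the example used later on this page; substitute your stream's raw directory):
# Both hourly raw partitions should exist before hour 00 is refined.
hdfs dfs -ls /wmf/data/raw/eventlogging_legacy/eventlogging_CentralNoticeBannerHistory/year=2025/month=08/day=21/hour=00
hdfs dfs -ls /wmf/data/raw/eventlogging_legacy/eventlogging_CentralNoticeBannerHistory/year=2025/month=08/day=21/hour=01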
Stream requirements
To be refined, a stream must have:
- A schema declared in schemas-event-primary/secondary
- A declaration in the "Event Stream Configurator (ESC)":
- enable analytics_hive_ingestion
- enable analytics_hadoop_ingestion
- "activate canary events" for the stream
Refine fetches the list of streams at the start of the DAG run:
- it refines streams where consumer analytics_hive_ingestion is declared and enabled,
- it skips streams without canary events,
- and it respects Airflow variables streams_allow_list and streams_disallow_list.
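To double-check what the DAG will see for a given stream, the EventStreamConfig API can be queried directly. This is only a sketch: the endpoint exists, but the parameter names (streams, all_settings) and the stream name are assumptions that may need adjusting:
# Sketch: fetch the declared settings for one stream (parameter names are assumptions).
curl 'https://meta.wikimedia.org/w/api.php?action=streamconfigs&format=json&streams=mediawiki.page_change.v1&all_settings=true'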
Deployment
- Deploy the new "refinery-source" version.
- Update the "airflow-dags" artifact reference to that version.
- Deploy "airflow-dags".
Logs
- Spark "driver logs are collected in Airflow" (verbosity reduced for readability) and usually contain what you need.
- Full logs are still available via YARN:
sudo -u hdfs yarn logs -applicationId <application_id>
- If you need very verbose driver logs, adjust/skip the variable spark_driver_log4j_properties_file.
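The full YARN log can be very long; a simple way to skim it for errors (the application id below is a placeholder):
# Keep only error lines plus a few lines of context.
sudo -u hdfs yarn logs -applicationId application_1690000000000_12345 | grep -i -A 5 error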
Refine job sizing
To match resources to stream volume we use a T-shirt sizing:
- small — local Spark job within the Skein container/k8s pod (default for new streams)
- medium — regular distributed Spark job via Skein
- large — distributed Spark job with more resources
We set it in EventStreamConfig, under consumers.analytics_hive_ingestion.spark_job_ingestion_scale.
Tips:
- Start with "small", then scale up if the job takes too long or crashes with out-of-memory errors.
- Estimate the stream's volume from its raw data size in HDFS and from the Kafka Grafana dashboards, by comparison with existing streams (see the sketch after this list):
- List the streams in EventStreamConfig and note a few existing examples declared as medium or large.
- On the Grafana dashboard, compare your topic's size with the topics of those examples.
- Pick the same T-shirt size as the stream whose topic size is closest to yours; if your stream has strong hour-to-hour variability, compare against its peak topic size.
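A sketch for the HDFS side of that comparison (paths are illustrative; the first directory is the example stream used later on this page):
# Compare one raw hour of your new stream against the same hour of a stream whose size is already known.
hdfs dfs -du -s -h /wmf/data/raw/eventlogging_legacy/eventlogging_CentralNoticeBannerHistory/year=2025/month=08/day=21/hour=00
hdfs dfs -du -s -h /wmf/data/raw/event/<existing_stream_directory>/year=2025/month=08/day=21/hour=00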
Markers / flags
- The legacy _REFINE_FAILED and ignore_failure_flag mechanisms are "not used" anymore.
- We add a _PROCESSED marker in the "raw" folders "before" Refine starts. Gobblin uses this to estimate the number of late-event files—hence the marker is set pre-refinement.
Data locations
- Destination DB: event
- Destination HDFS: /wmf/data/event/
- Sources (raw JSON):
- /wmf/data/raw/event/
- /wmf/data/raw/eventlogging_legacy/ (single-DC legacy streams; names are CamelCase at source, downcased in HDFS/Hive)
- Current DCs: eqiad and codfw
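As an illustration of this layout, the legacy stream used as an example later on this page keeps its CamelCase name at the raw source while the refined destination is downcased:
# Raw JSON source (CamelCase) vs. refined Parquet destination (downcased); directories are illustrative.
hdfs dfs -ls /wmf/data/raw/eventlogging_legacy/eventlogging_CentralNoticeBannerHistory/
hdfs dfs -ls /wmf/data/event/centralnoticebannerhistory/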
Failures
Sensor failures
Airflow sensors fail when the expected raw partition is missing. Common causes:
- Gobblin delays/failures
- Missing canary events or delays/failures in the canary DAG
Resolution:
- Fix the upstream issue and "rerun the sensor".
- In the unlikely case the absence is expected, you may mark the precise sensor tasks as "success".
Refine job failures
Common causes:
- Schema evolution incompatibilities (a new JSON schema version cannot be applied directly by EvolveHiveTable)
- Infrastructure issues (network/HDFS outages, etc.)
Mitigations:
- Diagnose in Airflow driver logs or YARN full logs.
- If a schema is marked "inactive", add dependent streams to streams_disallow_list and then disable analytics_hive_ingestion in ESC.
Manually disabling streams
- Add a stream to Airflow variable streams_disallow_list to temporarily disable refinement.
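A sketch using the Airflow CLI (the value format of streams_disallow_list is an assumption; read the current value before changing it):
# Inspect the current disallow list, then write an updated value back.
airflow variables get streams_disallow_list
airflow variables set streams_disallow_list '<updated value including your stream>'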
Reruns / clearing in Airflow
General advice: when rerunning the large Refine DAG, be precise about which "task instances" you clear; avoid clearing the whole DAG (~600 tasks).
CLI example ("be careful"):
airflow tasks clear \
  --dag-regex refine_to_hive_hourly \
  --task-regex evolve_and_refine_to_hive_hourly \
  --start-date "2023-08-01T05:00:00" \
  --end-date "2023-08-01T08:00:00" \
  --only-failed
This clears "all mapped tasks for all streams" in that time window. If you need per-stream precision, first identify the stream’s map_index, then use a targeted script (see https://gitlab.wikimedia.org/-/snippets/245).
Manual invocation (new Refine)
Example (adjust artifact version/paths as needed):
Run it on a stat box for a single hour:
spark3-submit --master yarn \
  --conf write.spark.accept-any-schema=true \
  --conf 'spark.driver.extraJavaOptions= -Dlog4j.configuration=file:quieter_spark_log4j.properties' \
  --name refine_test \
  --class org.wikimedia.analytics.refinery.job.refine.cli.EvolveAndRefineToHiveTable \
  --deploy-mode client \
  hdfs:///wmf/cache/artifacts/airflow/analytics/refinery-job-0.2.69-shaded.jar \
  --table my_db.centralnoticebannerhistory \
  --schema_uri /analytics/legacy/centralnoticebannerhistory/latest \
  --location hdfs://analytics-hadoop/wmf/data/event/centralnoticebannerhistory \
  --partition_columns year:LONG,month:LONG,day:LONG,hour:LONG \
  --transform_functions org.wikimedia.analytics.refinery.job.refine.filter_allowed_domains,org.wikimedia.analytics.refinery.job.refine.remove_canary_events,org.wikimedia.analytics.refinery.job.refine.deduplicate,org.wikimedia.analytics.refinery.job.refine.geocode_ip,org.wikimedia.analytics.refinery.job.refine.parse_user_agent,org.wikimedia.analytics.refinery.job.refine.add_is_wmf_domain,org.wikimedia.analytics.refinery.job.refine.add_normalized_host,org.wikimedia.analytics.refinery.job.refine.normalizeFieldNamesAndWidenTypes \
  --input_paths /wmf/data/raw/eventlogging_legacy/eventlogging_CentralNoticeBannerHistory/year=2025/month=08/day=21/hour=00 \
  --partition_paths year=2025/month=8/day=21/hour=0 \
  --spark_job_scale medium
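Once the job completes, a quick sanity check that Parquet files landed in the destination partition (paths taken from the example above):
# The partition directory is the --location value plus the --partition_paths value.
hdfs dfs -ls hdfs://analytics-hadoop/wmf/data/event/centralnoticebannerhistory/year=2025/month=8/day=21/hour=0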
To inspect the data that might have failed to refine, consume the source Kafka topic directly:
kafkacat -b kafka-jumbo1002.eqiad.wmnet:9092 -t codfw.mediawiki.revision-score
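By default kafkacat keeps consuming; a sketch for fetching only the last few messages per partition and exiting (the offset is illustrative):
# -C consume, -o -5 start 5 messages before the end of each partition, -e exit once the end is reached.
kafkacat -C -b kafka-jumbo1002.eqiad.wmnet:9092 -t codfw.mediawiki.revision-score -o -5 -e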
Troubleshooting (quick tips)
- Airflow task logs include stream name and the partition window; use them to scope the issue.
- The destination partitions can be checked from Hive or HDFS (hdfs dfs -ls); see the sketch after this list.
- Source data can also be invalid; validate via Hive or HDFS listings.
- If you can’t locate the failing app in the UI, re-run the minimal set of tasks to reproduce and capture the new applicationId, then fetch YARN logs.
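For the destination check mentioned above, a hedged Hive example (the table and partition values are illustrative, and it assumes the beeline wrapper on the stat boxes connects to the analytics Hive by default; otherwise pass the JDBC URL with -u):
# Count refined rows in one hourly partition of a destination table in the event database.
beeline -e "SELECT COUNT(*) FROM event.centralnoticebannerhistory WHERE year=2025 AND month=8 AND day=21 AND hour=0;"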
New stream declaration
When a new stream is declared in EventStreamConfig, two intertwined processes may begin (if enabled):
- Canary event generation
- Stream refinement
Typically, to trigger a refinement job for a given hour, raw/input data must be available for both that hour and the following hour. This ensures that the streaming pipeline is healthy before refining any data.
For a newly declared stream, however, there is no guarantee that canary jobs will populate data in time. As a result, the first few hourly partitions may contain little or no data. To avoid blocking Refine for new streams:
- For the first three hourly partitions, we refine whatever data is available (the job does not fail if there is no data).
- This refinement occurs 20 minutes after the end of the hour, regardless of data completeness.
Legacy Refine (systemd-based)
This section documents the previous Refine job and workflows. It remains relevant for streams that still use the legacy pipeline (e.g., "netflow").
Overview
The legacy Spark job (org.wikimedia.analytics.refinery.job.refine.Refine) “refined” arbitrary datasets into Parquet-backed Hive tables. It inferred table/partition names by regex-matching input paths.
Administration
- Various legacy Refine jobs were scheduled via systemd/cron on the analytics coordinator (e.g., an-launcher1003.eqiad.wmnet) as user analytics, logging to /var/log/refinery.
- Jobs run on YARN; local log files are minimal. Fetch logs via:
- sudo -u hdfs yarn logs -applicationId <application_id>
Failed jobs
Causes included schema registry inaccessibility, conflicting types, and other unexpected errors. A failure wrote a _REFINE_FAILED flag file and sent an email alert. Limited automatic casting could occur (e.g., int→float), otherwise conflicting rows could be nullified.
Check if the schema is inactive
If the schema owners mark it "inactive", disable refinement. Add the schema to table_exclude_regex in Puppet (example patch linked in history).
Rerunning jobs
A failing hour wrote _REFINE_FAILED in the destination partition, e.g.:
- Source: /wmf/data/raw/eventlogging/eventlogging_MobileWikiAppiOSFeed/hourly/2019/12/04/20
- Dest: /wmf/data/event/MobileWikiAppiOSFeed/year=2019/month=12/day=4/hour=20
To rerun:
- Remove the flag file, or
- Run with --ignore_failure_flag=true to re-attempt failed partitions within --since.
To go beyond the default --since, provide both --since and --until (ISO-8601).
To rerun a job that previously succeeded, remove the _REFINED flag from the destination partition (e.g., with hdfs dfs -rm).
Wrapper scripts (e.g., /usr/local/bin/refine_eventlogging_legacy) load properties from /etc/refinery/refine; CLI flags override any property.
Examples:
sudo -u analytics kerberos-run-command analytics /usr/local/bin/refine_eventlogging_legacy --ignore_failure_flag=true --table_include_regex=MobileWikiAppShareAFact --since=96 --verbose
sudo -u analytics kerberos-run-command analytics \
/usr/local/bin/refine_eventlogging_legacy \
--ignore_failure_flag=true \
--since=2019-06-26T21:00:00 \
--until=2019-06-26T22:00:00 \
--table_include_regex="FeaturePolicyViolation|ResourceTiming|UploadWizardExceptionFlowEvent|MobileWikiAppOnThisDay|MultimediaViewerAttribution|MobileWikiAppSavedPages|WikidataCompletionSearchClicks|ClickTiming|QuickSurveysResponses|EchoMail|MobileWikiAppFeed|GuidedTourGuiderImpression" \
--verbose
If the main job is currently running, these commands will no-op (they should warn and exit). Check status and retry later:
- /usr/bin/yarn application -status <applicationId>
Malformed / corrupt records
Legacy Refine could be run with a permissive JSON mode to drop unparseable records:
sudo -u analytics kerberos-run-command analytics \
  /usr/local/bin/refine_eventlogging_legacy \
  --table_include_regex='SearchSatisfaction' \
  --since='2020-05-29T21:00:00' \
  --until=2 \
  --ignore_failure_flag=true \
  --dataframereader_options='mode:DROPMALFORMED'
See Spark DataFrameReader#json "mode" docs for details.
Running locally / yarn client
Because Spark SQL required direct Hive DDL connections, legacy Refine in local or yarn-client mode needed explicit Hive client jars and proxy options. Example:
/usr/bin/spark2-submit \
  --name otto_test_refine_eventlogging_0 \
  --class org.wikimedia.analytics.refinery.job.refine.Refine \
  --master yarn \
  --deploy-mode client \
  --conf spark.driver.extraClassPath=/usr/lib/hadoop-mapreduce/hadoop-mapreduce-client-common.jar:/srv/deployment/analytics/refinery/artifacts/hive-jdbc-1.1.0-cdh5.10.0.jar:/srv/deployment/analytics/refinery/artifacts/hive-service-1.1.0-cdh5.10.0.jar \
  --driver-java-options='-Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080' \
  /srv/deployment/analytics/refinery/artifacts/refinery-job.jar \
  --output_database=otto_json_refine_test \
  --hive_server_url=an-coord1001.eqiad.wmnet:10000 \
  --input_path=/wmf/data/raw/eventlogging \
  --input_path_regex='eventlogging_(.+)/hourly/(\d+)/(\d+)/(\d+)/(\d+)' \
  --input_path_regex_capture_groups='table,year,month,day,hour' \
  --output_path=/user/otto/external/eventlogging13 \
  --table_include_regex='^ChangesListHighlights$' \
  --transform_functions=org.wikimedia.analytics.refinery.job.refine.deduplicate_eventlogging,org.wikimedia.analytics.refinery.job.refine.geocode_ip \
  --schema_base_uri=eventlogging \
  --since=12 --until=10
Legacy job catalog
Existing legacy jobs were documented in Puppet (e.g., refine.pp).
Legacy troubleshooting
- Alert emails included the job name (some wrappers, like EventLoggingSanitization) and failing partitions.
- Refine periodically re-refined and could fix failed partitions automatically.
- Find source/destination HDFS paths and Hive tables in refine.pp or data_purge.pp.
- Check presence of destination partitions (Hive or hdfs dfs -ls).
- Validate source data similarly.
- To find YARN logs: locate the applicationId in yarn.wikimedia.org (by job name and datetime) and fetch logs as above.
- If you can’t find the applicationId, reproduce by re-running the job manually. On an-coord1001.eqiad.wmnet use:
- systemctl list-timers → find the job → systemctl status <service> → extract the run script/command; add --since/--until as needed.