Data Platform/Systems/Refine

Refine (Airflow-based, current)

Refine is the production Spark job that evolves Hive schemas and refines "event" streams (JSON files in hourly Hive partitions) into Parquet-backed Hive tables in the event database. Most jobs have been migrated from systemd timers to "Airflow" and now run with a revised, simpler Scala implementation.

Scope

  • Refines "events only" (not arbitrary datasets) sourced from hourly JSON Hive partitions in /wmf/data/raw/event/ and /wmf/data/raw/eventlogging_legacy/.
  • Evolves/creates the destination Hive table and writes Parquet to /wmf/data/event/ (database: event).
  • Uses the "latest schema version" from the schema registry to evolve the Hive table and select columns from the JSON.
  • Everything is in place to support "Apache Iceberg" tables as output (rollout is prepared/compatible).

Scala entry points

There are two main entry points, plus a convenience bundle:

  • org.wikimedia.analytics.refinery.job.refine.cli.EvolveHiveTable
  • org.wikimedia.analytics.refinery.job.refine.cli.RefineHiveDataset
  • org.wikimedia.analytics.refinery.job.refine.cli.EvolveAndRefineToHiveTable (bundle used in Airflow to reduce the number of individual jobs: one Airflow task ⇒ one k8s pod ⇒ one Skein app ⇒ one Spark app)

Exception: the "netflow" job still uses the "legacy" Refine entry point org.wikimedia.analytics.refinery.job.refine.Refine (see Legacy Refine).

Scheduling

  • Refine workloads are orchestrated by "Airflow" (DAG: refine_to_hive_hourly).
  • The DAG maps over streams; each mapped task launches a k8s pod running a Skein app that starts Spark.

Data readiness / lateness handling

  • Refine waits for raw data to be present for 2 consecutive hours before refining. This gives Gobblin time to ingest late events (e.g., hour H events arriving during hour H+1).

Stream requirements

To be refined, a stream must have:

  • A schema declared in schemas-event-primary/secondary
  • A declaration in the "Event Stream Configurator (ESC)":
    • enable analytics_hive_ingestion
    • enable analytics_hadoop_ingestion
    • "activate canary events" for the stream

Refine fetches the list of streams at the start of the DAG run:

  • it refines streams where consumer analytics_hive_ingestion is declared and enabled,
  • it skips streams without canary events,
  • and it respects Airflow variables streams_allow_list and streams_disallow_list.
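
To double-check what ESC declares for a stream, you can query the EventStreamConfig API directly. A minimal sketch, assuming the public streamconfigs endpoint on meta.wikimedia.org; the stream name and the all_settings parameter are illustrative and may need adjusting:

# Fetch the full declared settings (including consumers) for one stream
curl -s 'https://meta.wikimedia.org/w/api.php?action=streamconfigs&format=json&all_settings=true&streams=mediawiki.page_change.v1' | jq .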

ESC references:

Deployment

  1. Deploy the new "refinery-source" version.
  2. Update the "airflow-dags" artifact reference to that version.
  3. Deploy "airflow-dags".

Logs

  • Spark "driver logs are collected in Airflow" (verbosity reduced for readability) and usually contain what you need.
  • Full logs are still available via YARN:
 sudo -u hdfs yarn logs -applicationId <application_id>
  • If you need very verbose driver logs, adjust/skip the variable spark_driver_log4j_properties_file.

Refine job sizing

To match resources to stream volume we use a T-shirt sizing:

  • small — local Spark job within the Skein container/k8s pod (default for new streams)
  • medium — regular distributed Spark job via Skein
  • large — distributed Spark job with more resources

Set the size in EventStreamConfig under consumers.analytics_hive_ingestion.spark_job_ingestion_scale.

Tips:

  • Start with "small", then scale up if the job takes too long or crashes with out-of-memory errors.
  • Estimate from the raw data volume in HDFS and from the Kafka Grafana dashboards, by comparison with existing streams (see the sketch after this list):
    • List the streams in EventStreamConfig and pick a few examples already sized medium or large.
    • On the Grafana dashboard, compare the topic size of your stream with those example topics.
    • Pick the T-shirt size of the stream whose topic size is closest to yours; if your stream has high hour-to-hour variability, compare against its peak topic size.
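
For the HDFS side of the estimate, a minimal sketch; the stream names are examples and the raw directory layout is assumed to mirror the eventlogging_legacy example further down this page, so verify the actual paths under /wmf/data/raw/event/ first:

# Compare one raw hour of the new stream against one raw hour of a stream whose size is already known
hdfs dfs -du -s -h /wmf/data/raw/event/eqiad.my_new_stream/year=2025/month=08/day=21/hour=00
hdfs dfs -du -s -h /wmf/data/raw/event/eqiad.mediawiki.page_change.v1/year=2025/month=08/day=21/hour=00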

Markers / flags

  • The legacy _REFINE_FAILED and ignore_failure_flag mechanisms are "not used" anymore.
  • We add a _PROCESSED marker in the "raw" folders "before" Refine starts. Gobblin uses this to estimate the number of late-event files—hence the marker is set pre-refinement.

Data locations

  • Destination DB: event
  • Destination HDFS: /wmf/data/event/
  • Sources (raw JSON):
    • /wmf/data/raw/event/
    • /wmf/data/raw/eventlogging_legacy/ (single-DC legacy streams; names are CamelCase at source and downcased in HDFS/Hive; see the listing sketch below)
  • Current DCs: eqiad and codfw
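
As an illustration of the layout above, a quick listing sketch; the stream and partition values are copied from the manual invocation example further down this page:

# Raw JSON for one hour of a legacy (CamelCase) stream
hdfs dfs -ls /wmf/data/raw/eventlogging_legacy/eventlogging_CentralNoticeBannerHistory/year=2025/month=08/day=21/hour=00
# Refined Parquet for the same hour: downcased table name, unpadded partition values
hdfs dfs -ls /wmf/data/event/centralnoticebannerhistory/year=2025/month=8/day=21/hour=0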

Failures

Sensor failures

Airflow sensors fail when the expected raw partition is missing. Common causes:

  • Gobblin delays/failures
  • Missing canary events or delays/failures in the canary DAG

Resolution:

  • Fix the upstream issue and "rerun the sensor".
  • In the unlikely case the absence is expected, you may mark the precise sensor tasks as "success".

Refine job failures

  • Schema evolution incompatibilities (a new JSON schema version cannot be applied directly by EvolveHiveTable)
  • Infrastructure issues (network/HDFS outages, etc.)

Mitigations:

  • Diagnose in Airflow driver logs or YARN full logs.
  • If a schema is marked "inactive", add dependent streams to streams_disallow_list and then disable analytics_hive_ingestion in ESC.

Manually disabling streams

  • Add a stream to Airflow variable streams_disallow_list to temporarily disable refinement.
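
A minimal sketch of doing this from the Airflow CLI; the exact value format expected by the DAG (e.g., a JSON list) is an assumption, so check the current value before overwriting it:

# Inspect the current value, then write it back with the extra stream appended
airflow variables get streams_disallow_list
airflow variables set streams_disallow_list '["my.failing_stream"]'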

Reruns / clearing in Airflow

General advice: when rerunning the large Refine DAG, be precise about which "task instances" you clear; avoid clearing the whole DAG (~600 tasks).

CLI example ("be careful"):

airflow tasks clear \
  --dag-regex refine_to_hive_hourly \
  --task-regex evolve_and_refine_to_hive_hourly \
  --start-date "2023-08-01T05:00:00" \
  --end-date   "2023-08-01T08:00:00" \
  --only-failed

This clears "all mapped tasks for all streams" in that time window. If you need per-stream precision, first identify the stream’s map_index, then use a targeted script (see https://gitlab.wikimedia.org/-/snippets/245).
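
To find the map_index for a given stream before clearing, one option is to list the task instances of the affected DAG run. A sketch, assuming an Airflow version whose output includes the map index; the run id is illustrative:

# List task-instance states (including mapped instances) for one DAG run
airflow tasks states-for-dag-run refine_to_hive_hourly "scheduled__2023-08-01T05:00:00+00:00"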

Manual invocation (new Refine)

Example (adjust artifact version/paths as needed):

Run it on a stat box for a single hour:

spark3-submit --master yarn \
  --conf write.spark.accept-any-schema=true \
  --conf 'spark.driver.extraJavaOptions= -Dlog4j.configuration=file:quieter_spark_log4j.properties' \
  --name refine_test \
  --class org.wikimedia.analytics.refinery.job.refine.cli.EvolveAndRefineToHiveTable \
  --deploy-mode client \
  hdfs:///wmf/cache/artifacts/airflow/analytics/refinery-job-0.2.69-shaded.jar \
  --table my_db.centralnoticebannerhistory \
  --schema_uri /analytics/legacy/centralnoticebannerhistory/latest \
  --location hdfs://analytics-hadoop/wmf/data/event/centralnoticebannerhistory \
  --partition_columns year:LONG,month:LONG,day:LONG,hour:LONG \
  --transform_functions org.wikimedia.analytics.refinery.job.refine.filter_allowed_domains,org.wikimedia.analytics.refinery.job.refine.remove_canary_events,org.wikimedia.analytics.refinery.job.refine.deduplicate,org.wikimedia.analytics.refinery.job.refine.geocode_ip,org.wikimedia.analytics.refinery.job.refine.parse_user_agent,org.wikimedia.analytics.refinery.job.refine.add_is_wmf_domain,org.wikimedia.analytics.refinery.job.refine.add_normalized_host,org.wikimedia.analytics.refinery.job.refine.normalizeFieldNamesAndWidenTypes \
  --input_paths /wmf/data/raw/eventlogging_legacy/eventlogging_CentralNoticeBannerHistory/year=2025/month=08/day=21/hour=00 \
  --partition_paths year=2025/month=8/day=21/hour=0 \
  --spark_job_scale medium


See data that might have failed

kafkacat -b kafka-jumbo1002.eqiad.wmnet:9092 -t codfw.mediawiki.revision-score
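
To avoid streaming the whole topic, kafkacat can be limited to the most recent messages, for example:

# Consume only the 10 most recent messages per partition, then exit
kafkacat -C -b kafka-jumbo1002.eqiad.wmnet:9092 -t codfw.mediawiki.revision-score -o -10 -e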

Troubleshooting (quick tips)

  • Airflow task logs include stream name and the partition window; use them to scope the issue.
  • Check that the destination partitions exist in Hive or HDFS (hdfs dfs -ls).
  • Source data may itself be missing or invalid; check it the same way (see the sketch after this list).
  • If you can’t locate the failing app in the UI, re-run the minimal set of tasks to reproduce and capture the new applicationId, then fetch YARN logs.
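
A minimal sketch of those checks, reusing the centralnoticebannerhistory example from the manual invocation above; beeline availability on the stat box is an assumption:

# Destination partitions as Hive sees them
beeline -e 'SHOW PARTITIONS event.centralnoticebannerhistory;'
# Once you have the applicationId, pull the full logs
sudo -u hdfs yarn logs -applicationId <application_id>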

New stream declaration

When a new stream is declared in EventStreamConfig, two intertwined processes may begin (if enabled):

  1. Canary event generation
  2. Stream refinement

Typically, to trigger a refinement job for a given hour, raw/input data must be available for both that hour and the following hour. This ensures that the streaming pipeline is healthy before refining any data.

For a newly declared stream, however, there is no guarantee that canary jobs will populate data in time. As a result, the first few hours may contain no data at all. To avoid blocking Refine for new streams:

  • For the first three hourly partitions, we refine whatever data is available; the job does not fail if no data is present.
  • This refinement occurs 20 minutes after the end of the hour, regardless of data completeness.

Legacy Refine (systemd-based)

This section documents the previous Refine job and workflows. It remains relevant for streams that still use the legacy pipeline (e.g., "netflow").

Overview

The legacy Spark job (org.wikimedia.analytics.refinery.job.refine.Refine) “refined” arbitrary datasets into Parquet-backed Hive tables. It inferred table/partition names by regex-matching input paths.

Administration

  • Various legacy Refine jobs were scheduled via systemd/cron on the analytics coordinator (e.g., an-launcher1003.eqiad.wmnet) as user analytics, logging to /var/log/refinery.
  • Jobs run on YARN; local log files are minimal. Fetch logs via:
sudo -u hdfs yarn logs -applicationId <application_id>

Failed jobs

Causes included schema registry inaccessibility, conflicting types, and other unexpected errors. A failure wrote a _REFINE_FAILED flag file and sent an email alert. Limited automatic casting could occur (e.g., int→float); otherwise, conflicting rows could be nullified.

Check if the schema is inactive

If the schema owners mark it "inactive", disable refinement. Add the schema to table_exclude_regex in Puppet (example patch linked in history).

Rerunning jobs

A failing hour wrote _REFINE_FAILED in the destination partition, e.g.:

  • Source: /wmf/data/raw/eventlogging/eventlogging_MobileWikiAppiOSFeed/hourly/2019/12/04/20
  • Dest: /wmf/data/event/MobileWikiAppiOSFeed/year=2019/month=12/day=4/hour=20

To rerun:

  • Remove the flag file, or
  • Run with --ignore_failure_flag=true to re-attempt failed partitions within --since.

To go beyond the default --since, provide both --since and --until (ISO-8601).

"To rerun a job that previously "succeeded", remove the _REFINED flag (e.g., hdfs -rm -r)."

Wrapper scripts (e.g., /usr/local/bin/refine_eventlogging_legacy) load properties from /etc/refinery/refine; CLI flags override any property.

Examples:

sudo -u analytics kerberos-run-command analytics /usr/local/bin/refine_eventlogging_legacy --ignore_failure_flag=true --table_include_regex=MobileWikiAppShareAFact --since=96 --verbose


sudo -u analytics kerberos-run-command analytics \
  /usr/local/bin/refine_eventlogging_legacy \
  --ignore_failure_flag=true \
  --since=2019-06-26T21:00:00 \
  --until=2019-06-26T22:00:00 \
  --table_include_regex="FeaturePolicyViolation|ResourceTiming|UploadWizardExceptionFlowEvent|MobileWikiAppOnThisDay|MultimediaViewerAttribution|MobileWikiAppSavedPages|WikidataCompletionSearchClicks|ClickTiming|QuickSurveysResponses|EchoMail|MobileWikiAppFeed|GuidedTourGuiderImpression" \
  --verbose


If the main job is currently running, these commands will no-op (they should warn and exit). Check status and retry later:

/usr/bin/yarn application -status <applicationId>

Malformed / corrupt records

Legacy Refine could be run with a permissive JSON mode to drop unparseable records:

sudo -u analytics kerberos-run-command analytics \
  /usr/local/bin/refine_eventlogging_legacy \
  --table_include_regex='SearchSatisfaction' \
  --since='2020-05-29T21:00:00' \
  --until=2 \
  --ignore_failure_flag=true \
  --dataframereader_options='mode:DROPMALFORMED'

See Spark DataFrameReader#json "mode" docs for details.

Running locally / yarn client

Because Spark SQL required direct Hive DDL connections, legacy Refine in local or yarn-client mode needed explicit Hive client jars and proxy options. Example:

/usr/bin/spark2-submit \
  --name otto_test_refine_eventlogging_0 \
  --class org.wikimedia.analytics.refinery.job.refine.Refine \
  --master yarn \
  --deploy-mode client \
  --conf spark.driver.extraClassPath=/usr/lib/hadoop-mapreduce/hadoop-mapreduce-client-common.jar:/srv/deployment/analytics/refinery/artifacts/hive-jdbc-1.1.0-cdh5.10.0.jar:/srv/deployment/analytics/refinery/artifacts/hive-service-1.1.0-cdh5.10.0.jar \
  --driver-java-options='-Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080' \
  /srv/deployment/analytics/refinery/artifacts/refinery-job.jar \
  --output_database=otto_json_refine_test \
  --hive_server_url=an-coord1001.eqiad.wmnet:10000 \
  --input_path=/wmf/data/raw/eventlogging \
  --input_path_regex='eventlogging_(.+)/hourly/(\d+)/(\d+)/(\d+)/(\d+)' \
  --input_path_regex_capture_groups='table,year,month,day,hour' \
  --output_path=/user/otto/external/eventlogging13 \
  --table_include_regex='^ChangesListHighlights$' \
  --transform_functions=org.wikimedia.analytics.refinery.job.refine.deduplicate_eventlogging,org.wikimedia.analytics.refinery.job.refine.geocode_ip \
  --schema_base_uri=eventlogging \
  --since=12 --until=10


Legacy job catalog

Existing legacy jobs were documented in Puppet (e.g., refine.pp).

Legacy troubleshooting

  • Alert emails included the job name (some wrappers, like EventLoggingSanitization) and failing partitions.
  • Refine periodically re-refined and could fix failed partitions automatically.
  • Find source/destination HDFS paths and Hive tables in refine.pp or data_purge.pp.
  • Check presence of destination partitions (Hive or hdfs dfs -ls).
  • Validate source data similarly.
  • To find YARN logs: locate the applicationId in yarn.wikimedia.org (by job name and datetime) and fetch logs as above.
  • If you can’t find the applicationId, reproduce by re-running the job manually. On an-coord1001.eqiad.wmnet use:
systemctl list-timers → find the job → systemctl status <service> → extract the run script/command; add --since/--until as needed.
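
A sketch of that workflow; the service name below is hypothetical, so match whatever systemctl list-timers actually shows for your job:

# Find the Refine timers, then read the unit file to recover the exact command to adapt with --since/--until
systemctl list-timers | grep -i refine
systemctl cat refine_eventlogging_legacy.service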