
Data Platform/Systems/Refine

From Wikitech

Refine is a Spark job that 'refines' arbitrary datasets into Parquet-backed Hive tables. It passes over the incoming dataset, creates or alters a Hive table to match the dataset's schema, and then inserts the data into the Hive table.

Refine expects that table names and partitions can be regex-matched out of incoming dataset paths.
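The shell function below shows roughly how a table name and partition values come out of a path. It is a minimal sketch that uses the EventLogging settings from the yarn client mode example on this page: input_path_regex 'eventlogging_(.+)/hourly/(\d+)/(\d+)/(\d+)/(\d+)' with capture groups table,year,month,day,hour. It is not Refine's own (Java regex) code. POSIX ERE has no \d, so the sketch uses [0-9]+ instead.

```shell
#!/bin/sh
# Approximates Refine's path matching for the EventLogging input layout:
# capture groups are table, year, month, day, hour (in that order).
# Prints nothing if the path does not match.
extract_partition() {
    printf '%s\n' "$1" | sed -nE \
        's#.*eventlogging_(.+)/hourly/([0-9]+)/([0-9]+)/([0-9]+)/([0-9]+)$#table=\1 year=\2 month=\3 day=\4 hour=\5#p'
}

extract_partition /wmf/data/raw/eventlogging/eventlogging_MobileWikiAppiOSFeed/hourly/2019/12/04/20
# prints: table=MobileWikiAppiOSFeed year=2019 month=12 day=04 hour=20
```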


There are various Refine jobs scheduled to run in the Analytics Cluster. These cron jobs run on the analytics coordinator host (currently an-launcher1002.eqiad.wmnet) as the analytics user and log their output to files in /var/log/refinery.


The jobs themselves run in YARN, so the local log files will not have much useful information. Once one of these jobs has finished, you should be able to see its logs using the yarn CLI:

sudo -u hdfs yarn logs -applicationId <application_id>
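Yarn logs for a Refine run can be very large. The small filter below keeps only lines that look like errors. It is a sketch, and the patterns are a guess at what is usually useful, so adjust them to taste.

```shell
#!/bin/sh
# Read a yarn log stream on stdin and keep only error-looking lines.
# "|| true" keeps the exit status at 0 when nothing matches.
refine_log_errors() {
    grep -E 'ERROR|Exception|_REFINE_FAILED' || true
}

# Usage (<application_id> is a placeholder):
#   sudo -u hdfs yarn logs -applicationId <application_id> | refine_log_errors | less
```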

Failed jobs

A job can fail because its schema on meta is inaccessible, because of conflicting field types, or for other unexpected reasons. A Refine failure causes the _REFINE_FAILED flag to be written and an email alert to be sent. In some simple cases, Refine is able to cast between types; the only known example is casting an integer to a float field. In other cases, Refine will choose the source table's field type for the conflicting field. When this happens, it is likely that all of the fields of the record containing the conflicting field will be nullified.

Check if the schema is inactive

If a schema is causing Refine alerts and has been marked as inactive by its owners, its refinement should be disabled. Visit the schema's page to check its status. If it is inactive, send a Puppet patch that adds the schema to the table_exclude_regex. Here's an example of such a patch.
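Before sending the patch, you may want to check that the amended regex actually matches the schema name. The sketch below does this with grep -E. It rests on two assumptions this page does not confirm: that the regex must match the whole table name, and that grep's ERE behaves like Refine's Java regex for simple alternations. The schema names used here are hypothetical.

```shell
#!/bin/sh
# Return success if table name $1 is fully matched by regex $2
# (e.g. a proposed table_exclude_regex). Anchoring with ^( )$ is an assumption.
matches_table_regex() {
    printf '%s\n' "$1" | grep -Eq "^($2)$"
}

# Hypothetical schema names:
if matches_table_regex OldSchemaB 'OldSchemaA|OldSchemaB'; then
    echo "OldSchemaB would be excluded"
fi
```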

Rerunning jobs

If a Refine job for a particular hour fails, it will write a _REFINE_FAILED flag file into the destination partition directory. For example, if you see a failure to refine /wmf/data/raw/eventlogging/eventlogging_MobileWikiAppiOSFeed/hourly/2019/12/04/20, you need to look at /wmf/data/event/MobileWikiAppiOSFeed/year=2019/month=12/day=4/hour=20. To schedule a job to rerun, you may either remove this flag file, or run the Refine job with the --ignore_failure_flag=true option. The latter will cause the job to re-attempt refinement of any partition in its --since range that has a _REFINE_FAILED flag.
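The mapping from a raw input path to the destination partition directory can be scripted. This is a minimal sketch generalized from the single example above, so treat the layout as an assumption and check it against HDFS. It assumes that EventLogging tables land under /wmf/data/event/<table>, with leading zeros dropped from the partition values. The numbers are converted with awk rather than shell printf, because printf would read values such as "08" as invalid octal.

```shell
#!/bin/sh
# Map /wmf/data/raw/eventlogging/eventlogging_<Table>/hourly/YYYY/MM/DD/HH
# to /wmf/data/event/<Table>/year=Y/month=M/day=D/hour=H (zeros stripped).
# Prints nothing for paths that do not match this layout.
refined_partition_dir() {
    printf '%s\n' "$1" | sed -nE \
        's#^/wmf/data/raw/eventlogging/eventlogging_([^/]+)/hourly/([0-9]+)/([0-9]+)/([0-9]+)/([0-9]+)$#\1 \2 \3 \4 \5#p' |
    awk '{ printf "/wmf/data/event/%s/year=%d/month=%d/day=%d/hour=%d\n", $1, $2, $3, $4, $5 }'
}

# Usage, e.g. to inspect or clear the failure flag:
#   dir=$(refined_partition_dir /wmf/data/raw/eventlogging/eventlogging_MobileWikiAppiOSFeed/hourly/2019/12/04/20)
#   hdfs dfs -ls "$dir"
#   hdfs dfs -rm "$dir/_REFINE_FAILED"
```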

If you need to run a job that is farther in the past than the configured --since flag, you may use both the --since and --until flags with ISO 8601 date formats to specify a date range.
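You can compute --since and --until timestamps instead of typing them by hand. The sketch below requires GNU date. It treats the timestamps as UTC, which is an assumption. It prints times in the YYYY-MM-DDTHH:MM:SS form used in the examples on this page.

```shell
#!/bin/sh
# Add $2 whole hours (may be negative) to ISO 8601 timestamp $1, read as UTC.
iso_add_hours() {
    start_epoch=$(date -u -d "$(printf '%s' "$1" | tr 'T' ' ')" +%s) || return 1
    date -u -d "@$((start_epoch + $2 * 3600))" +%Y-%m-%dT%H:%M:%S
}

# A one-hour window starting at 2019-06-26T21:00:00:
since_ts=2019-06-26T21:00:00
until_ts=$(iso_add_hours "$since_ts" 1)
echo "--since=$since_ts --until=$until_ts"
# prints: --since=2019-06-26T21:00:00 --until=2019-06-26T22:00:00
```

The same function, given a negative offset, shows roughly where an integer --since (in hours) starts. For example, iso_add_hours "$(date -u +%Y-%m-%dT%H:00:00)" -96 approximates the start of --since=96.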

If you need to rerun a Refine job that succeeded, you need to remove the _REFINED flag from the destination partition directory (hdfs dfs -rm <partition directory>/_REFINED will do it).
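To see which flag a partition currently carries, you can summarize the output of hdfs dfs -ls for that partition directory. This sketch assumes the usual ls layout, where the path is the last field on each line. If both flags are present, it reports REFINE_FAILED; that choice is conservative rather than documented Refine behaviour.

```shell
#!/bin/sh
# Read `hdfs dfs -ls <partition dir>` output on stdin and print
# REFINE_FAILED, REFINED, or NO_FLAG.
refine_flag_status() {
    awk '
        { n = split($NF, parts, "/"); name = parts[n] }
        name == "_REFINE_FAILED" { failed = 1 }
        name == "_REFINED"       { refined = 1 }
        END {
            if (failed)       print "REFINE_FAILED"
            else if (refined) print "REFINED"
            else              print "NO_FLAG"
        }'
}

# Usage:
#   hdfs dfs -ls /wmf/data/event/MobileWikiAppiOSFeed/year=2019/month=12/day=4/hour=20 | refine_flag_status
```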

Wrapper scripts for each Refine job are installed on an-launcher1002.eqiad.wmnet in /usr/local/bin. These scripts load their configuration from properties files in /etc/refinery/refine. You can override any config setting by appending CLI flags to the wrapper script.
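To see what value a wrapper script would use before you override it, you can look the key up in its properties file. This is a minimal sketch that handles only plain key=value lines and # or ! comments. That is a simplification of the Java properties format, which also allows ':' separators and line continuations. The file name in the usage line is a placeholder.

```shell
#!/bin/sh
# Print the value of key $2 from properties file $1; exit 1 if absent.
# Whitespace around key and value is trimmed; the last definition wins.
prop_get() {
    awk -v key="$2" '
        /^[ \t]*[#!]/ { next }
        {
            i = index($0, "=")
            if (i == 0) next
            k = substr($0, 1, i - 1); gsub(/^[ \t]+|[ \t]+$/, "", k)
            if (k == key) {
                v = substr($0, i + 1); gsub(/^[ \t]+|[ \t]+$/, "", v)
                val = v; found = 1
            }
        }
        END { if (found) print val; else exit 1 }' "$1"
}

# Usage (list /etc/refinery/refine to find the real file name):
#   prop_get /etc/refinery/refine/<job>.properties since
```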

For example, if you have to re-run any failed partition refinements in the last 96 hours for the EventLogging analytics table MobileWikiAppShareAFact, you'll want to provide some overrides on the CLI to set the table_include_regex, ignore_failure_flag and since properties (the script is currently deployed on an-launcher1002.eqiad.wmnet):

sudo -u analytics kerberos-run-command analytics \
  /usr/local/bin/refine_eventlogging_analytics \
  --ignore_failure_flag=true \
  --table_include_regex=MobileWikiAppShareAFact \
  --since=96

To re-run a specific hour range for several tables at once, give explicit ISO 8601 --since and --until timestamps:

sudo -u analytics kerberos-run-command analytics \
  /usr/local/bin/refine_eventlogging_analytics \
  --ignore_failure_flag=true \
  --since=2019-06-26T21:00:00 \
  --until=2019-06-26T22:00:00 \
  --table_include_regex="FeaturePolicyViolation|ResourceTiming|UploadWizardExceptionFlowEvent|MobileWikiAppOnThisDay|MultimediaViewerAttribution|MobileWikiAppSavedPages|WikidataCompletionSearchClicks|ClickTiming|QuickSurveysResponses|EchoMail|MobileWikiAppFeed|GuidedTourGuiderImpression"
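A long --table_include_regex like the one above is just table names joined with "|". The helper below builds it from a list, which makes the list easier to review. It is a sketch and does not escape regex metacharacters; it assumes plain alphanumeric schema names.

```shell
#!/bin/sh
# Join all arguments with "|" to form an alternation regex.
table_regex() {
    (IFS='|'; printf '%s\n' "$*")
}

# Usage:
#   ... --table_include_regex="$(table_regex ResourceTiming EchoMail MobileWikiAppFeed)"
```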

Note that any config property that Refine takes can be overridden in this way. If you need to modify the Spark options that are passed to the spark2-submit command, you'll have to run spark2-submit manually instead of using the wrapper script. You can, however, still use the same --config_file and Refine job overrides.

Things to look for:

- If the main (scheduled) job is already running, the command above will not do anything. It should warn you of that fact and exit. In that case, wait for the other job to finish (/usr/bin/yarn application -status <applicationId> reports its current status), then re-run the Refine command.
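Waiting for the other job can be scripted by polling yarn application -status. This is a sketch. It assumes the status report contains a line such as "State : RUNNING", so check the real output on the cluster first. The states treated as still active are YARN's pre-terminal application states.

```shell
#!/bin/sh
# Extract the application State from `yarn application -status` output on stdin.
yarn_app_state() {
    awk -F ':' '$1 ~ /^[ \t]*State[ \t]*$/ { gsub(/[ \t]/, "", $2); print $2; exit }'
}

# Poll until application $1 leaves the active states; $2 = interval in seconds.
wait_for_yarn_app() {
    while :; do
        state=$(/usr/bin/yarn application -status "$1" 2>/dev/null | yarn_app_state)
        case "$state" in
            NEW|NEW_SAVING|SUBMITTED|ACCEPTED|RUNNING) sleep "${2:-60}" ;;
            "") echo "could not read state for $1" >&2; return 1 ;;
            *) echo "$1 finished with state $state"; return 0 ;;
        esac
    done
}

# Usage (<applicationId> is a placeholder):
#   wait_for_yarn_app <applicationId> 60 && <re-run the Refine command>
```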

Rerunning a Refine job that has malformed or corrupt records

By default, Refine will fail to refine a dataset partition if any of its input records are unparseable (for example, invalid JSON). If this happens, you might see

org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST.

Or, you may see an error like

Encountered 1952 corrupt records out of 2878 total input records (67.824875%) when reading[...] input data, which is greater than corruptRecordFailureThreshold 1. First corrupt record: [...]

In either case, after investigating the cause, you may want to refine the input data anyway while ignoring the malformed or corrupt records. To do this, run in mode:DROPMALFORMED instead of FAILFAST or PERMISSIVE (the default). This causes Spark to skip the records that could not be parsed.


sudo -u analytics kerberos-run-command analytics \
  /usr/local/bin/refine_eventlogging_analytics  \
  --table_include_regex='SearchSatisfaction' \
  --since='2020-05-29T21:00:00' \
  --until=2 \
  --ignore_failure_flag=true \

See also the Spark DataFrameReader#json documentation about the reader mode.

Running Refine in local or yarn client mode

Because of a limitation in Spark SQL, Refine uses a direct Hive connection to run the Hive DDL statements that evolve tables. To do this, the job needs to be submitted with the relevant job files. The invocation for running Refine in local or yarn client mode is different from the one for yarn cluster mode. Running in local or yarn client mode is useful for testing and debugging Refine against your own Hive database, or for backfilling limited amounts of data. Here's an example running in yarn client mode using otto's database. If you run it yourself, change the parameters accordingly.

/usr/bin/spark2-submit \
--name otto_test_refine_eventlogging_0 \
--class org.wikimedia.analytics.refinery.job.refine.Refine \
--master yarn \
--deploy-mode client \
--conf spark.driver.extraClassPath=/usr/lib/hadoop-mapreduce/hadoop-mapreduce-client-common.jar:/srv/deployment/analytics/refinery/artifacts/hive-jdbc-1.1.0-cdh5.10.0.jar:/srv/deployment/analytics/refinery/artifacts/hive-service-1.1.0-cdh5.10.0.jar \
--driver-java-options='-Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080' \
/srv/deployment/analytics/refinery/artifacts/refinery-job.jar \
--output_database=otto_json_refine_test \
--hive_server_url=an-coord1001.eqiad.wmnet:10000 \
--input_path=/wmf/data/raw/eventlogging \
--input_path_regex='eventlogging_(.+)/hourly/(\d+)/(\d+)/(\d+)/(\d+)' \
--input_path_regex_capture_groups='table,year,month,day,hour' \
--output_path=/user/otto/external/eventlogging13 \
--table_include_regex='^ChangesListHighlights$' \
--transform_functions=org.wikimedia.analytics.refinery.job.refine.deduplicate_eventlogging,org.wikimedia.analytics.refinery.job.refine.geocode_ip \
--schema_base_uri=eventlogging \
--since=12 --until=10

Rerunning a specific Refine job

Existing Refine jobs are documented in Puppet. As of 2020-07, EventLogging Refine jobs are in the process of migrating from the deprecated 'refine_eventlogging_analytics' job to 'refine_eventlogging_legacy'. A table include list defines which EventLogging tables each job refines.

See data that might have failed

kafkacat -b kafka-jumbo1002.eqiad.wmnet:9092 -t codfw.mediawiki.revision-score


Here's a list of tips and things you can do to troubleshoot a Refine job that sent an alarm:

  • The alarm email contains the name of the job that failed (there are some jobs that wrap Refine, like EventLoggingSanitization). This can help you understand what failed.
  • The alarm email also contains a list of partitions (schema, year, month, day, hour) that have failed refinement. This can help you understand whether the problem affects just one schema, a subset of them, or the whole database. It also gives you the exact time of the problem.
  • Refine jobs execute periodically, and are able to re-refine and fix a failed partition automatically. If you are troubleshooting an alarm that happened hours or days ago, it might have been automatically fixed by Refine itself.
  • If you look for the job name in puppet (refine.pp or data_purge.pp), you'll find the source and destination HDFS directories and Hive tables.
  • Check whether the destination partitions are present (success) or missing (failed). You can do it from Hive by querying for the partitions in question; or from HDFS by executing hdfs dfs -ls.
  • The source data can be invalid too. Check it by querying from Hive or by executing `hdfs dfs -ls` on the source directories.
  • If the destination data is invalid, look at the yarn logs for the job. For that, you need the applicationId, which you can find in yarn.wikimedia.org by searching for the job name and datetime. Once you have the applicationId, use the yarn logs command described above to access the logs. The logs can be huge, so pipe them to less, or try grepping them.
  • If you cannot find the applicationId, you can rerun the job manually to reproduce the error. Execute systemctl list-timers on an-launcher1002.eqiad.wmnet and grep for the job name. Then execute systemctl status <corresponding service name>; under ExecStart you'll see the script that runs that job. You can cat it and extract the re-run command. Note that you might have to add --since and --until parameters.
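As an alternative to reading systemctl status, you can print the unit file with systemctl cat and pull out its ExecStart line. This is a minimal sketch, and the unit name in the usage line is hypothetical. Find the real one with systemctl list-timers.

```shell
#!/bin/sh
# Read `systemctl cat <unit>` output on stdin and print the ExecStart command(s).
unit_exec_start() {
    sed -n 's/^ExecStart=//p'
}

# Usage (hypothetical unit name):
#   systemctl cat refine_eventlogging_legacy.service | unit_exec_start
```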