From Wikitech

Refine is a Spark job that 'refines' arbitrary datasets into Parquet-backed Hive tables. It passes over the incoming dataset, creates or alters a Hive table to match the dataset, and then inserts the data into the Hive table.

Refine expects that table names and partitions can be regex matched out of incoming dataset paths.
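As a rough illustration of the idea, the sketch below pulls a table name and partition values out of a dataset path with a named-group regex. The path layout, the regex, and the function name parse_dataset_path are hypothetical and chosen only for this example; they are not the patterns used by the actual Refine jobs.

```python
import re

# Hypothetical layout: .../<table>/hourly/<year>/<month>/<day>/<hour>
# This regex is an assumption for illustration, not a real Refine setting.
PATH_RE = re.compile(
    r".*/(?P<table>[^/]+)/hourly/"
    r"(?P<year>\d{4})/(?P<month>\d{2})/(?P<day>\d{2})/(?P<hour>\d{2})$"
)


def parse_dataset_path(path):
    """Return (table, partitions) for a matching path, or None otherwise."""
    match = PATH_RE.match(path)
    if match is None:
        return None
    groups = match.groupdict()
    table = groups.pop("table")
    # The remaining named groups are the partition keys and values.
    partitions = {key: int(value) for key, value in groups.items()}
    return table, partitions
```

A path that does not match the pattern yields None, which mirrors the requirement that the table name and partitions must be recoverable from the path.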

Refined data types

Refine infers field data types from the incoming data itself. It does not use any externally defined JSON Schemas, which allows it to be agnostic about the system the data comes from. JSON data does not carry a schema of its own, so in order to map a JSON dataset to a Hive table, the entire dataset must first be read. Every field encountered in any record is added to a larger union schema representing every field in the dataset.

Example: Let's say you have a dataset with 3 records in it: { "id": 1 }, { "id": 2, "name": "Donkey" } and { "status": "sleeping" }. Refine will scan these records and notice that there are a total of 3 distinct fields: "id": integer, "name": string, and "status": string. The resulting Hive table schema columns will look like

`id`: bigint,
`name`: string,
`status`: string

Schema inference is done recursively. Nested objects become Hive struct types.
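The union-schema idea can be sketched in a few lines of plain Python. This is a simplified illustration, not Refine's implementation (Refine runs on Spark): the function names are hypothetical, only booleans, numbers, strings and nested objects are handled, and structs are represented as nested dicts.

```python
def infer_type(value):
    """Map a parsed JSON value to a Hive-like type name (nested dict = struct)."""
    # bool must be checked before int, because bool is a subclass of int.
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "bigint"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, dict):
        return {key: infer_type(item) for key, item in value.items()}
    raise TypeError(f"unsupported value in this sketch: {value!r}")


def merge_types(a, b):
    """Merge two inferred types into one that covers both."""
    if a == b:
        return a
    if isinstance(a, dict) and isinstance(b, dict):
        # Structs are merged recursively, field by field.
        merged = dict(a)
        for key, field_type in b.items():
            merged[key] = merge_types(merged[key], field_type) if key in merged else field_type
        return merged
    if isinstance(a, str) and isinstance(b, str) and {a, b} == {"bigint", "double"}:
        # Integer and decimal numbers widen to double.
        return "double"
    # Assumption: this sketch simply gives up on any other conflict.
    raise TypeError(f"conflicting types: {a!r} vs {b!r}")


def union_schema(records):
    """Scan every record and build one schema containing every field seen."""
    schema = {}
    for record in records:
        schema = merge_types(schema, infer_type(record))
    return schema
```

Applied to the three example records, union_schema returns a schema with the fields id (bigint), name (string) and status (string).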

This gets especially tricky with JSON data and numbers, because JSON does not distinguish between decimal and integer numbers. Let's say you have records like { "measure": 1 } and { "measure": 1.1 }. In this case, Refine will be able to cast the type of `measure` to the wider data type of double.

However, now let's say that this is not the first time the dataset has been refined, and that a Hive table already exists. The first time this dataset was refined, Refine only encountered `measure` fields with integers. Refine does not handle type changes, so the existing Hive table's field type becomes the canonical one. When refining new datasets, if there is a decimal value of `measure`, that value will be cast to an integer, and the decimal part will be lost.

This should work in the opposite direction as well: if an existing Hive table has a double column for which new data looks like integers, those integers will be cast to doubles. There are other conditions where this casting may or may not work. Refine does its best, but it cannot always cast successfully.
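To make the consequence concrete, here is a minimal sketch of casting incoming values to the types of an already existing table. The function names and the dict-based table schema are hypothetical, and only the two numeric types discussed above are covered; it is meant to show the loss of the decimal part, not how Refine performs the cast internally.

```python
def cast_to_existing_type(value, hive_type):
    """Cast a number to the column type the existing table already has."""
    if hive_type == "bigint":
        # Casting a decimal to an integer drops the decimal part.
        return int(value)
    if hive_type == "double":
        # Integers cast to doubles without losing information.
        return float(value)
    raise ValueError(f"type not covered by this sketch: {hive_type}")


def conform_record(record, table_schema):
    """Cast every field of a record to the existing table's column types."""
    return {
        field: cast_to_existing_type(value, table_schema[field])
        for field, value in record.items()
    }
```

With a table whose `measure` column is bigint, conform_record({"measure": 1.9}, {"measure": "bigint"}) keeps only the integer part of the value.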

The main lesson here is: BE EXPLICIT. If you are emitting JSON events and you want a field to be a decimal, you should ALWAYS include a decimal part. E.g. never emit { "measure": 1 } when what you really want is { "measure": 1.0 }.


There are various Refine jobs scheduled to run in the Analytics Cluster. These cron jobs run on the analytics coordinator host (currently an-coord1001) as the hdfs user and log their output to files in /var/log/refinery.


Logs

The jobs themselves run in YARN, so the local log files will not have much useful information. Once one of these jobs has finished, you should be able to see the logs using the yarn CLI:

sudo -u hdfs yarn logs -applicationId <application_id>

Failed jobs

If data for a partition contains fields with types that conflict with an existing Hive table, the Refine job for that partition will likely fail. This causes the _REFINE_FAILED flag to be written and an email alert to be sent, and the entire partition cannot be refined. In some simple cases, however, Refine is able to cast between types; the only known example is casting an integer to a float field. In other cases, Refine will be able to choose the source table's field type for a field. In this case, it is likely that all of the fields of the record containing the conflicting field will be nullified.

Rerunning jobs

If a Refine job for a particular hour fails, it will write a _REFINE_FAILED flag file into the destination partition directory. To schedule a job to rerun, you may either remove this flag file, or run the Refine job with the --ignore_failure_flag=true option. The latter will cause the job to re-attempt refinement of any partition in its --since range that has a _REFINE_FAILED flag.
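The effect of the flag file and of --ignore_failure_flag can be summarised in a small sketch. It models only the failure flag described above, uses the local filesystem as a stand-in for HDFS, and the function name is hypothetical; the real job makes this decision inside Refine itself.

```python
from pathlib import Path

FAILURE_FLAG = "_REFINE_FAILED"


def failure_flag_blocks_rerun(partition_dir, ignore_failure_flag=False):
    """Return True if a failure flag prevents this partition from being retried."""
    # Assumption: a local directory stands in for the HDFS partition directory.
    has_failure_flag = (Path(partition_dir) / FAILURE_FLAG).exists()
    return has_failure_flag and not ignore_failure_flag
```

Either removing the flag file or passing ignore_failure_flag=True makes the function return False, which corresponds to the two ways of scheduling a rerun.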

If you need to run a job that is farther in the past than the configured --since flag, you may use both the --since and --until flags with ISO 8601 date formats to specify a date range.
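As a hedged sketch of how such a range could be interpreted, the code below accepts either a number of hours before now (as in --since=96) or an ISO 8601 timestamp. The function names, the assumption that timestamps without an offset are UTC, and the choice of a half-open range are all illustrative and not taken from Refine.

```python
from datetime import datetime, timedelta, timezone


def resolve_time_bound(value, now):
    """Turn '96' (hours ago) or an ISO 8601 string into an aware datetime."""
    if value.isdigit():
        return now - timedelta(hours=int(value))
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        # Assumption: timestamps without an offset are treated as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def in_range(partition_time, since, until, now):
    """Check whether a partition's hour falls inside [since, until)."""
    lower = resolve_time_bound(since, now)
    upper = resolve_time_bound(until, now)
    return lower <= partition_time < upper
```

For example, in_range(partition_time, "2019-03-01T00:00:00", "2019-03-03T00:00:00", now) selects partitions from those two days regardless of how far in the past they are.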

Wrapper scripts for each refine job are installed on an-coord1001 in /usr/local/bin. These scripts load configuration out of properties files in /etc/refinery/refine. You can override any config by appending CLI flags to the wrapper scripts.

For example, if you have to re-run any failed partition refinements in the last 96 hours for the EventLogging analytics table MobileWikiAppShareAFact, you'll want to provide some overrides on the CLI to set the table_whitelist_regex, ignore_failure_flag and since properties:

sudo -u hdfs /usr/local/bin/refine_eventlogging_analytics --ignore_failure_flag=true --table_whitelist_regex=MobileWikiAppShareAFact --since=96 --verbose refine_eventlogging_analytics

Note that any config property that Refine takes can be overridden in this way. If you need to modify the Spark opts that are passed to the spark2-submit command, you'll have to do that manually, and can't use the wrapper script. You can however still use the same --config_file and Refine job overrides.

Things to look for:

- If the main job is running, the command above will not do anything; it will just (hopefully) warn you of that fact and exit. Wait for the other job to finish (/usr/bin/yarn application -status <applicationId> will report its current status), then re-run the refine command.

See data that might have failed

kafkacat -b kafka-jumbo1002.eqiad.wmnet:9092 -t codfw.mediawiki.revision-score


Here's a list of tips and things you can do to troubleshoot a Refine job that sent an alarm:

  • The alarm email contains the name of the job that failed (there are some jobs that wrap Refine, like EventLoggingSanitization). This can help you understand what failed.
  • The alarm email also contains a list of partitions (schema, year, month, day, hour) that have failed refinement. This can help you understand whether the problem affects just 1 schema, a subset of them, or the whole database. It also gives you the exact time of the problem.
  • Refine jobs execute periodically, and are able to re-refine and fix a failed partition automatically. If you are troubleshooting an alarm that happened hours or days ago, it might have been automatically fixed by Refine itself.
  • If you look for the job name in puppet (refine.pp or data_purge.pp), you'll find the source and destination HDFS directories and Hive tables.
  • Check whether the destination partitions are present (success) or missing (failed). You can do it from Hive by querying for the partitions in question, or from HDFS by executing hdfs dfs -ls.
  • The source data can be invalid too. Check it by querying from Hive or by executing `hdfs dfs -ls` on the source directories.
  • If the destination data is invalid, you can look at the yarn logs for the job. For that, you need the applicationId. You can find it by searching for the job name and datetime. Once you have the applicationId, see how to access logs in the 'logs' section above. The logs can be huge, so pipe them to less or try grepping them.
  • If you cannot find the applicationId, you can rerun the job manually to reproduce the error. Execute systemctl list-timers on an-coord1001.eqiad.wmnet and grep for the job name. Then execute systemctl status <corresponding service name>, and under ExecStart you'll see the script that runs that job. You can cat it and extract the re-run command. Note that you might have to add --since and --until parameters.