Analytics/Systems/Refine

Refine is a Spark job that 'refines' arbitrary datasets into Parquet-backed Hive tables. It passes over the incoming dataset, creates or alters a Hive table to match the dataset's schema, and then inserts the data into the Hive table.

Refine expects that table names and partitions can be regex matched out of incoming dataset paths.
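
As a rough illustration of this path matching, the sketch below applies a pattern of the same shape as the --input_path_regex shown later on this page to a dataset path and prints the captured table and partition values. The function name parse_refine_path and the sample path are made up for illustration, and [0-9]+ stands in for \d+ because sed's extended regex syntax does not support \d. Refine itself does this matching inside the Spark job, not in shell.

```shell
# Hypothetical helper: pull the table name and hourly partition values out of
# a dataset path, using the capture groups table,year,month,day,hour.
parse_refine_path() {
    local path="$1" parsed
    parsed="$(printf '%s\n' "$path" | sed -nE \
        's#.*eventlogging_(.+)/hourly/([0-9]+)/([0-9]+)/([0-9]+)/([0-9]+).*#table=\1 year=\2 month=\3 day=\4 hour=\5#p')"
    if [ -z "$parsed" ]; then
        # The path does not look like an hourly eventlogging dataset.
        echo "no match: $path" >&2
        return 1
    fi
    echo "$parsed"
}
```

Calling parse_refine_path with a path such as /wmf/data/raw/eventlogging/eventlogging_ChangesListHighlights/hourly/2019/06/26/21 prints the five captured values on one line; a path that does not match returns a non-zero status.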

Administration

There are various Refine jobs scheduled to run in the Analytics Cluster. These cron jobs run on the analytics coordinator host (currently an-coord1001) as the hdfs user and log their output to files in /var/log/refinery.

Logs

The jobs themselves run in YARN, so the local log files will not contain much useful information. Once one of these jobs has finished, you should be able to see its logs using the yarn CLI: sudo -u hdfs yarn logs -applicationId <application_id>
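
Because the aggregated logs can be long, one option is to save them to a file once and then filter for likely error lines. The following is only a sketch: the function name refine_log_errors and the search patterns are assumptions about what a failed run might log, not something Refine guarantees.

```shell
# Hypothetical helper: read a YARN log dump on stdin and print the lines that
# look like errors, each prefixed with its line number in the dump.
refine_log_errors() {
    # "|| true" keeps the exit status at 0 when nothing matches.
    grep -n -E 'ERROR|Exception|_REFINE_FAILED' || true
}

# Intended use on the coordinator host (not executed by this sketch):
#   sudo -u hdfs yarn logs -applicationId <application_id> > /tmp/refine.log
#   refine_log_errors < /tmp/refine.log | less
```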

Failed jobs

If data for a partition contains fields whose types conflict with an existing Hive table, the Refine job for that partition will likely fail. This causes the _REFINE_FAILED flag to be written and an email alert to be sent, and the entire partition is not refined. However, in some simple cases Refine is able to cast between types. The only known example of this is casting an integer to a float field. In other cases, Refine is able to choose the source table's field type for the conflicting field. When that happens, it is likely that every field of a record containing the conflicting field will be nullified.

Rerunning jobs

If a Refine job for a particular hour fails, it will write a _REFINE_FAILED flag file into the destination partition directory. To schedule a job to rerun, you may either remove this flag file, or run the Refine job with the --ignore_failure_flag=true option. The latter will cause the job to re-attempt refinement of any partition in its --since range that has a _REFINE_FAILED flag.

If you need to run a job that is farther in the past than the configured --since flag, you may use both the --since and --until flags with ISO 8601 date formats to specify a date range.
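
A small sketch of building such a date range follows. It assumes GNU date is available and that timestamps are in UTC; the helper name refine_range_flags is hypothetical, and the output format simply mirrors the ISO 8601 timestamps used in the examples on this page.

```shell
# Hypothetical helper: print --since/--until flags covering a number of hours
# (default 1) starting at a UTC timestamp. Requires GNU date.
refine_range_flags() {
    local start="$1" hours="${2:-1}"
    local fmt='+%Y-%m-%dT%H:%M:%S'
    local start_epoch end_epoch
    # Convert to epoch seconds first; this avoids GNU date reading a
    # relative offset such as "+1 hour" as a time zone.
    start_epoch="$(date -u -d "$start" +%s)" || return 1
    end_epoch=$(( start_epoch + hours * 3600 ))
    echo "--since=$(date -u -d "@${start_epoch}" "$fmt") --until=$(date -u -d "@${end_epoch}" "$fmt")"
}
```

For example, refine_range_flags 2019-06-26T21:00:00 yields a one hour window whose flags can be appended to a Refine invocation.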

If you need to rerun a Refine job that succeeded, you need to remove the _REFINED flag file (hdfs dfs -rm on the flag file would do it).
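
Before removing a flag, it can help to check which flag a partition actually carries. The sketch below classifies a partition from an hdfs dfs -ls style listing read on stdin. The helper name partition_flag_state and the three state labels are hypothetical, and treating _REFINE_FAILED as taking precedence when both flags are present is an assumption of this sketch, not documented Refine behaviour.

```shell
# Hypothetical helper: report which Refine flag file appears in a directory
# listing. Only the last whitespace-separated field (the path) of each line
# is inspected, so plain path lists work as well as `hdfs dfs -ls` output.
partition_flag_state() {
    awk '
        { n = split($NF, parts, "/"); name = parts[n] }
        name == "_REFINE_FAILED" { failed = 1 }
        name == "_REFINED"       { refined = 1 }
        END {
            if (failed)       print "failed"
            else if (refined) print "refined"
            else              print "unflagged"
        }'
}

# Intended use (not executed by this sketch):
#   hdfs dfs -ls <destination_partition_dir> | partition_flag_state
#   hdfs dfs -rm <destination_partition_dir>/_REFINED
```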

Wrapper scripts for each refine job are installed on an-coord1001 in /usr/local/bin. These scripts load configuration out of properties files in /etc/refinery/refine. You can override any config by appending CLI flags to the wrapper scripts.

For example, if you have to re-run any failed partition refinements in the last 96 hours for the EventLogging analytics table MobileWikiAppShareAFact, you'll want to provide some overrides on the CLI to set the table_whitelist_regex, ignore_failure_flag and since properties:

sudo -u analytics /usr/local/bin/refine_eventlogging_analytics --ignore_failure_flag=true --table_whitelist_regex=MobileWikiAppShareAFact --since=96 --verbose refine_eventlogging_analytics
sudo -u analytics /usr/local/bin/refine_eventlogging_analytics --ignore_failure_flag=true --since=2019-06-26T21:00:00 --until=2019-06-26T22:00:00 --table_whitelist_regex="FeaturePolicyViolation|ResourceTiming|UploadWizardExceptionFlowEvent|MobileWikiAppOnThisDay|MultimediaViewerAttribution|MobileWikiAppSavedPages|WikidataCompletionSearchClicks|ClickTiming|QuickSurveysResponses|EchoMail|MobileWikiAppFeed|GuidedTourGuiderImpression" --verbose refine_eventlogging_analytics
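
When many tables are involved, as in the second command, the whitelist regex can be assembled from a list of names instead of typed by hand. This is a minimal sketch with a hypothetical helper name, table_whitelist_flag; it emits the value in double quotes because an unquoted | would be read by the shell as a pipe.

```shell
# Hypothetical helper: join table names with "|" and print them as a quoted
# --table_whitelist_regex flag.
table_whitelist_flag() {
    # Inside the function, "$*" joins the arguments with the first
    # character of IFS.
    local IFS='|'
    printf -- '--table_whitelist_regex="%s"\n' "$*"
}
```

For instance, table_whitelist_flag EchoMail ClickTiming MobileWikiAppFeed prints a flag that can be pasted into the wrapper script invocation.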

Note that any config property that Refine takes can be overridden in this way. If you need to modify the Spark opts that are passed to the spark2-submit command, you'll have to do that manually, and can't use the wrapper script. You can, however, still use the same --config_file and Refine job overrides.

Things to look for:

- If the main job is already running, the command above will not do anything; it will (hopefully) warn you of that fact and exit. Wait for the other job to finish (/usr/bin/yarn application -status <applicationId> will return its ongoing status), then re-run the refine command.
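
The waiting step can be sketched as a small polling loop. The helper names, the YARN_CMD override and the 60 second default interval are assumptions for illustration, as is the exact layout of the "State : ..." line in the yarn application -status output; check the real output before relying on it.

```shell
# Hypothetical helper: extract the value of the "State" line from
# `yarn application -status` output read on stdin ("Final-State" is skipped).
yarn_app_state() {
    awk -F' : ' '$1 ~ /^[[:space:]]*State$/ { print $2; exit }'
}

# Hypothetical helper: poll until the application reaches a terminal state,
# then print that state. Returns 1 if no state could be read at all.
wait_for_yarn_app() {
    local app_id="$1" interval="${2:-60}" state
    while true; do
        state="$(${YARN_CMD:-/usr/bin/yarn} application -status "$app_id" 2>/dev/null | yarn_app_state)"
        case "$state" in
            FINISHED|FAILED|KILLED) echo "$state"; return 0 ;;
            '') echo "could not read state for $app_id" >&2; return 1 ;;
            *) sleep "$interval" ;;
        esac
    done
}
```

Once wait_for_yarn_app <applicationId> returns, the refine command can be run again.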

Running Refine in local or yarn client mode

Because of a limitation in Spark SQL, Refine uses a direct Hive connection to run Hive DDL statements to evolve tables. To do this, the job needs to be submitted with relevant job files. The invocation for running Refine is different in local or yarn client mode than in yarn cluster mode. Running in local or yarn client mode is useful for testing and debugging Refine using your own Hive database, or for backfilling limited amounts of data. Here's an example running in yarn client mode using otto's database. Change the appropriate parameters accordingly if you run this.

/usr/bin/spark2-submit \
--name otto_test_refine_eventlogging_0 \
--class org.wikimedia.analytics.refinery.job.refine.Refine \
--master yarn \
--deploy-mode client \
--conf spark.driver.extraClassPath=/usr/lib/hadoop-mapreduce/hadoop-mapreduce-client-common.jar:/srv/deployment/analytics/refinery/artifacts/hive-jdbc-1.1.0-cdh5.10.0.jar:/srv/deployment/analytics/refinery/artifacts/hive-service-1.1.0-cdh5.10.0.jar \
--driver-java-options='-Dhttp.proxyHost=webproxy.eqiad.wmnet -Dhttp.proxyPort=8080 -Dhttps.proxyHost=webproxy.eqiad.wmnet -Dhttps.proxyPort=8080' \
/srv/deployment/analytics/refinery/artifacts/refinery-job.jar \
--database=otto_json_refine_test \
--hive_server_url=an-coord1001.eqiad.wmnet:10000 \
--input_path=/wmf/data/raw/eventlogging \
--input_path_regex='eventlogging_(.+)/hourly/(\d+)/(\d+)/(\d+)/(\d+)' \
--input_path_regex_capture_groups='table,year,month,day,hour' \
--output_path=/user/otto/external/eventlogging13 \
--table_whitelist_regex='^ChangesListHighlights$' \
--transform_functions=org.wikimedia.analytics.refinery.job.refine.deduplicate_eventlogging,org.wikimedia.analytics.refinery.job.refine.geocode_ip \
--schema_base_uri=eventlogging \
--since=12 --until=10
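
A variant of the same invocation uses a locally built jar and a table blacklist instead of a whitelist. These lines replace everything from the jar path onward in the command above:

/home/otto/refinery-source/refinery-job/target/refinery-job-0.0.89-SNAPSHOT.jar \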
--database=otto_json_refine_test \
--hive_server_url=an-coord1001.eqiad.wmnet:10000 \
--input_path=/wmf/data/raw/eventlogging \
--input_path_regex='eventlogging_(.+)/hourly/(\d+)/(\d+)/(\d+)/(\d+)' \
--input_path_regex_capture_groups='table,year,month,day,hour' \
--output_path=/user/otto/external/eventlogging13 \
--table_blacklist_regex='^Edit|ChangesListHighlights|InputDeviceDynamics$' \
--transform_functions=org.wikimedia.analytics.refinery.job.refine.deduplicate_eventlogging,org.wikimedia.analytics.refinery.job.refine.geocode_ip \
--schema_base_uri=eventlogging \
--since=12 --until=11

See data that might have failed

kafkacat -b kafka-jumbo1002.eqiad.wmnet:9092 -t codfw.mediawiki.revision-score

Troubleshooting

Here's a list of tips and things you can do to troubleshoot a Refine job that sent an alarm:

  • The alarm email contains the name of the job that failed (there are some jobs that wrap Refine, like EventLoggingSanitization). This can help you understand what failed.
  • The alarm email also contains a list of partitions (schema, year, month, day, hour) that have failed refinement. This can help you understand whether the problem affects just one schema, a subset of them, or the whole database. It also gives you the exact time of the problem.
  • Refine jobs execute periodically, and are able to re-refine and fix a failed partition automatically. If you are troubleshooting an alarm that happened hours or days ago, it might have been automatically fixed by Refine itself.
  • If you look for the job name in puppet (refine.pp or data_purge.pp), you'll find the source and destination HDFS directories and Hive tables.
  • Check whether the destination partitions are present (success) or missing (failed). You can do it from Hive by querying for the partitions in question; or from HDFS by executing hdfs dfs -ls.
  • The source data can be invalid too. Check it by querying from Hive or by executing hdfs dfs -ls on the source directories.
  • If the destination data is invalid, you can look at the YARN logs for the job. For that, you need the applicationId. You can find it in yarn.wikimedia.org by searching for the job name and datetime. Once you have the applicationId, see how to access logs in the logs section above. The logs can be huge, so pipe them to less or try grepping them.
  • If you cannot find the applicationId, you can rerun the job manually to reproduce the error. Execute systemctl list-timers on an-coord1001.eqiad.wmnet and grep for the job name. Then execute systemctl status <corresponding service name>; under ExecStart you'll see the script that runs that job. You can cat it and extract the re-run command. Note that you might have to add --since and --until parameters.
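
For the last tip, the script path can also be pulled out of the unit definition rather than read off the status output. This is a sketch only: unit_exec_start is a hypothetical helper, and it assumes the service's unit file has a plain ExecStart= line, as printed by systemctl cat.

```shell
# Hypothetical helper: print the command from each ExecStart= line of a
# systemd unit file read on stdin.
unit_exec_start() {
    sed -n 's/^ExecStart=//p'
}

# Intended use on the coordinator host (not executed by this sketch):
#   systemctl list-timers | grep refine
#   systemctl cat <corresponding service name> | unit_exec_start
```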