Analytics/Cluster/Gobblin

From Wikitech

Apache Gobblin is Hadoop ingestion software used at WMF primarily to import data from Kafka into HDFS.

Until 2021, we used Camus for this purpose. T238400 has some information on how Gobblin was chosen as its replacement.

Gobblin jobs

Gobblin jobs are declared in Puppet, and their configuration (each job's .pull file) is defined in refinery.

gobblin-wmf

The Data Engineering team maintains a gobblin-wmf repository, which contains our Gobblin extensions and from which we build the Gobblin fat jar used for deployment. The gobblin-wmf-core module mostly contains code to ingest Kafka events into HDFS and to send metrics to Prometheus.

Releasing new Gobblin versions

Releasing is handled by a Jenkins job. Log into Jenkins and run the analytics-gobblin-wmf-maven-release-docker job. This creates a new git version tag, builds the gobblin-wmf artifacts and uploads them to Archiva.

To deploy, manually add the gobblin-wmf-core-jar-with-dependencies.jar file to analytics/refinery/artifacts, deploy refinery following Analytics/Systems/Cluster/Deploy/Refinery, and then update the Gobblin jar specified in Puppet for both the analytics-hadoop and analytics-test-hadoop clusters.

Monitoring

We push Gobblin metrics to Prometheus via the Prometheus Pushgateway, using a custom PrometheusEventReporter in the gobblin-wmf-core module.
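To illustrate what "pushing to the Pushgateway" means at the protocol level, here is a minimal shell sketch using curl. It does not reproduce the PrometheusEventReporter; it only shows the Pushgateway HTTP API that any pusher relies on: metrics are sent in the Prometheus text exposition format to /metrics/job/<job>. The gateway URL, job name and metric name are hypothetical placeholders, not the values used in production.

```shell
#!/usr/bin/env bash
# Sketch of the Pushgateway push protocol (not the gobblin-wmf reporter itself).
# PUSHGATEWAY_URL and all job/metric names below are hypothetical examples.
PUSHGATEWAY_URL=${PUSHGATEWAY_URL:-http://pushgateway.example.org:9091}

# build_payload METRIC_NAME VALUE
# Print a single gauge in the Prometheus text exposition format.
build_payload() {
  local name=$1 value=$2
  printf '# TYPE %s gauge\n%s %s\n' "$name" "$name" "$value"
}

# push_metric JOB METRIC_NAME VALUE
# PUT replaces every metric stored under this job's grouping key;
# POST would replace only metrics with the same names.
push_metric() {
  local job=$1 name=$2 value=$3
  build_payload "$name" "$value" |
    curl --silent --show-error --fail -X PUT --data-binary @- \
      "${PUSHGATEWAY_URL}/metrics/job/${job}"
}
```

For example, `push_metric example_gobblin_job example_last_success_timestamp_seconds "$(date +%s)"` would store one gauge under the hypothetical job example_gobblin_job. Prometheus then scrapes the gateway, which is why short-lived batch processes such as Gobblin runs push instead of being scraped directly.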

A Gobblin Grafana dashboard exists to visualize some of these metrics.

Alerts are configured via Alertmanager. As of 2022-03, there are two alerts per Gobblin job:

  • If the last successful run timestamp of a Gobblin job is older than 2 hours, a critical alert is fired to the Data-Engineering team.
  • If a Gobblin job run reports any Kafka topic partition for which the number of actually extracted records differs from the number of expected extracted records, a warning alert is fired to the Data-Engineering team.
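The two alert conditions above can be restated as plain shell checks. This is only a sketch of their logic: the real alerts are Alertmanager rules evaluated on the Prometheus metrics, and the function names and the "topic partition expected actual" input format are hypothetical, chosen for illustration.

```shell
#!/usr/bin/env bash
# Sketch of the two per-job alert conditions (hypothetical helper names).
MAX_AGE_SECONDS=$((2 * 60 * 60))  # 2 hours

# check_staleness LAST_SUCCESS_EPOCH [NOW_EPOCH]
# Prints "critical" if the last successful run is older than 2 hours, else "ok".
check_staleness() {
  local last_success=$1
  local now=${2:-$(date +%s)}
  if (( now - last_success > MAX_AGE_SECONDS )); then
    echo critical
  else
    echo ok
  fi
}

# check_partition_counts < lines of "topic partition expected actual"
# Prints "warning" if any partition's actual count differs from the expected
# count (details go to stderr), else "ok".
check_partition_counts() {
  local topic partition expected actual status=ok
  while read -r topic partition expected actual; do
    if (( expected != actual )); then
      echo "mismatch ${topic}/${partition}: expected=${expected} actual=${actual}" >&2
      status=warning
    fi
  done
  echo "$status"
}
```

For instance, `printf 'example_topic 0 10 10\nexample_topic 1 5 0\n' | check_partition_counts` reports a mismatch on partition 1 and prints `warning`.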

Reading State Store

Each Gobblin job writes state information to a directory configured in its .pull job file. For example, the event_default job writes state to /wmf/gobblin/state_store/event_default. To read it, run:

java -cp /srv/deployment/analytics/refinery/artifacts/gobblin-wmf-core-jar-with-dependencies.jar:$(hadoop classpath) org.apache.gobblin.runtime.cli.GobblinCli job-state-store --storeurl file:///mnt/hdfs/wmf/gobblin/state_store -n event_default -r

Technical findings

Pulling from Kafka

During our first month of using Gobblin, we regularly saw inconsistencies in the volume of data pulled from Kafka. More precisely, some pull tasks (executed in map containers) regularly pulled no data at all even though they were expected to. This did not cause Gobblin job errors, only delays in pulling some data. Unfortunately, because of how we trigger computation jobs once data has been pulled, the problem led to data-loss alerts whenever it occurred just after the hour. We investigated and found why some tasks were failing to pull data: a specific setting makes Gobblin time out (without any error, just returning no data) when fetching its first batch of data from Kafka. Details can be found in T290723: https://phabricator.wikimedia.org/T290723.

Gobblin metric details

Gobblin metrics are not easy to interpret. In particular, they carry a context (as a tag) indicating at which level of the Gobblin architecture the metric was generated. In our usage of Gobblin, we found the following:

  • The Gobblin CLI process and every Gobblin map task have different metric contexts (the map tasks share the same job-id tag).
  • Metrics are generated in many contexts, which makes it hard to decide which ones to use. For our use case, contexts of the form metricContextName=gobblin.metrics.job_JOBNAME_JOBTS seem to be the ones summarizing all the values of interest.
  • Metrics are generated with intermediate values during task execution, and the last metric event generated is flagged with finalReport=true.
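Combining the last two findings, a consumer interested only in job-level totals would keep metric events whose context matches gobblin.metrics.job_JOBNAME_JOBTS and whose finalReport tag is true. The sketch below applies that filter with grep. The one-event-per-line "key=value" input format and the function name are hypothetical (Gobblin emits structured metric reports, not text lines), and it assumes JOBTS is numeric and that the job name contains no regex metacharacters such as dots.

```shell
#!/usr/bin/env bash
# Sketch: keep only final, job-level metric events for one Gobblin job.
# Hypothetical input: one event per line, space-separated key=value tags.

# final_job_metrics JOB_NAME < events
final_job_metrics() {
  local job=${1:?usage: final_job_metrics JOB_NAME}
  # Job-level context: gobblin.metrics.job_<JOB_NAME>_<numeric timestamp>.
  grep -E "(^| )metricContextName=gobblin\.metrics\.job_${job}_[0-9]+( |$)" |
    # Drop intermediate values; only the last event has finalReport=true.
    grep -E '(^| )finalReport=true( |$)'
}
```

Filtering on both tags avoids double counting: task-level contexts repeat the same values at a finer grain, and non-final events are superseded by the final report.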