Analytics/Hive to Druid Ingestion Pipeline

HiveToDruid is a Spark/Scala job that is part of the WMF's analytics data pipeline. It makes it easy to load Hive tables into Druid. You can use it programmatically, i.e. from within another Spark/Scala job, or from the command line. If you want HiveToDruid to periodically load the most recent data of a given Hive table into Druid, so that you get a periodically updated datasource in Turnilo, you just have to configure a job in the puppet repository.

How to configure

You need to specify which fields (or subfields) of your Hive table you'd like to load into Druid, and whether you want to load each of them as a Druid dimension or as a metric. Optionally, you can load additional fields by defining them as transformations of existing fields, and you can bucket time measure fields (which are not compatible with Druid metrics) into usable dimensions.

Dimensions

A Druid dimension is a categorical field that can be used to slice and dice the Druid datasource. Examples of valid Druid dimensions are: language, country, eventAction, userIsAnonymous, etc. Druid lets you load any field type as a dimension and transforms it into a string. But fields with very high cardinality, like userId, userAgentString or time measures, should not be used as dimensions, because Druid does not handle them well performance-wise. So always make sure your dimensions have a reasonably low cardinality (<1000).
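
If you're not sure whether a field's cardinality is low enough, a quick check from a Spark shell can help. This is only a sketch: the database, table and field names are placeholders, and it assumes the table is partitioned by year/month/day columns:

// Rough cardinality check for a candidate dimension (placeholder names).
spark.sql("""
    SELECT COUNT(DISTINCT your_field) AS cardinality
    FROM your_database.your_table
    WHERE year = 2019 AND month = 2 AND day = 10
""").show()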

Metrics

A Druid metric is a numeric (integer or decimal) field that Druid can aggregate and whose value Turnilo can show in a chart. Examples of valid Druid metrics are: numOfErrors, numOfClicks, etc. HiveToDruid automatically adds a metric called 'count' to the datasource, which represents the number of rows of the original Hive table that fall within the selected slice/dice. If the Hive table you're loading belongs to EventLogging (the 'event' database), then 'count' represents the number of events. If count is all you need, you don't need to specify any metrics.
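
To give an intuition of what happens to metrics, Druid's roll-up at ingestion time is conceptually similar to the following Spark SQL aggregation. This is a sketch only: all names are placeholders (dt stands for the timestamp column), and declared metrics are typically aggregated as sums:

// Conceptual sketch of Druid roll-up at minute granularity (placeholder names).
spark.sql("""
    SELECT
        date_trunc('minute', dt) AS `__time`,  -- query granularity
        dimension1,
        dimension2,
        COUNT(*) AS `count`,                   -- added automatically by HiveToDruid
        SUM(metric1) AS metric1                -- a metric you declared
    FROM your_database.your_table
    GROUP BY date_trunc('minute', dt), dimension1, dimension2
""")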

Transforms

A Druid transform is a formula that defines an additional Druid dimension or metric to be loaded. The expressions you can use are defined in the Druid math expression docs: http://druid.io/docs/latest/misc/math-expr.html. Examples of valid expressions are: numOfClicks / count (metric) and concat(category, subcategory) (dimension). Each transform must also include the name of the resulting field in the target datasource.
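
For reference, this is roughly how a transform (a name paired with an expression) looks in Druid's own ingestion spec format, as described in the Druid docs. The field names here are illustrative, and the exact way to pass transforms to HiveToDruid is best checked in HiveToDruid.scala:

"transformSpec": {
    "transforms": [
        { "type": "expression", "name": "fullCategory", "expression": "concat(category, subcategory)" }
    ]
}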

Time measure buckets

If your Hive table has a field that contains millisecond measurements, like for instance timeToFirstByte, you unfortunately can not load that field as a Druid dimension (its cardinality is too high), nor as a Druid metric (aggregating time measurements does not make sense; we should rather build percentiles on top of them, but percentile support is still experimental in Druid). What you can do is specify such fields as time measures when running HiveToDruid, and they will be transformed into valid dimensions by bucketing their values into one of these buckets: '0ms-50ms', '50ms-250ms', '250ms-1sec', '1sec-4sec', '4sec-15sec', '15sec-1min', etc.
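
For illustration, the bucketing is conceptually similar to the following Scala sketch. This is not the actual HiveToDruid implementation, and the buckets above one minute are elided in the list above ("etc."), so the last label is illustrative:

// Sketch only: map a millisecond time measure to a low-cardinality bucket label
// that can be used as a Druid dimension (not the actual HiveToDruid implementation).
def bucketTimeMeasure(ms: Long): String = ms match {
    case x if x < 50 => "0ms-50ms"
    case x if x < 250 => "50ms-250ms"
    case x if x < 1000 => "250ms-1sec"
    case x if x < 4000 => "1sec-4sec"
    case x if x < 15000 => "4sec-15sec"
    case x if x < 60000 => "15sec-1min"
    case _ => "1min+" // illustrative label for buckets above one minute
}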

Struct, Map or Array fields

If your Hive table has fields of type Struct, HiveToDruid lets you specify subfields within them easily: just use <structField>.<subField> notation to refer to them. Fields of type Map are not supported yet (sorry!), but we are planning on adding that (task https://phabricator.wikimedia.org/T208589). Fields of type Array are supported by Druid, and Turnilo does a good job of integrating them into its UI. However, slicing and dicing on Array fields in Turnilo works a bit differently: if you check a value within an array-type filter or split, the chart will show all records that include that value within their array.
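
For example, in a properties file (see the full example further below), struct subfields appear as dot-separated paths:

dimensions = event.action,event.subtest,useragent.browser_family,useragent.is_bot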


How to run programmatically

HiveToDruid is actually a wrapper module that does parameter parsing and formatting to make it easier to load Hive tables to Druid from the command line or Puppet. However, if you are already within a Scala job and have the parameters ready and formatted, you can directly call the DataFrameToDruid module, which does the actual Druid loading. For more details, see DataFrameToDruid.scala in GitHub. An example of a DataFrameToDruid call:

import org.joda.time.DateTime // assuming Joda-Time, which matches the DateTime constructor used below

val dftd = new DataFrameToDruid(
    spark = spark,
    dataSource = "your_desired_druid_datasource_name", // snake_case, please!
    inputDf = dataFrame, // the input DataFrame should already be sliced to the given time intervals!
    dimensions = Seq("field1", "field2", "field3"),
    metrics = Seq("field4", "field5"),
    intervals = Seq((new DateTime(2019, 1, 1, 0, 0), new DateTime(2019, 1, 2, 0, 0))), // (start, end) pairs
    segmentGranularity = "day",
    queryGranularity = "minute",
    numShards = 2,
    reduceMemory = "4096"
).start().await() // start the loading and wait for it to finish
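
Note that inputDf is expected to contain only the data for the intervals you pass. A minimal sketch of that filtering, assuming the Hive table is partitioned by year/month/day columns (table and column names are placeholders):

// Illustrative only: restrict the input DataFrame to the day being loaded,
// assuming year/month/day partition columns (placeholder names).
val dataFrame = spark.table("your_database.your_table")
    .where("year = 2019 AND month = 1 AND day = 1")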


How to run from the command line

Please run HiveToDruid jobs from an-coord1001.eqiad.wmnet. Here's an example of a HiveToDruid call; for the full reference, see HiveToDruid.scala in GitHub. You need sudo permissions, because the job writes data to a temporary HDFS directory.

sudo -u hdfs spark2-submit \
--class org.wikimedia.analytics.refinery.job.HiveToDruid \
--master yarn \
--deploy-mode cluster \
--conf spark.driver.extraClassPath=/usr/lib/hive/lib/hive-jdbc.jar:/usr/lib/hadoop-mapreduce/hadoop-mapreduce-client-common.jar:/usr/lib/hive/lib/hive-service.jar \
--files /etc/hive/conf/hive-site.xml,/home/nuria/loading_test_data/properties \
--conf spark.dynamicAllocation.maxExecutors=64 \
--driver-memory 8G \
/srv/deployment/analytics/refinery/artifacts/org/wikimedia/analytics/refinery/refinery-job-0.0.83.jar \
--config_file properties \
--since 2019-02-10T00:00:00 --until 2019-02-11T00:00:00

The properties file looks like this:

database = event
table = testsearchsatisfaction2
dimensions = event.action,event.scroll,event.position,event.subtest,event.source,event.autocompletetype,useragent.browser_family,useragent.browser_major,useragent.browser_minor,useragent.device_family,useragent.is_bot
query_granularity = hour
time_measures = event.mstodisplayresults,event.checkin
metrics = event.hitsReturned
segment_granularity = day
num_shards = 1
reduce_memory = 8192


You can also take a look at how jobs are configured on the box via systemd timers; in the unit file, the ExecStart line points to the wrapper script that contains the actual command:

nuria@an-coord1001:~/tips$ systemctl cat eventlogging_to_druid_readingdepth_hourly
# /lib/systemd/system/eventlogging_to_druid_readingdepth_hourly.service
[Unit]
Description=Spark job for eventlogging_to_druid_readingdepth_hourly
[Service]
User=hdfs
SyslogIdentifier=eventlogging_to_druid_readingdepth_hourly
ExecStart=/usr/local/bin/eventlogging_to_druid_readingdepth_hourly

The properties files are at:

nuria@an-coord1001:/etc/refinery/eventlogging_to_druid$

If you need to delete data and start again

* Use the Druid coordinator console to disable the datasource.
* Delete the datasource from deep storage by sending an HTTP request to the coordinator (you can verify the result with the command shown after this list): curl -L -X DELETE http://localhost:8081/druid/coordinator/v1/datasources/<datasource_name>/intervals/2016-01-01T00:00:00.000_2017-01-01T00:00:00.000
* Restart Turnilo to refresh.
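
To double-check what the coordinator currently serves (for instance, to verify the datasource is gone), you can list its datasources, assuming the same host and port as in the deletion call above:

# List the datasources known to the Druid coordinator.
curl -L http://localhost:8081/druid/coordinator/v1/datasources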

How to run from Puppet

If you want to have a Druid datasource populated periodically from a Hive table, you can add a HiveToDruid job in Puppet using either a refine_job or an eventlogging_to_druid_job. Go to druid_load.pp and add the corresponding call. If the Hive table belongs to the EventLogging pipeline (i.e. to either the 'event' or the 'event_sanitized' database), you can use eventlogging_to_druid_job, like this:

# Load event.ReadingDepth
profile::analytics::refinery::job::eventlogging_to_druid_job { 'readingdepth':
    job_config => {
        dimensions    => 'event.action,event.namespaceId,event.skin,revision,wiki,event.page_issues_b_sample',
        time_measures => 'event.domInteractiveTime,event.firstPaintTime,event.totalLength,event.visibleLength',
    },
}

Once the Puppet patch is merged, you'll see data automatically appear in Druid and Turnilo.

Expected results

Once the data is loaded into Druid, you should see your new datasource in Turnilo. You'll be able to use any field specified as a dimension to slice and dice the chart by dragging and dropping. The fields you specified as metrics will show up in the bottom-left panel, and you can select them to add them to the chart. An example of a datasource in Turnilo loaded using HiveToDruid is the NavigationTiming datasource.

Limitations/gotchas

  • When a HiveToDruid Puppet job is merged, it starts loading data from that moment on. If you want to load historical data, run a command line job on an-coord1001.eqiad.wmnet (like the spark2-submit example above, with --since and --until set to the range you need) to backfill. Take the segment_granularity and num_shards parameters into account to tune the size of the Druid segments; around 300MB per segment is usually a good value.
  • If the datasource you loaded into Druid is privacy sensitive, deletion rules should be set up for it, so that its data gets deleted after 3 months. Deletion rules are also a good idea if the loaded data is very big; otherwise the Druid cluster will lose performance over time. To add deletion rules to a Druid datasource, use the Druid Coordinator console, click on the desired datasource and click 'edit rules'. See: Druid docs.

See also