Data Engineering/Systems/Druid

From Wikitech

Druid is an analytics data store that WMF began using in 2016, first for the Data Lake. It is composed of many services, each of which is fully redundant.

Why Druid. Value Proposition

When looking for a columnar analytics datastore, we wanted a product that fit our use cases, could scale, and could later support real-time ingestion of data. We considered several alternatives: Druid, Cassandra, Elasticsearch and, more recently, ClickHouse. All of these are open source and served our use cases to different degrees.

Druid offered the best value proposition:

  • It is designed for analytics, so it can build cubes with many different dimensions without having them precomputed (as Cassandra requires)
  • It has simple loading specs and supports real-time ingestion
  • It provides frontend caching that repeated queries benefit from (ClickHouse is designed as a fast datastore for analytics, but it doesn't have a frontend cache)
  • Druid also shipped with a convenient open source UI for basic data exploration: Pivot (since replaced by Turnilo)

Access to Druid data via Turnilo

See Analytics/Systems/Turnilo-Pivot

Access to Druid data via Superset

See Analytics/Systems/Superset

Access via command line

From the stat machines you can query Druid via curl. Write a valid Druid query, based on this example, into a file, say query.druid.json. Then POST it to Druid like this:

curl -L -X POST "http://an-druid1001.eqiad.wmnet:8082/druid/v2/?pretty" -H "content-type: application/json" -d @query.druid.json

Ingesting EventLogging data

See Analytics/Systems/EventLogging/Schema Guidelines

Druid Administration

Send commands to Druid

This is an example of how to query druid once you have ssh-ed into one of the hosts:

curl -X POST  'http://localhost:8082/druid/v2/?pretty'  -H 'Content-Type:application/json' -H 'Accept:application/json' -d '{
     "queryType": "timeseries",
     "dataSource": "wmf_netflow",
     "intervals": "2019-09-05T19:11Z/2019-09-06T19:11Z",
     "granularity": "all",
     "aggregations": [
       {
         "name": "__VALUE__",
         "type": "longSum",
         "fieldName": "bytes"
       }
     ]
   }'

Naming convention

For homogeneity across systems, underscores _ should be used in datasource names and field names instead of hyphens -.
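
The convention above can be checked mechanically before a datasource is created. The following is a minimal sketch; the function names is_druid_name_ok and suggest_druid_name are hypothetical helpers, not part of Druid or of the refinery tooling.

```shell
# Hypothetical helper: succeeds only if the name contains no hyphen.
is_druid_name_ok() {
  case "$1" in
    *-*) return 1 ;;
    *)   return 0 ;;
  esac
}

# Hypothetical helper: print the name with every hyphen replaced by an underscore.
suggest_druid_name() {
  printf '%s\n' "$1" | tr '-' '_'
}
```

For example, `is_druid_name_ok "tiles-poc" || suggest_druid_name "tiles-poc"` would print tiles_poc.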

Restart an Indexing job

sudo -u hdfs oozie job \
      -Duser=$USER \
      -Dstart_time=2017-04-01T00:00Z \
      -Dstop_time=2017-06-01T00:00Z \
      -Dqueue_name=production \
      -Drefinery_directory=hdfs://analytics-hadoop$(hdfs dfs -ls -d /wmf/refinery/2017* | tail -n 1 | awk '{print $NF}') \
      -oozie $OOZIE_URL -run -config /srv/deployment/analytics/refinery/oozie/unique_devices/per_project_family/druid/daily/coordinator.properties

Delete segments

To delete segments in Druid, the dataset needs to be disabled first. Disabling a dataset can be done through the coordinator UI: in the upper right corner there is a link to the "old coordinator UI", and from there a link to delete segments for a given time interval. Intervals have to be specified in this format: "2019-09-05T19:11Z/2019-09-06T19:11Z".

Delete segments from deep storage

Preparation for deletion

For segments to be deleted from deep storage, they must NOT be in use on the Druid historical nodes. There are two ways this can be done:

  • A rule is defined for the datasource so that segments are automatically dropped from historical-nodes after a certain duration (this is what we do for webrequest, for instance). The "rule" is visible using druid admin UI. Notice that the datasource is not disabled in that case.
  • The datasource is disabled in the coordinator, meaning that after the deep-storage segments are deleted, the entire datasource will be lost. To disable a datasource in the coordinator (reversible: the data is still present in deep storage and can be reloaded easily):
 curl -L -X DELETE http://localhost:8081/druid/coordinator/v1/datasources/DATASOURCE_NAME

If you don't have access to the host, use any of the druid hostnames instead of localhost.

Actual deletion of deep storage segments

After either the rule has been applied or the datasource has been disabled, you can hard-delete from deep storage the segments that are not loaded on historical nodes. Please be careful, this step is irreversible. There are two documented ways of deleting segments at the HDFS level. The first one involves manually sending a kill task to the overlord for every interval:

curl -L -X 'POST' -H 'Content-Type:application/json' -d "{ \"type\":\"kill\", \"id\":\"kill_task-tiles-poc-`date --iso-8601=seconds`\",\"dataSource\":\"DATASOURCE_NAME\", \"interval\":\"2016-01-01/2017-10-01\" }" localhost:8090/druid/indexer/v1/task
Don't delete data on HDFS manually - it would make it very complicated to clean up druid afterward.
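
Since one kill task is needed per interval, the request can be wrapped in a small loop. This is a sketch only: the function names, the kill_task id prefix and the OVERLORD_URL variable are assumptions made for illustration, while the endpoint and the JSON fields are the ones used in the command above.

```shell
# Print the JSON body of a kill task.
# $1: datasource name, $2: interval in START/END form
kill_task_payload() {
  ts=$(date -u +%Y-%m-%dT%H:%M:%SZ)   # only used to make the task id unique
  printf '{"type":"kill","id":"kill_task-%s-%s","dataSource":"%s","interval":"%s"}\n' \
    "$1" "$ts" "$1" "$2"
}

# Send one kill task per interval to the overlord. IRREVERSIBLE, see above.
# $1: datasource name, remaining arguments: intervals
# OVERLORD_URL is a hypothetical variable, defaulting to the local overlord.
submit_kill_tasks() {
  datasource="$1"
  shift
  for interval in "$@"; do
    kill_task_payload "$datasource" "$interval" |
      curl -sS -L -X POST -H 'Content-Type: application/json' \
        -d @- "${OVERLORD_URL:-http://localhost:8090}/druid/indexer/v1/task"
    echo
  done
}
```

For example, `submit_kill_tasks DATASOURCE_NAME 2016-01-01/2016-07-01 2016-07-01/2017-01-01` would send two kill tasks. Printing the payload first with kill_task_payload is a cheap way to review what will be sent.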

Overlords Administration UI

Don't use this UI anymore: it still works, but since Druid 0.19 it is incomplete. Use the coordinator's UI instead (even to check indexation jobs).

Only one overlord is the active leader at any given moment. The fastest way to find it is to establish an SSH tunnel like the following to a random Druid node of the target cluster, and then check the UI in a browser (using localhost as described below). If you picked the wrong overlord, the UI will not show up and you'll get a redirect to the right one.

ssh -N an-druid100X.eqiad.wmnet -L 8090:an-druid100X.eqiad.wmnet:8090

http://localhost:8090/console.html

Deletion control

Check that the dataset does not appear in the deep storage directory on HDFS:

hdfs dfs -ls /user/druid/deep-storage/<dataset>

Keep in mind that data can be in deep storage without showing up in Druid, as "rules" in the Druid admin UI might have disabled the data entirely.

Coordinators Administration UI

Process to connect (explanation below):

ssh -N an-druid1001.eqiad.wmnet -L 8081:localhost:8081

Check http://localhost:8081/. If it works right away, great. Otherwise, it will redirect you to an-druidXXXX.eqiad.wmnet:8081. Use that host to re-tunnel:

ssh -N an-druidXXXX.eqiad.wmnet -L 8081:localhost:8081

This is because only one coordinator is the active leader at any given moment. Guessing like this is probably the easiest way to find the active leader, but you can also do it by ssh-ing into the cluster.
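
When you are already on a host inside the cluster, the leader can also be asked for directly instead of guessed. The sketch below assumes that the Druid version in use exposes the leader endpoints of the coordinator and overlord APIs (/druid/coordinator/v1/leader and /druid/indexer/v1/leader); the function names are hypothetical.

```shell
# Print the "current leader" endpoint of a Druid node.
# $1: service, "coordinator" (port 8081) or "overlord" (port 8090)
# $2: host to ask, defaults to localhost
leader_endpoint() {
  case "$1" in
    coordinator) printf 'http://%s:8081/druid/coordinator/v1/leader\n' "${2:-localhost}" ;;
    overlord)    printf 'http://%s:8090/druid/indexer/v1/leader\n' "${2:-localhost}" ;;
    *) echo "unknown service: $1" >&2; return 1 ;;
  esac
}

# Ask the node which host is currently the leader.
# Must be run from a machine that can reach the Druid hosts.
find_leader() {
  url=$(leader_endpoint "$@") || return 1
  curl -sS "$url"
  echo
}
```

For example, `find_leader coordinator` run on any Druid node should print the URL of the active coordinator, which you can then use as the target of the ssh tunnel.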

Indexing Logs

Located at "/srv/druid/indexing-logs"

Safe restart of MiddleManagers when running Real time Indexing jobs

In order to avoid confusion for the Overlord, it is nice to drain jobs from a MiddleManager before restarting via the following command:

# Generic case
curl -X POST http://hostname:8091/druid/worker/v1/disable

# Example
curl -X POST http://an-druid1001.eqiad.wmnet:8091/druid/worker/v1/disable

The current indexing jobs assigned to a MiddleManager can be found checking the Overlord's console via ssh tunnel + browser to localhost:8090:

# ssh tunnel to the current Overlord leader
ssh -L 8090:localhost:8090 hostname -N

# example
ssh -L 8090:localhost:8090 an-druid1001.eqiad.wmnet -N

Please note that the Overlord runs on every Druid node, but only one is the leader at any given moment. After creating the ssh tunnel (you can start from a host picked at random in the cluster), if you didn't pick the leader, the browser will be redirected to the hostname of the current leader when you try to access localhost:8090. This is of course a quick and dirty procedure, but it works :)

You are free to restart the MiddleManager when you see that it is not running any indexing job. This will automatically put it back into enabled state.
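
As an alternative to watching the console, the MiddleManager itself can be polled until it has no tasks left. This sketch assumes the worker API also offers a /druid/worker/v1/tasks endpoint that returns a JSON list of running task ids; the function names and the POLL_SECONDS variable are hypothetical.

```shell
# True when the JSON task list returned by the worker is empty, i.e. "[]".
no_tasks_left() {
  [ "$(printf '%s' "$1" | tr -d ' \n\t\r')" = "[]" ]
}

# Disable a MiddleManager and wait until it runs no indexing task.
# $1: MiddleManager host, defaults to localhost
# Note: this loops forever if the host cannot be reached.
drain_middlemanager() {
  mm="http://${1:-localhost}:8091/druid/worker/v1"
  curl -sS -X POST "$mm/disable"
  echo
  until no_tasks_left "$(curl -sS "$mm/tasks")"; do
    sleep "${POLL_SECONDS:-30}"
  done
  echo "no tasks left on ${1:-localhost}, safe to restart the MiddleManager"
}
```

For example, `drain_middlemanager an-druid1001.eqiad.wmnet` would return once the host is idle. Real time indexing tasks can run for a long time, so expect the wait to match the task duration configured for the job.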

Full Restart of services

To restart all druid services, you must restart each service on each Druid node individually. It is best to do them one at a time, but the order does not particularly matter.

NOTE: druid-historical can take a while to restart, as it needs to re-read indexes.

NOTE2: if you are running real-time indexing jobs, please read the section above on safely restarting MiddleManagers before proceeding.

# for each Druid node (an-druid100[1-5]):
service druid-broker restart
service druid-coordinator restart
service druid-historical restart
service druid-middlemanager restart
service druid-overlord restart

Bash snippet to automate the restart:

#!/bin/bash
set -x
set -e

sudo service druid-broker restart
sudo service druid-broker status
sleep 5
sudo service druid-coordinator restart
sudo service druid-coordinator status
sleep 5
sudo service druid-historical restart
sudo service druid-historical status
sleep 120 # check that historical startup finishes in /var/log/druid/historical.log
sudo service druid-middlemanager restart
sudo service druid-middlemanager status
sleep 5
sudo service druid-overlord restart
sudo service druid-overlord status

We also co-locate a Zookeeper ensemble with Druid. At the moment the Zookeeper servers run on nodes an-druid100[1-3].

service zookeeper restart

Removing hosts/ taking hosts out of service from cluster

Selecting coordinator dynamic configuration
Decommissioning druid historical nodes

Druid services can be removed from an active cluster in two ways before they are turned off. The first covers the middlemanager, the second the historical.

1: Drain the middlemanager that you wish to stop. For example, SSH to the host and run:

curl -X POST http://localhost:8091/druid/worker/v1/disable

2: Use the Dynamic Configuration API of the coordinator to set nodes into decommissioningNodes mode.

This can be done by sending a POST request to http://localhost:8081/druid/coordinator/v1/config, as documented on the Apache Druid site.

However, the Druid documentation recommends using the coordinator web interface for setting dynamic configuration parameters.

From the top-right corner cog menu, select Coordinator Dynamic Config

Once the historical disk cache is drained, the middlemanager is not running any jobs, and the overlord is not targeted by any scheduled jobs, it is safe to stop the services.

Handling alarms for unavailable segments

We have alarms for both clusters on the number of segments that the Coordinators see as unavailable, that is, segments that should be loaded by the Historical daemons but for some reason are not. If this happens, please check the logs in /var/log/druid/historical.log and see whether anything is still in progress.

Regular indexations through Airflow

  • Regular indexations are made via airflow jobs (example)

One-off indexing data into Druid

Why

You may not want to write a whole airflow job if you're not planning to load data periodically on the datasource. An example of this was the archiving of Geowiki data. Since it was all old data, there was no need to ingest more new data in the future. Assuming you want to load some data from HDFS, you need to create an ingestion spec json file and put it in your home folder in a machine with access to the druid hosts (like stat1007).

The ingestion spec

In the refinery repository we have a few ingestion spec templates in which you can fill in the gaps. This is the one for the ingestion of the pageview_hourly datasource. The Druid documentation also has a step-by-step tutorial for generating your spec when loading from Hadoop.

Format of your data

From experience, Druid prefers data as a JSON file with one record per line, formatted like this:

{"time": "2015-09-01T00:00:00Z", "url": "/foo/bar", "user": "alice", "latencyMs": 32}
{"time": "2015-09-01T01:00:00Z", "url": "/", "user": "bob", "latencyMs": 11}
{"time": "2015-09-01T01:30:00Z", "url": "/foo/bar", "user": "bob", "latencyMs": 45}

...but tsv/csv files are also fine! Make sure to specify the format, along with names of your columns, in your spec:

"parseSpec" : {
  "format" : "tsv",
  "dimensionsSpec" : {
    "dimensions" : [
      "project",
      "country",
      "cohort"
    ]
  },
  "delimiter": "\t",
  "columns": [
    "project",
    "country",
    "cohort",
    "month",
    "count",
    "ts"
  ],
  "timestampSpec" : {
    "format" : "auto",
    "column" : "YOUR_TIME_COLUMN"
  }
}

Segments

Druid uses your dataset's time dimension to partition the data into segments. In the ingestion spec you should specify the timespan of your dataset as well as the granularity of your data. Note that the / separating the start and end dates of each interval is required:

"granularitySpec" : {
  "type" : "uniform",
  "segmentGranularity" : "month",
  "queryGranularity" : "none",
  "intervals" : ["START_DATE/END_DATE"]
}

In the parseSpec mentioned above you can specify the format of your time column. Unless you're using non-standard timestamps you can leave the format as auto, but remember to specify the name of the column.
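
To show where the parseSpec and granularitySpec fragments sit in a complete spec, here is a skeleton for a Hadoop batch ingestion task. It is a sketch based on the general layout of Druid's Hadoop ingestion specs (task type index_hadoop, parser type hadoopyString, a static inputSpec), not a copy of the refinery templates. Every UPPER_CASE value is a placeholder, the two fragments are abbreviated, and the single longSum metric is only an illustration. The function name write_spec_skeleton is hypothetical.

```shell
# Write a skeleton Hadoop batch ingestion spec to the file given as $1.
# Replace the abbreviated parseSpec and granularitySpec with the full
# fragments shown above before submitting anything.
write_spec_skeleton() {
  cat > "$1" <<'EOF'
{
  "type": "index_hadoop",
  "spec": {
    "dataSchema": {
      "dataSource": "YOUR_DATASOURCE",
      "parser": {
        "type": "hadoopyString",
        "parseSpec": { "format": "tsv" }
      },
      "metricsSpec": [
        { "type": "longSum", "name": "count", "fieldName": "count" }
      ],
      "granularitySpec": { "type": "uniform" }
    },
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": { "type": "static", "paths": "HDFS_PATH_TO_YOUR_DATA" }
    },
    "tuningConfig": { "type": "hadoop" }
  }
}
EOF
}
```

For example, `write_spec_skeleton ~/YOUR_INGESTION_SPEC.json` creates a file you can then edit. The refinery templates remain the reference for the settings actually used in production.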

Sending the indexation task

POSTing your spec

To start the ingestion you need to POST the JSON spec you just created to a Druid overlord, for example with this curl command:

curl -L -X 'POST' -H 'Content-Type:application/json' -d @YOUR_INGESTION_SPEC.json http://an-druid1003.eqiad.wmnet:8090/druid/indexer/v1/task --dump-header -

If you are running these commands from the stat machines, you might need to unset the HTTP proxy first:

unset http_proxy && curl <blah>

The `--dump-header -` argument lets you verify that your request was answered with a 200 OK code. If you get a different code, check that your JSON is valid: a task usually fails only after it has been accepted by the overlord, so a rejection at this point typically indicates a malformed spec. If you get a 200, you can monitor your task in the overlord console.
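
If you would rather have a script check the response code than read the headers by eye, curl can print the code on its own. The following is a sketch; the function names are hypothetical and the default overlord URL is simply one of the hosts used on this page.

```shell
# Succeed only for HTTP 200, print a hint otherwise.
check_submit_status() {
  if [ "$1" = "200" ]; then
    echo "accepted by the overlord"
  else
    echo "got HTTP $1: if the request reached the overlord, check that the spec is valid JSON" >&2
    return 1
  fi
}

# POST an ingestion spec and check the response code.
# $1: path of the spec file
# $2: overlord base URL (optional)
submit_spec() {
  resp=$(mktemp)
  code=$(curl -sS -L -o "$resp" -w '%{http_code}' \
    -X POST -H 'Content-Type: application/json' \
    -d @"$1" "${2:-http://an-druid1001.eqiad.wmnet:8090}/druid/indexer/v1/task")
  cat "$resp"   # on success the body contains the id of the new task
  echo
  rm -f "$resp"
  check_submit_status "$code"
}
```

For example, `unset http_proxy && submit_spec YOUR_INGESTION_SPEC.json` prints the overlord's answer and exits non-zero when the spec was not accepted.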

Another example, this time a topN query POSTed to a broker:

curl -X POST -H "content-type: application/json" -d '
{
 "filter": {
   "field": {
     "value": "Unknown",
     "type": "selector",
     "dimension": "continent"
   },
   "type": "not"
 },
 "granularity": "all",
 "postAggregations": [],
 "metric": "count",
 "intervals": "2018-06-17T00:00:00+00:00/2018-07-17T19:27:53+00:00",
 "queryType": "topN",
 "dimension": "continent",
 "dataSource": "pageviews_daily",
 "threshold": 50000,
 "aggregations": [
   {
     "type": "count",
     "name": "count"
   }
 ]
}
' 'http://an-druid1001.eqiad.wmnet:8082/druid/v2/?pretty'

Monitoring the task

To access the overlord console, from your local machine create an SSH tunnel pointing at the overlord

ssh -N an-druid1001.eqiad.wmnet -L 8081:an-druid1002.eqiad.wmnet:8081

Open localhost:8081 in your browser and check the indexing tab. Your task should be listed there. It's recommended to follow the log link as soon as you see it in the console, since the task disappears as soon as it either succeeds or fails.

If the task fails and the log link from the overlord console says nothing (which happens when no data has started loading), don't despair: that doesn't mean no log has been produced. Log into the Druid host you used to send the task (in the case above, an-druid1003):

ssh an-druid1003.eqiad.wmnet
ls /var/lib/druid/indexing-logs

Your task should be logged there with a timestamp in the title. If it's not, there's still hope: a druid middle manager may have moved your task to a different druid host. You can find out which one in /var/log/druid/middlemanager.log. Or you can just SSH into any of the other four hosts (currently the hosts are: an-druid100[1-5]) and it'll probably be there.

Troubleshooting

The Druid documentation has a pretty nice ingestion troubleshooting section, so give that a shot. If everything else fails, dig into the overlord logs (/var/log/druid), since there will definitely be information about your task there.

Realtime indexation to Druid

Druid offers Kafka Supervisors to allow real time indexation from Kafka to Druid. This is particularly useful for datasets like wmf_netflow (and in the past also banner impressions).

The json specs for our current supported jobs are stored in refinery under druid/kafka. To kick off the real time indexation it is sufficient to send a POST to the Druid overlord like the following:

curl -L -X POST -H 'Content-Type: application/json' -d '{
  "type": "kafka",
  [..cut..]
  }
}' http://an-druid1001.eqiad.wmnet:8090/druid/indexer/v1/supervisor

Or even simpler:

curl -L -X POST -H 'Content-Type: application/json' -d @PATH_OF_THE_SPEC.json http://an-druid1001.eqiad.wmnet:8090/druid/indexer/v1/supervisor

If you want to update an existing running supervisor, it is sufficient to POST the updated JSON spec to the overlord. It will take care of stopping the current one and re-launching the new one.
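
After posting or updating a spec, it can be useful to confirm that the supervisor is running. The sketch below assumes that the overlord exposes a per-supervisor status endpoint under /druid/indexer/v1/supervisor/<id>/status and that the supervisor id equals the datasource name, which should be verified against the spec in use. The function names are hypothetical.

```shell
# Print the status URL of a supervisor.
# $1: supervisor id (assumed to be the datasource name, e.g. wmf_netflow)
# $2: overlord base URL (optional)
supervisor_status_url() {
  printf '%s/druid/indexer/v1/supervisor/%s/status\n' \
    "${2:-http://an-druid1001.eqiad.wmnet:8090}" "$1"
}

# Fetch the status report of a supervisor from the overlord.
supervisor_status() {
  curl -sS -L "$(supervisor_status_url "$@")"
  echo
}
```

For example, `supervisor_status wmf_netflow` would print the JSON status report for that supervisor.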
