Analytics/Systems/Druid

From Wikitech
Jump to navigation Jump to search

Druid is an analytics data store, currently (as of August 2016) in experimental use for the upcoming Analytics/Data_Lake. It is comprised of many services, each of which is fully redundant.


Why Druid. Value Proposition

When looking for an analytics columnar datastore we wanted a product that could fit our use cases and scale and that in the future we could use to support real time ingestion of data. We had several alternatives: Druid, Cassandra, ElasticSearch, and of late, Clickhouse. All these are open source choices that served our use cases to different degrees.

Druid offered the best value proposition:

  • It is designed for analytics so it can handle creation of cubes with many different dimensions without having to have those precomputed (like Cassandra does)
  • It has easy loading specs and supports real time ingestion
  • It provides front caching that repeated queries benefit from (Clickhouse is designed as a fast datastore for analytics but it doesn't have a frontend cache)
  • Druid shipped also with a convenient UI to do basic exploration of data that was also open source: Pivot

Access to Druid Data via Pivot

Analytics/Systems/Pivot

Access via command line

From the stat machines you can query druid via curl. Write a valid druid query, based on this example into a file, let's say query.druid.json. Then POST it to druid like this:

curl -L -X POST "http://druid1001.eqiad.wmnet:8082/druid/v2/?pretty" -H "content-type: application/json" -d @geowiki.druid.json

Druid Administration

Naming convention

For homogeneity across systems, underscores _ should be used in datasource names and field names instead of hyphens -.

Restart an Indexing job

sudo -u hdfs oozie job \
     -Duser=$USER \
      -Dstart_time=2017-04-01T00:00Z \
      -Dstop_time=2017-06-01T00:00Z \
      -Dqueue_name=production \
      -Drefinery_directory=hdfs://analytics-hadoop$(hdfs dfs -ls -d /wmf/refinery/2017* | tail -n 1 | awk '{print $NF}') \
      -oozie $OOZIE_URL -run -config /srv/deployment/analytics/refinery/oozie/unique_devices/per_project_family/druid/daily/coordinator.properties

Delete segments from deep storage

Preparation for deletion

For segments to deleted from deep-storage, they need to be NOT in use in druid historical nodes. There are two ways this can be done:

  • A rule is defined for the datasource so that segments are automatically dropped from historical-nodes after a certain duration (this is what we do for webrequest, for instance). Notice that the datasource is not disabled in that case.
  • The datasource is disabled in the coordinator, meaning after deep-storage segments deletion, the entire datasource will be lost. To disable datasource in the coordinator (reversible, data is still present in deep-storage and can reloaded easily):
 curl -L -X DELETE http://localhost:8081/druid/coordinator/v1/datasources/DATASOURCE_NAME

Actual deletion

After either the rule is applied ot the datasource is disabled, you can hard-delete segments that are not loaded in historical node from the deep storage. Please be carefull, he step is irreversible.

curl -L -X 'POST' -H 'Content-Type:application/json' -d "{ \"type\":\"kill\", \"id\":\"kill_task-tiles-poc-`date --iso-8601=seconds`\",\"dataSource\":\"DATASOURCE_NAME\", \"interval\":\"2016-01-01/2017-10-01\" }" localhost:8090/druid/indexer/v1/task

Overlords Administration UI

Only one overlord is the active leader at any given moment. The fastest way to figure it out is to try to establish a ssh tunnel like the following to one random Druid node of the target cluster, and then check the UI via browser (using localhost as described below). If you get the wrong overlord, the UI will not show up and you'll get a redirect to the right one.

ssh -N druid100X.eqiad.wmnet -L 8090:druid100X.eqiad.wmnet:8090

http://localhost:8090/console.html

Deletion control

Check the dataset does not appear on deep storage directory on hdfs

hdfs dfs -ls /user/druid/deep-storage/<dataset>

Coordinators Administration UI

Only one coordinator is the active leader at any given moment. The fastest way to figure it out is to try to establish a ssh tunnel like the following to one random Druid node of the target cluster, and then check the UI via browser (using localhost as described below). If you get the wrong coordinator, the UI will not show up and you'll get a redirect to the right one.

ssh -N druid1003.eqiad.wmnet -L 8081:druid1003.eqiad.wmnet:8081
http://localhost:8081/#/datasources/pageviews-hourly

Indexing Logs

Located at "/var/lib/druid/indexing-logs"

Safe restart of MiddleManagers when running Real time Indexing jobs

In order to avoid confusion for the Overlord, it is nice to drain jobs from a MiddleManager before restarting via the following command:

# Generic case
curl -X POST http://hostname:8091/druid/worker/v1/disable

# Example
curl -X POST http://druid1001.eqiad.wmnet:8091/druid/worker/v1/disable

The current indexing jobs assigned to a MiddleManager can be found checking the Overlord's console via ssh tunnel + browser to localhost:8090:

# ssh tunnel to the current Overlord leader
ssh -L 8090:localhost:8090 hostname -N

# example
ssh -L 8090:localhost:8090 druid1001.eqiad.wmnet -N

Please note that the Overlord runs on every Druid node, but only one is the leader for every given moment. After the creation of the ssh tunnel (you can start from a host picked up at random in the cluster), you'll get a redirect in the browser to the current hostname of the leader (when trying to access localhost:8090) if you didn't pick the right one. This is of course a quick and dirty procedure, but it works :)

You are free to restart the MiddleManager when you see that it is not running any indexing job. This will automatically put it back into enabled state.

Full Restart of services

To restart all druid services, you must restart each service on each Druid node individually. It is best to do them one at a time, but the order does not particularly matter.

NOTE: druid-historical can take a while to restart up, as it needs to re-read indexes.

NOTE2: if you are running Real time indexing jobs, please check the above paragraph before proceeding.

# for each Druid node (druid100[123]):
service druid-broker restart
service druid-coordinator restart
service druid-historical restart
service druid-middlemanager restart
service druid-overlord restart

Bash snippet to automate the restart:

#!/bin/bash
set -x
set -e

sudo service druid-broker restart
sudo service druid-broker status
sleep 5
sudo service druid-coordinator restart
sudo service druid-coordinator status
sleep 5
sudo service druid-historical restart
sudo service druid-historical status
sleep 120 # check that historical startup finishes in /var/log/druid/historical.log
sudo service druid-middlemanager restart
sudo service druid-middlemanager status
sleep 5
sudo service druid-overlord restart
sudo service druid-overlord status

We intend to also run a dedicated Zookeeper cluster for druid on the druid nodes. For now (August 2016), druid uses the main Zookeeper cluster on conf100[123]. In the future, when the Druid nodes run Zookeeper, you may also want to restart Zookeeper on each node.

service zookeeper restart

Removing hosts/ taking hosts out of service from cluster

1. Make sure the nodes we are about to remove are not running any critical services:

  • Overlord
  • Middle Manager
  • Standalone Real-time
  • Broker
  • Coordinator

2. Remove node. Per: http://druid.io/docs/latest/design/coordinator.html

"If a historical node restarts or becomes unavailable for any reason, the Druid coordinator will notice a node has gone missing and treat all segments served by that node as being dropped. Given a sufficient period of time, the segments may be reassigned to other historical nodes in the cluster. However, each segment that is dropped is not immediately forgotten. Instead, there is a transitional data structure that stores all dropped segments with an associated lifetime..."

Wait a bit and segments should be reassigned. You can see this happening on zookeeper CLI:

Example, CLI before we took 1005 before and out of service:

Connect to zookeeper:

nuria@druid1001:/var/log/zookeeper$ /usr/share/zookeeper/bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 43] ls /druid/analytics-eqiad/segments/druid1005.eqiad.wmnet:8083
[druid1005.eqiad.wmnet:8083_historical__default_tier_2017-10-06T09:32:10.286Z_a97d8888936740238ff0754d74fb2e3e5, druid1005.eqiad.wmnet:8083_historical__default_tier_2017-10-06T09:32:10.351Z_0824bc7be2f14238a8f7c3608718eca210, druid1005.eqiad.wmnet:8083_historical__default_tier_2017-10-06T09:32:10.314Z_390390b7e7a94b9a9a5dcdec18fff8ce7, druid1005.eqiad.wmnet:8083_historical__default_tier_2017-10-06T09:32:10.410Z_4e49bdceec584f7bb8e673dcd79c58d815, druid1005.eqiad.wmnet:8083_historical__default_tier_2017-10-06T09:32:10.216Z_24505a368ccf48ddbdf40419cf73a9a51, 
[..]
druid1005.eqiad.wmnet:8083_historical__default_tier_2017-10-06T09:32:10.327Z_1b2ae0502526474aabbfbc81053126bd8, druid1005.eqiad.wmnet:8083_historical__default_tier_2017-10-06T09:32:10.375Z_bd7a60d9c16d40afaec932fc56abe8d212, druid1005.eqiad.wmnet:8083_historical__default_tier_2017-10-06T09:32:10.301Z_a7e2531e13334206b91ce6f5859343536, druid1005.eqiad.wmnet:8083_historical__default_tier_2017-10-06T09:32:10.236Z_fa21ec26754e442d9201ce5bac22d1032]
[zk: localhost:2181(CONNECTED) 44] ls /druid/analytics-eqiad/segments/druid1005.eqiad.wmnet:8083
[]

Now those segments should be loaded, if no indexing is happening the loading of segments should be visible:

curl -sL druid1003.eqiad.wmnet:8081/druid/coordinator/v1/loadqueue

Handling alarms for unavailable segments

We have alarms for both clusters related to the number of segments that the Coordinators see as unavailable, namely that should be loaded by the Historical daemons but for some reason they are not. If this happens, please check the logs in /var/log/druid/historical.log and see if anything is ongoing.

Regular indexations through Oozie

  • Regular indexations are made via oozie jobs.
  • For testing, the datasource-name to be indexed can be provided as an oozie parameter (-Ddruid_datasource=test_datasource)
  • To prevent reindexing production data, a non-hdfs user will be prevented to index datasources with name not starting with "test_"

One-off indexing data into Druid

Why

You may not want to write a whole oozie job if you're not planning to load data periodically on the datasource. An example of this was the archiving of Geowiki data. Since it was all old data, there was no need to ingest more new data in the future. Assuming you want to load some data from HDFS, you need to create an ingestion spec json file and put it in your home folder in a machine with access to the druid hosts (like stat1005).

The ingestion spec

At the refinery repository we have a few ingestion spec templates that you can use to fill the gaps. This is the one for the ingestion of the pageview_hourly datasource. The Druid documentation also has a step by step tutorial to generate your spec when loading from hadoop.

Format of your data

From experience, Druid prefers columnar data in the form of a json file formatted like this:

{"time": "2015-09-01T00:00:00Z", "url": "/foo/bar", "user": "alice", "latencyMs": 32}

{"time": "2015-09-01T01:00:00Z", "url": "/", "user": "bob", "latencyMs": 11}

{"time": "2015-09-01T01:30:00Z", "url": "/foo/bar", "user": "bob", "latencyMs": 45}

...but tsv/csv files are also fine! Make sure to specify the format, along with names of your columns, in your spec:

"parseSpec" : {
  "format" : "tsv",
  "dimensionsSpec" : {
    "dimensions" : [
      "project",
      "country",
      "cohort"
    ]
  },
  "delimiter": "\t",
  "columns": [
    "project",
    "country",
    "cohort",
    "month",
    "count",
    "ts"
  ],
  "timestampSpec" : {
    "format" : "auto",
    "column" : YOUR_TIME_COLUMN
  }
}

Segments

Druid uses your dataset's time dimension to partition the data into segments. You should specify in the ingestion spec the timespan of your dataset, as well as the granularity of your data:

"granularitySpec" : {
  "type" : "uniform",
  "segmentGranularity" : "month",
  "queryGranularity" : "none",
  "intervals" : ["START_DATE/END_DATE"] // The / slash to separate intervals is important
}

In the parseSpec mentioned above you can detail the format that your time column uses, but unless you're using non-standard timestamps, you can leave the format as auto, but remember to specify the name of the column.

Sending the indexation task

POSTing your spec

To initialise the ingestion you need to POST the json spec you just created to a druid overlord. Use this curl command if you want:

curl -L -X 'POST' -H 'Content-Type:application/json' -d @YOUR_INGESTION_SPEC.json http://druid1003.eqiad.wmnet:8090/druid/indexer/v1/task --dump-header -


You might need to use :

unset http_proxy && curl <blah> 

If you are running these commands from stats machines

The `--dump-header -` argument helps you make sure that your request was responded with a 200 OK code. If you get a different code, you may want to check that your JSON is validated, since usually the task fails after it has been accepted by the overlord. If you have a 200, you may monitor your task with the overlord console.

Other example:

curl -X POST -H "content-type: application/json" -d '
{
 "filter": {
   "field": {
     "value": "Unknown",
     "type": "selector",
     "dimension": "continent"
   },
   "type": "not"
 },
 "granularity": "all",
 "postAggregations": [],
 "metric": "count",
 "intervals": "2018-06-17T00:00:00+00:00/2018-07-17T19:27:53+00:00",
 "queryType": "topN",
 "dimension": "continent",
 "dataSource": "pageviews_daily",
 "threshold": 50000,
 "aggregations": [
   {
     "type": "count",
     "name": "count"
   }
 ]
}
' 'http://druid1001.eqiad.wmnet:8082/druid/v2/?pretty'

Monitoring the task

requires 

To access the overlord console, from your local machine create an SSH tunnel pointing at the overlord

ssh -N druid1001.eqiad.wmnet -L 8081:druid1002.eqiad.wmnet:8081

Enter localhost:8081 and check out the indexing tab. Your task should be listed there. It's recommended to follow the log link the moment you see it in the console, since the task disappears the moment it's either successful or fails.

If the task fails and the log link from the overlord console says nothing (which happens when no data has started loading), don't despair: that doesn't mean no log has been produced. Log into the druid host you used to send the task (in the case above, druid1003).

ssh druid1003.eqiad.wmnet
ls /var/lib/druid/indexing-logs

Your task should be logged there with a timestamp in the title. If it's not, there's still hope: a druid middle manager may have moved your task to a different druid host. You can find out which one in /var/log/druid/middlemanager.log. Or you can just SSH into the other two hosts (druid1001 and druid1002 and it'll probably be there).

Troubleshooting

The Druid documentation has a pretty nice ingestion troubleshooting section, so give that a shot. If everything fails, dig in the logs of the overlords (/var/log/druid), since there will be definitely information about your task there.