Analytics/Systems/Druid

Druid is an analytics data store, currently (as of August 2016) in experimental use for the upcoming Analytics/Data_Lake. It comprises many services, each of which is fully redundant.

The Analytics team uses a Node.js web UI application called Pivot as an experimental tool to explore Druid's data.

Why Druid: Value Proposition

When looking for an analytics columnar datastore, we wanted a product that fit our use cases, could scale, and could in the future support real-time ingestion of data. We had several alternatives: Druid, Cassandra, ElasticSearch, and of late, Clickhouse. All of these are open-source choices that served our use cases to different degrees.

Druid offered the best value proposition:

  • It is designed for analytics, so it can handle creating cubes with many different dimensions without requiring them to be precomputed (as Cassandra does)
  • It has easy loading specs and supports real-time ingestion
  • It provides a frontend cache that repeated queries benefit from (Clickhouse is designed as a fast datastore for analytics but does not have a frontend cache)
  • Druid also ships with a convenient, open-source UI for basic data exploration: Pivot

Access to Druid Data via Pivot

See Analytics/Systems/Pivot

Access via command line

From the stat machines you can query Druid via curl. Write a valid Druid query, based on the example below, into a file, say query.druid.json, and then POST it to Druid like this:

curl -X POST "http://druid1001.eqiad.wmnet:8082/druid/v2/?pretty" -H "content-type: application/json" -d @query.druid.json
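
For reference, a minimal query file could look like the following (a sketch only: the datasource, interval, and metric names here are hypothetical and need to be adapted to a datasource that actually exists on the cluster):

{
  "queryType": "timeseries",
  "dataSource": "pageviews_hourly",
  "granularity": "day",
  "intervals": ["2017-01-01/2017-01-08"],
  "aggregations": [
    { "type": "longSum", "name": "total_views", "fieldName": "view_count" }
  ]
}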

Druid Administration

Naming convention

For homogeneity across systems, underscores _ should be used in datasource names and field names instead of hyphens -.

Restart an Indexing job

sudo -u hdfs oozie job \
    -Duser=$USER \
    -Dstart_time=2017-04-01T00:00Z \
    -Dstop_time=2017-06-01T00:00Z \
    -Dqueue_name=production \
    -Drefinery_directory=hdfs://analytics-hadoop$(hdfs dfs -ls -d /wmf/refinery/2017* | tail -n 1 | awk '{print $NF}') \
    -oozie $OOZIE_URL -run \
    -config /srv/deployment/analytics/refinery/oozie/unique_devices/per_project_family/druid/daily/coordinator.properties
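
Once submitted, Oozie prints the id of the new coordinator job; its status can then be checked with the standard Oozie CLI (the job id below is a placeholder):

oozie job -oozie $OOZIE_URL -info <coordinator-job-id>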

Delete segments from deep storage

Preparation for deletion

For segments to be deleted from deep storage, they must NOT be in use on the Druid historical nodes. There are two ways to achieve this:

  • A rule is defined for the datasource so that segments are automatically dropped from the historical nodes after a certain duration (this is what we do for webrequest, for instance). Notice that the datasource is not disabled in that case.
  • The datasource is disabled in the coordinator, meaning that after the deep-storage segments are deleted, the entire datasource will be lost. To disable a datasource in the coordinator (reversible as long as the data is still in deep storage: it can be reloaded easily, as shown below):
 curl -L -X DELETE http://localhost:8081/druid/coordinator/v1/datasources/DATASOURCE_NAME
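
To re-enable a datasource disabled this way (a sketch using the coordinator datasources endpoint; double-check it against the Druid version running on the cluster), POST to the same URL:

curl -L -X POST http://localhost:8081/druid/coordinator/v1/datasources/DATASOURCE_NAME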

Actual deletion

After either the rule is applied or the datasource is disabled, you can hard-delete the segments that are not loaded on historical nodes from deep storage. Please be careful, this step is irreversible.

curl -X 'POST' -H 'Content-Type:application/json' -d "{ \"type\":\"kill\", \"id\":\"kill_task-tiles-poc-`date --iso-8601=seconds`\",\"dataSource\":\"DATASOURCE_NAME\", \"interval\":\"2016-01-01/2017-10-01\" }" localhost:8090/druid/indexer/v1/task
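
The kill task runs asynchronously; its progress can be followed in the Overlord console, or via the Overlord task API (a sketch, with the task id as a placeholder):

curl -s http://localhost:8090/druid/indexer/v1/task/TASK_ID/status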

Overlords Administration UI

Only one overlord is the active leader at any given moment. The fastest way to find it is to establish an ssh tunnel like the following to a random Druid node of the target cluster, and then check the UI via browser (using localhost as described below). If you picked the wrong overlord, the UI will not show up and you will be redirected to the right one.

ssh -N druid100X.eqiad.wmnet -L 8090:druid100X.eqiad.wmnet:8090

http://localhost:8090/console.html
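
Alternatively, any Overlord can be asked directly which host is the current leader (a sketch using the Overlord leader endpoint; verify it against the Druid version in use):

curl -s http://druid100X.eqiad.wmnet:8090/druid/indexer/v1/leader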

Deletion control

Check that the dataset no longer appears in the deep-storage directory on HDFS:

hdfs dfs -ls /user/druid/deep-storage/<dataset>

Coordinators Administration UI

Only one coordinator is the active leader at any given moment. The fastest way to find it is to establish an ssh tunnel like the following to a random Druid node of the target cluster, and then check the UI via browser (using localhost as described below). If you picked the wrong coordinator, the UI will not show up and you will be redirected to the right one.

ssh -N druid1003.eqiad.wmnet -L 8081:druid1003.eqiad.wmnet:8081
http://localhost:8081/#/datasources/pageviews-hourly
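
Once the tunnel is up, the Coordinator HTTP API can also be queried directly, for instance to find the current leader or to list the known datasources (a sketch; the endpoints should be verified against the running Druid version):

curl -s http://localhost:8081/druid/coordinator/v1/leader
curl -s http://localhost:8081/druid/coordinator/v1/datasources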

Indexing Logs

Located at /var/lib/druid/indexing-logs
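
To inspect the log of a specific indexing task (assuming the usual layout of one file per task id; adjust if the directory is organized differently):

less /var/lib/druid/indexing-logs/TASK_ID.log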

Safe restart of MiddleManagers when running Real time Indexing jobs

In order to avoid confusing the Overlord, it is best to drain jobs from a MiddleManager before restarting it, via the following command:

# Generic case
curl -X POST http://hostname:8091/druid/worker/v1/disable

# Example
curl -X POST http://druid1001.eqiad.wmnet:8091/druid/worker/v1/disable
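
The opposite call re-enables the worker (a sketch using the same MiddleManager worker API); note that a plain restart also re-enables it, as described below, so this is only needed if you change your mind before restarting:

curl -X POST http://druid1001.eqiad.wmnet:8091/druid/worker/v1/enable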

The indexing jobs currently assigned to a MiddleManager can be found by checking the Overlord's console via an ssh tunnel + browser to localhost:8090:

# ssh tunnel to the current Overlord leader
ssh -L 8090:localhost:8090 hostname -N

# example
ssh -L 8090:localhost:8090 druid1001.eqiad.wmnet -N

Please note that the Overlord runs on every Druid node, but only one is the leader at any given moment. After creating the ssh tunnel (you can start from a host picked at random in the cluster), if you didn't pick the leader you'll be redirected in the browser to the current leader's hostname when trying to access localhost:8090. This is of course a quick and dirty procedure, but it works :)
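
If you prefer the command line, the Overlord API can also list the tasks currently running (a sketch; run it through the same tunnel):

curl -s http://localhost:8090/druid/indexer/v1/runningTasks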

You are free to restart the MiddleManager once you see that it is not running any indexing job. The restart will automatically put it back into the enabled state.

Full Restart of services

To restart all druid services, you must restart each service on each Druid node individually. It is best to do them one at a time, but the order does not particularly matter.

NOTE: druid-historical can take a while to start up, as it needs to re-read its indexes.

NOTE 2: if you are running real-time indexing jobs, please check the paragraph above before proceeding.

# for each Druid node (druid100[123]):
service druid-broker restart
service druid-coordinator restart
service druid-historical restart
service druid-middlemanager restart
service druid-overlord restart

Bash snippet to automate the restart:

#!/bin/bash
set -x
set -e

sudo service druid-broker restart
sudo service druid-broker status
sleep 5
sudo service druid-coordinator restart
sudo service druid-coordinator status
sleep 5
sudo service druid-historical restart
sudo service druid-historical status
sleep 120 # check that historical startup finishes in /var/log/druid/historical.log
sudo service druid-middlemanager restart
sudo service druid-middlemanager status
sleep 5
sudo service druid-overlord restart
sudo service druid-overlord status

We intend to also run a dedicated Zookeeper cluster for druid on the druid nodes. For now (August 2016), druid uses the main Zookeeper cluster on conf100[123]. In the future, when the Druid nodes run Zookeeper, you may also want to restart Zookeeper on each node.

service zookeeper restart

Removing hosts / taking hosts out of service from the cluster

1. Make sure the nodes we are about to remove are not running any critical services:

  • Overlord
  • Middle Manager
  • Standalone Real-time
  • Broker
  • Coordinator

2. Remove the node. Per http://druid.io/docs/latest/design/coordinator.html:

"If a historical node restarts or becomes unavailable for any reason, the Druid coordinator will notice a node has gone missing and treat all segments served by that node as being dropped. Given a sufficient period of time, the segments may be reassigned to other historical nodes in the cluster. However, each segment that is dropped is not immediately forgotten. Instead, there is a transitional data structure that stores all dropped segments with an associated lifetime..."

Wait a bit and the segments should be reassigned. You can see this happening via the zookeeper CLI:

Example: CLI output before and after taking druid1005 out of service.

Connect to zookeeper:

nuria@druid1001:/var/log/zookeeper$ /usr/share/zookeeper/bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 43] ls /druid/analytics-eqiad/segments/druid1005.eqiad.wmnet:8083
[druid1005.eqiad.wmnet:8083_historical__default_tier_2017-10-06T09:32:10.286Z_a97d8888936740238ff0754d74fb2e3e5, druid1005.eqiad.wmnet:8083_historical__default_tier_2017-10-06T09:32:10.351Z_0824bc7be2f14238a8f7c3608718eca210, druid1005.eqiad.wmnet:8083_historical__default_tier_2017-10-06T09:32:10.314Z_390390b7e7a94b9a9a5dcdec18fff8ce7, druid1005.eqiad.wmnet:8083_historical__default_tier_2017-10-06T09:32:10.410Z_4e49bdceec584f7bb8e673dcd79c58d815, druid1005.eqiad.wmnet:8083_historical__default_tier_2017-10-06T09:32:10.216Z_24505a368ccf48ddbdf40419cf73a9a51, 
[..]
druid1005.eqiad.wmnet:8083_historical__default_tier_2017-10-06T09:32:10.327Z_1b2ae0502526474aabbfbc81053126bd8, druid1005.eqiad.wmnet:8083_historical__default_tier_2017-10-06T09:32:10.375Z_bd7a60d9c16d40afaec932fc56abe8d212, druid1005.eqiad.wmnet:8083_historical__default_tier_2017-10-06T09:32:10.301Z_a7e2531e13334206b91ce6f5859343536, druid1005.eqiad.wmnet:8083_historical__default_tier_2017-10-06T09:32:10.236Z_fa21ec26754e442d9201ce5bac22d1032]
[zk: localhost:2181(CONNECTED) 44] ls /druid/analytics-eqiad/segments/druid1005.eqiad.wmnet:8083
[]

Those segments should now be reloaded on other nodes; if no indexing is happening, the loading of segments should be visible:

curl -sL druid1003.eqiad.wmnet:8081/druid/coordinator/v1/loadqueue

Handling alarms for unavailable segments

We have alarms for both clusters on the number of segments that the Coordinators see as unavailable, namely segments that should be loaded by the Historical daemons but for some reason are not. If this happens, please check the logs in /var/log/druid/historical.log and see if anything is ongoing.
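
The Coordinator's load status endpoint gives a quick overview of how much of each datasource is currently loaded (a sketch; run it through an ssh tunnel to the Coordinator leader as described above):

curl -s http://localhost:8081/druid/coordinator/v1/loadstatus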