Analytics/Systems/Cluster/Ingestion Pipeline

From Wikitech

Note: ETL might not be the proper name for this process.

The Analytics cluster ingests several sources of data and transforms them into a queryable data warehouse. This document describes the ETL / ingestion process for each of these data sources.


Webrequest

Initial Tests

Initial tests of the webrequest ETL phase were done using 2 test Kafka brokers, 2 test varnishkafka producers, Camus, the production Hadoop cluster, and Hive.

varnishkafka runs on cp1048 in eqiad and cp3003 in esams, producing to Kafka brokers on analytics1003 and analytics1004 in eqiad. varnishkafka is configured to produce to a topic with 6 partitions (analytics1003 and analytics1004 each have 6 Kafka log disks) and 2 replicas, partition keyed by producer hostname. Using a varnish node's hostname as the partition key ensures that all webrequest logs from that host go to the same Kafka partition. As long as the number of varnishkafka producers is greater than or equal to the number of Kafka partitions, and traffic is relatively evenly spread between varnishes, the use of a non-random partition key shouldn't make a difference. Keeping all of a single node's logs in the same partition makes it easier to verify that messages have been consumed properly, as they will usually be consumed in order.
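Keying by hostname works because Kafka maps equal keys to the same partition. A minimal sketch of the idea in Python (the hash function below is illustrative; it is not necessarily the one librdkafka applies):

```python
import hashlib

def partition_for(hostname: str, num_partitions: int = 6) -> int:
    """Map a producer hostname to a fixed Kafka partition.

    Any stable hash of the key guarantees that every message from the
    same varnish host lands in the same partition.
    """
    digest = hashlib.md5(hostname.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# All messages from one host always map to the same partition.
assert partition_for("cp1048.eqiad.wmnet") == partition_for("cp1048.eqiad.wmnet")
print({h: partition_for(h) for h in
       ["cp1048.eqiad.wmnet", "cp3003.esams.wikimedia.org"]})
```

Because the mapping is deterministic, verifying consumption per host reduces to checking a single partition's contents.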

Camus is used to import data from Kafka into HDFS and bucket it based on timestamps. Hive is then used to create tables and date-bucketed Hive partitions on top of the imported data. We then run Hive queries on this data to check for any missing sequence numbers. Standardized data was loaded in order to verify the Hive queries used: 100000 records with no missing messages were loaded, and the Hive queries reported 0 missing messages. Then, 10000 lines were randomly removed from this data, and the queries correctly reported 10000 missing messages.
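The check described above boils down to comparing the span of sequence numbers seen against the row count. A minimal sketch, assuming no counter reset during the window:

```python
def missing_count(sequences) -> int:
    """How many sequence numbers in [min, max] never arrived.

    Assumes the producer's counter was not reset while writing to the
    topic, so every number in the observed range should be present.
    """
    seqs = set(sequences)
    expected = max(seqs) - min(seqs) + 1
    return expected - len(seqs)

# 100 consecutive sequence numbers: nothing missing.
assert missing_count(range(100)) == 0
# Drop 10 interior numbers: 10 reported missing.
kept = [s for s in range(100) if s % 10 != 5]
assert missing_count(kept) == 10
```

This is the same logic the Hive queries implement with min(), max(), and count(*) per hostname.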

varnishkafka.conf

 kafka create topic --topic varnish6 --partition 6 --replica 2
######################################################################
#                                                                    #
#                  varnishkafka configuration file                   #
#                                                                    #
######################################################################
#                                                                    #
# Format:                                                            #
# <property-name> = <value>                                          #
#                                                                    #
# boolean properties:                                                #
#   >0, "true", "yes", "on" - interpreted as true                    #
#   everything else         - interpreted as false                   #
#                                                                    #
######################################################################

# Where to output varnish log lines:
#  kafka  - (default) send to kafka broker
#  stdout - just print to stdout (behave like varnishncsa)
output = kafka

# Varnish log formatting
#format = %l	%n	%t	%{Varnish:time_firstbyte}x	%h	%{Varnish:handling}x/%s	%b	%m	http://%{Host}i%U%q	-	%{Content-Type}o	%{Referer}i	%{X-Forwarded-For}i	%{User-agent}i	%{Accept-Language}i
#format = %l	%n	%t	%{Varnish:time_firstbyte}x	%h	%{Varnish:handling}x/%s	%b	%m	http://%{Host}i%U%q	-	%{Content-Type}o	%{Referer}i
format.type = json
format = %{@hostname}l %{@sequence!num?0}n %{%FT%T@dt}t %{Varnish:time_firstbyte@time_firstbyte!num?0}x %{@ip}h %{Varnish:handling@cache_status}x %{@http_status}s %{@response_size!num?0}b %{@http_method}m %{Host@uri_host}i %{@uri_path}U %{@uri_query}q %{Content-Type@content_type}o %{Referer@referer}i %{X-Forwarded-For@x_forwarded_for}i %{User-Agent@user_agent}i %{Accept-Language@accept_language}i %{X-Analytics@x_analytics}o


# Start for sequence number (%n)
# Either a number, or the string "time" which will set it to the current
# unix time in seconds multiplied by 1,000,000.
# Defaults to 0.
#sequence.number = 0

#
# varnishkafka log messages configuration
# Debugging, error reporting, etc, not to be confused with varnish logs.
#

# varnishkafka log level (1 = emergencies .. 7 = debug)
log.level = 6

# specify log output
log.stderr = false
log.syslog = true

# daemonize varnishkafka (boolean)
daemonize = false

######################################################################
#                                                                    #
# Standard varnish VSL command line arguments                        #
#                                                                    #
# Format:                                                            #
#  varnish.arg.<c> = <value>, where <c> is a command line option.    #
#                                                                    #
# See varnishncsa(1) and varnishlog(1) for valid options.            #
#                                                                    #
######################################################################

# -m tag:regex
varnish.arg.m = RxRequest:^(?!PURGE$)

# Examples:
# -C: ignore case when matching regex
# Non-value arguments need a dummy value to pass parsing, such as 'true'.
#varnish.arg.C = true

# -n: varnishd instance to get logs from.
varnish.arg.n = frontend


######################################################################
#                                                                    #
# Kafka configuration                                                #
#                                                                    #
# For the full range of Kafka handle and topic configuration         #
# properties, see:                                                   #
#  https://github.com/edenhill/librdkafka/blob/0.8-wip/rdkafka.h     #
#                                                                    #
# And the Apache Kafka configuration reference:                      #
#  http://kafka.apache.org/08/configuration.html                     #
#                                                                    #
######################################################################

# Initial list of kafka brokers
metadata.broker.list = analytics1003.wikimedia.org:9092,analytics1004.wikimedia.org:9092

# Maximum number of messages allowed on the local producer queue
queue.buffering.max.messages = 1000000

# Maximum number of retries per messageset.
message.send.max.retries = 3


#
# Topic configuration
#

# Topic to produce messages to
topic = varnish2

# Partition (-1: random, else one of the available partitions)
# partition on machine hostname: %l
partition = %l


# Required number of acks
topic.request.required.acks = 1

# Local message timeout (milliseconds)
topic.message.timeout.ms = 60000

compression.codec = snappy
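With format.type = json, each request becomes one JSON object per line, keyed by the @name annotations in the format string. A hypothetical record (all field values invented for illustration), showing why the !num?0 modifier matters for the sequence checks later on:

```python
import json

# Hypothetical log line matching the @name annotations in the format
# string above; the values here are invented for illustration.
line = json.dumps({
    "hostname": "cp1048.eqiad.wmnet",
    "sequence": 123456,            # %{@sequence!num?0}n: numeric, defaults to 0
    "dt": "2013-09-20T15:40:17",   # %{%FT%T@dt}t
    "time_firstbyte": 0.000354,
    "ip": "203.0.113.10",
    "cache_status": "hit",
    "http_status": "200",
    "response_size": 12345,
    "http_method": "GET",
    "uri_host": "en.wikipedia.org",
    "uri_path": "/wiki/Main_Page",
    "uri_query": "",
    "content_type": "text/html",
})
record = json.loads(line)

# The !num?0 modifier makes numeric fields fall back to 0 on parse
# errors, which is why the Hive queries filter out sequence = 0.
assert isinstance(record["sequence"], int)
```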

camus.properties

 hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -P ~/camus.properties


# final top-level data output directory, sub-directory will be dynamically created for each topic pulled
etl.destination.path=hdfs://kraken/wmf/raw/webrequest/test/data
# HDFS location where you want to keep execution files, i.e. offsets, error logs, and count files
etl.execution.base.path=hdfs://kraken/wmf/raw/webrequest/test/camus
# where completed Camus job output directories are kept, usually a sub-dir in the base.path
etl.execution.history.path=hdfs://kraken/wmf/raw/webrequest/test/camus/history

# Concrete implementation of the Decoder class to use
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder

# Our timestamps look like 2013-09-20T15:40:17
camus.message.timestamp.format=yyyy-MM-dd'T'HH:mm:ss

# use the dt field
camus.message.timestamp.field=dt

# StringRecordWriterProvider does no reformatting of the records as they come in.
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringRecordWriterProvider

etl.output.record.delimiter=\n

# Used by the committer to arrange .avro files into a partitioned scheme. This will be the default partitioner for all
# topics that do not have a partitioner specified
#etl.partitioner.class=com.linkedin.camus.etl.kafka.coders.DefaultPartitioner


# max hadoop tasks to use, each task can pull multiple topic partitions
mapred.map.tasks=6
# max historical time that will be pulled from each partition based on event timestamp
kafka.max.pull.hrs=168
# events with a timestamp older than this will be discarded.
kafka.max.historical.days=7
# Max minutes for each mapper to pull messages (-1 means no limit)
kafka.max.pull.minutes.per.task=-1

# if whitelist has values, only whitelisted topics are pulled.  nothing on the blacklist is pulled
kafka.blacklist.topics=
kafka.whitelist.topics=varnish2

# Name of the client as seen by kafka
kafka.client.name=camus-1
# Fetch Request Parameters
#kafka.fetch.buffer.size=
#kafka.fetch.request.correlationid=
#kafka.fetch.request.max.wait=
#kafka.fetch.request.min.bytes=
# Connection parameters. Usually a VIP
kafka.host.url=analytics1003.wikimedia.org
kafka.host.port=9092
#kafka.timeout.value=


#Stops the mapper from getting inundated with Decoder exceptions for the same topic
#Default value is set to 10
max.decoder.exceptions.to.print=5

#Controls the submitting of counts to Kafka
#Default value set to true
post.tracking.counts.to.kafka=true

log4j.configuration=false

# everything below this point can be ignored for the time being, will provide more documentation down the road
##########################
etl.run.tracking.post=false
#kafka.monitor.tier=
#etl.counts.path=
kafka.monitor.time.granularity=10

etl.hourly=hourly
etl.daily=daily
etl.ignore.schema.errors=false

# configure output compression for deflate or snappy. Defaults to deflate
etl.output.codec=deflate
etl.deflate.level=6
#etl.output.codec=snappy

etl.default.timezone=UTC
etl.output.file.time.partition.mins=60
etl.keep.count.files=false
etl.execution.history.max.of.quota=.8

mapred.output.compress=true
mapred.map.max.attempts=1

kafka.client.buffer.size=20971520
kafka.client.so.timeout=60000

#zookeeper.session.timeout=
#zookeeper.connection.timeout=
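Camus parses the dt field with camus.message.timestamp.format and buckets each record into an hourly directory. A sketch of the path derivation, assuming a year/month/day/hour layout under etl.destination.path matching the Hive table's partition columns (the helper name is hypothetical):

```python
from datetime import datetime

def hourly_partition_path(topic: str, dt: str,
                          base: str = "hdfs://kraken/wmf/raw/webrequest/test/data") -> str:
    """Derive the hourly bucket a record lands in from its 'dt' field,
    parsed with the same pattern as camus.message.timestamp.format
    (yyyy-MM-dd'T'HH:mm:ss)."""
    t = datetime.strptime(dt, "%Y-%m-%dT%H:%M:%S")
    return f"{base}/{topic}/hourly/{t.year}/{t.month:02d}/{t.day:02d}/{t.hour:02d}"

print(hourly_partition_path("varnish2", "2013-09-20T15:40:17"))
# ends with .../varnish2/hourly/2013/09/20/15
```

The Hive table's LOCATION and its year/month/day/hour partition columns line up with exactly this directory structure.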

Hive queries

create database test;
use test;

ADD JAR /home/otto/hive-serdes-1.0-SNAPSHOT.jar;

CREATE EXTERNAL TABLE varnish2(
  hostname string COMMENT 'from deserializer',
  sequence int COMMENT 'from deserializer',
  dt string COMMENT 'from deserializer',
  time_firstbyte double COMMENT 'from deserializer',
  ip string COMMENT 'from deserializer',
  cache_status string COMMENT 'from deserializer',
  http_status string COMMENT 'from deserializer',
  response_size int COMMENT 'from deserializer',
  http_method string COMMENT 'from deserializer',
  uri_host string COMMENT 'from deserializer',
  uri_path string COMMENT 'from deserializer',
  uri_query string COMMENT 'from deserializer',
  content_type string COMMENT 'from deserializer',
  referer string COMMENT 'from deserializer',
  x_forwarded_for string COMMENT 'from deserializer',
  user_agent string COMMENT 'from deserializer',
  accept_language string COMMENT 'from deserializer',
  x_analytics string COMMENT 'from deserializer')
PARTITIONED BY (
  year int,
  month int,
  day int,
  hour int)
ROW FORMAT SERDE
  'com.cloudera.hive.serde.JSONSerDe'
LOCATION
  '/wmf/raw/webrequest/test/data/varnish2/hourly'
;

hive-partitioner is used to automatically create Hive partitions on top of time-bucketed HDFS data.

 hive-partitioner -o '--auxpath /path/to/hive-serdes-1.0-SNAPSHOT.jar' --database test /wmf/raw/webrequest/test/data
 ...
 2013-10-21T15:01:37 INFO   Adding 76 partitions to table varnish2.
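Conceptually, hive-partitioner walks the hourly directories and emits one ADD PARTITION statement per hour. An illustrative sketch (the real tool's exact DDL and options may differ):

```python
def add_partition_ddl(table: str, year: int, month: int, day: int,
                      hour: int, base: str) -> str:
    """Build the ALTER TABLE statement that registers one hourly
    directory as a Hive partition. Illustrative only."""
    loc = f"{base}/{year}/{month:02d}/{day:02d}/{hour:02d}"
    return (f"ALTER TABLE {table} ADD IF NOT EXISTS PARTITION "
            f"(year={year}, month={month}, day={day}, hour={hour}) "
            f"LOCATION '{loc}';")

print(add_partition_ddl("varnish2", 2013, 10, 21, 15,
                        "/wmf/raw/webrequest/test/data/varnish2/hourly"))
```

Running one such statement per discovered hourly directory yields the "Adding 76 partitions" behavior shown above.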


The queries found in this gist can be used to look for missing sequence numbers. Note that these queries only work where there has been no reset of sequence counters (i.e. varnishkafka has not been restarted since it started writing to the topic). Note also that there is currently (as of 2013-10-21) a possible bug in varnishkafka where a sequence number of 0 is set if there is any kind of error when parsing a field. Magnus is looking into this.


Results

I ran the following query on 4 days of data from 2 varnishkafka producers.

set start_day=17;
set end_day=20;
set table_name=varnish2;
 
 select M.hostname,
        M.lower_seq,
        M.upper_seq,
        M.total_seqs,
        (M.expected_seqs - M.total_seqs) as missing_seqs,
        (M.expected_seqs - M.total_seqs) / M.expected_seqs as average_loss
   from (select T.hostname,
                min(T.sequence) as lower_seq,
                max(T.sequence) as upper_seq,
                max(T.sequence) - min(T.sequence) + 1 as expected_seqs,
                count(*)        as total_seqs
           from ${hiveconf:table_name} T
          where T.YEAR = 2013
            and T.DAY  between ${hiveconf:start_day} and ${hiveconf:end_day}
            and T.sequence <> 0
          group by T.hostname
        ) M
;

And got the following results:

hostname                    Lower Sequence  Upper Sequence  Total Messages  Missing Messages  Average Loss
cp3003.esams.wikimedia.org  422595396       1439811844      1017216444      5                 4.915374702124975E-9
cp1048.eqiad.wmnet          489463468       1659747077      1170283607      3                 2.5634811718844804E-9
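These figures follow directly from the bounds: expected = upper - lower + 1, missing = expected - total, average loss = missing / expected. Reproducing them from the reported numbers:

```python
def loss_stats(lower: int, upper: int, total: int):
    """Recompute missing-message count and loss rate from the
    per-host sequence bounds and row count."""
    expected = upper - lower + 1
    missing = expected - total
    return missing, missing / expected

# cp3003.esams.wikimedia.org
m_esams, _ = loss_stats(422595396, 1439811844, 1017216444)
assert m_esams == 5

# cp1048.eqiad.wmnet
m_eqiad, _ = loss_stats(489463468, 1659747077, 1170283607)
assert m_eqiad == 3
```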

However, since log lines where sequence == 0 were filtered out, I then counted the number of lines with sequence == 0 per host:

select hostname, count(*) from varnish2 where year = 2013 and day between 17 and 20 and sequence = 0 group by hostname;


hostname                    count
cp3003.esams.wikimedia.org  5
cp1048.eqiad.wmnet          3

These counts match the missing-message counts from the previous query, which means those messages were not actually missing; they were only excluded by the sequence <> 0 filter.

Conclusion: 0 messages lost from both an Amsterdam and a Virginia host in 4 days' time!