Analytics/Systems/EventLogging/Administration

From Wikitech

Overview

The following diagram (EventLogging.png) is meant as a companion to the explanation in https://github.com/wikimedia/eventlogging

The diagram has been created with https://www.draw.io/. If you want the source code, please check https://gist.github.com/elukey/975fab2bcf2ea6398fe1

Important notes

Dependent systems

statsv runs on hafnium.eqiad.wmnet and is a daemon responsible for aggregating performance data before sending it to Statsd. Some metrics in the EventLogging dashboard, like https://grafana.wikimedia.org/dashboard/db/eventlogging?panelId=11&fullscreen, rely on this service to work properly. If you observe datapoint loss in such a metric, please check the status of the statsv service on hafnium and restart it if needed.

Alarms

As depicted in the diagram, we are monitoring:

  • Lag between messages landing in Kafka topics and the message consumption rate of EventLogging's processes, using Burrow. The alarm is triggered as an email sent to analytics-alert@.
  • Insertion rate into the MySQL master from the consumer processes, using Graphite/Icinga (config file). You will see alerts in the wikimedia-analytics IRC channel.
  • [IN PROGRESS] Replication lag for MySQL slaves (https://phabricator.wikimedia.org/T124306)

Consumption Lag "Alarms": Burrow

Alarms for burrow report numbers like

eventlogging-valid-mixed:2 (1454622892611, 199315032, 17) -> (1454622901672, 199315032, 17)

This is

(timestamp, offset, #of messages behind (lag))
  • Burrow evaluates lag over a window of 10 offsets by default. Since we commit offsets every second, that would make lag be evaluated every 10 seconds, which seems too frequent, so we have changed the lag window length to 100 seconds.
  • Lag is evaluated approximately every couple of minutes (we are fine-tuning this).
  • Docs about Burrow lag: https://github.com/linkedin/Burrow/wiki/Consumer-Lag-Evaluation-Rules
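When eyeballing alerts, the status-line format above can also be parsed mechanically; below is a minimal sketch (the `parse_burrow_lag` helper is hypothetical, not something Burrow ships):

```python
import re


def parse_burrow_lag(line):
    """Parse a Burrow alert line like
    'eventlogging-valid-mixed:2 (1454622892611, 199315032, 17) -> (1454622901672, 199315032, 17)'
    into (topic, partition, start, end), where start/end are
    (timestamp_ms, offset, lag) tuples."""
    m = re.match(
        r'(?P<topic>[\w.-]+):(?P<partition>\d+)\s+'
        r'\((?P<ts1>\d+), (?P<off1>\d+), (?P<lag1>\d+)\)\s*->\s*'
        r'\((?P<ts2>\d+), (?P<off2>\d+), (?P<lag2>\d+)\)', line)
    if not m:
        return None
    g = m.groupdict()
    start = (int(g['ts1']), int(g['off1']), int(g['lag1']))
    end = (int(g['ts2']), int(g['off2']), int(g['lag2']))
    return g['topic'], int(g['partition']), start, end


topic, partition, start, end = parse_burrow_lag(
    'eventlogging-valid-mixed:2 (1454622892611, 199315032, 17) -> (1454622901672, 199315032, 17)')
print(topic, partition, end[2])  # end[2] is the lag at the end of the window
```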

An interesting alert use case is the following one:

Cluster:  eqiad
Group:    eventlogging-00
Status:   ERR
Complete: true
Errors:   1 partitions have problems
Format:   Topic:Partition (timestamp, start-offset, start-lag) -> (timestamp, end-offset, end-lag)

           eventlogging-client-side:11 (1455812030875, 188196575, 0) -> (1455813022220, 188205237, 0)

As you can see, the end offset is greater than the start one (while the lag is zero) and the partition status is ERR. This is due to Burrow's rule 4 in https://github.com/linkedin/Burrow/wiki/Consumer-Lag-Evaluation-Rules, which should be read in this way:

Burrow will alert you when a consumer is so slow that the time elapsed between the last committed offset and now is bigger than the time taken to commit all the offsets belonging to the last window.

This can happen when a Kafka broker that is a partition leader goes offline. EventLogging needs a bit of time to recognize the problem and request new metadata from Kafka, so the related consumer status according to Burrow will look stalled or completely blocked even though it is only a temporary stop.
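Rule 4 can be restated as a small computation; the following is a simplified sketch of the idea, not Burrow's actual code (`looks_stalled` is a hypothetical helper):

```python
def looks_stalled(commits, now_ms):
    """commits: list of (timestamp_ms, offset) offset commits in the evaluation
    window, oldest first. Returns True when the time since the last commit
    exceeds the time the consumer took to commit the whole window, which is
    roughly Burrow's rule 4."""
    window_duration = commits[-1][0] - commits[0][0]
    since_last_commit = now_ms - commits[-1][0]
    return since_last_commit > window_duration


# A consumer that committed every second for 10 commits and then went silent:
commits = [(1455812000000 + i * 1000, 100 + i) for i in range(10)]
print(looks_stalled(commits, commits[-1][0] + 30000))  # silent for 30s > 9s window
print(looks_stalled(commits, commits[-1][0] + 5000))   # silent for 5s, still fine
```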

Resetting Burrow consumer group monitoring

If you change the topic assignment for a consumer group, Burrow will continue to think that the group should consume from its previously assigned topics, and will report lag if it stops doing so. To reset which topics Burrow monitors for a given consumer group, delete the consumer group monitoring from Burrow. After deletion, consumer group monitoring will be automatically recreated for new topic partitions that get offset commits.

 curl -X DELETE localhost:8000/v2/kafka/<cluster>/consumer/<consumer_group>

e.g.

 curl -X DELETE localhost:8000/v2/kafka/eqiad/consumer/eventlogging_consumer_mysql_00

See: https://github.com/linkedin/Burrow/wiki/http-request-remove-consumer-group

MySQL Master/Slave replication

Eventlogging databases (m4 shard):

  • db1107 (m4-master)
  • db1108 (analytics-slave), which uses a custom replication mechanism

History:

  • db1046 used to be the m4-master (replaced with db1107 in T156844).
  • db1047 used to be the analytics-slave (replaced with db1108 in T156844).
  • dbstore1002 used to replicate the log database (changed with T156844).
  • the m4-master domain used by the MySQL consumers points directly to db1107 rather than to a dbproxy (T188991)

MySQL insertion rate dropping to zero due to DB failures

After T188991, the MySQL consumers point directly to the db1107 master database, since m4-master.eqiad.wmnet is a CNAME to it rather than to a dbproxy. This means that no automatic failover happens when db1107 is down for some reason (maintenance or outages). So if the MySQL insertion rate drops, the first thing to do is to check the MySQL consumer log files on the EventLogging host (under /srv/log/eventlogging/systemd) for errors. If the cause is a connection problem to the database, do the following:

1) sudo puppet agent --disable "$your-username - working on eventlogging"

2) sudo systemctl stop eventlogging-consumer@mysql-eventbus.service (same thing with eventlogging-consumer@mysql-m4-master-00.service)

Now you have time to figure out what happened and establish whether data needs to be replayed. Since we are pulling from Kafka and we have unique key constraints on the MySQL tables, replaying data already inserted will not cause any duplicates (only warnings in the logs).
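As a toy illustration of why replays are idempotent, here is a sketch using sqlite3 in place of MySQL, with a made-up table carrying a unique key on uuid (the real schema tables enforce the same kind of constraint):

```python
import sqlite3

# In-memory stand-in for the log database; the real tables have a unique
# key on the event uuid, so re-inserting an already-inserted event is
# rejected instead of duplicated.
db = sqlite3.connect(':memory:')
db.execute('CREATE TABLE NavigationTiming (uuid TEXT UNIQUE, ts TEXT)')

events = [('aaa-111', '20180308133220'), ('bbb-222', '20180308133221')]


def replay(batch):
    for uuid, ts in batch:
        # INSERT OR IGNORE approximates what happens on the real tables:
        # the duplicate insert fails on the unique key and only a warning
        # shows up in the consumer logs.
        db.execute('INSERT OR IGNORE INTO NavigationTiming VALUES (?, ?)',
                   (uuid, ts))


replay(events)
replay(events)  # replaying the same batch is a no-op
count = db.execute('SELECT COUNT(*) FROM NavigationTiming').fetchone()[0]
print(count)  # 2, not 4
```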

A Python script was used to reset the offsets for the MySQL consumer groups so that earlier data could be re-read:

https://gist.github.com/ottomata/d69ba72313c44e8e45e6453f4ea97074

import json

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
from pprint import pprint


def get_committed_offsets(consumer, partitions):
    return [(tp, consumer.committed(tp)) for tp in partitions]


def commit(consumer_group, assignments):
    consumer = KafkaConsumer(
        bootstrap_servers=['kafka-jumbo1001.eqiad.wmnet:9092'],
        group_id=consumer_group,
        enable_auto_commit=False,
    )

    # print the current committed offsets
    print("\n\n%s currently at: " % consumer_group)
    pprint(get_committed_offsets(consumer, assignments.keys()))

    # print the message dts at these offsets:
    print("Seeking to new offsets:")
    for tp, o in assignments.items():
        consumer.assign([tp])
        consumer.seek(tp, o.offset)
        message = next(consumer)
        event = json.loads(message.value)
        try:
            dt = event['dt']
        except KeyError:
            dt = event['meta']['dt']

        print("partition {}, offset {} is at dt {}".format(message.partition, message.offset, dt))

    print("\nCommit these offsets to consumer group %s? [y/n]:" % consumer_group)
    if raw_input() == 'y':  # Python 2 script; use input() on Python 3
        print("Committing...", assignments)
        consumer.commit(assignments)
    else:
        print("Not committing.")

    # print the current committed offsets
    print("%s now at: " % consumer_group)
    pprint(get_committed_offsets(consumer, assignments.keys()))


# === EventLogging
#
# 12 partitions in eventlogging-valid-mixed
topic_partitions = [TopicPartition('eventlogging-valid-mixed', p) for p in range(0, 12)]

# Offsets obtained from kafkacat like:
# for p in $(seq 0 11); do kafkacat -C -b kafka-jumbo1001.eqiad.wmnet -f "%p %o\n%s\n" -t eventlogging-valid-mixed -p $p -c 1 -o -169000; done
assignments = {
    topic_partitions[0]:  OffsetAndMetadata(760139545, b''),
    topic_partitions[1]:  OffsetAndMetadata(760165349, b''),
    topic_partitions[2]:  OffsetAndMetadata(760213636, b''),
    topic_partitions[3]:  OffsetAndMetadata(758471379, b''),
    topic_partitions[4]:  OffsetAndMetadata(755461088, b''),
    topic_partitions[5]:  OffsetAndMetadata(759662862, b''),
    topic_partitions[6]:  OffsetAndMetadata(760187732, b''),
    topic_partitions[7]:  OffsetAndMetadata(760190481, b''),
    topic_partitions[8]:  OffsetAndMetadata(760185377, b''),
    topic_partitions[9]:  OffsetAndMetadata(757392297, b''),
    topic_partitions[10]: OffsetAndMetadata(755504356, b''),
    topic_partitions[11]: OffsetAndMetadata(759680897, b'')
}

commit('eventlogging_consumer_mysql_00', assignments)


# === EventBus
#
# Offsets obtained like:
# kafkacat -C -b kafka-jumbo1001.eqiad.wmnet -f "%p %o\n%s\n" -t "eqiad.mediawiki.page-create" -p 0 -c 1 -o -67000
assignments = {
    TopicPartition('eqiad.mediawiki.page-create', 0):  OffsetAndMetadata(48395235, b''),
    TopicPartition('eqiad.mediawiki.page-delete', 0):  OffsetAndMetadata(5363302, b''),
    TopicPartition('eqiad.mediawiki.page-undelete', 0):  OffsetAndMetadata(108336, b''),
    TopicPartition('eqiad.mediawiki.page-move', 0):  OffsetAndMetadata(2821346, b''),
}

commit('eventlogging_consumer_mysql_eventbus_00', assignments)

Dumping data via Sqoop from EventLogging to HDFS

We archive large tables that do not need immediate data access at /wmf/data/archive/eventlogging/Table_name. The archival is a plain Sqoop dump of the table, nothing else. Avro schemas for the tables (autogenerated by Sqoop upon import) can be found at /wmf/data/archive/eventlogging/avro-schemas.

Hive tables on top of the data are created in the archive database.

Sqoop

Of interest: https://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html#_example_invocations

Verify that you can connect via Sqoop using a command with no side effects, for example listing tables:

sudo -u hdfs sqoop list-tables  --password-file '/user/hdfs/mysql-analytics-research-client-pw.txt' --username research --connect jdbc:mysql://analytics-store.eqiad.wmnet/log


A sample import (to /tmp/PageContentSaveCompleteAvro) to avro file format

time sudo -u hdfs sqoop import  --as-avrodatafile  --password-file '/user/hdfs/mysql-analytics-research-client-pw.txt' --username research --connect jdbc:mysql://analytics-store.eqiad.wmnet/log --table PageContentSaveComplete_5588433 --columns id,uuid,timestamp,webHost,wiki,event_isAPI,event_isMobile,event_revisionId  --target-dir /tmp/PageContentSaveCompleteAvro


Another example (note column names with "." need to be quoted):

time sudo -u hdfs sqoop import --as-avrodatafile --username research --password-file '/user/hdfs/mysql-analytics-research-client-pw.txt' --connect jdbc:mysql://analytics-store.eqiad.wmnet/log --query 'select convert(uuid using utf8) uuid, convert(timestamp using utf8) timestamp, convert(wiki using utf8) wiki, convert(webHost using utf8) webHost, convert(event_action using utf8) event_action, convert(`event_action.abort.mechanism` using utf8) `event_action.abort.mechanism`, convert(`event_action.abort.timing` using utf8) `event_action.abort.timing`, convert(`event_action.abort.type` using utf8) `event_action.abort.type`, convert(`event_action.init.mechanism` using utf8) `event_action.init.mechanism`, convert(`event_action.init.timing` using utf8) `event_action.init.timing`, convert(`event_action.init.type` using utf8) `event_action.init.type`, convert(`event_action.ready.timing` using utf8) `event_action.ready.timing`, convert(`event_action.saveAttempt.timing` using utf8) `event_action.saveAttempt.timing`, convert(`event_action.saveFailure.message` using utf8) `event_action.saveFailure.message`, convert(`event_action.saveFailure.timing` using utf8) `event_action.saveFailure.timing`, convert(`event_action.saveFailure.type` using utf8) `event_action.saveFailure.type`, convert(`event_action.saveIntent.timing` using utf8) `event_action.saveIntent.timing`, convert(`event_action.saveSuccess.timing` using utf8) `event_action.saveSuccess.timing`, convert(`event_editingSessionId` using utf8) `event_editingSessionId`, convert(event_editor using utf8) event_editor, convert(event_integration using utf8) event_integration, convert(`event_mediawiki.version` using utf8) `event_mediawiki.version`, convert(`event_page.id` using utf8) `event_page.id`, convert(`event_page.ns` using utf8) `event_page.ns`, convert(`event_page.revid` using utf8) `event_page.revid`, convert(`event_page.title` using utf8) `event_page.title`, convert(event_platform using utf8) event_platform, convert(`event_user.class` using utf8) `event_user.class`, convert(`event_user.editCount` using utf8) `event_user.editCount`, convert(`event_user.id` using utf8) `event_user.id`, convert(event_version using utf8) event_version from Edit_13457736_15423246 where $CONDITIONS' --target-dir /wmf/data/archive/eventlogging/Edit_13457736_15423246 --split-by uuid


Remapping columns and using a custom query:

time sudo -u hdfs sqoop import  --as-avrodatafile  --password-file '/user/hdfs/mysql-analytics-research-client-pw.txt' --username research --connect jdbc:mysql://analytics-store.eqiad.wmnet/log  --query 'select id,uuid,convert(timestamp using utf8) timestamp,convert(webHost using utf8) webhost,wiki,cast(event_isAPI as Integer) event_isAPI,cast(event_isMobile as Integer) event_isMobile,cast(event_revisionId as Integer) event_revisionId from PageContentSaveComplete_5588433 where $CONDITIONS'   --map-column-java event_isAPI=Integer,event_isMobile=Integer,event_revisionId=Integer  --target-dir /tmp/PageContentSaveCompleteAvro --split-by id

An example of how to set up a table on top of avro files can be found here: https://github.com/wikimedia/analytics-refinery/blob/master/hive/mediawiki/history/create_mediawiki_page_table.hql


I (Nuria) could not get the direct mapping in Hive to work; instead I had to use a syntax that included the schema to create the table: https://gist.github.com/nuria/acd67dc1d237c59a2dda9799e82da4c3#file-create-avro-table-with-schema-hql


See also: https://phabricator.wikimedia.org/diffusion/AWCM/browse/master/WDCM_Sqoop_Clients.R

EventLogging Routine Maintenance for the oncall

  • Check grafana for problems with raw vs. validated events, or other apparent problems
  • Check storage for any gaps if you think there might be an issue: A few different scripts exist, Milimetric's gist for example.
  • Decide whether we need to deploy that week; avoid Friday deployments
  • Remember to log all actions to the SAL (!log <something> in the ops channel)
  • Report outages as part of Wikimedia's incident reports so there is a reference
  • Follow up on any alarms that might be raised

How Tos

Restart all EventLogging processes

Check:

sudo eventloggingctl status

Run:

 sudo eventloggingctl restart

Stop completely:

 sudo eventloggingctl stop

The config applied to create logs and such is at:

/etc/eventlogging.d/*/*

Start/Stop/Restart individual EventLogging processes

EventLogging processes are managed by systemd. Each config file in /etc/eventlogging.d/*/* corresponds to a single eventlogging process. Let's call the pieces of this hierarchy /etc/eventlogging.d/$service/$name.

To stop one of them, you can do something like:

systemctl stop eventlogging-$service@$name

For example:

elukey@eventlog1002:~$ sudo systemctl status eventlogging-consumer@mysql-m4-master-00.service
● eventlogging-consumer@mysql-m4-master-00.service - Eventlogging Consumer mysql-m4-master-00
   Loaded: loaded (/lib/systemd/system/eventlogging-consumer@mysql-m4-master-00.service; enabled; vendor preset: enabled)
   Active: active (running) since Thu 2018-03-08 13:32:20 UTC; 5 days ago
 Main PID: 7882 (python)
    Tasks: 2 (limit: 4915)
   CGroup: /system.slice/system-eventlogging\x2dconsumer.slice/eventlogging-consumer@mysql-m4-master-00.service
           └─7882 python /srv/deployment/eventlogging/analytics/bin/eventlogging-consumer @/etc/eventlogging.d/consumers/mysql-m4-master-00
           
elukey@eventlog1002:~$ sudo systemctl restart eventlogging-consumer@mysql-m4-master-00.service

You can stop, start, and restart any individual EventLogging process using variations of this command.

Backfill schemas on MySQL

Analytics/EventLogging/Backfilling

Check logs and errors

Raw logs are at:

/srv/log/eventlogging

Process logs are at:

/srv/log/eventlogging/systemd


While errors appear in the logs, it is often easier to check them by consuming from the Kafka error topic:

> kafkacat -C  -b kafka-jumbo1001.eqiad.wmnet -t eventlogging_EventError

Using the above with some sed/sort/uniq you can easily get the distribution of errors per schema (change -o -10000 to the number of most recent Kafka messages you want to analyze):

> kafkacat -C  -b kafka-jumbo1001.eqiad.wmnet -t eventlogging_EventError -o -10000 -e | sed -n 's/^.*"schema": "\([^"]*\)"}.*$/\1/p' | sort | uniq -c
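The same per-schema breakdown can be sketched in Python. The message layout below (a JSON envelope with the failing schema name under event.schema) is an assumption inferred from the sed pattern above, so adjust the key path to match the real messages:

```python
import json
from collections import Counter

# Hypothetical sample of messages from the eventlogging_EventError topic.
raw_messages = [
    '{"event": {"rawEvent": "...", "message": "Missing field", "schema": "NavigationTiming"}}',
    '{"event": {"rawEvent": "...", "message": "Bad type", "schema": "Edit"}}',
    '{"event": {"rawEvent": "...", "message": "Missing field", "schema": "NavigationTiming"}}',
]


def schema_error_counts(messages):
    """Count error events per schema, tolerating unparseable messages."""
    counts = Counter()
    for raw in messages:
        try:
            counts[json.loads(raw)['event']['schema']] += 1
        except (ValueError, KeyError):
            counts['<unparseable>'] += 1
    return counts


counts = schema_error_counts(raw_messages)
print(counts.most_common())
```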


Longer term logs

Longer-term logs of events are held for 90 days in Hadoop. See: https://gerrit.wikimedia.org/r/#/c/operations/puppet/+/467646/7/modules/camus/templates/eventlogging-client-side.erb

They can be found at:

/wmf/data/raw/eventlogging_client_side

Camus is also dumping errors that might be on kafka to:

/wmf/camus/eventlogging-client-side-00

Get a first feel for end-to-end issues in the whole pipeline for a schema

If you are, for example, interested in NavigationTiming_10374055, run

mysql --defaults-extra-file=/etc/mysql/conf.d/research-client.cnf --host analytics-slave.eqiad.wmnet -e "select left(timestamp,10) ts , COUNT(*) from log.NavigationTiming_10374055 where left(timestamp,8) >= '20150101' group by ts order by ts;" >out && tail -n 100 out

on stat1006 (you need to be in the researchers group to access research-client.cnf). That will dump recent hourly totals to the screen; if you prefer graphs, more data is stored in out, waiting to be plotted.

If you need different granularity, just change the 10 in the query to the granularity you need (like 8 => per day, 12 => per minute, 14 => per second).
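Since the timestamps are plain YYYYMMDDHHMMSS strings, the prefix length passed to left() maps directly onto a time bucket:

```python
# MediaWiki-style timestamps are YYYYMMDDHHMMSS strings, so taking the first
# N characters (left(timestamp, N) in SQL) truncates to a coarser bucket.
ts = '20150101134502'  # 2015-01-01 13:45:02 UTC

granularities = [
    (8, 'day'),      # 20150101
    (10, 'hour'),    # 2015010113
    (12, 'minute'),  # 201501011345
    (14, 'second'),  # 20150101134502
]
buckets = {unit: ts[:n] for n, unit in granularities}
for n, unit in granularities:
    print('left(timestamp, %2d) -> %s  (per %s)' % (n, ts[:n], unit))
```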

If the numbers you get indicate issues, you can go to Graphite to sanity check the early parts of the pipeline. A first indicator is typically the overall counts. Like comparing eventlogging.overall.raw.rate to eventlogging.overall.valid.rate. Then one can bisect into eventlogging.client_side_events.*, or eventlogging.server_side_events.*, directly drill down into per schema counts, like looking at the graph for eventlogging.schema.NavigationTiming.rate. If you're good at staring at graphs, go right to the all in one graph.

If the numbers you get indicate issues, you can also repeat the database query against the m2 master database directly (credentials are in /etc/eventlogging.d/consumers/mysql-m2-master on vanadium). That allows you to exclude replication issues.

Troubleshoot events coming in in real time

  • Incoming event counts are logged to Graphite; both the counts of validating and non-validating events per schema are available. Using those, users can get a sense of change: Graphite is populated in real time, so if all of a sudden events for a schema stop validating, it is clearly visible.
  • The EventLogging slave database (for users with access to the 'research' user) is also populated in real time.

Lastly, EventLogging events coming in in real time are written to files that are synced to stat1007 and stat1006 once a day; these files can be found here:

~@stat1006:/srv/log/eventlogging/archive$

If you detect an issue or suspicious change, please notify analytics@ and escalate with the Analytics devs.

Troubleshoot insufficient permission

"error: insufficient permission for adding an object to repository database .git/objects"

Run groups to see if you are in the wikidev group; if so, likely some files in the .git directory are not writable by the wikidev group. Make them so.

Deploy EventLogging

EventLogging is deployed using scap3. The scap deployment configuration for the various EventLogging deployments can be found in specific scap repos in Gerrit: eventlogging/scap/<deployment-name>. The EventLogging Analytics deployment scap configs are at eventlogging/scap/analytics. Deploy from deployment.eqiad.wmnet using:

# ssh to production deploy server
ssh deployment.eqiad.wmnet

# cd to the EventLogging Analytics instance deploy source
cd /srv/deployment/eventlogging/analytics

# Checkout the revision you want to deploy
git pull

# Update the submodules
git submodule update --init 

# Run scap3 deployment
deploy

ssh eventlog1002.eqiad.wmnet (or wherever eventlogging is deployed.)

Go to /srv/deployment/eventlogging/analytics

Check (via git log) that the checkout there matches what you just deployed from the deployment server.

Restart EL on target host (eventlog1002.eqiad.wmnet)

eventloggingctl stop
eventloggingctl start

Check various logs in /srv/log/eventlogging/systemd/eventlogging_* to see that things are running as they should.

Check that /srv/log/eventlogging/all-events.log has data flowing in.

Hop into the ops IRC channel and !log that you upgraded and restarted EventLogging, adding the commit hash that you deployed.

!log Redeployed eventlogging with revert to batch/queue size change - https://gerrit.wikimedia.org/r/#/c/258384/

Now please deploy latest code to Beta Cluster to keep things in sync: EventLogging/Testing/BetaLabs#How_to_deploy_code

Blacklist a schema

https://gerrit.wikimedia.org/r/#/c/248045/

Resolve lagging replication between MySQL Master and Slaves

If replication is lagging, we should open a ticket with the info we have and tag it with the Phabricator DBA tag so that the DBAs get pinged. An example of a prior problem: [1]

Keep in mind that, while database monitoring is here: [2], the lag reported there does not apply, as replication for EL doesn't go through the regular channels.

Ad hoc replication script is here:

https://github.com/wikimedia/operations-software/blob/master/dbtools/eventlogging_sync.sh

Raise log verbosity to debug

Sometimes while investigating an outage it is handy to raise log verbosity to DEBUG to get more information about what is happening. There are three main kinds of logs:

  • Eventlogging ones (i.e. the ones logged from EL itself)
  • Kafka Python ones (library used to consume events from Kafka)
  • Confluent Kafka Python ones (library used to produce events to Kafka via librdkafka)

For the first two it is sufficient to do the following (requires root permissions):

  • Identify the Eventlogging daemon that you want to debug (processor 01 for example) and get its systemd unit path via systemctl cat eventlogging-processor@client-side-01.service (it will be the first line reported by the command).
  • Open the file with an editor and add a line with Environment=LOG_LEVEL=DEBUG
  • Restart the daemon via systemctl restart eventlogging-processor@client-side-01.service
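Note that editing the installed unit file in place can be reverted by the next puppet run or package upgrade; a systemd drop-in carrying the same environment variable is an alternative. A sketch (the override.conf path follows systemd's standard drop-in convention):

```ini
# /etc/systemd/system/eventlogging-processor@client-side-01.service.d/override.conf
# (e.g. created with: sudo systemctl edit eventlogging-processor@client-side-01.service,
#  then restart the unit)
[Service]
Environment=LOG_LEVEL=DEBUG
```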

For Confluent Kafka Python, you'll need to add a specific parameter to the EventLogging configuration files, since the library needs to configure librdkafka accordingly (which is where all the useful logs come from). The procedure is the following:

  • Identify the Eventlogging daemon that you want to debug (processor 01 for example)
  • Open its configuration file with an editor (in this case /etc/eventlogging.d/processors/client-side-01)
  • Identify lines starting with kafka-confluent:/// and append debug=something to the end of the line (with something picked from librdkafka's configuration guidelines; currently suggested values are broker,topic,msg for detailed producer debugging, and consumer,cgrp,topic,fetch for consumers)
  • Restart the daemon via systemctl restart eventlogging-processor@client-side-01.service