From Wikitech

Backfilling a kafka eventlogging_<Schema> topic

An eventlogging_<Schema> topic can be missing data (for example, after a host issue) even though the event data is present in the eventlogging-client-side topic. Since eventlogging-client-side is also imported into Hadoop, you can run a local backfill with eventlogging-processor on a stat machine (the eventlogging role has recently been deployed there) and pump the data back into the topics where it belongs.

  • Get files from Hadoop for the time intervals that are missing (the files are compressed, but you want them in plain text):
hdfs dfs -text /wmf/data/raw/eventlogging_client_side/eventlogging-client-side/hourly/2019/04/01/19/* > 2019-04-01-19.txt
  • Clean up the files a bit; each line starts with an extra sequence number that has to be removed:
more 2019-04-01-19.txt | awk '{$1=""; print substr($0,2)}' | awk 'NF' > 2019-04-01-19-plain.txt
  • Check out eventlogging into your home directory, then run:
export PYTHONPATH=/home/nuria/eventlogging
export PATH="$PATH:/home/nuria/eventlogging/bin"
eventlogging-processor --help
  • Once you have a file you want to pump into Kafka, run the following (be careful with the quotes; the format is picky).

You need to whitelist the proxies:

export http_proxy=http://webproxy.eqiad.wmnet:8080
export https_proxy=http://webproxy.eqiad.wmnet:8080
ionice cat 2019-04-01-19-plain.txt | eventlogging-processor '%q %{recvFrom}s %{seqId}d %D %{ip}i %u' 'stdin://' 'kafka-confluent:///kafka-jumbo1001.eqiad.wmnet:9092,kafka-jumbo1002.eqiad.wmnet:9092,kafka-jumbo1003.eqiad.wmnet:9092,kafka-jumbo1004.eqiad.wmnet:9092,kafka-jumbo1005.eqiad.wmnet:9092,kafka-jumbo1006.eqiad.wmnet:9092?topic=eventlogging_{schema}&message.send.max.retries=6,retry.backoff.ms=200' --output-invalid

  • After Camus runs, there should be new files in, for example, this partition:
ls -la /mnt/hdfs/wmf/data/raw/eventlogging/eventlogging_VirtualPageView/hourly/2019/04/01/20
  • If the backfill inserts events more than 1 month in the past, you need to run refine manually for the data to be refined (on an-coord1001):
sudo -u hdfs /usr/local/bin/refine_eventlogging_analytics --since 2019-04-01T19:00:00 --until 2019-04-01T21:00:00
  • event_sanitized might also need to be backfilled.
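
The awk cleanup step above can also be sketched in Python. This is a minimal sketch, assuming each raw line starts with a whitespace-delimited sequence number followed by the event; the function name strip_sequence_numbers is hypothetical and not part of EventLogging. Like the awk pipeline, it re-joins the remaining fields with single spaces and skips lines that end up empty.

```python
from typing import Iterable, Iterator


def strip_sequence_numbers(lines: Iterable[str]) -> Iterator[str]:
    """Drop the first whitespace-delimited field of every line.

    Mirrors `awk '{$1=""; print substr($0,2)}' | awk 'NF'`: the remaining
    fields are re-joined with single spaces and empty results are skipped.
    """
    for line in lines:
        fields = line.split()
        rest = " ".join(fields[1:])
        if rest:
            yield rest
```

It could be applied to an open file, writing each yielded line followed by a newline to the "-plain" output file.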

Get all events from kafka topics for a given timeframe

We need to get all events for a timeframe from Kafka. This can be done by consuming N messages starting from an offset. The following command prints a message together with its offset:

kafkacat -C -b kafka1012.eqiad.wmnet:9092  -c 1 -t eventlogging-client-side  -f "%o %s\n"

The following command returns 12 messages starting at offset 3597866000:

kafkacat -C   -b kafka-jumbo1002.eqiad.wmnet:9092 -t eventlogging-client-side  -o 3597866000 -c12
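
Once the offsets that bound the timeframe are known, the message count for -c follows from them. The sketch below is only an illustration: parse_offset_line and kafkacat_args are hypothetical helper names, the output format is assumed to be "%o %s" with one message per line, and the offsets are assumed to belong to a single partition. It uses only the kafkacat flags shown above.

```python
from typing import List, Tuple


def parse_offset_line(line: str) -> Tuple[int, str]:
    """Split one line printed with -f "%o %s\\n" into (offset, message)."""
    offset, _, payload = line.rstrip("\n").partition(" ")
    return int(offset), payload


def kafkacat_args(broker: str, topic: str,
                  first_offset: int, last_offset: int) -> List[str]:
    """Build a kafkacat call that consumes first_offset..last_offset inclusive."""
    if last_offset < first_offset:
        raise ValueError("last_offset must not be smaller than first_offset")
    count = last_offset - first_offset + 1
    return ["kafkacat", "-C", "-b", broker, "-t", topic,
            "-o", str(first_offset), "-c", str(count)]
```

The returned list can be passed to subprocess.run as is.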

Backfilling refine

If an interval of data is missing or corrupted in Hive's event database (eventlogging tables), we can re-run the refine process.

  1. Ssh into an-launcher1002.eqiad.wmnet
  2. Execute systemctl list-timers | grep refine_eventlogging
  3. Choose the timer that corresponds to the data that you want to correct and run systemctl cat refine_eventlogging_<foo>
  4. The output shows the script used to run the EventLogging refine in the ExecStart field, e.g. ExecStart=/usr/local/bin/refine_eventlogging_analytics.
  5. Cat it and copy the command. You don't need to copy the is-yarn-app-running part of the command, just the spark2-submit part. You can also leave out the "${@}".
  6. You might want to add sudo -u hdfs or sudo -u analytics in front of it. It's a good idea to change the job name as well, for example by prefixing it with backfill_. Add --since and --until parameters to override the properties file. If you're not sure whether you need to override other properties (like emails_to or table_blacklist_regex), get the properties file path from the spark2-submit command and cat it.
  7. Run your tailored command in an-launcher1002.eqiad.wmnet.
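
The --since and --until values and the backfill_ name prefix from step 6 can be generated rather than typed by hand. This is a minimal sketch under stated assumptions: the helper names refine_time_flags and backfill_job_name are hypothetical, and the timestamp format follows the YYYY-MM-DDTHH:mm:ss form used elsewhere on this page.

```python
from datetime import datetime
from typing import List

# Timestamp layout assumed from the examples on this page.
TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"


def refine_time_flags(since: datetime, until: datetime) -> List[str]:
    """Return the --since/--until flags for a backfill interval."""
    if since >= until:
        raise ValueError("since must be earlier than until")
    return ["--since", since.strftime(TIME_FORMAT),
            "--until", until.strftime(TIME_FORMAT)]


def backfill_job_name(original_name: str) -> str:
    """Prefix a job name with backfill_ unless it already has the prefix."""
    if original_name.startswith("backfill_"):
        return original_name
    return "backfill_" + original_name
```

The flags are meant to be appended to the command copied in step 5.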

Backfilling sanitization

If an interval of data is missing, corrupted or out of date in Hive's event_sanitized database, we can re-run the sanitization process. Note that EL sanitization already does a second pass 45 days after data collection. So if the data you want to backfill is not older than 45 days, you don't need to backfill it (it will be done automatically after 45 days), unless it's urgent. Also check whether the alarms are alerting about a schema that was recently added to the allowlist. If so, the alarms could be false positives while RefineSanitize is still catching up.
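
The 45-day rule can be written down as a small check. This is only a sketch of the decision described above; the function name needs_manual_backfill and the urgent flag are hypothetical.

```python
from datetime import datetime, timedelta

# Delay of the automatic second sanitization pass, as stated above.
SECOND_PASS_DELAY = timedelta(days=45)


def needs_manual_backfill(collected_at: datetime, now: datetime,
                          urgent: bool = False) -> bool:
    """Decide whether sanitized data has to be backfilled by hand.

    Data not older than 45 days will still be picked up by the automatic
    second pass, so a manual run is only needed when it is urgent.
    """
    if urgent:
        return True
    return now - collected_at > SECOND_PASS_DELAY
```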

  1. Ssh into an-launcher1002.eqiad.wmnet
  2. Execute systemctl cat refine_sanitize_eventlogging_analytics_delayed (the _immediate and _delayed versions of this timer and scripts are identical except for their "since" and "until" parameters in their respective .properties files).
  3. The output shows the script used to run the EventLogging sanitization in the ExecStart field, e.g. ExecStart=/usr/local/bin/refine_sanitize_eventlogging_analytics_delayed.
  4. Cat it and copy the command. You don't need to copy the is-yarn-app-running part of the command, just the spark3-submit part. You can also leave out the "${@}".
  5. Run it with sudo -u analytics and modify the name of the job, for example by prefixing it with backfill_. Add --since and --until parameters to override the properties file (make sure to use the YYYY-MM-DDTHH:mm:ss format). If you're not sure whether you need to override other properties (like emails_to or table_blacklist_regex), get the properties file path from the spark3-submit command and cat it.
  6. If you're only sanitizing data for one or a few streams, you'll need to update the whitelist.yaml that the properties file points to. Copy the properties file to your directory, change its absolute path in the command. Then change the hdfs absolute path to the whitelist.yaml and copy/modify that file as well. Remember to keep the _defaults section when deleting other sections.
  7. Run the command. If it starts retrying (switching from RUNNING to ACCEPTED multiple times) you have to let it retry 6 times without killing it, otherwise it won't generate logs and you won't be able to figure out why it failed. Look at logs as usual with yarn logs.
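
The allowlist trimming from step 6 can be sketched as follows. The layout is an assumption: the YAML file is taken to be already loaded into a dict whose top-level keys are one section per table plus the _defaults section named above. The function name trim_allowlist is hypothetical.

```python
from typing import Any, Dict, Iterable


def trim_allowlist(allowlist: Dict[str, Any], tables: Iterable[str],
                   defaults_key: str = "_defaults") -> Dict[str, Any]:
    """Keep only the requested table sections plus the defaults section."""
    wanted = set(tables)
    missing = wanted - set(allowlist)
    if missing:
        # Fail early instead of silently sanitizing nothing.
        raise KeyError(f"not in allowlist: {sorted(missing)}")
    return {key: value for key, value in allowlist.items()
            if key == defaults_key or key in wanted}
```

The trimmed mapping would then be written back to the copied allowlist file that the copied properties file points to.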

Script example to run RefineSanitizeMonitor & RefineSanitize:

# on statbox
spark2-submit \
--class org.wikimedia.analytics.refinery.job.refine.RefineSanitizeMonitor \
--master yarn \
--deploy-mode client \
/srv/deployment/analytics/refinery/artifacts/org/wikimedia/analytics/refinery/refinery-job-0.1.15.jar \
--output_database     event_sanitized \
--output_path /wmf/data/event_sanitized \
--since               "2022-06-19T09:52:00+0000" \
--until               "2022-06-19T11:02:00+0000" \
--table_include_regex mediawiki_revision_create \
--allowlist_path /wmf/refinery/current/static_data/sanitization/event_sanitized_main_allowlist.yaml \
--input_database event \
--keep_all_enabled true \
--should_email_report true \
--to_emails <youremail>@wikimedia.org

# on an-launcher1002
sudo -u analytics kerberos-run-command analytics \
spark2-submit \
--class org.wikimedia.analytics.refinery.job.refine.RefineSanitize \
--master yarn \
--deploy-mode client \
/srv/deployment/analytics/refinery/artifacts/org/wikimedia/analytics/refinery/refinery-job-0.1.15.jar \
--output_database     event_sanitized \
--output_path /wmf/data/event_sanitized \
--since               "2022-06-19T09:52:00+0000" \
--until               "2022-06-19T11:02:00+0000" \
--table_include_regex mediawiki_revision_create \
--allowlist_path /wmf/refinery/current/static_data/sanitization/event_sanitized_main_allowlist.yaml \
--input_database event \
--keep_all_enabled true \
--should_email_report true \
--to_emails <youremail>@wikimedia.org \
--salts_path /user/hdfs/salts/event_sanitized


You need sudo on Beta Cluster to test the backfilling scripts and also sudo on eventlog1002 to do the backfilling for real: EventLogging/Testing/BetaCluster

This document describes how to backfill from "processed" events. If you need to backfill from raw events, like the ones stored in the client-side log, additional steps are needed. The idea is the same, except that a "process" step must be included so that raw events are processed before being inserted into the database.

Note that from this change onwards [1], eventlog1002 only keeps logs for the last 30 days, so backfilling after an outage should be done as soon as possible.

First step (data preparation)

In the first step, split the logs for the relevant day into files of 64K lines. This size keeps you from running into memory issues.

(Having such small files gives good control over what timespan you want to backfill, and it allows for easy parallelization, speed-up, and fine-control during data injection).

Events can be split with a command like this:

 mkdir split && cd split && ionice nice zcat /srv/log/eventlogging/archive/all-events.log-20141114.gz >all-events.log && ionice nice split --lines=64000 all-events.log && rm all-events.log
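
The same split can be sketched in Python for cases where more control over chunk naming is wanted. This is a minimal sketch, not what was used for the original backfill: the function name split_lines and the part-NNNNN file names are hypothetical (split itself produces names like xaa, xab).

```python
import itertools
from pathlib import Path
from typing import IO, List


def split_lines(source: IO[str], out_dir: Path,
                lines_per_file: int = 64000) -> List[Path]:
    """Write source into numbered files of at most lines_per_file lines."""
    out_dir.mkdir(parents=True, exist_ok=True)
    written = []
    for index in itertools.count():
        # Only one chunk is held in memory at a time.
        chunk = list(itertools.islice(source, lines_per_file))
        if not chunk:
            break
        path = out_dir / f"part-{index:05d}"
        path.write_text("".join(chunk))
        written.append(path)
    return written
```

A compressed archive can be passed in directly by opening it with gzip.open(path, "rt").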

Raw Events

Raw event logs (client-side and server-side) include a bunch of characters that need to be removed before the processors can handle them. If you need to backfill raw events, you might find this snippet useful: https://gist.github.com/nuria/e837d16b94c09a4df8a4

Second step (data injection)

You should test your scripts and code in Beta Cluster before trying this on vanadium.

Checkout a separate clone of EventLogging

The injection is better done using a separate clone of EventLogging. That way the backfilling is not interrupted by other people's EventLogging deployments, and you can use an EventLogging version of your choice.

See, for example, the changes that were needed to backfill events one by one (not batched): [2]

To run EventLogging from your local checkout you need to change the python library search path. So, if you checked out EL code in your home directory, you would need to tell python where to build it:

cd ~/EventLogging/server
export  PYTHONPATH='/home/nuria/backfilling/python'
python ./setup.py develop --install-dir=/home/nuria/backfilling/python

These commands build EL into `/home/nuria/backfilling/python`.

Start a Backfilling Consumer

In a simple for loop over those split files (or parts of them in parallel), start a separate EventLogging consumer (one that consumes from stdin and writes to m2-master) and pipe the file in. The config for this EventLogging consumer is just a copy of the m2 consumer's config with its input swapped for stdin. I would rename this config so that the process is easy to find when running htop.

Config looks as follows:

nuria@vanadium:~/backfilling$ more mysql-m2-master-BACKFILLING

Note that the regular consumer batches events. Using that code as is to backfill is fine if you are dealing with a total outage. If you have a problem with dropped events within the event stream, you cannot batch insertions. Thus, you might need to make code changes to the consumer to be able to backfill:

I had to do these changes on 201502: https://gerrit.wikimedia.org/r/#/c/190139/

To check whether your changes are working (again, in Beta Cluster):

/usr/bin/python -OO ./python/eventlogging-consumer @/home/nuria/backfilling/mysql-m2-master-BACKFILLING  > log-backfill.txt 2>&1

For each of the started consumers (I could only start two without the db falling too far behind), capture stdout, stderr and the exit code in separate (per input) files. This makes it easy to verify that the backfilling did not bail out and to correlate log files with input files.

A simple shell script to loop over files and consume each:


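 fileList=`ls 20150208/x*`

 for f in $fileList
 do
    # name the log after the input file so the two can be correlated
    l=`basename $f`
    ionice nice cat $f | ionice nice  /usr/bin/python -OO ./python/eventlogging-consumer @/home/nuria/backfilling/mysql-m2-master-BACKFILLING  > log-backfill-${l}.txt 2>&1
    # drops the log once the file is done; remove this line to keep per-input logs
    rm log-backfill-${l}.txt
 done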
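
Capturing output and exit code per input file, as described above, can also be sketched in Python. This is an illustration under assumptions: run_per_file and the exit-<name>.txt file naming are hypothetical, the command is whatever consumer invocation you use, and files are processed one after another without ionice or nice.

```python
import subprocess
from pathlib import Path
from typing import Dict, Sequence


def run_per_file(command: Sequence[str], inputs: Sequence[Path],
                 log_dir: Path) -> Dict[str, int]:
    """Pipe each input file into command and keep its output and exit code."""
    log_dir.mkdir(parents=True, exist_ok=True)
    exit_codes = {}
    for path in inputs:
        log_path = log_dir / f"log-backfill-{path.name}.txt"
        with path.open("rb") as stdin, log_path.open("wb") as log:
            # stdout and stderr both end up in the per-input log file.
            result = subprocess.run(list(command), stdin=stdin,
                                    stdout=log, stderr=subprocess.STDOUT)
        exit_codes[path.name] = result.returncode
        (log_dir / f"exit-{path.name}.txt").write_text(f"{result.returncode}\n")
    return exit_codes
```

Any non-zero value in the returned mapping points at the input file whose log should be inspected.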


There are two things to monitor: the database and the eventlogging hosts. You can monitor the eventlogging hosts with htop; the database stats appear here: https://tendril.wikimedia.org/host/view/db1046.eqiad.wmnet/3306