Obsolete:Analytics/Systems/Cluster/MediaWiki Avro Logging

This page contains historical information. It may be outdated or unreliable.

Logs can be generated by the production MediaWiki instances in Avro format, shipped through Kafka to Hadoop, and finally stored in HDFS. This documentation is a work in progress. Contact ottomata, nuria, dcausse, or ebernhardson with any questions about it.

Schema

Schemas are stored in the mediawiki/event-schemas repository. Each schema is labeled with a version number, starting at 10, followed by a UNIX timestamp indicating when the schema was created. For example, the schema stored in avro/mediawiki/CirrusSearchRequestSet/101446746400.avsc within the event-schemas repository is the first version (10) of CirrusSearchRequestSet, created at 1446746400.
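The naming scheme can be illustrated with a short Python sketch that splits a revision number back into its version and creation timestamp. The function name parse_revision is hypothetical, and the sketch assumes the timestamp is always a 10-digit UNIX time in seconds, as in the examples on this page.

```python
from datetime import datetime, timezone

# Assumption: creation timestamps are 10-digit UNIX times (seconds).
TIMESTAMP_DIGITS = 10


def parse_revision(revision):
    """Split a schema revision such as 101446746400 into (version, created_at)."""
    digits = str(revision)
    if not digits.isdigit() or len(digits) <= TIMESTAMP_DIGITS:
        raise ValueError("not a schema revision: %r" % (revision,))
    version = int(digits[:-TIMESTAMP_DIGITS])
    created_at = int(digits[-TIMESTAMP_DIGITS:])
    return version, created_at


version, created_at = parse_revision("101446746400")
print(version, created_at)  # 10 1446746400
print(datetime.fromtimestamp(created_at, tz=timezone.utc).isoformat())
```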

Schema conventions

  • The schema should contain a { "name": "ts", "type": "int" } field recording the UNIX epoch time that the record represents. This field is used by the default Camus job that consumes messages from all mediawiki_.* Kafka topics.
  • Schemas should use camelCase naming conventions for field names when possible. This matches the default naming convention for EventLogging schemas.
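As a rough illustration, both conventions could be checked before a schema is committed. The sketch below is not part of any existing tooling: check_conventions and the example schema are hypothetical, and the camelCase test is a simple approximation.

```python
import json
import re

# Approximation of camelCase: starts with a lowercase letter, no underscores.
CAMEL_CASE = re.compile(r"^[a-z][a-zA-Z0-9]*$")


def check_conventions(schema):
    """Return a list of convention violations for an Avro record schema."""
    problems = []
    fields = {f["name"]: f for f in schema.get("fields", [])}
    ts = fields.get("ts")
    if ts is None:
        problems.append("missing 'ts' field")
    elif ts.get("type") != "int":
        problems.append("'ts' field should have type 'int'")
    for name in fields:
        if not CAMEL_CASE.match(name):
            problems.append("field %r is not camelCase" % name)
    return problems


# Hypothetical schema, used only for illustration.
example = json.loads("""
{
  "type": "record",
  "name": "ExampleChannel",
  "fields": [
    {"name": "ts", "type": "int"},
    {"name": "user_agent", "type": "string"}
  ]
}
""")
print(check_conventions(example))
```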

MediaWiki

Once a schema has been added, it can be referenced from the operations/mediawiki-config repository. You will need to generate a submodule bump for the wmf-config/event-schemas directory within operations/mediawiki-config so that it points to a version that includes the new schema. You will also need to edit wmf-config/InitializeSettings.php. First, associate a Monolog channel with the Avro schema through the wmgMonologAvroSchemas setting. The schema field must be a string containing a valid Avro schema, sourced from the event-schemas repository. The revision field is the exact schema revision number (the number in the schema's file name). This number is encoded into the header of each log message so that readers can use the correct schema to decode it.

'wmgMonologAvroSchemas' => array(
    'default' => array(
        'CirrusSearchRequestSet' => array(
            'schema' => file_get_contents( __DIR__ . '/event-schemas/avro/mediawiki/CirrusSearchRequestSet/111448028943.avsc' ),
            'revision' => 111448028943,
        ),
    ),
),

In addition to defining the schema, a Monolog channel needs to be defined for it. The channel name must be the same as the schema name above. When using the Avro + Kafka pipeline, the buffer flag should always be set to true so that latency in communicating with Kafka does not hurt page load performance. For any high-volume logging, the logstash handler for this channel must be disabled. The udp2log handler can be enabled or disabled depending on your needs.

'wmgMonologChannels' => array(
    'default' => array(
        ...
        'CirrusSearchRequestSet' => array(
            'kafka' => 'debug',
            'udp2log' => false,
            'logstash' => false,
            'buffer' => true
        ),
        ...
    )
),

Kafka

With MediaWiki configured as described above, binary Avro log messages will start appearing in the mediawiki_CirrusSearchRequestSet topic in Kafka. Topics are auto-created in Kafka as necessary, but with only a single partition. For logging channels with any real volume, which should be the only reason a MediaWiki->Kafka->Hadoop pipeline is necessary, you will need to file a ticket in Phabricator asking analytics engineering to create your topic with an appropriate number of partitions.

Test reading an Avro record from a Kafka topic

Individual messages can be read out of Kafka on stat1002.eqiad.wmnet to verify that the data is being encoded as expected. This command will read the most recent message from one partition:

$ kafkacat -b kafka1012 -t mediawiki_CirrusSearchRequestSet -c 1 > csrq.avro

The extracted message is not quite valid Avro yet, because it has a 9-byte header prepended to it. The header can be read with this small bit of Scala:

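object AvroTest {
  // Prints the 9-byte header: a 1-byte magic value followed by an 8-byte schema revision.
  def main(args: Array[String]): Unit = {
    val file = new java.io.FileInputStream(args(0))
    val data = new java.io.DataInputStream(file)
    try {
      println("magic = %d".format(data.readByte()))
      println("revid = %d".format(data.readLong()))
    } finally {
      data.close()
    }
  }
}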

With that code in a file named avrotest.scala, the following shell command will read and report the header. The magic value must always be 0. The revid must match the revision id of the writer schema that MediaWiki used to write the message:

$ scala avrotest.scala csrq.avro
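The same header check can be sketched in Python with the struct module, which may be handy where Scala is not installed. This assumes the layout described above (one magic byte followed by an 8-byte big-endian revision, which is how java.io.DataInputStream reads a long). The names split_message and check_message are hypothetical.

```python
import struct

# 1-byte magic + 8-byte big-endian revision = 9 bytes, no padding.
HEADER = struct.Struct(">bq")


def split_message(raw):
    """Return (magic, revision, avro_payload) for one raw Kafka message."""
    if len(raw) < HEADER.size:
        raise ValueError("message shorter than the 9-byte header")
    magic, revision = HEADER.unpack_from(raw)
    return magic, revision, raw[HEADER.size:]


def check_message(raw, expected_revision):
    """Validate the header and return the bare Avro payload."""
    magic, revision, payload = split_message(raw)
    if magic != 0:
        raise ValueError("magic = %d, expected 0" % magic)
    if revision != expected_revision:
        raise ValueError("revid = %d, expected %d" % (revision, expected_revision))
    return payload


# Synthetic message for illustration; a real one would come from csrq.avro.
sample = HEADER.pack(0, 111448028943) + b"\x02"
print(split_message(sample))
```

To check a captured message, read csrq.avro in binary mode and pass its contents to check_message together with the revision configured in wmgMonologAvroSchemas.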

Finally, the Avro payload itself can be decoded to check that it is as expected. Download avro-tools-1.7.7.jar to your home directory and run the following command. It will throw a java.io.EOFException after printing the decoded event; the exception can safely be ignored.

$ dd if=csrq.avro bs=1 skip=9 | \
      java -jar avro-tools-1.7.7.jar fragtojson --schema-file event-schemas/avro/mediawiki/CirrusSearchRequestSet/111448028943.avsc -

Here's a script that can be used to perform all of the steps:

#!/usr/bin/env bash
# Validate MediaWiki Avro encoded message

CHANNEL=${1:?MediaWiki log channel expected (e.g. ApiAction)}
VERSION=${2:?Avro schema version expected (e.g. 101453221640)}

TOPIC="mediawiki_${CHANNEL}"
KAFKA_SERVER=kafka1012
REC="${CHANNEL}.avro"
SCALA_PROG=avrotest.scala
SCHEMAS=/srv/event-schemas/avro/mediawiki

# create a temp file for capturing command output
TEMPFILE=$(mktemp -t "$(basename "$0").XXXXXX")
trap '{ rm -f "$TEMPFILE"; }' EXIT

# Grab a record from kafka
echo "Reading 1 record from ${TOPIC} via ${KAFKA_SERVER}..."
echo "(this may take a while depending on event volume)"
kafkacat -b "${KAFKA_SERVER}" -t "${TOPIC}" -c 1 > "${REC}"

# Validate the header
[[ -f $SCALA_PROG ]] ||
cat <<EOF >$SCALA_PROG
object AvroTest {
  def main(args: Array[String]) {
    val file = new java.io.FileInputStream(args(0));
    val data = new java.io.DataInputStream(file);
    println("magic = %d".format(data.readByte()));
    println("revid = %d".format(data.readLong()));
  }
}
EOF

echo "Checking binary packet header..."
scala "$SCALA_PROG" "$REC" > "$TEMPFILE"
grep 'magic = 0' "$TEMPFILE" || {
  echo >&2 "[ERROR] Expected 'magic = 0' in $REC"
  cat "$TEMPFILE"
  exit 1
}
grep "revid = ${VERSION}" "$TEMPFILE" || {
  echo >&2 "[ERROR] Expected 'revid = ${VERSION}' in $REC"
  cat "$TEMPFILE"
  exit 1
}

# Validate the payload
echo "Validating JSON..."
echo "(java.io.EOFException can be safely ignored)"
dd if="${REC}" bs=1 skip=9 |
java -jar avro-tools-1.7.7.jar fragtojson \
    --schema-file "${SCHEMAS}/${CHANNEL}/${VERSION}.avsc" -

Camus

Camus is the software that reads messages out of Kafka and writes them to Hadoop. At 15 minutes past each hour, Camus reads the previous hour's worth of data from Kafka topics named mediawiki_<name>. This data is written to Hadoop in /wmf/data/raw/mediawiki/<name>/<year>/<month>/<day>/<hour>. For this to work, Camus needs to know which schemas to use. Camus reads schemas from the analytics/refinery/source repository, via a git submodule pointing at the event-schemas repository. After adding a schema to the event-schemas repository, a submodule bump needs to be merged to analytics/refinery/source and a new version of the refinery-source jar needs to be deployed to the analytics cluster.
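When verifying that data has landed, it can help to compute the directory a given event should end up in. The sketch below follows the path layout given above. The function name is hypothetical, and both the use of UTC and the zero-padding of month, day and hour are assumptions that should be checked against the actual directories in HDFS.

```python
from datetime import datetime, timezone

RAW_BASE = "/wmf/data/raw/mediawiki"


def camus_hour_directory(name, epoch_seconds):
    """Build the hourly output directory for an event timestamp.

    Assumptions: hours are bucketed in UTC and path components are zero-padded.
    """
    hour = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return "%s/%s/%s" % (RAW_BASE, name, hour.strftime("%Y/%m/%d/%H"))


print(camus_hour_directory("CirrusSearchRequestSet", 1446746400))
# /wmf/data/raw/mediawiki/CirrusSearchRequestSet/2015/11/05/18
```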

While Camus can read events written with any schema version it knows about, it always writes events out to HDFS using the schema version configured in the mediawiki.erb template of the camus module in Puppet. For each Kafka topic that Camus reads, there needs to be a line like the following:

org.wikimedia.analytics.schemas.CirrusSearchRequestSet.latestRev=111448028943

In order to write files into the correct folders (partitions), Camus needs to read the event timestamp. This is also configured in mediawiki.erb, with the following properties:

  • camus.message.timestamp.field: the field name (defaults to timestamp)
  • camus.message.timestamp.format: the format (defaults to unix_milliseconds); one of:
    • unix_milliseconds: UNIX timestamp in milliseconds (number)
    • unix_seconds: UNIX timestamp in seconds (number)
    • ISO-8601: ISO 8601 format (string)
    • any other string is treated as a Java SimpleDateFormat pattern

Given an Avro schema field like:

{
   "name": "ts",
   "doc": "The timestamp, in unix time, that the request was made",
   "type": "int",
   "default": 0
},

the Camus properties file should contain:

camus.message.timestamp.field=ts
camus.message.timestamp.format=unix_seconds
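The effect of the format setting can be sketched as follows. This is only a model of the documented options, not Camus code: to_epoch_seconds is a hypothetical name, naive ISO 8601 strings are assumed to be UTC, and SimpleDateFormat patterns are deliberately left unimplemented because Python has no direct equivalent.

```python
from datetime import datetime, timezone


def to_epoch_seconds(value, fmt="unix_milliseconds"):
    """Interpret a timestamp field value according to the configured format."""
    if fmt == "unix_milliseconds":
        return int(value) // 1000
    if fmt == "unix_seconds":
        return int(value)
    if fmt == "ISO-8601":
        parsed = datetime.fromisoformat(value)
        if parsed.tzinfo is None:
            # Assumption: timestamps without an offset are in UTC.
            parsed = parsed.replace(tzinfo=timezone.utc)
        return int(parsed.timestamp())
    # Any other string would be a Java SimpleDateFormat pattern.
    raise NotImplementedError("SimpleDateFormat pattern not handled: %r" % fmt)


print(to_epoch_seconds(1446746400, "unix_seconds"))
print(to_epoch_seconds(1446746400000))
print(to_epoch_seconds("2015-11-05T18:00:00+00:00", "ISO-8601"))
```

All three calls describe the same instant. Getting the format wrong (for example leaving the default unix_milliseconds for a field holding seconds) would place events in partitions near 1970, which is why the ts example above sets unix_seconds explicitly.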

Hive

Hive is the final destination for querying logs generated by MediaWiki and stored within Hadoop. An appropriate external table needs to be created in Hive in the wmf_raw database. See Analytics/Cluster/Hive/Avro for more information.

Oozie

Oozie is the Hadoop workflow scheduler. A job needs to be created within the analytics/refinery repository to create partitions in the Hive table after Camus has written them out.

The oozie/mediawiki/load bundle defines a reusable workflow for populating a Hive table from an Avro dataset sent to Kafka via MediaWiki and Monolog. To add processing for a new schema, add a new coordinator declaration in oozie/mediawiki/load/bundle.xml and set the channel, raw_data_directory and send_error_email_to properties appropriately.

Example coordinator declaration for a schema named MyNewMediaWikiSchema:

    <coordinator name="load_mediawiki_MyNewMediaWikiSchema-coord">
        <app-path>${coordinator_file}</app-path>
        <configuration>
            <property>
                <name>channel</name>
                <value>MyNewMediaWikiSchema</value>
            </property>
            <property>
                <name>raw_data_directory</name>
                <value>${raw_base_directory}/mediawiki_MyNewMediaWikiSchema</value>
            </property>
            <property>
                <name>send_error_email_to</name>
                <value>${common_error_to},developer@example.org</value>
            </property>
        </configuration>
    </coordinator>
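Because every declaration differs only in the channel name and the notification address, the block can be generated rather than copied by hand. The following Python sketch reproduces the example above with xml.etree.ElementTree. The helper coordinator_declaration is hypothetical and not part of the refinery repository.

```python
import xml.etree.ElementTree as ET


def coordinator_declaration(channel, extra_email=None):
    """Render a <coordinator> element for bundle.xml as a string."""
    coord = ET.Element("coordinator", name="load_mediawiki_%s-coord" % channel)
    ET.SubElement(coord, "app-path").text = "${coordinator_file}"
    config = ET.SubElement(coord, "configuration")

    email = "${common_error_to}"
    if extra_email:
        email += "," + extra_email

    properties = [
        ("channel", channel),
        ("raw_data_directory", "${raw_base_directory}/mediawiki_%s" % channel),
        ("send_error_email_to", email),
    ]
    for name, value in properties:
        prop = ET.SubElement(config, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value

    # ET.indent is available from Python 3.9; older versions emit one line.
    if hasattr(ET, "indent"):
        ET.indent(coord, space="    ")
    return ET.tostring(coord, encoding="unicode")


print(coordinator_declaration("MyNewMediaWikiSchema", "developer@example.org"))
```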

Initial Deployment Checklist

These steps are intended to be done in order.

  1. Commit schema to mediawiki/event-schemas repository (e.g. 265164)
  2. Commit submodule bump to analytics/refinery/source repository (e.g. 273556)
  3. Commit Oozie job to create partitions to analytics/refinery repository (e.g. 273557)
  4. Commit property changes for Camus to operations/puppet repository (e.g. 273558)
  5. Wait for analytics to deploy new versions of refinery and refinery-source to analytics cluster
  6. Have a topic with the proper number of partitions created in Kafka
  7. Commit submodule bump along with proper configuration to operations/mediawiki-config repository (e.g. 273559)
  8. Deploy initial mediawiki-config patch to production with a sampling rate of a few events per minute for testing
  9. Verify events in Kafka are as expected. Check MediaWiki logs for errors.
  10. After enough time has passed (Camus runs once per hour) verify events are showing up in HDFS
  11. Create table in Hive pointing at the events in HDFS
  12. Submit coordinator to Oozie to auto-create partitions
  13. Adjust (or remove) sampling of events in operations/mediawiki-config repository
  14. Query your data!

Schema Upgrade Checklist

Schema upgrades need to be performed in a controlled manner to ensure the full pipeline continues processing events throughout the change. The only schema upgrade that is tested so far is adding a new field with a default value.

  1. Commit new schema version to mediawiki/event-schemas repository
  2. Commit submodule bump to analytics/refinery/source repository
  3. Wait for analytics to deploy new versions of refinery and refinery-source to analytics cluster
  4. Alter the relevant Hive table to use the new schema version
  5. Commit property changes for Camus to operations/puppet repository to write out data files using the new schema
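  6. Adjust and deploy MediaWiki code to provide the new field. Note that the PHP Avro encoder will ignore fields in the record that are not in the schema it knows about, but will error out if a field in its schema (even one with a default value) is missing from the record. This is why the code is deployed before the schema update in the next step.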
  7. Commit submodule bump and schema update to operations/mediawiki-config
  8. After the next Camus run verify it is still writing out all of the events. If things are awry Camus may only partially write out the directory for that hour.
  9. Verify the Oozie job to create partitions completes. If Camus only wrote a partial directory this will fail.
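The ordering of steps 6 and 7 follows from the encoder behaviour noted in step 6. The Python sketch below models that behaviour only as described on this page; it is not the PHP Avro encoder, and prepare_record and newField are hypothetical names.

```python
def prepare_record(schema_field_names, record):
    """Model of the described encoder behaviour.

    Fields not named in the schema are dropped silently; a schema field that
    is absent from the record is an error, even if the schema has a default.
    """
    missing = [name for name in schema_field_names if name not in record]
    if missing:
        raise KeyError("record is missing schema fields: %s" % ", ".join(missing))
    return {name: record[name] for name in schema_field_names}


old_schema = ["ts"]
new_schema = ["ts", "newField"]

# Step 6 done first: new code with the old schema works, the extra field is ignored.
print(prepare_record(old_schema, {"ts": 1446746400, "newField": "x"}))

# Reverse order: old code with the new schema fails.
try:
    prepare_record(new_schema, {"ts": 1446746400})
except KeyError as error:
    print("encoding failed:", error)
```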