Kafka

Introduction

Apache Kafka is a scalable and durable distributed logging buffer.

Administration

See the Kafka Administration page for administration tips and documentation.

Kafka Clusters

WMF runs 3 Kafka clusters: analytics-eqiad, main-eqiad, and main-codfw. We run Confluent's Kafka distribution.

analytics-eqiad

analytics-eqiad is the original Kafka install at WMF. (It was originally referred to as just eqiad.) It consists of 6 brokers inside the Analytics VLAN. The bulk of the data on this cluster comes from webrequests. This cluster is also used directly for Analytics EventLogging, Discovery Analytics, statsv, and EventStreams. Much of the data here is imported into Hadoop using Camus. Data from the main clusters is mirrored to this cluster for Analytics purposes.

'analytics' is no longer a descriptive name for this Kafka cluster. In FY2017-2018, we'd like to provision 2 new 'aggregate' Kafka clusters in eqiad and codfw that will replace the out-of-warranty brokers in analytics-eqiad. These new clusters will not be in the Analytics VLAN. They will be used for high volume 'analytics' data as well as for non-user-facing 'production' services.

main (eqiad and codfw)

The 'main' Kafka clusters in eqiad and codfw are mirrors of each other. The 'main' clusters should be used for low volume critical production services. Main is currently used directly by EventBus and change-propagation.

Each main Kafka cluster consists of 3 brokers. eventlogging-service-eventbus, the EventBus HTTP produce service endpoint, is co-located with the main Kafka brokers.
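
For illustration, here is a minimal sketch of producing an event through the EventBus HTTP service from Python. The port (8085), the /v1/events path, and the event fields used here are assumptions for the example, not verified documentation of the service; real events must conform to a registered schema.

 import json
 import requests

 # A hypothetical event; field names here are assumptions for the example.
 event = {
     'meta': {
         'topic': 'test.event',
         'domain': 'en.wikipedia.org',
         'dt': '2017-01-01T00:00:00Z',
     }
 }

 # Assumed: the service accepts a JSON array of events per POST.
 resp = requests.post('http://localhost:8085/v1/events',
                      data=json.dumps([event]),
                      headers={'Content-Type': 'application/json'})
 print(resp.status_code)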

Webrequest logs

varnishkafka is installed on frontend varnishes. It sends webrequest logs to the analytics-eqiad Kafka brokers.

Kafkatee is a replacement for udp2log that consumes from Kafka instead of from the udp2log firehose. It runs on oxygen, where it consumes, samples, and filters the webrequest stream into files for easy grepping and troubleshooting. Fundraising also runs a Kafkatee instance that feeds webrequest logs into its banner analysis logic. A sketch of this consume/sample/filter pattern follows below.
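
This is an illustration of the pattern only, not kafkatee itself: a Python loop that consumes, samples, and filters webrequest messages into a file. The topic name, sampling rate, and filter string are assumptions for the example; the broker is one from this document.

 from kafka import KafkaConsumer

 # Assumed topic name; the broker appears elsewhere in this page.
 consumer = KafkaConsumer('webrequest_text',
                          bootstrap_servers=['kafka1012.eqiad.wmnet:9092'])

 with open('webrequest.sampled.log', 'a') as out:
     for i, msg in enumerate(consumer):
         if i % 100 != 0:          # keep 1 in 100 messages (assumed rate)
             continue
         line = msg.value.decode('utf-8', errors='replace')
         if '/wiki/' in line:      # stand-in for kafkatee's configured filters
             out.write(line + '\n')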

Monitoring

In Labs

Kafka is puppetized so that arbitrary clusters can be spun up in labs. Here's how.

You'll need a running Zookeeper and Kafka broker. These instructions show how to set up a single node Zookeeper and Kafka on the same host.

Create a new Jessie labs instance. In this example we have named our new instance 'kafka1' and it is in the 'analytics' project. Thus the hostname is kafka1.analytics.eqiad.wmflabs. Wait for the instance to spawn and finish its first puppet run. Make sure you can log in.

Edit hiera data for your project and set the following:

zookeeper_cluster_name: my-zookeeper-cluster
zookeeper_clusters:
  my-zookeeper-cluster:
    hosts:
      kafka1.analytics.eqiad.wmflabs: "1"

kafka_clusters:
  my-kafka-cluster:
    zookeeper_cluster_name: my-zookeeper-cluster
    brokers:
      kafka1.analytics.eqiad.wmflabs:
        id: "1"

# Specify which Kafka cluster we want to use.
kafka_cluster_name: my-kafka-cluster

Go to the configure instance page for your new instance, and check the following boxes to include needed classes:

  • role::zookeeper::server
  • role::kafka::main::broker

Run puppet on your new instance. Fingers crossed and you should have a new Kafka broker running.

To verify, log into your instance and run

 kafka topics --create --topic test --partitions 2 --replication-factor 1
 kafka topics --describe

If this succeeds, you will have created a topic in your new single node Kafka cluster.

Kafka clients usually take a list of brokers and/or a zookeeper connect string in order to work with Kafka. In this example, those would be:

  • broker list: kafka1.analytics.eqiad.wmflabs:9092
  • zookeeper connect: kafka1.analytics.eqiad.wmflabs:2181/kafka/my-kafka-cluster

Note that the zookeeper connect URL contains a path that ends with the value of kafka_cluster_name. Substitute whatever you named your cluster in your hiera config.
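
To try this from Python rather than the CLI, here is a minimal sketch using the kafka-python client (assuming it is installed on the instance) against the broker list above. It produces one message to the test topic created earlier and reads it back:

 from kafka import KafkaProducer, KafkaConsumer

 broker = 'kafka1.analytics.eqiad.wmflabs:9092'

 # Produce a test message to the 'test' topic created above.
 producer = KafkaProducer(bootstrap_servers=[broker])
 producer.send('test', b'hello from labs')
 producer.flush()

 # Read it back from the beginning; stop after 10 seconds of inactivity.
 consumer = KafkaConsumer('test',
                          bootstrap_servers=[broker],
                          auto_offset_reset='earliest',
                          consumer_timeout_ms=10000)
 for msg in consumer:
     print(msg.value)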

How do I ...

Produce/Consume to kafka

The easiest way is to use kafkacat, which can be run in either consumer or producer mode.

Consume

stat1002$ kafkacat -C -b kafka1012.eqiad.wmnet:9092 -t test

Produce

stat1002$ cat test_message.txt
Hola Mundo
stat1002$ cat test_message.txt | kafkacat -P -b kafka1012.eqiad.wmnet:9092 -t test

Consume Avro messages from Kafka

 from kafka import KafkaConsumer
 import avro.schema
 import avro.io
 import io

 # Consume Avro-encoded messages from the CirrusSearchRequestSet topic.
 # (Recent kafka-python uses bootstrap_servers rather than the old
 # metadata_broker_list parameter.)
 consumer = KafkaConsumer('mediawiki_CirrusSearchRequestSet',
                          group_id='my_group',
                          bootstrap_servers=['kafka1012:9092'])

 # Load the Avro schema used to decode each binary message body.
 schema_path = "/home/madhuvishy/avro-kafka/CirrusSearchRequestSet.avsc"
 schema = avro.schema.parse(open(schema_path).read())
 reader = avro.io.DatumReader(schema)

 for msg in consumer:
     bytes_reader = io.BytesIO(msg.value)
     decoder = avro.io.BinaryDecoder(bytes_reader)
     data = reader.read(decoder)
     print(data)
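
This assumes the kafka-python and avro client libraries are installed (both are available on PyPI), and that the script runs on a host that can reach the brokers, e.g. stat1002.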
