Kafka

Apache Kafka is a scalable and durable distributed logging buffer. We currently run 6 Kafka clusters using Confluent's Kafka distribution.

Administration

See the Kafka Administration page for administration tips and documentation.

Multi Datacenter

We use Kafka MirrorMaker to handle multi datacenter replication of Kafka topics. There are various replication topologies to choose from. As of 2021-04, only our Kafka main cluster is truly multi-DC. Here's how it works:

Producers always write to their local Kafka cluster. Producers must prefix topic names with the datacenter they are in. E.g. a producer in eqiad that is producing a stream named 'application.state-change' should produce to a Kafka topic named 'eqiad.application.state-change'.

Kafka MirrorMaker in each datacenter is configured to pull all prefixed topics from the alternate datacenter. That is, Kafka MirrorMaker running in codfw pulls all topics in Kafka main-eqiad that are prefixed with 'eqiad.'. Conversely, Kafka MirrorMaker running in eqiad pulls all topics in Kafka main-codfw that are prefixed with 'codfw.'.

Non DC-prefixed topics are not replicated.
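
The naming rule above can be sketched in a few lines of Python. This is only an illustration of the convention described here: the helper names (prefixed_topic, is_mirrored) are hypothetical and do not come from any Wikimedia or Kafka library.

```python
# Illustrative sketch only: these helpers are hypothetical, not an existing API.
DATACENTERS = ("eqiad", "codfw")


def prefixed_topic(datacenter: str, stream: str) -> str:
    """Return the topic name a producer in `datacenter` should write `stream` to."""
    if datacenter not in DATACENTERS:
        raise ValueError(f"unknown datacenter: {datacenter}")
    return f"{datacenter}.{stream}"


def is_mirrored(topic: str, source_datacenter: str) -> bool:
    """True if MirrorMaker in the other datacenter would pull `topic`
    from the main cluster in `source_datacenter` (prefix match only)."""
    return topic.startswith(f"{source_datacenter}.")


print(prefixed_topic("eqiad", "application.state-change"))
print(is_mirrored("eqiad.application.state-change", "eqiad"))  # prefixed: replicated
print(is_mirrored("application.state-change", "eqiad"))        # unprefixed: not replicated
```

A producer in codfw would call prefixed_topic("codfw", ...) in the same way, and that topic would then be pulled into main-eqiad.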

Kafka Clusters

Our six clusters are jumbo-eqiad, main-eqiad, main-codfw, logging-eqiad, logging-codfw and test-eqiad.

jumbo (eqiad)

The Kafka jumbo cluster is the replacement for the Kafka analytics cluster. The bulk of the data on this cluster is from webrequests. This cluster is also used directly for Analytics Events, Discovery Analytics, statsv, etc. Much of the data here is imported into Hadoop using Gobblin. Data from the main Kafka clusters is mirrored to this cluster for Analytics purposes.

This cluster is intended to be used for high volume analytics, as well as other non production critical services. If you are building a production level (e.g. paging on a holiday) service that uses Kafka, you should use the main Kafka clusters. Almost all topics from the main clusters are mirrored here via MirrorMaker instances running on each of the broker nodes. These MirrorMaker instances are named after the direction they mirror in: for example, main-eqiad_to_jumbo-eqiad mirrors topics from the main-eqiad cluster.

Brokers: kafka_clusters.jumbo-eqiad.

main (eqiad and codfw)

The 'main' Kafka clusters in eqiad and codfw are mirrors of each other. The 'main' clusters should be used for low volume critical production services. Kafka main clusters are used directly by Event_Platform/EventGate, change-propagation, etc.

MirrorMaker instances run on each broker node and consume from the remote datacenter. On main-eqiad, these instances are called main-codfw_to_main-eqiad, and on main-codfw, they are called main-eqiad_to_main-codfw.

Only topics prefixed by the appropriate datacenter names are mirrored between the two main clusters. I.e. only eqiad.* topics are mirrored from main-eqiad -> main-codfw, and vice versa.

Brokers: kafka_clusters.main-eqiad, kafka_clusters.main-codfw.

logging (eqiad and codfw)

The 'logging' Kafka clusters in eqiad and codfw are a component of the ELK logging pipeline.

Hosts produce to this Kafka cluster by way of rsyslog-omkafka, with Logstash consuming from Kafka. This provides a buffering layer that smooths the load on the Logstash collectors and prevents log messages from being lost if Logstash crashes or is unable to cope with the load.

Multi-site for this cluster is designed so that logstash collectors in eqiad and codfw may consume from both Kafka clusters, and agents may produce to only the nearest cluster. Currently the Kafka brokers for this cluster run on the same hardware as the logstash Elasticsearch instances.

Brokers: kafka_clusters.logging-eqiad,kafka_clusters.logging-codfw

test-eqiad (eqiad)

Used to test Kafka configuration changes, such as rebalancing partitions and upgrading the Kafka version.

Webrequest logs

varnishkafka is installed on frontend varnishes. It sends webrequest logs to the jumbo Kafka brokers.

kafkatee, available on centrallog1001, is a replacement for udp2log that consumes from Kafka instead of from the udp2log firehose. It consumes, samples, and filters the webrequest logs to files for easy grepping and troubleshooting. Fundraising also runs an instance of kafkatee that feeds webrequest logs into banner analysis logic.

Monitoring

In Cloud VPS

Kafka is puppetized so that arbitrary clusters can be spun up in Cloud VPS. Here's how.

You'll need a running Zookeeper and Kafka broker. These instructions show how to set up a single node Zookeeper and Kafka on the same host.

Create a new Cloud VPS instance. In this example we have named our new instance 'kafka1' and it is in the 'analytics' project. Thus the hostname is kafka1.analytics.eqiad1.wikimedia.cloud. Wait for the instance to spawn and finish its first puppet run. Make sure you can log in.

Edit hiera data for your project and set the following:

zookeeper_cluster_name: my-zookeeper-cluster
zookeeper_clusters:
  my-zookeeper-cluster:
    hosts:
      kafka1.analytics.eqiad1.wikimedia.cloud: "1"

kafka_clusters:
  my-kafka-cluster:
    zookeeper_cluster_name: my-zookeeper-cluster
    brokers:
      kafka1.analytics.eqiad1.wikimedia.cloud:
        id: "1"

Go to the configure instance page for your new instance, and check the following boxes to include needed classes:

  • role::zookeeper::server
  • role::kafka::simple::broker

Run puppet on your new instance. Fingers crossed and you should have a new Kafka broker running.

To verify, log into your instance and run

 kafka topics --create --topic test --partitions 2 --replication-factor 1
 kafka topics --describe

If this succeeds, you will have created a topic in your new single node Kafka cluster.

Kafka clients usually take a list of brokers and/or a zookeeper connect string in order to work with Kafka. In this example, those would be:

  • broker list: kafka1.analytics.eqiad1.wikimedia.cloud:9092
  • zookeeper connect: kafka1.analytics.eqiad1.wikimedia.cloud:2181/kafka/my-kafka-cluster

Note that the zookeeper connect URL contains a path that ends with the name of your Kafka cluster (my-kafka-cluster in this example). Replace it with whatever you named your cluster in your hiera config.
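
As a rough sketch, the two connection strings can be derived from the hiera data shown earlier. The Python dict below simply mirrors that hiera example, and the function names are hypothetical. The ports (9092, 2181) and the /kafka/<cluster name> path are taken from the example values above and are assumed to be the same for your cluster.

```python
# Illustrative sketch: derive client connection strings from hiera-like data.
# The function names are hypothetical; ports and the /kafka/ path prefix are
# assumed from the example values on this page.
hiera = {
    "zookeeper_clusters": {
        "my-zookeeper-cluster": {
            "hosts": {"kafka1.analytics.eqiad1.wikimedia.cloud": "1"},
        },
    },
    "kafka_clusters": {
        "my-kafka-cluster": {
            "zookeeper_cluster_name": "my-zookeeper-cluster",
            "brokers": {"kafka1.analytics.eqiad1.wikimedia.cloud": {"id": "1"}},
        },
    },
}


def broker_list(config: dict, kafka_cluster: str, port: int = 9092) -> str:
    """Comma-separated host:port list of the brokers in `kafka_cluster`."""
    brokers = config["kafka_clusters"][kafka_cluster]["brokers"]
    return ",".join(f"{host}:{port}" for host in brokers)


def zookeeper_connect(config: dict, kafka_cluster: str, port: int = 2181) -> str:
    """Zookeeper connect string, including the per-cluster path."""
    zk_name = config["kafka_clusters"][kafka_cluster]["zookeeper_cluster_name"]
    zk_hosts = config["zookeeper_clusters"][zk_name]["hosts"]
    hosts = ",".join(f"{host}:{port}" for host in zk_hosts)
    return f"{hosts}/kafka/{kafka_cluster}"


print(broker_list(hiera, "my-kafka-cluster"))
print(zookeeper_connect(hiera, "my-kafka-cluster"))
```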

How do I ...

The easiest CLI to Kafka is kafkacat. This can be executed in several modes (e.g. consumer, producer, etc.). See kafkacat --help for more info.

List available Kafka topics

stat1007$ kafkacat -L -b kafka-jumbo1007.eqiad.wmnet:9092 | grep <my_topic>

Consume

stat1007$ kafkacat -C -b kafka-jumbo1007.eqiad.wmnet:9092 -t test_topic -o 0

This opens a stream that effectively tails the given topic, starting from the present and returning new messages in real time as they arrive in Kafka.

You can control the start offset using the -o option, in which case Kafka will first replay part of its message buffer (up to 90 days for most topics). For example, -o -100 will start the stream 100 messages back from the present, thus giving you an immediate burst of 100 messages. This is useful to kickstart your debugging session on topics that receive relatively few messages. This can be combined with the -e option to automatically end the program once the "present" has been reached (thus not waiting for new messages to arrive).

stat1007$ kafkacat -C -b kafka-jumbo1007.eqiad.wmnet:9092 -t test_topic -o -100 -e

If the -o option is omitted entirely, Kafka will replay its entire message buffer.
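
If you want to drive kafkacat from a script, a thin Python wrapper along these lines may help. It is only a sketch: the function names are hypothetical, it uses only the flags shown above (-C, -b, -t, -o, -e), and it assumes kafkacat is on the PATH of the host you run it on.

```python
# Illustrative sketch: build and run a kafkacat consumer command from Python.
# Function names are hypothetical; only flags shown on this page are used.
import subprocess
from typing import Iterator, List, Optional


def kafkacat_consume_args(broker: str, topic: str,
                          offset: Optional[int] = None,
                          exit_at_end: bool = False) -> List[str]:
    """Return the argv for a kafkacat consumer."""
    args = ["kafkacat", "-C", "-b", broker, "-t", topic]
    if offset is not None:
        args += ["-o", str(offset)]  # e.g. -100 = start 100 messages back
    if exit_at_end:
        args.append("-e")            # stop once the "present" is reached
    return args


def consume_lines(args: List[str]) -> Iterator[str]:
    """Run the command and yield one message per output line."""
    with subprocess.Popen(args, stdout=subprocess.PIPE, text=True) as proc:
        for line in proc.stdout:
            yield line.rstrip("\n")


# Reproduces the command line from the example above.
print(" ".join(kafkacat_consume_args(
    "kafka-jumbo1007.eqiad.wmnet:9092", "test_topic",
    offset=-100, exit_at_end=True)))
```

Iterating over consume_lines(...) with exit_at_end=True returns once the end of the topic is reached; without it the loop keeps waiting for new messages.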

Produce

stat1007$ cat test_message.txt
Hola Mundo
stat1007$ cat test_message.txt | kafkacat -P -b kafka-jumbo1007.eqiad.wmnet:9092 -t test

Consume with a Python Kafka client

The two best Python Kafka clients (as of 2020-01) are confluent-kafka and kafka-python. Follow instructions there to connect to a Kafka broker and consume.
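
As a starting point, here is a minimal sketch using kafka-python. It assumes the kafka-python package is installed on the host you run it from; the helper names (consumer_config, print_messages) are hypothetical, and the 10 second idle timeout is an arbitrary choice for interactive debugging.

```python
# Illustrative sketch of a small kafka-python consumer.
# Helper names are hypothetical; kafka-python must be installed to call
# print_messages(), but consumer_config() is plain Python.
from itertools import islice
from typing import Dict, List, Optional


def consumer_config(brokers: List[str], group_id: Optional[str] = None,
                    from_beginning: bool = False) -> Dict[str, object]:
    """Keyword arguments for kafka.KafkaConsumer."""
    return {
        "bootstrap_servers": brokers,
        "group_id": group_id,  # None = do not join a consumer group
        "auto_offset_reset": "earliest" if from_beginning else "latest",
        "enable_auto_commit": False,    # never move committed offsets
        "consumer_timeout_ms": 10_000,  # stop iterating after 10s of silence
    }


def print_messages(topic: str, brokers: List[str], limit: int = 10) -> None:
    """Print up to `limit` new messages from `topic`, then disconnect."""
    from kafka import KafkaConsumer  # third-party: kafka-python

    consumer = KafkaConsumer(topic, **consumer_config(brokers))
    try:
        for record in islice(consumer, limit):
            print(record.partition, record.offset, record.value)
    finally:
        consumer.close()
```

For example, print_messages("test_topic", ["kafka-jumbo1007.eqiad.wmnet:9092"]) would print the next ten messages that arrive on test_topic. Message values are raw bytes unless you pass a value_deserializer.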


Consume using Spark Streaming

Get a neat GUI for interacting with Kafka

The Redpanda console offers useful functionality such as viewing or creating messages, viewing topic info, managing consumer groups, etc., in an easy-to-use GUI. Run it with:

docker run --rm --network=host -p 8080:8080 -e KAFKA_BROKERS=localhost:30092 docker.redpanda.com/redpandadata/console:latest

(replace localhost:30092 with the Kafka brokers you want to connect to)

Then open http://localhost:8080 and enjoy.

Force a specific offset for a consumer group

1. Pause all active consumers

2. From one of the stats machines (e.g. stat1005.eqiad.wmnet) create a directory to run the following commands from

mkdir ~/kafka
cd ~/kafka

3. Download the confluent-kafka Debian package from apt.wikimedia.org thirdparty/confluent. The exact version may change in the future. If the URL below returns a 404, open the appropriate directory in a web browser to see the current version.

wget https://apt.wikimedia.org/wikimedia/pool/thirdparty/confluent/c/confluent-kafka/confluent-kafka-2.11_1.1.0-1_all.deb

4. Manually unpackage the deb

ar x confluent-kafka-2.11_1.1.0-1_all.deb
tar -Jxf data.tar.xz

5. List current offsets for given consumer group and verify the consumers are really stopped. Note that this script will only work for topic/partition/group that are listed here. If your topic is unlisted, perhaps because the consumer offsets have expired, this tool will be unable to set the offsets (or at least, I couldn't figure out how).

usr/bin/kafka-consumer-groups --bootstrap-server kafka-jumbo1007:9092 --describe --group cirrussearch_updates_eqiad

6. Run the kafka-consumer-groups command with appropriate options. If your topic has more than one partition, review the --help output for how to specify each partition.

usr/bin/kafka-consumer-groups \
  --bootstrap-server kafka-jumbo1007:9092 \
  --topic eqiad.swift.search_updates.upload-complete \
  --group cirrussearch_updates_eqiad \
  --reset-offsets \
  --to-offset 70678 \
  --dry-run

7. If you are happy with the output, replace the --dry-run flag with --execute and re-run.
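
Steps 6 and 7 can also be scripted so that the dry run and the real run are guaranteed to use identical arguments. The sketch below is a hypothetical helper, not an existing tool. It assumes you run it from the ~/kafka directory created in step 2, so that the relative path usr/bin/kafka-consumer-groups resolves to the unpacked binary.

```python
# Illustrative sketch: build the reset-offsets command from steps 6 and 7.
# The helper names are hypothetical; the flags are the ones shown above.
import subprocess
from typing import List


def reset_offsets_args(bootstrap_server: str, group: str, topic: str,
                       offset: int, execute: bool = False,
                       binary: str = "usr/bin/kafka-consumer-groups") -> List[str]:
    """Return the argv for a dry run (default) or a real reset."""
    return [
        binary,
        "--bootstrap-server", bootstrap_server,
        "--topic", topic,
        "--group", group,
        "--reset-offsets",
        "--to-offset", str(offset),
        "--execute" if execute else "--dry-run",
    ]


def run(args: List[str]) -> None:
    """Run the command and raise if it exits with a non-zero status."""
    subprocess.run(args, check=True)


dry_run = reset_offsets_args(
    "kafka-jumbo1007:9092", "cirrussearch_updates_eqiad",
    "eqiad.swift.search_updates.upload-complete", 70678)
print(" ".join(dry_run))
```

Call run(dry_run) first and inspect the output. Only then build the same arguments with execute=True and run them.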

Force a specific offset for a consumer group (more error prone)

If the above doesn't work you might need to explicitly create a commit. The following was used with the kafka-python==1.4.3 package. This assumes the topic has a single partition.

import kafka
c = kafka.KafkaConsumer(bootstrap_servers='kafka-jumbo1007.eqiad.wmnet:9092', group_id='cirrussearch_updates_eqiad', enable_auto_commit=False)
c.subscribe(['eqiad.swift.search_updates.upload-complete'])
# We have to poll at least once for the kafka connection to be made and c.assignment() to return values
c.poll(timeout_ms=100)
tp = next(iter(c.assignment()))
c.commit({tp: kafka.OffsetAndMetadata(70678, '')})

See also