Kafka/Administration

From Wikitech

Safe Broker Restarts

Since partitions have at least 2 (usually 3) replicas, you should be able to restart a broker without losing any messages. A single broker offline temporarily will not cause a critical outage. Depending on the partition replica layout and number of brokers in a cluster, multiple brokers offline might be safe as well (see below about groups of Kafka brokers offline).

Note: the details below are still relevant, but the procedure is now wrapped in an SRE cookbook, sre.kafka.roll-restart-reboot-brokers, which is run from the cumin hosts.

Example:

sudo cookbook sre.kafka.roll-restart-reboot-brokers \
    --alias kafka-jumbo \
    --reason 'configuration reload' \
    restart_daemons

Brokers, consumers, and producers will automatically rebalance themselves when a broker dies, but it is nice to allow them to do so gracefully. service kafka stop will perform a graceful shutdown. Before you run this, make sure that every partition for which the target broker is the leader also has other brokers in its ISR (In Sync Replicas):

root@kafka1014:~# kafka topics --describe
...
Topic:webrequest_text	PartitionCount:12	ReplicationFactor:3	Configs:
	Topic: webrequest_text	Partition: 0	Leader: 18	Replicas: 18,22,12	Isr: 18,22,12
	Topic: webrequest_text	Partition: 1	Leader: 20	Replicas: 20,12,13	Isr: 20,13,12
	Topic: webrequest_text	Partition: 2	Leader: 22	Replicas: 22,13,14	Isr: 22,13,14
	Topic: webrequest_text	Partition: 3	Leader: 12	Replicas: 12,14,18	Isr: 18,12,14
	Topic: webrequest_text	Partition: 4	Leader: 13	Replicas: 13,18,20	Isr: 18,13,20
	Topic: webrequest_text	Partition: 5	Leader: 14	Replicas: 14,20,22	Isr: 22,20,14
	Topic: webrequest_text	Partition: 6	Leader: 18	Replicas: 18,12,13	Isr: 18,13,12
	Topic: webrequest_text	Partition: 7	Leader: 20	Replicas: 20,13,14	Isr: 20,13,14
	Topic: webrequest_text	Partition: 8	Leader: 22	Replicas: 22,14,18	Isr: 22,18,14
	Topic: webrequest_text	Partition: 9	Leader: 12	Replicas: 12,18,20	Isr: 20,18,12
	Topic: webrequest_text	Partition: 10	Leader: 13	Replicas: 13,20,22	Isr: 22,20,13
	Topic: webrequest_text	Partition: 11	Leader: 14	Replicas: 14,22,12	Isr: 22,12,14
...

# partitions with Leader: 14 have several brokers in the ISR for that partition.  It is safe to stop kafka1014
root@kafka1014:~# service kafka stop

Notice how (eventually) after the broker stops, broker 14 is no longer the leader for any topics when you run kafka topics --describe.
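
The pre-stop check can also be scripted. The following is a minimal sketch, not part of the existing tooling: the function name unsafe_leader_partitions is hypothetical, and it assumes the kafka topics --describe output format shown above. It reads that output on stdin and prints every topic:partition that the given broker leads while no other broker is in the ISR.

```shell
# Hypothetical helper: list partitions led by BROKER_ID whose ISR
# contains no broker other than BROKER_ID itself.
# Usage (assumed): kafka topics --describe | unsafe_leader_partitions 14
unsafe_leader_partitions() {
  awk -v broker="$1" '
    # Only partition lines start with the separate fields "Topic:" ... "Partition:".
    $1 == "Topic:" && $3 == "Partition:" {
      leader = ""; isr = ""
      for (i = 1; i < NF; i++) {
        if ($i == "Leader:") leader = $(i + 1)
        if ($i == "Isr:")    isr    = $(i + 1)
      }
      if (leader != broker) next
      n = split(isr, members, ",")
      others = 0
      for (j = 1; j <= n; j++) if (members[j] != broker) others++
      if (others == 0) print $2 ":" $4
    }'
}
```

If the function prints nothing for broker 14, every partition led by kafka1014 has at least one other in-sync replica that can take over leadership.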

Once you are ready to start the broker back up, you can do so with a simple service kafka start.

It will likely take a few minutes for the broker to recover after restarting. It needs to check all of its recent logs to make sure that it doesn't have any incomplete messages. It will also resume replicating partitions from where it left off when it was stopped. Keep checking kafka topics --describe until all topic-partitions have all of their replicas in the ISR. Once the topic-partitions are up to date on all brokers, you can start a replica election to balance the leaders across brokers. NOTE: auto leader rebalancing is enabled, but after any broker restart you should probably run kafka preferred-replica-election, as high-volume partitions might not resync fast enough to be considered by the auto rebalancer. See T207768 for more info.
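
To avoid eyeballing the full listing while waiting for the broker to catch up, the comparison of Replicas against Isr can be sketched as a filter. This is an illustrative helper only: the name under_replicated_partitions is hypothetical, and it assumes the kafka topics --describe output format shown above.

```shell
# Hypothetical helper: print every topic:partition whose ISR has fewer
# members than its replica list.
# Usage (assumed): kafka topics --describe | under_replicated_partitions
under_replicated_partitions() {
  awk '
    $1 == "Topic:" && $3 == "Partition:" {
      replicas = ""; isr = ""
      for (i = 1; i < NF; i++) {
        if ($i == "Replicas:") replicas = $(i + 1)
        if ($i == "Isr:")      isr      = $(i + 1)
      }
      if (split(isr, a, ",") < split(replicas, b, ",")) print $2 ":" $4
    }'
}
```

An empty result means every partition has all of its replicas back in the ISR, which is the point at which a replica election makes sense. Upstream kafka-topics also offers --describe --under-replicated-partitions; whether the kafka wrapper on these hosts passes that option through is not verified here.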

Network switch restarts; or, groups of Kafka brokers offline

The key to zero downtime Kafka broker restarts is to ensure that there are never any partitions for which all replicas are offline at once. Usually, all of the nodes in a WMF datacenter row use the same network switch. At WMF, we use the Kafka broker.rack setting to assign a datacenter row letter to each broker. When creating partitions, Kafka will use the broker.rack setting to make sure that partition replicas are distributed as evenly as possible between different racks. Thus, as long as all partitions have been created while broker.rack was applied properly, it should be safe to restart all brokers in the same row and not risk any downtime.

Caveat: it is possible to have partition replicas that do not respect this setting. broker.rack is only consulted when partition replicas are created, and it is also possible to manually assign partitions.

If you are expecting downtime for a group of Kafka brokers, it is probably best to check the output of kafka topics --describe and make sure there are no partitions whose replicas all live on brokers you will be taking down. If there are, you should probably manually reassign the partition replicas so that at least one is on a broker that will stay online.
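
The check described in the previous paragraph can be sketched as a filter over the kafka topics --describe output. The helper below is hypothetical (fully_offline_partitions is not an existing tool) and assumes the output format shown elsewhere on this page. It takes a comma-separated list of broker IDs that will go offline and prints the partitions that would be left without any online replica.

```shell
# Hypothetical helper: given a comma-separated list of broker IDs that
# will be offline, print every topic:partition whose replicas are ALL
# in that list.
# Usage (assumed): kafka topics --describe | fully_offline_partitions 12,13,14
fully_offline_partitions() {
  awk -v down="$1" '
    BEGIN {
      n = split(down, d, ",")
      for (i = 1; i <= n; i++) offline[d[i]] = 1
    }
    $1 == "Topic:" && $3 == "Partition:" {
      replicas = ""
      for (i = 1; i < NF; i++) if ($i == "Replicas:") replicas = $(i + 1)
      m = split(replicas, r, ",")
      survivors = 0
      for (j = 1; j <= m; j++) if (!(r[j] in offline)) survivors++
      if (m > 0 && survivors == 0) print $2 ":" $4
    }'
}
```

Any partition printed here is a candidate for manual reassignment before the maintenance window.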

Note: In Kafka 3.4.0, a consumer rack awareness feature was added. If/when we upgrade, we should update these docs with relevant consumer details.

Replica Elections

To trigger a leadership rebalance manually, do the following.

List topics to see the current leader assignments:

root@kafka1014:~$ kafka topics --describe
Topic:webrequest_bits   PartitionCount:12       ReplicationFactor:3     Configs:
        Topic: webrequest_bits  Partition: 0    Leader: 22      Replicas: 22,12,18      Isr: 12,18,22
        Topic: webrequest_bits  Partition: 1    Leader: 12      Replicas: 12,18,21      Isr: 12,18,21
        Topic: webrequest_bits  Partition: 2    Leader: 18      Replicas: 18,21,22      Isr: 18,22,21
        Topic: webrequest_bits  Partition: 3    Leader: 21      Replicas: 21,22,12      Isr: 12,22,21
        Topic: webrequest_bits  Partition: 4    Leader: 22      Replicas: 22,18,21      Isr: 18,22,21
        Topic: webrequest_bits  Partition: 5    Leader: 12      Replicas: 12,21,22      Isr: 12,22,21
        Topic: webrequest_bits  Partition: 6    Leader: 18      Replicas: 18,22,12      Isr: 12,18,22
        Topic: webrequest_bits  Partition: 7    Leader: 21      Replicas: 21,12,18      Isr: 12,18,21
        Topic: webrequest_bits  Partition: 8    Leader: 22      Replicas: 22,21,12      Isr: 12,22,21
        Topic: webrequest_bits  Partition: 9    Leader: 12      Replicas: 12,22,18      Isr: 12,18,22
        Topic: webrequest_bits  Partition: 10   Leader: 18      Replicas: 18,12,21      Isr: 12,18,21
        Topic: webrequest_bits  Partition: 11   Leader: 21      Replicas: 21,18,22      Isr: 18,22,21
Topic:webrequest_mobile PartitionCount:12       ReplicationFactor:3     Configs:
        Topic: webrequest_mobile        Partition: 0    Leader: 12      Replicas: 12,21,22      Isr: 12,22,21
        Topic: webrequest_mobile        Partition: 1    Leader: 18      Replicas: 18,22,12      Isr: 12,18,22
        Topic: webrequest_mobile        Partition: 2    Leader: 21      Replicas: 21,12,18      Isr: 12,18,21
        Topic: webrequest_mobile        Partition: 3    Leader: 22      Replicas: 22,18,21      Isr: 18,22,21
        Topic: webrequest_mobile        Partition: 4    Leader: 12      Replicas: 12,22,18      Isr: 12,18,22
        Topic: webrequest_mobile        Partition: 5    Leader: 18      Replicas: 18,12,21      Isr: 12,18,21
        Topic: webrequest_mobile        Partition: 6    Leader: 21      Replicas: 21,18,22      Isr: 18,22,21
        Topic: webrequest_mobile        Partition: 7    Leader: 22      Replicas: 22,21,12      Isr: 12,22,21
        Topic: webrequest_mobile        Partition: 8    Leader: 12      Replicas: 12,18,21      Isr: 12,18,21
        Topic: webrequest_mobile        Partition: 9    Leader: 18      Replicas: 18,21,22      Isr: 18,22,21
        Topic: webrequest_mobile        Partition: 10   Leader: 21      Replicas: 21,22,12      Isr: 12,22,21
        Topic: webrequest_mobile        Partition: 11   Leader: 22      Replicas: 22,12,18      Isr: 12,18,22
Topic:webrequest_text   PartitionCount:12       ReplicationFactor:3     Configs:
        Topic: webrequest_text  Partition: 0    Leader: 22      Replicas: 22,21,12      Isr: 12,22,21
        Topic: webrequest_text  Partition: 1    Leader: 12      Replicas: 12,22,18      Isr: 12,18,22
        Topic: webrequest_text  Partition: 2    Leader: 18      Replicas: 18,12,21      Isr: 12,18,21
        Topic: webrequest_text  Partition: 3    Leader: 21      Replicas: 21,18,22      Isr: 18,22,21
        Topic: webrequest_text  Partition: 4    Leader: 22      Replicas: 22,12,18      Isr: 12,18,22
        Topic: webrequest_text  Partition: 5    Leader: 12      Replicas: 12,18,21      Isr: 12,18,21
        Topic: webrequest_text  Partition: 6    Leader: 18      Replicas: 18,21,22      Isr: 18,22,21
        Topic: webrequest_text  Partition: 7    Leader: 21      Replicas: 21,22,12      Isr: 12,22,21
        Topic: webrequest_text  Partition: 8    Leader: 22      Replicas: 22,18,21      Isr: 18,22,21
        Topic: webrequest_text  Partition: 9    Leader: 12      Replicas: 12,21,22      Isr: 12,22,21
        Topic: webrequest_text  Partition: 10   Leader: 18      Replicas: 18,22,12      Isr: 12,18,22
        Topic: webrequest_text  Partition: 11   Leader: 21      Replicas: 21,12,18      Isr: 12,18,21
Topic:webrequest_upload PartitionCount:12       ReplicationFactor:3     Configs:
        Topic: webrequest_upload        Partition: 0    Leader: 18      Replicas: 18,12,21      Isr: 12,18,21
        Topic: webrequest_upload        Partition: 1    Leader: 21      Replicas: 21,18,22      Isr: 22,18,21
        Topic: webrequest_upload        Partition: 2    Leader: 22      Replicas: 22,21,12      Isr: 12,22,21
        Topic: webrequest_upload        Partition: 3    Leader: 12      Replicas: 12,22,18      Isr: 12,18,22
        Topic: webrequest_upload        Partition: 4    Leader: 18      Replicas: 18,21,22      Isr: 18,22,21
        Topic: webrequest_upload        Partition: 5    Leader: 21      Replicas: 21,22,12      Isr: 22,12,21
        Topic: webrequest_upload        Partition: 6    Leader: 22      Replicas: 22,12,18      Isr: 12,18,22
        Topic: webrequest_upload        Partition: 7    Leader: 12      Replicas: 12,18,21      Isr: 12,18,21
        Topic: webrequest_upload        Partition: 8    Leader: 18      Replicas: 18,22,12      Isr: 12,18,22
        Topic: webrequest_upload        Partition: 9    Leader: 21      Replicas: 21,12,18      Isr: 12,18,21
        Topic: webrequest_upload        Partition: 10   Leader: 22      Replicas: 22,18,21      Isr: 18,22,21
        Topic: webrequest_upload        Partition: 11   Leader: 12      Replicas: 12,21,22      Isr: 12,22,21

In this case, you can see that leaders are balanced across all brokers. If they weren't (e.g. broker 21 never appearing as leader), you can ask Kafka to run a leader election with the following command on one of the brokers (no sudo is needed, and the broker chosen does not matter).

 kafka preferred-replica-election

Wait a few seconds, no more than a minute. If all goes well, run kafka topics --describe again and you should see the leaders properly balanced.
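
Judging balance by eye is tedious with many topics. As a rough aid, the leader count per broker can be tallied from the same output. This is only a sketch: leaders_per_broker is a hypothetical name, and the parsing assumes the kafka topics --describe format shown above.

```shell
# Hypothetical helper: count how many partitions each broker currently leads.
# Usage (assumed): kafka topics --describe | leaders_per_broker
leaders_per_broker() {
  awk '
    $1 == "Topic:" && $3 == "Partition:" {
      for (i = 1; i < NF; i++) if ($i == "Leader:") count[$(i + 1)]++
    }
    END { for (b in count) print b, count[b] }' | sort -n
}
```

Each output line is a broker ID followed by its number of led partitions. Roughly equal counts suggest a balanced cluster, and a broker missing from the list leads nothing.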

Verify the leader of a topic

Kafka nomenclature recap: a generic queue is called a 'topic', and each topic can be split into multiple partitions that producers and consumers use to spread the load. For example, let's use the kafka topics --describe command described above to inspect a topic's state:

Topic:webrequest_text	PartitionCount:24	ReplicationFactor:3	Configs:retention.bytes=375809638400
	Topic: webrequest_text	Partition: 0	Leader: 18	Replicas: 18,22,12	Isr: 22,18,12
	Topic: webrequest_text	Partition: 1	Leader: 20	Replicas: 20,12,13	Isr: 20,13,12
	Topic: webrequest_text	Partition: 2	Leader: 22	Replicas: 22,13,14	Isr: 13,14,22
	Topic: webrequest_text	Partition: 3	Leader: 12	Replicas: 12,14,18	Isr: 14,18,12
	Topic: webrequest_text	Partition: 4	Leader: 13	Replicas: 13,18,20	Isr: 20,13,18
	[...CUT...]
	Topic: webrequest_text	Partition: 22	Leader: 20	Replicas: 20,22,12	Isr: 20,22,12
	Topic: webrequest_text	Partition: 23	Leader: 22	Replicas: 22,12,13	Isr: 13,22,12

The webrequest_text topic contains data coming from Varnishkafka, related to the cache text HTTP requests. It is composed of 24 partitions (you can see them as the real queues that consumers/producers will use), each of them replicated three times. Zooming in:

Topic: webrequest_text	Partition: 0	Leader: 18	Replicas: 18,22,12	Isr: 22,18,12

Partition 0 has three replicas, stored on the kafka1018, kafka1012 and kafka1022 brokers. The broker that acts as leader for the partition is kafka1018; the other two have the duty to keep themselves in sync with it. The replicas currently in sync with the leader are listed in the Isr field (short for In Sync Replicas), and of course the ideal state is when "Replicas" and "Isr" contain the same brokers. Losing one ISR member may happen when a broker goes down for some reason, and it is usually not a big deal if nothing else is happening (2 out of 3 in-sync replicas are enough to ensure resiliency).


Alter topic partitions number

Be careful when doing this, as it is not reversible. Consumers may be relying on a certain number of partitions, so make sure you are aware of any downstream consequences for consumers before doing this.

kafka topics --alter --topic my_topic_name --partitions 5

Alter topic retention settings

Our default is to keep data in a topic for 7 days. You can vary both the time and the size of a topic that should be kept. The time based retention setting is retention.ms and the size based retention setting is retention.bytes. To alter a topic config:

kafka configs --alter --entity-type topics --entity-name my_topic_name --add-config retention.ms=2678400000

To undo this and revert to the default setting, you can delete the config setting:

kafka configs --alter --entity-type topics --entity-name my_topic_name --delete-config retention.ms
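
Since retention.ms is expressed in milliseconds, it helps to compute the value rather than type it by hand. The small helper below is a hypothetical convenience (days_to_ms is not an existing command). It shows that the 2678400000 used above corresponds to 31 days.

```shell
# Hypothetical helper: convert a number of days to milliseconds,
# the unit expected by retention.ms.
days_to_ms() {
  echo $(( $1 * 24 * 60 * 60 * 1000 ))
}

days_to_ms 31   # 2678400000, the value used in the example above
days_to_ms 7    # 604800000, the 7 day default mentioned above
```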

Delete a topic

Kafka stores all the topic names in Zookeeper, so the easiest way to delete a topic is to use the following command from a Kafka host of the cluster in which the topic has been created (we'll use main-eqiad as example from now on):

#~ kafka topics --delete --topic thisisatest
kafka-topics --zookeeper conf1004.eqiad.wmnet,conf1005.eqiad.wmnet,conf1006.eqiad.wmnet/kafka/main-eqiad --delete --topic thisisatest

As you can see, there is a bash script on every Kafka host that automatically expands the kafka-prefixed commands with all the boring parameters such as zookeeper hostnames, paths, etc. Before explaining what the command does under the hood, here's a list of places in which a topic name may be found in Zookeeper (keeping main-eqiad as example):

  • /kafka/main-eqiad/brokers/topics
  • /kafka/main-eqiad/admin/delete_topics
  • /kafka/main-eqiad/config/topics

The first path stores znodes that hold the status of a given topic/partition, in particular what brokers are handling it:

[zk: localhost:2181(CONNECTED) 14] get /kafka/main-eqiad/brokers/topics/eqiad.mediawiki.job.categoryMembershipChange/partitions/0/state
{"controller_epoch":96,"leader":1001,"version":1,"leader_epoch":102,"isr":[1003,1002,1001]}
[..cut..]

The second one contains the list of topics flagged for deletion, for example after running the kafka topics --delete command shown above. The last one holds generic information about the topic:

[zk: localhost:2181(CONNECTED) 17] get /kafka/main-eqiad/config/topics/eqiad.mediawiki.job.categoryMembershipChange
{"version":1,"config":{}}
[..cut..]

So in a normal scenario, the kafka topics --delete command should take care of these three paths by itself, setting up znodes so that the Kafka brokers do the proper cleanup work. During an outage we discovered that this sometimes doesn't work, for example in the presence of Kafka core bugs like https://issues.apache.org/jira/browse/KAFKA-7156. In that particular case, the Kafka brokers were not able to delete topics due to a filename length problem on the ext4 data disk partitions. Since zookeeper was not updated correctly, even manual rm commands did not help: during boot the brokers always try to re-create a directory structure under the data directory that mirrors what is stored in zookeeper. The brokers therefore re-created the topics after each restart and hit the same problem over and over again. We had to grab the list of topics to delete and do the following for each one of them:

#!/bin/bash
# Remove every znode that references each topic listed in ./topics_to_delete.

for topic in $(cat topics_to_delete)
do
  echo "Deleting ${topic}"
  /usr/share/zookeeper/bin/zkCli.sh rmr "/kafka/main-eqiad/brokers/topics/${topic}"
  /usr/share/zookeeper/bin/zkCli.sh rmr "/kafka/main-eqiad/config/topics/${topic}"
  /usr/share/zookeeper/bin/zkCli.sh rmr "/kafka/main-eqiad/admin/delete_topics/${topic}"
done

The above procedure is the nuclear option to use if nothing else works, and should be done with all brokers stopped to avoid them interfering with the deletion. After zookeeper is clean, the Kafka brokers can be restarted one at a time and they should be able to boot correctly without any issue.

Handling a downed broker

In the past, events like a single disk failure in the analytics-eqiad cluster have caused a lot of alarms to fire because a Kafka broker went down (we use JBOD rather than RAID for analytics-eqiad, which is why a disk failure causes trouble). Generally, one Kafka broker down in our clusters does not harm overall Kafka availability, but it is wise to double check when that happens. The above sections (especially "Verify the leader of a topic") are a good start to make sure that the cluster is healthy; it is also wise to check the various dashboards listed in Kafka#Monitoring.

Usually the impact of a single broker down is nothing more than alarms for replicas not in sync, but as we discussed in the above sections this is not a critical situation if nothing else explodes at the same time. Just systemctl mask the kafka daemon and file a task to fix the hardware issue. During this time, daemons like Varnishkafka could log several errors stating that the broker is down, but librdkafka is strong enough to guarantee that no message is lost.

We are not aware of any bug or software issue that might happen to a Kafka broker.

Recovering a laggy broker replica

If a Kafka Broker goes offline for a long while, it will likely come back online and be far behind in logs. It will need to catch up on logs from remaining brokers before it can be put back into the ISR. During normal operation, replicas should be able to stay in sync with each other. But when one broker is far behind, you may need to tweak settings to encourage Kafka to spend more resources keeping replicas up to date.

num.replica.fetchers

This setting controls the number of threads dedicated to fetching logs from other replicas. Bump this number up temporarily and restart Kafka to try to get it to consume faster.

replica.fetch.max.bytes

This is the number of bytes that each fetch request will attempt to consume from each topic-partition. The actual number of bytes being requested at a time will be this multiplied by the number of topic-partitions. Be careful not to set this too high.
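
To get a feel for why this should not be set too high, the multiplication described above can be worked through. This is a back-of-the-envelope sketch only: fetch_round_bytes is a hypothetical helper, 1048576 is the upstream Kafka default for replica.fetch.max.bytes, and the partition count of 600 is an invented illustration, not a measured value from any of our clusters.

```shell
# Hypothetical helper: upper bound on bytes requested in one round of
# replica fetches = replica.fetch.max.bytes * number of topic-partitions.
fetch_round_bytes() {
  echo $(( $1 * $2 ))
}

# 1 MiB per partition across an assumed 600 lagging topic-partitions:
fetch_round_bytes 1048576 600   # 629145600 bytes, i.e. 600 MiB
```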

Checking consumer offsets

As of November 2015, LinkedIn's Burrow is installed and running on krypton.eqiad.wmnet. It is configured to email analytics admins if consumers start lagging. You can also query it directly via its HTTP interface. E.g., to see if a particular consumer group is lagging, and where its latest offset commits are:

 curl http://krypton.eqiad.wmnet:8000/v2/kafka/eqiad/consumer/mysql-m4-master/topic/eventlogging-valid-mixed
 {"error":false,"message":"consumer group topic offsets returned","offsets":[79014572,79014599,79003923,79014602,79014593,79014599,79014574,79014599,79003640,79014585,79014592,79014597]}

CLI

Run kafka-consumer-groups with no flags to get a full list of options.

Get a list of groups:

 KAFKA_HEAP_OPTS="-Xmx512M" kafka-consumer-groups --bootstrap-server localhost:9092 --list

Get a list of topics for a group with their consumer offsets:

 KAFKA_HEAP_OPTS="-Xmx512M" kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group <group>

Setting an offset to latest for a topic in group:

 KAFKA_HEAP_OPTS="-Xmx512M" kafka-consumer-groups --bootstrap-server localhost:9092 --reset-offsets --to-latest --group <group> --topic <topic> --dry-run

When ready to execute the operation, change --dry-run to --execute.

Specific partitions of a topic can be targeted as well: --topic <topic>:<partition>(,<partition>)

Purge broker logs

After being off for a while, when a broker rejoins its cluster it will replicate anything it has missed while it was off. Kafka's log.retention.hours setting is applied by looking at the mtime (or ctime?) of its log data files. On a recently started broker, these files are written later than usual, as replication catches back up. These files will have mtimes later than the produce time of the messages inside them, and as such their deletion may be delayed. This can cause disks to fill up more than usual.

If you receive an alarm like:

<icinga-wm_> PROBLEM - Disk space on kafka1012 is CRITICAL: DISK CRITICAL - free space: /var/spool/kafka/b 73705 MB (3% inode=99%): /var/spool/kafka/f 127290 MB (6% inode=99%)

You might want to purge some old logs from the broker's partitions. One strategy is to lower the log retention period. You can do this either dynamically per topic, or statically for all topics by editing server.properties.

Temporarily Modify Per Topic Retention Settings

If you are just trying to buy some time until the global retention setting takes effect you can dynamically alter configuration for a topic. If you do this for a high volume topic, you might get Kafka to delete just enough data on disk to buy you enough time.

# Set retention.ms for the high volume webrequest upload topic.  This will delete any log files on disk older than 48 hours.
kafka configs --alter --entity-type topics --entity-name webrequest_upload --add-config retention.ms=172800000

# Set retention.bytes for the high volume webrequest text topic partitions.
# Please note that retention.bytes limits the size of each of the
# topic's partitions, not the topic as a whole!
# This will delete the oldest log files of any partition that grows beyond 500 GiB (about 537 GB).
kafka configs --alter --entity-type topics --entity-name webrequest_text --add-config retention.bytes=536870912000

# Wait until brokers delete data.  Make sure you have enough room to spare.  If not, consider setting retention.ms for another topic.
# Once brokers have deleted data, it is safe to delete the per topic config override to reset it to the global default.
kafka configs --alter --entity-type topics --entity-name webrequest_upload --delete-config retention.ms
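
Because retention.bytes applies per partition, the space a topic may still occupy across the whole cluster is roughly retention.bytes times the partition count times the replication factor. The following is a rough estimate only, under stated assumptions: topic_retention_gib is a hypothetical helper, the 24 partitions and replication factor 3 are taken from the webrequest_text example earlier on this page, and the figure ignores active segments and index files.

```shell
# Hypothetical helper: approximate cluster-wide upper bound (in GiB) for a
# topic, given retention.bytes, partition count and replication factor.
topic_retention_gib() {
  echo $(( $1 * $2 * $3 / 1073741824 ))
}

# retention.bytes=536870912000 (500 GiB per partition), 24 partitions, RF 3:
topic_retention_gib 536870912000 24 3   # 36000 GiB across all brokers
```

Dividing that total by the number of brokers gives a ballpark for the per-broker share, assuming replicas are evenly spread.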

Temporarily Edit Global Retention Settings

If you need data for many topics deleted, it may be worth temporarily changing the global retention setting. This is more disruptive than doing so dynamically, since you must restart brokers for the config to be applied.

sudo puppet agent --disable
sudo vim /etc/kafka/server.properties # lower log.retention.hours (default here is 168)
sudo service kafka restart

Please remember to log your work in #wikimedia-operations. After checking /var/log/kafka/server.log for a confirmation of the delete actions (together with a df -h of course) please restore the previous config:

sudo vim /etc/kafka/server.properties # reset to default log.retention.hours=168 
sudo service kafka restart
sudo puppet agent --enable

After doing this for a broker, you should remember to run kafka preferred-replica-election to rebalance topic-partition leadership.

Kafka Certificates

Newer versions of Kafka support TLS encryption, authentication and authorization. Certificates are signed by the PKI kafka intermediate CA, and they are automatically provisioned by puppet if the hiera flag is set. The only requirement is to set the keystore's password in puppet private (check the operations/puppet repository's kafka::broker profile for more info).

Certificates and keys used to be managed only with cergen (signed by our Puppet CA and distributed using Puppet). This is still an option if, for some reason, PKI is not viable. To create a new client key and certificate, add an entry to a cergen manifest file and run cergen with the --generate option as described on the cergen documentation page. git add and commit the files to the puppet private repository, then distribute the relevant files via puppet and configure your client.

Kafka ACLs

Kafka ACLs are used to restrict access to Kafka cluster operations, Kafka topics, and Kafka consumer groups. By default, if an ACL exists for a specific resource, e.g. a topic, then all operations on that resource will be denied to any principal (AKA certificate) not explicitly listed for that resource. We want to allow anonymous unencrypted uses of most Kafka topics, but restrict certain others. For example, we'd like to restrict writes to any webrequest topic to only varnishkafka producers, but still allow for anyone to consume. To do this, we need the proper invocation of the kafka acls command. (Much of this was originally figured out and documented in T167304.)

For certificate based authentication, we need to specify the full x509 DistinguishedName. To keep things simple, we generate subject-less certificates, so that we only have to use the CommonName in the ACLs. Before we set up any restrictive ACLs at all, we need to allow defaults for the ANONYMOUS user:

# Allow ANONYMOUS to produce to any topic
kafka acls --add --allow-principal User:ANONYMOUS --producer --topic '*'

# Allow ANONYMOUS to consume from any topic in any consumer group
kafka acls --add --allow-principal User:ANONYMOUS --consumer --topic '*' --group '*'

# Allow ANONYMOUS to use idempotent (and therefore transactional) producers via the IdempotentWrite cluster permission
# https://kpow.io/articles/kafka-producer-breaking-change/
kafka acls --add --allow-principal User:ANONYMOUS --cluster --operation IdempotentWrite

# After this, kafka acls --list should show:
kafka acls --list
Current ACLs for resource `Group:*`:
 	User:ANONYMOUS has Allow permission for operations: Read from hosts: *

Current ACLs for resource `Topic:*`:
 	User:ANONYMOUS has Allow permission for operations: Describe from hosts: *
 	User:ANONYMOUS has Allow permission for operations: DescribeConfigs from hosts: *
	User:ANONYMOUS has Allow permission for operations: Write from hosts: *
	User:ANONYMOUS has Allow permission for operations: Read from hosts: *

Current ACLs for resource `Cluster:kafka-cluster`:
 	User:ANONYMOUS has Allow permission for operations: Create from hosts: *

Now we can restrict operations on resources to other principals. When a client communicates over the SSL port 9093, it will attempt to authenticate with its certificate's DN. You need to add ACLs for that DN if you want that client to be able to use Kafka at all. For this example, we'll allow User:CN=varnishkafka to produce to the webrequest topic, restrict anyone else from producing to webrequest, but still allow anyone to read from webrequest. Note that the default ACLs above use wildcard topic and group names. Kafka does not (yet?) support full glob wildcards. It is all or nothing! E.g. --topic 'webrequest_*' will not work.

# Allow User:CN=varnishkafka to be a producer for the webrequest topic
kafka acls --add --allow-principal User:CN=varnishkafka --producer --topic webrequest

# Deny unauthenticated users the ability to write to the webrequest topic
kafka acls --add --deny-principal User:ANONYMOUS --operation Write --topic webrequest

# Now we have the following ACLs defined:
kafka acls --list
Current ACLs for resource `Group:*`:
 	User:ANONYMOUS has Allow permission for operations: Read from hosts: *

Current ACLs for resource `Topic:*`:
 	User:ANONYMOUS has Allow permission for operations: DescribeConfigs from hosts: *
 	User:ANONYMOUS has Allow permission for operations: Describe from hosts: *
	User:ANONYMOUS has Allow permission for operations: Write from hosts: *
	User:ANONYMOUS has Allow permission for operations: Read from hosts: *

Current ACLs for resource `Topic:webrequest`:
 	User:CN=varnishkafka has Allow permission for operations: Write from hosts: *
	User:CN=varnishkafka has Allow permission for operations: Describe from hosts: *
	User:ANONYMOUS has Deny permission for operations: Write from hosts: *

Current ACLs for resource `Cluster:kafka-cluster`:
 	User:ANONYMOUS has Allow permission for operations: Create from hosts: *
	User:CN=varnishkafka has Allow permission for operations: Create from hosts: *

Because no ACL exists for Topic:webrequest Read operation, any User, including User:ANONYMOUS, can still consume from the webrequest topic.
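
The way these rules combine can be illustrated with a deliberately simplified model. This is not Kafka's authorizer: acl_decision and example_acls are hypothetical names, and the sketch ignores host restrictions, wildcard principals, super users, implied operations (such as Describe being implied by Read or Write), and the allow.everyone.if.no.acl.found broker setting. It only encodes three ideas: ACLs on the literal resource and on the `*` wildcard resource both apply, a matching Deny always wins, and without a matching Allow the request is denied.

```shell
# The ACLs from the listing above, one per line:
# resource principal permission operation
example_acls='Group:* User:ANONYMOUS Allow Read
Topic:* User:ANONYMOUS Allow DescribeConfigs
Topic:* User:ANONYMOUS Allow Describe
Topic:* User:ANONYMOUS Allow Write
Topic:* User:ANONYMOUS Allow Read
Topic:webrequest User:CN=varnishkafka Allow Write
Topic:webrequest User:CN=varnishkafka Allow Describe
Topic:webrequest User:ANONYMOUS Deny Write'

# Hypothetical, simplified evaluator: reads ACL lines on stdin and prints
# "allow" or "deny" for the given principal, operation and resource.
acl_decision() {
  awk -v who="$1" -v op="$2" -v res="$3" '
    BEGIN { split(res, parts, ":"); wildcard = parts[1] ":*" }
    ($1 == res || $1 == wildcard) && $2 == who && $4 == op {
      if ($3 == "Deny")  denied  = 1
      if ($3 == "Allow") allowed = 1
    }
    END {
      if (denied) { print "deny" } else if (allowed) { print "allow" } else { print "deny" }
    }'
}
```

With these rules, printf '%s\n' "$example_acls" | acl_decision User:ANONYMOUS Write Topic:webrequest prints deny, while the same call with Read prints allow because of the wildcard Topic:* rule, and User:CN=varnishkafka is allowed to Write.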

Swapping broken disk

Similar to Hadoop/Administration#Swapping_broken_disk, new disks on analytics Kafka brokers need some megacli tweaks before they are usable. This needed to be done in T136933. The following are the steps that were taken then.

These Kafka brokers have 12 disks in JBOD via MegaRAID. We'll need to clear foreign configuration from the old disk, and then mark this disk to be used as JBOD.

Procedure:

  1. Check the status of the disks after the swap using the following commands:
    # Check general info about disks attached to the RAID and their status.
    sudo megacli -PDList -aAll 
    
    # Get Firmware status only to have a quick peek about disks status:
    sudo megacli -PDList -aAll | grep Firm
    
    # Check Virtual Drive info
    sudo megacli -LDInfo -LAll -aAll
    
  2. If you see "Firmware state: Unconfigured(bad)", fix it with:
    # From the previous commands you should be able to fill in the variables 
    # with the values of the disk's properties indicated below:
    # X => Enclosure Device ID
    # Y => Slot Number
    # Z => Controller number
    megacli -PDMakeGood -PhysDrv[X:Y] -aZ
    
    # Example:
    # otto@kafka1012:~$ sudo megacli -PDList -aAll | egrep "Adapter|Enclosure Device ID:|Slot Number:|Firmware state"
    # Adapter #0
    # Enclosure Device ID: 32
    # Slot Number: 5
    # Firmware state: Online, Spun Up
    # [..content cut..]
    
    megacli -PDMakeGood -PhysDrv[32:5] -a0
    
  3. Check for Foreign disks and fix them if needed:
    server:~# megacli -CfgForeign -Scan -a0
    There are 1 foreign configuration(s) on controller 0.
    
    server:~# megacli -CfgForeign -Clear -a0
    Foreign configuration 0 is cleared on controller 0.
    
  4. Add the disk as JBOD:
    megacli -PDMakeJBOD -PhysDrv[32:5] -a0
    
  5. You should now be able to see the disk using fdisk. Add a partition and a filesystem to it to complete the work:
    sudo fdisk /dev/sdf
    # ... make a new primary partition filling up the whole disk
    sudo mkfs.ext4 /dev/sdf1
    sudo tune2fs -m 0 /dev/sdf1
    

More useful information is available at: http://hwraid.le-vert.net/wiki/LSIMegaRAIDSAS
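
Filling in X, Y and Z by hand in step 2 is error prone. The lookup can be sketched as a filter over the megacli -PDList -aAll output. This helper is hypothetical (bad_drives is not an existing tool) and assumes the line formats shown in the example of step 2 ("Adapter #N", "Enclosure Device ID:", "Slot Number:", "Firmware state:"). It only prints suggested commands and does not run them.

```shell
# Hypothetical helper: for every drive in state Unconfigured(bad), print
# the matching -PDMakeGood command. Nothing is executed.
# Usage (assumed): sudo megacli -PDList -aAll | bad_drives
bad_drives() {
  awk -F': *' '
    { sub(/[ \t\r]+$/, "") }                       # strip trailing whitespace
    /^Adapter #/            { adapter = substr($0, 10) }
    /^Enclosure Device ID:/ { enclosure = $2 }
    /^Slot Number:/         { slot = $2 }
    /^Firmware state: Unconfigured\(bad\)/ {
      print "megacli -PDMakeGood -PhysDrv[" enclosure ":" slot "] -a" adapter
    }'
}
```

Review the printed commands against the full -PDList output before running any of them.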

Upgrade Checklist

The Analytics team experienced data loss and a lot of headaches when performing a routine Kafka upgrade from 0.8.2.0 -> 0.8.2.1 in August 2015. The following is a deployment checklist we came up with as part of the postmortem after that outage. When upgrading, please follow this checklist before you proceed in production.

  • Update (or remove) api_version in kafka_clusters hiera hash in common.yaml
  • Check Release Notes for new versions. e.g. https://archive.apache.org/dist/kafka/0.8.2.1/RELEASE_NOTES.html
  • Check Apache JIRA for bugs that may affect new version(s). e.g. https://issues.apache.org/jira/browse/KAFKA-2496?jql=project%20%3D%20KAFKA%20AND%20affectedVersion%20%3D%200.8.2.1
  • Stress test to see if varnishkafka latency goes up. It may be difficult to do this, but it is worth a try.
    • Set up a varnish+varnishkafka instance and Kafka cluster in labs (if there is not one in deployment-prep already).
      1. Use ab (Apache Benchmark) to force varnishkafka to send requests as fast as you can.
      2. Record rtt times in varnishkafka stats.json
      3. Record data file sizes for Kafka partitions on Kafka brokers in /var/spool/kafka/
      4. Upgrade Kafka cluster
      5. Repeat steps 1 - 3 and compare results to previous version. There should be no (negative) change.
  • When doing a production upgrade, document all steps in a deployment plan. Review and then execute the plan with a peer. Take notes on all steps along the way, including execution times for each step. This allows for easier documentation and correlation later if there are any problems. Be sure to keep an eye on the Kafka dashboard while deploying.

MirrorMaker

Kafka MirrorMaker is a glorified Kafka consumer -> producer process. It consumes messages from a source Kafka cluster and produces them to a destination Kafka cluster. The messages themselves are thus 'reproduced' as new messages, so 'mirroring' is different from 'replication'.

MirrorMaker is a peerless Kafka consumer group. Multiple processes belonging to the same Kafka consumer group can run, and will automatically balance load between themselves. It is safe to stop some or all MirrorMaker processes at any time, as long as they are restarted within the smallest Kafka topic retention time (7 days) with enough time to catch back up.

main mirroring

We use Kafka MirrorMaker for datacenter failover between the two main Kafka clusters in eqiad and codfw. These clusters mirror all of their datacenter prefixed topics to each other. Original topics in main-eqiad are prefixed with 'eqiad.', and original topics in main-codfw are prefixed with 'codfw.'. This allows whitelisting of the original topics in each source cluster. E.g. the MirrorMaker instance running on main-codfw nodes, named main-eqiad_to_main-codfw, consumes only topics that match eqiad.* from main-eqiad and produces them to main-codfw. The reciprocal MirrorMaker instance, main-codfw_to_main-eqiad, running on main-eqiad nodes, does the opposite.
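
The prefix-based whitelisting described above can be illustrated with a small filter. This is only a sketch: topics_for_source_dc is a hypothetical helper, not part of MirrorMaker or of our tooling, and it assumes one topic name per line on standard input.

```shell
# Hypothetical helper (not part of MirrorMaker): print only the topics that
# originate in the given datacenter, i.e. whose name starts with "<dc>.".
# Reads one topic name per line from stdin.
topics_for_source_dc() {
  # index() returns 1 only when the prefix sits at the very start of the line,
  # so "eqiadx" or "foo.eqiad.bar" are not selected for dc "eqiad".
  awk -v prefix="$1." 'index($0, prefix) == 1'
}
```

For example, piping a topic listing through topics_for_source_dc eqiad keeps only the topics that main-eqiad_to_main-codfw would be expected to mirror.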

Since mirroring here is primarily used for datacenter failover, a short (less than a few days) downtime will have no practical impact (as long as there is no primary data center switch during the downtime). After a primary datacenter switch, the codfw.* prefixed topics will start being used in main-codfw, instead of the eqiad.* ones in main-eqiad.

jumbo mirroring

We also use MirrorMaker to consume all of the topics from the main clusters to jumbo-eqiad for analytics and posterity purposes. The jumbo-eqiad nodes run a MirrorMaker instance called main-eqiad_to_jumbo-eqiad. This instance consumes almost all topics from main-eqiad, including the codfw.* prefixed ones that were mirrored by the main-codfw_to_main-eqiad MirrorMaker instance. These topics are then ingested into Hadoop for analytics usage.

Note: As of 2018-07, the job queue and change-prop topics are not needed for anything in the jumbo-eqiad cluster. These topics are bursty and higher volume than the other topics in the main Kafka clusters. We've had problems with MirrorMaker and some of these topics in the past. If there are MirrorMaker errors where MirrorMaker fails producing messages in one of these topics, it is safe to blacklist the topic.

Broker Migrations

Kafka-logging

The below migration process assumes a new host will be assuming the broker ID of the host being migrated away from.

note: the current/old broker will be referred to as "source_host" and the new host being migrated to will be referred to as "dest_host"

  1. Write/merge homer templates/cr/firewall.conf patch to include new broker(s) addresses
  2. Update broker list in eventgate-logging to include new broker(s) addresses
    1. helmfile.d/services/eventgate-logging-external/values-eqiad.yaml
    2. helmfile.d/services/eventgate-logging-external/values-codfw.yaml
    3. helmfile.d/services/eventgate-logging-external/values-staging.yaml
    4. helmfile.d/services/eventgate-logging-external/values.yaml
  3. Downtime source_host in icinga
  4. Disable notifications for dest_host (via hiera patch)
  5. Downtime icinga “under replicated partitions” checks on all members of the kafka cluster undergoing maintenance (e.g. all kafka-logging eqiad broker hosts)
  6. source_host: Disable puppet with message
  7. dest_host: Disable puppet with message
  8. source_host: stop & mask service kafka
  9. Gerrit/puppetmaster: Merge “kafka-logging: replace source_host broker with dest_host" (example: https://gerrit.wikimedia.org/r/c/operations/puppet/+/677009)
  10. Conf1*: run puppet agent
  11. Run puppet agent manually on remaining kafka cluster hosts
  12. run puppet agent on prometheus nodes
  13. run puppet agent on grafana nodes
  14. dest_host: enable puppet & run puppet agent
  15. Ensure kafka, services come up cleanly (watch grafana dashboard for this kafka cluster, and watch consumer lag. it may be necessary to reload grafana in the browser to pick up new cluster hosts)
  16. Give Kafka cluster time to sync and settle down
  17. if replica imbalance does not correct itself, issue a reelection with `kafka preferred-replica-election`
  18. Ensure dest_host green in icinga
  19. Enable dest_host notifications via hiera (revert patch from step 4)
  20. run puppet on source_host
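
To judge whether step 17 is needed, one option is to list the partitions whose current leader is not the first broker in their replica list (the preferred leader). The sketch below assumes the tab-separated kafka topics --describe output format shown earlier on this page; non_preferred_leaders is a hypothetical helper name, not an existing tool.

```shell
# Hypothetical helper: read `kafka topics --describe` output on stdin and
# print the partitions whose leader differs from the first listed replica.
# Assumes tab-separated fields such as "Leader: 18" and "Replicas: 18,22,12".
non_preferred_leaders() {
  awk -F'\t' '
    {
      topic = ""; part = ""; leader = ""; replicas = ""
      for (i = 1; i <= NF; i++) {
        if ($i ~ /^Topic: /)     topic    = substr($i, 8)
        if ($i ~ /^Partition: /) part     = substr($i, 12)
        if ($i ~ /^Leader: /)    leader   = substr($i, 9)
        if ($i ~ /^Replicas: /)  replicas = substr($i, 11)
      }
      # Topic summary lines carry no replica list, skip them.
      if (replicas == "") next
      split(replicas, r, ",")
      if (leader != r[1]) print topic, part, "leader=" leader, "preferred=" r[1]
    }'
}
```

Usage would be along the lines of kafka topics --describe | non_preferred_leaders. An empty result suggests that no reelection is necessary.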

Rebalance topic partitions to new brokers

One interesting use case for Kafka is how to rebalance topic partitions in a cluster when new brokers are added to expand it. We worked on this use case two times: T225005 and T255973.

Kafka by default does not move any partition from one broker to another unless explicitly told to do so via kafka reassign-partitions. The tool is very powerful and generic, but the operator needs to come up with a plan for which partitions to move (from broker A to broker B), so tools like topicmappr's "rebuild" may be useful to generate one. A plan is basically a list of json files (one for each topic) that can be given as input to kafka reassign-partitions. topicmappr supports multiple criteria to generate a plan, like pulling metrics from Prometheus, but for our use cases its default behavior leads to good results (basically, it moves partitions evenly across a set of new brokers).

More details about how topicmappr's rebuild works: https://github.com/DataDog/kafka-kit/wiki/Rebuild-command and https://docs.google.com/presentation/d/1vIzvXOSnIfCK8ZqY33ZOvRhEshARdWXd97fI66J3peI

The overall idea is the following:

  • Find a list of topics that you want to target, for example the ones having a certain minimum level of traffic (like >= 10 msgs/s). Something like this may be a good starting point.
  • Use topicmappr to generate a plan, namely a list of json files (one for each topic/partition combination) that are readable by kafka reassign-partitions
  • Manually execute kafka reassign-partitions for all json files, one at a time. The command, once executed, returns immediately, but kafka may take a lot of time to move the target partition to another broker (depending on size, throttling, etc.).
    • Save the output of the above command since it prints the current status, that can be used to rollback if needed (see below).
    • Use the --throttle option of kafka reassign-partitions to limit the bandwidth used for data transfers (remember to add it every time you invoke the command, otherwise it will be reset to the default).
    • Use the --verify option to check the status of a rebalance periodically. The --verify option also removes the throttle once the rebalance is completed, restoring the brokers to their previous state (otherwise other replication tasks might suffer as well).
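
As an illustration of the first step, the filter below keeps the topics at or above a traffic threshold. It is a sketch under assumptions: the input is a hypothetical two-column listing (topic name, messages per second) that you would have to export from your metrics system yourself. Neither that format nor the busy_topics helper comes from Kafka or topicmappr.

```shell
# Hypothetical helper: read "<topic> <msgs_per_second>" lines on stdin and
# print the topics whose rate is at least the given threshold (default 10).
busy_topics() {
  min_rate="${1:-10}"
  # "+ 0" forces a numeric comparison even if the values arrive as strings.
  awk -v min="$min_rate" 'NF >= 2 && $2 + 0 >= min + 0 { print $1 }'
}
```

The resulting topic names can then be turned into the topic selection passed to topicmappr.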

Rebalance topic partitions to equalize broker storage

If some brokers in the cluster have a significantly higher disk usage than others, this can cause unwanted discrepancies in both storage and bandwidth. To reduce the skew between brokers, we can rely on topicmappr rebalance to generate a reassignment plan based on partition size and host disk usage metrics.

To do this, ssh onto any kafka broker, and run the following commands:

# this will fetch disk usage and partition size metrics from prometheus
prometheus-metricsfetcher

# this will instruct topicmappr to attempt to binpack partitions of all topics on all brokers
# in the cluster in order to minimize disk usage spread between the most and least loaded brokers,
# based on the metrics fetched by prometheus-metricsfetcher
topicmappr rebalance --brokers -2 --topics '.*' --out-file cluster-rebalance

# if the output looks satisfactory, execute
kafka reassign-partitions --reassignment-json-file ./cluster-rebalance.json --execute --throttle 50000000

However, if you see a significant amount of data (multiple TBs) being moved around, or partitions being moved to a non-negligible number of brokers in the cluster, we suggest that you proceed with the rebalancing in several phases. To do this, run the following commands:

topicmappr rebalance --brokers -2 --topics '.*' 

# now, for each generated .json file, run
kafka reassign-partitions --reassignment-json-file <json-file> --execute --throttle 50000000

# When the reassignment has finished (or to check the current status) run
kafka reassign-partitions --reassignment-json-file <json-file> --verify

# and then move on to the next json file

This method takes longer, but it is also lighter on the cluster in terms of impact: each reassignment should take less than a workday and can be rolled back if need be, instead of a single Big-Bang data movement.
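
The phased procedure can be sketched as a loop. This is not an existing script: run_phased_reassignment is a hypothetical helper, and it assumes that the --verify output contains the words "in progress" for as long as a reassignment is still running. Please check that assumption against the output of your Kafka version before relying on it.

```shell
# Hypothetical helper: apply every plan file in a directory, one at a time.
# Arguments: plan directory, throttle in bytes/s (default 50000000),
# polling interval in seconds (default 60).
run_phased_reassignment() {
  plan_dir="$1"; throttle="${2:-50000000}"; poll="${3:-60}"
  for plan in "$plan_dir"/*.json; do
    [ -e "$plan" ] || continue
    # Keep the command output: it contains the previous assignment,
    # which is needed for a rollback. Stop at the first failure.
    kafka reassign-partitions --reassignment-json-file "$plan" \
      --execute --throttle "$throttle" > "${plan%.json}.rollback.out" || return 1
    # ASSUMPTION: --verify prints "in progress" while data is still moving.
    # Running --verify after completion also removes the throttle.
    while kafka reassign-partitions --reassignment-json-file "$plan" --verify \
        | grep -q 'in progress'; do
      sleep "$poll"
    done
  done
}
```

Run it inside tmux or screen, since a full pass may take many hours. Doing the steps by hand remains the safer option when the cluster is already under pressure.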

Rebalance topic partitions to equalize broker throughput

This section collects various tips for using kafka reassign-partitions. If you need to move partitions only because new, empty brokers were added, please see the above sections.

The tool gets as input one json file for each topic, formatted like the following:

{
  "partitions": [
    {"topic": "eqiad.resource-purge", "partition": 0, "replicas": [2001,2005,2003]},
    {"topic": "eqiad.resource-purge", "partition": 1, "replicas": [2002,2004,2001]},
    {"topic": "eqiad.resource-purge", "partition": 2, "replicas": [2003,2004,2002]},
    {"topic": "eqiad.resource-purge", "partition": 3, "replicas": [2004,2001,2002]},
    {"topic": "eqiad.resource-purge", "partition": 4, "replicas": [2005,2001,2002]}
  ],
  "version":1
}

The idea is, for every partition, to instruct Kafka about which brokers should get it. The tool should not be used to increase the number of partitions or their replicas, so it is essential to craft a json document that is consistent with the current topic state (number of partitions, and number of replicas for each partition). Shuffling where the replicas are placed helps in redistributing the traffic, since more brokers will be able to act as partition leaders and the Kafka producers will be able to spread their traffic across more brokers. Caveat: the first replica listed is the one that will likely become the partition leader. An interesting use case could be this:

{
  "partitions": [
    {"topic": "eqiad.resource-purge", "partition": 0, "replicas": [2001,2005,2003]},
    {"topic": "eqiad.resource-purge", "partition": 1, "replicas": [2002,2004,2001]},
    {"topic": "eqiad.resource-purge", "partition": 2, "replicas": [2004,2003,2002]},
    {"topic": "eqiad.resource-purge", "partition": 3, "replicas": [2004,2001,2002]},
    {"topic": "eqiad.resource-purge", "partition": 4, "replicas": [2005,2001,2002]}
  ],
  "version":1
}

Here only the layout of partition 2 has changed (the first replica is different from above). If we apply this configuration, Kafka will likely assign the leadership of partitions 2 and 3 to the same broker (2004), even if another broker (2003) would be the best choice. Kafka rebalances partitions based on their number; it doesn't know much about how the operator wants to shape each broker's traffic, and it trusts the first replica to be a good indication of what the operator wants.
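
Before executing a plan, it can help to count how many partitions each broker is listed first for, since the first replica is the likely leader. The following is a minimal sketch: preferred_leader_counts is a hypothetical helper, and it assumes the plan is laid out with one partition per line, as in the examples above (a compact single-line json file would need a real json parser instead).

```shell
# Hypothetical helper: for a reassignment plan file with one partition per
# line, print "<broker_id> <number of partitions it is listed first for>".
preferred_leader_counts() {
  # Keep only the first broker id of each "replicas" list, then count them.
  sed -n 's/.*"replicas": *\[\([0-9][0-9]*\).*/\1/p' "$1" \
    | sort -n | uniq -c | awk '{ print $2, $1 }'
}
```

A broker that shows up with a noticeably higher count than the others is a candidate for reshuffling in the plan.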

Once you have executed kafka reassign-partitions --execute please run the same command with --verify until the partitions are reassigned (it clears out any throttling setting if applied).

Note: the --throttle option is really nice if you have big topics and you don't want to risk saturating the broker's bandwidth. If applied please remember the suggestion written above about --verify.

Rollback/Stop topic partitions rebalance

Please do this only if you are in a really messed up state. A partition move will likely cause some metrics to show temporary issues, like under replicated partitions, but eventually they auto-resolve. What should the operator do if something happens while the partition is being moved? The solution found by the Analytics/DE team at the time is the following one:

  • Stop the current rebalancing in progress (since it blocks other ones)
# see https://phabricator.wikimedia.org/T255973#6762713

# This of course assuming main-eqiad/jumbo/etc..,
# please use codfw's zookeeper cluster when needed.
ssh conf1004.eqiad.wmnet

elukey@conf1004:~$ sudo -u zookeeper /usr/share/zookeeper/bin/zkCli.sh
rmr /kafka/test-eqiad/admin/reassign_partitions
rmr /kafka/test-eqiad/controller

The above will cause the Kafka controller (one broker elected at the time) to lose its status, and the cluster will elect another one. From this point onward no more partitions are being reassigned.

  • Rollback to the previous state (emitted by kafka-reassign-partitions when executed)
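
If the output of the original --execute run was saved to a file, the previous state can be pulled out of it as sketched below. extract_rollback_json is a hypothetical helper, and it assumes that the tool prints the previous assignment as a single line starting with {"version". Verify this against your saved output before using the result.

```shell
# Hypothetical helper: print the previous assignment from a saved
# `kafka reassign-partitions --execute` output file.
# ASSUMPTION: the assignment is a single line starting with {"version".
extract_rollback_json() {
  grep -m 1 '^{"version"' "$1"
}
```

The extracted line can be written to a json file and passed to kafka reassign-partitions --reassignment-json-file with --execute to restore the old layout.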

Renew TLS certificate

The brokers of Kafka clusters (that we manage in production) are all using TLS certificates to secure traffic between each other and the clients. At the time of writing (January 2022) there are two main types of certificates used:

  • Puppet - all the brokers use the same TLS certificate created with cergen, and trust only the Puppet CA.
  • PKI Kafka Intermediate - every broker has its own certificate (hostname based) issued by a special PKI intermediate CA.

The latter is preferred, but the effort to migrate all the clusters to it is still ongoing (see T291905 for more info). In order to check what certificate type is used by a broker, just ssh to it and run:

echo y | openssl s_client -connect $(hostname -f):9093 | openssl x509 -issuer -noout

If the CA mentioned is:

  • the Puppet one, then you'll need to follow Cergen#Update_a_certificate and deploy the new certificate to all nodes.
  • the Kafka PKI Intermediate one, then in theory a new certificate should be issued a few days before the expiry and puppet should replace the Kafka keystore automatically (under /etc/kafka/ssl).

In both cases, a roll restart of the brokers is needed to force them to pick up the new certificates. Please use the related Kafka cookbook :)
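
To see how close a broker certificate is to its expiry, the notAfter line printed by openssl x509 -enddate -noout can be converted into a number of days. This is a sketch only: days_until_expiry is a hypothetical helper and it relies on GNU date being available to parse the timestamp.

```shell
# Hypothetical helper: read a "notAfter=..." line (as printed by
# `openssl x509 -enddate -noout`) on stdin and print the whole days left.
# Optional argument: the reference time as a Unix epoch (defaults to now).
# Requires GNU date for the -d option.
days_until_expiry() {
  now="${1:-$(date +%s)}"
  # Strip the "notAfter=" prefix that openssl prints.
  not_after="$(sed -n 's/^notAfter=//p' | head -n 1)"
  [ -n "$not_after" ] || return 1
  expiry="$(date -u -d "$not_after" +%s)" || return 1
  echo $(( (expiry - now) / 86400 ))
}
```

On a broker this could be combined with the command above, for example: echo y | openssl s_client -connect $(hostname -f):9093 2>/dev/null | openssl x509 -enddate -noout | days_until_expiry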

Is there a way to reload the keystores without restarting?

In theory it should be sufficient to execute the kafka-config command on every broker to force a reload, but in practice this doesn't work with our version of kafka, as described in https://phabricator.wikimedia.org/T299409

Increase a topic's replication factor

If you are getting an alert about a topic with an insufficient replication factor, run the following command to increase it, on any broker host in the cluster:

topicmappr rebuild --topic <topic_name> --brokers -2 --replication-factor 3
kafka reassign-partitions --reassignment-json-file ./<topic_name>.json --execute --throttle 50000000
kafka reassign-partitions --reassignment-json-file ./<topic_name>.json --verify

A topic with a replication factor of 1 is a dangerous setup: if the broker hosting the only replica of a given partition of this topic goes down, then this partition goes fully offline.
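
To find such topics proactively, the topic summary lines of kafka topics --describe can be filtered by replication factor. The sketch below assumes the tab-separated summary format shown earlier on this page (Topic:name, PartitionCount:N, ReplicationFactor:N); low_replication_topics is a hypothetical helper name.

```shell
# Hypothetical helper: read `kafka topics --describe` output on stdin and
# print "<topic> <replication factor>" for every topic whose replication
# factor is below the given minimum (default 3).
low_replication_topics() {
  min="${1:-3}"
  awk -F'\t' -v min="$min" '
    /^Topic:/ {
      topic = ""; rf = ""
      for (i = 1; i <= NF; i++) {
        if ($i ~ /^Topic:/)             topic = substr($i, 7)
        if ($i ~ /^ReplicationFactor:/) rf    = substr($i, 19)
      }
      if (rf != "" && rf + 0 < min + 0) print topic, rf
    }'
}
```

Usage would be along the lines of kafka topics --describe | low_replication_topics 3.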

Hardware replace a broker

The following steps describe how to replace an existing broker with new hardware, retaining the original broker-id, so that no partitions need to be shuffled around. The examples are based on replacing kafka-main2002 (broker-id 2002) with kafka-main2007:

1. Add all new kafka node IPs to the list of IPs in kafka_brokers_main (hieradata/common.yaml). It does not hurt if they are still in insetup role.

2. Run puppet on conf nodes (update zookeeper firewall with the new IPs)

3. Ensure that UnderReplicatedPartitions is 0

4. Remove the leadership for all partitions assigned to the to be replaced broker

5. The following commands can be run from any kafka broker

kafka-main2003:~$ BROKER_ID=2002

# Remove leadership from broker id 2002, this creates evacuate-2002-leadership.json
kafka-main2003:~$ topicmappr rebuild --leader-evac-brokers $BROKER_ID --leader-evac-topics '.*' --brokers -1 --topics '.*' --out-file evacuate-${BROKER_ID}-leadership --skip-no-ops --ignore-warns 

# Remove leadership for all partitions assigned to broker id 2002: 
kafka-main2003:~$ kafka reassign-partitions --reassignment-json-file evacuate-${BROKER_ID}-leadership.json --execute > evacuate-${BROKER_ID}-leadership-rollback.out 

# Extract rollback json from the output to restore the old state after the new node has caught up 
kafka-main2003:~$ sed '4q;d' evacuate-${BROKER_ID}-leadership-rollback.out > evacuate-${BROKER_ID}-leadership-rollback.json

6. Wait a bit (~5') for the above to settle; you will see it in the Partition Leaders graph. For whatever reason, leadership cannot always be removed for all partitions with the above method. It is still safe to continue, as the cluster will update the remaining partitions when the old node goes away.

7. Sync brokers, downtime the hosts we are working on, and start the data transfer

# Ensure all brokers are in sync 
cumin1002:~$ DC=codfw; TASK=T363210; KAFKA_OLD=2002; KAFKA_NEW=2007
cumin1002:~$ sudo cumin A:kafka-main-${DC} /usr/local/bin/kafka-broker-in-sync

# Downtime new and old nodes 
cumin1002:~$ sudo cookbook sre.hosts.downtime -t ${TASK} -D 1 -r "Hardware refresh" 'kafka-main200[2,7].codfw.wmnet'

# Silence the following alerts for the cluster (alertmanager web):
# Matchers: alertname="KafkaUnderReplicatedPartitions", kafka_cluster="main-codfw"

# Disable puppet on new and old nodes, stop kafka and kafka-mirror on the old node: 
cumin1002:~$ sudo cumin kafka-main${KAFKA_OLD}.${DC}.wmnet 'disable-puppet "Hardware refresh - T363210"; systemctl stop kafka-mirror.service kafka.service' 
cumin1002:~$ sudo cumin kafka-main${KAFKA_NEW}.${DC}.wmnet 'disable-puppet "Hardware refresh - T363210"'

# Copy kafka data from old to new node
# USE tmux or screen!
cumin1002:~$ sudo transfer.py --no-encrypt --no-checksum kafka-main${KAFKA_OLD}.${DC}.wmnet:/srv/kafka kafka-main${KAFKA_NEW}.${DC}.wmnet:/srv/
  • The last command uses Transfer.py and could take up to 5-6 hours, depending on the amount of data.

8. Assign the broker-id of the old node to the new node in hieradata/common.yaml, assign kafka::main role to new node (like: https://gerrit.wikimedia.org/r/c/operations/puppet/+/1071610)

9. Run puppet on all “active” kafka nodes, prometheus, and roll restart kafka to read the updated config. Edge caches and other kafka consumers may observe connection errors during this step.

# Update the kafka config on existing nodes in order to allow connections from the new node 
sudo cumin 'A:kafka-main-codfw and not P{kafka-main200[2,7].codfw.wmnet}' 'run-puppet-agent' 
sudo cookbook sre.kafka.roll-restart-reboot-brokers --no-election --reason 'Hardware refresh' --task-id T363210 --exclude 'kafka-main200[2,7].codfw.wmnet' -a kafka-main-codfw --grace-sleep 120 restart_daemons

10. Make the new host a kafka node, and run puppet on the host itself, and prometheus hosts

# Setup the new node as kafka broker 
sudo cumin kafka-main2007.codfw.wmnet 'run-puppet-agent -e "Hardware refresh - T363210"'
# Add the new node as prometheus target right after
sudo cumin 'A:prometheus and A:codfw' 'run-puppet-agent'

11. Run puppet on deploy host, and deploy external-services update to all k8s clusters:

sudo cumin A:deployment-servers 'run-puppet-agent' 
# On deploy host 
deploy1002:~$ sudo -i 
cd /srv/deployment-charts/helmfile.d/admin_ng/ 
for e in $(kube_environments |grep -v staging$); do helmfile -e $e -l name=external-services apply; done

12. Wait until UnderReplicatedPartitions is 0 again

13. Remove downtime for the new node

sudo cookbook sre.hosts.remove-downtime 'kafka-main2007.codfw.wmnet'

14. Restore previous partition leader state

kafka-main2003:~$ BROKER_ID=2002
kafka-main2003:~$ kafka reassign-partitions --reassignment-json-file evacuate-${BROKER_ID}-leadership-rollback.json --execute
kafka preferred-replica-election

15. Remove the old node from various kafka connection strings and deploy that change (like https://gerrit.wikimedia.org/r/c/operations/deployment-charts/+/1064758)
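
Steps 3 and 12 rely on the UnderReplicatedPartitions metric. As a cross-check from the command line, the partitions whose in-sync replica set is smaller than their replica list can be extracted from kafka topics --describe. This is a sketch that assumes the tab-separated output format shown earlier on this page; under_replicated_partitions is a hypothetical helper name.

```shell
# Hypothetical helper: read `kafka topics --describe` output on stdin and
# print "<topic> <partition>" for partitions with fewer in-sync replicas
# than assigned replicas.
under_replicated_partitions() {
  awk -F'\t' '
    {
      topic = ""; part = ""; replicas = ""; isr = ""
      for (i = 1; i <= NF; i++) {
        if ($i ~ /^Topic: /)     topic    = substr($i, 8)
        if ($i ~ /^Partition: /) part     = substr($i, 12)
        if ($i ~ /^Replicas: /)  replicas = substr($i, 11)
        if ($i ~ /^Isr: /)       isr      = substr($i, 6)
      }
      # Topic summary lines carry no replica list, skip them.
      if (replicas == "") next
      if (split(isr, isr_ids, ",") < split(replicas, replica_ids, ",")) print topic, part
    }'
}
```

An empty result from kafka topics --describe | under_replicated_partitions is consistent with UnderReplicatedPartitions being 0.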