Kafka/Administration

From Wikitech

Safe Broker Restarts

Since partitions have at least 2 (usually 3) replicas, you should be able to restart a broker without losing any messages. Brokers, consumers, and producers will automatically rebalance themselves when a broker dies, but it is nice to allow them to do so gracefully. service kafka stop will perform a graceful shutdown. Before you run this, make sure that every partition for which the target broker is the leader also has other In Sync Replicas:

root@kafka1014:~# kafka topics --describe
...
Topic:webrequest_text	PartitionCount:12	ReplicationFactor:3	Configs:
	Topic: webrequest_text	Partition: 0	Leader: 18	Replicas: 18,22,12	Isr: 18,22,12
	Topic: webrequest_text	Partition: 1	Leader: 20	Replicas: 20,12,13	Isr: 20,13,12
	Topic: webrequest_text	Partition: 2	Leader: 22	Replicas: 22,13,14	Isr: 22,13,14
	Topic: webrequest_text	Partition: 3	Leader: 12	Replicas: 12,14,18	Isr: 18,12,14
	Topic: webrequest_text	Partition: 4	Leader: 13	Replicas: 13,18,20	Isr: 18,13,20
	Topic: webrequest_text	Partition: 5	Leader: 14	Replicas: 14,20,22	Isr: 22,20,14
	Topic: webrequest_text	Partition: 6	Leader: 18	Replicas: 18,12,13	Isr: 18,13,12
	Topic: webrequest_text	Partition: 7	Leader: 20	Replicas: 20,13,14	Isr: 20,13,14
	Topic: webrequest_text	Partition: 8	Leader: 22	Replicas: 22,14,18	Isr: 22,18,14
	Topic: webrequest_text	Partition: 9	Leader: 12	Replicas: 12,18,20	Isr: 20,18,12
	Topic: webrequest_text	Partition: 10	Leader: 13	Replicas: 13,20,22	Isr: 22,20,13
	Topic: webrequest_text	Partition: 11	Leader: 14	Replicas: 14,22,12	Isr: 22,12,14
...

# partitions with Leader: 14 have several brokers in the ISR for that partition.  It is safe to stop kafka1014
root@kafka1014:~# service kafka stop

Notice how (eventually) after the broker stops, broker 14 is no longer the leader for any partition when you run kafka topics --describe.

Once you are ready to start the broker back up, you can do so with a simple service kafka start.

It will likely take a few minutes for the broker to recover after restarting. It needs to check all of its recent logs to make sure that it doesn't have any incomplete messages, and it will then start replicating partitions from where it left off when it was stopped. Keep checking kafka topics --describe until every topic-partition has all of its brokers in the ISR. Once the topic-partitions are up to date on all brokers, you can start a replica election to balance the leaders across brokers. NOTE: replica election may not be necessary on the jumbo and main Kafka clusters, as auto leader rebalancing is enabled on them.
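The pre-stop check from the start of this section can be scripted. The following is a minimal sketch, assuming the whitespace-separated "Name: value" layout of the kafka topics --describe output shown above; unsafe_to_stop is a hypothetical helper name, not an existing tool. It prints every partition led by the given broker whose ISR contains no other broker, so empty output means the broker should be safe to stop.

```bash
# Hypothetical helper: list partitions led by broker $1 whose ISR contains
# no other broker. Reads `kafka topics --describe` output on stdin; empty
# output means every partition led by that broker has another in-sync replica.
unsafe_to_stop() {
  local broker_id="$1"
  awk -v id="$broker_id" '
    {
      topic = ""; part = ""; leader = ""; isr = ""
      # Assumed layout: whitespace-separated "Name: value" pairs.
      for (i = 1; i < NF; i++) {
        if ($i == "Topic:")     topic  = $(i + 1)
        if ($i == "Partition:") part   = $(i + 1)
        if ($i == "Leader:")    leader = $(i + 1)
        if ($i == "Isr:")       isr    = $(i + 1)
      }
      # Topic header lines have no "Leader:" field, so leader stays empty.
      if (leader == id && split(isr, members, ",") < 2)
        print topic, part, "isr=" isr
    }
  '
}
```

Usage example: kafka topics --describe | unsafe_to_stop 14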

Replica Elections

Kafka jumbo and main clusters are configured to do auto leader rebalancing. Only the to-be-deprecated 'analytics' Kafka cluster does not. To balance manually, do the following.

List topics to see the current leader assignments:

root@kafka1014:~$ kafka topics --describe
Topic:webrequest_bits   PartitionCount:12       ReplicationFactor:3     Configs:
        Topic: webrequest_bits  Partition: 0    Leader: 22      Replicas: 22,12,18      Isr: 12,18,22
        Topic: webrequest_bits  Partition: 1    Leader: 12      Replicas: 12,18,21      Isr: 12,18,21
        Topic: webrequest_bits  Partition: 2    Leader: 18      Replicas: 18,21,22      Isr: 18,22,21
        Topic: webrequest_bits  Partition: 3    Leader: 21      Replicas: 21,22,12      Isr: 12,22,21
        Topic: webrequest_bits  Partition: 4    Leader: 22      Replicas: 22,18,21      Isr: 18,22,21
        Topic: webrequest_bits  Partition: 5    Leader: 12      Replicas: 12,21,22      Isr: 12,22,21
        Topic: webrequest_bits  Partition: 6    Leader: 18      Replicas: 18,22,12      Isr: 12,18,22
        Topic: webrequest_bits  Partition: 7    Leader: 21      Replicas: 21,12,18      Isr: 12,18,21
        Topic: webrequest_bits  Partition: 8    Leader: 22      Replicas: 22,21,12      Isr: 12,22,21
        Topic: webrequest_bits  Partition: 9    Leader: 12      Replicas: 12,22,18      Isr: 12,18,22
        Topic: webrequest_bits  Partition: 10   Leader: 18      Replicas: 18,12,21      Isr: 12,18,21
        Topic: webrequest_bits  Partition: 11   Leader: 21      Replicas: 21,18,22      Isr: 18,22,21
Topic:webrequest_mobile PartitionCount:12       ReplicationFactor:3     Configs:
        Topic: webrequest_mobile        Partition: 0    Leader: 12      Replicas: 12,21,22      Isr: 12,22,21
        Topic: webrequest_mobile        Partition: 1    Leader: 18      Replicas: 18,22,12      Isr: 12,18,22
        Topic: webrequest_mobile        Partition: 2    Leader: 21      Replicas: 21,12,18      Isr: 12,18,21
        Topic: webrequest_mobile        Partition: 3    Leader: 22      Replicas: 22,18,21      Isr: 18,22,21
        Topic: webrequest_mobile        Partition: 4    Leader: 12      Replicas: 12,22,18      Isr: 12,18,22
        Topic: webrequest_mobile        Partition: 5    Leader: 18      Replicas: 18,12,21      Isr: 12,18,21
        Topic: webrequest_mobile        Partition: 6    Leader: 21      Replicas: 21,18,22      Isr: 18,22,21
        Topic: webrequest_mobile        Partition: 7    Leader: 22      Replicas: 22,21,12      Isr: 12,22,21
        Topic: webrequest_mobile        Partition: 8    Leader: 12      Replicas: 12,18,21      Isr: 12,18,21
        Topic: webrequest_mobile        Partition: 9    Leader: 18      Replicas: 18,21,22      Isr: 18,22,21
        Topic: webrequest_mobile        Partition: 10   Leader: 21      Replicas: 21,22,12      Isr: 12,22,21
        Topic: webrequest_mobile        Partition: 11   Leader: 22      Replicas: 22,12,18      Isr: 12,18,22
Topic:webrequest_text   PartitionCount:12       ReplicationFactor:3     Configs:
        Topic: webrequest_text  Partition: 0    Leader: 22      Replicas: 22,21,12      Isr: 12,22,21
        Topic: webrequest_text  Partition: 1    Leader: 12      Replicas: 12,22,18      Isr: 12,18,22
        Topic: webrequest_text  Partition: 2    Leader: 18      Replicas: 18,12,21      Isr: 12,18,21
        Topic: webrequest_text  Partition: 3    Leader: 21      Replicas: 21,18,22      Isr: 18,22,21
        Topic: webrequest_text  Partition: 4    Leader: 22      Replicas: 22,12,18      Isr: 12,18,22
        Topic: webrequest_text  Partition: 5    Leader: 12      Replicas: 12,18,21      Isr: 12,18,21
        Topic: webrequest_text  Partition: 6    Leader: 18      Replicas: 18,21,22      Isr: 18,22,21
        Topic: webrequest_text  Partition: 7    Leader: 21      Replicas: 21,22,12      Isr: 12,22,21
        Topic: webrequest_text  Partition: 8    Leader: 22      Replicas: 22,18,21      Isr: 18,22,21
        Topic: webrequest_text  Partition: 9    Leader: 12      Replicas: 12,21,22      Isr: 12,22,21
        Topic: webrequest_text  Partition: 10   Leader: 18      Replicas: 18,22,12      Isr: 12,18,22
        Topic: webrequest_text  Partition: 11   Leader: 21      Replicas: 21,12,18      Isr: 12,18,21
Topic:webrequest_upload PartitionCount:12       ReplicationFactor:3     Configs:
        Topic: webrequest_upload        Partition: 0    Leader: 18      Replicas: 18,12,21      Isr: 12,18,21
        Topic: webrequest_upload        Partition: 1    Leader: 21      Replicas: 21,18,22      Isr: 22,18,21
        Topic: webrequest_upload        Partition: 2    Leader: 22      Replicas: 22,21,12      Isr: 12,22,21
        Topic: webrequest_upload        Partition: 3    Leader: 12      Replicas: 12,22,18      Isr: 12,18,22
        Topic: webrequest_upload        Partition: 4    Leader: 18      Replicas: 18,21,22      Isr: 18,22,21
        Topic: webrequest_upload        Partition: 5    Leader: 21      Replicas: 21,22,12      Isr: 22,12,21
        Topic: webrequest_upload        Partition: 6    Leader: 22      Replicas: 22,12,18      Isr: 12,18,22
        Topic: webrequest_upload        Partition: 7    Leader: 12      Replicas: 12,18,21      Isr: 12,18,21
        Topic: webrequest_upload        Partition: 8    Leader: 18      Replicas: 18,22,12      Isr: 12,18,22
        Topic: webrequest_upload        Partition: 9    Leader: 21      Replicas: 21,12,18      Isr: 12,18,21
        Topic: webrequest_upload        Partition: 10   Leader: 22      Replicas: 22,18,21      Isr: 18,22,21
        Topic: webrequest_upload        Partition: 11   Leader: 12      Replicas: 12,21,22      Isr: 12,22,21

In this case, you can see that leaders are balanced across all brokers. If they weren't (e.g. broker 21 not appearing as leader for any partition), you can ask Kafka to do a leader election by running the following command on any one of the brokers (no sudo needed; the broker chosen does not matter):

 kafka preferred-replica-election

Wait a few seconds, no more than a minute. If all goes well, run kafka topics --describe again and you should see the leaders properly balanced.
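Eyeballing a long kafka topics --describe listing for balance is error prone. A minimal sketch, assuming the output format shown above (leader_counts is a hypothetical helper name), counts how many partitions each broker currently leads:

```bash
# Hypothetical helper: count how many partitions each broker currently leads.
# Reads `kafka topics --describe` output on stdin, prints "<broker> <count>".
leader_counts() {
  awk '
    {
      # Partition lines contain "Leader: <broker id>"; topic header lines do not.
      for (i = 1; i < NF; i++)
        if ($i == "Leader:") count[$(i + 1)]++
    }
    END { for (b in count) print b, count[b] }
  ' | sort -n
}
```

Usage example: kafka topics --describe | leader_counts. A broker that is missing from the output (it leads nothing), or that leads far fewer partitions than the others, suggests the leaders are unbalanced.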

Verify the state of a topic

Kafka nomenclature recap: a generic queue is called a 'topic', and each topic can be split into multiple partitions that producers and consumers use to spread the load. For example, let's use the kafka topics --describe command described above to inspect the state of a topic:

Topic:webrequest_text	PartitionCount:24	ReplicationFactor:3	Configs:retention.bytes=375809638400
	Topic: webrequest_text	Partition: 0	Leader: 18	Replicas: 18,22,12	Isr: 22,18,12
	Topic: webrequest_text	Partition: 1	Leader: 20	Replicas: 20,12,13	Isr: 20,13,12
	Topic: webrequest_text	Partition: 2	Leader: 22	Replicas: 22,13,14	Isr: 13,14,22
	Topic: webrequest_text	Partition: 3	Leader: 12	Replicas: 12,14,18	Isr: 14,18,12
	Topic: webrequest_text	Partition: 4	Leader: 13	Replicas: 13,18,20	Isr: 20,13,18
	[...CUT...]
	Topic: webrequest_text	Partition: 22	Leader: 20	Replicas: 20,22,12	Isr: 20,22,12
	Topic: webrequest_text	Partition: 23	Leader: 22	Replicas: 22,12,13	Isr: 13,22,12

The webrequest_text topic contains data coming from Varnishkafka, related to the cache text HTTP requests. It is composed of 24 partitions (you can think of them as the real queues that consumers/producers use), each of them replicated three times. Zooming in:

Topic: webrequest_text	Partition: 0	Leader: 18	Replicas: 18,22,12	Isr: 22,18,12

Partition 0 has three replicas, stored on the kafka1018, kafka1012 and kafka1022 brokers. The broker that acts as leader for the partition is kafka1018; the other two have the duty to keep themselves in sync with it. The replicas currently in sync with the leader are listed in the Isr field (short for In Sync Replicas), and of course the ideal state is when "Replicas" and "Isr" contain the same brokers (the order may differ). Losing one Isr replica may happen when a broker goes down for some reason, and it is usually not a big deal if nothing else is happening (2 out of 3 in-sync replicas are enough to ensure resiliency).
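To spot partitions whose Isr is smaller than their Replicas list without reading every line, a minimal sketch like the following can filter the kafka topics --describe output. It assumes the output format shown above, and under_replicated is a hypothetical helper name. Since the ISR is always a subset of the assigned replicas, comparing the two list lengths is enough. Upstream kafka-topics also offers --describe --under-replicated-partitions, which the kafka wrapper should pass through (not verified here).

```bash
# Hypothetical helper: print partitions whose ISR is smaller than their
# replica set. Reads `kafka topics --describe` output on stdin.
under_replicated() {
  awk '
    {
      topic = ""; part = ""; reps = ""; isr = ""
      for (i = 1; i < NF; i++) {
        if ($i == "Topic:")     topic = $(i + 1)
        if ($i == "Partition:") part  = $(i + 1)
        if ($i == "Replicas:")  reps  = $(i + 1)
        if ($i == "Isr:")       isr   = $(i + 1)
      }
      if (part == "") next   # skip topic header lines
      # The ISR is a subset of the replicas, so compare list lengths.
      if (split(reps, r, ",") != split(isr, s, ","))
        print topic, part, "replicas=" reps, "isr=" isr
    }
  '
}
```

Usage example: kafka topics --describe | under_replicated. Empty output means every partition has all of its replicas in sync.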

Delete a topic

Kafka stores all the topic names in Zookeeper, so the easiest way to delete a topic is to use the following command from a Kafka host of the cluster in which the topic has been created (we'll use main-eqiad as an example from now on):

#~ kafka topics --delete --topic thisisatest
kafka-topics --zookeeper conf1004.eqiad.wmnet,conf1005.eqiad.wmnet,conf1006.eqiad.wmnet/kafka/main-eqiad --delete --topic thisisatest

As you can see, every Kafka host has a bash script that automatically expands kafka-prefixed commands with all the boring parameters like Zookeeper hostnames, paths, etc. Before explaining what the command does under the hood, here's a list of places in which a topic name may be found in Zookeeper (keeping main-eqiad as example):

  • /kafka/main-eqiad/brokers/topics
  • /kafka/main-eqiad/admin/delete_topics
  • /kafka/main-eqiad/config/topics

The first path stores znodes that hold the status of a given topic/partition, in particular which brokers are handling it:

[zk: localhost:2181(CONNECTED) 14] get /kafka/main-eqiad/brokers/topics/eqiad.mediawiki.job.categoryMembershipChange/partitions/0/state
{"controller_epoch":96,"leader":1001,"version":1,"leader_epoch":102,"isr":[1003,1002,1001]}
[..cut..]

The second one contains the list of topics flagged for deletion, for example after running the kafka topics --delete command stated above. The last one holds the topic's configuration (such as per-topic config overrides):

[zk: localhost:2181(CONNECTED) 17] get /kafka/main-eqiad/config/topics/eqiad.mediawiki.job.categoryMembershipChange
{"version":1,"config":{}}
[..cut..]

In a normal scenario, the kafka topics --delete command takes care of these three paths by itself, setting up znodes that instruct the Kafka brokers to do the proper cleanup work. During an outage we discovered that sometimes this doesn't work, for example in the presence of Kafka core bugs like https://issues.apache.org/jira/browse/KAFKA-7156. In that particular case, the Kafka brokers were not able to delete topics due to a filename length problem on the ext4 data disk partitions. Since Zookeeper was not updated correctly, even manual rm commands did not help, because during boot the brokers always try to re-create a directory structure under the data directory that mirrors what is stored in Zookeeper. During the outage the brokers kept re-creating the topics after each restart, ending up in the same problem over and over again, so we had to grab the list of topics to delete and do the following for each one of them:

#!/bin/bash
# Run with ALL Kafka brokers of the cluster stopped.
# topics_to_delete contains one topic name per line.

for topic in $(cat topics_to_delete)
do
  echo "Deleting ${topic}"
  /usr/share/zookeeper/bin/zkCli.sh rmr "/kafka/main-eqiad/brokers/topics/${topic}"
  /usr/share/zookeeper/bin/zkCli.sh rmr "/kafka/main-eqiad/config/topics/${topic}"
  /usr/share/zookeeper/bin/zkCli.sh rmr "/kafka/main-eqiad/admin/delete_topics/${topic}"
done

The above procedure is the nuclear option to use if nothing else works, and should be done with all brokers stopped to avoid them interfering with the deletion. After Zookeeper is clean, the Kafka brokers can be restarted one at a time and they should boot correctly without any issue.

Handling a downed broker

In the past, events like a single disk failure in the analytics-eqiad cluster caused tons of alarms to fire because a Kafka broker went down (we use JBOD rather than RAID for analytics-eqiad, which is why a single disk failure causes trouble). Generally, one Kafka broker down in our clusters does not harm overall Kafka availability, but it is wise to double check when that happens. The above sections (especially "Verify the state of a topic") are a good start to make sure that the cluster is healthy; it is also wise to check the various dashboards listed in Kafka#Monitoring.

Usually the impact of a single broker down is nothing more than alarms for replicas not in sync, but as we discussed in the above sections this is not a critical situation if nothing else explodes at the same time. Just systemctl mask the kafka daemon and file a task to fix the hardware issue. During this time, daemons like Varnishkafka could log several errors stating that the broker is down, but librdkafka is strong enough to guarantee that no message is lost.

We are not aware of any bug or software issue that might cause a Kafka broker to go down in this way.

Recovering a laggy broker replica

If a Kafka Broker goes offline for a long while, it will likely come back online and be far behind in logs. It will need to catch up on logs from remaining brokers before it can be put back into the ISR. During normal operation, replicas should be able to stay in sync with each other. But when one broker is far behind, you may need to tweak settings to encourage Kafka to spend more resources keeping replicas up to date.

num.replica.fetchers

This setting controls the number of threads dedicated to fetching logs from other replicas. Bump this number up temporarily and restart Kafka to try to get it to consume faster.

replica.fetch.max.bytes

This is the number of bytes that each fetch request will attempt to consume from each topic-partition. The actual number of bytes being requested at a time will be this multiplied by the number of topic-partitions. Be careful not to set this too high.
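The multiplication above is easy to underestimate. A minimal sketch of the arithmetic, following the rule stated in this section (fetch_upper_bound is a hypothetical helper name, and the numbers in the usage example are illustrative only):

```bash
# Hypothetical helper: rough estimate of the bytes requested at once,
# following the rule above: replica.fetch.max.bytes multiplied by the
# number of topic-partitions being fetched.
fetch_upper_bound() {
  local fetch_max_bytes="$1" partitions="$2"
  echo $(( fetch_max_bytes * partitions ))
}
```

For example, fetch_upper_bound 1048576 48 (1 MiB, Kafka's default replica.fetch.max.bytes, across a hypothetical 48 partitions) prints 50331648, i.e. 48 MiB per round of fetches; doubling the setting doubles that figure.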

Checking consumer offsets

As of November 2015, LinkedIn's Burrow is installed and running on krypton.eqiad.wmnet. It is configured to email analytics admins if consumers start lagging. You can also query it directly via its HTTP interface. E.g., to see if a particular consumer group is lagging, and where its latest offset commits are:

 curl http://krypton.eqiad.wmnet:8000/v2/kafka/eqiad/consumer/mysql-m4-master/topic/eventlogging-valid-mixed
 {"error":false,"message":"consumer group topic offsets returned","offsets":[79014572,79014599,79003923,79014602,79014593,79014599,79014574,79014599,79003640,79014585,79014592,79014597]}

Purge broker logs

After being off for a while, when a broker rejoins its cluster it will replicate anything it has missed while it was off. Kafka's log.retention.hours setting is applied by looking at the mtime (or ctime?) of its log data files. On a recently started broker, these files are written later than usual, as replication catches back up. These files will have mtimes later than the produce time of the messages inside them, and as such their deletion may be delayed. This can cause disks to fill up more than usual.

If you receive an alarm like:

<icinga-wm_> PROBLEM - Disk space on kafka1012 is CRITICAL: DISK CRITICAL - free space: /var/spool/kafka/b 73705 MB (3% inode=99%): /var/spool/kafka/f 127290 MB (6% inode=99%)

You might want to purge some old logs from the broker's partitions. One strategy is to lower the log retention time. You can do this either dynamically per topic, or statically for all topics by editing server.properties.

Temporarily Modify Per Topic Retention Settings

If you are just trying to buy some time until the global retention setting takes effect you can dynamically alter configuration for a topic. If you do this for a high volume topic, you might get Kafka to delete just enough data on disk to buy you enough time.

# Set retention.ms for the high volume webrequest upload topic.  This will delete any log files on disk older than 48 hours.
kafka configs --alter --entity-type topics --entity-name webrequest_upload --add-config retention.ms=172800000

# Set retention.bytes for the high volume webrequest text topic partitions.
# Please note that this sets the maximum size of each of the
# topic's partitions, not of the topic as a whole!
# This will delete the oldest log segments of any partition larger than 500 GiB.
kafka configs --alter --entity-type topics --entity-name webrequest_text --add-config retention.bytes=536870912000

# Wait until brokers delete data.  Make sure you have enough room to spare.  If not, consider setting retention.ms for another topic.
# Once brokers have deleted data, it is safe to delete the per topic config override to reset it to the global default.
kafka configs --alter --entity-type topics --entity-name webrequest_upload --delete-config retention.ms
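The override values above are easy to get wrong by a factor of 1000 or 1024. A minimal sketch of two hypothetical helpers that compute them (172800000 ms is 48 hours, and 536870912000 bytes is 500 GiB):

```bash
# Hypothetical helpers for computing per-topic retention override values.
# hours -> milliseconds, for retention.ms
hours_to_ms() { echo $(( $1 * 3600 * 1000 )); }
# GiB -> bytes, for retention.bytes (remember: this is per partition)
gib_to_bytes() { echo $(( $1 * 1024 * 1024 * 1024 )); }
```

Usage example: kafka configs --alter --entity-type topics --entity-name webrequest_upload --add-config retention.ms=$(hours_to_ms 48)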

Temporarily Edit Global Retention Settings

If you need data for many topics deleted, it may be worth temporarily changing the global retention setting. This is more disruptive than doing so dynamically, since you must restart brokers for the config to be applied.

sudo puppet agent --disable
sudo vim /etc/kafka/server.properties # lower log.retention.hours (default: log.retention.hours=168)
sudo service kafka restart

Please remember to log your work in #wikimedia-operations. After checking /var/log/kafka/server.log for a confirmation of the delete actions (together with a df -h of course) please restore the previous config:

sudo vim /etc/kafka/server.properties # reset to default log.retention.hours=168 
sudo service kafka restart
sudo puppet agent --enable

After doing this for a broker, you should remember to run kafka preferred-replica-election to rebalance topic-partition leadership.
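The vim edits above can also be done non-interactively, which makes it harder to mistype the value. A minimal sketch, assuming GNU sed and the log.retention.hours=<n> line format shown above; set_retention_hours is a hypothetical helper name. Puppet still needs to be disabled first, and since sudo cannot run a shell function directly, run it from a root shell.

```bash
# Hypothetical helper: set log.retention.hours in a server.properties file
# without opening an editor. The previous file is kept as <file>.bak.
# Usage: set_retention_hours <hours> [file]
set_retention_hours() {
  local hours="$1" file="${2:-/etc/kafka/server.properties}"
  sed -i.bak "s/^log\.retention\.hours=.*/log.retention.hours=${hours}/" "$file"
  # Show the resulting line so the change can be double-checked.
  grep '^log\.retention\.hours=' "$file"
}
```

Usage example: set_retention_hours 72 to lower retention, then set_retention_hours 168 to restore the default before re-enabling puppet and restarting Kafka.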

Data integrity

Kafka brokers for the Analytics clusters do not have RAID configured for the Broker's partition log disks/partitions. Kafka relies heavily on the page cache and periodic fsync to keep performance good, and it uses the sendfile system call to move data between file descriptors (i.e. from the log file to the socket) bypassing userspace. This implies that CRC and data integrity checks are delegated to the consumers, so please be aware that SMART errors or broken disks may have weird consequences, like Camus blocking or returning errors due to data corruption right after polling from Kafka.

Kafka Certificates

Newer versions of Kafka support TLS encryption, authentication and authorization. (As of 2018-01, Kafka jumbo in eqiad is the only Kafka cluster supporting this.) Certificates and keys are managed using cergen. Certificates are signed by our Puppet CA and distributed using Puppet. To create a new client key and certificate, add an entry to a cergen manifest file and run cergen with the --generate option as described on the cergen documentation page. git add and commit the files to the puppet private repository, then distribute the relevant files via puppet and configure your client.

Kafka ACLs

Kafka ACLs are used to restrict access to Kafka cluster operations, Kafka topics, and Kafka consumer groups. By default, if an ACL exists for a specific resource, e.g. a topic, then all operations on that resource will be denied to any principal (AKA certificate) not explicitly listed for that resource. We want to allow anonymous unencrypted uses of most Kafka topics, but restrict certain others. For example, we'd like to restrict writes to any webrequest topic to only varnishkafka producers, but still allow for anyone to consume. To do this, we need the proper invocation of the kafka acls command. (Much of this was originally figured out and documented in T167304.)

For certificate based authentication, we need to specify the full x509 DistinguishedName. To keep things simple, we generate subject-less certificates, so that we only have to use the CommonName in the ACLs. Before we set up any restricted ACLs at all, we need to allow defaults for the ANONYMOUS user:

# Allow ANONYMOUS to produce to any topic
kafka acls --add --allow-principal User:ANONYMOUS --producer --topic '*'

# Allow ANONYMOUS to consume from any topic in any consumer group
kafka acls --add --allow-principal User:ANONYMOUS --consumer --topic '*' --group '*'

# After this, kafka acls --list should show:
kafka acls --list
Current ACLs for resource `Group:*`:
 	User:ANONYMOUS has Allow permission for operations: Read from hosts: *

Current ACLs for resource `Topic:*`:
 	User:ANONYMOUS has Allow permission for operations: Describe from hosts: *
	User:ANONYMOUS has Allow permission for operations: Write from hosts: *
	User:ANONYMOUS has Allow permission for operations: Read from hosts: *

Current ACLs for resource `Cluster:kafka-cluster`:
 	User:ANONYMOUS has Allow permission for operations: Create from hosts: *

Now we can restrict operations on resources to other principals. When a client communicates over the SSL port 9093, it will attempt to authenticate with its certificate's DN. You need to add ACLs for that DN if you want that client to be able to use Kafka at all. For this example, we'll allow User:CN=varnishkafka to produce to the webrequest topic, prevent anyone else from producing to webrequest, but still allow anyone to read from webrequest. Note that we used wildcard topic and group names ('*') for the ANONYMOUS defaults above. Kafka does not (yet?) support full glob wildcards: it is all or nothing! E.g. --topic 'webrequest_*' will not work.

# Allow User:CN=varnishkafka to be a producer for the webrequest topic
kafka acls --add --allow-principal User:CN=varnishkafka --producer --topic webrequest

# Deny unauthenticated users the ability to write to the webrequest topic
kafka acls --add --deny-principal User:ANONYMOUS --operation Write --topic webrequest

# Now we have the following ACLs defined:
kafka acls --list
Current ACLs for resource `Group:*`:
 	User:ANONYMOUS has Allow permission for operations: Read from hosts: *

Current ACLs for resource `Topic:*`:
 	User:ANONYMOUS has Allow permission for operations: Describe from hosts: *
	User:ANONYMOUS has Allow permission for operations: Write from hosts: *
	User:ANONYMOUS has Allow permission for operations: Read from hosts: *

Current ACLs for resource `Topic:webrequest`:
 	User:CN=varnishkafka has Allow permission for operations: Write from hosts: *
	User:CN=varnishkafka has Allow permission for operations: Describe from hosts: *
	User:ANONYMOUS has Deny permission for operations: Write from hosts: *

Current ACLs for resource `Cluster:kafka-cluster`:
 	User:ANONYMOUS has Allow permission for operations: Create from hosts: *
	User:CN=varnishkafka has Allow permission for operations: Create from hosts: *

Because no ACL exists for Topic:webrequest Read operation, any User, including User:ANONYMOUS, can still consume from the webrequest topic.
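The two commands above form a reusable pattern for any topic that should accept writes from a single certificate. A minimal sketch of a hypothetical helper, restrict_topic_writes, that only prints (does not run) the same two kafka acls invocations for a given principal and topic, so they can be reviewed before being applied:

```bash
# Hypothetical helper: print the kafka acls commands that let a single
# principal produce to a topic while denying writes from ANONYMOUS.
# Nothing is executed; review the output first.
restrict_topic_writes() {
  local principal="$1" topic="$2"
  echo "kafka acls --add --allow-principal User:${principal} --producer --topic ${topic}"
  echo "kafka acls --add --deny-principal User:ANONYMOUS --operation Write --topic ${topic}"
}
```

Usage example: restrict_topic_writes CN=varnishkafka webrequest prints the two commands used above; once reviewed, they can be piped to bash. Reads stay open to everyone, as long as no Read ACL is added for the topic.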

New Broker Install

Partitioning

As of 2014-06, all of our brokers have 12 2TB disks. The first two disks have a 30GB RAID 1 / and a 1GB RAID 1 swap. Partman will create these partitions when the node is installed. Partman is not fancy enough to do the rest, so you will have to do this manually. Copy/pasting the following should do everything necessary to set up the Broker data partitions.

data_directory='/var/spool/kafka'

# sda3 and sdb3 already exist: set their partition type to 83 (Linux); they are formatted as ext4 further below
for disk in /dev/sd{a,b}; do sudo fdisk $disk <<EOF
t
3
83
w
EOF

done


# sd{c..l}1 are full physical ext4 partitions
for disk in /dev/sd{c,d,e,f,g,h,i,j,k,l}; do sudo fdisk $disk <<EOF
n
p
1


w
EOF

done

# run partprobe to make the kernel pick up the partition changes.
sudo apt-get install parted
sudo partprobe

# mkfs.ext4 all data partitions
for disk_letter in a b c d e f g h i j k l; do
    # use partition 3 on sda and sdb
    if [ "${disk_letter}" = 'a' -o  "${disk_letter}" = 'b' ]; then
        partition_number=3
    else
        partition_number=1
    fi

    partition="/dev/sd${disk_letter}${partition_number}"

    # Run mkfs.ext4 in background so we don't have to wait
    # for this to complete synchronously.
    sudo mkfs.ext4 "$partition" &
done


# IMPORTANT: wait for all the background mkfs.ext4 jobs started above
# to finish before running the following loop.
wait


sudo mkdir -p $data_directory
for disk_letter in a b c d e f g h i j k l; do
    # use partition 3 on sda and sdb
    if [ "${disk_letter}" = 'a' -o  "${disk_letter}" = 'b' ]; then
        partition_number=3
    else
        partition_number=1
    fi

    partition="/dev/sd${disk_letter}${partition_number}"
    mount_point="${data_directory}/${disk_letter}"

    # don't reserve any blocks for OS on these partitions
    sudo tune2fs -m 0 "$partition"

    ### TODO: Edit this script to use UUID in fstab instead of $partition
    # make the mount point
    sudo mkdir -pv "$mount_point"
    grep -q "$mount_point" /etc/fstab || echo -e "# Kafka log partition ${disk_letter}\n${partition}\t${mount_point}\text4\tdefaults,noatime,data=writeback,nobh,delalloc\t0\t2" | sudo tee -a /etc/fstab

    sudo mount -v "$mount_point"
done

Note: ext4 settings were taken from recommendations found here: https://kafka.apache.org/08/ops.html

Swapping broken disk

Similar to Hadoop/Administration#Swapping_broken_disk, new disks on analytics Kafka brokers need some megacli tweaks before they are usable. This needed to be done in T136933. The following are the steps that were taken then.

These Kafka brokers have 12 disks in JBOD via MegaRAID. We'll need to clear foreign configuration from the old disk, and then mark this disk to be used as JBOD.

Procedure:

  1. Check the status of the disks after the swap using the following commands:
    # Check general info about disks attached to the RAID and their status.
    sudo megacli -PDList -aAll 
    
    # Get Firmware status only to have a quick peek about disks status:
    sudo megacli -PDList -aAll | grep Firm
    
    # Check Virtual Drive info
    sudo megacli -LDInfo -LAll -aAll
    
  2. if you see "Firmware state: Unconfigured(bad)" fix it with:
    # From the previous commands you should be able to fill in the variables 
    # with the values of the disk's properties indicated below:
    # X => Enclosure Device ID
    # Y => Slot Number
    # Z => Controller number
    megacli -PDMakeGood -PhysDrv[X:Y] -aZ
    
    # Example:
    # otto@kafka1012:~$ sudo megacli -PDList -aAll | egrep "Adapter|Enclosure Device ID:|Slot Number:|Firmware state"
    # Adapter #0
    # Enclosure Device ID: 32
    # Slot Number: 5
    # Firmware state: Online, Spun Up
    # [..content cut..]
    
    megacli -PDMakeGood -PhysDrv[32:5] -a0
    
  3. Check for Foreign disks and fix them if needed:
    server:~# megacli -CfgForeign -Scan -a0
    There are 1 foreign configuration(s) on controller 0.
    
    server:~# megacli -CfgForeign -Clear -a0
    Foreign configuration 0 is cleared on controller 0.
    
  4. Add the disk as JBOD:
    megacli -PDMakeJBOD -PhysDrv[32:5] -a0
    
  5. You should now be able to see the disk using fdisk; now is the time to add the partition and filesystem to it to complete the work.
    sudo fdisk /dev/sdf
    # ... make a new primary partition filling up the whole disk
    sudo mkfs.ext4 /dev/sdf1
    sudo tune2fs -m 0 /dev/sdf1
    

Very useful info contained in: http://hwraid.le-vert.net/wiki/LSIMegaRAIDSAS
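When several disks are involved, picking the Enclosure/Slot/Adapter values out of megacli -PDList by hand (step 2 above) is tedious. A minimal sketch, assuming the "Adapter #N", "Enclosure Device ID:", "Slot Number:" and "Firmware state:" lines shown in the example output above; bad_disks is a hypothetical helper name. It prints the arguments for megacli -PDMakeGood for each disk in Unconfigured(bad) state.

```bash
# Hypothetical helper: print "-PhysDrv[E:S] -aA" for every disk whose firmware
# state is Unconfigured(bad). Reads `megacli -PDList -aAll` output on stdin.
bad_disks() {
  awk -F': *' '
    /^Adapter #/ {
      adapter = $0
      sub(/^Adapter #/, "", adapter)
      gsub(/[ \t\r]/, "", adapter)
    }
    /^Enclosure Device ID:/ { enclosure = $2 }
    /^Slot Number:/         { slot = $2 }
    /^Firmware state:/ && /Unconfigured\(bad\)/ {
      printf "-PhysDrv[%s:%s] -a%s\n", enclosure, slot, adapter
    }
  '
}
```

Usage example: sudo megacli -PDList -aAll | bad_disks, then pass each printed line to sudo megacli -PDMakeGood.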

Upgrade Checklist

The Analytics team experienced dataloss and a lot of headaches when performing a routine Kafka upgrade from 0.8.2.0 -> 0.8.2.1 in August 2015. The following is a deployment checklist we came up with as part of the postmortem after that outage. When upgrading, please follow this checklist before you proceed in production.

  • Update (or remove) api_version in kafka_clusters hiera hash in common.yaml
  • Check Release Notes for new versions. e.g. https://archive.apache.org/dist/kafka/0.8.2.1/RELEASE_NOTES.html
  • Check Apache JIRA for bugs that may affect new version(s). e.g. https://issues.apache.org/jira/browse/KAFKA-2496?jql=project%20%3D%20KAFKA%20AND%20affectedVersion%20%3D%200.8.2.1
  • Stress test to see if varnishkafka latency goes up. It may be difficult to do this, but it is worth a try.
    • Set up a varnish+varnishkafka instance and Kafka cluster in labs (if there is not one in deployment-prep already).
    • 1. Use ab (Apache Benchmark) to force varnishkafka to send requests as fast as you can.
    • 2. Record rtt times in varnishkafka stats.json
    • 3. Record data file sizes for Kafka partitions on Kafka brokers in /var/spool/kafka/
    • 4. Upgrade Kafka cluster
    • 5. Repeat steps 1 - 3 and compare results to previous version. There should be no (negative) change.
  • When doing a production upgrade, document all steps in a deployment plan. Review and then execute the plan with a peer. Take notes on all steps along the way, including execution times for each step. This allows for easier documentation and correlation later if there are any problems. Be sure to keep an eye on the Kafka dashboard while deploying.

MirrorMaker

Kafka MirrorMaker is a glorified Kafka consumer -> producer process. It consumes messages from a source Kafka cluster and produces them to a destination Kafka cluster. The messages themselves are thus 'reproduced' as new messages, which is why 'mirroring' is different from 'replication'.

MirrorMaker is a peerless Kafka consumer group. Multiple processes belonging to the same Kafka consumer group can run, and will automatically balance load between themselves. It is safe to stop some or all MirrorMaker processes at any time, as long as they are restarted within the smallest Kafka topic retention time (7 days) with enough time to catch back up.

main mirroring

We use Kafka MirrorMaker for datacenter failover between the two main Kafka clusters in eqiad and codfw. These clusters mirror all of their datacenter-prefixed topics to each other. Original topics in main-eqiad are prefixed with 'eqiad.', and original topics in main-codfw are prefixed with 'codfw.'. This allows whitelisting of the original topics in each source cluster. E.g. the MirrorMaker instance running on main-codfw nodes, named main-eqiad_to_main-codfw, consumes only topics that match eqiad.* from main-eqiad and produces them to main-codfw. The reciprocal MirrorMaker instance, main-codfw_to_main-eqiad, running on main-eqiad nodes, does the opposite.

Since mirroring here is primarily used for datacenter failover, a short (less than a few days) downtime will have no practical impact (as long as there is no primary data center switch during the downtime). After a primary datacenter switch, the codfw.* prefixed topics will start being used in main-codfw, instead of the eqiad.* ones in main-eqiad.
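The prefix-based whitelisting described above can be illustrated with a simple filter. This is a sketch only: the actual whitelist is a regex configured in MirrorMaker, whose exact pattern is not given here, and source_dc_topics is a hypothetical helper name. It reads topic names, one per line, and keeps those originating in the given datacenter.

```bash
# Illustration only (hypothetical helper): keep topic names that start with
# "<dc>.", i.e. the ones a <dc>_to_<other dc> MirrorMaker instance would
# consume from the source cluster. Reads one topic name per line on stdin.
source_dc_topics() {
  local dc="$1"
  grep -E "^${dc}\."
}
```

Usage example: kafka topics --list | source_dc_topics eqiad shows roughly the set of main-eqiad topics that main-eqiad_to_main-codfw mirrors.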

jumbo mirroring

We also use MirrorMaker to consume all of the topics from the main clusters to jumbo-eqiad for analytics and posterity purposes. The jumbo-eqiad nodes run a MirrorMaker instance called main-eqiad_to_jumbo-eqiad. This instance consumes almost all topics from main-eqiad, including the codfw.* prefixed ones that were mirrored by the main-codfw_to_main-eqiad MirrorMaker instance. These topics are then ingested into Hadoop for analytics usage.

Note: As of 2018-07, the job queue and change-prop topics are not needed for anything in the jumbo-eqiad cluster. These topics are bursty and higher volume than the other topics in the main Kafka clusters, and we've had problems with MirrorMaker and some of these topics in the past. If MirrorMaker fails to produce messages for one of these topics, it is safe to blacklist the topic.