Kafka/Administration

From Wikitech

Safe Broker Restarts

Since partitions have at least 2 (usually 3) replicas, you should be able to restart a broker without losing any messages. Brokers, consumers, and producers will automatically rebalance themselves when a broker dies, but it is nice to allow them to do so gracefully. service kafka stop will perform a graceful shutdown. Before you run this, make sure that every partition for which the target broker is the leader also has other in-sync replicas (ISR):

root@kafka1014:~# kafka topics --describe
...
Topic:webrequest_text	PartitionCount:12	ReplicationFactor:3	Configs:
	Topic: webrequest_text	Partition: 0	Leader: 18	Replicas: 18,22,12	Isr: 18,22,12
	Topic: webrequest_text	Partition: 1	Leader: 20	Replicas: 20,12,13	Isr: 20,13,12
	Topic: webrequest_text	Partition: 2	Leader: 22	Replicas: 22,13,14	Isr: 22,13,14
	Topic: webrequest_text	Partition: 3	Leader: 12	Replicas: 12,14,18	Isr: 18,12,14
	Topic: webrequest_text	Partition: 4	Leader: 13	Replicas: 13,18,20	Isr: 18,13,20
	Topic: webrequest_text	Partition: 5	Leader: 14	Replicas: 14,20,22	Isr: 22,20,14
	Topic: webrequest_text	Partition: 6	Leader: 18	Replicas: 18,12,13	Isr: 18,13,12
	Topic: webrequest_text	Partition: 7	Leader: 20	Replicas: 20,13,14	Isr: 20,13,14
	Topic: webrequest_text	Partition: 8	Leader: 22	Replicas: 22,14,18	Isr: 22,18,14
	Topic: webrequest_text	Partition: 9	Leader: 12	Replicas: 12,18,20	Isr: 20,18,12
	Topic: webrequest_text	Partition: 10	Leader: 13	Replicas: 13,20,22	Isr: 22,20,13
	Topic: webrequest_text	Partition: 11	Leader: 14	Replicas: 14,22,12	Isr: 22,12,14
...

# partitions with Leader: 14 have several brokers in the ISR for that partition.  It is safe to stop kafka1014
root@kafka1014:~# service kafka stop

Notice how (eventually) after the broker stops, broker 14 is no longer the leader for any topics when you run kafka topics --describe.
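
The pre-stop check described above can be scripted. The following is a minimal sketch, not an existing tool: unsafe_partitions is a hypothetical helper name, and it assumes the kafka topics --describe output format shown in the listing above.

```shell
# Hypothetical helper (not part of the kafka wrapper).
# Reads `kafka topics --describe` output on stdin and prints "topic partition"
# for every partition whose leader is the given broker and whose ISR holds
# fewer than two brokers, i.e. no other in-sync replica could take over.
unsafe_partitions() {
    awk -v broker="$1" '
        # Partition lines look like:
        #   Topic: t  Partition: 0  Leader: 18  Replicas: 18,22,12  Isr: 18,22,12
        $1 == "Topic:" && $3 == "Partition:" {
            leader = ""; isr = ""
            for (i = 1; i < NF; i++) {
                if ($i == "Leader:") leader = $(i + 1)
                if ($i == "Isr:")    isr    = $(i + 1)
            }
            n = split(isr, members, ",")
            if (leader == broker && n < 2) print $2, $4
        }
    '
}

# Assumed usage on a broker; empty output means the broker can be stopped:
#   kafka topics --describe | unsafe_partitions 14
```

Empty output means every partition led by that broker has at least one other in-sync replica.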

Once you are ready to start the broker back up, you can do so with a simple service kafka start.

It will likely take a few minutes for the broker to recover after restarting. It needs to check all of its recent logs to make sure that it doesn't have any incomplete messages. It will also resume replicating partitions from where it left off when it was stopped. Keep checking kafka topics --describe until all topic-partitions have all of their replicas in the ISR. Once the topic-partitions are up to date on all brokers, you can start a replica election to balance the leaders across brokers.
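
Rather than eyeballing the output, the "keep checking" step can be approximated with a small filter. This is a sketch under the assumption that the describe output looks like the listings on this page; under_replicated is a hypothetical name.

```shell
# Hypothetical helper: reads `kafka topics --describe` output on stdin and
# prints every partition whose Isr list is shorter than its Replicas list.
under_replicated() {
    awk '
        $1 == "Topic:" && $3 == "Partition:" {
            replicas = ""; isr = ""
            for (i = 1; i < NF; i++) {
                if ($i == "Replicas:") replicas = $(i + 1)
                if ($i == "Isr:")      isr      = $(i + 1)
            }
            if (split(replicas, r, ",") != split(isr, s, ","))
                print $2, $4, "replicas=" replicas, "isr=" isr
        }
    '
}

# Assumed usage: poll until nothing is reported, then run the replica election.
#   until [ -z "$(kafka topics --describe | under_replicated)" ]; do sleep 30; done
```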

Replica Elections

Kafka will not automatically rebalance replica leaders. It will only do so if it is explicitly asked to.

List topics to see the current leader assignments:

root@kafka1014:~$ kafka topics --describe
Topic:webrequest_bits   PartitionCount:12       ReplicationFactor:3     Configs:
        Topic: webrequest_bits  Partition: 0    Leader: 22      Replicas: 22,12,18      Isr: 12,18,22
        Topic: webrequest_bits  Partition: 1    Leader: 12      Replicas: 12,18,21      Isr: 12,18,21
        Topic: webrequest_bits  Partition: 2    Leader: 18      Replicas: 18,21,22      Isr: 18,22,21
        Topic: webrequest_bits  Partition: 3    Leader: 21      Replicas: 21,22,12      Isr: 12,22,21
        Topic: webrequest_bits  Partition: 4    Leader: 22      Replicas: 22,18,21      Isr: 18,22,21
        Topic: webrequest_bits  Partition: 5    Leader: 12      Replicas: 12,21,22      Isr: 12,22,21
        Topic: webrequest_bits  Partition: 6    Leader: 18      Replicas: 18,22,12      Isr: 12,18,22
        Topic: webrequest_bits  Partition: 7    Leader: 21      Replicas: 21,12,18      Isr: 12,18,21
        Topic: webrequest_bits  Partition: 8    Leader: 22      Replicas: 22,21,12      Isr: 12,22,21
        Topic: webrequest_bits  Partition: 9    Leader: 12      Replicas: 12,22,18      Isr: 12,18,22
        Topic: webrequest_bits  Partition: 10   Leader: 18      Replicas: 18,12,21      Isr: 12,18,21
        Topic: webrequest_bits  Partition: 11   Leader: 21      Replicas: 21,18,22      Isr: 18,22,21
Topic:webrequest_mobile PartitionCount:12       ReplicationFactor:3     Configs:
        Topic: webrequest_mobile        Partition: 0    Leader: 12      Replicas: 12,21,22      Isr: 12,22,21
        Topic: webrequest_mobile        Partition: 1    Leader: 18      Replicas: 18,22,12      Isr: 12,18,22
        Topic: webrequest_mobile        Partition: 2    Leader: 21      Replicas: 21,12,18      Isr: 12,18,21
        Topic: webrequest_mobile        Partition: 3    Leader: 22      Replicas: 22,18,21      Isr: 18,22,21
        Topic: webrequest_mobile        Partition: 4    Leader: 12      Replicas: 12,22,18      Isr: 12,18,22
        Topic: webrequest_mobile        Partition: 5    Leader: 18      Replicas: 18,12,21      Isr: 12,18,21
        Topic: webrequest_mobile        Partition: 6    Leader: 21      Replicas: 21,18,22      Isr: 18,22,21
        Topic: webrequest_mobile        Partition: 7    Leader: 22      Replicas: 22,21,12      Isr: 12,22,21
        Topic: webrequest_mobile        Partition: 8    Leader: 12      Replicas: 12,18,21      Isr: 12,18,21
        Topic: webrequest_mobile        Partition: 9    Leader: 18      Replicas: 18,21,22      Isr: 18,22,21
        Topic: webrequest_mobile        Partition: 10   Leader: 21      Replicas: 21,22,12      Isr: 12,22,21
        Topic: webrequest_mobile        Partition: 11   Leader: 22      Replicas: 22,12,18      Isr: 12,18,22
Topic:webrequest_text   PartitionCount:12       ReplicationFactor:3     Configs:
        Topic: webrequest_text  Partition: 0    Leader: 22      Replicas: 22,21,12      Isr: 12,22,21
        Topic: webrequest_text  Partition: 1    Leader: 12      Replicas: 12,22,18      Isr: 12,18,22
        Topic: webrequest_text  Partition: 2    Leader: 18      Replicas: 18,12,21      Isr: 12,18,21
        Topic: webrequest_text  Partition: 3    Leader: 21      Replicas: 21,18,22      Isr: 18,22,21
        Topic: webrequest_text  Partition: 4    Leader: 22      Replicas: 22,12,18      Isr: 12,18,22
        Topic: webrequest_text  Partition: 5    Leader: 12      Replicas: 12,18,21      Isr: 12,18,21
        Topic: webrequest_text  Partition: 6    Leader: 18      Replicas: 18,21,22      Isr: 18,22,21
        Topic: webrequest_text  Partition: 7    Leader: 21      Replicas: 21,22,12      Isr: 12,22,21
        Topic: webrequest_text  Partition: 8    Leader: 22      Replicas: 22,18,21      Isr: 18,22,21
        Topic: webrequest_text  Partition: 9    Leader: 12      Replicas: 12,21,22      Isr: 12,22,21
        Topic: webrequest_text  Partition: 10   Leader: 18      Replicas: 18,22,12      Isr: 12,18,22
        Topic: webrequest_text  Partition: 11   Leader: 21      Replicas: 21,12,18      Isr: 12,18,21
Topic:webrequest_upload PartitionCount:12       ReplicationFactor:3     Configs:
        Topic: webrequest_upload        Partition: 0    Leader: 18      Replicas: 18,12,21      Isr: 12,18,21
        Topic: webrequest_upload        Partition: 1    Leader: 21      Replicas: 21,18,22      Isr: 22,18,21
        Topic: webrequest_upload        Partition: 2    Leader: 22      Replicas: 22,21,12      Isr: 12,22,21
        Topic: webrequest_upload        Partition: 3    Leader: 12      Replicas: 12,22,18      Isr: 12,18,22
        Topic: webrequest_upload        Partition: 4    Leader: 18      Replicas: 18,21,22      Isr: 18,22,21
        Topic: webrequest_upload        Partition: 5    Leader: 21      Replicas: 21,22,12      Isr: 22,12,21
        Topic: webrequest_upload        Partition: 6    Leader: 22      Replicas: 22,12,18      Isr: 12,18,22
        Topic: webrequest_upload        Partition: 7    Leader: 12      Replicas: 12,18,21      Isr: 12,18,21
        Topic: webrequest_upload        Partition: 8    Leader: 18      Replicas: 18,22,12      Isr: 12,18,22
        Topic: webrequest_upload        Partition: 9    Leader: 21      Replicas: 21,12,18      Isr: 12,18,21
        Topic: webrequest_upload        Partition: 10   Leader: 22      Replicas: 22,18,21      Isr: 18,22,21
        Topic: webrequest_upload        Partition: 11   Leader: 12      Replicas: 12,21,22      Isr: 12,22,21

In this case, you can see that leaders are balanced across all brokers. If they weren't (e.g. broker 21 not appearing as leader for any partition), you can ask Kafka to run a leader election by executing the following command on any one of the brokers (no sudo needed; the broker chosen does not matter).

 kafka preferred-replica-election

Wait a few seconds, no more than a minute. If all goes well, run kafka topics --describe again and you should see the leaders properly balanced.
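
To judge whether leaders are balanced without reading every line, you can count leaders per broker. A minimal sketch, assuming the describe output format shown above; leader_counts is a hypothetical helper name.

```shell
# Hypothetical helper: reads `kafka topics --describe` output on stdin and
# prints "broker_id number_of_partitions_led", sorted by broker ID.
leader_counts() {
    awk '
        $1 == "Topic:" && $3 == "Partition:" {
            for (i = 1; i < NF; i++)
                if ($i == "Leader:") count[$(i + 1)]++
        }
        END { for (b in count) print b, count[b] }
    ' | sort -n
}

# Assumed usage:
#   kafka topics --describe | leader_counts
```

A broker that is missing from the output, or has a much lower count than the others, suggests that a preferred replica election is worth running.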

Verify the state of a topic

Kafka nomenclature recap: a generic queue is called a 'topic', and each topic can be split into multiple partitions that producers and consumers use to spread the load. For example, let's use the kafka topics --describe command described above to inspect the state of a topic:
Topic:webrequest_text	PartitionCount:24	ReplicationFactor:3	Configs:retention.bytes=375809638400
	Topic: webrequest_text	Partition: 0	Leader: 18	Replicas: 18,22,12	Isr: 22,18,12
	Topic: webrequest_text	Partition: 1	Leader: 20	Replicas: 20,12,13	Isr: 20,13,12
	Topic: webrequest_text	Partition: 2	Leader: 22	Replicas: 22,13,14	Isr: 13,14,22
	Topic: webrequest_text	Partition: 3	Leader: 12	Replicas: 12,14,18	Isr: 14,18,12
	Topic: webrequest_text	Partition: 4	Leader: 13	Replicas: 13,18,20	Isr: 20,13,18
	[...CUT...]
	Topic: webrequest_text	Partition: 22	Leader: 20	Replicas: 20,22,12	Isr: 20,22,12
	Topic: webrequest_text	Partition: 23	Leader: 22	Replicas: 22,12,13	Isr: 13,22,12
The webrequest_text topic contains data coming from Varnishkafka, related to the cache text HTTP requests. It is composed of 24 partitions (you can think of them as the real queues that consumers and producers use), each of them replicated three times. Zooming in:
	Topic: webrequest_text	Partition: 0	Leader: 18	Replicas: 18,22,12	Isr: 22,18,12
Partition 0 has three replicas, stored on the kafka1018, kafka1022 and kafka1012 brokers. The broker that acts as leader for the partition is kafka1018; the other two have the duty of keeping themselves in sync with it. The replicas currently in sync with the leader are listed in the Isr field (short for In Sync Replicas), and the ideal state is when "Replicas" and "Isr" contain the same brokers (the order may differ). Losing one ISR member may happen when a broker goes down for some reason, and it is usually not a big deal if nothing else is happening (2 of 3 in-sync replicas are enough to ensure resiliency).
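
Because the Isr field may list the same brokers in a different order than Replicas, a plain string comparison is not enough. The following sketch compares the two as sets; missing_from_isr is a hypothetical helper, and its arguments are the comma-separated values copied from a partition line.

```shell
# Hypothetical helper: missing_from_isr REPLICAS ISR
# Prints each broker ID that appears in REPLICAS but not in ISR, one per line.
missing_from_isr() {
    replicas_list="$1"
    isr_list="$2"
    for id in $(echo "$replicas_list" | tr ',' ' '); do
        case ",${isr_list}," in
            *",${id},"*) ;;        # broker is assigned and in sync
            *) echo "$id" ;;       # broker is assigned but not in sync
        esac
    done
}

# Values taken from the partition 0 line above: prints nothing.
#   missing_from_isr 18,22,12 22,18,12
```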

Handling a downed broker

In the past, events like a single disk failure in the analytics-eqiad cluster have caused a flood of alarms because a Kafka broker went down (we use JBOD rather than RAID for analytics-eqiad, which is why a single disk failure causes trouble). Generally, one Kafka broker down in our clusters does not harm overall Kafka availability, but it is wise to double check when that happens. The above sections (especially "Verify the state of a topic") are a good start for making sure that the cluster is healthy; it is also wise to check the various dashboards listed in Kafka#Monitoring.

Usually the impact of a single broker down is nothing more than alarms for replicas not in sync, but as discussed in the above sections this is not a critical situation if nothing else fails at the same time. Just systemctl mask the kafka daemon and file a task to fix the hardware issue. During this time, daemons like Varnishkafka could log several errors stating that the broker is down, but librdkafka is robust enough to guarantee that no message is lost.

We are not currently aware of any software bug or issue that causes a Kafka broker to go down on its own.

Recovering a laggy broker replica

If a Kafka Broker goes offline for a long while, it will likely come back online and be far behind in logs. It will need to catch up on logs from remaining brokers before it can be put back into the ISR. During normal operation, replicas should be able to stay in sync with each other. But when one broker is far behind, you may need to tweak settings to encourage Kafka to spend more resources keeping replicas up to date.

num.replica.fetchers

This setting controls the number of threads dedicated to fetching logs from other replicas. Bump this number up temporarily and restart Kafka to try to get it to consume faster.

replica.fetch.max.bytes

This is the number of bytes that each fetch request will attempt to consume from each topic-partition. The actual number of bytes being requested at a time will be this multiplied by the number of topic-partitions. Be careful not to set this too high.
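
The multiplication described above is easy to underestimate, so here is a small arithmetic sketch. The helper name max_fetch_bytes and the numbers in the usage comment are illustrative assumptions, not values taken from our configuration.

```shell
# Hypothetical helper: max_fetch_bytes REPLICA_FETCH_MAX_BYTES TOPIC_PARTITIONS
# Prints the upper bound on bytes requested at a time, following the rule
# above: replica.fetch.max.bytes multiplied by the number of topic-partitions.
max_fetch_bytes() {
    echo $(( $1 * $2 ))
}

# Illustrative only: 1 MiB per partition across 48 topic-partitions.
#   max_fetch_bytes 1048576 48
```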

Checking consumer offsets

As of November 2015, LinkedIn's Burrow is installed and running on krypton.eqiad.wmnet. It is configured to email analytics admins if consumers start lagging. You can also query it directly via its HTTP interface. E.g., to see if a particular consumer group is lagging, and where its latest offset commits are:

 curl http://krypton.eqiad.wmnet:8000/v2/kafka/eqiad/consumer/mysql-m4-master/topic/eventlogging-valid-mixed
 {"error":false,"message":"consumer group topic offsets returned","offsets":[79014572,79014599,79003923,79014602,79014593,79014599,79014574,79014599,79003640,79014585,79014592,79014597]}

Purge broker logs

After being off for a while, when a broker rejoins its cluster it will replicate anything it has missed while it was off. Kafka's log.retention.hours setting is applied by looking at the mtime (or ctime?) of its log data files. On a recently started broker, these files are written later than usual, as replication catches back up. These files will have mtimes later than the produce time of the messages inside them, and as such their deletion may be delayed. This can cause disks to fill up more than usual.

If you receive an alarm like:
<icinga-wm_> PROBLEM - Disk space on kafka1012 is CRITICAL: DISK CRITICAL - free space: /var/spool/kafka/b 73705 MB (3% inode=99%): /var/spool/kafka/f 127290 MB (6% inode=99%)

You might want to purge some old logs from the broker's partitions. One strategy is to lower the log retention. You can do this either dynamically per topic, or statically for all topics by editing server.properties.

Temporarily Modify Per Topic Retention Settings

If you are just trying to buy some time until the global retention setting takes effect you can dynamically alter configuration for a topic. If you do this for a high volume topic, you might get Kafka to delete just enough data on disk to buy you enough time.

# Set retention.ms for the high volume webrequest upload topic.  This will delete any log files on disk older than 48 hours.
kafka configs --alter --entity-type topics --entity-name webrequest_upload --add-config retention.ms=172800000

# Set retention.bytes for the high volume webrequest text topic partitions.
# Please note that you have to set the maximum size of each of the
# topic's partitions, not of the topic as a whole!
# This will delete the oldest log files of a partition once it exceeds 536870912000 bytes (500 GiB) on disk.
kafka configs --alter --entity-type topics --entity-name webrequest_text --add-config retention.bytes=536870912000

# Wait until brokers delete data.  Make sure you have enough room to spare.  If not, consider setting retention.ms for another topic.
# Once brokers have deleted data, it is safe to delete the per topic config override to reset it to the global default.
kafka configs --alter --entity-type topics --entity-name webrequest_upload --delete-config retention.ms
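
The retention values above are given in milliseconds and bytes, which are easy to get wrong by a factor of 1000 or 1024. A minimal sketch of the conversions follows; hours_to_ms and gib_to_bytes are hypothetical helper names.

```shell
# Hypothetical helpers for computing retention.ms and retention.bytes values.

# hours_to_ms HOURS: convert hours to milliseconds (for retention.ms).
hours_to_ms() {
    echo $(( $1 * 60 * 60 * 1000 ))
}

# gib_to_bytes GIB: convert GiB to bytes (for retention.bytes, per partition).
gib_to_bytes() {
    echo $(( $1 * 1024 * 1024 * 1024 ))
}

# The values used in the commands above:
#   hours_to_ms 48     # 172800000
#   gib_to_bytes 500   # 536870912000
```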

Temporarily Edit Global Retention Settings

If you need data for many topics deleted, it may be worth temporarily changing the global retention setting. This is more disruptive than doing so dynamically, since you must restart brokers for the config to be applied.

sudo puppet agent --disable
sudo vim /etc/kafka/server.properties # lower log.retention.hours (default: 168)
sudo service kafka restart
Please remember to log your work in #wikimedia-operations. After checking /var/log/kafka/server.log for a confirmation of the delete actions (together with a df -h of course) please restore the previous config:
sudo vim /etc/kafka/server.properties # reset to default log.retention.hours=168 
sudo service kafka restart
sudo puppet agent --enable

After doing this for a broker, you should remember to run kafka preferred-replica-election to rebalance topic-partition leadership.
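
If you prefer not to edit the file by hand, the change can be sketched with sed. This is an assumption-laden sketch rather than an established procedure: set_retention_hours is a hypothetical helper, and it assumes the file contains a line starting with log.retention.hours= as implied above. It keeps a .bak copy so that the previous value is easy to restore.

```shell
# Hypothetical helper: set_retention_hours FILE HOURS
# Rewrites the log.retention.hours line in FILE and leaves the original
# content in FILE.bak.
set_retention_hours() {
    sed -i.bak "s/^log\.retention\.hours=.*/log.retention.hours=$2/" "$1"
}

# Assumed usage (run as root, with puppet disabled as described above):
#   set_retention_hours /etc/kafka/server.properties 24
#   service kafka restart
```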

Data integrity

Kafka brokers for the Analytics clusters do not have RAID configured for the disks/partitions that hold the brokers' partition logs. Kafka relies heavily on the page cache and periodic fsync for good performance, and it uses the sendfile system call to move data between file descriptors (i.e. from the log file to the socket), bypassing userspace. This implies that CRC and data integrity checks are delegated to the consumers, so please be aware that SMART errors or broken disks may trigger weird consequences, like Camus blocking or returning errors due to data corruption right after polling from Kafka.

New Broker Install

Partitioning

As of 2014-06, all of our brokers have 12 2TB disks. The first two disks have a 30GB RAID 1 / and a 1GB RAID 1 swap. Partman will create these partitions when the node is installed. Partman is not fancy enough to do the rest, so you will have to do this manually. Copy/pasting the following should do everything necessary to set up the Broker data partitions.

data_directory='/var/spool/kafka'

# sda3 and sdb3 are already created; set their partition type to 83 (Linux) so they can hold ext4
for disk in /dev/sd{a,b}; do sudo fdisk $disk <<EOF
t
3
83
w
EOF

done


# sd{c..l}1 are full physical ext4 partitions
for disk in /dev/sd{c,d,e,f,g,h,i,j,k,l}; do sudo fdisk $disk <<EOF
n
p
1


w
EOF

done

# run partprobe to make the kernel pick up the partition changes.
apt-get install parted
partprobe

# mkfs.ext4 all data partitions
for disk_letter in a b c d e f g h i j k l; do
    # use partition 3 on sda and sdb
    if [ "${disk_letter}" = 'a' -o  "${disk_letter}" = 'b' ]; then
        partition_number=3
    else
        partition_number=1
    fi

    partition="/dev/sd${disk_letter}${partition_number}"
    disk_data_directory="${data_directory}/${disk_letter}"

    # Run mkfs.ext4 in background so we don't have to wait
    # for this to complete synchronously.
    mkfs.ext4 $partition &
done


###        IMPORTANT!
# Wait for all the above ext4 filesystems to be formatted
# before running the following loop.
# `wait` blocks until the background mkfs.ext4 jobs started from this shell have finished.
wait


sudo mkdir -p $data_directory
for disk_letter in a b c d e f g h i j k l; do
    # use partition 3 on sda and sdb
    if [ "${disk_letter}" = 'a' -o  "${disk_letter}" = 'b' ]; then
        partition_number=3
    else
        partition_number=1
    fi

    partition="/dev/sd${disk_letter}${partition_number}"
    mount_point="${data_directory}/${disk_letter}"

    # don't reserve any blocks for OS on these partitions
    tune2fs -m 0 $partition

    ### TODO: Edit this script to use UUID in fstab instead of $partition
    # make the mount point
    mkdir -pv $mount_point
    grep -q $mount_point /etc/fstab || echo -e "# Kafka log partition ${disk_letter}\n${partition}\t${mount_point}\text4\tdefaults,noatime,data=writeback,nobh,delalloc\t0\t2" | sudo tee -a /etc/fstab    

    mount -v $mount_point
done

Note: ext4 settings were taken from recommendations found here: https://kafka.apache.org/08/ops.html
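
The TODO in the script above mentions using UUIDs in fstab instead of device names. One possible way to do that is sketched below; fstab_line is a hypothetical helper, and the mount options are simply copied from the script above.

```shell
# Hypothetical helper: fstab_line UUID MOUNT_POINT
# Prints an fstab entry that refers to the filesystem by UUID rather than by
# device name, using the same ext4 options as the script above.
fstab_line() {
    printf 'UUID=%s\t%s\text4\tdefaults,noatime,data=writeback,nobh,delalloc\t0\t2\n' "$1" "$2"
}

# Assumed usage, after mkfs.ext4 has finished for the partition:
#   uuid="$(sudo blkid -s UUID -o value /dev/sdc1)"
#   fstab_line "$uuid" /var/spool/kafka/c | sudo tee -a /etc/fstab
```

Referring to the filesystem by UUID keeps the entry valid even if device letters change after a disk swap.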

Swapping broken disk

Similar to Hadoop/Administration#Swapping_broken_disk, new disks on analytics Kafka brokers need some megacli tweaks before they are usable. This was needed in T136933. The following are the steps that were taken then.

These Kafka brokers have 12 disks in JBOD via MegaRAID. We'll need to clear foreign configuration from the old disk, and then mark this disk to be used as JBOD.

Procedure:

  1. Check the status of the disks after the swap using the following commands:
    # Check general info about disks attached to the RAID and their status.
    sudo megacli -PDList -aAll 
    
    # Get Firmware status only to have a quick peek about disks status:
    sudo megacli -PDList -aAll | grep Firm
    
    # Check Virtual Drive info
    sudo megacli -LDInfo -LAll -aAll
    
  2. If you see "Firmware state: Unconfigured(bad)", fix it with:
    # From the previous commands you should be able to fill in the variables 
    # with the values of the disk's properties indicated below:
    # X => Enclosure Device ID
    # Y => Slot Number
    # Z => Controller number
    megacli -PDMakeGood -PhysDrv[X:Y] -aZ
    
    # Example:
    # otto@kafka1012:~$ sudo megacli -PDList -aAll | egrep "Adapter|Enclosure Device ID:|Slot Number:|Firmware state"
    # Adapter #0
    # Enclosure Device ID: 32
    # Slot Number: 5
    # Firmware state: Online, Spun Up
    # [..content cut..]
    
    megacli -PDMakeGood -PhysDrv[32:5] -a0
    
  3. Check for Foreign disks and fix them if needed:
    server:~# megacli -CfgForeign -Scan -a0
    There are 1 foreign configuration(s) on controller 0.
    
    server:~# megacli -CfgForeign -Clear -a0
    Foreign configuration 0 is cleared on controller 0.
    
  4. Add the disk as JBOD:
    megacli -PDMakeJBOD -PhysDrv[32:5] -a0
    
  5. You should now be able to see the disk using fdisk. Add the partition and filesystem to it to complete the work.
    sudo fdisk /dev/sdf
    # ... make a new primary partition filling up the whole disk
    sudo mkfs.ext4 /dev/sdf1
    sudo tune2fs -m 0 /dev/sdf1
    

Very useful info contained in: http://hwraid.le-vert.net/wiki/LSIMegaRAIDSAS

Upgrade Checklist

The Analytics team experienced data loss and a lot of headaches when performing a routine Kafka upgrade from 0.8.2.0 -> 0.8.2.1 in August 2015. The following is a deployment checklist we came up with as part of the postmortem after that outage. When upgrading, please follow this checklist before you proceed in production.

  • Update (or remove) api_version in kafka_clusters hiera hash in common.yaml
  • Check Release Notes for new versions. e.g. https://archive.apache.org/dist/kafka/0.8.2.1/RELEASE_NOTES.html
  • Check Apache JIRA for bugs that may affect new version(s). e.g. https://issues.apache.org/jira/browse/KAFKA-2496?jql=project%20%3D%20KAFKA%20AND%20affectedVersion%20%3D%200.8.2.1
  • Stress test to see if varnishkafka latency goes up. It may be difficult to do this, but it is worth a try.
    • Set up a varnish+varnishkafka instance and Kafka cluster in labs (if there is not one in deployment-prep already).
    • 1. Use ab (Apache Benchmark) to force varnishkafka to send requests as fast as you can.
    • 2. Record rtt times in varnishkafka stats.json
    • 3. Record data file sizes for Kafka partitions on Kafka brokers in /var/spool/kafka/
    • 4. Upgrade Kafka cluster
    • 5. Repeat steps 1 - 3 and compare results to previous version. There should be no (negative) change.
  • When doing a production upgrade, document all steps in a deployment plan. Review and then execute the plan with a peer. Take notes on all steps along the way, including execution times for each step. This allows for easier documentation and correlation later if there are any problems. Be sure to keep an eye on the Kafka dashboard while deploying.