Number of commits and offset in each partition of a kafka topic

How can I find the number of commits and the current offset in each partition of a known Kafka topic? I am using Kafka v0.8.1.1.

Solutions/Answers:

Solution 1:

It is not clear from your question which kind of offset you’re interested in.
There are actually three types of offsets:

  1. The offset of the first available message in the topic’s partition. Use -2
    (earliest) as the --time parameter for the GetOffsetShell tool.
  2. The offset of the last available message in the topic’s partition. Use -1 (latest) as the --time
    parameter.
  3. The last read/processed message offset maintained by the
    Kafka consumer. The high-level consumer stores this information, for every consumer group, in
    an internal Kafka topic (it used to be ZooKeeper) and takes care of
    keeping it up to date when you call commit() or when the auto-commit
    setting is set to true. For the simple consumer, your code has to take
    care of managing offsets.

In addition to the command-line utility, the offset information for #1 and #2 is also available via SimpleConsumer.earliestOrLatestOffset().

If the number of messages is not too large, you can specify a large --offsets parameter to GetOffsetShell and then count the number of lines returned by the tool. Otherwise, you can write a simple loop in Scala/Java that iterates over all available offsets, starting from the earliest.
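As a rough alternative to iterating over messages, the earliest and latest offsets described above can be subtracted per partition. The sketch below assumes a topic without log compaction, so that offsets are contiguous, and assumes GetOffsetShell prints one topic:partition:offset line per partition. The helper name count_messages, the broker address and the topic name are illustrative only.

```shell
#!/bin/sh
# count_messages EARLIEST LATEST
# Both arguments hold GetOffsetShell output: one "topic:partition:offset"
# line per partition. Prints "partition count" sorted by partition id.
# count_messages is a hypothetical helper name, not part of Kafka.
count_messages() {
    {
        printf '%s\n' "$1" | sed 's/^/E:/'   # tag lines from the --time -2 run
        printf '%s\n' "$2" | sed 's/^/L:/'   # tag lines from the --time -1 run
    } | awk -F ":" '
        NF == 4 && $1 == "E" { first[$3] = $4 }
        NF == 4 && $1 == "L" { last[$3]  = $4 }
        END { for (p in last) print p, last[p] - first[p] }
    ' | sort -n
}

# Usage against a live broker (broker address and topic are assumptions):
# earliest="$(bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic games --time -2)"
# latest="$(bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic games --time -1)"
# count_messages "${earliest}" "${latest}"
```

The result is the number of messages currently retained in each partition, not the number ever produced, because old segments may already have been deleted.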

From Kafka documentation:

Get Offset Shell
get offsets for a topic
bin/kafka-run-class.sh kafka.tools.GetOffsetShell

required argument [broker-list], [topic]
Option                                  Description
------                                  -----------
--broker-list <hostname:port,...,       REQUIRED: The list of hostname and
  hostname:port>                          port of the server to connect to.
--max-wait-ms <Integer: ms>             The max amount of time each fetch
                                          request waits. (default: 1000)
--offsets <Integer: count>              number of offsets returned (default: 1)
--partitions <partition ids>            comma separated list of partition ids.
                                          If not specified, will find offsets
                                          for all partitions (default)
--time <Long: timestamp in              timestamp; offsets will come before
  milliseconds / -1(latest) / -2          this timestamp, as in
  (earliest)>                             getOffsetsBefore
--topic <topic>                         REQUIRED: The topic to get offsets from.

Solution 2:

To get the offset for a topic and partition, you can use kafka.tools.GetOffsetShell. For example, with this command (I have a topic named games):

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic games --time -1

I get games:0:47841, which means that for topic games and partition 0 the latest offset, that is, the first one not yet used, is 47841.

You can use --time -2 to see the offset of the first available message.
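For scripting, the topic:partition:offset line can be split with plain shell parameter expansion. This is a minimal sketch; parse_offset_line is a hypothetical helper name, and the input line is the sample value from this answer.

```shell
#!/bin/sh
# parse_offset_line LINE
# Splits one GetOffsetShell output line ("topic:partition:offset") into
# labelled fields. parse_offset_line is a hypothetical helper name.
parse_offset_line() {
    line="$1"
    topic="${line%%:*}"        # text before the first ":"
    rest="${line#*:}"          # text after the first ":"
    partition="${rest%%:*}"    # text before the next ":"
    offset="${rest#*:}"        # remaining text
    printf 'topic=%s partition=%s offset=%s\n' "${topic}" "${partition}" "${offset}"
}

parse_offset_line "games:0:47841"
```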

Solution 3:

Starting with version 0.9.0.x, you should use the kafka.admin.ConsumerGroupCommand tool. Below are the arguments that the tool takes:

List all consumer groups, describe a consumer group, or delete consumer group info.
Option                                  Description
------                                  -----------
--bootstrap-server <server to connect   REQUIRED (only when using new-
  to>                                     consumer): The server to connect to.
--command-config <command config        Property file containing configs to be
  property file>                          passed to Admin Client and Consumer.
--delete                                Pass in groups to delete topic
                                          partition offsets and ownership
                                          information over the entire consumer
                                          group. For instance --group g1 --
                                          group g2
                                        Pass in groups with a single topic to
                                          just delete the given topic's
                                          partition offsets and ownership
                                          information for the given consumer
                                          groups. For instance --group g1 --
                                          group g2 --topic t1
                                        Pass in just a topic to delete the
                                          given topic's partition offsets and
                                          ownership information for every
                                          consumer group. For instance --topic
                                          t1
                                        WARNING: Group deletion only works for
                                          old ZK-based consumer groups, and
                                          one has to use it carefully to only
                                          delete groups that are not active.
--describe                              Describe consumer group and list
                                          offset lag related to given group.
--group <consumer group>                The consumer group we wish to act on.
--list                                  List all consumer groups.
--new-consumer                          Use new consumer.
--topic <topic>                         The topic whose consumer group
                                          information should be deleted.
--zookeeper <urls>                      REQUIRED (unless new-consumer is
                                          used): The connection string for the
                                          zookeeper connection in the form
                                          host:port. Multiple URLS can be
                                          given to allow fail-over.

To get the offsets of Topic_X for consumerGroup_Y, use the command below:

bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --zookeeper <zookeeper urls> --describe --group consumerGroup_Y

The response would look like this:

GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
consumerGroup, Topic_X, 0, 3030460, 3168412, 137952, none
consumerGroup, Topic_X, 1, 3030903, 3168884, 137981, none
consumerGroup, Topic_X, 2, 801564, 939540, 137976, none
consumerGroup, Topic_X, 3, 737290, 875262, 137972, none
consumerGroup, Topic_X, 4, 737288, 875254, 137966, none
consumerGroup, Topic_X, 5, 737276, 875241, 137965, none
consumerGroup, Topic_X, 6, 737290, 875251, 137961, none
consumerGroup, Topic_X, 7, 737290, 875248, 137958, none
consumerGroup, Topic_X, 8, 737288, 875246, 137958, none
consumerGroup, Topic_X, 9, 737293, 875251, 137958, none
consumerGroup, Topic_X, 10, 737289, 875244, 137955, none
consumerGroup, Topic_X, 11, 737273, 875226, 137953, none
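In this output, LAG is LOG END OFFSET minus CURRENT OFFSET (for partition 0: 3168412 - 3030460 = 137952). A minimal sketch for summing the lag over all partitions follows; it assumes the comma-separated layout shown above, which may differ between Kafka versions, and total_lag is a hypothetical helper name.

```shell
#!/bin/sh
# total_lag: reads "--describe" output (comma-separated, as in the sample
# above) on stdin and prints the summed lag over all partitions.
# Lag is recomputed as LOG END OFFSET - CURRENT OFFSET (columns 5 and 4).
# total_lag is a hypothetical helper name, not part of Kafka.
total_lag() {
    awk -F ", *" '
        # Skip the header and any row whose offset columns are not numeric
        $4 ~ /^[0-9]+$/ && $5 ~ /^[0-9]+$/ { total += $5 - $4 }
        END { printf "%d\n", total }
    '
}

# Usage against a live cluster (ZooKeeper URL and group name are placeholders):
# bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --zookeeper <zookeeper urls> --describe --group consumerGroup_Y | total_lag
```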

Solution 4:

This information was also helpful in creating a script to view the number of messages on a partition for a topic (from the command line). While tools like Kafka-Web-Console are nice, some of us live in a non-GUI world.

Here is the script … use and modify it at your own risk 🙂

#!/bin/bash

topic=$1

if [[ -z "${topic}" ]] ; then

    echo "Usage: ${0} <topic>"
    exit 1

fi


if [[ -z "${KAFKA_HOME}" ]] ; then

    # $KAFKA_HOME not set, using default /kafka
    KAFKA_HOME="/kafka"

fi

if [ ! -d "${KAFKA_HOME}" ] ; then

    echo "\$KAFKA_HOME does not point to a valid directory [$KAFKA_HOME]"
    exit 1

fi

cd "${KAFKA_HOME}" || exit 1

echo
echo "Topic: ${topic}: "

#
printf "Partition  Count\n"
printf "~~~~~~~~~~ ~~~~~~~~~~~~\n"

idx=0
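# GetOffsetShell prints one "topic:partition:offset" line per partition.
# With --time -1 the offset is the latest one, which equals the message
# count only if nothing has been deleted from the partition yet.
for msg in $(bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic "${topic}" --broker-list localhost:9092 --time -1) ; do

    # Split the line on ":" into its three fields
    IFS=":" read -r name partition total <<< "${msg}"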

    printf "%10d %12d\n" ${partition} ${total}
    idx=$((idx + 1))

done

if [ ${idx} -eq 0 ] ; then

    echo "Topic name not found!"
    exit 1

fi

echo
exit 0
