Understanding Kafka Message Byte Size

How do I get the size of a single record in Kafka?
There’s some exposition as to why I need this.
This does not appear to be the serializedValueSize exposed on the ConsumerRecord or RecordMetadata classes. I don’t really understand the value of this property, since it doesn’t match the size of the message useful for the consumer. What is the serializedValueSize used for if not this?
I am trying to make my Kafka Java application behave like “min.poll.records” if it existed, to complement “max.poll.records”. I have to do this because it’s required :). Assuming all messages on a given topic are the same size (which is true in this case), this should be possible from the consumer side by setting fetch.min.bytes equal to the number of messages to batch times the byte size of each message.
This exists:

https://kafka.apache.org/documentation/#consumerapi
max.poll.records
The maximum number of records returned in a single call to poll().

This doesn’t exist, but is the behavior I want:

min.poll.records
The minimum number of records returned in a single call to poll(). If not enough records are available before the time specified in fetch.max.wait.ms elapses, then the records are returned anyway, and as such, this is not an absolute minimum.
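The fetch.min.bytes workaround described above could be sketched as consumer configuration like this (a sketch, not a definitive recipe: the class name MinPollConfig is made up, and the 146-byte message size is the value measured later in this question — you would need to measure it for your own topic):

```java
import java.util.Properties;

public class MinPollConfig {
    // Assumptions: every message on the topic has the same serialized size,
    // measured (not computed) as 146 bytes, and we want batches of 10.
    static final int MESSAGE_SIZE_BYTES = 146;
    static final int MIN_RECORDS = 10;

    public static Properties consumerProps() {
        Properties props = new Properties();
        // Ask the broker not to answer a fetch until this many bytes
        // (i.e. roughly MIN_RECORDS messages) have accumulated.
        props.put("fetch.min.bytes",
                  Integer.toString(MESSAGE_SIZE_BYTES * MIN_RECORDS));
        // Upper bound on how long the broker waits for fetch.min.bytes
        // to accumulate before responding anyway.
        props.put("fetch.max.wait.ms", "500");
        return props;
    }
}
```

Note that, as with the hypothetical min.poll.records, fetch.max.wait.ms makes this a soft minimum: once the wait elapses, the broker responds with whatever it has.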

Here’s what I’ve found so far:

On the producer side, I have “batch.size” set to 1 byte. This forces the producer to send each message individually.
On the consumer side, I have “max.partition.fetch.bytes” set to 291 bytes. This makes the consumer only ever get back 1 message. Setting this value to 292 sometimes makes the consumer get back 2 messages. So I have calculated the message size to be half of 292: the size of one message is 146 bytes.
The above bullets require changes to the Kafka configuration and involve manually looking at / grepping some server logs. It’d be great if the Kafka Java API provided this value.
On the producer side, Kafka provides a way to get the serialized sizes for a record in the RecordMetadata.serializedValueSize method. This value is 76 bytes, much different from the 146 bytes given in the test above.
On the consumer side, Kafka provides the ConsumerRecord API. The serialized value size from this record is also 76. The offset just increments by one each time (not by the byte size of the record).
The size of the key is -1 bytes (the key is null).

System.out.println(myRecordMetadata.serializedValueSize());
// 76

# producer
batch.size=1

# consumer

# Expected this to work:
# 76 * 2 = 152
max.partition.fetch.bytes=152

# Actually works:
# 292 = ??? magic ???
max.partition.fetch.bytes=292

I expected that setting the max.partition.fetch.bytes to a multiple of the number of bytes given by the serializedValueSize would make the Kafka consumer receive at maximum that number of records from a poll. Instead, the max.partition.fetch.bytes value needs to be much higher for this to happen.

Solutions/Answers:

Solution 1:

Original answer

I’m not too familiar with the serializedValueSize method but as per the documentation, this is just the size of the value stored in that message. This is going to be less than the total message size (even with null keys) because the message also contains metadata (such as a timestamp) that is not part of the value.

As for your problem: Instead of controlling poll directly by working with message sizes and restricting the throughput of the consumer, why not just buffer the incoming messages until enough of them are available or the desired timeout (you mentioned fetch.max.wait.ms but you could just specify one manually) has elapsed?

public static <K, V> List<ConsumerRecord<K, V>>
    minPoll(KafkaConsumer<K, V> consumer, Duration timeout, int minRecords) {
  List<ConsumerRecord<K, V>> acc = new ArrayList<>();
  // Poll in short slices so we can re-check the accumulated count.
  Duration pollTimeout = Duration.ofMillis(timeout.toMillis() / 10);
  long start = System.nanoTime();
  do {
    ConsumerRecords<K, V> records = consumer.poll(pollTimeout);
    for (ConsumerRecord<K, V> record : records)
      acc.add(record);
  } while (acc.size() < minRecords &&
           System.nanoTime() - start < timeout.toNanos());
  return acc;
}

The timeout.toMillis()/10 timeout in the call to consumer.poll is arbitrary. You should pick a duration that is small enough that it doesn’t matter if we wait that much longer than the specified timeout (here: up to 10% longer).

Edit: Note that this might potentially return a list that is larger than max.poll.records (the maximum being max.poll.records + minRecords - 1). If you also need to enforce this strict upper limit, either use another buffer external to the method to temporarily store the superfluous records (which will probably be faster but doesn’t allow mixing of minPoll and the ordinary poll method) or simply discard them and use the consumer’s seek method to backtrack.
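The trimming step could be sketched generically like this (the class PollLimit and the type parameter T are stand-ins I made up so the sketch stays self-contained; T plays the role of ConsumerRecord&lt;K, V&gt;, and the actual seek call is only noted in a comment):

```java
import java.util.ArrayList;
import java.util.List;

public class PollLimit {
    // Split a polled batch into (kept, overflow). With a real consumer you
    // would then, per partition, call
    //   consumer.seek(partition, firstOverflowRecord.offset())
    // so the overflow records are re-read on the next poll, or stash the
    // overflow list in an external buffer instead.
    public static <T> List<List<T>> splitAtLimit(List<T> records, int maxRecords) {
        int cut = Math.min(maxRecords, records.size());
        List<T> kept = new ArrayList<>(records.subList(0, cut));
        List<T> overflow = new ArrayList<>(records.subList(cut, records.size()));
        List<List<T>> result = new ArrayList<>();
        result.add(kept);     // at most maxRecords entries
        result.add(overflow); // everything past the limit
        return result;
    }
}
```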

Answer to updated question

So the question is not so much about controlling the number of messages that are returned by the poll method, but really about how to get the size of a single record. Unfortunately, I don’t think that’s possible without going through a lot of trouble. The thing is that there is no real (constant) answer to this, and even a ballpark answer will depend on the Kafka version, or rather on the Kafka protocol version.

First off, I’m not entirely sure what max.partition.fetch.bytes controls exactly (as in: is the protocol overhead also part of it or not?). Let me illustrate what I mean: When the consumer sends a fetch request, then the fetch response consists of the following fields:

  1. Throttle time (4 bytes)
  2. Array of topic responses (4 bytes for array length + size of data in array).

A topic response in turn consists of

  1. Topic name (2 bytes for string length + size of string)
  2. Array of partition responses (4 bytes for array length + size of data in array).

A partition response then has

  1. Partition ID (4 bytes)
  2. Error code (2 bytes)
  3. High watermark (8 bytes)
  4. Last stable offset (8 bytes)
  5. Log start offset (8 bytes)
  6. Array of aborted transactions (4 bytes for array length + data in array)
  7. Record set.

All this can be found in the FetchResponse.java file. A record set in turn consists of record batches, which contain records. I’m not going to list everything that makes up a record batch (you can see it here). Suffice it to say that the overhead is 61 bytes. Finally, the size of a single record in the batch is a little trickier because it uses varint and varlong fields. It contains

  1. Body size (1-5 bytes)
  2. Attributes (1 byte)
  3. Timestamp delta (1-10 bytes)
  4. Offset delta (1-5 bytes)
  5. Key byte array (1-5 bytes + key data size)
  6. Value byte array (1-5 bytes + value data size)
  7. Headers (1-5 bytes + headers data size).

The source code for this is here. As you can see, you cannot simply divide 292 bytes by two to get the record size because some of the overhead is constant and independent of the number of records.
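That said, the question’s own numbers can be roughly reconciled as a back-of-the-envelope sanity check (not an exact protocol accounting — the 9-byte per-record framing figure below is my rough assumption, and the class name is made up). Because the question sets batch.size=1, each message travels in its own record batch, so the 61-byte batch overhead is paid once per message:

```java
public class SizeEstimate {
    // Rough per-message wire-size estimate when each message is its own batch.
    public static int estimatePerMessage(int serializedValueSize) {
        int batchOverhead = 61;  // per record batch, per the answer above
        int recordFraming = 9;   // ASSUMPTION: body size, attributes,
                                 // timestamp/offset deltas, null key,
                                 // empty headers (all small varints)
        return batchOverhead + recordFraming + serializedValueSize;
    }

    public static void main(String[] args) {
        // serializedValueSize was 76; the measured message size was 146,
        // and 2 * 146 = 292, matching the max.partition.fetch.bytes test.
        System.out.println(estimatePerMessage(76)); // 146
    }
}
```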

What’s even worse is that records don’t have a constant size even if their keys, values, and headers do, because the timestamp and offset are stored as differences from the batch timestamp and offset, using a variable-length data type. Moreover, this is just the situation for the most recent protocol versions at the time of writing. For older versions the answer is different again, and who knows what will happen in future versions.
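The variable-length encoding can be illustrated with a minimal zigzag-varint size calculator (a sketch of the general scheme those record fields use, not Kafka’s exact implementation; the class name is made up):

```java
public class VarintSize {
    // Number of bytes needed to varint-encode v after the zigzag mapping,
    // which sends small-magnitude values (positive or negative) to small
    // unsigned values: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
    public static int sizeOfZigZagVarint(int v) {
        int zigzag = (v << 1) ^ (v >> 31);
        int bytes = 1;
        // Each varint byte carries 7 payload bits.
        while ((zigzag & 0xFFFFFF80) != 0) {
            zigzag >>>= 7;
            bytes++;
        }
        return bytes;
    }
}
```

This is why a timestamp delta can take anywhere from 1 to 10 bytes: a record written shortly after its batch’s base timestamp encodes in a single byte, while a large delta needs more.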
