How to flush data batch in Kafka Consumer when there are no more records in topic

How to flush data batch in Kafka Consumer when there are no more records in topic

Consider this Kafka consumer that receives data from topic, buffers it into PreparedStatement and when 100K records is batched, it issues INSERT query into DB.
This works well just until the data are still incoming. However when for example 20K records is buffered and there are no more records incoming, it still waits for more 80K records until in flushes the statement. But I’d like to flush those 20K if stalled after some time. How can I do that? I don’t see any way how to hook onto it.
For example in PHP that uses php-rdkafka extension based on librdkafka I get RD_KAFKA_RESP_ERR__PARTITION_EOF when end of partition is reached so it’s pretty easy to hook buffer flush when that happens.
I tried to simplify the code so only significant parts remained
public class TestConsumer {

private final Connection connection;
private final CountDownLatch shutdownLatch;
private final KafkaConsumer consumer;
private int processedCount = 0;

public TestConsumer(Connection connection) {
this.connection = connection;
this.consumer = new KafkaConsumer<>(getConfig(), new StringDeserializer(), new ProtoDeserializer<>(Message.parser()));
this.shutdownLatch = new CountDownLatch(1);
}

public void execute() {
PreparedStatement statement;
try {
statement = getPreparedStatement();
} catch (SQLException e) {
throw new RuntimeException(e);
}

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
commit(statement);

consumer.wakeup();
}));

consumer.subscribe(Collections.singletonList(“source.topic”));

try {
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));

records.forEach(record -> {
Message message = record.value();
try {
fillBatch(statement, message);
statement.addBatch();
} catch (SQLException e) {
throw new RuntimeException(e);
}
});

processedCount += records.count();

if (processedCount > 100000) {
commit(statement);
}
}
} catch (WakeupException e) {
// ignore, we’re closing
} finally {
consumer.close();
shutdownLatch.countDown();
}
}

private void commit(PreparedStatement statement) {
try {
statement.executeBatch();
consumer.commitSync();
processedCount = 0;
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

Related:  Streamparse wordcount example

protected void fillBatch(PreparedStatement statement, Message message) throws SQLException {
try {
statement.setTimestamp(1, new Timestamp(message.getTime() * 1000L));
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
}

Solutions/Answers:

Solution 1:

I understand your problem like this:

  • You want to consume messages from Kafka

  • Pile them up in memory upto 100K records

  • Commit in batch to a DB

  • But you want to wait only for t seconds (let us say 10 seconds)

This can be achieved in a much efficient and reliable way using Kafka’s built in consumer batching..provided you can somehow predict average size of your messages in bytes.

On the Kafka consumer configuration, you would set the following:

fetch.min.bytes => this should be 100k x average size of messages

fetch.max.wait.ms => this is your timeout in ms (e.g. 5000 for 5 seconds waiting)

max.partition.fetch.bytes => max. amount of data per partition. This helps to refine the total fetch size

max.poll.records => maximum number of records returned in a single poll..could be set to 100K

fetch.max.bytes => if you want to set the upper limit for a single request

This way, you could get upto 100K records if they fit into the defined byte size, but it will wait for a configurable number of millis.

Once the poll returns the records, you could save them in one go and repeat.

References