Kafka Streams: How to fix Serde casting error


When I simulate the word count use case with the aggregate function, I run into a Serde casting issue.
Exception in thread "aggregation-transformation-application-43485635-2d3c-4edc-b13c-c6505a793d18-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
at org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp(RecordQueue.java:160)
at org.apache.kafka.streams.processor.internals.RecordQueue.poll(RecordQueue.java:115)
at org.apache.kafka.streams.processor.internals.PartitionGroup.nextRecord(PartitionGroup.java:100)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:349)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:420)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:890)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4

Although I defined Serdes for each step, it still throws a SerializationException.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class AggregationTransformation {
    public static void main(String[] args) {
        // prepare config
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregation-transformation-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> kStream = builder.stream("agg-table-source-topic");
        KStream<String, Integer> kStreamFormatted = kStream
                .flatMapValues((key, value) -> Arrays.asList(value.split("\\W+")))
                .selectKey((key, value) -> value)
                .mapValues(value -> 1);

        kStreamFormatted.groupByKey(Grouped.<String, Integer>as(null)
                .withValueSerde(Serdes.Integer()))
                .aggregate(() -> 0,
                        (aggKey, newValue, aggValue) -> aggValue + newValue,
                        Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("aggregated-stream-store")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.Integer())
                ).toStream().to("agg-output-topic", Produced.with(Serdes.String(), Serdes.Integer()));

        Topology topology = builder.build();
        KafkaStreams kafkaStreams = new KafkaStreams(topology, config);

        CountDownLatch countDownLatch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                kafkaStreams.close();
                countDownLatch.countDown();
            }
        });

        try {
            kafkaStreams.start();
            countDownLatch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

For a first entry of "John Smith" in the producer console, I expect the output topic (agg-output-topic) to contain:
John 1
Smith 1

And if I enter the same input into the producer (agg-table-source-topic) again, the output topic should aggregate it and the result should be:
John 2
Smith 2


I appreciate your help.

Solutions/Answers:

Solution 1:

A SerializationException means that your Deserializer (in your case, IntegerDeserializer) cannot deserialize the message, i.e. it cannot convert the raw bytes into the expected object (an Integer).
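To see this concretely, here is a minimal, self-contained sketch (the topic name "demo-topic" is made up) that reproduces the exact message from your stack trace: a value written by a StringSerializer is usually not exactly 4 bytes, which is what IntegerDeserializer requires.

import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class SerdeMismatchDemo {
    public static void main(String[] args) {
        // "1" serialized as a String is a single byte...
        byte[] bytes = new StringSerializer().serialize("demo-topic", "1");

        // ...but an Integer is always exactly 4 bytes, so this throws
        // SerializationException: Size of data received by IntegerDeserializer is not 4
        new IntegerDeserializer().deserialize("demo-topic", bytes);
    }
}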

As you wrote in a comment, you changed the type from Long to Integer. I think you first started your application with the type Long, processed several messages, and then changed the type to Integer. Your application saved intermediate results in a changelog topic, and later, with the new type and Deserializer (Serdes), it could not deserialize them and threw an exception.

If you change a type in your application, you have to delete all changelog topics that were created during processing. Otherwise, the SerializationException may occur again.
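As a side note, here is a sketch (assuming the topology and config variables from your question) of KafkaStreams#cleanUp(), which wipes the application's local state directory before a restart. Note that cleanUp() does not delete the changelog topics on the brokers; for those, Kafka ships the Application Reset Tool (bin/kafka-streams-application-reset.sh).

KafkaStreams kafkaStreams = new KafkaStreams(topology, config);

// Wipes the local state directory (e.g. the RocksDB files) for this
// application id. Must be called while the instance is not running.
// The changelog topics on the brokers are NOT deleted by this call.
kafkaStreams.cleanUp();

kafkaStreams.start();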

Solution 2:

When I simulate word count case with using aggregate function […]

Your setup looks very complicated. Why don’t you just do the following?

final KTable<String, Long> aggregated = builder.<String, String>stream("agg-table-source-topic")
    .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
    .groupBy((keyIgnored, word) -> word)
    // Normally, you'd use `count()` here and be done with it.
    // But you mentioned you intentionally want to use `aggregate(...)`.
    .aggregate(
        () -> 0L,
        (aggKey, newValue, aggValue) -> aggValue + 1L,
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("aggregate-store").withValueSerde(Serdes.Long()));

aggregated.toStream().to("agg-output-topic", Produced.with(Serdes.String(), Serdes.Long()));
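A side note on the serdes: after the groupBy the word itself becomes the new key, so the default String key serde from your configuration still applies; only the value serde must be overridden via Materialized, because the aggregation result (Long) no longer matches the configured String default.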

That is, compared to the normal WordCount example, all you have to do is to replace:

  .count()

with

  .aggregate(
      () -> 0L,
      (aggKey, newValue, aggValue) -> aggValue + 1L,
      Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("aggregate-store").withValueSerde(Serdes.Long()))

Note that the example code above uses Long, not Integer, but of course you could change that.
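For example, if you want to stay with Integer as in your original code, the fragment would become (an untested sketch; only the initial value, the increment, and the value serde change):

  .aggregate(
      () -> 0,
      (aggKey, newValue, aggValue) -> aggValue + 1,
      Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("aggregate-store").withValueSerde(Serdes.Integer()))

You would then also write to the output topic with Produced.with(Serdes.String(), Serdes.Integer()).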
