Is it correct to use a database as storage for the states of messages consumed from Kafka?

Currently I have implemented a Kafka consumer that works as follows, inside a while loop:

1. Consume a message from Kafka.
2. Put the consumed message into a separate task for processing, so that the main thread and the consumer loop are not blocked.
   2.1. Commit the message only when processing is successful or the number of processing attempts has been exceeded.

Step #2.1 can take from 1 second up to 6 hours to finish. A minimal sketch of this loop is shown below.
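One way to read that description, assuming plain Java with the Apache Kafka client (the topic name `tasks`, the pool size, and the `process` method are illustrative, not part of the question). Since `KafkaConsumer` is not thread-safe, the workers report finished records through a queue and the poll thread performs the commits:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class TaskConsumer {
    static final int MAX_ATTEMPTS = 3;
    // records whose processing finished (or exhausted retries) and are safe to commit
    static final ConcurrentLinkedQueue<ConsumerRecord<String, String>> done =
            new ConcurrentLinkedQueue<>();

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "task-consumer");
        props.put("enable.auto.commit", "false"); // step 2.1: commit manually
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        ExecutorService pool = Executors.newFixedThreadPool(8);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("tasks"));
            while (true) {
                // step 1: consume; poll() keeps running while workers process
                for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(500))) {
                    // step 2: hand the record to a separate task
                    pool.submit(() -> {
                        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
                            try { process(rec.value()); break; } // may take 1 s .. 6 h
                            catch (Exception e) { /* retry until attempts are exceeded */ }
                        }
                        done.add(rec); // step 2.1: now committable
                    });
                }
                // commit finished records from the poll thread (KafkaConsumer is not thread-safe)
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> rec; (rec = done.poll()) != null; ) {
                    offsets.put(new TopicPartition(rec.topic(), rec.partition()),
                                new OffsetAndMetadata(rec.offset() + 1));
                }
                if (!offsets.isEmpty()) consumer.commitSync(offsets);
            }
        }
    }

    static void process(String value) { /* application logic goes here */ }
}
```

Note that because tasks can finish out of order, committing `offset + 1` for a later record can commit past an earlier, still-running one; that gap is exactly the duplicate-processing window described next.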
The problem is that if the application crashes while some tasks have not yet finished, those messages will be consumed and processed again on application restart (or even on a rebalance).
I don’t want to commit offsets automatically, because that would guarantee only at-most-once delivery. I was thinking about using a database as storage for message states and implementing the consumer as follows:
Inside a while loop:

1. Consume a message from Kafka.
2. Check the db to see whether such a message already exists:
   - If the message exists in the db and its state is ‘completed’, then commit the message.
   - If the message exists in the db but its state is ‘in progress’, then go to step #4 directly.
   - If the message doesn’t exist, then go to step #3.
3. Save the message into the database with state ‘in progress’.
4. Put the consumed message into a separate task for processing, so that the main thread and the consumer loop are not blocked.
   4.1. Commit the message and change its state in the db to ‘completed’ only when processing is successful or the number of processing attempts has been exceeded.
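A sketch of steps 2–4.1, assuming a JDBC table `message_state(topic, part, off, state)` keyed by the record’s (topic, partition, offset); the table, column, and method names are all illustrative:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class MessageStateStore {
    public enum Action { COMMIT_ONLY, PROCESS }

    private final Connection db;

    public MessageStateStore(Connection db) { this.db = db; }

    /** Steps 2-3: classify a freshly consumed record against the db. */
    public Action classify(ConsumerRecord<?, ?> rec) throws SQLException {
        String state = lookup(rec);
        if ("completed".equals(state)) return Action.COMMIT_ONLY; // already processed
        if (state == null) setState(rec, "in progress");          // step 3: first sighting
        return Action.PROCESS;                                    // step 4, fresh or retried
    }

    private String lookup(ConsumerRecord<?, ?> rec) throws SQLException {
        try (PreparedStatement ps = db.prepareStatement(
                "SELECT state FROM message_state WHERE topic=? AND part=? AND off=?")) {
            bindKey(ps, rec, 1);
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next() ? rs.getString(1) : null;
            }
        }
    }

    /** Step 4.1 calls this with "completed" once processing has finished. */
    public void setState(ConsumerRecord<?, ?> rec, String state) throws SQLException {
        try (PreparedStatement ps = db.prepareStatement(
                "UPDATE message_state SET state=? WHERE topic=? AND part=? AND off=?")) {
            ps.setString(1, state);
            bindKey(ps, rec, 2);
            if (ps.executeUpdate() > 0) return; // row existed and was updated
        }
        try (PreparedStatement ps = db.prepareStatement(
                "INSERT INTO message_state(topic, part, off, state) VALUES (?,?,?,?)")) {
            bindKey(ps, rec, 1);
            ps.setString(4, state);
            ps.executeUpdate();
        }
    }

    private void bindKey(PreparedStatement ps, ConsumerRecord<?, ?> rec, int i)
            throws SQLException {
        ps.setString(i, rec.topic());
        ps.setInt(i + 1, rec.partition());
        ps.setLong(i + 2, rec.offset());
    }
}
```

The poll loop would call `classify` for each record, commit immediately on `COMMIT_ONLY`, and otherwise dispatch the record to a worker that finishes with `setState(rec, "completed")` plus a commit. This costs a read and possibly a write per message, which is the slowdown worried about below.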

I am not sure that using a db is the right approach, because with many messages it will slow the consumer down. Can you give me any suggestions on how to correctly implement the consumer so that each message is processed only once?


Solutions/Answers:

Solution 1:

Your consumer should take the task from the stream (Kafka) so that the stream no longer contains that task. If your worker node crashes while running the task, you need to implement redundancy / error handling, i.e. global exception handling and persistent temporary storage. So I wouldn’t recommend storing tasks in a database alongside the stream; however, if you are going to do that, you might as well keep that table in Kafka itself, since Kafka topics are persistent.
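One way to read “a table in Kafka” is a log-compacted topic keyed by task id, where the latest record per key is the task’s current state; the topic name, configs, and task ids below are assumptions, not anything prescribed above:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.TopicConfig;

public class TaskStateTopic {
    public static void main(String[] args) throws Exception {
        Properties common = new Properties();
        common.put("bootstrap.servers", "localhost:9092");

        // Create a compacted topic: Kafka retains at least the latest record per key.
        try (Admin admin = Admin.create(common)) {
            NewTopic topic = new NewTopic("task-state", 3, (short) 1)
                    .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG,
                                    TopicConfig.CLEANUP_POLICY_COMPACT));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }

        Properties producerProps = new Properties();
        producerProps.putAll(common);
        producerProps.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        // Record state transitions keyed by task id; compaction keeps the last one.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>("task-state", "task-42", "in progress"));
            producer.send(new ProducerRecord<>("task-state", "task-42", "completed"));
        }
    }
}
```

With `cleanup.policy=compact` the topic behaves like a durable key-value table that a consumer can rebuild its state from on restart, without introducing a second storage system.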

As for error handling, the implementation strategy is up to you, as there are a few ways you could go about it: if the node crashes, you can put the task back onto the stream, ready to be picked up by another node, or you could just log the task and notify the user that it has failed.
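A sketch of the first strategy, re-publishing the failed task so another node can pick it up, with an `attempts` header to cap retries; republishing to the same topic and the header name are assumptions:

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;

public class Requeuer {
    private final KafkaProducer<String, String> producer;

    public Requeuer(Properties props) { this.producer = new KafkaProducer<>(props); }

    /** Put a failed task back onto the stream for any node to claim. */
    public void requeue(ConsumerRecord<String, String> failed) {
        int attempts = readAttempts(failed) + 1;
        ProducerRecord<String, String> retry =
                new ProducerRecord<>(failed.topic(), failed.key(), failed.value());
        // carry the retry count along so consumers can give up past a limit
        retry.headers().add("attempts",
                Integer.toString(attempts).getBytes(StandardCharsets.UTF_8));
        producer.send(retry);
    }

    private int readAttempts(ConsumerRecord<String, String> rec) {
        Header h = rec.headers().lastHeader("attempts");
        return h == null ? 0 : Integer.parseInt(new String(h.value(), StandardCharsets.UTF_8));
    }
}
```

After requeueing, the original offset can be committed safely, since the task now lives further along the stream rather than in a state row that might be orphaned by a crash.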
