Apache Kafka with Avro and Schema Repo – where in the message does the schema Id go?

I want to use Avro to serialize the data for my Kafka messages and would like to use it with an Avro schema repository so I don’t have to include the schema with every message.
Using Avro with Kafka seems like a popular thing to do, and lots of blogs, Stack Overflow questions, user groups, etc. mention sending the schema Id with the message, but I cannot find an actual example of where it should go.
I think it should go in the Kafka message header somewhere but I cannot find an obvious place. If it was in the Avro message you would have to decode it against a schema to get the message contents and reveal the schema you need to decode against, which has obvious problems.
I am using the C# client but an example in any language would be great. The message class has these fields:
public MessageMetadata Meta { get; set; }
public byte MagicNumber { get; set; }
public byte Attribute { get; set; }
public byte[] Key { get; set; }
public byte[] Value { get; set; }

but none of these seems correct. The MessageMetadata only has Offset and PartitionId.
So, where should the Avro Schema Id go?

Solutions/Answers:

Solution 1:

The schema id is actually encoded in the Avro message itself. Take a look at this to see how the encoders/decoders are implemented.

In general what’s happening when you send an Avro message to Kafka:

  1. The encoder gets the schema from the object to be encoded.
  2. Encoder asks the schema registry for an id for this schema. If the schema is already registered you’ll get an existing id, if not – the registry will register the schema and return the new id.
  3. The object gets encoded as [magic byte][schema id][actual message], where the magic byte is a single 0x0 byte that identifies this kind of message, the schema id is a 4-byte integer, and the rest is the actual Avro-encoded message.
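
As a rough illustration of the three encoding steps, here is a minimal Python sketch. It is not the actual encoder implementation: `InMemoryRegistry`, `get_or_register` and `frame` are hypothetical names invented for this example, the registry is a plain in-memory dictionary rather than a real service, and the schema id is assumed to be written in big-endian byte order (which is what Java's `ByteBuffer` does by default). The Avro body is passed in as already-encoded bytes.

```python
import struct

MAGIC_BYTE = 0  # the 0x0 marker byte described above


class InMemoryRegistry:
    """Hypothetical stand-in for a schema registry client (illustration only)."""

    def __init__(self):
        self._ids = {}

    def get_or_register(self, schema_json: str) -> int:
        # A known schema returns its existing id; a new one gets the next id.
        if schema_json not in self._ids:
            self._ids[schema_json] = len(self._ids) + 1
        return self._ids[schema_json]


def frame(schema_id: int, avro_payload: bytes) -> bytes:
    # 1 magic byte + 4-byte schema id (big-endian is an assumption here),
    # followed by the Avro binary encoding of the object.
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload


registry = InMemoryRegistry()
schema_id = registry.get_or_register('{"type": "string"}')
# b"\x06foo" is the Avro binary encoding of the string "foo".
kafka_value = frame(schema_id, b"\x06foo")
```

The resulting bytes are what would go into the Key or Value field of the Kafka message, so nothing needs to be added to the Kafka message header.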

When you decode the message back here’s what happens:

  1. The decoder reads the first byte and makes sure it is 0x0.
  2. The decoder reads the next 4 bytes and converts them to an integer value. This is how schema id is decoded.
  3. Now that the decoder has the schema id, it can ask the schema registry for the actual schema for this id. Voila!
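
The decoding steps can be sketched the same way. Again, this is only an illustration under assumptions: `unframe` is a hypothetical helper, big-endian byte order is assumed for the schema id, and the registry lookup and the Avro decoding of the remaining bytes are left to whatever client and Avro library you use.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0   # expected first byte
HEADER_SIZE = 5  # 1 magic byte + 4-byte schema id


def unframe(message: bytes) -> Tuple[int, bytes]:
    """Split a framed message into (schema id, Avro-encoded body)."""
    if len(message) < HEADER_SIZE:
        raise ValueError("message too short to contain a schema id header")
    # Step 1: read the first byte. Step 2: read the next 4 bytes as an integer.
    magic, schema_id = struct.unpack(">bI", message[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte: %d" % magic)
    # Step 3 (not shown): look up the schema for schema_id and decode the body.
    return schema_id, message[HEADER_SIZE:]


schema_id, body = unframe(b"\x00\x00\x00\x00\x2a\x06foo")
```

Because the id sits in front of the Avro body rather than inside it, the decoder never needs a schema in order to find out which schema to use.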

If your key is Avro encoded then your key will be of the format described above. The same applies for value. This way your key and value may be both Avro values and use different schemas.

Edit to answer the question in comment:

The actual schema is stored in the schema repository (that is the whole point of a schema repository, after all: to store schemas :)). The Avro Object Container Files format has nothing to do with the format described above. KafkaAvroEncoder/Decoder use a slightly different message format (but the actual messages are encoded in exactly the same way).

The main difference between these formats is that Object Container Files carry the actual schema and may contain multiple messages corresponding to that schema, whereas the format described above carries only the schema id and exactly one message corresponding to that schema.

Passing object-container-file-encoded messages around would probably be hard to follow and maintain, because one Kafka message would then contain multiple Avro messages. Alternatively, you could ensure that one Kafka message contains only one Avro message, but that would mean carrying the schema with each message.

Avro schemas can be quite large (I’ve seen schemas of 600 KB and more), so carrying the schema with each message would be really costly and wasteful. That is where the schema repository kicks in: the schema is fetched only once and cached locally, and all later lookups are just fast map lookups.
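
The caching idea can be sketched in a few lines of Python. This is an assumption-laden illustration, not how any particular client is implemented: `CachingSchemaLookup` is a hypothetical class, and `fetch_schema` stands for whatever call your client makes to the repository for a given id.

```python
from typing import Callable, Dict


class CachingSchemaLookup:
    """Hypothetical local cache in front of a schema repository."""

    def __init__(self, fetch_schema: Callable[[int], str]):
        self._fetch_schema = fetch_schema  # the expensive remote call
        self._cache: Dict[int, str] = {}

    def get(self, schema_id: int) -> str:
        # Only the first lookup for an id reaches the repository;
        # every later lookup is a plain dictionary access.
        if schema_id not in self._cache:
            self._cache[schema_id] = self._fetch_schema(schema_id)
        return self._cache[schema_id]


remote_calls = []


def fake_fetch(schema_id: int) -> str:
    # Stand-in for a network request, recording how often it is invoked.
    remote_calls.append(schema_id)
    return '{"type": "string"}'


lookup = CachingSchemaLookup(fake_fetch)
lookup.get(1)
lookup.get(1)
lookup.get(1)
```

After the three `get` calls, `remote_calls` contains a single entry, so the per-message cost is the 5-byte header plus a dictionary lookup rather than a full copy of the schema.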
