
How To Decode/deserialize Avro With Python From Kafka

I am receiving from a remote server Kafka Avro messages in Python (using the consumer of Confluent Kafka Python library), that represent clickstream data with json dictionaries wit

Solution 1:

I thought the Avro library was only for reading Avro files, but it also solved the problem of decoding Kafka messages. I first import the libraries and load the schema file, then create a function that decodes a message into a dictionary, which I can use in the consumer loop.

import io

from confluent_kafka import Consumer, KafkaError
from avro.io import DatumReader, BinaryDecoder
import avro.schema

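# avro-python3 spells this Parse; the newer unified "avro" package uses avro.schema.parse instead.
with open("data_sources/EventRecord.avsc") as schema_file:
    schema = avro.schema.Parse(schema_file.read())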
reader = DatumReader(schema)

def decode(msg_value):
    message_bytes = io.BytesIO(msg_value)
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)
    return event_dict

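# Placeholder settings: replace the broker address, group id, and topic with your own.
topic = "my_topic"
c = Consumer({"bootstrap.servers": "localhost:9092", "group.id": "avro-decoder"})
c.subscribe([topic])  # subscribe() expects a list of topic names
running = True
while running:
    msg = c.poll(1.0)  # returns None if no message arrives within the timeout
    if msg is None:
        continue
    if not msg.error():
        msg_value = msg.value()
        event_dict = decode(msg_value)
        print(event_dict)
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        print(msg.error())
        running = False
c.close()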

Solution 2:

If you use Confluent Schema Registry and want to deserialize Avro messages, add message_bytes.seek(5) to the decode function. Confluent prepends 5 extra bytes to the usual Avro-formatted data: a magic byte (0) followed by a 4-byte schema ID.

def decode(msg_value):
    message_bytes = io.BytesIO(msg_value)
    message_bytes.seek(5)
    decoder = BinaryDecoder(message_bytes)
    event_dict = reader.read(decoder)
    return event_dict
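
Rather than skipping the 5 bytes blindly, it can help to check them first, so that a message that was not produced through the Schema Registry serializer fails loudly instead of decoding to garbage. The following is only a sketch: it assumes the Confluent wire format of one magic byte (0) followed by a big-endian 4-byte schema ID, and the name split_confluent_frame is made up for this example, not part of any library.

```python
import struct

MAGIC_BYTE = 0    # first byte of a Confluent-framed message (assumed wire format)
HEADER_SIZE = 5   # 1 magic byte + 4-byte big-endian schema ID


def split_confluent_frame(msg_value: bytes):
    """Hypothetical helper: return (schema_id, avro_payload) for a framed message."""
    if len(msg_value) < HEADER_SIZE:
        raise ValueError("message too short to carry a Confluent header")
    # ">" means big-endian with no padding: B = 1 unsigned byte, I = 4-byte unsigned int
    magic, schema_id = struct.unpack(">BI", msg_value[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}; message may not be Confluent-framed")
    return schema_id, msg_value[HEADER_SIZE:]
```

The returned payload can then be wrapped in io.BytesIO and passed to BinaryDecoder as in the decode function above, without the seek(5) call. The schema ID is also useful for logging, or for looking up the writer schema if the topic carries more than one schema version.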

Solution 3:

If you have access to a Confluent Schema Registry server, you can also use Confluent's own AvroDeserializer, which handles the 5-byte header for you:

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

def process_record_confluent(record: bytes, src: SchemaRegistryClient, schema: str):
    deserializer = AvroDeserializer(schema_str=schema, schema_registry_client=src)
    return deserializer(record, None) # returns dict
