Files
IremErturk cbe18f2f04 Refactor python streaming examples (#337)
* Refactor tutorial examples

* Updates readmes for week6-streaming

* Adds homework for 2023 week6-streaming

* Fix merge conflicts on README updates
2023-03-07 21:40:14 +01:00

70 lines
3.0 KiB
Python

import os
from typing import Dict, List
from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField
from ride_record_key import dict_to_ride_record_key
from ride_record import dict_to_ride_record
from settings import BOOTSTRAP_SERVERS, SCHEMA_REGISTRY_URL, \
RIDE_KEY_SCHEMA_PATH, RIDE_VALUE_SCHEMA_PATH, KAFKA_TOPIC
class RideAvroConsumer:
def __init__(self, props: Dict):
# Schema Registry and Serializer-Deserializer Configurations
key_schema_str = self.load_schema(props['schema.key'])
value_schema_str = self.load_schema(props['schema.value'])
schema_registry_props = {'url': props['schema_registry.url']}
schema_registry_client = SchemaRegistryClient(schema_registry_props)
self.avro_key_deserializer = AvroDeserializer(schema_registry_client=schema_registry_client,
schema_str=key_schema_str,
from_dict=dict_to_ride_record_key)
self.avro_value_deserializer = AvroDeserializer(schema_registry_client=schema_registry_client,
schema_str=value_schema_str,
from_dict=dict_to_ride_record)
consumer_props = {'bootstrap.servers': props['bootstrap.servers'],
'group.id': 'datatalkclubs.taxirides.avro.consumer.2',
'auto.offset.reset': "earliest"}
self.consumer = Consumer(consumer_props)
@staticmethod
def load_schema(schema_path: str):
path = os.path.realpath(os.path.dirname(__file__))
with open(f"{path}/{schema_path}") as f:
schema_str = f.read()
return schema_str
def consume_from_kafka(self, topics: List[str]):
self.consumer.subscribe(topics=topics)
while True:
try:
# SIGINT can't be handled when polling, limit timeout to 1 second.
msg = self.consumer.poll(1.0)
if msg is None:
continue
key = self.avro_key_deserializer(msg.key(), SerializationContext(msg.topic(), MessageField.KEY))
record = self.avro_value_deserializer(msg.value(),
SerializationContext(msg.topic(), MessageField.VALUE))
if record is not None:
print("{}, {}".format(key, record))
except KeyboardInterrupt:
break
self.consumer.close()
if __name__ == "__main__":
config = {
'bootstrap.servers': BOOTSTRAP_SERVERS,
'schema_registry.url': SCHEMA_REGISTRY_URL,
'schema.key': RIDE_KEY_SCHEMA_PATH,
'schema.value': RIDE_VALUE_SCHEMA_PATH,
}
avro_consumer = RideAvroConsumer(props=config)
avro_consumer.consume_from_kafka(topics=[KAFKA_TOPIC])