* Refactor tutorial examples * Updates readmes for week6-streaming * Adds homework for 2023 week6-streaming * Fix merge conflicts on README updates
70 lines
3.0 KiB
Python
70 lines
3.0 KiB
Python
import os
|
|
from typing import Dict, List
|
|
|
|
from confluent_kafka import Consumer
|
|
from confluent_kafka.schema_registry import SchemaRegistryClient
|
|
from confluent_kafka.schema_registry.avro import AvroDeserializer
|
|
from confluent_kafka.serialization import SerializationContext, MessageField
|
|
|
|
from ride_record_key import dict_to_ride_record_key
|
|
from ride_record import dict_to_ride_record
|
|
from settings import BOOTSTRAP_SERVERS, SCHEMA_REGISTRY_URL, \
|
|
RIDE_KEY_SCHEMA_PATH, RIDE_VALUE_SCHEMA_PATH, KAFKA_TOPIC
|
|
|
|
|
|
class RideAvroConsumer:
    """Kafka consumer that decodes Avro-encoded ride keys and values.

    The Avro schemas are read from local schema files and handed to
    ``AvroDeserializer`` instances together with a Schema Registry client.
    Decoded messages are printed to stdout by ``consume_from_kafka``.
    """

    def __init__(self, props: Dict):
        """Build the Avro deserializers and the underlying Kafka consumer.

        Args:
            props: Configuration mapping. Required keys:
                ``'schema.key'`` and ``'schema.value'`` (schema file paths
                passed to ``load_schema``), ``'schema_registry.url'`` and
                ``'bootstrap.servers'``. Optional keys: ``'group.id'``
                (default ``'datatalkclubs.taxirides.avro.consumer.2'``) and
                ``'auto.offset.reset'`` (default ``'earliest'``).

        Raises:
            KeyError: If a required key is missing from ``props``.
        """
        # Schema Registry and Serializer-Deserializer Configurations
        key_schema_str = self.load_schema(props['schema.key'])
        value_schema_str = self.load_schema(props['schema.value'])
        schema_registry_props = {'url': props['schema_registry.url']}
        schema_registry_client = SchemaRegistryClient(schema_registry_props)
        self.avro_key_deserializer = AvroDeserializer(
            schema_registry_client=schema_registry_client,
            schema_str=key_schema_str,
            from_dict=dict_to_ride_record_key,
        )
        self.avro_value_deserializer = AvroDeserializer(
            schema_registry_client=schema_registry_client,
            schema_str=value_schema_str,
            from_dict=dict_to_ride_record,
        )

        # The group id and offset policy keep their historical values unless
        # the caller overrides them through ``props``.
        consumer_props = {
            'bootstrap.servers': props['bootstrap.servers'],
            'group.id': props.get('group.id',
                                  'datatalkclubs.taxirides.avro.consumer.2'),
            'auto.offset.reset': props.get('auto.offset.reset', "earliest"),
        }
        self.consumer = Consumer(consumer_props)

    @staticmethod
    def load_schema(schema_path: str) -> str:
        """Return the text of a schema file.

        Args:
            schema_path: Path of the schema file. A relative path is resolved
                against the directory containing this module; an absolute
                path is used as-is (``os.path.join`` semantics).

        Returns:
            The full contents of the file, decoded as UTF-8.
        """
        base_dir = os.path.realpath(os.path.dirname(__file__))
        with open(os.path.join(base_dir, schema_path), encoding="utf-8") as f:
            return f.read()

    def consume_from_kafka(self, topics: List[str]):
        """Subscribe to ``topics`` and print every decoded message.

        Polls until interrupted with Ctrl-C (``KeyboardInterrupt``). Messages
        that carry a consumer error are reported and skipped instead of being
        passed to the deserializers. The consumer is always closed on exit,
        including when an unexpected exception propagates to the caller.

        Args:
            topics: Names of the Kafka topics to subscribe to.
        """
        self.consumer.subscribe(topics=topics)
        try:
            while True:
                # SIGINT can't be handled when polling, limit timeout to 1 second.
                msg = self.consumer.poll(1.0)
                if msg is None:
                    continue

                # poll() can hand back an error event rather than a record;
                # its payload is not Avro, so it must not be deserialized.
                error = msg.error()
                if error is not None:
                    print("Consumer error: {}".format(error))
                    continue

                key = self.avro_key_deserializer(
                    msg.key(), SerializationContext(msg.topic(), MessageField.KEY))
                record = self.avro_value_deserializer(
                    msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
                if record is not None:
                    print("{}, {}".format(key, record))
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop the loop.
            pass
        finally:
            # Release the consumer even if deserialization raised.
            self.consumer.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # Collect the connection and schema settings under the keys that
    # RideAvroConsumer looks up in its ``props`` argument.
    consumer_settings = {}
    consumer_settings['bootstrap.servers'] = BOOTSTRAP_SERVERS
    consumer_settings['schema_registry.url'] = SCHEMA_REGISTRY_URL
    consumer_settings['schema.key'] = RIDE_KEY_SCHEMA_PATH
    consumer_settings['schema.value'] = RIDE_VALUE_SCHEMA_PATH

    # Build the consumer, then consume from the configured topic.
    ride_consumer = RideAvroConsumer(props=consumer_settings)
    subscribed_topics = [KAFKA_TOPIC]
    ride_consumer.consume_from_kafka(topics=subscribed_topics)
|