import os
import csv
from time import sleep
from typing import Dict, Iterable, Tuple

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

from ride_record_key import RideRecordKey, ride_record_key_to_dict
from ride_record import RideRecord, ride_record_to_dict
from settings import RIDE_KEY_SCHEMA_PATH, RIDE_VALUE_SCHEMA_PATH, \
    SCHEMA_REGISTRY_URL, BOOTSTRAP_SERVERS, INPUT_DATA_PATH, KAFKA_TOPIC


def delivery_report(err, msg):
    # Per-message callback invoked asynchronously once delivery succeeds or fails.
    if err is not None:
        print("Delivery failed for record {}: {}".format(msg.key(), err))
        return
    print('Record {} successfully produced to {} [{}] at offset {}'.format(
        msg.key(), msg.topic(), msg.partition(), msg.offset()))


class RideAvroProducer:
    def __init__(self, props: Dict):
        # Schema Registry and Avro serializer configuration
        key_schema_str = self.load_schema(props['schema.key'])
        value_schema_str = self.load_schema(props['schema.value'])
        schema_registry_props = {'url': props['schema_registry.url']}
        schema_registry_client = SchemaRegistryClient(schema_registry_props)
        self.key_serializer = AvroSerializer(schema_registry_client, key_schema_str, ride_record_key_to_dict)
        self.value_serializer = AvroSerializer(schema_registry_client, value_schema_str, ride_record_to_dict)

        # Producer configuration
        producer_props = {'bootstrap.servers': props['bootstrap.servers']}
        self.producer = Producer(producer_props)

    @staticmethod
    def load_schema(schema_path: str):
        # Resolve the schema file relative to this script's directory
        path = os.path.realpath(os.path.dirname(__file__))
        with open(f"{path}/{schema_path}") as f:
            schema_str = f.read()
        return schema_str

    @staticmethod
    def read_records(resource_path: str):
        ride_records, ride_keys = [], []
        with open(resource_path, 'r') as f:
            reader = csv.reader(f)
            next(reader)  # skip the header row
            for row in reader:
                # Keep only the columns RideRecord expects; messages are keyed by vendor_id
                ride_records.append(RideRecord(arr=[row[0], row[3], row[4], row[9], row[16]]))
                ride_keys.append(RideRecordKey(vendor_id=int(row[0])))
        return zip(ride_keys, ride_records)

    def publish(self, topic: str, records: Iterable[Tuple[RideRecordKey, RideRecord]]):
        for key, value in records:
            try:
                self.producer.produce(topic=topic,
                                      key=self.key_serializer(key, SerializationContext(topic=topic,
                                                                                        field=MessageField.KEY)),
                                      value=self.value_serializer(value, SerializationContext(topic=topic,
                                                                                              field=MessageField.VALUE)),
                                      on_delivery=delivery_report)
            except KeyboardInterrupt:
                break
            except Exception as e:
                print(f"Exception while producing record - {value}: {e}")
            # Flush and pause after each message so records arrive like a live stream
            self.producer.flush()
            sleep(1)


if __name__ == "__main__":
    config = {
        'bootstrap.servers': BOOTSTRAP_SERVERS,
        'schema_registry.url': SCHEMA_REGISTRY_URL,
        'schema.key': RIDE_KEY_SCHEMA_PATH,
        'schema.value': RIDE_VALUE_SCHEMA_PATH
    }
    producer = RideAvroProducer(props=config)
    ride_records = producer.read_records(resource_path=INPUT_DATA_PATH)
    producer.publish(topic=KAFKA_TOPIC, records=ride_records)
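

The script imports RideRecord and RideRecordKey from sibling modules that are not shown here. Below is a minimal sketch of what those modules might contain: the two-argument signature of the to_dict converters matches what AvroSerializer passes to its to_dict callable (the object plus a SerializationContext), but the attribute names are an assumption inferred from the five CSV columns read above.

# ride_record_key.py -- minimal sketch; the actual module may differ
from typing import Dict, List


class RideRecordKey:
    def __init__(self, vendor_id: int):
        self.vendor_id = vendor_id


def ride_record_key_to_dict(ride_record_key: RideRecordKey, ctx) -> Dict:
    # AvroSerializer calls this with (object, SerializationContext)
    return {'vendor_id': ride_record_key.vendor_id}


# ride_record.py -- minimal sketch; attribute names are assumptions
class RideRecord:
    def __init__(self, arr: List[str]):
        self.vendor_id = int(arr[0])
        self.passenger_count = int(arr[1])
        self.trip_distance = float(arr[2])
        self.payment_type = int(arr[3])
        self.total_amount = float(arr[4])


def ride_record_to_dict(ride_record: RideRecord, ctx) -> Dict:
    return ride_record.__dict__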
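
The settings module supplies the data path, schema paths, and connection constants. The values below are assumptions for a local docker-compose setup (Kafka on localhost:9092, Schema Registry on localhost:8081); the real file may point elsewhere. Note that load_schema resolves the schema paths relative to the script's own directory.

# settings.py -- minimal sketch; all values are assumptions for a local setup
INPUT_DATA_PATH = 'resources/rides.csv'
RIDE_KEY_SCHEMA_PATH = 'resources/schemas/taxi_ride_key.avsc'
RIDE_VALUE_SCHEMA_PATH = 'resources/schemas/taxi_ride_value.avsc'

BOOTSTRAP_SERVERS = 'localhost:9092'
SCHEMA_REGISTRY_URL = 'http://localhost:8081'
KAFKA_TOPIC = 'rides_avro'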
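
Finally, load_schema reads plain Avro schema definitions (.avsc files, which are JSON). A sketch of a matching key/value schema pair follows, with record and field names assumed to mirror the Python classes above; AvroSerializer registers these schemas with the Schema Registry on first use, and the to_dict output keys must match the field names exactly.

{
  "type": "record",
  "name": "RideRecordKey",
  "fields": [
    {"name": "vendor_id", "type": "int"}
  ]
}

{
  "type": "record",
  "name": "RideRecord",
  "fields": [
    {"name": "vendor_id", "type": "int"},
    {"name": "passenger_count", "type": "int"},
    {"name": "trip_distance", "type": "double"},
    {"name": "payment_type", "type": "int"},
    {"name": "total_amount", "type": "double"}
  ]
}

With a broker and registry running and these supporting files in place, running python producer.py should stream roughly one ride per second onto KAFKA_TOPIC, printing a delivery report for each record.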