import csv
import json
from typing import List, Dict

from kafka import KafkaProducer
from kafka.errors import KafkaTimeoutError

from ride import Ride
from settings import BOOTSTRAP_SERVERS, INPUT_DATA_PATH, KAFKA_TOPIC
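
# `ride.py` and `settings.py` are local modules that accompany this example
# and are not shown on this page. For orientation only, `Ride` is assumed to
# be a plain model built from one CSV row, roughly:
#
#     class Ride:
#         def __init__(self, arr: List[str]):
#             self.pu_location_id = arr[...]  # pickup location, used as the message key
#             ...
#
# and `settings.py` is assumed to define BOOTSTRAP_SERVERS (e.g.
# ['localhost:9092']), INPUT_DATA_PATH, and KAFKA_TOPIC as plain constants.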


class JsonProducer:
    """Reads ride records from CSV and publishes them to Kafka as JSON."""

    def __init__(self, props: Dict):
        # Compose a configured KafkaProducer rather than subclassing it,
        # since all work is delegated to this instance.
        self.producer = KafkaProducer(**props)

    @staticmethod
    def read_records(resource_path: str) -> List[Ride]:
        records = []
        with open(resource_path, 'r') as f:
            reader = csv.reader(f)
            next(reader)  # skip the header row
            for row in reader:
                records.append(Ride(arr=row))
        return records

    def publish_rides(self, topic: str, messages: List[Ride]) -> None:
        for ride in messages:
            try:
                # send() is asynchronous; get() blocks until the broker acks
                # and returns the record metadata (partition, offset, ...).
                future = self.producer.send(topic=topic, key=ride.pu_location_id, value=ride)
                print('Record {} successfully produced at offset {}'.format(
                    ride.pu_location_id, future.get().offset))
            except KafkaTimeoutError as e:
                print(str(e))


if __name__ == '__main__':
    # Keys here must match KafkaProducer's constructor arguments.
    config = {
        'bootstrap_servers': BOOTSTRAP_SERVERS,
        # Keys go out as plain strings; values as the Ride's attribute dict
        # encoded to JSON (default=str handles datetime fields).
        'key_serializer': lambda key: str(key).encode(),
        'value_serializer': lambda x: json.dumps(x.__dict__, default=str).encode('utf-8'),
    }

    producer = JsonProducer(props=config)
    rides = producer.read_records(resource_path=INPUT_DATA_PATH)
    producer.publish_rides(topic=KAFKA_TOPIC, messages=rides)
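
# To spot-check what was published, one can consume the topic with the console
# consumer that ships with Kafka (script name and path vary by install; the
# localhost:9092 address below is an assumption):
#
#     kafka-console-consumer.sh --bootstrap-server localhost:9092 \
#         --topic <KAFKA_TOPIC> --from-beginning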