Files
IremErturk cbe18f2f04 Refactor python streaming examples (#337)
* Refactor tutorial examples

* Updates readmes for week6-streaming

* Adds homework for 2023 week6-streaming

* Fix merge conflicts on README updates
2023-03-07 21:40:14 +01:00

63 lines
2.1 KiB
Python

import csv
from itertools import islice
from time import sleep
from typing import Dict, Iterable, Iterator, Optional, Tuple

from kafka import KafkaProducer

from settings import BOOTSTRAP_SERVERS, INPUT_DATA_PATH, PRODUCE_TOPIC_RIDES_CSV
def delivery_report(err, msg):
    """Print the outcome of a single produce attempt.

    The signature is callback-shaped (error first, message second), but
    nothing in the visible part of this file registers it with the producer.

    Args:
        err: ``None`` on success; otherwise the error to report.
        msg: Object exposing ``key()``, ``topic()``, ``partition()`` and
            ``offset()`` accessors for the produced record.
    """
    if err is not None:
        print("Delivery failed for record {}: {}".format(msg.key(), err))
        return
    print('Record {} successfully produced to {} [{}] at offset {}'.format(
        msg.key(), msg.topic(), msg.partition(), msg.offset()))
class RideCSVProducer:
    """Read ride rows from a CSV file and publish them to a Kafka topic."""

    # Zero-based CSV column positions copied into each record, in order.
    # NOTE(review): the original inline comment named only five fields
    # (vendor_id, passenger_count, trip_distance, payment_type, total_amount)
    # although seven columns are selected; the actual column meanings depend
    # on the input file's schema -- TODO confirm.
    _RECORD_COLUMNS = (0, 1, 2, 3, 4, 9, 16)

    def __init__(self, props: Dict):
        """Create the underlying ``KafkaProducer``.

        Args:
            props: Keyword arguments forwarded unchanged to ``KafkaProducer``.
        """
        self.producer = KafkaProducer(**props)

    @staticmethod
    def read_records(resource_path: str,
                     limit: Optional[int] = 5) -> Iterator[Tuple[str, str]]:
        """Load ``(key, value)`` pairs from a CSV file.

        The first line is treated as a header and skipped. Blank lines are
        ignored. For every remaining row the key is column 0 and the value is
        the selected columns joined with ``', '``.

        Args:
            resource_path: Path of the CSV file to read.
            limit: Maximum number of data rows to load. Defaults to 5, the
                cap the original code hard-coded; ``None`` loads every row.

        Returns:
            A ``zip`` iterator of ``(key, value)`` string pairs. It can be
            consumed only once.

        Raises:
            IndexError: If a non-blank row has fewer than 17 columns.
            ValueError: If ``limit`` is negative.
        """
        ride_keys, records = [], []
        # newline='' lets the csv module handle line endings itself,
        # including newlines embedded in quoted fields.
        with open(resource_path, 'r', newline='') as f:
            reader = csv.reader(f)
            next(reader, None)  # skip the header; tolerate an empty file
            data_rows = (row for row in reader if row)  # drop blank lines
            for row in islice(data_rows, limit):
                records.append(
                    ', '.join(row[i] for i in RideCSVProducer._RECORD_COLUMNS))
                ride_keys.append(str(row[0]))
        return zip(ride_keys, records)

    def publish(self, topic: str, records: Iterable[Tuple[str, str]]):
        """Send every ``(key, value)`` pair to ``topic``, then flush.

        A failure on one record is printed and the loop moves on to the next
        record. A ``KeyboardInterrupt`` stops sending early; in both cases the
        producer is still flushed afterwards.

        Args:
            topic: Name of the topic to send to.
            records: Iterable of ``(key, value)`` pairs.
        """
        for key, value in records:
            try:
                self.producer.send(topic=topic, key=key, value=value)
                print(f"Producing record for <key: {key}, value:{value}>")
            except KeyboardInterrupt:
                break
            except Exception as e:
                print(f"Exception while producing record - {value}: {e}")

        self.producer.flush()
        # Kept from the original: pause one second after flushing.
        sleep(1)
if __name__ == "__main__":
    # Keys and values are handled as str and encoded to UTF-8 bytes by the
    # serializers just before sending.
    config = {
        'bootstrap_servers': [BOOTSTRAP_SERVERS],
        'key_serializer': lambda x: x.encode('utf-8'),
        'value_serializer': lambda x: x.encode('utf-8'),
    }
    producer = RideCSVProducer(props=config)
    # read_records returns a one-shot iterator; materialise it so the print
    # below shows the actual records (instead of an iterator repr) and the
    # same data can then be published.
    ride_records = list(producer.read_records(resource_path=INPUT_DATA_PATH))
    print(ride_records)
    producer.publish(topic=PRODUCE_TOPIC_RIDES_CSV, records=ride_records)