Files

23 lines
557 B
Python

from datetime import timedelta
import faust
from taxi_rides import TaxiRide
# Faust application with id 'datatalksclub.stream.v2', using the Kafka
# broker at localhost:9092.
app = faust.App(
    'datatalksclub.stream.v2',
    broker='kafka://localhost:9092',
)

# Input topic; its message values are typed as TaxiRide records.
topic = app.topic(
    'datatalkclub.yellow_taxi_ride.json',
    value_type=TaxiRide,
)

# Per-vendor ride counter: missing keys start at int() == 0, and the table
# is wrapped in one-minute tumbling windows with a one-hour expiry.
vendor_rides = app.Table(
    'vendor_rides_windowed',
    default=int,
).tumbling(
    size=timedelta(minutes=1),
    expires=timedelta(hours=1),
)
@app.agent(topic)
async def process(stream):
    """Count taxi rides per vendor in the windowed ``vendor_rides`` table.

    The incoming stream is grouped by ``TaxiRide.vendorId``; for every
    event received, the counter stored under that event's ``vendorId``
    is incremented by one.

    Args:
        stream: The stream of events handed to this agent for ``topic``.
    """
    rides_by_vendor = stream.group_by(TaxiRide.vendorId)
    async for ride in rides_by_vendor:
        # NOTE(review): keep the in-place ``+=`` form; the windowed table
        # presumably relies on it to update the current window - confirm
        # before rewriting as ``x = x + 1``.
        vendor_rides[ride.vendorId] += 1
# Script entry point: start the Faust app only when this file is executed
# directly, not when it is imported as a module.
if __name__ == '__main__':
    app.main()