23 lines
557 B
Python
23 lines
557 B
Python
from datetime import timedelta
|
|
import faust
|
|
from taxi_rides import TaxiRide
|
|
|
|
|
|
# Faust application: the app id identifies this worker group, and the broker
# URL points at the Kafka cluster it consumes from.
app = faust.App(
    'datatalksclub.stream.v2',
    broker='kafka://localhost:9092',
)

# Source topic; every message value is deserialised into a TaxiRide record.
# NOTE(review): the topic name spells "datatalkclub" while the app id uses
# "datatalksclub" -- presumably it must match the producer, so left as-is.
topic = app.topic(
    'datatalkclub.yellow_taxi_ride.json',
    value_type=TaxiRide,
)

# Windowed per-vendor ride counter. default=int means an unseen key starts
# at int() == 0, so `+= 1` works without initialisation. Windows are
# 1-minute tumbling buckets; `expires` bounds how long old windows are kept.
_window_size = timedelta(minutes=1)
_window_retention = timedelta(hours=1)
vendor_rides = app.Table('vendor_rides_windowed', default=int).tumbling(
    _window_size,
    expires=_window_retention,
)
|
|
|
|
|
|
@app.agent(topic)
async def process(stream):
    """Count taxi rides per vendor into the windowed ``vendor_rides`` table.

    The incoming stream is regrouped by ``TaxiRide.vendorId`` via Faust's
    ``group_by``, and each ride increments its vendor's counter in the
    tumbling-window table.
    """
    rides_by_vendor = stream.group_by(TaxiRide.vendorId)
    async for ride in rides_by_vendor:
        vendor = ride.vendorId
        vendor_rides[vendor] += 1
|
|
|
|
|
|
if __name__ == '__main__':
    # Delegate to Faust's command-line entry point.
    # NOTE(review): presumably launched as `python <this file> worker` -- confirm.
    app.main()
|