{
"cells": [
{
"cell_type": "markdown",
"id": "c4419168-c0e6-4a65-b56e-8454c42060ac",
"metadata": {
"jp-MarkdownHeadingCollapsed": true,
"tags": []
},
"source": [
"### 0. Spark Setup"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "32bd7cdd-8504-4a54-a461-244bf7878d2a",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,org.apache.spark:spark-avro_2.12:3.3.1 pyspark-shell'"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "3aab2a7e-a685-4925-9c9a-b5adf201af77",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
":: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"Ivy Default Cache set to: /root/.ivy2/cache\n",
"The jars for the packages stored in: /root/.ivy2/jars\n",
"org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency\n",
"org.apache.spark#spark-avro_2.12 added as a dependency\n",
":: resolving dependencies :: org.apache.spark#spark-submit-parent-5a3a4db6-be91-4d32-9884-8b0f38241b3f;1.0\n",
"\tconfs: [default]\n",
"\tfound org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.1 in central\n",
"\tfound org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.1 in central\n",
"\tfound org.apache.kafka#kafka-clients;2.8.1 in central\n",
"\tfound org.lz4#lz4-java;1.8.0 in central\n",
"\tfound org.xerial.snappy#snappy-java;1.1.8.4 in central\n",
"\tfound org.slf4j#slf4j-api;1.7.32 in central\n",
"\tfound org.apache.hadoop#hadoop-client-runtime;3.3.2 in central\n",
"\tfound org.spark-project.spark#unused;1.0.0 in central\n",
"\tfound org.apache.hadoop#hadoop-client-api;3.3.2 in central\n",
"\tfound commons-logging#commons-logging;1.1.3 in central\n",
"\tfound com.google.code.findbugs#jsr305;3.0.0 in central\n",
"\tfound org.apache.commons#commons-pool2;2.11.1 in central\n",
"\tfound org.apache.spark#spark-avro_2.12;3.3.1 in central\n",
"\tfound org.tukaani#xz;1.8 in central\n",
":: resolution report :: resolve 544ms :: artifacts dl 11ms\n",
"\t:: modules in use:\n",
"\tcom.google.code.findbugs#jsr305;3.0.0 from central in [default]\n",
"\tcommons-logging#commons-logging;1.1.3 from central in [default]\n",
"\torg.apache.commons#commons-pool2;2.11.1 from central in [default]\n",
"\torg.apache.hadoop#hadoop-client-api;3.3.2 from central in [default]\n",
"\torg.apache.hadoop#hadoop-client-runtime;3.3.2 from central in [default]\n",
"\torg.apache.kafka#kafka-clients;2.8.1 from central in [default]\n",
"\torg.apache.spark#spark-avro_2.12;3.3.1 from central in [default]\n",
"\torg.apache.spark#spark-sql-kafka-0-10_2.12;3.3.1 from central in [default]\n",
"\torg.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.1 from central in [default]\n",
"\torg.lz4#lz4-java;1.8.0 from central in [default]\n",
"\torg.slf4j#slf4j-api;1.7.32 from central in [default]\n",
"\torg.spark-project.spark#unused;1.0.0 from central in [default]\n",
"\torg.tukaani#xz;1.8 from central in [default]\n",
"\torg.xerial.snappy#snappy-java;1.1.8.4 from central in [default]\n",
"\t---------------------------------------------------------------------\n",
"\t| | modules || artifacts |\n",
"\t| conf | number| search|dwnlded|evicted|| number|dwnlded|\n",
"\t---------------------------------------------------------------------\n",
"\t| default | 14 | 0 | 0 | 0 || 14 | 0 |\n",
"\t---------------------------------------------------------------------\n",
":: retrieving :: org.apache.spark#spark-submit-parent-5a3a4db6-be91-4d32-9884-8b0f38241b3f\n",
"\tconfs: [default]\n",
"\t0 artifacts copied, 14 already retrieved (0kB/8ms)\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"23/02/21 21:20:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"Setting default log level to \"WARN\".\n",
"To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n"
]
}
],
"source": [
"from pyspark.sql import SparkSession\n",
"import pyspark.sql.types as T\n",
"import pyspark.sql.functions as F\n",
"\n",
"spark = SparkSession \\\n",
" .builder \\\n",
" .appName(\"Spark-Notebook\") \\\n",
" .getOrCreate()"
]
},
{
"cell_type": "markdown",
"id": "6f4b62fa-b3ce-4a1b-a1f4-2ed332a0d55a",
"metadata": {
"jp-MarkdownHeadingCollapsed": true,
"tags": []
},
"source": [
"### 1. Reading from Kafka Stream\n",
"\n",
"through `readStream`"
]
},
{
"cell_type": "markdown",
"id": "f491fa45-4471-4bc5-92f7-48081f687140",
"metadata": {},
"source": [
"#### 1.1 Raw Kafka Stream"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "82c25cb2-2599-4f9b-8849-967fbb604a44",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# default for startingOffsets is \"latest\"\n",
"df_kafka_raw = spark \\\n",
" .readStream \\\n",
" .format(\"kafka\") \\\n",
" .option(\"kafka.bootstrap.servers\", \"localhost:9092,broker:29092\") \\\n",
" .option(\"subscribe\", \"rides_csv\") \\\n",
" .option(\"startingOffsets\", \"earliest\") \\\n",
" .option(\"checkpointLocation\", \"checkpoint\") \\\n",
" .load()"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "d9149ccd-69b2-4f5b-afc0-43567673c634",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- key: binary (nullable = true)\n",
" |-- value: binary (nullable = true)\n",
" |-- topic: string (nullable = true)\n",
" |-- partition: integer (nullable = true)\n",
" |-- offset: long (nullable = true)\n",
" |-- timestamp: timestamp (nullable = true)\n",
" |-- timestampType: integer (nullable = true)\n",
"\n"
]
}
],
"source": [
"df_kafka_raw.printSchema()"
]
},
{
"cell_type": "markdown",
"id": "62e5e753-89c7-460f-a8be-16868ce5c680",
"metadata": {
"jp-MarkdownHeadingCollapsed": true,
"tags": []
},
"source": [
"#### 1.2 Encoded Kafka Stream"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "0b745eed-7d74-421e-8e4b-c8343fda4de3",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"df_kafka_encoded = df_kafka_raw.selectExpr(\"CAST(key AS STRING)\",\"CAST(value AS STRING)\")"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "6839addc-c7c0-4117-8c9c-d2cd59cbf136",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- key: string (nullable = true)\n",
" |-- value: string (nullable = true)\n",
"\n"
]
}
],
"source": [
"df_kafka_encoded.printSchema()"
]
},
{
"cell_type": "markdown",
"id": "6749c4de-6f80-4b91-b2b8-b2968c761d75",
"metadata": {},
"source": [
"#### 1.3 Structure Streaming DataFrame"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "ca20ae37-49f0-421f-9859-73fac8d4ca45",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"def parse_ride_from_kafka_message(df_raw, schema):\n",
" \"\"\" take a Spark Streaming df and parse value col based on <schema>, return streaming df cols in schema \"\"\"\n",
" assert df_raw.isStreaming is True, \"DataFrame doesn't receive streaming data\"\n",
"\n",
" df = df_raw.selectExpr(\"CAST(key AS STRING)\", \"CAST(value AS STRING)\")\n",
"\n",
" # split attributes to nested array in one Column\n",
" col = F.split(df['value'], ', ')\n",
"\n",
" # expand col to multiple top-level columns\n",
" for idx, field in enumerate(schema):\n",
" df = df.withColumn(field.name, col.getItem(idx).cast(field.dataType))\n",
" return df.select([field.name for field in schema])"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "e1737bd0-146f-4ee2-a70f-a4657af5bbc6",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"ride_schema = T.StructType(\n",
" [T.StructField(\"vendor_id\", T.IntegerType()),\n",
" T.StructField('tpep_pickup_datetime', T.TimestampType()),\n",
" T.StructField('tpep_dropoff_datetime', T.TimestampType()),\n",
" T.StructField(\"passenger_count\", T.IntegerType()),\n",
" T.StructField(\"trip_distance\", T.FloatType()),\n",
" T.StructField(\"payment_type\", T.IntegerType()),\n",
" T.StructField(\"total_amount\", T.FloatType()),\n",
" ])"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "ae2ce896-f54b-4166-b01f-b5532ab292fe",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"df_rides = parse_ride_from_kafka_message(df_raw=df_kafka_raw, schema=ride_schema)"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "cd848228-97c5-4325-8457-97f35e533cd8",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- vendor_id: integer (nullable = true)\n",
" |-- tpep_pickup_datetime: timestamp (nullable = true)\n",
" |-- tpep_dropoff_datetime: timestamp (nullable = true)\n",
" |-- passenger_count: integer (nullable = true)\n",
" |-- trip_distance: float (nullable = true)\n",
" |-- payment_type: integer (nullable = true)\n",
" |-- total_amount: float (nullable = true)\n",
"\n"
]
}
],
"source": [
"df_rides.printSchema()"
]
},
{
"cell_type": "markdown",
"id": "60277fdc-2797-4b23-9ecf-956b76db5778",
"metadata": {
"tags": []
},
"source": [
"### 2 Sink Operation & Streaming Query\n",
"\n",
"through `writeStream`\n",
"\n",
"---\n",
"**Output Sinks**\n",
"- File Sink: stores the output to the directory\n",
"- Kafka Sink: stores the output to one or more topics in Kafka\n",
"- Foreach Sink:\n",
"- (for debugging) Console Sink, Memory Sink\n",
"\n",
"Further details can be found in [Output Sinks](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks)\n",
"\n",
"---\n",
"There are three types of **Output Modes**:\n",
"- Complete: The whole Result Table will be outputted to the sink after every trigger. This is supported for aggregation queries.\n",
"- Append (default): Only new rows are added to the Result Table\n",
"- Update: Only updated rows are outputted\n",
"\n",
"[Output Modes](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes) differs based on the set of transformations applied to the streaming data. \n",
"\n",
"--- \n",
"**Triggers**\n",
"\n",
"The [trigger settings](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers) of a streaming query define the timing of streaming data processing. Spark streaming support micro-batch streamings schema and you can select following options based on requirements.\n",
"\n",
"- default-micro-batch-mode\n",
"- fixed-interval-micro-batch-mode\n",
"- one-time-micro-batch-mode\n",
"- available-now-micro-batch-mode\n"
]
},
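{
"cell_type": "markdown",
"id": "7d3f2c10-5b7e-4c2a-9f1e-3a8b6c4d5e01",
"metadata": {},
"source": [
"A minimal sketch of how the trigger options above could be set (not executed here): it reuses `df_rides` from section 1.3 with a console sink, and the 10-second interval and query name are arbitrary, illustrative choices."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9e4a1b22-6c8f-4d3a-8b2e-5f0c7d6e8a02",
"metadata": {},
"outputs": [],
"source": [
"# fixed-interval micro-batch mode: a new micro-batch is started every 10 seconds\n",
"trigger_demo_query = df_rides.writeStream \\\n",
"    .outputMode('append') \\\n",
"    .format('console') \\\n",
"    .trigger(processingTime='10 seconds') \\\n",
"    .start()\n",
"\n",
"# the alternatives (only one trigger can be set per query):\n",
"#   default micro-batch mode       -> omit .trigger(); the next batch starts as soon as the previous one finishes\n",
"#   one-time micro-batch mode      -> .trigger(once=True)\n",
"#   available-now micro-batch mode -> .trigger(availableNow=True)  # Spark 3.3+\n",
"\n",
"trigger_demo_query.stop()"
]
},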
{
"cell_type": "markdown",
"id": "02ca9b08-aa61-46cd-b946-4457ce2cdf5d",
"metadata": {
"tags": []
},
"source": [
"#### Console and Memory Sink"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "74c72469-4c37-417c-a866-a1c1ef75ae8b",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"def sink_console(df, output_mode: str = 'complete', processing_time: str = '5 seconds'):\n",
" write_query = df.writeStream \\\n",
" .outputMode(output_mode) \\\n",
" .trigger(processingTime=processing_time) \\\n",
" .format(\"console\") \\\n",
" .option(\"truncate\", False) \\\n",
" .start()\n",
" return write_query # pyspark.sql.streaming.StreamingQuery"
]
},
{
"cell_type": "code",
"execution_count": 22,
"id": "d866c7ba-f8e9-475d-830a-50ffb2c5472b",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"23/02/21 21:46:12 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-289a958e-f6b6-4b38-a87b-50002d82ec8b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.\n",
"23/02/21 21:46:12 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.\n",
"23/02/21 21:46:12 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-driver-0-3, groupId=spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-driver-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.\n",
"23/02/21 21:46:12 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-driver-0-3, groupId=spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-driver-0] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected\n",
"23/02/21 21:46:13 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-executor-4, groupId=spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-executor] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.\n",
"23/02/21 21:46:13 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-executor-4, groupId=spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-executor] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected\n",
"-------------------------------------------\n",
"Batch: 0\n",
"-------------------------------------------\n",
"+---------+--------------------+---------------------+---------------+-------------+------------+------------+\n",
"|vendor_id|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|payment_type|total_amount|\n",
"+---------+--------------------+---------------------+---------------+-------------+------------+------------+\n",
"|1 |2020-07-01 00:25:32 |2020-07-01 00:33:39 |1 |1.5 |2 |9.3 |\n",
"|1 |2020-07-01 00:03:19 |2020-07-01 00:25:43 |1 |9.5 |1 |27.8 |\n",
"|2 |2020-07-01 00:15:11 |2020-07-01 00:29:24 |1 |5.85 |2 |22.3 |\n",
"|2 |2020-07-01 00:30:49 |2020-07-01 00:38:26 |1 |1.9 |1 |14.16 |\n",
"|2 |2020-07-01 00:31:26 |2020-07-01 00:38:02 |1 |1.25 |2 |7.8 |\n",
"|1 |2020-07-01 00:25:32 |2020-07-01 00:33:39 |1 |1.5 |2 |9.3 |\n",
"|1 |2020-07-01 00:03:19 |2020-07-01 00:25:43 |1 |9.5 |1 |27.8 |\n",
"|2 |2020-07-01 00:15:11 |2020-07-01 00:29:24 |1 |5.85 |2 |22.3 |\n",
"|2 |2020-07-01 00:30:49 |2020-07-01 00:38:26 |1 |1.9 |1 |14.16 |\n",
"|2 |2020-07-01 00:31:26 |2020-07-01 00:38:02 |1 |1.25 |2 |7.8 |\n",
"+---------+--------------------+---------------------+---------------+-------------+------------+------------+\n",
"\n",
"23/02/21 22:11:05 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-executor-5, groupId=spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-executor] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.\n",
"23/02/21 22:11:05 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-executor-5, groupId=spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-executor] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"-------------------------------------------\n",
"Batch: 1\n",
"-------------------------------------------\n",
"+---------+--------------------+---------------------+---------------+-------------+------------+------------+\n",
"|vendor_id|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|payment_type|total_amount|\n",
"+---------+--------------------+---------------------+---------------+-------------+------------+------------+\n",
"|1 |2020-07-01 00:25:32 |2020-07-01 00:33:39 |1 |1.5 |2 |9.3 |\n",
"|1 |2020-07-01 00:03:19 |2020-07-01 00:25:43 |1 |9.5 |1 |27.8 |\n",
"|2 |2020-07-01 00:15:11 |2020-07-01 00:29:24 |1 |5.85 |2 |22.3 |\n",
"|2 |2020-07-01 00:30:49 |2020-07-01 00:38:26 |1 |1.9 |1 |14.16 |\n",
"|2 |2020-07-01 00:31:26 |2020-07-01 00:38:02 |1 |1.25 |2 |7.8 |\n",
"+---------+--------------------+---------------------+---------------+-------------+------------+------------+\n",
"\n"
]
}
],
"source": [
"write_query = sink_console(df_rides, output_mode='append')"
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "a9bfa73f-a8cc-4988-a8cf-bf31ee6c449c",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"def sink_memory(df, query_name, query_template):\n",
" write_query = df \\\n",
" .writeStream \\\n",
" .queryName(query_name) \\\n",
" .format('memory') \\\n",
" .start()\n",
" query_str = query_template.format(table_name=query_name)\n",
" query_results = spark.sql(query_str)\n",
" return write_query, query_results"
]
},
{
"cell_type": "code",
"execution_count": 16,
"id": "b31d0b76-e917-44e7-a14d-f9ce6901c23a",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"23/02/21 21:31:47 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-b3e2c096-aa06-4083-9cdf-d6f3cf04fc06. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.\n",
"23/02/21 21:31:47 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.\n",
"23/02/21 21:31:48 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-f07faf6a-cb53-4ec8-bf58-1685d976f432--722858875-driver-0-1, groupId=spark-kafka-source-f07faf6a-cb53-4ec8-bf58-1685d976f432--722858875-driver-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.\n",
"23/02/21 21:31:48 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-f07faf6a-cb53-4ec8-bf58-1685d976f432--722858875-driver-0-1, groupId=spark-kafka-source-f07faf6a-cb53-4ec8-bf58-1685d976f432--722858875-driver-0] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected\n",
"23/02/21 21:31:49 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-f07faf6a-cb53-4ec8-bf58-1685d976f432--722858875-executor-2, groupId=spark-kafka-source-f07faf6a-cb53-4ec8-bf58-1685d976f432--722858875-executor] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.\n",
"23/02/21 21:31:49 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-f07faf6a-cb53-4ec8-bf58-1685d976f432--722858875-executor-2, groupId=spark-kafka-source-f07faf6a-cb53-4ec8-bf58-1685d976f432--722858875-executor] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
}
],
"source": [
"query_name = 'vendor_id_counts'\n",
"query_template = 'select count(distinct(vendor_id)) from {table_name}'\n",
"write_query, df_vendor_id_counts = sink_memory(df=df_rides, query_name=query_name, query_template=query_template)"
]
},
{
"cell_type": "code",
"execution_count": 18,
"id": "4ba56111-83bf-4028-ac65-565e0190f310",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"<class 'pyspark.sql.streaming.StreamingQuery'>\n"
]
},
{
"data": {
"text/plain": [
"{'message': 'Waiting for data to arrive',\n",
" 'isDataAvailable': False,\n",
" 'isTriggerActive': True}"
]
},
"execution_count": 18,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"print(type(write_query)) # pyspark.sql.streaming.StreamingQuery\n",
"write_query.status"
]
},
{
"cell_type": "code",
"execution_count": 19,
"id": "7cc37bda-9cfa-402b-9d42-a6ba5271476b",
"metadata": {
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-------------------------+\n",
"|count(DISTINCT vendor_id)|\n",
"+-------------------------+\n",
"| 2|\n",
"+-------------------------+\n",
"\n"
]
}
],
"source": [
"df_vendor_id_counts.show()"
]
},
{
"cell_type": "code",
"execution_count": 20,
"id": "88862ca9-4d89-487e-987f-08a2b9e83efe",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"write_query.stop()"
]
},
{
"cell_type": "markdown",
"id": "443d4041-06db-4a4a-89c1-348848cc7ca8",
"metadata": {
"tags": []
},
"source": [
"#### Kafka Sink\n",
"\n",
"To write stream results to `kafka-topic`, the stream dataframe has at least a column with name `value`.\n",
"\n",
"Therefore before starting `writeStream` in kafka format, dataframe needs to be updated accordingly.\n",
"\n",
"More information regarding kafka sink expected data structure [here](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka)\n"
]
},
{
"cell_type": "code",
"execution_count": 21,
"id": "8b08a013-d039-41cf-94fd-a1a57571d25f",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"def prepare_dataframe_to_kafka_sink(df, value_columns, key_column=None):\n",
" columns = df.columns\n",
" df = df.withColumn(\"value\", F.concat_ws(', ',*value_columns)) \n",
" if key_column:\n",
" df = df.withColumnRenamed(key_column,\"key\")\n",
" df = df.withColumn(\"key\",df.key.cast('string'))\n",
" return df.select(['key', 'value'])\n",
" \n",
"def sink_kafka(df, topic, output_mode='append'):\n",
" write_query = df.writeStream \\\n",
" .format(\"kafka\") \\\n",
" .option(\"kafka.bootstrap.servers\", \"localhost:9092,broker:29092\") \\\n",
" .outputMode(output_mode) \\\n",
" .option(\"topic\", topic) \\\n",
" .option(\"checkpointLocation\", \"checkpoint\") \\\n",
" .start()\n",
" return write_query"
]
},
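{
"cell_type": "markdown",
"id": "1c9d8e30-2f4b-4a6d-9c7e-8b5a3d2f1e03",
"metadata": {},
"source": [
"A hypothetical usage sketch for the two helpers above (not executed here): the chosen value columns, the `vendor_id` key and the `rides_output` topic name are illustrative assumptions."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4f7b6a50-8d1c-4e9f-b3a2-6c0d9e8f7a04",
"metadata": {},
"outputs": [],
"source": [
"# reshape df_rides into the key/value layout the Kafka sink expects\n",
"df_rides_kafka = prepare_dataframe_to_kafka_sink(\n",
"    df=df_rides,\n",
"    value_columns=['passenger_count', 'trip_distance', 'payment_type', 'total_amount'],\n",
"    key_column='vendor_id')\n",
"\n",
"# write the stream to the (assumed) 'rides_output' topic\n",
"kafka_write_query = sink_kafka(df=df_rides_kafka, topic='rides_output')\n",
"\n",
"# stop the query when done\n",
"# kafka_write_query.stop()"
]
},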
{
"cell_type": "markdown",
"id": "e4cb2140-9f2e-4914-b74c-be4c18cdbe8a",
"metadata": {},
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}