{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "c4419168-c0e6-4a65-b56e-8454c42060ac",
   "metadata": {
    "jp-MarkdownHeadingCollapsed": true,
    "tags": []
   },
   "source": [
    "### 0. Spark Setup"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "32bd7cdd-8504-4a54-a461-244bf7878d2a",
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,org.apache.spark:spark-avro_2.12:3.3.1 pyspark-shell'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "3aab2a7e-a685-4925-9c9a-b5adf201af77",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      ":: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Ivy Default Cache set to: /root/.ivy2/cache\n",
      "The jars for the packages stored in: /root/.ivy2/jars\n",
      "org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency\n",
      "org.apache.spark#spark-avro_2.12 added as a dependency\n",
      ":: resolving dependencies :: org.apache.spark#spark-submit-parent-5a3a4db6-be91-4d32-9884-8b0f38241b3f;1.0\n",
      "\tconfs: [default]\n",
      "\tfound org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.1 in central\n",
      "\tfound org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.1 in central\n",
      "\tfound org.apache.kafka#kafka-clients;2.8.1 in central\n",
      "\tfound org.lz4#lz4-java;1.8.0 in central\n",
      "\tfound org.xerial.snappy#snappy-java;1.1.8.4 in central\n",
      "\tfound org.slf4j#slf4j-api;1.7.32 in central\n",
      "\tfound org.apache.hadoop#hadoop-client-runtime;3.3.2 in central\n",
      "\tfound org.spark-project.spark#unused;1.0.0 in central\n",
      "\tfound org.apache.hadoop#hadoop-client-api;3.3.2 in central\n",
      "\tfound commons-logging#commons-logging;1.1.3 in central\n",
      "\tfound com.google.code.findbugs#jsr305;3.0.0 in central\n",
      "\tfound org.apache.commons#commons-pool2;2.11.1 in central\n",
      "\tfound org.apache.spark#spark-avro_2.12;3.3.1 in central\n",
      "\tfound org.tukaani#xz;1.8 in central\n",
      ":: resolution report :: resolve 544ms :: artifacts dl 11ms\n",
      "\t:: modules in use:\n",
      "\tcom.google.code.findbugs#jsr305;3.0.0 from central in [default]\n",
      "\tcommons-logging#commons-logging;1.1.3 from central in [default]\n",
      "\torg.apache.commons#commons-pool2;2.11.1 from central in [default]\n",
      "\torg.apache.hadoop#hadoop-client-api;3.3.2 from central in [default]\n",
      "\torg.apache.hadoop#hadoop-client-runtime;3.3.2 from central in [default]\n",
      "\torg.apache.kafka#kafka-clients;2.8.1 from central in [default]\n",
      "\torg.apache.spark#spark-avro_2.12;3.3.1 from central in [default]\n",
      "\torg.apache.spark#spark-sql-kafka-0-10_2.12;3.3.1 from central in [default]\n",
      "\torg.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.1 from central in [default]\n",
      "\torg.lz4#lz4-java;1.8.0 from central in [default]\n",
      "\torg.slf4j#slf4j-api;1.7.32 from central in [default]\n",
      "\torg.spark-project.spark#unused;1.0.0 from central in [default]\n",
      "\torg.tukaani#xz;1.8 from central in [default]\n",
      "\torg.xerial.snappy#snappy-java;1.1.8.4 from central in [default]\n",
      "\t---------------------------------------------------------------------\n",
      "\t| | modules || artifacts |\n",
      "\t| conf | number| search|dwnlded|evicted|| number|dwnlded|\n",
      "\t---------------------------------------------------------------------\n",
      "\t| default | 14 | 0 | 0 | 0 || 14 | 0 |\n",
      "\t---------------------------------------------------------------------\n",
      ":: retrieving :: org.apache.spark#spark-submit-parent-5a3a4db6-be91-4d32-9884-8b0f38241b3f\n",
      "\tconfs: [default]\n",
      "\t0 artifacts copied, 14 already retrieved (0kB/8ms)\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "23/02/21 21:20:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n"
     ]
    }
   ],
   "source": [
    "from pyspark.sql import SparkSession\n",
    "import pyspark.sql.types as T\n",
    "import pyspark.sql.functions as F\n",
    "\n",
    "spark = SparkSession \\\n",
    "    .builder \\\n",
    "    .appName(\"Spark-Notebook\") \\\n",
    "    .getOrCreate()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6f4b62fa-b3ce-4a1b-a1f4-2ed332a0d55a",
   "metadata": {
    "jp-MarkdownHeadingCollapsed": true,
    "tags": []
   },
   "source": [
    "### 1. Reading from Kafka Stream\n",
    "\n",
    "through `readStream`"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f491fa45-4471-4bc5-92f7-48081f687140",
   "metadata": {},
   "source": [
    "#### 1.1 Raw Kafka Stream"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "82c25cb2-2599-4f9b-8849-967fbb604a44",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "# default for startingOffsets is \"latest\"\n",
    "df_kafka_raw = spark \\\n",
    "    .readStream \\\n",
    "    .format(\"kafka\") \\\n",
    "    .option(\"kafka.bootstrap.servers\", \"localhost:9092,broker:29092\") \\\n",
    "    .option(\"subscribe\", \"rides_csv\") \\\n",
    "    .option(\"startingOffsets\", \"earliest\") \\\n",
    "    .option(\"checkpointLocation\", \"checkpoint\") \\\n",
    "    .load()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "d9149ccd-69b2-4f5b-afc0-43567673c634",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- key: binary (nullable = true)\n",
      " |-- value: binary (nullable = true)\n",
      " |-- topic: string (nullable = true)\n",
      " |-- partition: integer (nullable = true)\n",
      " |-- offset: long (nullable = true)\n",
      " |-- timestamp: timestamp (nullable = true)\n",
      " |-- timestampType: integer (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_kafka_raw.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "62e5e753-89c7-460f-a8be-16868ce5c680",
   "metadata": {
    "jp-MarkdownHeadingCollapsed": true,
    "tags": []
   },
   "source": [
    "#### 1.2 Encoded Kafka Stream"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "0b745eed-7d74-421e-8e4b-c8343fda4de3",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "df_kafka_encoded = df_kafka_raw.selectExpr(\"CAST(key AS STRING)\", \"CAST(value AS STRING)\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "6839addc-c7c0-4117-8c9c-d2cd59cbf136",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- key: string (nullable = true)\n",
      " |-- value: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_kafka_encoded.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6749c4de-6f80-4b91-b2b8-b2968c761d75",
   "metadata": {},
   "source": [
    "#### 1.3 Structured Streaming DataFrame"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "ca20ae37-49f0-421f-9859-73fac8d4ca45",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "def parse_ride_from_kafka_message(df_raw, schema):\n",
    "    \"\"\" Take a Spark streaming df, parse its value column based on <schema>, and return a streaming df with the schema's columns \"\"\"\n",
    "    assert df_raw.isStreaming is True, \"DataFrame doesn't receive streaming data\"\n",
    "\n",
    "    df = df_raw.selectExpr(\"CAST(key AS STRING)\", \"CAST(value AS STRING)\")\n",
    "\n",
    "    # split the CSV attributes into a nested array in one column\n",
    "    col = F.split(df['value'], ', ')\n",
    "\n",
    "    # expand the array into multiple top-level columns, cast to the schema's types\n",
    "    for idx, field in enumerate(schema):\n",
    "        df = df.withColumn(field.name, col.getItem(idx).cast(field.dataType))\n",
    "    return df.select([field.name for field in schema])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "e1737bd0-146f-4ee2-a70f-a4657af5bbc6",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "ride_schema = T.StructType(\n",
    "    [T.StructField(\"vendor_id\", T.IntegerType()),\n",
    "     T.StructField('tpep_pickup_datetime', T.TimestampType()),\n",
    "     T.StructField('tpep_dropoff_datetime', T.TimestampType()),\n",
    "     T.StructField(\"passenger_count\", T.IntegerType()),\n",
    "     T.StructField(\"trip_distance\", T.FloatType()),\n",
    "     T.StructField(\"payment_type\", T.IntegerType()),\n",
    "     T.StructField(\"total_amount\", T.FloatType()),\n",
    "     ])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "ae2ce896-f54b-4166-b01f-b5532ab292fe",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "df_rides = parse_ride_from_kafka_message(df_raw=df_kafka_raw, schema=ride_schema)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "cd848228-97c5-4325-8457-97f35e533cd8",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- vendor_id: integer (nullable = true)\n",
      " |-- tpep_pickup_datetime: timestamp (nullable = true)\n",
      " |-- tpep_dropoff_datetime: timestamp (nullable = true)\n",
      " |-- passenger_count: integer (nullable = true)\n",
      " |-- trip_distance: float (nullable = true)\n",
      " |-- payment_type: integer (nullable = true)\n",
      " |-- total_amount: float (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_rides.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "60277fdc-2797-4b23-9ecf-956b76db5778",
   "metadata": {
    "tags": []
   },
   "source": [
    "### 2. Sink Operation & Streaming Query\n",
    "\n",
    "through `writeStream`\n",
    "\n",
    "---\n",
    "**Output Sinks**\n",
    "- File Sink: stores the output in files in a directory\n",
    "- Kafka Sink: stores the output in one or more topics in Kafka\n",
    "- Foreach Sink: runs arbitrary computation on the records in the output\n",
    "- (for debugging) Console Sink, Memory Sink\n",
    "\n",
    "Further details can be found in [Output Sinks](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks)\n",
    "\n",
    "---\n",
    "There are three types of **Output Modes**:\n",
    "- Complete: the whole Result Table is written to the sink after every trigger; this is supported for aggregation queries\n",
    "- Append (default): only the rows appended to the Result Table since the last trigger are written to the sink\n",
    "- Update: only the rows updated in the Result Table since the last trigger are written to the sink\n",
    "\n",
    "The supported [Output Modes](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes) differ based on the set of transformations applied to the streaming data.\n",
    "\n",
    "---\n",
    "**Triggers**\n",
    "\n",
    "The [trigger settings](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers) of a streaming query define the timing of streaming data processing. Spark Structured Streaming processes data in micro-batches, and one of the following trigger options can be selected based on the requirements (a file-sink example with a fixed-interval trigger is sketched in the two cells below):\n",
    "\n",
    "- default micro-batch mode\n",
    "- fixed-interval micro-batch mode\n",
    "- one-time micro-batch mode\n",
    "- available-now micro-batch mode\n"
   ]
  },
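  {
   "cell_type": "markdown",
   "id": "f0e1d2c3-0001-4aaa-9bbb-000000000001",
   "metadata": {},
   "source": [
    "The next cell is a minimal sketch of a file sink combined with a fixed-interval trigger, added to illustrate the two bullet lists above. It is not executed here; the parquet output path, the checkpoint path, and the 10-second interval are placeholder assumptions, not values from the original notebook."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f0e1d2c3-0001-4aaa-9bbb-000000000002",
   "metadata": {},
   "outputs": [],
   "source": [
    "def sink_parquet(df, output_path: str = 'data/rides_output', checkpoint_path: str = 'checkpoint_parquet',\n",
    "                 processing_time: str = '10 seconds'):\n",
    "    \"\"\" sketch: append the streaming df to parquet files, producing a micro-batch every <processing_time> \"\"\"\n",
    "    # file sinks only support append mode; the paths used here are placeholder assumptions\n",
    "    write_query = df.writeStream \\\n",
    "        .outputMode('append') \\\n",
    "        .trigger(processingTime=processing_time) \\\n",
    "        .format('parquet') \\\n",
    "        .option('path', output_path) \\\n",
    "        .option('checkpointLocation', checkpoint_path) \\\n",
    "        .start()\n",
    "    return write_query  # stop later with write_query.stop()\n",
    "\n",
    "# write_query = sink_parquet(df_rides)"
   ]
  },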
  {
   "cell_type": "markdown",
   "id": "02ca9b08-aa61-46cd-b946-4457ce2cdf5d",
   "metadata": {
    "tags": []
   },
   "source": [
    "#### Console and Memory Sink"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "74c72469-4c37-417c-a866-a1c1ef75ae8b",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "def sink_console(df, output_mode: str = 'complete', processing_time: str = '5 seconds'):\n",
    "    write_query = df.writeStream \\\n",
    "        .outputMode(output_mode) \\\n",
    "        .trigger(processingTime=processing_time) \\\n",
    "        .format(\"console\") \\\n",
    "        .option(\"truncate\", False) \\\n",
    "        .start()\n",
    "    return write_query  # pyspark.sql.streaming.StreamingQuery"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "id": "d866c7ba-f8e9-475d-830a-50ffb2c5472b",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "23/02/21 21:46:12 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-289a958e-f6b6-4b38-a87b-50002d82ec8b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.\n",
      "23/02/21 21:46:12 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.\n",
      "23/02/21 21:46:12 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-driver-0-3, groupId=spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-driver-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.\n",
      "23/02/21 21:46:12 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-driver-0-3, groupId=spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-driver-0] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected\n",
      "23/02/21 21:46:13 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-executor-4, groupId=spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-executor] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.\n",
      "23/02/21 21:46:13 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-executor-4, groupId=spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-executor] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected\n",
      "-------------------------------------------\n",
      "Batch: 0\n",
      "-------------------------------------------\n",
      "+---------+--------------------+---------------------+---------------+-------------+------------+------------+\n",
      "|vendor_id|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|payment_type|total_amount|\n",
      "+---------+--------------------+---------------------+---------------+-------------+------------+------------+\n",
      "|1 |2020-07-01 00:25:32 |2020-07-01 00:33:39 |1 |1.5 |2 |9.3 |\n",
      "|1 |2020-07-01 00:03:19 |2020-07-01 00:25:43 |1 |9.5 |1 |27.8 |\n",
      "|2 |2020-07-01 00:15:11 |2020-07-01 00:29:24 |1 |5.85 |2 |22.3 |\n",
      "|2 |2020-07-01 00:30:49 |2020-07-01 00:38:26 |1 |1.9 |1 |14.16 |\n",
      "|2 |2020-07-01 00:31:26 |2020-07-01 00:38:02 |1 |1.25 |2 |7.8 |\n",
      "|1 |2020-07-01 00:25:32 |2020-07-01 00:33:39 |1 |1.5 |2 |9.3 |\n",
      "|1 |2020-07-01 00:03:19 |2020-07-01 00:25:43 |1 |9.5 |1 |27.8 |\n",
      "|2 |2020-07-01 00:15:11 |2020-07-01 00:29:24 |1 |5.85 |2 |22.3 |\n",
      "|2 |2020-07-01 00:30:49 |2020-07-01 00:38:26 |1 |1.9 |1 |14.16 |\n",
      "|2 |2020-07-01 00:31:26 |2020-07-01 00:38:02 |1 |1.25 |2 |7.8 |\n",
      "+---------+--------------------+---------------------+---------------+-------------+------------+------------+\n",
      "\n",
      "23/02/21 22:11:05 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-executor-5, groupId=spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-executor] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.\n",
      "23/02/21 22:11:05 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-executor-5, groupId=spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-executor] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      " \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "-------------------------------------------\n",
      "Batch: 1\n",
      "-------------------------------------------\n",
      "+---------+--------------------+---------------------+---------------+-------------+------------+------------+\n",
      "|vendor_id|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|payment_type|total_amount|\n",
      "+---------+--------------------+---------------------+---------------+-------------+------------+------------+\n",
      "|1 |2020-07-01 00:25:32 |2020-07-01 00:33:39 |1 |1.5 |2 |9.3 |\n",
      "|1 |2020-07-01 00:03:19 |2020-07-01 00:25:43 |1 |9.5 |1 |27.8 |\n",
      "|2 |2020-07-01 00:15:11 |2020-07-01 00:29:24 |1 |5.85 |2 |22.3 |\n",
      "|2 |2020-07-01 00:30:49 |2020-07-01 00:38:26 |1 |1.9 |1 |14.16 |\n",
      "|2 |2020-07-01 00:31:26 |2020-07-01 00:38:02 |1 |1.25 |2 |7.8 |\n",
      "+---------+--------------------+---------------------+---------------+-------------+------------+------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "write_query = sink_console(df_rides, output_mode='append')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "id": "a9bfa73f-a8cc-4988-a8cf-bf31ee6c449c",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "def sink_memory(df, query_name, query_template):\n",
    "    write_query = df \\\n",
    "        .writeStream \\\n",
    "        .queryName(query_name) \\\n",
    "        .format('memory') \\\n",
    "        .start()\n",
    "    query_str = query_template.format(table_name=query_name)\n",
    "    query_results = spark.sql(query_str)\n",
    "    return write_query, query_results"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "id": "b31d0b76-e917-44e7-a14d-f9ce6901c23a",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "23/02/21 21:31:47 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-b3e2c096-aa06-4083-9cdf-d6f3cf04fc06. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.\n",
      "23/02/21 21:31:47 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.\n",
      "23/02/21 21:31:48 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-f07faf6a-cb53-4ec8-bf58-1685d976f432--722858875-driver-0-1, groupId=spark-kafka-source-f07faf6a-cb53-4ec8-bf58-1685d976f432--722858875-driver-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.\n",
      "23/02/21 21:31:48 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-f07faf6a-cb53-4ec8-bf58-1685d976f432--722858875-driver-0-1, groupId=spark-kafka-source-f07faf6a-cb53-4ec8-bf58-1685d976f432--722858875-driver-0] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected\n",
      "23/02/21 21:31:49 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-f07faf6a-cb53-4ec8-bf58-1685d976f432--722858875-executor-2, groupId=spark-kafka-source-f07faf6a-cb53-4ec8-bf58-1685d976f432--722858875-executor] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.\n",
      "23/02/21 21:31:49 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-f07faf6a-cb53-4ec8-bf58-1685d976f432--722858875-executor-2, groupId=spark-kafka-source-f07faf6a-cb53-4ec8-bf58-1685d976f432--722858875-executor] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      " \r"
     ]
    }
   ],
   "source": [
    "query_name = 'vendor_id_counts'\n",
    "query_template = 'select count(distinct(vendor_id)) from {table_name}'\n",
    "write_query, df_vendor_id_counts = sink_memory(df=df_rides, query_name=query_name, query_template=query_template)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "id": "4ba56111-83bf-4028-ac65-565e0190f310",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "<class 'pyspark.sql.streaming.StreamingQuery'>\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "{'message': 'Waiting for data to arrive',\n",
       " 'isDataAvailable': False,\n",
       " 'isTriggerActive': True}"
      ]
     },
     "execution_count": 18,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "print(type(write_query))  # pyspark.sql.streaming.StreamingQuery\n",
    "write_query.status"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "id": "7cc37bda-9cfa-402b-9d42-a6ba5271476b",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------------------+\n",
      "|count(DISTINCT vendor_id)|\n",
      "+-------------------------+\n",
      "| 2|\n",
      "+-------------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_vendor_id_counts.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "id": "88862ca9-4d89-487e-987f-08a2b9e83efe",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "write_query.stop()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "443d4041-06db-4a4a-89c1-348848cc7ca8",
   "metadata": {
    "tags": []
   },
   "source": [
    "#### Kafka Sink\n",
    "\n",
    "To write stream results to a Kafka topic, the streaming dataframe needs at least a column named `value`.\n",
    "\n",
    "Therefore, before starting `writeStream` in kafka format, the dataframe needs to be reshaped accordingly, as in the helper functions below.\n",
    "\n",
    "More information on the data structure expected by the Kafka sink can be found [here](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka).\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "id": "8b08a013-d039-41cf-94fd-a1a57571d25f",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "def prepare_dataframe_to_kafka_sink(df, value_columns, key_column=None):\n",
    "    # concatenate the value columns into a single CSV-style 'value' column\n",
    "    df = df.withColumn(\"value\", F.concat_ws(', ', *value_columns))\n",
    "    if key_column:\n",
    "        df = df.withColumnRenamed(key_column, \"key\")\n",
    "        df = df.withColumn(\"key\", df.key.cast('string'))\n",
    "        return df.select(['key', 'value'])\n",
    "    return df.select(['value'])\n",
    "\n",
    "\n",
    "def sink_kafka(df, topic, output_mode='append'):\n",
    "    write_query = df.writeStream \\\n",
    "        .format(\"kafka\") \\\n",
    "        .option(\"kafka.bootstrap.servers\", \"localhost:9092,broker:29092\") \\\n",
    "        .outputMode(output_mode) \\\n",
    "        .option(\"topic\", topic) \\\n",
    "        .option(\"checkpointLocation\", \"checkpoint\") \\\n",
    "        .start()\n",
    "    return write_query"
   ]
  },
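  {
   "cell_type": "markdown",
   "id": "f0e1d2c3-0002-4aaa-9bbb-000000000001",
   "metadata": {},
   "source": [
    "One possible way to use the two helpers above is sketched in the next cell (not executed here). The target topic name `rides_output` and the choice of key/value columns are illustrative assumptions, not values taken from the original notebook."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f0e1d2c3-0002-4aaa-9bbb-000000000002",
   "metadata": {},
   "outputs": [],
   "source": [
    "# sketch: reshape df_rides into key/value columns and write the stream back to Kafka\n",
    "# 'rides_output' is an assumed topic name; it must exist on the broker (or topic auto-creation must be enabled)\n",
    "df_rides_kv = prepare_dataframe_to_kafka_sink(\n",
    "    df_rides,\n",
    "    value_columns=['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count',\n",
    "                   'trip_distance', 'payment_type', 'total_amount'],\n",
    "    key_column='vendor_id')\n",
    "\n",
    "kafka_write_query = sink_kafka(df_rides_kv, topic='rides_output')\n",
    "# kafka_write_query.stop()  # stop the streaming query when done"
   ]
  },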
  {
   "cell_type": "markdown",
   "id": "e4cb2140-9f2e-4914-b74c-be4c18cdbe8a",
   "metadata": {},
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}