Files
data-engineering-zoomcamp/week_5_batch_processing/code/07_groupby_join.ipynb
Alexey Grigorev 3fb08caff7 spark notebooks
2022-02-18 23:13:50 +00:00

315 lines
7.5 KiB
Plaintext

{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "4341e0e6",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"WARNING: An illegal reflective access operation has occurred\n",
"WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/alexey/spark/spark-3.0.3-bin-hadoop3.2/jars/spark-unsafe_2.12-3.0.3.jar) to constructor java.nio.DirectByteBuffer(long,int)\n",
"WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform\n",
"WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations\n",
"WARNING: All illegal access operations will be denied in a future release\n",
"22/02/18 21:41:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
"Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n",
"Setting default log level to \"WARN\".\n",
"To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n"
]
}
],
"source": [
"import pyspark\n",
"from pyspark.sql import SparkSession\n",
"\n",
"# Local-mode session (local[*] = one worker thread per logical core).\n",
"# getOrCreate() returns the already-running session if the cell is re-run.\n",
"spark = (\n",
"    SparkSession.builder\n",
"    .master(\"local[*]\")\n",
"    .appName('test')\n",
"    .getOrCreate()\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "cd304aec",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
}
],
"source": [
"# Read every parquet file matching data/pq/green/*/* (presumably one directory per year/month).\n",
"df_green = spark.read.parquet('data/pq/green/*/*')"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "243991f3",
"metadata": {},
"outputs": [],
"source": [
"# Expose the DataFrame to spark.sql() as the temporary view 'green'.\n",
"# createOrReplaceTempView supersedes registerTempTable, which has been\n",
"# deprecated since Spark 2.0 and emits a FutureWarning on Spark 3.x.\n",
"df_green.createOrReplaceTempView('green')"
]
},
{
"cell_type": "code",
"execution_count": 18,
"id": "e43764a7",
"metadata": {},
"outputs": [],
"source": [
"# Hourly revenue (sum of total_amount) and trip count per pickup zone for green taxis.\n",
"# GROUP BY repeats the grouping expressions instead of using ordinals (1, 2), so\n",
"# reordering the SELECT list cannot silently change what is grouped on.\n",
"df_green_revenue = spark.sql(\"\"\"\n",
"SELECT\n",
"    date_trunc('hour', lpep_pickup_datetime) AS hour,\n",
"    PULocationID AS zone,\n",
"\n",
"    SUM(total_amount) AS amount,\n",
"    COUNT(1) AS number_records\n",
"FROM\n",
"    green\n",
"WHERE\n",
"    lpep_pickup_datetime >= '2020-01-01 00:00:00'\n",
"GROUP BY\n",
"    date_trunc('hour', lpep_pickup_datetime),\n",
"    PULocationID\n",
"\"\"\")"
]
},
{
"cell_type": "code",
"execution_count": 26,
"id": "3e00310e",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
}
],
"source": [
"# repartition(20) writes at most 20 part files; overwrite keeps the cell re-runnable.\n",
"(\n",
"    df_green_revenue\n",
"    .repartition(20)\n",
"    .write\n",
"    .mode('overwrite')\n",
"    .parquet('data/report/revenue/green')\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 20,
"id": "07ebb68c",
"metadata": {},
"outputs": [],
"source": [
"# Read every parquet file matching data/pq/yellow/*/* (presumably one directory per year/month).\n",
"df_yellow = spark.read.parquet('data/pq/yellow/*/*')\n",
"\n",
"# Expose the DataFrame to spark.sql() as the temporary view 'yellow'.\n",
"# createOrReplaceTempView supersedes registerTempTable, which has been\n",
"# deprecated since Spark 2.0 and emits a FutureWarning on Spark 3.x.\n",
"df_yellow.createOrReplaceTempView('yellow')"
]
},
{
"cell_type": "code",
"execution_count": 22,
"id": "9d5be29d",
"metadata": {},
"outputs": [],
"source": [
"# Hourly revenue (sum of total_amount) and trip count per pickup zone for yellow taxis.\n",
"# GROUP BY repeats the grouping expressions instead of using ordinals (1, 2), so\n",
"# reordering the SELECT list cannot silently change what is grouped on.\n",
"df_yellow_revenue = spark.sql(\"\"\"\n",
"SELECT\n",
"    date_trunc('hour', tpep_pickup_datetime) AS hour,\n",
"    PULocationID AS zone,\n",
"\n",
"    SUM(total_amount) AS amount,\n",
"    COUNT(1) AS number_records\n",
"FROM\n",
"    yellow\n",
"WHERE\n",
"    tpep_pickup_datetime >= '2020-01-01 00:00:00'\n",
"GROUP BY\n",
"    date_trunc('hour', tpep_pickup_datetime),\n",
"    PULocationID\n",
"\"\"\")"
]
},
{
"cell_type": "code",
"execution_count": 27,
"id": "8bd9264e",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
}
],
"source": [
"# repartition(20) writes at most 20 part files; overwrite keeps the cell re-runnable.\n",
"(\n",
"    df_yellow_revenue\n",
"    .repartition(20)\n",
"    .write\n",
"    .mode('overwrite')\n",
"    .parquet('data/report/revenue/yellow')\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 46,
"id": "fd5d74d7",
"metadata": {},
"outputs": [],
"source": [
"# Reload the materialized reports so the cells that follow read the written parquet\n",
"# rather than lazily re-running the aggregation queries.\n",
"df_green_revenue, df_yellow_revenue = (\n",
"    spark.read.parquet('data/report/revenue/green'),\n",
"    spark.read.parquet('data/report/revenue/yellow'),\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 47,
"id": "35015ee6",
"metadata": {},
"outputs": [],
"source": [
"# Prefix the metric columns with the service name so green and yellow metrics\n",
"# remain distinguishable once both reports are joined on (hour, zone).\n",
"df_green_revenue_tmp = (\n",
"    df_green_revenue\n",
"    .withColumnRenamed('amount', 'green_amount')\n",
"    .withColumnRenamed('number_records', 'green_number_records')\n",
")\n",
"\n",
"df_yellow_revenue_tmp = (\n",
"    df_yellow_revenue\n",
"    .withColumnRenamed('amount', 'yellow_amount')\n",
"    .withColumnRenamed('number_records', 'yellow_number_records')\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 48,
"id": "ec9f34ea",
"metadata": {},
"outputs": [],
"source": [
"# Full outer join on the shared keys: an (hour, zone) pair present for only one\n",
"# service is kept, with the other service's metric columns set to null.\n",
"df_join = df_green_revenue_tmp.join(\n",
"    df_yellow_revenue_tmp,\n",
"    on=['hour', 'zone'],\n",
"    how='outer',\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 50,
"id": "10238be7",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
}
],
"source": [
"# Persist the combined green + yellow report; overwrite keeps the cell re-runnable.\n",
"df_join.write.mode('overwrite').parquet('data/report/revenue/total')"
]
},
{
"cell_type": "code",
"execution_count": 51,
"id": "c3af7169",
"metadata": {},
"outputs": [],
"source": [
"# Reload the combined report from disk (equivalent to spark.read.parquet(path)).\n",
"df_join = spark.read.format('parquet').load('data/report/revenue/total')"
]
},
{
"cell_type": "code",
"execution_count": 56,
"id": "bc2a6680",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"DataFrame[hour: timestamp, zone: int, green_amount: double, green_number_records: bigint, yellow_amount: double, yellow_number_records: bigint]"
]
},
"execution_count": 56,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# The DataFrame repr lists the column names and types of the joined report.\n",
"df_join"
]
},
{
"cell_type": "code",
"execution_count": 54,
"id": "abb46398",
"metadata": {},
"outputs": [],
"source": [
"# Zone lookup table, joined to the report on LocationID.\n",
"# NOTE(review): presumably produced from the taxi zone CSV in an earlier notebook — confirm.\n",
"df_zones = spark.read.format('parquet').load('zones/')"
]
},
{
"cell_type": "code",
"execution_count": 57,
"id": "b3cf98a5",
"metadata": {},
"outputs": [],
"source": [
"# Attach zone attributes to each report row. With no `how`, Spark performs an\n",
"# inner join, so rows whose zone has no LocationID match in df_zones are dropped.\n",
"df_result = df_join.join(\n",
"    df_zones,\n",
"    on=df_join.zone == df_zones.LocationID,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 62,
"id": "5e0614ba",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
}
],
"source": [
"# After the equality join LocationID always equals zone, so both ID columns are\n",
"# dropped and only the zone attributes are kept.\n",
"# mode='overwrite' matches the other report writes: the default save mode\n",
"# (errorifexists) fails when the cell is re-run and the path already exists.\n",
"df_result.drop('LocationID', 'zone').write.parquet('tmp/revenue-zones', mode='overwrite')"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
}
},
"nbformat": 4,
"nbformat_minor": 5
}