{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "3307b886",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"WARNING: An illegal reflective access operation has occurred\n",
"WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/alexey/spark/spark-3.0.3-bin-hadoop3.2/jars/spark-unsafe_2.12-3.0.3.jar) to constructor java.nio.DirectByteBuffer(long,int)\n",
"WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform\n",
"WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations\n",
"WARNING: All illegal access operations will be denied in a future release\n",
"22/02/17 22:43:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
"Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n",
"Setting default log level to \"WARN\".\n",
"To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n"
]
}
],
"source": [
"import pyspark\n",
"from pyspark.sql import SparkSession\n",
"\n",
"spark = SparkSession.builder \\\n",
" .master(\"local[*]\") \\\n",
" .appName('test') \\\n",
" .getOrCreate()"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "1ee1eb1d",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
}
],
"source": [
"df_green = spark.read.parquet('data/pq/green/*/*')"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0ca5ee99",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 16,
"id": "649bb4da",
"metadata": {},
"outputs": [],
"source": [
"df_green = df_green \\\n",
" .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime') \\\n",
" .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "90cd6845",
"metadata": {},
"outputs": [],
"source": [
"df_yellow = spark.read.parquet('data/pq/yellow/*/*')"
]
},
{
"cell_type": "code",
"execution_count": 19,
"id": "88822efd",
"metadata": {},
"outputs": [],
"source": [
"df_yellow = df_yellow \\\n",
" .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \\\n",
" .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')"
]
},
{
"cell_type": "code",
"execution_count": 22,
"id": "610167a2",
"metadata": {},
"outputs": [],
"source": [
"# Columns present in both datasets, kept in df_green's column order\n",
"yellow_columns = set(df_yellow.columns)\n",
"\n",
"common_columns = [col for col in df_green.columns if col in yellow_columns]"
]
},
{
"cell_type": "code",
"execution_count": 26,
"id": "839d773f",
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import functions as F"
]
},
{
"cell_type": "code",
"execution_count": 28,
"id": "2498810a",
"metadata": {},
"outputs": [],
"source": [
"df_green_sel = df_green \\\n",
" .select(common_columns) \\\n",
" .withColumn('service_type', F.lit('green'))"
]
},
{
"cell_type": "code",
"execution_count": 29,
"id": "19032efc",
"metadata": {},
"outputs": [],
"source": [
"df_yellow_sel = df_yellow \\\n",
" .select(common_columns) \\\n",
" .withColumn('service_type', F.lit('yellow'))"
]
},
{
"cell_type": "code",
"execution_count": 30,
"id": "f5b0f3d1",
"metadata": {},
"outputs": [],
"source": [
"# unionAll is a deprecated alias of union since Spark 2.0;\n",
"# both resolve columns by position, so behavior is identical\n",
"df_trips_data = df_green_sel.union(df_yellow_sel)"
]
},
{
"cell_type": "code",
"execution_count": 33,
"id": "1bed8b33",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+------------+--------+\n",
"|service_type| count|\n",
"+------------+--------+\n",
"| green| 2304517|\n",
"| yellow|39649199|\n",
"+------------+--------+\n",
"\n"
]
}
],
"source": [
"df_trips_data.groupBy('service_type').count().show()"
]
},
{
"cell_type": "code",
"execution_count": 40,
"id": "28cc8fa3",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['VendorID',\n",
" 'pickup_datetime',\n",
" 'dropoff_datetime',\n",
" 'store_and_fwd_flag',\n",
" 'RatecodeID',\n",
" 'PULocationID',\n",
" 'DOLocationID',\n",
" 'passenger_count',\n",
" 'trip_distance',\n",
" 'fare_amount',\n",
" 'extra',\n",
" 'mta_tax',\n",
" 'tip_amount',\n",
" 'tolls_amount',\n",
" 'improvement_surcharge',\n",
" 'total_amount',\n",
" 'payment_type',\n",
" 'congestion_surcharge',\n",
" 'service_type']"
]
},
"execution_count": 40,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df_trips_data.columns"
]
},
{
"cell_type": "code",
"execution_count": 35,
"id": "36e90cbc",
"metadata": {},
"outputs": [],
"source": [
"# registerTempTable is deprecated since Spark 2.0;\n",
"# createOrReplaceTempView is the supported equivalent\n",
"df_trips_data.createOrReplaceTempView('trips_data')"
]
},
{
"cell_type": "code",
"execution_count": 38,
"id": "d0e01bf1",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+------------+--------+\n",
"|service_type|count(1)|\n",
"+------------+--------+\n",
"| green| 2304517|\n",
"| yellow|39649199|\n",
"+------------+--------+\n",
"\n"
]
}
],
"source": [
"spark.sql(\"\"\"\n",
"SELECT\n",
" service_type,\n",
" count(1)\n",
"FROM\n",
" trips_data\n",
"GROUP BY \n",
" service_type\n",
"\"\"\").show()"
]
},
{
"cell_type": "code",
"execution_count": 45,
"id": "b2ee7038",
"metadata": {},
"outputs": [],
"source": [
"# NOTE(review): the 'avg_montly_*' aliases are misspelled ('monthly'),\n",
"# but are kept as-is so the output schema stays stable for any\n",
"# downstream consumers of data/report/revenue/ — rename in a\n",
"# coordinated change if desired.\n",
"df_result = spark.sql(\"\"\"\n",
"SELECT \n",
"    -- Revenue grouping \n",
"    PULocationID AS revenue_zone,\n",
"    date_trunc('month', pickup_datetime) AS revenue_month, \n",
"    service_type, \n",
"\n",
"    -- Revenue calculation \n",
"    SUM(fare_amount) AS revenue_monthly_fare,\n",
"    SUM(extra) AS revenue_monthly_extra,\n",
"    SUM(mta_tax) AS revenue_monthly_mta_tax,\n",
"    SUM(tip_amount) AS revenue_monthly_tip_amount,\n",
"    SUM(tolls_amount) AS revenue_monthly_tolls_amount,\n",
"    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,\n",
"    SUM(total_amount) AS revenue_monthly_total_amount,\n",
"    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,\n",
"\n",
"    -- Additional calculations\n",
"    AVG(passenger_count) AS avg_montly_passenger_count,\n",
"    AVG(trip_distance) AS avg_montly_trip_distance\n",
"FROM\n",
"    trips_data\n",
"GROUP BY\n",
"    1, 2, 3\n",
"\"\"\")"
]
},
{
"cell_type": "code",
"execution_count": 49,
"id": "f67eeb92",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
}
],
"source": [
"df_result.coalesce(1).write.parquet('data/report/revenue/', mode='overwrite')"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f56a885d",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
}
},
"nbformat": 4,
"nbformat_minor": 5
}