357 lines
8.6 KiB
Plaintext
357 lines
8.6 KiB
Plaintext
{
|
|
"cells": [
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 1,
|
|
"id": "3307b886",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"WARNING: An illegal reflective access operation has occurred\n",
|
|
"WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/alexey/spark/spark-3.0.3-bin-hadoop3.2/jars/spark-unsafe_2.12-3.0.3.jar) to constructor java.nio.DirectByteBuffer(long,int)\n",
|
|
"WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform\n",
|
|
"WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations\n",
|
|
"WARNING: All illegal access operations will be denied in a future release\n",
|
|
"22/02/17 22:43:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
|
|
"Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n",
|
|
"Setting default log level to \"WARN\".\n",
|
|
"To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"import pyspark\n",
|
|
"from pyspark.sql import SparkSession\n",
|
|
"\n",
|
|
"spark = SparkSession.builder \\\n",
|
|
" .master(\"local[*]\") \\\n",
|
|
" .appName('test') \\\n",
|
|
" .getOrCreate()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 2,
|
|
"id": "1ee1eb1d",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"df_green = spark.read.parquet('data/pq/green/*/*')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "0ca5ee99",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": []
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 16,
|
|
"id": "649bb4da",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"df_green = df_green \\\n",
|
|
" .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime') \\\n",
|
|
" .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 5,
|
|
"id": "90cd6845",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"df_yellow = spark.read.parquet('data/pq/yellow/*/*')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 19,
|
|
"id": "88822efd",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"df_yellow = df_yellow \\\n",
|
|
" .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \\\n",
|
|
" .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 22,
|
|
"id": "610167a2",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# Columns present in both the green and yellow datasets, kept in df_green's column order.\n",
"# NOTE: the spelling `common_colums` is kept because later cells reference this name.\n",
"yellow_columns = set(df_yellow.columns)  # set gives O(1) membership checks\n",
"common_colums = [name for name in df_green.columns if name in yellow_columns]"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 26,
|
|
"id": "839d773f",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"from pyspark.sql import functions as F"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 28,
|
|
"id": "2498810a",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"df_green_sel = df_green \\\n",
|
|
" .select(common_colums) \\\n",
|
|
" .withColumn('service_type', F.lit('green'))"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 29,
|
|
"id": "19032efc",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"df_yellow_sel = df_yellow \\\n",
|
|
" .select(common_colums) \\\n",
|
|
" .withColumn('service_type', F.lit('yellow'))"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 30,
|
|
"id": "f5b0f3d1",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"df_trips_data = df_green_sel.unionAll(df_yellow_sel)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 33,
|
|
"id": "1bed8b33",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+------------+--------+\n",
|
|
"|service_type| count|\n",
|
|
"+------------+--------+\n",
|
|
"| green| 2304517|\n",
|
|
"| yellow|39649199|\n",
|
|
"+------------+--------+\n",
|
|
"\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"df_trips_data.groupBy('service_type').count().show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 40,
|
|
"id": "28cc8fa3",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"['VendorID',\n",
|
|
" 'pickup_datetime',\n",
|
|
" 'dropoff_datetime',\n",
|
|
" 'store_and_fwd_flag',\n",
|
|
" 'RatecodeID',\n",
|
|
" 'PULocationID',\n",
|
|
" 'DOLocationID',\n",
|
|
" 'passenger_count',\n",
|
|
" 'trip_distance',\n",
|
|
" 'fare_amount',\n",
|
|
" 'extra',\n",
|
|
" 'mta_tax',\n",
|
|
" 'tip_amount',\n",
|
|
" 'tolls_amount',\n",
|
|
" 'improvement_surcharge',\n",
|
|
" 'total_amount',\n",
|
|
" 'payment_type',\n",
|
|
" 'congestion_surcharge',\n",
|
|
" 'service_type']"
|
|
]
|
|
},
|
|
"execution_count": 40,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"df_trips_data.columns"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 35,
|
|
"id": "36e90cbc",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# Register the combined trips DataFrame as a temp view so it can be queried via spark.sql.\n",
"# createOrReplaceTempView replaces registerTempTable, which is deprecated since Spark 2.0.\n",
"df_trips_data.createOrReplaceTempView('trips_data')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 38,
|
|
"id": "d0e01bf1",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+------------+--------+\n",
|
|
"|service_type|count(1)|\n",
|
|
"+------------+--------+\n",
|
|
"| green| 2304517|\n",
|
|
"| yellow|39649199|\n",
|
|
"+------------+--------+\n",
|
|
"\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"spark.sql(\"\"\"\n",
|
|
"SELECT\n",
|
|
" service_type,\n",
|
|
" count(1)\n",
|
|
"FROM\n",
|
|
" trips_data\n",
|
|
"GROUP BY \n",
|
|
" service_type\n",
|
|
"\"\"\").show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 45,
|
|
"id": "b2ee7038",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"df_result = spark.sql(\"\"\"\n",
|
|
"SELECT \n",
|
|
" -- Revenue grouping \n",
|
|
" PULocationID AS revenue_zone,\n",
|
|
" date_trunc('month', pickup_datetime) AS revenue_month, \n",
|
|
" service_type, \n",
|
|
"\n",
|
|
" -- Revenue calculation \n",
|
|
" SUM(fare_amount) AS revenue_monthly_fare,\n",
|
|
" SUM(extra) AS revenue_monthly_extra,\n",
|
|
" SUM(mta_tax) AS revenue_monthly_mta_tax,\n",
|
|
" SUM(tip_amount) AS revenue_monthly_tip_amount,\n",
|
|
" SUM(tolls_amount) AS revenue_monthly_tolls_amount,\n",
|
|
" SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,\n",
|
|
" SUM(total_amount) AS revenue_monthly_total_amount,\n",
|
|
" SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,\n",
|
|
"\n",
|
|
" -- Additional calculations\n",
|
|
" AVG(passenger_count) AS avg_montly_passenger_count,\n",
|
|
" AVG(trip_distance) AS avg_montly_trip_distance\n",
|
|
"FROM\n",
|
|
" trips_data\n",
|
|
"GROUP BY\n",
|
|
" 1, 2, 3\n",
|
|
"\"\"\")"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 49,
|
|
"id": "f67eeb92",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"df_result.coalesce(1).write.parquet('data/report/revenue/', mode='overwrite')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "f56a885d",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": []
|
|
}
|
|
],
|
|
"metadata": {
|
|
"kernelspec": {
|
|
"display_name": "Python 3 (ipykernel)",
|
|
"language": "python",
|
|
"name": "python3"
|
|
},
|
|
"language_info": {
|
|
"codemirror_mode": {
|
|
"name": "ipython",
|
|
"version": 3
|
|
},
|
|
"file_extension": ".py",
|
|
"mimetype": "text/x-python",
|
|
"name": "python",
|
|
"nbconvert_exporter": "python",
|
|
"pygments_lexer": "ipython3",
|
|
"version": "3.9.7"
|
|
}
|
|
},
|
|
"nbformat": 4,
|
|
"nbformat_minor": 5
|
|
}
|