677 lines
16 KiB
Plaintext
677 lines
16 KiB
Plaintext
{
|
|
"cells": [
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 5,
|
|
"id": "00bc6543",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"import pyspark\n",
|
|
"from pyspark.sql import SparkSession\n",
|
|
"from pyspark.sql import types"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 2,
|
|
"id": "cd4a0f3d",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"WARNING: An illegal reflective access operation has occurred\n",
|
|
"WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/alexey/spark/spark-3.0.3-bin-hadoop3.2/jars/spark-unsafe_2.12-3.0.3.jar) to constructor java.nio.DirectByteBuffer(long,int)\n",
|
|
"WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform\n",
|
|
"WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations\n",
|
|
"WARNING: All illegal access operations will be denied in a future release\n",
|
|
"22/03/07 21:55:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
|
|
"Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n",
|
|
"Setting default log level to \"WARN\".\n",
|
|
"To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"spark = SparkSession.builder \\\n",
|
|
" .master(\"local[*]\") \\\n",
|
|
" .appName('test') \\\n",
|
|
" .getOrCreate()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 3,
|
|
"id": "eb3e4c36",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"'3.0.3'"
|
|
]
|
|
},
|
|
"execution_count": 3,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"spark.version"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 4,
|
|
"id": "5236cebd",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"-rw-rw-r-- 1 alexey alexey 700M Oct 29 18:53 fhvhv_tripdata_2021-02.csv\r\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"!ls -lh fhvhv_tripdata_2021-02.csv"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 6,
|
|
"id": "0a3399a3",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"schema = types.StructType([\n",
|
|
" types.StructField('hvfhs_license_num', types.StringType(), True),\n",
|
|
" types.StructField('dispatching_base_num', types.StringType(), True),\n",
|
|
" types.StructField('pickup_datetime', types.TimestampType(), True),\n",
|
|
" types.StructField('dropoff_datetime', types.TimestampType(), True),\n",
|
|
" types.StructField('PULocationID', types.IntegerType(), True),\n",
|
|
" types.StructField('DOLocationID', types.IntegerType(), True),\n",
|
|
" types.StructField('SR_Flag', types.StringType(), True)\n",
|
|
"])"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 8,
|
|
"id": "68bc8b72",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"df = spark.read \\\n",
|
|
" .option(\"header\", \"true\") \\\n",
|
|
" .schema(schema) \\\n",
|
|
" .csv('fhvhv_tripdata_2021-02.csv')\n",
|
|
"\n",
|
|
"df = df.repartition(24)\n",
|
|
"\n",
|
|
"# No compression argument: Spark uses its configured default parquet codec.\n",
|
|
"df.write.parquet('data/pq/fhvhv/2021/02/')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 16,
|
|
"id": "58989b55",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"\r",
|
|
"[Stage 0:> (0 + 1) / 1]\r",
|
|
"\r",
|
|
" \r"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"df = spark.read.parquet('data/pq/fhvhv/2021/02/')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "48b01d2f",
|
|
"metadata": {},
|
|
"source": [
|
|
"**Q3**: How many taxi trips were there on February 15?"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 18,
|
|
"id": "f7489aea",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"from pyspark.sql import functions as F"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 24,
|
|
"id": "6c2500fd",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"367170"
|
|
]
|
|
},
|
|
"execution_count": 24,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"df \\\n",
|
|
" .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \\\n",
|
|
" .filter(\"pickup_date = '2021-02-15'\") \\\n",
|
|
" .count()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 25,
|
|
"id": "dd7ae60d",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# Expose the DataFrame to spark.sql() under the name fhvhv_2021_02.\n",
|
|
"df.createOrReplaceTempView('fhvhv_2021_02')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 28,
|
|
"id": "6d47c147",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"\r",
|
|
"[Stage 20:> (0 + 4) / 4]\r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+--------+\n",
|
|
"|count(1)|\n",
|
|
"+--------+\n",
|
|
"| 367170|\n",
|
|
"+--------+\n",
|
|
"\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"\r",
|
|
"[Stage 20:==============> (1 + 3) / 4]\r",
|
|
"\r",
|
|
" \r"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"spark.sql(\"\"\"\n",
|
|
"SELECT\n",
|
|
" COUNT(1)\n",
|
|
"FROM \n",
|
|
" fhvhv_2021_02\n",
|
|
"WHERE\n",
|
|
" to_date(pickup_datetime) = '2021-02-15';\n",
|
|
"\"\"\").show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "ae3f533b",
|
|
"metadata": {},
|
|
"source": [
|
|
"**Q4**: Longest trip for each day"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 29,
|
|
"id": "7befe422",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"['hvfhs_license_num',\n",
|
|
" 'dispatching_base_num',\n",
|
|
" 'pickup_datetime',\n",
|
|
" 'dropoff_datetime',\n",
|
|
" 'PULocationID',\n",
|
|
" 'DOLocationID',\n",
|
|
" 'SR_Flag']"
|
|
]
|
|
},
|
|
"execution_count": 29,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"df.columns"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 36,
|
|
"id": "279d9161",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"[Stage 37:==============> (1 + 3) / 4]\r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+-----------+-------------+\n",
|
|
"|pickup_date|max(duration)|\n",
|
|
"+-----------+-------------+\n",
|
|
"| 2021-02-11| 75540|\n",
|
|
"| 2021-02-17| 57221|\n",
|
|
"| 2021-02-20| 44039|\n",
|
|
"| 2021-02-03| 40653|\n",
|
|
"| 2021-02-19| 37577|\n",
|
|
"+-----------+-------------+\n",
|
|
"\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"\r",
|
|
"[Stage 38:==================================================> (187 + 4) / 200]\r",
|
|
"\r",
|
|
" \r"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"df \\\n",
|
|
" .withColumn('duration', df.dropoff_datetime.cast('long') - df.pickup_datetime.cast('long')) \\\n",
|
|
" .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \\\n",
|
|
" .groupBy('pickup_date') \\\n",
|
|
" .max('duration') \\\n",
|
|
" .orderBy('max(duration)', ascending=False) \\\n",
|
|
" .limit(5) \\\n",
|
|
" .show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 38,
|
|
"id": "74cf0e8b",
|
|
"metadata": {
|
|
"scrolled": true
|
|
},
|
|
"outputs": [
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"\r",
|
|
"[Stage 43:> (0 + 4) / 4]\r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+-----------+-----------------+\n",
|
|
"|pickup_date| duration|\n",
|
|
"+-----------+-----------------+\n",
|
|
"| 2021-02-11| 1259.0|\n",
|
|
"| 2021-02-17|953.6833333333333|\n",
|
|
"| 2021-02-20|733.9833333333333|\n",
|
|
"| 2021-02-03| 677.55|\n",
|
|
"| 2021-02-19|626.2833333333333|\n",
|
|
"| 2021-02-25| 583.5|\n",
|
|
"| 2021-02-18|576.8666666666667|\n",
|
|
"| 2021-02-10|569.4833333333333|\n",
|
|
"| 2021-02-21| 537.05|\n",
|
|
"| 2021-02-09|534.7833333333333|\n",
|
|
"+-----------+-----------------+\n",
|
|
"\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"\r",
|
|
"[Stage 44:================================================> (180 + 4) / 200]\r",
|
|
"\r",
|
|
" \r"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"spark.sql(\"\"\"\n",
|
|
"SELECT\n",
|
|
" to_date(pickup_datetime) AS pickup_date,\n",
|
|
" MAX((CAST(dropoff_datetime AS LONG) - CAST(pickup_datetime AS LONG)) / 60) AS duration\n",
|
|
"FROM \n",
|
|
" fhvhv_2021_02\n",
|
|
"GROUP BY\n",
|
|
" 1\n",
|
|
"ORDER BY\n",
|
|
" 2 DESC\n",
|
|
"LIMIT 10;\n",
|
|
"\"\"\").show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "d915096b",
|
|
"metadata": {},
|
|
"source": [
|
|
"**Q5**: Most frequent `dispatching_base_num`\n",
|
|
"\n",
|
|
"How many stages does this Spark job have?\n",
|
|
"\n"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 44,
|
|
"id": "25816aa2",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"\r",
|
|
"[Stage 73:> (0 + 4) / 4]\r",
|
|
"\r",
|
|
"[Stage 73:==============> (1 + 3) / 4]\r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+--------------------+--------+\n",
|
|
"|dispatching_base_num|count(1)|\n",
|
|
"+--------------------+--------+\n",
|
|
"| B02510| 3233664|\n",
|
|
"| B02764| 965568|\n",
|
|
"| B02872| 882689|\n",
|
|
"| B02875| 685390|\n",
|
|
"| B02765| 559768|\n",
|
|
"+--------------------+--------+\n",
|
|
"\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"\r",
|
|
"[Stage 74:===================================================> (189 + 5) / 200]\r",
|
|
"\r",
|
|
" \r"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"spark.sql(\"\"\"\n",
|
|
"SELECT\n",
|
|
" dispatching_base_num,\n",
|
|
" COUNT(1)\n",
|
|
"FROM \n",
|
|
" fhvhv_2021_02\n",
|
|
"GROUP BY\n",
|
|
" 1\n",
|
|
"ORDER BY\n",
|
|
" 2 DESC\n",
|
|
"LIMIT 5;\n",
|
|
"\"\"\").show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 46,
|
|
"id": "a78f9fe3",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"\r",
|
|
"[Stage 86:> (0 + 4) / 4]\r",
|
|
"\r",
|
|
"[Stage 86:=============================> (2 + 2) / 4]\r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+--------------------+-------+\n",
|
|
"|dispatching_base_num| count|\n",
|
|
"+--------------------+-------+\n",
|
|
"| B02510|3233664|\n",
|
|
"| B02764| 965568|\n",
|
|
"| B02872| 882689|\n",
|
|
"| B02875| 685390|\n",
|
|
"| B02765| 559768|\n",
|
|
"+--------------------+-------+\n",
|
|
"\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"\r",
|
|
"[Stage 87:===========================================> (161 + 5) / 200]\r",
|
|
"\r",
|
|
" \r"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"df \\\n",
|
|
" .groupBy('dispatching_base_num') \\\n",
|
|
" .count() \\\n",
|
|
" .orderBy('count', ascending=False) \\\n",
|
|
" .limit(5) \\\n",
|
|
" .show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"id": "0d10173a",
|
|
"metadata": {},
|
|
"source": [
|
|
"**Q6**: Most common pair of pickup and drop-off locations"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 47,
|
|
"id": "74b7f664",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"df_zones = spark.read.parquet('zones')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 49,
|
|
"id": "81642d3b",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"['LocationID', 'Borough', 'Zone', 'service_zone']"
|
|
]
|
|
},
|
|
"execution_count": 49,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"df_zones.columns"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 51,
|
|
"id": "4f460dda",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/plain": [
|
|
"['hvfhs_license_num',\n",
|
|
" 'dispatching_base_num',\n",
|
|
" 'pickup_datetime',\n",
|
|
" 'dropoff_datetime',\n",
|
|
" 'PULocationID',\n",
|
|
" 'DOLocationID',\n",
|
|
" 'SR_Flag']"
|
|
]
|
|
},
|
|
"execution_count": 51,
|
|
"metadata": {},
|
|
"output_type": "execute_result"
|
|
}
|
|
],
|
|
"source": [
|
|
"df.columns"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 50,
|
|
"id": "ad8f0101",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# Expose the zones lookup to spark.sql() under the name zones.\n",
|
|
"df_zones.createOrReplaceTempView('zones')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 57,
|
|
"id": "6f738414",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"[Stage 103:==============================================> (176 + 4) / 200]\r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"+--------------------+--------+\n",
|
|
"| pu_do_pair|count(1)|\n",
|
|
"+--------------------+--------+\n",
|
|
"|East New York / E...| 45041|\n",
|
|
"|Borough Park / Bo...| 37329|\n",
|
|
"| Canarsie / Canarsie| 28026|\n",
|
|
"|Crown Heights Nor...| 25976|\n",
|
|
"|Bay Ridge / Bay R...| 17934|\n",
|
|
"+--------------------+--------+\n",
|
|
"\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"\r",
|
|
" \r"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"spark.sql(\"\"\"\n",
|
|
"SELECT\n",
|
|
" CONCAT(pul.Zone, ' / ', dol.Zone) AS pu_do_pair,\n",
|
|
" COUNT(1)\n",
|
|
"FROM \n",
|
|
" fhvhv_2021_02 fhv LEFT JOIN zones pul ON fhv.PULocationID = pul.LocationID\n",
|
|
" LEFT JOIN zones dol ON fhv.DOLocationID = dol.LocationID\n",
|
|
"GROUP BY \n",
|
|
" 1\n",
|
|
"ORDER BY\n",
|
|
" 2 DESC\n",
|
|
"LIMIT 5;\n",
|
|
"\"\"\").show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "e4b754d1",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": []
|
|
}
|
|
],
|
|
"metadata": {
|
|
"kernelspec": {
|
|
"display_name": "Python 3 (ipykernel)",
|
|
"language": "python",
|
|
"name": "python3"
|
|
},
|
|
"language_info": {
|
|
"codemirror_mode": {
|
|
"name": "ipython",
|
|
"version": 3
|
|
},
|
|
"file_extension": ".py",
|
|
"mimetype": "text/x-python",
|
|
"name": "python",
|
|
"nbconvert_exporter": "python",
|
|
"pygments_lexer": "ipython3",
|
|
"version": "3.9.7"
|
|
}
|
|
},
|
|
"nbformat": 4,
|
|
"nbformat_minor": 5
|
|
}
|