Files
Alexey Grigorev a868f23066 homework 5
2022-03-07 22:27:18 +00:00

677 lines
16 KiB
Plaintext

{
"cells": [
{
"cell_type": "code",
"execution_count": 5,
"id": "00bc6543",
"metadata": {},
"outputs": [],
"source": [
"import pyspark\n",
"from pyspark.sql import SparkSession\n",
"from pyspark.sql import types"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "cd4a0f3d",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"WARNING: An illegal reflective access operation has occurred\n",
"WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/alexey/spark/spark-3.0.3-bin-hadoop3.2/jars/spark-unsafe_2.12-3.0.3.jar) to constructor java.nio.DirectByteBuffer(long,int)\n",
"WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform\n",
"WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations\n",
"WARNING: All illegal access operations will be denied in a future release\n",
"22/03/07 21:55:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
"Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n",
"Setting default log level to \"WARN\".\n",
"To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n"
]
}
],
"source": [
"spark = SparkSession.builder \\\n",
" .master(\"local[*]\") \\\n",
" .appName('test') \\\n",
" .getOrCreate()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "eb3e4c36",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'3.0.3'"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"spark.version"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "5236cebd",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"-rw-rw-r-- 1 alexey alexey 700M Oct 29 18:53 fhvhv_tripdata_2021-02.csv\r\n"
]
}
],
"source": [
"!ls -lh fhvhv_tripdata_2021-02.csv"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "0a3399a3",
"metadata": {},
"outputs": [],
"source": [
"schema = types.StructType([\n",
" types.StructField('hvfhs_license_num', types.StringType(), True),\n",
" types.StructField('dispatching_base_num', types.StringType(), True),\n",
" types.StructField('pickup_datetime', types.TimestampType(), True),\n",
" types.StructField('dropoff_datetime', types.TimestampType(), True),\n",
" types.StructField('PULocationID', types.IntegerType(), True),\n",
" types.StructField('DOLocationID', types.IntegerType(), True),\n",
" types.StructField('SR_Flag', types.StringType(), True)\n",
"])"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "68bc8b72",
"metadata": {},
"outputs": [],
"source": [
"df = spark.read \\\n",
" .option(\"header\", \"true\") \\\n",
" .schema(schema) \\\n",
" .csv('fhvhv_tripdata_2021-02.csv')\n",
"\n",
"df = df.repartition(24)\n",
"\n",
"df.write.parquet('data/pq/fhvhv/2021/02/')"
]
},
{
"cell_type": "code",
"execution_count": 16,
"id": "58989b55",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"\r",
"[Stage 0:> (0 + 1) / 1]\r",
"\r",
" \r"
]
}
],
"source": [
"df = spark.read.parquet('data/pq/fhvhv/2021/02/')"
]
},
{
"cell_type": "markdown",
"id": "48b01d2f",
"metadata": {},
"source": [
"**Q3**: How many taxi trips were there on February 15?"
]
},
{
"cell_type": "code",
"execution_count": 18,
"id": "f7489aea",
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import functions as F"
]
},
{
"cell_type": "code",
"execution_count": 24,
"id": "6c2500fd",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
},
{
"data": {
"text/plain": [
"367170"
]
},
"execution_count": 24,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df \\\n",
" .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \\\n",
" .filter(\"pickup_date = '2021-02-15'\") \\\n",
" .count()"
]
},
{
"cell_type": "code",
"execution_count": 25,
"id": "dd7ae60d",
"metadata": {},
"outputs": [],
"source": [
"df.createOrReplaceTempView('fhvhv_2021_02')"
]
},
{
"cell_type": "code",
"execution_count": 28,
"id": "6d47c147",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"\r",
"[Stage 20:> (0 + 4) / 4]\r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------+\n",
"|count(1)|\n",
"+--------+\n",
"| 367170|\n",
"+--------+\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\r",
"[Stage 20:==============> (1 + 3) / 4]\r",
"\r",
" \r"
]
}
],
"source": [
"spark.sql(\"\"\"\n",
"SELECT\n",
" COUNT(1)\n",
"FROM \n",
" fhvhv_2021_02\n",
"WHERE\n",
" to_date(pickup_datetime) = '2021-02-15';\n",
"\"\"\").show()"
]
},
{
"cell_type": "markdown",
"id": "ae3f533b",
"metadata": {},
"source": [
"**Q4**: Longest trip for each day"
]
},
{
"cell_type": "code",
"execution_count": 29,
"id": "7befe422",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['hvfhs_license_num',\n",
" 'dispatching_base_num',\n",
" 'pickup_datetime',\n",
" 'dropoff_datetime',\n",
" 'PULocationID',\n",
" 'DOLocationID',\n",
" 'SR_Flag']"
]
},
"execution_count": 29,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.columns"
]
},
{
"cell_type": "code",
"execution_count": 36,
"id": "279d9161",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"[Stage 37:==============> (1 + 3) / 4]\r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----------+-------------+\n",
"|pickup_date|max(duration)|\n",
"+-----------+-------------+\n",
"| 2021-02-11| 75540|\n",
"| 2021-02-17| 57221|\n",
"| 2021-02-20| 44039|\n",
"| 2021-02-03| 40653|\n",
"| 2021-02-19| 37577|\n",
"+-----------+-------------+\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\r",
"[Stage 38:==================================================> (187 + 4) / 200]\r",
"\r",
" \r"
]
}
],
"source": [
"df \\\n",
" .withColumn('duration', df.dropoff_datetime.cast('long') - df.pickup_datetime.cast('long')) \\\n",
" .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \\\n",
" .groupBy('pickup_date') \\\n",
" .max('duration') \\\n",
" .orderBy('max(duration)', ascending=False) \\\n",
" .limit(5) \\\n",
" .show()"
]
},
{
"cell_type": "code",
"execution_count": 38,
"id": "74cf0e8b",
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"\r",
"[Stage 43:> (0 + 4) / 4]\r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----------+-----------------+\n",
"|pickup_date| duration|\n",
"+-----------+-----------------+\n",
"| 2021-02-11| 1259.0|\n",
"| 2021-02-17|953.6833333333333|\n",
"| 2021-02-20|733.9833333333333|\n",
"| 2021-02-03| 677.55|\n",
"| 2021-02-19|626.2833333333333|\n",
"| 2021-02-25| 583.5|\n",
"| 2021-02-18|576.8666666666667|\n",
"| 2021-02-10|569.4833333333333|\n",
"| 2021-02-21| 537.05|\n",
"| 2021-02-09|534.7833333333333|\n",
"+-----------+-----------------+\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\r",
"[Stage 44:================================================> (180 + 4) / 200]\r",
"\r",
" \r"
]
}
],
"source": [
"spark.sql(\"\"\"\n",
"SELECT\n",
" to_date(pickup_datetime) AS pickup_date,\n",
" MAX((CAST(dropoff_datetime AS LONG) - CAST(pickup_datetime AS LONG)) / 60) AS duration\n",
"FROM \n",
" fhvhv_2021_02\n",
"GROUP BY\n",
" 1\n",
"ORDER BY\n",
" 2 DESC\n",
"LIMIT 10;\n",
"\"\"\").show()"
]
},
{
"cell_type": "markdown",
"id": "d915096b",
"metadata": {},
"source": [
"**Q5**: Most frequent `dispatching_base_num`\n",
"\n",
"How many stages this spark job has?\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 44,
"id": "25816aa2",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"\r",
"[Stage 73:> (0 + 4) / 4]\r",
"\r",
"[Stage 73:==============> (1 + 3) / 4]\r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+--------+\n",
"|dispatching_base_num|count(1)|\n",
"+--------------------+--------+\n",
"| B02510| 3233664|\n",
"| B02764| 965568|\n",
"| B02872| 882689|\n",
"| B02875| 685390|\n",
"| B02765| 559768|\n",
"+--------------------+--------+\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\r",
"[Stage 74:===================================================> (189 + 5) / 200]\r",
"\r",
" \r"
]
}
],
"source": [
"spark.sql(\"\"\"\n",
"SELECT\n",
" dispatching_base_num,\n",
" COUNT(1)\n",
"FROM \n",
" fhvhv_2021_02\n",
"GROUP BY\n",
" 1\n",
"ORDER BY\n",
" 2 DESC\n",
"LIMIT 5;\n",
"\"\"\").show()"
]
},
{
"cell_type": "code",
"execution_count": 46,
"id": "a78f9fe3",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"\r",
"[Stage 86:> (0 + 4) / 4]\r",
"\r",
"[Stage 86:=============================> (2 + 2) / 4]\r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+-------+\n",
"|dispatching_base_num| count|\n",
"+--------------------+-------+\n",
"| B02510|3233664|\n",
"| B02764| 965568|\n",
"| B02872| 882689|\n",
"| B02875| 685390|\n",
"| B02765| 559768|\n",
"+--------------------+-------+\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\r",
"[Stage 87:===========================================> (161 + 5) / 200]\r",
"\r",
" \r"
]
}
],
"source": [
"df \\\n",
" .groupBy('dispatching_base_num') \\\n",
" .count() \\\n",
" .orderBy('count', ascending=False) \\\n",
" .limit(5) \\\n",
" .show()"
]
},
{
"cell_type": "markdown",
"id": "0d10173a",
"metadata": {},
"source": [
"**Q6**: Most common locations pair"
]
},
{
"cell_type": "code",
"execution_count": 47,
"id": "74b7f664",
"metadata": {},
"outputs": [],
"source": [
"df_zones = spark.read.parquet('zones')"
]
},
{
"cell_type": "code",
"execution_count": 49,
"id": "81642d3b",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['LocationID', 'Borough', 'Zone', 'service_zone']"
]
},
"execution_count": 49,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df_zones.columns"
]
},
{
"cell_type": "code",
"execution_count": 51,
"id": "4f460dda",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['hvfhs_license_num',\n",
" 'dispatching_base_num',\n",
" 'pickup_datetime',\n",
" 'dropoff_datetime',\n",
" 'PULocationID',\n",
" 'DOLocationID',\n",
" 'SR_Flag']"
]
},
"execution_count": 51,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.columns"
]
},
{
"cell_type": "code",
"execution_count": 50,
"id": "ad8f0101",
"metadata": {},
"outputs": [],
"source": [
"df_zones.createOrReplaceTempView('zones')"
]
},
{
"cell_type": "code",
"execution_count": 57,
"id": "6f738414",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"[Stage 103:==============================================> (176 + 4) / 200]\r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+--------+\n",
"| pu_do_pair|count(1)|\n",
"+--------------------+--------+\n",
"|East New York / E...| 45041|\n",
"|Borough Park / Bo...| 37329|\n",
"| Canarsie / Canarsie| 28026|\n",
"|Crown Heights Nor...| 25976|\n",
"|Bay Ridge / Bay R...| 17934|\n",
"+--------------------+--------+\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\r",
" \r"
]
}
],
"source": [
"spark.sql(\"\"\"\n",
"SELECT\n",
" CONCAT(pul.Zone, ' / ', dol.Zone) AS pu_do_pair,\n",
" COUNT(1)\n",
"FROM \n",
" fhvhv_2021_02 fhv LEFT JOIN zones pul ON fhv.PULocationID = pul.LocationID\n",
" LEFT JOIN zones dol ON fhv.DOLocationID = dol.LocationID\n",
"GROUP BY \n",
" 1\n",
"ORDER BY\n",
" 2 DESC\n",
"LIMIT 5;\n",
"\"\"\").show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e4b754d1",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
}
},
"nbformat": 4,
"nbformat_minor": 5
}