{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "07de9dc3",
"metadata": {},
"outputs": [],
"source": [
"import pyspark\n",
"from pyspark.sql import SparkSession"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "ca5bbb06",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"WARNING: An illegal reflective access operation has occurred\n",
"WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/alexey/spark/spark-3.0.3-bin-hadoop3.2/jars/spark-unsafe_2.12-3.0.3.jar) to constructor java.nio.DirectByteBuffer(long,int)\n",
"WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform\n",
"WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations\n",
"WARNING: All illegal access operations will be denied in a future release\n",
"22/02/16 21:11:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
"Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n",
"Setting default log level to \"WARN\".\n",
"To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n"
]
}
],
"source": [
"spark = SparkSession.builder \\\n",
" .master(\"local[*]\") \\\n",
" .appName('test') \\\n",
" .getOrCreate()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "cf8de204",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"--2022-02-16 21:13:50-- https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-01.csv\n",
"Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.217.84.132\n",
"Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.217.84.132|:443... connected.\n",
"HTTP request sent, awaiting response... 200 OK\n",
"Length: 752335705 (717M) [text/csv]\n",
"Saving to: fhvhv_tripdata_2021-01.csv\n",
"\n",
"fhvhv_tripdata_2021 100%[===================>] 717.48M 35.6MB/s in 21s \n",
"\n",
"2022-02-16 21:14:11 (34.4 MB/s) - fhvhv_tripdata_2021-01.csv saved [752335705/752335705]\n",
"\n"
]
}
],
"source": [
"!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-01.csv"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "2a52087c",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"11908469 fhvhv_tripdata_2021-01.csv\r\n"
]
}
],
"source": [
"!wc -l fhvhv_tripdata_2021-01.csv"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "931021a7",
"metadata": {},
"outputs": [],
"source": [
"df = spark.read \\\n",
" .option(\"header\", \"true\") \\\n",
" .csv('fhvhv_tripdata_2021-01.csv')"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "d44b7839",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,StringType,true),StructField(dropoff_datetime,StringType,true),StructField(PULocationID,StringType,true),StructField(DOLocationID,StringType,true),StructField(SR_Flag,StringType,true)))"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.schema"
]
},
{
"cell_type": "code",
"execution_count": 14,
"id": "4249e790",
"metadata": {},
"outputs": [],
"source": [
"!head -n 1001 fhvhv_tripdata_2021-01.csv > head.csv"
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "6894312c",
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd"
]
},
{
"cell_type": "code",
"execution_count": 16,
"id": "f3ca771b",
"metadata": {},
"outputs": [],
"source": [
"df_pandas = pd.read_csv('head.csv')"
]
},
{
"cell_type": "code",
"execution_count": 19,
"id": "f1066b4f",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"hvfhs_license_num object\n",
"dispatching_base_num object\n",
"pickup_datetime object\n",
"dropoff_datetime object\n",
"PULocationID int64\n",
"DOLocationID int64\n",
"SR_Flag float64\n",
"dtype: object"
]
},
"execution_count": 19,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df_pandas.dtypes"
]
},
{
"cell_type": "code",
"execution_count": 23,
"id": "f8413c9d",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,StringType,true),StructField(dropoff_datetime,StringType,true),StructField(PULocationID,LongType,true),StructField(DOLocationID,LongType,true),StructField(SR_Flag,DoubleType,true)))"
]
},
"execution_count": 23,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"spark.createDataFrame(df_pandas).schema"
]
},
{
"cell_type": "markdown",
"id": "80f252c1",
"metadata": {},
"source": [
"Integer - 4 bytes\n",
"Long - 8 bytes"
]
},
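{
"cell_type": "markdown",
"id": "a1c4e9d2",
"metadata": {},
"source": [
"A quick sanity check (sketch): Spark's `IntegerType` is a 32-bit signed integer and `LongType` is 64-bit, so the location IDs (far below 2 billion) fit comfortably in the 4-byte type."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b7d2f0a1",
"metadata": {},
"outputs": [],
"source": [
"# upper bounds of the two integer types (32-bit vs 64-bit signed)\n",
"print(2**31 - 1)\n",
"print(2**63 - 1)"
]
},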
{
"cell_type": "code",
"execution_count": 24,
"id": "16937bfd",
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import types"
]
},
{
"cell_type": "code",
"execution_count": 26,
"id": "fc61a99a",
"metadata": {},
"outputs": [],
"source": [
"schema = types.StructType([\n",
" types.StructField('hvfhs_license_num', types.StringType(), True),\n",
" types.StructField('dispatching_base_num', types.StringType(), True),\n",
" types.StructField('pickup_datetime', types.TimestampType(), True),\n",
" types.StructField('dropoff_datetime', types.TimestampType(), True),\n",
" types.StructField('PULocationID', types.IntegerType(), True),\n",
" types.StructField('DOLocationID', types.IntegerType(), True),\n",
" types.StructField('SR_Flag', types.StringType(), True)\n",
"])"
]
},
{
"cell_type": "code",
"execution_count": 32,
"id": "f94052ae",
"metadata": {},
"outputs": [],
"source": [
"df = spark.read \\\n",
" .option(\"header\", \"true\") \\\n",
" .schema(schema) \\\n",
" .csv('fhvhv_tripdata_2021-01.csv')"
]
},
{
"cell_type": "code",
"execution_count": 36,
"id": "c270d9d6",
"metadata": {},
"outputs": [],
"source": [
"df = df.repartition(24)"
]
},
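{
"cell_type": "markdown",
"id": "c91d2b3e",
"metadata": {},
"source": [
"A minimal check (sketch) that the repartitioning took effect, using the underlying RDD API:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d4a81c7f",
"metadata": {},
"outputs": [],
"source": [
"# number of partitions the DataFrame will be written out as\n",
"df.rdd.getNumPartitions()"
]
},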
{
"cell_type": "code",
"execution_count": null,
"id": "7796c2b2",
"metadata": {},
"outputs": [],
"source": [
"df.write.parquet('fhvhv/2021/01/')"
]
},
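{
"cell_type": "markdown",
"id": "e5b92d10",
"metadata": {},
"source": [
"Optionally, list the output directory (sketch): by default a Spark parquet write produces one `part-*.parquet` file per partition plus a `_SUCCESS` marker."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f60a3e21",
"metadata": {},
"outputs": [],
"source": [
"!ls -lh fhvhv/2021/01/"
]
},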
{
"cell_type": "code",
"execution_count": 44,
"id": "c3cab876",
"metadata": {},
"outputs": [],
"source": [
"df = spark.read.parquet('fhvhv/2021/01/')"
]
},
{
"cell_type": "code",
"execution_count": 48,
"id": "203b5627",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- hvfhs_license_num: string (nullable = true)\n",
" |-- dispatching_base_num: string (nullable = true)\n",
" |-- pickup_datetime: timestamp (nullable = true)\n",
" |-- dropoff_datetime: timestamp (nullable = true)\n",
" |-- PULocationID: integer (nullable = true)\n",
" |-- DOLocationID: integer (nullable = true)\n",
" |-- SR_Flag: string (nullable = true)\n",
"\n"
]
}
],
"source": [
"df.printSchema()"
]
},
{
"cell_type": "markdown",
"id": "64172a47",
"metadata": {},
"source": [
"SELECT * FROM df WHERE hvfhs_license_num = HV0003"
]
},
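{
"cell_type": "markdown",
"id": "17c8d4a9",
"metadata": {},
"source": [
"A minimal DataFrame translation of the query above (sketch; `df` is the parquet DataFrame loaded earlier):"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "28e9f5b0",
"metadata": {},
"outputs": [],
"source": [
"df.filter(df.hvfhs_license_num == 'HV0003').show(5)"
]
},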
{
"cell_type": "code",
"execution_count": 56,
"id": "d24840a0",
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import functions as F"
]
},
{
"cell_type": "code",
"execution_count": 61,
"id": "3ab1ca44",
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+\n",
"|hvfhs_license_num|dispatching_base_num| pickup_datetime| dropoff_datetime|PULocationID|DOLocationID|SR_Flag|\n",
"+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+\n",
"| HV0005| B02510|2021-01-07 06:43:22|2021-01-07 06:55:06| 142| 230| null|\n",
"| HV0005| B02510|2021-01-01 16:01:26|2021-01-01 16:20:20| 133| 91| null|\n",
"| HV0003| B02764|2021-01-01 00:23:13|2021-01-01 00:30:35| 147| 159| null|\n",
"| HV0003| B02869|2021-01-06 11:43:12|2021-01-06 11:55:07| 79| 164| null|\n",
"| HV0003| B02884|2021-01-04 15:35:32|2021-01-04 15:52:02| 174| 18| null|\n",
"| HV0003| B02875|2021-01-04 13:42:15|2021-01-04 14:04:57| 201| 180| null|\n",
"| HV0005| B02510|2021-01-04 18:57:31|2021-01-04 19:09:55| 230| 142| null|\n",
"| HV0003| B02872|2021-01-03 18:42:03|2021-01-03 19:12:22| 132| 72| null|\n",
"| HV0004| B02800|2021-01-01 05:31:50|2021-01-01 05:40:03| 188| 61| null|\n",
"| HV0005| B02510|2021-01-04 20:21:47|2021-01-04 20:26:03| 97| 189| null|\n",
"| HV0003| B02764|2021-01-01 01:51:18|2021-01-01 02:05:32| 174| 235| null|\n",
"| HV0003| B02871|2021-01-05 10:20:54|2021-01-05 10:32:44| 35| 76| null|\n",
"| HV0005| B02510|2021-01-06 02:32:09|2021-01-06 02:43:35| 35| 39| null|\n",
"| HV0003| B02882|2021-01-04 12:34:52|2021-01-04 12:38:59| 231| 13| null|\n",
"| HV0003| B02617|2021-01-02 20:12:56|2021-01-02 20:41:18| 87| 127| null|\n",
"| HV0005| B02510|2021-01-02 16:55:48|2021-01-02 17:20:40| 17| 89| null|\n",
"| HV0003| B02869|2021-01-02 15:14:38|2021-01-02 15:23:27| 11| 14| null|\n",
"| HV0005| B02510|2021-01-01 05:54:50|2021-01-01 06:03:46| 21| 26| null|\n",
"| HV0003| B02869|2021-01-04 12:40:42|2021-01-04 12:48:34| 83| 260| null|\n",
"| HV0005| B02510|2021-01-01 14:58:57|2021-01-01 15:09:53| 189| 52| null|\n",
"+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"df.show()"
]
},
{
"cell_type": "code",
"execution_count": 63,
"id": "6d98c2ce",
"metadata": {},
"outputs": [],
"source": [
"def crazy_stuff(base_num):\n",
" num = int(base_num[1:])\n",
" if num % 7 == 0:\n",
" return f's/{num:03x}'\n",
" elif num % 3 == 0:\n",
" return f'a/{num:03x}'\n",
" else:\n",
" return f'e/{num:03x}'"
]
},
{
"cell_type": "code",
"execution_count": 65,
"id": "f3175419",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'s/b44'"
]
},
"execution_count": 65,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"crazy_stuff('B02884')"
]
},
{
"cell_type": "code",
"execution_count": 66,
"id": "9bb5d503",
"metadata": {},
"outputs": [],
"source": [
"crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())"
]
},
{
"cell_type": "code",
"execution_count": 67,
"id": "b38f0465",
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-------+-----------+------------+------------+------------+\n",
"|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|\n",
"+-------+-----------+------------+------------+------------+\n",
"| e/9ce| 2021-01-07| 2021-01-07| 142| 230|\n",
"| e/9ce| 2021-01-01| 2021-01-01| 133| 91|\n",
"| e/acc| 2021-01-01| 2021-01-01| 147| 159|\n",
"| e/b35| 2021-01-06| 2021-01-06| 79| 164|\n",
"| s/b44| 2021-01-04| 2021-01-04| 174| 18|\n",
"| e/b3b| 2021-01-04| 2021-01-04| 201| 180|\n",
"| e/9ce| 2021-01-04| 2021-01-04| 230| 142|\n",
"| e/b38| 2021-01-03| 2021-01-03| 132| 72|\n",
"| s/af0| 2021-01-01| 2021-01-01| 188| 61|\n",
"| e/9ce| 2021-01-04| 2021-01-04| 97| 189|\n",
"| e/acc| 2021-01-01| 2021-01-01| 174| 235|\n",
"| a/b37| 2021-01-05| 2021-01-05| 35| 76|\n",
"| e/9ce| 2021-01-06| 2021-01-06| 35| 39|\n",
"| e/b42| 2021-01-04| 2021-01-04| 231| 13|\n",
"| e/a39| 2021-01-02| 2021-01-02| 87| 127|\n",
"| e/9ce| 2021-01-02| 2021-01-02| 17| 89|\n",
"| e/b35| 2021-01-02| 2021-01-02| 11| 14|\n",
"| e/9ce| 2021-01-01| 2021-01-01| 21| 26|\n",
"| e/b35| 2021-01-04| 2021-01-04| 83| 260|\n",
"| e/9ce| 2021-01-01| 2021-01-01| 189| 52|\n",
"+-------+-----------+------------+------------+------------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"df \\\n",
" .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \\\n",
" .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \\\n",
" .withColumn('base_id', crazy_stuff_udf(df.dispatching_base_num)) \\\n",
" .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \\\n",
" .show()"
]
},
{
"cell_type": "code",
"execution_count": 55,
"id": "00921644",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[Row(pickup_datetime=datetime.datetime(2021, 1, 1, 0, 23, 13), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 30, 35), PULocationID=147, DOLocationID=159),\n",
" Row(pickup_datetime=datetime.datetime(2021, 1, 6, 11, 43, 12), dropoff_datetime=datetime.datetime(2021, 1, 6, 11, 55, 7), PULocationID=79, DOLocationID=164),\n",
" Row(pickup_datetime=datetime.datetime(2021, 1, 4, 15, 35, 32), dropoff_datetime=datetime.datetime(2021, 1, 4, 15, 52, 2), PULocationID=174, DOLocationID=18),\n",
" Row(pickup_datetime=datetime.datetime(2021, 1, 4, 13, 42, 15), dropoff_datetime=datetime.datetime(2021, 1, 4, 14, 4, 57), PULocationID=201, DOLocationID=180),\n",
" Row(pickup_datetime=datetime.datetime(2021, 1, 3, 18, 42, 3), dropoff_datetime=datetime.datetime(2021, 1, 3, 19, 12, 22), PULocationID=132, DOLocationID=72)]"
]
},
"execution_count": 55,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID') \\\n",
" .filter(df.hvfhs_license_num == 'HV0003')\n"
]
},
{
"cell_type": "code",
"execution_count": 50,
"id": "0866f9c0",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"hvfhs_license_num,dispatching_base_num,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,SR_Flag\r",
"\r\n",
"HV0003,B02682,2021-01-01 00:33:44,2021-01-01 00:49:07,230,166,\r",
"\r\n",
"HV0003,B02682,2021-01-01 00:55:19,2021-01-01 01:18:21,152,167,\r",
"\r\n",
"HV0003,B02764,2021-01-01 00:23:56,2021-01-01 00:38:05,233,142,\r",
"\r\n",
"HV0003,B02764,2021-01-01 00:42:51,2021-01-01 00:45:50,142,143,\r",
"\r\n",
"HV0003,B02764,2021-01-01 00:48:14,2021-01-01 01:08:42,143,78,\r",
"\r\n",
"HV0005,B02510,2021-01-01 00:06:59,2021-01-01 00:43:01,88,42,\r",
"\r\n",
"HV0005,B02510,2021-01-01 00:50:00,2021-01-01 01:04:57,42,151,\r",
"\r\n",
"HV0003,B02764,2021-01-01 00:14:30,2021-01-01 00:50:27,71,226,\r",
"\r\n",
"HV0003,B02875,2021-01-01 00:22:54,2021-01-01 00:30:20,112,255,\r",
"\r\n"
]
}
],
"source": [
"!head -n 10 head.csv"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "aa1b0e18",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
}
},
"nbformat": 4,
"nbformat_minor": 5
}