Files
Alexey Grigorev 301e2b0dda rdd
2022-02-21 23:49:41 +00:00

487 lines
11 KiB
Plaintext

{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "d66f42fd",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"WARNING: An illegal reflective access operation has occurred\n",
"WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/alexey/spark/spark-3.0.3-bin-hadoop3.2/jars/spark-unsafe_2.12-3.0.3.jar) to constructor java.nio.DirectByteBuffer(long,int)\n",
"WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform\n",
"WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations\n",
"WARNING: All illegal access operations will be denied in a future release\n",
"22/02/21 22:25:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
"Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n",
"Setting default log level to \"WARN\".\n",
"To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n"
]
}
],
"source": [
"import pyspark\n",
"from pyspark.sql import SparkSession\n",
"\n",
"# Start (or reuse) a local Spark session that runs on all available cores.\n",
"spark = (\n",
"    SparkSession.builder\n",
"    .master(\"local[*]\")\n",
"    .appName('test')\n",
"    .getOrCreate()\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "646fc343",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"\r",
"[Stage 0:> (0 + 1) / 1]\r",
"\r",
" \r"
]
}
],
"source": [
"# Read all parquet files matched by the two-level glob under data/pq/green.\n",
"df_green = spark.read.parquet('data/pq/green/*/*')"
]
},
{
"cell_type": "markdown",
"id": "196cccd5",
"metadata": {},
"source": [
"The RDD pipeline below reproduces this SQL aggregation:\n",
"\n",
"```sql\n",
"SELECT \n",
" date_trunc('hour', lpep_pickup_datetime) AS hour, \n",
" PULocationID AS zone,\n",
"\n",
" SUM(total_amount) AS amount,\n",
" COUNT(1) AS number_records\n",
"FROM\n",
" green\n",
"WHERE\n",
" lpep_pickup_datetime >= '2020-01-01 00:00:00'\n",
"GROUP BY\n",
" 1, 2\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "74fe52cb",
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"# Keep only the columns the aggregation needs, then switch to the RDD API.\n",
"rdd = (\n",
"    df_green\n",
"    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount')\n",
"    .rdd\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 13,
"id": "1a0bf382",
"metadata": {},
"outputs": [],
"source": [
"from datetime import datetime"
]
},
{
"cell_type": "code",
"execution_count": 19,
"id": "fa2b00f1",
"metadata": {},
"outputs": [],
"source": [
"# Lower bound for pickup times; records before it are filtered out.\n",
"start = datetime(year=2020, month=1, day=1)\n",
"\n",
"\n",
"def filter_outliers(row):\n",
"    \"\"\"Return True when the row's pickup time is on or after ``start``.\n",
"\n",
"    Mirrors the SQL predicate ``lpep_pickup_datetime >= '2020-01-01 00:00:00'``.\n",
"    A missing (None) pickup time is rejected instead of raising ``TypeError``,\n",
"    matching how SQL drops rows whose WHERE condition evaluates to NULL.\n",
"    \"\"\"\n",
"    pickup = row.lpep_pickup_datetime\n",
"    return pickup is not None and pickup >= start"
]
},
{
"cell_type": "code",
"execution_count": 21,
"id": "69dd326d",
"metadata": {},
"outputs": [],
"source": [
"# Fetch up to 10 records to the driver and keep the first one for inspection.\n",
"rows = rdd.take(10)\n",
"row = rows[0]"
]
},
{
"cell_type": "code",
"execution_count": 29,
"id": "cd4b7006",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 16, 19, 49, 27), PULocationID=260, total_amount=14.3)"
]
},
"execution_count": 29,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Display the sampled Row to see its fields and value types.\n",
"row"
]
},
{
"cell_type": "code",
"execution_count": 31,
"id": "d99eb089",
"metadata": {},
"outputs": [],
"source": [
"def prepare_for_grouping(row):\n",
"    \"\"\"Map a trip record to a ``(key, value)`` pair for ``reduceByKey``.\n",
"\n",
"    The key is ``(pickup time truncated to the hour, PULocationID)`` and the\n",
"    value is ``(total_amount, 1)``, so adding values per key gives the\n",
"    summed amount and the number of records.\n",
"    \"\"\"\n",
"    truncated = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)\n",
"    return (truncated, row.PULocationID), (row.total_amount, 1)"
]
},
{
"cell_type": "code",
"execution_count": 34,
"id": "cb328a44",
"metadata": {},
"outputs": [],
"source": [
"def calculate_revenue(left_value, right_value):\n",
"    \"\"\"Combine two ``(amount, count)`` pairs by adding them element-wise.\"\"\"\n",
"    (l_amount, l_count), (r_amount, r_count) = left_value, right_value\n",
"    return (l_amount + r_amount, l_count + r_count)"
]
},
{
"cell_type": "code",
"execution_count": 39,
"id": "2ea260f1",
"metadata": {},
"outputs": [],
"source": [
"from collections import namedtuple"
]
},
{
"cell_type": "code",
"execution_count": 40,
"id": "7dae6064",
"metadata": {},
"outputs": [],
"source": [
"# Flat record type for one aggregated (hour, zone) result.\n",
"RevenueRow = namedtuple('RevenueRow', 'hour zone revenue count')"
]
},
{
"cell_type": "code",
"execution_count": 41,
"id": "e0a98ee4",
"metadata": {},
"outputs": [],
"source": [
"def unwrap(row):\n",
"    \"\"\"Flatten a ``((hour, zone), (revenue, count))`` pair into a ``RevenueRow``.\"\"\"\n",
"    key, value = row[0], row[1]\n",
"    return RevenueRow(hour=key[0], zone=key[1], revenue=value[0], count=value[1])"
]
},
{
"cell_type": "code",
"execution_count": 45,
"id": "a09200b8",
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import types"
]
},
{
"cell_type": "code",
"execution_count": 46,
"id": "5c14d15e",
"metadata": {},
"outputs": [],
"source": [
"# Explicit schema for the aggregated output; every column is declared nullable.\n",
"result_schema = types.StructType([\n",
"    types.StructField(field_name, field_type, True)\n",
"    for field_name, field_type in [\n",
"        ('hour', types.TimestampType()),\n",
"        ('zone', types.IntegerType()),\n",
"        ('revenue', types.DoubleType()),\n",
"        ('count', types.IntegerType()),\n",
"    ]\n",
"])"
]
},
{
"cell_type": "code",
"execution_count": 47,
"id": "56ea72ff",
"metadata": {},
"outputs": [],
"source": [
"# RDD version of the SQL aggregation: filter, key by (hour, zone), reduce, flatten.\n",
"df_result = (\n",
"    rdd\n",
"    .filter(filter_outliers)\n",
"    .map(prepare_for_grouping)\n",
"    .reduceByKey(calculate_revenue)\n",
"    .map(unwrap)\n",
"    .toDF(result_schema)\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 50,
"id": "4675bd3f",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
}
],
"source": [
"# Overwrite any previous output so the notebook can be re-run from a fresh kernel;\n",
"# the default save mode raises an error if 'tmp/green-revenue' already exists.\n",
"df_result.write.parquet('tmp/green-revenue', mode='overwrite')"
]
},
{
"cell_type": "code",
"execution_count": 55,
"id": "255b5503",
"metadata": {},
"outputs": [],
"source": [
"# Columns passed to the batch prediction step.\n",
"columns = [\n",
"    'VendorID',\n",
"    'lpep_pickup_datetime',\n",
"    'PULocationID',\n",
"    'DOLocationID',\n",
"    'trip_distance',\n",
"]\n",
"\n",
"duration_rdd = (\n",
"    df_green\n",
"    .select(columns)\n",
"    .rdd\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 67,
"id": "645c3190",
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd"
]
},
{
"cell_type": "code",
"execution_count": 68,
"id": "921e4ef9",
"metadata": {},
"outputs": [],
"source": [
"# Fetch up to 10 records from duration_rdd to the driver.\n",
"# NOTE: this rebinds `rows`, which earlier held the sample taken from `rdd`.\n",
"rows = duration_rdd.take(10)"
]
},
{
"cell_type": "code",
"execution_count": 81,
"id": "f50db3eb",
"metadata": {},
"outputs": [],
"source": [
"# Build a small pandas frame from the sampled rows, labelled with `columns`.\n",
"df = pd.DataFrame(rows, columns=columns)"
]
},
{
"cell_type": "code",
"execution_count": 74,
"id": "5b8ecc53",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['VendorID',\n",
" 'lpep_pickup_datetime',\n",
" 'PULocationID',\n",
" 'DOLocationID',\n",
" 'trip_distance']"
]
},
"execution_count": 74,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Display the column list used to label the pandas frame.\n",
"columns"
]
},
{
"cell_type": "code",
"execution_count": 76,
"id": "6766c0f8",
"metadata": {},
"outputs": [],
"source": [
"# Placeholder: a real model would be loaded here (model = ...).\n",
"\n",
"\n",
"def model_predict(df):\n",
"    \"\"\"Return a stand-in prediction: ``trip_distance`` multiplied by 5.\n",
"\n",
"    A real implementation would call ``model.predict(df)`` instead.\n",
"    \"\"\"\n",
"    return df.trip_distance * 5"
]
},
{
"cell_type": "code",
"execution_count": 98,
"id": "7437b848",
"metadata": {},
"outputs": [],
"source": [
"def apply_model_in_batch(rows):\n",
"    \"\"\"Score one partition's rows with ``model_predict``.\n",
"\n",
"    The rows are loaded into a pandas DataFrame labelled with the notebook-level\n",
"    ``columns`` list, a ``predicted_duration`` column is added, and each record\n",
"    is yielded as a pandas named tuple (including its ``Index`` field).\n",
"    \"\"\"\n",
"    batch = pd.DataFrame(rows, columns=columns)\n",
"    batch['predicted_duration'] = model_predict(batch)\n",
"    yield from batch.itertuples()"
]
},
{
"cell_type": "code",
"execution_count": 102,
"id": "580b5845",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
}
],
"source": [
"# Score each partition as a batch, then drop the 'Index' field that itertuples() adds.\n",
"df_predicts = (\n",
"    duration_rdd\n",
"    .mapPartitions(apply_model_in_batch)\n",
"    .toDF()\n",
"    .drop('Index')\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 104,
"id": "6055d543",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"\r",
"[Stage 48:> (0 + 1) / 1]\r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+------------------+\n",
"|predicted_duration|\n",
"+------------------+\n",
"| 12.95|\n",
"| 31.25|\n",
"| 14.0|\n",
"| 12.75|\n",
"| 0.1|\n",
"| 11.05|\n",
"|11.299999999999999|\n",
"|54.349999999999994|\n",
"| 15.25|\n",
"| 91.75|\n",
"| 12.25|\n",
"| 3.1|\n",
"| 7.5|\n",
"|11.899999999999999|\n",
"| 78.89999999999999|\n",
"| 4.45|\n",
"| 23.2|\n",
"| 4.85|\n",
"| 6.65|\n",
"| 15.1|\n",
"+------------------+\n",
"only showing top 20 rows\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\r",
" \r"
]
}
],
"source": [
"# Preview the predictions; show() prints only the first 20 rows here.\n",
"df_predicts.select('predicted_duration').show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9e91d243",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
}
},
"nbformat": 4,
"nbformat_minor": 5
}