{ "cells": [ { "cell_type": "code", "execution_count": 5, "id": "00bc6543", "metadata": {}, "outputs": [], "source": [ "import pyspark\n", "from pyspark.sql import SparkSession\n", "from pyspark.sql import types" ] }, { "cell_type": "code", "execution_count": 2, "id": "cd4a0f3d", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "WARNING: An illegal reflective access operation has occurred\n", "WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/alexey/spark/spark-3.0.3-bin-hadoop3.2/jars/spark-unsafe_2.12-3.0.3.jar) to constructor java.nio.DirectByteBuffer(long,int)\n", "WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform\n", "WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations\n", "WARNING: All illegal access operations will be denied in a future release\n", "22/03/07 21:55:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n", "Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n", "Setting default log level to \"WARN\".\n", "To adjust logging level use sc.setLogLevel(newLevel). 
For SparkR, use setLogLevel(newLevel).\n" ] } ], "source": [ "spark = SparkSession.builder \\\n", " .master(\"local[*]\") \\\n", " .appName('test') \\\n", " .getOrCreate()" ] }, { "cell_type": "code", "execution_count": 3, "id": "eb3e4c36", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'3.0.3'" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.version" ] }, { "cell_type": "code", "execution_count": 4, "id": "5236cebd", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "-rw-rw-r-- 1 alexey alexey 700M Oct 29 18:53 fhvhv_tripdata_2021-02.csv\r\n" ] } ], "source": [ "!ls -lh fhvhv_tripdata_2021-02.csv" ] }, { "cell_type": "code", "execution_count": 6, "id": "0a3399a3", "metadata": {}, "outputs": [], "source": [ "schema = types.StructType([\n", " types.StructField('hvfhs_license_num', types.StringType(), True),\n", " types.StructField('dispatching_base_num', types.StringType(), True),\n", " types.StructField('pickup_datetime', types.TimestampType(), True),\n", " types.StructField('dropoff_datetime', types.TimestampType(), True),\n", " types.StructField('PULocationID', types.IntegerType(), True),\n", " types.StructField('DOLocationID', types.IntegerType(), True),\n", " types.StructField('SR_Flag', types.StringType(), True)\n", "])" ] }, { "cell_type": "code", "execution_count": 8, "id": "68bc8b72", "metadata": {}, "outputs": [], "source": [ "df = spark.read \\\n", " .option(\"header\", \"true\") \\\n", " .schema(schema) \\\n", " .csv('fhvhv_tripdata_2021-02.csv')\n", "\n", "df = df.repartition(24)\n", "\n", "df.write.parquet('data/pq/fhvhv/2021/02/')" ] }, { "cell_type": "code", "execution_count": 16, "id": "58989b55", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "\r", "[Stage 0:> (0 + 1) / 1]\r", "\r", " \r" ] } ], "source": [ "df = spark.read.parquet('data/pq/fhvhv/2021/02/')" ] }, { "cell_type": "markdown", "id": 
"48b01d2f", "metadata": {}, "source": [ "**Q3**: How many taxi trips were there on February 15?" ] }, { "cell_type": "code", "execution_count": 18, "id": "f7489aea", "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import functions as F" ] }, { "cell_type": "code", "execution_count": 24, "id": "6c2500fd", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " \r" ] }, { "data": { "text/plain": [ "367170" ] }, "execution_count": 24, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df \\\n", " .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \\\n", " .filter(\"pickup_date = '2021-02-15'\") \\\n", " .count()" ] }, { "cell_type": "code", "execution_count": 25, "id": "dd7ae60d", "metadata": {}, "outputs": [], "source": [ "df.registerTempTable('fhvhv_2021_02')" ] }, { "cell_type": "code", "execution_count": 28, "id": "6d47c147", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "\r", "[Stage 20:> (0 + 4) / 4]\r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "+--------+\n", "|count(1)|\n", "+--------+\n", "| 367170|\n", "+--------+\n", "\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\r", "[Stage 20:==============> (1 + 3) / 4]\r", "\r", " \r" ] } ], "source": [ "spark.sql(\"\"\"\n", "SELECT\n", " COUNT(1)\n", "FROM \n", " fhvhv_2021_02\n", "WHERE\n", " to_date(pickup_datetime) = '2021-02-15';\n", "\"\"\").show()" ] }, { "cell_type": "markdown", "id": "ae3f533b", "metadata": {}, "source": [ "**Q4**: Longest trip for each day" ] }, { "cell_type": "code", "execution_count": 29, "id": "7befe422", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['hvfhs_license_num',\n", " 'dispatching_base_num',\n", " 'pickup_datetime',\n", " 'dropoff_datetime',\n", " 'PULocationID',\n", " 'DOLocationID',\n", " 'SR_Flag']" ] }, "execution_count": 29, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.columns" ] }, { "cell_type": 
"code", "execution_count": 36, "id": "279d9161", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "[Stage 37:==============> (1 + 3) / 4]\r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "+-----------+-------------+\n", "|pickup_date|max(duration)|\n", "+-----------+-------------+\n", "| 2021-02-11| 75540|\n", "| 2021-02-17| 57221|\n", "| 2021-02-20| 44039|\n", "| 2021-02-03| 40653|\n", "| 2021-02-19| 37577|\n", "+-----------+-------------+\n", "\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\r", "[Stage 38:==================================================> (187 + 4) / 200]\r", "\r", " \r" ] } ], "source": [ "df \\\n", " .withColumn('duration', df.dropoff_datetime.cast('long') - df.pickup_datetime.cast('long')) \\\n", " .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \\\n", " .groupBy('pickup_date') \\\n", " .max('duration') \\\n", " .orderBy('max(duration)', ascending=False) \\\n", " .limit(5) \\\n", " .show()" ] }, { "cell_type": "code", "execution_count": 38, "id": "74cf0e8b", "metadata": { "scrolled": true }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "\r", "[Stage 43:> (0 + 4) / 4]\r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "+-----------+-----------------+\n", "|pickup_date| duration|\n", "+-----------+-----------------+\n", "| 2021-02-11| 1259.0|\n", "| 2021-02-17|953.6833333333333|\n", "| 2021-02-20|733.9833333333333|\n", "| 2021-02-03| 677.55|\n", "| 2021-02-19|626.2833333333333|\n", "| 2021-02-25| 583.5|\n", "| 2021-02-18|576.8666666666667|\n", "| 2021-02-10|569.4833333333333|\n", "| 2021-02-21| 537.05|\n", "| 2021-02-09|534.7833333333333|\n", "+-----------+-----------------+\n", "\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\r", "[Stage 44:================================================> (180 + 4) / 200]\r", "\r", " \r" ] } ], "source": [ "spark.sql(\"\"\"\n", "SELECT\n", " to_date(pickup_datetime) AS 
pickup_date,\n", " MAX((CAST(dropoff_datetime AS LONG) - CAST(pickup_datetime AS LONG)) / 60) AS duration\n", "FROM \n", " fhvhv_2021_02\n", "GROUP BY\n", " 1\n", "ORDER BY\n", " 2 DESC\n", "LIMIT 10;\n", "\"\"\").show()" ] }, { "cell_type": "markdown", "id": "d915096b", "metadata": {}, "source": [ "**Q5**: Most frequent `dispatching_base_num`\n", "\n", "How many stages does this Spark job have?\n", "\n" ] }, { "cell_type": "code", "execution_count": 44, "id": "25816aa2", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "\r", "[Stage 73:> (0 + 4) / 4]\r", "\r", "[Stage 73:==============> (1 + 3) / 4]\r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+--------+\n", "|dispatching_base_num|count(1)|\n", "+--------------------+--------+\n", "| B02510| 3233664|\n", "| B02764| 965568|\n", "| B02872| 882689|\n", "| B02875| 685390|\n", "| B02765| 559768|\n", "+--------------------+--------+\n", "\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\r", "[Stage 74:===================================================> (189 + 5) / 200]\r", "\r", " \r" ] } ], "source": [ "spark.sql(\"\"\"\n", "SELECT\n", " dispatching_base_num,\n", " COUNT(1)\n", "FROM \n", " fhvhv_2021_02\n", "GROUP BY\n", " 1\n", "ORDER BY\n", " 2 DESC\n", "LIMIT 5;\n", "\"\"\").show()" ] }, { "cell_type": "code", "execution_count": 46, "id": "a78f9fe3", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "\r", "[Stage 86:> (0 + 4) / 4]\r", "\r", "[Stage 86:=============================> (2 + 2) / 4]\r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+-------+\n", "|dispatching_base_num| count|\n", "+--------------------+-------+\n", "| B02510|3233664|\n", "| B02764| 965568|\n", "| B02872| 882689|\n", "| B02875| 685390|\n", "| B02765| 559768|\n", "+--------------------+-------+\n", "\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\r", "[Stage 
87:===========================================> (161 + 5) / 200]\r", "\r", " \r" ] } ], "source": [ "df \\\n", " .groupBy('dispatching_base_num') \\\n", " .count() \\\n", " .orderBy('count', ascending=False) \\\n", " .limit(5) \\\n", " .show()" ] }, { "cell_type": "markdown", "id": "0d10173a", "metadata": {}, "source": [ "**Q6**: Most common locations pair" ] }, { "cell_type": "code", "execution_count": 47, "id": "74b7f664", "metadata": {}, "outputs": [], "source": [ "df_zones = spark.read.parquet('zones')" ] }, { "cell_type": "code", "execution_count": 49, "id": "81642d3b", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['LocationID', 'Borough', 'Zone', 'service_zone']" ] }, "execution_count": 49, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df_zones.columns" ] }, { "cell_type": "code", "execution_count": 51, "id": "4f460dda", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['hvfhs_license_num',\n", " 'dispatching_base_num',\n", " 'pickup_datetime',\n", " 'dropoff_datetime',\n", " 'PULocationID',\n", " 'DOLocationID',\n", " 'SR_Flag']" ] }, "execution_count": 51, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.columns" ] }, { "cell_type": "code", "execution_count": 50, "id": "ad8f0101", "metadata": {}, "outputs": [], "source": [ "df_zones.registerTempTable('zones')" ] }, { "cell_type": "code", "execution_count": 57, "id": "6f738414", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "[Stage 103:==============================================> (176 + 4) / 200]\r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+--------+\n", "| pu_do_pair|count(1)|\n", "+--------------------+--------+\n", "|East New York / E...| 45041|\n", "|Borough Park / Bo...| 37329|\n", "| Canarsie / Canarsie| 28026|\n", "|Crown Heights Nor...| 25976|\n", "|Bay Ridge / Bay R...| 17934|\n", "+--------------------+--------+\n", "\n" ] }, { "name": "stderr", 
"output_type": "stream", "text": [ "\r", " \r" ] } ], "source": [ "spark.sql(\"\"\"\n", "SELECT\n", " CONCAT(pul.Zone, ' / ', dol.Zone) AS pu_do_pair,\n", " COUNT(1)\n", "FROM \n", " fhvhv_2021_02 fhv LEFT JOIN zones pul ON fhv.PULocationID = pul.LocationID\n", " LEFT JOIN zones dol ON fhv.DOLocationID = dol.LocationID\n", "GROUP BY \n", " 1\n", "ORDER BY\n", " 2 DESC\n", "LIMIT 5;\n", "\"\"\").show()" ] }, { "cell_type": "code", "execution_count": null, "id": "e4b754d1", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.7" } }, "nbformat": 4, "nbformat_minor": 5 }