{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "md-overview",
   "metadata": {},
   "source": [
    "# Hourly taxi revenue by pickup zone\n",
    "\n",
    "Builds hourly revenue reports for green and yellow taxi trips, combines them, and joins the result with the zones table.\n",
    "\n",
    "Steps:\n",
    "\n",
    "1. Read the green / yellow trip Parquet data and register each as a temp view.\n",
    "2. Aggregate trips picked up on or after 2020-01-01 by hour and pickup location.\n",
    "3. Outer-join the two reports on `(hour, zone)`.\n",
    "4. Join with the table read from `zones/` and write the result.\n",
    "\n",
    "All paths are relative to the notebook's working directory. Every write uses `mode='overwrite'`, so the notebook can be re-run from a fresh kernel."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4341e0e6",
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyspark\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "# Local-mode session; getOrCreate() reuses an already running session if there is one.\n",
    "spark = (\n",
    "    SparkSession.builder\n",
    "    .master(\"local[*]\")\n",
    "    .appName('test')\n",
    "    .getOrCreate()\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "md-green-yellow",
   "metadata": {},
   "source": [
    "## Per-service hourly revenue"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cd304aec",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_green = spark.read.parquet('data/pq/green/*/*')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "243991f3",
   "metadata": {},
   "outputs": [],
   "source": [
    "# createOrReplaceTempView supersedes the deprecated registerTempTable and\n",
    "# replaces an existing view of the same name, so this cell is safe to re-run.\n",
    "df_green.createOrReplaceTempView('green')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "hourly-revenue-def",
   "metadata": {},
   "outputs": [],
   "source": [
    "def hourly_revenue(table: str, pickup_col: str):\n",
    "    \"\"\"Aggregate one trip view into revenue and record count per hour and pickup zone.\n",
    "\n",
    "    Only rows whose pickup timestamp is on or after 2020-01-01 00:00:00 are included.\n",
    "\n",
    "    Parameters\n",
    "    ----------\n",
    "    table : str\n",
    "        Name of a temp view already registered on `spark` (e.g. 'green').\n",
    "    pickup_col : str\n",
    "        Name of the pickup timestamp column in that view.\n",
    "\n",
    "    Both arguments are interpolated into the SQL text as identifiers:\n",
    "    pass trusted, hard-coded names only, never user input.\n",
    "\n",
    "    Returns\n",
    "    -------\n",
    "    pyspark.sql.DataFrame\n",
    "        Columns `hour`, `zone`, `amount`, `number_records`.\n",
    "    \"\"\"\n",
    "    return spark.sql(f\"\"\"\n",
    "SELECT\n",
    "    date_trunc('hour', {pickup_col}) AS hour,\n",
    "    PULocationID AS zone,\n",
    "\n",
    "    SUM(total_amount) AS amount,\n",
    "    COUNT(1) AS number_records\n",
    "FROM\n",
    "    {table}\n",
    "WHERE\n",
    "    {pickup_col} >= '2020-01-01 00:00:00'\n",
    "GROUP BY\n",
    "    1, 2\n",
    "\"\"\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e43764a7",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_green_revenue = hourly_revenue('green', 'lpep_pickup_datetime')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3e00310e",
   "metadata": {},
   "outputs": [],
   "source": [
    "# NOTE(review): the partition count of 20 is carried over unchanged; confirm it suits the data volume.\n",
    "(\n",
    "    df_green_revenue\n",
    "    .repartition(20)\n",
    "    .write.parquet('data/report/revenue/green', mode='overwrite')\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "07ebb68c",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_yellow = spark.read.parquet('data/pq/yellow/*/*')\n",
    "df_yellow.createOrReplaceTempView('yellow')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9d5be29d",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_yellow_revenue = hourly_revenue('yellow', 'tpep_pickup_datetime')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "8bd9264e",
   "metadata": {},
   "outputs": [],
   "source": [
    "(\n",
    "    df_yellow_revenue\n",
    "    .repartition(20)\n",
    "    .write.parquet('data/report/revenue/yellow', mode='overwrite')\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "md-combine",
   "metadata": {},
   "source": [
    "## Combine the green and yellow reports"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fd5d74d7",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Re-read the reports written above so the join starts from the Parquet files.\n",
    "df_green_revenue = spark.read.parquet('data/report/revenue/green')\n",
    "df_yellow_revenue = spark.read.parquet('data/report/revenue/yellow')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "35015ee6",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Prefix the measure columns so both services can sit side by side after the join.\n",
    "df_green_revenue_tmp = (\n",
    "    df_green_revenue\n",
    "    .withColumnRenamed('amount', 'green_amount')\n",
    "    .withColumnRenamed('number_records', 'green_number_records')\n",
    ")\n",
    "\n",
    "df_yellow_revenue_tmp = (\n",
    "    df_yellow_revenue\n",
    "    .withColumnRenamed('amount', 'yellow_amount')\n",
    "    .withColumnRenamed('number_records', 'yellow_number_records')\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ec9f34ea",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Outer join keeps (hour, zone) pairs present in only one report;\n",
    "# the other service's columns are null for those rows.\n",
    "df_join = df_green_revenue_tmp.join(df_yellow_revenue_tmp, on=['hour', 'zone'], how='outer')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "10238be7",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_join.write.parquet('data/report/revenue/total', mode='overwrite')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c3af7169",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_join = spark.read.parquet('data/report/revenue/total')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bc2a6680",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Schema displayed on the last recorded run:\n",
    "#   DataFrame[hour: timestamp, zone: int, green_amount: double, green_number_records: bigint,\n",
    "#             yellow_amount: double, yellow_number_records: bigint]\n",
    "df_join"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "md-zones",
   "metadata": {},
   "source": [
    "## Join with the zones table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "abb46398",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_zones = spark.read.parquet('zones/')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b3cf98a5",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Default join type is inner: revenue rows without a matching LocationID are dropped.\n",
    "df_result = df_join.join(df_zones, df_join.zone == df_zones.LocationID)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "5e0614ba",
   "metadata": {},
   "outputs": [],
   "source": [
    "# NOTE(review): Spark resolves column names case-insensitively by default, so if the\n",
    "# zones table has a `Zone` column, drop('zone') may remove it together with the\n",
    "# numeric `zone` key. Confirm this is intended.\n",
    "# mode='overwrite' matches the other report writes; without it a second run\n",
    "# fails because the output path already exists.\n",
    "df_result.drop('LocationID', 'zone').write.parquet('tmp/revenue-zones', mode='overwrite')"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}