871 lines
32 KiB
Plaintext
871 lines
32 KiB
Plaintext
{
|
|
"cells": [
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 1,
|
|
"id": "8c1d0c08",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"import pyspark\n",
|
|
"from pyspark.sql import SparkSession"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 2,
|
|
"id": "96a248f5",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"WARNING: An illegal reflective access operation has occurred\n",
|
|
"WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/alexey/spark/spark-3.0.3-bin-hadoop3.2/jars/spark-unsafe_2.12-3.0.3.jar) to constructor java.nio.DirectByteBuffer(long,int)\n",
|
|
"WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform\n",
|
|
"WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations\n",
|
|
"WARNING: All illegal access operations will be denied in a future release\n",
|
|
"22/02/17 21:59:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
|
|
"Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n",
|
|
"Setting default log level to \"WARN\".\n",
|
|
"To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"spark = SparkSession.builder \\\n",
|
|
" .master(\"local[*]\") \\\n",
|
|
" .appName('test') \\\n",
|
|
" .getOrCreate()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 7,
|
|
"id": "c53274b1",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"import pandas as pd"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 12,
|
|
"id": "5d8434e1",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"from pyspark.sql import types"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 18,
|
|
"id": "a84c6c6d",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"green_schema = types.StructType([\n",
|
|
" types.StructField(\"VendorID\", types.IntegerType(), True),\n",
|
|
" types.StructField(\"lpep_pickup_datetime\", types.TimestampType(), True),\n",
|
|
" types.StructField(\"lpep_dropoff_datetime\", types.TimestampType(), True),\n",
|
|
" types.StructField(\"store_and_fwd_flag\", types.StringType(), True),\n",
|
|
" types.StructField(\"RatecodeID\", types.IntegerType(), True),\n",
|
|
" types.StructField(\"PULocationID\", types.IntegerType(), True),\n",
|
|
" types.StructField(\"DOLocationID\", types.IntegerType(), True),\n",
|
|
" types.StructField(\"passenger_count\", types.IntegerType(), True),\n",
|
|
" types.StructField(\"trip_distance\", types.DoubleType(), True),\n",
|
|
" types.StructField(\"fare_amount\", types.DoubleType(), True),\n",
|
|
" types.StructField(\"extra\", types.DoubleType(), True),\n",
|
|
" types.StructField(\"mta_tax\", types.DoubleType(), True),\n",
|
|
" types.StructField(\"tip_amount\", types.DoubleType(), True),\n",
|
|
" types.StructField(\"tolls_amount\", types.DoubleType(), True),\n",
|
|
" types.StructField(\"ehail_fee\", types.DoubleType(), True),\n",
|
|
" types.StructField(\"improvement_surcharge\", types.DoubleType(), True),\n",
|
|
" types.StructField(\"total_amount\", types.DoubleType(), True),\n",
|
|
" types.StructField(\"payment_type\", types.IntegerType(), True),\n",
|
|
" types.StructField(\"trip_type\", types.IntegerType(), True),\n",
|
|
" types.StructField(\"congestion_surcharge\", types.DoubleType(), True)\n",
|
|
"])\n",
|
|
"\n",
|
|
"yellow_schema = types.StructType([\n",
|
|
" types.StructField(\"VendorID\", types.IntegerType(), True),\n",
|
|
" types.StructField(\"tpep_pickup_datetime\", types.TimestampType(), True),\n",
|
|
" types.StructField(\"tpep_dropoff_datetime\", types.TimestampType(), True),\n",
|
|
" types.StructField(\"passenger_count\", types.IntegerType(), True),\n",
|
|
" types.StructField(\"trip_distance\", types.DoubleType(), True),\n",
|
|
" types.StructField(\"RatecodeID\", types.IntegerType(), True),\n",
|
|
" types.StructField(\"store_and_fwd_flag\", types.StringType(), True),\n",
|
|
" types.StructField(\"PULocationID\", types.IntegerType(), True),\n",
|
|
" types.StructField(\"DOLocationID\", types.IntegerType(), True),\n",
|
|
" types.StructField(\"payment_type\", types.IntegerType(), True),\n",
|
|
" types.StructField(\"fare_amount\", types.DoubleType(), True),\n",
|
|
" types.StructField(\"extra\", types.DoubleType(), True),\n",
|
|
" types.StructField(\"mta_tax\", types.DoubleType(), True),\n",
|
|
" types.StructField(\"tip_amount\", types.DoubleType(), True),\n",
|
|
" types.StructField(\"tolls_amount\", types.DoubleType(), True),\n",
|
|
" types.StructField(\"improvement_surcharge\", types.DoubleType(), True),\n",
|
|
" types.StructField(\"total_amount\", types.DoubleType(), True),\n",
|
|
" types.StructField(\"congestion_surcharge\", types.DoubleType(), True)\n",
|
|
"])"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 27,
|
|
"id": "3f7e0cb9",
|
|
"metadata": {
|
|
"scrolled": true
|
|
},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2020/1\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2020/2\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2020/3\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2020/4\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2020/5\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2020/6\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2020/7\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2020/8\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2020/9\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2020/10\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2020/11\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2020/12\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"year = 2020\n",
|
|
"\n",
|
|
"for month in range(1, 13):\n",
|
|
" print(f'processing data for {year}/{month}')\n",
|
|
"\n",
|
|
" input_path = f'data/raw/green/{year}/{month:02d}/'\n",
|
|
" output_path = f'data/pq/green/{year}/{month:02d}/'\n",
|
|
"\n",
|
|
" df_green = spark.read \\\n",
|
|
" .option(\"header\", \"true\") \\\n",
|
|
" .schema(green_schema) \\\n",
|
|
" .csv(input_path)\n",
|
|
"\n",
|
|
" df_green \\\n",
|
|
" .repartition(4) \\\n",
|
|
" .write.parquet(output_path)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 26,
|
|
"id": "96ac2ad7",
|
|
"metadata": {
|
|
"scrolled": true
|
|
},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2021/1\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2021/2\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2021/3\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2021/4\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2021/5\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2021/6\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2021/7\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"\r",
|
|
"[Stage 15:> (0 + 1) / 1]\r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2021/8\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"\r",
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"ename": "AnalysisException",
|
|
"evalue": "Path does not exist: file:/home/alexey/data-engineering-zoomcamp/week_5_batch_processing/code/data/raw/green/2021/08;",
|
|
"output_type": "error",
|
|
"traceback": [
|
|
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
|
|
"\u001b[0;31mAnalysisException\u001b[0m Traceback (most recent call last)",
|
|
"\u001b[0;32m/tmp/ipykernel_129101/906373977.py\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m 7\u001b[0m \u001b[0moutput_path\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34mf'data/pq/green/{year}/{month:02d}/'\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 8\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 9\u001b[0;31m \u001b[0mdf_green\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mspark\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mread\u001b[0m\u001b[0;31m \u001b[0m\u001b[0;31m\\\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 10\u001b[0m \u001b[0;34m.\u001b[0m\u001b[0moption\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"header\"\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m\"true\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;31m \u001b[0m\u001b[0;31m\\\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 11\u001b[0m \u001b[0;34m.\u001b[0m\u001b[0mschema\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mgreen_schema\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;31m \u001b[0m\u001b[0;31m\\\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
|
|
"\u001b[0;32m~/spark/spark-3.0.3-bin-hadoop3.2/python/pyspark/sql/readwriter.py\u001b[0m in \u001b[0;36mcsv\u001b[0;34m(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue, locale, lineSep, pathGlobFilter, recursiveFileLookup)\u001b[0m\n\u001b[1;32m 536\u001b[0m \u001b[0mpath\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 537\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mtype\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0mlist\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 538\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_df\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jreader\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcsv\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_spark\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_sc\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jvm\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPythonUtils\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtoSeq\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 539\u001b[0m \u001b[0;32melif\u001b[0m \u001b[0misinstance\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mRDD\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 540\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0miterator\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
|
|
"\u001b[0;32m~/spark/spark-3.0.3-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1302\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1303\u001b[0m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1304\u001b[0;31m return_value = get_return_value(\n\u001b[0m\u001b[1;32m 1305\u001b[0m answer, self.gateway_client, self.target_id, self.name)\n\u001b[1;32m 1306\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
|
|
"\u001b[0;32m~/spark/spark-3.0.3-bin-hadoop3.2/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 132\u001b[0m \u001b[0;31m# Hide where the exception came from that shows a non-Pythonic\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 133\u001b[0m \u001b[0;31m# JVM exception message.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 134\u001b[0;31m \u001b[0mraise_from\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mconverted\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 135\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 136\u001b[0m \u001b[0;32mraise\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
|
|
"\u001b[0;32m~/spark/spark-3.0.3-bin-hadoop3.2/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mraise_from\u001b[0;34m(e)\u001b[0m\n",
|
|
"\u001b[0;31mAnalysisException\u001b[0m: Path does not exist: file:/home/alexey/data-engineering-zoomcamp/week_5_batch_processing/code/data/raw/green/2021/08;"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"year = 2021 \n",
|
|
"\n",
|
|
"for month in range(1, 13):\n",
|
|
" print(f'processing data for {year}/{month}')\n",
|
|
"\n",
|
|
" input_path = f'data/raw/green/{year}/{month:02d}/'\n",
|
|
" output_path = f'data/pq/green/{year}/{month:02d}/'\n",
|
|
"\n",
|
|
" df_green = spark.read \\\n",
|
|
" .option(\"header\", \"true\") \\\n",
|
|
" .schema(green_schema) \\\n",
|
|
" .csv(input_path)\n",
|
|
"\n",
|
|
" df_green \\\n",
|
|
" .repartition(4) \\\n",
|
|
" .write.parquet(output_path)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "463c7dc8",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": []
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 23,
|
|
"id": "6ff4265d",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": []
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 24,
|
|
"id": "6e982d29",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": []
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 28,
|
|
"id": "19326bc9",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2020/1\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2020/2\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2020/3\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2020/4\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2020/5\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2020/6\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2020/7\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2020/8\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2020/9\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2020/10\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2020/11\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2020/12\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"year = 2020\n",
|
|
"\n",
|
|
"for month in range(1, 13):\n",
|
|
" print(f'processing data for {year}/{month}')\n",
|
|
"\n",
|
|
" input_path = f'data/raw/yellow/{year}/{month:02d}/'\n",
|
|
" output_path = f'data/pq/yellow/{year}/{month:02d}/'\n",
|
|
"\n",
|
|
" df_yellow = spark.read \\\n",
|
|
" .option(\"header\", \"true\") \\\n",
|
|
" .schema(yellow_schema) \\\n",
|
|
" .csv(input_path)\n",
|
|
"\n",
|
|
" df_yellow \\\n",
|
|
" .repartition(4) \\\n",
|
|
" .write.parquet(output_path)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": 29,
|
|
"id": "aeca811a",
|
|
"metadata": {},
|
|
"outputs": [
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2021/1\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2021/2\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2021/3\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2021/4\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2021/5\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2021/6\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2021/7\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"[Stage 78:===========================================> (3 + 1) / 4]\r"
|
|
]
|
|
},
|
|
{
|
|
"name": "stdout",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"processing data for 2021/8\n"
|
|
]
|
|
},
|
|
{
|
|
"name": "stderr",
|
|
"output_type": "stream",
|
|
"text": [
|
|
"\r",
|
|
" \r"
|
|
]
|
|
},
|
|
{
|
|
"ename": "AnalysisException",
|
|
"evalue": "Path does not exist: file:/home/alexey/data-engineering-zoomcamp/week_5_batch_processing/code/data/raw/yellow/2021/08;",
|
|
"output_type": "error",
|
|
"traceback": [
|
|
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
|
|
"\u001b[0;31mAnalysisException\u001b[0m Traceback (most recent call last)",
|
|
"\u001b[0;32m/tmp/ipykernel_129101/2088663510.py\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m 7\u001b[0m \u001b[0moutput_path\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34mf'data/pq/yellow/{year}/{month:02d}/'\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 8\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 9\u001b[0;31m \u001b[0mdf_yellow\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mspark\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mread\u001b[0m\u001b[0;31m \u001b[0m\u001b[0;31m\\\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 10\u001b[0m \u001b[0;34m.\u001b[0m\u001b[0moption\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"header\"\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m\"true\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;31m \u001b[0m\u001b[0;31m\\\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 11\u001b[0m \u001b[0;34m.\u001b[0m\u001b[0mschema\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0myellow_schema\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;31m \u001b[0m\u001b[0;31m\\\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
|
|
"\u001b[0;32m~/spark/spark-3.0.3-bin-hadoop3.2/python/pyspark/sql/readwriter.py\u001b[0m in \u001b[0;36mcsv\u001b[0;34m(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue, locale, lineSep, pathGlobFilter, recursiveFileLookup)\u001b[0m\n\u001b[1;32m 536\u001b[0m \u001b[0mpath\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 537\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mtype\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0mlist\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 538\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_df\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jreader\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcsv\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_spark\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_sc\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jvm\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPythonUtils\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtoSeq\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 539\u001b[0m \u001b[0;32melif\u001b[0m \u001b[0misinstance\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mRDD\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 540\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0miterator\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
|
|
"\u001b[0;32m~/spark/spark-3.0.3-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1302\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1303\u001b[0m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1304\u001b[0;31m return_value = get_return_value(\n\u001b[0m\u001b[1;32m 1305\u001b[0m answer, self.gateway_client, self.target_id, self.name)\n\u001b[1;32m 1306\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
|
|
"\u001b[0;32m~/spark/spark-3.0.3-bin-hadoop3.2/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 132\u001b[0m \u001b[0;31m# Hide where the exception came from that shows a non-Pythonic\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 133\u001b[0m \u001b[0;31m# JVM exception message.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 134\u001b[0;31m \u001b[0mraise_from\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mconverted\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 135\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 136\u001b[0m \u001b[0;32mraise\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
|
|
"\u001b[0;32m~/spark/spark-3.0.3-bin-hadoop3.2/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mraise_from\u001b[0;34m(e)\u001b[0m\n",
|
|
"\u001b[0;31mAnalysisException\u001b[0m: Path does not exist: file:/home/alexey/data-engineering-zoomcamp/week_5_batch_processing/code/data/raw/yellow/2021/08;"
|
|
]
|
|
}
|
|
],
|
|
"source": [
|
|
"year = 2021\n",
|
|
"\n",
|
|
"for month in range(1, 13):\n",
|
|
" print(f'processing data for {year}/{month}')\n",
|
|
"\n",
|
|
" input_path = f'data/raw/yellow/{year}/{month:02d}/'\n",
|
|
" output_path = f'data/pq/yellow/{year}/{month:02d}/'\n",
|
|
"\n",
|
|
" df_yellow = spark.read \\\n",
|
|
" .option(\"header\", \"true\") \\\n",
|
|
" .schema(yellow_schema) \\\n",
|
|
" .csv(input_path)\n",
|
|
"\n",
|
|
" df_yellow \\\n",
|
|
" .repartition(4) \\\n",
|
|
" .write.parquet(output_path)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"id": "d7eb0da9",
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": []
|
|
}
|
|
],
|
|
"metadata": {
|
|
"kernelspec": {
|
|
"display_name": "Python 3 (ipykernel)",
|
|
"language": "python",
|
|
"name": "python3"
|
|
},
|
|
"language_info": {
|
|
"codemirror_mode": {
|
|
"name": "ipython",
|
|
"version": 3
|
|
},
|
|
"file_extension": ".py",
|
|
"mimetype": "text/x-python",
|
|
"name": "python",
|
|
"nbconvert_exporter": "python",
|
|
"pygments_lexer": "ipython3",
|
|
"version": "3.9.7"
|
|
}
|
|
},
|
|
"nbformat": 4,
|
|
"nbformat_minor": 5
|
|
}
|