{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "8c1d0c08",
"metadata": {},
"outputs": [],
"source": [
"import pyspark\n",
"from pyspark.sql import SparkSession"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "96a248f5",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"WARNING: An illegal reflective access operation has occurred\n",
"WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/alexey/spark/spark-3.0.3-bin-hadoop3.2/jars/spark-unsafe_2.12-3.0.3.jar) to constructor java.nio.DirectByteBuffer(long,int)\n",
"WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform\n",
"WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations\n",
"WARNING: All illegal access operations will be denied in a future release\n",
"22/02/17 21:59:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
"Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n",
"Setting default log level to \"WARN\".\n",
"To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n"
]
}
],
"source": [
"spark = SparkSession.builder \\\n",
" .master(\"local[*]\") \\\n",
" .appName('test') \\\n",
" .getOrCreate()"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "c53274b1",
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "5d8434e1",
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import types"
]
},
{
"cell_type": "code",
"execution_count": 18,
"id": "a84c6c6d",
"metadata": {},
"outputs": [],
"source": [
"green_schema = types.StructType([\n",
" types.StructField(\"VendorID\", types.IntegerType(), True),\n",
" types.StructField(\"lpep_pickup_datetime\", types.TimestampType(), True),\n",
" types.StructField(\"lpep_dropoff_datetime\", types.TimestampType(), True),\n",
" types.StructField(\"store_and_fwd_flag\", types.StringType(), True),\n",
" types.StructField(\"RatecodeID\", types.IntegerType(), True),\n",
" types.StructField(\"PULocationID\", types.IntegerType(), True),\n",
" types.StructField(\"DOLocationID\", types.IntegerType(), True),\n",
" types.StructField(\"passenger_count\", types.IntegerType(), True),\n",
" types.StructField(\"trip_distance\", types.DoubleType(), True),\n",
" types.StructField(\"fare_amount\", types.DoubleType(), True),\n",
" types.StructField(\"extra\", types.DoubleType(), True),\n",
" types.StructField(\"mta_tax\", types.DoubleType(), True),\n",
" types.StructField(\"tip_amount\", types.DoubleType(), True),\n",
" types.StructField(\"tolls_amount\", types.DoubleType(), True),\n",
" types.StructField(\"ehail_fee\", types.DoubleType(), True),\n",
" types.StructField(\"improvement_surcharge\", types.DoubleType(), True),\n",
" types.StructField(\"total_amount\", types.DoubleType(), True),\n",
" types.StructField(\"payment_type\", types.IntegerType(), True),\n",
" types.StructField(\"trip_type\", types.IntegerType(), True),\n",
" types.StructField(\"congestion_surcharge\", types.DoubleType(), True)\n",
"])\n",
"\n",
"yellow_schema = types.StructType([\n",
" types.StructField(\"VendorID\", types.IntegerType(), True),\n",
" types.StructField(\"tpep_pickup_datetime\", types.TimestampType(), True),\n",
" types.StructField(\"tpep_dropoff_datetime\", types.TimestampType(), True),\n",
" types.StructField(\"passenger_count\", types.IntegerType(), True),\n",
" types.StructField(\"trip_distance\", types.DoubleType(), True),\n",
" types.StructField(\"RatecodeID\", types.IntegerType(), True),\n",
" types.StructField(\"store_and_fwd_flag\", types.StringType(), True),\n",
" types.StructField(\"PULocationID\", types.IntegerType(), True),\n",
" types.StructField(\"DOLocationID\", types.IntegerType(), True),\n",
" types.StructField(\"payment_type\", types.IntegerType(), True),\n",
" types.StructField(\"fare_amount\", types.DoubleType(), True),\n",
" types.StructField(\"extra\", types.DoubleType(), True),\n",
" types.StructField(\"mta_tax\", types.DoubleType(), True),\n",
" types.StructField(\"tip_amount\", types.DoubleType(), True),\n",
" types.StructField(\"tolls_amount\", types.DoubleType(), True),\n",
" types.StructField(\"improvement_surcharge\", types.DoubleType(), True),\n",
" types.StructField(\"total_amount\", types.DoubleType(), True),\n",
" types.StructField(\"congestion_surcharge\", types.DoubleType(), True)\n",
"])"
]
},
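{
"cell_type": "code",
"execution_count": null,
"id": "b3d41f2a",
"metadata": {},
"outputs": [],
"source": [
"# A sketch of how schemas like the ones above can be derived instead of typed\n",
"# out by hand: read a small sample of one raw CSV with pandas, convert it to\n",
"# a Spark DataFrame, and inspect the schema Spark infers. The sample path is\n",
"# an assumption -- point it at any one of the raw CSV files.\n",
"df_sample = pd.read_csv('data/raw/green/2020/01/green_tripdata_2020_01.csv.gz', nrows=1000)\n",
"spark.createDataFrame(df_sample).schema"
]
},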
{
"cell_type": "code",
"execution_count": 27,
"id": "3f7e0cb9",
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"processing data for 2020/1\n",
"processing data for 2020/2\n",
"processing data for 2020/3\n",
"processing data for 2020/4\n",
"processing data for 2020/5\n",
"processing data for 2020/6\n",
"processing data for 2020/7\n",
"processing data for 2020/8\n",
"processing data for 2020/9\n",
"processing data for 2020/10\n",
"processing data for 2020/11\n",
"processing data for 2020/12\n"
]
}
],
"source": [
"year = 2020\n",
"\n",
"for month in range(1, 13):\n",
" print(f'processing data for {year}/{month}')\n",
"\n",
" input_path = f'data/raw/green/{year}/{month:02d}/'\n",
" output_path = f'data/pq/green/{year}/{month:02d}/'\n",
"\n",
" df_green = spark.read \\\n",
" .option(\"header\", \"true\") \\\n",
" .schema(green_schema) \\\n",
" .csv(input_path)\n",
"\n",
" df_green \\\n",
" .repartition(4) \\\n",
" .write.parquet(output_path)"
]
},
{
"cell_type": "code",
"execution_count": 26,
"id": "96ac2ad7",
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"processing data for 2021/1\n",
"processing data for 2021/2\n",
"processing data for 2021/3\n",
"processing data for 2021/4\n",
"processing data for 2021/5\n",
"processing data for 2021/6\n",
"processing data for 2021/7\n",
"processing data for 2021/8\n"
]
},
{
"ename": "AnalysisException",
"evalue": "Path does not exist: file:/home/alexey/data-engineering-zoomcamp/week_5_batch_processing/code/data/raw/green/2021/08;",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mAnalysisException\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m/tmp/ipykernel_129101/906373977.py\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m 7\u001b[0m \u001b[0moutput_path\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34mf'data/pq/green/{year}/{month:02d}/'\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 8\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 9\u001b[0;31m \u001b[0mdf_green\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mspark\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mread\u001b[0m\u001b[0;31m \u001b[0m\u001b[0;31m\\\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 10\u001b[0m \u001b[0;34m.\u001b[0m\u001b[0moption\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"header\"\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m\"true\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;31m \u001b[0m\u001b[0;31m\\\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 11\u001b[0m \u001b[0;34m.\u001b[0m\u001b[0mschema\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mgreen_schema\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;31m \u001b[0m\u001b[0;31m\\\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/spark/spark-3.0.3-bin-hadoop3.2/python/pyspark/sql/readwriter.py\u001b[0m in \u001b[0;36mcsv\u001b[0;34m(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue, locale, lineSep, pathGlobFilter, recursiveFileLookup)\u001b[0m\n\u001b[1;32m 536\u001b[0m \u001b[0mpath\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 537\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mtype\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0mlist\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 538\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_df\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jreader\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcsv\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_spark\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_sc\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jvm\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPythonUtils\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtoSeq\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 539\u001b[0m \u001b[0;32melif\u001b[0m \u001b[0misinstance\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mRDD\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 540\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0miterator\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/spark/spark-3.0.3-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1302\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1303\u001b[0m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1304\u001b[0;31m return_value = get_return_value(\n\u001b[0m\u001b[1;32m 1305\u001b[0m answer, self.gateway_client, self.target_id, self.name)\n\u001b[1;32m 1306\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/spark/spark-3.0.3-bin-hadoop3.2/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 132\u001b[0m \u001b[0;31m# Hide where the exception came from that shows a non-Pythonic\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 133\u001b[0m \u001b[0;31m# JVM exception message.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 134\u001b[0;31m \u001b[0mraise_from\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mconverted\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 135\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 136\u001b[0m \u001b[0;32mraise\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/spark/spark-3.0.3-bin-hadoop3.2/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mraise_from\u001b[0;34m(e)\u001b[0m\n",
"\u001b[0;31mAnalysisException\u001b[0m: Path does not exist: file:/home/alexey/data-engineering-zoomcamp/week_5_batch_processing/code/data/raw/green/2021/08;"
]
}
],
"source": [
"year = 2021 \n",
"\n",
"for month in range(1, 13):\n",
" print(f'processing data for {year}/{month}')\n",
"\n",
" input_path = f'data/raw/green/{year}/{month:02d}/'\n",
" output_path = f'data/pq/green/{year}/{month:02d}/'\n",
"\n",
" df_green = spark.read \\\n",
" .option(\"header\", \"true\") \\\n",
" .schema(green_schema) \\\n",
" .csv(input_path)\n",
"\n",
" df_green \\\n",
" .repartition(4) \\\n",
" .write.parquet(output_path)"
]
},
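{
"cell_type": "code",
"execution_count": null,
"id": "c8a917e4",
"metadata": {},
"outputs": [],
"source": [
"# The cell above fails at 2021/08 because the raw data only covers January\n",
"# through July 2021. A minimal sketch of the same loop that checks for the\n",
"# input directory first and skips missing months (assumes local file paths):\n",
"import os\n",
"\n",
"year = 2021\n",
"\n",
"for month in range(1, 13):\n",
"    input_path = f'data/raw/green/{year}/{month:02d}/'\n",
"    output_path = f'data/pq/green/{year}/{month:02d}/'\n",
"\n",
"    if not os.path.exists(input_path):\n",
"        print(f'no data for {year}/{month}, skipping')\n",
"        continue\n",
"\n",
"    print(f'processing data for {year}/{month}')\n",
"\n",
"    df_green = spark.read \\\n",
"        .option(\"header\", \"true\") \\\n",
"        .schema(green_schema) \\\n",
"        .csv(input_path)\n",
"\n",
"    df_green \\\n",
"        .repartition(4) \\\n",
"        .write.parquet(output_path)"
]
},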
{
"cell_type": "code",
"execution_count": 28,
"id": "19326bc9",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"processing data for 2020/1\n",
"processing data for 2020/2\n",
"processing data for 2020/3\n",
"processing data for 2020/4\n",
"processing data for 2020/5\n",
"processing data for 2020/6\n",
"processing data for 2020/7\n",
"processing data for 2020/8\n",
"processing data for 2020/9\n",
"processing data for 2020/10\n",
"processing data for 2020/11\n",
"processing data for 2020/12\n"
]
}
],
"source": [
"year = 2020\n",
"\n",
"for month in range(1, 13):\n",
" print(f'processing data for {year}/{month}')\n",
"\n",
" input_path = f'data/raw/yellow/{year}/{month:02d}/'\n",
" output_path = f'data/pq/yellow/{year}/{month:02d}/'\n",
"\n",
" df_yellow = spark.read \\\n",
" .option(\"header\", \"true\") \\\n",
" .schema(yellow_schema) \\\n",
" .csv(input_path)\n",
"\n",
" df_yellow \\\n",
" .repartition(4) \\\n",
" .write.parquet(output_path)"
]
},
{
"cell_type": "code",
"execution_count": 29,
"id": "aeca811a",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"processing data for 2021/1\n",
"processing data for 2021/2\n",
"processing data for 2021/3\n",
"processing data for 2021/4\n",
"processing data for 2021/5\n",
"processing data for 2021/6\n",
"processing data for 2021/7\n",
"processing data for 2021/8\n"
]
},
{
"ename": "AnalysisException",
"evalue": "Path does not exist: file:/home/alexey/data-engineering-zoomcamp/week_5_batch_processing/code/data/raw/yellow/2021/08;",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mAnalysisException\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m/tmp/ipykernel_129101/2088663510.py\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m 7\u001b[0m \u001b[0moutput_path\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34mf'data/pq/yellow/{year}/{month:02d}/'\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 8\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 9\u001b[0;31m \u001b[0mdf_yellow\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mspark\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mread\u001b[0m\u001b[0;31m \u001b[0m\u001b[0;31m\\\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 10\u001b[0m \u001b[0;34m.\u001b[0m\u001b[0moption\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"header\"\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m\"true\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;31m \u001b[0m\u001b[0;31m\\\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 11\u001b[0m \u001b[0;34m.\u001b[0m\u001b[0mschema\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0myellow_schema\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;31m \u001b[0m\u001b[0;31m\\\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/spark/spark-3.0.3-bin-hadoop3.2/python/pyspark/sql/readwriter.py\u001b[0m in \u001b[0;36mcsv\u001b[0;34m(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue, locale, lineSep, pathGlobFilter, recursiveFileLookup)\u001b[0m\n\u001b[1;32m 536\u001b[0m \u001b[0mpath\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 537\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mtype\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0mlist\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 538\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_df\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jreader\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcsv\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_spark\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_sc\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jvm\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPythonUtils\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtoSeq\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 539\u001b[0m \u001b[0;32melif\u001b[0m \u001b[0misinstance\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mRDD\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 540\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0miterator\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/spark/spark-3.0.3-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1302\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1303\u001b[0m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1304\u001b[0;31m return_value = get_return_value(\n\u001b[0m\u001b[1;32m 1305\u001b[0m answer, self.gateway_client, self.target_id, self.name)\n\u001b[1;32m 1306\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/spark/spark-3.0.3-bin-hadoop3.2/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 132\u001b[0m \u001b[0;31m# Hide where the exception came from that shows a non-Pythonic\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 133\u001b[0m \u001b[0;31m# JVM exception message.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 134\u001b[0;31m \u001b[0mraise_from\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mconverted\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 135\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 136\u001b[0m \u001b[0;32mraise\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/spark/spark-3.0.3-bin-hadoop3.2/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mraise_from\u001b[0;34m(e)\u001b[0m\n",
"\u001b[0;31mAnalysisException\u001b[0m: Path does not exist: file:/home/alexey/data-engineering-zoomcamp/week_5_batch_processing/code/data/raw/yellow/2021/08;"
]
}
],
"source": [
"year = 2021\n",
"\n",
"for month in range(1, 13):\n",
" print(f'processing data for {year}/{month}')\n",
"\n",
" input_path = f'data/raw/yellow/{year}/{month:02d}/'\n",
" output_path = f'data/pq/yellow/{year}/{month:02d}/'\n",
"\n",
" df_yellow = spark.read \\\n",
" .option(\"header\", \"true\") \\\n",
" .schema(yellow_schema) \\\n",
" .csv(input_path)\n",
"\n",
" df_yellow \\\n",
" .repartition(4) \\\n",
" .write.parquet(output_path)"
]
},
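{
"cell_type": "code",
"execution_count": null,
"id": "f0b52c6d",
"metadata": {},
"outputs": [],
"source": [
"# A quick sanity check (a sketch, not part of the original run): read one of\n",
"# the Parquet outputs back and confirm the schema and row count made it\n",
"# through the CSV-to-Parquet conversion.\n",
"df_check = spark.read.parquet('data/pq/green/2020/01/')\n",
"df_check.printSchema()\n",
"df_check.count()"
]
}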
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
}
},
"nbformat": 4,
"nbformat_minor": 5
}