import findspark
findspark.init()
Writing a DataFrame to CSV
Based on WafaStudies PySpark tutorial.
Imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder\
    .appName('Spark')\
    .master("local[*]")\
    .getOrCreate()
from pyspark.sql import dataframe
Writing a DataFrame to CSV files
help(dataframe.DataFrame.write)
Help on property:
Interface for saving the content of the non-streaming :class:`DataFrame` out into external
storage.
.. versionadded:: 1.4.0
.. versionchanged:: 3.4.0
Supports Spark Connect.
Returns
-------
:class:`DataFrameWriter`
Examples
--------
>>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
>>> type(df.write)
<class '...readwriter.DataFrameWriter'>
Write the DataFrame as a table.
>>> _ = spark.sql("DROP TABLE IF EXISTS tab2")
>>> df.write.saveAsTable("tab2")
>>> _ = spark.sql("DROP TABLE tab2")
Let’s start by creating a DataFrame:
df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.show()
df.printSchema()
+---+-----+
|age| name|
+---+-----+
| 2|Alice|
| 5| Bob|
+---+-----+
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
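A quick note before writing: df.write is a property that returns a DataFrameWriter, so the csv() method we use below is just a convenience wrapper around the generic format("csv") / save() chain. A minimal sketch of the two equivalent forms, using a throwaway placeholder path:
# Equivalent calls; "tmp_csv_demo" is just a placeholder path
df.write.csv("tmp_csv_demo", mode="overwrite")
df.write.format("csv").mode("overwrite").save("tmp_csv_demo")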
help(df.write.csv)
Help on method csv in module pyspark.sql.readwriter:
csv(path: str, mode: Optional[str] = None, compression: Optional[str] = None, sep: Optional[str] = None, quote: Optional[str] = None, escape: Optional[str] = None, header: Union[bool, str, NoneType] = None, nullValue: Optional[str] = None, escapeQuotes: Union[bool, str, NoneType] = None, quoteAll: Union[bool, str, NoneType] = None, dateFormat: Optional[str] = None, timestampFormat: Optional[str] = None, ignoreLeadingWhiteSpace: Union[bool, str, NoneType] = None, ignoreTrailingWhiteSpace: Union[bool, str, NoneType] = None, charToEscapeQuoteEscaping: Optional[str] = None, encoding: Optional[str] = None, emptyValue: Optional[str] = None, lineSep: Optional[str] = None) -> None method of pyspark.sql.readwriter.DataFrameWriter instance
Saves the content of the :class:`DataFrame` in CSV format at the specified path.
.. versionadded:: 2.0.0
.. versionchanged:: 3.4.0
Supports Spark Connect.
Parameters
----------
path : str
the path in any Hadoop supported file system
mode : str, optional
specifies the behavior of the save operation when data already exists.
* ``append``: Append contents of this :class:`DataFrame` to existing data.
* ``overwrite``: Overwrite existing data.
* ``ignore``: Silently ignore this operation if data already exists.
* ``error`` or ``errorifexists`` (default case): Throw an exception if data already \
exists.
Other Parameters
----------------
Extra options
For the extra options, refer to
`Data Source Option <https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option>`_
for the version you use.
.. # noqa
Examples
--------
Write a DataFrame into a CSV file and read it back.
>>> import tempfile
>>> with tempfile.TemporaryDirectory() as d:
... # Write a DataFrame into a CSV file
... df = spark.createDataFrame([{"age": 100, "name": "Hyukjin Kwon"}])
... df.write.csv(d, mode="overwrite")
...
... # Read the CSV file as a DataFrame with 'nullValue' option set to 'Hyukjin Kwon'.
... spark.read.schema(df.schema).format("csv").option(
... "nullValue", "Hyukjin Kwon").load(d).show()
+---+----+
|age|name|
+---+----+
|100|NULL|
+---+----+
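The signature above also exposes useful formatting options. As an illustrative sketch (again with a placeholder path), we could write a header row, a custom separator, and an explicit marker for nulls:
# Illustrative only: header row, semicolon separator, explicit null marker
df.write.csv("tmp_csv_options", mode="overwrite", header=True, sep=";", nullValue="N/A")
Now let’s write our df with the defaults: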
df.write\
    .csv("df_csv")
Let’s check the written file:
spark.read.schema(df.schema)\
    .format("csv")\
    .load("df_csv").show()
+---+-----+
|age| name|
+---+-----+
| 2|Alice|
| 5| Bob|
+---+-----+
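We passed df.schema to the reader so the columns keep their original names and types. If the original schema isn’t at hand, a rough alternative is to let Spark infer the types; since we wrote no header, the column names then default to _c0, _c1:
# Sketch: infer the column types from the data (names default to _c0, _c1 without a header)
spark.read.option("inferSchema", True).csv("df_csv").show()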
What if we want to change the DataFrame?
df = spark.createDataFrame([(1, "Goku"), (2, "Naruto")], schema=["id", "name"])
df.show()
df.printSchema()
+---+------+
| id| name|
+---+------+
| 1| Goku|
| 2|Naruto|
+---+------+
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
df.write\
    .csv("df_csv")
AnalysisException: [PATH_ALREADY_EXISTS] Path file:/home/pedro-wsl/Learning-PySpark/nbs/df_csv already exists. Set mode as "overwrite" to overwrite the existing path.
This raises an error because the path already exists, so we need to set the mode to overwrite:
df.write\
    .csv("df_csv", mode="overwrite")
Let’s check:
spark.read.schema(df.schema)\
    .format("csv")\
    .load("df_csv").show()
+---+------+
| id| name|
+---+------+
| 2|Naruto|
| 1| Goku|
+---+------+
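As a side note, the same overwrite can also be expressed by chaining mode() on the writer instead of passing the keyword argument:
# Equivalent to df.write.csv("df_csv", mode="overwrite")
df.write.mode("overwrite").csv("df_csv")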
And how do we add more rows to the file?
Let’s create another DataFrame:
df2 = spark.createDataFrame([("3", "Gojo"), ("4", "Kirito")], schema=["id", "name"])
df2.show()
df2.printSchema()
+---+------+
| id| name|
+---+------+
| 3| Gojo|
| 4|Kirito|
+---+------+
root
|-- id: string (nullable = true)
|-- name: string (nullable = true)
Then we have to append it to the file:
df2.write\
    .csv("df_csv", mode="append")
spark.read.schema(df.schema)\
    .format("csv")\
    .load("df_csv").show()
+---+------+
| id| name|
+---+------+
| 2|Naruto|
| 4|Kirito|
| 3| Gojo|
| 1| Goku|
+---+------+
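Note that df2 was created with string ids while df uses longs; since CSV stores everything as text, reading the folder back with df.schema still casts the values to long. If you want the in-memory schemas to match before appending, a sketch (not executed here) would cast the column first:
# Sketch: align df2's schema with df before appending
from pyspark.sql.functions import col
df2_cast = df2.withColumn("id", col("id").cast("long"))
df2_cast.printSchema()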
File structure:
Let’s check the CSV file structure:
!file -b df_csv
directory
It’s a folder; let’s check its contents:
!ls -la df_csv
total 52
drwxr-xr-x 2 pedro-wsl pedro-wsl 4096 Oct 25 15:13 .
drwxr-xr-x 5 pedro-wsl pedro-wsl 4096 Oct 25 15:13 ..
-rw-r--r-- 1 pedro-wsl pedro-wsl 8 Oct 25 15:13 ._SUCCESS.crc
-rw-r--r-- 1 pedro-wsl pedro-wsl 8 Oct 25 15:13 .part-00000-1d555ec3-1910-480c-b90f-23b1a97a974c-c000.csv.crc
-rw-r--r-- 1 pedro-wsl pedro-wsl 8 Oct 25 15:13 .part-00000-ab8126fc-3fdb-44d1-8d0a-c7556dbe19c0-c000.csv.crc
-rw-r--r-- 1 pedro-wsl pedro-wsl 12 Oct 25 15:13 .part-00005-1d555ec3-1910-480c-b90f-23b1a97a974c-c000.csv.crc
-rw-r--r-- 1 pedro-wsl pedro-wsl 12 Oct 25 15:13 .part-00005-ab8126fc-3fdb-44d1-8d0a-c7556dbe19c0-c000.csv.crc
-rw-r--r-- 1 pedro-wsl pedro-wsl 12 Oct 25 15:13 .part-00011-1d555ec3-1910-480c-b90f-23b1a97a974c-c000.csv.crc
-rw-r--r-- 1 pedro-wsl pedro-wsl 12 Oct 25 15:13 .part-00011-ab8126fc-3fdb-44d1-8d0a-c7556dbe19c0-c000.csv.crc
-rw-r--r-- 1 pedro-wsl pedro-wsl 0 Oct 25 15:13 _SUCCESS
-rw-r--r-- 1 pedro-wsl pedro-wsl 0 Oct 25 15:13 part-00000-1d555ec3-1910-480c-b90f-23b1a97a974c-c000.csv
-rw-r--r-- 1 pedro-wsl pedro-wsl 0 Oct 25 15:13 part-00000-ab8126fc-3fdb-44d1-8d0a-c7556dbe19c0-c000.csv
-rw-r--r-- 1 pedro-wsl pedro-wsl 7 Oct 25 15:13 part-00005-1d555ec3-1910-480c-b90f-23b1a97a974c-c000.csv
-rw-r--r-- 1 pedro-wsl pedro-wsl 7 Oct 25 15:13 part-00005-ab8126fc-3fdb-44d1-8d0a-c7556dbe19c0-c000.csv
-rw-r--r-- 1 pedro-wsl pedro-wsl 9 Oct 25 15:13 part-00011-1d555ec3-1910-480c-b90f-23b1a97a974c-c000.csv
-rw-r--r-- 1 pedro-wsl pedro-wsl 9 Oct 25 15:13 part-00011-ab8126fc-3fdb-44d1-8d0a-c7556dbe19c0-c000.csv
It’s divided into partitions.
Here each non-empty part file holds a single row of df, because the rows were spread across different partitions.
This happens because Spark has a driver node that divides the workload among worker nodes, like:
    Driver Node
    /  |  |  \
  W1  W2  W3  W4
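To see where all those part files come from, we can ask the DataFrame how many partitions back it; with a local[*] master this typically defaults to the number of available cores:
# Number of partitions behind df (typically one per local core)
df.rdd.getNumPartitions()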
We can also specify the number of partitions we want:
help(df.repartition)
Help on method repartition in module pyspark.sql.dataframe:
repartition(numPartitions: Union[int, ForwardRef('ColumnOrName')], *cols: 'ColumnOrName') -> 'DataFrame' method of pyspark.sql.dataframe.DataFrame instance
Returns a new :class:`DataFrame` partitioned by the given partitioning expressions. The
resulting :class:`DataFrame` is hash partitioned.
.. versionadded:: 1.3.0
.. versionchanged:: 3.4.0
Supports Spark Connect.
Parameters
----------
numPartitions : int
can be an int to specify the target number of partitions or a Column.
If it is a Column, it will be used as the first partitioning column. If not specified,
the default number of partitions is used.
cols : str or :class:`Column`
partitioning columns.
.. versionchanged:: 1.6.0
Added optional arguments to specify the partitioning columns. Also made numPartitions
optional if partitioning columns are specified.
Returns
-------
:class:`DataFrame`
Repartitioned DataFrame.
Examples
--------
>>> df = spark.createDataFrame(
... [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
Repartition the data into 10 partitions.
>>> df.repartition(10).rdd.getNumPartitions()
10
Repartition the data into 7 partitions by 'age' column.
>>> df.repartition(7, "age").rdd.getNumPartitions()
7
Repartition the data into 7 partitions by 'age' and 'name columns.
>>> df.repartition(3, "name", "age").rdd.getNumPartitions()
3
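repartition(n) performs a full shuffle and hash-partitions the data into exactly n pieces. When the goal is only to reduce the number of output files, coalesce(n) is a lighter alternative that merges existing partitions without a full shuffle; a quick sketch:
# Sketch: coalesce reduces the partition count without a full shuffle
df.coalesce(1).rdd.getNumPartitions()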
df = spark.createDataFrame([(1, "Goku"), (2, "Naruto")], schema=["id", "name"])
df.show()
df.printSchema()
+---+------+
| id| name|
+---+------+
| 1| Goku|
| 2|Naruto|
+---+------+
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
df_1part = df.repartition(1)
df_1part.rdd.getNumPartitions()
1
df_1part\
    .write\
    .csv("df_csv_1part", mode="overwrite", header=True)
spark.read\
    .option("header", True)\
    .format("csv")\
    .load("df_csv_1part").show()
+---+------+
| id| name|
+---+------+
| 1| Goku|
| 2|Naruto|
+---+------+
!ls -la df_csv_1part/
total 20
drwxr-xr-x 2 pedro-wsl pedro-wsl 4096 Oct 25 15:13 .
drwxr-xr-x 6 pedro-wsl pedro-wsl 4096 Oct 25 15:13 ..
-rw-r--r-- 1 pedro-wsl pedro-wsl 8 Oct 25 15:13 ._SUCCESS.crc
-rw-r--r-- 1 pedro-wsl pedro-wsl 12 Oct 25 15:13 .part-00000-48e65ab0-a0d2-4a48-8f0c-e0b85439f207-c000.csv.crc
-rw-r--r-- 1 pedro-wsl pedro-wsl 0 Oct 25 15:13 _SUCCESS
-rw-r--r-- 1 pedro-wsl pedro-wsl 24 Oct 25 15:13 part-00000-48e65ab0-a0d2-4a48-8f0c-e0b85439f207-c000.csv
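With a single partition there is exactly one part file, and since we passed header=True it starts with the column names. Something like the following would print its contents:
!cat df_csv_1part/part-*.csv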