Writing a DataFrame to CSV

Based on the WafaStudies PySpark tutorial.

Imports

import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder\
                    .appName('Spark')\
                    .master("local[*]")\
                    .getOrCreate()

from pyspark.sql import dataframe
23/10/25 15:11:26 WARN Utils: Your hostname, PC resolves to a loopback address: 127.0.1.1; using 172.29.148.244 instead (on interface eth0)
23/10/25 15:11:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/25 15:11:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/10/25 15:11:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/10/25 15:11:28 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.

Writing a DataFrame to CSV files

help(dataframe.DataFrame.write)
Help on property:

    Interface for saving the content of the non-streaming :class:`DataFrame` out into external
    storage.
    
    .. versionadded:: 1.4.0
    
    .. versionchanged:: 3.4.0
        Supports Spark Connect.
    
    Returns
    -------
    :class:`DataFrameWriter`
    
    Examples
    --------
    >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
    >>> type(df.write)
    <class '...readwriter.DataFrameWriter'>
    
    Write the DataFrame as a table.
    
    >>> _ = spark.sql("DROP TABLE IF EXISTS tab2")
    >>> df.write.saveAsTable("tab2")
    >>> _ = spark.sql("DROP TABLE tab2")

Let’s start by creating a DataFrame:

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])

df.show()
df.printSchema()
                                                                                
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  5|  Bob|
+---+-----+

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
help(df.write.csv)
Help on method csv in module pyspark.sql.readwriter:

csv(path: str, mode: Optional[str] = None, compression: Optional[str] = None, sep: Optional[str] = None, quote: Optional[str] = None, escape: Optional[str] = None, header: Union[bool, str, NoneType] = None, nullValue: Optional[str] = None, escapeQuotes: Union[bool, str, NoneType] = None, quoteAll: Union[bool, str, NoneType] = None, dateFormat: Optional[str] = None, timestampFormat: Optional[str] = None, ignoreLeadingWhiteSpace: Union[bool, str, NoneType] = None, ignoreTrailingWhiteSpace: Union[bool, str, NoneType] = None, charToEscapeQuoteEscaping: Optional[str] = None, encoding: Optional[str] = None, emptyValue: Optional[str] = None, lineSep: Optional[str] = None) -> None method of pyspark.sql.readwriter.DataFrameWriter instance
    Saves the content of the :class:`DataFrame` in CSV format at the specified path.
    
    .. versionadded:: 2.0.0
    
    .. versionchanged:: 3.4.0
        Supports Spark Connect.
    
    Parameters
    ----------
    path : str
        the path in any Hadoop supported file system
    mode : str, optional
        specifies the behavior of the save operation when data already exists.
    
        * ``append``: Append contents of this :class:`DataFrame` to existing data.
        * ``overwrite``: Overwrite existing data.
        * ``ignore``: Silently ignore this operation if data already exists.
        * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \
            exists.
    
    Other Parameters
    ----------------
    Extra options
        For the extra options, refer to
        `Data Source Option <https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option>`_
        for the version you use.
    
        .. # noqa
    
    Examples
    --------
    Write a DataFrame into a CSV file and read it back.
    
    >>> import tempfile
    >>> with tempfile.TemporaryDirectory() as d:
    ...     # Write a DataFrame into a CSV file
    ...     df = spark.createDataFrame([{"age": 100, "name": "Hyukjin Kwon"}])
    ...     df.write.csv(d, mode="overwrite")
    ...
    ...     # Read the CSV file as a DataFrame with 'nullValue' option set to 'Hyukjin Kwon'.
    ...     spark.read.schema(df.schema).format("csv").option(
    ...         "nullValue", "Hyukjin Kwon").load(d).show()
    +---+----+
    |age|name|
    +---+----+
    |100|NULL|
    +---+----+
df.write\
.csv("df_csv")

Let’s read the written data back to check it:

spark.read.schema(df.schema)\
.format("csv")\
.load("df_csv").show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  5|  Bob|
+---+-----+
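
The writer also accepts the CSV-specific options listed in help(df.write.csv) above, such as header and sep. A minimal sketch (df_csv_opts is just an example path, not part of the tutorial):

df.write\
.csv("df_csv_opts", mode="overwrite", header=True, sep=";")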

What if we want to write a different DataFrame to the same path?

df = spark.createDataFrame([(1, "Goku"), (2, "Naruto")], schema=["id", "name"])

df.show()
df.printSchema()
+---+------+
| id|  name|
+---+------+
|  1|  Goku|
|  2|Naruto|
+---+------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
df.write\
.csv("df_csv")
AnalysisException: [PATH_ALREADY_EXISTS] Path file:/home/pedro-wsl/Learning-PySpark/nbs/df_csv already exists. Set mode as "overwrite" to overwrite the existing path.

It raises an error because the path already exists, so we need to write with mode="overwrite":

df.write\
.csv("df_csv", mode="overwrite")

Let’s check:

spark.read.schema(df.schema)\
.format("csv")\
.load("df_csv").show()
+---+------+
| id|  name|
+---+------+
|  2|Naruto|
|  1|  Goku|
+---+------+

And how do we add more rows to the existing data?

Let’s create another dataframe:

df2 = spark.createDataFrame([("3", "Gojo"), ("4", "Kirito")], schema=["id", "name"])

df2.show()
df2.printSchema()
+---+------+
| id|  name|
+---+------+
|  3|  Gojo|
|  4|Kirito|
+---+------+

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)

Then we append it to the existing path:

df2.write\
.csv("df_csv", mode="append")
spark.read.schema(df.schema)\
.format("csv")\
.load("df_csv").show()
+---+------+
| id|  name|
+---+------+
|  2|Naruto|
|  4|Kirito|
|  3|  Gojo|
|  1|  Goku|
+---+------+
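
Note that df2’s id column is a string (we passed "3" and "4" as string literals) while df’s id is a long. Since CSV stores plain text, reading everything back with df.schema still parses id as long. If we don’t want to supply a schema, we can let the reader infer one; a minimal sketch (without headers the inferred column names are _c0 and _c1):

# no explicit schema: Spark infers the column types from the data
spark.read\
.option("inferSchema", True)\
.format("csv")\
.load("df_csv").show()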

File structure:

Let’s check the structure of the CSV output on disk:

!file -b df_csv
directory

It’s a directory; let’s check its contents:

!ls -la df_csv
total 52
drwxr-xr-x 2 pedro-wsl pedro-wsl 4096 Oct 25 15:13 .
drwxr-xr-x 5 pedro-wsl pedro-wsl 4096 Oct 25 15:13 ..
-rw-r--r-- 1 pedro-wsl pedro-wsl    8 Oct 25 15:13 ._SUCCESS.crc
-rw-r--r-- 1 pedro-wsl pedro-wsl    8 Oct 25 15:13 .part-00000-1d555ec3-1910-480c-b90f-23b1a97a974c-c000.csv.crc
-rw-r--r-- 1 pedro-wsl pedro-wsl    8 Oct 25 15:13 .part-00000-ab8126fc-3fdb-44d1-8d0a-c7556dbe19c0-c000.csv.crc
-rw-r--r-- 1 pedro-wsl pedro-wsl   12 Oct 25 15:13 .part-00005-1d555ec3-1910-480c-b90f-23b1a97a974c-c000.csv.crc
-rw-r--r-- 1 pedro-wsl pedro-wsl   12 Oct 25 15:13 .part-00005-ab8126fc-3fdb-44d1-8d0a-c7556dbe19c0-c000.csv.crc
-rw-r--r-- 1 pedro-wsl pedro-wsl   12 Oct 25 15:13 .part-00011-1d555ec3-1910-480c-b90f-23b1a97a974c-c000.csv.crc
-rw-r--r-- 1 pedro-wsl pedro-wsl   12 Oct 25 15:13 .part-00011-ab8126fc-3fdb-44d1-8d0a-c7556dbe19c0-c000.csv.crc
-rw-r--r-- 1 pedro-wsl pedro-wsl    0 Oct 25 15:13 _SUCCESS
-rw-r--r-- 1 pedro-wsl pedro-wsl    0 Oct 25 15:13 part-00000-1d555ec3-1910-480c-b90f-23b1a97a974c-c000.csv
-rw-r--r-- 1 pedro-wsl pedro-wsl    0 Oct 25 15:13 part-00000-ab8126fc-3fdb-44d1-8d0a-c7556dbe19c0-c000.csv
-rw-r--r-- 1 pedro-wsl pedro-wsl    7 Oct 25 15:13 part-00005-1d555ec3-1910-480c-b90f-23b1a97a974c-c000.csv
-rw-r--r-- 1 pedro-wsl pedro-wsl    7 Oct 25 15:13 part-00005-ab8126fc-3fdb-44d1-8d0a-c7556dbe19c0-c000.csv
-rw-r--r-- 1 pedro-wsl pedro-wsl    9 Oct 25 15:13 part-00011-1d555ec3-1910-480c-b90f-23b1a97a974c-c000.csv
-rw-r--r-- 1 pedro-wsl pedro-wsl    9 Oct 25 15:13 part-00011-ab8126fc-3fdb-44d1-8d0a-c7556dbe19c0-c000.csv

It’s divided into partitions: Spark writes one part file per DataFrame partition, and there are two sets of part files here because we wrote to the path twice (once for the overwrite, once for the append).

With only two rows per write, each row ended up in its own partition, which is why the non-empty part files are only a few bytes each.

This happens because Spark has a driver node that divides the workload among worker nodes, like:

  Driver Node
 /   |    |   \
W1   W2   W3   W4
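
We can check how many partitions a DataFrame actually has with rdd.getNumPartitions (the exact count depends on the local[*] parallelism, so your number may differ):

df.rdd.getNumPartitions()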

We can also specify the number of partitions we want:

help(df.repartition)
Help on method repartition in module pyspark.sql.dataframe:

repartition(numPartitions: Union[int, ForwardRef('ColumnOrName')], *cols: 'ColumnOrName') -> 'DataFrame' method of pyspark.sql.dataframe.DataFrame instance
    Returns a new :class:`DataFrame` partitioned by the given partitioning expressions. The
    resulting :class:`DataFrame` is hash partitioned.
    
    .. versionadded:: 1.3.0
    
    .. versionchanged:: 3.4.0
        Supports Spark Connect.
    
    Parameters
    ----------
    numPartitions : int
        can be an int to specify the target number of partitions or a Column.
        If it is a Column, it will be used as the first partitioning column. If not specified,
        the default number of partitions is used.
    cols : str or :class:`Column`
        partitioning columns.
    
        .. versionchanged:: 1.6.0
           Added optional arguments to specify the partitioning columns. Also made numPartitions
           optional if partitioning columns are specified.
    
    Returns
    -------
    :class:`DataFrame`
        Repartitioned DataFrame.
    
    Examples
    --------
    >>> df = spark.createDataFrame(
    ...     [(14, "Tom"), (23, "Alice"), (16, "Bob")], ["age", "name"])
    
    Repartition the data into 10 partitions.
    
    >>> df.repartition(10).rdd.getNumPartitions()
    10
    
    Repartition the data into 7 partitions by 'age' column.
    
    >>> df.repartition(7, "age").rdd.getNumPartitions()
    7
    
    Repartition the data into 7 partitions by 'age' and 'name columns.
    
    >>> df.repartition(3, "name", "age").rdd.getNumPartitions()
    3
df = spark.createDataFrame([(1, "Goku"), (2, "Naruto")], schema=["id", "name"])

df.show()
df.printSchema()
+---+------+
| id|  name|
+---+------+
|  1|  Goku|
|  2|Naruto|
+---+------+

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
df_1part = df.repartition(1)
df_1part.rdd.getNumPartitions()
1
df_1part\
.write\
.csv("df_csv_1part", mode="overwrite", header=True)
spark.read\
.option("header", True)\
.format("csv")\
.load("df_csv_1part").show()
+---+------+
| id|  name|
+---+------+
|  1|  Goku|
|  2|Naruto|
+---+------+
!ls -la df_csv_1part/
total 20
drwxr-xr-x 2 pedro-wsl pedro-wsl 4096 Oct 25 15:13 .
drwxr-xr-x 6 pedro-wsl pedro-wsl 4096 Oct 25 15:13 ..
-rw-r--r-- 1 pedro-wsl pedro-wsl    8 Oct 25 15:13 ._SUCCESS.crc
-rw-r--r-- 1 pedro-wsl pedro-wsl   12 Oct 25 15:13 .part-00000-48e65ab0-a0d2-4a48-8f0c-e0b85439f207-c000.csv.crc
-rw-r--r-- 1 pedro-wsl pedro-wsl    0 Oct 25 15:13 _SUCCESS
-rw-r--r-- 1 pedro-wsl pedro-wsl   24 Oct 25 15:13 part-00000-48e65ab0-a0d2-4a48-8f0c-e0b85439f207-c000.csv
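
When the goal is simply to reduce the number of output files, coalesce is a common alternative to repartition(1): it merges existing partitions without a full shuffle. A minimal sketch (df_csv_coalesced is just an example path, not part of the tutorial):

df.coalesce(1)\
.write\
.csv("df_csv_coalesced", mode="overwrite", header=True)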