Reading CSV files

Based on the WafaStudies PySpark tutorial.

Imports

import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder\
                    .appName('Spark')\
                    .master("local[*]")\
                    .getOrCreate()

Generate data

!mkdir -p data
import csv
import random
from faker import Faker

faker = Faker()

fieldnames = ['id', 'name', 'gender', 'salary']

# Write two CSV files with the same structure: a header row plus five fake employees each
for filename in ['data/employees1.csv', 'data/employees2.csv']:
    with open(filename, 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()

        for emp_id in range(1, 6):
            writer.writerow({
                'id': emp_id,
                'name': faker.name(),
                'gender': random.choice(['Male', 'Female']),
                'salary': random.randint(1000, 10000),
            })
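As a quick sanity check (optional), we can print the rows of one generated file with the standard library's csv reader, which we already imported above:

# Print every row of the first file, header included
with open('data/employees1.csv') as csvfile:
    for row in csv.reader(csvfile):
        print(row)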

Reading CSV files

help(spark.read.csv)
Help on method csv in module pyspark.sql.readwriter:

csv(path: Union[str, List[str]], schema: Union[pyspark.sql.types.StructType, str, NoneType] = None, sep: Optional[str] = None, encoding: Optional[str] = None, quote: Optional[str] = None, escape: Optional[str] = None, comment: Optional[str] = None, header: Union[bool, str, NoneType] = None, inferSchema: Union[bool, str, NoneType] = None, ignoreLeadingWhiteSpace: Union[bool, str, NoneType] = None, ignoreTrailingWhiteSpace: Union[bool, str, NoneType] = None, nullValue: Optional[str] = None, nanValue: Optional[str] = None, positiveInf: Optional[str] = None, negativeInf: Optional[str] = None, dateFormat: Optional[str] = None, timestampFormat: Optional[str] = None, maxColumns: Union[str, int, NoneType] = None, maxCharsPerColumn: Union[str, int, NoneType] = None, maxMalformedLogPerPartition: Union[str, int, NoneType] = None, mode: Optional[str] = None, columnNameOfCorruptRecord: Optional[str] = None, multiLine: Union[bool, str, NoneType] = None, charToEscapeQuoteEscaping: Optional[str] = None, samplingRatio: Union[str, float, NoneType] = None, enforceSchema: Union[bool, str, NoneType] = None, emptyValue: Optional[str] = None, locale: Optional[str] = None, lineSep: Optional[str] = None, pathGlobFilter: Union[bool, str, NoneType] = None, recursiveFileLookup: Union[bool, str, NoneType] = None, modifiedBefore: Union[bool, str, NoneType] = None, modifiedAfter: Union[bool, str, NoneType] = None, unescapedQuoteHandling: Optional[str] = None) -> 'DataFrame' method of pyspark.sql.readwriter.DataFrameReader instance
    Loads a CSV file and returns the result as a  :class:`DataFrame`.
    
    This function will go through the input once to determine the input schema if
    ``inferSchema`` is enabled. To avoid going through the entire data once, disable
    ``inferSchema`` option or specify the schema explicitly using ``schema``.
    
    .. versionadded:: 2.0.0
    
    .. versionchanged:: 3.4.0
        Supports Spark Connect.
    
    Parameters
    ----------
    path : str or list
        string, or list of strings, for input path(s),
        or RDD of Strings storing CSV rows.
    schema : :class:`pyspark.sql.types.StructType` or str, optional
        an optional :class:`pyspark.sql.types.StructType` for the input schema
        or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
    
    Other Parameters
    ----------------
    Extra options
        For the extra options, refer to
        `Data Source Option <https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option>`_
        for the version you use.
    
        .. # noqa
    
    Examples
    --------
    Write a DataFrame into a CSV file and read it back.
    
    >>> import tempfile
    >>> with tempfile.TemporaryDirectory() as d:
    ...     # Write a DataFrame into a CSV file
    ...     df = spark.createDataFrame([{"age": 100, "name": "Hyukjin Kwon"}])
    ...     df.write.mode("overwrite").format("csv").save(d)
    ...
    ...     # Read the CSV file as a DataFrame with 'nullValue' option set to 'Hyukjin Kwon'.
    ...     spark.read.csv(d, schema=df.schema, nullValue="Hyukjin Kwon").show()
    +---+----+
    |age|name|
    +---+----+
    |100|NULL|
    +---+----+
df = spark.read.csv(path='data/employees1.csv')
df.show()
df.printSchema()
+---+----------------+------+------+
|_c0|             _c1|   _c2|   _c3|
+---+----------------+------+------+
| id|            name|gender|salary|
|  1|      Kara Moyer|  Male|  3265|
|  2|    Kathryn Bell|  Male|  4657|
|  3|   Gerald Newman|  Male|  4000|
|  4|Angela Rodriguez|Female|  2596|
|  5|     Terry Smith|Female|  3685|
+---+----------------+------+------+

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)

By default, Spark reads CSV files without a header and treats every column as a string.

To change this behaviour, we use:

header=True: the first line is taken as the header

inferSchema=True: Spark infers the data type of each column

df = spark.read.csv(path='data/employees1.csv', header=True, inferSchema=True)
df.show()
df.printSchema()
+---+----------------+------+------+
| id|            name|gender|salary|
+---+----------------+------+------+
|  1|      Kara Moyer|  Male|  3265|
|  2|    Kathryn Bell|  Male|  4657|
|  3|   Gerald Newman|  Male|  4000|
|  4|Angela Rodriguez|Female|  2596|
|  5|     Terry Smith|Female|  3685|
+---+----------------+------+------+

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

inferSchema requires an extra pass over the data, which takes time and processing power, so we can instead tell Spark the schema up front:

schema = 'id integer, name string, gender string, salary double'
df = spark.read.csv(path='data/employees1.csv', header=True, schema=schema)

df.show()
df.printSchema()
+---+----------------+------+------+
| id|            name|gender|salary|
+---+----------------+------+------+
|  1|      Kara Moyer|  Male|3265.0|
|  2|    Kathryn Bell|  Male|4657.0|
|  3|   Gerald Newman|  Male|4000.0|
|  4|Angela Rodriguez|Female|2596.0|
|  5|     Terry Smith|Female|3685.0|
+---+----------------+------+------+

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: double (nullable = true)
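
As the help output above notes, the schema argument also accepts a pyspark.sql.types.StructType instead of a DDL string. A minimal equivalent sketch of the same schema:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# Same schema as the DDL string 'id integer, name string, gender string, salary double'
schema_struct = StructType([
    StructField('id', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('gender', StringType(), True),
    StructField('salary', DoubleType(), True),
])

df = spark.read.csv(path='data/employees1.csv', header=True, schema=schema_struct)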

We can also read multiple files into one DataFrame:

df = spark.read.csv(path=['data/employees1.csv', 'data/employees2.csv'], header=True, schema=schema)

df.show()
df.printSchema()
+---+----------------+------+------+
| id|            name|gender|salary|
+---+----------------+------+------+
|  1|   Brandon Davis|  Male|1786.0|
|  2|   Amanda Hansen|  Male|9218.0|
|  3|Valerie Peterson|Female|4119.0|
|  4|    Robert Mason|  Male|7547.0|
|  5|    James Thomas|Female|4181.0|
|  1|      Kara Moyer|  Male|3265.0|
|  2|    Kathryn Bell|  Male|4657.0|
|  3|   Gerald Newman|  Male|4000.0|
|  4|Angela Rodriguez|Female|2596.0|
|  5|     Terry Smith|Female|3685.0|
+---+----------------+------+------+

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: double (nullable = true)
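
Spark paths also accept glob patterns, so instead of listing each file we could match both with a wildcard. A sketch, assuming both files match the pattern:

# Matches data/employees1.csv and data/employees2.csv
df = spark.read.csv(path='data/employees*.csv', header=True, schema=schema)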

If all the files are in the same folder, it’s possible to use the folder path:

df = spark.read.csv(path='data/', header=True, schema=schema)

df.show()
df.printSchema()
+---+----------------+------+------+
| id|            name|gender|salary|
+---+----------------+------+------+
|  1|   Brandon Davis|  Male|1786.0|
|  2|   Amanda Hansen|  Male|9218.0|
|  3|Valerie Peterson|Female|4119.0|
|  4|    Robert Mason|  Male|7547.0|
|  5|    James Thomas|Female|4181.0|
|  1|      Kara Moyer|  Male|3265.0|
|  2|    Kathryn Bell|  Male|4657.0|
|  3|   Gerald Newman|  Male|4000.0|
|  4|Angela Rodriguez|Female|2596.0|
|  5|     Terry Smith|Female|3685.0|
+---+----------------+------+------+

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: double (nullable = true)
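
If the folder also contained files of other types, the pathGlobFilter option (listed in the help output above) can restrict which files are read. A sketch:

# Only read files in data/ whose names match *.csv
df = spark.read.csv(path='data/', header=True, schema=schema, pathGlobFilter='*.csv')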