import findspark
findspark.init()
Reading CSV files
Based on WafaStudies PySpark tutorial.
Imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder\
    .appName('Spark')\
    .master("local[*]")\
    .getOrCreate()
your 131072x1 screen size is bogus. expect trouble
23/10/25 15:11:16 WARN Utils: Your hostname, PC resolves to a loopback address: 127.0.1.1; using 172.29.148.244 instead (on interface eth0)
23/10/25 15:11:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/25 15:11:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/10/25 15:11:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Generate data
!mkdir data
import csv
import random
from faker import Faker
faker = Faker()
with open('data/employees1.csv', 'w', newline='') as csvfile:
    fieldnames = ['id', 'name', 'gender', 'salary']
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

    writer.writeheader()
    for id in range(1, 6):
        name = faker.name()
        gender = random.choice(['Male', 'Female'])
        salary = random.randint(1000, 10000)

        writer.writerow({'id': id, 'name': name, 'gender': gender, 'salary': salary})
with open('data/employees2.csv', 'w', newline='') as csvfile:
    fieldnames = ['id', 'name', 'gender', 'salary']
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

    writer.writeheader()
    for id in range(1, 6):
        name = faker.name()
        gender = random.choice(['Male', 'Female'])
        salary = random.randint(1000, 10000)

        writer.writerow({'id': id, 'name': name, 'gender': gender, 'salary': salary})
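As a quick sanity check (not part of the original tutorial), we can peek at the first lines of one of the generated files directly from the notebook:

!head -n 3 data/employees1.csv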
Reading CSV files
help(spark.read.csv)
Help on method csv in module pyspark.sql.readwriter:
csv(path: Union[str, List[str]], schema: Union[pyspark.sql.types.StructType, str, NoneType] = None, sep: Optional[str] = None, encoding: Optional[str] = None, quote: Optional[str] = None, escape: Optional[str] = None, comment: Optional[str] = None, header: Union[bool, str, NoneType] = None, inferSchema: Union[bool, str, NoneType] = None, ignoreLeadingWhiteSpace: Union[bool, str, NoneType] = None, ignoreTrailingWhiteSpace: Union[bool, str, NoneType] = None, nullValue: Optional[str] = None, nanValue: Optional[str] = None, positiveInf: Optional[str] = None, negativeInf: Optional[str] = None, dateFormat: Optional[str] = None, timestampFormat: Optional[str] = None, maxColumns: Union[str, int, NoneType] = None, maxCharsPerColumn: Union[str, int, NoneType] = None, maxMalformedLogPerPartition: Union[str, int, NoneType] = None, mode: Optional[str] = None, columnNameOfCorruptRecord: Optional[str] = None, multiLine: Union[bool, str, NoneType] = None, charToEscapeQuoteEscaping: Optional[str] = None, samplingRatio: Union[str, float, NoneType] = None, enforceSchema: Union[bool, str, NoneType] = None, emptyValue: Optional[str] = None, locale: Optional[str] = None, lineSep: Optional[str] = None, pathGlobFilter: Union[bool, str, NoneType] = None, recursiveFileLookup: Union[bool, str, NoneType] = None, modifiedBefore: Union[bool, str, NoneType] = None, modifiedAfter: Union[bool, str, NoneType] = None, unescapedQuoteHandling: Optional[str] = None) -> 'DataFrame' method of pyspark.sql.readwriter.DataFrameReader instance
Loads a CSV file and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
``inferSchema`` is enabled. To avoid going through the entire data once, disable
``inferSchema`` option or specify the schema explicitly using ``schema``.
.. versionadded:: 2.0.0
.. versionchanged:: 3.4.0
Supports Spark Connect.
Parameters
----------
path : str or list
string, or list of strings, for input path(s),
or RDD of Strings storing CSV rows.
schema : :class:`pyspark.sql.types.StructType` or str, optional
an optional :class:`pyspark.sql.types.StructType` for the input schema
or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
Other Parameters
----------------
Extra options
For the extra options, refer to
`Data Source Option <https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option>`_
for the version you use.
.. # noqa
Examples
--------
Write a DataFrame into a CSV file and read it back.
>>> import tempfile
>>> with tempfile.TemporaryDirectory() as d:
... # Write a DataFrame into a CSV file
... df = spark.createDataFrame([{"age": 100, "name": "Hyukjin Kwon"}])
... df.write.mode("overwrite").format("csv").save(d)
...
... # Read the CSV file as a DataFrame with 'nullValue' option set to 'Hyukjin Kwon'.
... spark.read.csv(d, schema=df.schema, nullValue="Hyukjin Kwon").show()
+---+----+
|age|name|
+---+----+
|100|NULL|
+---+----+
df = spark.read.csv(path='data/employees1.csv')

df.show()
df.printSchema()
+---+----------------+------+------+
|_c0| _c1| _c2| _c3|
+---+----------------+------+------+
| id| name|gender|salary|
| 1| Kara Moyer| Male| 3265|
| 2| Kathryn Bell| Male| 4657|
| 3| Gerald Newman| Male| 4000|
| 4|Angela Rodriguez|Female| 2596|
| 5| Terry Smith|Female| 3685|
+---+----------------+------+------+
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)
By default, Spark reads CSV files without a header and treats every column as a string. To avoid this, we use:

header=True
: the first line will be taken as the header

inferSchema=True
: Spark will infer the datatype of each column
df = spark.read.csv(path='data/employees1.csv', header=True, inferSchema=True)

df.show()
df.printSchema()
+---+----------------+------+------+
| id| name|gender|salary|
+---+----------------+------+------+
| 1| Kara Moyer| Male| 3265|
| 2| Kathryn Bell| Male| 4657|
| 3| Gerald Newman| Male| 4000|
| 4|Angela Rodriguez|Female| 2596|
| 5| Terry Smith|Female| 3685|
+---+----------------+------+------+
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- gender: string (nullable = true)
|-- salary: integer (nullable = true)
inferSchema takes some time and processing power, so we can instead tell Spark the schema explicitly:

schema = 'id integer, name string, gender string, salary double'
df = spark.read.csv(path='data/employees1.csv', header=True, schema=schema)

df.show()
df.printSchema()
+---+----------------+------+------+
| id| name|gender|salary|
+---+----------------+------+------+
| 1| Kara Moyer| Male|3265.0|
| 2| Kathryn Bell| Male|4657.0|
| 3| Gerald Newman| Male|4000.0|
| 4|Angela Rodriguez|Female|2596.0|
| 5| Terry Smith|Female|3685.0|
+---+----------------+------+------+
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- gender: string (nullable = true)
|-- salary: double (nullable = true)
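As the help output above notes, schema can also be a pyspark.sql.types.StructType instead of a DDL string. A minimal sketch of the equivalent definition:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# Equivalent to the DDL string 'id integer, name string, gender string, salary double'
schema_struct = StructType([
    StructField('id', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('gender', StringType(), True),
    StructField('salary', DoubleType(), True),
])

df = spark.read.csv(path='data/employees1.csv', header=True, schema=schema_struct)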
We can also read multiple files into one DataFrame:
df = spark.read.csv(path=['data/employees1.csv', 'data/employees2.csv'], header=True, schema=schema)

df.show()
df.printSchema()
+---+----------------+------+------+
| id| name|gender|salary|
+---+----------------+------+------+
| 1| Brandon Davis| Male|1786.0|
| 2| Amanda Hansen| Male|9218.0|
| 3|Valerie Peterson|Female|4119.0|
| 4| Robert Mason| Male|7547.0|
| 5| James Thomas|Female|4181.0|
| 1| Kara Moyer| Male|3265.0|
| 2| Kathryn Bell| Male|4657.0|
| 3| Gerald Newman| Male|4000.0|
| 4|Angela Rodriguez|Female|2596.0|
| 5| Terry Smith|Female|3685.0|
+---+----------------+------+------+
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- gender: string (nullable = true)
|-- salary: double (nullable = true)
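When several files are combined like this, it can be useful to know which file each row came from. A small sketch (not in the original tutorial) using input_file_name() from pyspark.sql.functions:

from pyspark.sql.functions import input_file_name

# Adds a column with the full path of the source file for each row
df.withColumn('source_file', input_file_name())\
  .select('id', 'name', 'source_file')\
  .show(truncate=False)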
If all the files are in the same folder, it’s possible to use the folder path:
df = spark.read.csv(path=['data/'], header=True, schema=schema)

df.show()
df.printSchema()
+---+----------------+------+------+
| id| name|gender|salary|
+---+----------------+------+------+
| 1| Brandon Davis| Male|1786.0|
| 2| Amanda Hansen| Male|9218.0|
| 3|Valerie Peterson|Female|4119.0|
| 4| Robert Mason| Male|7547.0|
| 5| James Thomas|Female|4181.0|
| 1| Kara Moyer| Male|3265.0|
| 2| Kathryn Bell| Male|4657.0|
| 3| Gerald Newman| Male|4000.0|
| 4|Angela Rodriguez|Female|2596.0|
| 5| Terry Smith|Female|3685.0|
+---+----------------+------+------+
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- gender: string (nullable = true)
|-- salary: double (nullable = true)
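When reading a whole folder, the pathGlobFilter option listed in the help output above can restrict which files are picked up. A sketch, assuming we only want files ending in .csv:

# Only files matching the glob pattern are read from data/
df = spark.read.csv(path='data/', header=True, schema=schema, pathGlobFilter='*.csv')
df.show()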