import findspark
findspark.init()
Reading JSON files
Based on WafaStudies PySpark tutorial.
Imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder\
    .appName('Spark')\
    .master("local[*]")\
    .getOrCreate()
your 131072x1 screen size is bogus. expect trouble
23/10/25 15:13:10 WARN Utils: Your hostname, PC resolves to a loopback address: 127.0.1.1; using 172.29.148.244 instead (on interface eth0)
23/10/25 15:13:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/25 15:13:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/10/25 15:13:12 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/10/25 15:13:12 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/10/25 15:13:12 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
Generate data
!mkdir data
!mkdir ml_data
mkdir: cannot create directory ‘data’: File exists
import json
import random
from faker import Faker
faker = Faker('pt_BR')
# Generate data for employees1.json
with open('data/employees1.json', 'w') as jsonfile:
    for id in range(1, 6):
        name = faker.name()
        salary = random.randint(1000, 10000)
        employee = {'id': id, 'name': name, 'salary': float(salary)}
        json.dump(employee, jsonfile)
        jsonfile.write('\n')
# Generate data for employees2.json
employees2_data = []
for id in range(1, 6):
    name = faker.name()
    salary = random.randint(1000, 10000)
    employee = {'id': id, 'name': name, 'salary': float(salary)}
    employees2_data.append(employee)

with open('ml_data/employees2.json', 'w') as jsonfile:
    json.dump(employees2_data, jsonfile, indent=2)
# Generate data for employees3.json
with open('data/employees3.json', 'w') as jsonfile:
    for id in range(6, 11):
        name = faker.name()
        salary = random.randint(1000, 10000)
        employee = {'id': id, 'name': name, 'salary': float(salary)}
        json.dump(employee, jsonfile)
        jsonfile.write('\n')
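Note that employees1.json and employees3.json are written in JSON Lines format (one object per line), while employees2.json holds a single indented JSON array. Before reading anything with Spark, a quick sketch to confirm the two layouts (the two-line peek below is just for illustration):

# Peek at the JSON Lines file: one JSON object per line
with open('data/employees1.json') as f:
    for line in f.readlines()[:2]:
        print(line, end='')

# Peek at the start of the multi-line file: a single JSON array
with open('ml_data/employees2.json') as f:
    print(f.read(200))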
Reading JSON files
help(spark.read.json)
Help on method json in module pyspark.sql.readwriter:
json(path: Union[str, List[str], pyspark.rdd.RDD[str]], schema: Union[pyspark.sql.types.StructType, str, NoneType] = None, primitivesAsString: Union[bool, str, NoneType] = None, prefersDecimal: Union[bool, str, NoneType] = None, allowComments: Union[bool, str, NoneType] = None, allowUnquotedFieldNames: Union[bool, str, NoneType] = None, allowSingleQuotes: Union[bool, str, NoneType] = None, allowNumericLeadingZero: Union[bool, str, NoneType] = None, allowBackslashEscapingAnyCharacter: Union[bool, str, NoneType] = None, mode: Optional[str] = None, columnNameOfCorruptRecord: Optional[str] = None, dateFormat: Optional[str] = None, timestampFormat: Optional[str] = None, multiLine: Union[bool, str, NoneType] = None, allowUnquotedControlChars: Union[bool, str, NoneType] = None, lineSep: Optional[str] = None, samplingRatio: Union[str, float, NoneType] = None, dropFieldIfAllNull: Union[bool, str, NoneType] = None, encoding: Optional[str] = None, locale: Optional[str] = None, pathGlobFilter: Union[bool, str, NoneType] = None, recursiveFileLookup: Union[bool, str, NoneType] = None, modifiedBefore: Union[bool, str, NoneType] = None, modifiedAfter: Union[bool, str, NoneType] = None, allowNonNumericNumbers: Union[bool, str, NoneType] = None) -> 'DataFrame' method of pyspark.sql.readwriter.DataFrameReader instance
Loads JSON files and returns the results as a :class:`DataFrame`.
`JSON Lines <http://jsonlines.org/>`_ (newline-delimited JSON) is supported by default.
For JSON (one record per file), set the ``multiLine`` parameter to ``true``.
If the ``schema`` parameter is not specified, this function goes
through the input once to determine the input schema.
.. versionadded:: 1.4.0
.. versionchanged:: 3.4.0
Supports Spark Connect.
Parameters
----------
path : str, list or :class:`RDD`
string represents path to the JSON dataset, or a list of paths,
or RDD of Strings storing JSON objects.
schema : :class:`pyspark.sql.types.StructType` or str, optional
an optional :class:`pyspark.sql.types.StructType` for the input schema or
a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
Other Parameters
----------------
Extra options
For the extra options, refer to
`Data Source Option <https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option>`_
for the version you use.
.. # noqa
Examples
--------
Write a DataFrame into a JSON file and read it back.
>>> import tempfile
>>> with tempfile.TemporaryDirectory() as d:
... # Write a DataFrame into a JSON file
... spark.createDataFrame(
... [{"age": 100, "name": "Hyukjin Kwon"}]
... ).write.mode("overwrite").format("json").save(d)
...
... # Read the JSON file as a DataFrame.
... spark.read.json(d).show()
+---+------------+
|age| name|
+---+------------+
|100|Hyukjin Kwon|
+---+------------+
= spark.read.json("data/employees1.json")
df
df.printSchema() df.show()
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- salary: double (nullable = true)
+---+--------------------+------+
| id| name|salary|
+---+--------------------+------+
| 1| Giovanna Teixeira|6238.0|
| 2|Dra. Isabel Silveira|2311.0|
| 3| Davi Luiz da Rosa|5428.0|
| 4|João Gabriel Cald...|8027.0|
| 5| Natália Teixeira|8968.0|
+---+--------------------+------+
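As the help output above notes, the path argument can also be an RDD of JSON strings, which is handy when the records are already in memory. A minimal sketch (the sample strings below are made up for illustration, and newer Spark versions may warn that the RDD path is deprecated):

# Hypothetical in-memory records, just to illustrate the RDD variant
json_strings = [
    '{"id": 100, "name": "Ana", "salary": 1234.0}',
    '{"id": 101, "name": "Bruno", "salary": 4321.0}',
]
rdd = spark.sparkContext.parallelize(json_strings)
df_from_rdd = spark.read.json(rdd)
df_from_rdd.show()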
We can also tell Spark the schema explicitly:
= "id long, name string, salary double" schema
= spark.read.json("data/employees1.json", schema=schema)
df
df.printSchema() df.show()
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- salary: double (nullable = true)
+---+--------------------+------+
| id| name|salary|
+---+--------------------+------+
| 1| Giovanna Teixeira|6238.0|
| 2|Dra. Isabel Silveira|2311.0|
| 3| Davi Luiz da Rosa|5428.0|
| 4|João Gabriel Cald...|8027.0|
| 5| Natália Teixeira|8968.0|
+---+--------------------+------+
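The DDL string above is equivalent to building a StructType by hand; a sketch of the same schema in the programmatic form, in case you prefer it:

from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType

# Same schema as "id long, name string, salary double"
schema_struct = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("salary", DoubleType(), True),
])
df = spark.read.json("data/employees1.json", schema=schema_struct)
df.printSchema()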
What if the JSON file is multi-line?
with open('ml_data/employees2.json', 'r') as file:
    pretty_json = json.dumps(json.load(file), indent=4)
print(pretty_json)
[
{
"id": 1,
"name": "Pedro Henrique Moreira",
"salary": 7979.0
},
{
"id": 2,
"name": "Jo\u00e3o Miguel da Concei\u00e7\u00e3o",
"salary": 5590.0
},
{
"id": 3,
"name": "Luiz Miguel Monteiro",
"salary": 7854.0
},
{
"id": 4,
"name": "Jo\u00e3o Miguel Porto",
"salary": 4581.0
},
{
"id": 5,
"name": "Dr. Luiz Gustavo Moraes",
"salary": 9965.0
}
]
Let’s try to read it:
= spark.read.json("ml_data/employees2.json")
df
df.printSchema() df.show()
root
|-- _corrupt_record: string (nullable = true)
AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
referenced columns only include the internal corrupt record column
(named _corrupt_record by default). For example:
spark.read.schema(schema).csv(file).filter($"_corrupt_record".isNotNull).count()
and spark.read.schema(schema).csv(file).select("_corrupt_record").show().
Instead, you can cache or save the parsed results and then send the same query.
For example, val df = spark.read.schema(schema).csv(file).cache() and then
df.filter($"_corrupt_record".isNotNull).count().
We got an error!
To fix it, we need to tell Spark the JSON file is multi-line:
= spark.read.json("ml_data/employees2.json", multiLine=True)
df
df.printSchema() df.show()
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- salary: double (nullable = true)
+---+--------------------+------+
| id| name|salary|
+---+--------------------+------+
| 1|Pedro Henrique Mo...|7979.0|
| 2|João Miguel da Co...|5590.0|
| 3|Luiz Miguel Monteiro|7854.0|
| 4| João Miguel Porto|4581.0|
| 5|Dr. Luiz Gustavo ...|9965.0|
+---+--------------------+------+
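If you ever need to keep malformed rows around instead of fixing the read, the earlier error message hints at the pattern: include the corrupt-record column in an explicit schema and cache the parsed result before querying it. A hedged sketch (column and option names as in the error message; behaviour may vary by Spark version):

# Read the multi-line file as if it were JSON Lines, keeping bad rows
schema_with_corrupt = "id long, name string, salary double, _corrupt_record string"
df_bad = spark.read.json("ml_data/employees2.json",
                         schema=schema_with_corrupt,
                         mode="PERMISSIVE").cache()
df_bad.filter(df_bad._corrupt_record.isNotNull()).show(truncate=False)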
What if we want to load multiple JSON files?
Just use a list :)
= spark.read.json(["data/employees1.json", "data/employees3.json"])
df
df.printSchema() df.show()
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- salary: double (nullable = true)
+---+--------------------+------+
| id| name|salary|
+---+--------------------+------+
| 1| Giovanna Teixeira|6238.0|
| 2|Dra. Isabel Silveira|2311.0|
| 3| Davi Luiz da Rosa|5428.0|
| 4|João Gabriel Cald...|8027.0|
| 5| Natália Teixeira|8968.0|
| 6| Marcela Freitas|8091.0|
| 7| Stephany Fogaça|3337.0|
| 8| Vicente Moreira|4439.0|
| 9| Leandro Cunha|2241.0|
| 10| Amanda Peixoto|5925.0|
+---+--------------------+------+
If all the files are in the same folder, you can pass *.json:
= spark.read.json("data/*.json")
df
df.printSchema() df.show()
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- salary: double (nullable = true)
+---+--------------------+------+
| id| name|salary|
+---+--------------------+------+
| 1| Giovanna Teixeira|6238.0|
| 2|Dra. Isabel Silveira|2311.0|
| 3| Davi Luiz da Rosa|5428.0|
| 4|João Gabriel Cald...|8027.0|
| 5| Natália Teixeira|8968.0|
| 6| Marcela Freitas|8091.0|
| 7| Stephany Fogaça|3337.0|
| 8| Vicente Moreira|4439.0|
| 9| Leandro Cunha|2241.0|
| 10| Amanda Peixoto|5925.0|
+---+--------------------+------+
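The help output also lists pathGlobFilter and recursiveFileLookup among the extra options; a sketch of how they might be combined to pick up only JSON files under a folder and its subfolders (assuming such a nested layout exists):

# Hypothetical nested layout: read only .json files under data/ recursively
df = spark.read.json("data/",
                     pathGlobFilter="*.json",
                     recursiveFileLookup=True)
df.printSchema()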