Reading JSON files

Based on the WafaStudies PySpark tutorial.

Imports

import findspark
findspark.init()  # make the local Spark installation visible to Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder\
                    .appName('Spark')\
                    .master("local[*]")\
                    .getOrCreate()

Generate data

!mkdir -p data
!mkdir -p ml_data
import json
import random
from faker import Faker

faker = Faker('pt_BR')

# Generate data for employees1.json (JSON Lines: one object per line)
with open('data/employees1.json', 'w') as jsonfile:
    for id in range(1, 6):
        name = faker.name()
        salary = random.randint(1000, 10000)
        employee = {'id': id, 'name': name, 'salary': float(salary)}
        json.dump(employee, jsonfile)
        jsonfile.write('\n')

# Generate data for employees2.json (a single pretty-printed JSON array, i.e. multiline JSON)
employees2_data = []
for id in range(1, 6):
    name = faker.name()
    salary = random.randint(1000, 10000)
    employee = {'id': id, 'name': name, 'salary': float(salary)}
    employees2_data.append(employee)

with open('ml_data/employees2.json', 'w') as jsonfile:
    json.dump(employees2_data, jsonfile, indent=2)

# Generate data for employees3.json (JSON Lines again, ids 6-10)
with open('data/employees3.json', 'w') as jsonfile:
    for id in range(6, 11):
        name = faker.name()
        salary = random.randint(1000, 10000)
        employee = {'id': id, 'name': name, 'salary': float(salary)}
        json.dump(employee, jsonfile)
        jsonfile.write('\n')

Reading JSON files

help(spark.read.json)
Help on method json in module pyspark.sql.readwriter:

json(path: Union[str, List[str], pyspark.rdd.RDD[str]], schema: Union[pyspark.sql.types.StructType, str, NoneType] = None, primitivesAsString: Union[bool, str, NoneType] = None, prefersDecimal: Union[bool, str, NoneType] = None, allowComments: Union[bool, str, NoneType] = None, allowUnquotedFieldNames: Union[bool, str, NoneType] = None, allowSingleQuotes: Union[bool, str, NoneType] = None, allowNumericLeadingZero: Union[bool, str, NoneType] = None, allowBackslashEscapingAnyCharacter: Union[bool, str, NoneType] = None, mode: Optional[str] = None, columnNameOfCorruptRecord: Optional[str] = None, dateFormat: Optional[str] = None, timestampFormat: Optional[str] = None, multiLine: Union[bool, str, NoneType] = None, allowUnquotedControlChars: Union[bool, str, NoneType] = None, lineSep: Optional[str] = None, samplingRatio: Union[str, float, NoneType] = None, dropFieldIfAllNull: Union[bool, str, NoneType] = None, encoding: Optional[str] = None, locale: Optional[str] = None, pathGlobFilter: Union[bool, str, NoneType] = None, recursiveFileLookup: Union[bool, str, NoneType] = None, modifiedBefore: Union[bool, str, NoneType] = None, modifiedAfter: Union[bool, str, NoneType] = None, allowNonNumericNumbers: Union[bool, str, NoneType] = None) -> 'DataFrame' method of pyspark.sql.readwriter.DataFrameReader instance
    Loads JSON files and returns the results as a :class:`DataFrame`.
    
    `JSON Lines <http://jsonlines.org/>`_ (newline-delimited JSON) is supported by default.
    For JSON (one record per file), set the ``multiLine`` parameter to ``true``.
    
    If the ``schema`` parameter is not specified, this function goes
    through the input once to determine the input schema.
    
    .. versionadded:: 1.4.0
    
    .. versionchanged:: 3.4.0
        Supports Spark Connect.
    
    Parameters
    ----------
    path : str, list or :class:`RDD`
        string represents path to the JSON dataset, or a list of paths,
        or RDD of Strings storing JSON objects.
    schema : :class:`pyspark.sql.types.StructType` or str, optional
        an optional :class:`pyspark.sql.types.StructType` for the input schema or
        a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
    
    Other Parameters
    ----------------
    Extra options
        For the extra options, refer to
        `Data Source Option <https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option>`_
        for the version you use.
    
        .. # noqa
    
    Examples
    --------
    Write a DataFrame into a JSON file and read it back.
    
    >>> import tempfile
    >>> with tempfile.TemporaryDirectory() as d:
    ...     # Write a DataFrame into a JSON file
    ...     spark.createDataFrame(
    ...         [{"age": 100, "name": "Hyukjin Kwon"}]
    ...     ).write.mode("overwrite").format("json").save(d)
    ...
    ...     # Read the JSON file as a DataFrame.
    ...     spark.read.json(d).show()
    +---+------------+
    |age|        name|
    +---+------------+
    |100|Hyukjin Kwon|
    +---+------------+
df = spark.read.json("data/employees1.json")
df.printSchema()
df.show()
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: double (nullable = true)

+---+--------------------+------+
| id|                name|salary|
+---+--------------------+------+
|  1|   Giovanna Teixeira|6238.0|
|  2|Dra. Isabel Silveira|2311.0|
|  3|   Davi Luiz da Rosa|5428.0|
|  4|João Gabriel Cald...|8027.0|
|  5|    Natália Teixeira|8968.0|
+---+--------------------+------+
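
Schema inference means Spark reads the data once just to figure out the column types. If you re-read files with the same layout, one option is to capture the inferred schema and pass it back in; a minimal sketch:

# Capture the schema Spark inferred ...
inferred = spark.read.json("data/employees1.json").schema
print(inferred.simpleString())

# ... and reuse it on later reads to skip the inference pass.
df = spark.read.json("data/employees1.json", schema=inferred)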

We can also tell Spark the schema explicitly instead of relying on inference:

schema = "id long, name string, salary double"
df = spark.read.json("data/employees1.json", schema=schema)
df.printSchema()
df.show()
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: double (nullable = true)

+---+--------------------+------+
| id|                name|salary|
+---+--------------------+------+
|  1|   Giovanna Teixeira|6238.0|
|  2|Dra. Isabel Silveira|2311.0|
|  3|   Davi Luiz da Rosa|5428.0|
|  4|João Gabriel Cald...|8027.0|
|  5|    Natália Teixeira|8968.0|
+---+--------------------+------+
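
The DDL string is the most compact option, but as the help output above shows, the schema parameter also accepts a StructType. A sketch of the equivalent definition built from the type classes:

from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType

schema_struct = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("salary", DoubleType(), True),
])

df = spark.read.json("data/employees1.json", schema=schema_struct)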

What if the JSON file is multiline?

with open('ml_data/employees2.json', 'r') as file:
    pretty_json = json.dumps(json.load(file), indent=4)
    print(pretty_json)
[
    {
        "id": 1,
        "name": "Pedro Henrique Moreira",
        "salary": 7979.0
    },
    {
        "id": 2,
        "name": "Jo\u00e3o Miguel da Concei\u00e7\u00e3o",
        "salary": 5590.0
    },
    {
        "id": 3,
        "name": "Luiz Miguel Monteiro",
        "salary": 7854.0
    },
    {
        "id": 4,
        "name": "Jo\u00e3o Miguel Porto",
        "salary": 4581.0
    },
    {
        "id": 5,
        "name": "Dr. Luiz Gustavo Moraes",
        "salary": 9965.0
    }
]

Let’s try to read it:

df = spark.read.json("ml_data/employees2.json")
df.printSchema()
df.show()
root
 |-- _corrupt_record: string (nullable = true)

AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
referenced columns only include the internal corrupt record column
(named _corrupt_record by default). For example:
spark.read.schema(schema).csv(file).filter($"_corrupt_record".isNotNull).count()
and spark.read.schema(schema).csv(file).select("_corrupt_record").show().
Instead, you can cache or save the parsed results and then send the same query.
For example, val df = spark.read.schema(schema).csv(file).cache() and then
df.filter($"_corrupt_record".isNotNull).count().

We got an error: by default, Spark expects JSON Lines (one JSON object per line), so the whole pretty-printed array is treated as a single corrupt record.

To fix this, we need to tell Spark that the JSON file is multiline:

df = spark.read.json("ml_data/employees2.json", multiLine=True)
df.printSchema()
df.show()
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: double (nullable = true)

+---+--------------------+------+
| id|                name|salary|
+---+--------------------+------+
|  1|Pedro Henrique Mo...|7979.0|
|  2|João Miguel da Co...|5590.0|
|  3|Luiz Miguel Monteiro|7854.0|
|  4|   João Miguel Porto|4581.0|
|  5|Dr. Luiz Gustavo ...|9965.0|
+---+--------------------+------+
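
As an aside, the exception above also hints at a workaround for when you do want to inspect malformed input: add a string column named _corrupt_record (the default columnNameOfCorruptRecord) to an explicit schema, cache the parsed result, and only then query it. A hedged sketch of that approach:

# Read the multiline file as if it were JSON Lines; lines that fail to parse
# keep their raw text in the _corrupt_record column.
bad_schema = "id long, name string, salary double, _corrupt_record string"
bad = spark.read.json("ml_data/employees2.json", schema=bad_schema).cache()
bad.filter(col("_corrupt_record").isNotNull()).show(truncate=False)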

What if we want to load multiple JSON files?

Just use a list :)

df = spark.read.json(["data/employees1.json", "data/employees3.json"])
df.printSchema()
df.show()
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: double (nullable = true)

+---+--------------------+------+
| id|                name|salary|
+---+--------------------+------+
|  1|   Giovanna Teixeira|6238.0|
|  2|Dra. Isabel Silveira|2311.0|
|  3|   Davi Luiz da Rosa|5428.0|
|  4|João Gabriel Cald...|8027.0|
|  5|    Natália Teixeira|8968.0|
|  6|     Marcela Freitas|8091.0|
|  7|     Stephany Fogaça|3337.0|
|  8|     Vicente Moreira|4439.0|
|  9|       Leandro Cunha|2241.0|
| 10|      Amanda Peixoto|5925.0|
+---+--------------------+------+
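
When combining several files, it can be handy to know which file each row came from. input_file_name (already available through the functions import at the top) records exactly that; a small sketch:

# Tag every row with the path of the file it was read from.
df_with_src = df.withColumn("source_file", input_file_name())
df_with_src.select("id", "name", "source_file").show(truncate=False)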

If all the files are in the same folder, you can pass a glob pattern such as *.json:

df = spark.read.json("data/*.json")
df.printSchema()
df.show()
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: double (nullable = true)

+---+--------------------+------+
| id|                name|salary|
+---+--------------------+------+
|  1|   Giovanna Teixeira|6238.0|
|  2|Dra. Isabel Silveira|2311.0|
|  3|   Davi Luiz da Rosa|5428.0|
|  4|João Gabriel Cald...|8027.0|
|  5|    Natália Teixeira|8968.0|
|  6|     Marcela Freitas|8091.0|
|  7|     Stephany Fogaça|3337.0|
|  8|     Vicente Moreira|4439.0|
|  9|       Leandro Cunha|2241.0|
| 10|      Amanda Peixoto|5925.0|
+---+--------------------+------+
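
You can also point the reader at the directory itself and use the pathGlobFilter option (listed in the help output above) to restrict which files get picked up. A sketch, with an illustrative pattern:

# Read only the files in data/ whose names match the glob pattern.
df = (spark.read
      .option("pathGlobFilter", "employees*.json")
      .json("data/"))
df.show()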