Using JSON data with Python

This notebook is concerned with JSON, a format that serves many purposes. Like CSV files, JSON files are important sources and sinks for Spark. As an exchange format, JSON is also a serialization tool for Python and many other languages. JSON provides a way to accommodate semi-structured data in otherwise tabular environments (dataframes and database tables).

The notebook is organized in the following way:

  • Serialization and deserialization of built-in types

  • Serialization and deserialization of custom objects

  • Using JSON with Spark (reading a JSON dataset, flattening the schema, handling missing data, filtering, handling complex types)

Serialization and deserialization of built-in types

Code
import json

obj = {
    "name": "Foo Bar",
    "age": 78,
    "friends": ["Jane","John"],
    "balance": 345.80,
    "other_names":("Doe","Joe"),
    "active": True,
    "spouse": None
}

print(json.dumps(obj, sort_keys=True, indent=4))
Note

json.dumps() outputs a JSON formatted string.

Not every type of object can be fed to json.dumps(). The supported types, and the JSON values they are encoded into, are:

  • dict → object
  • list, tuple → array
  • str → string
  • int, float → number
  • True / False → true / false
  • None → null

Code
with open('user.json','w') as file:
    json.dump(obj, file, sort_keys=True, indent=4)
Code
!cat user.json
Code
json.loads('{"active": true, "age": 78, "balance": 345.8, "friends": ["Jane","John"], "name": "Foo Bar", "other_names": ["Doe","Joe"],"spouse":null}')
Code
with open('user.json', 'r') as file:
    user_data = json.load(file)

print(user_data)

What happens if we feed a numpy array to json.dumps()?
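
One possible answer, as a minimal sketch (assuming numpy is installed): json.dumps() raises a TypeError, since ndarray is not among the supported types. A common workaround is to convert the array to a list, either explicitly with .tolist() or through the default argument.

Code
import numpy as np

arr = np.arange(5)

# json.dumps(arr) raises:
# TypeError: Object of type ndarray is not JSON serializable

# Workaround 1: convert the array explicitly into a list
print(json.dumps(arr.tolist()))

# Workaround 2: give json.dumps() a fallback for non-serializable objects
print(json.dumps({"values": arr}, default=lambda o: o.tolist()))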

Serialization and deserialization of custom objects

Code
class User(object):
    """Custom User Class
    """
    def __init__(self, name, age, active, balance, 
                 other_names, friends, spouse):
        self.name = name
        self.age = age
        self.active = active
        self.balance = balance
        self.other_names = other_names
        self.friends = friends
        self.spouse = spouse
            
    def __repr__(self):
        s = "User("
        s += "name=" + repr(self.name)
        s += ", age=" + repr(self.age)
        s += ", active=" + repr(self.active)
        s += ", other_names=" + repr(self.other_names)
        s += ", friends=" + repr(self.friends)
        s += ", spouse=" + repr(self.spouse) + ")"
        return s
Code
new_user = User(
    name = "Foo Bar",
    age = 78,
    friends = ["Jane", "John"],
    balance = 345.80,
    other_names = ("Doe", "Joe"),
    active = True,
    spouse = None
)

new_user
Code
# This will raise a TypeError
# json.dumps(new_user)

As expected, the custom object new_user is not JSON serializable. So let's build a function that converts it into something the json module can handle.

  • This comes as no surprise to us, since earlier on we observed that the json module only handles built-in types, and User is not one of them.

  • We need to send our user data to a client over a network, so how do we get ourselves out of this error state?

  • A simple solution is to convert our custom type into a serializable built-in type. We can conveniently define a function obj_to_dict() that returns a dictionary representation of our object. json.dumps() takes an optional argument, default, which specifies a function to be called if the object is not serializable. This function must return a JSON-encodable version of the object.

Recall that instances of a (plain) class carry a __dict__ attribute that holds the instance attributes as a dictionary:

Code
new_user.__dict__
Code
def obj_to_dict(obj):
    """Converts an object to a dictionary representation of the object including 
    meta-data information about the object's module and class name.

    Parameters
    ----------
    obj : `object`
        A python object to be converted into a dictionary representation

    Returns
    -------
    output : `dict`
        A dictionary representation of the object
    """
    # Add object meta data 
    obj_dict = {
        "__class__": obj.__class__.__name__,
        "__module__": obj.__module__
    }
    # Add the object properties
    return obj_dict | obj.__dict__
Code
obj_to_dict(new_user)

The function obj_to_dict does the following:

  • creates a dictionary named obj_dict to act as the dict representation of our object;

  • uses the attributes __class__.__name__ and __module__ to record crucial metadata about the object: the class name and the module name;

  • adds the instance attributes of the object using obj.__dict__ (Python stores instance attributes in a dictionary under the hood);

  • the resulting obj_dict is serializable (provided all attributes of our object are).

Now we can comfortably call json.dumps() on the object and pass default=obj_to_dict.

Note

Obviously this fails if one of the attributes is not JSON serializable

Code
print(json.dumps(new_user, default=obj_to_dict, indent=4, sort_keys=True))

Now, if we want to decode (deserialize) a custom object and build the correct object type, we need a function that does the opposite of obj_to_dict, since json.loads simply returns a dict:

Code
user_data = json.loads(json.dumps(new_user, default=obj_to_dict))
print(user_data)

We need json.loads() to reconstruct a User object from this dictionary: json.loads() takes an optional argument object_hook which specifies a function that returns the desired custom object, given the decoded output (which in this case is a dict).

Code
def dict_to_obj(input_dict):
    """Converts a dictionary representation of an object to an instance of the object.

    Parameters
    ----------
    input_dict : `dict`
        A dictionary representation of the object, containing "__module__" 
        and "__class__" metadata

    Returns
    -------    
    obj : `object`
        A python object constructed from the dictionary representation    
    """
    assert "__class__" in input_dict and "__module__" in input_dict
    class_name = input_dict.pop("__class__")
    module_name = input_dict.pop("__module__")
    module = __import__(module_name)
    class_ = getattr(module, class_name)
    obj = class_(**input_dict)
    return obj

This function does the following:

  • extracts the class name from the dictionary under the key __class__;

  • extracts the module name from the dictionary under the key __module__;

  • imports the module and gets the class;

  • instantiates the class by passing all the instance attributes to the class constructor through dictionary unpacking.

Code
obj_data = json.dumps(new_user, default=obj_to_dict)
new_object = json.loads(obj_data, object_hook=dict_to_obj)
new_object
Code
type(new_object)
Code
new_object.age
Note

Functions obj_to_dict() and dict_to_obj() are showcases for special/magic/dunder methods.

In the definition of class User, two special methods were explicitly defined: __init__ and __repr__. But many more are available, including __dir__().

Code
[dude for dude in dir(new_object) if dude.startswith('__')]
Code
new_object.__getattribute__('age')
Note

Class User could have been implemented as a dataclass

Code
from dataclasses import dataclass

@dataclass
class UserBis(object):
    """Custom User Class
    """
    name: str 
    age: int
    active: bool
    balance: float
    other_names: list[str]
    friends: list[str]
    spouse: str
Code
other_user = UserBis(**(new_user.__dict__))
Code
repr(other_user)
Code
{dude for dude in dir(other_user) if dude.startswith('__')} -  {dude for dude in dir(new_user) if dude.startswith('__')}
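
As a side note (a minimal sketch), a dataclass instance can be converted into a dictionary of built-in types with dataclasses.asdict(), which json.dumps() can then serialize directly:

Code
from dataclasses import asdict

# asdict() recursively converts the dataclass fields into built-in containers,
# so the result can be fed to json.dumps() directly
print(json.dumps(asdict(other_user), indent=4, sort_keys=True))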

Using JSON with Spark

First, we download the data if it’s not there yet

Code
import requests, zipfile, io
from pathlib import Path

path = Path('drug-enforcement.json')
if not path.exists():
    url = "https://stephanegaiffas.github.io/big_data_course/data/drug-enforcement.json.zip"
    r = requests.get(url)
    z = zipfile.ZipFile(io.BytesIO(r.content))
    z.extractall(path='./')
Code
!ls drug*

Reading a JSON dataset with Spark

Code
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
from pyspark.sql.functions import col

spark = (SparkSession
    .builder
    .appName("Spark JSON")
    .getOrCreate()
)

sc = spark.sparkContext
Code
filename = "drug-enforcement.json"

First, let's look at the data. It's a large set of JSON records about drug enforcement.

Code
!head -n 1000 drug-enforcement.json

We need to tell Spark that records span several lines, using the multiLine option

Code
df = spark.read.json(filename, multiLine=True)
Code
df.printSchema()
Code
df.schema
Note

This dataset is a little bit of a mess!

This should not be surprising. The data used to populate the Spark dataframe are not classically tabular, but what people call semi-structured. JSON is well-suited to store, represent, and exchange such data.

In the classical age of tabular data (following Codd's principles), a table cell could only hold a scalar value (numeric, logical, text, date, timestamp, …). Nowadays, Relational Database Management Systems handle arrays, composite types, range types, …, and JSON (see PostgreSQL).

Spark, R, and Pandas also allow us to work with complex types.

  • First, there is a nested openfda dictionary. Each element of the dictionary is an array
  • A first good idea is to “flatten” the schema of the DataFrame, so that there are no nested types

Flattening of the schema

All the columns in the nested structure openfda are promoted to the top level of the schema. The columns nested in openfda are the following:

Code
df.select('openfda.*').columns
Code
df.select("openfda.*").head(2)
Code
for c in df.select("openfda.*").columns:
    df = df.withColumn("openfda_" + c, col("openfda." + c))
Code
df = df.select([c for c in df.columns if c != "openfda"])
Code
df.printSchema()
Code
df.head(2)

Note that the display of the DataFrame is not as usual… it is shown as a list of Row objects, since the openfda_* columns contain arrays of varying length

Missing data

A strategy can be to remove rows with missing data. dropna has several options, explained below.

Code
df.dropna().count()

If we remove all rows with at least one missing value, we end up with an empty dataframe!

Code
df.dropna(how='all').count()

dropna accepts the following arguments:

  • how: can be 'any' or 'all'. If 'any', rows containing any null value are dropped entirely (this is the default). If 'all', only rows which are entirely empty are dropped.

  • thresh: accepts an integer; rows with fewer than thresh non-null values are dropped. thresh is a middle ground between how='any' and how='all', and when it is specified it overrides how.

  • subset: accepts a list of column names. When a subset is present, null values are only checked in the columns whose names are provided.

Code
n_columns = len(df.columns)
Code
df.dropna(thresh=n_columns).count()
Code
df.dropna(thresh=n_columns-1).count()
Code
df.dropna(thresh=n_columns-10).count()
Code
df = df.dropna(subset=['postal_code', 'city', 'country', 'address_1'])
df.count()

But before going further, let's count the number of missing values in each column

Code
# For each column we create a new column containing 1 if the value is null and 0 otherwise.
# We need to cast Boolean to Int so that we can use fn.sum afterwards
for c in df.columns:
    # Skip the _isnull columns (in case you run this cell twice...)
    if not c.endswith("_isnull"):
        df = df.withColumn(c + "_isnull", fn.isnull(col(c)).cast('int'))
Code
df.head()
Code
# Get the list of _isnull columns
isnull_columns = [c for c in df.columns if c.endswith("_isnull")]

# On the _isnull columns :
#  - we compute the sum to have the number of null values and rename the column
#  - convert to pandas for better readability
#  - transpose the pandas dataframe for better readability
missing_values = df.select(isnull_columns)\
    .agg(*[fn.sum(c).alias(c.replace("_isnull", "")) for c in isnull_columns])\
    .toPandas()

missing_values.T\
    .rename({0: "missing values"}, axis="columns")

We see that more_code_info is always null and that termination_date is often null. Most of the openfda_* columns are also almost always empty.

We can keep only the columns with no missing values

Code
# This line can seem complicated: run pieces of it to understand
kept_columns = list(
    missing_values.columns[(missing_values.iloc[0] == 0).values]
)
Code
df_kept = df.select(kept_columns)
Code
df_kept.head(2)
Code
df_kept.printSchema()
Code
df_kept.count()

Filtering by string values

Cases from South San Francisco

Code
df.filter(df.city == "South San Francisco")\
    .toPandas()

Remark. Once again, we use .toPandas() to pretty-format the results in the notebook. But it's a BAD idea to do this if the Spark DataFrame is large, since it requires a collect().

Aside from filtering strings by a perfect match, there are plenty of other powerful ways to filter by strings in PySpark:

  • df.filter(df.city.contains('San Francisco')): returns rows where strings of a column contain a provided substring. In our example, filtering by rows which contain the substring “San Francisco” would be a good way to get all rows in San Francisco, instead of just “South San Francisco”.

  • df.filter(df.city.startswith('San')): Returns rows where a string starts with a provided substring.

  • df.filter(df.city.endswith('ice')): Returns rows where a string ends with a provided substring.

  • df.filter(df.city.isNull()): Returns rows where values in a provided column are null.

  • df.filter(df.city.isNotNull()): Opposite of the above.

  • df.filter(df.city.like('San%')): Performs a SQL-like query containing the LIKE clause.

  • df.filter(df.city.rlike('[A-Z]*ice$')): Performs a regexp filter.

  • df.filter(df.city.isin('San Francisco', 'Los Angeles')): Looks for rows where the string value of a column matches any of the provided strings exactly.

You can try some of these to understand

Code
df.filter(df.city.contains('San Francisco'))\
    .toPandas()
Code
df.filter(df.city.isin('San Francisco', 'Los Angeles')).toPandas()
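
Here is a further sketch with two more of the methods listed above (startswith and rlike), applied to the city column:

Code
# Cities starting with 'San'
df.filter(df.city.startswith('San')).select('city').distinct().show(5)

# Cities matching a regular expression
df.filter(df.city.rlike('^San ')).select('city').distinct().show(5)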

Filtering by Date Values

In addition to filtering by strings, we can also filter by columns where the values are stored as dates or datetimes (or strings that can be inferred as dates). Perhaps the most useful way to filter dates is by using the between() method, which allows us to find results within a certain date range. Here we find all the results which were reported in the years 2013 and 2014:

Code
df.filter(df.city == "South San Francisco")\
    .filter(df.report_date.between('2013-01-01 00:00:00','2015-03-11 00:00:00'))\
    .toPandas()
Caution

Is Spark smart enough to understand that the string in column report_date contains a date?

Code
df.filter(df.city == "South San Francisco")\
    .filter(df.center_classification_date.between('2013-01-01 00:00:00','2013-12-31 00:00:00'))\
    .toPandas()
Code
df_dates = df.select([c for c in df.columns if c.endswith("date")])

df_dates.printSchema()
Code
df_dates.show(5)

Columns are not dates (DateType) but strings (StringType). When comparing report_date with '2013-01-01 00:00:00' and '2015-03-11 00:00:00', we are therefore comparing strings. We are lucky enough that in Unicode '-' < '0' < … < '9': '2013-01-01 00:00:00' is smaller than any report_date starting with '2013' followed by a digit, while any report_date starting with '2013' or '2014' is smaller than '2015-03-11 00:00:00'. The filter therefore keeps exactly the 2013 and 2014 reports.
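
A quick sanity check of these string comparisons in plain Python (a sketch, assuming report_date values look like '20130614'):

Code
# Lexicographic comparison of the bound strings with a yyyyMMdd report_date
print('2013-01-01 00:00:00' < '20130614' < '2015-03-11 00:00:00')  # True: a 2013 report passes the filter
print('20150310' < '2015-03-11 00:00:00')                          # False: a 2015 report is excluded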

Caution

If some field in a JSON string is meant to represent a date or a datetime object, Spark should be given a hint.

JSON loaders in Python, as well as the Spark JSON reader, have optional arguments that indicate how dates should be parsed.
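
For instance, a minimal sketch (assuming report_date is stored as a 'yyyyMMdd' string, as it appears to be here): parse the column after loading with fn.to_date. Alternatively, an explicit schema together with the dateFormat option can be passed to spark.read.json.

Code
# Parse the string column into a proper DateType column after loading.
# Assumption: report_date strings follow the 'yyyyMMdd' pattern.
df_dates_parsed = df_dates.withColumn(
    "report_date_parsed", fn.to_date(col("report_date"), "yyyyMMdd")
)

df_dates_parsed.select("report_date", "report_date_parsed").show(5)
df_dates_parsed.printSchema()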

Handling complex types

Bridging the gap between tabular and semi-structured data.

Note

SQL, R, Pandas

struct, array, map

Code
# struct
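
To make these three complex types concrete, here is a toy sketch (unrelated to the dataset) building struct, array, and map columns with pyspark.sql.functions:

Code
# A toy dataframe, just to illustrate complex column types
toy = spark.createDataFrame(
    [("Foo", "Bar", 1), ("Baz", "Qux", 2)],
    ["first", "last", "id"]
)

toy_complex = (
    toy
    .withColumn("name_struct", fn.struct(col("first"), col("last")))      # struct
    .withColumn("name_array", fn.array(col("first"), col("last")))        # array
    .withColumn("name_map", fn.create_map(fn.lit("first"), col("first"),
                                          fn.lit("last"), col("last")))   # map
)

toy_complex.printSchema()
toy_complex.show(truncate=False)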

The problems we faced after loading data from the JSON file came from the fact that column openfda has the complex StructType() type. We shall revisit this dataframe.

Code
df = spark.read.json(filename, multiLine=True)

The dataframe schema df.schema, which is of type StructType (defined in pyspark.sql.types), can be converted into a JSON string, which in turn can be converted into a Python dictionary.

Code
df = spark.read.json(filename, multiLine=True)

sj = json.loads(df.schema.json())

We equip the dataframe with a primary key

Code
from pyspark.sql import Window

w = Window.orderBy(col("center_classification_date"))

df = (
    df
    .withColumn("row_id", fn.row_number().over(w))
)
Code
[(f['name'], f['type']) 
 for f in sj['fields'] if not isinstance(f['type'], str)]

Column openfda has type StructType(), with fields of composite types.

Code
{f['type']['type']
 for f in sj['fields'] if not isinstance(f['type'], str)}

Projecting on row_id and openfda.* leads to a (partially) flattened dataframe that, thanks to the row_id column, can be joined with the original dataframe.

Code
df_proj = df.select('row_id', 'openfda.*')

df_proj.printSchema()
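
As announced, the flattened columns can be joined back onto the original dataframe thanks to the shared row_id key (a sketch):

Code
# Join the flattened openfda columns back on the original dataframe,
# dropping the nested openfda column on the left side
df_joined = (
    df.drop("openfda")
      .join(df_proj, on="row_id", how="left")
)

df_joined.printSchema()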

We can inspect the length of the arrays.

Code
# array
df_proj.select(
    fn.max(fn.size(col("application_number"))).alias("Max"), 
    fn.min(fn.size(col("application_number"))).alias("min"), 
    fn.avg(fn.size(col("application_number"))).alias("Mean")).show(1)

In some rows, the size of the array is -1 because the field is NULL.
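
As a quick check (a sketch), we can count how many rows have a NULL application_number array:

Code
# fn.size() returns -1 for NULL arrays
df_proj.where(fn.size(col("application_number")) == -1).count()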

Code
(
  df_proj
    .where(fn.size(col("application_number"))>1)
    .select("row_id")
    .show(5)
)

An array column can be exploded. This is like pivoting into long form. The result contains one row per item in the array.

Code
(
  df_proj
    .select('row_id', 'application_number')
    .withColumn("exploded", fn.explode(col("application_number")))
    .select('row_id', 'exploded')
    .groupBy('row_id')
    .agg(fn.count('exploded').alias("n_lignes"))
    .where("n_lignes > 1")
    .show(5)
)