Using JSON data with Python

This notebook is concerned with JSON, a format that serves many purposes. Like CSV files, JSON files are important sources and sinks for Spark. As an exchange format, JSON is also a serialization tool for Python and many other languages. JSON provides a way to accommodate semi-structured data in otherwise tabular environments (dataframes and database tables).

The notebook is organized in the following way:

Serialization and deserialization of built-in types

Code
import json

obj = {
    "name": "Foo Bar",
    "age": 78,
    "friends": ["Jane","John"],
    "balance": 345.80,
    "other_names":("Doe","Joe"),
    "active": True,
    "spouse": None
}

print(json.dumps(obj, sort_keys=True, indent=4))
{
    "active": true,
    "age": 78,
    "balance": 345.8,
    "friends": [
        "Jane",
        "John"
    ],
    "name": "Foo Bar",
    "other_names": [
        "Doe",
        "Joe"
    ],
    "spouse": null
}
Note

json.dumps() outputs a JSON formatted string.

Not every type of object can be fed to json.dumps().
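For instance, a Python set is a built-in type, yet it has no JSON counterpart: feeding one to json.dumps() raises a TypeError. A minimal illustration:

```python
import json

try:
    json.dumps({"tags": {"spark", "json"}})  # sets are not JSON serializable
except TypeError as e:
    print(e)  # Object of type set is not JSON serializable
```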

Code
with open('user.json','w') as file:
    json.dump(obj, file, sort_keys=True, indent=4)
Code
!cat user.json
{
    "active": true,
    "age": 78,
    "balance": 345.8,
    "friends": [
        "Jane",
        "John"
    ],
    "name": "Foo Bar",
    "other_names": [
        "Doe",
        "Joe"
    ],
    "spouse": null
}
Code
json.loads('{"active": true, "age": 78, "balance": 345.8, "friends": ["Jane","John"], "name": "Foo Bar", "other_names": ["Doe","Joe"],"spouse":null}')
{'active': True,
 'age': 78,
 'balance': 345.8,
 'friends': ['Jane', 'John'],
 'name': 'Foo Bar',
 'other_names': ['Doe', 'Joe'],
 'spouse': None}
Code
with open('user.json', 'r') as file:
    user_data = json.load(file)

print(user_data)
{'active': True, 'age': 78, 'balance': 345.8, 'friends': ['Jane', 'John'], 'name': 'Foo Bar', 'other_names': ['Doe', 'Joe'], 'spouse': None}
Question

What happens if we feed json.dumps() with a numpy array?

Question

What happens if we feed json.dumps() with a datetime object?

Serialization and deserialization of custom objects

Code
class User(object):
    """Custom User Class
    """
    def __init__(self, name, age, active, balance, 
                 other_names, friends, spouse):
        self.name = name
        self.age = age
        self.active = active
        self.balance = balance
        self.other_names = other_names
        self.friends = friends
        self.spouse = spouse
            
    def __repr__(self):
        s = "User("
        s += "name=" + repr(self.name)
        s += ", age=" + repr(self.age)
        s += ", active=" + repr(self.active)
        s += ", other_names=" + repr(self.other_names)
        s += ", friends=" + repr(self.friends)
        s += ", spouse=" + repr(self.spouse) + ")"
        return s
Tip

Brush up on your dunder/magic methods, for example in Fluent Python by Ramalho (Chapter 1: The Python Data Model, section Overview of Special Methods)

Code
new_user = User(
    name = "Foo Bar",
    age = 78,
    friends = ["Jane", "John"],
    balance = 345.80,
    other_names = ("Doe", "Joe"),
    active = True,
    spouse = None
)

new_user
User(name='Foo Bar', age=78, active=True, other_names=('Doe', 'Joe'), friends=['Jane', 'John'], spouse=None)
Note

Uncomment to see what happens

Code
# This will raise a TypeError
# json.dumps(new_user)

As expected, the custom object new_user is not JSON serializable. So let’s build a function that handles the conversion for us.

  • This comes as no surprise to us, since earlier on we observed that the json module only handles the built-in types, and User is not one.

  • We need to send our user data to a client over a network, so how do we get ourselves out of this error state?

  • A simple solution is to convert our custom type into a serializable built-in type. We can conveniently define a function obj_to_dict() that returns a dictionary representation of our object. json.dumps() takes an optional argument, default, which specifies a function to be called when an object is not serializable; this function must return a JSON-encodable version of the object.

Recall that any instance obj has a dunder attribute __dict__ that provides a basis for obtaining a dictionary with the attributes of the object:

Code
new_user.__dict__
{'name': 'Foo Bar',
 'age': 78,
 'active': True,
 'balance': 345.8,
 'other_names': ('Doe', 'Joe'),
 'friends': ['Jane', 'John'],
 'spouse': None}
Code
def obj_to_dict(obj):
    """Converts an object to a dictionary representation of the object including 
    meta-data information about the object's module and class name.

    Parameters
    ----------
    obj : `object`
        A python object to be converted into a dictionary representation

    Returns
    -------
    output : `dict`
        A dictionary representation of the object
    """
    # Add object meta data 
    obj_dict = {
        "__class__": obj.__class__.__name__,
        "__module__": obj.__module__
    }
    # Add the object properties
    return obj_dict | obj.__dict__
Code
obj_to_dict(new_user)
{'__class__': 'User',
 '__module__': '__main__',
 'name': 'Foo Bar',
 'age': 78,
 'active': True,
 'balance': 345.8,
 'other_names': ('Doe', 'Joe'),
 'friends': ['Jane', 'John'],
 'spouse': None}

The function obj_to_dict does the following:

  • creates a dictionary named obj_dict to act as the dict representation of our object

  • adds crucial metadata on the object via the dunder attributes __class__.__name__ and __module__: the class name and the module name

  • adds the instance attributes of the object using obj.__dict__ (Python stores instance attributes in a dictionary)

The resulting obj_dict is now serializable (provided all attributes of our object are).

Now we can comfortably call json.dumps() on the object and pass default=obj_to_dict

Note

Obviously this fails if one of the attributes is not itself JSON serializable.

Code
print(json.dumps(new_user, default=obj_to_dict, indent=4, sort_keys=True))
{
    "__class__": "User",
    "__module__": "__main__",
    "active": true,
    "age": 78,
    "balance": 345.8,
    "friends": [
        "Jane",
        "John"
    ],
    "name": "Foo Bar",
    "other_names": [
        "Doe",
        "Joe"
    ],
    "spouse": null
}

Now, if we want to decode (deserialize) a custom object and create the correct object type, we need a function that does the inverse of obj_to_dict, since json.loads() simply returns a dict:

Code
user_data = json.loads(json.dumps(new_user, default=obj_to_dict))
print(user_data)
{'__class__': 'User', '__module__': '__main__', 'name': 'Foo Bar', 'age': 78, 'active': True, 'balance': 345.8, 'other_names': ['Doe', 'Joe'], 'friends': ['Jane', 'John'], 'spouse': None}
Note

To reconstruct a User object from this dictionary, json.loads() takes an optional argument object_hook, which specifies a function that returns the desired custom object, given the decoded output (which in this case is a dict).

Code
def dict_to_obj(input_dict):
    """Converts a dictionary representation of an object to an instance of the object.

    Parameters
    ----------
    input_dict : `dict`
        A dictionary representation of the object, containing "__module__" 
        and "__class__" metadata

    Returns
    -------    
    obj : `object`
        A python object constructed from the dictionary representation    
    """
    assert "__class__" in input_dict and "__module__" in input_dict
    class_name = input_dict.pop("__class__")
    module_name = input_dict.pop("__module__")
    module = __import__(module_name)
    class_ = getattr(module, class_name)
    obj = class_(**input_dict)
    return obj

This function does the following:

  • extracts the class name from the dictionary under the key __class__

  • extracts the module name from the dictionary under the key __module__

  • imports the module and gets the class

  • instantiates the class by passing all the instance attributes to the class constructor through dictionary unpacking
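A word of caution: __import__(module_name) returns the top-level package when module_name contains dots (e.g. package.module). A more robust variant, sketched below under that assumption, uses importlib.import_module(), which resolves dotted paths correctly (dict_to_obj_v2 is a hypothetical name, not part of the notebook):

```python
import importlib

def dict_to_obj_v2(input_dict):
    """Variant of dict_to_obj using importlib to resolve dotted module paths."""
    d = dict(input_dict)  # copy, so the caller's dict is left untouched
    class_name = d.pop("__class__")
    module_name = d.pop("__module__")
    module = importlib.import_module(module_name)  # handles "package.module"
    class_ = getattr(module, class_name)
    return class_(**d)
```

For example, dict_to_obj_v2({'__class__': 'Counter', '__module__': 'collections', 'a': 2}) builds a collections.Counter.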

Code
obj_data = json.dumps(new_user, default=obj_to_dict)
new_object = json.loads(obj_data, object_hook=dict_to_obj)
new_object
User(name='Foo Bar', age=78, active=True, other_names=['Doe', 'Joe'], friends=['Jane', 'John'], spouse=None)
Code
type(new_object)
__main__.User
Code
new_object.age
78
Note

Functions obj_to_dict() and dict_to_obj() are showcases for special/magic/dunder methods.

In the definition of class User, two special methods were explicitly defined: __init__() and __repr__(). But many more are available, including __dir__().

Remember that some dunder members of an object are not callable: they are plain attributes.

Code
[dude for dude in dir(new_object) if dude.startswith('__') and callable(getattr(new_object, dude))]
['__class__',
 '__delattr__',
 '__dir__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getstate__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__']
Code
[dude for dude in dir(new_object) if dude.startswith('__') and not callable(getattr(new_object, dude))]
['__dict__', '__doc__', '__module__', '__weakref__']
Code
new_object.__getattribute__('age')

getattr(new_object, 'age')
78
Note

Class User could have been implemented as a dataclass

Code
from dataclasses import dataclass

@dataclass
class UserBis(object):
    """Custom User Class
    """
    name: str 
    age: int
    active: bool
    balance: float
    other_names: list[str]
    friends: list[str]
    spouse: str
Note

@dataclass is a decorator. Have a look at the chapter on decorators in Fluent Python by Ramalho

Code
other_user = UserBis(**(new_user.__dict__))
Code
repr(other_user)
"UserBis(name='Foo Bar', age=78, active=True, balance=345.8, other_names=('Doe', 'Joe'), friends=['Jane', 'John'], spouse=None)"
Code
{dude for dude in dir(other_user) if dude.startswith('__')} -  {dude for dude in dir(new_user) if dude.startswith('__')}
{'__annotations__',
 '__dataclass_fields__',
 '__dataclass_params__',
 '__match_args__'}
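Dataclasses also pair nicely with JSON: the standard library provides dataclasses.asdict(), which recursively converts a dataclass instance into a dict that json.dumps() can handle (assuming all field values are themselves serializable). A minimal sketch, using a hypothetical toy dataclass rather than UserBis:

```python
import json
from dataclasses import dataclass, asdict

@dataclass
class Point:
    # hypothetical toy dataclass for illustration
    x: float
    y: float

print(json.dumps(asdict(Point(x=1.0, y=2.5))))  # {"x": 1.0, "y": 2.5}
```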

Using JSON with Spark

First, we download the data if it’s not there yet

Code
import requests, zipfile, io
from pathlib import Path

path = Path('drug-enforcement.json.zip')
if not path.exists():
    url = "https://s-v-b.github.io/IFEBY310/data/drug-enforcement.json.zip"
    r = requests.get(url)
    z = zipfile.ZipFile(io.BytesIO(r.content))
    z.extractall(path='./')
Code
!ls drug*
drug-enforcement.json

Reading a JSON dataset with Spark

Code
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
from pyspark.sql.functions import col

spark = (SparkSession
    .builder
    .appName("Spark JSON")
    .getOrCreate()
)

sc = spark._sc
25/04/03 15:09:09 WARN Utils: Your hostname, boucheron-Precision-5480 resolves to a loopback address: 127.0.1.1; using 172.23.32.10 instead (on interface eth0)
25/04/03 15:09:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/03 15:09:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Code
filename = "drug-enforcement.json"

First, let’s look at the data. It’s a large set of JSON records about drug enforcement.

Code
!head -n 100 drug-enforcement.json
[
    {
      "classification": "Class II",
      "center_classification_date": "20121025",
      "report_date": "20121031",
      "postal_code": "08816-2108",
      "termination_date": "20141007",
      "recall_initiation_date": "20120904",
      "recall_number": "D-026-2013",
      "city": "East Brunswick",
      "event_id": "63384",
      "distribution_pattern": "Nationwide",
      "openfda": {},
      "recalling_firm": "Raritan Pharmaceuticals, Inc.",
      "voluntary_mandated": "Voluntary: Firm Initiated",
      "state": "NJ",
      "reason_for_recall": "Microbial Contamination of Non-Sterile Products: Product is being recalled due to possible microbial contamination by C. difficile discovered in the raw material.",
      "initial_firm_notification": "E-Mail",
      "status": "Terminated",
      "product_type": "Drugs",
      "country": "United States",
      "product_description": "Wal-Mucil 100% Natural Fiber, 100% Natural Psyllium Seed Husk, Fiber Laxative/Supplement, a)160 capsules per bottle (item #503663), and b) 320 capsules per bottle (Item #586143), Distributed by: Walgreen Co., 200 Wilmot Road, Deerfield, IL 60015-4616, www.walgreens.com, a) UPC 3-11917-08151-9, b) UPC 3-11917-07658-4",
      "code_info": "Lots a) 15952, 16270,16425, Exp 06/15; b)16459, 16466, 16467, Exp 07/15",
      "address_1": "8 Joanna Ct",
      "address_2": "",
      "product_quantity": "56,808 bottles"
    },
    {
      "classification": "Class II",
      "center_classification_date": "20121025",
      "report_date": "20121031",
      "postal_code": "08816-2108",
      "termination_date": "20141007",
      "recall_initiation_date": "20120904",
      "recall_number": "D-031-2013",
      "city": "East Brunswick",
      "event_id": "63384",
      "distribution_pattern": "Nationwide",
      "openfda": {},
      "recalling_firm": "Raritan Pharmaceuticals, Inc.",
      "voluntary_mandated": "Voluntary: Firm Initiated",
      "state": "NJ",
      "reason_for_recall": "Microbial Contamination of Non-Sterile Products: Product is being recalled due to possible microbial contamination by C. difficile discovered in the raw material.",
      "initial_firm_notification": "E-Mail",
      "status": "Terminated",
      "product_type": "Drugs",
      "country": "United States",
      "product_description": "Premier Value Fiber Plus Calcium Supplement Capsules, 120 capsules per bottle, Distributed by: Chain Drug Consortium, LLC, Boca Raton, FL, UPC 8-40986-01987-6",
      "code_info": "Lot 15087, Exp 08/15",
      "address_1": "8 Joanna Ct",
      "address_2": "",
      "product_quantity": "96 bottles"
    },
    {
      "classification": "Class III",
      "center_classification_date": "20121106",
      "report_date": "20121114",
      "postal_code": "08807",
      "termination_date": "20130325",
      "recall_initiation_date": "20121015",
      "recall_number": "D-047-2013",
      "city": "Bridgewater",
      "event_id": "63488",
      "distribution_pattern": "Nationwide",
      "openfda": {},
      "recalling_firm": "Valeant Pharmaceuticals",
      "voluntary_mandated": "Voluntary: Firm Initiated",
      "state": "NJ",
      "reason_for_recall": "Subpotent (Single Ingredient) Drug: This product was found to be subpotent for the salicylic acid ingredient.  Additionally, this product is mislabeled because the label either omits or erroneously added inactive ingredients to the label.",
      "initial_firm_notification": "Letter",
      "status": "Terminated",
      "product_type": "Drugs",
      "country": "United States",
      "product_description": "AcneFree 3-in-1 Acne Night Repair Foam (retinol + salicylic acid 1.5% w/v), 3 oz (85 g) canister, Dist. by: University Medical Pharmaceuticals Corp., Irvine, CA  92618, UPC 7 88521 13548 6.",
      "code_info": "All lots with expiration dates between 10/10/12 through 10/10/14 of UPC 7 88521 13548 6",
      "address_1": "700 Rte 206 North",
      "address_2": "",
      "product_quantity": "81,319 canisters"
    },
    {
      "classification": "Class III",
      "center_classification_date": "20121220",
      "report_date": "20121226",
      "postal_code": "08558-1311",
      "termination_date": "20140429",
      "recall_initiation_date": "20121204",
      "recall_number": "D-098-2013",
      "city": "Skillman",
      "more_code_info": null,
      "event_id": "63787",
      "distribution_pattern": "Nationwide",
      "openfda": {},
      "recalling_firm": "Johnson & Johnson",
      "voluntary_mandated": "Voluntary: Firm Initiated",
      "state": "NJ",
      "reason_for_recall": "Superpotent (Single Ingredient Drug): salicylic acid",
      "initial_firm_notification": "Letter",
      "status": "Terminated",
      "product_type": "Drugs",
      "country": "United States",
Note

We need to tell Spark that records span several lines, with the multiLine option

Code
df = spark.read.json(filename, multiLine=True)
Code
df.printSchema()
root
 |-- address_1: string (nullable = true)
 |-- address_2: string (nullable = true)
 |-- center_classification_date: string (nullable = true)
 |-- city: string (nullable = true)
 |-- classification: string (nullable = true)
 |-- code_info: string (nullable = true)
 |-- country: string (nullable = true)
 |-- distribution_pattern: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- initial_firm_notification: string (nullable = true)
 |-- more_code_info: string (nullable = true)
 |-- openfda: struct (nullable = true)
 |    |-- application_number: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- brand_name: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- generic_name: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- is_original_packager: array (nullable = true)
 |    |    |-- element: boolean (containsNull = true)
 |    |-- manufacturer_name: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- nui: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- original_packager_product_ndc: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- package_ndc: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- pharm_class_cs: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- pharm_class_epc: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- pharm_class_moa: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- pharm_class_pe: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- product_ndc: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- product_type: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- route: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- rxcui: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- spl_id: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- spl_set_id: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- substance_name: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- unii: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- upc: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- postal_code: string (nullable = true)
 |-- product_description: string (nullable = true)
 |-- product_quantity: string (nullable = true)
 |-- product_type: string (nullable = true)
 |-- reason_for_recall: string (nullable = true)
 |-- recall_initiation_date: string (nullable = true)
 |-- recall_number: string (nullable = true)
 |-- recalling_firm: string (nullable = true)
 |-- report_date: string (nullable = true)
 |-- state: string (nullable = true)
 |-- status: string (nullable = true)
 |-- termination_date: string (nullable = true)
 |-- voluntary_mandated: string (nullable = true)
Note

In a less user-friendly format:

Code
df.schema
StructType([StructField('address_1', StringType(), True), StructField('address_2', StringType(), True), StructField('center_classification_date', StringType(), True), StructField('city', StringType(), True), StructField('classification', StringType(), True), StructField('code_info', StringType(), True), StructField('country', StringType(), True), StructField('distribution_pattern', StringType(), True), StructField('event_id', StringType(), True), StructField('initial_firm_notification', StringType(), True), StructField('more_code_info', StringType(), True), StructField('openfda', StructType([StructField('application_number', ArrayType(StringType(), True), True), StructField('brand_name', ArrayType(StringType(), True), True), StructField('generic_name', ArrayType(StringType(), True), True), StructField('is_original_packager', ArrayType(BooleanType(), True), True), StructField('manufacturer_name', ArrayType(StringType(), True), True), StructField('nui', ArrayType(StringType(), True), True), StructField('original_packager_product_ndc', ArrayType(StringType(), True), True), StructField('package_ndc', ArrayType(StringType(), True), True), StructField('pharm_class_cs', ArrayType(StringType(), True), True), StructField('pharm_class_epc', ArrayType(StringType(), True), True), StructField('pharm_class_moa', ArrayType(StringType(), True), True), StructField('pharm_class_pe', ArrayType(StringType(), True), True), StructField('product_ndc', ArrayType(StringType(), True), True), StructField('product_type', ArrayType(StringType(), True), True), StructField('route', ArrayType(StringType(), True), True), StructField('rxcui', ArrayType(StringType(), True), True), StructField('spl_id', ArrayType(StringType(), True), True), StructField('spl_set_id', ArrayType(StringType(), True), True), StructField('substance_name', ArrayType(StringType(), True), True), StructField('unii', ArrayType(StringType(), True), True), StructField('upc', ArrayType(StringType(), True), True)]), True), 
StructField('postal_code', StringType(), True), StructField('product_description', StringType(), True), StructField('product_quantity', StringType(), True), StructField('product_type', StringType(), True), StructField('reason_for_recall', StringType(), True), StructField('recall_initiation_date', StringType(), True), StructField('recall_number', StringType(), True), StructField('recalling_firm', StringType(), True), StructField('report_date', StringType(), True), StructField('state', StringType(), True), StructField('status', StringType(), True), StructField('termination_date', StringType(), True), StructField('voluntary_mandated', StringType(), True)])
Note

This dataset is a little bit of a mess!

This should not be surprising: the data used to populate the Spark dataframe is not classically tabular but what people call semi-structured. JSON is well suited to store, represent, and exchange such data.

In the classical age of tabular data (following Codd’s principles), a table cell could only hold a scalar value (numeric, logical, text, date, timestamp, …). Nowadays, Relational Database Management Systems handle arrays, composite types, range types, …, and JSON (see PostgreSQL).

Spark, R, Pandas, and modern relational databases also allow us to work with complex types.

Modern column-oriented file formats like Parquet also work with nested structures.

  • First, there is a nested openfda dictionary. Each element of the dictionary is an array
  • A first good idea is to “flatten” the schema of the DataFrame, so that there are no nested types any more.

Flattening the schema

We want to pull the columns nested inside the openfda structure up to the top level of the schema. These nested columns are as follows:

Code
df.select('openfda.*').columns
['application_number',
 'brand_name',
 'generic_name',
 'is_original_packager',
 'manufacturer_name',
 'nui',
 'original_packager_product_ndc',
 'package_ndc',
 'pharm_class_cs',
 'pharm_class_epc',
 'pharm_class_moa',
 'pharm_class_pe',
 'product_ndc',
 'product_type',
 'route',
 'rxcui',
 'spl_id',
 'spl_set_id',
 'substance_name',
 'unii',
 'upc']
Code
df.select("openfda.*").head(2)
[Row(application_number=None, brand_name=None, generic_name=None, is_original_packager=None, manufacturer_name=None, nui=None, original_packager_product_ndc=None, package_ndc=None, pharm_class_cs=None, pharm_class_epc=None, pharm_class_moa=None, pharm_class_pe=None, product_ndc=None, product_type=None, route=None, rxcui=None, spl_id=None, spl_set_id=None, substance_name=None, unii=None, upc=None),
 Row(application_number=None, brand_name=None, generic_name=None, is_original_packager=None, manufacturer_name=None, nui=None, original_packager_product_ndc=None, package_ndc=None, pharm_class_cs=None, pharm_class_epc=None, pharm_class_moa=None, pharm_class_pe=None, product_ndc=None, product_type=None, route=None, rxcui=None, spl_id=None, spl_set_id=None, substance_name=None, unii=None, upc=None)]
Code
for c in df.select("openfda.*").columns:
    df = df.withColumn("openfda_" + c, col("openfda." + c))
Code
df = df.select([c for c in df.columns if c != "openfda"])
Code
df.printSchema()
root
 |-- address_1: string (nullable = true)
 |-- address_2: string (nullable = true)
 |-- center_classification_date: string (nullable = true)
 |-- city: string (nullable = true)
 |-- classification: string (nullable = true)
 |-- code_info: string (nullable = true)
 |-- country: string (nullable = true)
 |-- distribution_pattern: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- initial_firm_notification: string (nullable = true)
 |-- more_code_info: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- product_description: string (nullable = true)
 |-- product_quantity: string (nullable = true)
 |-- product_type: string (nullable = true)
 |-- reason_for_recall: string (nullable = true)
 |-- recall_initiation_date: string (nullable = true)
 |-- recall_number: string (nullable = true)
 |-- recalling_firm: string (nullable = true)
 |-- report_date: string (nullable = true)
 |-- state: string (nullable = true)
 |-- status: string (nullable = true)
 |-- termination_date: string (nullable = true)
 |-- voluntary_mandated: string (nullable = true)
 |-- openfda_application_number: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_brand_name: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_generic_name: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_is_original_packager: array (nullable = true)
 |    |-- element: boolean (containsNull = true)
 |-- openfda_manufacturer_name: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_nui: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_original_packager_product_ndc: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_package_ndc: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_pharm_class_cs: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_pharm_class_epc: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_pharm_class_moa: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_pharm_class_pe: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_product_ndc: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_product_type: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_route: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_rxcui: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_spl_id: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_spl_set_id: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_substance_name: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_unii: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_upc: array (nullable = true)
 |    |-- element: string (containsNull = true)
Code
df.head(2)
25/04/03 15:09:13 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Row(address_1='8 Joanna Ct', address_2='', center_classification_date='20121025', city='East Brunswick', classification='Class II', code_info='Lots a) 15952, 16270,16425, Exp 06/15; b)16459, 16466, 16467, Exp 07/15', country='United States', distribution_pattern='Nationwide', event_id='63384', initial_firm_notification='E-Mail', more_code_info=None, postal_code='08816-2108', product_description='Wal-Mucil 100% Natural Fiber, 100% Natural Psyllium Seed Husk, Fiber Laxative/Supplement, a)160 capsules per bottle (item #503663), and b) 320 capsules per bottle (Item #586143), Distributed by: Walgreen Co., 200 Wilmot Road, Deerfield, IL 60015-4616, www.walgreens.com, a) UPC 3-11917-08151-9, b) UPC 3-11917-07658-4', product_quantity='56,808 bottles', product_type='Drugs', reason_for_recall='Microbial Contamination of Non-Sterile Products: Product is being recalled due to possible microbial contamination by C. difficile discovered in the raw material.', recall_initiation_date='20120904', recall_number='D-026-2013', recalling_firm='Raritan Pharmaceuticals, Inc.', report_date='20121031', state='NJ', status='Terminated', termination_date='20141007', voluntary_mandated='Voluntary: Firm Initiated', openfda_application_number=None, openfda_brand_name=None, openfda_generic_name=None, openfda_is_original_packager=None, openfda_manufacturer_name=None, openfda_nui=None, openfda_original_packager_product_ndc=None, openfda_package_ndc=None, openfda_pharm_class_cs=None, openfda_pharm_class_epc=None, openfda_pharm_class_moa=None, openfda_pharm_class_pe=None, openfda_product_ndc=None, openfda_product_type=None, openfda_route=None, openfda_rxcui=None, openfda_spl_id=None, openfda_spl_set_id=None, openfda_substance_name=None, openfda_unii=None, openfda_upc=None),
 Row(address_1='8 Joanna Ct', address_2='', center_classification_date='20121025', city='East Brunswick', classification='Class II', code_info='Lot 15087, Exp 08/15', country='United States', distribution_pattern='Nationwide', event_id='63384', initial_firm_notification='E-Mail', more_code_info=None, postal_code='08816-2108', product_description='Premier Value Fiber Plus Calcium Supplement Capsules, 120 capsules per bottle, Distributed by: Chain Drug Consortium, LLC, Boca Raton, FL, UPC 8-40986-01987-6', product_quantity='96 bottles', product_type='Drugs', reason_for_recall='Microbial Contamination of Non-Sterile Products: Product is being recalled due to possible microbial contamination by C. difficile discovered in the raw material.', recall_initiation_date='20120904', recall_number='D-031-2013', recalling_firm='Raritan Pharmaceuticals, Inc.', report_date='20121031', state='NJ', status='Terminated', termination_date='20141007', voluntary_mandated='Voluntary: Firm Initiated', openfda_application_number=None, openfda_brand_name=None, openfda_generic_name=None, openfda_is_original_packager=None, openfda_manufacturer_name=None, openfda_nui=None, openfda_original_packager_product_ndc=None, openfda_package_ndc=None, openfda_pharm_class_cs=None, openfda_pharm_class_epc=None, openfda_pharm_class_moa=None, openfda_pharm_class_pe=None, openfda_product_ndc=None, openfda_product_type=None, openfda_route=None, openfda_rxcui=None, openfda_spl_id=None, openfda_spl_set_id=None, openfda_substance_name=None, openfda_unii=None, openfda_upc=None)]

Note that the display of the DataFrame is not as usual: it is shown as a list of Row objects, since the openfda_* columns contain arrays of varying length.

Note

A principled approach to schema flattening is embodied in the next chunk.

df.schema allows us to perform flattening in a programmatic way.

Code
from pyspark.sql.types import StructType
from pyspark.sql.functions import col

def flatten_schema(df):
    # Fields of the top-level schema, with their data types
    fields = df.schema.fields
    
    # Accumulator for the flattened column names
    flat_cols = []
    
    for field in fields:
        # Expand one level of struct nesting
        if isinstance(field.dataType, StructType):
            nested = df.select(field.name + ".*").columns
            flat_cols.extend([field.name + "." + x for x in nested])
        else:
            flat_cols.append(field.name)
    
    # Select all flattened columns, replacing dots by underscores in the names
    df_flattened = df.select([col(x).alias(x.replace(".","_")) for x in flat_cols])
    
    return df_flattened
Note

This function definition was produced by Copilot from the following prompt:

How can I flatten the schema of a spark dataframe?
Code
df = spark.read.json(filename, multiLine=True)

df_flat = flatten_schema(df)

df_flat.printSchema()
root
 |-- address_1: string (nullable = true)
 |-- address_2: string (nullable = true)
 |-- center_classification_date: string (nullable = true)
 |-- city: string (nullable = true)
 |-- classification: string (nullable = true)
 |-- code_info: string (nullable = true)
 |-- country: string (nullable = true)
 |-- distribution_pattern: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- initial_firm_notification: string (nullable = true)
 |-- more_code_info: string (nullable = true)
 |-- openfda_application_number: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_brand_name: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_generic_name: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_is_original_packager: array (nullable = true)
 |    |-- element: boolean (containsNull = true)
 |-- openfda_manufacturer_name: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_nui: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_original_packager_product_ndc: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_package_ndc: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_pharm_class_cs: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_pharm_class_epc: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_pharm_class_moa: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_pharm_class_pe: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_product_ndc: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_product_type: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_route: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_rxcui: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_spl_id: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_spl_set_id: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_substance_name: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_unii: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- openfda_upc: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- postal_code: string (nullable = true)
 |-- product_description: string (nullable = true)
 |-- product_quantity: string (nullable = true)
 |-- product_type: string (nullable = true)
 |-- reason_for_recall: string (nullable = true)
 |-- recall_initiation_date: string (nullable = true)
 |-- recall_number: string (nullable = true)
 |-- recalling_firm: string (nullable = true)
 |-- report_date: string (nullable = true)
 |-- state: string (nullable = true)
 |-- status: string (nullable = true)
 |-- termination_date: string (nullable = true)
 |-- voluntary_mandated: string (nullable = true)
Code
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

messy_schema = StructType([
    StructField("id", IntegerType()),
    StructField("info", StructType([
        StructField("name", StringType()),
        StructField("age", IntegerType()),
        StructField("zoo", StructType([
            StructField("cat", StringType()),
            StructField("dog", StringType())
        ]))
    ]))
])
Note

This principled approach is not the end of the story. If the schema exhibits hierarchical nesting, flatten_schema() only removes one level of nesting.

Code
data = [(1, ("John", 30, ("Fritz", "Medor"))), (2, ("Jane", 25, ("Grominet", "Goofy")))]

very_nested_df = spark.createDataFrame(data, messy_schema)
Code
flatten_schema(very_nested_df).show()
+---+---------+--------+-----------------+
| id|info_name|info_age|         info_zoo|
+---+---------+--------+-----------------+
|  1|     John|      30|   {Fritz, Medor}|
|  2|     Jane|      25|{Grominet, Goofy}|
+---+---------+--------+-----------------+
Code
flatten_schema(very_nested_df).printSchema()
root
 |-- id: integer (nullable = true)
 |-- info_name: string (nullable = true)
 |-- info_age: integer (nullable = true)
 |-- info_zoo: struct (nullable = true)
 |    |-- cat: string (nullable = true)
 |    |-- dog: string (nullable = true)
Warning

Copilot claims that the flattening function above handles nested structures recursively. This is not the case.

Fix this

Missing data

One strategy is to remove rows with missing data. dropna() has several options, explained below.

Code
df.dropna().count()
2

If we remove every row with at least one missing value, we end up with an almost empty dataframe (2 rows out of 11292)!

Code
df.dropna(how='all').count()
11292

dropna() accepts the following arguments

  • how: either 'any' or 'all'. With 'any' (the default), a row is dropped as soon as it contains at least one null value. With 'all', only rows whose values are all null are dropped.

  • thresh: accepts an integer; a row is kept only when it contains at least thresh non-null values. thresh is a middle ground between how='any' and how='all'; when thresh is given, it overrides how.

  • subset: accepts a list of column names. When a subset is present, null values will only be checked in the columns whose names are provided.
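The thresh rule can be mimicked in plain Python to make its semantics concrete (a toy illustration, not Spark code): a row survives when it has at least thresh non-null values.

```python
rows = [
    ["a", "b", "c"],      # 3 non-null values
    ["a", None, "c"],     # 2 non-null values
    ["a", None, None],    # 1 non-null value
    [None, None, None],   # 0 non-null values
]

def dropna_thresh(rows, thresh):
    # Keep rows having at least `thresh` non-null values
    return [r for r in rows if sum(v is not None for v in r) >= thresh]

print(len(dropna_thresh(rows, 3)))  # 1 -- like how='any': only complete rows survive
print(len(dropna_thresh(rows, 1)))  # 3 -- like how='all': only the fully-null row is dropped
```

Setting thresh between 1 and the number of columns interpolates between the two how behaviours.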

Code
n_columns = len(df.columns)
Code
df.dropna(thresh=n_columns).count()
2
Code
df.dropna(thresh=n_columns-1).count()
7550
Code
df.dropna(thresh=n_columns-10).count()
11292
Code
df = df.dropna(subset=['postal_code', 'city', 'country', 'address_1'])
df.count()
11292

Before going further, let’s count the number of missing values in each column

Code
# For each column we create a new column containing 1 if the value is null and 0 otherwise.
# We need to cast Boolean to Int so that we can use fn.sum afterwards
from pyspark.sql import functions as fn

for c in df.columns:
    # Skip the _isnull columns (just in case you run this cell twice...)
    if not c.endswith("_isnull"):
        df = df.withColumn(c + "_isnull", fn.isnull(col(c)).cast('int'))
Code
df.head()
Row(address_1='8 Joanna Ct', address_2='', center_classification_date='20121025', city='East Brunswick', classification='Class II', code_info='Lots a) 15952, 16270,16425, Exp 06/15; b)16459, 16466, 16467, Exp 07/15', country='United States', distribution_pattern='Nationwide', event_id='63384', initial_firm_notification='E-Mail', more_code_info=None, openfda=Row(application_number=None, brand_name=None, generic_name=None, is_original_packager=None, manufacturer_name=None, nui=None, original_packager_product_ndc=None, package_ndc=None, pharm_class_cs=None, pharm_class_epc=None, pharm_class_moa=None, pharm_class_pe=None, product_ndc=None, product_type=None, route=None, rxcui=None, spl_id=None, spl_set_id=None, substance_name=None, unii=None, upc=None), postal_code='08816-2108', product_description='Wal-Mucil 100% Natural Fiber, 100% Natural Psyllium Seed Husk, Fiber Laxative/Supplement, a)160 capsules per bottle (item #503663), and b) 320 capsules per bottle (Item #586143), Distributed by: Walgreen Co., 200 Wilmot Road, Deerfield, IL 60015-4616, www.walgreens.com, a) UPC 3-11917-08151-9, b) UPC 3-11917-07658-4', product_quantity='56,808 bottles', product_type='Drugs', reason_for_recall='Microbial Contamination of Non-Sterile Products: Product is being recalled due to possible microbial contamination by C. 
difficile discovered in the raw material.', recall_initiation_date='20120904', recall_number='D-026-2013', recalling_firm='Raritan Pharmaceuticals, Inc.', report_date='20121031', state='NJ', status='Terminated', termination_date='20141007', voluntary_mandated='Voluntary: Firm Initiated', address_1_isnull=0, address_2_isnull=0, center_classification_date_isnull=0, city_isnull=0, classification_isnull=0, code_info_isnull=0, country_isnull=0, distribution_pattern_isnull=0, event_id_isnull=0, initial_firm_notification_isnull=0, more_code_info_isnull=1, openfda_isnull=0, postal_code_isnull=0, product_description_isnull=0, product_quantity_isnull=0, product_type_isnull=0, reason_for_recall_isnull=0, recall_initiation_date_isnull=0, recall_number_isnull=0, recalling_firm_isnull=0, report_date_isnull=0, state_isnull=0, status_isnull=0, termination_date_isnull=0, voluntary_mandated_isnull=0)
Code
# Get the list of _isnull columns
isnull_columns = [c for c in df.columns if c.endswith("_isnull")]

# On the _isnull columns :
#  - we compute the sum to have the number of null values and rename the column
#  - convert to pandas for better readability
#  - transpose the pandas dataframe for better readability
missing_values = df.select(isnull_columns)\
    .agg(*[fn.sum(c).alias(c.replace("_isnull", "")) for c in isnull_columns])\
    .toPandas()

missing_values.T\
    .rename({0: "missing values"}, axis="columns")
missing values
address_1 0
address_2 0
center_classification_date 47
city 0
classification 0
code_info 0
country 0
distribution_pattern 0
event_id 0
initial_firm_notification 0
more_code_info 11290
openfda 0
postal_code 0
product_description 0
product_quantity 0
product_type 0
reason_for_recall 0
recall_initiation_date 0
recall_number 0
recalling_firm 0
report_date 0
state 0
status 0
termination_date 3741
voluntary_mandated 0

We see that more_code_info is always null and that termination_date is often null. Most of the openfda* columns are also almost always empty.

We can keep only the columns with no missing values

Code
# This expression may look complicated; evaluate each piece separately to understand it
kept_columns = list(
    missing_values.columns[(missing_values.iloc[0] == 0).values]
)
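To see what this expression does, here is the same selection on a tiny pandas frame (an illustration with made-up column names): iloc[0] extracts the single row of counts, the comparison builds a boolean mask, and the mask indexes the columns.

```python
import pandas as pd

# A one-row frame of missing-value counts, as produced by the aggregation above
counts = pd.DataFrame({"a": [0], "b": [47], "c": [0]})

mask = (counts.iloc[0] == 0).values   # array([ True, False,  True])
kept = list(counts.columns[mask])

print(kept)  # ['a', 'c']
```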
Code
df_kept = df.select(kept_columns)
Code
df_kept.head(2)
[Row(address_1='8 Joanna Ct', address_2='', city='East Brunswick', classification='Class II', code_info='Lots a) 15952, 16270,16425, Exp 06/15; b)16459, 16466, 16467, Exp 07/15', country='United States', distribution_pattern='Nationwide', event_id='63384', initial_firm_notification='E-Mail', openfda=Row(application_number=None, brand_name=None, generic_name=None, is_original_packager=None, manufacturer_name=None, nui=None, original_packager_product_ndc=None, package_ndc=None, pharm_class_cs=None, pharm_class_epc=None, pharm_class_moa=None, pharm_class_pe=None, product_ndc=None, product_type=None, route=None, rxcui=None, spl_id=None, spl_set_id=None, substance_name=None, unii=None, upc=None), postal_code='08816-2108', product_description='Wal-Mucil 100% Natural Fiber, 100% Natural Psyllium Seed Husk, Fiber Laxative/Supplement, a)160 capsules per bottle (item #503663), and b) 320 capsules per bottle (Item #586143), Distributed by: Walgreen Co., 200 Wilmot Road, Deerfield, IL 60015-4616, www.walgreens.com, a) UPC 3-11917-08151-9, b) UPC 3-11917-07658-4', product_quantity='56,808 bottles', product_type='Drugs', reason_for_recall='Microbial Contamination of Non-Sterile Products: Product is being recalled due to possible microbial contamination by C. difficile discovered in the raw material.', recall_initiation_date='20120904', recall_number='D-026-2013', recalling_firm='Raritan Pharmaceuticals, Inc.', report_date='20121031', state='NJ', status='Terminated', voluntary_mandated='Voluntary: Firm Initiated'),
 Row(address_1='8 Joanna Ct', address_2='', city='East Brunswick', classification='Class II', code_info='Lot 15087, Exp 08/15', country='United States', distribution_pattern='Nationwide', event_id='63384', initial_firm_notification='E-Mail', openfda=Row(application_number=None, brand_name=None, generic_name=None, is_original_packager=None, manufacturer_name=None, nui=None, original_packager_product_ndc=None, package_ndc=None, pharm_class_cs=None, pharm_class_epc=None, pharm_class_moa=None, pharm_class_pe=None, product_ndc=None, product_type=None, route=None, rxcui=None, spl_id=None, spl_set_id=None, substance_name=None, unii=None, upc=None), postal_code='08816-2108', product_description='Premier Value Fiber Plus Calcium Supplement Capsules, 120 capsules per bottle, Distributed by: Chain Drug Consortium, LLC, Boca Raton, FL, UPC 8-40986-01987-6', product_quantity='96 bottles', product_type='Drugs', reason_for_recall='Microbial Contamination of Non-Sterile Products: Product is being recalled due to possible microbial contamination by C. difficile discovered in the raw material.', recall_initiation_date='20120904', recall_number='D-031-2013', recalling_firm='Raritan Pharmaceuticals, Inc.', report_date='20121031', state='NJ', status='Terminated', voluntary_mandated='Voluntary: Firm Initiated')]
Code
df_kept.printSchema()
root
 |-- address_1: string (nullable = true)
 |-- address_2: string (nullable = true)
 |-- city: string (nullable = true)
 |-- classification: string (nullable = true)
 |-- code_info: string (nullable = true)
 |-- country: string (nullable = true)
 |-- distribution_pattern: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- initial_firm_notification: string (nullable = true)
 |-- openfda: struct (nullable = true)
 |    |-- application_number: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- brand_name: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- generic_name: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- is_original_packager: array (nullable = true)
 |    |    |-- element: boolean (containsNull = true)
 |    |-- manufacturer_name: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- nui: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- original_packager_product_ndc: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- package_ndc: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- pharm_class_cs: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- pharm_class_epc: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- pharm_class_moa: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- pharm_class_pe: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- product_ndc: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- product_type: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- route: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- rxcui: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- spl_id: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- spl_set_id: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- substance_name: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- unii: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- upc: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- postal_code: string (nullable = true)
 |-- product_description: string (nullable = true)
 |-- product_quantity: string (nullable = true)
 |-- product_type: string (nullable = true)
 |-- reason_for_recall: string (nullable = true)
 |-- recall_initiation_date: string (nullable = true)
 |-- recall_number: string (nullable = true)
 |-- recalling_firm: string (nullable = true)
 |-- report_date: string (nullable = true)
 |-- state: string (nullable = true)
 |-- status: string (nullable = true)
 |-- voluntary_mandated: string (nullable = true)
Code
df_kept.count()
11292

Filtering by string values

Cases from South San Francisco

Code
df.filter(df.city == "South San Francisco")\
    .toPandas()
address_1 address_2 center_classification_date city classification code_info country distribution_pattern event_id initial_firm_notification ... product_type_isnull reason_for_recall_isnull recall_initiation_date_isnull recall_number_isnull recalling_firm_isnull report_date_isnull state_isnull status_isnull termination_date_isnull voluntary_mandated_isnull
0 1 DNA Way 20130122 South San Francisco Class I Lot #: 454138, Exp 07/14 (containing Trastuzum... United States Nationwide 63258 E-Mail ... 0 0 0 0 0 0 0 0 0 0
1 1 DNA Way 20121204 South San Francisco Class II Lot# 936674 Exp. 09/30/13 United States Nationwide 63243 Letter ... 0 0 0 0 0 0 0 0 0 0
2 1 DNA Way 20121204 South San Francisco Class II Lot # 936670 Exp. 09/30/13 United States Nationwide 63243 Letter ... 0 0 0 0 0 0 0 0 0 0
3 1 DNA Way 20121220 South San Francisco Class III Lot #: a) M1365B01, Exp 04/15; b) M1365, Exp 0... United States Nationwide 63874 Letter ... 0 0 0 0 0 0 0 0 0 0
4 1 Dna Way 20180213 South San Francisco Class III Lot # 3141989, EXP 08/31/2019 United States Distributed throughout the United States 79175 Letter ... 0 0 0 0 0 0 0 0 0 0
5 5000 Shoreline Ct. Ste 200 20130201 South San Francisco Class III Lot #: MA00AD5, Exp: 11/30/2014 United States CA & VA 64151 Letter ... 0 0 0 0 0 0 0 0 0 0
6 1 Dna Way 20171201 South San Francisco Class I Lot# 3128243, 3141239, EXP. 9/30/2018; 3166728... United States Nationwide in the USA 78088 Letter ... 0 0 0 0 0 0 0 0 0 0
7 1 Dna Way 20170405 South San Francisco Class II B1009MC, B1009M9, B1009MA; Exp. 02/18 B1009MT... United States NJ and IL 76726 Letter ... 0 0 0 0 0 0 0 0 0 0

8 rows × 50 columns

Caution

Once again, we use .toPandas() to pretty format the results in the notebook.

But it’s a BAD idea to do this if the Spark DataFrame is large, since it requires a collect()

Aside from filtering strings by an exact match, there are plenty of other powerful ways to filter by strings in PySpark:

  • df.filter(df.city.contains('San Francisco')): returns rows where strings of a column contain a provided substring. In our example, filtering by rows which contain the substring “San Francisco” would be a good way to get all rows in San Francisco, instead of just “South San Francisco”.

  • df.filter(df.city.startswith('San')): Returns rows where a string starts with a provided substring.

  • df.filter(df.city.endswith('ice')): Returns rows where a string ends with a provided substring.

  • df.filter(df.city.isNull()): Returns rows where values in a provided column are null.

  • df.filter(df.city.isNotNull()): Opposite of the above.

  • df.filter(df.city.like('San%')): Performs a SQL-like query containing the LIKE clause.

  • df.filter(df.city.rlike('[A-Z]*ice$')): Performs a regexp filter.

  • df.filter(df.city.isin('San Francisco', 'Los Angeles')): Looks for rows where the string value of a column matches any of the provided strings exactly.

You can try some of these to see how they behave
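The pattern handed to rlike is an ordinary regular expression, so its behaviour can be previewed with Python’s re module (same pattern as above):

```python
import re

pattern = re.compile(r"[A-Z]*ice$")

# re.search looks for a match anywhere in the string,
# which mirrors rlike's substring-matching semantics
print(bool(pattern.search("Venice")))   # True
print(bool(pattern.search("Nice")))     # True
print(bool(pattern.search("Iceland")))  # False
```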

Code
df.filter(df.city.contains('San Francisco'))\
    .toPandas()
address_1 address_2 center_classification_date city classification code_info country distribution_pattern event_id initial_firm_notification ... product_type_isnull reason_for_recall_isnull recall_initiation_date_isnull recall_number_isnull recalling_firm_isnull report_date_isnull state_isnull status_isnull termination_date_isnull voluntary_mandated_isnull
0 1 DNA Way 20130122 South San Francisco Class I Lot #: 454138, Exp 07/14 (containing Trastuzum... United States Nationwide 63258 E-Mail ... 0 0 0 0 0 0 0 0 0 0
1 1 DNA Way 20121204 South San Francisco Class II Lot# 936674 Exp. 09/30/13 United States Nationwide 63243 Letter ... 0 0 0 0 0 0 0 0 0 0
2 1 DNA Way 20121204 South San Francisco Class II Lot # 936670 Exp. 09/30/13 United States Nationwide 63243 Letter ... 0 0 0 0 0 0 0 0 0 0
3 1 DNA Way 20121220 South San Francisco Class III Lot #: a) M1365B01, Exp 04/15; b) M1365, Exp 0... United States Nationwide 63874 Letter ... 0 0 0 0 0 0 0 0 0 0
4 801 Irving St 20160706 San Francisco Class II All lots compounded between 03/24/2015 and 03/... United States U.S. Including: CA, HI, NM 73662 Press Release ... 0 0 0 0 0 0 0 0 0 0
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
73 801 Irving St 20160706 San Francisco Class II All lots compounded between 03/24/2015 and 03/... United States U.S. Including: CA, HI, NM 73662 Press Release ... 0 0 0 0 0 0 0 0 0 0
74 801 Irving St 20160706 San Francisco Class II All lots compounded between 03/24/2015 and 03/... United States U.S. Including: CA, HI, NM 73662 Press Release ... 0 0 0 0 0 0 0 0 0 0
75 5000 Shoreline Ct. Ste 200 20130201 South San Francisco Class III Lot #: MA00AD5, Exp: 11/30/2014 United States CA & VA 64151 Letter ... 0 0 0 0 0 0 0 0 0 0
76 1 Dna Way 20171201 South San Francisco Class I Lot# 3128243, 3141239, EXP. 9/30/2018; 3166728... United States Nationwide in the USA 78088 Letter ... 0 0 0 0 0 0 0 0 0 0
77 1 Dna Way 20170405 South San Francisco Class II B1009MC, B1009M9, B1009MA; Exp. 02/18 B1009MT... United States NJ and IL 76726 Letter ... 0 0 0 0 0 0 0 0 0 0

78 rows × 50 columns

Code
(
    df.filter(df.city.isin('San Francisco', 'Los Angeles'))
      .toPandas()
)
address_1 address_2 center_classification_date city classification code_info country distribution_pattern event_id initial_firm_notification ... product_type_isnull reason_for_recall_isnull recall_initiation_date_isnull recall_number_isnull recalling_firm_isnull report_date_isnull state_isnull status_isnull termination_date_isnull voluntary_mandated_isnull
0 1990 Westwood Blvd Ste 135 20170731 Los Angeles Class II Lot # 03022017+44906; BUD 08/29/17 United States There were only one customer in California 77757 Letter ... 0 0 0 0 0 0 0 0 0 0
1 450 N Van Ness Ave Apt 107 20180411 Los Angeles Class I UPC # 891656002209, exp date 12/31/2021 United States Product was distributed online via eBay. 79476 E-Mail ... 0 0 0 0 0 0 0 0 0 0
2 801 Irving St 20160706 San Francisco Class II All lots compounded between 03/24/2015 and 03/... United States U.S. Including: CA, HI, NM 73662 Press Release ... 0 0 0 0 0 0 0 0 0 0
3 801 Irving St 20160706 San Francisco Class II All lots compounded between 03/24/2015 and 03/... United States U.S. Including: CA, HI, NM 73662 Press Release ... 0 0 0 0 0 0 0 0 0 0
4 801 Irving St 20160706 San Francisco Class II All lots compounded between 03/24/2015 and 03/... United States U.S. Including: CA, HI, NM 73662 Press Release ... 0 0 0 0 0 0 0 0 0 0
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
70 801 Irving St 20160706 San Francisco Class II All lots compounded between 03/24/2015 and 03/... United States U.S. Including: CA, HI, NM 73662 Press Release ... 0 0 0 0 0 0 0 0 0 0
71 801 Irving St 20160706 San Francisco Class II All lots compounded between 03/24/2015 and 03/... United States U.S. Including: CA, HI, NM 73662 Press Release ... 0 0 0 0 0 0 0 0 0 0
72 154 W 131st St 20170524 Los Angeles Class I All lots within expiry United States Nationwide in the USA and Puerto Rico. 77009 Press Release ... 0 0 0 0 0 0 0 0 0 0
73 154 W 131st St 20170524 Los Angeles Class I All lots within expiry United States Nationwide in the USA and Puerto Rico. 77009 Press Release ... 0 0 0 0 0 0 0 0 0 0
74 304 E 11th St 20180313 Los Angeles Class I Lot #: MFD:10.15.2017, Exp.10/14/2019. United States Product was distributed in California to onlin... 79327 Press Release ... 0 0 0 0 0 0 0 0 0 0

75 rows × 50 columns

Filtering by Date Values

In addition to filtering by strings, we can also filter by columns where the values are stored as dates or datetimes (or strings that can be inferred as dates). Perhaps the most useful way to filter dates is by using the between() method, which allows us to find results within a certain date range. Here we find all the results which were reported between January 2013 and March 2015:

Code
( 
    df
        .filter(df.city == "South San Francisco")
        .filter(df.report_date.between('2013-01-01 00:00:00','2015-03-11 00:00:00'))
        .toPandas()
)
address_1 address_2 center_classification_date city classification code_info country distribution_pattern event_id initial_firm_notification ... product_type_isnull reason_for_recall_isnull recall_initiation_date_isnull recall_number_isnull recalling_firm_isnull report_date_isnull state_isnull status_isnull termination_date_isnull voluntary_mandated_isnull
0 1 DNA Way 20130122 South San Francisco Class I Lot #: 454138, Exp 07/14 (containing Trastuzum... United States Nationwide 63258 E-Mail ... 0 0 0 0 0 0 0 0 0 0
1 5000 Shoreline Ct. Ste 200 20130201 South San Francisco Class III Lot #: MA00AD5, Exp: 11/30/2014 United States CA & VA 64151 Letter ... 0 0 0 0 0 0 0 0 0 0

2 rows × 50 columns

Caution

Is Spark smart enough to understand that the string in column report_date contains a date?

Code
df.filter(df.city == "South San Francisco")\
    .filter(df.center_classification_date.between('2013-01-01 00:00:00','2013-12-31 00:00:00'))\
    .toPandas()
address_1 address_2 center_classification_date city classification code_info country distribution_pattern event_id initial_firm_notification ... product_type_isnull reason_for_recall_isnull recall_initiation_date_isnull recall_number_isnull recalling_firm_isnull report_date_isnull state_isnull status_isnull termination_date_isnull voluntary_mandated_isnull

0 rows × 50 columns

Code
df_dates = df.select([c for c in df.columns if c.endswith("date")])

df_dates.printSchema()
root
 |-- center_classification_date: string (nullable = true)
 |-- recall_initiation_date: string (nullable = true)
 |-- report_date: string (nullable = true)
 |-- termination_date: string (nullable = true)
Code
df_dates.show(5)
+--------------------------+----------------------+-----------+----------------+
|center_classification_date|recall_initiation_date|report_date|termination_date|
+--------------------------+----------------------+-----------+----------------+
|                  20121025|              20120904|   20121031|        20141007|
|                  20121025|              20120904|   20121031|        20141007|
|                  20121106|              20121015|   20121114|        20130325|
|                  20121220|              20121204|   20121226|        20140429|
|                  20121231|              20120926|   20130109|        20161007|
+--------------------------+----------------------+-----------+----------------+
only showing top 5 rows

The columns are not dates (DateType) but strings (StringType). When comparing report_date with '2013-01-01 00:00:00' and '2015-03-11 00:00:00', we are therefore comparing strings, and we are lucky that in Unicode '-' sorts before the digits '0'–'9'. Hence '2013-01-01 00:00:00' < '20130122' (the strings agree on '2013', then '-' < '0'), and '20130122' < '2015-03-11 00:00:00' (they differ at the fourth character, '3' < '5'), so the filter on report_date happens to work. The luck runs out for center_classification_date: '20130122' > '2013-12-31 00:00:00' (again because '-' < '0'), which is why the previous query returned no rows.
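This lexicographic luck can be checked directly in Python, using the same strings as in the filters above:

```python
# report_date comparisons: both bounds happen to work
print('2013-01-01 00:00:00' < '20130122')   # True: '-' (U+002D) sorts before '0' (U+0030)
print('20130122' < '2015-03-11 00:00:00')   # True: '3' < '5' at index 3

# center_classification_date: the upper bound fails
print('20130122' <= '2013-12-31 00:00:00')  # False: '0' > '-' at index 4
```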

Caution

If some field in a JSON document is meant to represent a date or a datetime object, Spark should be given a hint.

JSON loaders (in Python) as well as the Spark JSON reader have optional arguments that specify the date format to be used.

Note

We have to tell the json loader about two things:

  1. which columns should be read as dates
  2. which format should be used for those columns

The first point can be addressed using the schema argument of the .json() method (see Documentation)

Code
from pyspark.sql.types import StructType, StructField, DateType

ze_schema = df.schema

list_fields = []

for f in ze_schema.fields:
    if f.name.endswith('date'):
        # Override the inferred StringType with DateType for date columns
        list_fields.append(StructField(f.name, DateType(), True))
    else:
        list_fields.append(f)

ze_schema = StructType(list_fields)
Code
# Alternative syntax using a dictionary of options
options = {
    "dateFormat": "yyyyMMdd",
    "multiLine": "true"
}

df = (
    spark.read
        .options(**options)
        .json(filename, ze_schema)
)
Code
df.select([c for c in df.columns if c.endswith("date")]).printSchema()
root
 |-- center_classification_date: date (nullable = true)
 |-- recall_initiation_date: date (nullable = true)
 |-- report_date: date (nullable = true)
 |-- termination_date: date (nullable = true)
Code
(
df.filter(df.city == "South San Francisco")
  .filter(df.center_classification_date.between('2013-01-01 00:00:00','2013-12-31 00:00:00'))
  .toPandas()
)
address_1 address_2 center_classification_date city classification code_info country distribution_pattern event_id initial_firm_notification ... product_type_isnull reason_for_recall_isnull recall_initiation_date_isnull recall_number_isnull recalling_firm_isnull report_date_isnull state_isnull status_isnull termination_date_isnull voluntary_mandated_isnull

0 rows × 50 columns

Handling complex types

Bridging the gap between tabular and semi-structured data.

Note

SQL, R, Pandas

struct, array, map

Code
# struct

The problems we faced after loading data from the JSON file stemmed from the fact that column openfda has the complex StructType() type. We shall revisit this dataframe.

Code
df = spark.read.json(filename, multiLine=True)

The dataframe schema df.schema, which is of type StructType (defined in pyspark.sql.types), can be converted to a JSON string, which in turn can be converted into a Python dictionary.

Code
df = spark.read.json(filename, multiLine=True)

sj = json.loads(df.schema.json())

We equip the dataframe with a primary key

Code
from pyspark.sql import Window

# No partitionBy: the whole dataframe forms a single window,
# so row_number() yields a global sequential identifier
w = Window.orderBy(col("center_classification_date"))

df = (
  df
    .withColumn("row_id", fn.row_number().over(w))
)
Code
[(f['name'], f['type'])  
    for f in sj['fields'] if not isinstance(f['type'], str)]
[('openfda',
  {'fields': [{'metadata': {},
     'name': 'application_number',
     'nullable': True,
     'type': {'containsNull': True, 'elementType': 'string', 'type': 'array'}},
    {'metadata': {},
     'name': 'brand_name',
     'nullable': True,
     'type': {'containsNull': True, 'elementType': 'string', 'type': 'array'}},
    {'metadata': {},
     'name': 'generic_name',
     'nullable': True,
     'type': {'containsNull': True, 'elementType': 'string', 'type': 'array'}},
    {'metadata': {},
     'name': 'is_original_packager',
     'nullable': True,
     'type': {'containsNull': True,
      'elementType': 'boolean',
      'type': 'array'}},
    {'metadata': {},
     'name': 'manufacturer_name',
     'nullable': True,
     'type': {'containsNull': True, 'elementType': 'string', 'type': 'array'}},
    {'metadata': {},
     'name': 'nui',
     'nullable': True,
     'type': {'containsNull': True, 'elementType': 'string', 'type': 'array'}},
    {'metadata': {},
     'name': 'original_packager_product_ndc',
     'nullable': True,
     'type': {'containsNull': True, 'elementType': 'string', 'type': 'array'}},
    {'metadata': {},
     'name': 'package_ndc',
     'nullable': True,
     'type': {'containsNull': True, 'elementType': 'string', 'type': 'array'}},
    {'metadata': {},
     'name': 'pharm_class_cs',
     'nullable': True,
     'type': {'containsNull': True, 'elementType': 'string', 'type': 'array'}},
    {'metadata': {},
     'name': 'pharm_class_epc',
     'nullable': True,
     'type': {'containsNull': True, 'elementType': 'string', 'type': 'array'}},
    {'metadata': {},
     'name': 'pharm_class_moa',
     'nullable': True,
     'type': {'containsNull': True, 'elementType': 'string', 'type': 'array'}},
    {'metadata': {},
     'name': 'pharm_class_pe',
     'nullable': True,
     'type': {'containsNull': True, 'elementType': 'string', 'type': 'array'}},
    {'metadata': {},
     'name': 'product_ndc',
     'nullable': True,
     'type': {'containsNull': True, 'elementType': 'string', 'type': 'array'}},
    {'metadata': {},
     'name': 'product_type',
     'nullable': True,
     'type': {'containsNull': True, 'elementType': 'string', 'type': 'array'}},
    {'metadata': {},
     'name': 'route',
     'nullable': True,
     'type': {'containsNull': True, 'elementType': 'string', 'type': 'array'}},
    {'metadata': {},
     'name': 'rxcui',
     'nullable': True,
     'type': {'containsNull': True, 'elementType': 'string', 'type': 'array'}},
    {'metadata': {},
     'name': 'spl_id',
     'nullable': True,
     'type': {'containsNull': True, 'elementType': 'string', 'type': 'array'}},
    {'metadata': {},
     'name': 'spl_set_id',
     'nullable': True,
     'type': {'containsNull': True, 'elementType': 'string', 'type': 'array'}},
    {'metadata': {},
     'name': 'substance_name',
     'nullable': True,
     'type': {'containsNull': True, 'elementType': 'string', 'type': 'array'}},
    {'metadata': {},
     'name': 'unii',
     'nullable': True,
     'type': {'containsNull': True, 'elementType': 'string', 'type': 'array'}},
    {'metadata': {},
     'name': 'upc',
     'nullable': True,
     'type': {'containsNull': True,
      'elementType': 'string',
      'type': 'array'}}],
   'type': 'struct'})]

Column openfda has type StructType(), with fields of composite types.

Code
{f.dataType for f in df.schema.fields if f.dataType != StringType()}
{IntegerType(),
 StructType([StructField('application_number', ArrayType(StringType(), True), True), StructField('brand_name', ArrayType(StringType(), True), True), StructField('generic_name', ArrayType(StringType(), True), True), StructField('is_original_packager', ArrayType(BooleanType(), True), True), StructField('manufacturer_name', ArrayType(StringType(), True), True), StructField('nui', ArrayType(StringType(), True), True), StructField('original_packager_product_ndc', ArrayType(StringType(), True), True), StructField('package_ndc', ArrayType(StringType(), True), True), StructField('pharm_class_cs', ArrayType(StringType(), True), True), StructField('pharm_class_epc', ArrayType(StringType(), True), True), StructField('pharm_class_moa', ArrayType(StringType(), True), True), StructField('pharm_class_pe', ArrayType(StringType(), True), True), StructField('product_ndc', ArrayType(StringType(), True), True), StructField('product_type', ArrayType(StringType(), True), True), StructField('route', ArrayType(StringType(), True), True), StructField('rxcui', ArrayType(StringType(), True), True), StructField('spl_id', ArrayType(StringType(), True), True), StructField('spl_set_id', ArrayType(StringType(), True), True), StructField('substance_name', ArrayType(StringType(), True), True), StructField('unii', ArrayType(StringType(), True), True), StructField('upc', ArrayType(StringType(), True), True)])}
Code
{f['type']['type']
    for f in sj['fields'] if not isinstance(f['type'], str)}
{'struct'}

Projecting on row_id and openfda.* yields a (partially) flattened dataframe that, thanks to the row_id column, can be joined back with the original dataframe.

Code
df_proj = df.select('row_id', 'openfda.*')

df_proj.printSchema()
root
 |-- row_id: integer (nullable = false)
 |-- application_number: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- brand_name: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- generic_name: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- is_original_packager: array (nullable = true)
 |    |-- element: boolean (containsNull = true)
 |-- manufacturer_name: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- nui: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- original_packager_product_ndc: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- package_ndc: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- pharm_class_cs: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- pharm_class_epc: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- pharm_class_moa: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- pharm_class_pe: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- product_ndc: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- product_type: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- route: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rxcui: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- spl_id: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- spl_set_id: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- substance_name: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- unii: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- upc: array (nullable = true)
 |    |-- element: string (containsNull = true)
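In Spark, the join back would be something like df.drop("openfda").join(df_proj, on="row_id"). For intuition, the same left join on row_id in plain Python (the rows below are hypothetical, not the openFDA data):

```python
# Plain-Python analogue of a left join on the row_id key
original = [
    {"row_id": 1, "status": "Ongoing"},
    {"row_id": 2, "status": "Completed"},
]
flattened = [
    {"row_id": 1, "brand_name": ["X"]},
    {"row_id": 2, "brand_name": None},
]

# index the flattened rows by key, then merge dictionaries row by row
by_id = {r["row_id"]: r for r in flattened}
joined = [{**r, **by_id.get(r["row_id"], {})} for r in original]
print(joined[0])  # {'row_id': 1, 'status': 'Ongoing', 'brand_name': ['X']}
```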

We can inspect the lengths of the arrays.

Code
# array
df_proj.select(
    fn.max(fn.size(col("application_number"))).alias("Max"), 
    fn.min(fn.size(col("application_number"))).alias("min"), 
    fn.avg(fn.size(col("application_number"))).alias("Mean")).show(1)
+---+---+-------------------+
|Max|min|               Mean|
+---+---+-------------------+
|  2| -1|-0.7380446333687567|
+---+---+-------------------+

In some rows the reported size is -1: fn.size() returns -1 when the array field is NULL, which also explains the negative mean above.
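This NULL convention is easy to mimic in plain Python (the sample values are hypothetical):

```python
# Plain-Python analogue of fn.size() as it behaves in this notebook:
# -1 for a NULL array, otherwise the number of elements
def size(xs):
    return -1 if xs is None else len(xs)

print(size(None), size([]), size(["NDA021", "NDA022"]))  # -1 0 2
```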

Code
(
  df_proj
    .where(fn.size(col("application_number"))>1)
    .select("row_id")
    .show(5)
)
25/04/03 15:09:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+------+
|row_id|
+------+
|  7159|
|  7344|
+------+

An array column can be exploded. Much like pivoting into long form, the result contains one row per item in the array.

Code
(
  df_proj
    .select('row_id', 'application_number')
    .withColumn("exploded", 
                fn.explode(col("application_number")))
    .select('row_id', 'exploded')
    .groupBy('row_id')
    .agg(fn.count('exploded').alias("n_lignes"))
    .where("n_lignes > 1")
    .show(5)
)
25/04/03 15:09:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+------+--------+
|row_id|n_lignes|
+------+--------+
|  7159|       2|
|  7344|       2|
+------+--------+
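The explode-then-count pattern above can be mimicked in plain Python. The rows below are hypothetical (only row_id 7159 echoes the output above); note that, like fn.explode(), the comprehension produces no output row for a NULL or empty array:

```python
from collections import Counter

rows = [
    {"row_id": 7159, "application_number": ["NDA1", "NDA2"]},
    {"row_id": 1, "application_number": ["NDA3"]},
    {"row_id": 2, "application_number": None},
]

# explode: one output row per array element; NULL arrays yield no rows
exploded = [
    {"row_id": r["row_id"], "exploded": x}
    for r in rows
    for x in (r["application_number"] or [])
]

# group by row_id and keep the keys appearing more than once
counts = Counter(r["row_id"] for r in exploded)
multi = {k: v for k, v in counts.items() if v > 1}
print(multi)  # {7159: 2}
```

If the NULL rows must be kept, Spark provides fn.explode_outer(), which emits one row with a NULL element instead of dropping the row.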