This notebook is concerned with JSON, a format that serves many purposes. Like CSV files, JSON files are important sources and sinks for Spark. As an exchange format, JSON is also a serialization tool for Python and many other languages. JSON provides a way to accommodate semi-structured data in otherwise tabular environments (dataframes and database tables).
The notebook is organized in the following way:
Serialization/Deserialization of Python builtin types using JSON
Serialization/Deserialization of (some) custom types using JSON
JSON readers and writers for Spark dataframes
Composite types in Spark dataframes
Advanced JSON readers and writers for Spark dataframes
Serialization and deserialization of built-in types
What happens if we feed json.dumps() with a numpy array?
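A minimal sketch of the round trip for built-in types, using only the standard library (the record is made up). Tuples come back as lists. A type the json module does not know, a set here, raises a TypeError. A NumPy array fails in the same way ("Object of type ndarray is not JSON serializable"), and converting it first with its tolist() method is the usual workaround.

```python
import json

# Built-in Python types map directly onto JSON types
record = {
    "name": "Foo Bar",              # str   -> string
    "age": 78,                      # int   -> number
    "balance": 345.80,              # float -> number
    "active": True,                 # bool  -> true / false
    "spouse": None,                 # None  -> null
    "friends": ["Jane", "John"],    # list  -> array
    "other_names": ("Doe", "Joe"),  # tuple -> array as well
}

encoded = json.dumps(record)
decoded = json.loads(encoded)
print(encoded)

# The round trip is not perfectly faithful: the tuple comes back as a list
print(decoded["other_names"])  # ['Doe', 'Joe']

# Types outside the supported set are rejected
try:
    json.dumps({"tags": {"a", "b"}})
except TypeError as err:
    print(err)  # Object of type set is not JSON serializable
```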
Serialization and deserialization of custom objects
Code
class User(object):
    """Custom User Class"""

    def __init__(self, name, age, active, balance, other_names, friends, spouse):
        self.name = name
        self.age = age
        self.active = active
        self.balance = balance
        self.other_names = other_names
        self.friends = friends
        self.spouse = spouse

    def __repr__(self):
        s = "User("
        s += "name=" + repr(self.name)
        s += ", age=" + repr(self.age)
        s += ", active=" + repr(self.active)
        s += ", balance=" + repr(self.balance)
        s += ", other_names=" + repr(self.other_names)
        s += ", friends=" + repr(self.friends)
        s += ", spouse=" + repr(self.spouse) + ")"
        return s
Code
new_user = User(
    name="Foo Bar",
    age=78,
    friends=["Jane", "John"],
    balance=345.80,
    other_names=("Doe", "Joe"),
    active=True,
    spouse=None,
)
new_user
Code
# This will raise a TypeError
# json.dumps(new_user)
As expected, the custom object new_user is not JSON serializable. So let’s build a function that converts it into something that is.
This comes as no surprise, since earlier on we observed that the json module only handles built-in types, and User is not one.
We need to send our user data to a client over a network, so how do we get out of this error state?
A simple solution is to convert our custom object into a serializable built-in type. We can conveniently define a function obj_to_dict() that returns a dictionary representation of our object. json.dumps() takes an optional argument, default, which specifies a function to be called on any object that is not serializable. This function must return a JSON-encodable version of the object.
Recall that every instance of an ordinary class has a __dict__ attribute, a dictionary holding its instance attributes. It provides a basis for obtaining a dictionary representation of any such object:
Code
new_user.__dict__
Code
def obj_to_dict(obj):
    """Converts an object to a dictionary representation of the object,
    including meta-data information about the object's module and class name.

    Parameters
    ----------
    obj : `object`
        A python object to be converted into a dictionary representation

    Returns
    -------
    output : `dict`
        A dictionary representation of the object
    """
    # Add object meta data
    obj_dict = {
        "__class__": obj.__class__.__name__,
        "__module__": obj.__module__,
    }
    # Add the object properties (the dict union operator | needs Python 3.9+)
    return obj_dict | obj.__dict__
Code
obj_to_dict(new_user)
The function obj_to_dict does the following:
create a dictionary named obj_dict to act as the dict representation of our object.
the special attributes __class__.__name__ and __module__ provide crucial metadata on the object: the class name and the module name
add the instance attributes of the object using obj.__dict__ (Python stores instance attributes in a dictionary under the hood)
The resulting obj_dict is now serializable (provided all attributes of our object are).
Now we can comfortably call json.dumps() on the object and pass default=obj_to_dict.
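The following is a self-contained sketch of the encoding step. It re-declares User and obj_to_dict so that it runs on its own. It also writes the dict merge as {**meta, **obj.__dict__}, which gives the same result as meta | obj.__dict__ and works on Python versions before 3.9. The second call shows that a nested User, stored here as spouse, is handled too, because json.dumps() calls default on every value it cannot encode natively.

```python
import json


class User(object):
    """Custom User class (same constructor as in the notebook)."""

    def __init__(self, name, age, active, balance, other_names, friends, spouse):
        self.name = name
        self.age = age
        self.active = active
        self.balance = balance
        self.other_names = other_names
        self.friends = friends
        self.spouse = spouse


def obj_to_dict(obj):
    """Dictionary representation of obj, with class and module metadata."""
    meta = {"__class__": obj.__class__.__name__, "__module__": obj.__module__}
    # Same result as meta | obj.__dict__ on Python >= 3.9
    return {**meta, **obj.__dict__}


new_user = User(name="Foo Bar", age=78, friends=["Jane", "John"], balance=345.80,
                other_names=("Doe", "Joe"), active=True, spouse=None)

# json.dumps calls obj_to_dict on every object it cannot encode natively
user_json = json.dumps(new_user, default=obj_to_dict, indent=2)
print(user_json)

# Nested custom objects work too: default is applied recursively
couple = User(name="Jane Bar", age=75, friends=[], balance=0.0,
              other_names=(), active=True, spouse=new_user)
print(json.dumps(couple, default=obj_to_dict))
```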
Note
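This fails if one of the attributes is neither JSON serializable nor an object with a __dict__ (a set, for instance). Nested custom objects are fine, since json.dumps() calls default again on every value it cannot encode.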
Now, if we want to decode (deserialize) a custom object and create an object of the correct type, we need a function that does the opposite of obj_to_dict, since json.loads simply returns a dict:
We need json.loads() to reconstruct a User object from this dictionary: json.loads() takes an optional argument object_hook which specifies a function that returns the desired custom object, given the decoded output (which in this case is a dict).
Code
import importlib


def dict_to_obj(input_dict):
    """Converts a dictionary representation of an object to an instance of the object.

    Parameters
    ----------
    input_dict : `dict`
        A dictionary representation of the object, containing "__module__"
        and "__class__" metadata

    Returns
    -------
    obj : `object`
        A python object constructed from the dictionary representation
    """
    assert "__class__" in input_dict and "__module__" in input_dict
    class_name = input_dict.pop("__class__")
    module_name = input_dict.pop("__module__")
    # importlib.import_module returns the module itself, even for dotted
    # names such as "package.module" (__import__ would return "package")
    module = importlib.import_module(module_name)
    class_ = getattr(module, class_name)
    # Pass the remaining entries as keyword arguments to the constructor
    obj = class_(**input_dict)
    return obj
This function does the following:
Extract the class name from the dictionary under the key __class__
Extract the module name from the dictionary under the key __module__
Import the module and get the class
Instantiate the class by passing all the instance attributes to the constructor as keyword arguments, through dictionary unpacking (this works because the constructor's parameter names match the attribute names)
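The following is a self-contained sketch of the full round trip. It assumes the class lives in an importable module, which is __main__ when the code runs as a script or in a notebook. This version of dict_to_obj is a variant. It works on a copy of the dictionary. It returns dictionaries without metadata unchanged, because object_hook is called on every decoded JSON object, nested ones included. It imports the module with importlib.import_module, which also handles dotted names.

```python
import importlib
import json


class User(object):
    """Custom User class (same constructor as in the notebook)."""

    def __init__(self, name, age, active, balance, other_names, friends, spouse):
        self.name = name
        self.age = age
        self.active = active
        self.balance = balance
        self.other_names = other_names
        self.friends = friends
        self.spouse = spouse


def obj_to_dict(obj):
    """Attributes of obj plus class and module metadata."""
    meta = {"__class__": obj.__class__.__name__, "__module__": obj.__module__}
    return {**meta, **obj.__dict__}


def dict_to_obj(input_dict):
    """object_hook rebuilding objects encoded by obj_to_dict.

    Dictionaries without metadata are returned unchanged, since object_hook
    is called on every decoded JSON object, nested ones included.
    """
    if "__class__" not in input_dict or "__module__" not in input_dict:
        return input_dict
    attrs = dict(input_dict)  # work on a copy
    class_name = attrs.pop("__class__")
    module = importlib.import_module(attrs.pop("__module__"))
    class_ = getattr(module, class_name)
    # Relies on constructor parameter names matching attribute names
    return class_(**attrs)


new_user = User(name="Foo Bar", age=78, friends=["Jane", "John"], balance=345.80,
                other_names=("Doe", "Joe"), active=True, spouse=None)

user_json = json.dumps(new_user, default=obj_to_dict)
restored = json.loads(user_json, object_hook=dict_to_obj)

print(type(restored).__name__, restored.name, restored.age)  # User Foo Bar 78
# Not a perfect round trip: the tuple comes back as a list
print(restored.other_names)  # ['Doe', 'Joe']
```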
First, let’s look at the data. It’s a large set of JSON records about drug enforcement.
Code
!head -n 1000 drug-enforcement.json
We need to tell Spark that records span several lines, using the multiLine option:
Code
df = spark.read.json(filename, multiLine=True)
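By default, spark.read.json expects JSON Lines: one complete JSON record per line. A pretty-printed document spreads each record over several lines, so no single line is valid JSON by itself. This is why multiLine=True is needed. The standard-library sketch below illustrates the difference with made-up records. It does not call Spark.

```python
import json

# JSON Lines: one self-contained record per line (Spark's default expectation)
json_lines = '{"city": "Boston", "state": "MA"}\n{"city": "Austin", "state": "TX"}\n'

# The same records as one pretty-printed JSON document spanning several lines
multi_line = json.dumps(
    [{"city": "Boston", "state": "MA"}, {"city": "Austin", "state": "TX"}], indent=2
)

# Line-by-line parsing works for JSON Lines ...
records = [json.loads(line) for line in json_lines.splitlines()]
print(records)


def parses(line):
    """Return True if a single line is a valid JSON value by itself."""
    try:
        json.loads(line)
        return True
    except json.JSONDecodeError:
        return False


# ... but not for the pretty-printed document: its lines are mere fragments
print([parses(line) for line in multi_line.splitlines()])
# The whole document has to be parsed at once
print(json.loads(multi_line) == records)  # True
```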
Code
df.printSchema()
Code
df.schema
Note
This dataset is a little bit of a mess!
This should not be surprising. The data used to populate the Spark dataframe are not classically tabular but what people call semi-structured. JSON is well suited to store, represent, and exchange such data.
In the classical age of tabular data (according to Codd’s principles), a table cell could only hold a scalar value (numeric, logical, text, date, timestamp, …). Nowadays, relational database management systems handle arrays, composite types, range types, …, and JSON (see PostgreSQL).
Spark, R, and Pandas also allow us to work with complex types.
First, there is a nested openfda dictionary (a struct column). Each element of this dictionary is an array.
A good first step is to “flatten” the schema of the DataFrame, so that there are no nested types.
Flattening of the schema
All the fields of the nested structure openfda are lifted to the top level of the schema. The fields nested in openfda are the following:
Code
df.select('openfda.*').columns
Code
df.select("openfda.*").head(2)
Code
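from pyspark.sql.functions import col

# Lift each field of the openfda struct to a top-level column prefixed with "openfda_"
for c in df.select("openfda.*").columns:
    df = df.withColumn("openfda_" + c, col("openfda." + c))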
Code
df = df.select([c for c in df.columns if c !="openfda"])
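The effect of the flattening on a single row can be shown with plain dictionaries. Every field of the nested openfda struct is lifted to a top-level key prefixed with openfda_, and the original nested column is dropped. The pure-Python sketch below uses a made-up record and a hypothetical helper, flatten_struct. It differs from Spark in one respect: when the struct is missing, the helper adds no keys, whereas Spark yields null in every lifted column.

```python
def flatten_struct(row, struct_col):
    """Hypothetical helper: lift the fields of row[struct_col] to top-level
    keys prefixed with struct_col + "_", and drop struct_col itself."""
    flat = {k: v for k, v in row.items() if k != struct_col}
    nested = row.get(struct_col) or {}
    for field, value in nested.items():
        flat[struct_col + "_" + field] = value
    return flat


# Made-up record; the field names are illustrative only
row = {
    "city": "South San Francisco",
    "openfda": {"brand_name": ["Brand A"], "route": ["ORAL", "TOPICAL"]},
}
print(flatten_struct(row, "openfda"))
# {'city': 'South San Francisco', 'openfda_brand_name': ['Brand A'], 'openfda_route': ['ORAL', 'TOPICAL']}
```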
Code
df.printSchema()
Code
df.head(2)
Note that head() displays the DataFrame as a list of Row objects rather than as a table. This is hard to read here, since the openfda_* columns contain arrays of varying length.
Missing data
One strategy is to remove rows with missing data. dropna has several options, explained below.
Code
df.dropna().count()
If we remove all rows with at least one missing value, we end up with an empty dataframe!
Code
df.dropna(how='all').count()
dropna accepts the following arguments:
how: can be 'any' or 'all'. If 'any', rows containing any null values will be dropped entirely (this is the default). If 'all', only rows which are entirely empty will be dropped.
thresh: accepts an integer; rows with fewer than thresh non-null values are dropped. thresh is a middle ground between how='any' and how='all', and when it is given, it overrides how.
subset: accepts a list of column names. When subset is given, null values are only checked in the listed columns.
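The semantics of the three arguments can be emulated on a list of dictionaries. This is a pure-Python sketch with a hypothetical helper, dropna_rows, and made-up rows. It illustrates the rules above and is not Spark's implementation. For example, it ignores the fact that Spark also treats NaN as missing in floating-point columns.

```python
def dropna_rows(rows, how="any", thresh=None, subset=None):
    """Hypothetical helper mimicking DataFrame.dropna on a list of dicts."""
    kept = []
    for row in rows:
        cols = subset if subset is not None else list(row)
        non_null = sum(row[c] is not None for c in cols)
        if thresh is not None:
            keep = non_null >= thresh   # thresh overrides how
        elif how == "any":
            keep = non_null == len(cols)  # no null at all in cols
        elif how == "all":
            keep = non_null > 0         # at least one non-null in cols
        else:
            raise ValueError("how must be 'any' or 'all'")
        if keep:
            kept.append(row)
    return kept


rows = [
    {"city": "Boston", "state": "MA", "termination_date": None},
    {"city": None, "state": "TX", "termination_date": None},
    {"city": None, "state": None, "termination_date": None},
]

print(len(dropna_rows(rows)))                    # 0: every row has a null
print(len(dropna_rows(rows, how="all")))         # 2: only the all-null row goes
print(len(dropna_rows(rows, thresh=2)))          # 1: needs >= 2 non-null values
print(len(dropna_rows(rows, subset=["state"])))  # 2: only 'state' is checked
```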
But before this, let’s count the number of missing values in each column.
Code
import pyspark.sql.functions as fn
from pyspark.sql.functions import col

# For each column we create a new column containing 1 if the value is null and 0 otherwise.
# We need to cast Boolean to Int so that we can use fn.sum afterwards
for c in df.columns:
    # Do not do this for _isnull columns (in case you run this cell twice...)
    if not c.endswith("_isnull"):
        df = df.withColumn(c + "_isnull", fn.isnull(col(c)).cast('int'))
Code
df.head()
Code
# Get the list of _isnull columns
isnull_columns = [c for c in df.columns if c.endswith("_isnull")]

# On the _isnull columns:
# - we compute the sum to have the number of null values and rename the column
# - convert to pandas for better readability
# - transpose the pandas dataframe for better readability
missing_values = df.select(isnull_columns)\
    .agg(*[fn.sum(c).alias(c.replace("_isnull", "")) for c in isnull_columns])\
    .toPandas()

missing_values.T\
    .rename({0: "missing values"}, axis="columns")
We see that more_code_info is always null and that termination_date is often null. Most of the openfda_* columns are also almost always empty.
We can keep only the columns with no missing values
Code
# This line can seem complicated: run each piece separately to understand it
kept_columns = list(
    missing_values.columns[(missing_values.iloc[0] == 0).values]
)
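The counting and selection logic of the last few cells can be written in plain Python, without Spark or pandas. Count the nulls per column, then keep the columns whose count is zero. The rows below are made up, and the names null_counts and kept are illustrative only.

```python
# Made-up rows standing in for the dataframe
rows = [
    {"city": "Boston", "more_code_info": None, "termination_date": "20140101"},
    {"city": "Austin", "more_code_info": None, "termination_date": None},
    {"city": "Denver", "more_code_info": None, "termination_date": None},
]

columns = list(rows[0])

# Equivalent of summing the *_isnull indicator columns
null_counts = {c: sum(1 for r in rows if r[c] is None) for c in columns}
print(null_counts)  # {'city': 0, 'more_code_info': 3, 'termination_date': 2}

# Equivalent of missing_values.columns[(missing_values.iloc[0] == 0).values]
kept = [c for c in columns if null_counts[c] == 0]
print(kept)  # ['city']
```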
Code
df_kept = df.select(kept_columns)
Code
df_kept.head(2)
Code
df_kept.printSchema()
Code
df_kept.count()
Filtering by string values
Cases from South San Francisco
Code
df.filter(df.city == "South San Francisco")\
    .toPandas()
Remark. Once again, we use .toPandas() to pretty-format the results in the notebook. But it is a BAD idea to do this if the Spark DataFrame is large, since, like collect(), it brings all the rows back to the driver.
Aside from filtering strings by an exact match, there are plenty of other powerful ways to filter by strings in PySpark:
df.filter(df.city.contains('San Francisco')): returns rows where strings of a column contain a provided substring. In our example, filtering by rows which contain the substring “San Francisco” would be a good way to get all rows in San Francisco, instead of just “South San Francisco”.
df.filter(df.city.startswith('San')): Returns rows where a string starts with a provided substring.
df.filter(df.city.endswith('ice')): Returns rows where a string ends with a provided substring.
df.filter(df.city.isNull()): Returns rows where values in a provided column are null.
df.filter(df.city.isNotNull()): Opposite of the above.
df.filter(df.city.like('San%')): Filters with an SQL LIKE pattern (% matches any sequence of characters, _ a single character).
df.filter(df.city.rlike('[A-Z]*ice$')): Performs a regexp filter.
df.filter(df.city.isin('San Francisco', 'Los Angeles')): Looks for rows where the string value of a column matches any of the provided strings exactly.
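The following plain-Python sketch shows what each predicate keeps, using made-up city names. like_to_regex is a hypothetical helper that translates an SQL LIKE pattern into a regular expression that must match the whole string. It ignores LIKE escape characters. re.search stands in for rlike, which matches the pattern anywhere in the string unless it is anchored.

```python
import re


def like_to_regex(pattern):
    """Hypothetical helper: SQL LIKE pattern -> Python regex (whole-string match).
    % matches any sequence of characters, _ exactly one; escapes are ignored."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")
        elif ch == "_":
            parts.append(".")
        else:
            parts.append(re.escape(ch))
    return "".join(parts)


cities = ["San Francisco", "South San Francisco", "San Jose", "Venice", None]
present = [c for c in cities if c is not None]  # isNotNull()

print([c for c in present if "San Francisco" in c])   # contains
print([c for c in present if c.startswith("San")])    # startswith
print([c for c in present if c.endswith("ice")])      # endswith
print([c for c in present
       if re.fullmatch(like_to_regex("San%"), c, flags=re.DOTALL)])  # like
print([c for c in present if re.search("[A-Z]*ice$", c)])           # rlike
print([c for c in present if c in ("San Francisco", "Los Angeles")])  # isin
```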
In addition to filtering by strings, we can also filter by columns where the values are stored as dates or datetimes (or strings that can be inferred as dates). Perhaps the most useful way to filter dates is by using the between() method, which allows us to find results within a certain date range. Here we find all the results which were reported in the years 2013 and 2014:
Code
df.filter(df.city == "South San Francisco")\
    .filter(df.report_date.between('2013-01-01 00:00:00', '2015-03-11 00:00:00'))\
    .toPandas()
Caution
Is Spark smart enough to understand that the string in column report_date contains a date?
Code
df.filter(df.city == "South San Francisco")\
    .filter(df.center_classification_date.between('2013-01-01 00:00:00', '2013-12-31 00:00:00'))\
    .toPandas()
Code
df_dates = df.select([c for c in df.columns if c.endswith("date")])
df_dates.printSchema()
Code
df_dates.show(5)
Columns are not dates (DateType) but strings (StringType). When comparing report_date with '2013-01-01 00:00:00' and '2015-03-11 00:00:00', we are comparing strings lexicographically, and we are lucky. In Unicode, '-' < '0' < '1' < … < '9', so '2013-…' is less than any string starting with '20130', while any string starting with '2013' is less than any string starting with '2015'.
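The luck runs out at the upper bound. Suppose, as the comparison above suggests, that the dates are stored as YYYYMMDD strings. Then every value from 2015 compares greater than '2015-03-11 00:00:00', because '0' > '-' at the fifth character. The query above therefore effectively keeps 2013 and 2014 only. The standard-library sketch below uses made-up values and contrasts string comparison with parsing the dates first.

```python
from datetime import datetime

LOWER, UPPER = "2013-01-01 00:00:00", "2015-03-11 00:00:00"
FMT_BOUND, FMT_VALUE = "%Y-%m-%d %H:%M:%S", "%Y%m%d"  # FMT_VALUE is an assumption


def between_as_strings(value):
    """What between() amounts to on a string column: lexicographic order."""
    return LOWER <= value <= UPPER


def between_as_dates(value):
    """Parse first, then compare actual datetimes."""
    moment = datetime.strptime(value, FMT_VALUE)
    return datetime.strptime(LOWER, FMT_BOUND) <= moment <= datetime.strptime(UPPER, FMT_BOUND)


for value in ["20130611", "20141231", "20150310"]:
    print(value, between_as_strings(value), between_as_dates(value))
# 20130611 True True
# 20141231 True True
# 20150310 False True   <- March 10, 2015 is silently excluded
```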
Caution
If some field in a JSON string is meant to represent a date or a datetime object, Spark should be given a hint.
JSON loaders (from Python) as well as the Spark JSON reader have optional arguments that can be used for this: json.loads() accepts an object_hook, while the Spark reader accepts an explicit schema together with the dateFormat and timestampFormat options.
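On the Python side, dates can be parsed while decoding by passing an object_hook to json.loads. The sketch below is minimal and makes two assumptions, both labeled hypothetical: the naming rule (keys ending in _date) and the YYYYMMDD format. The record is made up.

```python
import json
from datetime import date, datetime


def parse_dates(obj):
    """object_hook converting values of keys ending in "_date" from YYYYMMDD
    strings to datetime.date (naming rule and format are assumptions)."""
    for key, value in list(obj.items()):
        if key.endswith("_date") and isinstance(value, str):
            obj[key] = datetime.strptime(value, "%Y%m%d").date()
    return obj


text = '{"city": "South San Francisco", "report_date": "20130611", "termination_date": null}'
record = json.loads(text, object_hook=parse_dates)

print(record["report_date"])                     # 2013-06-11
print(record["termination_date"])                # None: nulls are left alone
print(record["report_date"] > date(2013, 1, 1))  # True, now a genuine date comparison
```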
Handling complex types
Bridging the gap between tabular and semi-structured data.
Note
SQL, R, Pandas …
struct, array, map
Code
# struct
The problems we faced after loading data from the JSON file stemmed from the fact that column openfda was of the complex StructType() type. We shall revisit this dataframe.
Code
df = spark.read.json(filename, multiLine=True)
The dataframe schema df.schema, which is of type StructType (defined in pyspark.sql.types), can be converted to a JSON string, which in turn can be converted into a Python dictionary.
Code
import json

sj = json.loads(df.schema.json())
[(f['name'], f['type']) for f in sj['fields'] if not isinstance(f['type'], str)]
Column openfda has type StructType(), and its fields are themselves of composite type.
Code
{f['type']['type'] for f in sj['fields'] if not isinstance(f['type'], str)}
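The following is a standard-library sketch of how the dictionary obtained from the schema is structured. The schema below is a hand-written, hypothetical excerpt in the format produced by StructType.json(). Simple types appear as plain strings such as "string". Composite types appear as nested dictionaries whose "type" key is "struct", "array" or "map". This is why isinstance(f['type'], str) separates the two kinds of fields.

```python
import json

# Hypothetical excerpt, in the format produced by df.schema.json()
schema_json = json.dumps({
    "type": "struct",
    "fields": [
        {"name": "city", "type": "string", "nullable": True, "metadata": {}},
        {"name": "openfda", "nullable": True, "metadata": {},
         "type": {"type": "struct", "fields": [
             {"name": "brand_name", "nullable": True, "metadata": {},
              "type": {"type": "array", "elementType": "string", "containsNull": True}},
         ]}},
    ],
})

sj = json.loads(schema_json)

# Simple types are plain strings; composite types are nested dictionaries
complex_fields = [(f["name"], f["type"]) for f in sj["fields"]
                  if not isinstance(f["type"], str)]
print([name for name, _ in complex_fields])  # ['openfda']
print({f["type"]["type"] for f in sj["fields"]
       if not isinstance(f["type"], str)})   # {'struct'}
```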
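Projecting on row_id and openfda.* leads to a (partially) flattened dataframe which, thanks to the row_id column, can be joined with the original dataframe. The row_id column has to be added first, for instance with pyspark.sql.functions.monotonically_increasing_id().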