Using PySpark for data preprocessing

Data description

The data is a Parquet file containing a dataframe with 8 columns:

  • xid: unique user id
  • action: type of action: ‘C’ is a click, ‘O’ or ‘VSL’ is a web display
  • date: date of the action
  • website_id: unique id of the website
  • url: url of the webpage
  • category_id: id of the display
  • zipcode: postal zipcode of the user
  • device: type of device used by the user

Q1. Some statistics / computations

Using pyspark.sql, we want to do the following:

  1. Compute the total number of unique users
  2. Construct a column containing the total number of actions per user
  3. Construct a column containing the number of days since the last action of the user
  4. Construct a column containing the number of actions of each user for each modality of device

Q2. Binary classification

Then, we want to construct a classifier to predict clicks on category 1204. Here is the agenda:

  1. Construct a features matrix in which each row gathers the information about one user.
  2. In this matrix, keep only the users that have been exposed to a display in category 1204.
  3. Using this training dataset, train a binary classifier and evaluate it with a precision/recall curve computed on test data.

Download/read the data and take a first look at it

Code
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
Spark in local mode
Code
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

spark = (SparkSession
    .builder
    .appName("Spark Webdata")
    .getOrCreate()
)
25/01/18 14:18:40 WARN Utils: Your hostname, boucheron-Precision-5480 resolves to a loopback address: 127.0.1.1; using 192.168.10.120 instead (on interface wlp0s20f3)
25/01/18 14:18:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/18 14:18:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
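
As the banner suggests, these startup warnings can be silenced by raising the log level (a one-liner, assuming the session created above):

Code
spark.sparkContext.setLogLevel("ERROR")  # keep only error-level messages from now on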
Code
import requests, zipfile, io
from pathlib import Path

path = Path('webdata.parquet')
if not path.exists():
    url = "https://stephanegaiffas.github.io/big_data_course/data/webdata.parquet.zip"
    r = requests.get(url)
    r.raise_for_status()  # fail early if the download did not succeed
    z = zipfile.ZipFile(io.BytesIO(r.content))
    z.extractall(path='./')
Code
input_path = './'

input_file = os.path.join(input_path, 'webdata.parquet')

df = spark.read.parquet(input_file)
Note

We can also give the pyarrow.parquet module a try, loading the Parquet file into an Arrow table.

Code
import pyarrow as pa
import pyarrow.parquet as pq

dfa = pq.read_table(input_file)
Code
dfa.num_columns
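
If the Arrow table loads correctly, a few other pyarrow.Table accessors are handy for a quick look (a sketch, not run here):

Code
print(dfa.schema)            # column names and Arrow types
print(dfa.num_rows)          # row count, without converting to pandas
dfa.slice(0, 5).to_pandas()  # peek at the first five rows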
Warning

Let us go back to the Spark dataframe.

Code
df.printSchema()
Code
df.rdd.getNumPartitions()
Question

Explain the partition size.

Code
df.rdd.toDebugString()
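
As a hint for the question above: the partition count of a Parquet read is driven by the file layout of the directory and by Spark's split size. A sketch of the relevant knobs, with their usual defaults:

Code
spark.conf.get("spark.sql.files.maxPartitionBytes")  # max bytes per partition (128 MB by default)
spark.sparkContext.defaultParallelism                # parallelism hint used when splitting files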

Basic statistics

First we need to import some things:

  • Window class
  • SQL functions module
  • Some very useful functions
  • Spark types
Code
from pyspark.sql import Window
import pyspark.sql.functions as func
from pyspark.sql.types import *
from pyspark.sql.functions import col, lit

Compute the total number of unique users

Code
( 
    df.select('xid')
      .distinct()
      .count()
)
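
An equivalent spelling uses the built-in aggregates; approx_count_distinct trades a small relative error (controlled by rsd) for speed on large data:

Code
df.select(func.countDistinct('xid')).show()
df.select(func.approx_count_distinct('xid', rsd=0.01)).show()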
Code
def foo(x):
    yield len(set(x))  # number of distinct values seen in this partition
Code
( df.rdd
    .map(lambda x : x.xid)
    .mapPartitions(foo)
    .collect()
)

Note that this yields one count per partition: a user appearing in several partitions is counted once in each, so the counts cannot simply be summed. It may also be demanding on computational resources.

Code
( 
    df.select('xid')
      .distinct() 
      .explain()
)
Note

The distinct values of xid seem to be evenly spread among the six files making up the parquet directory. Note that the last six partitions look empty.

Construct a column containing the total number of actions per user

Code
xid_partition = Window.partitionBy('xid')

n_events = func.count(col('action')).over(xid_partition)

df = df.withColumn('n_events', n_events)

df.head(n=2)
Code
( 
  df
    .groupBy('xid')
    .agg(func.count('action'))
    .head(5)
)

Construct a column containing the number of days since the last action of the user

Code
max_date = (
  func
    .max(col('date'))
    .over(xid_partition)
)

n_days_since_last_event = func.datediff(func.current_date(), max_date)

df = df.withColumn('n_days_since_last_event',
                   n_days_since_last_event)

df.head(n=2)
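
Note that func.current_date() makes this feature drift with the day the notebook runs. A more reproducible variant (a sketch, reusing max_date from above) anchors the computation to the most recent date present in the data:

Code
last_date = df.select(func.max('date')).first()[0]  # latest date in the whole dataset

df = df.withColumn('n_days_since_last_event',
                   func.datediff(lit(last_date), max_date))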
Code
df.printSchema()

Construct a column containing the number of actions of each user for each modality of device

Does this partitionBy trigger shuffling?

Code
# Note: WindowSpec.partitionBy replaces the partitioning, so chaining
# xid_partition.partitionBy('device') would partition by device alone.
xid_device_partition = Window.partitionBy('xid', 'device')

n_events_per_device = func.count(col('action')).over(xid_device_partition)

df = df.withColumn('n_events_per_device', n_events_per_device)

df.head(n=2)
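
One way to answer the question above is to look at the physical plan: an Exchange (hashpartitioning) node indicates a shuffle on the window's partition columns (a sketch):

Code
df.select('xid', 'device', 'n_events_per_device').explain()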

Number of devices per user

Code
# xid_partition = Window.partitionBy('xid')

rank_device = (
  func
    .dense_rank()
    .over(xid_partition.orderBy('device'))
)

n_unique_device = (
    func
      .last(rank_device)
      .over(xid_partition)
)

df = df.withColumn('n_device', n_unique_device)

df.head(n=2)
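
The dense_rank/last trick above counts distinct devices per user. A more direct spelling uses collect_set over the same window (a sketch; n_device_alt is a hypothetical column name):

Code
# size of the set of distinct devices seen for each xid
n_unique_device_alt = func.size(func.collect_set('device').over(xid_partition))

df.withColumn('n_device_alt', n_unique_device_alt).head(n=2)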
Code
df\
    .where(col('n_device') > 1)\
    .select('xid', 'device', 'n_events',  'n_device', 'n_events_per_device')\
    .head(n=8)
Code
df\
    .where(col('n_device') > 1)\
    .select('xid', 'device', 'n_events',  'n_device', 'n_events_per_device')\
    .count()

Let’s select the correct users and build a training dataset

We construct an ETL (Extract, Transform, Load) process on this data using the pyspark.sql API.

Extraction

Here, extraction is just about reading the data.

Code
df = spark.read.parquet(input_file)
df.head(n=3)

Transformation of the data

At this step, we compute many extra quantities from the data. The aim is to build features that describe users.

Code
def n_events_transformer(df):
    xid_partition = Window.partitionBy('xid')
    n_events = func.count(col('action')).over(xid_partition)
    
    df = df.withColumn('n_events', n_events)

    return df
Code
def n_events_per_action_transformer(df):
    xid_action_partition = Window.partitionBy('xid', 'action')
    n_events_per_action = func.count(col('action')).over(xid_action_partition)

    df = df.withColumn('n_events_per_action', n_events_per_action)
    
    return df
Code
def hour_transformer(df):
    hour = func.hour(col('date'))
    df = df.withColumn('hour', hour)
    return df

def weekday_transformer(df):
    weekday = func.date_format(col('date'), 'EEEE')
    df = df.withColumn('weekday', weekday)
    return df

def n_events_per_hour_transformer(df):
    xid_hour_partition = Window.partitionBy('xid', 'hour')
    n_events_per_hour = func.count(col('action')).over(xid_hour_partition)
    df = df.withColumn('n_events_per_hour', n_events_per_hour)
    return df

def n_events_per_weekday_transformer(df):
    xid_weekday_partition = Window.partitionBy('xid', 'weekday')
    n_events_per_weekday = func.count(col('action')).over(xid_weekday_partition)
    df = df.withColumn('n_events_per_weekday', n_events_per_weekday)
    return df

def n_days_since_last_event_transformer(df):
    xid_partition = Window.partitionBy('xid')
    max_date = func.max(col('date')).over(xid_partition)
    n_days_since_last_event = func.datediff(func.current_date(), max_date)
    df = df.withColumn('n_days_since_last_event',
                       n_days_since_last_event + lit(0.1))
    return df

def n_days_since_last_action_transformer(df):
    xid_partition_action = Window.partitionBy('xid', 'action')
    max_date = func.max(col('date')).over(xid_partition_action)
    n_days_since_last_action = func.datediff(func.current_date(),
                                                        max_date)
    df = df.withColumn('n_days_since_last_action',
                       n_days_since_last_action + lit(0.1))
    return df

def n_unique_day_transformer(df):
    xid_partition = Window.partitionBy('xid')
    dayofyear = func.dayofyear(col('date'))
    rank_day = func.dense_rank().over(xid_partition.orderBy(dayofyear))
    n_unique_day = func.last(rank_day).over(xid_partition)
    df = df.withColumn('n_unique_day', n_unique_day)
    return df

def n_unique_hour_transformer(df):
    xid_partition = Window.partitionBy('xid')
    rank_hour = func.dense_rank().over(xid_partition.orderBy('hour'))
    n_unique_hour = func.last(rank_hour).over(xid_partition)
    df = df.withColumn('n_unique_hour', n_unique_hour)
    return df

def n_events_per_device_transformer(df):
    xid_device_partition = Window.partitionBy('xid', 'device')
    n_events_per_device = func.count(func.col('device')) \
        .over(xid_device_partition)
    df = df.withColumn('n_events_per_device', n_events_per_device)
    return df

def n_unique_device_transformer(df):
    xid_partition = Window.partitionBy('xid')
    rank_device = func.dense_rank().over(xid_partition.orderBy('device'))
    n_unique_device = func.last(rank_device).over(xid_partition)
    df = df.withColumn('n_device', n_unique_device)
    return df

def n_actions_per_category_id_transformer(df):
    xid_category_id_partition = Window.partitionBy('xid', 'category_id',
                                                   'action')
    n_actions_per_category_id = func.count(func.col('action')) \
        .over(xid_category_id_partition)
    df = df.withColumn('n_actions_per_category_id', n_actions_per_category_id)
    return df

def n_unique_category_id_transformer(df):
    xid_partition = Window.partitionBy('xid')
    rank_category_id = func.dense_rank().over(xid_partition\
                                              .orderBy('category_id'))
    n_unique_category_id = func.last(rank_category_id).over(xid_partition)
    df = df.withColumn('n_unique_category_id', n_unique_category_id)
    return df

def n_events_per_category_id_transformer(df):
    xid_category_id_partition = Window.partitionBy('xid', 'category_id')
    n_events_per_category_id = func.count(func.col('action')) \
        .over(xid_category_id_partition)
    df = df.withColumn('n_events_per_category_id', n_events_per_category_id)
    return df

def n_events_per_website_id_transformer(df):
    xid_website_id_partition = Window.partitionBy('xid', 'website_id')
    n_events_per_website_id = func.count(col('action'))\
        .over(xid_website_id_partition)
    df = df.withColumn('n_events_per_website_id', n_events_per_website_id)
    return df
Code
transformers = [
    hour_transformer,
    weekday_transformer,
    n_events_per_hour_transformer,
    n_events_per_weekday_transformer,
    n_days_since_last_event_transformer,
    n_days_since_last_action_transformer,
    n_unique_day_transformer,
    n_unique_hour_transformer,
    n_events_per_device_transformer,
    n_unique_device_transformer,
    n_actions_per_category_id_transformer,
    n_events_per_category_id_transformer,
    n_events_per_website_id_transformer,
]
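
Since each transformer maps a dataframe to a dataframe, the whole pipeline can also be folded in one line with functools.reduce; this sketch is equivalent to the for-loops used below (df_transformed is a hypothetical name):

Code
from functools import reduce

# thread df through every transformer, left to right
df_transformed = reduce(lambda acc, transform: transform(acc), transformers, df)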
Code
N = 10000
Code
sample_df = df.sample(withReplacement=False, fraction=.05)
Code
sample_df.count()
Code
for transformer in transformers:
    df = transformer(df)

df.head(n=1)
Code
for transformer in transformers:
    sample_df = transformer(sample_df)

sample_df.head(n=1)
Code
df = sample_df
Code
sorted(df.columns)
Code
df.explain()
Code
spark.sparkContext.setCheckpointDir(".")

df = df.checkpoint()  # checkpoint() returns the checkpointed DataFrame
Code
df.explain()

Load step

Here, we use all the previous computations (saved in the columns of the dataframe) to compute aggregated information about each user.

Note

This should be DRYed: the loaders below all follow the same pattern and could be generated by a single factory function, as sketched below.
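
A minimal sketch of such a factory, assuming the naming conventions used by the loaders below (make_loader and its arguments are hypothetical names): value_col holds the feature value, and the values of key_cols are appended to the feature name.

Code
def make_loader(value_col, key_cols=()):
    """Build a loader: rename value_col to 'value', derive 'feature_name'."""
    def loader(df):
        csr = (df
               .select('xid', *key_cols, value_col)
               .withColumnRenamed(value_col, 'value')
               .distinct())
        parts = [lit(value_col)]
        for key in key_cols:
            parts += [lit('#'), col(key)]
        csr = csr.withColumn('feature_name', func.concat(*parts))
        for key in key_cols:
            csr = csr.drop(key)
        return csr
    return loader

# e.g. make_loader('n_events_per_hour', ('hour',)) behaves like
# n_events_per_hour_loader defined below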

Code
def n_events_per_hour_loader(df):
    csr = df\
        .select('xid', 'hour', 'n_events_per_hour')\
        .withColumnRenamed('n_events_per_hour', 'value')\
        .distinct()

    feature_name = func.concat(lit('n_events_per_hour#'), col('hour'))
    
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('hour')
    return csr

def n_events_per_weekday_loader(df):
    csr = df\
        .select('xid', 'weekday', 'n_events_per_weekday')\
        .withColumnRenamed('n_events_per_weekday', 'value')\
        .distinct()

    feature_name = func.concat(lit('n_events_per_weekday#'), col('weekday'))
    
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('weekday')

    return csr

def n_days_since_last_event_loader(df):
    csr = df.select('xid',  'n_days_since_last_event')\
        .withColumnRenamed('n_days_since_last_event', 'value')\
        .distinct()
    feature_name = lit('n_days_since_last_event')
    csr = csr\
        .withColumn('feature_name', feature_name)
    return csr

def n_days_since_last_action_loader(df):
    csr = df.select('xid', 'action', 'n_days_since_last_action')\
        .withColumnRenamed('n_days_since_last_action', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_days_since_last_action#'), col('action'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('action')
    return csr

def n_unique_day_loader(df):
    csr = df.select('xid', 'n_unique_day')\
        .withColumnRenamed('n_unique_day', 'value')\
        .distinct()
    feature_name = lit('n_unique_day')
    csr = csr\
        .withColumn('feature_name', feature_name)
    return csr

def n_unique_hour_loader(df):
    csr = df.select('xid', 'n_unique_hour')\
        .withColumnRenamed('n_unique_hour', 'value')\
        .distinct()
    feature_name = lit('n_unique_hour')
    csr = csr\
        .withColumn('feature_name', feature_name)
    return csr

def n_events_per_device_loader(df):
    csr = df\
        .select('xid', 'device', 'n_events_per_device')\
        .withColumnRenamed('n_events_per_device', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_device#'), col('device'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('device')
    return csr

def n_unique_device_loader(df):
    csr = df.select('xid', 'n_device')\
        .withColumnRenamed('n_device', 'value')\
        .distinct()
    feature_name = lit('n_device')
    csr = csr\
        .withColumn('feature_name', feature_name)
    return csr

def n_events_per_category_id_loader(df):
    csr = df.select('xid', 'category_id', 'n_events_per_category_id')\
        .withColumnRenamed('n_events_per_category_id', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_category_id#'),
                               col('category_id'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('category_id')
    return csr

def n_actions_per_category_id_loader(df):
    csr = df.select('xid', 'category_id', 'action', 'n_actions_per_category_id')\
        .withColumnRenamed('n_actions_per_category_id', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_actions_per_category_id#'),
                               col('action'), lit('#'), 
                               col('category_id'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('category_id')\
        .drop('action')
    return csr

def n_events_per_website_id_loader(df):
    csr = df.select('xid', 'website_id', 'n_events_per_website_id')\
        .withColumnRenamed('n_events_per_website_id', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_website_id#'),
                               col('website_id'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('website_id')
    return csr
Code
from functools import reduce
Code
# each loader appears exactly once: duplicates would duplicate rows in the union below
loaders = [
    n_events_per_hour_loader,
    n_events_per_weekday_loader,
    n_days_since_last_event_loader,
    n_days_since_last_action_loader,
    n_unique_day_loader,
    n_unique_hour_loader,
    n_events_per_device_loader,
    n_unique_device_loader,
    n_events_per_category_id_loader,
    n_actions_per_category_id_loader,
    n_events_per_website_id_loader,
]
Code
def union(df, other):
    return df.union(other)
About DataFrame.union()

This method is equivalent to UNION ALL in SQL: the rows of both DataFrame objects are concatenated, with no automatic deduplication of elements.

Use the distinct() method to perform deduplication of rows.

The method resolves columns by position (not by name), following the standard behavior in SQL.
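
Because union() matches columns by position, a safer variant when schemas might differ in column order is unionByName. For instance, once the list spam of per-loader dataframes is built below, the reduction could read (a sketch):

Code
csr = reduce(lambda df1, df2: df1.unionByName(df2), spam)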

Code
spam = [loader(df) for loader in loaders]
Code
spam[0].printSchema()
Code
all(spam[0].columns == it.columns for it in spam[1:])
Code
len(spam)
Code
csr = reduce(
    lambda df1, df2: df1.union(df2),
    spam
)

csr.head(n=3)
Code
csr.columns
Code
csr.show(5)
Code
csr.rdd.getNumPartitions()
Code
# Replace feature names and xid by a unique number.
# Caution: Window().orderBy(...) without partitionBy gathers all rows into
# a single partition, which can be costly on larger data.
feature_name_partition = Window().orderBy('feature_name')

xid_partition = Window().orderBy('xid')

col_idx = func.dense_rank().over(feature_name_partition)
row_idx = func.dense_rank().over(xid_partition)
Code
csr = csr.withColumn('col', col_idx)\
    .withColumn('row', row_idx)

csr = csr.na.drop('any')

csr.head(n=5)
Code
# Let's save the result of our hard work into a new parquet file
output_path = './'
output_file = os.path.join(output_path, 'csr.parquet')
csr.write.parquet(output_file, mode='overwrite')

Preparation of the training dataset

Code
csr_path = './'
csr_file = os.path.join(csr_path, 'csr.parquet')

df = spark.read.parquet(csr_file)
df.head(n=5)
Code
df.count()
Code
# What are the features related to category_id 1204?
features_names = \
    df.select('feature_name')\
    .distinct()\
    .toPandas()['feature_name']
Code
features_names
Code
[feature_name for feature_name in features_names if '1204' in feature_name]
Code
# Look for the xids that have at least one exposure to category 1204
keep = func.when(
    (col('feature_name') == 'n_actions_per_category_id#C#1204.0') |
    (col('feature_name') == 'n_actions_per_category_id#O#1204.0'),
    1).otherwise(0)
df = df.withColumn('keep', keep)

df.where(col('keep') > 0).count()
Code
# Sum of the keeps :)
xid_partition = Window.partitionBy('xid')
sum_keep = func.sum(col('keep')).over(xid_partition)
df = df.withColumn('sum_keep', sum_keep)
Code
# Let's keep the xid exposed to 1204
df = df.where(col('sum_keep') > 0)
Code
df.count()
Code
df.select('xid').distinct().count()
Code
row_partition = Window().orderBy('row')
col_partition = Window().orderBy('col')

row_new = func.dense_rank().over(row_partition)
col_new = func.dense_rank().over(col_partition)

df = df.withColumn('row_new', row_new)
df = df.withColumn('col_new', col_new)

# toPandas() collects the result on the driver; only do this once the
# data has been filtered down, as above
csr_data = df.select('row_new', 'col_new', 'value').toPandas()
Code
csr_data.head()
Code
features_names = df.select('feature_name', 'col_new').distinct()
features_names.where(col('feature_name') == 'n_actions_per_category_id#C#1204.0').head()
Code
features_names.where(col('feature_name') == 'n_actions_per_category_id#O#1204.0').head()
Code
from scipy.sparse import csr_matrix
import numpy as np

rows = csr_data['row_new'].values - 1  # dense_rank is 1-based, csr_matrix is 0-based
cols = csr_data['col_new'].values - 1
vals = csr_data['value'].values

X_csr = csr_matrix((vals, (rows, cols)))
Code
X_csr.shape
Code
X_csr.shape, X_csr.nnz
Code
X_csr.nnz / (X_csr.shape[0] * X_csr.shape[1])  # density: fraction of nonzero entries
Code
# The label vector. Let's make it dense, flat and binary
y = np.array(X_csr[:, 1].todense()).ravel()
y = np.array(y > 0, dtype=np.int64)
Code
X_csr.shape
Code
# We remove the second and fourth columns: they contain the label
# we want to predict.
kept_cols = list(range(X_csr.shape[1]))
kept_cols.pop(1)
kept_cols.pop(2)  # index 2 after the first pop, i.e. the original fourth column
X = X_csr[:, kept_cols]
Code
len(kept_cols)
Code
X_csr.shape, X.shape

Finally!

Wow! That was a lot of work. Now we have a feature matrix \(X\) and a vector of labels \(y\).

Code
X.indices
Code
X.indptr
Code
X.shape, X.nnz
Code
y.shape, y.sum()

Some learning from this data

Code
from sklearn.preprocessing import MaxAbsScaler
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression

# Normalize the features
X = MaxAbsScaler().fit_transform(X)
X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, test_size=0.3)

clf = LogisticRegression(
    penalty='l2',
    C=1e3,
    solver='lbfgs',
    class_weight='balanced'
)

clf.fit(X_train, y_train)
Code
features_names = features_names.toPandas()['feature_name']
Code
features_names[range(6)]
Code
import matplotlib.pyplot as plt
%matplotlib inline
Code
plt.figure(figsize=(16, 5))
plt.stem(clf.coef_[0]) # , use_line_collection=True)
plt.title('Logistic regression coefficients', fontsize=18)
Code
clf.coef_[0].shape[0]
Code
len(features_names)
Code
# Label the x axis ticks with the feature names
_ = plt.xticks(np.arange(clf.coef_[0].shape[0]), features_names, 
           rotation='vertical', fontsize=8)
Code
_ = plt.yticks(fontsize=14)

Code
from sklearn.metrics import precision_recall_curve, f1_score

precision, recall, _ = precision_recall_curve(y_test, clf.predict_proba(X_test)[:, 1])
    
plt.figure(figsize=(8, 6))
plt.plot(recall, precision, label='LR (F1=%.2f)' % f1_score(y_test, clf.predict(X_test)), lw=2)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('Recall', fontsize=16)
plt.ylabel('Precision', fontsize=16)
plt.title('Precision/recall curve', fontsize=18)
plt.legend(loc="upper right", fontsize=14)

Analyse the tables

Code
query = """ANALYZE TABLE db_table COMPUTE STATISTICS
            FOR COLUMNS xid"""
Code
df.createOrReplaceTempView("db_table")
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[90], line 1
----> 1 df.createOrReplaceTempView("db_table")

NameError: name 'df' is not defined
Code
df.columns
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[91], line 1
----> 1 df.columns

NameError: name 'df' is not defined
Code
spark.sql("cache table db_table")
Code
spark.sql(query)
Code
spark.sql("show tables")