Code
import os
import sys

# Make both the PySpark workers and the driver use this notebook's Python
# interpreter, avoiding driver/worker Python version-mismatch errors.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
Code
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

conf = SparkConf().setAppName("Spark SQL Course")

# Since Spark 2+, SparkSession is the single entry point and owns the
# SparkContext, so we must NOT instantiate SparkContext manually first:
# doing so makes the builder's settings a no-op because getOrCreate()
# silently reuses the already-running context.
spark = (SparkSession
    .builder
    .config(conf=conf)
    .getOrCreate()
)

# Expose the underlying SparkContext through the public accessor.
sc = spark.sparkContext
25/01/18 14:18:52 WARN Utils: Your hostname, boucheron-Precision-5480 resolves to a loopback address: 127.0.1.1; using 192.168.10.120 instead (on interface wlp0s20f3)
25/01/18 14:18:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/18 14:18:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Code
# Use the public accessor rather than the private attribute `spark._sc`.
sc = spark.sparkContext
Code
# Tiny demo RDD of (key, value) pairs.
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
Code
# Sum values per key; collect() pulls the result back to the driver.
rdd.reduceByKey(lambda a, b: a + b).collect()
[Stage 0:>                                                        (0 + 20) / 20][Stage 0:================================================>        (17 + 3) / 20]                                                                                
[('b', 1), ('a', 2)]
Code
import requests, zipfile, io
from pathlib import Path

# Download and extract the course dataset unless it is already present.
path = Path('webdata.parquet')
if not path.exists():
    url = "https://stephanegaiffas.github.io/big_data_course/data/webdata.parquet.zip"
    r = requests.get(url, timeout=60)
    # Fail fast on HTTP errors: without this, a 404/500 error page gets fed
    # to ZipFile and surfaces later as a confusing BadZipFile exception.
    r.raise_for_status()
    with zipfile.ZipFile(io.BytesIO(r.content)) as z:
        z.extractall(path='./')
---------------------------------------------------------------------------
BadZipFile                                Traceback (most recent call last)
Cell In[6], line 8
      6 url = "https://stephanegaiffas.github.io/big_data_course/data/webdata.parquet.zip"
      7 r = requests.get(url)
----> 8 z = zipfile.ZipFile(io.BytesIO(r.content))
      9 z.extractall(path='./')

File /usr/lib/python3.12/zipfile/__init__.py:1349, in ZipFile.__init__(self, file, mode, compression, allowZip64, compresslevel, strict_timestamps, metadata_encoding)
   1347 try:
   1348     if mode == 'r':
-> 1349         self._RealGetContents()
   1350     elif mode in ('w', 'x'):
   1351         # set the modified flag so central directory gets written
   1352         # even if no files are added to the archive
   1353         self._didModify = True

File /usr/lib/python3.12/zipfile/__init__.py:1416, in ZipFile._RealGetContents(self)
   1414     raise BadZipFile("File is not a zip file")
   1415 if not endrec:
-> 1416     raise BadZipFile("File is not a zip file")
   1417 if self.debug > 1:
   1418     print(endrec)

BadZipFile: File is not a zip file
Code
# Directory that should contain the extracted parquet file.
input_path = './'

input_file = os.path.join(input_path, 'webdata.parquet')
# Raises AnalysisException (PATH_NOT_FOUND) if the download step above failed.
df = spark.read.parquet(input_file)
---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
Cell In[7], line 4
      1 input_path = './'
      3 input_file = os.path.join(input_path, 'webdata.parquet')
----> 4 df = spark.read.parquet(input_file)

File ~/Documents/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/readwriter.py:544, in DataFrameReader.parquet(self, *paths, **options)
    533 int96RebaseMode = options.get("int96RebaseMode", None)
    534 self._set_opts(
    535     mergeSchema=mergeSchema,
    536     pathGlobFilter=pathGlobFilter,
   (...)
    541     int96RebaseMode=int96RebaseMode,
    542 )
--> 544 return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))

File ~/Documents/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File ~/Documents/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/errors/exceptions/captured.py:185, in capture_sql_exception.<locals>.deco(*a, **kw)
    181 converted = convert_exception(e.java_exception)
    182 if not isinstance(converted, UnknownException):
    183     # Hide where the exception came from that shows a non-Pythonic
    184     # JVM exception message.
--> 185     raise converted from None
    186 else:
    187     raise

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/home/boucheron/Documents/IFEBY310/core/notebooks/webdata.parquet.
Code
# Peek at the first 6 rows (returns a list of Row objects).
df.head(6)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[8], line 1
----> 1 df.head(6)

NameError: name 'df' is not defined
Code
# describe() returns a summary DataFrame; call .show() on it to display stats.
df.describe()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[9], line 1
----> 1 df.describe()

NameError: name 'df' is not defined
Code
# Total number of rows (events) in the dataset.
df.count()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[10], line 1
----> 1 df.count()

NameError: name 'df' is not defined

Basic statistics

First we need to import some things

Code
from pyspark.sql import Window
import pyspark.sql.functions as func
from pyspark.sql.types import *
from pyspark.sql.functions import col, lit

Compute the total number of unique users

Code
# Total number of unique users (distinct xid values).
df.select('xid').distinct().count()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[12], line 1
----> 1 df.select('xid').distinct().count()

NameError: name 'df' is not defined

Construct a column containing the total number of actions per user

Code
# Window over all rows of each user; count('action') over it gives the
# user's total number of events, repeated on each of that user's rows.
xid_partition = Window.partitionBy('xid')
n_events = func.count(col('action')).over(xid_partition)
df = df.withColumn('n_events', n_events)
df.head(n=2)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[13], line 3
      1 xid_partition = Window.partitionBy('xid')
      2 n_events = func.count(col('action')).over(xid_partition)
----> 3 df = df.withColumn('n_events', n_events)
      4 df.head(n=2)

NameError: name 'df' is not defined
Code
# Same per-user counts via groupBy/agg (one row per user, unlike the window).
df.groupBy('xid').agg(func.count('action')).head(5)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[14], line 1
----> 1 df.groupBy('xid').agg(func.count('action')).head(5)

NameError: name 'df' is not defined

Construct a column containing the number of days since the last action of the user

Code
# Most recent event date per user, then the number of days between that
# date and today (datediff of current_date vs the per-user max date).
xid_partition = Window.partitionBy('xid')
max_date = func.max(col('date')).over(xid_partition)
n_days_since_last_event = func.datediff(func.current_date(), max_date)
df = df.withColumn('n_days_since_last_event',
                   n_days_since_last_event)
df.head(n=2)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[15], line 4
      2 max_date = func.max(col('date')).over(xid_partition)
      3 n_days_since_last_event = func.datediff(func.current_date(), max_date)
----> 4 df = df.withColumn('n_days_since_last_event',
      5                    n_days_since_last_event)
      6 df.head(n=2)

NameError: name 'df' is not defined

Construct a column containing the number of actions of each user for each device type

Code
# Count events within each (user, device) pair.
xid_device_partition = Window.partitionBy('xid', 'device')
n_events_per_device = func.count(col('action')).over(xid_device_partition)
df = df.withColumn('n_events_per_device', n_events_per_device)
df.head(n=2)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[16], line 3
      1 xid_device_partition = Window.partitionBy('xid', 'device')
      2 n_events_per_device = func.count(col('action')).over(xid_device_partition)
----> 3 df = df.withColumn('n_events_per_device', n_events_per_device)
      4 df.head(n=2)

NameError: name 'df' is not defined

Number of devices per user: some mental gymnastics

Code
# Number of distinct devices per user: dense_rank over the device ordering
# assigns 1..k to the distinct devices inside the user partition, so the
# partition-wide MAX of that rank is exactly the distinct-device count.
# func.max is deterministic; func.last would depend on the (unspecified)
# row order inside the partition.
xid_partition = Window.partitionBy('xid')
rank_device = func.dense_rank().over(xid_partition.orderBy('device'))
n_unique_device = func.max(rank_device).over(xid_partition)
df = df.withColumn('n_device', n_unique_device)
df.head(n=2)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[17], line 4
      2 rank_device = func.dense_rank().over(xid_partition.orderBy('device'))
      3 n_unique_device = func.last(rank_device).over(xid_partition)
----> 4 df = df.withColumn('n_device', n_unique_device)
      5 df.head(n=2)

NameError: name 'df' is not defined
Code
# Users active on more than one device, with their per-device activity.
(df
    .where(col('n_device') > 1)
    .select('xid', 'device', 'n_events',  'n_device', 'n_events_per_device')
    .head(n=8))
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[18], line 1
----> 1 df\
      2     .where(col('n_device') > 1)\
      3     .select('xid', 'device', 'n_events',  'n_device', 'n_events_per_device')\
      4     .head(n=8)

NameError: name 'df' is not defined

Let’s select the correct users and build a training dataset

We construct a ETL (Extract Transform Load) process on this data using the pyspark.sql API.

Extraction

Extraction is easy here, it’s just about reading the data

Code
# Extraction step: re-read the raw parquet file from scratch.
df = spark.read.parquet(input_file)
df.head(n=3)
---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
Cell In[19], line 1
----> 1 df = spark.read.parquet(input_file)
      2 df.head(n=3)

File ~/Documents/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/readwriter.py:544, in DataFrameReader.parquet(self, *paths, **options)
    533 int96RebaseMode = options.get("int96RebaseMode", None)
    534 self._set_opts(
    535     mergeSchema=mergeSchema,
    536     pathGlobFilter=pathGlobFilter,
   (...)
    541     int96RebaseMode=int96RebaseMode,
    542 )
--> 544 return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))

File ~/Documents/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File ~/Documents/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/errors/exceptions/captured.py:185, in capture_sql_exception.<locals>.deco(*a, **kw)
    181 converted = convert_exception(e.java_exception)
    182 if not isinstance(converted, UnknownException):
    183     # Hide where the exception came from that shows a non-Pythonic
    184     # JVM exception message.
--> 185     raise converted from None
    186 else:
    187     raise

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/home/boucheron/Documents/IFEBY310/core/notebooks/webdata.parquet.

Transformation of the data

At this step we compute a lot of extra things from the data. The aim is to build features that describe users.

Code
def n_events_transformer(df):
    """Add 'n_events': total number of actions recorded for each user."""
    by_user = Window.partitionBy('xid')
    total = func.count(col('action')).over(by_user)
    return df.withColumn('n_events', total)

def n_events_per_action_transformer(df):
    """Add 'n_events_per_action': per-user count of each action type."""
    by_user_action = Window.partitionBy('xid', 'action')
    counts = func.count(col('action')).over(by_user_action)
    return df.withColumn('n_events_per_action', counts)

def hour_transformer(df):
    """Add 'hour': hour of day extracted from the 'date' column."""
    return df.withColumn('hour', func.hour(col('date')))

def weekday_transformer(df):
    """Add 'weekday': day-of-week name (date_format pattern 'EEEE')."""
    return df.withColumn('weekday', func.date_format(col('date'), 'EEEE'))

def n_events_per_hour_transformer(df):
    """Add 'n_events_per_hour': per-user event count in each hour slot.

    Requires the 'hour' column produced by hour_transformer.
    """
    by_user_hour = Window.partitionBy('xid', 'hour')
    return df.withColumn('n_events_per_hour',
                         func.count(col('action')).over(by_user_hour))

def n_events_per_weekday_transformer(df):
    """Add 'n_events_per_weekday': per-user event count for each weekday.

    Requires the 'weekday' column produced by weekday_transformer.
    """
    by_user_weekday = Window.partitionBy('xid', 'weekday')
    return df.withColumn('n_events_per_weekday',
                         func.count(col('action')).over(by_user_weekday))

def n_days_since_last_event_transformer(df):
    """Add 'n_days_since_last_event': days since the user's latest event.

    The +0.1 offset keeps the value strictly positive — presumably so the
    entry survives in the sparse matrix built later (TODO confirm intent).
    """
    xid_partition = Window.partitionBy('xid')
    max_date = func.max(col('date')).over(xid_partition)
    n_days_since_last_event = func.datediff(func.current_date(), max_date)
    df = df.withColumn('n_days_since_last_event',
                       n_days_since_last_event + lit(0.1))
    return df

def n_days_since_last_action_transformer(df):
    """Add 'n_days_since_last_action': days since the user's most recent
    occurrence of each action type (+0.1 offset, matching the event recency
    column — presumably to keep the value nonzero; TODO confirm intent)."""
    by_user_action = Window.partitionBy('xid', 'action')
    latest = func.max(col('date')).over(by_user_action)
    recency = func.datediff(func.current_date(), latest)
    return df.withColumn('n_days_since_last_action', recency + lit(0.1))

def n_unique_day_transformer(df):
    """Add 'n_unique_day': number of distinct active days per user.

    dense_rank assigns 1..k to the distinct day-of-year values inside the
    user partition, so the partition max of that rank is the distinct count.
    func.max replaces func.last: last() depends on the unspecified row order
    within the partition and is therefore non-deterministic.
    """
    xid_partition = Window.partitionBy('xid')
    dayofyear = func.dayofyear(col('date'))
    rank_day = func.dense_rank().over(xid_partition.orderBy(dayofyear))
    n_unique_day = func.max(rank_day).over(xid_partition)
    df = df.withColumn('n_unique_day', n_unique_day)
    return df

def n_unique_hour_transformer(df):
    """Add 'n_unique_hour': number of distinct active hours per user.

    func.max of the dense_rank is the distinct count and is deterministic,
    unlike func.last which depends on row order within the partition.
    """
    xid_partition = Window.partitionBy('xid')
    rank_hour = func.dense_rank().over(xid_partition.orderBy('hour'))
    n_unique_hour = func.max(rank_hour).over(xid_partition)
    df = df.withColumn('n_unique_hour', n_unique_hour)
    return df

def n_events_per_device_transformer(df):
    """Add 'n_events_per_device': per-user event count on each device."""
    by_user_device = Window.partitionBy('xid', 'device')
    counts = func.count(func.col('device')).over(by_user_device)
    return df.withColumn('n_events_per_device', counts)

def n_unique_device_transformer(df):
    """Add 'n_device': number of distinct devices per user.

    func.max of the dense_rank is the distinct count and is deterministic,
    unlike func.last which depends on row order within the partition.
    """
    xid_partition = Window.partitionBy('xid')
    rank_device = func.dense_rank().over(xid_partition.orderBy('device'))
    n_unique_device = func.max(rank_device).over(xid_partition)
    df = df.withColumn('n_device', n_unique_device)
    return df

def n_actions_per_category_id_transformer(df):
    """Add 'n_actions_per_category_id': count per (user, category, action)."""
    by_user_cat_action = Window.partitionBy('xid', 'category_id', 'action')
    counts = func.count(func.col('action')).over(by_user_cat_action)
    return df.withColumn('n_actions_per_category_id', counts)

def n_unique_category_id_transformer(df):
    """Add 'n_unique_category_id': number of distinct category_id per user.

    func.max of the dense_rank is the distinct count and is deterministic,
    unlike func.last which depends on row order within the partition.
    """
    xid_partition = Window.partitionBy('xid')
    rank_category_id = func.dense_rank().over(xid_partition
                                              .orderBy('category_id'))
    n_unique_category_id = func.max(rank_category_id).over(xid_partition)
    df = df.withColumn('n_unique_category_id', n_unique_category_id)
    return df

def n_events_per_category_id_transformer(df):
    """Add 'n_events_per_category_id': per-user event count per category."""
    by_user_category = Window.partitionBy('xid', 'category_id')
    counts = func.count(func.col('action')).over(by_user_category)
    return df.withColumn('n_events_per_category_id', counts)

def n_events_per_website_id_transformer(df):
    """Add 'n_events_per_website_id': per-user event count per website."""
    by_user_website = Window.partitionBy('xid', 'website_id')
    counts = func.count(col('action')).over(by_user_website)
    return df.withColumn('n_events_per_website_id', counts)
Code
# Order matters: hour_transformer and weekday_transformer must run before
# the transformers that partition on the 'hour' / 'weekday' columns they add.
transformers = [
    hour_transformer,
    weekday_transformer,
    n_events_per_hour_transformer,
    n_events_per_weekday_transformer,
    n_days_since_last_event_transformer,
    n_days_since_last_action_transformer,
    n_unique_day_transformer,
    n_unique_hour_transformer,
    n_events_per_device_transformer,
    n_unique_device_transformer,
    n_actions_per_category_id_transformer,
    n_events_per_category_id_transformer,
    n_events_per_website_id_transformer,
]

# Apply each transformer in sequence; each returns a new DataFrame with one
# extra feature column (lazy — nothing is computed until the head() below).
for transformer in transformers:
    df = transformer(df)

df.head(n=1)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[21], line 18
      1 transformers = [
      2     hour_transformer,
      3     weekday_transformer,
   (...)
     14     n_events_per_website_id_transformer,
     15 ]
     17 for transformer in transformers:
---> 18     df = transformer(df)
     20 df.head(n=1)

NameError: name 'df' is not defined
Code
# All columns after the transform step, alphabetically.
sorted(df.columns)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[22], line 1
----> 1 sorted(df.columns)

NameError: name 'df' is not defined

Load step

Here, we use all the previous computations (saved in the columns of the dataframe) to compute aggregated information about each user.

Code
def n_events_per_hour_loader(df):
    """Emit (xid, value, feature_name) rows for the per-hour event counts."""
    tag = func.concat(lit('n_events_per_hour#'), col('hour'))
    return (
        df.select('xid', 'hour', 'n_events_per_hour')
          .withColumnRenamed('n_events_per_hour', 'value')
          .distinct()
          .withColumn('feature_name', tag)
          .drop('hour')
    )

def n_events_per_website_id_loader(df):
    """Emit (xid, value, feature_name) rows for per-website event counts.

    Bug fix: the rename must target 'n_events_per_website_id' — the original
    renamed 'n_events_per_hour' (a copy-paste slip), which silently no-ops
    because that column is not in the selection, so no 'value' column was
    ever produced.
    """
    csr = df.select('xid', 'website_id', 'n_events_per_website_id')\
        .withColumnRenamed('n_events_per_website_id', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_website_id#'),
                               col('website_id'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('website_id')
    return csr

# NOTE(review): this is a verbatim duplicate of the n_events_per_hour_loader
# defined earlier in this cell; it simply rebinds the same name and could be
# removed.
def n_events_per_hour_loader(df):
    """Emit (xid, value, feature_name) rows for the per-hour event counts."""
    csr = df\
        .select('xid', 'hour', 'n_events_per_hour')\
        .withColumnRenamed('n_events_per_hour', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_hour#'), col('hour'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('hour')
    return csr

def n_events_per_weekday_loader(df):
    """Emit (xid, value, feature_name) rows for per-weekday event counts."""
    tag = func.concat(lit('n_events_per_weekday#'), col('weekday'))
    return (
        df.select('xid', 'weekday', 'n_events_per_weekday')
          .withColumnRenamed('n_events_per_weekday', 'value')
          .distinct()
          .withColumn('feature_name', tag)
          .drop('weekday')
    )

def n_days_since_last_event_loader(df):
    """Emit (xid, value, feature_name) rows for the event-recency feature.

    Bug fix: the original renamed 'n_days_since_last_event#' (stray '#');
    withColumnRenamed silently no-ops on a nonexistent column, so the
    'value' column was never created.
    """
    csr = df.select('xid',  'n_days_since_last_event')\
        .withColumnRenamed('n_days_since_last_event', 'value')\
        .distinct()
    feature_name = lit('n_days_since_last_event')
    csr = csr\
        .withColumn('feature_name', feature_name)
    return csr

def n_days_since_last_action_loader(df):
    """Emit (xid, value, feature_name) rows for per-action recency."""
    tag = func.concat(lit('n_days_since_last_action#'), col('action'))
    return (
        df.select('xid', 'action', 'n_days_since_last_action')
          .withColumnRenamed('n_days_since_last_action', 'value')
          .distinct()
          .withColumn('feature_name', tag)
          .drop('action')
    )

def n_unique_day_loader(df):
    """Emit (xid, value, feature_name) rows for the active-day count."""
    return (
        df.select('xid', 'n_unique_day')
          .withColumnRenamed('n_unique_day', 'value')
          .distinct()
          .withColumn('feature_name', lit('n_unique_day'))
    )

def n_unique_hour_loader(df):
    """Emit (xid, value, feature_name) rows for the active-hour count."""
    return (
        df.select('xid', 'n_unique_hour')
          .withColumnRenamed('n_unique_hour', 'value')
          .distinct()
          .withColumn('feature_name', lit('n_unique_hour'))
    )

def n_events_per_device_loader(df):
    """Emit (xid, value, feature_name) rows for per-device event counts."""
    tag = func.concat(lit('n_events_per_device#'), col('device'))
    return (
        df.select('xid', 'device', 'n_events_per_device')
          .withColumnRenamed('n_events_per_device', 'value')
          .distinct()
          .withColumn('feature_name', tag)
          .drop('device')
    )

def n_unique_device_loader(df):
    """Emit (xid, value, feature_name) rows for the device count."""
    return (
        df.select('xid', 'n_device')
          .withColumnRenamed('n_device', 'value')
          .distinct()
          .withColumn('feature_name', lit('n_device'))
    )

def n_events_per_category_id_loader(df):
    """Emit (xid, value, feature_name) rows for per-category event counts."""
    tag = func.concat(lit('n_events_per_category_id#'),
                      col('category_id'))
    return (
        df.select('xid', 'category_id', 'n_events_per_category_id')
          .withColumnRenamed('n_events_per_category_id', 'value')
          .distinct()
          .withColumn('feature_name', tag)
          .drop('category_id')
    )

def n_actions_per_category_id_loader(df):
    """Emit (xid, value, feature_name) rows, one per (action, category)."""
    tag = func.concat(lit('n_actions_per_category_id#'),
                      col('action'), lit('#'),
                      col('category_id'))
    return (
        df.select('xid', 'category_id', 'action', 'n_actions_per_category_id')
          .withColumnRenamed('n_actions_per_category_id', 'value')
          .distinct()
          .withColumn('feature_name', tag)
          .drop('category_id')
          .drop('action')
    )

# NOTE(review): this redefines n_events_per_website_id_loader, rebinding the
# name over the earlier definition in this cell; keep only one of the two.
def n_events_per_website_id_loader(df):
    """Emit (xid, value, feature_name) rows for per-website event counts."""
    csr = df.select('xid', 'website_id', 'n_events_per_website_id')\
        .withColumnRenamed('n_events_per_website_id', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_website_id#'),
                               col('website_id'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('website_id')
    return csr
Code
from functools import reduce

# One loader per feature family; each produces a (xid, value, feature_name)
# dataframe.  Bug fix: the original list contained n_events_per_hour_loader
# and n_events_per_website_id_loader TWICE, which would duplicate every one
# of their rows in the stacked output.
loaders = [
    n_events_per_hour_loader,
    n_events_per_website_id_loader,
    n_events_per_weekday_loader,
    n_days_since_last_event_loader,
    n_days_since_last_action_loader,
    n_unique_day_loader,
    n_unique_hour_loader,
    n_events_per_device_loader,
    n_unique_device_loader,
    n_events_per_category_id_loader,
    n_actions_per_category_id_loader,
]

def union(df, other):
    """Stack two dataframes with identical (xid, value, feature_name) schemas."""
    return df.union(other)

# Stack the output of every loader into one long sparse-triplet table.
csr = reduce(union, [loader(df) for loader in loaders])

csr.head(n=3)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[24], line 24
     19 def union(df, other):
     20     return df.union(other)
     22 csr = reduce(
     23     lambda df1, df2: df1.union(df2),
---> 24     [loader(df) for loader in loaders]
     25 )
     27 csr.head(n=3)

NameError: name 'df' is not defined
Code
# Schema of the stacked loader output: xid, value, feature_name.
csr.columns
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[25], line 1
----> 1 csr.columns

NameError: name 'csr' is not defined
Code
# Total number of (user, feature) entries across all loaders.
csr.count()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[26], line 1
----> 1 csr.count()

NameError: name 'csr' is not defined
Code
# Replace features names and xid by a unique number
# NOTE(review): Window().orderBy(...) with no partitionBy forces all data
# into a single partition — fine for a course dataset, slow at scale.
feature_name_partition = Window().orderBy('feature_name')
xid_partition = Window().orderBy('xid')

# dense_rank gives consecutive 1-based indices per distinct value.
col_idx = func.dense_rank().over(feature_name_partition)
row_idx = func.dense_rank().over(xid_partition)

csr = csr.withColumn('col', col_idx)\
    .withColumn('row', row_idx)

# Drop rows with any null (e.g. features whose 'value' is missing).
csr = csr.na.drop('any')

csr.head(n=5)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[27], line 8
      5 col_idx = func.dense_rank().over(feature_name_partition)
      6 row_idx = func.dense_rank().over(xid_partition)
----> 8 csr = csr.withColumn('col', col_idx)\
      9     .withColumn('row', row_idx)
     11 csr = csr.na.drop('any')
     13 csr.head(n=5)

NameError: name 'csr' is not defined
Code
# Let's save the result of our hard work into a new parquet file
output_path = './'
output_file = os.path.join(output_path, 'csr.parquet')
# mode='overwrite' replaces any previous run's output.
csr.write.parquet(output_file, mode='overwrite')
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[28], line 4
      2 output_path = './'
      3 output_file = os.path.join(output_path, 'csr.parquet')
----> 4 csr.write.parquet(output_file, mode='overwrite')

NameError: name 'csr' is not defined

Preparation of the training dataset

Code
# Reload the saved sparse-triplet table (xid, value, feature_name, col, row).
csr_path = './'
csr_file = os.path.join(csr_path, 'csr.parquet')

df = spark.read.parquet(csr_file)
df.head(n=5)
---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
Cell In[29], line 4
      1 csr_path = './'
      2 csr_file = os.path.join(csr_path, 'csr.parquet')
----> 4 df = spark.read.parquet(csr_file)
      5 df.head(n=5)

File ~/Documents/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/readwriter.py:544, in DataFrameReader.parquet(self, *paths, **options)
    533 int96RebaseMode = options.get("int96RebaseMode", None)
    534 self._set_opts(
    535     mergeSchema=mergeSchema,
    536     pathGlobFilter=pathGlobFilter,
   (...)
    541     int96RebaseMode=int96RebaseMode,
    542 )
--> 544 return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))

File ~/Documents/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File ~/Documents/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/errors/exceptions/captured.py:185, in capture_sql_exception.<locals>.deco(*a, **kw)
    181 converted = convert_exception(e.java_exception)
    182 if not isinstance(converted, UnknownException):
    183     # Hide where the exception came from that shows a non-Pythonic
    184     # JVM exception message.
--> 185     raise converted from None
    186 else:
    187     raise

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/home/boucheron/Documents/IFEBY310/core/notebooks/csr.parquet.
Code
# Number of entries in the reloaded sparse-triplet table.
df.count()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[30], line 1
----> 1 df.count()

NameError: name 'df' is not defined
Code
# What are the features related to campaign_id 1204 ?
# Pull the distinct feature names to the driver as a pandas Series.
features_names = \
    df.select('feature_name')\
    .distinct()\
    .toPandas()['feature_name']
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[31], line 3
      1 # What are the features related to campaign_id 1204 ?
      2 features_names = \
----> 3     df.select('feature_name')\
      4     .distinct()\
      5     .toPandas()['feature_name']

NameError: name 'df' is not defined
Code
# Display all distinct feature names.
features_names
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[32], line 1
----> 1 features_names

NameError: name 'features_names' is not defined
Code
# Feature names that mention campaign/category 1204.
[feature_name for feature_name in features_names if '1204' in feature_name]
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[33], line 1
----> 1 [feature_name for feature_name in features_names if '1204' in feature_name]

NameError: name 'features_names' is not defined
Code
# Look for the xid that have at least one exposure to campaign 1204
# Flag each row: 1 if it is one of the two 1204 action features, else 0.
keep = func.when(
    (col('feature_name') == 'n_actions_per_category_id#C#1204.0') |
    (col('feature_name') == 'n_actions_per_category_id#O#1204.0'),
    1).otherwise(0)
df = df.withColumn('keep', keep)

df.where(col('keep') > 0).count()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[34], line 6
      1 # Look for the xid that have at least one exposure to campaign 1204
      2 keep = func.when(
      3     (col('feature_name') == 'n_actions_per_category_id#C#1204.0') |
      4     (col('feature_name') == 'n_actions_per_category_id#O#1204.0'),
      5     1).otherwise(0)
----> 6 df = df.withColumn('keep', keep)
      8 df.where(col('keep') > 0).count()

NameError: name 'df' is not defined
Code
# Sum of the keeps :)
# Per-user sum of the keep flags: > 0 iff the user touched campaign 1204.
xid_partition = Window.partitionBy('xid')
sum_keep = func.sum(col('keep')).over(xid_partition)
df = df.withColumn('sum_keep', sum_keep)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[35], line 4
      2 xid_partition = Window.partitionBy('xid')
      3 sum_keep = func.sum(col('keep')).over(xid_partition)
----> 4 df = df.withColumn('sum_keep', sum_keep)

NameError: name 'df' is not defined
Code
# Let's keep the xid exposed to 1204
df = df.where(col('sum_keep') > 0)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[36], line 2
      1 # Let's keep the xid exposed to 1204
----> 2 df = df.where(col('sum_keep') > 0)

NameError: name 'df' is not defined
Code
# Rows remaining after the exposure filter.
df.count()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[37], line 1
----> 1 df.count()

NameError: name 'df' is not defined
Code
# Number of distinct users exposed to campaign 1204.
df.select('xid').distinct().count()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[38], line 1
----> 1 df.select('xid').distinct().count()

NameError: name 'df' is not defined
Code
# Re-index rows/columns densely (1..n) after filtering, so the sparse matrix
# has no empty rows or columns, then collect the triplets as pandas.
# NOTE(review): Window().orderBy without partitionBy funnels all data through
# one partition — acceptable here, slow on large data.
row_partition = Window().orderBy('row')
col_partition = Window().orderBy('col')
row_new = func.dense_rank().over(row_partition)
col_new = func.dense_rank().over(col_partition)
df = df.withColumn('row_new', row_new)
df = df.withColumn('col_new', col_new)
csr_data = df.select('row_new', 'col_new', 'value').toPandas()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[39], line 5
      3 row_new = func.dense_rank().over(row_partition)
      4 col_new = func.dense_rank().over(col_partition)
----> 5 df = df.withColumn('row_new', row_new)
      6 df = df.withColumn('col_new', col_new)
      7 csr_data = df.select('row_new', 'col_new', 'value').toPandas()

NameError: name 'df' is not defined
Code
# First few (row_new, col_new, value) coordinate triplets.
csr_data.head()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[40], line 1
----> 1 csr_data.head()

NameError: name 'csr_data' is not defined
Code
# Map each feature name to its (1-based) column index in the matrix,
# then look up the column of the 'C' 1204 feature.
features_names = df.select('feature_name', 'col_new').distinct()
features_names.where(col('feature_name') == 'n_actions_per_category_id#C#1204.0').head()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[41], line 1
----> 1 features_names = df.select('feature_name', 'col_new').distinct()
      2 features_names.where(col('feature_name') == 'n_actions_per_category_id#C#1204.0').head()

NameError: name 'df' is not defined
Code
# Column index of the 'O' 1204 feature.
features_names.where(col('feature_name') == 'n_actions_per_category_id#O#1204.0').head()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[42], line 1
----> 1 features_names.where(col('feature_name') == 'n_actions_per_category_id#O#1204.0').head()

NameError: name 'features_names' is not defined
Code
from scipy.sparse import csr_matrix
import numpy as np

# dense_rank is 1-based, so shift row/column indices down to 0-based
# before handing them to scipy.
row_idx = csr_data['row_new'].values - 1
col_idx = csr_data['col_new'].values - 1
X_csr = csr_matrix((csr_data['value'].values, (row_idx, col_idx)))
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[43], line 4
      1 from scipy.sparse import csr_matrix
      2 import numpy as np
----> 4 rows = csr_data['row_new'].values - 1
      5 cols = csr_data['col_new'].values - 1
      6 vals = csr_data['value'].values

NameError: name 'csr_data' is not defined
Code
# Dimensions of the assembled sparse matrix: (n_rows, n_cols).
X_csr.shape
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[44], line 1
----> 1 X_csr.shape

NameError: name 'X_csr' is not defined
Code
# Shape plus the number of explicitly stored non-zero entries.
X_csr.shape, X_csr.nnz
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[45], line 1
----> 1 X_csr.shape, X_csr.nnz

NameError: name 'X_csr' is not defined
Code
# Sparsity: fraction of non-zero entries. Use the matrix's own shape
# instead of the hard-coded dimensions (152347 x 92) so this cell stays
# correct if the upstream data changes.
X_csr.nnz / (X_csr.shape[0] * X_csr.shape[1])
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[46], line 1
----> 1 X_csr.nnz / (152347 * 92)

NameError: name 'X_csr' is not defined
Code
# The label vector: densify column 1, flatten it, then binarize
# (1 where the stored value is positive, 0 elsewhere).
label_col = np.asarray(X_csr[:, 1].todense()).ravel()
y = (label_col > 0).astype(np.int64)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[47], line 2
      1 # The label vector. Let's make it dense, flat and binary
----> 2 y = np.array(X_csr[:, 1].todense()).ravel()
      3 y = np.array(y > 0, dtype=np.int64)

NameError: name 'X_csr' is not defined
Code
# Re-check the matrix dimensions (building y did not modify X_csr).
X_csr.shape
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[48], line 1
----> 1 X_csr.shape

NameError: name 'X_csr' is not defined
Code
# Drop columns 1 and 3 (the second and fourth): they contain the label
# we want to predict, so they must not leak into the feature matrix.
# (The original did this with two successive pop() calls, where the
# second index shifted after the first removal.)
kept_cols = [j for j in range(92) if j not in (1, 3)]
X = X_csr[:, kept_cols]
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[49], line 6
      4 kept_cols.pop(1)
      5 kept_cols.pop(2)
----> 6 X = X_csr[:, kept_cols]

NameError: name 'X_csr' is not defined
Code
# Note: slicing returned a new matrix X; X_csr keeps all its columns.
X_csr.shape
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[50], line 1
----> 1 X_csr.shape

NameError: name 'X_csr' is not defined

Finally!

Wow! That was a lot of work. Now we have a feature matrix \(X\) and a vector of labels \(y\).

Code
# CSR internals: the column index of each stored value.
X.indices
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[51], line 1
----> 1 X.indices

NameError: name 'X' is not defined
Code
# CSR internals: offsets into `indices`/`data` marking where each row starts.
X.indptr
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[52], line 1
----> 1 X.indptr

NameError: name 'X' is not defined
Code
# Shape and non-zero count of the final feature matrix.
X.shape, X.nnz
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[53], line 1
----> 1 X.shape, X.nnz

NameError: name 'X' is not defined
Code
# Number of samples, and number of positive labels (y is binary 0/1).
y.shape, y.sum()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[54], line 1
----> 1 y.shape, y.sum()

NameError: name 'y' is not defined

Some machine learning on this data

Code
from sklearn.preprocessing import MaxAbsScaler
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression

# Scale each feature to [-1, 1] (MaxAbsScaler preserves sparsity), then
# hold out 30% of the rows, stratified so both splits keep the label ratio.
X = MaxAbsScaler().fit_transform(X)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, stratify=y, test_size=0.3
)

# Weakly regularized (large C) L2 logistic regression; 'balanced' class
# weights compensate for label imbalance.
clf = LogisticRegression(penalty='l2', C=1e3, solver='lbfgs',
                         class_weight='balanced')
clf.fit(X_train, y_train)
---------------------------------------------------------------------------
ModuleNotFoundError                       Traceback (most recent call last)
Cell In[55], line 1
----> 1 from sklearn.preprocessing import MaxAbsScaler
      2 from sklearn.model_selection import train_test_split
      3 from sklearn.linear_model import LogisticRegression

ModuleNotFoundError: No module named 'sklearn'
Code
# Pull the feature names back to the driver as a pandas Series.
# NOTE(review): .distinct() gives no ordering guarantee — confirm the
# Series order actually matches the matrix's col_new indices.
features_names = features_names.toPandas()['feature_name']
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[56], line 1
----> 1 features_names = features_names.toPandas()['feature_name']

NameError: name 'features_names' is not defined
Code
import matplotlib.pyplot as plt
%matplotlib inline

plt.figure(figsize=(16, 5))
plt.stem(clf.coef_[0], use_line_collection=True)
plt.title('Logistic regression coefficients', fontsize=18)
# We change the fontsize of minor ticks label
_ = plt.xticks(np.arange(clf.coef_[0].shape[0]), features_names, 
           rotation='vertical', fontsize=8)
_ = plt.yticks(fontsize=14)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[57], line 5
      2 get_ipython().run_line_magic('matplotlib', 'inline')
      4 plt.figure(figsize=(16, 5))
----> 5 plt.stem(clf.coef_[0], use_line_collection=True)
      6 plt.title('Logistic regression coefficients', fontsize=18)
      7 # We change the fontsize of minor ticks label

NameError: name 'clf' is not defined
<Figure size 1536x480 with 0 Axes>
Code
from sklearn.metrics import precision_recall_curve, f1_score

# Precision/recall trade-off of the fitted classifier on the held-out set.
scores = clf.predict_proba(X_test)[:, 1]
precision, recall, _ = precision_recall_curve(y_test, scores)

f1 = f1_score(y_test, clf.predict(X_test))
plt.figure(figsize=(8, 6))
plt.plot(recall, precision, label='LR (F1=%.2f)' % f1, lw=2)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('Recall', fontsize=16)
plt.ylabel('Precision', fontsize=16)
plt.title('Precision/recall curve', fontsize=18)
plt.legend(loc="upper right", fontsize=14)
---------------------------------------------------------------------------
ModuleNotFoundError                       Traceback (most recent call last)
Cell In[58], line 1
----> 1 from sklearn.metrics import precision_recall_curve, f1_score
      3 precision, recall, _ = precision_recall_curve(y_test, clf.predict_proba(X_test)[:, 1])
      5 plt.figure(figsize=(8, 6))

ModuleNotFoundError: No module named 'sklearn'