Code
import os
import sys

# Point both the Spark workers and the driver at the interpreter that is
# running this notebook.
for _var in ('PYSPARK_PYTHON', 'PYSPARK_DRIVER_PYTHON'):
    os.environ[_var] = sys.executable
Code
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

conf = SparkConf().setAppName("Spark SQL Course")
# getOrCreate() reuses a live context, so re-running this cell does not fail
# with "Cannot run multiple SparkContexts at once" as SparkContext(...) would.
# Not needed with Spark 3: the session builder below creates a context itself.
sc = SparkContext.getOrCreate(conf=conf)

# Entry point for the DataFrame / SQL API; reuses an existing session if any.
spark = (
    SparkSession
    .builder
    .appName("Spark SQL Course")
    .getOrCreate()
)
25/04/03 15:10:49 WARN Utils: Your hostname, boucheron-Precision-5480 resolves to a loopback address: 127.0.1.1; using 172.23.32.10 instead (on interface eth0)
25/04/03 15:10:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/03 15:10:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Code
# Use the public accessor rather than the private `_sc` attribute.
sc = spark.sparkContext
Code
# Distribute a small list of (key, value) pairs as an RDD.
pairs = [("a", 1), ("b", 1), ("a", 1)]
rdd = sc.parallelize(pairs)
Code
# Sum the values of each key, then collect the (key, total) pairs on the driver.
rdd.reduceByKey(lambda a, b: a + b).collect()
[Stage 0:>                                                        (0 + 20) / 20]                                                                                
[('b', 1), ('a', 2)]
Code
import requests, zipfile, io
from pathlib import Path

# Download and unpack the dataset only when it is not already on disk.
path = Path('webdata.parquet')
if not path.exists():
    url = "https://stephanegaiffas.github.io/big_data_course/data/webdata.parquet.zip"
    # Bound the wait and fail on HTTP errors, so that an error page is never
    # handed to ZipFile as if it were the archive.
    r = requests.get(url, timeout=60)
    r.raise_for_status()
    # The context manager closes the archive once extraction is done.
    with zipfile.ZipFile(io.BytesIO(r.content)) as z:
        z.extractall(path='./')
Code
# Directory and full path of the parquet dataset.
input_path = './'
input_file = os.path.join(input_path, 'webdata.parquet')

# Load the parquet data as a Spark DataFrame.
df = spark.read.format('parquet').load(input_file)
Code
# Fetch the first six rows to the driver as a list of Row objects.
df.head(6)
[Row(xid='001ff9b6-5383-4221-812d-58c2c3f234cc', action='O', date=datetime.datetime(2017, 1, 25, 7, 2, 18), website_id='3', url='http://www.8chances.com/grille', category_id=1002.0, zipcode='11370', device='SMP'),
 Row(xid='0056ab7a-3cba-4ed5-a495-3d4abf79ab66', action='O', date=datetime.datetime(2016, 12, 28, 9, 47, 8), website_id='54', url='http://www.salaire-brut-en-net.fr/differences-brut-net/', category_id=1002.0, zipcode='86000', device='DSK'),
 Row(xid='005ae4ab-363a-41a0-b8f9-faee47d622a4', action='O', date=datetime.datetime(2017, 1, 27, 22, 21, 6), website_id='74', url='http://www.realite-virtuelle.com/top-applications-horreur-vr-halloween', category_id=1002.0, zipcode='49700', device='DSK'),
 Row(xid='006f867c-70cb-41f0-82af-f3688fa719c5', action='O', date=datetime.datetime(2016, 12, 20, 12, 45, 14), website_id='43', url='http://www.frenchblues.fr/', category_id=1002.0, zipcode='42660', device='DSK'),
 Row(xid='006f867c-70cb-41f0-82af-f3688fa719c5', action='O', date=datetime.datetime(2016, 12, 20, 12, 56, 50), website_id='43', url='http://www.frenchblues.fr/', category_id=1002.0, zipcode='42660', device='DSK'),
 Row(xid='006f867c-70cb-41f0-82af-f3688fa719c5', action='O', date=datetime.datetime(2016, 12, 20, 12, 56, 53), website_id='43', url='http://www.frenchblues.fr/contact/', category_id=1002.0, zipcode='42660', device='DSK')]
Code
# describe() only builds a DataFrame of summary statistics; the numbers are not
# displayed until an action such as .show() is called on the result.
df.describe()
DataFrame[summary: string, xid: string, action: string, website_id: string, url: string, category_id: string, zipcode: string, device: string]
Code
# Total number of rows (events) in the dataset.
df.count()
1179532

Basic statistics

First we need to import some things

Code
from pyspark.sql import Window
import pyspark.sql.functions as func
from pyspark.sql.types import *
from pyspark.sql.functions import col, lit

Compute the total number of unique users

Code
# Keep only the user id, drop duplicate ids, and count what is left.
df.select('xid').distinct().count()
473761

Construct a column containing the total number of actions per user

Code
# One window per user: all rows sharing an xid receive the same aggregate.
xid_partition = Window.partitionBy('xid')
# Window count of the user's actions, attached to each of the user's rows.
n_events = func.count('action').over(xid_partition)
df = df.withColumn('n_events', n_events)
df.head(n=2)
[Row(xid='0006cea7-1679-4264-bdef-0cd089749ede', action='O', date=datetime.datetime(2016, 12, 26, 13, 41, 8), website_id='51', url='https://www.footlegende.fr/mercato-psg-coutinho-10166', category_id=1002.0, zipcode='34290', device='TAB', n_events=1),
 Row(xid='000893c8-a14b-4f33-858f-210440f37def', action='O', date=datetime.datetime(2016, 12, 23, 16, 18, 37), website_id='56', url='http://blague.dumatin.fr/', category_id=1002.0, zipcode=None, device='DSK', n_events=1)]
Code
# Per-user action count through a groupBy: yields one row per xid.
df.groupBy('xid').agg(func.count('action')).head(5)
[Row(xid='001c4a21-52c6-4890-b6ce-2b9d4ba06a56', count(action)=1),
 Row(xid='0024344b-7ee2-4fcd-a0b4-bec26d8c8b0e', count(action)=4),
 Row(xid='004564e3-87c1-4e16-ad2c-0e96afc3d617', count(action)=1),
 Row(xid='006d807f-91c3-415a-bb5e-6b9f7e6517a1', count(action)=1),
 Row(xid='006e0463-b24c-4996-84ab-d6d0d65a52aa', count(action)=1)]

Construct a column containing the number of days since the last action of the user

Code
xid_partition = Window.partitionBy('xid')
# Most recent event date of each user.
max_date = func.max('date').over(xid_partition)
# Days elapsed between that date and today; the value depends on the run date.
n_days_since_last_event = func.datediff(func.current_date(), max_date)
df = df.withColumn('n_days_since_last_event', n_days_since_last_event)
df.head(n=2)
[Row(xid='0006cea7-1679-4264-bdef-0cd089749ede', action='O', date=datetime.datetime(2016, 12, 26, 13, 41, 8), website_id='51', url='https://www.footlegende.fr/mercato-psg-coutinho-10166', category_id=1002.0, zipcode='34290', device='TAB', n_events=1, n_days_since_last_event=3020),
 Row(xid='000893c8-a14b-4f33-858f-210440f37def', action='O', date=datetime.datetime(2016, 12, 23, 16, 18, 37), website_id='56', url='http://blague.dumatin.fr/', category_id=1002.0, zipcode=None, device='DSK', n_events=1, n_days_since_last_event=3023)]

Construct a column containing the number of actions of each user for each modality of device

Code
# One window per (user, device) pair.
xid_device_partition = Window.partitionBy('xid', 'device')
# Number of actions of the user on that device.
n_events_per_device = func.count('action').over(xid_device_partition)
df = df.withColumn('n_events_per_device', n_events_per_device)
df.head(n=2)
[Row(xid='0006cea7-1679-4264-bdef-0cd089749ede', action='O', date=datetime.datetime(2016, 12, 26, 13, 41, 8), website_id='51', url='https://www.footlegende.fr/mercato-psg-coutinho-10166', category_id=1002.0, zipcode='34290', device='TAB', n_events=1, n_days_since_last_event=3020, n_events_per_device=1),
 Row(xid='000893c8-a14b-4f33-858f-210440f37def', action='O', date=datetime.datetime(2016, 12, 23, 16, 18, 37), website_id='56', url='http://blague.dumatin.fr/', category_id=1002.0, zipcode=None, device='DSK', n_events=1, n_days_since_last_event=3023, n_events_per_device=1)]

Number of devices per user: some mental gymnastics

Code
xid_partition = Window.partitionBy('xid')
# dense_rank over a user's devices numbers the distinct devices 1..k.
rank_device = func.dense_rank().over(xid_partition.orderBy('device'))
# The largest rank is the number of distinct devices. max() does not depend on
# row order, whereas last() over an unordered window is non-deterministic.
n_unique_device = func.max(rank_device).over(xid_partition)
df = df.withColumn('n_device', n_unique_device)
df.head(n=2)
[Row(xid='0006cea7-1679-4264-bdef-0cd089749ede', action='O', date=datetime.datetime(2016, 12, 26, 13, 41, 8), website_id='51', url='https://www.footlegende.fr/mercato-psg-coutinho-10166', category_id=1002.0, zipcode='34290', device='TAB', n_events=1, n_days_since_last_event=3020, n_events_per_device=1, n_device=1),
 Row(xid='000893c8-a14b-4f33-858f-210440f37def', action='O', date=datetime.datetime(2016, 12, 23, 16, 18, 37), website_id='56', url='http://blague.dumatin.fr/', category_id=1002.0, zipcode=None, device='DSK', n_events=1, n_days_since_last_event=3023, n_events_per_device=1, n_device=1)]
Code
# Rows of users seen on more than one device, restricted to the count columns.
(
    df
    .filter(col('n_device') > 1)
    .select('xid', 'device', 'n_events', 'n_device', 'n_events_per_device')
    .head(8)
)
[Row(xid='4c1dc79d-a140-4da9-ae28-540b4503c3b8', device='DSK', n_events=6, n_device=2, n_events_per_device=5),
 Row(xid='4c1dc79d-a140-4da9-ae28-540b4503c3b8', device='DSK', n_events=6, n_device=2, n_events_per_device=5),
 Row(xid='4c1dc79d-a140-4da9-ae28-540b4503c3b8', device='DSK', n_events=6, n_device=2, n_events_per_device=5),
 Row(xid='4c1dc79d-a140-4da9-ae28-540b4503c3b8', device='DSK', n_events=6, n_device=2, n_events_per_device=5),
 Row(xid='4c1dc79d-a140-4da9-ae28-540b4503c3b8', device='DSK', n_events=6, n_device=2, n_events_per_device=5),
 Row(xid='4c1dc79d-a140-4da9-ae28-540b4503c3b8', device='MOB', n_events=6, n_device=2, n_events_per_device=1),
 Row(xid='78156cdf-7229-46eb-bb6b-92d384f9a6fa', device='DSK', n_events=6, n_device=2, n_events_per_device=5),
 Row(xid='78156cdf-7229-46eb-bb6b-92d384f9a6fa', device='DSK', n_events=6, n_device=2, n_events_per_device=5)]

Let’s select the correct users and build a training dataset

We construct an ETL (Extract, Transform, Load) process on this data using the pyspark.sql API.

Extraction

Extraction is easy here, it’s just about reading the data

Code
# Extraction: start again from the raw parquet data.
df = spark.read.format('parquet').load(input_file)
df.head(3)
[Row(xid='001ff9b6-5383-4221-812d-58c2c3f234cc', action='O', date=datetime.datetime(2017, 1, 25, 7, 2, 18), website_id='3', url='http://www.8chances.com/grille', category_id=1002.0, zipcode='11370', device='SMP'),
 Row(xid='0056ab7a-3cba-4ed5-a495-3d4abf79ab66', action='O', date=datetime.datetime(2016, 12, 28, 9, 47, 8), website_id='54', url='http://www.salaire-brut-en-net.fr/differences-brut-net/', category_id=1002.0, zipcode='86000', device='DSK'),
 Row(xid='005ae4ab-363a-41a0-b8f9-faee47d622a4', action='O', date=datetime.datetime(2017, 1, 27, 22, 21, 6), website_id='74', url='http://www.realite-virtuelle.com/top-applications-horreur-vr-halloween', category_id=1002.0, zipcode='49700', device='DSK')]

Transformation of the data

At this step we compute a lot of extra things from the data. The aim is to build features that describe users.

Code
def n_events_transformer(df):
    """Add ``n_events``: the count of ``action`` values per ``xid``.

    The count is a window aggregate, so every input row is kept and all rows
    sharing an ``xid`` receive the same value.
    """
    per_user = Window.partitionBy('xid')
    return df.withColumn('n_events', func.count('action').over(per_user))

def n_events_per_action_transformer(df):
    """Add ``n_events_per_action``: the count of rows per (``xid``, ``action``) pair."""
    per_user_action = Window.partitionBy('xid', 'action')
    count_in_group = func.count('action').over(per_user_action)
    return df.withColumn('n_events_per_action', count_in_group)

def hour_transformer(df):
    """Add ``hour``: the hour component extracted from the ``date`` column."""
    return df.withColumn('hour', func.hour('date'))

def weekday_transformer(df):
    """Add ``weekday``: ``date`` formatted with the ``'EEEE'`` pattern (e.g. ``'Monday'``)."""
    return df.withColumn('weekday', func.date_format('date', 'EEEE'))

def n_events_per_hour_transformer(df):
    """Add ``n_events_per_hour``: the count of ``action`` per (``xid``, ``hour``).

    Requires an ``hour`` column on ``df``.
    """
    per_user_hour = Window.partitionBy('xid', 'hour')
    count_in_group = func.count('action').over(per_user_hour)
    return df.withColumn('n_events_per_hour', count_in_group)

def n_events_per_weekday_transformer(df):
    """Add ``n_events_per_weekday``: the count of ``action`` per (``xid``, ``weekday``).

    Requires a ``weekday`` column on ``df``.
    """
    per_user_weekday = Window.partitionBy('xid', 'weekday')
    count_in_group = func.count('action').over(per_user_weekday)
    return df.withColumn('n_events_per_weekday', count_in_group)

def n_days_since_last_event_transformer(df):
    """Add ``n_days_since_last_event`` for each ``xid``.

    The value is the number of days between the current date and the user's
    latest ``date``, plus a constant 0.1. It therefore depends on the day the
    job is run.
    """
    per_user = Window.partitionBy('xid')
    latest = func.max('date').over(per_user)
    elapsed_days = func.datediff(func.current_date(), latest)
    return df.withColumn('n_days_since_last_event', elapsed_days + lit(0.1))

def n_days_since_last_action_transformer(df):
    """Add ``n_days_since_last_action`` for each (``xid``, ``action``) pair.

    The value is the number of days between the current date and the latest
    ``date`` of that pair, plus a constant 0.1. It depends on the run date.
    """
    per_user_action = Window.partitionBy('xid', 'action')
    latest = func.max('date').over(per_user_action)
    elapsed_days = func.datediff(func.current_date(), latest)
    return df.withColumn('n_days_since_last_action', elapsed_days + lit(0.1))

def n_unique_day_transformer(df):
    """Add ``n_unique_day``: the number of distinct calendar days with events per ``xid``.

    The days of a user are dense-ranked, and the highest rank is broadcast to
    all of the user's rows.
    """
    xid_partition = Window.partitionBy('xid')
    # Rank on the full calendar date: dayofyear() alone would merge the same
    # day of two different years into one rank.
    day = func.to_date(col('date'))
    rank_day = func.dense_rank().over(xid_partition.orderBy(day))
    # The highest dense rank equals the number of distinct days. max() is
    # order-independent, unlike last() over an unordered window.
    n_unique_day = func.max(rank_day).over(xid_partition)
    df = df.withColumn('n_unique_day', n_unique_day)
    return df

def n_unique_hour_transformer(df):
    """Add ``n_unique_hour``: the number of distinct ``hour`` values per ``xid``.

    Requires an ``hour`` column on ``df``.
    """
    xid_partition = Window.partitionBy('xid')
    rank_hour = func.dense_rank().over(xid_partition.orderBy('hour'))
    # The highest dense rank equals the number of distinct hours. max() is
    # order-independent, unlike last() over an unordered window.
    n_unique_hour = func.max(rank_hour).over(xid_partition)
    df = df.withColumn('n_unique_hour', n_unique_hour)
    return df

def n_events_per_device_transformer(df):
    """Add ``n_events_per_device``: the count of ``device`` values per (``xid``, ``device``).

    Note that the counted column is ``device`` itself, not ``action``.
    """
    per_user_device = Window.partitionBy('xid', 'device')
    count_in_group = func.count('device').over(per_user_device)
    return df.withColumn('n_events_per_device', count_in_group)

def n_unique_device_transformer(df):
    """Add ``n_device``: the number of distinct ``device`` values per ``xid``."""
    xid_partition = Window.partitionBy('xid')
    rank_device = func.dense_rank().over(xid_partition.orderBy('device'))
    # The highest dense rank equals the number of distinct devices. max() is
    # order-independent, unlike last() over an unordered window.
    n_unique_device = func.max(rank_device).over(xid_partition)
    df = df.withColumn('n_device', n_unique_device)
    return df

def n_actions_per_category_id_transformer(df):
    """Add ``n_actions_per_category_id``: the count of ``action`` per
    (``xid``, ``category_id``, ``action``) triple.
    """
    per_user_category_action = Window.partitionBy('xid', 'category_id', 'action')
    count_in_group = func.count('action').over(per_user_category_action)
    return df.withColumn('n_actions_per_category_id', count_in_group)

def n_unique_category_id_transformer(df):
    """Add ``n_unique_category_id``: the number of distinct ``category_id`` values per ``xid``."""
    xid_partition = Window.partitionBy('xid')
    rank_category_id = func.dense_rank().over(
        xid_partition.orderBy('category_id'))
    # The highest dense rank equals the number of distinct categories. max()
    # is order-independent, unlike last() over an unordered window.
    n_unique_category_id = func.max(rank_category_id).over(xid_partition)
    df = df.withColumn('n_unique_category_id', n_unique_category_id)
    return df

def n_events_per_category_id_transformer(df):
    """Add ``n_events_per_category_id``: the count of ``action`` per (``xid``, ``category_id``)."""
    per_user_category = Window.partitionBy('xid', 'category_id')
    count_in_group = func.count('action').over(per_user_category)
    return df.withColumn('n_events_per_category_id', count_in_group)

def n_events_per_website_id_transformer(df):
    """Add ``n_events_per_website_id``: the count of ``action`` per (``xid``, ``website_id``)."""
    per_user_website = Window.partitionBy('xid', 'website_id')
    count_in_group = func.count('action').over(per_user_website)
    return df.withColumn('n_events_per_website_id', count_in_group)
Code
# Order matters: hour_transformer and weekday_transformer create the 'hour'
# and 'weekday' columns that later transformers partition or order by.
transformers = [
    hour_transformer,
    weekday_transformer,
    n_events_per_hour_transformer,
    n_events_per_weekday_transformer,
    n_days_since_last_event_transformer,
    n_days_since_last_action_transformer,
    n_unique_day_transformer,
    n_unique_hour_transformer,
    n_events_per_device_transformer,
    n_unique_device_transformer,
    n_actions_per_category_id_transformer,
    n_events_per_category_id_transformer,
    n_events_per_website_id_transformer,
]

# Apply the transformers one after another; each one adds a column to df.
for transformer in transformers:
    df = transformer(df)

df.head(n=1)
[Stage 35:>                                                         (0 + 1) / 1]                                                                                
[Row(xid='0006cea7-1679-4264-bdef-0cd089749ede', action='O', date=datetime.datetime(2016, 12, 26, 13, 41, 8), website_id='51', url='https://www.footlegende.fr/mercato-psg-coutinho-10166', category_id=1002.0, zipcode='34290', device='TAB', hour=13, weekday='Monday', n_events_per_hour=1, n_events_per_weekday=1, n_days_since_last_event=3020.1, n_days_since_last_action=3020.1, n_unique_day=1, n_unique_hour=1, n_events_per_device=1, n_device=1, n_actions_per_category_id=1, n_events_per_category_id=1, n_events_per_website_id=1)]
Code
# Alphabetical list of the columns now present on df.
sorted(df.columns)
['action',
 'category_id',
 'date',
 'device',
 'hour',
 'n_actions_per_category_id',
 'n_days_since_last_action',
 'n_days_since_last_event',
 'n_device',
 'n_events_per_category_id',
 'n_events_per_device',
 'n_events_per_hour',
 'n_events_per_website_id',
 'n_events_per_weekday',
 'n_unique_day',
 'n_unique_hour',
 'url',
 'website_id',
 'weekday',
 'xid',
 'zipcode']

Load step

Here, we use all the previous computations (saved in the columns of the dataframe) to compute aggregated information about each user.

Code
def n_events_per_hour_loader(df):
    """Return distinct (``xid``, ``value``, ``feature_name``) rows for events per hour.

    ``value`` is ``n_events_per_hour`` and ``feature_name`` is
    ``'n_events_per_hour#'`` followed by the hour.
    """
    per_hour = (
        df.select('xid', 'hour', col('n_events_per_hour').alias('value'))
        .distinct()
    )
    label = func.concat(lit('n_events_per_hour#'), col('hour'))
    return per_hour.select('xid', 'value', label.alias('feature_name'))

def n_events_per_website_id_loader(df):
    """Return distinct (``xid``, ``value``, ``feature_name``) rows for events per website.

    ``value`` is ``n_events_per_website_id`` and ``feature_name`` is
    ``'n_events_per_website_id#'`` followed by the website id.
    """
    # Rename the column that was actually selected; renaming
    # 'n_events_per_hour' here was a silent no-op that left no 'value' column.
    csr = df.select('xid', 'website_id', 'n_events_per_website_id')\
        .withColumnRenamed('n_events_per_website_id', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_website_id#'),
                               col('website_id'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('website_id')
    return csr

def n_events_per_hour_loader(df):
    """Return distinct (``xid``, ``value``, ``feature_name``) rows for events per hour.

    ``value`` is ``n_events_per_hour`` and ``feature_name`` is
    ``'n_events_per_hour#'`` followed by the hour. This definition rebinds a
    name already defined earlier in the file.
    """
    per_hour = (
        df.select('xid', 'hour', col('n_events_per_hour').alias('value'))
        .distinct()
    )
    label = func.concat(lit('n_events_per_hour#'), col('hour'))
    return per_hour.select('xid', 'value', label.alias('feature_name'))

def n_events_per_weekday_loader(df):
    """Return distinct (``xid``, ``value``, ``feature_name``) rows for events per weekday.

    ``value`` is ``n_events_per_weekday`` and ``feature_name`` is
    ``'n_events_per_weekday#'`` followed by the weekday.
    """
    per_weekday = (
        df.select('xid', 'weekday', col('n_events_per_weekday').alias('value'))
        .distinct()
    )
    label = func.concat(lit('n_events_per_weekday#'), col('weekday'))
    return per_weekday.select('xid', 'value', label.alias('feature_name'))

def n_days_since_last_event_loader(df):
    """Return distinct (``xid``, ``value``, ``feature_name``) rows for days since last event.

    ``value`` is ``n_days_since_last_event`` and ``feature_name`` is the
    constant ``'n_days_since_last_event'``.
    """
    # The source column has no trailing '#'; with it the rename was a silent
    # no-op and the column was never called 'value'.
    csr = df.select('xid',  'n_days_since_last_event')\
        .withColumnRenamed('n_days_since_last_event', 'value')\
        .distinct()
    feature_name = lit('n_days_since_last_event')
    csr = csr\
        .withColumn('feature_name', feature_name)
    return csr

def n_days_since_last_action_loader(df):
    """Return distinct (``xid``, ``value``, ``feature_name``) rows for days since last action.

    ``value`` is ``n_days_since_last_action`` and ``feature_name`` is
    ``'n_days_since_last_action#'`` followed by the action.
    """
    per_action = (
        df.select('xid', 'action',
                  col('n_days_since_last_action').alias('value'))
        .distinct()
    )
    label = func.concat(lit('n_days_since_last_action#'), col('action'))
    return per_action.select('xid', 'value', label.alias('feature_name'))

def n_unique_day_loader(df):
    """Return distinct (``xid``, ``value``, ``feature_name``) rows for ``n_unique_day``.

    ``feature_name`` is the constant ``'n_unique_day'``.
    """
    per_user = df.select('xid', col('n_unique_day').alias('value')).distinct()
    return per_user.withColumn('feature_name', lit('n_unique_day'))

def n_unique_hour_loader(df):
    """Return distinct (``xid``, ``value``, ``feature_name``) rows for ``n_unique_hour``.

    ``feature_name`` is the constant ``'n_unique_hour'``.
    """
    per_user = df.select('xid', col('n_unique_hour').alias('value')).distinct()
    return per_user.withColumn('feature_name', lit('n_unique_hour'))

def n_events_per_device_loader(df):
    """Return distinct (``xid``, ``value``, ``feature_name``) rows for events per device.

    ``value`` is ``n_events_per_device`` and ``feature_name`` is
    ``'n_events_per_device#'`` followed by the device.
    """
    per_device = (
        df.select('xid', 'device', col('n_events_per_device').alias('value'))
        .distinct()
    )
    label = func.concat(lit('n_events_per_device#'), col('device'))
    return per_device.select('xid', 'value', label.alias('feature_name'))

def n_unique_device_loader(df):
    """Return distinct (``xid``, ``value``, ``feature_name``) rows for ``n_device``.

    ``feature_name`` is the constant ``'n_device'``.
    """
    per_user = df.select('xid', col('n_device').alias('value')).distinct()
    return per_user.withColumn('feature_name', lit('n_device'))

def n_events_per_category_id_loader(df):
    """Return distinct (``xid``, ``value``, ``feature_name``) rows for events per category.

    ``value`` is ``n_events_per_category_id`` and ``feature_name`` is
    ``'n_events_per_category_id#'`` followed by the category id.
    """
    per_category = (
        df.select('xid', 'category_id',
                  col('n_events_per_category_id').alias('value'))
        .distinct()
    )
    label = func.concat(lit('n_events_per_category_id#'), col('category_id'))
    return per_category.select('xid', 'value', label.alias('feature_name'))

def n_actions_per_category_id_loader(df):
    """Return distinct (``xid``, ``value``, ``feature_name``) rows for actions per category.

    ``value`` is ``n_actions_per_category_id`` and ``feature_name`` is
    ``'n_actions_per_category_id#<action>#<category_id>'``.
    """
    per_action_category = (
        df.select('xid', 'category_id', 'action',
                  col('n_actions_per_category_id').alias('value'))
        .distinct()
    )
    label = func.concat(
        lit('n_actions_per_category_id#'),
        col('action'),
        lit('#'),
        col('category_id'),
    )
    return per_action_category.select(
        'xid', 'value', label.alias('feature_name'))

def n_events_per_website_id_loader(df):
    """Return distinct (``xid``, ``value``, ``feature_name``) rows for events per website.

    ``value`` is ``n_events_per_website_id`` and ``feature_name`` is
    ``'n_events_per_website_id#'`` followed by the website id. This definition
    rebinds a name already defined earlier in the file.
    """
    per_website = (
        df.select('xid', 'website_id',
                  col('n_events_per_website_id').alias('value'))
        .distinct()
    )
    label = func.concat(lit('n_events_per_website_id#'), col('website_id'))
    return per_website.select('xid', 'value', label.alias('feature_name'))
Code
from functools import reduce

loaders = [
    n_events_per_hour_loader,
    n_events_per_website_id_loader,
    n_events_per_hour_loader,
    n_events_per_weekday_loader,
    n_days_since_last_event_loader,
    n_days_since_last_action_loader,
    n_unique_day_loader,
    n_unique_hour_loader,
    n_events_per_device_loader,
    n_unique_device_loader,
    n_events_per_category_id_loader,
    n_actions_per_category_id_loader,
    n_events_per_website_id_loader,
]

def union(df, other):
    """Return ``df.union(other)``; a two-argument helper usable with ``reduce``."""
    merged = df.union(other)
    return merged

# Apply every loader to df and fold the resulting frames, left to right,
# into a single frame through the `union` helper.
csr = reduce(union, (loader(df) for loader in loaders))

csr.head(3)
[Stage 38:=>(10 + 2) / 12][Stage 39:==>(9 + 3) / 12][Stage 40:=> (6 + 6) / 12][Stage 40:=> (6 + 6) / 12][Stage 41:=> (6 + 6) / 12][Stage 42:=> (6 + 6) / 12][Stage 43:==>(8 + 4) / 12][Stage 44:=> (6 + 6) / 12][Stage 45:>  (3 + 9) / 12][Stage 44:=>(10 + 2) / 12][Stage 45:=> (6 + 6) / 12][Stage 46:=> (6 + 6) / 12][Stage 48:>               (0 + 19) / 19][Stage 50:>                (1 + 1) / 20][Stage 50:==============>                                         (5 + 15) / 20][Stage 64:>                                                         (0 + 1) / 1]                                                                                
[Row(xid='000095cc-9a61-49b5-8ad5-83442daa93d6', value=2.0, feature_name='n_events_per_hour#21'),
 Row(xid='0000fa20-47ca-4548-82e9-78d81aa83fba', value=1.0, feature_name='n_events_per_hour#23'),
 Row(xid='00010386-a996-48ad-9888-4df5440188f2', value=1.0, feature_name='n_events_per_hour#21')]
Code
# Show the column names of the combined feature frame.
csr.columns
['xid', 'value', 'feature_name']
Code
# Count the rows of the combined frame. count() is an action, so this is
# where the whole union actually gets computed.
# NOTE(review): in the recorded run this call aborted with
# java.lang.OutOfMemoryError (Java heap space); the cells after it then fail
# with ConnectionRefusedError when trying to reach the JVM.
csr.count()
[Stage 65:=>(10 + 2) / 12][Stage 67:=> (7 + 5) / 12][Stage 68:==>(9 + 3) / 12][Stage 69:=========>       (7 + 5) / 12][Stage 70:===========>     (8 + 4) / 12][Stage 72:=> (7 + 5) / 12][Stage 73:=> (6 + 6) / 12][Stage 74:=> (6 + 6) / 12][Stage 75:==========================================>              (9 + 3) / 12]25/04/03 15:11:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/03 15:11:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
[Stage 93:>                                                       (0 + 20) / 20]25/04/03 15:11:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/03 15:11:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/03 15:11:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/03 15:11:07 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/03 15:11:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/03 15:11:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/03 15:11:08 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
25/04/03 15:11:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/03 15:11:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/03 15:11:08 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
[19,167s][warning][gc,alloc] Executor task launch worker for task 7.0 in stage 93.0 (TID 527): Retried waiting for GCLocker too often allocating 524290 words
[19,195s][warning][gc,alloc] Executor task launch worker for task 9.0 in stage 93.0 (TID 529): Retried waiting for GCLocker too often allocating 524290 words
[19,195s][warning][gc,alloc] Executor task launch worker for task 7.0 in stage 93.0 (TID 527): Retried waiting for GCLocker too often allocating 524290 words
[19,214s][warning][gc,alloc] Executor task launch worker for task 7.0 in stage 93.0 (TID 527): Retried waiting for GCLocker too often allocating 486289 words
[19,215s][warning][gc,alloc] Executor task launch worker for task 9.0 in stage 93.0 (TID 529): Retried waiting for GCLocker too often allocating 131074 words
[19,215s][warning][gc,alloc] Executor task launch worker for task 1.0 in stage 93.0 (TID 521): Retried waiting for GCLocker too often allocating 131074 words
[19,218s][warning][gc,alloc] Executor task launch worker for task 3.0 in stage 93.0 (TID 523): Retried waiting for GCLocker too often allocating 524290 words
[19,225s][warning][gc,alloc] Executor task launch worker for task 3.0 in stage 93.0 (TID 523): Retried waiting for GCLocker too often allocating 131074 words
25/04/03 15:11:08 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
25/04/03 15:11:08 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
25/04/03 15:11:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/03 15:11:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/03 15:11:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/03 15:11:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/03 15:11:08 WARN TaskMemoryManager: Failed to allocate a page (3890290 bytes), try again.
25/04/03 15:11:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/03 15:11:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/03 15:11:08 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
25/04/03 15:11:08 ERROR Executor: Exception in task 9.0 in stage 93.0 (TID 529)
java.lang.OutOfMemoryError: Java heap space
25/04/03 15:11:08 ERROR Executor: Exception in task 3.0 in stage 93.0 (TID 523)
java.lang.OutOfMemoryError: Java heap space
25/04/03 15:11:08 ERROR Executor: Exception in task 1.0 in stage 93.0 (TID 521)
java.lang.OutOfMemoryError: Java heap space
25/04/03 15:11:08 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 1.0 in stage 93.0 (TID 521),5,main]
java.lang.OutOfMemoryError: Java heap space
25/04/03 15:11:08 ERROR SparkUncaughtExceptionHandler: [Container in shutdown] Uncaught exception in thread Thread[Executor task launch worker for task 9.0 in stage 93.0 (TID 529),5,main]
java.lang.OutOfMemoryError: Java heap space
25/04/03 15:11:08 ERROR SparkUncaughtExceptionHandler: [Container in shutdown] Uncaught exception in thread Thread[Executor task launch worker for task 3.0 in stage 93.0 (TID 523),5,main]
java.lang.OutOfMemoryError: Java heap space
25/04/03 15:11:08 WARN TaskSetManager: Lost task 1.0 in stage 93.0 (TID 521) (host-32-10.sg.lan executor driver): java.lang.OutOfMemoryError: Java heap space

25/04/03 15:11:08 ERROR TaskSetManager: Task 1 in stage 93.0 failed 1 times; aborting job
25/04/03 15:11:08 WARN TaskSetManager: Lost task 18.0 in stage 93.0 (TID 538) (host-32-10.sg.lan executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 1 in stage 93.0 failed 1 times, most recent failure: Lost task 1.0 in stage 93.0 (TID 521) (host-32-10.sg.lan executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:)
25/04/03 15:11:08 WARN TaskSetManager: Lost task 14.0 in stage 93.0 (TID 534) (host-32-10.sg.lan executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 1 in stage 93.0 failed 1 times, most recent failure: Lost task 1.0 in stage 93.0 (TID 521) (host-32-10.sg.lan executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:)
25/04/03 15:11:08 WARN TaskSetManager: Lost task 4.0 in stage 93.0 (TID 524) (host-32-10.sg.lan executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 1 in stage 93.0 failed 1 times, most recent failure: Lost task 1.0 in stage 93.0 (TID 521) (host-32-10.sg.lan executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:)
25/04/03 15:11:08 WARN TaskSetManager: Lost task 15.0 in stage 93.0 (TID 535) (host-32-10.sg.lan executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 1 in stage 93.0 failed 1 times, most recent failure: Lost task 1.0 in stage 93.0 (TID 521) (host-32-10.sg.lan executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:)
25/04/03 15:11:08 WARN TaskSetManager: Lost task 12.0 in stage 93.0 (TID 532) (host-32-10.sg.lan executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 1 in stage 93.0 failed 1 times, most recent failure: Lost task 1.0 in stage 93.0 (TID 521) (host-32-10.sg.lan executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:)
25/04/03 15:11:08 WARN TaskSetManager: Lost task 5.0 in stage 93.0 (TID 525) (host-32-10.sg.lan executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 1 in stage 93.0 failed 1 times, most recent failure: Lost task 1.0 in stage 93.0 (TID 521) (host-32-10.sg.lan executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:)
25/04/03 15:11:08 WARN TaskSetManager: Lost task 7.0 in stage 93.0 (TID 527) (host-32-10.sg.lan executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 1 in stage 93.0 failed 1 times, most recent failure: Lost task 1.0 in stage 93.0 (TID 521) (host-32-10.sg.lan executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:)
25/04/03 15:11:08 WARN TaskSetManager: Lost task 0.0 in stage 93.0 (TID 520) (host-32-10.sg.lan executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 1 in stage 93.0 failed 1 times, most recent failure: Lost task 1.0 in stage 93.0 (TID 521) (host-32-10.sg.lan executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:)
25/04/03 15:11:08 WARN TaskSetManager: Lost task 2.0 in stage 93.0 (TID 522) (host-32-10.sg.lan executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 1 in stage 93.0 failed 1 times, most recent failure: Lost task 1.0 in stage 93.0 (TID 521) (host-32-10.sg.lan executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:)
25/04/03 15:11:08 WARN TaskSetManager: Lost task 17.0 in stage 93.0 (TID 537) (host-32-10.sg.lan executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 1 in stage 93.0 failed 1 times, most recent failure: Lost task 1.0 in stage 93.0 (TID 521) (host-32-10.sg.lan executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:)
25/04/03 15:11:08 WARN TaskSetManager: Lost task 13.0 in stage 93.0 (TID 533) (host-32-10.sg.lan executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 1 in stage 93.0 failed 1 times, most recent failure: Lost task 1.0 in stage 93.0 (TID 521) (host-32-10.sg.lan executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:)
25/04/03 15:11:08 WARN TaskSetManager: Lost task 8.0 in stage 93.0 (TID 528) (host-32-10.sg.lan executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 1 in stage 93.0 failed 1 times, most recent failure: Lost task 1.0 in stage 93.0 (TID 521) (host-32-10.sg.lan executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:)
25/04/03 15:11:08 WARN TaskSetManager: Lost task 19.0 in stage 93.0 (TID 539) (host-32-10.sg.lan executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 1 in stage 93.0 failed 1 times, most recent failure: Lost task 1.0 in stage 93.0 (TID 521) (host-32-10.sg.lan executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:)
25/04/03 15:11:08 WARN TaskSetManager: Lost task 11.0 in stage 93.0 (TID 531) (host-32-10.sg.lan executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 1 in stage 93.0 failed 1 times, most recent failure: Lost task 1.0 in stage 93.0 (TID 521) (host-32-10.sg.lan executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:)
25/04/03 15:11:08 WARN TaskSetManager: Lost task 6.0 in stage 93.0 (TID 526) (host-32-10.sg.lan executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 1 in stage 93.0 failed 1 times, most recent failure: Lost task 1.0 in stage 93.0 (TID 521) (host-32-10.sg.lan executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:)
25/04/03 15:11:08 WARN TaskSetManager: Lost task 16.0 in stage 93.0 (TID 536) (host-32-10.sg.lan executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 1 in stage 93.0 failed 1 times, most recent failure: Lost task 1.0 in stage 93.0 (TID 521) (host-32-10.sg.lan executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:)
25/04/03 15:11:08 WARN TaskSetManager: Lost task 10.0 in stage 93.0 (TID 530) (host-32-10.sg.lan executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 1 in stage 93.0 failed 1 times, most recent failure: Lost task 1.0 in stage 93.0 (TID 521) (host-32-10.sg.lan executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:)
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/boucheron/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/IPython/core/interactiveshell.py", line 3577, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_87987/3594614522.py", line 1, in <module>
    csr.count()
  File "/home/boucheron/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/dataframe.py", line 1240, in count
    return int(self._jdf.count())
               ^^^^^^^^^^^^^^^^^
  File "/home/boucheron/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "/home/boucheron/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
           ^^^^^^^^^^^
  File "/home/boucheron/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <exception str() failed>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/boucheron/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/boucheron/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/boucheron/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/boucheron/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/socket.py", line 707, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
ConnectionResetError: [Errno 104] Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/boucheron/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/boucheron/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
    [... skipping hidden 1 frame]

Cell In[26], line 1
----> 1 csr.count()

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/dataframe.py:1240, in DataFrame.count(self)
   1218 """Returns the number of rows in this :class:`DataFrame`.
   1219 
   1220 .. versionadded:: 1.3.0
   (...)
   1238 3
   1239 """
-> 1240 return int(self._jdf.count())

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/errors/exceptions/captured.py:179, in capture_sql_exception.<locals>.deco(*a, **kw)
    178 try:
--> 179     return f(*a, **kw)
    180 except Py4JJavaError as e:

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:

<class 'str'>: (<class 'ConnectionRefusedError'>, ConnectionRefusedError(111, 'Connection refused'))

During handling of the above exception, another exception occurred:

ConnectionRefusedError                    Traceback (most recent call last)
    [... skipping hidden 1 frame]

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/IPython/core/interactiveshell.py:2179, in InteractiveShell.showtraceback(self, exc_tuple, filename, tb_offset, exception_only, running_compiled_code)
   2176         traceback.print_exc()
   2177         return None
-> 2179     self._showtraceback(etype, value, stb)
   2180 if self.call_pdb:
   2181     # drop into debugger
   2182     self.debugger(force=True)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/ipykernel/zmqshell.py:559, in ZMQInteractiveShell._showtraceback(self, etype, evalue, stb)
    553 sys.stdout.flush()
    554 sys.stderr.flush()
    556 exc_content = {
    557     "traceback": stb,
    558     "ename": str(etype.__name__),
--> 559     "evalue": str(evalue),
    560 }
    562 dh = self.displayhook
    563 # Send exception info over pub socket for other clients than the caller
    564 # to pick up

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/protocol.py:471, in Py4JJavaError.__str__(self)
    469 def __str__(self):
    470     gateway_client = self.java_exception._gateway_client
--> 471     answer = gateway_client.send_command(self.exception_cmd)
    472     return_value = get_return_value(answer, gateway_client, None, None)
    473     # Note: technically this should return a bytestring 'str' rather than
    474     # unicodes in Python 2; however, it can return unicodes for now.
    475     # See https://github.com/bartdag/py4j/issues/306 for more details.

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1036, in GatewayClient.send_command(self, command, retry, binary)
   1015 def send_command(self, command, retry=True, binary=False):
   1016     """Sends a command to the JVM. This method is not intended to be
   1017        called directly by Py4J users. It is usually called by
   1018        :class:`JavaMember` instances.
   (...)
   1034      if `binary` is `True`.
   1035     """
-> 1036     connection = self._get_connection()
   1037     try:
   1038         response = connection.send_command(command)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:284, in JavaClient._get_connection(self)
    281     pass
    283 if connection is None or connection.socket is None:
--> 284     connection = self._create_new_connection()
    285 return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:291, in JavaClient._create_new_connection(self)
    287 def _create_new_connection(self):
    288     connection = ClientServerConnection(
    289         self.java_parameters, self.python_parameters,
    290         self.gateway_property, self)
--> 291     connection.connect_to_java_server()
    292     self.set_thread_connection(connection)
    293     return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:438, in ClientServerConnection.connect_to_java_server(self)
    435 if self.ssl_context:
    436     self.socket = self.ssl_context.wrap_socket(
    437         self.socket, server_hostname=self.java_address)
--> 438 self.socket.connect((self.java_address, self.java_port))
    439 self.stream = self.socket.makefile("rb")
    440 self.is_connected = True

ConnectionRefusedError: [Errno 111] Connection refused
Code
# Replace feature names and xid by a unique number.

# Remove incomplete rows *before* ranking: a row that is about to be
# discarded must not claim an index, otherwise the numbering of the
# surviving rows has holes (e.g. a null feature_name taking its own rank).
csr = csr.na.drop('any')

# NOTE: despite their names these window specs only define an ordering;
# there is no partitionBy, so each rank is computed over the whole frame.
feature_name_partition = Window.orderBy('feature_name')
xid_partition = Window.orderBy('xid')

# dense_rank gives equal keys the same index and leaves no gaps between
# consecutive keys.
col_idx = func.dense_rank().over(feature_name_partition)
row_idx = func.dense_rank().over(xid_partition)

csr = csr.withColumn('col', col_idx)\
    .withColumn('row', row_idx)

csr.head(n=5)
---------------------------------------------------------------------------
ConnectionRefusedError                    Traceback (most recent call last)
Cell In[27], line 2
      1 # Replace features names and xid by a unique number
----> 2 feature_name_partition = Window().orderBy('feature_name')
      3 xid_partition = Window().orderBy('xid')
      5 col_idx = func.dense_rank().over(feature_name_partition)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/utils.py:222, in try_remote_window.<locals>.wrapped(*args, **kwargs)
    220     return getattr(Window, f.__name__)(*args, **kwargs)
    221 else:
--> 222     return f(*args, **kwargs)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/window.py:186, in Window.orderBy(*cols)
    137 """
    138 Creates a :class:`WindowSpec` with the ordering defined.
    139 
   (...)
    183 +---+--------+----------+
    184 """
    185 sc = get_active_spark_context()
--> 186 jspec = cast(JVMView, sc._jvm).org.apache.spark.sql.expressions.Window.orderBy(
    187     _to_java_cols(cols)
    188 )
    189 return WindowSpec(jspec)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1712, in JVMView.__getattr__(self, name)
   1709 if name == UserHelpAutoCompletion.KEY:
   1710     return UserHelpAutoCompletion()
-> 1712 answer = self._gateway_client.send_command(
   1713     proto.REFLECTION_COMMAND_NAME +
   1714     proto.REFL_GET_UNKNOWN_SUB_COMMAND_NAME + name + "\n" + self._id +
   1715     "\n" + proto.END_COMMAND_PART)
   1716 if answer == proto.SUCCESS_PACKAGE:
   1717     return JavaPackage(name, self._gateway_client, jvm_id=self._id)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1036, in GatewayClient.send_command(self, command, retry, binary)
   1015 def send_command(self, command, retry=True, binary=False):
   1016     """Sends a command to the JVM. This method is not intended to be
   1017        called directly by Py4J users. It is usually called by
   1018        :class:`JavaMember` instances.
   (...)
   1034      if `binary` is `True`.
   1035     """
-> 1036     connection = self._get_connection()
   1037     try:
   1038         response = connection.send_command(command)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:284, in JavaClient._get_connection(self)
    281     pass
    283 if connection is None or connection.socket is None:
--> 284     connection = self._create_new_connection()
    285 return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:291, in JavaClient._create_new_connection(self)
    287 def _create_new_connection(self):
    288     connection = ClientServerConnection(
    289         self.java_parameters, self.python_parameters,
    290         self.gateway_property, self)
--> 291     connection.connect_to_java_server()
    292     self.set_thread_connection(connection)
    293     return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:438, in ClientServerConnection.connect_to_java_server(self)
    435 if self.ssl_context:
    436     self.socket = self.ssl_context.wrap_socket(
    437         self.socket, server_hostname=self.java_address)
--> 438 self.socket.connect((self.java_address, self.java_port))
    439 self.stream = self.socket.makefile("rb")
    440 self.is_connected = True

ConnectionRefusedError: [Errno 111] Connection refused
Code
# Persist the feature table as parquet under the current directory,
# replacing whatever a previous run left at the same location.
output_path = './'
output_file = os.path.join(output_path, 'csr.parquet')
csr.write.mode('overwrite').parquet(output_file)
---------------------------------------------------------------------------
ConnectionRefusedError                    Traceback (most recent call last)
Cell In[28], line 4
      2 output_path = './'
      3 output_file = os.path.join(output_path, 'csr.parquet')
----> 4 csr.write.parquet(output_file, mode='overwrite')

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/dataframe.py:509, in DataFrame.write(self)
    482 @property
    483 def write(self) -> DataFrameWriter:
    484     """
    485     Interface for saving the content of the non-streaming :class:`DataFrame` out into external
    486     storage.
   (...)
    507     >>> _ = spark.sql("DROP TABLE tab2")
    508     """
--> 509     return DataFrameWriter(self)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/readwriter.py:964, in DataFrameWriter.__init__(self, df)
    962 self._df = df
    963 self._spark = df.sparkSession
--> 964 self._jwrite = df._jdf.write()

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1314 args_command, temp_args = self._build_args(*args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
-> 1321 answer = self.gateway_client.send_command(command)
   1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1036, in GatewayClient.send_command(self, command, retry, binary)
   1015 def send_command(self, command, retry=True, binary=False):
   1016     """Sends a command to the JVM. This method is not intended to be
   1017        called directly by Py4J users. It is usually called by
   1018        :class:`JavaMember` instances.
   (...)
   1034      if `binary` is `True`.
   1035     """
-> 1036     connection = self._get_connection()
   1037     try:
   1038         response = connection.send_command(command)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:284, in JavaClient._get_connection(self)
    281     pass
    283 if connection is None or connection.socket is None:
--> 284     connection = self._create_new_connection()
    285 return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:291, in JavaClient._create_new_connection(self)
    287 def _create_new_connection(self):
    288     connection = ClientServerConnection(
    289         self.java_parameters, self.python_parameters,
    290         self.gateway_property, self)
--> 291     connection.connect_to_java_server()
    292     self.set_thread_connection(connection)
    293     return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:438, in ClientServerConnection.connect_to_java_server(self)
    435 if self.ssl_context:
    436     self.socket = self.ssl_context.wrap_socket(
    437         self.socket, server_hostname=self.java_address)
--> 438 self.socket.connect((self.java_address, self.java_port))
    439 self.stream = self.socket.makefile("rb")
    440 self.is_connected = True

ConnectionRefusedError: [Errno 111] Connection refused

Preparation of the training dataset

Code
# Load the parquet feature table from the current directory.
# NOTE: this rebinds `df`, which previously referred to the raw events frame.
csr_path = './'
csr_file = os.path.join(csr_path, 'csr.parquet')

df = spark.read.format('parquet').load(csr_file)
df.head(5)
---------------------------------------------------------------------------
ConnectionRefusedError                    Traceback (most recent call last)
Cell In[29], line 4
      1 csr_path = './'
      2 csr_file = os.path.join(csr_path, 'csr.parquet')
----> 4 df = spark.read.parquet(csr_file)
      5 df.head(n=5)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/session.py:1706, in SparkSession.read(self)
   1669 @property
   1670 def read(self) -> DataFrameReader:
   1671     """
   1672     Returns a :class:`DataFrameReader` that can be used to read data
   1673     in as a :class:`DataFrame`.
   (...)
   1704     +---+------------+
   1705     """
-> 1706     return DataFrameReader(self)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/readwriter.py:70, in DataFrameReader.__init__(self, spark)
     69 def __init__(self, spark: "SparkSession"):
---> 70     self._jreader = spark._jsparkSession.read()
     71     self._spark = spark

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1314 args_command, temp_args = self._build_args(*args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
-> 1321 answer = self.gateway_client.send_command(command)
   1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1036, in GatewayClient.send_command(self, command, retry, binary)
   1015 def send_command(self, command, retry=True, binary=False):
   1016     """Sends a command to the JVM. This method is not intended to be
   1017        called directly by Py4J users. It is usually called by
   1018        :class:`JavaMember` instances.
   (...)
   1034      if `binary` is `True`.
   1035     """
-> 1036     connection = self._get_connection()
   1037     try:
   1038         response = connection.send_command(command)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:284, in JavaClient._get_connection(self)
    281     pass
    283 if connection is None or connection.socket is None:
--> 284     connection = self._create_new_connection()
    285 return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:291, in JavaClient._create_new_connection(self)
    287 def _create_new_connection(self):
    288     connection = ClientServerConnection(
    289         self.java_parameters, self.python_parameters,
    290         self.gateway_property, self)
--> 291     connection.connect_to_java_server()
    292     self.set_thread_connection(connection)
    293     return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:438, in ClientServerConnection.connect_to_java_server(self)
    435 if self.ssl_context:
    436     self.socket = self.ssl_context.wrap_socket(
    437         self.socket, server_hostname=self.java_address)
--> 438 self.socket.connect((self.java_address, self.java_port))
    439 self.stream = self.socket.makefile("rb")
    440 self.is_connected = True

ConnectionRefusedError: [Errno 111] Connection refused
Code
# Number of rows in the frame read back from csr.parquet.
df.count()
---------------------------------------------------------------------------
ConnectionRefusedError                    Traceback (most recent call last)
Cell In[30], line 1
----> 1 df.count()

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/dataframe.py:1240, in DataFrame.count(self)
   1217 def count(self) -> int:
   1218     """Returns the number of rows in this :class:`DataFrame`.
   1219 
   1220     .. versionadded:: 1.3.0
   (...)
   1238     3
   1239     """
-> 1240     return int(self._jdf.count())

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1314 args_command, temp_args = self._build_args(*args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
-> 1321 answer = self.gateway_client.send_command(command)
   1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1036, in GatewayClient.send_command(self, command, retry, binary)
   1015 def send_command(self, command, retry=True, binary=False):
   1016     """Sends a command to the JVM. This method is not intended to be
   1017        called directly by Py4J users. It is usually called by
   1018        :class:`JavaMember` instances.
   (...)
   1034      if `binary` is `True`.
   1035     """
-> 1036     connection = self._get_connection()
   1037     try:
   1038         response = connection.send_command(command)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:284, in JavaClient._get_connection(self)
    281     pass
    283 if connection is None or connection.socket is None:
--> 284     connection = self._create_new_connection()
    285 return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:291, in JavaClient._create_new_connection(self)
    287 def _create_new_connection(self):
    288     connection = ClientServerConnection(
    289         self.java_parameters, self.python_parameters,
    290         self.gateway_property, self)
--> 291     connection.connect_to_java_server()
    292     self.set_thread_connection(connection)
    293     return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:438, in ClientServerConnection.connect_to_java_server(self)
    435 if self.ssl_context:
    436     self.socket = self.ssl_context.wrap_socket(
    437         self.socket, server_hostname=self.java_address)
--> 438 self.socket.connect((self.java_address, self.java_port))
    439 self.stream = self.socket.makefile("rb")
    440 self.is_connected = True

ConnectionRefusedError: [Errno 111] Connection refused
Code
# Which features are related to campaign_id 1204?
# Start by collecting the distinct feature names as a pandas Series.
features_names = (
    df.select('feature_name')
    .distinct()
    .toPandas()['feature_name']
)
---------------------------------------------------------------------------
ConnectionRefusedError                    Traceback (most recent call last)
Cell In[31], line 3
      1 # What are the features related to campaign_id 1204 ?
      2 features_names = \
----> 3     df.select('feature_name')\
      4     .distinct()\
      5     .toPandas()['feature_name']

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/dataframe.py:3229, in DataFrame.select(self, *cols)
   3184 def select(self, *cols: "ColumnOrName") -> "DataFrame":  # type: ignore[misc]
   3185     """Projects a set of expressions and returns a new :class:`DataFrame`.
   3186 
   3187     .. versionadded:: 1.3.0
   (...)
   3227     +-----+---+
   3228     """
-> 3229     jdf = self._jdf.select(self._jcols(*cols))
   3230     return DataFrame(jdf, self.sparkSession)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/dataframe.py:2766, in DataFrame._jcols(self, *cols)
   2764 if len(cols) == 1 and isinstance(cols[0], list):
   2765     cols = cols[0]
-> 2766 return self._jseq(cols, _to_java_column)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/dataframe.py:2753, in DataFrame._jseq(self, cols, converter)
   2747 def _jseq(
   2748     self,
   2749     cols: Sequence,
   2750     converter: Optional[Callable[..., Union["PrimitiveType", JavaObject]]] = None,
   2751 ) -> JavaObject:
   2752     """Return a JVM Seq of Columns from a list of Column or names"""
-> 2753     return _to_seq(self.sparkSession._sc, cols, converter)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/column.py:88, in _to_seq(sc, cols, converter)
     81 """
     82 Convert a list of Columns (or names) into a JVM Seq of Column.
     83 
     84 An optional `converter` could be used to convert items in `cols`
     85 into JVM Column objects.
     86 """
     87 if converter:
---> 88     cols = [converter(c) for c in cols]
     89 assert sc._jvm is not None
     90 return sc._jvm.PythonUtils.toSeq(cols)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/column.py:63, in _to_java_column(col)
     61     jcol = col._jc
     62 elif isinstance(col, str):
---> 63     jcol = _create_column_from_name(col)
     64 else:
     65     raise PySparkTypeError(
     66         error_class="NOT_COLUMN_OR_STR",
     67         message_parameters={"arg_name": "col", "arg_type": type(col).__name__},
     68     )

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/column.py:56, in _create_column_from_name(name)
     54 def _create_column_from_name(name: str) -> "Column":
     55     sc = get_active_spark_context()
---> 56     return cast(JVMView, sc._jvm).functions.col(name)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1712, in JVMView.__getattr__(self, name)
   1709 if name == UserHelpAutoCompletion.KEY:
   1710     return UserHelpAutoCompletion()
-> 1712 answer = self._gateway_client.send_command(
   1713     proto.REFLECTION_COMMAND_NAME +
   1714     proto.REFL_GET_UNKNOWN_SUB_COMMAND_NAME + name + "\n" + self._id +
   1715     "\n" + proto.END_COMMAND_PART)
   1716 if answer == proto.SUCCESS_PACKAGE:
   1717     return JavaPackage(name, self._gateway_client, jvm_id=self._id)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1036, in GatewayClient.send_command(self, command, retry, binary)
   1015 def send_command(self, command, retry=True, binary=False):
   1016     """Sends a command to the JVM. This method is not intended to be
   1017        called directly by Py4J users. It is usually called by
   1018        :class:`JavaMember` instances.
   (...)
   1034      if `binary` is `True`.
   1035     """
-> 1036     connection = self._get_connection()
   1037     try:
   1038         response = connection.send_command(command)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:284, in JavaClient._get_connection(self)
    281     pass
    283 if connection is None or connection.socket is None:
--> 284     connection = self._create_new_connection()
    285 return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:291, in JavaClient._create_new_connection(self)
    287 def _create_new_connection(self):
    288     connection = ClientServerConnection(
    289         self.java_parameters, self.python_parameters,
    290         self.gateway_property, self)
--> 291     connection.connect_to_java_server()
    292     self.set_thread_connection(connection)
    293     return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:438, in ClientServerConnection.connect_to_java_server(self)
    435 if self.ssl_context:
    436     self.socket = self.ssl_context.wrap_socket(
    437         self.socket, server_hostname=self.java_address)
--> 438 self.socket.connect((self.java_address, self.java_port))
    439 self.stream = self.socket.makefile("rb")
    440 self.is_connected = True

ConnectionRefusedError: [Errno 111] Connection refused
Code
# Display the collected feature names.
features_names
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[32], line 1
----> 1 features_names

NameError: name 'features_names' is not defined
Code
# Keep only the feature names that contain the substring '1204'.
[name for name in features_names if '1204' in name]
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[33], line 1
----> 1 [feature_name for feature_name in features_names if '1204' in feature_name]

NameError: name 'features_names' is not defined
Code
# Look for the xid that have at least one exposure to campaign 1204:
# flag with 1 every row whose feature is one of the two category-1204
# action counters, and with 0 every other row.
keep = (
    func.when(
        col('feature_name').isin(
            'n_actions_per_category_id#C#1204.0',
            'n_actions_per_category_id#O#1204.0',
        ),
        1,
    )
    .otherwise(0)
)
df = df.withColumn('keep', keep)

# Number of flagged rows.
df.where(col('keep') > 0).count()
---------------------------------------------------------------------------
ConnectionRefusedError                    Traceback (most recent call last)
Cell In[34], line 3
      1 # Look for the xid that have at least one exposure to campaign 1204
      2 keep = func.when(
----> 3     (col('feature_name') == 'n_actions_per_category_id#C#1204.0') |
      4     (col('feature_name') == 'n_actions_per_category_id#O#1204.0'),
      5     1).otherwise(0)
      6 df = df.withColumn('keep', keep)
      8 df.where(col('keep') > 0).count()

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/utils.py:174, in try_remote_functions.<locals>.wrapped(*args, **kwargs)
    172     return getattr(functions, f.__name__)(*args, **kwargs)
    173 else:
--> 174     return f(*args, **kwargs)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/functions.py:223, in col(col)
    196 @try_remote_functions
    197 def col(col: str) -> Column:
    198     """
    199     Returns a :class:`~pyspark.sql.Column` based on the given column name.
    200 
   (...)
    221     Column<'x'>
    222     """
--> 223     return _invoke_function("col", col)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/functions.py:96, in _invoke_function(name, *args)
     91 """
     92 Invokes JVM function identified by name with args
     93 and wraps the result with :class:`~pyspark.sql.Column`.
     94 """
     95 assert SparkContext._active_spark_context is not None
---> 96 jf = _get_jvm_function(name, SparkContext._active_spark_context)
     97 return Column(jf(*args))

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/functions.py:87, in _get_jvm_function(name, sc)
     82 """
     83 Retrieves JVM function identified by name from
     84 Java gateway associated with sc.
     85 """
     86 assert sc._jvm is not None
---> 87 return getattr(sc._jvm.functions, name)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1712, in JVMView.__getattr__(self, name)
   1709 if name == UserHelpAutoCompletion.KEY:
   1710     return UserHelpAutoCompletion()
-> 1712 answer = self._gateway_client.send_command(
   1713     proto.REFLECTION_COMMAND_NAME +
   1714     proto.REFL_GET_UNKNOWN_SUB_COMMAND_NAME + name + "\n" + self._id +
   1715     "\n" + proto.END_COMMAND_PART)
   1716 if answer == proto.SUCCESS_PACKAGE:
   1717     return JavaPackage(name, self._gateway_client, jvm_id=self._id)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1036, in GatewayClient.send_command(self, command, retry, binary)
   1015 def send_command(self, command, retry=True, binary=False):
   1016     """Sends a command to the JVM. This method is not intended to be
   1017        called directly by Py4J users. It is usually called by
   1018        :class:`JavaMember` instances.
   (...)
   1034      if `binary` is `True`.
   1035     """
-> 1036     connection = self._get_connection()
   1037     try:
   1038         response = connection.send_command(command)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:284, in JavaClient._get_connection(self)
    281     pass
    283 if connection is None or connection.socket is None:
--> 284     connection = self._create_new_connection()
    285 return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:291, in JavaClient._create_new_connection(self)
    287 def _create_new_connection(self):
    288     connection = ClientServerConnection(
    289         self.java_parameters, self.python_parameters,
    290         self.gateway_property, self)
--> 291     connection.connect_to_java_server()
    292     self.set_thread_connection(connection)
    293     return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:438, in ClientServerConnection.connect_to_java_server(self)
    435 if self.ssl_context:
    436     self.socket = self.ssl_context.wrap_socket(
    437         self.socket, server_hostname=self.java_address)
--> 438 self.socket.connect((self.java_address, self.java_port))
    439 self.stream = self.socket.makefile("rb")
    440 self.is_connected = True

ConnectionRefusedError: [Errno 111] Connection refused
Code
# Sum of the keeps: total of the `keep` flag over all rows sharing the same
# xid, attached to each of those rows as `sum_keep`.
xid_partition = Window.partitionBy('xid')
sum_keep = func.sum('keep').over(xid_partition)
df = df.withColumn('sum_keep', sum_keep)
---------------------------------------------------------------------------
ConnectionRefusedError                    Traceback (most recent call last)
Cell In[35], line 2
      1 # Sum of the keeps :)
----> 2 xid_partition = Window.partitionBy('xid')
      3 sum_keep = func.sum(col('keep')).over(xid_partition)
      4 df = df.withColumn('sum_keep', sum_keep)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/utils.py:222, in try_remote_window.<locals>.wrapped(*args, **kwargs)
    220     return getattr(Window, f.__name__)(*args, **kwargs)
    221 else:
--> 222     return f(*args, **kwargs)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/window.py:129, in Window.partitionBy(*cols)
     80 """
     81 Creates a :class:`WindowSpec` with the partitioning defined.
     82 
   (...)
    126 +---+--------+----------+
    127 """
    128 sc = get_active_spark_context()
--> 129 jspec = cast(JVMView, sc._jvm).org.apache.spark.sql.expressions.Window.partitionBy(
    130     _to_java_cols(cols)
    131 )
    132 return WindowSpec(jspec)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1712, in JVMView.__getattr__(self, name)
   1709 if name == UserHelpAutoCompletion.KEY:
   1710     return UserHelpAutoCompletion()
-> 1712 answer = self._gateway_client.send_command(
   1713     proto.REFLECTION_COMMAND_NAME +
   1714     proto.REFL_GET_UNKNOWN_SUB_COMMAND_NAME + name + "\n" + self._id +
   1715     "\n" + proto.END_COMMAND_PART)
   1716 if answer == proto.SUCCESS_PACKAGE:
   1717     return JavaPackage(name, self._gateway_client, jvm_id=self._id)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1036, in GatewayClient.send_command(self, command, retry, binary)
   1015 def send_command(self, command, retry=True, binary=False):
   1016     """Sends a command to the JVM. This method is not intended to be
   1017        called directly by Py4J users. It is usually called by
   1018        :class:`JavaMember` instances.
   (...)
   1034      if `binary` is `True`.
   1035     """
-> 1036     connection = self._get_connection()
   1037     try:
   1038         response = connection.send_command(command)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:284, in JavaClient._get_connection(self)
    281     pass
    283 if connection is None or connection.socket is None:
--> 284     connection = self._create_new_connection()
    285 return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:291, in JavaClient._create_new_connection(self)
    287 def _create_new_connection(self):
    288     connection = ClientServerConnection(
    289         self.java_parameters, self.python_parameters,
    290         self.gateway_property, self)
--> 291     connection.connect_to_java_server()
    292     self.set_thread_connection(connection)
    293     return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:438, in ClientServerConnection.connect_to_java_server(self)
    435 if self.ssl_context:
    436     self.socket = self.ssl_context.wrap_socket(
    437         self.socket, server_hostname=self.java_address)
--> 438 self.socket.connect((self.java_address, self.java_port))
    439 self.stream = self.socket.makefile("rb")
    440 self.is_connected = True

ConnectionRefusedError: [Errno 111] Connection refused
Code
# Keep only the rows of the xids exposed to 1204, i.e. those whose
# per-xid total of `keep` is strictly positive.
df = df.filter(col('sum_keep') > 0)
---------------------------------------------------------------------------
ConnectionRefusedError                    Traceback (most recent call last)
Cell In[36], line 2
      1 # Let's keep the xid exposed to 1204
----> 2 df = df.where(col('sum_keep') > 0)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/utils.py:174, in try_remote_functions.<locals>.wrapped(*args, **kwargs)
    172     return getattr(functions, f.__name__)(*args, **kwargs)
    173 else:
--> 174     return f(*args, **kwargs)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/functions.py:223, in col(col)
    196 @try_remote_functions
    197 def col(col: str) -> Column:
    198     """
    199     Returns a :class:`~pyspark.sql.Column` based on the given column name.
    200 
   (...)
    221     Column<'x'>
    222     """
--> 223     return _invoke_function("col", col)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/functions.py:96, in _invoke_function(name, *args)
     91 """
     92 Invokes JVM function identified by name with args
     93 and wraps the result with :class:`~pyspark.sql.Column`.
     94 """
     95 assert SparkContext._active_spark_context is not None
---> 96 jf = _get_jvm_function(name, SparkContext._active_spark_context)
     97 return Column(jf(*args))

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/functions.py:87, in _get_jvm_function(name, sc)
     82 """
     83 Retrieves JVM function identified by name from
     84 Java gateway associated with sc.
     85 """
     86 assert sc._jvm is not None
---> 87 return getattr(sc._jvm.functions, name)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1712, in JVMView.__getattr__(self, name)
   1709 if name == UserHelpAutoCompletion.KEY:
   1710     return UserHelpAutoCompletion()
-> 1712 answer = self._gateway_client.send_command(
   1713     proto.REFLECTION_COMMAND_NAME +
   1714     proto.REFL_GET_UNKNOWN_SUB_COMMAND_NAME + name + "\n" + self._id +
   1715     "\n" + proto.END_COMMAND_PART)
   1716 if answer == proto.SUCCESS_PACKAGE:
   1717     return JavaPackage(name, self._gateway_client, jvm_id=self._id)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1036, in GatewayClient.send_command(self, command, retry, binary)
   1015 def send_command(self, command, retry=True, binary=False):
   1016     """Sends a command to the JVM. This method is not intended to be
   1017        called directly by Py4J users. It is usually called by
   1018        :class:`JavaMember` instances.
   (...)
   1034      if `binary` is `True`.
   1035     """
-> 1036     connection = self._get_connection()
   1037     try:
   1038         response = connection.send_command(command)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:284, in JavaClient._get_connection(self)
    281     pass
    283 if connection is None or connection.socket is None:
--> 284     connection = self._create_new_connection()
    285 return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:291, in JavaClient._create_new_connection(self)
    287 def _create_new_connection(self):
    288     connection = ClientServerConnection(
    289         self.java_parameters, self.python_parameters,
    290         self.gateway_property, self)
--> 291     connection.connect_to_java_server()
    292     self.set_thread_connection(connection)
    293     return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:438, in ClientServerConnection.connect_to_java_server(self)
    435 if self.ssl_context:
    436     self.socket = self.ssl_context.wrap_socket(
    437         self.socket, server_hostname=self.java_address)
--> 438 self.socket.connect((self.java_address, self.java_port))
    439 self.stream = self.socket.makefile("rb")
    440 self.is_connected = True

ConnectionRefusedError: [Errno 111] Connection refused
Code
# Number of rows left in df after the sum_keep filter.
df.count()
---------------------------------------------------------------------------
ConnectionRefusedError                    Traceback (most recent call last)
Cell In[37], line 1
----> 1 df.count()

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/dataframe.py:1240, in DataFrame.count(self)
   1217 def count(self) -> int:
   1218     """Returns the number of rows in this :class:`DataFrame`.
   1219 
   1220     .. versionadded:: 1.3.0
   (...)
   1238     3
   1239     """
-> 1240     return int(self._jdf.count())

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1314 args_command, temp_args = self._build_args(*args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
-> 1321 answer = self.gateway_client.send_command(command)
   1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1036, in GatewayClient.send_command(self, command, retry, binary)
   1015 def send_command(self, command, retry=True, binary=False):
   1016     """Sends a command to the JVM. This method is not intended to be
   1017        called directly by Py4J users. It is usually called by
   1018        :class:`JavaMember` instances.
   (...)
   1034      if `binary` is `True`.
   1035     """
-> 1036     connection = self._get_connection()
   1037     try:
   1038         response = connection.send_command(command)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:284, in JavaClient._get_connection(self)
    281     pass
    283 if connection is None or connection.socket is None:
--> 284     connection = self._create_new_connection()
    285 return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:291, in JavaClient._create_new_connection(self)
    287 def _create_new_connection(self):
    288     connection = ClientServerConnection(
    289         self.java_parameters, self.python_parameters,
    290         self.gateway_property, self)
--> 291     connection.connect_to_java_server()
    292     self.set_thread_connection(connection)
    293     return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:438, in ClientServerConnection.connect_to_java_server(self)
    435 if self.ssl_context:
    436     self.socket = self.ssl_context.wrap_socket(
    437         self.socket, server_hostname=self.java_address)
--> 438 self.socket.connect((self.java_address, self.java_port))
    439 self.stream = self.socket.makefile("rb")
    440 self.is_connected = True

ConnectionRefusedError: [Errno 111] Connection refused
Code
# Number of distinct xid values in df.
df.select('xid').distinct().count()
---------------------------------------------------------------------------
ConnectionRefusedError                    Traceback (most recent call last)
Cell In[38], line 1
----> 1 df.select('xid').distinct().count()

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/dataframe.py:3229, in DataFrame.select(self, *cols)
   3184 def select(self, *cols: "ColumnOrName") -> "DataFrame":  # type: ignore[misc]
   3185     """Projects a set of expressions and returns a new :class:`DataFrame`.
   3186 
   3187     .. versionadded:: 1.3.0
   (...)
   3227     +-----+---+
   3228     """
-> 3229     jdf = self._jdf.select(self._jcols(*cols))
   3230     return DataFrame(jdf, self.sparkSession)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/dataframe.py:2766, in DataFrame._jcols(self, *cols)
   2764 if len(cols) == 1 and isinstance(cols[0], list):
   2765     cols = cols[0]
-> 2766 return self._jseq(cols, _to_java_column)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/dataframe.py:2753, in DataFrame._jseq(self, cols, converter)
   2747 def _jseq(
   2748     self,
   2749     cols: Sequence,
   2750     converter: Optional[Callable[..., Union["PrimitiveType", JavaObject]]] = None,
   2751 ) -> JavaObject:
   2752     """Return a JVM Seq of Columns from a list of Column or names"""
-> 2753     return _to_seq(self.sparkSession._sc, cols, converter)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/column.py:88, in _to_seq(sc, cols, converter)
     81 """
     82 Convert a list of Columns (or names) into a JVM Seq of Column.
     83 
     84 An optional `converter` could be used to convert items in `cols`
     85 into JVM Column objects.
     86 """
     87 if converter:
---> 88     cols = [converter(c) for c in cols]
     89 assert sc._jvm is not None
     90 return sc._jvm.PythonUtils.toSeq(cols)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/column.py:63, in _to_java_column(col)
     61     jcol = col._jc
     62 elif isinstance(col, str):
---> 63     jcol = _create_column_from_name(col)
     64 else:
     65     raise PySparkTypeError(
     66         error_class="NOT_COLUMN_OR_STR",
     67         message_parameters={"arg_name": "col", "arg_type": type(col).__name__},
     68     )

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/column.py:56, in _create_column_from_name(name)
     54 def _create_column_from_name(name: str) -> "Column":
     55     sc = get_active_spark_context()
---> 56     return cast(JVMView, sc._jvm).functions.col(name)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1712, in JVMView.__getattr__(self, name)
   1709 if name == UserHelpAutoCompletion.KEY:
   1710     return UserHelpAutoCompletion()
-> 1712 answer = self._gateway_client.send_command(
   1713     proto.REFLECTION_COMMAND_NAME +
   1714     proto.REFL_GET_UNKNOWN_SUB_COMMAND_NAME + name + "\n" + self._id +
   1715     "\n" + proto.END_COMMAND_PART)
   1716 if answer == proto.SUCCESS_PACKAGE:
   1717     return JavaPackage(name, self._gateway_client, jvm_id=self._id)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1036, in GatewayClient.send_command(self, command, retry, binary)
   1015 def send_command(self, command, retry=True, binary=False):
   1016     """Sends a command to the JVM. This method is not intended to be
   1017        called directly by Py4J users. It is usually called by
   1018        :class:`JavaMember` instances.
   (...)
   1034      if `binary` is `True`.
   1035     """
-> 1036     connection = self._get_connection()
   1037     try:
   1038         response = connection.send_command(command)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:284, in JavaClient._get_connection(self)
    281     pass
    283 if connection is None or connection.socket is None:
--> 284     connection = self._create_new_connection()
    285 return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:291, in JavaClient._create_new_connection(self)
    287 def _create_new_connection(self):
    288     connection = ClientServerConnection(
    289         self.java_parameters, self.python_parameters,
    290         self.gateway_property, self)
--> 291     connection.connect_to_java_server()
    292     self.set_thread_connection(connection)
    293     return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:438, in ClientServerConnection.connect_to_java_server(self)
    435 if self.ssl_context:
    436     self.socket = self.ssl_context.wrap_socket(
    437         self.socket, server_hostname=self.java_address)
--> 438 self.socket.connect((self.java_address, self.java_port))
    439 self.stream = self.socket.makefile("rb")
    440 self.is_connected = True

ConnectionRefusedError: [Errno 111] Connection refused
Code
# Re-index 'row' and 'col' with dense ranks (consecutive integers, no gaps)
# and collect the (row_new, col_new, value) triplets into pandas.
# NOTE(review): the name `csr_data` suggests these triplets feed a sparse
# matrix constructor downstream — confirm against the following cells.
#
# Window is a utility class whose builders are static methods, so it is used
# directly instead of being instantiated.
# NOTE(review): a window with only an ordering has no partitioning, so Spark
# has to process it in a single partition; this can be slow on large inputs.
row_partition = Window.orderBy('row')
col_partition = Window.orderBy('col')
row_new = func.dense_rank().over(row_partition)
col_new = func.dense_rank().over(col_partition)
df = df.withColumn('row_new', row_new).withColumn('col_new', col_new)
csr_data = df.select('row_new', 'col_new', 'value').toPandas()
---------------------------------------------------------------------------
ConnectionRefusedError                    Traceback (most recent call last)
Cell In[39], line 1
----> 1 row_partition = Window().orderBy('row')
      2 col_partition = Window().orderBy('col')
      3 row_new = func.dense_rank().over(row_partition)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/utils.py:222, in try_remote_window.<locals>.wrapped(*args, **kwargs)
    220     return getattr(Window, f.__name__)(*args, **kwargs)
    221 else:
--> 222     return f(*args, **kwargs)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/window.py:186, in Window.orderBy(*cols)
    137 """
    138 Creates a :class:`WindowSpec` with the ordering defined.
    139 
   (...)
    183 +---+--------+----------+
    184 """
    185 sc = get_active_spark_context()
--> 186 jspec = cast(JVMView, sc._jvm).org.apache.spark.sql.expressions.Window.orderBy(
    187     _to_java_cols(cols)
    188 )
    189 return WindowSpec(jspec)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1712, in JVMView.__getattr__(self, name)
   1709 if name == UserHelpAutoCompletion.KEY:
   1710     return UserHelpAutoCompletion()
-> 1712 answer = self._gateway_client.send_command(
   1713     proto.REFLECTION_COMMAND_NAME +
   1714     proto.REFL_GET_UNKNOWN_SUB_COMMAND_NAME + name + "\n" + self._id +
   1715     "\n" + proto.END_COMMAND_PART)
   1716 if answer == proto.SUCCESS_PACKAGE:
   1717     return JavaPackage(name, self._gateway_client, jvm_id=self._id)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1036, in GatewayClient.send_command(self, command, retry, binary)
   1015 def send_command(self, command, retry=True, binary=False):
   1016     """Sends a command to the JVM. This method is not intended to be
   1017        called directly by Py4J users. It is usually called by
   1018        :class:`JavaMember` instances.
   (...)
   1034      if `binary` is `True`.
   1035     """
-> 1036     connection = self._get_connection()
   1037     try:
   1038         response = connection.send_command(command)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:284, in JavaClient._get_connection(self)
    281     pass
    283 if connection is None or connection.socket is None:
--> 284     connection = self._create_new_connection()
    285 return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:291, in JavaClient._create_new_connection(self)
    287 def _create_new_connection(self):
    288     connection = ClientServerConnection(
    289         self.java_parameters, self.python_parameters,
    290         self.gateway_property, self)
--> 291     connection.connect_to_java_server()
    292     self.set_thread_connection(connection)
    293     return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:438, in ClientServerConnection.connect_to_java_server(self)
    435 if self.ssl_context:
    436     self.socket = self.ssl_context.wrap_socket(
    437         self.socket, server_hostname=self.java_address)
--> 438 self.socket.connect((self.java_address, self.java_port))
    439 self.stream = self.socket.makefile("rb")
    440 self.is_connected = True

ConnectionRefusedError: [Errno 111] Connection refused
Code
# Quick look at the beginning of csr_data.
csr_data.head()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[40], line 1
----> 1 csr_data.head()

NameError: name 'csr_data' is not defined
Code
# Distinct (feature_name, col_new) pairs found in df.
features_names = df.select('feature_name', 'col_new').distinct()
# Look up the pair for one specific feature name, to read off its col_new.
features_names.where(col('feature_name') == 'n_actions_per_category_id#C#1204.0').head()
---------------------------------------------------------------------------
ConnectionRefusedError                    Traceback (most recent call last)
Cell In[41], line 1
----> 1 features_names = df.select('feature_name', 'col_new').distinct()
      2 features_names.where(col('feature_name') == 'n_actions_per_category_id#C#1204.0').head()

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/dataframe.py:3229, in DataFrame.select(self, *cols)
   3184 def select(self, *cols: "ColumnOrName") -> "DataFrame":  # type: ignore[misc]
   3185     """Projects a set of expressions and returns a new :class:`DataFrame`.
   3186 
   3187     .. versionadded:: 1.3.0
   (...)
   3227     +-----+---+
   3228     """
-> 3229     jdf = self._jdf.select(self._jcols(*cols))
   3230     return DataFrame(jdf, self.sparkSession)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/dataframe.py:2766, in DataFrame._jcols(self, *cols)
   2764 if len(cols) == 1 and isinstance(cols[0], list):
   2765     cols = cols[0]
-> 2766 return self._jseq(cols, _to_java_column)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/dataframe.py:2753, in DataFrame._jseq(self, cols, converter)
   2747 def _jseq(
   2748     self,
   2749     cols: Sequence,
   2750     converter: Optional[Callable[..., Union["PrimitiveType", JavaObject]]] = None,
   2751 ) -> JavaObject:
   2752     """Return a JVM Seq of Columns from a list of Column or names"""
-> 2753     return _to_seq(self.sparkSession._sc, cols, converter)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/column.py:88, in _to_seq(sc, cols, converter)
     81 """
     82 Convert a list of Columns (or names) into a JVM Seq of Column.
     83 
     84 An optional `converter` could be used to convert items in `cols`
     85 into JVM Column objects.
     86 """
     87 if converter:
---> 88     cols = [converter(c) for c in cols]
     89 assert sc._jvm is not None
     90 return sc._jvm.PythonUtils.toSeq(cols)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/column.py:63, in _to_java_column(col)
     61     jcol = col._jc
     62 elif isinstance(col, str):
---> 63     jcol = _create_column_from_name(col)
     64 else:
     65     raise PySparkTypeError(
     66         error_class="NOT_COLUMN_OR_STR",
     67         message_parameters={"arg_name": "col", "arg_type": type(col).__name__},
     68     )

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/column.py:56, in _create_column_from_name(name)
     54 def _create_column_from_name(name: str) -> "Column":
     55     sc = get_active_spark_context()
---> 56     return cast(JVMView, sc._jvm).functions.col(name)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1712, in JVMView.__getattr__(self, name)
   1709 if name == UserHelpAutoCompletion.KEY:
   1710     return UserHelpAutoCompletion()
-> 1712 answer = self._gateway_client.send_command(
   1713     proto.REFLECTION_COMMAND_NAME +
   1714     proto.REFL_GET_UNKNOWN_SUB_COMMAND_NAME + name + "\n" + self._id +
   1715     "\n" + proto.END_COMMAND_PART)
   1716 if answer == proto.SUCCESS_PACKAGE:
   1717     return JavaPackage(name, self._gateway_client, jvm_id=self._id)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1036, in GatewayClient.send_command(self, command, retry, binary)
   1015 def send_command(self, command, retry=True, binary=False):
   1016     """Sends a command to the JVM. This method is not intended to be
   1017        called directly by Py4J users. It is usually called by
   1018        :class:`JavaMember` instances.
   (...)
   1034      if `binary` is `True`.
   1035     """
-> 1036     connection = self._get_connection()
   1037     try:
   1038         response = connection.send_command(command)

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:284, in JavaClient._get_connection(self)
    281     pass
    283 if connection is None or connection.socket is None:
--> 284     connection = self._create_new_connection()
    285 return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:291, in JavaClient._create_new_connection(self)
    287 def _create_new_connection(self):
    288     connection = ClientServerConnection(
    289         self.java_parameters, self.python_parameters,
    290         self.gateway_property, self)
--> 291     connection.connect_to_java_server()
    292     self.set_thread_connection(connection)
    293     return connection

File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:438, in ClientServerConnection.connect_to_java_server(self)
    435 if self.ssl_context:
    436     self.socket = self.ssl_context.wrap_socket(
    437         self.socket, server_hostname=self.java_address)
--> 438 self.socket.connect((self.java_address, self.java_port))
    439 self.stream = self.socket.makefile("rb")
    440 self.is_connected = True

ConnectionRefusedError: [Errno 111] Connection refused
Code
# Same lookup for the feature named 'n_actions_per_category_id#O#1204.0'.
features_names.where(col('feature_name') == 'n_actions_per_category_id#O#1204.0').head()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[42], line 1
----> 1 features_names.where(col('feature_name') == 'n_actions_per_category_id#O#1204.0').head()

NameError: name 'features_names' is not defined
Code
from scipy.sparse import csr_matrix
import numpy as np

# Shift the row/column indices down by one before building the matrix
# (presumably row_new/col_new are 1-based -- TODO confirm).
rows = csr_data['row_new'].values - 1
cols = csr_data['col_new'].values - 1
vals = csr_data['value'].values

# Build the sparse matrix from (value, (row, column)) triplets. No shape is
# passed, so scipy infers it from the largest row and column indices.
X_csr = csr_matrix((vals, (rows, cols)))
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[43], line 4
      1 from scipy.sparse import csr_matrix
      2 import numpy as np
----> 4 rows = csr_data['row_new'].values - 1
      5 cols = csr_data['col_new'].values - 1
      6 vals = csr_data['value'].values

NameError: name 'csr_data' is not defined
Code
# Dimensions (rows, columns) of the sparse matrix.
X_csr.shape
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[44], line 1
----> 1 X_csr.shape

NameError: name 'X_csr' is not defined
Code
# Dimensions of the sparse matrix and number of stored entries.
X_csr.shape, X_csr.nnz
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[45], line 1
----> 1 X_csr.shape, X_csr.nnz

NameError: name 'X_csr' is not defined
Code
# Density of X_csr: stored entries divided by the total number of cells.
# The dimensions are read from the matrix itself rather than hard-coded.
X_csr.nnz / (X_csr.shape[0] * X_csr.shape[1])
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[46], line 1
----> 1 X_csr.nnz / (152347 * 92)

NameError: name 'X_csr' is not defined
Code
# The label vector: column 1 of X_csr as a flat dense array,
# binarized (1 where the entry is positive, 0 elsewhere).
y = (X_csr[:, 1].toarray().ravel() > 0).astype(np.int64)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[47], line 2
      1 # The label vector. Let's make it dense, flat and binary
----> 2 y = np.array(X_csr[:, 1].todense()).ravel()
      3 y = np.array(y > 0, dtype=np.int64)

NameError: name 'X_csr' is not defined
Code
# Dimensions (rows, columns) of the sparse matrix.
X_csr.shape
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[48], line 1
----> 1 X_csr.shape

NameError: name 'X_csr' is not defined
Code
# We remove the second and fourth columns (indices 1 and 3):
# they contain the label we want to predict.
# The column count is read from X_csr instead of being hard-coded.
kept_cols = [j for j in range(X_csr.shape[1]) if j not in (1, 3)]
X = X_csr[:, kept_cols]
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[49], line 6
      4 kept_cols.pop(1)
      5 kept_cols.pop(2)
----> 6 X = X_csr[:, kept_cols]

NameError: name 'X_csr' is not defined
Code
# Dimensions of the full sparse matrix.
# NOTE(review): this shows X_csr, not the reduced matrix X built in the
# previous cell -- X.shape may have been intended; confirm.
X_csr.shape
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[50], line 1
----> 1 X_csr.shape

NameError: name 'X_csr' is not defined

Finally!

Wow! That was a lot of work. Now we have a feature matrix \(X\) and a vector of labels \(y\).

Code
# Index array of the sparse matrix (column indices of the stored entries,
# assuming X is still in CSR format after the column selection).
X.indices
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[51], line 1
----> 1 X.indices

NameError: name 'X' is not defined
Code
# Index-pointer array of the sparse matrix (row boundaries within
# X.indices, assuming X is still in CSR format).
X.indptr
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[52], line 1
----> 1 X.indptr

NameError: name 'X' is not defined
Code
# Dimensions of the feature matrix and number of stored entries.
X.shape, X.nnz
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[53], line 1
----> 1 X.shape, X.nnz

NameError: name 'X' is not defined
Code
# Number of labels and, y being 0/1, number of positive labels.
y.shape, y.sum()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[54], line 1
----> 1 y.shape, y.sum()

NameError: name 'y' is not defined

Some learning on this data

Code
from sklearn.preprocessing import MaxAbsScaler
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression

# Normalize the features: each column is divided by its maximum absolute value.
# NOTE(review): the scaler is fitted on all rows, before the train/test split;
# fit it on X_train only if strict train/test isolation matters.
X = MaxAbsScaler().fit_transform(X)

# Stratified 70/30 split. The seed is fixed so that the split, the fitted
# model and the plots derived from it are the same on every run.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, stratify=y, test_size=0.3, random_state=42
)

# L2-penalized logistic regression with weak regularization (large C);
# class_weight='balanced' reweights the classes to compensate for imbalance.
clf = LogisticRegression(
    penalty='l2',
    C=1e3,
    solver='lbfgs',
    class_weight='balanced'
)
clf.fit(X_train, y_train)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[55], line 6
      3 from sklearn.linear_model import LogisticRegression
      5 # Normalize the features
----> 6 X = MaxAbsScaler().fit_transform(X)
      7 X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, test_size=0.3)
      9 clf = LogisticRegression(
     10     penalty='l2',
     11     C=1e3,
     12     solver='lbfgs',
     13     class_weight='balanced'
     14 )

NameError: name 'X' is not defined
Code
# Collect the feature names on the driver as a pandas Series aligned with the
# columns of X. Spark's distinct() returns rows in no particular order, so we
# sort by col_new (the matrix column is col_new - 1) and drop the two label
# columns (matrix indices 1 and 3) that were removed when X was built.
features_names = (
    features_names.toPandas()
    .sort_values('col_new')
    .loc[lambda t: ~(t['col_new'] - 1).isin([1, 3]), 'feature_name']
    .reset_index(drop=True)
)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[56], line 1
----> 1 features_names = features_names.toPandas()['feature_name']

NameError: name 'features_names' is not defined
Code
import matplotlib.pyplot as plt
%matplotlib inline

plt.figure(figsize=(16, 5))
plt.stem(clf.coef_[0], use_line_collection=True)
plt.title('Logistic regression coefficients', fontsize=18)
# We change the fontsize of minor ticks label
_ = plt.xticks(np.arange(clf.coef_[0].shape[0]), features_names, 
           rotation='vertical', fontsize=8)
_ = plt.yticks(fontsize=14)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[57], line 5
      2 get_ipython().run_line_magic('matplotlib', 'inline')
      4 plt.figure(figsize=(16, 5))
----> 5 plt.stem(clf.coef_[0], use_line_collection=True)
      6 plt.title('Logistic regression coefficients', fontsize=18)
      7 # We change the fontsize of minor ticks label

NameError: name 'clf' is not defined
<Figure size 1536x480 with 0 Axes>
Code
from sklearn.metrics import precision_recall_curve, f1_score

# Precision/recall pairs computed from the predicted probability of class 1
# on the test set.
precision, recall, _ = precision_recall_curve(y_test, clf.predict_proba(X_test)[:, 1])

# Legend entry: F1 score of the hard predictions on the test set.
curve_label = f"LR (F1={f1_score(y_test, clf.predict(X_test)):.2f})"

plt.figure(figsize=(8, 6))
plt.plot(recall, precision, label=curve_label, lw=2)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.title('Precision/recall curve', fontsize=18)
plt.xlabel('Recall', fontsize=16)
plt.ylabel('Precision', fontsize=16)
plt.legend(loc="upper right", fontsize=14)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[58], line 3
      1 from sklearn.metrics import precision_recall_curve, f1_score
----> 3 precision, recall, _ = precision_recall_curve(y_test, clf.predict_proba(X_test)[:, 1])
      5 plt.figure(figsize=(8, 6))
      6 plt.plot(recall, precision, label='LR (F1=%.2f)' % f1_score(y_test, clf.predict(X_test)), lw=2)

NameError: name 'y_test' is not defined