import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

conf = SparkConf().setAppName("Spark SQL Course")
sc = SparkContext(conf=conf)  # not needed with Spark 3: getOrCreate() below builds one

spark = (SparkSession
    .builder
    .appName("Spark SQL Course")
    .getOrCreate()
)

sc = spark._sc

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

rdd.reduceByKey(lambda a, b: a + b).collect()

[('b', 1), ('a', 2)]
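As an aside, the same aggregation can be written with the DataFrame API that the rest of this notebook uses; a minimal sketch (the pairs dataframe is made up for illustration):

# DataFrame equivalent of the reduceByKey example above (sketch)
pairs = spark.createDataFrame([("a", 1), ("b", 1), ("a", 1)], ["key", "value"])
pairs.groupBy("key").sum("value").collect()
# -> [Row(key='b', sum(value)=1), Row(key='a', sum(value)=2)]  (row order may vary)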
import requests, zipfile, io
from pathlib import Path
path = Path('webdata.parquet')
if not path.exists():
    url = "https://stephanegaiffas.github.io/big_data_course/data/webdata.parquet.zip"
    r = requests.get(url)
    z = zipfile.ZipFile(io.BytesIO(r.content))
    z.extractall(path='./')

input_path = './'
input_file = os.path.join(input_path, 'webdata.parquet')

df = spark.read.parquet(input_file)

df.head(6)

[Row(xid='001ff9b6-5383-4221-812d-58c2c3f234cc', action='O', date=datetime.datetime(2017, 1, 25, 7, 2, 18), website_id='3', url='http://www.8chances.com/grille', category_id=1002.0, zipcode='11370', device='SMP'),
 Row(xid='0056ab7a-3cba-4ed5-a495-3d4abf79ab66', action='O', date=datetime.datetime(2016, 12, 28, 9, 47, 8), website_id='54', url='http://www.salaire-brut-en-net.fr/differences-brut-net/', category_id=1002.0, zipcode='86000', device='DSK'),
 Row(xid='005ae4ab-363a-41a0-b8f9-faee47d622a4', action='O', date=datetime.datetime(2017, 1, 27, 22, 21, 6), website_id='74', url='http://www.realite-virtuelle.com/top-applications-horreur-vr-halloween', category_id=1002.0, zipcode='49700', device='DSK'),
 Row(xid='006f867c-70cb-41f0-82af-f3688fa719c5', action='O', date=datetime.datetime(2016, 12, 20, 12, 45, 14), website_id='43', url='http://www.frenchblues.fr/', category_id=1002.0, zipcode='42660', device='DSK'),
 Row(xid='006f867c-70cb-41f0-82af-f3688fa719c5', action='O', date=datetime.datetime(2016, 12, 20, 12, 56, 50), website_id='43', url='http://www.frenchblues.fr/', category_id=1002.0, zipcode='42660', device='DSK'),
 Row(xid='006f867c-70cb-41f0-82af-f3688fa719c5', action='O', date=datetime.datetime(2016, 12, 20, 12, 56, 53), website_id='43', url='http://www.frenchblues.fr/contact/', category_id=1002.0, zipcode='42660', device='DSK')]

df.describe()

DataFrame[summary: string, xid: string, action: string, website_id: string, url: string, category_id: string, zipcode: string, device: string]

Note that describe() is lazy: it returns a DataFrame, which is why only its schema is printed here; chaining .show() would materialize the statistics.

df.count()

1179532

First we need to import some things.
from pyspark.sql import Window
import pyspark.sql.functions as func
from pyspark.sql.types import *
from pyspark.sql.functions import col, lit

df.select('xid').distinct().count()

473761

xid_partition = Window.partitionBy('xid')
n_events = func.count(col('action')).over(xid_partition)
df = df.withColumn('n_events', n_events)
df.head(n=2)

[Row(xid='0006cea7-1679-4264-bdef-0cd089749ede', action='O', date=datetime.datetime(2016, 12, 26, 13, 41, 8), website_id='51', url='https://www.footlegende.fr/mercato-psg-coutinho-10166', category_id=1002.0, zipcode='34290', device='TAB', n_events=1),
 Row(xid='000893c8-a14b-4f33-858f-210440f37def', action='O', date=datetime.datetime(2016, 12, 23, 16, 18, 37), website_id='56', url='http://blague.dumatin.fr/', category_id=1002.0, zipcode=None, device='DSK', n_events=1)]

df.groupBy('xid').agg(func.count('action')).head(5)

[Row(xid='001c4a21-52c6-4890-b6ce-2b9d4ba06a56', count(action)=1),
 Row(xid='0024344b-7ee2-4fcd-a0b4-bec26d8c8b0e', count(action)=4),
 Row(xid='004564e3-87c1-4e16-ad2c-0e96afc3d617', count(action)=1),
 Row(xid='006d807f-91c3-415a-bb5e-6b9f7e6517a1', count(action)=1),
 Row(xid='006e0463-b24c-4996-84ab-d6d0d65a52aa', count(action)=1)]
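Note the difference between the two previous cells: the window version keeps one row per event and attaches the per-user count to each of them, while groupBy collapses the dataframe to one row per user. The window result can also be obtained without a window, by aggregating and joining back; a sketch (an alternative, not what the notebook does):

# Window-free alternative: aggregate per user, then join back onto the events (sketch)
counts = df.groupBy('xid').agg(func.count('action').alias('n_events_alt'))
df_alt = df.join(counts, on='xid', how='left')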
xid_partition = Window.partitionBy('xid')
max_date = func.max(col('date')).over(xid_partition)
n_days_since_last_event = func.datediff(func.current_date(), max_date)
df = df.withColumn('n_days_since_last_event',
                   n_days_since_last_event)
df.head(n=2)

[Row(xid='0006cea7-1679-4264-bdef-0cd089749ede', action='O', date=datetime.datetime(2016, 12, 26, 13, 41, 8), website_id='51', url='https://www.footlegende.fr/mercato-psg-coutinho-10166', category_id=1002.0, zipcode='34290', device='TAB', n_events=1, n_days_since_last_event=3020),
 Row(xid='000893c8-a14b-4f33-858f-210440f37def', action='O', date=datetime.datetime(2016, 12, 23, 16, 18, 37), website_id='56', url='http://blague.dumatin.fr/', category_id=1002.0, zipcode=None, device='DSK', n_events=1, n_days_since_last_event=3023)]

xid_device_partition = Window.partitionBy('xid', 'device')
n_events_per_device = func.count(col('action')).over(xid_device_partition)
df = df.withColumn('n_events_per_device', n_events_per_device)
df.head(n=2)

[Row(xid='0006cea7-1679-4264-bdef-0cd089749ede', action='O', date=datetime.datetime(2016, 12, 26, 13, 41, 8), website_id='51', url='https://www.footlegende.fr/mercato-psg-coutinho-10166', category_id=1002.0, zipcode='34290', device='TAB', n_events=1, n_days_since_last_event=3020, n_events_per_device=1),
 Row(xid='000893c8-a14b-4f33-858f-210440f37def', action='O', date=datetime.datetime(2016, 12, 23, 16, 18, 37), website_id='56', url='http://blague.dumatin.fr/', category_id=1002.0, zipcode=None, device='DSK', n_events=1, n_days_since_last_event=3023, n_events_per_device=1)]

xid_partition = Window.partitionBy('xid')
rank_device = func.dense_rank().over(xid_partition.orderBy('device'))
n_unique_device = func.last(rank_device).over(xid_partition)
df = df.withColumn('n_device', n_unique_device)
df.head(n=2)

[Row(xid='0006cea7-1679-4264-bdef-0cd089749ede', action='O', date=datetime.datetime(2016, 12, 26, 13, 41, 8), website_id='51', url='https://www.footlegende.fr/mercato-psg-coutinho-10166', category_id=1002.0, zipcode='34290', device='TAB', n_events=1, n_days_since_last_event=3020, n_events_per_device=1, n_device=1),
 Row(xid='000893c8-a14b-4f33-858f-210440f37def', action='O', date=datetime.datetime(2016, 12, 23, 16, 18, 37), website_id='56', url='http://blague.dumatin.fr/', category_id=1002.0, zipcode=None, device='DSK', n_events=1, n_days_since_last_event=3023, n_events_per_device=1, n_device=1)]
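countDistinct is not allowed as a window function, which is why the notebook counts distinct devices with dense_rank followed by last. A more direct workaround is collect_set plus size; a sketch (an alternative, not the notebook's method):

# Alternative distinct count over a window: size of the set of devices (sketch)
n_device_alt = func.size(func.collect_set('device').over(Window.partitionBy('xid')))
df_alt = df.withColumn('n_device_alt', n_device_alt)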
df\
    .where(col('n_device') > 1)\
    .select('xid', 'device', 'n_events', 'n_device', 'n_events_per_device')\
    .head(n=8)

[Row(xid='4c1dc79d-a140-4da9-ae28-540b4503c3b8', device='DSK', n_events=6, n_device=2, n_events_per_device=5),
 Row(xid='4c1dc79d-a140-4da9-ae28-540b4503c3b8', device='DSK', n_events=6, n_device=2, n_events_per_device=5),
 Row(xid='4c1dc79d-a140-4da9-ae28-540b4503c3b8', device='DSK', n_events=6, n_device=2, n_events_per_device=5),
 Row(xid='4c1dc79d-a140-4da9-ae28-540b4503c3b8', device='DSK', n_events=6, n_device=2, n_events_per_device=5),
 Row(xid='4c1dc79d-a140-4da9-ae28-540b4503c3b8', device='DSK', n_events=6, n_device=2, n_events_per_device=5),
 Row(xid='4c1dc79d-a140-4da9-ae28-540b4503c3b8', device='MOB', n_events=6, n_device=2, n_events_per_device=1),
 Row(xid='78156cdf-7229-46eb-bb6b-92d384f9a6fa', device='DSK', n_events=6, n_device=2, n_events_per_device=5),
 Row(xid='78156cdf-7229-46eb-bb6b-92d384f9a6fa', device='DSK', n_events=6, n_device=2, n_events_per_device=5)]

We now build an ETL (Extract, Transform, Load) process on this data using the pyspark.sql API.

Extraction is easy here: it is just a matter of reading the data.
df = spark.read.parquet(input_file)
df.head(n=3)

[Row(xid='001ff9b6-5383-4221-812d-58c2c3f234cc', action='O', date=datetime.datetime(2017, 1, 25, 7, 2, 18), website_id='3', url='http://www.8chances.com/grille', category_id=1002.0, zipcode='11370', device='SMP'),
 Row(xid='0056ab7a-3cba-4ed5-a495-3d4abf79ab66', action='O', date=datetime.datetime(2016, 12, 28, 9, 47, 8), website_id='54', url='http://www.salaire-brut-en-net.fr/differences-brut-net/', category_id=1002.0, zipcode='86000', device='DSK'),
 Row(xid='005ae4ab-363a-41a0-b8f9-faee47d622a4', action='O', date=datetime.datetime(2017, 1, 27, 22, 21, 6), website_id='74', url='http://www.realite-virtuelle.com/top-applications-horreur-vr-halloween', category_id=1002.0, zipcode='49700', device='DSK')]

In the transform step we compute many extra columns from the raw data. The aim is to build features that describe users.
def n_events_transformer(df):
    xid_partition = Window.partitionBy('xid')
    n_events = func.count(col('action')).over(xid_partition)
    df = df.withColumn('n_events', n_events)
    return df

def n_events_per_action_transformer(df):
    xid_action_partition = Window.partitionBy('xid', 'action')
    n_events_per_action = func.count(col('action')).over(xid_action_partition)
    df = df.withColumn('n_events_per_action', n_events_per_action)
    return df

def hour_transformer(df):
    hour = func.hour(col('date'))
    df = df.withColumn('hour', hour)
    return df

def weekday_transformer(df):
    weekday = func.date_format(col('date'), 'EEEE')
    df = df.withColumn('weekday', weekday)
    return df

def n_events_per_hour_transformer(df):
    xid_hour_partition = Window.partitionBy('xid', 'hour')
    n_events_per_hour = func.count(col('action')).over(xid_hour_partition)
    df = df.withColumn('n_events_per_hour', n_events_per_hour)
    return df

def n_events_per_weekday_transformer(df):
    xid_weekday_partition = Window.partitionBy('xid', 'weekday')
    n_events_per_weekday = func.count(col('action')).over(xid_weekday_partition)
    df = df.withColumn('n_events_per_weekday', n_events_per_weekday)
    return df

def n_days_since_last_event_transformer(df):
    xid_partition = Window.partitionBy('xid')
    max_date = func.max(col('date')).over(xid_partition)
    n_days_since_last_event = func.datediff(func.current_date(), max_date)
    df = df.withColumn('n_days_since_last_event',
                       n_days_since_last_event + lit(0.1))
    return df

def n_days_since_last_action_transformer(df):
    xid_partition_action = Window.partitionBy('xid', 'action')
    max_date = func.max(col('date')).over(xid_partition_action)
    n_days_since_last_action = func.datediff(func.current_date(), max_date)
    df = df.withColumn('n_days_since_last_action',
                       n_days_since_last_action + lit(0.1))
    return df

def n_unique_day_transformer(df):
    xid_partition = Window.partitionBy('xid')
    dayofyear = func.dayofyear(col('date'))
    rank_day = func.dense_rank().over(xid_partition.orderBy(dayofyear))
    n_unique_day = func.last(rank_day).over(xid_partition)
    df = df.withColumn('n_unique_day', n_unique_day)
    return df

def n_unique_hour_transformer(df):
    xid_partition = Window.partitionBy('xid')
    rank_hour = func.dense_rank().over(xid_partition.orderBy('hour'))
    n_unique_hour = func.last(rank_hour).over(xid_partition)
    df = df.withColumn('n_unique_hour', n_unique_hour)
    return df

def n_events_per_device_transformer(df):
    xid_device_partition = Window.partitionBy('xid', 'device')
    n_events_per_device = func.count(func.col('device')) \
        .over(xid_device_partition)
    df = df.withColumn('n_events_per_device', n_events_per_device)
    return df

def n_unique_device_transformer(df):
    xid_partition = Window.partitionBy('xid')
    rank_device = func.dense_rank().over(xid_partition.orderBy('device'))
    n_unique_device = func.last(rank_device).over(xid_partition)
    df = df.withColumn('n_device', n_unique_device)
    return df

def n_actions_per_category_id_transformer(df):
    xid_category_id_partition = Window.partitionBy('xid', 'category_id',
                                                   'action')
    n_actions_per_category_id = func.count(func.col('action')) \
        .over(xid_category_id_partition)
    df = df.withColumn('n_actions_per_category_id', n_actions_per_category_id)
    return df

def n_unique_category_id_transformer(df):
    xid_partition = Window.partitionBy('xid')
    rank_category_id = func.dense_rank().over(xid_partition
                                              .orderBy('category_id'))
    n_unique_category_id = func.last(rank_category_id).over(xid_partition)
    df = df.withColumn('n_unique_category_id', n_unique_category_id)
    return df

def n_events_per_category_id_transformer(df):
    xid_category_id_partition = Window.partitionBy('xid', 'category_id')
    n_events_per_category_id = func.count(func.col('action')) \
        .over(xid_category_id_partition)
    df = df.withColumn('n_events_per_category_id', n_events_per_category_id)
    return df

def n_events_per_website_id_transformer(df):
    xid_website_id_partition = Window.partitionBy('xid', 'website_id')
    n_events_per_website_id = func.count(col('action'))\
        .over(xid_website_id_partition)
    df = df.withColumn('n_events_per_website_id', n_events_per_website_id)
    return df

transformers = [
    hour_transformer,
    weekday_transformer,
    n_events_per_hour_transformer,
    n_events_per_weekday_transformer,
    n_days_since_last_event_transformer,
    n_days_since_last_action_transformer,
    n_unique_day_transformer,
    n_unique_hour_transformer,
    n_events_per_device_transformer,
    n_unique_device_transformer,
    n_actions_per_category_id_transformer,
    n_events_per_category_id_transformer,
    n_events_per_website_id_transformer,
]
for transformer in transformers:
    df = transformer(df)
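The loop above is a left fold of the transformer list over the dataframe; functools.reduce expresses the same thing in one line (a sketch, re-reading the file so the features are not applied twice):

# Equivalent to the loop above, as a fold (sketch)
from functools import reduce
df_folded = reduce(lambda d, t: t(d), transformers, spark.read.parquet(input_file))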
df.head(n=1)

[Row(xid='0006cea7-1679-4264-bdef-0cd089749ede', action='O', date=datetime.datetime(2016, 12, 26, 13, 41, 8), website_id='51', url='https://www.footlegende.fr/mercato-psg-coutinho-10166', category_id=1002.0, zipcode='34290', device='TAB', hour=13, weekday='Monday', n_events_per_hour=1, n_events_per_weekday=1, n_days_since_last_event=3020.1, n_days_since_last_action=3020.1, n_unique_day=1, n_unique_hour=1, n_events_per_device=1, n_device=1, n_actions_per_category_id=1, n_events_per_category_id=1, n_events_per_website_id=1)]

sorted(df.columns)

['action',
 'category_id',
 'date',
 'device',
 'hour',
 'n_actions_per_category_id',
 'n_days_since_last_action',
 'n_days_since_last_event',
 'n_device',
 'n_events_per_category_id',
 'n_events_per_device',
 'n_events_per_hour',
 'n_events_per_website_id',
 'n_events_per_weekday',
 'n_unique_day',
 'n_unique_hour',
 'url',
 'website_id',
 'weekday',
 'xid',
 'zipcode']

Here, we use all the previous computations (saved in the columns of the dataframe) to compute aggregated information about each user.
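Each loader below unpivots ("melts") one feature column into (xid, feature_name, value) triples. On Spark >= 3.4 the built-in DataFrame.melt does this in one call; a sketch for two of the scalar features (an aside, not what this notebook runs):

# Hypothetical melt-based loader (requires Spark >= 3.4)
csr_alt = (df
    .select('xid', 'n_unique_day', 'n_unique_hour')
    .distinct()
    .melt(ids=['xid'],
          values=['n_unique_day', 'n_unique_hour'],
          variableColumnName='feature_name',
          valueColumnName='value')
)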
def n_events_per_hour_loader(df):
    csr = df\
        .select('xid', 'hour', 'n_events_per_hour')\
        .withColumnRenamed('n_events_per_hour', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_hour#'), col('hour'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('hour')
    return csr

def n_events_per_weekday_loader(df):
    csr = df\
        .select('xid', 'weekday', 'n_events_per_weekday')\
        .withColumnRenamed('n_events_per_weekday', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_weekday#'), col('weekday'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('weekday')
    return csr

def n_days_since_last_event_loader(df):
    csr = df.select('xid',  'n_days_since_last_event')\
        .withColumnRenamed('n_days_since_last_event', 'value')\
        .distinct()
    feature_name = lit('n_days_since_last_event')
    csr = csr\
        .withColumn('feature_name', feature_name)
    return csr

def n_days_since_last_action_loader(df):
    csr = df.select('xid', 'action', 'n_days_since_last_action')\
        .withColumnRenamed('n_days_since_last_action', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_days_since_last_action#'), col('action'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('action')
    return csr

def n_unique_day_loader(df):
    csr = df.select('xid', 'n_unique_day')\
        .withColumnRenamed('n_unique_day', 'value')\
        .distinct()
    feature_name = lit('n_unique_day')
    csr = csr\
        .withColumn('feature_name', feature_name)
    return csr

def n_unique_hour_loader(df):
    csr = df.select('xid', 'n_unique_hour')\
        .withColumnRenamed('n_unique_hour', 'value')\
        .distinct()
    feature_name = lit('n_unique_hour')
    csr = csr\
        .withColumn('feature_name', feature_name)
    return csr

def n_events_per_device_loader(df):
    csr = df\
        .select('xid', 'device', 'n_events_per_device')\
        .withColumnRenamed('n_events_per_device', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_device#'), col('device'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('device')
    return csr

def n_unique_device_loader(df):
    csr = df.select('xid', 'n_device')\
        .withColumnRenamed('n_device', 'value')\
        .distinct()
    feature_name = lit('n_device')
    csr = csr\
        .withColumn('feature_name', feature_name)
    return csr

def n_events_per_category_id_loader(df):
    csr = df.select('xid', 'category_id', 'n_events_per_category_id')\
        .withColumnRenamed('n_events_per_category_id', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_category_id#'),
                               col('category_id'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('category_id')
    return csr

def n_actions_per_category_id_loader(df):
    csr = df.select('xid', 'category_id', 'action', 'n_actions_per_category_id')\
        .withColumnRenamed('n_actions_per_category_id', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_actions_per_category_id#'),
                               col('action'), lit('#'), 
                               col('category_id'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('category_id')\
        .drop('action')
    return csr

def n_events_per_website_id_loader(df):
    csr = df.select('xid', 'website_id', 'n_events_per_website_id')\
        .withColumnRenamed('n_events_per_website_id', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_website_id#'),
                               col('website_id'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('website_id')
    return csr

from functools import reduce

loaders = [
    # each loader appears exactly once: a repeated loader would emit duplicate feature rows
    n_events_per_hour_loader,
    n_events_per_weekday_loader,
    n_days_since_last_event_loader,
    n_days_since_last_action_loader,
    n_unique_day_loader,
    n_unique_hour_loader,
    n_events_per_device_loader,
    n_unique_device_loader,
    n_events_per_category_id_loader,
    n_actions_per_category_id_loader,
    n_events_per_website_id_loader,
]
def union(df, other):
    return df.union(other)

csr = reduce(union, [loader(df) for loader in loaders])
csr.head(n=3)

[Row(xid='000095cc-9a61-49b5-8ad5-83442daa93d6', value=2.0, feature_name='n_events_per_hour#21'),
 Row(xid='0000fa20-47ca-4548-82e9-78d81aa83fba', value=1.0, feature_name='n_events_per_hour#23'),
 Row(xid='00010386-a996-48ad-9888-4df5440188f2', value=1.0, feature_name='n_events_per_hour#21')]

csr.columns

['xid', 'value', 'feature_name']

csr.count()

25/04/03 15:11:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/03 15:11:08 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
25/04/03 15:11:08 ERROR Executor: Exception in task 9.0 in stage 93.0 (TID 529)
java.lang.OutOfMemoryError: Java heap space
25/04/03 15:11:08 ERROR TaskSetManager: Task 1 in stage 93.0 failed 1 times; aborting job

The count never returns: several executor tasks of stage 93 run out of Java heap space, the job is aborted, and the driver JVM itself crashes (in local mode the executors live inside the driver's JVM). On the Python side, py4j first raises Py4JNetworkError ("Answer from Java side is empty") and ConnectionResetError: [Errno 104] Connection reset by peer while the count() call is pending; from this point on, every call that needs the JVM fails with ConnectionRefusedError: [Errno 111] Connection refused, as the cells below show.
# Replace feature names and xids by unique (1-based) indices
# NB: a window with orderBy but no partitionBy moves all data to a single partition
feature_name_partition = Window().orderBy('feature_name')
xid_partition = Window().orderBy('xid')
col_idx = func.dense_rank().over(feature_name_partition)
row_idx = func.dense_rank().over(xid_partition)
csr = csr.withColumn('col', col_idx)\
    .withColumn('row', row_idx)
csr = csr.na.drop('any')
csr.head(n=5)

ConnectionRefusedError: [Errno 111] Connection refused
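The row and col indices built above suggest where this was heading: a sparse user-feature matrix (hence the name csr). Once the pipeline runs to completion, the triples map directly onto a SciPy sparse matrix; a sketch of that final conversion, assuming the triples fit in driver memory (this step is not in the notebook):

# Hypothetical conversion of the (row, col, value) triples to a SciPy CSR matrix
from scipy.sparse import csr_matrix

pdf = csr.select('row', 'col', 'value').toPandas()
X = csr_matrix((pdf['value'].to_numpy(),
                (pdf['row'].to_numpy() - 1,    # dense_rank() is 1-based
                 pdf['col'].to_numpy() - 1)))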
# Let's save the result of our hard work into a new parquet file
output_path = './'
output_file = os.path.join(output_path, 'csr.parquet')
csr.write.parquet(output_file, mode='overwrite')

ConnectionRefusedError: [Errno 111] Connection refused
csr_path = './'
csr_file = os.path.join(csr_path, 'csr.parquet')
df = spark.read.parquet(csr_file)
df.head(n=5)

ConnectionRefusedError: [Errno 111] Connection refused
df.count()

ConnectionRefusedError: [Errno 111] Connection refused
# What are the features related to campaign_id 1204?
features_names = \
    df.select('feature_name')\
    .distinct()\
    .toPandas()['feature_name']--------------------------------------------------------------------------- ConnectionRefusedError Traceback (most recent call last) Cell In[31], line 3 1 # What are the features related to campaign_id 1204 ? 2 features_names = \ ----> 3 df.select('feature_name')\ 4 .distinct()\ 5 .toPandas()['feature_name'] File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/dataframe.py:3229, in DataFrame.select(self, *cols) 3184 def select(self, *cols: "ColumnOrName") -> "DataFrame": # type: ignore[misc] 3185 """Projects a set of expressions and returns a new :class:`DataFrame`. 3186 3187 .. versionadded:: 1.3.0 (...) 3227 +-----+---+ 3228 """ -> 3229 jdf = self._jdf.select(self._jcols(*cols)) 3230 return DataFrame(jdf, self.sparkSession) File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/dataframe.py:2766, in DataFrame._jcols(self, *cols) 2764 if len(cols) == 1 and isinstance(cols[0], list): 2765 cols = cols[0] -> 2766 return self._jseq(cols, _to_java_column) File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/dataframe.py:2753, in DataFrame._jseq(self, cols, converter) 2747 def _jseq( 2748 self, 2749 cols: Sequence, 2750 converter: Optional[Callable[..., Union["PrimitiveType", JavaObject]]] = None, 2751 ) -> JavaObject: 2752 """Return a JVM Seq of Columns from a list of Column or names""" -> 2753 return _to_seq(self.sparkSession._sc, cols, converter) File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/column.py:88, in _to_seq(sc, cols, converter) 81 """ 82 Convert a list of Columns (or names) into a JVM Seq of Column. 83 84 An optional `converter` could be used to convert items in `cols` 85 into JVM Column objects. 86 """ 87 if converter: ---> 88 cols = [converter(c) for c in cols] 89 assert sc._jvm is not None 90 return sc._jvm.PythonUtils.toSeq(cols) File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/column.py:63, in _to_java_column(col) 61 jcol = col._jc 62 elif isinstance(col, str): ---> 63 jcol = _create_column_from_name(col) 64 else: 65 raise PySparkTypeError( 66 error_class="NOT_COLUMN_OR_STR", 67 message_parameters={"arg_name": "col", "arg_type": type(col).__name__}, 68 ) File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/column.py:56, in _create_column_from_name(name) 54 def _create_column_from_name(name: str) -> "Column": 55 sc = get_active_spark_context() ---> 56 return cast(JVMView, sc._jvm).functions.col(name) File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1712, in JVMView.__getattr__(self, name) 1709 if name == UserHelpAutoCompletion.KEY: 1710 return UserHelpAutoCompletion() -> 1712 answer = self._gateway_client.send_command( 1713 proto.REFLECTION_COMMAND_NAME + 1714 proto.REFL_GET_UNKNOWN_SUB_COMMAND_NAME + name + "\n" + self._id + 1715 "\n" + proto.END_COMMAND_PART) 1716 if answer == proto.SUCCESS_PACKAGE: 1717 return JavaPackage(name, self._gateway_client, jvm_id=self._id) File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py:1036, in GatewayClient.send_command(self, command, retry, binary) 1015 def send_command(self, command, retry=True, binary=False): 1016 """Sends a command to the JVM. This method is not intended to be 1017 called directly by Py4J users. It is usually called by 1018 :class:`JavaMember` instances. (...) 1034 if `binary` is `True`. 
1035 """ -> 1036 connection = self._get_connection() 1037 try: 1038 response = connection.send_command(command) File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:284, in JavaClient._get_connection(self) 281 pass 283 if connection is None or connection.socket is None: --> 284 connection = self._create_new_connection() 285 return connection File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:291, in JavaClient._create_new_connection(self) 287 def _create_new_connection(self): 288 connection = ClientServerConnection( 289 self.java_parameters, self.python_parameters, 290 self.gateway_property, self) --> 291 connection.connect_to_java_server() 292 self.set_thread_connection(connection) 293 return connection File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:438, in ClientServerConnection.connect_to_java_server(self) 435 if self.ssl_context: 436 self.socket = self.ssl_context.wrap_socket( 437 self.socket, server_hostname=self.java_address) --> 438 self.socket.connect((self.java_address, self.java_port)) 439 self.stream = self.socket.makefile("rb") 440 self.is_connected = True ConnectionRefusedError: [Errno 111] Connection refused
features_names
[feature_name for feature_name in features_names if '1204' in feature_name]
# Look for the xids that have at least one exposure to campaign 1204
keep = func.when(
    (col('feature_name') == 'n_actions_per_category_id#C#1204.0') |
    (col('feature_name') == 'n_actions_per_category_id#O#1204.0'),
    1).otherwise(0)
df = df.withColumn('keep', keep)
df.where(col('keep') > 0).count()
1035 """ -> 1036 connection = self._get_connection() 1037 try: 1038 response = connection.send_command(command) File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:284, in JavaClient._get_connection(self) 281 pass 283 if connection is None or connection.socket is None: --> 284 connection = self._create_new_connection() 285 return connection File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:291, in JavaClient._create_new_connection(self) 287 def _create_new_connection(self): 288 connection = ClientServerConnection( 289 self.java_parameters, self.python_parameters, 290 self.gateway_property, self) --> 291 connection.connect_to_java_server() 292 self.set_thread_connection(connection) 293 return connection File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:438, in ClientServerConnection.connect_to_java_server(self) 435 if self.ssl_context: 436 self.socket = self.ssl_context.wrap_socket( 437 self.socket, server_hostname=self.java_address) --> 438 self.socket.connect((self.java_address, self.java_port)) 439 self.stream = self.socket.makefile("rb") 440 self.is_connected = True ConnectionRefusedError: [Errno 111] Connection refused
# Sum of the keeps :)
xid_partition = Window.partitionBy('xid')
sum_keep = func.sum(col('keep')).over(xid_partition)
df = df.withColumn('sum_keep', sum_keep)
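The window sum attaches, to every row, the total number of `keep` flags of its `xid`, without collapsing the rows. For intuition, here is an equivalent (but usually heavier) sketch with an aggregation plus a join; `keep_per_xid` and `df_alt` are made-up names, not part of the original notebook:
# Equivalent sketch: aggregate the flag per xid, then join it back.
# The window version above avoids materialising this intermediate table.
keep_per_xid = df.groupBy('xid').agg(func.sum('keep').alias('sum_keep'))
df_alt = df.drop('sum_keep').join(keep_per_xid, on='xid')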
# Let's keep the xid exposed to 1204
df = df.where(col('sum_keep') > 0)
1035 """ -> 1036 connection = self._get_connection() 1037 try: 1038 response = connection.send_command(command) File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:284, in JavaClient._get_connection(self) 281 pass 283 if connection is None or connection.socket is None: --> 284 connection = self._create_new_connection() 285 return connection File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:291, in JavaClient._create_new_connection(self) 287 def _create_new_connection(self): 288 connection = ClientServerConnection( 289 self.java_parameters, self.python_parameters, 290 self.gateway_property, self) --> 291 connection.connect_to_java_server() 292 self.set_thread_connection(connection) 293 return connection File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:438, in ClientServerConnection.connect_to_java_server(self) 435 if self.ssl_context: 436 self.socket = self.ssl_context.wrap_socket( 437 self.socket, server_hostname=self.java_address) --> 438 self.socket.connect((self.java_address, self.java_port)) 439 self.stream = self.socket.makefile("rb") 440 self.is_connected = True ConnectionRefusedError: [Errno 111] Connection refused
df.count()
df.select('xid').distinct().count()
1035 """ -> 1036 connection = self._get_connection() 1037 try: 1038 response = connection.send_command(command) File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:284, in JavaClient._get_connection(self) 281 pass 283 if connection is None or connection.socket is None: --> 284 connection = self._create_new_connection() 285 return connection File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:291, in JavaClient._create_new_connection(self) 287 def _create_new_connection(self): 288 connection = ClientServerConnection( 289 self.java_parameters, self.python_parameters, 290 self.gateway_property, self) --> 291 connection.connect_to_java_server() 292 self.set_thread_connection(connection) 293 return connection File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:438, in ClientServerConnection.connect_to_java_server(self) 435 if self.ssl_context: 436 self.socket = self.ssl_context.wrap_socket( 437 self.socket, server_hostname=self.java_address) --> 438 self.socket.connect((self.java_address, self.java_port)) 439 self.stream = self.socket.makefile("rb") 440 self.is_connected = True ConnectionRefusedError: [Errno 111] Connection refused
row_partition = Window().orderBy('row')
col_partition = Window().orderBy('col')
row_new = func.dense_rank().over(row_partition)
col_new = func.dense_rank().over(col_partition)
df = df.withColumn('row_new', row_new)
df = df.withColumn('col_new', col_new)
csr_data = df.select('row_new', 'col_new', 'value').toPandas()
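`dense_rank` maps the raw `row` and `col` identifiers to consecutive 1-based indices (ties share a rank, and there are no gaps), which is exactly what we need to index a sparse matrix. Note that a `Window().orderBy(...)` with no `partitionBy` moves all the data to a single partition, so Spark warns about it; that is acceptable here since the result is collected to the driver anyway. A toy sketch of the behaviour (the values are made up):
# dense_rank on a toy column: 10 -> 1, 25 -> 2, 40 -> 3 (duplicates share a rank)
toy = spark.createDataFrame([(10,), (40,), (10,), (25,)], ['row'])
toy.withColumn('row_new', func.dense_rank().over(Window().orderBy('row'))).show()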
csr_data.head()
features_names = df.select('feature_name', 'col_new').distinct()
features_names.where(col('feature_name') == 'n_actions_per_category_id#C#1204.0').head()
1035 """ -> 1036 connection = self._get_connection() 1037 try: 1038 response = connection.send_command(command) File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:284, in JavaClient._get_connection(self) 281 pass 283 if connection is None or connection.socket is None: --> 284 connection = self._create_new_connection() 285 return connection File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:291, in JavaClient._create_new_connection(self) 287 def _create_new_connection(self): 288 connection = ClientServerConnection( 289 self.java_parameters, self.python_parameters, 290 self.gateway_property, self) --> 291 connection.connect_to_java_server() 292 self.set_thread_connection(connection) 293 return connection File ~/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py:438, in ClientServerConnection.connect_to_java_server(self) 435 if self.ssl_context: 436 self.socket = self.ssl_context.wrap_socket( 437 self.socket, server_hostname=self.java_address) --> 438 self.socket.connect((self.java_address, self.java_port)) 439 self.stream = self.socket.makefile("rb") 440 self.is_connected = True ConnectionRefusedError: [Errno 111] Connection refused
features_names.where(col('feature_name') == 'n_actions_per_category_id#O#1204.0').head()
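These two lookups locate the `col_new` indices of the campaign-1204 label features: they are precisely the columns we will have to drop from the design matrix below.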
from scipy.sparse import csr_matrix
import numpy as np
# dense_rank is 1-based, while scipy expects 0-based indices, hence the -1
rows = csr_data['row_new'].values - 1
cols = csr_data['col_new'].values - 1
vals = csr_data['value'].values
X_csr = csr_matrix((vals, (rows, cols)))
X_csr.shape
X_csr.shape, X_csr.nnz
# Density of the matrix: non-zeros divided by the (hard-coded) matrix size
X_csr.nnz / (152347 * 92)
# The label vector. Let's make it dense, flat and binary
y = np.array(X_csr[:, 1].todense()).ravel()
y = np.array(y > 0, dtype=np.int64)
X_csr.shape
# We remove the second and fourth columns:
# they actually contain the labels we want to predict.
kept_cols = list(range(92))
kept_cols.pop(1)   # drops original column 1
kept_cols.pop(2)   # after the first pop, this drops original column 3
X = X_csr[:, kept_cols]
X_csr.shape
Wow! That was a lot of work. We now have a features matrix \(X\) and a vector of labels \(y\).
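Before using them, let's peek at how scipy stores a CSR matrix. A minimal, self-contained illustration (the tiny matrix below is made up, not the course data):
from scipy.sparse import csr_matrix
import numpy as np

M = csr_matrix(np.array([[0, 2, 0],
                         [3, 0, 4]]))
print(M.data)     # stored non-zero values: [2 3 4]
print(M.indices)  # column index of each stored value: [1 0 2]
print(M.indptr)   # row i owns data[indptr[i]:indptr[i+1]]: [0 1 3]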
X.indices
X.indptr
X.shape, X.nnz
y.shape, y.sum()
from sklearn.preprocessing import MaxAbsScaler
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
# Scale each feature to [-1, 1]; MaxAbsScaler preserves sparsity
X = MaxAbsScaler().fit_transform(X)
X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, test_size=0.3)
clf = LogisticRegression(
    penalty='l2',
    C=1e3,
    solver='lbfgs',
    class_weight='balanced'
)
clf.fit(X_train, y_train)
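Before looking at the coefficients, a quick sanity check on the held-out set can be useful; this snippet is a sketch added here, not part of the original notebook:
# ROC AUC on the test set: a threshold-free summary of ranking quality
from sklearn.metrics import roc_auc_score
roc_auc_score(y_test, clf.predict_proba(X_test)[:, 1])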
features_names = features_names.toPandas()['feature_name']
import matplotlib.pyplot as plt
%matplotlib inline
plt.figure(figsize=(16, 5))
plt.stem(clf.coef_[0])  # the old use_line_collection kwarg is gone from recent matplotlib
plt.title('Logistic regression coefficients', fontsize=18)
# We change the fontsize of minor ticks label
_ = plt.xticks(np.arange(clf.coef_[0].shape[0]), features_names, 
           rotation='vertical', fontsize=8)
_ = plt.yticks(fontsize=14)
from sklearn.metrics import precision_recall_curve, f1_score
precision, recall, _ = precision_recall_curve(y_test, clf.predict_proba(X_test)[:, 1])
    
plt.figure(figsize=(8, 6))
plt.plot(recall, precision, label='LR (F1=%.2f)' % f1_score(y_test, clf.predict(X_test)), lw=2)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('Recall', fontsize=16)
plt.ylabel('Precision', fontsize=16)
plt.title('Precision/recall curve', fontsize=18)
plt.legend(loc="upper right", fontsize=14)
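The F1 value in the legend uses the default 0.5 decision threshold. If needed, one could also read the best achievable F1 off the precision/recall curve itself; a sketch (not in the original notebook):
# Best F1 along the precision/recall curve; the epsilon avoids a 0/0 at the endpoints
f1_scores = 2 * precision * recall / (precision + recall + 1e-12)
f1_scores.max()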