import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

conf = SparkConf().setAppName("Spark SQL Course")
sc = SparkContext(conf=conf)  # no need for Spark 3...

spark = (SparkSession
         .builder
         .appName("Spark SQL Course")
         .getOrCreate())
25/04/03 15:10:49 WARN Utils: Your hostname, boucheron-Precision-5480 resolves to a loopback address: 127.0.1.1; using 172.23.32.10 instead (on interface eth0)
25/04/03 15:10:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/03 15:10:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
sc = spark._sc  # note: spark.sparkContext is the public accessor for the underlying SparkContext
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rdd.reduceByKey(lambda a, b: a + b).collect()
[('b', 1), ('a', 2)]
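The same aggregation can also be written with the DataFrame API; here is an equivalent sketch (ours, not in the original notebook; the column names key and value are assumptions):

spark.createDataFrame([("a", 1), ("b", 1), ("a", 1)], ["key", "value"])\
    .groupBy("key").sum("value").collect()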
import requests, zipfile, io
from pathlib import Path

path = Path('webdata.parquet')
if not path.exists():
    url = "https://stephanegaiffas.github.io/big_data_course/data/webdata.parquet.zip"
    r = requests.get(url)
    z = zipfile.ZipFile(io.BytesIO(r.content))
    z.extractall(path='./')
input_path = './'
input_file = os.path.join(input_path, 'webdata.parquet')
df = spark.read.parquet(input_file)
df.head(6)
[Row(xid='001ff9b6-5383-4221-812d-58c2c3f234cc', action='O', date=datetime.datetime(2017, 1, 25, 7, 2, 18), website_id='3', url='http://www.8chances.com/grille', category_id=1002.0, zipcode='11370', device='SMP'),
Row(xid='0056ab7a-3cba-4ed5-a495-3d4abf79ab66', action='O', date=datetime.datetime(2016, 12, 28, 9, 47, 8), website_id='54', url='http://www.salaire-brut-en-net.fr/differences-brut-net/', category_id=1002.0, zipcode='86000', device='DSK'),
Row(xid='005ae4ab-363a-41a0-b8f9-faee47d622a4', action='O', date=datetime.datetime(2017, 1, 27, 22, 21, 6), website_id='74', url='http://www.realite-virtuelle.com/top-applications-horreur-vr-halloween', category_id=1002.0, zipcode='49700', device='DSK'),
Row(xid='006f867c-70cb-41f0-82af-f3688fa719c5', action='O', date=datetime.datetime(2016, 12, 20, 12, 45, 14), website_id='43', url='http://www.frenchblues.fr/', category_id=1002.0, zipcode='42660', device='DSK'),
Row(xid='006f867c-70cb-41f0-82af-f3688fa719c5', action='O', date=datetime.datetime(2016, 12, 20, 12, 56, 50), website_id='43', url='http://www.frenchblues.fr/', category_id=1002.0, zipcode='42660', device='DSK'),
Row(xid='006f867c-70cb-41f0-82af-f3688fa719c5', action='O', date=datetime.datetime(2016, 12, 20, 12, 56, 53), website_id='43', url='http://www.frenchblues.fr/contact/', category_id=1002.0, zipcode='42660', device='DSK')]
df.describe()
DataFrame[summary: string, xid: string, action: string, website_id: string, url: string, category_id: string, zipcode: string, device: string]
df.count()
1179532
First, we need to import a few things.
from pyspark.sql import Window
import pyspark.sql.functions as func
from pyspark.sql.types import *
from pyspark.sql.functions import col, lit
df.select('xid').distinct().count()
473761
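So the 1179532 events are spread over 473761 distinct users, which is about 2.5 events per user on average.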
xid_partition = Window.partitionBy('xid')
n_events = func.count(col('action')).over(xid_partition)
df = df.withColumn('n_events', n_events)
df.head(n=2)
[Row(xid='0006cea7-1679-4264-bdef-0cd089749ede', action='O', date=datetime.datetime(2016, 12, 26, 13, 41, 8), website_id='51', url='https://www.footlegende.fr/mercato-psg-coutinho-10166', category_id=1002.0, zipcode='34290', device='TAB', n_events=1),
Row(xid='000893c8-a14b-4f33-858f-210440f37def', action='O', date=datetime.datetime(2016, 12, 23, 16, 18, 37), website_id='56', url='http://blague.dumatin.fr/', category_id=1002.0, zipcode=None, device='DSK', n_events=1)]
df.groupBy('xid').agg(func.count('action')).head(5)
[Row(xid='001c4a21-52c6-4890-b6ce-2b9d4ba06a56', count(action)=1),
Row(xid='0024344b-7ee2-4fcd-a0b4-bec26d8c8b0e', count(action)=4),
Row(xid='004564e3-87c1-4e16-ad2c-0e96afc3d617', count(action)=1),
Row(xid='006d807f-91c3-415a-bb5e-6b9f7e6517a1', count(action)=1),
Row(xid='006e0463-b24c-4996-84ab-d6d0d65a52aa', count(action)=1)]
xid_partition = Window.partitionBy('xid')
max_date = func.max(col('date')).over(xid_partition)
# datediff is measured from the day the notebook is run, hence the large values below
n_days_since_last_event = func.datediff(func.current_date(), max_date)
df = df.withColumn('n_days_since_last_event',
                   n_days_since_last_event)
df.head(n=2)
[Row(xid='0006cea7-1679-4264-bdef-0cd089749ede', action='O', date=datetime.datetime(2016, 12, 26, 13, 41, 8), website_id='51', url='https://www.footlegende.fr/mercato-psg-coutinho-10166', category_id=1002.0, zipcode='34290', device='TAB', n_events=1, n_days_since_last_event=3020),
Row(xid='000893c8-a14b-4f33-858f-210440f37def', action='O', date=datetime.datetime(2016, 12, 23, 16, 18, 37), website_id='56', url='http://blague.dumatin.fr/', category_id=1002.0, zipcode=None, device='DSK', n_events=1, n_days_since_last_event=3023)]
xid_device_partition = Window.partitionBy('xid', 'device')
n_events_per_device = func.count(col('action')).over(xid_device_partition)
df = df.withColumn('n_events_per_device', n_events_per_device)
df.head(n=2)
[Row(xid='0006cea7-1679-4264-bdef-0cd089749ede', action='O', date=datetime.datetime(2016, 12, 26, 13, 41, 8), website_id='51', url='https://www.footlegende.fr/mercato-psg-coutinho-10166', category_id=1002.0, zipcode='34290', device='TAB', n_events=1, n_days_since_last_event=3020, n_events_per_device=1),
Row(xid='000893c8-a14b-4f33-858f-210440f37def', action='O', date=datetime.datetime(2016, 12, 23, 16, 18, 37), website_id='56', url='http://blague.dumatin.fr/', category_id=1002.0, zipcode=None, device='DSK', n_events=1, n_days_since_last_event=3023, n_events_per_device=1)]
xid_partition = Window.partitionBy('xid')
# dense_rank over devices, then last rank per user = number of distinct devices
rank_device = func.dense_rank().over(xid_partition.orderBy('device'))
n_unique_device = func.last(rank_device).over(xid_partition)
df = df.withColumn('n_device', n_unique_device)
df.head(n=2)
[Row(xid='0006cea7-1679-4264-bdef-0cd089749ede', action='O', date=datetime.datetime(2016, 12, 26, 13, 41, 8), website_id='51', url='https://www.footlegende.fr/mercato-psg-coutinho-10166', category_id=1002.0, zipcode='34290', device='TAB', n_events=1, n_days_since_last_event=3020, n_events_per_device=1, n_device=1),
Row(xid='000893c8-a14b-4f33-858f-210440f37def', action='O', date=datetime.datetime(2016, 12, 23, 16, 18, 37), website_id='56', url='http://blague.dumatin.fr/', category_id=1002.0, zipcode=None, device='DSK', n_events=1, n_days_since_last_event=3023, n_events_per_device=1, n_device=1)]
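The dense_rank + last construction above is a workaround: Spark SQL does not support a distinct count (countDistinct) over a window. A minimal sketch of an alternative, assuming an approximate count is acceptable (the column name n_device_approx is ours, not the notebook's):

n_device_approx = func.approx_count_distinct(col('device')).over(xid_partition)
df = df.withColumn('n_device_approx', n_device_approx)  # approximate number of distinct devices per xid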
df\
    .where(col('n_device') > 1)\
    .select('xid', 'device', 'n_events', 'n_device', 'n_events_per_device')\
    .head(n=8)
[Row(xid='4c1dc79d-a140-4da9-ae28-540b4503c3b8', device='DSK', n_events=6, n_device=2, n_events_per_device=5),
Row(xid='4c1dc79d-a140-4da9-ae28-540b4503c3b8', device='DSK', n_events=6, n_device=2, n_events_per_device=5),
Row(xid='4c1dc79d-a140-4da9-ae28-540b4503c3b8', device='DSK', n_events=6, n_device=2, n_events_per_device=5),
Row(xid='4c1dc79d-a140-4da9-ae28-540b4503c3b8', device='DSK', n_events=6, n_device=2, n_events_per_device=5),
Row(xid='4c1dc79d-a140-4da9-ae28-540b4503c3b8', device='DSK', n_events=6, n_device=2, n_events_per_device=5),
Row(xid='4c1dc79d-a140-4da9-ae28-540b4503c3b8', device='MOB', n_events=6, n_device=2, n_events_per_device=1),
Row(xid='78156cdf-7229-46eb-bb6b-92d384f9a6fa', device='DSK', n_events=6, n_device=2, n_events_per_device=5),
Row(xid='78156cdf-7229-46eb-bb6b-92d384f9a6fa', device='DSK', n_events=6, n_device=2, n_events_per_device=5)]
We construct an ETL (Extract, Transform, Load) process on this data using the pyspark.sql API.
Extraction is easy here: it is just a matter of reading the data.
df = spark.read.parquet(input_file)
df.head(n=3)
[Row(xid='001ff9b6-5383-4221-812d-58c2c3f234cc', action='O', date=datetime.datetime(2017, 1, 25, 7, 2, 18), website_id='3', url='http://www.8chances.com/grille', category_id=1002.0, zipcode='11370', device='SMP'),
Row(xid='0056ab7a-3cba-4ed5-a495-3d4abf79ab66', action='O', date=datetime.datetime(2016, 12, 28, 9, 47, 8), website_id='54', url='http://www.salaire-brut-en-net.fr/differences-brut-net/', category_id=1002.0, zipcode='86000', device='DSK'),
Row(xid='005ae4ab-363a-41a0-b8f9-faee47d622a4', action='O', date=datetime.datetime(2017, 1, 27, 22, 21, 6), website_id='74', url='http://www.realite-virtuelle.com/top-applications-horreur-vr-halloween', category_id=1002.0, zipcode='49700', device='DSK')]
In this step we compute a number of extra columns from the data. The aim is to build features that describe users.
def n_events_transformer(df):
    xid_partition = Window.partitionBy('xid')
    n_events = func.count(col('action')).over(xid_partition)
    df = df.withColumn('n_events', n_events)
    return df

def n_events_per_action_transformer(df):
    xid_action_partition = Window.partitionBy('xid', 'action')
    n_events_per_action = func.count(col('action')).over(xid_action_partition)
    df = df.withColumn('n_events_per_action', n_events_per_action)
    return df

def hour_transformer(df):
    hour = func.hour(col('date'))
    df = df.withColumn('hour', hour)
    return df

def weekday_transformer(df):
    weekday = func.date_format(col('date'), 'EEEE')
    df = df.withColumn('weekday', weekday)
    return df

def n_events_per_hour_transformer(df):
    xid_hour_partition = Window.partitionBy('xid', 'hour')
    n_events_per_hour = func.count(col('action')).over(xid_hour_partition)
    df = df.withColumn('n_events_per_hour', n_events_per_hour)
    return df

def n_events_per_weekday_transformer(df):
    xid_weekday_partition = Window.partitionBy('xid', 'weekday')
    n_events_per_weekday = func.count(col('action')).over(xid_weekday_partition)
    df = df.withColumn('n_events_per_weekday', n_events_per_weekday)
    return df

def n_days_since_last_event_transformer(df):
    xid_partition = Window.partitionBy('xid')
    max_date = func.max(col('date')).over(xid_partition)
    n_days_since_last_event = func.datediff(func.current_date(), max_date)
    df = df.withColumn('n_days_since_last_event',
                       n_days_since_last_event + lit(0.1))
    return df

def n_days_since_last_action_transformer(df):
    xid_partition_action = Window.partitionBy('xid', 'action')
    max_date = func.max(col('date')).over(xid_partition_action)
    n_days_since_last_action = func.datediff(func.current_date(),
                                             max_date)
    df = df.withColumn('n_days_since_last_action',
                       n_days_since_last_action + lit(0.1))
    return df

def n_unique_day_transformer(df):
    xid_partition = Window.partitionBy('xid')
    dayofyear = func.dayofyear(col('date'))
    rank_day = func.dense_rank().over(xid_partition.orderBy(dayofyear))
    n_unique_day = func.last(rank_day).over(xid_partition)
    df = df.withColumn('n_unique_day', n_unique_day)
    return df

def n_unique_hour_transformer(df):
    xid_partition = Window.partitionBy('xid')
    rank_hour = func.dense_rank().over(xid_partition.orderBy('hour'))
    n_unique_hour = func.last(rank_hour).over(xid_partition)
    df = df.withColumn('n_unique_hour', n_unique_hour)
    return df

def n_events_per_device_transformer(df):
    xid_device_partition = Window.partitionBy('xid', 'device')
    n_events_per_device = func.count(func.col('device')) \
        .over(xid_device_partition)
    df = df.withColumn('n_events_per_device', n_events_per_device)
    return df

def n_unique_device_transformer(df):
    xid_partition = Window.partitionBy('xid')
    rank_device = func.dense_rank().over(xid_partition.orderBy('device'))
    n_unique_device = func.last(rank_device).over(xid_partition)
    df = df.withColumn('n_device', n_unique_device)
    return df

def n_actions_per_category_id_transformer(df):
    xid_category_id_partition = Window.partitionBy('xid', 'category_id',
                                                   'action')
    n_actions_per_category_id = func.count(func.col('action')) \
        .over(xid_category_id_partition)
    df = df.withColumn('n_actions_per_category_id', n_actions_per_category_id)
    return df

def n_unique_category_id_transformer(df):
    xid_partition = Window.partitionBy('xid')
    rank_category_id = func.dense_rank().over(xid_partition
                                              .orderBy('category_id'))
    n_unique_category_id = func.last(rank_category_id).over(xid_partition)
    df = df.withColumn('n_unique_category_id', n_unique_category_id)
    return df

def n_events_per_category_id_transformer(df):
    xid_category_id_partition = Window.partitionBy('xid', 'category_id')
    n_events_per_category_id = func.count(func.col('action')) \
        .over(xid_category_id_partition)
    df = df.withColumn('n_events_per_category_id', n_events_per_category_id)
    return df

def n_events_per_website_id_transformer(df):
    xid_website_id_partition = Window.partitionBy('xid', 'website_id')
    n_events_per_website_id = func.count(col('action'))\
        .over(xid_website_id_partition)
    df = df.withColumn('n_events_per_website_id', n_events_per_website_id)
    return df
transformers = [
    hour_transformer,
    weekday_transformer,
    n_events_per_hour_transformer,
    n_events_per_weekday_transformer,
    n_days_since_last_event_transformer,
    n_days_since_last_action_transformer,
    n_unique_day_transformer,
    n_unique_hour_transformer,
    n_events_per_device_transformer,
    n_unique_device_transformer,
    n_actions_per_category_id_transformer,
    n_events_per_category_id_transformer,
    n_events_per_website_id_transformer,
]

for transformer in transformers:
    df = transformer(df)

df.head(n=1)
[Row(xid='0006cea7-1679-4264-bdef-0cd089749ede', action='O', date=datetime.datetime(2016, 12, 26, 13, 41, 8), website_id='51', url='https://www.footlegende.fr/mercato-psg-coutinho-10166', category_id=1002.0, zipcode='34290', device='TAB', hour=13, weekday='Monday', n_events_per_hour=1, n_events_per_weekday=1, n_days_since_last_event=3020.1, n_days_since_last_action=3020.1, n_unique_day=1, n_unique_hour=1, n_events_per_device=1, n_device=1, n_actions_per_category_id=1, n_events_per_category_id=1, n_events_per_website_id=1)]
sorted(df.columns)
['action',
'category_id',
'date',
'device',
'hour',
'n_actions_per_category_id',
'n_days_since_last_action',
'n_days_since_last_event',
'n_device',
'n_events_per_category_id',
'n_events_per_device',
'n_events_per_hour',
'n_events_per_website_id',
'n_events_per_weekday',
'n_unique_day',
'n_unique_hour',
'url',
'website_id',
'weekday',
'xid',
'zipcode']
Here we use all the previous computations (saved in the columns of the dataframe) to compute aggregated information about each user.
def n_events_per_hour_loader(df):
    csr = df\
        .select('xid', 'hour', 'n_events_per_hour')\
        .withColumnRenamed('n_events_per_hour', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_hour#'), col('hour'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('hour')
    return csr

def n_events_per_website_id_loader(df):
    csr = df.select('xid', 'website_id', 'n_events_per_website_id')\
        .withColumnRenamed('n_events_per_website_id', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_website_id#'),
                               col('website_id'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('website_id')
    return csr
def n_events_per_weekday_loader(df):
    csr = df\
        .select('xid', 'weekday', 'n_events_per_weekday')\
        .withColumnRenamed('n_events_per_weekday', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_weekday#'), col('weekday'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('weekday')
    return csr

def n_days_since_last_event_loader(df):
    csr = df.select('xid', 'n_days_since_last_event')\
        .withColumnRenamed('n_days_since_last_event', 'value')\
        .distinct()
    feature_name = lit('n_days_since_last_event')
    csr = csr\
        .withColumn('feature_name', feature_name)
    return csr

def n_days_since_last_action_loader(df):
    csr = df.select('xid', 'action', 'n_days_since_last_action')\
        .withColumnRenamed('n_days_since_last_action', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_days_since_last_action#'), col('action'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('action')
    return csr

def n_unique_day_loader(df):
    csr = df.select('xid', 'n_unique_day')\
        .withColumnRenamed('n_unique_day', 'value')\
        .distinct()
    feature_name = lit('n_unique_day')
    csr = csr\
        .withColumn('feature_name', feature_name)
    return csr

def n_unique_hour_loader(df):
    csr = df.select('xid', 'n_unique_hour')\
        .withColumnRenamed('n_unique_hour', 'value')\
        .distinct()
    feature_name = lit('n_unique_hour')
    csr = csr\
        .withColumn('feature_name', feature_name)
    return csr

def n_events_per_device_loader(df):
    csr = df\
        .select('xid', 'device', 'n_events_per_device')\
        .withColumnRenamed('n_events_per_device', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_device#'), col('device'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('device')
    return csr

def n_unique_device_loader(df):
    csr = df.select('xid', 'n_device')\
        .withColumnRenamed('n_device', 'value')\
        .distinct()
    feature_name = lit('n_device')
    csr = csr\
        .withColumn('feature_name', feature_name)
    return csr

def n_events_per_category_id_loader(df):
    csr = df.select('xid', 'category_id', 'n_events_per_category_id')\
        .withColumnRenamed('n_events_per_category_id', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_category_id#'),
                               col('category_id'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('category_id')
    return csr

def n_actions_per_category_id_loader(df):
    csr = df.select('xid', 'category_id', 'action', 'n_actions_per_category_id')\
        .withColumnRenamed('n_actions_per_category_id', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_actions_per_category_id#'),
                               col('action'), lit('#'),
                               col('category_id'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('category_id')\
        .drop('action')
    return csr
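All of these loaders follow the same melt pattern: select the feature column, rename it to value, deduplicate, then tag each row with a feature_name string. As a sketch of a possible refactoring (ours, not in the original notebook), a small factory could generate them:

def make_loader(feature_col, key_cols=()):
    # Build a loader that melts one feature column into
    # (xid, value, feature_name) triples; each key column's value
    # is appended to the feature name after a '#' separator.
    def loader(df):
        csr = df.select('xid', *key_cols, feature_col)\
            .withColumnRenamed(feature_col, 'value')\
            .distinct()
        parts = [lit(feature_col)]
        for key in key_cols:
            parts += [lit('#'), col(key)]
        csr = csr.withColumn('feature_name', func.concat(*parts))
        for key in key_cols:
            csr = csr.drop(key)
        return csr
    return loader

For instance, make_loader('n_events_per_hour', ('hour',)) behaves like n_events_per_hour_loader above.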
from functools import reduce

def union(df, other):
    return df.union(other)

loaders = [
    n_events_per_hour_loader,
    n_events_per_website_id_loader,
    n_events_per_weekday_loader,
    n_days_since_last_event_loader,
    n_days_since_last_action_loader,
    n_unique_day_loader,
    n_unique_hour_loader,
    n_events_per_device_loader,
    n_unique_device_loader,
    n_events_per_category_id_loader,
    n_actions_per_category_id_loader,
]

csr = reduce(union, [loader(df) for loader in loaders])

csr.head(n=3)
[Row(xid='000095cc-9a61-49b5-8ad5-83442daa93d6', value=2.0, feature_name='n_events_per_hour#21'),
Row(xid='0000fa20-47ca-4548-82e9-78d81aa83fba', value=1.0, feature_name='n_events_per_hour#23'),
Row(xid='00010386-a996-48ad-9888-4df5440188f2', value=1.0, feature_name='n_events_per_hour#21')]
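One caveat: DataFrame.union matches columns by position, not by name, so every loader must return its columns in the same order ('xid', 'value', 'feature_name'). A defensive variant of the reduction (a sketch, not in the original notebook) makes that order explicit:

csr = reduce(
    union,
    [loader(df).select('xid', 'value', 'feature_name') for loader in loaders]
)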
csr.columns
['xid', 'value', 'feature_name']
csr.count()
25/04/03 15:11:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/03 15:11:08 WARN TaskMemoryManager: Failed to allocate a page (4194304 bytes), try again.
[19,167s][warning][gc,alloc] Executor task launch worker for task 7.0 in stage 93.0 (TID 527): Retried waiting for GCLocker too often allocating 524290 words
[... many similar spill, page-allocation and GCLocker warnings elided ...]
25/04/03 15:11:08 ERROR Executor: Exception in task 9.0 in stage 93.0 (TID 529)
java.lang.OutOfMemoryError: Java heap space
25/04/03 15:11:08 ERROR Executor: Exception in task 3.0 in stage 93.0 (TID 523)
java.lang.OutOfMemoryError: Java heap space
25/04/03 15:11:08 ERROR Executor: Exception in task 1.0 in stage 93.0 (TID 521)
java.lang.OutOfMemoryError: Java heap space
25/04/03 15:11:08 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 1.0 in stage 93.0 (TID 521),5,main]
java.lang.OutOfMemoryError: Java heap space
25/04/03 15:11:08 ERROR SparkUncaughtExceptionHandler: [Container in shutdown] Uncaught exception in thread Thread[Executor task launch worker for task 9.0 in stage 93.0 (TID 529),5,main]
java.lang.OutOfMemoryError: Java heap space
25/04/03 15:11:08 ERROR SparkUncaughtExceptionHandler: [Container in shutdown] Uncaught exception in thread Thread[Executor task launch worker for task 3.0 in stage 93.0 (TID 523),5,main]
java.lang.OutOfMemoryError: Java heap space
25/04/03 15:11:08 WARN TaskSetManager: Lost task 1.0 in stage 93.0 (TID 521) (host-32-10.sg.lan executor driver): java.lang.OutOfMemoryError: Java heap space
25/04/03 15:11:08 ERROR TaskSetManager: Task 1 in stage 93.0 failed 1 times; aborting job
25/04/03 15:11:08 WARN TaskSetManager: Lost task 18.0 in stage 93.0 (TID 538) (host-32-10.sg.lan executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 1 in stage 93.0 failed 1 times, most recent failure: Lost task 1.0 in stage 93.0 (TID 521) (host-32-10.sg.lan executor driver): java.lang.OutOfMemoryError: Java heap space
Driver stacktrace:)
[... the same TaskKilled warning repeats for the remaining tasks of stage 93.0 ...]
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "/home/boucheron/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/IPython/core/interactiveshell.py", line 3577, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "/tmp/ipykernel_87987/3594614522.py", line 1, in <module>
csr.count()
File "/home/boucheron/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/sql/dataframe.py", line 1240, in count
return int(self._jdf.count())
^^^^^^^^^^^^^^^^^
File "/home/boucheron/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py", line 1322, in __call__
return_value = get_return_value(
^^^^^^^^^^^^^^^^^
File "/home/boucheron/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/errors/exceptions/captured.py", line 179, in deco
return f(*a, **kw)
^^^^^^^^^^^
File "/home/boucheron/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <exception str() failed>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/boucheron/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py", line 516, in send_command
raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/boucheron/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
response = connection.send_command(command)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/boucheron/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py", line 539, in send_command
raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "/home/boucheron/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py", line 511, in send_command
answer = smart_decode(self.stream.readline()[:-1])
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.12/socket.py", line 707, in readinto
return self._sock.recv_into(b)
^^^^^^^^^^^^^^^^^^^^^^^
ConnectionResetError: [Errno 104] Connection reset by peer
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/boucheron/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
response = connection.send_command(command)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/boucheron/sandbox/IFEBY310/.venv/lib/python3.12/site-packages/py4j/clientserver.py", line 539, in send_command
raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In[26], line 1
----> 1 csr.count()
[...]
During handling of the above exception, another exception occurred:

ConnectionRefusedError: [Errno 111] Connection refused
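The executors ran out of Java heap space, which killed the local JVM; from here on, every cell fails with ConnectionRefusedError because the Py4J gateway has no JVM left to talk to. One possible recovery, after restarting the kernel, is to create the session with a larger driver heap and fewer shuffle partitions; the figures below are illustrative assumptions, not values from the notebook:

spark = (SparkSession
         .builder
         .appName("Spark SQL Course")
         .config("spark.driver.memory", "8g")           # illustrative heap size
         .config("spark.sql.shuffle.partitions", "48")  # illustrative partition count
         .getOrCreate())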
# Replace feature names and xid by a unique number
# note: a globally ordered window without partitioning funnels all rows
# through a single partition
feature_name_partition = Window().orderBy('feature_name')
xid_partition = Window().orderBy('xid')

col_idx = func.dense_rank().over(feature_name_partition)
row_idx = func.dense_rank().over(xid_partition)

csr = csr.withColumn('col', col_idx)\
    .withColumn('row', row_idx)

csr = csr.na.drop('any')

csr.head(n=5)
---------------------------------------------------------------------------
ConnectionRefusedError                    Traceback (most recent call last)
Cell In[27], line 2
      1 # Replace feature names and xid by a unique number
----> 2 feature_name_partition = Window().orderBy('feature_name')
[...]
ConnectionRefusedError: [Errno 111] Connection refused
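For context, the (row, col, value) triples computed above are the coordinate representation of a sparse users x features matrix. Once collected, they could be materialized locally, e.g. with scipy; a sketch assuming the triples fit in driver memory (not in the original notebook):

from scipy.sparse import csr_matrix

pdf = csr.select('row', 'col', 'value').toPandas()
X = csr_matrix(
    (pdf['value'], (pdf['row'] - 1, pdf['col'] - 1)),  # dense_rank indices are 1-based
    shape=(pdf['row'].max(), pdf['col'].max()),
)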
# Let's save the result of our hard work into a new parquet file
output_path = './'
output_file = os.path.join(output_path, 'csr.parquet')
csr.write.parquet(output_file, mode='overwrite')
---------------------------------------------------------------------------
ConnectionRefusedError                    Traceback (most recent call last)
Cell In[28], line 4
----> 4 csr.write.parquet(output_file, mode='overwrite')
[...]
ConnectionRefusedError: [Errno 111] Connection refused
csr_path = './'
csr_file = os.path.join(csr_path, 'csr.parquet')

df = spark.read.parquet(csr_file)
df.head(n=5)
---------------------------------------------------------------------------
ConnectionRefusedError                    Traceback (most recent call last)
Cell In[29], line 4
----> 4 df = spark.read.parquet(csr_file)
[...]
ConnectionRefusedError: [Errno 111] Connection refused
df.count()
---------------------------------------------------------------------------
ConnectionRefusedError                    Traceback (most recent call last)
Cell In[30], line 1
----> 1 df.count()
[...]
ConnectionRefusedError: [Errno 111] Connection refused
# What are the features related to campaign_id 1204?
features_names = \
    df.select('feature_name')\
    .distinct()\
    .toPandas()['feature_name']
---------------------------------------------------------------------------
ConnectionRefusedError                    Traceback (most recent call last)
Cell In[31], line 3
----> 3 df.select('feature_name')\
[...]
ConnectionRefusedError: [Errno 111] Connection refused
features_names
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[32], line 1
----> 1 features_names

NameError: name 'features_names' is not defined
[feature_name for feature_name in features_names if '1204' in feature_name]
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[33], line 1
----> 1 [feature_name for feature_name in features_names if '1204' in feature_name]

NameError: name 'features_names' is not defined
# Look for the xid that have at least one exposure to campaign 1204
keep = func.when(
    (col('feature_name') == 'n_actions_per_category_id#C#1204.0') |
    (col('feature_name') == 'n_actions_per_category_id#O#1204.0'),
    1).otherwise(0)
df = df.withColumn('keep', keep)

df.where(col('keep') > 0).count()
---------------------------------------------------------------------------
ConnectionRefusedError                    Traceback (most recent call last)
Cell In[34], line 3
----> 3 (col('feature_name') == 'n_actions_per_category_id#C#1204.0') |
[...]
ConnectionRefusedError: [Errno 111] Connection refused
# Sum of the keeps :)
xid_partition = Window.partitionBy('xid')
sum_keep = func.sum(col('keep')).over(xid_partition)
df = df.withColumn('sum_keep', sum_keep)
ConnectionRefusedError: [Errno 111] Connection refused
# Let's keep the xids exposed to 1204
df = df.where(col('sum_keep') > 0)
ConnectionRefusedError: [Errno 111] Connection refused
df.count()
ConnectionRefusedError: [Errno 111] Connection refused
df.select('xid').distinct().count()
ConnectionRefusedError: [Errno 111] Connection refused
row_partition = Window().orderBy('row')
col_partition = Window().orderBy('col')
row_new = func.dense_rank().over(row_partition)
col_new = func.dense_rank().over(col_partition)
df = df.withColumn('row_new', row_new)
df = df.withColumn('col_new', col_new)
csr_data = df.select('row_new', 'col_new', 'value').toPandas()
ConnectionRefusedError: [Errno 111] Connection refused
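The point of dense_rank here is re-indexing: ordering by the original 'row' (and 'col') values and taking the dense rank maps arbitrary identifiers to consecutive integers starting at 1, with no gaps, which is exactly what the sparse-matrix constructor below expects. A tiny illustration (toy values):

toy = spark.createDataFrame([(10,), (42,), (10,)], ['row'])
w = Window().orderBy('row')
toy.withColumn('row_new', func.dense_rank().over(w)).show()
# row=10 -> row_new=1 (twice), row=42 -> row_new=2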
csr_data.head()
NameError: name 'csr_data' is not defined
features_names = df.select('feature_name', 'col_new').distinct()
features_names.where(col('feature_name') == 'n_actions_per_category_id#C#1204.0').head()
ConnectionRefusedError: [Errno 111] Connection refused
features_names.where(col('feature_name') == 'n_actions_per_category_id#O#1204.0').head()
NameError: name 'features_names' is not defined
from scipy.sparse import csr_matrix
import numpy as np

rows = csr_data['row_new'].values - 1
cols = csr_data['col_new'].values - 1
vals = csr_data['value'].values

X_csr = csr_matrix((vals, (rows, cols)))
NameError: name 'csr_data' is not defined
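As a reminder of what csr_matrix((vals, (rows, cols))) does: it takes COO-style triplets, placing vals[k] at position (rows[k], cols[k]) and summing duplicates; the shape is inferred from the largest indices unless given explicitly. A tiny self-contained check:

tiny = csr_matrix((np.array([1., 2., 3.]),
                   (np.array([0, 0, 1]), np.array([0, 2, 1]))))
tiny.toarray()
# array([[1., 0., 2.],
#        [0., 3., 0.]])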
X_csr.shape
NameError: name 'X_csr' is not defined
X_csr.shape, X_csr.nnz
NameError: name 'X_csr' is not defined
X_csr.nnz / (152347 * 92)
NameError: name 'X_csr' is not defined
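The hard-coded dimensions make this fragile; the same density (fraction of nonzero entries) can be computed from the matrix itself:

density = X_csr.nnz / (X_csr.shape[0] * X_csr.shape[1])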
# The label vector. Let's make it dense, flat and binary
y = np.array(X_csr[:, 1].todense()).ravel()
y = np.array(y > 0, dtype=np.int64)
NameError: name 'X_csr' is not defined
X_csr.shape
NameError: name 'X_csr' is not defined
# We remove the second and fourth columns:
# they actually contain the label we want to predict.
kept_cols = list(range(92))
kept_cols.pop(1)
kept_cols.pop(2)
X = X_csr[:, kept_cols]
NameError: name 'X_csr' is not defined
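Note that the two pops drop original column indices 1 and 3 (the second pop acts on the already-shifted list). An equivalent, arguably clearer one-step version, sketched with numpy:

kept_cols = np.delete(np.arange(92), [1, 3])  # same columns as the two pops above
X = X_csr[:, kept_cols]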
X_csr.shape
NameError: name 'X_csr' is not defined
Wow! That was a lot of work. We now have a feature matrix \(X\) and a vector of labels \(y\).
X.indices
NameError: name 'X' is not defined
X.indptr
NameError: name 'X' is not defined
X.shape, X.nnz
NameError: name 'X' is not defined
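To read these attributes: in CSR format, indptr has n_rows + 1 entries, and row i's nonzeros are indices[indptr[i]:indptr[i+1]] (their column indices) paired with data[indptr[i]:indptr[i+1]] (their values). On a tiny example:

demo = csr_matrix(np.array([[1., 0., 2.],
                            [0., 3., 0.]]))
demo.indptr   # [0, 2, 3]: row 0 owns entries 0..1, row 1 owns entry 2
demo.indices  # [0, 2, 1]: column indices of those entries
demo.data     # [1., 2., 3.]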
y.shape, y.sum()
NameError: name 'y' is not defined
from sklearn.preprocessing import MaxAbsScaler
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression

# Normalize the features
X = MaxAbsScaler().fit_transform(X)
X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, test_size=0.3)

clf = LogisticRegression(
    penalty='l2',
    C=1e3,
    solver='lbfgs',
    class_weight='balanced'
)
clf.fit(X_train, y_train)
NameError: name 'X' is not defined
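Once the fit succeeds, a quick sanity check on the held-out split complements the precision/recall analysis below (a sketch using scikit-learn's roc_auc_score, not part of the original cells):

from sklearn.metrics import roc_auc_score
roc_auc_score(y_test, clf.predict_proba(X_test)[:, 1])  # test-set AUC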
features_names = features_names.toPandas()['feature_name']
NameError: name 'features_names' is not defined
import matplotlib.pyplot as plt
%matplotlib inline

plt.figure(figsize=(16, 5))
plt.stem(clf.coef_[0])  # use_line_collection was removed in recent matplotlib; it is the default behaviour now
plt.title('Logistic regression coefficients', fontsize=18)
# Change the fontsize of the tick labels
_ = plt.xticks(np.arange(clf.coef_[0].shape[0]), features_names,
               rotation='vertical', fontsize=8)
_ = plt.yticks(fontsize=14)
NameError: name 'clf' is not defined
from sklearn.metrics import precision_recall_curve, f1_score

precision, recall, _ = precision_recall_curve(y_test, clf.predict_proba(X_test)[:, 1])

plt.figure(figsize=(8, 6))
plt.plot(recall, precision, label='LR (F1=%.2f)' % f1_score(y_test, clf.predict(X_test)), lw=2)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('Recall', fontsize=16)
plt.ylabel('Precision', fontsize=16)
plt.title('Precision/recall curve', fontsize=18)
plt.legend(loc="upper right", fontsize=14)
NameError: name 'y_test' is not defined
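If a single decision threshold is needed rather than the whole curve, one common recipe (a sketch, not part of the original analysis) is to pick the probability cutoff that maximizes F1 along the curve:

prec, rec, thr = precision_recall_curve(y_test, clf.predict_proba(X_test)[:, 1])
f1 = 2 * prec[:-1] * rec[:-1] / (prec[:-1] + rec[:-1] + 1e-12)  # elementwise F1 per threshold
best_threshold = thr[np.argmax(f1)]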