```python
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
```
```python
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

conf = SparkConf().setAppName("Spark SQL Course")
sc = SparkContext(conf=conf)  # no need for Spark 3...

spark = (SparkSession
         .builder
         .appName("Spark SQL Course")
         .getOrCreate())
```
25/01/18 14:18:52 WARN Utils: Your hostname, boucheron-Precision-5480 resolves to a loopback address: 127.0.1.1; using 192.168.10.120 instead (on interface wlp0s20f3)
25/01/18 14:18:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/18 14:18:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
```python
sc = spark.sparkContext
```
```python
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rdd.reduceByKey(lambda a, b: a + b).collect()
```
[('b', 1), ('a', 2)]
```python
import requests, zipfile, io
from pathlib import Path

path = Path('webdata.parquet')
if not path.exists():
    url = "https://stephanegaiffas.github.io/big_data_course/data/webdata.parquet.zip"
    r = requests.get(url)
    z = zipfile.ZipFile(io.BytesIO(r.content))
    z.extractall(path='./')
```
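If the archive cannot be fetched (for instance the URL serves an error page instead of the zip), `ZipFile` raises `BadZipFile` and every later cell that relies on `df` fails in cascade. A slightly more defensive sketch of the same download, assuming the same URL, checks the HTTP response before unzipping:

```python
import io
import zipfile
import requests
from pathlib import Path

path = Path('webdata.parquet')
if not path.exists():
    url = "https://stephanegaiffas.github.io/big_data_course/data/webdata.parquet.zip"
    r = requests.get(url, timeout=60)
    r.raise_for_status()  # fail early on HTTP errors instead of unzipping garbage
    buf = io.BytesIO(r.content)
    if not zipfile.is_zipfile(buf):
        raise ValueError(f"{url} did not return a zip archive")
    zipfile.ZipFile(buf).extractall(path='./')
```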
```python
input_path = './'

input_file = os.path.join(input_path, 'webdata.parquet')
df = spark.read.parquet(input_file)
```
```python
df.head(6)
```
```python
df.describe()
```
```python
df.count()
```
First, we need to import a few things:
```python
from pyspark.sql import Window
import pyspark.sql.functions as func
from pyspark.sql.types import *
from pyspark.sql.functions import col, lit
```
```python
df.select('xid').distinct().count()
```
```python
xid_partition = Window.partitionBy('xid')
n_events = func.count(col('action')).over(xid_partition)
df = df.withColumn('n_events', n_events)
df.head(n=2)
```
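Note that the window version attaches the per-user event count to every row while keeping one row per event; the `groupBy` aggregation below collapses each `xid` to a single row instead.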
```python
df.groupBy('xid').agg(func.count('action')).head(5)
```
```python
xid_partition = Window.partitionBy('xid')
max_date = func.max(col('date')).over(xid_partition)
n_days_since_last_event = func.datediff(func.current_date(), max_date)
df = df.withColumn('n_days_since_last_event',
                   n_days_since_last_event)
df.head(n=2)
```
```python
xid_device_partition = Window.partitionBy('xid', 'device')
n_events_per_device = func.count(col('action')).over(xid_device_partition)
df = df.withColumn('n_events_per_device', n_events_per_device)
df.head(n=2)
```
```python
xid_partition = Window.partitionBy('xid')
rank_device = func.dense_rank().over(xid_partition.orderBy('device'))
n_unique_device = func.last(rank_device).over(xid_partition)
df = df.withColumn('n_device', n_unique_device)
df.head(n=2)
```
```python
df\
    .where(col('n_device') > 1)\
    .select('xid', 'device', 'n_events', 'n_device', 'n_events_per_device')\
    .head(n=8)
```
We construct an ETL (Extract, Transform, Load) process on this data using the `pyspark.sql` API.
Extraction is easy here: it is just a matter of reading the data.
```python
df = spark.read.parquet(input_file)
df.head(n=3)
```
In this step, we compute many extra columns from the raw data. The aim is to build features that describe users.
```python
def n_events_transformer(df):
    xid_partition = Window.partitionBy('xid')
    n_events = func.count(col('action')).over(xid_partition)
    df = df.withColumn('n_events', n_events)
    return df

def n_events_per_action_transformer(df):
    xid_action_partition = Window.partitionBy('xid', 'action')
    n_events_per_action = func.count(col('action')).over(xid_action_partition)
    df = df.withColumn('n_events_per_action', n_events_per_action)
    return df

def hour_transformer(df):
    hour = func.hour(col('date'))
    df = df.withColumn('hour', hour)
    return df

def weekday_transformer(df):
    weekday = func.date_format(col('date'), 'EEEE')
    df = df.withColumn('weekday', weekday)
    return df

def n_events_per_hour_transformer(df):
    xid_hour_partition = Window.partitionBy('xid', 'hour')
    n_events_per_hour = func.count(col('action')).over(xid_hour_partition)
    df = df.withColumn('n_events_per_hour', n_events_per_hour)
    return df

def n_events_per_weekday_transformer(df):
    xid_weekday_partition = Window.partitionBy('xid', 'weekday')
    n_events_per_weekday = func.count(col('action')).over(xid_weekday_partition)
    df = df.withColumn('n_events_per_weekday', n_events_per_weekday)
    return df

def n_days_since_last_event_transformer(df):
    xid_partition = Window.partitionBy('xid')
    max_date = func.max(col('date')).over(xid_partition)
    n_days_since_last_event = func.datediff(func.current_date(), max_date)
    df = df.withColumn('n_days_since_last_event',
                       n_days_since_last_event + lit(0.1))
    return df

def n_days_since_last_action_transformer(df):
    xid_partition_action = Window.partitionBy('xid', 'action')
    max_date = func.max(col('date')).over(xid_partition_action)
    n_days_since_last_action = func.datediff(func.current_date(),
                                             max_date)
    df = df.withColumn('n_days_since_last_action',
                       n_days_since_last_action + lit(0.1))
    return df

def n_unique_day_transformer(df):
    xid_partition = Window.partitionBy('xid')
    dayofyear = func.dayofyear(col('date'))
    rank_day = func.dense_rank().over(xid_partition.orderBy(dayofyear))
    n_unique_day = func.last(rank_day).over(xid_partition)
    df = df.withColumn('n_unique_day', n_unique_day)
    return df

def n_unique_hour_transformer(df):
    xid_partition = Window.partitionBy('xid')
    rank_hour = func.dense_rank().over(xid_partition.orderBy('hour'))
    n_unique_hour = func.last(rank_hour).over(xid_partition)
    df = df.withColumn('n_unique_hour', n_unique_hour)
    return df

def n_events_per_device_transformer(df):
    xid_device_partition = Window.partitionBy('xid', 'device')
    n_events_per_device = func.count(func.col('device')) \
        .over(xid_device_partition)
    df = df.withColumn('n_events_per_device', n_events_per_device)
    return df

def n_unique_device_transformer(df):
    xid_partition = Window.partitionBy('xid')
    rank_device = func.dense_rank().over(xid_partition.orderBy('device'))
    n_unique_device = func.last(rank_device).over(xid_partition)
    df = df.withColumn('n_device', n_unique_device)
    return df

def n_actions_per_category_id_transformer(df):
    xid_category_id_partition = Window.partitionBy('xid', 'category_id',
                                                   'action')
    n_actions_per_category_id = func.count(func.col('action')) \
        .over(xid_category_id_partition)
    df = df.withColumn('n_actions_per_category_id', n_actions_per_category_id)
    return df

def n_unique_category_id_transformer(df):
    xid_partition = Window.partitionBy('xid')
    rank_category_id = func.dense_rank().over(xid_partition
                                              .orderBy('category_id'))
    n_unique_category_id = func.last(rank_category_id).over(xid_partition)
    df = df.withColumn('n_unique_category_id', n_unique_category_id)
    return df

def n_events_per_category_id_transformer(df):
    xid_category_id_partition = Window.partitionBy('xid', 'category_id')
    n_events_per_category_id = func.count(func.col('action')) \
        .over(xid_category_id_partition)
    df = df.withColumn('n_events_per_category_id', n_events_per_category_id)
    return df

def n_events_per_website_id_transformer(df):
    xid_website_id_partition = Window.partitionBy('xid', 'website_id')
    n_events_per_website_id = func.count(col('action')) \
        .over(xid_website_id_partition)
    df = df.withColumn('n_events_per_website_id', n_events_per_website_id)
    return df
```
```python
transformers = [
    hour_transformer,
    weekday_transformer,
    n_events_per_hour_transformer,
    n_events_per_weekday_transformer,
    n_days_since_last_event_transformer,
    n_days_since_last_action_transformer,
    n_unique_day_transformer,
    n_unique_hour_transformer,
    n_events_per_device_transformer,
    n_unique_device_transformer,
    n_actions_per_category_id_transformer,
    n_events_per_category_id_transformer,
    n_events_per_website_id_transformer,
]

for transformer in transformers:
    df = transformer(df)

df.head(n=1)
```
```python
sorted(df.columns)
```
Here, we use all the previous computations (saved in the columns of the dataframe) to compute aggregated information about each user.
```python
def n_events_per_hour_loader(df):
    csr = df\
        .select('xid', 'hour', 'n_events_per_hour')\
        .withColumnRenamed('n_events_per_hour', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_hour#'), col('hour'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('hour')
    return csr
```
```python
def n_events_per_weekday_loader(df):
    csr = df\
        .select('xid', 'weekday', 'n_events_per_weekday')\
        .withColumnRenamed('n_events_per_weekday', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_weekday#'), col('weekday'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('weekday')
    return csr
```
```python
def n_days_since_last_event_loader(df):
    csr = df.select('xid', 'n_days_since_last_event')\
        .withColumnRenamed('n_days_since_last_event', 'value')\
        .distinct()
    feature_name = lit('n_days_since_last_event')
    csr = csr\
        .withColumn('feature_name', feature_name)
    return csr
```
```python
def n_days_since_last_action_loader(df):
    csr = df.select('xid', 'action', 'n_days_since_last_action')\
        .withColumnRenamed('n_days_since_last_action', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_days_since_last_action#'), col('action'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('action')
    return csr

def n_unique_day_loader(df):
    csr = df.select('xid', 'n_unique_day')\
        .withColumnRenamed('n_unique_day', 'value')\
        .distinct()
    feature_name = lit('n_unique_day')
    csr = csr\
        .withColumn('feature_name', feature_name)
    return csr

def n_unique_hour_loader(df):
    csr = df.select('xid', 'n_unique_hour')\
        .withColumnRenamed('n_unique_hour', 'value')\
        .distinct()
    feature_name = lit('n_unique_hour')
    csr = csr\
        .withColumn('feature_name', feature_name)
    return csr

def n_events_per_device_loader(df):
    csr = df\
        .select('xid', 'device', 'n_events_per_device')\
        .withColumnRenamed('n_events_per_device', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_device#'), col('device'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('device')
    return csr

def n_unique_device_loader(df):
    csr = df.select('xid', 'n_device')\
        .withColumnRenamed('n_device', 'value')\
        .distinct()
    feature_name = lit('n_device')
    csr = csr\
        .withColumn('feature_name', feature_name)
    return csr

def n_events_per_category_id_loader(df):
    csr = df.select('xid', 'category_id', 'n_events_per_category_id')\
        .withColumnRenamed('n_events_per_category_id', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_category_id#'),
                               col('category_id'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('category_id')
    return csr

def n_actions_per_category_id_loader(df):
    csr = df.select('xid', 'category_id', 'action', 'n_actions_per_category_id')\
        .withColumnRenamed('n_actions_per_category_id', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_actions_per_category_id#'),
                               col('action'), lit('#'),
                               col('category_id'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('category_id')\
        .drop('action')
    return csr

def n_events_per_website_id_loader(df):
    csr = df.select('xid', 'website_id', 'n_events_per_website_id')\
        .withColumnRenamed('n_events_per_website_id', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_website_id#'),
                               col('website_id'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('website_id')
    return csr
```
```python
from functools import reduce

loaders = [
    n_events_per_hour_loader,
    n_events_per_weekday_loader,
    n_days_since_last_event_loader,
    n_days_since_last_action_loader,
    n_unique_day_loader,
    n_unique_hour_loader,
    n_events_per_device_loader,
    n_unique_device_loader,
    n_events_per_category_id_loader,
    n_actions_per_category_id_loader,
    n_events_per_website_id_loader,
]

def union(df, other):
    return df.union(other)

# Stack all the long-format feature dataframes on top of each other
csr = reduce(union, [loader(df) for loader in loaders])

csr.head(n=3)
```
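Note that `DataFrame.union` matches columns by position, not by name, which is why every loader must return its columns in the same order (`xid`, `value`, `feature_name` here). A tiny sketch of the pitfall, using hypothetical toy frames:

```python
# Hypothetical toy example: union is positional, not name-based.
a = spark.createDataFrame([(1, 10)], ['id', 'n'])
b = spark.createDataFrame([(20, 2)], ['n', 'id'])  # same names, swapped order

a.union(b).show()        # positional: row (20, 2) puts 20 under 'id'
a.unionByName(b).show()  # by name:    the row is realigned to (2, 20)
```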
```python
csr.columns
```
```python
csr.count()
```
```python
# Replace feature names and xid by a unique number
feature_name_partition = Window().orderBy('feature_name')
xid_partition = Window().orderBy('xid')

col_idx = func.dense_rank().over(feature_name_partition)
row_idx = func.dense_rank().over(xid_partition)

csr = csr.withColumn('col', col_idx)\
    .withColumn('row', row_idx)

csr = csr.na.drop('any')

csr.head(n=5)
```
```python
# Let's save the result of our hard work into a new parquet file
output_path = './'
output_file = os.path.join(output_path, 'csr.parquet')
csr.write.parquet(output_file, mode='overwrite')
```
```python
csr_path = './'
csr_file = os.path.join(csr_path, 'csr.parquet')

df = spark.read.parquet(csr_file)
df.head(n=5)
```
```python
df.count()
```
```python
# What are the features related to campaign_id 1204 ?
features_names = \
    df.select('feature_name')\
    .distinct()\
    .toPandas()['feature_name']
```
```python
features_names
```
```python
[feature_name for feature_name in features_names if '1204' in feature_name]
```
```python
# Look for the xid that have at least one exposure to campaign 1204
keep = func.when(
    (col('feature_name') == 'n_actions_per_category_id#C#1204.0') |
    (col('feature_name') == 'n_actions_per_category_id#O#1204.0'),
    1).otherwise(0)
df = df.withColumn('keep', keep)

df.where(col('keep') > 0).count()
```
```python
# Sum of the keeps :)
xid_partition = Window.partitionBy('xid')
sum_keep = func.sum(col('keep')).over(xid_partition)
df = df.withColumn('sum_keep', sum_keep)
```
```python
# Let's keep the xid exposed to 1204
df = df.where(col('sum_keep') > 0)
```
```python
df.count()
```
```python
df.select('xid').distinct().count()
```
```python
row_partition = Window().orderBy('row')
col_partition = Window().orderBy('col')
row_new = func.dense_rank().over(row_partition)
col_new = func.dense_rank().over(col_partition)
df = df.withColumn('row_new', row_new)
df = df.withColumn('col_new', col_new)
csr_data = df.select('row_new', 'col_new', 'value').toPandas()
```
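Note that `toPandas()` collects the selected columns into the driver's memory; this is reasonable here only because the dataframe has been filtered down to the users exposed to campaign 1204.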
```python
csr_data.head()
```
```python
features_names = df.select('feature_name', 'col_new').distinct()
features_names.where(col('feature_name') == 'n_actions_per_category_id#C#1204.0').head()
```
```python
features_names.where(col('feature_name') == 'n_actions_per_category_id#O#1204.0').head()
```
```python
from scipy.sparse import csr_matrix
import numpy as np

rows = csr_data['row_new'].values - 1
cols = csr_data['col_new'].values - 1
vals = csr_data['value'].values

X_csr = csr_matrix((vals, (rows, cols)))
```
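Built this way, `csr_matrix((data, (rows, cols)))` interprets the arrays as COO-style triplets, and any duplicated `(row, col)` pair gets its values summed, which is one reason the `.distinct()` calls in the loaders matter. A small self-contained illustration:

```python
from scipy.sparse import csr_matrix
import numpy as np

# The two entries at (0, 0) are summed into a single stored value of 3.
m = csr_matrix((np.array([1.0, 2.0, 4.0]),
                (np.array([0, 0, 1]), np.array([0, 0, 1]))))
print(m.toarray())  # [[3. 0.], [0. 4.]]
```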
```python
X_csr.shape
```
```python
X_csr.shape, X_csr.nnz
```
```python
X_csr.nnz / (152347 * 92)
```
```python
# The label vector. Let's make it dense, flat and binary
y = np.array(X_csr[:, 1].todense()).ravel()
y = np.array(y > 0, dtype=np.int64)
```
```python
X_csr.shape
```
```python
# We remove the second and fourth columns: they contain the labels
# we want to predict.
kept_cols = list(range(92))
kept_cols.pop(1)  # drops original column 1 (the second column)
kept_cols.pop(2)  # after the first pop, index 2 is original column 3 (the fourth)
X = X_csr[:, kept_cols]
```
```python
X_csr.shape
```
Wow! That was a lot of work. Now we have a feature matrix \(X\) and a vector of labels \(y\).
```python
X.indices
```
```python
X.indptr
```
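As a reminder of what `indices` and `indptr` encode (a small example, unrelated to the course data): the nonzero column indices of row `i` are `indices[indptr[i]:indptr[i+1]]`, with the matching values at the same positions in `data`.

```python
from scipy.sparse import csr_matrix
import numpy as np

m = csr_matrix(np.array([[1, 0, 2],
                         [0, 0, 3]]))
print(m.data)     # [1 2 3]  the stored nonzero values
print(m.indices)  # [0 2 2]  the column index of each stored value
print(m.indptr)   # [0 2 3]  row i spans data[indptr[i]:indptr[i+1]]
```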
```python
X.shape, X.nnz
```
```python
y.shape, y.sum()
```
```python
from sklearn.preprocessing import MaxAbsScaler
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression

# Normalize the features (MaxAbsScaler preserves sparsity)
X = MaxAbsScaler().fit_transform(X)
X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, test_size=0.3)

clf = LogisticRegression(
    penalty='l2',
    C=1e3,
    solver='lbfgs',
    class_weight='balanced'
)
clf.fit(X_train, y_train)
```
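Recall that in scikit-learn, `C` is the inverse of the regularization strength, so `C=1e3` means fairly weak ridge penalization, while `class_weight='balanced'` reweights the samples to compensate for the class imbalance in `y`.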
```python
features_names = features_names.toPandas()['feature_name']
```
```python
import matplotlib.pyplot as plt
%matplotlib inline

plt.figure(figsize=(16, 5))
plt.stem(clf.coef_[0])
plt.title('Logistic regression coefficients', fontsize=18)
# We change the fontsize of the tick labels
_ = plt.xticks(np.arange(clf.coef_[0].shape[0]), features_names,
               rotation='vertical', fontsize=8)
_ = plt.yticks(fontsize=14)
```
```python
from sklearn.metrics import precision_recall_curve, f1_score

precision, recall, _ = precision_recall_curve(y_test, clf.predict_proba(X_test)[:, 1])

plt.figure(figsize=(8, 6))
plt.plot(recall, precision, label='LR (F1=%.2f)' % f1_score(y_test, clf.predict(X_test)), lw=2)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('Recall', fontsize=16)
plt.ylabel('Precision', fontsize=16)
plt.title('Precision/recall curve', fontsize=18)
plt.legend(loc="upper right", fontsize=14)
```