import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
pyspark for data preprocessing

The data is a parquet file which contains a dataframe with 8 columns:

- xid : unique user id
- action : type of action. ‘C’ is a click, ‘O’ or ‘VSL’ is a web-display
- date : date of the action
- website_id : unique id of the website
- url : url of the webpage
- category_id : id of the display
- zipcode : postal zipcode of the user
- device : type of device used by the user

Using pyspark.sql we want to do the following things:
Then, we want to construct a classifier to predict clicks on category 1204. The agenda is roughly the following: engineer per-user features with window functions, reshape them into a sparse user-by-feature matrix, and train a logistic regression classifier on it.
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

spark = (SparkSession
         .builder
         .appName("Spark Webdata")
         .getOrCreate())
25/01/18 14:18:40 WARN Utils: Your hostname, boucheron-Precision-5480 resolves to a loopback address: 127.0.1.1; using 192.168.10.120 instead (on interface wlp0s20f3)
25/01/18 14:18:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/18 14:18:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
import requests, zipfile, io
from pathlib import Path

path = Path('webdata.parquet')
if not path.exists():
    url = "https://stephanegaiffas.github.io/big_data_course/data/webdata.parquet.zip"
    r = requests.get(url)
    z = zipfile.ZipFile(io.BytesIO(r.content))
    z.extractall(path='./')

BadZipFile: File is not a zip file
input_path = './'
input_file = os.path.join(input_path, 'webdata.parquet')

df = spark.read.parquet(input_file)

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/home/boucheron/Documents/IFEBY310/core/notebooks/webdata.parquet.
We can also try the pyarrow.parquet module to load the Parquet file into an Arrow table.
import pyarrow as pa
import comet as co
import pyarrow.parquet as pq

dfa = pq.read_table(input_file)

ModuleNotFoundError: No module named 'comet'
dfa.num_columns

NameError: name 'dfa' is not defined
Let us go back to the Spark dataframe.
df.printSchema()
NameError: name 'df' is not defined
df.rdd.getNumPartitions()
NameError: name 'df' is not defined
Explain the partition size.
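One way to inspect the partition sizes directly (a sketch, assuming df has been loaded) is to count the rows held by each partition; when reading parquet, the number of input partitions is driven by the file layout and by spark.sql.files.maxPartitionBytes:

# number of rows in each partition of the underlying RDD
df.rdd.glom().map(len).collect()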
df.rdd.toDebugString()
NameError: name 'df' is not defined
First we need to import some things, in particular the Window class:

from pyspark.sql import Window
import pyspark.sql.functions as func
from pyspark.sql.types import *
from pyspark.sql.functions import col, lit
(
    df.select('xid')
    .distinct()
    .count()
)

NameError: name 'df' is not defined
def foo(x):
    yield len(set(x))

(
    df.rdd
    .map(lambda x: x.xid)
    .mapPartitions(foo)
    .collect()
)

NameError: name 'df' is not defined
This might consume a fair amount of computational resources.
(
    df.select('xid')
    .distinct()
    .explain()
)

NameError: name 'df' is not defined
The distinct values of xid seem to be evenly spread among the six files that make up the parquet directory. Note that the last six partitions look empty.
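To check this kind of claim directly, one could count the rows coming from each underlying parquet part file (a sketch, assuming df is loaded; input_file_name comes from pyspark.sql.functions):

from pyspark.sql.functions import input_file_name
# one row per part file, with the number of records it contributes
df.groupBy(input_file_name()).count().show(truncate=False)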
xid_partition = Window.partitionBy('xid')

n_events = func.count(col('action')).over(xid_partition)

df = df.withColumn('n_events', n_events)

df.head(n=2)

NameError: name 'df' is not defined
(
    df
    .groupBy('xid')
    .agg(func.count('action'))
    .head(5)
)

NameError: name 'df' is not defined
max_date = (
    func
    .max(col('date'))
    .over(xid_partition)
)

n_days_since_last_event = func.datediff(func.current_date(), max_date)

df = df.withColumn('n_days_since_last_event',
                   n_days_since_last_event)

df.head(n=2)

NameError: name 'df' is not defined
df.printSchema()
NameError: name 'df' is not defined
Does this partitionBy trigger shuffling? (One way to check is sketched after the next cell.)
xid_device_partition = xid_partition.partitionBy('device')

n_events_per_device = func.count(col('action')).over(xid_device_partition)

df = df.withColumn('n_events_per_device', n_events_per_device)

df.head(n=2)

NameError: name 'df' is not defined
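One way to answer the question above (a sketch, assuming the cell just shown ran) is to inspect the physical plan: if it contains an Exchange node with hashpartitioning, the windowed computation required a shuffle.

# explain() prints the physical plan; look for Exchange / hashpartitioning nodes
(
    df
    .withColumn('n_events_per_device', n_events_per_device)
    .explain()
)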
# xid_partition = Window.partitionBy('xid')

rank_device = (
    func
    .dense_rank()
    .over(xid_partition.orderBy('device'))
)

n_unique_device = (
    func
    .last(rank_device)
    .over(xid_partition)
)

df = df.withColumn('n_device', n_unique_device)

df.head(n=2)

NameError: name 'df' is not defined
df\
    .where(col('n_device') > 1)\
    .select('xid', 'device', 'n_events', 'n_device', 'n_events_per_device')\
    .head(n=8)

NameError: name 'df' is not defined
df\
    .where(col('n_device') > 1)\
    .select('xid', 'device', 'n_events', 'n_device', 'n_events_per_device')\
    .count()

NameError: name 'df' is not defined
We construct an ETL (Extract, Transform, Load) process on this data using the pyspark.sql API.
Here, extraction is simply reading the data.
df = spark.read.parquet(input_file)
df.head(n=3)

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/home/boucheron/Documents/IFEBY310/core/notebooks/webdata.parquet.
At this step we compute a lot of extra things from the data. The aim is to build features that describe users.
def n_events_transformer(df):
    xid_partition = Window.partitionBy('xid')
    n_events = func.count(col('action')).over(xid_partition)
    df = df.withColumn('n_events', n_events)
    return df

def n_events_per_action_transformer(df):
    xid_action_partition = Window.partitionBy('xid', 'action')
    n_events_per_action = func.count(col('action')).over(xid_action_partition)
    df = df.withColumn('n_events_per_action', n_events_per_action)
    return df

def hour_transformer(df):
    hour = func.hour(col('date'))
    df = df.withColumn('hour', hour)
    return df

def weekday_transformer(df):
    weekday = func.date_format(col('date'), 'EEEE')
    df = df.withColumn('weekday', weekday)
    return df

def n_events_per_hour_transformer(df):
    xid_hour_partition = Window.partitionBy('xid', 'hour')
    n_events_per_hour = func.count(col('action')).over(xid_hour_partition)
    df = df.withColumn('n_events_per_hour', n_events_per_hour)
    return df

def n_events_per_weekday_transformer(df):
    xid_weekday_partition = Window.partitionBy('xid', 'weekday')
    n_events_per_weekday = func.count(col('action')).over(xid_weekday_partition)
    df = df.withColumn('n_events_per_weekday', n_events_per_weekday)
    return df

def n_days_since_last_event_transformer(df):
    xid_partition = Window.partitionBy('xid')
    max_date = func.max(col('date')).over(xid_partition)
    n_days_since_last_event = func.datediff(func.current_date(), max_date)
    df = df.withColumn('n_days_since_last_event',
                       n_days_since_last_event + lit(0.1))
    return df

def n_days_since_last_action_transformer(df):
    xid_partition_action = Window.partitionBy('xid', 'action')
    max_date = func.max(col('date')).over(xid_partition_action)
    n_days_since_last_action = func.datediff(func.current_date(),
                                             max_date)
    df = df.withColumn('n_days_since_last_action',
                       n_days_since_last_action + lit(0.1))
    return df

def n_unique_day_transformer(df):
    xid_partition = Window.partitionBy('xid')
    dayofyear = func.dayofyear(col('date'))
    rank_day = func.dense_rank().over(xid_partition.orderBy(dayofyear))
    n_unique_day = func.last(rank_day).over(xid_partition)
    df = df.withColumn('n_unique_day', n_unique_day)
    return df

def n_unique_hour_transformer(df):
    xid_partition = Window.partitionBy('xid')
    rank_hour = func.dense_rank().over(xid_partition.orderBy('hour'))
    n_unique_hour = func.last(rank_hour).over(xid_partition)
    df = df.withColumn('n_unique_hour', n_unique_hour)
    return df

def n_events_per_device_transformer(df):
    xid_device_partition = Window.partitionBy('xid', 'device')
    n_events_per_device = func.count(func.col('device')) \
        .over(xid_device_partition)
    df = df.withColumn('n_events_per_device', n_events_per_device)
    return df

def n_unique_device_transformer(df):
    xid_partition = Window.partitionBy('xid')
    rank_device = func.dense_rank().over(xid_partition.orderBy('device'))
    n_unique_device = func.last(rank_device).over(xid_partition)
    df = df.withColumn('n_device', n_unique_device)
    return df

def n_actions_per_category_id_transformer(df):
    xid_category_id_partition = Window.partitionBy('xid', 'category_id',
                                                   'action')
    n_actions_per_category_id = func.count(func.col('action')) \
        .over(xid_category_id_partition)
    df = df.withColumn('n_actions_per_category_id', n_actions_per_category_id)
    return df

def n_unique_category_id_transformer(df):
    xid_partition = Window.partitionBy('xid')
    rank_category_id = func.dense_rank().over(xid_partition
                                              .orderBy('category_id'))
    n_unique_category_id = func.last(rank_category_id).over(xid_partition)
    df = df.withColumn('n_unique_category_id', n_unique_category_id)
    return df

def n_events_per_category_id_transformer(df):
    xid_category_id_partition = Window.partitionBy('xid', 'category_id')
    n_events_per_category_id = func.count(func.col('action')) \
        .over(xid_category_id_partition)
    df = df.withColumn('n_events_per_category_id', n_events_per_category_id)
    return df

def n_events_per_website_id_transformer(df):
    xid_website_id_partition = Window.partitionBy('xid', 'website_id')
    n_events_per_website_id = func.count(col('action'))\
        .over(xid_website_id_partition)
    df = df.withColumn('n_events_per_website_id', n_events_per_website_id)
    return df
transformers = [
    hour_transformer,
    weekday_transformer,
    n_events_per_hour_transformer,
    n_events_per_weekday_transformer,
    n_days_since_last_event_transformer,
    n_days_since_last_action_transformer,
    n_unique_day_transformer,
    n_unique_hour_transformer,
    n_events_per_device_transformer,
    n_unique_device_transformer,
    n_actions_per_category_id_transformer,
    n_events_per_category_id_transformer,
    n_events_per_website_id_transformer,
]
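Applying the transformers one after the other, as in the loop used below, can equivalently be written as a single fold (a minimal sketch, not part of the original code):

from functools import reduce
# fold the list of transformers over the dataframe, left to right
df_transformed = reduce(lambda acc, transform: transform(acc), transformers, df)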
N = 10000

sample_df = df.sample(withReplacement=False, fraction=.05)

NameError: name 'df' is not defined
sample_df.count()
NameError: name 'sample_df' is not defined
for transformer in transformers:
    df = transformer(df)

df.head(n=1)

NameError: name 'df' is not defined
for transformer in transformers:
    sample_df = transformer(sample_df)

sample_df.head(n=1)

NameError: name 'sample_df' is not defined
df = sample_df

NameError: name 'sample_df' is not defined
sorted(df.columns)
NameError: name 'df' is not defined
df.explain()
NameError: name 'df' is not defined
".")
spark._sc.setCheckpointDir(
df.checkpoint()
--------------------------------------------------------------------------- NameError Traceback (most recent call last) Cell In[36], line 3 1 spark._sc.setCheckpointDir(".") ----> 3 df.checkpoint() NameError: name 'df' is not defined
df.explain()
NameError: name 'df' is not defined
Here, we use all the previous computations (saved in the columns of the dataframe) to compute aggregated information about each user.
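All the loaders defined below produce the same long format: one row per (xid, feature_name, value) triple, which will later be pivoted into a sparse matrix. For illustration only (these values are made up), a loader output looks like:

# +--------+-----+---------------------------+
# |     xid|value|feature_name               |
# +--------+-----+---------------------------+
# |user_001|  3.0|n_events_per_hour#14       |
# |user_001|  1.0|n_events_per_device#mobile |
# +--------+-----+---------------------------+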
This should be DRYed: the loaders below all follow the same copy-pasted pattern (one possible refactoring is sketched after their definitions).
def n_events_per_hour_loader(df):
    csr = df\
        .select('xid', 'hour', 'n_events_per_hour')\
        .withColumnRenamed('n_events_per_hour', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_hour#'), col('hour'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('hour')
    return csr

def n_events_per_website_id_loader(df):
    csr = df.select('xid', 'website_id', 'n_events_per_website_id')\
        .withColumnRenamed('n_events_per_website_id', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_website_id#'),
                               col('website_id'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('website_id')
    return csr

def n_events_per_hour_loader(df):
    csr = df\
        .select('xid', 'hour', 'n_events_per_hour')\
        .withColumnRenamed('n_events_per_hour', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_hour#'), col('hour'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('hour')
    return csr

def n_events_per_weekday_loader(df):
    csr = df\
        .select('xid', 'weekday', 'n_events_per_weekday')\
        .withColumnRenamed('n_events_per_weekday', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_weekday#'), col('weekday'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('weekday')
    return csr

def n_days_since_last_event_loader(df):
    csr = df.select('xid', 'n_days_since_last_event')\
        .withColumnRenamed('n_days_since_last_event', 'value')\
        .distinct()
    feature_name = lit('n_days_since_last_event')
    csr = csr\
        .withColumn('feature_name', feature_name)
    return csr

def n_days_since_last_action_loader(df):
    csr = df.select('xid', 'action', 'n_days_since_last_action')\
        .withColumnRenamed('n_days_since_last_action', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_days_since_last_action#'), col('action'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('action')
    return csr

def n_unique_day_loader(df):
    csr = df.select('xid', 'n_unique_day')\
        .withColumnRenamed('n_unique_day', 'value')\
        .distinct()
    feature_name = lit('n_unique_day')
    csr = csr\
        .withColumn('feature_name', feature_name)
    return csr

def n_unique_hour_loader(df):
    csr = df.select('xid', 'n_unique_hour')\
        .withColumnRenamed('n_unique_hour', 'value')\
        .distinct()
    feature_name = lit('n_unique_hour')
    csr = csr\
        .withColumn('feature_name', feature_name)
    return csr

def n_events_per_device_loader(df):
    csr = df\
        .select('xid', 'device', 'n_events_per_device')\
        .withColumnRenamed('n_events_per_device', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_device#'), col('device'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('device')
    return csr

def n_unique_device_loader(df):
    csr = df.select('xid', 'n_device')\
        .withColumnRenamed('n_device', 'value')\
        .distinct()
    feature_name = lit('n_device')
    csr = csr\
        .withColumn('feature_name', feature_name)
    return csr

def n_events_per_category_id_loader(df):
    csr = df.select('xid', 'category_id', 'n_events_per_category_id')\
        .withColumnRenamed('n_events_per_category_id', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_category_id#'),
                               col('category_id'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('category_id')
    return csr

def n_actions_per_category_id_loader(df):
    csr = df.select('xid', 'category_id', 'action', 'n_actions_per_category_id')\
        .withColumnRenamed('n_actions_per_category_id', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_actions_per_category_id#'),
                               col('action'), lit('#'),
                               col('category_id'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('category_id')\
        .drop('action')
    return csr

def n_events_per_website_id_loader(df):
    csr = df.select('xid', 'website_id', 'n_events_per_website_id')\
        .withColumnRenamed('n_events_per_website_id', 'value')\
        .distinct()
    feature_name = func.concat(lit('n_events_per_website_id#'),
                               col('website_id'))
    csr = csr\
        .withColumn('feature_name', feature_name)\
        .drop('website_id')
    return csr
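As noted above, these loaders are heavily copy-pasted and could be DRYed up. A minimal sketch of a generic factory is given here; make_loader and its arguments are not part of the original code, just one possible refactoring:

def make_loader(value_col, prefix, key_cols=()):
    # Returns a loader equivalent to the hand-written ones above:
    # value_col holds the feature value, prefix is the feature name prefix,
    # key_cols are the columns appended to the feature name (e.g. 'hour').
    def loader(df):
        csr = df.select('xid', *key_cols, value_col)\
            .withColumnRenamed(value_col, 'value')\
            .distinct()
        parts = [lit(prefix)]
        for c in key_cols:
            parts += [lit('#'), col(c)]
        csr = csr.withColumn('feature_name', func.concat(*parts))
        for c in key_cols:
            csr = csr.drop(c)
        return csr
    return loader

# For instance, n_events_per_hour_loader could then be written as
# make_loader('n_events_per_hour', 'n_events_per_hour', key_cols=('hour',))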
from functools import reduce

loaders = [
    n_events_per_hour_loader,
    n_events_per_website_id_loader,
    n_events_per_hour_loader,
    n_events_per_weekday_loader,
    n_days_since_last_event_loader,
    n_days_since_last_action_loader,
    n_unique_day_loader,
    n_unique_hour_loader,
    n_events_per_device_loader,
    n_unique_device_loader,
    n_events_per_category_id_loader,
    n_actions_per_category_id_loader,
    n_events_per_website_id_loader,
]
def union(df, other):
    return df.union(other)
This method performs a SQL-style set union of the rows from both DataFrame objects, with no automatic deduplication of elements.
Use the distinct() method to perform deduplication of rows.
The method resolves columns by position (not by name), following the standard behavior in SQL.
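A small illustration of the by-position behaviour, using toy dataframes unrelated to our data:

left = spark.createDataFrame([(1, 10)], ['id', 'other'])
right = spark.createDataFrame([(20, 2)], ['other', 'id'])
# union matches columns by position: the row (20, 2) lands as id=20, other=2
left.union(right).show()
# unionByName aligns columns on their names instead: id=2, other=20
left.unionByName(right).show()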
spam = [loader(df) for loader in loaders]

NameError: name 'df' is not defined
spam[0].printSchema()

NameError: name 'spam' is not defined
all(spam[0].columns == it.columns for it in spam[1:])
NameError: name 'spam' is not defined
len(spam)
NameError: name 'spam' is not defined
csr = reduce(
    lambda df1, df2: df1.union(df2),
    spam
)

csr.head(n=3)

NameError: name 'spam' is not defined
csr.columns
NameError: name 'csr' is not defined
csr.show(5)

NameError: name 'csr' is not defined
csr.rdd.getNumPartitions()
NameError: name 'csr' is not defined
# Replace feature names and xid by a unique number
feature_name_partition = Window().orderBy('feature_name')
xid_partition = Window().orderBy('xid')

col_idx = func.dense_rank().over(feature_name_partition)
row_idx = func.dense_rank().over(xid_partition)

csr = csr.withColumn('col', col_idx)\
    .withColumn('row', row_idx)

csr = csr.na.drop('any')

csr.head(n=5)

NameError: name 'csr' is not defined
# Let's save the result of our hard work into a new parquet file
output_path = './'
output_file = os.path.join(output_path, 'csr.parquet')

csr.write.parquet(output_file, mode='overwrite')

NameError: name 'csr' is not defined
csr_path = './'
csr_file = os.path.join(csr_path, 'csr.parquet')

df = spark.read.parquet(csr_file)
df.head(n=5)

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/home/boucheron/Documents/IFEBY310/core/notebooks/csr.parquet.
df.count()
NameError: name 'df' is not defined
# What are the features related to campaign_id 1204 ?
features_names = \
    df.select('feature_name')\
    .distinct()\
    .toPandas()['feature_name']

NameError: name 'df' is not defined
features_names
NameError: name 'features_names' is not defined
[feature_name for feature_name in features_names if '1204' in feature_name]

NameError: name 'features_names' is not defined
# Look for the xid that have at least one exposure to campaign 1204
keep = func.when(
    (col('feature_name') == 'n_actions_per_category_id#C#1204.0') |
    (col('feature_name') == 'n_actions_per_category_id#O#1204.0'),
    1).otherwise(0)

df = df.withColumn('keep', keep)

df.where(col('keep') > 0).count()

NameError: name 'df' is not defined
# Sum of the keeps :)
xid_partition = Window.partitionBy('xid')
sum_keep = func.sum(col('keep')).over(xid_partition)
df = df.withColumn('sum_keep', sum_keep)

NameError: name 'df' is not defined
# Let's keep the xid exposed to 1204
df = df.where(col('sum_keep') > 0)

NameError: name 'df' is not defined
df.count()
NameError: name 'df' is not defined
df.select('xid').distinct().count()

NameError: name 'df' is not defined
row_partition = Window().orderBy('row')
col_partition = Window().orderBy('col')

row_new = func.dense_rank().over(row_partition)
col_new = func.dense_rank().over(col_partition)

df = df.withColumn('row_new', row_new)
df = df.withColumn('col_new', col_new)

csr_data = df.select('row_new', 'col_new', 'value').toPandas()

NameError: name 'df' is not defined
csr_data.head()
NameError: name 'csr_data' is not defined
features_names = df.select('feature_name', 'col_new').distinct()
features_names.where(col('feature_name') == 'n_actions_per_category_id#C#1204.0').head()

NameError: name 'df' is not defined
features_names.where(col('feature_name') == 'n_actions_per_category_id#O#1204.0').head()

NameError: name 'features_names' is not defined
from scipy.sparse import csr_matrix
import numpy as np

rows = csr_data['row_new'].values - 1
cols = csr_data['col_new'].values - 1
vals = csr_data['value'].values

X_csr = csr_matrix((vals, (rows, cols)))

NameError: name 'csr_data' is not defined
X_csr.shape
NameError: name 'X_csr' is not defined
X_csr.shape, X_csr.nnz
NameError: name 'X_csr' is not defined
X_csr.nnz / (X_csr.shape[0] * X_csr.shape[1])  # 152347 * 92

NameError: name 'X_csr' is not defined
# The label vector. Let's make it dense, flat and binary
y = np.array(X_csr[:, 1].todense()).ravel()
y = np.array(y > 0, dtype=np.int64)

NameError: name 'X_csr' is not defined
X_csr.shape
NameError: name 'X_csr' is not defined
# We remove the second and fourth columns.
# They actually contain the label we want to predict.
kept_cols = list(range(X_csr.shape[1]))
kept_cols.pop(1)
kept_cols.pop(2)
X = X_csr[:, kept_cols]

NameError: name 'X_csr' is not defined
len(kept_cols)
NameError: name 'kept_cols' is not defined
X_csr.shape, X.shape
NameError: name 'X_csr' is not defined
Wow! That was a lot of work. Now we have a feature matrix \(X\) and a vector of labels \(y\).
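Since we inspect the CSR internals below, here is a reminder of how the CSR layout stores a matrix, on a toy example unrelated to our data:

from scipy.sparse import csr_matrix
import numpy as np

M = csr_matrix(np.array([[1, 0, 2],
                         [0, 0, 3],
                         [4, 0, 0]]))
M.data     # array([1, 2, 3, 4]): the non-zero values, stored row by row
M.indices  # array([0, 2, 2, 0]): the column index of each non-zero value
M.indptr   # array([0, 2, 3, 4]): where each row starts in data/indices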
X.indices
NameError: name 'X' is not defined
X.indptr
NameError: name 'X' is not defined
X.shape, X.nnz
NameError: name 'X' is not defined
y.shape, y.sum()

NameError: name 'y' is not defined
from sklearn.preprocessing import MaxAbsScaler
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression

# Normalize the features
X = MaxAbsScaler().fit_transform(X)

X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, test_size=0.3)

clf = LogisticRegression(
    penalty='l2',
    C=1e3,
    solver='lbfgs',
    class_weight='balanced'
)

clf.fit(X_train, y_train)

ModuleNotFoundError: No module named 'sklearn'
features_names = features_names.toPandas()['feature_name']

NameError: name 'features_names' is not defined
features_names[range(6)]

NameError: name 'features_names' is not defined
import matplotlib.pyplot as plt
%matplotlib inline

plt.figure(figsize=(16, 5))
plt.stem(clf.coef_[0])  # , use_line_collection=True)
plt.title('Logistic regression coefficients', fontsize=18)

NameError: name 'clf' is not defined
<Figure size 1536x480 with 0 Axes>
clf.coef_[0].shape[0]

NameError: name 'clf' is not defined
len(features_names)
NameError: name 'features_names' is not defined
# We change the fontsize of the minor tick labels
_ = plt.xticks(np.arange(clf.coef_[0].shape[0]), features_names,
               rotation='vertical', fontsize=8)
_ = plt.yticks(fontsize=14)
from sklearn.metrics import precision_recall_curve, f1_score

precision, recall, _ = precision_recall_curve(y_test, clf.predict_proba(X_test)[:, 1])

plt.figure(figsize=(8, 6))
plt.plot(recall, precision, label='LR (F1=%.2f)' % f1_score(y_test, clf.predict(X_test)), lw=2)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('Recall', fontsize=16)
plt.ylabel('Precision', fontsize=16)
plt.title('Precision/recall curve', fontsize=18)
plt.legend(loc="upper right", fontsize=14)

ModuleNotFoundError: No module named 'sklearn'
= """ANALYZE TABLE db_table COMPUTE STATISTICS
query FOR COLUMNS xid"""
"db_table") df.createOrReplaceTempView(
--------------------------------------------------------------------------- NameError Traceback (most recent call last) Cell In[90], line 1 ----> 1 df.createOrReplaceTempView("db_table") NameError: name 'df' is not defined
df.columns
NameError: name 'df' is not defined
"cache table db_table") spark.sql(
spark.sql(query)
"show tables") spark.sql(