Code
import os
import sys

# Make Spark workers and the driver use the same Python interpreter
# as the one running this notebook (avoids version-mismatch errors).
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
pyspark
We use `pyspark` for data preprocessing. The data is a Parquet file containing a dataframe with 8 columns:

- `xid`: unique user id
- `action`: type of action. 'C' is a click, 'O' or 'VSL' is a web-display
- `date`: date of the action
- `website_id`: unique id of the website
- `url`: url of the webpage
- `category_id`: id of the display
- `zipcode`: postal zipcode of the user
- `device`: type of device used by the user

Using `pyspark.sql` we want to do the following things:
Then, we want to construct a classifier to predict the click on the category 1204. Here is an agenda for this:
import os
import sys

# Same interpreter pinning as above (repeated so the cell is self-contained).
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Build (or reuse) a Spark session for this analysis.
spark = (
    SparkSession
    .builder
    .appName("Spark Webdata")
    .getOrCreate()
)
25/04/03 15:09:27 WARN Utils: Your hostname, boucheron-Precision-5480 resolves to a loopback address: 127.0.1.1; using 172.23.32.10 instead (on interface eth0)
25/04/03 15:09:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/03 15:09:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
import requests, zipfile, io
from pathlib import Path

# Download and unpack the dataset only if it is not already present locally.
path = Path('webdata.parquet')
if not path.exists():
    url = "https://s-v-b.github.io/IFEBY310/data/webdata.parquet.zip"
    r = requests.get(url)
    z = zipfile.ZipFile(io.BytesIO(r.content))
    z.extractall(path='./')

input_path = './'
input_file = os.path.join(input_path, 'webdata.parquet')

# Load the parquet directory as a Spark DataFrame.
df = spark.read.parquet(input_file)
We can also give a try to pyarrow.parquet
module to load the Parquet file in an Arrow table.
import pyarrow as pa
import comet as co  # NOTE(review): unused below and not a standard package — verify this import is needed
import pyarrow.parquet as pq

# Load the same Parquet file into an Arrow table for comparison.
dfa = pq.read_table(input_file)
dfa.num_columns
8
:::
Let us go back to the spark data frame
df.printSchema()
root
|-- xid: string (nullable = true)
|-- action: string (nullable = true)
|-- date: timestamp (nullable = true)
|-- website_id: string (nullable = true)
|-- url: string (nullable = true)
|-- category_id: float (nullable = true)
|-- zipcode: string (nullable = true)
|-- device: string (nullable = true)
df.rdd.getNumPartitions()
12
Explain the partition size.
df.rdd.toDebugString()
b'(12) MapPartitionsRDD[7] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[6] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | SQLExecutionRDD[5] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[4] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | MapPartitionsRDD[3] at javaToPython at NativeMethodAccessorImpl.java:0 []\n | FileScanRDD[2] at javaToPython at NativeMethodAccessorImpl.java:0 []'
First we need to import some things:
from pyspark.sql import Window
import pyspark.sql.functions as func
from pyspark.sql.types import *
from pyspark.sql.functions import col, lit

# Number of distinct users in the dataset.
(
    df.select('xid')
      .distinct()
      .count()
)
[Stage 1:=============================> (6 + 6) / 12]
473761
def foo(x):
    """Yield a single value: the number of distinct elements in *x*.

    Intended for use with RDD.mapPartitions, so it yields (rather than
    returns) exactly one count per partition.
    """
    distinct_items = set(x)
    yield len(distinct_items)
# Count the number of distinct xids inside each RDD partition.
(
    df.rdd
      .map(lambda x: x.xid)
      .mapPartitions(foo)
      .collect()
)
[Stage 7:========================> (5 + 7) / 12][Stage 7:=============================> (6 + 6) / 12][Stage 7:===========================================> (9 + 3) / 12]
[78120, 78636, 79090, 79754, 79296, 78865, 0, 0, 0, 0, 0, 0]
This may consume significant computational resources.
# Inspect the physical plan of the distinct-count query.
(
    df.select('xid')
      .distinct()
      .explain()
)
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[xid#0], functions=[])
+- Exchange hashpartitioning(xid#0, 200), ENSURE_REQUIREMENTS, [plan_id=108]
+- HashAggregate(keys=[xid#0], functions=[])
+- FileScan parquet [xid#0] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/boucheron/sandbox/IFEBY310/core/notebooks/webdata.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<xid:string>
The distinct values of xid
seem to be evenly spread among the six files making the parquet
directory. Note that the last six partitions look empty.
# Per-user window: count the total number of actions for each xid.
xid_partition = Window.partitionBy('xid')
n_events = func.count(col('action')).over(xid_partition)
df = df.withColumn('n_events', n_events)
df.head(n=2)
[Row(xid='0006cea7-1679-4264-bdef-0cd089749ede', action='O', date=datetime.datetime(2016, 12, 26, 13, 41, 8), website_id='51', url='https://www.footlegende.fr/mercato-psg-coutinho-10166', category_id=1002.0, zipcode='34290', device='TAB', n_events=1),
Row(xid='000893c8-a14b-4f33-858f-210440f37def', action='O', date=datetime.datetime(2016, 12, 23, 16, 18, 37), website_id='56', url='http://blague.dumatin.fr/', category_id=1002.0, zipcode=None, device='DSK', n_events=1)]
# Same per-user action count, via groupBy/agg instead of a window.
(
    df.groupBy('xid')
      .agg(func.count('action'))
      .head(5)
)
[Row(xid='001c4a21-52c6-4890-b6ce-2b9d4ba06a56', count(action)=1),
Row(xid='0024344b-7ee2-4fcd-a0b4-bec26d8c8b0e', count(action)=4),
Row(xid='004564e3-87c1-4e16-ad2c-0e96afc3d617', count(action)=1),
Row(xid='006d807f-91c3-415a-bb5e-6b9f7e6517a1', count(action)=1),
Row(xid='006e0463-b24c-4996-84ab-d6d0d65a52aa', count(action)=1)]
# Most recent event date per user, then days elapsed since that date.
max_date = (
    func
    .max(col('date'))
    .over(xid_partition)
)
n_days_since_last_event = func.datediff(func.current_date(), max_date)
df = df.withColumn('n_days_since_last_event',
                   n_days_since_last_event)
df.head(n=2)
[Row(xid='0006cea7-1679-4264-bdef-0cd089749ede', action='O', date=datetime.datetime(2016, 12, 26, 13, 41, 8), website_id='51', url='https://www.footlegende.fr/mercato-psg-coutinho-10166', category_id=1002.0, zipcode='34290', device='TAB', n_events=1, n_days_since_last_event=3020),
Row(xid='000893c8-a14b-4f33-858f-210440f37def', action='O', date=datetime.datetime(2016, 12, 23, 16, 18, 37), website_id='56', url='http://blague.dumatin.fr/', category_id=1002.0, zipcode=None, device='DSK', n_events=1, n_days_since_last_event=3023)]
df.printSchema()
root
|-- xid: string (nullable = true)
|-- action: string (nullable = true)
|-- date: timestamp (nullable = true)
|-- website_id: string (nullable = true)
|-- url: string (nullable = true)
|-- category_id: float (nullable = true)
|-- zipcode: string (nullable = true)
|-- device: string (nullable = true)
|-- n_events: long (nullable = false)
|-- n_days_since_last_event: integer (nullable = true)
Does this `partitionBy` trigger shuffling?
# Window over (xid, device): events per user and device.
# NOTE(review): partitionBy on an existing Window replaces (not extends)
# its keys only in some versions — here it yields a (xid, device)-style
# partition per the output; confirm against the pyspark version in use.
xid_device_partition = xid_partition.partitionBy('device')
n_events_per_device = func.count(col('action')).over(xid_device_partition)
df = df.withColumn('n_events_per_device', n_events_per_device)
df.head(n=2)
[Stage 19:==> (1 + 20) / 22]
[Row(xid='000893c8-a14b-4f33-858f-210440f37def', action='O', date=datetime.datetime(2016, 12, 23, 16, 18, 37), website_id='56', url='http://blague.dumatin.fr/', category_id=1002.0, zipcode=None, device='DSK', n_events=1, n_days_since_last_event=3023, n_events_per_device=1021837),
Row(xid='0008c5d2-c263-4b55-ae7d-82c4bf566cc4', action='O', date=datetime.datetime(2017, 1, 16, 4, 26, 21), website_id='74', url='http://www.realite-virtuelle.com/meilleure-videos-360-vr', category_id=1002.0, zipcode=None, device='DSK', n_events=1, n_days_since_last_event=2999, n_events_per_device=1021837)]
# xid_partition = Window.partitionBy('xid')
# dense_rank over devices ordered within each user, then take the last
# rank over the whole user partition = number of distinct devices.
rank_device = (
    func
    .dense_rank()
    .over(xid_partition.orderBy('device'))
)
n_unique_device = (
    func
    .last(rank_device)
    .over(xid_partition)
)
df = df.withColumn('n_device', n_unique_device)
df.head(n=2)
[Stage 28:======================================> (2 + 1) / 3]
[Row(xid='0006cea7-1679-4264-bdef-0cd089749ede', action='O', date=datetime.datetime(2016, 12, 26, 13, 41, 8), website_id='51', url='https://www.footlegende.fr/mercato-psg-coutinho-10166', category_id=1002.0, zipcode='34290', device='TAB', n_events=1, n_days_since_last_event=3020, n_events_per_device=132013, n_device=1),
Row(xid='000893c8-a14b-4f33-858f-210440f37def', action='O', date=datetime.datetime(2016, 12, 23, 16, 18, 37), website_id='56', url='http://blague.dumatin.fr/', category_id=1002.0, zipcode=None, device='DSK', n_events=1, n_days_since_last_event=3023, n_events_per_device=1021837, n_device=1)]
# Users seen on more than one device.
df\
    .where(col('n_device') > 1)\
    .select('xid', 'device', 'n_events', 'n_device', 'n_events_per_device')\
    .head(n=8)
[Row(xid='4c1dc79d-a140-4da9-ae28-540b4503c3b8', device='DSK', n_events=6, n_device=2, n_events_per_device=1021837),
Row(xid='4c1dc79d-a140-4da9-ae28-540b4503c3b8', device='DSK', n_events=6, n_device=2, n_events_per_device=1021837),
Row(xid='4c1dc79d-a140-4da9-ae28-540b4503c3b8', device='DSK', n_events=6, n_device=2, n_events_per_device=1021837),
Row(xid='4c1dc79d-a140-4da9-ae28-540b4503c3b8', device='DSK', n_events=6, n_device=2, n_events_per_device=1021837),
Row(xid='4c1dc79d-a140-4da9-ae28-540b4503c3b8', device='DSK', n_events=6, n_device=2, n_events_per_device=1021837),
Row(xid='4c1dc79d-a140-4da9-ae28-540b4503c3b8', device='MOB', n_events=6, n_device=2, n_events_per_device=1564),
Row(xid='78156cdf-7229-46eb-bb6b-92d384f9a6fa', device='DSK', n_events=6, n_device=2, n_events_per_device=1021837),
Row(xid='78156cdf-7229-46eb-bb6b-92d384f9a6fa', device='DSK', n_events=6, n_device=2, n_events_per_device=1021837)]
# How many rows belong to multi-device users.
df\
    .where(col('n_device') > 1)\
    .select('xid', 'device', 'n_events', 'n_device', 'n_events_per_device')\
    .count()
3153
We construct a ETL (Extract Transform Load) process on this data using the pyspark.sql
API.
Here extraction is just about reading the data
# Extract step: re-read the raw data, discarding the exploratory columns.
df = spark.read.parquet(input_file)
df.head(n=3)
[Row(xid='001ff9b6-5383-4221-812d-58c2c3f234cc', action='O', date=datetime.datetime(2017, 1, 25, 7, 2, 18), website_id='3', url='http://www.8chances.com/grille', category_id=1002.0, zipcode='11370', device='SMP'),
Row(xid='0056ab7a-3cba-4ed5-a495-3d4abf79ab66', action='O', date=datetime.datetime(2016, 12, 28, 9, 47, 8), website_id='54', url='http://www.salaire-brut-en-net.fr/differences-brut-net/', category_id=1002.0, zipcode='86000', device='DSK'),
Row(xid='005ae4ab-363a-41a0-b8f9-faee47d622a4', action='O', date=datetime.datetime(2017, 1, 27, 22, 21, 6), website_id='74', url='http://www.realite-virtuelle.com/top-applications-horreur-vr-halloween', category_id=1002.0, zipcode='49700', device='DSK')]
At this step we compute a lot of extra things from the data. The aim is to build features that describe users.
def n_events_transformer(df):
    """Add column 'n_events': total number of actions per user (xid)."""
    xid_partition = Window.partitionBy('xid')
    n_events = func.count(col('action')).over(xid_partition)
    df = df.withColumn('n_events', n_events)
    return df
def n_events_per_action_transformer(df):
    """Add column 'n_events_per_action': events per (xid, action) pair."""
    xid_action_partition = Window.partitionBy('xid', 'action')
    n_events_per_action = func.count(col('action')).over(xid_action_partition)
    df = df.withColumn('n_events_per_action', n_events_per_action)
    return df
def hour_transformer(df):
    """Add column 'hour': hour of day (0-23) of the event timestamp."""
    hour = func.hour(col('date'))
    df = df.withColumn('hour', hour)
    return df
def weekday_transformer(df):
    """Add column 'weekday': full weekday name (e.g. 'Monday')."""
    weekday = func.date_format(col('date'), 'EEEE')
    df = df.withColumn('weekday', weekday)
    return df
def n_events_per_hour_transformer(df):
    """Add column 'n_events_per_hour': events per (xid, hour) pair.

    Requires 'hour' to exist, i.e. hour_transformer must run first.
    """
    xid_hour_partition = Window.partitionBy('xid', 'hour')
    n_events_per_hour = func.count(col('action')).over(xid_hour_partition)
    df = df.withColumn('n_events_per_hour', n_events_per_hour)
    return df
def n_events_per_weekday_transformer(df):
    """Add column 'n_events_per_weekday': events per (xid, weekday) pair.

    Requires 'weekday' to exist, i.e. weekday_transformer must run first.
    """
    xid_weekday_partition = Window.partitionBy('xid', 'weekday')
    n_events_per_weekday = func.count(col('action')).over(xid_weekday_partition)
    df = df.withColumn('n_events_per_weekday', n_events_per_weekday)
    return df
def n_days_since_last_event_transformer(df):
= Window.partitionBy('xid')
xid_partition = func.max(col('date')).over(xid_partition)
max_date = func.datediff(func.current_date(), max_date)
n_days_since_last_event = df.withColumn('n_days_since_last_event',
df + lit(0.1))
n_days_since_last_event return df
def n_days_since_last_action_transformer(df):
= Window.partitionBy('xid', 'action')
xid_partition_action = func.max(col('date')).over(xid_partition_action)
max_date = func.datediff(func.current_date(),
n_days_since_last_action
max_date)= df.withColumn('n_days_since_last_action',
df + lit(0.1))
n_days_since_last_action return df
def n_unique_day_transformer(df):
= Window.partitionBy('xid')
xid_partition = func.dayofyear(col('date'))
dayofyear = func.dense_rank().over(xid_partition.orderBy(dayofyear))
rank_day = func.last(rank_day).over(xid_partition)
n_unique_day = df.withColumn('n_unique_day', n_unique_day)
df return df
def n_unique_hour_transformer(df):
= Window.partitionBy('xid')
xid_partition = func.dense_rank().over(xid_partition.orderBy('hour'))
rank_hour = func.last(rank_hour).over(xid_partition)
n_unique_hour = df.withColumn('n_unique_hour', n_unique_hour)
df return df
def n_events_per_device_transformer(df):
= Window.partitionBy('xid', 'device')
xid_device_partition = func.count(func.col('device')) \
n_events_per_device
.over(xid_device_partition)= df.withColumn('n_events_per_device', n_events_per_device)
df return df
def n_unique_device_transformer(df):
= Window.partitionBy('xid')
xid_partition = func.dense_rank().over(xid_partition.orderBy('device'))
rank_device = func.last(rank_device).over(xid_partition)
n_unique_device = df.withColumn('n_device', n_unique_device)
df return df
def n_actions_per_category_id_transformer(df):
= Window.partitionBy('xid', 'category_id',
xid_category_id_partition 'action')
= func.count(func.col('action')) \
n_actions_per_category_id
.over(xid_category_id_partition)= df.withColumn('n_actions_per_category_id', n_actions_per_category_id)
df return df
def n_unique_category_id_transformer(df):
= Window.partitionBy('xid')
xid_partition = func.dense_rank().over(xid_partition\
rank_category_id 'category_id'))
.orderBy(= func.last(rank_category_id).over(xid_partition)
n_unique_category_id = df.withColumn('n_unique_category_id', n_unique_category_id)
df return df
def n_events_per_category_id_transformer(df):
= Window.partitionBy('xid', 'category_id')
xid_category_id_partition = func.count(func.col('action')) \
n_events_per_category_id
.over(xid_category_id_partition)= df.withColumn('n_events_per_category_id', n_events_per_category_id)
df return df
def n_events_per_website_id_transformer(df):
= Window.partitionBy('xid', 'website_id')
xid_website_id_partition = func.count(col('action'))\
n_events_per_website_id
.over(xid_website_id_partition)= df.withColumn('n_events_per_website_id', n_events_per_website_id)
df return df
# Pipeline of feature transformers, applied in order.
# NOTE(review): n_events_transformer, n_events_per_action_transformer and
# n_unique_category_id_transformer are defined above but not listed here —
# confirm this omission is intentional.
transformers = [
    hour_transformer,
    weekday_transformer,
    n_events_per_hour_transformer,
    n_events_per_weekday_transformer,
    n_days_since_last_event_transformer,
    n_days_since_last_action_transformer,
    n_unique_day_transformer,
    n_unique_hour_transformer,
    n_events_per_device_transformer,
    n_unique_device_transformer,
    n_actions_per_category_id_transformer,
    n_events_per_category_id_transformer,
    n_events_per_website_id_transformer,
]
N = 10000  # NOTE(review): unused in the visible code — verify downstream use
# Work on a 5% sample of the data for faster iteration.
sample_df = df.sample(withReplacement=False, fraction=.05)
sample_df.count()
58815
# Apply the whole feature pipeline to the full DataFrame.
for transformer in transformers:
    df = transformer(df)

df.head(n=1)
[Stage 54:===================================================> (11 + 1) / 12][Stage 56:> (0 + 1) / 1]
[Row(xid='0006cea7-1679-4264-bdef-0cd089749ede', action='O', date=datetime.datetime(2016, 12, 26, 13, 41, 8), website_id='51', url='https://www.footlegende.fr/mercato-psg-coutinho-10166', category_id=1002.0, zipcode='34290', device='TAB', hour=13, weekday='Monday', n_events_per_hour=1, n_events_per_weekday=1, n_days_since_last_event=3020.1, n_days_since_last_action=3020.1, n_unique_day=1, n_unique_hour=1, n_events_per_device=1, n_device=1, n_actions_per_category_id=1, n_events_per_category_id=1, n_events_per_website_id=1)]
# Apply the same pipeline to the 5% sample.
for transformer in transformers:
    sample_df = transformer(sample_df)

sample_df.head(n=1)
[Row(xid='0004d0b7-a7fd-44ca-accc-422b62de4436', action='O', date=datetime.datetime(2017, 1, 19, 8, 12, 31), website_id='3', url='http://www.8chances.com/grille', category_id=1002.0, zipcode='78480', device='DSK', hour=8, weekday='Thursday', n_events_per_hour=2, n_events_per_weekday=2, n_days_since_last_event=2996.1, n_days_since_last_action=2996.1, n_unique_day=1, n_unique_hour=1, n_events_per_device=2, n_device=1, n_actions_per_category_id=2, n_events_per_category_id=2, n_events_per_website_id=2)]
# Continue the analysis on the sampled, feature-augmented DataFrame.
df = sample_df
sorted(df.columns)
['action',
'category_id',
'date',
'device',
'hour',
'n_actions_per_category_id',
'n_days_since_last_action',
'n_days_since_last_event',
'n_device',
'n_events_per_category_id',
'n_events_per_device',
'n_events_per_hour',
'n_events_per_website_id',
'n_events_per_weekday',
'n_unique_day',
'n_unique_hour',
'url',
'website_id',
'weekday',
'xid',
'zipcode']
# Inspect the physical plan accumulated by the chained window transformations.
df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Window [count(action#298) windowspecdefinition(xid#297, website_id#300, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_events_per_website_id#841L], [xid#297, website_id#300]
+- Sort [xid#297 ASC NULLS FIRST, website_id#300 ASC NULLS FIRST], false, 0
+- Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, n_events_per_weekday#637L, n_days_since_last_event#651, n_days_since_last_action#667, n_unique_day#686, n_unique_hour#719, n_events_per_device#747L, n_device#768, n_actions_per_category_id#798L, n_events_per_category_id#819L]
+- Window [count(action#298) windowspecdefinition(xid#297, category_id#302, action#298, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_actions_per_category_id#798L], [xid#297, knownfloatingpointnormalized(normalizenanandzero(category_id#302)), action#298]
+- Sort [xid#297 ASC NULLS FIRST, knownfloatingpointnormalized(normalizenanandzero(category_id#302)) ASC NULLS FIRST, action#298 ASC NULLS FIRST], false, 0
+- Window [count(action#298) windowspecdefinition(xid#297, category_id#302, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_events_per_category_id#819L], [xid#297, knownfloatingpointnormalized(normalizenanandzero(category_id#302))]
+- Sort [xid#297 ASC NULLS FIRST, knownfloatingpointnormalized(normalizenanandzero(category_id#302)) ASC NULLS FIRST], false, 0
+- Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, n_events_per_weekday#637L, n_days_since_last_event#651, n_days_since_last_action#667, n_unique_day#686, n_unique_hour#719, n_events_per_device#747L, n_device#768]
+- Window [count(device#304) windowspecdefinition(xid#297, device#304, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_events_per_device#747L], [xid#297, device#304]
+- Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, n_events_per_weekday#637L, n_days_since_last_event#651, n_days_since_last_action#667, n_unique_day#686, n_unique_hour#719, n_device#768]
+- Window [last(_w0#775, false) windowspecdefinition(xid#297, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_device#768], [xid#297]
+- Window [dense_rank(device#304) windowspecdefinition(xid#297, device#304 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _w0#775], [xid#297], [device#304 ASC NULLS FIRST]
+- Sort [xid#297 ASC NULLS FIRST, device#304 ASC NULLS FIRST], false, 0
+- Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, n_events_per_weekday#637L, n_days_since_last_event#651, n_days_since_last_action#667, n_unique_day#686, n_unique_hour#719]
+- Window [last(_w0#726, false) windowspecdefinition(xid#297, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_unique_hour#719], [xid#297]
+- Window [dense_rank(hour#602) windowspecdefinition(xid#297, hour#602 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _w0#726], [xid#297], [hour#602 ASC NULLS FIRST]
+- Sort [xid#297 ASC NULLS FIRST, hour#602 ASC NULLS FIRST], false, 0
+- Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, n_events_per_weekday#637L, n_days_since_last_event#651, n_days_since_last_action#667, n_unique_day#686]
+- Window [last(_w0#693, false) windowspecdefinition(xid#297, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_unique_day#686], [xid#297]
+- Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, n_events_per_weekday#637L, n_days_since_last_event#651, n_days_since_last_action#667, _w0#693]
+- Window [dense_rank(_w0#694) windowspecdefinition(xid#297, _w0#694 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _w0#693], [xid#297], [_w0#694 ASC NULLS FIRST]
+- Sort [xid#297 ASC NULLS FIRST, _w0#694 ASC NULLS FIRST], false, 0
+- Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, n_events_per_weekday#637L, n_days_since_last_event#651, (cast(datediff(2025-04-03, cast(_we0#668 as date)) as double) + 0.1) AS n_days_since_last_action#667, dayofyear(cast(date#299 as date)) AS _w0#694]
+- Window [max(date#299) windowspecdefinition(xid#297, action#298, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we0#668], [xid#297, action#298]
+- Sort [xid#297 ASC NULLS FIRST, action#298 ASC NULLS FIRST], false, 0
+- Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, n_events_per_weekday#637L, (cast(datediff(2025-04-03, cast(_we0#652 as date)) as double) + 0.1) AS n_days_since_last_event#651]
+- Window [count(action#298) windowspecdefinition(xid#297, weekday#612, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_events_per_weekday#637L], [xid#297, weekday#612]
+- Sort [xid#297 ASC NULLS FIRST, weekday#612 ASC NULLS FIRST], false, 0
+- Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, _we0#652]
+- Window [count(action#298) windowspecdefinition(xid#297, hour#602, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_events_per_hour#624L], [xid#297, hour#602]
+- Sort [xid#297 ASC NULLS FIRST, hour#602 ASC NULLS FIRST], false, 0
+- Window [max(date#299) windowspecdefinition(xid#297, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we0#652], [xid#297]
+- Sort [xid#297 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(xid#297, 200), ENSURE_REQUIREMENTS, [plan_id=2113]
+- Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour(date#299, Some(Europe/Paris)) AS hour#602, date_format(date#299, EEEE, Some(Europe/Paris)) AS weekday#612]
+- Sample 0.0, 0.05, false, 7061523513992374299
+- FileScan parquet [xid#297,action#298,date#299,website_id#300,url#301,category_id#302,zipcode#303,device#304] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/boucheron/sandbox/IFEBY310/core/notebooks/webdata.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<xid:string,action:string,date:timestamp,website_id:string,url:string,category_id:float,zip...
# Checkpoint to truncate the (very deep) lineage of window operations.
spark._sc.setCheckpointDir(".")
df.checkpoint()
DataFrame[xid: string, action: string, date: timestamp, website_id: string, url: string, category_id: float, zipcode: string, device: string, hour: int, weekday: string, n_events_per_hour: bigint, n_events_per_weekday: bigint, n_days_since_last_event: double, n_days_since_last_action: double, n_unique_day: int, n_unique_hour: int, n_events_per_device: bigint, n_device: int, n_actions_per_category_id: bigint, n_events_per_category_id: bigint, n_events_per_website_id: bigint]
df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
Window [count(action#298) windowspecdefinition(xid#297, website_id#300, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_events_per_website_id#841L], [xid#297, website_id#300]
+- *(13) Sort [xid#297 ASC NULLS FIRST, website_id#300 ASC NULLS FIRST], false, 0
+- *(13) Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, n_events_per_weekday#637L, n_days_since_last_event#651, n_days_since_last_action#667, n_unique_day#686, n_unique_hour#719, n_events_per_device#747L, n_device#768, n_actions_per_category_id#798L, n_events_per_category_id#819L]
+- Window [count(action#298) windowspecdefinition(xid#297, category_id#302, action#298, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_actions_per_category_id#798L], [xid#297, knownfloatingpointnormalized(normalizenanandzero(category_id#302)), action#298]
+- *(12) Sort [xid#297 ASC NULLS FIRST, knownfloatingpointnormalized(normalizenanandzero(category_id#302)) ASC NULLS FIRST, action#298 ASC NULLS FIRST], false, 0
+- Window [count(action#298) windowspecdefinition(xid#297, category_id#302, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_events_per_category_id#819L], [xid#297, knownfloatingpointnormalized(normalizenanandzero(category_id#302))]
+- *(11) Sort [xid#297 ASC NULLS FIRST, knownfloatingpointnormalized(normalizenanandzero(category_id#302)) ASC NULLS FIRST], false, 0
+- *(11) Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, n_events_per_weekday#637L, n_days_since_last_event#651, n_days_since_last_action#667, n_unique_day#686, n_unique_hour#719, n_events_per_device#747L, n_device#768]
+- Window [count(device#304) windowspecdefinition(xid#297, device#304, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_events_per_device#747L], [xid#297, device#304]
+- *(10) Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, n_events_per_weekday#637L, n_days_since_last_event#651, n_days_since_last_action#667, n_unique_day#686, n_unique_hour#719, n_device#768]
+- Window [last(_w0#775, false) windowspecdefinition(xid#297, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_device#768], [xid#297]
+- Window [dense_rank(device#304) windowspecdefinition(xid#297, device#304 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _w0#775], [xid#297], [device#304 ASC NULLS FIRST]
+- *(9) Sort [xid#297 ASC NULLS FIRST, device#304 ASC NULLS FIRST], false, 0
+- *(9) Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, n_events_per_weekday#637L, n_days_since_last_event#651, n_days_since_last_action#667, n_unique_day#686, n_unique_hour#719]
+- Window [last(_w0#726, false) windowspecdefinition(xid#297, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_unique_hour#719], [xid#297]
+- Window [dense_rank(hour#602) windowspecdefinition(xid#297, hour#602 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _w0#726], [xid#297], [hour#602 ASC NULLS FIRST]
+- *(8) Sort [xid#297 ASC NULLS FIRST, hour#602 ASC NULLS FIRST], false, 0
+- *(8) Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, n_events_per_weekday#637L, n_days_since_last_event#651, n_days_since_last_action#667, n_unique_day#686]
+- Window [last(_w0#693, false) windowspecdefinition(xid#297, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_unique_day#686], [xid#297]
+- *(7) Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, n_events_per_weekday#637L, n_days_since_last_event#651, n_days_since_last_action#667, _w0#693]
+- Window [dense_rank(_w0#694) windowspecdefinition(xid#297, _w0#694 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _w0#693], [xid#297], [_w0#694 ASC NULLS FIRST]
+- *(6) Sort [xid#297 ASC NULLS FIRST, _w0#694 ASC NULLS FIRST], false, 0
+- *(6) Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, n_events_per_weekday#637L, n_days_since_last_event#651, (cast(datediff(2025-04-03, cast(_we0#668 as date)) as double) + 0.1) AS n_days_since_last_action#667, dayofyear(cast(date#299 as date)) AS _w0#694]
+- Window [max(date#299) windowspecdefinition(xid#297, action#298, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we0#668], [xid#297, action#298]
+- *(5) Sort [xid#297 ASC NULLS FIRST, action#298 ASC NULLS FIRST], false, 0
+- *(5) Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, n_events_per_weekday#637L, (cast(datediff(2025-04-03, cast(_we0#652 as date)) as double) + 0.1) AS n_days_since_last_event#651]
+- Window [count(action#298) windowspecdefinition(xid#297, weekday#612, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_events_per_weekday#637L], [xid#297, weekday#612]
+- *(4) Sort [xid#297 ASC NULLS FIRST, weekday#612 ASC NULLS FIRST], false, 0
+- *(4) Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, _we0#652]
+- Window [count(action#298) windowspecdefinition(xid#297, hour#602, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_events_per_hour#624L], [xid#297, hour#602]
+- *(3) Sort [xid#297 ASC NULLS FIRST, hour#602 ASC NULLS FIRST], false, 0
+- Window [max(date#299) windowspecdefinition(xid#297, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we0#652], [xid#297]
+- *(2) Sort [xid#297 ASC NULLS FIRST], false, 0
+- AQEShuffleRead coalesced
+- ShuffleQueryStage 0
+- Exchange hashpartitioning(xid#297, 200), ENSURE_REQUIREMENTS, [plan_id=2171]
+- *(1) Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour(date#299, Some(Europe/Paris)) AS hour#602, date_format(date#299, EEEE, Some(Europe/Paris)) AS weekday#612]
+- *(1) Sample 0.0, 0.05, false, 7061523513992374299
+- *(1) ColumnarToRow
+- FileScan parquet [xid#297,action#298,date#299,website_id#300,url#301,category_id#302,zipcode#303,device#304] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/boucheron/sandbox/IFEBY310/core/notebooks/webdata.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<xid:string,action:string,date:timestamp,website_id:string,url:string,category_id:float,zip...
+- == Initial Plan ==
Window [count(action#298) windowspecdefinition(xid#297, website_id#300, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_events_per_website_id#841L], [xid#297, website_id#300]
+- Sort [xid#297 ASC NULLS FIRST, website_id#300 ASC NULLS FIRST], false, 0
+- Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, n_events_per_weekday#637L, n_days_since_last_event#651, n_days_since_last_action#667, n_unique_day#686, n_unique_hour#719, n_events_per_device#747L, n_device#768, n_actions_per_category_id#798L, n_events_per_category_id#819L]
+- Window [count(action#298) windowspecdefinition(xid#297, category_id#302, action#298, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_actions_per_category_id#798L], [xid#297, knownfloatingpointnormalized(normalizenanandzero(category_id#302)), action#298]
+- Sort [xid#297 ASC NULLS FIRST, knownfloatingpointnormalized(normalizenanandzero(category_id#302)) ASC NULLS FIRST, action#298 ASC NULLS FIRST], false, 0
+- Window [count(action#298) windowspecdefinition(xid#297, category_id#302, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_events_per_category_id#819L], [xid#297, knownfloatingpointnormalized(normalizenanandzero(category_id#302))]
+- Sort [xid#297 ASC NULLS FIRST, knownfloatingpointnormalized(normalizenanandzero(category_id#302)) ASC NULLS FIRST], false, 0
+- Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, n_events_per_weekday#637L, n_days_since_last_event#651, n_days_since_last_action#667, n_unique_day#686, n_unique_hour#719, n_events_per_device#747L, n_device#768]
+- Window [count(device#304) windowspecdefinition(xid#297, device#304, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_events_per_device#747L], [xid#297, device#304]
+- Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, n_events_per_weekday#637L, n_days_since_last_event#651, n_days_since_last_action#667, n_unique_day#686, n_unique_hour#719, n_device#768]
+- Window [last(_w0#775, false) windowspecdefinition(xid#297, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_device#768], [xid#297]
+- Window [dense_rank(device#304) windowspecdefinition(xid#297, device#304 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _w0#775], [xid#297], [device#304 ASC NULLS FIRST]
+- Sort [xid#297 ASC NULLS FIRST, device#304 ASC NULLS FIRST], false, 0
+- Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, n_events_per_weekday#637L, n_days_since_last_event#651, n_days_since_last_action#667, n_unique_day#686, n_unique_hour#719]
+- Window [last(_w0#726, false) windowspecdefinition(xid#297, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_unique_hour#719], [xid#297]
+- Window [dense_rank(hour#602) windowspecdefinition(xid#297, hour#602 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _w0#726], [xid#297], [hour#602 ASC NULLS FIRST]
+- Sort [xid#297 ASC NULLS FIRST, hour#602 ASC NULLS FIRST], false, 0
+- Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, n_events_per_weekday#637L, n_days_since_last_event#651, n_days_since_last_action#667, n_unique_day#686]
+- Window [last(_w0#693, false) windowspecdefinition(xid#297, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_unique_day#686], [xid#297]
+- Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, n_events_per_weekday#637L, n_days_since_last_event#651, n_days_since_last_action#667, _w0#693]
+- Window [dense_rank(_w0#694) windowspecdefinition(xid#297, _w0#694 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _w0#693], [xid#297], [_w0#694 ASC NULLS FIRST]
+- Sort [xid#297 ASC NULLS FIRST, _w0#694 ASC NULLS FIRST], false, 0
+- Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, n_events_per_weekday#637L, n_days_since_last_event#651, (cast(datediff(2025-04-03, cast(_we0#668 as date)) as double) + 0.1) AS n_days_since_last_action#667, dayofyear(cast(date#299 as date)) AS _w0#694]
+- Window [max(date#299) windowspecdefinition(xid#297, action#298, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we0#668], [xid#297, action#298]
+- Sort [xid#297 ASC NULLS FIRST, action#298 ASC NULLS FIRST], false, 0
+- Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, n_events_per_weekday#637L, (cast(datediff(2025-04-03, cast(_we0#652 as date)) as double) + 0.1) AS n_days_since_last_event#651]
+- Window [count(action#298) windowspecdefinition(xid#297, weekday#612, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_events_per_weekday#637L], [xid#297, weekday#612]
+- Sort [xid#297 ASC NULLS FIRST, weekday#612 ASC NULLS FIRST], false, 0
+- Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour#602, weekday#612, n_events_per_hour#624L, _we0#652]
+- Window [count(action#298) windowspecdefinition(xid#297, hour#602, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n_events_per_hour#624L], [xid#297, hour#602]
+- Sort [xid#297 ASC NULLS FIRST, hour#602 ASC NULLS FIRST], false, 0
+- Window [max(date#299) windowspecdefinition(xid#297, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we0#652], [xid#297]
+- Sort [xid#297 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(xid#297, 200), ENSURE_REQUIREMENTS, [plan_id=2113]
+- Project [xid#297, action#298, date#299, website_id#300, url#301, category_id#302, zipcode#303, device#304, hour(date#299, Some(Europe/Paris)) AS hour#602, date_format(date#299, EEEE, Some(Europe/Paris)) AS weekday#612]
+- Sample 0.0, 0.05, false, 7061523513992374299
+- FileScan parquet [xid#297,action#298,date#299,website_id#300,url#301,category_id#302,zipcode#303,device#304] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/boucheron/sandbox/IFEBY310/core/notebooks/webdata.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<xid:string,action:string,date:timestamp,website_id:string,url:string,category_id:float,zip...
Here, we use all the previous computations (saved in the columns of the dataframe) to compute aggregated information about each user.
The loader functions below are highly repetitive and should be DRIED (refactored to remove the duplication).
def n_events_per_hour_loader(df):
    """Return (xid, value, feature_name) rows for the per-hour event counts.

    The hour is folded into the feature name, e.g. 'n_events_per_hour#8',
    so every feature lives in a single long-format (triplet) dataframe.
    """
    triplets = (
        df.select('xid', 'hour', 'n_events_per_hour')
          .withColumnRenamed('n_events_per_hour', 'value')
          .distinct()  # one row per (xid, hour) count
    )
    tag = func.concat(lit('n_events_per_hour#'), col('hour'))
    return triplets.withColumn('feature_name', tag).drop('hour')
def n_events_per_website_id_loader(df):
    """Return (xid, value, feature_name) rows for the per-website event counts.

    The website id is folded into the feature name,
    e.g. 'n_events_per_website_id#42'.
    """
    csr = (
        df.select('xid', 'website_id', 'n_events_per_website_id')
          # BUG FIX: the original renamed 'n_events_per_hour', a column that is
          # not in this selection; withColumnRenamed silently no-ops on missing
          # columns, so the value column was never renamed to 'value' and the
          # schema would not line up with the other loaders in the union.
          .withColumnRenamed('n_events_per_website_id', 'value')
          .distinct()
    )
    feature_name = func.concat(lit('n_events_per_website_id#'),
                               col('website_id'))
    return csr.withColumn('feature_name', feature_name).drop('website_id')
def n_events_per_hour_loader(df):
    """Return (xid, value, feature_name) rows for the per-hour event counts.

    NOTE(review): this is a verbatim duplicate definition — the same function
    is already defined earlier in the file; this redefinition shadows it.
    """
    counts = (
        df.select('xid', 'hour', 'n_events_per_hour')
          .withColumnRenamed('n_events_per_hour', 'value')
          .distinct()
    )
    label = func.concat(lit('n_events_per_hour#'), col('hour'))
    counts = counts.withColumn('feature_name', label)
    return counts.drop('hour')
def n_events_per_weekday_loader(df):
    """Return (xid, value, feature_name) rows for the per-weekday event counts.

    The weekday name is folded into the feature name,
    e.g. 'n_events_per_weekday#Monday'.
    """
    triplets = (
        df.select('xid', 'weekday', 'n_events_per_weekday')
          .withColumnRenamed('n_events_per_weekday', 'value')
          .distinct()
    )
    tag = func.concat(lit('n_events_per_weekday#'), col('weekday'))
    return triplets.withColumn('feature_name', tag).drop('weekday')
def n_days_since_last_event_loader(df):
    """Return (xid, value, feature_name) rows for the event-recency feature.

    This is a scalar per-user feature, so the feature name is a constant.
    """
    triplets = (
        df.select('xid', 'n_days_since_last_event')
          .withColumnRenamed('n_days_since_last_event', 'value')
          .distinct()
    )
    return triplets.withColumn('feature_name',
                               lit('n_days_since_last_event'))
def n_days_since_last_action_loader(df):
    """Return (xid, value, feature_name) rows for per-action-type recency.

    The action type is folded into the feature name,
    e.g. 'n_days_since_last_action#C'.
    """
    triplets = (
        df.select('xid', 'action', 'n_days_since_last_action')
          .withColumnRenamed('n_days_since_last_action', 'value')
          .distinct()
    )
    tag = func.concat(lit('n_days_since_last_action#'), col('action'))
    return triplets.withColumn('feature_name', tag).drop('action')
def n_unique_day_loader(df):
    """Return (xid, value, feature_name) rows for the unique-active-days count.

    Scalar per-user feature; the feature name is a constant.
    """
    triplets = (
        df.select('xid', 'n_unique_day')
          .withColumnRenamed('n_unique_day', 'value')
          .distinct()
    )
    return triplets.withColumn('feature_name', lit('n_unique_day'))
def n_unique_hour_loader(df):
    """Return (xid, value, feature_name) rows for the unique-active-hours count.

    Scalar per-user feature; the feature name is a constant.
    """
    triplets = (
        df.select('xid', 'n_unique_hour')
          .withColumnRenamed('n_unique_hour', 'value')
          .distinct()
    )
    return triplets.withColumn('feature_name', lit('n_unique_hour'))
def n_events_per_device_loader(df):
    """Return (xid, value, feature_name) rows for the per-device event counts.

    The device type is folded into the feature name,
    e.g. 'n_events_per_device#mobile'.
    """
    triplets = (
        df.select('xid', 'device', 'n_events_per_device')
          .withColumnRenamed('n_events_per_device', 'value')
          .distinct()
    )
    tag = func.concat(lit('n_events_per_device#'), col('device'))
    return triplets.withColumn('feature_name', tag).drop('device')
def n_unique_device_loader(df):
    """Return (xid, value, feature_name) rows for the distinct-device count.

    Scalar per-user feature built from the precomputed 'n_device' column.
    """
    triplets = (
        df.select('xid', 'n_device')
          .withColumnRenamed('n_device', 'value')
          .distinct()
    )
    return triplets.withColumn('feature_name', lit('n_device'))
def n_events_per_category_id_loader(df):
    """Return (xid, value, feature_name) rows for per-category event counts.

    The category id is folded into the feature name,
    e.g. 'n_events_per_category_id#1204.0'.
    """
    triplets = (
        df.select('xid', 'category_id', 'n_events_per_category_id')
          .withColumnRenamed('n_events_per_category_id', 'value')
          .distinct()
    )
    tag = func.concat(lit('n_events_per_category_id#'),
                      col('category_id'))
    return triplets.withColumn('feature_name', tag).drop('category_id')
def n_actions_per_category_id_loader(df):
    """Return (xid, value, feature_name) rows for per-(action, category) counts.

    Both the action type and the category id are folded into the feature
    name, e.g. 'n_actions_per_category_id#C#1204.0'.
    """
    triplets = (
        df.select('xid', 'category_id', 'action',
                  'n_actions_per_category_id')
          .withColumnRenamed('n_actions_per_category_id', 'value')
          .distinct()
    )
    tag = func.concat(lit('n_actions_per_category_id#'),
                      col('action'), lit('#'),
                      col('category_id'))
    return (triplets
            .withColumn('feature_name', tag)
            .drop('category_id')
            .drop('action'))
def n_events_per_website_id_loader(df):
    """Return (xid, value, feature_name) rows for per-website event counts.

    NOTE(review): this is the second definition of this function in the file;
    it shadows the earlier one (whose rename targeted the wrong column).
    """
    counts = (
        df.select('xid', 'website_id', 'n_events_per_website_id')
          .withColumnRenamed('n_events_per_website_id', 'value')
          .distinct()
    )
    label = func.concat(lit('n_events_per_website_id#'),
                        col('website_id'))
    counts = counts.withColumn('feature_name', label)
    return counts.drop('website_id')
from functools import reduce

# One loader per feature family.  Each loader yields a long-format
# (xid, value, feature_name) dataframe; they are unioned below into a single
# sparse-matrix representation.
#
# BUG FIX: the original list contained n_events_per_hour_loader and
# n_events_per_website_id_loader twice each.  Since DataFrame.union does NOT
# deduplicate rows, every feature produced by those loaders appeared twice in
# the long-format matrix (and duplicate entries get summed when a scipy CSR
# matrix is later built from it).  Each loader now appears exactly once.
loaders = [
    n_events_per_hour_loader,
    n_events_per_website_id_loader,
    n_events_per_weekday_loader,
    n_days_since_last_event_loader,
    n_days_since_last_action_loader,
    n_unique_day_loader,
    n_unique_hour_loader,
    n_events_per_device_loader,
    n_unique_device_loader,
    n_events_per_category_id_loader,
    n_actions_per_category_id_loader,
]


def union(df, other):
    """SQL-style union by column position, with no deduplication."""
    return df.union(other)
This method performs a SQL-style set union of the rows from both DataFrame objects, with no automatic deduplication of elements.
Use the distinct() method to perform deduplication of rows.
The method resolves columns by position (not by name), following the standard behavior in SQL.
# Apply every loader to the featurized dataframe: one long-format
# (xid, value, feature_name) dataframe per feature family.
spam = [make_features(df) for make_features in loaders]
spam[0].printSchema()
root
|-- xid: string (nullable = true)
|-- value: long (nullable = false)
|-- feature_name: string (nullable = true)
all(spam[0].columns == it.columns for it in spam[1:])
True
len(spam)
13
# Stack all the per-family triplet dataframes into one long-format dataframe.
csr = reduce(lambda left, right: left.union(right), spam)

csr.head(n=3)
[Stage 72:========> (6 + 6) / 12][Stage 73:==> (2 + 10) / 12]
[Row(xid='0004d0b7-a7fd-44ca-accc-422b62de4436', value=2.0, feature_name='n_events_per_hour#8'),
Row(xid='00069fa0-492f-4e54-90e2-ec456dd3b3f8', value=1.0, feature_name='n_events_per_hour#22'),
Row(xid='000717fa-9076-4df0-b261-2637ce5d2325', value=1.0, feature_name='n_events_per_hour#22')]
csr.columns
['xid', 'value', 'feature_name']
5) csr.show(
+--------------------+-----+--------------------+
| xid|value| feature_name|
+--------------------+-----+--------------------+
|0004d0b7-a7fd-44c...| 2.0| n_events_per_hour#8|
|00069fa0-492f-4e5...| 1.0|n_events_per_hour#22|
|000717fa-9076-4df...| 1.0|n_events_per_hour#22|
|00077fd6-ea8c-412...| 1.0|n_events_per_hour#17|
|0008839b-79ea-4bc...| 1.0|n_events_per_hour#19|
+--------------------+-----+--------------------+
only showing top 5 rows
csr.rdd.getNumPartitions()
17
# Replace feature names and xid by unique integer indices (dense_rank starts
# at 1), giving (row, col, value) triplets for a sparse matrix.
# NOTE(review): Window().orderBy(...) without partitionBy sends all rows to a
# single partition — hence the WindowExec warnings below.  Acceptable on this
# 5% sample, costly at scale.
feature_name_partition = Window().orderBy('feature_name')
xid_partition = Window().orderBy('xid')

col_idx = func.dense_rank().over(feature_name_partition)
row_idx = func.dense_rank().over(xid_partition)

csr = (csr
       .withColumn('col', col_idx)
       .withColumn('row', row_idx))

# Drop any row with at least one null before building the matrix.
csr = csr.na.drop('any')

csr.head(n=5)
25/04/03 15:09:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:49 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:51 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:51 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:51 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:51 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Row(xid='00009986-95f1-4f7d-a0f8-2941e2ad330c', value=1.0, feature_name='n_actions_per_category_id#O#1002.0', col=4, row=1),
Row(xid='00009986-95f1-4f7d-a0f8-2941e2ad330c', value=3019.1, feature_name='n_days_since_last_action#O', col=7, row=1),
Row(xid='00009986-95f1-4f7d-a0f8-2941e2ad330c', value=3019.1, feature_name='n_days_since_last_event', col=8, row=1),
Row(xid='00009986-95f1-4f7d-a0f8-2941e2ad330c', value=1.0, feature_name='n_device', col=9, row=1),
Row(xid='00009986-95f1-4f7d-a0f8-2941e2ad330c', value=1.0, feature_name='n_events_per_category_id#1002.0', col=10, row=1)]
# Persist the long-format sparse representation to a new parquet file.
output_path = './'
output_file = os.path.join(output_path, 'csr.parquet')

csr.write.parquet(output_file, mode='overwrite')
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 223:> (0 + 1) / 1]
# Reload the persisted long-format representation.
csr_path = './'
csr_file = os.path.join(csr_path, 'csr.parquet')

df = spark.read.parquet(csr_file)
df.head(n=5)
[Row(xid='00009986-95f1-4f7d-a0f8-2941e2ad330c', value=1.0, feature_name='n_actions_per_category_id#O#1002.0', col=4, row=1),
Row(xid='00009986-95f1-4f7d-a0f8-2941e2ad330c', value=3019.1, feature_name='n_days_since_last_action#O', col=7, row=1),
Row(xid='00009986-95f1-4f7d-a0f8-2941e2ad330c', value=3019.1, feature_name='n_days_since_last_event', col=8, row=1),
Row(xid='00009986-95f1-4f7d-a0f8-2941e2ad330c', value=1.0, feature_name='n_device', col=9, row=1),
Row(xid='00009986-95f1-4f7d-a0f8-2941e2ad330c', value=1.0, feature_name='n_events_per_category_id#1002.0', col=10, row=1)]
df.count()
518891
# What are the features related to category_id 1204?
# (the column is category_id; earlier prose calls it a "campaign")
features_names = (
    df.select('feature_name')
      .distinct()
      .toPandas()['feature_name']
)
features_names
0 n_events_per_website_id#25
1 n_events_per_weekday#Sunday
2 n_events_per_website_id#42
3 n_events_per_hour#0
4 n_events_per_hour#10
...
95 n_events_per_hour#19
96 n_events_per_website_id#29
97 n_events_per_website_id#60
98 n_events_per_weekday#Monday
99 n_events_per_website_id#32
Name: feature_name, Length: 100, dtype: object
[name for name in features_names if '1204' in name]
['n_events_per_category_id#1204.0',
'n_actions_per_category_id#C#1204.0',
'n_actions_per_category_id#O#1204.0']
# Flag rows that record an exposure (display 'O' or click 'C') to
# category 1204; used below to select the relevant users.
target_features = (
    'n_actions_per_category_id#C#1204.0',
    'n_actions_per_category_id#O#1204.0',
)
keep = func.when(col('feature_name').isin(*target_features), 1).otherwise(0)
df = df.withColumn('keep', keep)

df.where(col('keep') > 0).count()
11032
# Sum of the keeps :)
= Window.partitionBy('xid')
xid_partition = func.sum(col('keep')).over(xid_partition)
sum_keep = df.withColumn('sum_keep', sum_keep) df
# Let's keep the xid exposed to 1204
= df.where(col('sum_keep') > 0) df
# Rows remaining after restricting to users exposed to 1204
df.count()
157726
# Number of distinct users kept (rows of the future design matrix)
df.select('xid').distinct().count()
11028
# After filtering, the original row/col indices are sparse; re-index them with
# dense_rank so they become contiguous 1-based coordinates for a CSR matrix.
# NOTE(review): these windows have no partition, so Spark funnels all data
# through a single partition (hence the WindowExec warnings) — acceptable here.
row_partition = Window().orderBy('row')
col_partition = Window().orderBy('col')

row_new = func.dense_rank().over(row_partition)
col_new = func.dense_rank().over(col_partition)

df = df.withColumn('row_new', row_new)
df = df.withColumn('col_new', col_new)

# Pull the (row, col, value) triplets to the driver as a pandas DataFrame
csr_data = df.select('row_new', 'col_new', 'value').toPandas()
25/04/03 15:09:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:56 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
# First triplets: 1-based (row_new, col_new) with the feature value
csr_data.head()
row_new | col_new | value | |
---|---|---|---|
0 | 464 | 1 | 1.0 |
1 | 2080 | 1 | 1.0 |
2 | 4174 | 1 | 1.0 |
3 | 5070 | 1 | 1.0 |
4 | 7212 | 1 | 1.0 |
# Map each feature name to its (new) column index
features_names = df.select('feature_name', 'col_new').distinct()

# Which column holds the clicks on category 1204 ?
features_names.where(col('feature_name') == 'n_actions_per_category_id#C#1204.0').head()
25/04/03 15:09:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
Row(feature_name='n_actions_per_category_id#C#1204.0', col_new=2)
# Which column holds the displays (O) on category 1204 ?
features_names.where(col('feature_name') == 'n_actions_per_category_id#O#1204.0').head()
25/04/03 15:09:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
Row(feature_name='n_actions_per_category_id#O#1204.0', col_new=4)
from scipy.sparse import csr_matrix
import numpy as np

# dense_rank produced 1-based indices; shift to 0-based for scipy
rows = csr_data['row_new'].values - 1
cols = csr_data['col_new'].values - 1
vals = csr_data['value'].values

# Sparse features matrix: one row per xid, one column per feature
X_csr = csr_matrix((vals, (rows, cols)))
# (n_users, n_features)
X_csr.shape
(11028, 82)
# Shape and number of stored (non-zero) entries
X_csr.shape, X_csr.nnz
((11028, 82), 131962)
# Density of the sparse matrix (fraction of non-zero entries)
X_csr.nnz / (X_csr.shape[0] * X_csr.shape[1])
0.1459278820209312
# The label vector. Let's make it dense, flat and binary.
# Column 1 (0-based) is 'n_actions_per_category_id#C#1204.0': clicks on 1204.
y = np.array(X_csr[:, 1].todense()).ravel()
y = np.array(y > 0, dtype=np.int64)
# Still (n_users, n_features) — the label extraction did not modify X_csr
X_csr.shape
(11028, 82)
# We remove the second and fourth columns (0-based indices 1 and 3):
# they hold the click/display counts on 1204, i.e. the label we predict.
kept_cols = list(range(X_csr.shape[1]))
kept_cols.pop(1)   # drops original index 1 (C#1204)
kept_cols.pop(2)   # after the first pop, index 2 is the original index 3 (O#1204)
X = X_csr[:, kept_cols]
# 82 features minus the 2 label columns
len(kept_cols)
80
# Original vs. label-free design matrix shapes
X_csr.shape, X.shape
((11028, 82), (11028, 80))
Wow! That was a lot of work. We now have a features matrix \(X\) and a vector of labels \(y\), ready for supervised learning.
# CSR column indices of the stored entries
X.indices
array([ 1, 3, 4, ..., 74, 78, 79], shape=(120930,), dtype=int32)
# CSR row pointers (length n_rows + 1)
X.indptr
array([ 0, 12, 22, ..., 120910, 120920, 120930],
shape=(11029,), dtype=int32)
# Shape and non-zeros of the final features matrix
X.shape, X.nnz
((11028, 80), 120930)
# Label vector size and number of positives (clicks on 1204)
y.shape, y.sum()
((11028,), np.int64(87))
from sklearn.preprocessing import MaxAbsScaler
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression

# Normalize the features: MaxAbsScaler preserves sparsity (divides by per-column max)
X = MaxAbsScaler().fit_transform(X)

# Stratified split — the positive class is rare (87 / 11028), so keep its ratio
X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, test_size=0.3)
# Weakly-regularized (large C) logistic regression; class_weight='balanced'
# compensates the heavy class imbalance.
clf = LogisticRegression(
    penalty='l2',
    C=1e3,
    solver='lbfgs',
    class_weight='balanced'
)
clf.fit(X_train, y_train)
LogisticRegression(C=1000.0, class_weight='balanced')In a Jupyter environment, please rerun this cell to show the HTML representation or trust the notebook.
LogisticRegression(C=1000.0, class_weight='balanced')
# Collect the feature-name mapping to the driver as a pandas Series
features_names = features_names.toPandas()['feature_name']
25/04/03 15:09:58 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:58 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:58 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:58 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:58 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/03 15:09:58 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
# First few feature names
features_names[range(6)]
0 n_actions_per_category_id#C#1002.0
1 n_actions_per_category_id#C#1204.0
2 n_actions_per_category_id#O#1002.0
3 n_actions_per_category_id#O#1204.0
4 n_days_since_last_action#C
5 n_days_since_last_action#O
Name: feature_name, dtype: object
import matplotlib.pyplot as plt
%matplotlib inline
=(16, 5))
plt.figure(figsize0]) # , use_line_collection=True)
plt.stem(clf.coef_['Logistic regression coefficients', fontsize=18) plt.title(
Text(0.5, 1.0, 'Logistic regression coefficients')
# Number of fitted coefficients (one per kept feature)
clf.coef_[0].shape[0]
80
# NOTE(review): 82 names vs. 80 coefficients — the two label columns were
# dropped from X but not from features_names; tick labels below are shifted.
len(features_names)
82
# We change the fontsize of minor ticks label.
# NOTE(review): features_names has 82 entries while there are only 80 ticks —
# labels past the dropped columns are misaligned; confirm before interpreting.
_ = plt.xticks(np.arange(clf.coef_[0].shape[0]), features_names,
               rotation='vertical', fontsize=8)
_ = plt.yticks(fontsize=14)
from sklearn.metrics import precision_recall_curve, f1_score

# Precision/recall trade-off from the predicted click probabilities
precision, recall, _ = precision_recall_curve(y_test, clf.predict_proba(X_test)[:, 1])

plt.figure(figsize=(8, 6))
plt.plot(recall, precision, label='LR (F1=%.2f)' % f1_score(y_test, clf.predict(X_test)), lw=2)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('Recall', fontsize=16)
plt.ylabel('Precision', fontsize=16)
plt.title('Precision/recall curve', fontsize=18)
plt.legend(loc="upper right", fontsize=14)
= """ANALYZE TABLE db_table COMPUTE STATISTICS
query FOR COLUMNS xid"""
"db_table") df.createOrReplaceTempView(
# Columns accumulated on df along the way
df.columns
['xid',
'value',
'feature_name',
'col',
'row',
'keep',
'sum_keep',
'row_new',
'col_new']
"cache table db_table") spark.sql(
spark.sql(query)
"show tables") spark.sql(