DataFrame
import os
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
conf = SparkConf().setAppName("Spark SQL Course")
sc = SparkContext(conf=conf)  # no need for Spark 3...

spark = (SparkSession
    .builder
    .appName("Spark SQL Course")
    .getOrCreate()
)
25/01/18 14:18:12 WARN Utils: Your hostname, boucheron-Precision-5480 resolves to a loopback address: 127.0.1.1; using 192.168.10.120 instead (on interface wlp0s20f3)
25/01/18 14:18:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/18 14:18:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
from pyspark.sql import Row
row1 = Row(name="John", age=21)
row2 = Row(name="James", age=32)
row3 = Row(name="Jane", age=18)

row1['name']
'John'
df = spark.createDataFrame([row1, row2, row3])
df.printSchema()
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
df.show()
+-----+---+
| name|age|
+-----+---+
| John| 21|
|James| 32|
| Jane| 18|
+-----+---+
print(df.rdd.toDebugString().decode("utf-8"))
(20) MapPartitionsRDD[10] at javaToPython at NativeMethodAccessorImpl.java:0 []
| MapPartitionsRDD[9] at javaToPython at NativeMethodAccessorImpl.java:0 []
| SQLExecutionRDD[8] at javaToPython at NativeMethodAccessorImpl.java:0 []
| MapPartitionsRDD[7] at javaToPython at NativeMethodAccessorImpl.java:0 []
| MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0 []
| MapPartitionsRDD[3] at map at SerDeUtil.scala:69 []
| MapPartitionsRDD[2] at mapPartitions at SerDeUtil.scala:117 []
| PythonRDD[1] at RDD at PythonRDD.scala:53 []
| ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289 []
df.rdd.getNumPartitions()
20
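The 20 partitions come from the local default parallelism. If a different layout is wanted, repartition (adds a shuffle) and coalesce (no shuffle) are the usual knobs; a minimal sketch with arbitrary target counts:
df_repart = df.repartition(4)    # shuffle the data into 4 partitions
df_coal = df_repart.coalesce(2)  # merge down to 2 partitions without a shuffle
df_repart.rdd.getNumPartitions(), df_coal.rdd.getNumPartitions()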
rows = [
    Row(name="John", age=21, gender="male"),
    Row(name="James", age=25, gender="female"),
    Row(name="Albert", age=46, gender="male")
]
df = spark.createDataFrame(rows)
df.show()
+------+---+------+
| name|age|gender|
+------+---+------+
| John| 21| male|
| James| 25|female|
|Albert| 46| male|
+------+---+------+
help(Row)
Help on class Row in module pyspark.sql.types:
class Row(builtins.tuple)
| Row(*args: Optional[str], **kwargs: Optional[Any]) -> 'Row'
|
| A row in :class:`DataFrame`.
| The fields in it can be accessed:
|
| * like attributes (``row.key``)
| * like dictionary values (``row[key]``)
|
| ``key in row`` will search through row keys.
|
| Row can be used to create a row object by using named arguments.
| It is not allowed to omit a named argument to represent that the value is
| None or missing. This should be explicitly set to None in this case.
|
| .. versionchanged:: 3.0.0
| Rows created from named arguments no longer have
| field names sorted alphabetically and will be ordered in the position as
| entered.
|
| Examples
| --------
| >>> from pyspark.sql import Row
| >>> row = Row(name="Alice", age=11)
| >>> row
| Row(name='Alice', age=11)
| >>> row['name'], row['age']
| ('Alice', 11)
| >>> row.name, row.age
| ('Alice', 11)
| >>> 'name' in row
| True
| >>> 'wrong_key' in row
| False
|
| Row also can be used to create another Row like class, then it
| could be used to create Row objects, such as
|
| >>> Person = Row("name", "age")
| >>> Person
| <Row('name', 'age')>
| >>> 'name' in Person
| True
| >>> 'wrong_key' in Person
| False
| >>> Person("Alice", 11)
| Row(name='Alice', age=11)
|
| This form can also be used to create rows as tuple values, i.e. with unnamed
| fields.
|
| >>> row1 = Row("Alice", 11)
| >>> row2 = Row(name="Alice", age=11)
| >>> row1 == row2
| True
|
| Method resolution order:
| Row
| builtins.tuple
| builtins.object
|
| Methods defined here:
|
| __call__(self, *args: Any) -> 'Row'
| create new Row object
|
| __contains__(self, item: Any) -> bool
| Return bool(key in self).
|
| __getattr__(self, item: str) -> Any
|
| __getitem__(self, item: Any) -> Any
| Return self[key].
|
| __reduce__(self) -> Union[str, Tuple[Any, ...]]
| Returns a tuple so Python knows how to pickle Row.
|
| __repr__(self) -> str
| Printable representation of Row used in Python REPL.
|
| __setattr__(self, key: Any, value: Any) -> None
| Implement setattr(self, name, value).
|
| asDict(self, recursive: bool = False) -> Dict[str, Any]
| Return as a dict
|
| Parameters
| ----------
| recursive : bool, optional
| turns the nested Rows to dict (default: False).
|
| Notes
| -----
| If a row contains duplicate field names, e.g., the rows of a join
| between two :class:`DataFrame` that both have the fields of same names,
| one of the duplicate fields will be selected by ``asDict``. ``__getitem__``
| will also return one of the duplicate fields, however returned value might
| be different to ``asDict``.
|
| Examples
| --------
| >>> from pyspark.sql import Row
| >>> Row(name="Alice", age=11).asDict() == {'name': 'Alice', 'age': 11}
| True
| >>> row = Row(key=1, value=Row(name='a', age=2))
| >>> row.asDict() == {'key': 1, 'value': Row(name='a', age=2)}
| True
| >>> row.asDict(True) == {'key': 1, 'value': {'name': 'a', 'age': 2}}
| True
|
| ----------------------------------------------------------------------
| Static methods defined here:
|
| __new__(cls, *args: Optional[str], **kwargs: Optional[Any]) -> 'Row'
| Create and return a new object. See help(type) for accurate signature.
|
| ----------------------------------------------------------------------
| Data descriptors defined here:
|
| __dict__
| dictionary for instance variables
|
| ----------------------------------------------------------------------
| Methods inherited from builtins.tuple:
|
| __add__(self, value, /)
| Return self+value.
|
| __eq__(self, value, /)
| Return self==value.
|
| __ge__(self, value, /)
| Return self>=value.
|
| __getattribute__(self, name, /)
| Return getattr(self, name).
|
| __getnewargs__(self, /)
|
| __gt__(self, value, /)
| Return self>value.
|
| __hash__(self, /)
| Return hash(self).
|
| __iter__(self, /)
| Implement iter(self).
|
| __le__(self, value, /)
| Return self<=value.
|
| __len__(self, /)
| Return len(self).
|
| __lt__(self, value, /)
| Return self<value.
|
| __mul__(self, value, /)
| Return self*value.
|
| __ne__(self, value, /)
| Return self!=value.
|
| __rmul__(self, value, /)
| Return value*self.
|
| count(self, value, /)
| Return number of occurrences of value.
|
| index(self, value, start=0, stop=9223372036854775807, /)
| Return first index of value.
|
| Raises ValueError if the value is not present.
|
| ----------------------------------------------------------------------
| Class methods inherited from builtins.tuple:
|
| __class_getitem__(...)
| See PEP 585
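Rows come back in the same form when a DataFrame is materialised on the driver: collect() returns a list of Row objects, and asDict() turns one into a plain dict. A small sketch:
collected = df.collect()      # list of Row objects on the driver
print(collected[0].name, collected[0]['age'])
print(collected[0].asDict())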
column_names = ["name", "age", "gender"]
rows = [
    ["John", 21, "male"],
    ["James", 25, "female"],
    ["Albert", 46, "male"]
]
df = spark.createDataFrame(rows, column_names)
df.show()
+------+---+------+
| name|age|gender|
+------+---+------+
| John| 21| male|
| James| 25|female|
|Albert| 46| male|
+------+---+------+
df.printSchema()
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
# sc = SparkContext(conf=conf) # no need for Spark 3...
column_names = ["name", "age", "gender"]
rdd = sc.parallelize([
    ("John", 21, "male"),
    ("James", 25, "female"),
    ("Albert", 46, "male")
])
df = spark.createDataFrame(rdd, column_names)
df.show()
+------+---+------+
| name|age|gender|
+------+---+------+
| John| 21| male|
| James| 25|female|
|Albert| 46| male|
+------+---+------+
df.schema
StructType([StructField('name', StringType(), True), StructField('age', LongType(), True), StructField('gender', StringType(), True)])
type(df.schema)
pyspark.sql.types.StructType
from pyspark.sql.types import *
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("gender", StringType(), True)
])
rows = [("John", 21, "male")]
df = spark.createDataFrame(rows, schema)

df.printSchema()
df.show()
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- gender: string (nullable = true)
+----+---+------+
|name|age|gender|
+----+---+------+
|John| 21| male|
+----+---+------+
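As an alternative to spelling out the StructType, createDataFrame also accepts a schema given as a DDL-style string; a sketch of the same cell using that shorthand (df_ddl is just an illustrative name):
df_ddl = spark.createDataFrame([("John", 21, "male")], "name string, age int, gender string")
df_ddl.printSchema()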
column_names = ["name", "age", "gender"]
rows = [
    ["John", 21, "male"],
    ["Jane", 25, "female"]
]
df = spark.createDataFrame(rows, column_names)

# Create a temporary view from the DataFrame
df.createOrReplaceTempView("new_view")

# Apply the query
query = "SELECT name, age FROM new_view WHERE gender='male'"
men_df = spark.sql(query)
men_df.show()
+----+---+
|name|age|
+----+---+
|John| 21|
+----+---+
SELECT
df.createOrReplaceTempView("table")
query = "SELECT name, age FROM table"
spark.sql(query).show()
+----+---+
|name|age|
+----+---+
|John| 21|
|Jane| 25|
+----+---+
df.select("name", "age").show()
+----+---+
|name|age|
+----+---+
|John| 21|
|Jane| 25|
+----+---+
WHERE
df.createOrReplaceTempView("table")
query = "SELECT * FROM table WHERE age > 21"
spark.sql(query).show()
+----+---+------+
|name|age|gender|
+----+---+------+
|Jane| 25|female|
+----+---+------+
df.where("age > 21").show()
+----+---+------+
|name|age|gender|
+----+---+------+
|Jane| 25|female|
+----+---+------+
# Alternatively:
df.where(df['age'] > 21).show()
+----+---+------+
|name|age|gender|
+----+---+------+
|Jane| 25|female|
+----+---+------+
df.where(df.age > 21).show()
+----+---+------+
|name|age|gender|
+----+---+------+
|Jane| 25|female|
+----+---+------+
(
    df.where("age > 21")
    .select(["name", "age"])
    .show()
)
+----+---+
|name|age|
+----+---+
|Jane| 25|
+----+---+
LIMIT
df.createOrReplaceTempView("table")
query = "SELECT * FROM table LIMIT 1"
spark.sql(query).show()
+----+---+------+
|name|age|gender|
+----+---+------+
|John| 21| male|
+----+---+------+
df.limit(1).show()
+----+---+------+
|name|age|gender|
+----+---+------+
|John| 21| male|
+----+---+------+
df.select("*").limit(1).show()
+----+---+------+
|name|age|gender|
+----+---+------+
|John| 21| male|
+----+---+------+
ORDER BY
df.createOrReplaceTempView("table")

query = "SELECT * FROM table ORDER BY name ASC"

spark.sql(query).show()
+----+---+------+
|name|age|gender|
+----+---+------+
|Jane| 25|female|
|John| 21| male|
+----+---+------+
df.orderBy(df.name.asc()).show()
+----+---+------+
|name|age|gender|
+----+---+------+
|Jane| 25|female|
|John| 21| male|
+----+---+------+
ALIAS
df.createOrReplaceTempView("table")
query = "SELECT name, age, gender AS sex FROM table"
spark.sql(query).show()
+----+---+------+
|name|age| sex|
+----+---+------+
|John| 21| male|
|Jane| 25|female|
+----+---+------+
type(df.age)
pyspark.sql.column.Column
df.select(df.name, df.age, df.gender.alias('sex')).show()
+----+---+------+
|name|age| sex|
+----+---+------+
|John| 21| male|
|Jane| 25|female|
+----+---+------+
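When the only goal is to rename an existing column, withColumnRenamed does the same job without re-listing the others; a one-line sketch:
df.withColumnRenamed("gender", "sex").show()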
CAST
df.createOrReplaceTempView("table")
query = "SELECT name, cast(age AS float) AS age_f FROM table"
spark.sql(query).show()
+----+-----+
|name|age_f|
+----+-----+
|John| 21.0|
|Jane| 25.0|
+----+-----+
df.select(df.name, df.age.cast("float").alias("age_f")).show()
+----+-----+
|name|age_f|
+----+-----+
|John| 21.0|
|Jane| 25.0|
+----+-----+
new_age_col = df.age.cast("float").alias("age_f")
type(new_age_col), type(df.age)
(pyspark.sql.column.Column, pyspark.sql.column.Column)
df.select(df.name, new_age_col).show()
+----+-----+
|name|age_f|
+----+-----+
|John| 21.0|
|Jane| 25.0|
+----+-----+
df.createOrReplaceTempView("table")
query = "SELECT *, 12*age AS age_months FROM table"
spark.sql(query).show()
+----+---+------+----------+
|name|age|gender|age_months|
+----+---+------+----------+
|John| 21| male| 252|
|Jane| 25|female| 300|
+----+---+------+----------+
df.withColumn("age_months", df.age * 12).show()
+----+---+------+----------+
|name|age|gender|age_months|
+----+---+------+----------+
|John| 21| male| 252|
|Jane| 25|female| 300|
+----+---+------+----------+
df.select("*", (df.age * 12).alias("age_months")).show()
+----+---+------+----------+
|name|age|gender|age_months|
+----+---+------+----------+
|John| 21| male| 252|
|Jane| 25|female| 300|
+----+---+------+----------+
import datetime
hui = datetime.date.today()
hui = hui.replace(year=hui.year - 21)
str(hui)
'2004-01-18'
# df.select("*", hui.replace(year=hui.year - df.age ).alias("yob")).show()
from pyspark.sql import functions as fn
columns = ["brand", "cost"]
df = spark.createDataFrame([
    ("garnier", 3.49),
    ("elseve", 2.71)
], columns)

round_cost = fn.round(df.cost, 1)
floor_cost = fn.floor(df.cost)
ceil_cost = fn.ceil(df.cost)

df.withColumn('round', round_cost)\
    .withColumn('floor', floor_cost)\
    .withColumn('ceil', ceil_cost)\
    .show()
+-------+----+-----+-----+----+
| brand|cost|round|floor|ceil|
+-------+----+-----+-----+----+
|garnier|3.49| 3.5| 3| 4|
| elseve|2.71| 2.7| 2| 3|
+-------+----+-----+-----+----+
from pyspark.sql import functions as fn
columns = ["first_name", "last_name"]

df = spark.createDataFrame([
    ("John", "Doe"),
    ("Mary", "Jane")
], columns)

last_name_initial = fn.substring(df.last_name, 0, 1)
# last_name_initial_dotted = fn.concat(last_name_initial, ".")
name = fn.concat_ws(" ", df.first_name, last_name_initial)
df.withColumn("name", name).show()
+----------+---------+------+
|first_name|last_name| name|
+----------+---------+------+
| John| Doe|John D|
| Mary| Jane|Mary J|
+----------+---------+------+
(
    df.selectExpr("*", "substring(last_name, 0, 1) as lni")
    .selectExpr("first_name", "last_name", "concat(first_name, ' ', lni, '.') as nname")
    .show()
)
+----------+---------+-------+
|first_name|last_name| nname|
+----------+---------+-------+
| John| Doe|John D.|
| Mary| Jane|Mary J.|
+----------+---------+-------+
from datetime import date
from pyspark.sql import functions as fn
df = spark.createDataFrame([
    (date(2015, 1, 1), date(2015, 1, 15)),
    (date(2015, 2, 21), date(2015, 3, 8)),
], ["start_date", "end_date"])

days_between = fn.datediff(df.end_date, df.start_date)
start_month = fn.month(df.start_date)

df.withColumn('days_between', days_between)\
    .withColumn('start_month', start_month)\
    .show()
+----------+----------+------------+-----------+
|start_date| end_date|days_between|start_month|
+----------+----------+------------+-----------+
|2015-01-01|2015-01-15| 14| 1|
|2015-02-21|2015-03-08| 15| 2|
+----------+----------+------------+-----------+
str(date(2015, 1, 1) - date(2015, 1, 15))
'-14 days, 0:00:00'
from datetime import timedelta
date(2023, 2, 14) + timedelta(days=3)
datetime.date(2023, 2, 17)
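On the Spark side, the analogous date arithmetic uses column functions such as date_add and add_months rather than Python timedelta objects; a sketch on a throwaway frame (the frame and column names are illustrative):
from pyspark.sql import functions as fn
dates_df = spark.createDataFrame([(date(2023, 2, 14),)], ["d"])
dates_df.select(fn.date_add("d", 3).alias("plus_3_days"),
                fn.add_months("d", 1).alias("plus_1_month")).show()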
df = spark.createDataFrame([
    ("John", 21, "male"),
    ("Jane", 25, "female"),
    ("Albert", 46, "male"),
    ("Brad", 49, "super-hero")
], ["name", "age", "gender"])

supervisor = (
    fn.when(df.gender == 'male', 'Mr. Smith')
    .when(df.gender == 'female', 'Miss Jones')
    .otherwise('NA')
)

type(supervisor), type(fn.when)
(pyspark.sql.column.Column, function)
df.withColumn("supervisor", supervisor).show()
+------+---+----------+----------+
| name|age| gender|supervisor|
+------+---+----------+----------+
| John| 21| male| Mr. Smith|
| Jane| 25| female|Miss Jones|
|Albert| 46| male| Mr. Smith|
| Brad| 49|super-hero| NA|
+------+---+----------+----------+
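The same conditional can be expressed on the SQL side with CASE WHEN, following the SQL-versus-API pattern used above; a sketch:
df.createOrReplaceTempView("table")
query = """
SELECT *,
    CASE WHEN gender = 'male' THEN 'Mr. Smith'
         WHEN gender = 'female' THEN 'Miss Jones'
         ELSE 'NA' END AS supervisor
FROM table
"""
spark.sql(query).show()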
from pyspark.sql import functions as fn
from pyspark.sql.types import StringType
df = spark.createDataFrame([(1, 3), (4, 2)], ["first", "second"])

def my_func(col_1, col_2):
    if col_1 > col_2:
        return "{} is bigger than {}".format(col_1, col_2)
    else:
        return "{} is bigger than {}".format(col_2, col_1)

my_udf = fn.udf(my_func, StringType())

df.withColumn("udf", my_udf(df['first'], df['second'])).show()
+-----+------+------------------+
|first|second| udf|
+-----+------+------------------+
| 1| 3|3 is bigger than 1|
| 4| 2|4 is bigger than 2|
+-----+------+------------------+
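To call the same function from spark.sql, it can be registered under a name with spark.udf.register; a sketch (the registered name and view name are arbitrary):
spark.udf.register("my_func_sql", my_func, StringType())
df.createOrReplaceTempView("pairs")
spark.sql("SELECT first, second, my_func_sql(first, second) AS udf FROM pairs").show()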
spark.sql API
from datetime import date
products = spark.createDataFrame([
    ('1', 'mouse', 'microsoft', 39.99),
    ('2', 'keyboard', 'logitech', 59.99),
], ['prod_id', 'prod_cat', 'prod_brand', 'prod_value'])

purchases = spark.createDataFrame([
    (date(2017, 11, 1), 2, '1'),
    (date(2017, 11, 2), 1, '1'),
    (date(2017, 11, 5), 1, '2'),
], ['date', 'quantity', 'prod_id'])

# The default join type is the "INNER" join
purchases.join(products, 'prod_id').show()
+-------+----------+--------+--------+----------+----------+
|prod_id| date|quantity|prod_cat|prod_brand|prod_value|
+-------+----------+--------+--------+----------+----------+
| 1|2017-11-01| 2| mouse| microsoft| 39.99|
| 1|2017-11-02| 1| mouse| microsoft| 39.99|
| 2|2017-11-05| 1|keyboard| logitech| 59.99|
+-------+----------+--------+--------+----------+----------+
purchases.join(products, 'prod_id').explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [prod_id#577, date#575, quantity#576L, prod_cat#568, prod_brand#569, prod_value#570]
+- SortMergeJoin [prod_id#577], [prod_id#567], Inner
:- Sort [prod_id#577 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(prod_id#577, 200), ENSURE_REQUIREMENTS, [plan_id=575]
: +- Filter isnotnull(prod_id#577)
: +- Scan ExistingRDD[date#575,quantity#576L,prod_id#577]
+- Sort [prod_id#567 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(prod_id#567, 200), ENSURE_REQUIREMENTS, [plan_id=576]
+- Filter isnotnull(prod_id#567)
+- Scan ExistingRDD[prod_id#567,prod_cat#568,prod_brand#569,prod_value#570]
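The plan above is a SortMergeJoin, which shuffles and sorts both sides. For a small table like products, a broadcast hint typically turns this into a BroadcastHashJoin and removes the exchanges; a sketch:
from pyspark.sql import functions as fn
purchases.join(fn.broadcast(products), 'prod_id').explain()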
SQL query
products.createOrReplaceTempView("products")
purchases.createOrReplaceTempView("purchases")

query = """
SELECT *
FROM purchases AS prc INNER JOIN
     products AS prd
ON prc.prod_id = prd.prod_id
"""
spark.sql(query).show()
+----------+--------+-------+-------+--------+----------+----------+
| date|quantity|prod_id|prod_id|prod_cat|prod_brand|prod_value|
+----------+--------+-------+-------+--------+----------+----------+
|2017-11-01| 2| 1| 1| mouse| microsoft| 39.99|
|2017-11-02| 1| 1| 1| mouse| microsoft| 39.99|
|2017-11-05| 1| 2| 2|keyboard| logitech| 59.99|
+----------+--------+-------+-------+--------+----------+----------+
spark.sql(query).explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [prod_id#577], [prod_id#567], Inner
:- Sort [prod_id#577 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(prod_id#577, 200), ENSURE_REQUIREMENTS, [plan_id=718]
: +- Filter isnotnull(prod_id#577)
: +- Scan ExistingRDD[date#575,quantity#576L,prod_id#577]
+- Sort [prod_id#567 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(prod_id#567, 200), ENSURE_REQUIREMENTS, [plan_id=719]
+- Filter isnotnull(prod_id#567)
+- Scan ExistingRDD[prod_id#567,prod_cat#568,prod_brand#569,prod_value#570]
new_purchases = spark.createDataFrame([
    (date(2017, 11, 1), 2, '1'),
    (date(2017, 11, 2), 1, '3'),
], ['date', 'quantity', 'prod_id_x'])

# The default join type is the "INNER" join
join_rule = new_purchases.prod_id_x == products.prod_id

print(type(join_rule))
new_purchases.join(products, join_rule, 'left').show()
<class 'pyspark.sql.column.Column'>
+----------+--------+---------+-------+--------+----------+----------+
| date|quantity|prod_id_x|prod_id|prod_cat|prod_brand|prod_value|
+----------+--------+---------+-------+--------+----------+----------+
|2017-11-01| 2| 1| 1| mouse| microsoft| 39.99|
|2017-11-02| 1| 3| NULL| NULL| NULL| NULL|
+----------+--------+---------+-------+--------+----------+----------+
join_rule.info
Column<'(prod_id_x = prod_id)[info]'>
new_purchases = spark.createDataFrame([
    (date(2017, 11, 1), 2, '1'),
    (date(2017, 11, 2), 1, '3'),
], ['date', 'quantity', 'prod_id_x'])

# The default join type is the "INNER" join
join_rule = new_purchases.prod_id_x == products.prod_id

new_purchases.join(products, join_rule, 'left').show()
+----------+--------+---------+-------+--------+----------+----------+
| date|quantity|prod_id_x|prod_id|prod_cat|prod_brand|prod_value|
+----------+--------+---------+-------+--------+----------+----------+
|2017-11-01| 2| 1| 1| mouse| microsoft| 39.99|
|2017-11-02| 1| 3| NULL| NULL| NULL| NULL|
+----------+--------+---------+-------+--------+----------+----------+
left = spark.createDataFrame([
    (1, "A1"), (2, "A2"), (3, "A3"), (4, "A4")],
    ["id", "value"])

right = spark.createDataFrame([
    (3, "A3"), (4, "A4"), (4, "A4_1"), (5, "A5"), (6, "A6")],
    ["id", "value"])

join_types = [
    "inner", "outer", "left", "right",
    "leftsemi", "leftanti"
]

for join_type in join_types:
    print(join_type)
    left.join(right, on="id", how=join_type)\
        .orderBy("id")\
        .show()
inner
+---+-----+-----+
| id|value|value|
+---+-----+-----+
| 3| A3| A3|
| 4| A4| A4|
| 4| A4| A4_1|
+---+-----+-----+
outer
+---+-----+-----+
| id|value|value|
+---+-----+-----+
| 1| A1| NULL|
| 2| A2| NULL|
| 3| A3| A3|
| 4| A4| A4|
| 4| A4| A4_1|
| 5| NULL| A5|
| 6| NULL| A6|
+---+-----+-----+
left
+---+-----+-----+
| id|value|value|
+---+-----+-----+
| 1| A1| NULL|
| 2| A2| NULL|
| 3| A3| A3|
| 4| A4| A4|
| 4| A4| A4_1|
+---+-----+-----+
right
+---+-----+-----+
| id|value|value|
+---+-----+-----+
| 3| A3| A3|
| 4| A4| A4|
| 4| A4| A4_1|
| 5| NULL| A5|
| 6| NULL| A6|
+---+-----+-----+
leftsemi
+---+-----+
| id|value|
+---+-----+
| 3| A3|
| 4| A4|
+---+-----+
leftanti
+---+-----+
| id|value|
+---+-----+
| 1| A1|
| 2| A2|
+---+-----+
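Note that the inner/outer/left/right joins above keep two columns named value, which is ambiguous to reference afterwards; renaming one side beforehand avoids this. A sketch:
right_renamed = right.withColumnRenamed("value", "value_right")
left.join(right_renamed, on="id", how="inner").orderBy("id").show()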
from pyspark.sql import functions as fn
products = spark.createDataFrame([
    ('1', 'mouse', 'microsoft', 39.99),
    ('2', 'mouse', 'microsoft', 59.99),
    ('3', 'keyboard', 'microsoft', 59.99),
    ('4', 'keyboard', 'logitech', 59.99),
    ('5', 'mouse', 'logitech', 29.99),
], ['prod_id', 'prod_cat', 'prod_brand', 'prod_value'])

products.groupBy('prod_cat').avg('prod_value').show()
+--------+-----------------+
|prod_cat| avg(prod_value)|
+--------+-----------------+
| mouse|43.32333333333333|
|keyboard| 59.99|
+--------+-----------------+
products.groupBy('prod_cat').agg(fn.avg('prod_value')).show()
+--------+-----------------+
|prod_cat| avg(prod_value)|
+--------+-----------------+
| mouse|43.32333333333333|
|keyboard| 59.99|
+--------+-----------------+
(
    products
    .groupBy('prod_cat')
    .agg(fn.mean('prod_value'), fn.stddev('prod_value'))
    .show()
)
+--------+-----------------+------------------+
|prod_cat| avg(prod_value)|stddev(prod_value)|
+--------+-----------------+------------------+
| mouse|43.32333333333333|15.275252316519468|
|keyboard| 59.99| 0.0|
+--------+-----------------+------------------+
from pyspark.sql import functions as fn
products.groupBy('prod_brand', 'prod_cat')\
    .agg(fn.avg('prod_value')).show()
+----------+--------+---------------+
|prod_brand|prod_cat|avg(prod_value)|
+----------+--------+---------------+
| microsoft| mouse| 49.99|
| microsoft|keyboard| 59.99|
| logitech|keyboard| 59.99|
| logitech| mouse| 29.99|
+----------+--------+---------------+
from pyspark.sql import functions as fn
products.groupBy('prod_brand').agg(
    fn.round(fn.avg('prod_value'), 1).alias('average'),
    fn.ceil(fn.sum('prod_value')).alias('sum'),
    fn.min('prod_value').alias('min')
).show()
+----------+-------+---+-----+
|prod_brand|average|sum| min|
+----------+-------+---+-----+
| microsoft| 53.3|160|39.99|
| logitech| 45.0| 90|29.99|
+----------+-------+---+-----+
products.createOrReplaceTempView("products")

query = """
SELECT
    prod_brand,
    round(avg(prod_value), 1) AS average,
    min(prod_value) AS min
FROM products
GROUP BY prod_brand
"""
spark.sql(query).show()
+----------+-------+-----+
|prod_brand|average| min|
+----------+-------+-----+
| microsoft| 53.3|39.99|
| logitech| 45.0|29.99|
+----------+-------+-----+
from pyspark.sql import Window
from pyspark.sql import functions as fn
# First, we create the Window definition
window = Window.partitionBy('prod_brand')
print(type(window))
<class 'pyspark.sql.window.WindowSpec'>
# Then, we can use "over" to aggregate on this window
avg = fn.avg('prod_value').over(window)

# Finally, we can use it as a classical column
products.withColumn('avg_brand_value', fn.round(avg, 2)).show()
+-------+--------+----------+----------+---------------+
|prod_id|prod_cat|prod_brand|prod_value|avg_brand_value|
+-------+--------+----------+----------+---------------+
| 4|keyboard| logitech| 59.99| 44.99|
| 5| mouse| logitech| 29.99| 44.99|
| 1| mouse| microsoft| 39.99| 53.32|
| 2| mouse| microsoft| 59.99| 53.32|
| 3|keyboard| microsoft| 59.99| 53.32|
+-------+--------+----------+----------+---------------+
With SQL queries, using multiple windows is not a problem:
query = """
SELECT *,
    ROUND(AVG(prod_value) OVER w1, 2) AS avg_brand_value,
    ROUND(AVG(prod_value) OVER w2, 1) AS avg_prod_value
FROM products
WINDOW w1 AS (PARTITION BY prod_brand),
       w2 AS (PARTITION BY prod_cat)
"""
spark.sql(query).show()
+-------+--------+----------+----------+---------------+--------------+
|prod_id|prod_cat|prod_brand|prod_value|avg_brand_value|avg_prod_value|
+-------+--------+----------+----------+---------------+--------------+
| 4|keyboard| logitech| 59.99| 44.99| 60.0|
| 3|keyboard| microsoft| 59.99| 53.32| 60.0|
| 5| mouse| logitech| 29.99| 44.99| 43.3|
| 1| mouse| microsoft| 39.99| 53.32| 43.3|
| 2| mouse| microsoft| 59.99| 53.32| 43.3|
+-------+--------+----------+----------+---------------+--------------+
window2 = Window.partitionBy('prod_cat')
avg2 = fn.avg('prod_value').over(window2)

# Finally, we can use them as classical columns
(
    products
    .withColumn('avg_brand_value', fn.round(avg, 2))
    .withColumn('avg_prod_value', fn.round(avg2, 1))
    .show()
)
+-------+--------+----------+----------+---------------+--------------+
|prod_id|prod_cat|prod_brand|prod_value|avg_brand_value|avg_prod_value|
+-------+--------+----------+----------+---------------+--------------+
| 4|keyboard| logitech| 59.99| 44.99| 60.0|
| 3|keyboard| microsoft| 59.99| 53.32| 60.0|
| 5| mouse| logitech| 29.99| 44.99| 43.3|
| 1| mouse| microsoft| 39.99| 53.32| 43.3|
| 2| mouse| microsoft| 59.99| 53.32| 43.3|
+-------+--------+----------+----------+---------------+--------------+
(
    products
    .withColumn('avg_brand_value', fn.round(avg, 2))
    .withColumn('avg_prod_value', fn.round(avg2, 1))
    .explain()
)
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [prod_id#854, prod_cat#855, prod_brand#856, prod_value#857, avg_brand_value#1195, round(_we0#1203, 1) AS avg_prod_value#1202]
+- Window [avg(prod_value#857) windowspecdefinition(prod_cat#855, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we0#1203], [prod_cat#855]
+- Sort [prod_cat#855 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(prod_cat#855, 200), ENSURE_REQUIREMENTS, [plan_id=2256]
+- Project [prod_id#854, prod_cat#855, prod_brand#856, prod_value#857, round(_we0#1196, 2) AS avg_brand_value#1195]
+- Window [avg(prod_value#857) windowspecdefinition(prod_brand#856, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we0#1196], [prod_brand#856]
+- Sort [prod_brand#856 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(prod_brand#856, 200), ENSURE_REQUIREMENTS, [plan_id=2251]
+- Scan ExistingRDD[prod_id#854,prod_cat#855,prod_brand#856,prod_value#857]
spark.sql(query).explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [prod_id#854, prod_cat#855, prod_brand#856, prod_value#857, round(_we0#1214, 2) AS avg_brand_value#1210, round(_we1#1215, 1) AS avg_prod_value#1211]
+- Window [avg(prod_value#857) windowspecdefinition(prod_cat#855, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we1#1215], [prod_cat#855]
+- Sort [prod_cat#855 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(prod_cat#855, 200), ENSURE_REQUIREMENTS, [plan_id=2280]
+- Window [avg(prod_value#857) windowspecdefinition(prod_brand#856, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we0#1214], [prod_brand#856]
+- Sort [prod_brand#856 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(prod_brand#856, 200), ENSURE_REQUIREMENTS, [plan_id=2276]
+- Scan ExistingRDD[prod_id#854,prod_cat#855,prod_brand#856,prod_value#857]
from pyspark.sql import Window
from pyspark.sql import functions as fn
# The window can be defined on multiple columns
window = Window.partitionBy('prod_brand', 'prod_cat')
avg = fn.avg('prod_value').over(window)

products.withColumn('avg_value', fn.round(avg, 2)).show()
+-------+--------+----------+----------+---------+
|prod_id|prod_cat|prod_brand|prod_value|avg_value|
+-------+--------+----------+----------+---------+
| 4|keyboard| logitech| 59.99| 59.99|
| 5| mouse| logitech| 29.99| 29.99|
| 3|keyboard| microsoft| 59.99| 59.99|
| 1| mouse| microsoft| 39.99| 49.99|
| 2| mouse| microsoft| 59.99| 49.99|
+-------+--------+----------+----------+---------+
purchases = spark.createDataFrame([
    (date(2017, 11, 1), 'mouse'),
    (date(2017, 11, 2), 'mouse'),
    (date(2017, 11, 4), 'keyboard'),
    (date(2017, 11, 6), 'keyboard'),
    (date(2017, 11, 9), 'keyboard'),
    (date(2017, 11, 12), 'mouse'),
    (date(2017, 11, 18), 'keyboard')
], ['date', 'prod_cat'])
purchases.show()

window = Window.partitionBy('prod_cat').orderBy('date')

prev_purch = fn.lag('date', 1).over(window)
next_purch = fn.lead('date', 1).over(window)

purchases\
    .withColumn('prev', prev_purch)\
    .withColumn('next', next_purch)\
    .orderBy('prod_cat', 'date')\
    .show()
+----------+--------+
| date|prod_cat|
+----------+--------+
|2017-11-01| mouse|
|2017-11-02| mouse|
|2017-11-04|keyboard|
|2017-11-06|keyboard|
|2017-11-09|keyboard|
|2017-11-12| mouse|
|2017-11-18|keyboard|
+----------+--------+
+----------+--------+----------+----------+
| date|prod_cat| prev| next|
+----------+--------+----------+----------+
|2017-11-04|keyboard| NULL|2017-11-06|
|2017-11-06|keyboard|2017-11-04|2017-11-09|
|2017-11-09|keyboard|2017-11-06|2017-11-18|
|2017-11-18|keyboard|2017-11-09| NULL|
|2017-11-01| mouse| NULL|2017-11-02|
|2017-11-02| mouse|2017-11-01|2017-11-12|
|2017-11-12| mouse|2017-11-02| NULL|
+----------+--------+----------+----------+
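The SQL counterparts are the LAG and LEAD window functions; a sketch in the same SQL-versus-API spirit (the aliases are illustrative):
purchases.createOrReplaceTempView("purchases")
query = """
SELECT *,
    LAG(date, 1)  OVER (PARTITION BY prod_cat ORDER BY date) AS prev_date,
    LEAD(date, 1) OVER (PARTITION BY prod_cat ORDER BY date) AS next_date
FROM purchases
ORDER BY prod_cat, date
"""
spark.sql(query).show()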
contestants = spark.createDataFrame([
    ('veterans', 'John', 3000),
    ('veterans', 'Bob', 3200),
    ('veterans', 'Mary', 4000),
    ('young', 'Jane', 4000),
    ('young', 'April', 3100),
    ('young', 'Alice', 3700),
    ('young', 'Micheal', 4000),
], ['category', 'name', 'points'])
contestants.show()
+--------+-------+------+
|category| name|points|
+--------+-------+------+
|veterans| John| 3000|
|veterans| Bob| 3200|
|veterans| Mary| 4000|
| young| Jane| 4000|
| young| April| 3100|
| young| Alice| 3700|
| young|Micheal| 4000|
+--------+-------+------+
window = Window.partitionBy('category')\
    .orderBy(contestants.points.desc())

rank = fn.rank().over(window)
dense_rank = fn.dense_rank().over(window)
row_number = fn.row_number().over(window)

contestants\
    .withColumn('rank', rank)\
    .withColumn('dense_rank', dense_rank)\
    .withColumn('row_number', row_number)\
    .orderBy('category', fn.col('points').desc())\
    .show()
+--------+-------+------+----+----------+----------+
|category| name|points|rank|dense_rank|row_number|
+--------+-------+------+----+----------+----------+
|veterans| Mary| 4000| 1| 1| 1|
|veterans| Bob| 3200| 2| 2| 2|
|veterans| John| 3000| 3| 3| 3|
| young| Jane| 4000| 1| 1| 1|
| young|Micheal| 4000| 1| 1| 2|
| young| Alice| 3700| 3| 2| 3|
| young| April| 3100| 4| 3| 4|
+--------+-------+------+----+----------+----------+
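These ranking functions also exist in SQL as RANK, DENSE_RANK and ROW_NUMBER over a named window; a sketch:
contestants.createOrReplaceTempView("contestants")
query = """
SELECT *,
    RANK()       OVER w AS rank,
    DENSE_RANK() OVER w AS dense_rank,
    ROW_NUMBER() OVER w AS row_number
FROM contestants
WINDOW w AS (PARTITION BY category ORDER BY points DESC)
ORDER BY category, points DESC
"""
spark.sql(query).show()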
# df = spark.read \
# .format("jdbc") \
# .option("url", "jdbc:postgresql://localhost:5432/postgres") \
# .option("dbschema", "imdb")\
# .option("dbtable", "title_basics") \
# .option("user", "postgres") \
# .option("password", "postgres") \
# .option("driver", "org.postgresql.Driver") \
# .load()
# df.printSchema()
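For reference, the same spark.read interface also covers file-based sources; a minimal sketch with hypothetical paths, left commented out like the JDBC example above:
# df = spark.read.csv("/path/to/data.csv", header=True, inferSchema=True)
# df = spark.read.parquet("/path/to/data.parquet")
# df.printSchema()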