Spark RDD

import numpy as np
import os
import sys
import inspect

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("Spark RDD Course")
sc = SparkContext(conf=conf)
25/01/18 14:17:53 WARN Utils: Your hostname, boucheron-Precision-5480 resolves to a loopback address: 127.0.1.1; using 192.168.10.120 instead (on interface wlp0s20f3)
25/01/18 14:17:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/18 14:17:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
rdd = sc.parallelize(range(64))
Note that parallelize takes an optional argument to choose the number of partitions.
rdd.getNumPartitions()
20
rdd = sc.parallelize(range(1000), 10)
rdd.getNumPartitions()
10
map
rdd = sc.parallelize([2, 3, 4])
rdd = rdd.map(lambda x: list(range(1, x)))
rdd
PythonRDD[3] at RDD at PythonRDD.scala:53
(
    sc.parallelize([2, 3, 4])
    .map(lambda x: list(range(1, x)))
)
PythonRDD[5] at RDD at PythonRDD.scala:53
map is a transformation. It is lazily evaluated, so execution is delayed until an action is met in the DAG.
# collect is an action
rdd.collect()
[[1], [1, 2], [1, 2, 3]]
(
    sc.parallelize([2, 3, 4])
    .map(lambda x: list(range(1, x)))
    .collect()
)
[[1], [1, 2], [1, 2, 3]]
map with a method

Warning. This example is bad practice! Don’t do this at home.
dbtel = {'arthur': 1234, 'riad': 4567, 'anatole': 3615}
class TelephoneDB(object):
    def __init__(self):
        self.tel = {'arthur': 1234, 'riad': 4567, 'anatole': 3615}

    def add_tel(self, name):
        return name, self.tel.get(name)
tel_db = TelephoneDB()
names = ['arthur', 'riad']
rdd = sc.parallelize(names).map(tel_db.add_tel).collect()
rdd
[('arthur', 1234), ('riad', 4567)]
Replace the tel dictionary by a defaultdict with default number 999. Build an rdd containing names as above, including an unknown one, and try it.

from collections import defaultdict
class TelephoneDefaultDB(object):
    def __init__(self):
        self.tel = defaultdict(lambda: 999, {'arthur': 1234, 'riad': 4567, 'anatole': 3615})

    def add_tel(self, name):
        return name, self.tel[name]

    def add_tel_rdd(self, rdd):
        return rdd.map(self.add_tel)
tel_db = TelephoneDefaultDB()
names = ['riad', 'anatole', 'yiyang']
rdd = sc.parallelize(names).map(tel_db.add_tel).collect()
rdd
[('riad', 4567), ('anatole', 3615), ('yiyang', 999)]
Warning. Once again, it is a bad idea to pass class methods to Spark’s map. Since add_tel needs self, the whole object is serialized so that Spark can use it. This breaks if tel is large, or if it is not serializable.
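A safer pattern (a sketch, not taken from the course material): copy the needed data into a local variable and pass a plain function, so that only the dictionary is captured in the closure shipped to the executors, not the whole object.

tel = tel_db.tel  # local reference: only this dict is serialized with the closure

def add_tel(name):
    return name, tel[name]

sc.parallelize(['riad', 'anatole', 'yiyang']).map(add_tel).collect()
# [('riad', 4567), ('anatole', 3615), ('yiyang', 999)]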
flatMap
rdd = sc.parallelize([2, 3, 4, 5])
rdd.flatMap(lambda x: range(1, x)).collect()
[1, 1, 2, 1, 2, 3, 1, 2, 3, 4]
filter
rdd = sc.parallelize(range(10))
rdd.filter(lambda x: x % 2 == 0).collect()
[0, 2, 4, 6, 8]
distinct
rdd = sc.parallelize([1, 1, 4, 2, 1, 3, 3])
rdd.distinct().collect()
[1, 2, 3, 4]
rdd1 = sc.parallelize(range(5))
rdd2 = sc.parallelize(range(3, 9))
rdd3 = rdd1.union(rdd2)
rdd3.collect()
[0, 1, 2, 3, 4, 3, 4, 5, 6, 7, 8]
rdd3.distinct().collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8]
rdd1 = sc.parallelize([1, 2])
rdd2 = sc.parallelize(["a", "b"])
rdd1.cartesian(rdd2).collect()
[(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]
Well, collect is obviously an action…
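Keep in mind that collect brings the whole RDD back to the driver, so it should only be used on small results; for a quick look at a large RDD, take is safer.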
count, countByValue
rdd = sc.parallelize([1, 3, 1, 2, 2, 2])
rdd.count()
6
rdd.countByValue()
defaultdict(int, {1: 2, 3: 1, 2: 3})
Why does countByValue() return a dictionary? Are count() and countByValue() actions or transformations?
u = np.int32((np.random.sample(100000) * 100000))  # 100000 random integers uniformly distributed on 0, ..., 99999

p = (
    sc.parallelize(u)
    .countByValue()
)
q = sorted(p.items(), key=lambda x: x[1], reverse=True)
q[0]
# q[0], 1 + np.log(len(u))/ np.log(np.log(len(u))), len(q)
(np.int32(13647), 8)
How many distinct values do you expect in u?

len(q), (1-np.exp(-1)) * len(u), 1 + np.log(len(u))/ np.log(np.log(len(u))), np.log(len(u))
(63130,
np.float64(63212.05588285577),
np.float64(5.711710714547694),
np.float64(11.512925464970229))
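These numbers can be read through a balls-in-bins heuristic: each of the 100000 possible values is missed by all 100000 draws with probability (1 - 1/n)^n ≈ e^{-1}, so about (1 - e^{-1}) n ≈ 63212 distinct values are expected, and the largest multiplicity is of order ln n / ln ln n.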
take, takeOrdered
rdd = sc.parallelize([(3, 'a'), (1, 'b'), (2, 'd')])
(1, 'b') <= (2, 'd') <= (3, 'a')
True
rdd.takeOrdered(2)
[(1, 'b'), (2, 'd')]
rdd.takeOrdered(2, key=lambda x: x[1])
[(3, 'a'), (1, 'b')]
reduce, fold
rdd = sc.range(1, 4)
rdd.reduce(lambda a, b: a + b)
6
rdd = sc.range(1, 4, numSlices=7)
rdd.reduce(lambda a, b: a + b)
6
rdd = sc.parallelize(range(1, 4), 3)
rdd.reduce(lambda a, b: a + b)
6
(
    sc.parallelize(range(1, 4), 2)
    .fold(0, lambda a, b: a + b)
)
6
(
    sc.parallelize(range(1, 4), 1)
    .fold(3, lambda a, b: a + b)
), (
    sc.parallelize(range(1, 4), 2)
    .fold(2, lambda a, b: a + b)
)
(12, 12)
rdd = sc.parallelize(range(1, 4), 3)
rdd.fold(1, lambda a, b: a + b), rdd.getNumPartitions()
(10, 3)
rdd = sc.parallelize(range(1, 4), 4)
rdd.fold(1, lambda a, b: a + b), rdd.getNumPartitions()
(11, 4)
rdd = sc.parallelize([1, 2, 4], 2)
rdd.fold(2, lambda a, b: a + b)
13
rdd = sc.parallelize([1, 2, 4], 3)
rdd.fold(2, lambda a, b: a + b)
15
rdd.getNumPartitions()
3
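To make the pattern explicit: with an associative operation like addition, fold applies the zero value once per partition and once more when the partition results are merged, so the result is sum + zeroValue * (numPartitions + 1). A minimal sanity check in plain Python (the helper name expected_fold is ours, not Spark's):

def expected_fold(values, zero, num_partitions):
    # zero is folded into each of the num_partitions partial results,
    # and once more when the partial results are merged on the driver
    return sum(values) + zero * (num_partitions + 1)

expected_fold([1, 2, 4], 2, 2), expected_fold([1, 2, 4], 2, 3)
# (13, 15), matching the two cells above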
aggregate
seqOp = lambda x, y: (x[0] + y, x[1] + 1)
combOp = lambda x, y: (x[0] + y[0], x[1] + y[1])

rdd = sc.parallelize([1, 2, 3, 4], 8)
rdd.aggregate((0, 0), seqOp, combOp), rdd.getNumPartitions()
((10, 4), 8)
op = lambda x, y: x + y
rdd = sc.parallelize([1, 2, 3, 4], 4)
rdd.aggregate(0, op, op), rdd.getNumPartitions()
(10, 4)
aggregate

Using aggregate, compute the sum, the sum of squares \(x^2\) and the sum of \(x^3\) for \(x \in \{1, \ldots, 10\}\). Check the result with numpy.
seqOp = lambda x, y: (x[0] + y, x[1] + y ** 2, x[2] + y ** 3)
combOp = lambda x, y: (x[0] + y[0], x[1] + y[1], x[2] + y[2])

sc.range(5)
PythonRDD[68] at RDD at PythonRDD.scala:53
sc.range(1, 11).aggregate((0, 0, 0), seqOp, combOp)
(55, 385, 3025)
import numpy as np
x = np.arange(1, 11)
x
array([ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
x.sum(), (x**2).sum(), (x**3).sum(), x.cumsum()
(np.int64(55),
np.int64(385),
np.int64(3025),
array([ 1, 3, 6, 10, 15, 21, 28, 36, 45, 55]))
aggregate

Assume a sample is stored as an RDD. Using aggregate, compute the sample variance \(\frac{1}{n}\sum_{i=1}^n (x_i - \overline{X}_n)^2\) where \(\overline{X}_n = \frac{1}{n} \sum_{i=1}^n x_i\).
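One possible solution sketch (the sample RDD below is a made-up example; it accumulates the count, the sum and the sum of squares, then uses the identity Var = E[X²] − E[X]² rather than a two-pass computation):

sample = sc.parallelize([1.0, 2.0, 5.0, 8.0])  # hypothetical sample stored as an RDD

seqOp = lambda acc, x: (acc[0] + 1, acc[1] + x, acc[2] + x ** 2)
combOp = lambda a, b: (a[0] + b[0], a[1] + b[1], a[2] + b[2])

n, s, s2 = sample.aggregate((0, 0.0, 0.0), seqOp, combOp)
s2 / n - (s / n) ** 2  # (1/n) * sum((x_i - mean)^2)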
PairRDD
rdd = sc.parallelize([[1, "a", 7], [2, "b", 13], [2, "c", 17]])

# not yet
rdd.collect()
[[1, 'a', 7], [2, 'b', 13], [2, 'c', 17]]
rdd = rdd.map(lambda x: (x[0], x[1:]))

# done
rdd.collect()
[(1, ['a', 7]), (2, ['b', 13]), (2, ['c', 17])]
keys, values
rdd.keys().collect()
[1, 2, 2]
rdd.values().collect()
[['a', 7], ['b', 13], ['c', 17]]
Warning. All elements must be tuples with two elements (the key and the value).
rdd = sc.parallelize([[1, "a", 7], [2, "b", 13], [2, "c", 17]])
rdd.keys().collect()
[1, 2, 2]
rdd.values().collect()
['a', 'b', 'c']
The values are not what we expected… so we must do:
rdd = (
    sc.parallelize([[1, "a", 7], [2, "b", 13], [2, "c", 17]])
    .map(lambda x: (x[0], x[1:]))
)
rdd.keys().collect()
[1, 2, 2]
rdd.values().collect()
[['a', 7], ['b', 13], ['c', 17]]
Now the values are correct.
mapValues, flatMapValues
rdd = sc.parallelize([("a", "x y z"), ("b", "p r")])

rdd.mapValues(lambda v: v.split(' ')).collect(), rdd.collect()
([('a', ['x', 'y', 'z']), ('b', ['p', 'r'])], [('a', 'x y z'), ('b', 'p r')])
rdd.flatMapValues(lambda v: v.split(' ')).collect()
[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
groupByKey
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("b", 3), ("c", 42)])

(
    rdd.groupByKey()
    .mapValues(list)
    .collect()
)
[('b', [1, 3]), ('c', [42]), ('a', [1, 1])]
rdd.groupByKey().collect()
[('b', <pyspark.resultiterable.ResultIterable at 0x7759bdd84bf0>),
('c', <pyspark.resultiterable.ResultIterable at 0x7759bdd868a0>),
('a', <pyspark.resultiterable.ResultIterable at 0x7759bdd86f30>)]
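groupByKey only returns, for each key, a lazy iterable over the grouped values (a ResultIterable); nothing is combined. For simple aggregations such as sums, reduceByKey below is usually preferred, since it combines values inside each partition before shuffling.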
reduceByKey
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rdd.reduceByKey(lambda a, b: a + b).collect()
[('b', 1), ('a', 2)]
combineByKey
rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 13)])

def add(a, b):
    return a + str(b)

rdd.combineByKey(str, add, add).collect()
[('b', '2'), ('a', '113')]
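combineByKey takes three functions: createCombiner (how to start an accumulator from the first value of a key seen in a partition), mergeValue (how to fold another value into an accumulator) and mergeCombiners (how to merge accumulators coming from different partitions). A common illustration, sketched here on the same rdd, is the per-key mean:

sums_counts = rdd.combineByKey(
    lambda v: (v, 1),                          # createCombiner
    lambda acc, v: (acc[0] + v, acc[1] + 1),   # mergeValue
    lambda a, b: (a[0] + b[0], a[1] + b[1])    # mergeCombiners
)
sums_counts.mapValues(lambda p: p[0] / p[1]).collect()
# expected: [('a', 7.0), ('b', 2.0)] (key order may vary)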
join, rightOuterJoin, leftOuterJoin
employees = sc.parallelize([
    (31, "Rafferty"),
    (33, "Jones"),
    (33, "Heisenberg"),
    (34, "Robinson"),
    (34, "Smith"),
    (None, "Williams")
])

departments = sc.parallelize([
    (31, "Sales"),
    (33, "Engineering"),
    (34, "Clerical"),
    (35, "Marketing")
])
employees.join(departments).sortByKey().collect()
[(31, ('Rafferty', 'Sales')),
(33, ('Jones', 'Engineering')),
(33, ('Heisenberg', 'Engineering')),
(34, ('Robinson', 'Clerical')),
(34, ('Smith', 'Clerical'))]
employees.rightOuterJoin(departments).sortByKey().collect()
[(31, ('Rafferty', 'Sales')),
(33, ('Jones', 'Engineering')),
(33, ('Heisenberg', 'Engineering')),
(34, ('Robinson', 'Clerical')),
(34, ('Smith', 'Clerical')),
(35, (None, 'Marketing'))]
employees.leftOuterJoin(departments).collect()
[(None, ('Williams', None)),
(31, ('Rafferty', 'Sales')),
(33, ('Jones', 'Engineering')),
(33, ('Heisenberg', 'Engineering')),
(34, ('Robinson', 'Clerical')),
(34, ('Smith', 'Clerical'))]
employees.countByKey()
defaultdict(int, {31: 1, 33: 2, 34: 2, None: 1})
employees.lookup(33)
['Jones', 'Heisenberg']
employees.lookup(None)
['Williams']
employees.collectAsMap()
{31: 'Rafferty', 33: 'Heisenberg', 34: 'Smith', None: 'Williams'}
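Note that collectAsMap returns a plain Python dict, so when a key appears several times only one of its values is kept (here a single name survives for keys 33 and 34).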