Spark RDD

import numpy as np
import os
import sys
import inspect

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("Spark RDD Course")
sc = SparkContext(conf=conf)

25/04/03 15:08:23 WARN Utils: Your hostname, boucheron-Precision-5480 resolves to a loopback address: 127.0.1.1; using 172.23.32.10 instead (on interface eth0)
25/04/03 15:08:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/03 15:08:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
rdd = sc.parallelize(range(64))

Note that parallelize takes an optional argument to choose the number of partitions.
rdd.getNumPartitions()
20
rdd = sc.parallelize(range(1000), 10)
rdd.getNumPartitions()
10
map

rdd = sc.parallelize([2, 3, 4])
rdd = rdd.map(lambda x: list(range(1, x)))

rdd
PythonRDD[3] at RDD at PythonRDD.scala:53
(
    sc.parallelize([2, 3, 4])
    .map(lambda x: list(range(1, x)))
)
PythonRDD[5] at RDD at PythonRDD.scala:53
map is a transformation. It is lazily evaluated, hence execution is delayed until an action is met in the DAG.
rdd.collect()  # collect is an action
[Stage 0:> (0 + 20) / 20]
[[1], [1, 2], [1, 2, 3]]
(
    sc.parallelize([2, 3, 4])
    .map(lambda x: list(range(1, x)))
    .collect()
)
[[1], [1, 2], [1, 2, 3]]
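Since map only builds the lineage, the DAG can be inspected before anything is executed. A minimal sketch, assuming the SparkContext sc created above (the variable name lazy_rdd is illustrative):

lazy_rdd = sc.parallelize([2, 3, 4]).map(lambda x: list(range(1, x)))
# nothing has been computed yet; only the lineage exists
print(lazy_rdd.toDebugString().decode())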
map with a method

Warning: this example is bad practice! Don't do this at home.
tel = {'arthur': 1234, 'riad': 4567, 'anatole': 3615}

class TelephoneDB(object):
    def __init__(self):
        self.tel = {'arthur': 1234, 'riad': 4567, 'anatole': 3615}

    def add_tel(self, name):
        return name, self.tel.get(name)

tel_db = TelephoneDB()
names = ['arthur', 'riad']

rdd = (
    sc
    .parallelize(names)
    .map(tel_db.add_tel)
    .collect()
)
rdd
[('arthur', 1234), ('riad', 4567)]
Replace the tel dictionary by a defaultdict with default number 999, build an rdd containing names as above (including an unknown one), and try it.

from collections import defaultdict
class TelephoneDefaultDB(object):
    def __init__(self):
        self.tel = defaultdict(lambda: 999, {'arthur': 1234, 'riad': 4567, 'anatole': 3615})

    def add_tel(self, name):
        return name, self.tel[name]

    def add_tel_rdd(self, rdd):
        return rdd.map(self.add_tel)

tel_db = TelephoneDefaultDB()
names = ['riad', 'anatole', 'yiyang']

rdd = (
    sc
    .parallelize(names)
    .map(tel_db.add_tel)
    .collect()
)

rdd
[('riad', 4567), ('anatole', 3615), ('yiyang', 999)]
It is a bad idea to pass bound methods to Spark's map: since add_tel needs self, the whole object is serialized so that Spark can call it on the executors.
This breaks if tel is large, or if the object is not serializable.
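A safer pattern is to ship only the data, not the object, for instance through a broadcast variable. A minimal sketch (the names tel_bc and add_tel below are illustrative, not part of the original notebook):

tel = {'arthur': 1234, 'riad': 4567, 'anatole': 3615}
tel_bc = sc.broadcast(tel)   # the dictionary is sent once to each executor

def add_tel(name):
    # a plain function: only the broadcast handle is captured, not a whole object
    return name, tel_bc.value.get(name, 999)

(
    sc.parallelize(['riad', 'anatole', 'yiyang'])
    .map(add_tel)
    .collect()
)
# expected: [('riad', 4567), ('anatole', 3615), ('yiyang', 999)]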
flatMap

rdd = sc.parallelize([2, 3, 4, 5])
(
    rdd
    .flatMap(lambda x: range(1, x))
    .collect()
)
[1, 1, 2, 1, 2, 3, 1, 2, 3, 4]
filter

rdd = sc.parallelize(range(10))
rdd\
    .filter(lambda x: x % 2 == 0)\
    .collect()
[0, 2, 4, 6, 8]
distinct

rdd = sc.parallelize([1, 1, 4, 2, 1, 3, 3])

rdd.distinct().collect()
[1, 2, 3, 4]
rdd1 = sc.parallelize(range(5))
rdd2 = sc.parallelize(range(3, 9))
rdd3 = rdd1.union(rdd2)
rdd3.collect()
[0, 1, 2, 3, 4, 3, 4, 5, 6, 7, 8]
rdd3.distinct().collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8]
rdd1 = sc.parallelize([1, 2])
rdd2 = sc.parallelize(["a", "b"])
rdd1.cartesian(rdd2).collect()
[(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]
collect is obviously an action…
count, countByValue

rdd = sc.parallelize([1, 3, 1, 2, 2, 2])

rdd.count()
6

rdd.countByValue()
defaultdict(int, {1: 2, 3: 1, 2: 3})
Why does countByValue() return a dictionary?
Are count() and countByValue() actions or transformations?
u = np.int32(np.random.sample(100000) * 100000)  # 100000 random integers uniformly distributed on 0, ..., 99999
p = (
sc.parallelize(u)
.countByValue()
)
q = sorted(
p.items(),
key=lambda x: x[1],
reverse=True
)
q[0:10]
q[0], 1 + np.log(len(u)) / np.log(np.log(len(u))), len(q)
((np.int32(87583), 8), np.float64(5.711710714547694), 63259)
What do you expect for the number of distinct values in u, and for the largest number of repetitions? Throwing \(n\) items uniformly into \(n\) bins leaves about \((1 - e^{-1})\, n\) non-empty bins, and the bin counts are approximately Poisson(1), so the largest count is close to the \(1 - 1/n\) quantile of a Poisson(1) distribution.

from scipy.stats import poisson
(
    len(q),
    (1 - np.exp(-1)) * len(u),
    poisson.ppf(1. - 1./len(u), 1)
)
(63259, np.float64(63212.05588285577), np.float64(8.0))
take, takeOrdered

rdd = sc.parallelize([(3, 'a'), (1, 'b'), (2, 'd')])

(1, 'b') <= (2, 'd') <= (3, 'a')
True

rdd.takeOrdered(2)
[(1, 'b'), (2, 'd')]

rdd.takeOrdered(2, key=lambda x: x[1])
[(3, 'a'), (1, 'b')]
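By contrast, take returns the first elements in partition order without any sorting. A quick illustration with the same rdd (not part of the original run):

rdd.take(2)   # [(3, 'a'), (1, 'b')] with this small local rdd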
reduce, fold

rdd = sc.range(1, 4)

rdd.reduce(lambda a, b: a + b)
6

rdd = sc.range(1, 4, numSlices=7)

rdd.reduce(lambda a, b: a + b)
6

rdd = sc.parallelize(range(1, 4), 3)

rdd.reduce(lambda a, b: a + b)
6
(
    sc.parallelize(range(1, 4), 2)
    .fold(0, lambda a, b: a + b)
)
6
(
    sc.parallelize(range(1, 4), 1)
    .fold(3, lambda a, b: a + b)
), (
    sc.parallelize(range(1, 4), 2)
    .fold(2, lambda a, b: a + b)
)
(12, 12)
rdd = sc.parallelize(range(1, 4), 3)

(
    rdd.fold(1, lambda a, b: a + b),
    rdd.getNumPartitions()
)
(10, 3)
rdd = sc.parallelize(range(1, 4), 4)

(
    rdd.fold(1, lambda a, b: a + b),
    rdd.getNumPartitions()
)
(11, 4)
rdd = sc.parallelize([1, 2, 4], 2)

rdd.fold(2, lambda a, b: a + b)
13

rdd = sc.parallelize([1, 2, 4], 3)

rdd.fold(2, lambda a, b: a + b)
15

rdd.getNumPartitions()
3
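What these results show: fold applies the zero value once per partition and once more when the per-partition results are merged, so with addition fold(z, op) returns sum(data) + z * (numPartitions + 1). A quick check of this reading, assuming sc is available:

data = [1, 2, 4]
for p in (2, 3):
    rdd = sc.parallelize(data, p)
    # 7 + 2 * (2 + 1) = 13 and 7 + 2 * (3 + 1) = 15, as observed above
    assert rdd.fold(2, lambda a, b: a + b) == sum(data) + 2 * (p + 1)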
aggregate

seqOp = lambda x, y: (x[0] + y, x[1] + 1)
combOp = lambda x, y: (x[0] + y[0], x[1] + y[1])

rdd = sc.parallelize([1, 2, 3, 4], 8)

(
    rdd.aggregate((0, 0), seqOp, combOp), rdd.getNumPartitions()
)
((10, 4), 8)
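The (sum, count) pair returned by aggregate is exactly what is needed for a mean; a small follow-up using the rdd, seqOp and combOp defined above:

total, count = rdd.aggregate((0, 0), seqOp, combOp)
total / count   # 2.5 for [1, 2, 3, 4]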
op = lambda x, y: x + y

rdd = sc.parallelize([1, 2, 3, 4], 4)

(
    rdd.aggregate(0, op, op),
    rdd.getNumPartitions()
)
(10, 4)
Using aggregate, compute the sum, the sum of squares \(x^2\) and the sum of cubes \(x^3\) for \(x \in \{1, \ldots, 10\}\). Check the result with numpy.

seqOp = lambda x, y: (x[0] + y, x[1] + y ** 2, x[2] + y ** 3)
combOp = lambda x, y: (x[0] + y[0], x[1] + y[1], x[2] + y[2])

sc.range(5)
PythonRDD[68] at RDD at PythonRDD.scala:53

(
    sc
    .range(1, 11)
    .aggregate((0, 0, 0), seqOp, combOp)
)
(55, 385, 3025)
import numpy as np

x = np.arange(1, 11)

x
array([ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

x.sum(), (x**2).sum(), (x**3).sum(), x.cumsum()
(np.int64(55),
 np.int64(385),
 np.int64(3025),
 array([ 1, 3, 6, 10, 15, 21, 28, 36, 45, 55]))
Assume a sample is stored as an RDD. Using aggregate, compute the sample variance \(\frac{1}{n}\sum_{i=1}^n (x_i - \overline{X}_n)^2\) where \(\overline{X}_n = \frac{1}{n} \sum_{i=1}^n x_i\).
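One possible sketch among others, accumulating the sum, the sum of squares and the count, then using \(\frac{1}{n}\sum_{i=1}^n (x_i - \overline{X}_n)^2 = \frac{1}{n}\sum_{i=1}^n x_i^2 - \overline{X}_n^2\); the RDD name sample_rdd and its content are assumptions:

# hypothetical sample stored as an RDD
sample_rdd = sc.parallelize([1.0, 2.0, 3.0, 4.0, 5.0], 4)

seq = lambda acc, x: (acc[0] + x, acc[1] + x * x, acc[2] + 1)
comb = lambda a, b: (a[0] + b[0], a[1] + b[1], a[2] + b[2])

s, s2, n = sample_rdd.aggregate((0.0, 0.0, 0), seq, comb)
variance = s2 / n - (s / n) ** 2   # equals 2.0 for this sample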
PairRDD

rdd = sc.parallelize([[1, "a", 7], [2, "b", 13], [2, "c", 17]])

rdd.collect()  # not yet
[[1, 'a', 7], [2, 'b', 13], [2, 'c', 17]]

rdd = rdd.map(lambda x: (x[0], x[1:]))

rdd.collect()  # done
[(1, ['a', 7]), (2, ['b', 13]), (2, ['c', 17])]
keys, values

rdd.keys().collect()
[1, 2, 2]

rdd.values().collect()
[['a', 7], ['b', 13], ['c', 17]]
All elements must be tuples with two elements (key and value).
rdd = sc.parallelize([[1, "a", 7], [2, "b", 13], [2, "c", 17]])

rdd.keys().collect()
[1, 2, 2]

rdd.values().collect()
['a', 'b', 'c']
The values are not what we expected, so we must do:
rdd = (
    sc.parallelize([[1, "a", 7], [2, "b", 13], [2, "c", 17]])
    .map(lambda x: (x[0], x[1:]))
)

rdd.keys().collect()
[1, 2, 2]

rdd.values().collect()
[['a', 7], ['b', 13], ['c', 17]]
Now, the values are correct.
mapValues, flatMapValues

rdd = sc.parallelize([("a", "x y z"), ("b", "p r")])

rdd.mapValues(lambda v: v.split(' ')).collect(), rdd.collect()
([('a', ['x', 'y', 'z']), ('b', ['p', 'r'])], [('a', 'x y z'), ('b', 'p r')])

rdd.flatMapValues(lambda v: v.split(' ')).collect()
[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
groupByKey

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("b", 3), ("c", 42)])

(
    rdd.groupByKey()
    .mapValues(list)
    .collect()
)
[('b', [1, 3]), ('c', [42]), ('a', [1, 1])]
rdd.groupByKey().collect()
[('b', <pyspark.resultiterable.ResultIterable at 0x7058255e9520>),
('c', <pyspark.resultiterable.ResultIterable at 0x7058256348f0>),
('a', <pyspark.resultiterable.ResultIterable at 0x7058256349b0>)]
reduceByKey

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

rdd.reduceByKey(lambda a, b: a + b).collect()
[('b', 1), ('a', 2)]
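The same per-key sums could be obtained with groupByKey followed by mapValues(sum), but reduceByKey merges values inside each partition before the shuffle, so less data moves across the network. A quick comparison with the rdd above:

rdd.groupByKey().mapValues(sum).collect()   # [('b', 1), ('a', 2)], possibly in another order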
combineByKey

rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 13)])

def add(a, b):
    return a + str(b)

# arguments: createCombiner (str), mergeValue (add), mergeCombiners (add)
rdd.combineByKey(str, add, add).collect()
[('b', '2'), ('a', '113')]
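A more typical use of combineByKey is a per-key average built from a (sum, count) combiner. A minimal sketch (the data and variable names are illustrative):

pairs = sc.parallelize([('a', 1), ('b', 2), ('a', 13)])

sum_count = pairs.combineByKey(
    lambda v: (v, 1),                          # createCombiner
    lambda acc, v: (acc[0] + v, acc[1] + 1),   # mergeValue
    lambda a, b: (a[0] + b[0], a[1] + b[1])    # mergeCombiners
)
sum_count.mapValues(lambda p: p[0] / p[1]).collect()
# expected: [('b', 2.0), ('a', 7.0)], possibly in another order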
join, rightOuterJoin, leftOuterJoin

employees = sc.parallelize([
(31, "Rafferty"),
(33, "Jones"),
(33, "Heisenberg"),
(34, "Robinson"),
(34, "Smith"),
(None, "Williams")
])

departments = sc.parallelize([
(31, "Sales"),
(33, "Engineering"),
(34, "Clerical"),
(35, "Marketing")
])

(
employees
.join(departments)
.sortByKey()
.collect()
)
[(31, ('Rafferty', 'Sales')),
(33, ('Jones', 'Engineering')),
(33, ('Heisenberg', 'Engineering')),
(34, ('Robinson', 'Clerical')),
(34, ('Smith', 'Clerical'))]
(
employees
.rightOuterJoin(departments)
.sortByKey()
.collect()
)
[(31, ('Rafferty', 'Sales')),
(33, ('Jones', 'Engineering')),
(33, ('Heisenberg', 'Engineering')),
(34, ('Robinson', 'Clerical')),
(34, ('Smith', 'Clerical')),
(35, (None, 'Marketing'))]
(
employees
.leftOuterJoin(departments)
.collect()
)
[(None, ('Williams', None)),
(31, ('Rafferty', 'Sales')),
(33, ('Jones', 'Engineering')),
(33, ('Heisenberg', 'Engineering')),
(34, ('Robinson', 'Clerical')),
(34, ('Smith', 'Clerical'))]
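For completeness, fullOuterJoin keeps unmatched keys from both sides (here both Williams and Marketing). A sketch with the same RDDs, not part of the original run:

(
    employees
    .fullOuterJoin(departments)
    .collect()
)
# expected to contain (None, ('Williams', None)) and (35, (None, 'Marketing'))
# in addition to the matched pairs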
employees.countByKey()
defaultdict(int, {31: 1, 33: 2, 34: 2, None: 1})
employees.lookup(33)
['Jones', 'Heisenberg']
employees.lookup(None)
['Williams']
employees.collectAsMap()
{31: 'Rafferty', 33: 'Heisenberg', 34: 'Smith', None: 'Williams'}

Note that collectAsMap keeps only one value per key: duplicates such as (33, 'Jones') are overwritten.