Introduction to Spark RDD

Code
import numpy as np
Code
import os
import sys
import inspect

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
Code
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("Spark RDD Course")
sc = SparkContext(conf=conf)
25/01/18 14:17:53 WARN Utils: Your hostname, boucheron-Precision-5480 resolves to a loopback address: 127.0.1.1; using 192.168.10.120 instead (on interface wlp0s20f3)
25/01/18 14:17:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/18 14:17:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Code
rdd = sc.parallelize(range(64))

Note that parallelize takes an optional second argument to choose the number of partitions.

Code
rdd.getNumPartitions()
20
Code
rdd = sc.parallelize(range(1000), 10)
rdd.getNumPartitions()
10

Transformations

map

Code
rdd = sc.parallelize([2, 3, 4])
rdd = rdd.map(lambda x: list(range(1, x)))
Code
rdd
PythonRDD[3] at RDD at PythonRDD.scala:53
Code
(
    sc.parallelize([2, 3, 4])
      .map(lambda x: list(range(1, x)))
)
PythonRDD[5] at RDD at PythonRDD.scala:53

map is a transformation: it is lazily evaluated, so execution is delayed until an action is met in the DAG.

Code
rdd.collect()  # collect is an action 
[[1], [1, 2], [1, 2, 3]]
Code
(
    sc.parallelize([2, 3, 4])
      .map(lambda x: list(range(1, x)))
      .collect()
)
[[1], [1, 2], [1, 2, 3]]

Exercise: map with a method

Warning. This example is bad practice! Don’t do this at home.

Code
dbtel = {'arthur': 1234, 'riad': 4567, 'anatole': 3615}
Code
class TelephoneDB(object):
    
    def __init__(self):
        self.tel = {'arthur': 1234, 'riad': 4567, 'anatole': 3615}
   
    def add_tel(self, name):
        return name, self.tel.get(name)
Code
tel_db = TelephoneDB()
names = ['arthur', 'riad']
Code
rdd = sc.parallelize(names).map(tel_db.add_tel).collect()
rdd
[('arthur', 1234), ('riad', 4567)]
  • Replace the tel dictionary by a defaultdict with a default number of 999
  • Use it on an RDD containing names as above, including an unknown one, and try it
Code
from collections import defaultdict

class TelephoneDefaultDB(object):
    
    def __init__(self):
        self.tel = defaultdict(lambda: 999, {'arthur': 1234, 'riad': 4567, 'anatole': 3615})
    
    def add_tel(self, name):
        return name, self.tel[name]
    
    def add_tel_rdd(self, rdd):  
        return rdd.map(self.add_tel)
Code
tel_db = TelephoneDefaultDB()
names = ['riad', 'anatole', 'yiyang']
rdd = sc.parallelize(names).map(tel_db.add_tel).collect()
rdd
[('riad', 4567), ('anatole', 3615), ('yiyang', 999)]

Warning. Once again, it is a bad idea to pass class methods to Spark’s map. Since add_tel needs self, the whole object is serialized so that Spark can ship it to the executors. This breaks if the tel dictionary is large, or if it is not serializable.
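
A safer pattern is to keep the numbers in a plain dictionary and map a plain function over the names, for instance with a broadcast variable so the dictionary is shipped to each executor only once. A minimal sketch:

Code
# minimal sketch: broadcast the dictionary and use a plain function,
# so no enclosing object has to be serialized with each task
tel = {'arthur': 1234, 'riad': 4567, 'anatole': 3615}
tel_bc = sc.broadcast(tel)

def add_tel(name):
    # look up the broadcast copy, defaulting to 999 for unknown names
    return name, tel_bc.value.get(name, 999)

sc.parallelize(['riad', 'anatole', 'yiyang']).map(add_tel).collect()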

flatMap

Code
rdd = sc.parallelize([2, 3, 4, 5])
rdd.flatMap(lambda x: range(1, x)).collect()
[1, 1, 2, 1, 2, 3, 1, 2, 3, 4]
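
Unlike map, which here would return one list per element (as in the earlier example), flatMap flattens the resulting iterables into a single RDD. Compare on the same input:

Code
rdd.map(lambda x: list(range(1, x))).collect()   # [[1], [1, 2], [1, 2, 3], [1, 2, 3, 4]]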

filter

Code
rdd = sc.parallelize(range(10))
rdd.filter(lambda x: x % 2 == 0).collect()
[0, 2, 4, 6, 8]

distinct

Code
rdd = sc.parallelize([1, 1, 4, 2, 1, 3, 3])
rdd.distinct().collect()
[1, 2, 3, 4]

“Pseudo-set” operations

Code
rdd1 = sc.parallelize(range(5))
rdd2 = sc.parallelize(range(3, 9))
rdd3 = rdd1.union(rdd2)
rdd3.collect()
[0, 1, 2, 3, 4, 3, 4, 5, 6, 7, 8]
Code
rdd3.distinct().collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8]
Code
rdd1 = sc.parallelize([1, 2])
rdd2 = sc.parallelize(["a", "b"])
rdd1.cartesian(rdd2).collect()
[(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]

Actions

Well, collect is obviously an action…

count, countByValue

Code
rdd = sc.parallelize([1, 3, 1, 2, 2, 2])
rdd.count()
6
Code
rdd.countByValue()
defaultdict(int, {1: 2, 3: 1, 2: 3})

Why does countByValue() return a dictionary?

Are count() and countByValue() actions or transformations?

Code
u = np.int32((np.random.sample(100000) * 100000))  # 100000 random integers uniformly distributed on 0, ..., 99999

p = (
    sc.parallelize(u)
    .countByValue()
)

q = sorted(p.items(), key = lambda x : x[1], reverse=True)

q[0]

# q[0], 1 + np.log(len(u))/ np.log(np.log(len(u))), len(q)
(np.int32(13647), 8)
  • How many distinct values do you expect in u?
  • How large do you expect the largest count in \(q\) to be?
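
Hint: each of the \(n = 100000\) possible values is missed by all \(n\) draws with probability \((1 - 1/n)^n \approx e^{-1}\), so the expected number of distinct values is about \(n (1 - e^{-1}) \approx 63212\), while the largest count typically grows like \(\log n / \log \log n\). The next cell compares these quantities with the observed ones.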
Code
len(q), (1-np.exp(-1)) * len(u), 1 + np.log(len(u))/ np.log(np.log(len(u))), np.log(len(u))
(63130,
 np.float64(63212.05588285577),
 np.float64(5.711710714547694),
 np.float64(11.512925464970229))

take, takeOrdered

Code
rdd = sc.parallelize([(3, 'a'), (1, 'b'), (2, 'd')])
Code
(1, 'b') <=  (2, 'd') <= (3, 'a')
True
Code
rdd.takeOrdered(2)
[(1, 'b'), (2, 'd')]
Code
rdd.takeOrdered(2, key=lambda x: x[1])
[(3, 'a'), (1, 'b')]

reduce, fold

Code
rdd = sc.range(1, 4)
rdd.reduce(lambda a, b: a + b)
6
Code
rdd = sc.range(1, 4, numSlices=7)
rdd.reduce(lambda a, b: a + b)
6
Code
rdd = sc.parallelize(range(1,4), 3)
rdd.reduce(lambda a, b: a + b)
6
Code
( 
    sc.parallelize(range(1, 4), 2)
      .fold(0, lambda a, b: a + b)
)
6
Code
( 
    sc.parallelize(range(1, 4), 1)
      .fold(3, lambda a, b: a + b)
),( 
    sc.parallelize(range(1, 4), 2)
      .fold(2, lambda a, b: a + b)
)
(12, 12)
Code
rdd =  sc.parallelize(range(1, 4),3)
rdd.fold(1, lambda a, b: a + b), rdd.getNumPartitions()
(10, 3)
Code
rdd =  sc.parallelize(range(1, 4),4)
rdd.fold(1, lambda a, b: a + b), rdd.getNumPartitions()
(11, 4)
Code
rdd = sc.parallelize([1, 2, 4], 2)
rdd.fold(2, lambda a, b: a + b)
13
Code
rdd = sc.parallelize([1, 2, 4], 3)
rdd.fold(2, lambda a, b: a + b)
15
Code
rdd.getNumPartitions()
3
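
In the cells above, the zero value is used once inside each partition and once more when the per-partition results are combined, so with \(p\) partitions the result is \(\sum_i x_i + \text{zero} \times (p + 1)\). The zero value should therefore be a neutral element of the operation (0 for addition). A quick check of this relation:

Code
# checking result == sum(elements) + zeroValue * (numPartitions + 1)
rdd = sc.parallelize([1, 2, 4], 3)
zero = 2
rdd.fold(zero, lambda a, b: a + b) == sum([1, 2, 4]) + zero * (rdd.getNumPartitions() + 1)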

aggregate

Code
seqOp = lambda x, y: (x[0] + y, x[1] + 1)
combOp = lambda x, y: (x[0] + y[0], x[1] + y[1])

rdd = sc.parallelize([1, 2, 3, 4], 8)
rdd.aggregate((0, 0), seqOp, combOp), rdd.getNumPartitions()
((10, 4), 8)
Code
op = lambda x, y: x+y
rdd = sc.parallelize([1, 2, 3, 4], 4)
rdd.aggregate(0, op, op), rdd.getNumPartitions()
(10, 4)
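
aggregate(zeroValue, seqOp, combOp) folds the elements of each partition with seqOp, starting from zeroValue, then merges the per-partition results with combOp. Above, the zero value (0, 0) accumulates a (sum, count) pair. As another small sketch, the same mechanism can track a minimum and a maximum in a single pass:

Code
# sketch: fold each element into a (min, max) pair, then merge partial pairs
seqOp = lambda acc, x: (min(acc[0], x), max(acc[1], x))
combOp = lambda a, b: (min(a[0], b[0]), max(a[1], b[1]))

sc.parallelize([7, 2, 9, 4], 3).aggregate((float('inf'), float('-inf')), seqOp, combOp)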

Exercise: sum of powers with aggregate

  • Using aggregate, compute the sum, the sum of squares \(x^2\) and the sum of \(x^3\) for \(x \in \{1, \ldots, 10 \}\).
  • Check your computations using numpy
Code
seqOp = lambda x, y: (x[0] + y, x[1] + y ** 2, x[2] + y ** 3)
Code
combOp = lambda x, y: (x[0] + y[0], x[1] + y[1], x[2] + y[2])
Code
sc.range(5)
PythonRDD[68] at RDD at PythonRDD.scala:53
Code
sc.range(1, 11).aggregate((0, 0, 0), seqOp, combOp)
(55, 385, 3025)
Code
import numpy as np

x = np.arange(1, 11)
x
array([ 1,  2,  3,  4,  5,  6,  7,  8,  9, 10])
Code
x.sum(), (x**2).sum(), (x**3).sum(), x.cumsum()
(np.int64(55),
 np.int64(385),
 np.int64(3025),
 array([ 1,  3,  6, 10, 15, 21, 28, 36, 45, 55]))

Computing an empirical variance with aggregate

Assume a sample is stored as an RDD. Using aggregate, compute the sample variance \(\frac{1}{n}\sum_{i=1}^n (x_i - \overline{X}_n)^2\), where \(\overline{X}_n = \frac{1}{n} \sum_{i=1}^n x_i\).
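
One possible solution sketch: accumulate the count, the sum and the sum of squares with aggregate, then use \(\frac{1}{n}\sum_{i=1}^n x_i^2 - \overline{X}_n^2\):

Code
# accumulate (count, sum, sum of squares) in each partition, then merge the triples
seqOp = lambda acc, x: (acc[0] + 1, acc[1] + x, acc[2] + x ** 2)
combOp = lambda a, b: (a[0] + b[0], a[1] + b[1], a[2] + b[2])

sample = sc.parallelize([1.0, 2.0, 3.0, 4.0, 5.0], 3)   # a toy sample
n, s, s2 = sample.aggregate((0, 0.0, 0.0), seqOp, combOp)
s2 / n - (s / n) ** 2   # empirical variance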

PairRDD

Code
rdd = sc.parallelize([[1, "a", 7], [2, "b", 13], [2, "c", 17]])

rdd.collect()  # not yet a pair RDD
[[1, 'a', 7], [2, 'b', 13], [2, 'c', 17]]
Code
rdd = rdd.map(lambda x: (x[0], x[1:]))

rdd.collect()  # now a pair RDD of (key, value) tuples
[(1, ['a', 7]), (2, ['b', 13]), (2, ['c', 17])]

Transformations

keys, values

Code
rdd.keys().collect()
[1, 2, 2]
Code
rdd.values().collect()
[['a', 7], ['b', 13], ['c', 17]]

Warning. All elements must be tuples with two elements (the key and the value).

Code
rdd = sc.parallelize([[1, "a", 7], [2, "b", 13], [2, "c", 17]])
rdd.keys().collect()
[1, 2, 2]
Code
rdd.values().collect()
['a', 'b', 'c']

The values are not what we expected… so we must first map the rows to (key, value) pairs:

Code
rdd = ( sc.parallelize([[1, "a", 7], [2, "b", 13], [2, "c", 17]])
          .map(lambda x: (x[0], x[1:]))
      )
rdd.keys().collect()
[1, 2, 2]
Code
rdd.values().collect()
[['a', 7], ['b', 13], ['c', 17]]

Now the values are correct.

mapValues, flatMapValues

Code
rdd = sc.parallelize([("a", "x y z"), ("b", "p r")])

rdd.mapValues(lambda v: v.split(' ')).collect(), rdd.collect()
([('a', ['x', 'y', 'z']), ('b', ['p', 'r'])], [('a', 'x y z'), ('b', 'p r')])
Code
rdd.flatMapValues(lambda v: v.split(' ')).collect()
[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]

groupByKey

Code
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("b", 3), ("c", 42)])
( 
    rdd.groupByKey()
       .mapValues(list)
       .collect()
)
[('b', [1, 3]), ('c', [42]), ('a', [1, 1])]
Code
rdd.groupByKey().collect()
[('b', <pyspark.resultiterable.ResultIterable at 0x7759bdd84bf0>),
 ('c', <pyspark.resultiterable.ResultIterable at 0x7759bdd868a0>),
 ('a', <pyspark.resultiterable.ResultIterable at 0x7759bdd86f30>)]

reduceByKey

Code
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rdd.reduceByKey(lambda a, b: a + b).collect()
[('b', 1), ('a', 2)]
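
reduceByKey merges the values for each key with an associative function, combining locally within each partition before shuffling, which is why it is usually preferred to groupByKey followed by a reduction. The classic word count is a typical use:

Code
lines = sc.parallelize(["spark makes rdds", "rdds are resilient", "spark is lazy"])
(
    lines.flatMap(lambda line: line.split(' '))   # one element per word
         .map(lambda word: (word, 1))             # pair RDD of (word, 1)
         .reduceByKey(lambda a, b: a + b)         # sum the 1s per word
         .collect()
)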

combineByKey

Code
rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 13)])

def add(a, b): 
    return a + str(b)

rdd.combineByKey(str, add, add).collect()
[('b', '2'), ('a', '113')]
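
combineByKey takes three functions: one that turns the first value seen for a key in a partition into a combiner, one that merges a further value into an existing combiner, and one that merges combiners coming from different partitions. In the cell above, str and add build the concatenation '113' for key 'a'. A common use is the per-key mean, sketched below:

Code
rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 13)], 2)
(
    rdd.combineByKey(
            lambda v: (v, 1),                          # first value -> (sum, count)
            lambda acc, v: (acc[0] + v, acc[1] + 1),   # merge a value into (sum, count)
            lambda a, b: (a[0] + b[0], a[1] + b[1]))   # merge two (sum, count) pairs
       .mapValues(lambda t: t[0] / t[1])               # mean = sum / count
       .collect()
)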

join, rightOuterJoin, leftOuterJoin

Code
employees = sc.parallelize([
    (31, "Rafferty"),
    (33, "Jones"),
    (33, "Heisenberg"),
    (34, "Robinson"),
    (34, "Smith"),
    (None, "Williams")
])
Code
departments = sc.parallelize([
    (31, "Sales"),
    (33, "Engineering"),
    (34, "Clerical"),
    (35, "Marketing")
])
Code
employees.join(departments).sortByKey().collect()
[(31, ('Rafferty', 'Sales')),
 (33, ('Jones', 'Engineering')),
 (33, ('Heisenberg', 'Engineering')),
 (34, ('Robinson', 'Clerical')),
 (34, ('Smith', 'Clerical'))]
Code
employees.rightOuterJoin(departments).sortByKey().collect()
[(31, ('Rafferty', 'Sales')),
 (33, ('Jones', 'Engineering')),
 (33, ('Heisenberg', 'Engineering')),
 (34, ('Robinson', 'Clerical')),
 (34, ('Smith', 'Clerical')),
 (35, (None, 'Marketing'))]
Code
employees.leftOuterJoin(departments).collect()
[(None, ('Williams', None)),
 (31, ('Rafferty', 'Sales')),
 (33, ('Jones', 'Engineering')),
 (33, ('Heisenberg', 'Engineering')),
 (34, ('Robinson', 'Clerical')),
 (34, ('Smith', 'Clerical'))]

Actions

Code
employees.countByKey()
defaultdict(int, {31: 1, 33: 2, 34: 2, None: 1})
Code
employees.lookup(33)
['Jones', 'Heisenberg']
Code
employees.lookup(None)
['Williams']
Code
employees.collectAsMap()
{31: 'Rafferty', 33: 'Heisenberg', 34: 'Smith', None: 'Williams'}
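
Note that collectAsMap builds a plain Python dictionary, so when a key appears several times only one value per key is kept (here Heisenberg and Smith), unlike lookup, which returns all values for the requested key.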

References

Spark Core reference