Introduction to Spark RDD

Code
import numpy as np
Code
import os
import sys
import inspect

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
Code
from pyspark import SparkConf, SparkContext

# Only one SparkContext may be active per process. Re-running this cell with
# SparkContext(conf=conf) raises
# "ValueError: Cannot run multiple SparkContexts at once", so reuse the
# running context when there is one and create it otherwise.
conf = SparkConf().setAppName("Spark RDD Course")
sc = SparkContext.getOrCreate(conf=conf)
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[75], line 4
      1 from pyspark import SparkConf, SparkContext
      3 conf = SparkConf().setAppName("Spark RDD Course")
----> 4 sc = SparkContext(conf=conf)

File ~/Documents/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/core/context.py:206, in SparkContext.__init__(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, gateway, jsc, profiler_cls, udf_profiler_cls, memory_profiler_cls)
    200 if gateway is not None and gateway.gateway_parameters.auth_token is None:
    201     raise ValueError(
    202         "You are trying to pass an insecure Py4j gateway to Spark. This"
    203         " is not allowed as it is a security risk."
    204     )
--> 206 SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
    207 try:
    208     self._do_init(
    209         master,
    210         appName,
   (...)
    220         memory_profiler_cls,
    221     )

File ~/Documents/IFEBY310/.venv/lib/python3.12/site-packages/pyspark/core/context.py:476, in SparkContext._ensure_initialized(cls, instance, gateway, conf)
    473     callsite = SparkContext._active_spark_context._callsite
    475     # Raise error if there is already a running Spark context
--> 476     raise ValueError(
    477         "Cannot run multiple SparkContexts at once; "
    478         "existing SparkContext(app=%s, master=%s)"
    479         " created by %s at %s:%s "
    480         % (
    481             currentAppName,
    482             currentMaster,
    483             callsite.function,
    484             callsite.file,
    485             callsite.linenum,
    486         )
    487     )
    488 else:
    489     SparkContext._active_spark_context = instance

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=Spark RDD Course, master=local[*]) created by __init__ at /tmp/ipykernel_120359/4031476840.py:4 
Code
rdd = sc.parallelize(range(64))
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[76], line 1
----> 1 rdd = sc.parallelize(range(64))

NameError: name 'sc' is not defined

Note that parallelize takes an optional argument to choose the number of partitions

Code
rdd.getNumPartitions()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[77], line 1
----> 1 rdd.getNumPartitions()

NameError: name 'rdd' is not defined
Code
rdd = sc.parallelize(range(1000), 10)
rdd.getNumPartitions()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[78], line 1
----> 1 rdd = sc.parallelize(range(1000), 10)
      2 rdd.getNumPartitions()

NameError: name 'sc' is not defined

Transformations

map

Code
rdd = sc.parallelize([2, 3, 4])
rdd = rdd.map(lambda x: list(range(1, x)))
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[79], line 1
----> 1 rdd = sc.parallelize([2, 3, 4])
      2 rdd = rdd.map(lambda x: list(range(1, x)))

NameError: name 'sc' is not defined
Code
rdd
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[80], line 1
----> 1 rdd

NameError: name 'rdd' is not defined
Code
(
    sc.parallelize([2, 3, 4])
      .map(lambda x: list(range(1, x)))
)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[81], line 2
      1 (
----> 2     sc.parallelize([2, 3, 4])
      3       .map(lambda x: list(range(1, x)))
      4 )

NameError: name 'sc' is not defined

map is a transformation. It is lazily evaluated: execution is delayed until an action is met in the DAG.

Code
rdd.collect()  # collect is an action 
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[82], line 1
----> 1 rdd.collect()  # collect is an action 

NameError: name 'rdd' is not defined
Code
(
    sc.parallelize([2, 3, 4])
      .map(lambda x: list(range(1, x)))
      .collect()
)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[83], line 2
      1 (
----> 2     sc.parallelize([2, 3, 4])
      3       .map(lambda x: list(range(1, x)))
      4       .collect()
      5 )

NameError: name 'sc' is not defined

Exercise: map with a method

Warning: this example is bad practice! Don’t do this at home.

Code
dbtel = {'arthur': 1234, 'riad': 4567, 'anatole': 3615}
Code
class TelephoneDB:
    """Toy phone directory whose bound method is passed to ``map``."""

    def __init__(self):
        # Maps a name to its phone number.
        self.tel = dict(arthur=1234, riad=4567, anatole=3615)

    def add_tel(self, name):
        """Return ``(name, number)``; ``number`` is ``None`` for an unknown name."""
        number = self.tel.get(name)
        return (name, number)
Code
tel_db = TelephoneDB()
names = ['arthur', 'riad']
Code
rdd = (
    sc
        .parallelize(names)
        .map(tel_db.add_tel)
        .collect()
)

rdd
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[87], line 2
      1 rdd = (
----> 2     sc
      3         .parallelize(names)
      4         .map(tel_db.add_tel)
      5         .collect()
      6 )
      8 rdd

NameError: name 'sc' is not defined
  • Replace the tel dictionary by a defaultdict with default number 999
  • Use it on an RDD containing names as above, including an unknown one, and try it
Code
from collections import defaultdict

class TelephoneDefaultDB:
    """Phone directory backed by a ``defaultdict``: unknown names map to 999."""

    def __init__(self):
        known_numbers = {'arthur': 1234, 'riad': 4567, 'anatole': 3615}
        # Missing names fall back to the default number 999.
        self.tel = defaultdict(lambda: 999)
        self.tel.update(known_numbers)

    def add_tel(self, name):
        """Return ``(name, number)``, using 999 when ``name`` is unknown.

        Indexing the ``defaultdict`` also stores the default under ``name``.
        """
        number = self.tel[name]
        return (name, number)

    def add_tel_rdd(self, rdd):
        """Return the result of ``rdd.map`` applied with :meth:`add_tel`."""
        with_numbers = rdd.map(self.add_tel)
        return with_numbers
Code
tel_db = TelephoneDefaultDB()
names = ['riad', 'anatole', 'yiyang']
rdd = (
    sc
        .parallelize(names)
        .map(tel_db.add_tel)
        .collect()
)
rdd
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[89], line 4
      1 tel_db = TelephoneDefaultDB()
      2 names = ['riad', 'anatole', 'yiyang']
      3 rdd = (
----> 4     sc
      5         .parallelize(names)
      6         .map(tel_db.add_tel)
      7         .collect()
      8 )
      9 rdd

NameError: name 'sc' is not defined
Caution

It is a bad idea to pass methods to Spark’s map. Since add_tel needs self, the whole object is serialized so that Spark can ship it to the workers.

This breaks if the tel dictionary is large, or if the object is not serializable.

flatMap

Code
rdd = sc.parallelize([2, 3, 4, 5])
( 
    rdd
        .flatMap(lambda x: range(1, x))
        .collect()
)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[90], line 1
----> 1 rdd = sc.parallelize([2, 3, 4, 5])
      2 ( 
      3     rdd
      4         .flatMap(lambda x: range(1, x))
      5         .collect()
      6 )

NameError: name 'sc' is not defined

filter

Code
rdd = sc.parallelize(range(10))

rdd\
    .filter(lambda x: x % 2 == 0)\
    .collect()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[91], line 1
----> 1 rdd = sc.parallelize(range(10))
      3 rdd\
      4     .filter(lambda x: x % 2 == 0)\
      5     .collect()

NameError: name 'sc' is not defined

distinct

Code
rdd = sc.parallelize([1, 1, 4, 2, 1, 3, 3])
rdd.distinct().collect()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[92], line 1
----> 1 rdd = sc.parallelize([1, 1, 4, 2, 1, 3, 3])
      2 rdd.distinct().collect()

NameError: name 'sc' is not defined

“Pseudo-set” operations

Code
rdd1 = sc.parallelize(range(5))
rdd2 = sc.parallelize(range(3, 9))
rdd3 = rdd1.union(rdd2)
rdd3.collect()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[93], line 1
----> 1 rdd1 = sc.parallelize(range(5))
      2 rdd2 = sc.parallelize(range(3, 9))
      3 rdd3 = rdd1.union(rdd2)

NameError: name 'sc' is not defined
Code
rdd3.distinct().collect()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[94], line 1
----> 1 rdd3.distinct().collect()

NameError: name 'rdd3' is not defined
Code
rdd1 = sc.parallelize([1, 2])
rdd2 = sc.parallelize(["a", "b"])
rdd1.cartesian(rdd2).collect()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[95], line 1
----> 1 rdd1 = sc.parallelize([1, 2])
      2 rdd2 = sc.parallelize(["a", "b"])
      3 rdd1.cartesian(rdd2).collect()

NameError: name 'sc' is not defined

Actions

collect is obviously an action…

count, countByValue

Code
rdd = sc.parallelize([1, 3, 1, 2, 2, 2])
rdd.count()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[96], line 1
----> 1 rdd = sc.parallelize([1, 3, 1, 2, 2, 2])
      2 rdd.count()

NameError: name 'sc' is not defined
Code
rdd.countByValue()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[97], line 1
----> 1 rdd.countByValue()

NameError: name 'rdd' is not defined

Why does countByValue() return a dictionary?

Are count() and countByValue() actions or transformations?

Code
# Draw 100000 integers uniformly at random from {0, ..., 99999}
# (np.random.sample is uniform on [0, 1) and the int32 cast truncates),
# then look at how often each value occurs.
u = np.int32((np.random.sample(100000) * 100000))  # 100000 random integers uniformly distributed on 0, ..., 99999

# countByValue is an action: the value -> count mapping comes back to the driver.
p = (
    sc.parallelize(u)
    .countByValue()
)

# (value, count) pairs, most frequent value first.
q = sorted(
    p.items(), 
    key = lambda x : x[1], 
    reverse=True
)

q[0:10]

# Most frequent (value, count) pair, the quantity 1 + log(n) / log(log(n))
# with n = len(u), and the number of distinct values observed.
q[0], 1 + np.log(len(u))/ np.log(np.log(len(u))), len(q)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[98], line 4
      1 u = np.int32((np.random.sample(100000) * 100000))  # 100000 random integers uniformly distributed on 0, ..., 100000
      3 p = (
----> 4     sc.parallelize(u)
      5     .countByValue()
      6 )
      8 q = sorted(
      9     p.items(), 
     10     key = lambda x : x[1], 
     11     reverse=True
     12 )
     14 q[0:10]

NameError: name 'sc' is not defined
  • How many distinct values do you expect in u?
  • How large is the largest count in \(q\)?
Code
from scipy.stats import poisson 

( 
    len(q), 
    (1-np.exp(-1)) * len(u),
    poisson.ppf(1.-1./len(u), 1)
)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[99], line 4
      1 from scipy.stats import poisson 
      3 ( 
----> 4     len(q), 
      5     (1-np.exp(-1)) * len(u),
      6     poisson.ppf(1.-1./len(u), 1)
      7 )

NameError: name 'q' is not defined

take, takeOrdered

Code
rdd = sc.parallelize([(3, 'a'), (1, 'b'), (2, 'd')])
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[100], line 1
----> 1 rdd = sc.parallelize([(3, 'a'), (1, 'b'), (2, 'd')])

NameError: name 'sc' is not defined
Code
(1, 'b') <=  (2, 'd') <= (3, 'a')
True
Code
rdd.takeOrdered(2)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[102], line 1
----> 1 rdd.takeOrdered(2)

NameError: name 'rdd' is not defined
Code
rdd.takeOrdered(2, key=lambda x: x[1])
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[103], line 1
----> 1 rdd.takeOrdered(2, key=lambda x: x[1])

NameError: name 'rdd' is not defined

reduce, fold

Code
rdd = sc.range(1, 4)
rdd.reduce(lambda a, b: a + b)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[104], line 1
----> 1 rdd = sc.range(1, 4)
      2 rdd.reduce(lambda a, b: a + b)

NameError: name 'sc' is not defined
Code
rdd = sc.range(1, 4, numSlices=7)
rdd.reduce(lambda a, b: a + b)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[105], line 1
----> 1 rdd = sc.range(1, 4, numSlices=7)
      2 rdd.reduce(lambda a, b: a + b)

NameError: name 'sc' is not defined
Code
rdd = sc.parallelize(range(1,4), 3)
rdd.reduce(lambda a, b: a + b)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[106], line 1
----> 1 rdd = sc.parallelize(range(1,4), 3)
      2 rdd.reduce(lambda a, b: a + b)

NameError: name 'sc' is not defined
Code
( 
    sc.parallelize(range(1, 4), 2)
      .fold(0, lambda a, b: a + b)
)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[107], line 2
      1 ( 
----> 2     sc.parallelize(range(1, 4), 2)
      3       .fold(0, lambda a, b: a + b)
      4 )

NameError: name 'sc' is not defined
Code
( 
    sc.parallelize(range(1, 4), 1)
      .fold(3, lambda a, b: a + b)
),( 
    sc.parallelize(range(1, 4), 2)
      .fold(2, lambda a, b: a + b)
)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[108], line 2
      1 ( 
----> 2     sc.parallelize(range(1, 4), 1)
      3       .fold(3, lambda a, b: a + b)
      4 ),( 
      5     sc.parallelize(range(1, 4), 2)
      6       .fold(2, lambda a, b: a + b)
      7 )

NameError: name 'sc' is not defined
Code
rdd =  sc.parallelize(range(1, 4),3)
( 
    rdd.fold(1, lambda a, b: a + b), 
    rdd.getNumPartitions()
)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[109], line 1
----> 1 rdd =  sc.parallelize(range(1, 4),3)
      2 ( 
      3     rdd.fold(1, lambda a, b: a + b), 
      4     rdd.getNumPartitions()
      5 )

NameError: name 'sc' is not defined
Code
rdd =  sc.parallelize(range(1, 4),4)

(
    rdd.fold(1, lambda a, b: a + b), 
    rdd.getNumPartitions()
)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[110], line 1
----> 1 rdd =  sc.parallelize(range(1, 4),4)
      3 (
      4     rdd.fold(1, lambda a, b: a + b), 
      5     rdd.getNumPartitions()
      6 )

NameError: name 'sc' is not defined
Code
rdd = sc.parallelize([1, 2, 4], 2)
rdd.fold(2, lambda a, b: a + b)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[111], line 1
----> 1 rdd = sc.parallelize([1, 2, 4], 2)
      2 rdd.fold(2, lambda a, b: a + b)

NameError: name 'sc' is not defined
Code
rdd = sc.parallelize([1, 2, 4], 3)
rdd.fold(2, lambda a, b: a + b)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[112], line 1
----> 1 rdd = sc.parallelize([1, 2, 4], 3)
      2 rdd.fold(2, lambda a, b: a + b)

NameError: name 'sc' is not defined
Code
rdd.getNumPartitions()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[113], line 1
----> 1 rdd.getNumPartitions()

NameError: name 'rdd' is not defined

aggregate

Code
def seqOp(x, y):
    """Fold the element ``y`` into the running ``(sum, count)`` pair ``x``."""
    return (x[0] + y, x[1] + 1)


def combOp(x, y):
    """Merge two partial ``(sum, count)`` pairs component-wise."""
    return (x[0] + y[0], x[1] + y[1])

rdd = sc.parallelize([1, 2, 3, 4], 8)
(
    rdd.aggregate((0, 0), seqOp, combOp), rdd.getNumPartitions()
)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[114], line 4
      1 seqOp = lambda x, y: (x[0] + y, x[1] + 1)
      2 combOp = lambda x, y: (x[0] + y[0], x[1] + y[1])
----> 4 rdd = sc.parallelize([1, 2, 3, 4], 8)
      5 (
      6     rdd.aggregate((0, 0), seqOp, combOp), rdd.getNumPartitions()
      7 )

NameError: name 'sc' is not defined
Code
def op(x, y):
    """Return ``x + y``; used both within and across partitions."""
    return x + y
rdd = sc.parallelize([1, 2, 3, 4], 4)
(
    rdd.aggregate(0, op, op),
    rdd.getNumPartitions()
)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[115], line 2
      1 op = lambda x, y: x+y
----> 2 rdd = sc.parallelize([1, 2, 3, 4], 4)
      3 (
      4     rdd.aggregate(0, op, op),
      5     rdd.getNumPartitions()
      6 )

NameError: name 'sc' is not defined

Exercise: sum of powers with aggregate

  • Using aggregate, compute the sum, the sum of squares \(x^2\) and the sum of cubes \(x^3\) for \(x \in \{1, \ldots, 10 \}\).
  • Check your computations using numpy
Code
def seqOp(x, y):
    """Fold ``y`` into the running triple ``x`` of (sum, sum of squares, sum of cubes)."""
    return (x[0] + y, x[1] + y ** 2, x[2] + y ** 3)
Code
def combOp(x, y):
    """Merge two partial triples component-wise."""
    return (x[0] + y[0], x[1] + y[1], x[2] + y[2])
Code
sc.range(5)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[118], line 1
----> 1 sc.range(5)

NameError: name 'sc' is not defined
Code
( 
    sc
        .range(1, 11)
        .aggregate((0, 0, 0), seqOp, combOp)
)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[119], line 2
      1 ( 
----> 2     sc
      3         .range(1, 11)
      4         .aggregate((0, 0, 0), seqOp, combOp)
      5 )

NameError: name 'sc' is not defined
Code
import numpy as np

x = np.arange(1, 11)
x
array([ 1,  2,  3,  4,  5,  6,  7,  8,  9, 10])
Code
x.sum(), (x**2).sum(), (x**3).sum(), x.cumsum()
(np.int64(55),
 np.int64(385),
 np.int64(3025),
 array([ 1,  3,  6, 10, 15, 21, 28, 36, 45, 55]))

Computing an empirical variance with aggregate

Assume a sample is stored as an RDD. Using aggregate, compute the sample variance \(\frac{1}{n}\sum_{i=1}^n (x_i - \overline{X}_n)^2\) where \(\overline{X}_n = \frac{1}{n} \sum_{i=1}^n x_i\).

PairRDD

Code
rdd = sc.parallelize([[1, "a", 7], [2, "b", 13], [2, "c", 17]])

rdd.collect()  # not yet 
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[122], line 1
----> 1 rdd = sc.parallelize([[1, "a", 7], [2, "b", 13], [2, "c", 17]])
      3 rdd.collect()  # not yet 

NameError: name 'sc' is not defined
Code
rdd = rdd.map(lambda x: (x[0], x[1:]))

rdd.collect()  # done 
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[123], line 1
----> 1 rdd = rdd.map(lambda x: (x[0], x[1:]))
      3 rdd.collect()  # done 

NameError: name 'rdd' is not defined

Transformations

keys, values

Code
rdd.keys().collect()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[124], line 1
----> 1 rdd.keys().collect()

NameError: name 'rdd' is not defined
Code
rdd.values().collect()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[125], line 1
----> 1 rdd.values().collect()

NameError: name 'rdd' is not defined
Warning

All elements must be tuples with two elements (key and value)

Code
rdd = sc.parallelize([[1, "a", 7], [2, "b", 13], [2, "c", 17]])
rdd.keys().collect()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[126], line 1
----> 1 rdd = sc.parallelize([[1, "a", 7], [2, "b", 13], [2, "c", 17]])
      2 rdd.keys().collect()

NameError: name 'sc' is not defined
Code
rdd.values().collect()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[127], line 1
----> 1 rdd.values().collect()

NameError: name 'rdd' is not defined

The values are not what we expected… so we must first turn each element into a (key, value) pair:

Code
rdd = ( sc.parallelize([[1, "a", 7], [2, "b", 13], [2, "c", 17]])
          .map(lambda x: (x[0], x[1:]))
      )
rdd.keys().collect()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[128], line 1
----> 1 rdd = ( sc.parallelize([[1, "a", 7], [2, "b", 13], [2, "c", 17]])
      2           .map(lambda x: (x[0], x[1:]))
      3       )
      4 rdd.keys().collect()

NameError: name 'sc' is not defined
Code
rdd.values().collect()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[129], line 1
----> 1 rdd.values().collect()

NameError: name 'rdd' is not defined

Now, the values are correct.

mapValues, flatMapValues

Code
rdd = sc.parallelize([("a", "x y z"), ("b", "p r")])

rdd.mapValues(lambda v: v.split(' ')).collect(), rdd.collect()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[130], line 1
----> 1 rdd = sc.parallelize([("a", "x y z"), ("b", "p r")])
      3 rdd.mapValues(lambda v: v.split(' ')).collect(), rdd.collect()

NameError: name 'sc' is not defined
Code
rdd.flatMapValues(lambda v: v.split(' ')).collect()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[131], line 1
----> 1 rdd.flatMapValues(lambda v: v.split(' ')).collect()

NameError: name 'rdd' is not defined

groupByKey

Code
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("b", 3), ("c", 42)])
( 
    rdd.groupByKey()
       .mapValues(list)
       .collect()
)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[132], line 1
----> 1 rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("b", 3), ("c", 42)])
      2 ( 
      3     rdd.groupByKey()
      4        .mapValues(list)
      5        .collect()
      6 )

NameError: name 'sc' is not defined
Code
rdd.groupByKey().collect()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[133], line 1
----> 1 rdd.groupByKey().collect()

NameError: name 'rdd' is not defined

reduceByKey

Code
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rdd.reduceByKey(lambda a, b: a + b).collect()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[134], line 1
----> 1 rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
      2 rdd.reduceByKey(lambda a, b: a + b).collect()

NameError: name 'sc' is not defined

combineByKey

Code
rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 13)])

def add(a, b):
    """Append the string form of ``b`` to the string ``a``."""
    suffix = str(b)
    return a + suffix

rdd.combineByKey(str, add, add).collect()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[135], line 1
----> 1 rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 13)])
      3 def add(a, b): 
      4     return a + str(b)

NameError: name 'sc' is not defined

join, rightOuterJoin, leftOuterJoin

Code
employees = sc.parallelize([
    (31, "Rafferty"),
    (33, "Jones"),
    (33, "Heisenberg"),
    (34, "Robinson"),
    (34, "Smith"),
    (None, "Williams")
])
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[136], line 1
----> 1 employees = sc.parallelize([
      2     (31, "Rafferty"),
      3     (33, "Jones"),
      4     (33, "Heisenberg"),
      5     (34, "Robinson"),
      6     (34, "Smith"),
      7     (None, "Williams")
      8 ])

NameError: name 'sc' is not defined
Code
departments = sc.parallelize([
    (31, "Sales"),
    (33, "Engineering"),
    (34, "Clerical"),
    (35, "Marketing")
])
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[137], line 1
----> 1 departments = sc.parallelize([
      2     (31, "Sales"),
      3     (33, "Engineering"),
      4     (34, "Clerical"),
      5     (35, "Marketing")
      6 ])

NameError: name 'sc' is not defined
Code
( 
    employees
        .join(departments)
        .sortByKey()
        .collect()
)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[138], line 2
      1 ( 
----> 2     employees
      3         .join(departments)
      4         .sortByKey()
      5         .collect()
      6 )

NameError: name 'employees' is not defined
Code
( 
    employees
        .rightOuterJoin(departments)
        .sortByKey()
        .collect()
)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[139], line 2
      1 ( 
----> 2     employees
      3         .rightOuterJoin(departments)
      4         .sortByKey()
      5         .collect()
      6 )

NameError: name 'employees' is not defined
Code
(
    employees
        .leftOuterJoin(departments)
        .collect()
)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[140], line 2
      1 (
----> 2     employees
      3         .leftOuterJoin(departments)
      4         .collect()
      5 )

NameError: name 'employees' is not defined

Actions

Code
employees.countByKey()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[141], line 1
----> 1 employees.countByKey()

NameError: name 'employees' is not defined
Code
employees.lookup(33)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[142], line 1
----> 1 employees.lookup(33)

NameError: name 'employees' is not defined
Code
employees.lookup(None)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[143], line 1
----> 1 employees.lookup(None)

NameError: name 'employees' is not defined
Code
employees.collectAsMap()
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[144], line 1
----> 1 employees.collectAsMap()

NameError: name 'employees' is not defined

References

Spark Core reference