Package pyspark :: Module rdd
[frames | no frames]

Source Code for Module pyspark.rdd

  1  # 
  2  # Licensed to the Apache Software Foundation (ASF) under one or more 
  3  # contributor license agreements.  See the NOTICE file distributed with 
  4  # this work for additional information regarding copyright ownership. 
  5  # The ASF licenses this file to You under the Apache License, Version 2.0 
  6  # (the "License"); you may not use this file except in compliance with 
  7  # the License.  You may obtain a copy of the License at 
  8  # 
  9  #    http://www.apache.org/licenses/LICENSE-2.0 
 10  # 
 11  # Unless required by applicable law or agreed to in writing, software 
 12  # distributed under the License is distributed on an "AS IS" BASIS, 
 13  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 14  # See the License for the specific language governing permissions and 
 15  # limitations under the License. 
 16  # 
 17   
 18  from base64 import standard_b64encode as b64enc 
 19  import copy 
 20  from collections import defaultdict 
 21  from itertools import chain, ifilter, imap, product 
 22  import operator 
 23  import os 
 24  import sys 
 25  import shlex 
 26  from subprocess import Popen, PIPE 
 27  from tempfile import NamedTemporaryFile 
 28  from threading import Thread 
 29   
 30  from pyspark import cloudpickle 
 31  from pyspark.serializers import batched, Batch, dump_pickle, load_pickle, \ 
 32      read_from_pickle_file 
 33  from pyspark.join import python_join, python_left_outer_join, \ 
 34      python_right_outer_join, python_cogroup 
 35  from pyspark.statcounter import StatCounter 
 36  from pyspark.rddsampler import RDDSampler 
 37   
 38  from py4j.java_collections import ListConverter, MapConverter 
 39   
 40   
 41  __all__ = ["RDD"] 
42 43 44 -class RDD(object):
45 """ 46 A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. 47 Represents an immutable, partitioned collection of elements that can be 48 operated on in parallel. 49 """ 50
51 - def __init__(self, jrdd, ctx):
52 self._jrdd = jrdd 53 self.is_cached = False 54 self.is_checkpointed = False 55 self.ctx = ctx 56 self._partitionFunc = None
@property
def context(self):
    """
    The L{SparkContext} that this RDD was created on.

    This is a read-only view of the C{ctx} attribute.
    """
    return self.ctx
64
def cache(self):
    """
    Persist this RDD with the default storage level (C{MEMORY_ONLY}).

    Sets the Python-side C{is_cached} flag, forwards the call to the
    wrapped Java RDD and returns C{self} so that calls can be chained.
    """
    self.is_cached = True
    self._jrdd.cache()
    return self
72
def persist(self, storageLevel):
    """
    Set this RDD's storage level to persist its values across operations after the first time
    it is computed. This can only be used to assign a new storage level if the RDD does not
    have a storage level set yet.

    C{storageLevel} is translated with C{self.ctx._getJavaStorageLevel}
    before being handed to the wrapped Java RDD.  Returns C{self} so that
    calls can be chained.
    """
    self.is_cached = True
    javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
    self._jrdd.persist(javaStorageLevel)
    return self
83
def unpersist(self):
    """
    Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.

    Clears the Python-side C{is_cached} flag, forwards the call to the
    wrapped Java RDD and returns C{self} so that calls can be chained.
    """
    self.is_cached = False
    self._jrdd.unpersist()
    return self
91
def checkpoint(self):
    """
    Mark this RDD for checkpointing. It will be saved to a file inside the
    checkpoint directory set with L{SparkContext.setCheckpointDir()} and
    all references to its parent RDDs will be removed. This function must
    be called before any job has been executed on this RDD. It is strongly
    recommended that this RDD is persisted in memory, otherwise saving it
    on a file will require recomputation.

    Sets the Python-side C{is_checkpointed} flag and calls C{checkpoint()}
    on the object returned by C{self._jrdd.rdd()}.  Unlike L{cache}, this
    method does not return C{self}.
    """
    self.is_checkpointed = True
    self._jrdd.rdd().checkpoint()
103
def isCheckpointed(self):
    """
    Return whether this RDD has been checkpointed or not.

    The answer is obtained from the object returned by
    C{self._jrdd.rdd()}, not from the Python-side C{is_checkpointed} flag.
    """
    return self._jrdd.rdd().isCheckpointed()
109
def getCheckpointFile(self):
    """
    Gets the name of the file to which this RDD was checkpointed.

    Returns C{None} when the option object handed back by the Java side
    reports that no value is defined.
    """
    maybe_path = self._jrdd.rdd().getCheckpointFile()
    if not maybe_path.isDefined():
        return None
    return maybe_path.get()
119 120 # TODO persist(self, storageLevel) 121
def map(self, f, preservesPartitioning=False):
    """
    Return a new RDD by applying a function to each element of this RDD.

    C{f} is applied lazily to every element of each partition.
    C{preservesPartitioning} is passed straight through to the
    L{PipelinedRDD} that is returned.
    """
    # The split index is accepted because PipelinedRDD calls its function
    # with (split, iterator); it is not used here.
    def func(split, iterator): return imap(f, iterator)
    return PipelinedRDD(self, func, preservesPartitioning)
128
def flatMap(self, f, preservesPartitioning=False):
    """
    Return a new RDD by first applying a function to all elements of this
    RDD, and then flattening the results.

    >>> rdd = sc.parallelize([2, 3, 4])
    >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
    [1, 1, 1, 2, 2, 3]
    >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
    [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
    """
    def flatten_partition(split, elements):
        # Map lazily, then splice the per-element results end to end.
        mapped = imap(f, elements)
        return chain.from_iterable(mapped)
    return self.mapPartitionsWithSplit(flatten_partition, preservesPartitioning)
142
def mapPartitions(self, f, preservesPartitioning=False):
    """
    Return a new RDD by applying a function to each partition of this RDD.

    C{f} receives an iterator over one partition's elements and returns an
    iterable of output elements.  C{preservesPartitioning} is forwarded to
    L{mapPartitionsWithSplit}; previously it was accepted but silently
    dropped, so the result was always treated as not preserving
    partitioning.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
    >>> def f(iterator): yield sum(iterator)
    >>> rdd.mapPartitions(f).collect()
    [3, 7]
    """
    # Adapt the one-argument callback to the (split, iterator) signature.
    def func(s, iterator): return f(iterator)
    return self.mapPartitionsWithSplit(func, preservesPartitioning)
154
def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
    """
    Return a new RDD by applying a function to each partition of this RDD,
    while tracking the index of the original partition.

    C{f} is called as C{f(splitIndex, iterator)}.  Both arguments of this
    method are handed unchanged to L{PipelinedRDD}.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
    >>> def f(splitIndex, iterator): yield splitIndex
    >>> rdd.mapPartitionsWithSplit(f).sum()
    6
    """
    return PipelinedRDD(self, f, preservesPartitioning)
166
def filter(self, f):
    """
    Return a new RDD containing only the elements that satisfy a predicate.

    >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
    >>> rdd.filter(lambda x: x % 2 == 0).collect()
    [2, 4]
    """
    def keep_matching(elements):
        # Lazily drop every element for which the predicate is falsy.
        return ifilter(f, elements)
    return self.mapPartitions(keep_matching)
177
def distinct(self):
    """
    Return a new RDD containing the distinct elements in this RDD.

    >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
    [1, 2, 3]
    """
    # Key every element by itself, collapse duplicate keys, then drop the
    # placeholder value again.
    keyed = self.map(lambda x: (x, None))
    collapsed = keyed.reduceByKey(lambda x, _: x)
    return collapsed.map(lambda pair: pair[0])
188
def sample(self, withReplacement, fraction, seed):
    """
    Return a sampled subset of this RDD (relies on numpy and falls back
    on default random generator if numpy is unavailable).

    >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
    [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
    """
    sampler = RDDSampler(withReplacement, fraction, seed)
    # The sampler's func is run per partition with preservesPartitioning=True.
    return self.mapPartitionsWithSplit(sampler.func, True)
198 199 # this is ported from scala/spark/RDD.scala
def takeSample(self, withReplacement, num, seed):
    """
    Return a fixed-size sampled subset of this RDD (currently requires numpy).

    C{num} is the requested sample size and C{seed} seeds the sampler.
    Without replacement, at most as many elements as the RDD holds are
    returned.  An empty RDD or C{num == 0} yields an empty list.

    Raises C{ValueError} if C{num} is negative.

    >>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP
    [4, 2, 1, 8, 2, 7, 0, 4, 1, 4]
    """
    # Validate before counting so that bad input does not launch a job.
    if num < 0:
        raise ValueError("Sample size cannot be negative.")

    initialCount = self.count()
    # Nothing to sample; this also avoids dividing by zero below.
    if initialCount == 0 or num == 0:
        return []

    multiplier = 3.0
    maxSelected = min(initialCount, sys.maxint - 1)

    if num > initialCount and not withReplacement:
        total = maxSelected
        fraction = multiplier * (maxSelected + 1) / initialCount
    else:
        total = num
        fraction = multiplier * (num + 1) / initialCount

    samples = self.sample(withReplacement, fraction, seed).collect()

    # If the first sample didn't turn out large enough, keep trying to take samples;
    # this shouldn't happen often because we use a big multiplier for their initial size.
    # See: scala/spark/RDD.scala
    while len(samples) < total:
        if seed > sys.maxint - 2:
            seed = -1
        seed += 1
        samples = self.sample(withReplacement, fraction, seed).collect()

    # Shuffle so that truncating to `total` does not favour early partitions.
    sampler = RDDSampler(withReplacement, fraction, seed + 1)
    sampler.shuffle(samples)
    return samples[0:total]
243
def union(self, other):
    """
    Return the union of this RDD and another one.

    The result wraps the Java-side union and shares this RDD's context.

    >>> rdd = sc.parallelize([1, 1, 2, 3])
    >>> rdd.union(rdd).collect()
    [1, 1, 2, 3, 1, 1, 2, 3]
    """
    merged_jrdd = self._jrdd.union(other._jrdd)
    return RDD(merged_jrdd, self.ctx)
253
def __add__(self, other):
    """
    Return the union of this RDD and another one.

    Raises C{TypeError} when C{other} is not an L{RDD}.

    >>> rdd = sc.parallelize([1, 1, 2, 3])
    >>> (rdd + rdd).collect()
    [1, 1, 2, 3, 1, 1, 2, 3]
    """
    if isinstance(other, RDD):
        return self.union(other)
    raise TypeError
265 266 # TODO: sort 267
def glom(self):
    """
    Return an RDD created by coalescing all elements within each partition
    into a list.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
    >>> sorted(rdd.glom().collect())
    [[1, 2], [3, 4]]
    """
    def gather(elements):
        # One output element per partition: the partition's full contents.
        whole_partition = list(elements)
        yield whole_partition
    return self.mapPartitions(gather)
279
def cartesian(self, other):
    """
    Return the Cartesian product of this RDD and another one, that is, the
    RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
    C{b} is in C{other}.

    >>> rdd = sc.parallelize([1, 2])
    >>> sorted(rdd.cartesian(rdd).collect())
    [(1, 1), (1, 2), (2, 1), (2, 2)]
    """
    def as_items(obj):
        # A Batch carries several elements; anything else stands for itself.
        return obj.items if type(obj) == Batch else [obj]

    def unpack_batches(pair):
        left, right = pair
        if type(left) != Batch and type(right) != Batch:
            yield pair
            return
        for combo in product(as_items(left), as_items(right)):
            yield combo

    # Due to batching, the Java cartesian result cannot be used as-is:
    # its pairs may hold Batch objects, which are expanded afterwards.
    java_cartesian = RDD(self._jrdd.cartesian(other._jrdd), self.ctx)
    return java_cartesian.flatMap(unpack_batches)
302
def groupBy(self, f, numPartitions=None):
    """
    Return an RDD of grouped items.

    Each element C{x} is keyed by C{f(x)} and the keyed pairs are grouped
    with L{groupByKey}.

    >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
    >>> result = rdd.groupBy(lambda x: x % 2).collect()
    >>> sorted([(x, sorted(y)) for (x, y) in result])
    [(0, [2, 8]), (1, [1, 1, 3, 5])]
    """
    keyed = self.map(lambda x: (f(x), x))
    return keyed.groupByKey(numPartitions)
313
def pipe(self, command, env=None):
    """
    Return an RDD created by piping elements to a forked external process.

    C{command} is split with C{shlex.split} and started once per
    partition.  Each element is written to the process's stdin as
    C{str(element)} followed by a newline; each line of its stdout,
    stripped of the trailing newline, becomes an output element.

    C{env} is the environment mapping passed to the child process.  When
    omitted, an empty mapping is used, as before.

    >>> sc.parallelize([1, 2, 3]).pipe('cat').collect()
    ['1', '2', '3']
    """
    # Avoid a mutable default argument shared between calls.
    if env is None:
        env = {}

    def func(iterator):
        pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)

        def pipe_objs(out):
            try:
                for obj in iterator:
                    out.write(str(obj).rstrip('\n') + '\n')
            finally:
                # Always signal EOF, otherwise the child (and the reader
                # below) can block forever if writing fails part-way.
                out.close()

        # Feed stdin from a separate thread so that reading stdout here
        # cannot deadlock against a full pipe buffer.
        Thread(target=pipe_objs, args=[pipe.stdin]).start()
        return (x.rstrip('\n') for x in pipe.stdout)
    return self.mapPartitions(func)
def foreach(self, f):
    """
    Applies a function to all elements of this RDD.

    The function is run for its side effects only; nothing is returned.

    >>> def f(x): print x
    >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
    """
    def run_on_partition(elements):
        for element in elements:
            f(element)
        # Emit a single placeholder so that each partition has output.
        yield None

    applied = self.mapPartitions(run_on_partition)
    applied.collect()  # Force evaluation
def collect(self):
    """
    Return a list that contains all of the elements in this RDD.
    """
    java_pickles = self._jrdd.collect().iterator()
    # The pickled items travel through a temporary file rather than Py4J.
    unpickled = self._collect_iterator_through_file(java_pickles)
    return list(unpickled)
350
def _collect_iterator_through_file(self, iterator):
    """
    Yield the deserialized items of a Java-side iterator of pickles.

    The data is written to a temporary file inside C{self.ctx._temp_dir}
    and read back with C{read_from_pickle_file}.  The file is removed
    when the generator finishes, is closed early, or fails.
    """
    # Transferring lots of data through Py4J can be slow because
    # socket.readline() is inefficient.  Instead, we'll dump the data to a
    # file and read it back.
    tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir)
    tempFile.close()
    try:
        self.ctx._writeIteratorToPickleFile(iterator, tempFile.name)
        # Read the data into Python and deserialize it:
        with open(tempFile.name, 'rb') as pickleFile:
            for item in read_from_pickle_file(pickleFile):
                yield item
    finally:
        # Runs on exhaustion, on generator close and on error, so the
        # temporary file is not left behind.
        os.unlink(tempFile.name)
363
def reduce(self, f):
    """
    Reduces the elements of this RDD using the specified commutative and
    associative binary operator.

    Each non-empty partition is reduced left to right as
    C{f(accumulated, element)} and the per-partition results are then
    reduced in the same way.  Elements or intermediate results that are
    C{None} are treated like any other value.

    Raises C{TypeError} if the RDD has no elements.

    >>> from operator import add
    >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
    15
    >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
    10
    """
    # functools.reduce is the same function as the Python 2 builtin.
    from functools import reduce as _reduce

    def func(iterator):
        iterator = iter(iterator)
        # Seed the fold with the first element rather than a None
        # sentinel, which would be confused with real None values.
        try:
            initial = next(iterator)
        except StopIteration:
            return  # empty partition contributes nothing
        yield _reduce(f, iterator, initial)

    vals = self.mapPartitions(func).collect()
    return _reduce(f, vals)
def fold(self, zeroValue, op):
    """
    Aggregate the elements of each partition, and then the results for all
    the partitions, using a given associative function and a neutral "zero
    value."

    The function C{op(t1, t2)} is allowed to modify C{t1} and return it
    as its result value to avoid object allocation; however, it should not
    modify C{t2}.

    >>> from operator import add
    >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
    15
    """
    def fold_partition(elements):
        running = zeroValue
        for element in elements:
            # Within a partition the element is passed first.
            running = op(element, running)
        yield running

    partials = self.mapPartitions(fold_partition).collect()
    return reduce(op, partials, zeroValue)

# TODO: aggregate
def sum(self):
    """
    Add up the elements in this RDD.

    Each partition is summed with the builtin C{sum} and the partial sums
    are combined with L{reduce} and C{operator.add}.

    >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
    6.0
    """
    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
419
def count(self):
    """
    Return the number of elements in this RDD.

    Each partition is reduced to its element count and the counts are
    added up with L{sum}.

    >>> sc.parallelize([2, 3, 4]).count()
    3
    """
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
428
def stats(self):
    """
    Return a L{StatCounter} object that captures the mean, variance
    and count of the RDD's elements in one operation.
    """
    def merge_counters(left_counter, right_counter):
        return left_counter.mergeStats(right_counter)

    # One StatCounter per partition, merged pairwise into a single result.
    per_partition = self.mapPartitions(lambda i: [StatCounter(i)])
    return per_partition.reduce(merge_counters)
def mean(self):
    """
    Compute the mean of this RDD's elements.

    Delegates to the L{StatCounter} returned by L{stats}.

    >>> sc.parallelize([1, 2, 3]).mean()
    2.0
    """
    return self.stats().mean()
447
def variance(self):
    """
    Compute the variance of this RDD's elements.

    Delegates to the L{StatCounter} returned by L{stats}.

    >>> sc.parallelize([1, 2, 3]).variance()
    0.666...
    """
    return self.stats().variance()
456
def stdev(self):
    """
    Compute the standard deviation of this RDD's elements.

    Delegates to the L{StatCounter} returned by L{stats}.

    >>> sc.parallelize([1, 2, 3]).stdev()
    0.816...
    """
    return self.stats().stdev()
465
def sampleStdev(self):
    """
    Compute the sample standard deviation of this RDD's elements (which corrects for bias in
    estimating the standard deviation by dividing by N-1 instead of N).

    Delegates to the L{StatCounter} returned by L{stats}.

    >>> sc.parallelize([1, 2, 3]).sampleStdev()
    1.0
    """
    return self.stats().sampleStdev()
475
def sampleVariance(self):
    """
    Compute the sample variance of this RDD's elements (which corrects for bias in
    estimating the variance by dividing by N-1 instead of N).

    Delegates to the L{StatCounter} returned by L{stats}.

    >>> sc.parallelize([1, 2, 3]).sampleVariance()
    1.0
    """
    return self.stats().sampleVariance()
485
def countByValue(self):
    """
    Return the count of each unique value in this RDD as a dictionary of
    (value, count) pairs.

    >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
    [(1, 2), (2, 3)]
    """
    def tally_partition(elements):
        tally = defaultdict(int)
        for element in elements:
            tally[element] += 1
        yield tally

    def merge_tallies(into, extra):
        # Fold the second tally into the first and hand the first back.
        for (value, occurrences) in extra.iteritems():
            into[value] += occurrences
        return into

    per_partition = self.mapPartitions(tally_partition)
    return per_partition.reduce(merge_tallies)
def take(self, num):
    """
    Take the first num elements of the RDD.

    This currently scans the partitions *one by one*, so it will be slow if
    a lot of partitions are required. In that case, use L{collect} to get
    the whole RDD instead.

    Returns a list with at most C{num} elements; fewer if the RDD is
    smaller than C{num}.

    >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
    [2, 3]
    >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
    [2, 3, 4, 5, 6]
    """
    def takeUpToNum(iterator):
        # End the generator explicitly instead of letting StopIteration
        # from next() escape, which newer interpreters treat as an error
        # (PEP 479).  No element beyond the num-th is pulled.
        if num <= 0:
            return
        for taken, item in enumerate(iterator, 1):
            yield item
            if taken >= num:
                return

    # Take only up to num elements from each partition we try
    mapped = self.mapPartitions(takeUpToNum)
    items = []
    for partition in range(mapped._jrdd.splits().size()):
        iterator = self.ctx._takePartition(mapped._jrdd.rdd(), partition)
        items.extend(self._collect_iterator_through_file(iterator))
        if len(items) >= num:
            break
    return items[:num]
def first(self):
    """
    Return the first element in this RDD.

    Implemented as C{self.take(1)[0]}, so an empty result from L{take}
    surfaces as an C{IndexError}.

    >>> sc.parallelize([2, 3, 4]).first()
    2
    """
    return self.take(1)[0]
541
def saveAsTextFile(self, path):
    """
    Save this RDD as a text file, using string representations of elements.

    Every element is written as UTF-8.  Strings are encoded as they are;
    other objects are converted with C{unicode()} first, so C{unicode}
    elements containing non-ASCII characters no longer fail with
    C{UnicodeEncodeError}.

    >>> tempFile = NamedTemporaryFile(delete=True)
    >>> tempFile.close()
    >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
    >>> from fileinput import input
    >>> from glob import glob
    >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
    '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
    """
    def func(split, iterator):
        for x in iterator:
            # str(x) would implicitly ASCII-encode unicode objects.
            if not isinstance(x, basestring):
                x = unicode(x)
            yield x.encode("utf-8")

    keyed = PipelinedRDD(self, func)
    # The function already emits raw bytes, so skip pickling on output.
    keyed._bypass_serializer = True
    keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)

# Pair functions
def collectAsMap(self):
    """
    Return the key-value pairs in this RDD to the master as a dictionary.

    Built with C{dict()} over L{collect}, so when a key occurs more than
    once the last collected pair wins.

    >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
    >>> m[1]
    2
    >>> m[3]
    4
    """
    return dict(self.collect())
573
def reduceByKey(self, func, numPartitions=None):
    """
    Merge the values for each key using an associative reduce function.

    This will also perform the merging locally on each mapper before
    sending results to a reducer, similarly to a "combiner" in MapReduce.

    Output will be hash-partitioned with C{numPartitions} partitions, or
    the default parallelism level if C{numPartitions} is not specified.

    Implemented through L{combineByKey} with the identity function as
    combiner factory and C{func} for both merge steps.

    >>> from operator import add
    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> sorted(rdd.reduceByKey(add).collect())
    [('a', 2), ('b', 1)]
    """
    return self.combineByKey(lambda x: x, func, func, numPartitions)
590
def reduceByKeyLocally(self, func):
    """
    Merge the values for each key using an associative reduce function, but
    return the results immediately to the master as a dictionary.

    This will also perform the merging locally on each mapper before
    sending results to a reducer, similarly to a "combiner" in MapReduce.

    >>> from operator import add
    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> sorted(rdd.reduceByKeyLocally(add).items())
    [('a', 2), ('b', 1)]
    """
    def absorb(target, key, value):
        # The first value for a key is stored as-is; later ones are merged.
        if key in target:
            target[key] = func(target[key], value)
        else:
            target[key] = value

    def reduce_partition(pairs):
        merged = {}
        for (key, value) in pairs:
            absorb(merged, key, value)
        yield merged

    def merge_maps(into, extra):
        for (key, value) in extra.iteritems():
            absorb(into, key, value)
        return into

    per_partition = self.mapPartitions(reduce_partition)
    return per_partition.reduce(merge_maps)
def countByKey(self):
    """
    Count the number of elements for each key, and return the result to the
    master as a dictionary.

    Keeps only the first item of every element and delegates to
    L{countByValue}.

    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> sorted(rdd.countByKey().items())
    [('a', 2), ('b', 1)]
    """
    return self.map(lambda x: x[0]).countByValue()
625
def join(self, other, numPartitions=None):
    """
    Return an RDD containing all pairs of elements with matching keys in
    C{self} and C{other}.

    Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
    (k, v1) is in C{self} and (k, v2) is in C{other}.

    Performs a hash join across the cluster.

    The work is delegated to C{python_join} from C{pyspark.join}.

    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2), ("a", 3)])
    >>> sorted(x.join(y).collect())
    [('a', (1, 2)), ('a', (1, 3))]
    """
    return python_join(self, other, numPartitions)
642
def leftOuterJoin(self, other, numPartitions=None):
    """
    Perform a left outer join of C{self} and C{other}.

    For each element (k, v) in C{self}, the resulting RDD will either
    contain all pairs (k, (v, w)) for w in C{other}, or the pair
    (k, (v, None)) if no elements in other have key k.

    Hash-partitions the resulting RDD into the given number of partitions.

    The work is delegated to C{python_left_outer_join} from C{pyspark.join}.

    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2)])
    >>> sorted(x.leftOuterJoin(y).collect())
    [('a', (1, 2)), ('b', (4, None))]
    """
    return python_left_outer_join(self, other, numPartitions)
659
def rightOuterJoin(self, other, numPartitions=None):
    """
    Perform a right outer join of C{self} and C{other}.

    For each element (k, w) in C{other}, the resulting RDD will either
    contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w))
    if no elements in C{self} have key k.

    Hash-partitions the resulting RDD into the given number of partitions.

    The work is delegated to C{python_right_outer_join} from C{pyspark.join}.

    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2)])
    >>> sorted(y.rightOuterJoin(x).collect())
    [('a', (2, 1)), ('b', (None, 4))]
    """
    return python_right_outer_join(self, other, numPartitions)
676 677 # TODO: add option to control map-side combining
def partitionBy(self, numPartitions, partitionFunc=hash):
    """
    Return a copy of the RDD partitioned using the specified partitioner.

    Elements must be (key, value) pairs.  A pair is assigned to bucket
    C{partitionFunc(key) % numPartitions}.  When C{numPartitions} is
    C{None}, C{self.ctx.defaultParallelism} is used.

    >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
    >>> sets = pairs.partitionBy(2).glom().collect()
    >>> set(sets[0]).intersection(set(sets[1]))
    set([])
    """
    if numPartitions is None:
        numPartitions = self.ctx.defaultParallelism
    # Transferring O(n) objects to Java is too expensive.  Instead, we'll
    # form the hash buckets in Python, transferring O(numPartitions) objects
    # to Java.  Each object is a (splitNumber, [objects]) pair.
    def add_shuffle_key(split, iterator):
        buckets = defaultdict(list)
        for (k, v) in iterator:
            buckets[partitionFunc(k) % numPartitions].append((k, v))
        # NOTE: the loop variable below rebinds the `split` parameter; the
        # incoming split index is not used by this function.
        for (split, items) in buckets.iteritems():
            # Emitted as alternating records: bucket number, then its
            # pickled batch of pairs.
            yield str(split)
            yield dump_pickle(Batch(items))
    keyed = PipelinedRDD(self, add_shuffle_key)
    # The function above already produces serialized output.
    keyed._bypass_serializer = True
    pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
    partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
                                                  id(partitionFunc))
    jrdd = pairRDD.partitionBy(partitioner).values()
    rdd = RDD(jrdd, self.ctx)
    # This is required so that id(partitionFunc) remains unique, even if
    # partitionFunc is a lambda:
    rdd._partitionFunc = partitionFunc
    return rdd

# TODO: add control over map-side aggregation
def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                 numPartitions=None):
    """
    Generic function to combine the elements for each key using a custom
    set of aggregation functions.

    Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
    type" C.  Note that V and C can be different -- for example, one might
    group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).

    Users provide three functions:

        - C{createCombiner}, which turns a V into a C (e.g., creates
          a one-element list)
        - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
          a list)
        - C{mergeCombiners}, to combine two C's into a single one.

    In addition, users can control the partitioning of the output RDD.

    >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> def f(x): return x
    >>> def add(a, b): return a + str(b)
    >>> sorted(x.combineByKey(str, add, add).collect())
    [('a', '11'), ('b', '1')]
    """
    if numPartitions is None:
        numPartitions = self.ctx.defaultParallelism

    def combine_within_partition(pairs):
        # Map-side step: one combiner per key seen in this partition.
        by_key = {}
        for (key, value) in pairs:
            if key in by_key:
                by_key[key] = mergeValue(by_key[key], value)
            else:
                by_key[key] = createCombiner(value)
        return by_key.iteritems()

    def merge_after_shuffle(pairs):
        # Reduce-side step: the values are combiners by now.
        by_key = {}
        for (key, combiner) in pairs:
            if key in by_key:
                by_key[key] = mergeCombiners(by_key[key], combiner)
            else:
                by_key[key] = combiner
        return by_key.iteritems()

    locally_combined = self.mapPartitions(combine_within_partition)
    shuffled = locally_combined.partitionBy(numPartitions)
    return shuffled.mapPartitions(merge_after_shuffle)

# TODO: support variant with custom partitioner
def groupByKey(self, numPartitions=None):
    """
    Group the values for each key in the RDD into a single sequence.
    Hash-partitions the resulting RDD into numPartitions partitions.

    >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> sorted(x.groupByKey().collect())
    [('a', [1, 1]), ('b', [1])]
    """
    def start_group(value):
        return [value]

    def extend_group(group, value):
        # Appends in place and hands the same list back.
        group.append(value)
        return group

    def join_groups(first, second):
        return first + second

    return self.combineByKey(start_group, extend_group, join_groups,
                             numPartitions)

# TODO: add tests
def flatMapValues(self, f):
    """
    Pass each value in the key-value pair RDD through a flatMap function
    without changing the keys; this also retains the original RDD's
    partitioning.
    """
    def expand_pair(pair):
        key, value = pair
        # One output pair per item produced by f, all under the same key.
        return ((key, item) for item in f(value))
    return self.flatMap(expand_pair, preservesPartitioning=True)
793
def mapValues(self, f):
    """
    Pass each value in the key-value pair RDD through a map function
    without changing the keys; this also retains the original RDD's
    partitioning.
    """
    def transform_pair(pair):
        key, value = pair
        return (key, f(value))
    return self.map(transform_pair, preservesPartitioning=True)
802 803 # TODO: support varargs cogroup of several RDDs.
def groupWith(self, other):
    """
    Alias for cogroup.

    Only C{other} is forwarded, so L{cogroup} runs with its default
    C{numPartitions}.
    """
    return self.cogroup(other)
809 810 # TODO: add variant with custom partitioner
def cogroup(self, other, numPartitions=None):
    """
    For each key k in C{self} or C{other}, return a resulting RDD that
    contains a tuple with the list of values for that key in C{self} as well
    as C{other}.

    The work is delegated to C{python_cogroup} from C{pyspark.join}.

    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2)])
    >>> sorted(x.cogroup(y).collect())
    [('a', ([1], [2])), ('b', ([4], []))]
    """
    return python_cogroup(self, other, numPartitions)
823
def subtractByKey(self, other, numPartitions=None):
    """
    Return each (key, value) pair in C{self} that has no pair with matching key
    in C{other}.

    >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
    >>> y = sc.parallelize([("a", 3), ("c", None)])
    >>> sorted(x.subtractByKey(y).collect())
    [('b', 4), ('b', 5)]
    """
    def only_in_self(entry):
        # entry is (key, (values_from_self, values_from_other)).
        groups = entry[1]
        return len(groups[0]) > 0 and len(groups[1]) == 0

    def restore_pairs(entry):
        return [(entry[0], value) for value in entry[1][0]]

    grouped = self.cogroup(other, numPartitions)
    return grouped.filter(only_in_self).flatMap(restore_pairs)
837
def subtract(self, other, numPartitions=None):
    """
    Return each value in C{self} that is not contained in C{other}.

    Elements of both RDDs are turned into keys and compared with
    L{subtractByKey}.  C{numPartitions} is forwarded to that call;
    previously it was accepted but silently ignored.

    >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
    >>> y = sc.parallelize([("a", 3), ("c", None)])
    >>> sorted(x.subtract(y).collect())
    [('a', 1), ('b', 4), ('b', 5)]
    """
    # 'True' is just a placeholder value: only the keys take part in the
    # comparison.
    others_as_keys = other.map(lambda x: (x, True))
    mine_as_keys = self.map(lambda x: (x, True))
    remaining = mine_as_keys.subtractByKey(others_as_keys, numPartitions)
    return remaining.map(lambda tpl: tpl[0])
849
def keyBy(self, f):
    """
    Creates tuples of the elements in this RDD by applying C{f}.

    Every element C{x} becomes the pair C{(f(x), x)}.

    >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
    >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
    >>> sorted(x.cogroup(y).collect())
    [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
    """
    return self.map(lambda x: (f(x), x))
860
# TODO: `lookup` is disabled because we can't make direct comparisons based
# on the key; we need to compare the hash of the key to the hash of the
# keys in the pairs.  This could be an expensive operation, since those
# hashes aren't retained.


class PipelinedRDD(RDD):
    """
    An RDD defined by a Python function applied to the partitions of a
    parent RDD.  Consecutive pipelinable stages are fused into a single
    function, and the Java-side RDD is only built on first access to
    C{_jrdd}.

    Pipelined maps:
    >>> rdd = sc.parallelize([1, 2, 3, 4])
    >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]
    >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]

    Pipelined reduces:
    >>> from operator import add
    >>> rdd.map(lambda x: 2 * x).reduce(add)
    20
    >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
    20
    """
    def __init__(self, prev, func, preservesPartitioning=False):
        """
        C{prev} is the parent RDD and C{func} is called as
        C{func(split, iterator)}.  Note that C{RDD.__init__} is not
        invoked; the attributes are set up here directly.
        """
        if isinstance(prev, PipelinedRDD) and prev._is_pipelinable():
            # Fuse with the parent: compose the two functions and attach
            # directly to the parent's own Java-side input.
            prev_func = prev.func
            def pipeline_func(split, iterator):
                return func(split, prev_func(split, iterator))
            self.func = pipeline_func
            # Partitioning survives only if both stages preserve it.
            self.preservesPartitioning = \
                prev.preservesPartitioning and preservesPartitioning
            self._prev_jrdd = prev._prev_jrdd
        else:
            # Parent is a plain RDD, or is cached/checkpointed and so must
            # be materialized as its own stage.
            self.func = func
            self.preservesPartitioning = preservesPartitioning
            self._prev_jrdd = prev._jrdd
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = prev.ctx
        self.prev = prev
        self._jrdd_val = None  # memoized result of the _jrdd property
        self._bypass_serializer = False

    @property
    def _jrdd(self):
        """
        The Java-side RDD for this stage, built on first access and
        memoized in C{_jrdd_val}.
        """
        if self._jrdd_val:
            return self._jrdd_val
        func = self.func
        if not self._bypass_serializer and self.ctx.batchSize != 1:
            # Wrap the output in batches of ctx.batchSize elements.
            oldfunc = self.func
            batchSize = self.ctx.batchSize
            def batched_func(split, iterator):
                return batched(oldfunc(split, iterator), batchSize)
            func = batched_func
        # The function and the bypass flag are each cloudpickled,
        # base64-encoded and joined with a space.
        cmds = [func, self._bypass_serializer]
        pipe_command = ' '.join(b64enc(cloudpickle.dumps(f)) for f in cmds)
        broadcast_vars = ListConverter().convert(
            [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
            self.ctx._gateway._gateway_client)
        # Cleared only after the list above has been built from it.
        self.ctx._pickled_broadcast_vars.clear()
        class_manifest = self._prev_jrdd.classManifest()
        env = MapConverter().convert(self.ctx.environment,
                                     self.ctx._gateway._gateway_client)
        includes = ListConverter().convert(self.ctx._python_includes,
                                           self.ctx._gateway._gateway_client)
        python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
            pipe_command, env, includes, self.preservesPartitioning, self.ctx.pythonExec,
            broadcast_vars, self.ctx._javaAccumulator, class_manifest)
        self._jrdd_val = python_rdd.asJavaRDD()
        return self._jrdd_val

    def _is_pipelinable(self):
        """
        Return True unless this RDD is marked as cached or checkpointed.
        """
        return not (self.is_cached or self.is_checkpointed)
933
def _test():
    """
    Run this module's doctests against a local SparkContext bound to the
    name C{sc}, and exit with a non-zero status if any of them fail.
    """
    import doctest
    from pyspark.context import SparkContext
    globs = globals().copy()
    # The small batch size here ensures that we see multiple batches,
    # even in these small test examples:
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    try:
        (failure_count, test_count) = doctest.testmod(
            globs=globs, optionflags=doctest.ELLIPSIS)
    finally:
        # Stop the context even if the doctest run itself raises.
        globs['sc'].stop()
    if failure_count:
        # sys.exit rather than the `exit` helper, which is only injected by
        # the site module and is meant for interactive use.
        sys.exit(-1)


if __name__ == "__main__":
    _test()