Transformations
- map // defined in the abstract class RDD

```scala
/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
```
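A minimal usage sketch (assuming a spark-shell session where `sc` is the SparkContext):

```scala
// Doubles every element: Array(2, 4, 6)
sc.parallelize(Seq(1, 2, 3)).map(_ * 2).collect()
```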
- filter

```scala
/**
 * Return a new RDD containing only the elements that satisfy a predicate.
 */
def filter(f: T => Boolean): RDD[T] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (context, pid, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}
```
- flatMap

```scala
/**
 * Return a new RDD by first applying a function to all elements of this
 * RDD, and then flattening the results.
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
```
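A small sketch covering both filter and flatMap (again assuming a spark-shell `sc`):

```scala
val lines = sc.parallelize(Seq("spark shuffle", "rdd"))
// flatMap applies the function, then flattens: Array(spark, shuffle, rdd)
val words = lines.flatMap(_.split(" "))
// filter keeps only the elements satisfying the predicate: Array(spark, shuffle)
words.filter(_.length > 3).collect()
```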
- mapPartitions

```scala
/**
 * Return a new RDD by applying a function to each partition of this RDD.
 *
 * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
 * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
 */
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}
```
- mapPartitionsWithIndex

```scala
/**
 * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
 * of the original partition.
 *
 * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
 * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
 */
def mapPartitionsWithIndex[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
    preservesPartitioning)
}
```
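A sketch of both partition-wise variants (assuming a spark-shell `sc`; `parallelize(1 to 10, 2)` splits the range evenly into two partitions):

```scala
val rdd = sc.parallelize(1 to 10, 2)
// One function call per partition rather than per element -- useful for
// amortizing expensive setup, e.g. opening a connection once per partition.
rdd.mapPartitions(iter => Iterator(iter.sum)).collect()  // Array(15, 40)
// The WithIndex variant additionally receives the original partition number.
rdd.mapPartitionsWithIndex((idx, iter) => iter.map(v => (idx, v))).collect()
```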
- sample

```scala
/**
 * Return a sampled subset of this RDD.
 *
 * @param withReplacement can elements be sampled multiple times (replaced when sampled out)
 * @param fraction expected size of the sample as a fraction of this RDD's size
 *  without replacement: probability that each element is chosen; fraction must be [0, 1]
 *  with replacement: expected number of times each element is chosen; fraction must be greater
 *  than or equal to 0
 * @param seed seed for the random number generator
 *
 * @note This is NOT guaranteed to provide exactly the fraction of the count
 * of the given [[RDD]].
 */
def sample(
    withReplacement: Boolean,
    fraction: Double,
    seed: Long = Utils.random.nextLong): RDD[T] = {
  require(fraction >= 0,
    s"Fraction must be nonnegative, but got ${fraction}")

  withScope {
    require(fraction >= 0.0, "Negative fraction value: " + fraction)
    if (withReplacement) {
      new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
    } else {
      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
    }
  }
}
```
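A sketch (assuming a spark-shell `sc`); per the @note above, the sample size is only an expectation:

```scala
val data = sc.parallelize(1 to 1000)
// Bernoulli sampling without replacement: roughly 100 elements, not exactly 100
data.sample(withReplacement = false, fraction = 0.1, seed = 42L).count()
```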
- union

```scala
/**
 * Return the union of this RDD and another one. Any identical elements will appear multiple
 * times (use `.distinct()` to eliminate them).
 */
def union(other: RDD[T]): RDD[T] = withScope {
  sc.union(this, other)
}
```
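A quick sketch (assuming a spark-shell `sc`):

```scala
val a = sc.parallelize(Seq(1, 2, 3))
val b = sc.parallelize(Seq(3, 4))
a.union(b).collect()             // Array(1, 2, 3, 3, 4): duplicates are kept
a.union(b).distinct().collect()  // deduplicated, at the cost of a shuffle
```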
- intersection

```scala
/**
 * Return the intersection of this RDD and another one. The output will not contain any duplicate
 * elements, even if the input RDDs did.
 *
 * @note This method performs a shuffle internally.
 */
def intersection(other: RDD[T]): RDD[T] = withScope {
  this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
      .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
      .keys
}

/**
 * Return the intersection of this RDD and another one. The output will not contain any duplicate
 * elements, even if the input RDDs did.
 *
 * @note This method performs a shuffle internally.
 *
 * @param partitioner Partitioner to use for the resulting RDD
 */
def intersection(
    other: RDD[T],
    partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  this.map(v => (v, null)).cogroup(other.map(v => (v, null)), partitioner)
      .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
      .keys
}

/**
 * Return the intersection of this RDD and another one. The output will not contain any duplicate
 * elements, even if the input RDDs did. Performs a hash partition across the cluster.
 *
 * @note This method performs a shuffle internally.
 *
 * @param numPartitions How many partitions to use in the resulting RDD
 */
def intersection(other: RDD[T], numPartitions: Int): RDD[T] = withScope {
  intersection(other, new HashPartitioner(numPartitions))
}
```
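A sketch (assuming a spark-shell `sc`):

```scala
val left  = sc.parallelize(Seq(1, 2, 3, 3))
val right = sc.parallelize(Seq(3, 3, 4))
// Deduplicated even though 3 appears twice on both sides: Array(3)
left.intersection(right).collect()
```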
- distinct

```scala
/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}

/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(): RDD[T] = withScope {
  distinct(partitions.length)
}
```
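As the source above shows, distinct is just map + reduceByKey + map. A sketch (assuming a spark-shell `sc`):

```scala
// Result order is not guaranteed
sc.parallelize(Seq(1, 1, 2, 3, 3)).distinct().collect()  // e.g. Array(1, 2, 3)
```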
- groupByKey // in class PairRDDFunctions

```scala
/**
 * Group the values for each key in the RDD into a single sequence. Hash-partitions the
 * resulting RDD with the existing partitioner/parallelism level. The ordering of elements
 * within each group is not guaranteed, and may even differ each time the resulting RDD is
 * evaluated.
 *
 * @note This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
 * or `PairRDDFunctions.reduceByKey` will provide much better performance.
 */
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
  groupByKey(defaultPartitioner(self))
}

/**
 * Group the values for each key in the RDD into a single sequence. Allows controlling the
 * partitioning of the resulting key-value pair RDD by passing a Partitioner.
 * The ordering of elements within each group is not guaranteed, and may even differ
 * each time the resulting RDD is evaluated.
 *
 * @note This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
 * or `PairRDDFunctions.reduceByKey` will provide much better performance.
 *
 * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
 * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
 */
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}

/**
 * Group the values for each key in the RDD into a single sequence. Hash-partitions the
 * resulting RDD into `numPartitions` partitions. The ordering of elements within
 * each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
 *
 * @note This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
 * or `PairRDDFunctions.reduceByKey` will provide much better performance.
 *
 * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
 * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
 */
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
  groupByKey(new HashPartitioner(numPartitions))
}
```
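A sketch contrasting it with reduceByKey (assuming a spark-shell `sc`):

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
// All values of a key are materialized in one buffer -- risky for hot keys
pairs.groupByKey().mapValues(_.sum).collect()  // Array((a,4), (b,2)) in some order
// For an aggregation like this, reduceByKey is cheaper (map-side combine)
pairs.reduceByKey(_ + _).collect()
```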
- reduceByKey

```scala
/**
 * Merge the values for each key using an associative and commutative reduce function. This will
 * also perform the merging locally on each mapper before sending results to a reducer, similarly
 * to a "combiner" in MapReduce.
 */
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

/**
 * Merge the values for each key using an associative and commutative reduce function. This will
 * also perform the merging locally on each mapper before sending results to a reducer, similarly
 * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
 */
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
  reduceByKey(new HashPartitioner(numPartitions), func)
}

/**
 * Merge the values for each key using an associative and commutative reduce function. This will
 * also perform the merging locally on each mapper before sending results to a reducer, similarly
 * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
 * parallelism level.
 */
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  reduceByKey(defaultPartitioner(self), func)
}
```
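The classic word-count sketch (assuming a spark-shell `sc`):

```scala
// The reduce function also runs map-side, like a MapReduce "combiner"
sc.parallelize(Seq("a a b", "b c"))
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .collect()  // Array((a,2), (b,2), (c,1)) in some order
```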
- aggregateByKey

```scala
/**
 * Aggregate the values of each key, using given combine functions and a neutral "zero value".
 * This function can return a different result type, U, than the type of the values in this RDD,
 * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
 * as in scala.TraversableOnce. The former operation is used for merging values within a
 * partition, and the latter is used for merging values between partitions. To avoid memory
 * allocation, both of these functions are allowed to modify and return their first argument
 * instead of creating a new U.
 */
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  // Serialize the zero value to a byte array so that we can get a new clone of it on each key
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit)
  zeroBuffer.get(zeroArray)

  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))

  // We will clean the combiner closure later in `combineByKey`
  val cleanedSeqOp = self.context.clean(seqOp)
  combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
    cleanedSeqOp, combOp, partitioner)
}

/**
 * Aggregate the values of each key, using given combine functions and a neutral "zero value".
 * (Same semantics as above; output is hash-partitioned into `numPartitions` partitions.)
 */
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp)
}

/**
 * Aggregate the values of each key, using given combine functions and a neutral "zero value".
 * (Same semantics as above; uses the existing partitioner/parallelism level.)
 */
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
}
```
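A sketch computing a per-key average, where the aggregate type U differs from the value type V (assuming a spark-shell `sc`):

```scala
val scores = sc.parallelize(Seq(("a", 1), ("a", 4), ("b", 2)))
// U = (sum, count) differs from V = Int: seqOp folds a V into a U within a
// partition, combOp merges two U's across partitions.
val sumCount = scores.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),
  (u1, u2) => (u1._1 + u2._1, u1._2 + u2._2))
sumCount.mapValues { case (s, c) => s.toDouble / c }.collect()  // per-key averages
```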
- sortByKey // in class OrderedRDDFunctions; total sort across partitions
- sortBy // also a total sort; implemented on top of sortByKey

```scala
/**
 * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
 * `collect` or `save` on the resulting RDD will return or output an ordered list of records
 * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
 * order of the keys).
 */
// TODO: this currently doesn't work on P other than Tuple2!
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope
{
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
```
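A sketch (assuming a spark-shell `sc`):

```scala
val kv = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b")))
// The RangePartitioner yields a total order across partitions, not merely
// a sort within each partition.
kv.sortByKey().collect()                 // Array((1,a), (2,b), (3,c))
kv.sortByKey(ascending = false).first()  // (3,c)
```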
- join // keys may repeat in the output (one row per matching pair)

```scala
/**
 * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
 * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
 * (k, v2) is in `other`. Performs a hash join across the cluster.
 */
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
  join(other, defaultPartitioner(self, other))
}

/**
 * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
 * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
 * (k, v2) is in `other`. Performs a hash join across the cluster.
 */
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = self.withScope {
  join(other, new HashPartitioner(numPartitions))
}
```
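A sketch (assuming a spark-shell `sc`):

```scala
val users  = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
val labels = sc.parallelize(Seq(("a", "x"), ("c", "y")))
// Inner join: one output row per matching (v, w) combination, so key "a" repeats
users.join(labels).collect()  // Array((a,(1,x)), (a,(2,x))) in some order
```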
- cogroup // co-group: each key appears at most once in the output

```scala
/**
 * For each key k in `this` or `other1` or `other2` or `other3`,
 * return a resulting RDD that contains a tuple with the list of values
 * for that key in `this`, `other1`, `other2` and `other3`.
 */
def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
    other2: RDD[(K, W2)],
    other3: RDD[(K, W3)],
    partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
  if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
  cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
    (vs.asInstanceOf[Iterable[V]],
      w1s.asInstanceOf[Iterable[W1]],
      w2s.asInstanceOf[Iterable[W2]],
      w3s.asInstanceOf[Iterable[W3]])
  }
}

/**
 * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
 * list of values for that key in `this` as well as `other`.
 */
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
  if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
  cg.mapValues { case Array(vs, w1s) =>
    (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
  }
}

/**
 * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
 * tuple with the list of values for that key in `this`, `other1` and `other2`.
 */
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
  if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
  cg.mapValues { case Array(vs, w1s, w2s) =>
    (vs.asInstanceOf[Iterable[V]],
      w1s.asInstanceOf[Iterable[W1]],
      w2s.asInstanceOf[Iterable[W2]])
  }
}

/**
 * For each key k in `this` or `other1` or `other2` or `other3`,
 * return a resulting RDD that contains a tuple with the list of values
 * for that key in `this`, `other1`, `other2` and `other3`.
 */
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)])
    : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
  cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3))
}

/**
 * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
 * list of values for that key in `this` as well as `other`.
 */
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
  cogroup(other, defaultPartitioner(self, other))
}

/**
 * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
 * tuple with the list of values for that key in `this`, `other1` and `other2`.
 */
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
    : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
  cogroup(other1, other2, defaultPartitioner(self, other1, other2))
}

/**
 * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
 * list of values for that key in `this` as well as `other`.
 */
def cogroup[W](
    other: RDD[(K, W)],
    numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
  cogroup(other, new HashPartitioner(numPartitions))
}

/**
 * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
 * tuple with the list of values for that key in `this`, `other1` and `other2`.
 */
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
    : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
  cogroup(other1, other2, new HashPartitioner(numPartitions))
}

/**
 * For each key k in `this` or `other1` or `other2` or `other3`,
 * return a resulting RDD that contains a tuple with the list of values
 * for that key in `this`, `other1`, `other2` and `other3`.
 */
def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
    other2: RDD[(K, W2)],
    other3: RDD[(K, W3)],
    numPartitions: Int)
    : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
  cogroup(other1, other2, other3, new HashPartitioner(numPartitions))
}
```
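A sketch contrasting cogroup with join (assuming a spark-shell `sc`):

```scala
val users  = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
val labels = sc.parallelize(Seq(("a", "x"), ("c", "y")))
// One row per key; each side's values gathered into an Iterable (rendered as
// CompactBuffer in the spark-shell). Unmatched keys get an empty Iterable.
users.cogroup(labels).collect()
// e.g. Array((a,(CompactBuffer(1, 2),CompactBuffer(x))),
//            (b,(CompactBuffer(3),CompactBuffer())),
//            (c,(CompactBuffer(),CompactBuffer(y))))
```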
- cartesian // Cartesian product

```scala
/**
 * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
 * elements (a, b) where a is in `this` and b is in `other`.
 */
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
  new CartesianRDD(sc, this, other)
}
```
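A sketch (assuming a spark-shell `sc`):

```scala
// Every pairing of elements from the two RDDs: 2 x 2 = 4 results
sc.parallelize(Seq(1, 2)).cartesian(sc.parallelize(Seq("a", "b"))).collect()
// Array((1,a), (1,b), (2,a), (2,b))
```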
- pipe
- coalesce // changes the partition count; shrinking needs no shuffle, but growing requires shuffle = true (otherwise it is a no-op)

```scala
/**
 * Return a new RDD that is reduced into `numPartitions` partitions.
 *
 * This results in a narrow dependency, e.g. if you go from 1000 partitions
 * to 100 partitions, there will not be a shuffle, instead each of the 100
 * new partitions will claim 10 of the current partitions.
 *
 * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
 * this may result in your computation taking place on fewer nodes than
 * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
 * you can pass shuffle = true. This will add a shuffle step, but means the
 * current upstream partitions will be executed in parallel (per whatever
 * the current partitioning is).
 *
 * @note With shuffle = true, you can actually coalesce to a larger number
 * of partitions. This is useful if you have a small number of partitions,
 * say 100, potentially with a few partitions being abnormally large. Calling
 * coalesce(1000, shuffle = true) will result in 1000 partitions with the
 * data distributed using a hash partitioner. The optional partition coalescer
 * passed in must be serializable.
 */
def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
            (implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = (new Random(index)).nextInt(numPartitions)
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]

    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
      numPartitions,
      partitionCoalescer).values
  } else {
    new CoalescedRDD(this, numPartitions, partitionCoalescer)
  }
}
```
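A sketch of all three cases (assuming a spark-shell `sc`):

```scala
val wide = sc.parallelize(1 to 100, 8)
wide.coalesce(2).getNumPartitions                   // 2: narrow dependency, no shuffle
wide.coalesce(16).getNumPartitions                  // still 8: growing without shuffle is a no-op
wide.coalesce(16, shuffle = true).getNumPartitions  // 16, at the cost of a shuffle
```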
- repartition // changes the partition count; always shuffles

```scala
/**
 * Return a new RDD that has exactly numPartitions partitions.
 *
 * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
 * a shuffle to redistribute data.
 *
 * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
 * which can avoid performing a shuffle.
 */
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}
```
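A one-liner sketch (assuming a spark-shell `sc`):

```scala
// Equivalent to coalesce(4, shuffle = true); always shuffles
sc.parallelize(1 to 100, 8).repartition(4).getNumPartitions  // 4
```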
- repartitionAndSortWithinPartitions // sorts ascending (by the implicit key ordering) within each partition

```scala
/**
 * Repartition the RDD according to the given partitioner and, within each resulting partition,
 * sort records by their keys.
 *
 * This is more efficient than calling `repartition` and then sorting within each partition
 * because it can push the sorting down into the shuffle machinery.
 */
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
}
```
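A sketch using `glom` to inspect the per-partition ordering (assuming a spark-shell `sc`):

```scala
import org.apache.spark.HashPartitioner
val kv = sc.parallelize(Seq((3, "c"), (1, "a"), (4, "d"), (2, "b")))
// Sorting is pushed into the shuffle machinery; each output partition comes
// back sorted by key, but there is no global order across partitions.
kv.repartitionAndSortWithinPartitions(new HashPartitioner(2)).glom().collect()
// e.g. Array(Array((2,b), (4,d)), Array((1,a), (3,c)))
```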
Actions
- reduce
- collect
- count
- first
- take
- takeSample
- takeOrdered
- saveAsTextFile
- saveAsSequenceFile
- saveAsObjectFile
- countByKey
- foreach
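A few of these actions in one sketch (assuming a spark-shell `sc`; the output path below is hypothetical):

```scala
val nums = sc.parallelize(1 to 5)
nums.reduce(_ + _)    // 15: aggregated on the executors, returned to the driver
nums.count()          // 5
nums.first()          // 1
nums.take(3)          // Array(1, 2, 3)
nums.takeOrdered(2)   // Array(1, 2): the two smallest elements
sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3))).countByKey()  // Map(a -> 2, b -> 1)
nums.saveAsTextFile("/tmp/nums")  // hypothetical path; one part file per partition
// foreach runs on the executors, so println output lands in executor logs
nums.foreach(x => println(x))
```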
Source: http://www.bubuko.com/infodetail-2807866.html