大数据 javaapi 老汤 rdd
RDD java api其实底层是调用了scala的api来实现的,所以我们有必要对java api是怎么样去调用scala api,我们先自己简单的实现一个scala版本和java版本的RDD和SparkContext
一、简单实现scala版本的RDD和SparkContext
- class RDD[T](value: Seq[T]) {
- //RDD的map操作
- def map[U](f: T => U): RDD[U] = {
- new RDD(value.map(f))
- }
- def iterator[T] = value.iterator
- }
- class SparkContext {
- //创建一个RDD
- def createRDD(): RDD[Integer] = new RDD[Integer](Seq(1, 2, 3))
- }
二、简单实现java版本的RDD和SparkContext
- //这个时java中的一个接口
- //我们可以将scala中的map需要的函数其实就是对应着java中的一个接口
- package com.twq.javaapi.java7.
- function;
- public interface Function < T1,
- R > extends Serializable { R call(T1 v1) throws Exception;
- }
- //这边实现的java版的RDD和SparkContext其实还是用scala代码实现,只不过这些scala代码可以被java代码调用了
- import java.util. {
- Iterator = > JIterator
- }
- import scala.collection.JavaConverters._ import com.twq.javaapi.java7.
- function. {
- Function = > JFunction
- }
- //每一个JavaRDD都会含有一个scala的RDD,用于调用该RDD的api
- class JavaRDD[T](val rdd: RDD[T]) {
- def map[R](f: JFunction[T, R]) : JavaRDD[R] = //这里是关键,调用scala RDD中的map方法
- //我们将java的接口构造成scala RDD的map需要的函数函数
- new JavaRDD(rdd.map(x = > f.call(x))) //我们需要将scala的Iterator转成java版的Iterator
- def iterator: JIterator[T] = rdd.iterator.asJava
- }
- //每个JavaSparkContext含有一个scala版本的SparkContext
- class JavaSparkContext(sc: SparkContext) { def this() = this(new SparkContext()) //转调scala版本的SparkContext来实现JavaSparkContext的功能
- def createRDD() : JavaRDD[Integer] = new JavaRDD[Integer](sc.createRDD())
- }
三、写java代码调用rdd java api
- package com.twq.javaapi.java7;
- import com.twq.javaapi.java7.function.Function;
- import com.twq.rdd.api.JavaRDD;
- import com.twq.rdd.api.JavaSparkContext;
- import java.util.Iterator;
- /**
- * Created by tangweiqun on 2017/9/16.
- */
- public class SelfImplJavaRDDTest {
- public static void main(String[] args) {
- //初始化JavaSparkContext
- JavaSparkContext jsc = new JavaSparkContext();
- //调用JavaSparkContext的api创建一个RDD
- JavaRDD<Integer> firstRDD = jsc.createRDD();
- //对创建好的firstRDD应用JavaRDD中的map操作
- JavaRDD<String> strRDD = firstRDD.map(new Function<Integer, String>() {
- @Override
- public String call(Integer v1) throws Exception {
- return v1 + "test";
- }
- });
- //将得到的RDD的结果打印,结果为
- //1test
- //2test
- //3test
- Iterator<String> result = strRDD.iterator();
- while (result.hasNext()) {
- System.out.println(result.next());
- }
- }
- }
以上就是RDD java api调用scala api的实现原理,虽然只举了map操作,但是其他的类似于flatMap操作的实现都是类似的
接下来可以详细了解RDD java的每一个api了
我们可以参考spark core RDD api来详细理解scala中的每一个api。。。
spark2.x由浅入深深到底系列六之RDD java api调用scala api的原理
来源: http://www.bubuko.com/infodetail-2313698.html