- /*
- * spark-submit --queue=root.zhiliangbu_prod_datamonitor spark-join-1.0-SNAPSHOT-jar-with-dependencies.jar
- * */
- public class Join implements Serializable {
- private transient JavaSparkContext javaSparkContext;
- private transient HiveContext hiveContext;
- /*
- * 初始化Load
- * 创建sparkContext, sqlContext, hiveContext
- * */
- public Join() {
- initSparckContext();
- initHiveContext();
- }
- /*
- * 创建sparkContext
- * */
- private void initSparckContext() {
- String warehouseLocation = System.getProperty("user.dir");
- SparkConf sparkConf = new SparkConf()
- .setAppName("spark-join")
- .set("spark.sql.warehouse.dir", warehouseLocation)
- .setMaster("yarn-client");
- javaSparkContext = new JavaSparkContext(sparkConf);
- }
- /*
- * 创建hiveContext
- * 用于读取Hive中的数据
- * */
- private void initHiveContext() {
- hiveContext = new HiveContext(javaSparkContext);
- }
- public void join() {
- /*
- * 生成rdd1
- * */
- String query1 = "select * from gulfstream_test.orders";
- DataFrame rows1 = hiveContext.sql(query1).select("order_id", "driver_id");
- JavaPairRDD<String, String> rdd1 = rows1.toJavaRDD().mapToPair(new PairFunction<Row, String, String>() {
- @Override
- public Tuple2<String, String> call(Row row) throws Exception {
- String orderId = (String)row.get(0);
- String driverId = (String)row.get(1);
- return new Tuple2<String, String>(driverId, orderId);
- }
- });
- /*
- * 生成rdd2
- * */
- String query2 = "select * from gulfstream_test.drivers";
- DataFrame rows2 = hiveContext.sql(query2).select("driver_id", "car_id");
- JavaPairRDD<String, String> rdd2 = rows2.toJavaRDD().mapToPair(new PairFunction<Row, String, String>() {
- @Override
- public Tuple2<String, String> call(Row row) throws Exception {
- String driverId = (String)row.get(0);
- String carId = (String)row.get(1);
- return new Tuple2<String, String>(driverId, carId);
- }
- });
- /*
- * join
- * */
- System.out.println(" ****************** join *******************");
- JavaPairRDD<String, Tuple2<String, String>> joinRdd = rdd1.join(rdd2);
- Iterator<Tuple2<String, Tuple2<String, String>>> it1 = joinRdd.collect().iterator();
- while (it1.hasNext()) {
- Tuple2<String, Tuple2<String, String>> item = it1.next();
- System.out.println("driver_id:" + item._1 + ", order_id:" + item._2._1 + ", car_id:" + item._2._2 );
- }
- /*
- * leftOuterJoin
- * */
- System.out.println(" ****************** leftOuterJoin *******************");
- JavaPairRDD<String, Tuple2<String, Optional<String>>> leftOuterJoinRdd = rdd1.leftOuterJoin(rdd2);
- Iterator<Tuple2<String, Tuple2<String, Optional<String>>>> it2 = leftOuterJoinRdd.collect().iterator();
- while (it2.hasNext()) {
- Tuple2<String, Tuple2<String, Optional<String>>> item = it2.next();
- System.out.println("driver_id:" + item._1 + ", order_id:" + item._2._1 + ", car_id:" + item._2._2 );
- }
- /*
- * rightOuterJoin
- * */
- System.out.println(" ****************** rightOuterJoin *******************");
- JavaPairRDD<String, Tuple2<Optional<String>, String>> rightOuterJoinRdd = rdd1.rightOuterJoin(rdd2);
- Iterator<Tuple2<String, Tuple2<Optional<String>, String>>> it3 = rightOuterJoinRdd.collect().iterator();
- while (it3.hasNext()) {
- Tuple2<String, Tuple2<Optional<String>, String>> item = it3.next();
- System.out.println("driver_id:" + item._1 + ", order_id:" + item._2._1 + ", car_id:" + item._2._2 );
- }
- }
- public static void main(String[] args) {
- Join sj = new Join();
- sj.join();
- }
- }
Source: http://www.cnblogs.com/kangoroo/p/7778962.html