在项目中,遇到一个场景是,需要从Hive数据仓库中拉取数据,进行过滤、裁剪或者聚合之后生成中间结果导入MySQL。
对于这样一个极其普通的离线计算场景,有多种技术选型可以实现。例如,sqoop,MR,HSQL。
我们这里使用的spark,优点来说是两个:一是灵活性高,二是代码简洁。
1)灵活性高
相比sqoop和HSQL,spark可以更灵活的控制过滤和裁剪逻辑,甚至你可以通过外部的配置或者参数,来动态的调整spark的计算行为,提供定制化。
2)代码简洁
相比MR来说,代码量上少了很多。也无需实现MySQL客户端。
我抽象了一下需求,做了如下一个demo。
涉及的数据源有两个:Hive&MySQL;计算引擎:spark&spark-sql。我们的demo中分为两个步骤:
1)从Hive中读取数据,交给spark计算,最终输出到MySQL;
2)从MySQL中读取数据,交给spark计算,最终再输出到MySQL另一张表。
1、 数据准备
创建了Hive外部分区表
关于分区和外部表这里不说了。
- CREATE EXTERNAL TABLE `gulfstream_test.accounts`(
- `id` string COMMENT '用户id',
- `order_id` string COMMENT '订单id',
- `status` bigint COMMENT '用户状态',
- `count` decimal(16,9) COMMENT '订单数')
- COMMENT '用户信息'
- PARTITIONED BY (
- `year` string,
- `month` string,
- `day` string)
- ROW FORMAT DELIMITED
- FIELDS TERMINATED BY '\t'
- STORED AS INPUTFORMAT
- 'org.autonavi.udf.CustomInputFormat'
- OUTPUTFORMAT
- 'org.autonavi.udf.CustomHiveOutputFormat'
- LOCATION
- 'hdfs://mycluster-tj/***/acounts'
- TBLPROPERTIES (
- 'LEVEL'='1',
- 'TTL'='60',
- 'last_modified_by'='yangfan',
- 'last_modified_time'='2017-10-23',
- 'transient_lastDdlTime'='1508746808')
建立分区,并指定分区路径
这里分区使用的年月日三级分区。通过下面的命令将year=2017/month=10/day=23这个Hive分区的数据指向了location=hdfs://mycluster-tj/***/acounts/2017/10/23
- hive > alter table gulfstream_test.accounts add partition(year = '2017', month = '10', day = '23') location 'hdfs://mycluster-tj/***/acounts/2017/10/23';
查询一下分区是否建立成功
可以看到分区已经有了。
- show partitions gulfstream_test.accounts;
- OK
- partition
- year=2017/month=10/day=23
上传本地测试数据到hdfs
- hadoop fs -put a.txt hdfs://mycluster-tj/***/acounts/2017/10/23
看一下数据,取了前10行,原谅我数据比较假。
- monitor@bigdata-arch-client10 target]$ hadoop fs -cat hdfs://mycluster-tj/***/acounts/2017/10/23/a | head -10
- 0 0
- 1 1
- 2 2
- 3 3
- 4 4
- 5 5
- 6 6
- 7 7
- 8 8
- 9 9
在Hive中,也查一下前10条,是一样的。只是多了分区字段。
- hive (default)> select * from gulfstream_test.accounts where year=2017 and month=10 and day=23 limit 10;
- OK
- accounts.id accounts.order_id accounts.status accounts.count accounts.year accounts.month accounts.day
- 0 0 0 0 2017 10 23
- 1 1 1 1 2017 10 23
- 2 2 2 2 2017 10 23
- 3 3 3 3 2017 10 23
- 4 4 4 4 2017 10 23
- 5 5 5 5 2017 10 23
- 6 6 6 6 2017 10 23
- 7 7 7 7 2017 10 23
- 8 8 8 8 2017 10 23
- 9 9 9 9 2017 10 23
- Time taken: 1.38 seconds, Fetched: 10 row(s)
至此,测试数据准备好了。一共1000000条,1百万。
2、代码
1)POM依赖
可以通过pom依赖来看一下笔者使用的组件版本。
这里就不赘述了。
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.10</artifactId>
- <version>1.6.0</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-hive_2.10</artifactId>
- <version>1.6.0</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.10</artifactId>
- <version>1.6.0</version>
- <scope>provided</scope>
- </dependency>
打包方式
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <archive>
- <manifest>
- <!--这里要替换成jar包main方法所在类 -->
- <mainClass>com.kangaroo.studio.algorithms.filter.LoadDB</mainClass>
- </manifest>
- </archive>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id> <!-- this is used for inheritance merges -->
- <phase>package</phase> <!-- 指定在打包节点执行jar包合并操作 -->
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.6</source>
- <target>1.6</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
2)java spark代码
先贴上代码,再说明
- package com.kangaroo.studio.algorithms.filter;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.sql.DataFrame;
- import org.apache.spark.sql.SQLContext;
- import org.apache.spark.sql.SaveMode;
- import org.apache.spark.sql.hive.HiveContext;
- import java.io.Serializable;
- import java.util.Properties;
- public class LoadDB implements Serializable {
- private SparkConf sparkConf;
- private JavaSparkContext javaSparkContext;
- private HiveContext hiveContext;
- private SQLContext sqlContext;
- /*
- * 初始化Load
- * 创建sparkContext, sqlContext, hiveContext
- * */
- public LoadDB() {
- initSparckContext();
- initSQLContext();
- initHiveContext();
- }
- /*
- * 创建sparkContext
- * */
- private void initSparckContext() {
- String warehouseLocation = System.getProperty("user.dir");
- sparkConf = new SparkConf().setAppName("from-to-mysql").set("spark.sql.warehouse.dir", warehouseLocation).setMaster("yarn-client");
- javaSparkContext = new JavaSparkContext(sparkConf);
- }
- /*
- * 创建hiveContext
- * 用于读取Hive中的数据
- * */
- private void initHiveContext() {
- hiveContext = new HiveContext(javaSparkContext);
- }
- /*
- * 创建sqlContext
- * 用于读写MySQL中的数据
- * */
- private void initSQLContext() {
- sqlContext = new SQLContext(javaSparkContext);
- }
- /*
- * 使用spark-sql从hive中读取数据, 然后写入mysql对应表.
- * */
- public void hive2db() {
- String url = "jdbc:mysql://10.93.84.53:3306/big_data?characterEncoding=UTF-8";
- String table = "accounts";
- Properties props = new Properties();
- props.put("user", "root");
- props.put("password", "1234");
- String query = "select * from gulfstream_test.accounts where year=2017 and month=10 and day=23";
- DataFrame rows = hiveContext.sql(query).select("id", "order_id", "status", "count");;
- rows.write().mode(SaveMode.Append).jdbc(url, table, props);
- }
- /*
- * 使用spark-sql从db中读取数据, 处理后再回写到db
- * */
- public void db2db() {
- String url = "jdbc:mysql://10.93.84.53:3306/big_data?characterEncoding=UTF-8";
- String fromTable = "accounts";
- String toTable = "accountsPart";
- Properties props = new Properties();
- props.put("user", "root");
- props.put("password", "1234");
- DataFrame rows = sqlContext.read().jdbc(url, fromTable, props).where("count < 1000");
- rows.write().mode(SaveMode.Append).jdbc(url, toTable, props);
- }
- public static void main(String[] args) {
- LoadDB loadDB = new LoadDB();
- System.out.println(" ---------------------- start hive2db ------------------------");
- loadDB.hive2db();
- System.out.println(" ---------------------- finish hive2db ------------------------");
- System.out.println(" ---------------------- start db2db ------------------------");
- loadDB.db2db();
- System.out.println(" ---------------------- finish db2db ------------------------");
- }
- }
说明:
- hive2db
核心动作是使用hiveContext.sql(query)执行了hiveSQL,过滤出Hive表中year=2017/month=10/day=23分钟的数据,返回一个DataFrame对象。
DataFrame是spark-sql数据处理的核心。对DataFrame的操作推荐这样一篇博客。你可以去使用这些方法,实现复杂的逻辑。
对DataFrame对象,我们使用了select裁剪了其中4列数据(id, order_id, status, count)出来,不过不裁剪的话,会有7列(加上分区的year,month,day)。
然后将数据以SaveMode.Append的方式,写入了mysql中的accounts表。
SaveMode.Append方式,数据会追加,而不会覆盖。如果想覆盖,还有一个常用的SaveMode.Overwrite。
最终accounts中的数据有1000000条,百万。
- db2db
db2db从刚刚生成的MySQL表accounts中读取出数据,也是返回了一个dataframe对象,通过执行where过滤除了其中id<1000的数据,这里正好是1000条。
然后写入了accountsPart。最终accountsPart数据应该有1000条。
3)编译和执行
编译完成后,生成jar包from-to-mysql-1.0-SNAPSHOT-jar-with-dependencies.jar
使用默认参数提交到yarn队列。
- spark - submit--queue = root.zhiliangbu_prod_datamonitor from - to - mysql - 1.0 - SNAPSHOT - jar - with - dependencies.jar
片刻之后,观察输出。已经全部finish了。
4)查看一下结果
我们到mysql中瞅一瞅。
accounts表
有没有注意到,其实不用建立mysql表!这个过程会自动给你创建,相当于if not exists。
- CREATE TABLE `accounts` (
- `id` text,
- `order_id` text,
- `status` bigint(20) DEFAULT NULL,
- `count` decimal(16,9) DEFAULT NULL
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8
简单看一下里面有多少数据。1百万
- MariaDB [big_data]> select count(1) from accounts ;
- +----------+
- | count(1) |
- +----------+
- | 1000000 |
- +----------+
- 1 row in set (0.32 sec)
acountsPart表
- CREATE TABLE `accountsPart` (
- `id` text,
- `order_id` text,
- `status` bigint(20) DEFAULT NULL,
- `count` decimal(16,9) DEFAULT NULL
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8
查看有多少数据,1000条,果然是没有问题的
- MariaDB [big_data]> select count(1) from accountsPart;
- +----------+
- | count(1) |
- +----------+
- | 1000 |
- +----------+
- 1 row in set (0.00 sec)
到此为止。