一什么是 Sqoop
Sqoop 是一个在结构化数据和 Hadoop 之间进行批量数据迁移的工具, 结构化数据可以是 MysqlOracle 等 RDBMSSqoop 底层用 MapReduce 程序实现抽取转换加载, MapReduce 天生的特性保证了并行化和高容错率, 而且相比 Kettle 等传统 ETL 工具, 任务跑在 Hadoop 集群上, 减少了 ETL 服务器资源的使用情况在特定场景下, 抽取过程会有很大的性能提升
如果要用 Sqoop, 必须正确安装并配置 Hadoop, 因依赖于本地的 hadoop 环境启动 MR 程序; mysqloracle 等数据库的 JDBC 驱动也要放到 Sqoop 的 lib 目录下本文针对的是 Sqoop1, 不涉及到 Sqoop2, 两者有大区别, 感兴趣的读者可以看下官网说明
二 import
import 是数据从 RDBMS 导入到 Hadoop 的工具
2.1split
Sqoop 并行化是启多个 map task 实现的,-m(或 --num-mappers)参数指定 map task 数, 默认是四个并行度不是设置的越大越好, map task 的启动和销毁都会消耗资源, 而且过多的数据库连接对数据库本身也会造成压力在并行操作里, 首先要解决输入数据是以什么方式负债均衡到多个 map 的, 即怎么保证每个 map 处理的数据量大致相同且数据不重复 --split-by 指定了 split column, 在执行并行操作时(多个 map task),Sqoop 需要知道以什么列 split 数据, 其思想是:
1 先查出 split column 的最小值和最大值
2 然后根据 map task 数对 (max-min) 之间的数据进行均匀的范围切分
例如 id 作为 split column, 其最小值是 0 最大值 1000, 如果设置 4 个 map 数, 每个 map task 执行的查询语句类似于: SELECT * FROM sometable WHERE id >= lo AND id < hi, 每个 task 里 (lo,hi) 的值分别是 (0, 250), (250, 500), (500, 750), and (750, 1001)
Sqoop 不能在多列字段上进行拆分, 如果没有索引或者有组合键, 必须显示设置 splitting column; 默认的主键作为 split column, 如果表里没有主键或者没有指定 --split-by, 就要设置 num-mappers 1 或者 --autoreset-to-one-mapper, 这样就只会启动一个 task
从上面的分析过程可以看到 Sqoop 以理想化方式根据 split column 将数据切分成多个范围, 如果 split 键的值不是均匀分布, 每个任务分配的数据量可能相差很大导致数据倾斜
2.2 参数
--driver: 指定 JDBC 驱动, 默认 Mysql
--table: 指定查询的表
--columns: 指定从源数据库导入的列当没有设置 --table 参数, 就默认查询表中所有字段, 实现方式是在数据库执行一个查询语句, 就可得到每个字段及其对应的类型.
--where: 查询条件. 如果设置了 table 参数, 就以 tablecolumnswhere 三个参数拼接成的 SQL 查询数据
--query: 自定义查询 SQL, 语句要有 $CONDITIONS 关键字, 作用是动态替换, 当获取默认 boundary query 时,$CONDITIONS 会替换成(1=1); 获取查询的数据列和其对应的字段类型时 $CONDITIONS 会替换成(1=0)table 和 query 不能同时设置
--boundary-query: 指定 split 的 sql, 如果没有设置, 且有 --table 参数, 生成的 split sql 是根据 tablewhere 条件拼出来的
如果设置了 --query 参数, split sql 是基于 query sql 的子查询:
需要特别注意的是, 有的数据库对子查询没有进行优化(如 Mysql), 查询性能会很低, 这就要自定义 boundary-query, 提高查询效率
2.3HDFS
数据直接导入到 HDFS, 按行读取并写入到 HDFS 文件, 源表里的每一行数据在 HDFS 里作为单独记录, 记录可以是文本格式 (每行一个记录) 或 AvroSequenceFile 二进制格式导入过程可以并行, 因此可能生成多文件
--append: 生成的文件追加到目标目录里
--delete-target-dir: 如果目标目录已经存在, 会先把目录删掉, 类似 overwrite
执行上面的命令后, 可以看到详细的日志: 输入数据是怎么 split 的 mapreduce 执行进度 mapreduce 的 URL 等
2.4Hive
--hive-import 参数指定数据导入到 hive 表:
--target-dir: 需要指定该参数, 数据首先写入到该目录, 过程和直接导入 HDFS 是一样
--hive-drop-import-delims: 删除 string 字段内的特殊字符, 如果 Hive 使用这些字符作为分隔符, hive 的字段会解析错误出现错位的情况它的内部是用正则表达式替换的方式把 \ n, \r, \01 替换成 ""--null-string/--null-non-string: 指定空字段的值 Sqoop 默认空数据存的是 NULL" 字符串, 但 hive 把空解析成 \ N, 因此当文件存储的空是默认的 "NULL" 字符串, hive 就不能正常读取文件中的空值了
数据 import 到 hive 表的过程: 完成源数据写入到 hdfs 后, 就执行 LOAD DATA INPATH 命令把 target-dir 里的文件 LOAD 到 hive 表:
2.5Hbase
--hbase-table 指定数据直接导入到 Hbase 表而不是 HDFS, 对于每个输入行都会转换成 HBase 的 put 操作, 每行的 key 取自输入的列, 值转换成 stringUTF-8 格式存储在单元格里; 所有的列都在同一列簇下, 不能指定多个个列簇
然后通过 Hbase shell 查看表数据量数据,
参数详细说明:
--hbase-create-table: 当 HTable 不存在或列簇不存在, Sqoop 根据 HBase 的默认配置自动新建表; 如果没有指定该参数, 就会报异常
--hbase-row-key: 指定 row key 使用的列默认是 split-by 作为 row key, 如果没有指定, 会把源表的主键作为 row key; 如果 row key 是多个列组成的, 多个列必须用逗号隔开, 生成的 row key 是由用下划线隔开 (`ID`_`RUN_ID`) 的字段组合
--column-family: 指定列簇名, 所有的输出列都在同一列簇下
三 export
export 是 HDFS 里的文件导出到 RDBMS 的工具, 不能从 hivehbase 导出数据, 且 HDFS 文件只能是文本格式如果要把 hive 表数据导出到 RDBMS, 可以先把 hive 表通过查询写入到一个临时表, 临时用文本格式, 然后再从该临时表目录里 export 数据
3.1task 数
Sqoop 从 HDFS 目录里读取文件, 所以启动的 map task 数依赖于 - m 参数文件大小文件数量块大小等, 可以参考这篇文章
3.2 插入 / 更新
默认情况下 Sqoop 在数据导出到 RDBMS 时, 每行记录都转换成数据库的 INSERT 语句, 但也支持插入 / 更新模式, 即根据一定规则判断, 如果是新记录用 INSERT 语句, 否则就 UPDATE, 设置 --update-mode allowinsert 参数启用该功能, 插入 / 更新操作依赖于目标数据库
对于 Mysql, 使用 INSERT INTO ON DUPLICATE KEY UPDATE 语法, 用户不能指定列来判断是插入还是更新, 而是依赖于表的唯一约束, Mysql 在插入数据时, 如果是因唯一约束引起的错误, 就更新数据行 Sqoop 会忽略 --update-key 参数, 但要至少指定一个有效列, 才能启用更新模式
3.3 参数
--columns: 指定插入目标数据库的字段, sqoop 直接读取 hdfs 文件并把记录解析成多个字段, 此时解析后的记录是没有字段名的, 是通过位置和 columns 列表对应的, 数据库插入的 sql 类似于: insert into _table (c1,c2...) value(v1,v2...)
--export-dir: 指定 HDFS 输入文件的目录
--input-fields-terminated-by: 字段之间分隔符
--input-lines-terminated-by: 行分隔符
四问题及优化
4.1Hive 不支持的数据类型
关系型数据库的字段类型和 hive 的字段类型还是有差别的, 所以 Sqoop 有一个映射关系, 把 RDBMS 中的类型映射到 Hive 类型在 create hive 表时, 会根据 RDBMS 类型和 hive 类型进行映射进而设置 hive 表字段类型, 如果没有匹配到, 就会报异常, 如 VARBINARY:
其解决方案有三种:
1 在 --query 参数内显示对字段进行转换, 如 VARBINARY 转换成 VARCHAR, 而 Sqoop 会默认的把 VARCHAR 转换成 Hive 的 STRING.
2 增加 --map-column-hive 参数, 显示把字段映射到 Hive 指定的类型
3 修改 HiveTypes 类, 使 Sqoop 支持对特定类型 (如: VARBINARY) 的映射, 这种方案相对以上两种可以一劳永逸, 但要重编译 sqoop 源码
类型映射逻辑如下:
4.2Java 不支持的类型
Sqoop 创建 ORM 对象, 数据库中的字段映射到 Java 属性, 用于读取数据库 ResultSet 对象并解析字段, 需要把数据库的类型映射到 java 类型, 如果没有映射到, 就会报错解决方案也有三种:
1 在 --query 参数内显示对字段进行转换
2 设置 --map-column-java 参数
3 修改 ConnManager 类
映射逻辑代码:
4.3 特殊字符
当 \ t 特殊字符导入到 hive 后, hive 字段可能解析出错解决方法是修改 FieldFormatter 类, 使 Sqoop 可以删除或替换掉字段数据中包含 \ t 的特殊字符:
4.4 字段错位
使用 --query 和 --columns 参数时, 如果 columns 设置的列顺序和 query 列顺序不同, 会有个疑惑是 import 后的字段和实际字段的值不一样, 这是因为从数据库查询的 ResultSet 对象序列化到实体对象时, column 的值是根据索引取的
例如 readInteger 的代码:
如果要改为 columns 的字段值是根据字段名取而不是根据索引位置取, 可以更改一下几个地方的代码: ClassWriterJdbcWritableBridge
来源: http://www.jianshu.com/p/be33f4b5c62e