1. 引入
云上对象存储的廉价让不少公司将其作为主要的存储方案, 而 Hudi 作为数据湖解决方案, 支持对象存储也是必不可少. 之前 AWS EMR 已经内置集成 Hudi, 也意味着可以在 S3 上无缝使用 Hudi. 当然国内用户可能更多使用阿里云 OSS 作为云上存储方案, 那么如果用户想基于 OSS 构建数据湖, 那么 Hudi 是否支持呢? 随着 Hudi 社区主分支已经合并了支持 OSS 的 PR, 现在只需要基于 master 分支 build 版本即可, 或者等待下一个版本释出便可直接使用, 经过简单的配置便可将数据写入 OSS.
2. 配置
2.1 pom 依赖
需要额外添加的主要 pom 依赖如下
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-aliyun</artifactId>
- <version>3.2.1</version>
- </dependency>
- <dependency>
- <groupId>com.aliyun.oss</groupId>
- <artifactId>aliyun-sdk-oss</artifactId>
- <version>3.8.1</version>
- </dependency>
2.2 core-site.xml 配置
若需访问 OSS, 需要修改 core-site.xml, 关键配置如下
- <property>
- <name>fs.defaultFS</name>
- <value>oss://bucketname/</value>
- </property>
- <property>
- <name>fs.oss.endpoint</name>
- <value>oss-endpoint-address</value>
- <description>Aliyun OSS endpoint to connect to.</description>
- </property>
- <property>
- <name>fs.oss.accessKeyId</name>
- <value>oss_key</value>
- <description>Aliyun access key ID</description>
- </property>
- <property>
- <name>fs.oss.accessKeySecret</name>
- <value>oss-secret</value>
- <description>Aliyun access key secret</description>
- </property>
- <property>
- <name>fs.oss.impl</name>
- <value>org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem</value>
- </property>
3. 源码
示例源码如下
- import org.apache.hudi.QuickstartUtils.*;
- import org.apache.spark.API.java.JavaSparkContext;
- import org.apache.spark.sql.Dataset;
- import org.apache.spark.sql.Row;
- import org.apache.spark.sql.SparkSession;
- import java.io.IOException;
- import java.util.List;
- import static org.apache.hudi.QuickstartUtils.convertToStringList;
- import static org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs;
- import static org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME;
- import static org.apache.spark.sql.SaveMode.Overwrite;
- public class OssHudiDemo {
- public static void main(String[] args) throws IOException {
- SparkSession spark = SparkSession.builder().appName("Hoodie Datasource test")
- .master("local[2]")
- .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- .config("spark.io.compression.codec", "snappy")
- .config("spark.sql.hive.convertMetastoreParquet", "false")
- .getOrCreate();
- JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
- String tableName = "hudi_trips_cow";
- String basePath = "/tmp/hudi_trips_cow";
- DataGenerator dataGen = new DataGenerator();
- List<String> inserts = convertToStringList(dataGen.generateInserts(10));
- Dataset<Row> df = spark.read().JSON(jsc.parallelize(inserts, 2));
- df.write().format("org.apache.hudi").
- options(getQuickstartWriteConfigs()).
- option(TABLE_NAME, tableName).
- mode(Overwrite).
- save(basePath);
- Dataset<Row> roViewDF = spark.read().format("org.apache.hudi").load(basePath + "/*/*/*");
- roViewDF.registerTempTable("hudi_ro_table");
- spark.sql("select * from hudi_ro_table").show(false);
- spark.stop();
- }
- }
即先写入 OSS, 下图可以看到 OSS 的 Bucket 中已经成功写入了数据, 然后再通过 spark 查询写入的结果.
部分查询结果如下
|20200421205942 |20200421205942_2_10 |6fd496f8-ebee-4f67-8f86-783ff3fed3ab|asia/india/chennai |1f71bed9-833b-4fca-8b4b-4cd014bdf88a-0_2-22-30_20200421205942.parquet|0.40613510977307 |0.5644092139040959 |driver-213|0.798706304941517 |0.02698359227182834|17.851135255091155|asia/india/chennai |rider-213|0.0|6fd496f8-ebee-4f67-8f86-783ff3fed3ab|
所有源代码已经上传至 https://github.com/leesf/oss-hudi-demo
4. 最后
本篇文章很简单, 只用作展示如何通过 Hudi 将数据写入 OSS. 当数据写入 OSS 后, 便可打通阿里云上几乎所有产品, 这使得基于阿里云技术栈进行数据湖分析将变得非常简单, 比如使用 DLA(Data Lake Analytics), 对标 AWS 的 Athena, 对 Hudi 数据集进行分析查询, 一体化的流程会让分析变得异常简单.
来源: https://www.cnblogs.com/leesf456/p/12773101.html