敬请期待该系列的后续内容。
在本教程系列的第 1 部分中,您学习了如何将一个表从 Informix® 移动到 Spark,如何将整个数据库放在 Spark 中,以及如何为 Informix 构建一种特定方言。
第 2 部分将重点介绍分析数据并从中获取基本洞察。完全不需要掌握 Informix 知识。
您将使用 Informix 商店演示数据库,它模拟了一个企业数据库。该公司是一家向小商店批发商品的体育设备批发商。本教程仅使用一个表:orders 表。下面给出了导入 Apache Spark 后的 orders 表的模式。
对于这个数据库图,稍微说明一下:在原始数据库中,有主键、外键、约束条件、原生数据类型(比如
)。所有这些都转换为 Spark 的数据类型和简化的结构;不需要索引、约束条件或键。
- SERIAL
第一个练习是测量仓库从客户下单到订单发货需要多长时间。
诚然,使用 Spark 完成此操作有点大材小用,但您会在 Spark 分析世界中有越来越多的发现!
可以从 GitHub 下载相关代码。
准备就绪。第一个导入可能看起来有点不寻常,因为 Spark 有大量预定义的函数。如果需要,org.apache.spark.sql.functions 包中的函数列表是一个很有帮助的参考。
在下面的代码中,将
替换为
- datediff
以导入所有函数。但是,对于本教程,只需要
- *
。您或许已对它的作用有所了解。
- datediff
- import
- static
- org
- .
- apache
- .
- spark
- .
- sql
- .
- functions
- .
- datediff
- ;
导入类中的其余部分类似于第 1 部分中使用的类。我会在分析代码的过程中分享这些导入。否则,很容易与同名的引用类相混淆。
- import
- java
- .
- sql
- .
- Connection
- ;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import org.apache.spark.sql.Dataset;
- import org.apache.spark.sql.Row;
- import org.apache.spark.sql.SparkSession;
- import org.apache.spark.sql.jdbc.JdbcDialect;
- import org.apache.spark.sql.jdbc.JdbcDialects;
- import net.jgp.labs.informix2spark.utils.Config;
- import net.jgp.labs.informix2spark.utils.ConfigManager;
- import net.jgp.labs.informix2spark.utils.InformixJdbcDialect;
- import net.jgp.labs.informix2spark.utils.K;
是时候启动应用程序了:
- public
- class
- TimeToShipApp
- {
- public static void main(String[] args) {
- TimeToShipApp app = new TimeToShipApp();
- app.start();
- }
- private void start() {
使用 Spark 的本地模式启动一个会话:
- SparkSession
- spark
- =
- SparkSession
- .builder()
- .appName("Time to Ship")
- .master("local")
- .getOrCreate();
尽管这个示例不需要 Informix 方言,但注册该方言是一种不错的做法:
- JdbcDialect
- dialect
- =
- new
- InformixJdbcDialect
- ();
- JdbcDialects.registerDialect(dialect);
获取 Informix 参数:
- Config
- config
- =
- ConfigManager
- .
- getConfig
- (
- K
- .
- INFORMIX
- );
创建想要处理的所有表的列表(并添加到您的数据湖)。在本例中,这似乎大材小用了,因为我们只使用了一个表,但可在其他示例中重用同样的想法(而且重复练习有助于学习)。
- List
- <
- String
- >
- tables
- =
- new
- ArrayList
- <>();
- tables.add("orders");
众所周知,映射包含通过键来建立索引的值。来自 Spark 的所有数据帧都存储在一个映射中,该映射的键是表的名称,值是包含您的数据的数据帧。
- Map
- <
- String
- ,
- Dataset
- <
- Row
- >>
- datalake
- =
- new
- HashMap
- <>();
- for (String table : tables) {
- System.out.print("Loading table [" + table
- + "] ...");
- Dataset<Row> df = spark.read()
- .format("jdbc")
- .option("url", config.getJdbcUrl())
- .option("dbtable", table)
- .option("user", config.getUser())
- .option("password", config.getPassword())
- .option("driver", config.getDriver())
- .load();
- datalake.put(table, df);
- System.out.println("done.");
- }
目前为止,执行结果为:
- Loading
- table
- [
- orders
- ]
- ...
- done
- .
- We have loaded 1 table(s) in our data lake.
您现在已准备好探索该数据。使用
数据帧:
- ordersDF
- Dataset
- <
- Row
- >
- ordersDf
- =
- datalake
- .
- get
- (
- "orders"
- );
使用
方法创建一个新列。这个新列是执行 datediff() Spark SQL 函数的结果。此函数接受 2 个参数: 结束日期和开始日期。
- withColumn()
- ordersDf
- =
- ordersDf
- .
- withColumn
- (
- "time_to_ship",
- datediff(ordersDf.col("ship_date"), ordersDf.col("order_date")));
请注意:
- datediff
- (
- ordersDf
- .
- col
- (
- "ship_date"
- ),
- ordersDf
- .
- col
- (
- "order_date"
- ))
不同于:
- datediff
- (
- ordersDf
- .
- col
- (
- "order_date"
- ),
- ordersDf
- .
- col
- (
- "ship_date"
- ))
您实际上执行的是减法。所以,应首先放入最大的数(或更近的日期);否则您将得到负数。
此刻,如果输出数据帧的模式,就会看到:
- ordersDf
- .
- printSchema
- ();
- root
- |-- order_num: integer (nullable = false)
- |-- order_date: date (nullable = true)
- |-- customer_num: integer (nullable = false)
- |-- ship_instruct: string (nullable = true)
- |-- backlog: string (nullable = true)
- |-- po_num: string (nullable = true)
- |-- ship_date: date (nullable = true)
- |-- ship_weight: decimal(8,2) (nullable = true)
- |-- ship_charge: decimal(6,2) (nullable = true)
- |-- paid_date: date (nullable = true)
- |-- time_to_ship: integer (nullable = true)
看到您添加的列了吗?该列名为
,类型为整数(而且可以为
- time_to_ship
)。
- null
如果查看该数据:
- ordersDf
- .
- show
- (
- 10
- );
- System.out.println("We have " + ordersDf.count() + " orders");
- +---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+------------+
- |order_num|order_date|customer_num| ship_instruct|backlog| po_num| ship_date|ship_weight|ship_charge| paid_date|time_to_ship|
- +---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+------------+
- | 1001|2008-05-20| 104|express ...| n|B77836 |2008-06-01| 20.40| 10.00|2008-07-22| 12|
- | 1002|2008-05-21| 101|PO on box; delive...| n|9270 |2008-05-26| 50.60| 15.30|2008-06-03| 5|
- | 1003|2008-05-22| 104|express ...| n|B77890 |2008-05-23| 35.60| 10.80|2008-06-14| 1|
- | 1004|2008-05-22| 106|ring bell twice ...| y|8006 |2008-05-30| 95.80| 19.20| null| 8|
- | 1005|2008-05-24| 116|call before deliv...| n|2865 |2008-06-09| 80.80| 16.20|2008-06-21| 16|
- | 1006|2008-05-30| 112|after 10 am ...| y|Q13557 | null| 70.80| 14.20| null| null|
- | 1007|2008-05-31| 117| null| n|278693 |2008-06-05| 125.90| 25.20| null| 5|
- | 1008|2008-06-07| 110|closed Monday ...| y|LZ230 |2008-07-06| 45.60| 13.80|2008-07-21| 29|
- | 1009|2008-06-14| 111|next door to groc...| n|4745 |2008-06-21| 20.40| 10.00|2008-08-21| 7|
- | 1010|2008-06-17| 115|deliver 776 King ...| n|429Q |2008-06-29| 40.60| 12.30|2008-08-22| 12|
- +---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+------------+
- only showing top 10 rows
- We have 23 orders
可以看到,订单 #1006 在
列中的值为 null,因为它还未发货。分析发货时间时,您会认识到它不属于亚马逊金牌服务!
- time_to_ship
接下来,您想要删除 null 值,但数据帧是不可变的,这意味着无法更改数据。要解决此问题,请创建一个新数据帧并排除您不喜欢的值。数据帧不是 SQL 表,所以没有
。
- DELETE FROM
- Dataset
- <
- Row
- >
- ordersDf2
- =
- ordersDf
- .
- filter
- (
- "time_to_ship IS NOT NULL");
- ordersDf2.printSchema();
- ordersDf2.show(5);
- System.out.println("We have " + ordersDf2.count()
- + " delivered orders");
- }
- }
输出为:
- +---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+------------+
- |order_num|order_date|customer_num| ship_instruct|backlog| po_num| ship_date|ship_weight|ship_charge| paid_date|time_to_ship|
- +---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+------------+
- | 1001|2008-05-20| 104|express ...| n|B77836 |2008-06-01| 20.40| 10.00|2008-07-22| 12|
- | 1002|2008-05-21| 101|PO on box; delive...| n|9270 |2008-05-26| 50.60| 15.30|2008-06-03| 5|
- | 1003|2008-05-22| 104|express ...| n|B77890 |2008-05-23| 35.60| 10.80|2008-06-14| 1|
- | 1004|2008-05-22| 106|ring bell twice ...| y|8006 |2008-05-30| 95.80| 19.20| null| 8|
- | 1005|2008-05-24| 116|call before deliv...| n|2865 |2008-06-09| 80.80| 16.20|2008-06-21| 16|
- +---------+----------+------------+--------------------+-------+----------+----------+-----------+-----------+----------+------------+
- only showing top 5 rows
- We have 22 delivered orders
Spark 基于惰性计算的概念。我将此过程想成您要求一个小孩做某件事:清洁房间,将要洗的衣服放在篮子中,将书放在书架上……通常,在您发出类似“开始”的指令(而且有时要用稍微大点的声音)后,这些转变才会开始发生。这是他们等待开始行动的行动信号。Spark 中的原理完全相同。
这里的负责人名为 Catalyst。Catalyst 是一个优化器,负责查找执行转换的最佳方式。此外,可以将此操作想成一个小孩需要从书架上拿走他的衣服,才能将书放回去。在 V2.2 中,Spark 向 Catalyst 添加了一个基于成本的优化器,这是来自 IBM 的一项重要贡献。
对 Apache Spark 而言,数据帧几乎就像关系数据库世界中的一个表。只是几乎这样……
数据和
- 更新
新行。数据帧不同。要执行这些操作,需要每次都创建一个新数据帧。这是您经常要做的事。
- 插入
属性)。 与表不同的是,您找不到索引、外键、组合键或触发器。
- nullable
开发人员使用了数据帧 API。相比访问 Facebook,您将更频繁地访问 Class Dataset 页面!可以直接从这个 API 访问列信息,执行连接 (join) 和合并 (union) 等操作。(本系列第 3 部分将探讨这个 API。)
在 Java 中,将数据帧实现为
。存储依赖于集群中的分区。Spark 提供了一个更低级的 API,可使用它重新分区并了解执行计划,但这不属于本文的讨论范围。从 Spark 2 开始,存储由一个名为 Tungsten 的组件进行处理,该组件比 Java/Scala 对象更高效。
- 数据集 < Row >
图 2 显示了一个 Spark 数据帧的 API、实现和存储。
点击查看大图
希望您现在已经更好地了解了 Spark 如何存储数据,如何通过数据帧 API 处理数据。您还了解了内置函数和在何处找到它们。最重要的是,您现在已为构建更复杂的分析和机器学习打好了基础……这是本教程系列接下来要讨论的主题。
来源: http://www.ibm.com/developerworks/cn/web/wa-lo-base-openwhisk-ontime-monitor-v1/index.html