添加外部数据源,获取更多洞察!
敬请期待该系列的后续内容。
在本系列的前 3 部分中,您学习了如何:
您可能认为,您可以对传统关系数据库执行您目前完成的所有操作。完全正确。现在,是时候通过添加另外两个数据源来发现 Spark 的一些功能了。
昨天早上,在庆祝国际咖啡日时,您无意中听到销售副总裁与新销售人员的对话。他们在讨论让销售人员赚得盆满钵满的销售区,以及该区域的零售店。顺便提醒一下,您的公司向全美国的零售店批发体育设备。尽管您没有完全理解他们的销售行话,但您对他们的数据知识感到非常失望。您不得不打断他们:"我可以分析销售数据,将其与每个邮政区的平均收入和此区域的人口规模交叉分析,然后可以了解哪个邮政区需要更多关注。"
您知道您会通过这番话交到一些新朋友。现在您需要提供一些分析数据。
您的第一反应是访问 IBM 的 developerWorks,这很不错。但是,您需要两个额外的数据集:每个邮政区的人口和收入。
要获得这些数据集,可以求助两个美国政府部门: 美国人口统计局 和 国内收入署 (IRS) 。针对本教程的用途,我们使用了人口统计局的一个经过整理的版本,因为原始数据有点难以理解。
对于 IRS 数据,第一个练习是使用原始 IRS 数据集,查找您的销售区域中收入高于 75,000 美元 / 年的预定义收入的家庭数量。
可以 从 GitHub 下载代码和数据 。对于本部分,实验数据包含在
包中。数据包含在 data 目录中。IRS 提供了该数据集的技术解释。解释文件的名称为 14zpdoc.doc,该文件包含在存储库的 data 目录中。
- net.jgp.labs.informix2spark.l4xx
基本上讲,每个区域都是通过其邮政编码来定义的。每个区域根据营业额等级划分为 6 个调整后总收入 (AGI) 组:
本教程的目标是 3 个收益较高的组。
我们提供了所有代码,让您有一个全局视图。
- package net.jgp.labs.informix2spark.l400;
- import org.apache.spark.sql.Dataset;
- import org.apache.spark.sql.Row;
- import org.apache.spark.sql.SparkSession;
- public class HouseholdsAboveMedianRevenuePerZipApp {
- public static void main(String[] args) {
- HouseholdsAboveMedianRevenuePerZipApp app = new HouseholdsAboveMedianRevenuePerZipApp();
传递要分析的区域的邮政编码来运行该应用程序。
- app.start(27514);
- }
- private void start(int zip) {
在本地模式下创建一个 Spark 会话。
- SparkSession spark = SparkSession
- .builder()
- .appName("Number of households in your ZIP Code")
- .master("local")
- .getOrCreate();
将 CSV 文件加载到 Spark 中很简单:使用 format 方法和 csv 参数。CSV 加载器接受许多选项(众所周知,CSV 很难处理)。IRS 文件遵循一种非常简单的方案,其中使用了两个选项:
如果没有让 Spark 推断模式,则会将此数据帧中的所有列都视为字符串。CSV 解析还有其他许多选项,每个选项都已在 我的博客 中进行了解释。
最后请注意,文件名使用了一个通配符。Spark 将所有与 14zpallagi*.csv 匹配的文件都加载到 data 目录中。可以在这里使用正则表达式:
读取:
- 14zpallagi-part[1-3].csv
、
- 14zpallagi-part1.csv
和
- 14zpallagi-part2.csv
。
- 14zpallagi-part3.csv
- String filename = "data/14zpallagi*.csv";
- Dataset<Row> df = spark
- .read()
- .format("csv")
- .option("inferSchema", "true")
- .option("header", "true")
- .load(filename);
现在可以检查已加载的数据:
- df.printSchema();
首先查看长模式。该模式之所以很长是因为 IRS 慷慨地共享了 127 个列(我删除了其中的许多列)。
- root
- |-- STATEFIPS: integer (nullable = true)
- |-- STATE: string (nullable = true)
- |-- zipcode: integer (nullable = true)
- |-- agi_stub: integer (nullable = true)
- |-- N1: double (nullable = true)
- |-- mars1: double (nullable = true)
- |-- MARS2: double (nullable = true)
- |-- MARS4: double (nullable = true)
- |-- PREP: double (nullable = true)
- |-- N2: double (nullable = true)
- |-- NUMDEP: double (nullable = true)
- |-- TOTAL_VITA: double (nullable = true)
- |-- VITA: double (nullable = true)
- |-- TCE: double (nullable = true)
- |-- A00100: double (nullable = true)
- |-- N02650: double (nullable = true)
- …
查看一个数据样本。sample() 方法最多接受 3 个参数: 替换性(或记录独立性) 、一个小数和一个种子(可选)。
- df.sample(true, 0.01, 4589).show(2);
- System.out.println("Dataframe has " + df.count() + " rows and " + df.columns().length
- + " columns.");
我将输出限制为两行!
- +---------+-----+-------+--------+-------+------+-------+-----+-------+--------+-------+----------+----+---+-----------+-------+-----------+-------+-----------+-------+--------+-------+--------+-------+--------+-------+-------+-------+--------+-------+---------+------+--------+-------+--------+------+------+------+-------+--------+-------+---------+-------+--------+------+------+------+-------+------+-------+------+-------+------+------+------+------+------+-------+-------+---------+-----------+-------+--------+------+------+-------+--------+-------+--------+-------+--------+-------+--------+-------+-----------+-------+---------+-------+-------+------+------+-------+-------+-------+-------+------+------+------+------+------+------+------+------+------+------+-------+--------+------+------+------+------+------+------+-------+---------+------+------+------+------+------+------+------+------+------+------+-------+---------+-------+---------+-------+-------+-------+-------+-------+--------+-------+--------+
- |STATEFIPS|STATE|zipcode|agi_stub| N1| mars1| MARS2|MARS4| PREP| N2| NUMDEP|TOTAL_VITA|VITA|TCE| A00100| N02650| A02650| N00200| A00200| N00300| A00300| N00600| A00600| N00650| A00650| N00700| A00700| N00900| A00900| N01000| A01000|N01400| A01400| N01700| A01700| SCHF|N02300|A02300| N02500| A02500| N26270| A26270| N02900| A02900|N03220|A03220|N03300| A03300|N03270| A03270|N03150| A03150|N03210|A03210|N03230|A03230|N03240| A03240| N04470| A04470| A00101| N18425| A18425|N18450|A18450| N18500| A18500| N18300| A18300| N19300| A19300| N19700| A19700| N04800| A04800| N05800| A05800| N09600| A09600|N05780|A05780| N07100| A07100| N07300| A07300|N07180|A07180|N07230|A07230|N07240|A07240|N07220|A07220|N07260|A07260| N09400| A09400|N85770|A85770|N85775|A85775|N09750|A09750| N10600| A10600|N59660|A59660|N59720|A59720|N11070|A11070|N10960|A10960|N11560|A11560| N06500| A06500| N10300| A10300| N85530| A85530| N85300| A85300| N11901| A11901| N11902| A11902|
- +---------+-----+-------+--------+-------+------+-------+-----+-------+--------+-------+----------+----+---+-----------+-------+-----------+-------+-----------+-------+--------+-------+--------+-------+--------+-------+-------+-------+--------+-------+---------+------+--------+-------+--------+------+------+------+-------+--------+-------+---------+-------+--------+------+------+------+-------+------+-------+------+-------+------+------+------+------+------+-------+-------+---------+-----------+-------+--------+------+------+-------+--------+-------+--------+-------+--------+-------+--------+-------+-----------+-------+---------+-------+-------+------+------+-------+-------+-------+-------+------+------+------+------+------+------+------+------+------+------+-------+--------+------+------+------+------+------+------+-------+---------+------+------+------+------+------+------+------+------+------+------+-------+---------+-------+---------+-------+-------+-------+-------+-------+--------+-------+--------+
- | 1| AL| 0| 6|51270.0|4160.0|45860.0|910.0|39250.0|148020.0|50820.0| 80.0|80.0|0.0|2.2652783E7|51270.0|2.3073614E7|44660.0|1.1051815E7|40610.0|264670.0|32640.0|791136.0|31530.0|620013.0|29090.0|96411.0|13720.0|855431.0|31500.0|2331256.0|9340.0|444369.0|14820.0|833058.0|2630.0| 520.0|1842.0|11220.0|292923.0|22620.0|4633716.0|22380.0|420831.0|1080.0| 267.0|2770.0|86011.0|8520.0|92498.0|1140.0|10827.0| 0.0| 0.0| 0.0| 0.0|1660.0|61679.0|47840.0|2499208.0|2.1437993E7|45930.0|761404.0|1440.0|4867.0|45630.0|168515.0|47820.0|962132.0|36310.0|456194.0|45490.0|920293.0|51240.0|1.9692443E7|51230.0|5274998.0|15590.0|77387.0| 0.0| 0.0|21020.0|54852.0|15290.0|25542.0|3230.0|1747.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0|1410.0| 459.0|14770.0|124954.0| 0.0| 0.0| 0.0| 0.0| 100.0| 219.0|50690.0|5185930.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0|51170.0|5204318.0|51230.0|5472274.0|19090.0|33930.0|25450.0|89886.0|28340.0|774418.0|15560.0|243494.0|
- | 1| AL| 35004| 5| 590.0| 40.0| 530.0| 0.0| 300.0| 1660.0| 550.0| 0.0| 0.0|0.0| 74554.0| 590.0| 75493.0| 560.0| 64835.0| 260.0| 150.0| 140.0| 236.0| 130.0| 149.0| 350.0| 310.0| 100.0| 1671.0| 100.0| 364.0| 60.0| 1459.0| 150.0| 3991.0| 0.0| 0.0| 0.0| 90.0| 1796.0| 40.0| 1408.0| 240.0| 939.0| 30.0| 8.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 130.0| 132.0| 0.0| 0.0| 0.0| 0.0| 450.0| 9296.0| 57663.0| 430.0| 2268.0| 0.0| 0.0| 420.0| 353.0| 450.0| 2766.0| 400.0| 2900.0| 420.0| 2321.0| 590.0| 56931.0| 590.0| 9612.0| 0.0| 0.0| 0.0| 0.0| 310.0| 448.0| 40.0| 3.0| 120.0| 66.0| 70.0| 99.0| 0.0| 0.0| 190.0| 249.0| 0.0| 0.0| 60.0| 228.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 590.0| 10176.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 60.0| 59.0| 0.0| 0.0| 580.0| 9179.0| 580.0| 9419.0| 0.0| 0.0| 0.0| 0.0| 200.0| 625.0| 380.0| 1553.0|
- +---------+-----+-------+--------+-------+------+-------+-----+-------+--------+-------+----------+----+---+-----------+-------+-----------+-------+-----------+-------+--------+-------+--------+-------+--------+-------+-------+-------+--------+-------+---------+------+--------+-------+--------+------+------+------+-------+--------+-------+---------+-------+--------+------+------+------+-------+------+-------+------+-------+------+------+------+------+------+-------+-------+---------+-----------+-------+--------+------+------+-------+--------+-------+--------+-------+--------+-------+--------+-------+-----------+-------+---------+-------+-------+------+------+-------+-------+-------+-------+------+------+------+------+------+------+------+------+------+------+-------+--------+------+------+------+------+------+------+-------+---------+------+------+------+------+------+------+------+------+------+------+-------+---------+-------+---------+-------+-------+-------+-------+-------+--------+-------+--------+
- only showing top 2 rows
- The dataframe has 166719 rows and 127 columns.
您不需要清理您的数据集,但我发现这样做更便于阅读。要清理您的数据集,可以按邮政编码进行过滤并丢弃额外的列。
- Dataset<Row> df2 = df.filter(df.col("zipcode").equalTo(zip));
- String[] colsToDrop = { "STATEFIPS", "mars1", "MARS2", "MARS4", "PREP", "N2",
- "NUMDEP", "TOTAL_VITA", "VITA", "TCE", "A00100", "N02650", "N00200", "A00200",
- "N00300", "A00300", "N00600", "A00600", "N00650", "A00650", "N00700", "A00700",
- "N00900", "A00900", "N01000", "A01000", "N01400", "A01400", "N01700", "A01700",
- "SCHF", "N02300", "A02300", "N02500", "A02500", "N26270", "A26270", "N02900",
- "A02900", "N03220", "A03220", "N03300", "A03300", "N03270", "A03270", "N03150",
- "A03150", "N03210", "A03210", "N03230", "A03230", "N03240", "A03240", "N04470",
- "A04470", "A00101", "N18425", "A18425", "N18450", "A18450", "N18500", "A18500",
- "N18300", "A18300", "N19300", "A19300", "N19700", "A19700", "N04800", "A04800",
- "N05800", "A05800", "N09600", "A09600", "N05780", "A05780", "N07100", "A07100",
- "N07300", "A07300", "N07180", "A07180", "N07230", "A07230", "N07240", "A07240",
- "N07220", "A07220", "N07260", "A07260", "N09400", "A09400", "N85770", "A85770",
- "N85775", "A85775", "N09750", "A09750", "N10600", "A10600", "N59660", "A59660",
- "N59720", "A59720", "N11070", "A11070", "N10960", "A10960", "N11560", "A11560",
- "N06500", "A06500", "N10300", "A10300", "N85530", "A85530", "N85300", "A85300",
- "N11901", "A11901", "N11902", "A11902" };
- for (String colName : colsToDrop) {
- df2 = df2.drop(colName);
- }
- df2.printSchema();
- df2.show();
- System.out.println("Dataframe has " + df2.count() + " rows and " + df2
- .columns().length + " columns.");
现在来查看一下结果。您获得了更容易理解的结果。
- root
- |-- STATE: string (nullable = true)
- |-- zipcode: integer (nullable = true)
- |-- agi_stub: integer (nullable = true)
- |-- N1: double (nullable = true)
- |-- A02650: double (nullable = true)
- +-----+-------+--------+------+--------+
- |STATE|zipcode|agi_stub| N1| A02650|
- +-----+-------+--------+------+--------+
- | NC| 27514| 1|3590.0| 42542.0|
- | NC| 27514| 2|2030.0| 74332.0|
- | NC| 27514| 3|1040.0| 65651.0|
- | NC| 27514| 4| 800.0| 71410.0|
- | NC| 27514| 5|1690.0|249042.0|
- | NC| 27514| 6|1650.0|843353.0|
- +-----+-------+--------+------+--------+
- Dataframe has 6 rows and 5 columns.
对于这个较小的数据集,可以看到您关心的是 agi_stub 大于 3 的记录。您想要对它们进行统计,按邮政编码进行分组,然后对列 N1 中返回的数量进行求和。Spark 代码为:
- Dataset<Row> df3 = df2.filter(df2.col("agi_stub").$greater(3));
- df3 = df3.groupBy("zipcode").sum("N1").withColumnRenamed("sum(N1)", "households");
- df3.show();
- }
- }
您会获得:
- +-------+----------+
- |zipcode|households|
- +-------+----------+
- | 27514| 4140.0|
- +-------+----------+
这些结果是何含义?在这个邮政区中,4,140 笔税收是由收入高于 75,000 美元的人上缴的。税收笔数与家庭的比例不是 1:1,但它能让您看清销售潜力。
收入数据集的大小接近 200MB,而您将仅使用它的一小部分。 经过整理的人口统计数据只有 400KB。将此数据集加载到您主要用于销售和仓库交易的 Informix® 数据库中是否有意义?或许没有意义。不要误会,Informix 能完美地处理这些数据集,但这不是它的职责。
在您成长为数据科学家的路上,会逐渐添加越来越多的数据集,并对它们进行试验(而且可能使用 IBM Data Science Experience 等专用工具)。但是,您可能不希望每次试验都将所有这些数据集加载到生产数据库中。
向销售人员展示了这些第一批结果后,你们一起发现了一个预测潜在收入的不错指标。
基本思路是:
备注:在本文的剩余部分中,index 指的是 "指数"。不要将该指数与数据库索引混淆。
对于每个区域或邮政区,应用:
这很不错,但如何在 Spark 中实现此操作?
可以在
包中找到一个示例,它是一个名为 SalesTargetApp 的应用程序。
- net.jgp.labs.informix2spark.l420
您可以浏览一下该代码,先执行导入,然后执行初始化。
- package net.jgp.labs.informix2spark.l420;
- import static org.apache.spark.sql.functions.col;
- import static org.apache.spark.sql.functions.lit;
- import java.math.BigDecimal;
- import java.sql.Connection;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import org.apache.spark.sql.Dataset;
- import org.apache.spark.sql.Row;
- import org.apache.spark.sql.SparkSession;
- import org.apache.spark.sql.jdbc.JdbcDialect;
- import org.apache.spark.sql.jdbc.JdbcDialects;
- import net.jgp.labs.informix2spark.utils.Config;
- import net.jgp.labs.informix2spark.utils.ConfigManager;
- import net.jgp.labs.informix2spark.utils.InformixJdbcDialect;
- import net.jgp.labs.informix2spark.utils.K;
- import scala.collection.Seq;
- public class SalesTargetApp {
- SparkSession spark;
- public SalesTargetApp() {
- init();
- }
- private void init() {
- this.spark = SparkSession
- .builder()
- .appName("Sales Target")
- .master("local")
- .getOrCreate();
- }
- public static void main(String[] args) {
- SalesTargetApp app = new SalesTargetApp();
- app.start();
- }
与前面的所有示例一样,所有示例都以 start() 方法开头。
- private void start() {
- Dataset<Row> householdDf = getHouseholdDataframe();
- Dataset<Row> populationDf = getPopulationDataframe();
- Dataset<Row> indexDf = joinHouseholdPopulation(householdDf, populationDf);
- Dataset<Row> salesDf = getSalesData();
您通过调用方法构建了 4 个数据帧。让我们详细分析一下它们。
此方法非常类似于本教程中对 IRS 数据执行的第一个试验。首先读取 CSV 文件。
- private Dataset<Row> getHouseholdDataframe() {
- String filename = "data/14zpallagi*.csv";
- Dataset<Row> df = spark.read().format("csv")
- .option("inferSchema", "true")
- .option("header", "true").load(filename);
像在 SQL 中一样,选择您感兴趣的列。这是丢弃您不想要的所有列的替代方法。
- df = df.select(
- df.col("zipcode"),
- df.col("agi_stub"),
- df.col("N1"),
- df.col("A02650"),
在最后一列中,您想要获得每组的所有税收的总收入。请记住,IRS 将该数据拆分为 6 组。
- df.col("N1").multiply(df.col("A02650")));
此操作在数据帧的末尾处创建了一个名为 (N1 * A02650) 的列。这不是一个描述性的列名称,所以您决定将它重命名为 income。 df.columns() 返回列名称的列表,
提供了最后一列的名称。
- df.columns()[df.columns().length - 1]
- df = df.withColumnRenamed(df.columns()[df.columns().length - 1], "income");
因为您对分析每个邮政区的 AGI 不感兴趣,所以需要按邮政编码对不同的 AGI 类别进行分组。在执行此操作的过程中,还可以将所有收益添加到一个重命名为 total_income 的列中。
- df = df.groupBy("zipcode").sum("income"); df = df.withColumnRenamed(df.columns()[df.columns().length - 1], "total_income");
- return df;
- }
如果此刻显示了该数据帧,您会看到:
- +-------+------------+
- |zipcode|total_income|
- +-------+------------+
- | 35071| 4.7763307E8|
- | 36525| 6306850.0|
- | 36538| 201690.0|
- | 85253|9.45181039E9|
- | 85321| 1.290294E7|
- +-------+------------+
- only showing top 5 rows
有了您目前获得的所有经验,加载人口数据帧就会很简单。
- private Dataset<Row> getPopulationDataframe() {
- String filename = "data/2010+Census+Population+By+Zipcode+(ZCTA).csv";
- Dataset<Row> df = spark.read().format("csv")
- .option("inferSchema", "true")
- .option("header", "true")
- .load(filename);
还有另一种重命名列的方法,但采用该方法需要提供列名称(而且您需要确实拥有列名称)。
- df = df.withColumnRenamed("Zip Code ZCTA", "zipcode");
- df = df.withColumnRenamed("2010 Census Population", "pop");
- return df;
- }
您的数据帧类似于:
- +-------+-----+
- |zipcode| pop|
- +-------+-----+
- | 1001|16769|
- | 1002|29049|
- | 1003|10372|
- | 1005| 5079|
- | 1007|14649|
- +-------+-----+
- only showing top 5 rows
像 第 3 部分 中学到的一样,采用一种外联接方式来联接该邮政区的两个数据集 - 这意味着可能会有一些 null 值。
- private Dataset<Row> joinHouseholdPopulation(
- Dataset<Row> householdDf,
- Dataset<Row> populationDf) {
- Dataset<Row> df = householdDf
- .join(
- populationDf,
- householdDf.col("zipcode").equalTo(populationDf.col("zipcode")),
- "outer")
- .drop(populationDf.col("zipcode"))
现在,创建一个名为 income_per_inh 的新列。此列包含按人口对总收益进行划分的结果。 它提供了每位居民的预计收益。
- .withColumn(
- "income_per_inh",
- householdDf.col("total_income").divide(populationDf.col("pop")));
- return df;
- }
withColumn() 支持使用一个或多个列和函数,在您的数据帧中创建一个新列。 随着您执行的转换越来越多,withColumn() 可能会成为您最喜欢的方法。使用该方法比尝试查找最后一个要重命名的列更容易。
以下是这个新数据帧现在的外观:
- +-------+------------+-----+------------------+
- |zipcode|total_income| pop| income_per_inh|
- +-------+------------+-----+------------------+
- | 1088| 1144910.0| 670|1708.8208955223881|
- | 1238| 7.228838E7| 6047|11954.420373739043|
- | 1342| 4992920.0| 1492| 3346.461126005362|
- | 2122|1.09356174E9|23479| 46576.16338004174|
- | 2142| 1.0935586E8| 3141| 34815.61922954473|
- +-------+------------+-----+------------------+
- only showing top 5 rows
对于销售数据,重用您为本教程系列的 第 3 部分 编写的代码。您需要将它放在一个方法中。
- private Dataset<Row> getSalesData() {
- …
- return salesDf;
- }
请记住,所有代码都已在 GitHub 上提供。
您的销售数据帧现在包含邮政编码和您获得的收入数据。
- +-------+-------+
- |zipcode|revenue|
- +-------+-------+
- | 94062|1390.00|
- | 94040| 562.00|
- | 94022| 448.00|
- | 19898|1131.00|
- | 74006|1614.00|
- +-------+-------+
- only showing top 5 rows
您现在已拥有所有数据集,并且已准备好构建您与销售团队一起定义的指数。
- Dataset<Row> salesIndexDf = salesDf
- .join(indexDf, salesDf.col("zipcode").equalTo(indexDf.col("zipcode")), "left")
- .drop(indexDf.col("zipcode"));
查看人均收入。
- salesIndexDf = salesIndexDf.withColumn("revenue_by_inh", salesIndexDf.col("revenue")
- .divide(salesIndexDf.col("pop")));
现在识别最佳销量区域。
- salesIndexDf = salesIndexDf.orderBy(col("revenue_by_inh").desc());
提取 "最佳行"。最佳行包含要供销售团队选择最佳区域时参考的所有值。
- Row bestRow = salesIndexDf.first();
- double bestRevenuePerInhabitant = ((BigDecimal) bestRow.getAs("revenue_by_inh"))
- .doubleValue();
- int populationOfBestRevenuePerInhabitant = bestRow.getAs("pop");
- double incomeOfBestRevenuePerInhabitant = bestRow.getAs("income_per_inh");
接下来,在您的数据帧中创建一列。您可以使用 withColumn() 方法。但是如果您想添加一个包含特定值的列,该怎么做?为此,可以接受一个数字列,按该数字列的值(您拥有值 1)对其进行拆分,然后将它与该值相乘。
- salesIndexDf = salesIndexDf.withColumn(
- "best_revenue_per_inh",
- salesIndexDf.col("pop").divide(salesIndexDf.col("pop"))
- .multiply(bestRevenuePerInhabitant));
或者可以使用 lit() 静态函数:
- salesIndexDf = salesIndexDf.withColumn(
- "pop_of_best",
- lit(populationOfBestRevenuePerInhabitant));
- salesIndexDf = salesIndexDf.withColumn(
- "income_of_best",
- lit(incomeOfBestRevenuePerInhabitant));
现在您已准备好创建这 3 个指数。
- salesIndexDf = salesIndexDf.withColumn(
- "idx_revenue",
- salesIndexDf.col("best_revenue_per_inh")
- .divide(salesIndexDf.col("revenue_by_inh")));
- salesIndexDf = salesIndexDf.withColumn(
- "idx_pop",
- salesIndexDf.col("pop").divide(salesIndexDf.col("pop_of_best")));
- salesIndexDf = salesIndexDf.withColumn(
- "idx_income",
- salesIndexDf.col("income_per_inh").divide(salesIndexDf.col("income_of_best")));
现在是时候为每个区域创建最后一个指数了,即每个指数的乘积。
- salesIndexDf = salesIndexDf.withColumn(
- "index",
- salesIndexDf.col("idx_revenue").multiply(salesIndexDf.col("idx_pop")
- .multiply(salesIndexDf.col("idx_income"))));
将该指数应用于现有的收入。
- salesIndexDf = salesIndexDf.withColumn(
- "potential_revenue",
- salesIndexDf.col("revenue").multiply(salesIndexDf.col("index")));
您可以丢弃一些列来增强输出。您可以丢弃更多列,但也需要让您的报告看起来足够科学。 最后,确保按潜在收入的降序进行了排序。
- salesIndexDf = salesIndexDf
- .drop("idx_income")
- .drop("idx_pop")
- .drop("idx_revenue")
- .drop("income_of_best")
- .drop("pop_of_best")
- .drop("best_revenue_per_inh")
- .orderBy(salesIndexDf.col("potential_revenue").desc());
您可以查看结果数据。
- salesIndexDf.show();
- }
- +-------+-------+-----+------------------+------------------+------------------+
- |zipcode|revenue| pop| income_per_inh| index| potential_revenue|
- +-------+-------+-----+------------------+------------------+------------------+
- | 94025| 84.00|40526| 840368.1256477323|1610.5247342457083|135284.07767663948|
- | 08540|1499.97|47115|469565.43117903004| 68.11481294046366|102170.17596630729|
- | 94086|1200.00|45697| 244836.9227739239| 41.76194133319635| 50114.32959983562|
- | 94062|1390.00|25876| 738260.2450146854| 34.85768977158333| 48452.18878250083|
- | 80219| 232.00|61296|104358.72308144088| 165.6588621914614|38432.856028419046|
- | 94022| 448.00|18500|1081220.6994594594| 80.96352858991159|36271.660808280394|
- | 94040| 562.00|32996|257082.76791126197| 48.8167513518355| 27435.01425973155|
- | 32256| 438.00|38483|142462.90881688017| 47.21447575076384|20679.940378834563|
- | 85016| 584.00|33896| 94057.98914326174| 18.13799917867292|10592.591520344986|
- | 94063|5592.00|30949| 59561.31635917154| 1.0| 5592.0|
- | 94085| 450.00|21247| 95544.70842942533| 9.395047814603645| 4227.77151657164|
- | 74006|1614.00|25750| 63162.90718446602|2.5434469314609442| 4105.123347377964|
- | 08002| 654.00|22274|59319.770584538026| 4.410905752434176| 2884.732362091951|
- | 60406| 824.00|25460| 36702.44422623723|2.8300506455361445| 2331.961731921783|
- | 94026|1451.80| null| null| null| null|
- | 19898|1131.00| null| null| null| null|
- +-------+-------+-----+------------------+------------------+------------------+
您以邮政编码为 94063 的区域的销售额作为参照,您在这里的销售额为 5,592 美元。 此区域的指数为 1。
基于您构建的指数,可以看到最有潜力的是邮政编码为 94025 的区域,这是加利福尼亚州的门洛帕克(有趣的是,这是 Informix 的诞生地)。数据表明,这里每个家庭的收入都处于国内最高水平。因此,可以合理的认为邮政编码为 94025 的区域具有更高的销售潜力。
潜力最低的区域的邮政编码为 60406,也就是伊利诺斯州的蓝岛 ,这是芝加哥南部的一个人口稀少的地区。
您可以看到具有最高潜力的 5 个邮政区:加利福尼亚州的门洛帕克、新泽西州的普林斯顿、加利福尼亚州的森尼维耳、加利福尼亚州的雷德伍德城,以及科罗拉多州的丹佛。您的销售经理可能会得出这样的结论:可以合理地加强这些区域的销售工作。
在本教程系列的第 4 部分中,您学习了:
来源: http://www.ibm.com/developerworks/cn/opensource/ba-offloading-informix-data-spark-4/index.html