本文描述了一个系统,功能是评价和抽象地理围栏 (Geo-fencing),以及监控和分析核心地理围栏中业务的表现。
技术栈:Spring-JQuery - 百度地图 web SDK
存储:Hive-Elasticsearch-MySQL-Redis 什么是地理围栏?
LBS 系统中,地理围栏指的是虚拟边界围成的部分。
tips:这只是一个 demo,支撑实习生的本科毕设,不代表生产环境,而且数据已经做了脱密处理,为了安全还是隐去了所有数据。功能描述 1、地理围栏的圈选
(1) 热力图
热力图展示的是,北京市最近一天的业务密度(这里是 T+1 数据,在实际工作场景中往往是通过实时流采集分析实时的数据)
(2) 圈选地理围栏
系统提供了圆形 (距中心点距离)、矩形、多边形三种类型的图形圈选,并通过百度地图 SDK 采集图形的信息。2、地理围栏的持久化
(1) 提供地理围栏的持久化功能
(2) 地理围栏列表
下面是持久化的地理围栏列表,可以看到类型和围栏信息。
当圈选完成,可以选择持久化地理围栏,这个围栏将会沉淀下来,供后续业务分析和监控。3、聚合分析
(1) 提供日订单量,日盈利和日取消率的聚合分析
例如下图是在某个地理围栏区域内,11 月这 30 天内,订单量的变化。
(2) 详细列表
提供每一天数据的详细信息,对异常点可以标红和预警
上面基本就是系统的全部核心功能。下面进入实现部分。
实现 - 数据准备1、数据源
数据源应该是业务的数据库 (例如订单库) 以及客户端埋点日志(端动作),公司的离线采集和 ETL 团队经过了漫长的工作,将数据处理好存入了 Hive 中。
对于本文系统来说,数据源就是 Hive 中的 order 表。要做的是将 Hive 中的数据导入到 Elasticsearch 中,使用 Elasticsearch 强大的 GEO Query 支持进行分析。2、数据导入
数据的导入使用的是一段 Java 的 Spark 脚本。
(1)先解决依赖
spark-core 是必备依赖。引入 spark-hive 来处理 Hive 中的数据。引入 elasticsearch-hadoop 来搞定 Hive 到 ES 的写入。
(2)编写 spark 脚本
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.10</artifactId>
- <version>1.6.0</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-hive_2.10</artifactId>
- <version>1.6.0</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch-hadoop</artifactId>
- <version>2.3.4</version>
- </dependency>
先上代码
- public class ToES implements Serializable {
- transient private JavaSparkContext javaSparkContext;
- transient private HiveContext hiveContext;
- private String num;
- /*
- * 初始化Load
- * 创建sparkContext, hiveContext
- * */
- public ToES(String num) {
- this.num = num;
- initSparckContext();
- initHiveContext();
- }
- /*
- * 创建sparkContext
- * */
- private void initSparckContext() {
- SparkConf sparkConf;
- String warehouseLocation = System.getProperty("user.dir");
- sparkConf = new SparkConf().setAppName("to-es").set("spark.sql.warehouse.dir", warehouseLocation).setMaster("yarn-client").set("es.nodes", "10.93.21.21,10.93.18.34,10.93.18.35,100.90.62.33,100.90.61.14").set("es.port", "8049").set("pushdown", "true").set("es.index.auto.create", "true");
- javaSparkContext = new JavaSparkContext(sparkConf);
- }
- /*
- * 创建hiveContext
- * 用于读取Hive中的数据
- * */
- private void initHiveContext() {
- hiveContext = new HiveContext(javaSparkContext);
- }
- /*
- * 使用spark-sql从hive中读取数据, 然后写入es.
- * */
- public void hive2es() {
- String query = String.format("select * from kangaroo.order where concat_ws('-', year, month, day) = '%s' and product_id in (3,4) and area = 1", transTimeToFormat(System.currentTimeMillis() - Integer.parseInt(num) * 24 * 60 * 60 * 1000L, "yyyy-MM-dd"));
- DataFrame rows = hiveContext.sql(query).select("order_id", "starting_lng", "starting_lat", "order_status", "tip", "bouns", "pre_total_fee", "dynamic_price", "product_id", "starting_name", "dest_name", "type");
- JavaRDD < Map < String,
- Object >> rdd = rows.toJavaRDD().map(new Function < Row, Map < String, Object >> () {
- /*
- * 转换成Map, 解决字段类型不匹配问题
- * */
- @Override public Map < String,
- Object > call(Row row) throws Exception {
- Map < String,
- Object > map = new HashMap < String,
- Object > ();
- Map < String,
- Object > location = new HashMap < String,
- Object > ();
- for (int i = 0; i < row.size(); i++) {
- String key = row.schema().fields()[i].name();
- Object value = row.get(i);
- map.put(key, value);
- }
- location.put("lat", Double.parseDouble(map.get("starting_lat").toString()));
- location.put("lon", Double.parseDouble(map.get("starting_lng").toString()));
- map.remove("starting_lat");
- map.remove("starting_lng");
- map.put("location", location);
- map.put("date", transTimeToFormat(System.currentTimeMillis() - Integer.parseInt(num) * 24 * 60 * 60 * 1000L, "yyyy-MM-dd"));
- return map;
- }
- });
- Map < String,
- String > map = new HashMap < String,
- String > ();
- map.put("es.mapping.id", "order_id");
- JavaEsSpark.saveToEs(rdd, "moon/bj", map);
- }
- public String transTimeToFormat(long currentTime, String formatStr) {
- String formatTime = null;
- try {
- SimpleDateFormat format = new SimpleDateFormat(formatStr);
- formatTime = format.format(currentTime);
- } catch(Exception e) {}
- return formatTime;
- }
- public static void main(String[] args) {
- String num = args[0];
- ToES toES = new ToES(num);
- toES.hive2es();
- }
- }
SparkContext 和 HiveContext 的初始化,请自行参考代码。
ES 的集群配置是在 sparkConf 中加载进去的,加载方式请自己参照代码。1)数据过滤
hive-sql
- select * from kangaroo.order where concat_ws('-', year, month, day) = '%s'and product_id in (3, 4) and area = 1
说明:
a)Hive 的 order 表实现为一个外部表,year/month/day 是分区字段,也就是说数据是按照天为粒度挂载的。
b)product_id 是业务编号,这里过滤出了目标业务的订单。
c)area 为城市编号,这里只过滤出北京。2)列的裁剪
Elasticsearch 有个弊端是由于索引的建立,当数据导入 Elasticsearch 数据量会膨胀,所以一定要进行维度的裁剪。
我们的订单 Hive 表姑且就叫它 order 吧,这个表有 40 + 个字段,我们导入到 ES 中,只选用了其中的 12 个字段。
在代码中是,通过 DataFrame 的 select 实现的裁剪
- DataFrame rows = hiveContext.sql(query)
- .select("order_id", "starting_lng", "starting_lat", "order_status", "tip", "bouns",
- "pre_total_fee", "dynamic_price", "product_id", "starting_name", "dest_name", "type");
可能会有这样的好奇,这样做在 hive-sql 中把所有字段全拿到然后在裁剪?为什么不直接在 sql 语句中进行裁剪?简单解释一下,由于 spark 的惰性求值,应该是没有区别的。3)map 转换操作
下面将 dataFrame 转换成 rdd,执行 map 操作,将每一条记录进行处理,处理的核心逻辑,是将 starting_lng、starting_lat 压成一个 HashMap 的 location 字段。
为什么要这样做呢?
因为在 Elasticsearch 中要这样存储点的经纬度,并且将 location 字段声明为 geo_point 类型,才能使用空间索引查询。
然后我们顺便生成了一个 date 字段,表示订单是哪一天的,方便后面的以天为粒度进行聚合查询。4)批量存入 ES
- Map<String, String> map = new HashMap<String, String>();
- map.put("es.mapping.id", "order_id");
- JavaEsSpark.saveToEs(rdd, "moon/bj", map);
这样就将 rdd 中的数据批量存入到 ES 中了,存入的索引是 index=moon,type=bj,这里映射了 order_id 为 ES 文档的 document_id。我们下面马上就会说如何建立 moon/bj 的 mapping5)ES 索引建立
再将数据导入到 ES 之前,要建立 index 和 mapping。
创建 index=moon
- curl - XPOST "http://10.93.21.21:8049/moon?pretty"
创建 type=bj 的 mapping
- curl - XPOST "http://10.93.21.21:8049/moon/bj/_mapping?pretty" - d '
- {
- "bj": {
- "properties": {
- "order_id": {"type": "long"},
- "order_status": {"type": "long"},
- "tip": {"type": "long"},
- "bouns": {"type": "long"},
- "pre_total_fee": {"type": "long"},
- "dynamic_price": {"type": "long"},
- "product_id": {"type": "long"},
- "type": {"type": "long"},
- "dest_name": {"index": "not_analyzed","type": "string"},
- "starting_name": {"index": "not_analyzed","type": "string"},
- "departure_time": {"index": "not_analyzed","type": "string"},
- "location": {"type" : "geo_point"},
- "date": {"index": "not_analyzed", "type" : "string"}
- }
- }
- }'
这里要注意的是,location 字段的类型 - geo_point。6)打包编译 spark 程序
以 yarn 队列形式运行
- spark - submit--queue = root. * **to - es - 1.0 - SNAPSHOT - jar - with - dependencies.jar
然后在 ES 的 head 中可以看到数据已经加载进去了
至此,数据已经准备好了。
今天先到这,后面的博客会描述如何搞定百度地图前端和 Elasticsearch GEO 查询。
来源: http://www.cnblogs.com/kangoroo/p/8047586.html