大数据时代, 一大技术特征是对海量数据采集, 存储和分析的多组件解决方案. 而其中对来自于传感器, App 的 SDK 和各类互联网应用的原生日志数据的采集存储则是基本中的基本. 本系列文章将从 0 到 1, 概述一下搭建基于 Kafka,Flume,Zookeeper,HDFS,Hive 的海量数据分析系统的框架, 核心应用和关键模块.
项目源代码存储于 GitHub: 源码
系统架构概述
本系列文章所介绍的数据分析系统, 定位于一种通用的大数据分析系统, 可用于电商, 互联网和物联网的实际解决方案中. 该应用主要解决从多种多样的互联网应用, App, 传感器, 小程序等网络客户端中预设的接口采集数据, 并进行分布式存储, 通过 RESTful 或服务订阅的方式, 连接 BI 应用或者嵌入了机器学习模块的业务数据分析系统. 其项目架构如下:
项目主体实现了从各种互联网客户端的日志数据到集中的 BI 分析系统的全过程, 主要包括以下构件:
1. 日志收集 web 应用: 基于 REST 风格的接口, 处理从网络客户端回传的数据文件, 其中包括了对数据对象的定义, 核心 Web 应用和模拟客户端测试程序.
2. Kafka 集群: Kafka 是一种高吞吐量的分布式发布订阅消息系统, 它可以处理消费者规模的网站中的所有动作流数据. Kafka 的目的是通过 Hadoop 的并行加载机制来统一线上和离线的消息处理, 也是为了通过集群来提供实时的消息.
2. Zookeeper 集群: 是一个为 Kafka 的分布式应用提供一致性服务的软件, 提供的功能包括: 配置维护, 域名服务, 分布式同步, 组服务等. Zookeeper 可以实现封装好复杂易出错的关键服务, 将简单易用的接口和性能高效, 功能稳定的系统提供给用户.
3. Flume: 用于存储数据到 HDFS.Flume 的意义在于: 当收集数据的速度超过将写入数据的时候, 也就是当收集信息遇到峰值时, 这时候收集的信息非常大, 甚至超过了系统的写入数据能力, 这时候, Flume 会在数据生产者和数据收容器间做出调整, 保证其能够在两者之间提供平稳的数据.
4. HDFS: 提供高吞吐量的分布式存储方案.
5. Hive:Hive 是建立在 Hadoop 上的数据仓库基础构架, 定义了简单的类 SQL 查询语言, 便于快速搭建基于 SQL 的数据应用.
6. Hive Server2: 一种可选服务, 允许远程客户端可以使用各种编程语言向 Hive 提交请求并检索结果.
7. Dubbo 和 RPC:Dubbo 是阿里开源的一个高性能优秀的服务框架, 使得应用可通过高性能的 RPC 实现服务的输出和输入功能, 轻松实现面向服务的应用开发.
数据收集应用
数据收集应用的目标是提供一个对外的接口, 基于实时或准实时的要求收集来自海量客户端应用所上传的数据文件, 因此可以根据需求进行集群化和添加负载均衡机制. 以常规的日志数据收集应用为例, 一个数据应用应该实现的主要功能包括: 数据属性拷贝, 数据对象封装, 时间校对, 地理数据提取和缓存, 发送数据至 Kafka, 以及一个可选的模拟客户端上传数据应用.
一, 应用结构
基于 Maven 的多模块应用布局方案, 具体包括:
--EasyBI-Parent: 父组件, 仅维护一个 pom 文件, 作为个子组件的 parent pom 文件, 定义了统一的项目版本, 依赖管理和 Maven 插件管理.
|--EasyBI-Common: 子组件, 定义了日志数据对象和通用的工具类方法.
|--EasyBI-Logs-Collect-Web: 核心组件, 基于 REST 风格收集日志数据, 封装数据对象并发送至 Kafka, 其中对一些数据进行初级加工.
|--EasyBI-Logs-MockApp: 模拟一个客户端上传数据的应用, 可选.
二, Common 组件
数据对象
数据对象以日志对象为载体, 里面封装了从客户端发送过来的不同日志的 POJO 对象, 其类图为:
AppBaseLog 为日志类型的统一父类, 定义了一些公共的数据属性, 被用于各个具体日志实现类继承.
Startup,Event,Page,Usage 和 Error 分别对应了应用启动, 事件, 页面, 功能和错误的日志记录, 继承了公共基类并维护了各自的特有属性.
APPLogEntity 是按客户端为单位的日志对象, 组合了各个不同的子日志对象, 作为整个数据分析系统的核心数据模型.
通用的工具类
主要包括两个部分, 分别是复制各子日志对象的属性至 LogEntity 对象的一个工具方法, 以及一个提取 IP 位置信息的工具方法.
拷贝日志属性的工具类, 核心代码如下:
- public class PropertiesUtil {
- /*
- * 通过内省进行属性复制 (对象到对象)
- */
- public static void copyProperties(Object src, Object dest) {
- try {
- // 源对象的 BeanInfo
- BeanInfo srcBeanInfo = Introspector.getBeanInfo(src.getClass());
- // 获取属性描述符
- PropertyDescriptor[] descriptors = srcBeanInfo.getPropertyDescriptors();
- for (PropertyDescriptor descriptor : descriptors) {
- // 获取 getter 和 setter 方法
- Method getter = descriptor.getReadMethod();
- Method setter = descriptor.getWriteMethod();
- // 获取 set 方法名称
- String setterName = setter.getName();
- // 获取 setter 方法参数
- Class<?>[] parameterTypes = setter.getParameterTypes();
- Object value = getter.invoke(src);
- try {
- Method destSetter = dest.getClass().getMethod(setterName, parameterTypes);
- destSetter.invoke(dest, value);
- } catch (Exception e) {
- continue;
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- /*
- * 复制对象属性至一个数组的重载方法
- */
- public static void copyProperties(Object src, Object[] arr) {
- for (Object obj : arr) {
- copyProperties(src, obj);
- }
- }
- }
该工具类包括两个重载的方法, 基于内省, 分别实现深度复制一个对象的成员变量到另一个对象, 或者到另一个对象数组中. 具体到本例子, 包括:
获取 A 对象的 getter 方法和 setter 方法, 然后获取 setter 方法的名称和参数值.
通过反射, 调用 A 对象的 getter 方法, 获取成员变量值.
通过反射, 调用 B 对象的 setter 方法, 为其赋值, 完成复制.
提取 IP 地理信息的工具类, 通过使用 maxmind-db 库, 来实现对 Host 地址的地理信息提取, 用来填充至数据对象.
- public static final int COUNTRY = 1;
- public static final int PROVINCE = 2;
- public static final int CITY = 3;
- private static InputStream inputStream;
- private static Reader reader;
- static {
- try {
- inputStream = ClassLoader.getSystemResourceAsStream("GeoLite2-City.mmdb");
- reader = new Reader(inputStream);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
使用 maxmind-db 库需要用到 GeoLite2-City.mmdb 文件, 通过静态代码块来初始化资源文件的读取流, 并且定义用来获取国家, 省, 市的常量代码.
- public static String getLocation(String ip, int level) {
- try {
- JsonNode node = reader.get(InetAddress.getByName(ip));
- switch (level) {
- case 1:
- return node.get("country").get("names").get("zh-CN").textValue();
- case 2:
- return node.get("subdivisions").get(0).get("names").get("zh-CN").textValue();
- case 3:
- return node.get("city").get("names").get("zh-CN").textValue();
- default:
- return null;
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- return null;
- }
通过调用 com.maxmind.db.Reader 对象的 get 方法, 可以获取传入 IP 地址的地址节点对象, 如果对文档节点模型比较熟悉的话, 可以很快地获取到节点对象所对应的不同地址信息.
三, Logs_Collect_Web 应用组件
Logs_Collect_Web 是基于 SpringMVC 的 Web 应用, 目标是收集各客户端的日志数据, 组件结构为
DispatcherServlet 是 SpringMVC 的核心调度类, 关于 SpringMVC 的 Web 应用可参考: 基于 SSM 的 Java Web 应用开发原理初探
其 Controller 类需要实现如下的核心功能.
基本信息复制
利用上面介绍的工具类, 实现对从请求体中所提取的日志数据进行属性复制, 封装到 LogEntity 的数据对象中, 用于传输.
- private void copyBaseProperties(AppLogEntity e) {
- PropertiesUtil.copyProperties(e, e.getAppStartupLogs());
- 4 }
时间校准
因为日志文件的上传并不是瞬时的, 客户端提交时间与服务器收到时间存在时间差, 因此需要使用服务器时间, 与 Http 请求的时间差, 来对原始的日志文件时间进行校正.
- //server 时间
- long serverTime = System.currentTimeMillis();
- //client 时间
- long clientTime = Long.parseLong(request.getHeader("clientTime"));
- // 时间校对
- long duration = serverTime - clientTime;
- /*
- * 校正时间
- */
- private void verifyTime(AppLogEntity e, long duration) {
- for (AppBaseLog log : e.getAppStartupLogs()) {
- log.setCreatedAtMs(log.getCreatedAtMs() + duration);
- }
- }
提取地理信息并缓存
缓存地理位置信息的方法, 是通过维护一个 HashMap, 把 Host 的字符串作为键, 封装一个包括国家, 省, 市的位置对象作为值, 实现赋值位置信息到数据对象时:
如果缓存中包含该位置, 直接从 HashMap 中查找该值并返回, 实现高性能的查找.
如果缓存中没有, 再调用 GeoUtil 方法, 获取地址, 并添加到 HashMap 中.
- /*
- * 操作 IP 的方法 (缓存地理位置信息)
- */
- private void processIP(AppLogEntity e, String clientIP) {
- GeoInfo info = geoCache.get(clientIP);
- if (info == null) {
- info = new GeoInfo();
- info.setCountry(GeoUtil.getLocation(clientIP, GeoUtil.COUNTRY));
- info.setProvince(GeoUtil.getLocation(clientIP, GeoUtil.PROVINCE));
- geoCache.put(clientIP, info);
- }
- for (AppStartupLog log : e.getAppStartupLogs()) {
- log.setCountry(info.getCountry());
- log.setProvince(info.getProvince());
- log.setIpAddress(clientIP);
- }
- }
发送至 Kafka 的方法
Kafka 的核心方法是 Producer, 通过将数据对象转为 JSON 的字符串封装到不同的 Topic 中, 再通过 Producer 来发送出去, 即完成了发送至 Kafka 的方法实现. 代码实现如下:
- private void sendMessage(AppLogEntity e) {
- // 创建配置对象
- Properties properties = new Properties();
- properties.put("metadata.broker.list", "s202:9092");
- properties.put("serializer.class", "kafka.serializer.StringEncoder");
- properties.put("request.required.acks", "1");
- // 创建生产者
- Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(properties));
- sendSingleLog(producer, Constants.TOPIC_APP_STARTUP, e.getAppStartupLogs());
- sendSingleLog(producer,Constants.TOPIC_APP_ERRROR,e.getAppErrorLogs());
- // 发送消息
- producer.close();
- }
- /*
- * 发送单个消息的方法
- */
- private void sendSingleLog(Producer<Integer, String> producer, String topic,
- AppBaseLog[] logs) {
- for (AppBaseLog log : logs) {
- String logMessage = JSONObject.toJSONString(log);
- KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, logMessage);
- producer.send(data);
- }
- }
四, Mock_Client 应用组件
Mock_Client 是可选组件, 用于模拟一个客户端向服务器发送带数据对象的请求方法, 来测试服务器的可用性.
实现的原理是基于一个 JSON 数据样本, 通过随机组合数据对象的属性并发送请求, 并获取响应代码来判断. 具体可以参考 GitHub 源码.
来源: https://www.cnblogs.com/leoliu168/p/9973691.html