In this article we simulate a scenario: analyzing order data in real time and computing real-time revenue.
Scenario
I try to cover one of the most common engineering scenarios:
1) First, order data is written to Kafka in real time as JSON, containing the order ID, order type, and order profit.
2) Then, spark-streaming consumes the order data from Kafka every ten seconds and aggregates the profit grouped by order type.
3) Finally, spark-streaming writes the aggregated results to a local MySQL database in real time.
Prerequisites
Installation
1) spark: I run Spark in yarn-client mode; the cluster client is already set up in my environment.
2) zookeeper: I use this cluster: 10.93.21.21:2181,10.93.18.34:2181,10.93.18.35:2181
3) kafka: I use a standalone broker: 10.93.21.21:9093
4) mysql: 10.93.84.53:3306
Languages
python: pykafka (pip install pykafka)
java: spark, spark-streaming
Let's get started.
1. Writing data to Kafka
We use pykafka to simulate real-time writes; the code is as follows:
kafka_producer.py
- # -*- coding: utf-8 -*-
- import time
- import json
- import uuid
- import random
- import threading
- from pykafka import KafkaClient
- # Create the Kafka client
- hosts = '10.93.21.21:9093'
- client = KafkaClient(hosts=hosts)
- # Print the available topics
- print client.topics
- # Create a producer handle for the topic
- topic = client.topics['kafka_spark']
- producer = topic.get_producer()
- # Worker: keep producing random order messages
- def work():
-     while 1:
-         msg = json.dumps({
-             "id": str(uuid.uuid4()).replace('-', ''),
-             "type": random.randint(1, 5),
-             "profit": random.randint(13, 100)})
-         producer.produce(msg)
- # Run the workers in multiple threads
- thread_list = [threading.Thread(target=work) for i in range(10)]
- for thread in thread_list:
-     thread.setDaemon(True)
-     thread.start()
- time.sleep(60)
- # Close the producer handle and exit
- producer.stop()
As you can see, each message we write is a JSON object: the order id is a uuid, the order type is a random integer from 1 to 5, and the order profit is a random integer from 13 to 100, for example:
- {"id": ${uid}, "type": 1, "profit": 30}
Note: 1) reading and writing Kafka from Python does not require going through ZooKeeper; 2) we write from multiple threads so that the data volume reaches a reasonable scale.
Run the producer; it keeps writing data for one minute.
- python kafka_producer.py
kafka_consumer.py
- # -*- coding: utf-8 -*-
- from pykafka import KafkaClient
- hosts = '10.93.21.21:9093'
- client = KafkaClient(hosts=hosts)
- # Consumer handle for the topic
- topic = client.topics['kafka_spark']
- consumer = topic.get_simple_consumer(consumer_group='test', auto_commit_enable=True,
-                                      auto_commit_interval_ms=1, consumer_id='test')
- for message in consumer:
-     if message is not None:
-         print message.offset, message.value
Run it to consume the data that was just written to Kafka:
- python kafka_consumer.py
2. spark-streaming
1) Dependencies first
The core pieces are the spark-streaming/Kafka integration package spark-streaming-kafka_2.10, spark-streaming_2.10 itself, and the Spark engine spark-core_2.10.
The JSON and MySQL libraries are a matter of taste.
pom.xml
- <dependencies>
-     <dependency>
-         <groupId>org.apache.spark</groupId>
-         <artifactId>spark-streaming-kafka_2.10</artifactId>
-         <version>1.6.0</version>
-     </dependency>
-     <dependency>
-         <groupId>org.apache.spark</groupId>
-         <artifactId>spark-streaming_2.10</artifactId>
-         <version>1.6.0</version>
-     </dependency>
-     <dependency>
-         <groupId>org.apache.spark</groupId>
-         <artifactId>spark-core_2.10</artifactId>
-         <version>1.6.0</version>
-         <scope>provided</scope>
-     </dependency>
-     <dependency>
-         <groupId>com.alibaba</groupId>
-         <artifactId>fastjson</artifactId>
-         <version>1.2.19</version>
-     </dependency>
-     <dependency>
-         <groupId>mysql</groupId>
-         <artifactId>mysql-connector-java</artifactId>
-         <version>5.1.38</version>
-     </dependency>
-     <dependency>
-         <groupId>commons-dbcp</groupId>
-         <artifactId>commons-dbcp</artifactId>
-         <version>1.4</version>
-     </dependency>
- </dependencies>
2) MySQL preparation
The results go to MySQL, so first create a results table.
id: primary key, auto-increment
type: order type
profit: order profit aggregated per spark batch
time: timestamp
- CREATE TABLE `order` (
-     `id` int(11) NOT NULL AUTO_INCREMENT,
-     `type` int(11) DEFAULT NULL,
-     `profit` int(11) DEFAULT NULL,
-     `time` mediumtext,
-     PRIMARY KEY (`id`)
- ) ENGINE=InnoDB AUTO_INCREMENT=56 DEFAULT CHARSET=utf8
For the connection pool I wrote a quick singleton-style implementation.
ConnectionPool.java
- package com.xiaoju.dqa.realtime_streaming;
- import java.sql.Connection;
- import java.sql.DriverManager;
- import java.util.LinkedList;
- public class ConnectionPool {
-     private static LinkedList<Connection> connectionQueue;
-     static {
-         try {
-             Class.forName("com.mysql.jdbc.Driver");
-         } catch (ClassNotFoundException e) {
-             e.printStackTrace();
-         }
-     }
-     // Lazily create 5 connections on first use and hand one out per call
-     public synchronized static Connection getConnection() {
-         try {
-             if (connectionQueue == null) {
-                 connectionQueue = new LinkedList<Connection>();
-                 for (int i = 0; i < 5; i++) {
-                     Connection conn = DriverManager.getConnection(
-                             "jdbc:mysql://10.93.84.53:3306/big_data", "root", "1234");
-                     connectionQueue.push(conn);
-                 }
-             }
-         } catch (Exception e) {
-             e.printStackTrace();
-         }
-         return connectionQueue.poll();
-     }
-     // Return a borrowed connection to the pool instead of closing it
-     public static void returnConnection(Connection conn) {
-         connectionQueue.push(conn);
-     }
- }
3) Implementation
I wrote it in Java; not knowing Scala is a bit awkward.
Even in Java the whole processing flow is still fairly simple, and not much different from the usual wordcount example.
Spark Streaming characteristics
Spark's defining abstraction is the RDD: by operating on RDDs, the complexity of distributed computation is hidden from us.
spark-streaming in turn operates on a DStream, a time series of RDDs, and how that series is produced depends on the batch interval you choose. For example, my batch here is 10 seconds, so one RDD is produced every 10 seconds; the slicing into RDDs and the generation of the series are handled transparently by spark-streaming. The DStream, the only thing exposed to us, is used in essentially the same way as a plain RDD.
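The following minimal sketch illustrates just that relationship, using a hypothetical socket source (the class name, host, and port are made up for illustration; this is not part of the order job below):
- package com.xiaoju.dqa.realtime_streaming;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.function.Function;
- import org.apache.spark.streaming.Durations;
- import org.apache.spark.streaming.api.java.JavaDStream;
- import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
- import org.apache.spark.streaming.api.java.JavaStreamingContext;
- public class DStreamSketch {
-     public static void main(String[] args) throws InterruptedException {
-         SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("dstream sketch");
-         // With a 10-second batch interval the engine cuts the input into one RDD every 10s.
-         JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
-         JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
-         // DStream operators look just like RDD operators; this map runs on every 10s RDD.
-         JavaDStream<Integer> lengths = lines.map(new Function<String, Integer>() {
-             @Override
-             public Integer call(String line) throws Exception {
-                 return line.length();
-             }
-         });
-         lengths.print();
-         jssc.start();
-         jssc.awaitTermination();
-     }
- }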
One thing worth spelling out here is what to watch for when writing to MySQL.
Writing to MySQL
The MySQL writes are done with the foreachPartition method, i.e., a MySQL connection is borrowed inside foreachPartition.
The reasons are:
1) You cannot create a MySQL connection on the driver side and ship it to the workers through serialization.
2) If you create connections while processing the RDD record by record, you easily end up with one connection per record and quickly run out of memory.
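Stripped of everything else, the write pattern that the full job below applies inside foreachRDD looks roughly like this (a simplified excerpt of the listing that follows; rdd stands for one batch's JavaPairRDD<Integer, Integer>):
- // One connection is borrowed per partition, reused for every record in it, then returned.
- rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<Integer, Integer>>>() {
-     @Override
-     public void call(Iterator<Tuple2<Integer, Integer>> records) throws Exception {
-         Connection connection = ConnectionPool.getConnection();   // once per partition
-         Statement stmt = connection.createStatement();
-         while (records.hasNext()) {
-             Tuple2<Integer, Integer> record = records.next();
-             stmt.executeUpdate(String.format(
-                     "insert into `order` (`type`, `profit`, `time`) values (%s, %s, %s)",
-                     record._1, record._2, System.currentTimeMillis()));
-         }
-         ConnectionPool.returnConnection(connection);               // returned, not closed
-     }
- });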
OrderProfitAgg.java
- package com.xiaoju.dqa.realtime_streaming;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaPairRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function;
- import org.apache.spark.api.java.function.Function2;
- import org.apache.spark.api.java.function.PairFunction;
- import org.apache.spark.api.java.function.VoidFunction;
- import org.apache.spark.streaming.Durations;
- import org.apache.spark.streaming.api.java.JavaPairDStream;
- import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
- import org.apache.spark.streaming.api.java.JavaStreamingContext;
- import org.apache.spark.streaming.kafka.KafkaUtils;
- import scala.Tuple2;
- import java.sql.Connection;
- import java.sql.Statement;
- import java.util.*;
- /*
-  * As a producer you can also use the console producer script that ships with Kafka:
-  * bin/kafka-console-producer.sh --broker-list localhost:9093 --topic test
-  */
- public class OrderProfitAgg {
-     public static void main(String[] args) throws InterruptedException {
-         // ZooKeeper quorum that Kafka is registered with
-         String zkQuorum = "10.93.21.21:2181,10.93.18.34:2181,10.93.18.35:2181";
-         // Kafka topics for spark-streaming to consume, comma separated for multiple topics
-         String topics = "kafka_spark,kafka_spark2";
-         // consumer group
-         String group = "bigdata_qa";
-         // number of topic partitions (also used as the number of receiver streams below)
-         int numThreads = 2;
-         // yarn-client mode; the spark-streaming app is named "order profit"
-         SparkConf sparkConf = new SparkConf().setMaster("yarn-client").setAppName("order profit");
-         // Create the single, global SparkContext; setLogLevel controls what is printed to the console
-         JavaSparkContext sc = new JavaSparkContext(sparkConf);
-         sc.setLogLevel("WARN");
-         // Create jssc; spark-streaming cuts one batch every 10s
-         JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(10));
-         // Prepare the topicMap: topic name -> number of receiver threads
-         Map<String, Integer> topicMap = new HashMap<String, Integer>();
-         for (String topic : topics.split(",")) {
-             topicMap.put(topic, numThreads);
-         }
-         // Kafka input streams
-         List<JavaPairReceiverInputDStream<String, String>> streams =
-                 new ArrayList<JavaPairReceiverInputDStream<String, String>>();
-         for (int i = 0; i < numThreads; i++) {
-             streams.add(KafkaUtils.createStream(jssc, zkQuorum, group, topicMap));
-         }
-         // Union into a single DStream of the records consumed from Kafka
-         JavaPairDStream<String, String> streamsRDD = streams.get(0);
-         for (int i = 1; i < streams.size(); i++) {
-             streamsRDD = streamsRDD.union(streams.get(i));
-         }
-         /*
-          * Kafka messages look like: {"id": ${uuid}, "type": 1, "profit": 35}
-          * The result is the total profit grouped by type:
-          * mapToPair turns each consumed message into a (type, profit) pair,
-          * reduceByKey groups by type and sums the profit.
-          */
-         JavaPairDStream<Integer, Integer> profits = streamsRDD.mapToPair(
-                 new PairFunction<Tuple2<String, String>, Integer, Integer>() {
-                     @Override
-                     public Tuple2<Integer, Integer> call(Tuple2<String, String> s_tuple2) throws Exception {
-                         JSONObject jsonObject = JSON.parseObject(s_tuple2._2);
-                         return new Tuple2<Integer, Integer>(jsonObject.getInteger("type"), jsonObject.getInteger("profit"));
-                     }
-                 }).reduceByKey(new Function2<Integer, Integer, Integer>() {
-                     @Override
-                     public Integer call(Integer i1, Integer i2) throws Exception {
-                         return i1 + i2;
-                     }
-                 });
-         /*
-          * Write the results to MySQL.
-          * A MySQL connection is needed for each partition, hence foreachPartition.
-          */
-         profits.foreachRDD(new Function<JavaPairRDD<Integer, Integer>, Void>() {
-             @Override
-             public Void call(JavaPairRDD<Integer, Integer> integerIntegerJavaPairRDD) throws Exception {
-                 integerIntegerJavaPairRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<Integer, Integer>>>() {
-                     @Override
-                     public void call(Iterator<Tuple2<Integer, Integer>> tuple2Iterator) throws Exception {
-                         Connection connection = ConnectionPool.getConnection();
-                         Statement stmt = connection.createStatement();
-                         long timestamp = System.currentTimeMillis();
-                         while (tuple2Iterator.hasNext()) {
-                             Tuple2<Integer, Integer> tuple = tuple2Iterator.next();
-                             Integer type = tuple._1;
-                             Integer profit = tuple._2;
-                             String sql = String.format(
-                                     "insert into `order` (`type`, `profit`, `time`) values (%s, %s, %s)",
-                                     type, profit, timestamp);
-                             stmt.executeUpdate(sql);
-                         }
-                         ConnectionPool.returnConnection(connection);
-                     }
-                 });
-                 return null;
-             }
-         });
-         // Start the computation and wait for it to finish
-         jssc.start();
-         try {
-             jssc.awaitTermination();
-         } catch (Exception ex) {
-             ex.printStackTrace();
-         } finally {
-             jssc.close();
-         }
-     }
- }
4) Packaging
Write the build section of pom.xml.
Then mvn clean package builds the jar; the command is shown after the build configuration below.
pom.xml
- <build>
-     <plugins>
-         <plugin>
-             <artifactId>maven-assembly-plugin</artifactId>
-             <configuration>
-                 <archive>
-                     <manifest>
-                         <!-- Replace this with the class containing the jar's main method -->
-                         <!--<mainClass>com.bigdata.qa.hotdog.driver.WordCount</mainClass>-->
-                         <mainClass>com.xiaoju.dqa.realtime_streaming.OrderProfitAgg</mainClass>
-                     </manifest>
-                 </archive>
-                 <descriptorRefs>
-                     <descriptorRef>jar-with-dependencies</descriptorRef>
-                 </descriptorRefs>
-             </configuration>
-             <executions>
-                 <execution>
-                     <id>make-assembly</id> <!-- this is used for inheritance merges -->
-                     <phase>package</phase> <!-- bind the jar-merging step to the package phase -->
-                     <goals>
-                         <goal>single</goal>
-                     </goals>
-                 </execution>
-             </executions>
-         </plugin>
-         <plugin>
-             <groupId>org.apache.maven.plugins</groupId>
-             <artifactId>maven-compiler-plugin</artifactId>
-             <configuration>
-                 <source>1.6</source>
-                 <target>1.6</target>
-             </configuration>
-         </plugin>
-     </plugins>
- </build>
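With the build section in place, packaging is a single command; assuming the project's artifactId/version are realtime-streaming/1.0-SNAPSHOT (matching the jar submitted in the next section), the assembly plugin leaves target/realtime-streaming-1.0-SNAPSHOT-jar-with-dependencies.jar.
- mvn clean package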
3. Execution and results
1) Run kafka_producer.py
- python kafka_producer.py
2) Run the spark-streaming job
Here the job is submitted to the YARN queue with default parameters.
- spark-submit --queue root.XXXX realtime-streaming-1.0-SNAPSHOT-jar-with-dependencies.jar
3) Check the results
Look at the results in MySQL: every 10 seconds the job writes 5 aggregated rows, one for each type from 1 to 5.
For example, the first row says that type=4 orders earned a profit of 555473 within 10 seconds. An astonishing volume of business, ha ha.
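To spot-check, a query along these lines (hypothetical, from any MySQL client) shows the most recently written aggregates:
- -- latest batch of aggregated rows, newest first
- SELECT `type`, `profit`, `time` FROM `order` ORDER BY `id` DESC LIMIT 5;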
The end.
Source: http://www.cnblogs.com/kangoroo/p/7754581.html