windows7 64、intellij idea 14.1.5、spark-1.5.2、scala 2.0.4、java1.7、maven3.05
将 spark 中的 assembly 包引入即可使用 local 模式运行相关的 scala 任务, 注意不要使用 scala2.11,非要使用的话先用这个版本的 scala 编译一遍 spark 哈
先附上 pom.xml 中的 jar 包依赖部分
- <dependencies>
- <dependency>
- <groupId>
- org.scala-lang
- </groupId>
- <artifactId>
- scala-library
- </artifactId>
- <version>
- 2.10.4
- </version>
- </dependency>
- <dependency>
- <groupId>
- org.scala-lang
- </groupId>
- <artifactId>
- scala-compiler
- </artifactId>
- <version>
- 2.10.4
- </version>
- </dependency>
- <dependency>
- <groupId>
- org.scala-lang
- </groupId>
- <artifactId>
- scala-reflect
- </artifactId>
- <version>
- 2.10.4
- </version>
- </dependency>
- <dependency>
- <groupId>
- log4j
- </groupId>
- <artifactId>
- log4j
- </artifactId>
- <version>
- 1.2.12
- </version>
- </dependency>
- <dependency>
- <groupId>
- com.google.collections
- </groupId>
- <artifactId>
- google-collections
- </artifactId>
- <version>
- 1.0
- </version>
- </dependency>
- <dependency>
- <groupId>
- org.apache.spark
- </groupId>
- <artifactId>
- spark-core_2.10
- </artifactId>
- <version>
- 1.5.2
- </version>
- </dependency>
- <dependency>
- <groupId>
- mysql
- </groupId>
- <artifactId>
- mysql-connector-java
- </artifactId>
- <version>
- 5.1.29
- </version>
- </dependency>
- <dependency>
- <groupId>
- org.apache.spark
- </groupId>
- <artifactId>
- spark-streaming_2.10
- </artifactId>
- <version>
- 1.5.2
- </version>
- <scope>
- provided
- </scope>
- </dependency>
- <dependency>
- <groupId>
- org.apache.spark
- </groupId>
- <artifactId>
- spark-streaming-kafka_2.10
- </artifactId>
- <version>
- 1.5.2
- </version>
- </dependency>
- <dependency>
- <groupId>
- com.jolbox
- </groupId>
- <artifactId>
- bonecp
- </artifactId>
- <version>
- 0.8.0.RELEASE
- </version>
- </dependency>
- <dependency>
- <groupId>
- postgresql
- </groupId>
- <artifactId>
- postgresql
- </artifactId>
- <version>
- 9.2-1002.jdbc4
- </version>
- </dependency>
- </dependencies>
连接池,利用 boncp 创建连接池供使用,引用自网上的代码,详细可看参考资料
- /**
- * Created by Administrator on 2016/2/29.
- * 参考资料 http://www.myexception.cn/mysql/1934808.html
- */
- import java.sql. {
- Connection,
- ResultSet
- }
- import com.jolbox.bonecp. {
- BoneCP,
- BoneCPConfig
- }
- import org.slf4j.LoggerFactory object ConnectionPool {
- val logger = LoggerFactory.getLogger(this.getClass) private val connectionPool = {
- try {
- Class.forName("com.mysql.jdbc.Driver") val config = new BoneCPConfig() config.setJdbcUrl("jdbc:mysql://localhost:3306/test") config.setUsername("etl") config.setPassword("xxxxx") config.setLazyInit(true) config.setMinConnectionsPerPartition(3) config.setMaxConnectionsPerPartition(5) config.setPartitionCount(5) config.setCloseConnectionWatch(true) config.setLogStatementsEnabled(false) Some(new BoneCP(config))
- } catch {
- case exception:
- Exception = >logger.warn("Error in creation of connection pool" + exception.printStackTrace()) None
- }
- }
- def getConnection: Option[Connection] = {
- connectionPool match {
- case Some(connPool) = >Some(connPool.getConnection)
- case None = >None
- }
- }
- def closeConnection(connection: Connection) : Unit = {
- if (!connection.isClosed) {
- connection.close()
- }
- }
- }
ssc 程序,数据示例如下,删除了一些关键数据 (不影响此次处理),__开头的 key 为系统自带属性.
- __clientip = 10.10.9.153 & paymentstatus = 0 & __opip = &memberid = 89385239 & iamount = 1 & itype = 16 & oper_res = 1 & channeltype = 8 & __timestamp = 1457252427 & productid = 112 & selectbank = &icount = 0 & ordersrc = web & paymentip = 61.159.104.134 & orderdate = 2016 - 03 - 06 16 : 19 : 55 & subjecttype = zheanaiMessenger & oper_type = 1 & paydate = &orderamount = 259.0 & paymentchannel = 16 & oper_time = 2016 - 03 - 06 16 : 20 : 27 & orderid = 127145727 & iunit = month & bussinessid = 80125727 & isuse = 0 __clientip = 10.10.9.175 & paymentstatus = 0 & __opip = &memberid = 89378034 & iamount = 12 & itype = 17 & oper_res = 1 & channeltype = 75 & __timestamp = 1457252429 & productid = 124 & selectbank = &icount = 0 & ordersrc = 100 & paymentip = 59.37.137.119 & orderdate = 2016 - 03 - 06 16 : 20 : 29 & subjecttype = zheanaiMessenger & oper_type = 0 & paydate = &orderamount = 388.0 & paymentchannel = 1028 & oper_time = 2016 - 03 - 06 16 : 20 : 29 & orderid = 127145736 & iunit = month & bussinessid = 8012580 & isuse = 0 __clientip = 10.10.9.153 & paymentstatus = 0 & __opip = &memberid = 75372899 & iamount = 12 & itype = 16 & oper_res = 1 & channeltype = &__timestamp = 1457252286 & productid = 131 & selectbank = &icount = 0 & ordersrc = web & paymentip = 113.226.244.206 & orderdate = 2016 - 03 - 06 16 : 18 : 06 & subjecttype = zheanaiMessenger & oper_type = 0 & paydate = &orderamount = 99.0 & paymentchannel = 307 & oper_time = 2016 - 03 - 06 16 : 18 : 06 & orderid = 127145700 & iunit = month & bussinessid = 80125477 & isuse = 0 __clientip = 10.10.9.175 & paymentstatus = 0 & __opip = &memberid = 87634711 & iamount = 1 & itype = 16 & oper_res = 1 & channeltype = 8 & __timestamp = 1457252432 & productid = 129 & selectbank = &icount = 0 & ordersrc = web & paymentip = 114.246.35.251 & orderdate = 2016 - 03 - 06 16 : 19 : 05 & subjecttype = zheanaiMessenger & oper_type = 1 & paydate = &orderamount = 19.0 & paymentchannel = 16 & oper_time = 2016 - 03 - 06 16 : 20 : 32 & orderid = 127145713 & iunit = month & bussinessid = 66213022 & isuse = 0 __clientip = 10.10.9.153 & paymentstatus = 0 & __opip = &memberid = 89172717 & iamount = 12 & itype = 17 & oper_res = 1 & channeltype = 77 & __timestamp = 1457252371 & productid = 124 & selectbank = &icount = 0 & ordersrc = 4 & paymentip = 111.126.43.83 & orderdate = 2016 - 03 - 06 16 : 19 : 31 & subjecttype = zheanaiMessenger & oper_type = 0 & paydate = &orderamount = 388.0 & paymentchannel = 1116 & oper_time = 2016 - 03 - 06 16 : 19 : 31 & orderid = 127145723 & iunit = month & bussinessid = 8012568 & isuse = 0
主要操作如下
读取,ssc 自带的 receiver
解析 (valueSplit 方法 处理成 kv 格式)
过滤 filterRegex,类似 sql 中的 where 条件放弃一些不需要的数据,比如只需要买单的数据而不要下单数据
转换, getPlatform、getFormatDate,类似 case when
创建了一个 class 命名为 result,重写了 toString 方法。该 class 存放从 kafka 中处理后的所有需要的数据字段。
写入 mysql,insertIntoMySQL,方法在每个 partition 中调用
另外代码中使用了 getOrCreate 以便恢复,利用了计数器简单统计了一下有效记录数
代码如下
- import java.text.SimpleDateFormat import java.util.Date import com.zhenai.SqlConnection.ConnectionPool import java.sql.Connection import org.apache.log4j.PropertyConfigurator import org.apache.spark.rdd.RDD import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming. {
- Time,
- Seconds,
- StreamingContext
- }
- import org.apache.spark. {
- SparkContext,
- SparkConf
- }
- import org.joda.time.DateTime import org.slf4j.LoggerFactory import scala.collection.mutable.Map
- /**
- * Created by Administrator on 2016/2/25.
- */
- object KafkaStreaming {
- val logger = LoggerFactory.getLogger(this.getClass) PropertyConfigurator.configure(System.getProperty("user.dir") + "\\src\\log4j.properties")
- case class result(ftime:
- String, hour: String, orderid: Long, memberid: Long, platform: String, iamount: Double, orderamount: Double) extends Serializable {
- override def toString: String = "%s\t%s\t%d\t%d\t%s\t%.2f\t%.2f".format(ftime, hour, orderid, memberid, platform, iamount, orderamount)
- }
- def getFormatDate(date: Date, format: SimpleDateFormat) : String = {
- format.format(date)
- }
- def stringFormatTime(time: String, simpleformat: SimpleDateFormat) : Date = {
- simpleformat.parse(time)
- }
- // kafka中的value解析为Map
- def valueSplit(value: String) : Map[String, String] = {
- val x = value.split("&") val valueMap: Map[String, String] = Map() x.foreach {
- kvs = >
- if (!kvs.startsWith("__")) {
- val kv = kvs.split("=") if (kv.length == 2) {
- valueMap += (kv(0) - >kv(1))
- }
- }
- }
- valueMap
- }
- // 实现类似where的条件,tips:优先过滤条件大的减少后续操作
- def filterRegex(map: Map[String, String]) : Boolean = {
- //过滤操作类型,控制为支付操作
- val oper_type = map.getOrElse("oper_type", "-1") if (!oper_type.equals("2") && !oper_type.equals("3")) return false
- // 过滤未支付成功记录
- if (!map.getOrElse("paymentstatus", "0").equals("1")) return false
- // 过滤无效支付ip
- val paymentip = map.getOrElse("paymentip", null) if (paymentip.startsWith("10.10") || paymentip.startsWith("183.62.134") || paymentip.contains("127.0.0.1")) return false
- return true
- }
- // 实现类似 case when的方法,上报的p字段不一定为数值
- def getPlatform(p: String, x: Int) : String = {
- val platformname = (p, x) match {
- case(p, x) if (Array[String]("1", "2", "3").contains(p)) = >"wap"
- case(p, x) if (Array[String]("4", "8").contains(p) && x != 18) = >"andriod"
- case(p, x) if ((Array[String]("5", "7", "51", "100").contains(p)) && (p != 18)) = >"ios"
- case _ = >"pc"
- }
- platformname
- }
- // 数据库写入
- def insertIntoMySQL(con: Connection, sql: String, data: result) : Unit = {
- // println(data.toString)
- try {
- val ps = con.prepareStatement(sql) ps.setString(1, data.ftime) ps.setString(2, data.hour) ps.setLong(3, data.orderid) ps.setLong(4, data.memberid) ps.setString(5, data.platform) ps.setDouble(6, data.iamount) ps.setDouble(7, data.orderamount) ps.executeUpdate() ps.close()
- } catch {
- case exception:
- Exception = >logger.error("Error in execution of query " + exception.getMessage + "\n-----------------------\n" + exception.printStackTrace() + "\n-----------------------------")
- }
- }
- def createContext(zkqurm: String, topic: scala.Predef.Map[String, Int], checkPointDir: String) : StreamingContext = {
- val simpleformat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") val dateFormat = new SimpleDateFormat("yyyyMMdd") val timeFormat = new SimpleDateFormat("HH:mm") val sql = "insert into t_ssc_toufang_result_mi(ftime,hour,orderid,memberid,platform,iamount,orderamount) values(?,?,?,?,?,?,?);"val conf = new SparkConf() conf.setAppName("Scala Streaming read kafka")
- // VM option -Dspark.master=local
- // conf.setMaster("local[4]")
- val sc = new SparkContext(conf) val totalcounts = sc.accumulator(0L, "Total count") val ssc = new StreamingContext(sc, Seconds(60))
- //ssc.checkpoint(checkPointDir)
- //统计各平台最近一分钟实时注册收入 时间段,平台,金额,订单数
- val lines = KafkaUtils.createStream(ssc, zkqurm, "mytopic_local", topic).map(_._2) val filterRecord = lines.filter(x = >!x.isEmpty).map(valueSplit).filter(filterRegex).map {
- x = >val orderdate = stringFormatTime(x.getOrElse("orderdate", null), simpleformat) val day = getFormatDate(orderdate, dateFormat) val hour = getFormatDate(orderdate, timeFormat) var orderamount = x.getOrElse("orderamount", "0").toDouble
- if (x.getOrElse("oper_type", -1) == 3) orderamount = -1 * orderamount val res = new result(day, hour, x.getOrElse("orderid", null).toLong, x.getOrElse("memberid", null).toLong, getPlatform(x.getOrElse("ordersrc", null), x.getOrElse("itype", null).toInt), x.getOrElse("iamount", "0").toDouble, orderamount) res
- }
- filterRecord.foreachRDD((x: RDD[result], time: Time) = >{
- if (!x.isEmpty()) {
- // 打印一下这一批batch的处理时间段以及累计的有效记录数(不含档次)
- println("--" + new DateTime(time.milliseconds).toString("yyyy-MM-dd HH:mm:ss") + "--totalcounts:" + totalcounts.value + "-----") x.foreachPartition {
- res = >{
- if (!res.isEmpty) {
- val connection = ConnectionPool.getConnection.getOrElse(null) res.foreach {
- r: result = >totalcounts.add(1L) insertIntoMySQL(connection, sql, r)
- }
- ConnectionPool.closeConnection(connection)
- }
- }
- }
- }
- }) ssc
- }
- // =================================================================
- def main(args: Array[String]) : Unit = {
- val zkqurm = "10.10.10.177:2181,10.10.10.175:2181,10.10.10.179:2181"val topic = scala.Predef.Map("t_fw_00015" - >30) val checkPointDir = "/user/root/sparkcheck"val ssc = StreamingContext.getOrCreate(checkPointDir, () = >{
- createContext(zkqurm, topic, checkPointDir)
- }) ssc.start() ssc.awaitTermination()
- }
- }
补充 log4j.propertites 文件代码
- log4j.rootLogger = WARN,
- stdout,
- R log4j.appender.stdout = org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout = org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern = %5p - %m % n log4j.appender.R = org.apache.log4j.RollingFileAppender log4j.appender.R.File = mapreduce_test.log log4j.appender.R.MaxFileSize = 10MB log4j.appender.R.MaxBackupIndex = 1 log4j.appender.R.layout = org.apache.log4j.PatternLayout log4j.appender.R.layout.ConversionPattern = %p % t % c - %m % n log4j.logger.com.codefutures = WARN
- 由于是在idea intellij上配置的maven工程,直接使用maven打包了
- 配置好要打包的类:file - >project structure - >artifacts,操作如下
选择 main 方法,点击确认,结果如下
发现结果包名为自己的项目名称,这里可以调整,选中 jar 一行右键重命名;引入了所有的类,选中全部右键 remove 掉,当然也可以根据需要选择保留需要的依赖,这里避免最终 jar 过大选择全部删掉
结果如下,注意有个输出路径,build 完之后在这个路径下找到 jar 包
点击确认返回界面,左上的导航栏选择 build ->build artifacts , 选择刚刚配置的名字 KafkaSsc:jar 进行 build
完成之后可到输出路径下发现有个 jar 包 KafkaStreaming.jar,可以使用 winrar 打开看看里面都写了什么
- 上传到linux下进行运行,指定两个jar包使用local模式,命令如下
- spark - submit--master local[4]\--class "com.xxxx.streaming.KafkaStreaming"\--jars spark - streaming - kafka - assembly_2.10 - 1.5.0.jar,
- mysql - connector - java - 5.1.29.jar\KafkaStreaming.jar
运行情况如下,错误信息为 log4j.properties 未设置好,这个代码注释掉或者修改路径即可
以上代码的不足
1、未考虑写入 mysql 失败的情况,处理方法: 可以在 insertIntoMySQL 中做容错处理,这个不行我写到另一个地方存放起来,后台再起一个线程定时把这些数据写到目标 mysql 中去
2、代码可以更简洁一点、mysql、zk 等参数应当支持配置文件处理
3、读取 kafka 数据建议 KafkaUtils.createStream 改为低阶 api 的实现 KafkaUtils.createDirectStream 去读取。具体可自行查询或者查看我的下一篇博客对这两个的一些总结
关于使用连接池说明:
1、不希望每写一条记录都创建一个连接,资源消耗大
2、分布式集群中,我们不知道这个数据记录会在那一台机,但是可以知道的是至少每一个 partition 里所有数据都是在一台机的
3、针对每个 parition 创建连接相对来说也很耗费资源,在处理时间段内整个 ssc 是大量 sc 的 job 组成的对 rdd 处理队列,存在多个 mysql 的长连接是必要的
为何不返回到 driver 端来执行呢?
个人认为在数据量不大的情况下这是可行的,完全可以返回所有的数据,并且批量写入 mysql;但是如果数据量大的话很影响效率
关于实时业务场景的使用
细心的同学可能已经发现,在上面的 ssc 程序中并没有使用 reduce 之类的聚合操作,这个其实关系到一个业务场景,这个数据出来主要是用于报表的;首先这个数据量实在是很小,每分钟有效的不超过 10 条记录(实际上可以直接 mysql 搞掂,当然并没有这样干);另外针对收入这一块,运营人员可能有很多维度需要查询,而且需求是变动的,这个时候数据还是尽量明细保留下来的好,避免需求变动带来的频繁修改代码。
以上,如有不足欢迎留言讨论
错误一、scala 版本错误
Exception in thread "main" java.lang.NoSuchMethodError: scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet;
解决方法:改为使用 2.10 版本
错误二、客户端调试 ssc 程序访问 kafka 异常
WARN - [mytopic_local_ZA6600-1456571483267-f287bec2-leader-finder-thread], Failed to find leader for Set([mytopic,0])
kafka.common.KafkaException: fetching topic metadata for topics [Set(mytopic)] from broker [ArrayBuffer(id:0,host:bj-230,port:9092)] failed
at kafka.client.ClientUtils
at kafka.consumer.ConsumerFetcherManager
来源: http://lib.csdn.net/article/spark/42228