2019-05-07 18:56:18
package com.amoscloud.log.analyze

import java.text.SimpleDateFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object LogAnalyze1 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("LogAnalyze1")
    val sc = new SparkContext(conf)
    val data = sc.textFile("C:\\Users\\Administrator\\Desktop\\HTTP.txt")
    data.cache()
    // 1. (phone number, home location, device brand, device model, connection duration)
    // analyze1(data)
    // 2. (time period at second resolution, traffic volume)
    analyze2(data)
    // 3. (brand, Array[(String, Int)]((model1, count1), (model2, count2)))
    // analyze(data)
  }
  // 3. group each brand's (model, count) pairs under the brand
  private def analyze(data: RDD[String]) = {
    data.filter(_.split(",").length >= 72)
      .map(x => {
        val arr = x.split(",")
        val brand = arr(70) // device brand
        val model = arr(71) // device model
        ((brand, model), 1)
      })
      .reduceByKey(_ + _) // count per (brand, model)
      .map(t => {
        val k = t._1
        (k._1, (k._2, t._2)) // (brand, (model, count))
      })
      .groupByKey() // collect all (model, count) pairs for each brand
      .collect()
      .foreach(println)
  }
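  // Sketch (not in the original post): the same brand -> models rollup
  // without groupByKey. reduceByKey merges each brand's model list during
  // the shuffle instead of materializing an Iterable of raw pairs per brand.
  private def analyzeAlt(data: RDD[String]) = {
    data.filter(_.split(",").length >= 72)
      .map(x => {
        val arr = x.split(",")
        ((arr(70), arr(71)), 1) // ((brand, model), 1)
      })
      .reduceByKey(_ + _) // count per (brand, model)
      .map { case ((brand, model), n) => (brand, List((model, n))) }
      .reduceByKey(_ ++ _) // concatenate the model lists per brand
      .collect()
      .foreach(println)
  }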
  // 2. total traffic per second-level time period
  private def analyze2(data: RDD[String]) = {
    data.filter(_.split(",").length >= 17) // guard against short/malformed lines
      .map(x => {
        val arr = x.split(",")
        val time = arr(16).take(arr(16).length - 4) // strip the ".SSS" millisecond suffix
        val flow = arr(7).toLong // traffic volume
        (time, flow)
      })
      .reduceByKey(_ + _)
      // .map(x => (x._1, (x._2 / 1024.0).formatted("%.3f") + "KB"))
      .map(x => (x._1, x._2))
      .collect()
      .foreach(println)
  }
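  // Hedged aside (not in the original post): `.formatted`, used in the
  // commented-out line above, is deprecated in newer Scala versions; an
  // f-interpolator does the same job, and a sortByKey keeps the output in
  // time order, e.g.:
  //   .sortByKey()
  //   .map { case (time, flow) => (time, f"${flow / 1024.0}%.3fKB") }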
  // 1. (phone number, home location, device brand, device model, connection duration)
  private def analyze1(data: RDD[String]) = {
    data
      .filter(_.split(",").length >= 72)
      .map(x => {
        val arr = x.split(",")
        val phoneNum = arr(3).takeRight(11) // last 11 digits: the phone number
        val local = arr(61) + arr(62) + arr(63) // home location: three location fields concatenated
        val brand = arr(70)
        val model = arr(71)
        val connectTime = timeDiff(arr(16), arr(17)) // connection duration in ms
        (phoneNum + "|" + local + "|" + brand + "|" + model, connectTime)
      })
      .reduceByKey(_ + _) // total connection time per subscriber/device
      .map(t => (t._1, formatTime(t._2)))
      .collect()
      .foreach(println)
  }
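  // Flexibility note (sketch, not in the original post): a tuple key such as
  //   ((phoneNum, local, brand, model), connectTime)
  // reduces the same way under reduceByKey, and downstream steps can then
  // pattern-match the fields instead of re-splitting the "|"-joined string:
  //   .map { case ((phone, local, brand, model), t) =>
  //     s"$phone|$local|$brand|$model\t${formatTime(t)}" }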
  // Millisecond difference between two timestamps of the form
  // "yyyy-MM-dd HH:mm:ss.SSS": the date part is parsed with SimpleDateFormat
  // and the 3-digit millisecond suffix is added back on.
  def timeDiff(time1: String, time2: String): Long = {
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val timeStamp2 = sdf.parse(time2.take(time2.length - 4)).getTime + time2.takeRight(3).toLong
    val timeStamp1 = sdf.parse(time1.take(time1.length - 4)).getTime + time1.takeRight(3).toLong
    timeStamp2 - timeStamp1
  }
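  // Worked example (assumes timestamps shaped like "yyyy-MM-dd HH:mm:ss.SSS";
  // the sample values are made up):
  //   timeDiff("2019-05-07 18:56:18.123", "2019-05-07 19:06:57.456") == 639333L
  //   i.e. 10 minutes 39.333 seconds, expressed in milliseconds.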
  // Format a millisecond duration as hours:minutes:seconds. Note the final
  // modulo: durations of 24 hours or more wrap around.
  def formatTime(time: Long): String = {
    val timeS = time / 1000
    val s = timeS % 60
    val m = timeS / 60 % 60
    val h = timeS / 60 / 60 % 24
    f"$h%02d:$m%02d:$s%02d" // zero-padded, e.g. "00:10:39"
  }
}
2: Write a Spark program that counts, from IIS website request logs, the number of IPs that successfully accessed the site in each hour of each day
package com.amoscloud.log.analyze

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import scala.collection.mutable

object LogAnalyze {

  def main(args: Array[String]): Unit = {
    // Count, for each hour of each day, how many IPs successfully accessed
    // the IIS site, based on its request logs.
    val conf = new SparkConf().setAppName("LogAnalyze").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Read the raw log (path kept from the original post)
    val log: RDD[String] = sc.textFile("C:\\Users\\Administrator\\Desktop\\iis 网站请求日志")
    // Keep only the date, hour, client IP, and status code of each record
    log
      .filter(_.split("\\s+").length > 10) // same tokenization as the map below
      .map(line => {
        val strings = line.split("\\s+")
        // RDD[(date, hour, ip, status)]
        (strings(0), strings(1).split(":")(0), strings(8), strings(10))
      })
      // keep only successful requests
      .filter(_._4 == "200")
      // RDD[(date|hour, ip)]
      .map(t => (t._1 + "|" + t._2, t._3))
      // RDD[(date|hour, Iterable[ip])]
      .groupByKey()
      // (key, total hits, distinct IPs)
      .map(t => {
        val ips = t._2.toList
        (t._1, ips.size, ips.distinct.size)
      })
      .collect()
      .foreach(t => {
        val spl = t._1.split("\\|")
        printf("%s\t%s\t%d\t%d\n", spl(0), spl(1), t._2, t._3)
      })
    // Alternative kept from the original post: partition by date|hour so all
    // records with the same key land in one partition, then count within each
    // partition. Caveat: HashPartitioner(48) may place several keys in the
    // same partition, in which case this version conflates their counts and
    // reports only the last key seen.
    // .partitionBy(new HashPartitioner(48))
    // .mapPartitions((iter: Iterator[(String, String)]) => {
    //   val set = mutable.HashSet[String]()
    //   var count = 0
    //   var next = ("", "")
    //   while (iter.hasNext) {
    //     next = iter.next()
    //     count += 1
    //     set.add(next._2)
    //   }
    //   ((next._1, count, set.size) :: Nil).iterator
    // })
    // .filter(_._1.nonEmpty)
  }
}
Using Spark operators more flexibly means writing less code.
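For instance (a sketch of mine, not from the original post): the groupByKey step in LogAnalyze can be replaced by a single aggregateByKey that computes the total hit count and the distinct-IP count during the shuffle, so the per-key IP lists are never materialized. Here `pairs` stands for the RDD[(date|hour, ip)] produced by the map step above:

import org.apache.spark.rdd.RDD
import scala.collection.mutable

// (date|hour, ip) pairs in; (date|hour, total hits, distinct IPs) out.
def hourlyIpCounts(pairs: RDD[(String, String)]): RDD[(String, Int, Int)] =
  pairs
    .aggregateByKey((0, mutable.HashSet.empty[String]))(
      // within a partition: count every hit, collect the unique IPs
      (acc, ip) => (acc._1 + 1, acc._2 += ip),
      // across partitions: add the counts, union the IP sets
      (a, b) => (a._1 + b._1, a._2 ++= b._2))
    .map { case (key, (count, ips)) => (key, count, ips.size) }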
2019-05-07 19:06:57
Source: http://www.bubuko.com/infodetail-3050168.html