关于 Spark Streaming 中的任务有如下几个概念:
其实 Job,Stage,Task 都是 Spark Core 里就有的概念,Batch 则是 Streaming 特有的概念。同一 Stage 里的 Task 一般都是并行的。同一 Job 里的 Stage 可以并行,但是一般如果有依赖则是串行, 可以参考我这篇文章 。
Job 的并行度复杂些,由两个配置决定:
我们知道一个 Batch 可能会有多个 Action 执行,比如你注册了多个 Kafka 数据流,每个 Action 都会产生一个 Job, 所以一个 Batch 有可能是一批 Job, 也就是 JobSet 的概念,这些 Job 由 jobExecutor 依次提交执行, 而 JobExecutor 是一个默认池子大小为 1 的线程池,所以只能执行完一个 Job 再执行另外一个 Job。这里说的池子,他的大小就是由
控制的。
- spark.streaming.concurrentJobs
concurrentJobs 其实决定了向 Spark Core 提交 Job 的并行度。提交一个 Job,必须等这个执行完了,才会提交第二个。假设我们把它设置为 2,则会并发的把 Job 提交给 Spark Core,Spark 有自己的机制决定如何运行这两个 Job, 这个机制其实就是 FIFO 或者 FAIR(决定了资源的分配规则)。默认是 FIFO, 也就是先进先出,你把 concurrentJobs 设置为 2,但是如果底层是 FIFO, 那么会优先执行先提交的 Job, 虽然如此,如果资源够两个 job 运行,还是会并行运行两个 Job。
我们搞个例子来论证下上面的结论:
- object JobTest {
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf()
- conf.setAppName("test")
- conf.setMaster("local[2]")
- conf.set("spark.streaming.concurrentJobs", "2")
- val sc = new StreamingContext(conf, Seconds(10))
- val input = new TestInputStream[String](sc, Seq(Seq("1", "2", "3"), Seq("1", "2", "3"), Seq("1", "2", "3")), 2)
- val input2 = new TestInputStream[String](sc, Seq(Seq("1", "2", "3"), Seq("1", "2", "3"), Seq("1", "2", "3")), 2)
- input.map{f=>
- Thread.sleep(5000)
- f
- }.foreachRDD(f=>f.count())
- input2.map{f=>
- Thread.sleep(5000)
- f
- }.foreachRDD(f=>f.count())
- sc.start()
- sc.awaitTermination()
- }
- }
上面的 TestInputStream 的签名如下:
- class TestInputStream[T: ClassTag](_ssc: StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
- extends InputDStream[T](_ssc) {
所以等同于一个数据源,其中最后䘝参数是分区数。这里,我们把 concurrentJobs 设置为 2,意味着 TaskScheduler 接受到了两个 Job, 然后 setMaster[local(2)] 表示只可以并发执行两个 Task。因为 input1,input2 每个 batch 至少都有 3 个元素,每个元素需要运行 5 秒,所以有一个 task 需要运行两个元素,那么第一次 input2 需要运行 10 秒。input2 在运行五秒后,空出了一个线程,这个时候 input1 的 job 开始运行,到第十秒的时候,input2 完成,input1 开始运行也已经完成一个元素的计算,这个时候启动另外两个元素运行。所以 input2 花了 10 秒,input1 花了 15 秒,但是因为 input1 被延时了五秒才得以运行,所以 input2 其实相当于花了 20 秒。这段话是解释下面那张图。
WX20170211-225643@2x.png
接着呢,input2 在剩下两条记录处理的 10 秒过程中,其实第二个周期已经开始了,input1 的任务又得以开始运行,这个时候因为只有一个线程可以用,所以运行了两个元素,input2 处理完成,空出线程,第二个周期的 input2 继续调度,input1 的剩下的一个元素也继续运行,最后 input1,input2 都花了 15 秒。
WX20170211-230145@2x.png
有点绕,如果大家迷惑,可以把代码贴在自己的 IDE 上运行一下,然后观察他们的交错时间。
如果我们再做个调整:
- conf.setMaster("local[4]")
- conf.set("spark.streaming.concurrentJobs", "3")
- conf.set("spark.scheduler.mode", "FIFO")
- val sc = new StreamingContext(conf, Seconds(5))
你会发现,不同 batch 的 job 其实也可以并行运行的,这里需要有几个条件:
Mode 是 FAIR 则尽力保证你的 Job 是并行运行的,毫无疑问是可以并行的。
回到我们的标题,不同 Batch 的 job 有可能会同时在运行么,只要满足我前面提到的三个条件,就有可能。
来源: