1. Why Backpressure Was Introduced
By default, Spark Streaming receives data through its Receivers at whatever rate producers generate it. During computation it can happen that batch processing time > batch interval, where batch processing time is the actual time spent computing one batch and batch interval is the batch duration configured for the Streaming application. This means Spark Streaming ingests data faster than Spark drains it from the queue: processing capacity is insufficient, so within one interval the application cannot finish processing the data received at the current rate. If this persists for long, data accumulates in memory and can cause out-of-memory errors on the Executor hosting the Receiver (if the StorageLevel includes disk, data that does not fit in memory spills to disk, increasing latency). Before Spark 1.5, a user who wanted to cap the Receiver's ingestion rate could set the static configuration parameter `spark.streaming.receiver.maxRate`. Capping the rate this way can match ingestion to the current processing capacity and prevent memory overflow, but it introduces other problems: if producers generate data faster than maxRate and the cluster could also process more than maxRate, the static cap leaves cluster resources underutilized. To better match the ingestion rate to the cluster's processing capacity, Spark Streaming introduced a back-pressure mechanism in v1.5, which dynamically adjusts the data ingestion rate to fit what the cluster can actually process.
2. Backpressure
Spark Streaming Backpressure dynamically adjusts the Receiver's ingestion rate based on job-execution feedback reported by the JobScheduler. The mechanism is switched on via the property `spark.streaming.backpressure.enabled`; the default is false, i.e. disabled.
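Both the dynamic switch and the static cap are ordinary Spark properties. A typical `spark-defaults.conf` fragment enabling backpressure might look like this (the 10000 records/s ceiling is purely an illustrative value, not a recommendation):

```properties
spark.streaming.backpressure.enabled   true
# optional hard ceiling that the dynamically computed rate will never exceed
spark.streaming.receiver.maxRate       10000
```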
2.1 The Streaming architecture is shown in the figure below (see the documents on the Streaming data-reception process and the Streaming source-code analysis for details)
2.2 The BackPressure execution flow is shown in the figure below:
On top of the original architecture, a new component, RateController, is added. It listens for "onBatchCompleted" events and extracts the processingDelay and schedulingDelay information from them. An Estimator uses this information to estimate the maximum processing rate. Finally, the Receiver-based Input Stream forwards that rate through ReceiverTracker and ReceiverSupervisorImpl to the BlockGenerator (a subclass of RateLimiter).
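The control loop just described can be summarized in a few lines of Python. This is a hedged toy mirror, not Spark code: the estimator, the batch-info dictionary, and the publishing callback are stand-ins for the classes discussed in the sections below.

```python
class RateController:
    """Toy mirror of the backpressure feedback loop: batch info in, new rate out."""

    def __init__(self, estimator, publish):
        self.estimator = estimator  # e.g. a PID-style rate estimator
        self.publish = publish      # forwards the rate toward the BlockGenerator
        self.rate_limit = 0

    def on_batch_completed(self, batch_info):
        # extract the feedback signals of the completed batch and estimate a new rate
        new_rate = self.estimator.compute(
            batch_info["processing_end"],
            batch_info["num_records"],
            batch_info["processing_delay"],
            batch_info["scheduling_delay"],
        )
        if new_rate is not None:
            self.rate_limit = int(new_rate)
            self.publish(self.rate_limit)


class ThroughputEstimator:
    """Trivial estimator: target the throughput actually achieved by the last batch."""

    def compute(self, time_ms, elems, work_delay_ms, wait_delay_ms):
        if work_delay_ms <= 0:
            return None
        return elems / work_delay_ms * 1000  # records per second actually processed


published = []
rc = RateController(ThroughputEstimator(), published.append)
rc.on_batch_completed({"processing_end": 1000, "num_records": 500,
                       "processing_delay": 2000, "scheduling_delay": 100})
# 500 records took 2 s to process, so the loop publishes a rate of 250 records/s
```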
3. BackPressure Source-Code Walkthrough
3.1 The RateController Class Hierarchy
RateController extends StreamingListener and handles BatchCompleted events. Its core code is:
```scala
/**
 * A StreamingListener that receives batch completion updates, and maintains
 * an estimate of the speed at which this stream should ingest messages,
 * given an estimate computation from a `RateEstimator`.
 */
private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
    extends StreamingListener with Serializable {

  ......

  /** Compute the new rate limit and publish it asynchronously. */
  private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
    Future[Unit] {
      val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
      newRate.foreach { s =>
        rateLimit.set(s.toLong)
        publish(getLatestRate())
      }
    }

  def getLatestRate(): Long = rateLimit.get()

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val elements = batchCompleted.batchInfo.streamIdToInputInfo

    for {
      processingEnd <- batchCompleted.batchInfo.processingEndTime
      workDelay <- batchCompleted.batchInfo.processingDelay
      waitDelay <- batchCompleted.batchInfo.schedulingDelay
      elems <- elements.get(streamUID).map(_.numRecords)
    } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
  }
}
```
3.2 Registering the RateController
When the JobScheduler starts, it collects the rateController of every InputDStream registered in the DStreamGraph and registers each one as a listener on the ListenerBus. The relevant code is:
```scala
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start()
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
```
3.3 Analysis of the BackPressure Execution Flow
The BackPressure flow consists of two parts: when the BatchCompleted event is triggered, and how the event is handled.
3.3.1 Triggering BatchCompleted
The analysis of BatchCompleted should start from the JobGenerator, because BatchCompleted marks the end of a batch, i.e. it fires when the jobs produced by the JobGenerator finish executing; so we begin with job execution.
In a Streaming application, the JobGenerator creates one Job per Output Stream every batch interval; all Jobs of a batch form a JobSet, which is submitted in bulk via the JobScheduler's submitJobSet. The code is structured as follows:
```scala
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)

  // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
  // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
```
Here, submitJobSet hands the jobs of the JobSet to a fixed-size pool of background threads (the pool size is set by "spark.streaming.concurrentJobs") for execution. The implementation is:
```scala
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
```
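The role of jobExecutor is easy to picture with a fixed-size pool. This Python sketch (not Spark code; the job IDs and handler are invented for illustration) mimics how a pool of `spark.streaming.concurrentJobs` workers drains a job set:

```python
from concurrent.futures import ThreadPoolExecutor

concurrent_jobs = 2  # plays the role of spark.streaming.concurrentJobs
results = []

def job_handler(job_id):
    # stand-in for JobHandler.run(): run the job, then record its completion
    results.append(("completed", job_id))

job_set = [0, 1, 2, 3]
with ThreadPoolExecutor(max_workers=concurrent_jobs) as pool:
    for job_id in job_set:   # submitJobSet: hand every job of the batch to the pool
        pool.submit(job_handler, job_id)
# after the with-block all four jobs have run, at most two at a time
```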
The JobHandler executes a Job and processes the information about its result; when the Job finishes, a JobCompleted event is produced. The JobHandler's logic is shown below:
```scala
private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._

  def run() {
    try {
      val formattedTime = UIUtils.formatBatchTime(
        job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
      val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
      val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

      ssc.sc.setJobDescription(
        s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
      ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
      ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
      // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
      // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
      ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")

      // We need to assign `eventLoop` to a temp variable. Otherwise, because
      // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
      // it's possible that when `post` is called, `eventLoop` happens to null.
      var _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          job.run()
        }
        _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
        }
      } else {
        // JobScheduler has been stopped.
      }
    } finally {
      ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
      ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
    }
  }
}
```
When a Job finishes, a JobCompleted event is posted to the eventLoop. On receiving it, the EventLoop's handler calls handleJobCompletion, which uses the Job's execution information to build a StreamingListenerBatchCompleted event and posts it to the listeners through the StreamingListenerBus. The implementation:
```scala
private def handleJobCompletion(job: Job, completedTime: Long) {
  val jobSet = jobSets.get(job.time)
  jobSet.handleJobCompletion(job)
  job.setEndTime(completedTime)
  listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
  logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
  if (jobSet.hasCompleted) {
    jobSets.remove(jobSet.time)
    jobGenerator.onBatchCompletion(jobSet.time)
    logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
      jobSet.totalDelay / 1000.0, jobSet.time.toString, jobSet.processingDelay / 1000.0))
    listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
  }
  job.result match {
    case Failure(e) => reportError("Error running job " + job, e)
    case _ =>
  }
}
```
3.3.2 Handling the BatchCompleted Event
The StreamingListenerBus dispatches events to the concrete StreamingListeners, so BatchCompleted is handed to the RateController. On receiving the event, the RateController processes it in onBatchCompleted:
```scala
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
  val elements = batchCompleted.batchInfo.streamIdToInputInfo

  for {
    processingEnd <- batchCompleted.batchInfo.processingEndTime
    workDelay <- batchCompleted.batchInfo.processingDelay
    waitDelay <- batchCompleted.batchInfo.schedulingDelay
    elems <- elements.get(streamUID).map(_.numRecords)
  } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
}
```
onBatchCompleted extracts the processing delay and scheduling delay of the completed batch, then feeds both to a RateEstimator (currently its only implementation is PIDRateEstimator, a proportional-integral-derivative (PID) controller) to estimate a new rate, which is then published. The code:
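To make the estimator concrete, here is a simplified Python rendition of the PID computation. It is a hedged sketch that follows the general structure of PIDRateEstimator, but the gains, the minimum rate, and the first-run behavior are illustrative choices; the real implementation differs in details.

```python
class PidRateEstimator:
    """Simplified PID rate estimator: error = previous rate - actual processing rate."""

    def __init__(self, batch_interval_ms, kp=1.0, ki=0.2, kd=0.0, min_rate=100.0):
        self.batch_interval_ms = batch_interval_ms
        self.kp, self.ki, self.kd = kp, ki, kd
        self.min_rate = min_rate
        self.first_run = True
        self.latest_time = -1
        self.latest_rate = -1.0
        self.latest_error = 0.0

    def compute(self, time_ms, num_elements, processing_delay_ms, scheduling_delay_ms):
        if time_ms <= self.latest_time or num_elements <= 0 or processing_delay_ms <= 0:
            return None  # stale or invalid measurement: keep the current rate
        # how fast this batch was actually processed, in records per second
        processing_rate = num_elements / processing_delay_ms * 1000.0
        if self.first_run:
            # nothing to compare against yet: adopt the observed throughput
            self.first_run = False
            self.latest_time = time_ms
            self.latest_rate = processing_rate
            self.latest_error = 0.0
            return processing_rate
        delay_since_update_s = (time_ms - self.latest_time) / 1000.0
        error = self.latest_rate - processing_rate                    # proportional term
        # accumulated scheduling delay plays the role of the integral term
        historical_error = scheduling_delay_ms * processing_rate / self.batch_interval_ms
        d_error = (error - self.latest_error) / delay_since_update_s  # derivative term
        new_rate = max(self.latest_rate
                       - self.kp * error
                       - self.ki * historical_error
                       - self.kd * d_error,
                       self.min_rate)
        self.latest_time = time_ms
        self.latest_rate = new_rate
        self.latest_error = error
        return new_rate
```

For example, if a first batch of 1000 records is processed in 500 ms, the estimator adopts 2000 records/s; if the next batch of 1000 records then takes a full second, the proportional term pulls the published rate back down toward 1000 records/s.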
```scala
/**
 * Compute the new rate limit and publish it asynchronously.
 */
private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
  Future[Unit] {
    val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
    newRate.foreach { s =>
      rateLimit.set(s.toLong)
      publish(getLatestRate())
    }
  }
```
publish() is defined by RateController's subclass ReceiverRateController (declared inside ReceiverInputDStream):
```scala
/**
 * A RateController that sends the new rate to receivers, via the receiver tracker.
 */
private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
    extends RateController(id, estimator) {
  override def publish(rate: Long): Unit =
    ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
}
```
publish forwards the newly computed rate via the ReceiverTracker, which wraps it in an UpdateReceiverRateLimit message and sends it to the ReceiverTrackerEndpoint:
```scala
/** Update a receiver's maximum ingestion rate */
def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
  if (isTrackerStarted) {
    endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
  }
}
```
On receiving the message, the ReceiverTrackerEndpoint looks up, in the receiverTrackingInfos list, the endpoint the Receiver registered with (in fact a ReceiverSupervisorImpl), and sends it the rate wrapped in an UpdateRateLimit message. Upon receipt, the supervisor calls updateRate on its BlockGenerators (subclasses of RateLimiter) to derive a fixed token interval.
```scala
/** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */
private val endpoint = env.rpcEnv.setupEndpoint(
  "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
    override val rpcEnv: RpcEnv = env.rpcEnv

    override def receive: PartialFunction[Any, Unit] = {
      case StopReceiver =>
        logInfo("Received stop signal")
        ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
      case CleanupOldBlocks(threshTime) =>
        logDebug("Received delete old batch signal")
        cleanupOldBlocks(threshTime)
      case UpdateRateLimit(eps) =>
        logInfo(s"Received a new rate limit: $eps.")
        registeredBlockGenerators.asScala.foreach { bg =>
          bg.updateRate(eps)
        }
    }
  })
```
RateLimiter's updateRate is implemented as follows:
```scala
/**
 * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
 * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
 *
 * @param newRate A new rate in events per second. It has no effect if it's 0 or negative.
 */
private[receiver] def updateRate(newRate: Long): Unit =
  if (newRate > 0) {
    if (maxRateLimit > 0) {
      rateLimiter.setRate(newRate.min(maxRateLimit))
    } else {
      rateLimiter.setRate(newRate)
    }
  }
```
setRate is implemented as follows:
```java
public final void setRate(double permitsPerSecond) {
  Preconditions.checkArgument(permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond),
      "rate must be positive");
  synchronized (mutex) {
    resync(readSafeMicros());
    // the fixed interval between two permits
    double stableIntervalMicros = TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
  }
}
```
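The arithmetic behind the fixed interval is simple: at r permits per second, one permit is issued every 10^6 / r microseconds. A one-line Python check of the formula:

```python
def stable_interval_micros(permits_per_second: float) -> float:
    # one second, expressed in microseconds, divided evenly among the permits
    return 1_000_000 / permits_per_second

# a rate limit of 200 records/s means one token every 5000 microseconds
assert stable_interval_micros(200) == 5000.0
```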
This completes the rate adjustment performed by the backpressure mechanism.
4. Flow Control Points
When a Receiver starts receiving data, it stores each record into currentBuffer via supervisor.pushSingle(), where the BlockGenerator periodically collects the data and packages it into blocks. Before a record is placed into currentBuffer, a permit (token) must be acquired. If a token is obtained, the record can be stored in the buffer; otherwise the call blocks, which in turn blocks the Receiver from pulling more data from the source.
```scala
/**
 * Push a single data item into the buffer.
 */
def addData(data: Any): Unit = {
  if (state == Active) {
    waitToPush() // acquire a token
    synchronized {
      if (state == Active) {
        currentBuffer += data
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}
```
Tokens are issued using a token-bucket scheme, illustrated below:
Token bucket: a bucket of fixed size produces tokens on its own at a constant rate. If the tokens are not consumed, or are consumed more slowly than they are produced, they accumulate until the bucket is full; tokens produced after that overflow out of the bucket, so the bucket never holds more tokens than its capacity. An operation that needs tokens takes the required number from the bucket: if they are available, the operation proceeds; otherwise it blocks. Consumed tokens are not returned.
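The token-bucket behavior described above can be sketched in a few lines of Python. This is a didactic model, not Spark's actual RateLimiter: time is passed in explicitly, and "blocking" is modeled as a boolean return instead of a real wait.

```python
class TokenBucket:
    """Fixed-capacity bucket refilled at a constant rate; excess tokens overflow."""

    def __init__(self, capacity: float, rate_per_sec: float):
        self.capacity = capacity   # bucket size: max tokens it can hold
        self.rate = rate_per_sec   # constant token production rate
        self.tokens = capacity     # start full
        self.last_refill = 0.0

    def _refill(self, now: float):
        produced = (now - self.last_refill) * self.rate
        # tokens beyond the capacity overflow and are lost
        self.tokens = min(self.capacity, self.tokens + produced)
        self.last_refill = now

    def try_acquire(self, now: float, n: float = 1.0) -> bool:
        """Take n tokens if available; a real caller would block and retry otherwise."""
        self._refill(now)
        if self.tokens >= n:
            self.tokens -= n   # consumed tokens are not returned
            return True
        return False


# bucket of 5 tokens, refilled at 1 token per second
bucket = TokenBucket(capacity=5, rate_per_sec=1)
burst = [bucket.try_acquire(now=0.0) for _ in range(6)]
# the first 5 records pass immediately; the 6th must wait for a new token
assert burst == [True] * 5 + [False]
assert bucket.try_acquire(now=1.0)  # one second later a token has been produced
```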
After the Receiver ingests the stream, the data is parsed line by line into an iterator and then stored into the buffer one record at a time. Storing a record first requires obtaining a token: if none is available, the call blocks; once a token is obtained, the record is placed into the buffer, where it awaits the subsequent block-generation step.
Source: http://lib.csdn.net/article/spark/42230