Broadcast 广播变量: 可以理解为是一个公共的共享变量, 我们可以把一个 dataset 或者不变的缓存对象 (例如 map list 集合对象等) 数据集广播出去, 然后不同的任务在节点上都能够获取到, 并在每个节点上只会存在一份, 而不是在每个并发线程中存在. 如果不使用 broadcast, 则在每个节点中的每个任务中都需要拷贝一份 dataset 数据集, 比较浪费内存(也就是一个节点中可能会存在多份 dataset 数据).
- import org.apache.flink.API.common.functions.RichMapFunction
- import org.apache.flink.API.scala.ExecutionEnvironment
- import org.apache.flink.configuration.Configuration
- import scala.collection.mutable.ListBuffer
- object BatchDemoBroadcastScala {
- def main(args: Array[String]): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- import org.apache.flink.API.scala._
- //1: 准备需要广播的数据
- val broadData = ListBuffer[Tuple2[String,Int]]()
- broadData.append(("zs",18))
- broadData.append(("ls",20))
- broadData.append(("ww",17))
- //1.1 处理需要广播的数据
- val tupleData = env.fromCollection(broadData)
- val toBroadcastData = tupleData.map(tup=>{
- Map(tup._1->tup._2)
- })
- val text = env.fromElements("zs","ls","ww")
- val result = text.map(new RichMapFunction[String,String] {
- var listData: java.util.List[Map[String,Int]] = null
- var allMap = Map[String,Int]()
- override def open(parameters: Configuration): Unit = {
- super.open(parameters)
- this.listData = getRuntimeContext.getBroadcastVariable[Map[String,Int]]("broadcastMapName")
- val it = listData.iterator()
- while (it.hasNext){
- val next = it.next()
- allMap = allMap.++(next)
- }
- }
- override def map(value: String) = {
- val age = allMap.get(value).get
- value+","+age
- }
- }).withBroadcastSet(toBroadcastData,"broadcastMapName")
- result.print()
- }
- }
1, 设置广播变量
在某个需要用到该广播变量的算子后调用 withBroadcastSet(var1, var2)进行设置, var1 为需要广播变量的变量名, var2 是自定义变量名, 为 String 类型. 注意, 被广播的变量只能为 DataSet 类型, 不能为 List,Int,String 等类型.
2,
获取广播变量
创建该算子对应的富函数类, 例如 map 函数的富函数类是 RichMapFunction, 该类有两个构造参数, 第一个参数为算子输入数据类型, 第二个参数为算子输出数据类型. 首先创建一个 Traversable[_]接口用于接收广播变量并初始化为空, 接收类型与算子输入数据类型相对应; 然后重写 open 函数, 通过 getRuntimeContext.getBroadcastVariable[_](var)获取到广播变量, var 即为设置广播变量时的自定义变量名, 类型为 String,open 函数在算子生命周期的初始化阶段便会调用; 最后在 map 方法中对获取到的广播变量进行访问及其它操作.
参考:
- https://blog.csdn.net/fct2001140269/article/details/84402798
- https://blog.csdn.net/qq_34842671/article/details/80746593
来源: http://www.bubuko.com/infodetail-3067336.html