Broadcast 广播变量:可以理解为是一个公共的共享变量,我们可以把一个dataset 或者不变的缓存对象(例如map list集合对象等)数据集广播出去,然后不同的任务在节点上都能够获取到,并在每个节点上只会存在一份,而不是在每个并发线程中存在。如果不使用broadcast,则在每个节点中的每个任务中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。
import org.apache.flink.api.common.functions.RichMapFunctionimport org.apache.flink.api.scala.ExecutionEnvironmentimport org.apache.flink.configuration.Configurationimport scala.collection.mutable.ListBufferobject BatchDemoBroadcastScala { def main(args: Array[String]): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ //1: 准备需要广播的数据 val broadData = ListBuffer[Tuple2[String,Int]]() broadData.append(("zs",18)) broadData.append(("ls",20)) broadData.append(("ww",17)) //1.1处理需要广播的数据 val tupleData = env.fromCollection(broadData) val toBroadcastData = tupleData.map(tup=>{ Map(tup._1->tup._2) }) val text = env.fromElements("zs","ls","ww") val result = text.map(new RichMapFunction[String,String] { var listData: java.util.List[Map[String,Int]] = null var allMap = Map[String,Int]() override def open(parameters: Configuration): Unit = { super.open(parameters) this.listData = getRuntimeContext.getBroadcastVariable[Map[String,Int]]("broadcastMapName") val it = listData.iterator() while (it.hasNext){ val next = it.next() allMap = allMap.++(next) } } override def map(value: String) = { val age = allMap.get(value).get value+","+age } }).withBroadcastSet(toBroadcastData,"broadcastMapName") result.print() }}
1、设置广播变量
在某个需要用到该广播变量的算子后调用withBroadcastSet(var1, var2)进行设置,var1为需要广播变量的变量名,var2是自定义变量名,为String类型。注意,被广播的变量只能为DataSet类型,不能为List、Int、String等类型。2、获取广播变量
创建该算子对应的富函数类,例如map函数的富函数类是RichMapFunction,该类有两个构造参数,第一个参数为算子输入数据类型,第二个参数为算子输出数据类型。首先创建一个Traversable[_]接口用于接收广播变量并初始化为空,接收类型与算子输入数据类型相对应;然后重写open函数,通过getRuntimeContext.getBroadcastVariable[_](var)获取到广播变量,var即为设置广播变量时的自定义变量名,类型为String,open函数在算子生命周期的初始化阶段便会调用;最后在map方法中对获取到的广播变量进行访问及其它操作。
参考:
https://blog.csdn.net/fct2001140269/article/details/84402798
https://blog.csdn.net/qq_34842671/article/details/80746593