博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
初识Flink广播变量broadcast
阅读量:5156 次
发布时间:2019-06-13

本文共 2235 字,大约阅读时间需要 7 分钟。

  Broadcast 广播变量:可以理解为是一个公共的共享变量,我们可以把一个dataset 或者不变的缓存对象(例如map list集合对象等)数据集广播出去,然后不同的任务在节点上都能够获取到,并在每个节点上只会存在一份,而不是在每个并发线程中存在。如果不使用broadcast,则在每个节点中的每个任务中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。

import org.apache.flink.api.common.functions.RichMapFunctionimport org.apache.flink.api.scala.ExecutionEnvironmentimport org.apache.flink.configuration.Configurationimport scala.collection.mutable.ListBufferobject BatchDemoBroadcastScala {  def main(args: Array[String]): Unit = {    val env = ExecutionEnvironment.getExecutionEnvironment    import org.apache.flink.api.scala._    //1: 准备需要广播的数据    val broadData = ListBuffer[Tuple2[String,Int]]()    broadData.append(("zs",18))    broadData.append(("ls",20))    broadData.append(("ww",17))    //1.1处理需要广播的数据    val tupleData = env.fromCollection(broadData)    val toBroadcastData = tupleData.map(tup=>{      Map(tup._1->tup._2)    })    val text = env.fromElements("zs","ls","ww")    val result = text.map(new RichMapFunction[String,String] {      var listData: java.util.List[Map[String,Int]] = null      var allMap  = Map[String,Int]()      override def open(parameters: Configuration): Unit = {        super.open(parameters)        this.listData = getRuntimeContext.getBroadcastVariable[Map[String,Int]]("broadcastMapName")        val it = listData.iterator()        while (it.hasNext){          val next = it.next()          allMap = allMap.++(next)        }      }      override def map(value: String) = {        val age = allMap.get(value).get        value+","+age      }    }).withBroadcastSet(toBroadcastData,"broadcastMapName")    result.print()  }}

1、设置广播变量

  在某个需要用到该广播变量的算子后调用withBroadcastSet(var1, var2)进行设置,var1为需要广播变量的变量名,var2是自定义变量名,为String类型。注意,被广播的变量只能为DataSet类型,不能为List、Int、String等类型。
2、

获取广播变量

创建该算子对应的富函数类,例如map函数的富函数类是RichMapFunction,该类有两个构造参数,第一个参数为算子输入数据类型,第二个参数为算子输出数据类型。首先创建一个Traversable[_]接口用于接收广播变量并初始化为空,接收类型与算子输入数据类型相对应;然后重写open函数,通过getRuntimeContext.getBroadcastVariable[_](var)获取到广播变量,var即为设置广播变量时的自定义变量名,类型为String,open函数在算子生命周期的初始化阶段便会调用;最后在map方法中对获取到的广播变量进行访问及其它操作。

 

参考:

https://blog.csdn.net/fct2001140269/article/details/84402798

https://blog.csdn.net/qq_34842671/article/details/80746593

转载于:https://www.cnblogs.com/linkmust/p/10901731.html

你可能感兴趣的文章
Redis面试题及分布式集群
查看>>
发短信的简单实现——C#版
查看>>
(算法)构造最大数
查看>>
命令行开发、编译、打包Android应用程序
查看>>
Java实现HTML页面转PDF解决方案(转)
查看>>
eclipse PHP开发环境配置
查看>>
Java 反射 使用总结
查看>>
nginx https配置
查看>>
在Linux环境下mysql的root密码忘记解决方法
查看>>
我的php命名规范
查看>>
关于关闭Eclipse的控制台自动跳出
查看>>
Docker入门系列(一):目标和安排
查看>>
爬虫开发.1爬虫介绍
查看>>
Kotlin定义静态变量、静态方法
查看>>
Kafka数据可靠性深度解读
查看>>
struts2基础---->自定义拦截器
查看>>
SDOI2009
查看>>
bzoj3255 一个关于序列的游戏
查看>>
JavaScript总结(四)
查看>>
华为企业互动社区云计算板块
查看>>