scala 如何更新火花流中的广播变量?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/33372264/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-22 07:45:13  来源:igfitidea点击:

How can I update a broadcast variable in spark streaming?

javascalaapache-sparkspark-streamingbroadcast

提问by Andrew Stubbs

I have, I believe, a relatively common use case for spark streaming:

我相信,我有一个相对常见的 Spark 流用例:

I have a stream of objects that I would like to filter based on some reference data

我有一个对象流,我想根据一些参考数据进行过滤

Initially, I thought that this would be a very simple thing to achieve using a Broadcast Variable:

最初,我认为使用广播变量实现这将是一件非常简单的事情:

public void startSparkEngine {
    Broadcast<ReferenceData> refdataBroadcast
      = sparkContext.broadcast(getRefData());

    final JavaDStream<MyObject> filteredStream = objectStream.filter(obj -> {
        final ReferenceData refData = refdataBroadcast.getValue();
        return obj.getField().equals(refData.getField());
    }

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
            // Final processing of filtered objects
        });
        return null;
    });
}

However, albeit infrequently, my reference data will change periodically

但是,尽管很少,我的参考数据会定期更改

I was under the impression that I could modify and re-broadcastmy variable on the driver and it would be propagated to each of the workers, however the Broadcastobject is not Serializableand needs to be final.

我的印象是我可以在驱动程序上修改和重新广播我的变量,它会传播给每个工作人员,但是Broadcast对象不是Serializable并且需要是final.

What alternatives do I have? The three solutions I can think of are:

我有哪些选择?我能想到的三个解决方案是:

  1. Move the reference data lookup into a forEachPartitionor forEachRddso that it resides entirely on the workers. However the reference data lives beind a REST API so I would also need to somehow store a timer / counter to stop the remote being accessed for every element in the stream.

  2. Restart the Spark Context every time the refdata changes, with a new Broadcast Variable.

  3. Convert the Reference Data to an RDD, then jointhe streams in such a way that I am now streaming Pair<MyObject, RefData>, though this will ship the reference data with every object.

  1. 将参考数据查找移动到一个forEachPartitionor 中,forEachRdd以便它完全驻留在工作人员身上。然而,参考数据存在于 REST API 中,因此我还需要以某种方式存储一个计时器/计数器,以停止对流中每个元素的远程访问。

  2. 每次 refdata 更改时,使用新的广播变量重新启动 Spark 上下文。

  3. 将参考数据转换为RDD,然后join以我现在正在流式传输的方式传输流Pair<MyObject, RefData>,尽管这将随每个对象一起传送参考数据。

回答by Aastha

Extending the answer By @Rohan Aletty. Here is a sample code of a BroadcastWrapper that refresh broadcast variable based on some ttl

扩展答案@Rohan Aletty。这是一个基于某些 ttl 刷新广播变量的 BroadcastWrapper 示例代码

public class BroadcastWrapper {

    private Broadcast<ReferenceData> broadcastVar;
    private Date lastUpdatedAt = Calendar.getInstance().getTime();

    private static BroadcastWrapper obj = new BroadcastWrapper();

    private BroadcastWrapper(){}

    public static BroadcastWrapper getInstance() {
        return obj;
    }

    public JavaSparkContext getSparkContext(SparkContext sc) {
       JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
       return jsc;
    }

    public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext){
        Date currentDate = Calendar.getInstance().getTime();
        long diff = currentDate.getTime()-lastUpdatedAt.getTime();
        if (var == null || diff > 60000) { //Lets say we want to refresh every 1 min = 60000 ms
            if (var != null)
               var.unpersist();
            lastUpdatedAt = new Date(System.currentTimeMillis());

            //Your logic to refresh
            ReferenceData data = getRefData();

            var = getSparkContext(sparkContext).broadcast(data);
       }
       return var;
   }
}

Your code would look like :

您的代码如下所示:

public void startSparkEngine() {

    final JavaDStream<MyObject> filteredStream = objectStream.transform(stream -> {
        Broadcast<ReferenceData> refdataBroadcast = BroadcastWrapper.getInstance().updateAndGet(stream.context());

        stream.filter(obj -> obj.getField().equals(refdataBroadcast.getValue().getField()));
    });

    filteredStream.foreachRDD(rdd -> {
        rdd.foreach(obj -> {
        // Final processing of filtered objects
        });
        return null;
    });
}

This worked for me on multi-cluster as well. Hope this helps

这对我来说也适用于多集群。希望这可以帮助

回答by Ravi Reddy

Almost every one that is dealing with streaming applications need a way to weave (filter, lookup etc) reference data (from DB, files etc) into the streaming data. We have a partial solution of the whole two parts

几乎每个处理流应用程序的人都需要一种方法来将(过滤、查找等)参考数据(来自数据库、文件等)编织到流数据中。我们有整个两部分的部分解决方案

  1. Lookup reference data to be used in streaming operations

    • create CacheLookup object with desired cache TTL
    • wrap that in Broadcast
    • use CacheLookup as part of streaming logic
  1. 查找要在流操作中使用的参考数据

    • 使用所需的缓存 TTL 创建 CacheLookup 对象
    • 将其包装在广播中
    • 使用 CacheLookup 作为流逻辑的一部分

For most part this works fine, except for the following

在大多数情况下,这可以正常工作,但以下情况除外

  1. Update the reference data

    There is no definitive way achieve this despite the suggestions in these threads, i.e: kill the previous broadcast variable and create new one. Multiple unknowns like what to be expected between these operations.

  1. 更新参考数据

    尽管在这些线程中提出了建议,但没有确定的方法可以实现这一点,即:杀死先前的广播变量并创建新的变量。多个未知数,例如这些操作之间的预期结果。

This is such a common need, it would have helped if there is a way to send info to broadcast variable informing update. With that, it is possible to invalidate the local caches in "CacheLookup"

这是一个如此普遍的需求,如果有一种方法可以将信息发送到广播变量通知更新,它会有所帮助。这样,就可以使“CacheLookup”中的本地缓存无效

The second portion of the problem is still not solved. I would be interested if there is any viable approach to this

问题的第二部分仍然没有解决。如果有任何可行的方法,我会感兴趣

回答by Ram Ghadiyaram

Recently faced issue with this. Thought it might be helpful for scala users..

最近遇到了这个问题。认为它可能对 Scala 用户有帮助..

Scala way of doing BroadCastWrapperis like below example.

Scala 的做法BroadCastWrapper就像下面的例子。

import java.io.{ ObjectInputStream, ObjectOutputStream }
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext
import scala.reflect.ClassTag

/* wrapper lets us update brodcast variables within DStreams' foreachRDD
 without running into serialization issues */
case class BroadcastWrapper[T: ClassTag](
 @transient private val ssc: StreamingContext,
  @transient private val _v: T) {

  @transient private var v = ssc.sparkContext.broadcast(_v)

  def update(newValue: T, blocking: Boolean = false): Unit = {

    v.unpersist(blocking)
    v = ssc.sparkContext.broadcast(newValue)
  }

  def value: T = v.value

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.writeObject(v)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    v = in.readObject().asInstanceOf[Broadcast[T]]
  }
}

Every time you need to call update function to get new broadcast variable.

每次您需要调用更新函数来获取新的广播变量。

回答by Rohan Aletty

Not sure if you've tried this already but I think an update to a broadcast variable may be achieved without shutting down the SparkContext. Through use of the unpersist()method, copies of the broadcast variable are deleted on each executor and would need to be the variable would need to be rebroadcast in order to be accessed again. For your use case, when you want to update your broadcast, you can:

不确定您是否已经尝试过,但我认为可以在不关闭SparkContext. 通过使用该unpersist()方法,广播变量的副本在每个执行器上被删除,并且需要重新广播的变量才能被再次访问。对于您的用例,当您想要更新广播时,您可以:

  1. Wait for your executors to finish on a current series of data

  2. Unpersist the broadcast variable

  3. Update the broadcast variable

  4. Rebroadcast to send the new reference data to the executors

  1. 等待您的执行者完成当前的一系列数据

  2. 取消持久化广播变量

  3. 更新广播变量

  4. 重新广播以将新的参考数据发送给执行者

I'm drawing pretty heavily from this postbut the person who made the last reply claimed to have gotten it working locally. It's important to note that you probably want to set blocking to trueon the unpersist so that you can be sure executors are rid of the old data (so the stale values won't be read again on the next iteration).

我从这篇文章中大量借鉴,但最后回复的人声称已在本地工作。重要的是要注意,您可能希望将阻塞设置true为 unpersist,以便您可以确保执行程序清除旧数据(因此在下一次迭代中不会再次读取陈旧值)。