Spark Scala 从 rdd.foreachPartition 取回数据

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/36951207/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-22 08:14:56  来源:igfitidea点击:

Spark Scala Get Data Back from rdd.foreachPartition

scalaapache-sparkspark-streamingscalikejdbc

提问by codeaperature

I have some code like this:

我有一些这样的代码:

      println("\nBEGIN Last Revs Class: "+ distinctFileGidsRDD.getClass)
      val lastRevs = distinctFileGidsRDD.
        foreachPartition(iter => {
          SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
          while(iter.hasNext) {
            val item = iter.next()
            //println(item(0))
            println("String: "+item(0).toString())
            val jsonStr = DB.readOnly { implicit session =>
              sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${item(0)}::varchar".
                map { resultSet => resultSet.string(1) }.single.apply()
            }
            println("\nJSON: "+jsonStr)
          }
        })
      println("\nEND Last Revs Class: "+ lastRevs.getClass)

The code outputs (with heavy edits) something like:

代码输出(经过大量编辑)类似于:

BEGIN Last Revs Class: class org.apache.spark.rdd.MapPartitionsRDD
String: 1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",... )
String: 1eY2wxoVq17KGMUBzCZZ34J9gSNzF038grf5RP38DUxw
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",... )
...
JSON: None()
END Last Revs Class: void

QUESTION 1: How can I get the lastRevs value to be in a useful format like the JSON string/null or an option like Some / None?

问题 1:如何使 lastRevs 值采用有用的格式,例如 JSON 字符串/null 或类似 Some / None 的选项?

QUESTION 2: My preference: IS there another way get at partitions data that an RDD-like format (rather than the iterator format)?

问题 2:我的偏好:是否有另一种方法可以获取类似 RDD 的格式(而不是迭代器格式)的分区数据?

dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  } 
}

from http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning

来自http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning

QUESTION 3: Is the method of getting data that I am using a sane method (given I am following the link above)? (Put aside the fact that this is a scalikejdbc system JDBC right now. This is going to be a key, value store of some type other than this prototype.)

问题 3:我使用的获取数据的方法是否合理(鉴于我正在关注上面的链接)?(撇开现在这是一个 scalikejdbc 系统 JDBC 的事实。这将是除此原型之外的某种类型的键值存储。)

采纳答案by maasg

To create a transformation that uses resources local to the executor (such as a DB or network connection), you should use rdd.mapPartitions. It allows to initialize some code locally to the executor and use those local resources to process the data in the partition.

要创建使用执行器本地资源(例如数据库或网络连接)的转换,您应该使用rdd.mapPartitions. 它允许在执行器本地初始化一些代码,并使用这些本地资源来处理分区中的数据。

The code should look like:

代码应如下所示:

 val lastRevs = distinctFileGidsRDD.
        mapPartitions{iter => 
          SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
          iter.map{ element => 
            DB.readOnly { implicit session =>
              sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${element(0)}::varchar"
              .map { resultSet => resultSet.string(1) }.single.apply()
            }
          }
        }