Note: this page reproduces a popular StackOverflow question and its answer, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license and attribute it to the original authors (not me), citing the original StackOverflow address: http://stackoverflow.com/questions/36239791/

Convert Dataframe to a Map(Key-Value) in Spark

scala, dictionary, apache-spark

Asked by Abhinav Bhardwaj

So, I have a DataFrame in Spark which looks like this:

It has 30 columns: only showing some of them!

[ABCD,color,NORMAL,N,2015-02-20,1]
[XYZA,color,NORMAL,N,2015-05-04,1]
[GFFD,color,NORMAL,N,2015-07-03,1]
[NAAS,color,NORMAL,N,2015-08-26,1]
[LOWW,color,NORMAL,N,2015-09-26,1]
[KARA,color,NORMAL,N,2015-11-08,1]
[ALEQ,color,NORMAL,N,2015-12-04,1]
[VDDE,size,NORMAL,N,2015-12-23,1]
[QWER,color,NORMAL,N,2016-01-18,1]
[KDSS,color,NORMAL,Y,2015-08-29,1]
[KSDS,color,NORMAL,Y,2015-08-29,1]
[ADSS,color,NORMAL,Y,2015-08-29,1]
[BDSS,runn,NORMAL,Y,2015-08-29,1]
[EDSS,color,NORMAL,Y,2015-08-29,1]

I need to convert this DataFrame into a key-value map in Scala, building each key from some of the DataFrame's columns and assigning each key a unique value, starting at index 0 and counting up to the number of distinct keys.

For example: using the case above, I want to have an output in a map(key-value) collection in Scala like this:

    ([ABCD_color_NORMAL_N_1->0]
    [XYZA_color_NORMAL_N_1->1]
    [GFFD_color_NORMAL_N_1->2]
    [NAAS_color_NORMAL_N_1->3]
    [LOWW_color_NORMAL_N_1->4]
    [KARA_color_NORMAL_N_1->5]
    [ALEQ_color_NORMAL_N_1->6]
    [VDDE_size_NORMAL_N_1->7]
    [QWER_color_NORMAL_N_1->8]
    [KDSS_color_NORMAL_Y_1->9]
    [KSDS_color_NORMAL_Y_1->10]
    [ADSS_color_NORMAL_Y_1->11]
    [BDSS_runn_NORMAL_Y_1->12]
    [EDSS_color_NORMAL_Y_1->13]
    )

I'm new to Scala and Spark, and I tried doing something like this:

    var map: Map[String, Int] = Map()
    var i = 0
    dataframe.foreach(record => {
      // Is there a better way of creating a key!
      val key = record(0) + record(1) + record(2) + record(3)
      var index = i
      map += (key -> index)
      i += 1
    })

But this is not working: the Map is still empty after this completes.

Answered by Tzach Zohar

The main issue in your code is that it tries to modify a variable created on the driver side from within code executed on the workers. When using Spark, you can use driver-side variables within RDD transformations only as "read only" values.

Specifically:

  • The map is created on the driver machine
  • The map (with its initial, empty value) is serialized and sent to worker nodes
  • Each node might change the map (locally)
  • Result is just thrown away when foreach is done - result is not sent back to driver.
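
The copy-and-discard behaviour in the list above can be imitated without a cluster. What follows is only a sketch in plain Scala, not Spark code: Holder, entries and shipToWorker are hypothetical names, and plain Java serialization stands in for the way captured driver-side state is shipped to a worker.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for driver-side state captured by a closure.
class Holder(var entries: Map[String, Int]) extends java.io.Serializable

object ClosureCopyDemo {
  // Hypothetical helper: serialize and deserialize a value, roughly what
  // happens when captured state travels from the driver to a worker.
  def shipToWorker[T <: java.io.Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val driverSide = new Holder(Map.empty)
    val workerSide = shipToWorker(driverSide)   // the "worker" gets its own copy
    workerSide.entries += ("ABCD_color" -> 0)   // the mutation touches the copy only

    println(workerSide.entries.size) // prints 1
    println(driverSide.entries.size) // prints 0: the driver's map never changes
  }
}
```

Nothing carries the worker-side copy back to the driver, which is why the map in the question is still empty after foreach returns.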

To fix this - you should choose a transformation that returns a changed RDD (e.g. map) to create the keys, use zipWithIndex to add the running "ids", and then use collectAsMap to get all the data back to the driver as a Map:

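// .rdd is required on Spark 2.x+, where DataFrame.map returns a Dataset;
// collectAsMap() returns a scala.collection.Map, not the default immutable Map.
val result: scala.collection.Map[String, Long] = dataframe.rdd
  .map(record => record(0).toString + record(1) + record(2) + record(3))
  .zipWithIndex()
  .collectAsMap()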
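
For a version that can be run end to end, here is a minimal sketch assuming Spark 2.x or later on the classpath and a local master. The object name KeyIndexDemo, the helper indexKeys, and the column names are made up for illustration. It also adds a distinct() step, on the assumption that repeated keys should share a single index, as the question's "distinct number of keys" suggests.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object KeyIndexDemo {
  // Hypothetical helper: join the first four columns with "_" and give
  // every distinct key a unique Long index.
  def indexKeys(dataframe: DataFrame): scala.collection.Map[String, Long] =
    dataframe.rdd
      .map(row => row.toSeq.take(4).mkString("_"))
      .distinct()      // one entry per distinct key
      .zipWithIndex()  // pair each key with a unique index
      .collectAsMap()  // bring the pairs back to the driver

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("key-index-demo")
      .getOrCreate()
    import spark.implicits._

    // Sample rows; the column names are assumptions of this sketch.
    val dataframe = Seq(
      ("ABCD", "color", "NORMAL", "N"),
      ("XYZA", "color", "NORMAL", "N"),
      ("ABCD", "color", "NORMAL", "N") // duplicate key on purpose
    ).toDF("id", "attr", "status", "flag")

    indexKeys(dataframe).foreach { case (key, index) => println(s"$key -> $index") }
    spark.stop()
  }
}
```

Because distinct() shuffles the data, the indexes are unique but do not necessarily follow the original row order. Note also that collectAsMap pulls every key onto the driver, so this only suits key sets that fit in driver memory.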

As for the key creation itself - assuming you want to include the first 5 columns, and add a separator (_) between them, you can use:

record => record.toSeq.take(5).mkString("_")
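
The expected output in the question skips the date column, so the key there is built from columns 0-3 and 5 rather than the first five. A minimal sketch of that variant in plain Scala follows. Each inner Seq stands in for row.toSeq on a Spark Row, and KeyBuilderDemo, makeKey and indexKeys are hypothetical names, not part of any Spark API.

```scala
object KeyBuilderDemo {
  // Hypothetical helper: join the values at the given column positions with "_".
  def makeKey(columns: Seq[Any], positions: Seq[Int]): String =
    positions.map(i => columns(i)).mkString("_")

  // Hypothetical helper: give each distinct key an index in first-seen order.
  def indexKeys(keys: Seq[String]): Map[String, Int] =
    keys.distinct.zipWithIndex.toMap

  def main(args: Array[String]): Unit = {
    // Local stand-ins for three of the rows shown in the question.
    val rows: Seq[Seq[Any]] = Seq(
      Seq("ABCD", "color", "NORMAL", "N", "2015-02-20", 1),
      Seq("XYZA", "color", "NORMAL", "N", "2015-05-04", 1),
      Seq("VDDE", "size", "NORMAL", "N", "2015-12-23", 1)
    )

    // Columns 0-3 and 5, skipping the date in column 4.
    val keys = rows.map(row => makeKey(row, Seq(0, 1, 2, 3, 5)))
    val indexed = indexKeys(keys)

    indexed.toSeq.sortBy(_._2).foreach { case (key, index) => println(s"$key -> $index") }
    // prints:
    // ABCD_color_NORMAL_N_1 -> 0
    // XYZA_color_NORMAL_N_1 -> 1
    // VDDE_size_NORMAL_N_1 -> 2
  }
}
```

In Spark itself, the same makeKey call could be applied to row.toSeq inside the map step before zipWithIndex.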