scala - Sequences in Spark dataframe

Disclaimer: this page is a translation of a popular StackOverflow question and answer, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/40177895/

Sequences in Spark dataframe

scala, apache-spark, dataframe, spark-dataframe

Asked by Tim

I have a dataframe in Spark. It looks like this:

+-------+----------+-------+
|  value|     group|     ts|
+-------+----------+-------+
|      A|         X|      1|
|      B|         X|      2|
|      B|         X|      3|
|      D|         X|      4|
|      E|         X|      5|
|      A|         Y|      1|
|      C|         Y|      2|
+-------+----------+-------+

End goal: I'd like to find how many sequences A-B-E there are (a sequence is just a list of subsequent rows), with the added constraint that subsequent parts of the sequence can be at most n rows apart. Let's say for this example that n is 2.

Consider group X. In this case there is exactly one D between B and E (multiple consecutive Bs are ignored), which means B and E are 1 row apart and thus there is a sequence A-B-E.

I have thought about using collect_list(), creating a string (like DNA) and using substring search with regex. But I was wondering if there's a more elegant distributed way, perhaps using window functions?

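For reference, a rough sketch of that collect_list idea (hypothetical, assuming the dataframe above is called df; the regex is naive and ignores the "AABE"/"ABAE" corner cases discussed in the answer below, and the struct/sort_array is only there because collect_list after a groupBy does not guarantee order):

import org.apache.spark.sql.functions._

val dna = df
  .groupBy('group)
  .agg(sort_array(collect_list(struct('ts, 'value))).as("rows"))  // values in ts order
  .withColumn("dna", concat_ws("", col("rows.value")))            // e.g. "ABBDE" for group X
  .withColumn("hasABE", 'dna.rlike("A.{0,2}B.{0,2}E"))            // rough encoding of n = 2

dna.filter('hasABE).count()  // number of groups containing at least one A-B-E sequence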

Edit:

Note that the provided dataframe is just an example. The real dataframe (and thus the groups) can be arbitrarily long.

Accepted answer by Wilmerton

Edited to answer @Tim's comment + fix patterns of the type "AABE"

Yep, using a window function helps, but I created an id to have an ordering:

import spark.implicits._  // needed for toDF and the 'column syntax (spark-shell imports this automatically)

val df = List(
  (1,"A","X",1),
  (2,"B","X",2),
  (3,"B","X",3),
  (4,"D","X",4),
  (5,"E","X",5),
  (6,"A","Y",1),
  (7,"C","Y",2)
).toDF("id","value","group","ts")

import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy('group).orderBy('id)
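
In this example the existing ts column would give the same per-group ordering, so a variant without the extra id could simply use (a minor variation, not from the original answer):

val w = Window.partitionBy('group).orderBy('ts)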

Then lag will collect what is needed, but a function is required to generate the Column expression (note the split, which eliminates double counting of "AABE"; WARNING: this rejects patterns of the type "ABAEXX"):

def createSeq(m:Int) = split(
  concat(
    (1 to 2*m)
      .map(i => coalesce(lag('value,-i).over(w),lit("")))  // lag with a negative offset reads the i-th following row
  :_*),"A")(0)                                             // keep only the part before the next "A"


val m=2
val tmp = df
  .withColumn("seq",createSeq(m))

+---+-----+-----+---+----+
| id|value|group| ts| seq|
+---+-----+-----+---+----+
|  6|    A|    Y|  1|   C|
|  7|    C|    Y|  2|    |
|  1|    A|    X|  1|BBDE|
|  2|    B|    X|  2| BDE|
|  3|    B|    X|  3|  DE|
|  4|    D|    X|  4|   E|
|  5|    E|    X|  5|    |
+---+-----+-----+---+----+
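
A side note on the trick used here: lag with a negative offset looks forward, so each row's seq is built from the values of the following rows, and the coalesce with lit("") pads the end of each group. A quick way to see it in the same session (a hypothetical check, not part of the original answer):

df.withColumn("next", lag('value, -1).over(w)).orderBy('group, 'id).show()
// e.g. for id = 1 (group X) "next" is B, and for id = 5 it is null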

Because of the poor set of collection functions available in the Column API, avoiding regex altogether is much easier using a UDF:

def patternInSeq(m: Int) = udf((str: String) => {
  val found = str
    .split("B")                    // pieces of seq delimited by "B"
    .filter(_.contains("E"))       // keep pieces containing an "E"
    .filter(_.indexOf("E") <= m)   // with the "E" at most m characters into the piece
    .nonEmpty
  found
})

val res = tmp
  .filter(('value === "A") && (locate("B",'seq) > 0))
  .filter(locate("B",'seq) <= m && (locate("E",'seq) > 1))
  .filter(patternInSeq(m)('seq))
  .groupBy('group)
  .count
res.show

+-----+-----+
|group|count|
+-----+-----+
|    X|    1|
+-----+-----+

Generalisation (out of scope)

If you want to generalise this to longer sequences of letters, the question itself has to be generalised. It could be trivial, but in this case a pattern of the type "ABAE" should be rejected (see the comments). So the easiest way to generalise is to use a pair-wise rule, as in the following implementation (I added a group "Z" to illustrate the behaviour of this algorithm):

val df = List(
  (1,"A","X",1),
  (2,"B","X",2),
  (3,"B","X",3),
  (4,"D","X",4),
  (5,"E","X",5),
  (6,"A","Y",1),
  (7,"C","Y",2),
  ( 8,"A","Z",1),
  ( 9,"B","Z",2),
  (10,"D","Z",3),
  (11,"B","Z",4),
  (12,"E","Z",5)
).toDF("id","value","group","ts")

First we define the logic for a pair

import org.apache.spark.sql.DataFrame
def createSeq(m:Int) = array((0 to 2*m).map(i => coalesce(lag('value,-i).over(w),lit(""))):_*)
def filterPairUdf(m: Int, t: (String,String)) = udf((ar: Seq[String]) => {  // array columns reach a Scala UDF as a Seq
  val (a,b) = t
  val foundAt = ar
    .dropWhile(_ != a)   // skip ahead to the first occurrence of a
    .dropWhile(_ == a)   // step past a (consecutive repetitions of a are ignored)
    .takeWhile(_ != a)   // stop if a occurs again (the "reset" rule)
    .indexOf(b)          // position of b within that window
  foundAt != -1 && foundAt <= m
})
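
To see how the pair rule behaves, here is a trace (hypothetical, based on the code above) for group "Z", whose seq is [A, B, D, B, E], with the pair ("B","E") and m = 2:

// ar.dropWhile(_ != "B")  ->  [B, D, B, E]
//   .dropWhile(_ == "B")  ->  [D, B, E]
//   .takeWhile(_ != "B")  ->  [D]            (the second "B" resets the scan)
//   .indexOf("E")         ->  -1, so group Z is rejected, as the result further below shows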

Then we define a function that applies this logic iteratively on the dataframe:

def filterSeq(seq: List[String], m: Int)(df: DataFrame): DataFrame = {
  var a = seq(0)
  seq.tail.foldLeft(df){(df: DataFrame, b: String) => {
    val res  = df.filter(filterPairUdf(m,(a,b))('seq))
    a = b
    res
  }}
}
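
The mutable var can be avoided by folding over consecutive pairs; an equivalent formulation (a stylistic variant, not from the original answer):

def filterSeq2(seq: List[String], m: Int)(df: DataFrame): DataFrame =
  seq.zip(seq.tail).foldLeft(df) { case (acc, (a, b)) =>
    acc.filter(filterPairUdf(m, (a, b))('seq))
  }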

A simplification and optimisation is obtained because we first filter on sequences beginning with the first character:

val m = 2
val tmp = df
  .filter('value === "A") // reduce problem
  .withColumn("seq",createSeq(m))

scala> tmp.show()
+---+-----+-----+---+---------------+
| id|value|group| ts|            seq|
+---+-----+-----+---+---------------+
|  6|    A|    Y|  1|   [A, C, , , ]|
|  8|    A|    Z|  1|[A, B, D, B, E]|
|  1|    A|    X|  1|[A, B, B, D, E]|
+---+-----+-----+---+---------------+

val res = tmp.transform(filterSeq(List("A","B","E"),m))

scala> res.show()
+---+-----+-----+---+---------------+
| id|value|group| ts|            seq|
+---+-----+-----+---+---------------+
|  1|    A|    X|  1|[A, B, B, D, E]|
+---+-----+-----+---+---------------+

(transform is simple syntactic sugar for applying a DataFrame => DataFrame transformation)

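In other words, these two are equivalent:

val viaTransform = tmp.transform(filterSeq(List("A","B","E"), m))
val direct       = filterSeq(List("A","B","E"), m)(tmp)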

res
  .groupBy('group)
  .count
  .show

+-----+-----+
|group|count|
+-----+-----+
|    X|    1|
+-----+-----+

As I said, there are different ways to generalise the "resetting rules" when scanning a sequence, but this example hopefully helps with the implementation of more complex ones.
