scala - DataFrame equality in Apache Spark
Disclaimer: this page is a translation of a popular Stack Overflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license, cite the original address, and attribute it to the original authors (not me): Stack Overflow
Original question: http://stackoverflow.com/questions/31197353/
DataFrame equality in Apache Spark
Asked by Sim
Assume df1 and df2 are two DataFrames in Apache Spark, computed using two different mechanisms, e.g., Spark SQL vs. the Scala/Java/Python API.
Is there an idiomatic way to determine whether the two data frames are equivalent (equal, isomorphic), where equivalence is determined by the data (column names and column values for each row) being identical save for the ordering of rows & columns?
The motivation for the question is that there are often many ways to compute some big data result, each with its own trade-offs. As one explores these trade-offs, it is important to maintain correctness and hence the need to check for the equivalence/equality on a meaningful test data set.
Accepted answer by Holden
There are some standard ways in the Apache Spark test suites; however, most of these involve collecting the data locally, and if you want to do equality testing on large DataFrames then that is likely not a suitable solution.
Check the schema first, then you could take the intersection of the two DataFrames as df3 and verify that the counts of df1, df2, and df3 are all equal (however, this only works if there are no duplicate rows; if there are different duplicate rows this method could still return true).
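A minimal sketch of that check (df1 and df2 are the DataFrames from the question, and the no-duplicates caveat above still applies):

// Hedged sketch of the schema-plus-intersection check described above.
// Only valid when neither input contains duplicate rows.
val sameSchema = df1.schema == df2.schema
val df3 = df1.intersect(df2)
val equivalent = sameSchema &&
  df1.count() == df2.count() &&
  df2.count() == df3.count()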
Another option would be to get the underlying RDDs of both DataFrames, map each to (Row, 1), do a reduceByKey to count the number of each Row, then cogroup the two resulting RDDs and do a regular aggregate, returning false if any of the iterators are not equal.
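A rough sketch of that RDD-based idea, again with df1 and df2 as placeholders and countRows as an illustrative helper name:

// Hedged sketch: count occurrences of each Row on both sides, cogroup the counts,
// and require them to match for every key.
def countRows(df: org.apache.spark.sql.DataFrame) =
  df.rdd.map(row => (row, 1L)).reduceByKey(_ + _)

val equal = countRows(df1)
  .cogroup(countRows(df2))
  .map { case (_, (counts1, counts2)) => counts1.toSeq == counts2.toSeq }
  .fold(true)(_ && _)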
Answered by Nick Chammas
I don't know about idiomatic, but I think you can get a robust way to compare DataFrames as you describe, as follows. (I'm using PySpark for illustration, but the approach carries across languages.)
a = spark.range(5)
b = spark.range(5)
a_prime = a.groupBy(sorted(a.columns)).count()
b_prime = b.groupBy(sorted(b.columns)).count()
assert a_prime.subtract(b_prime).count() == b_prime.subtract(a_prime).count() == 0
This approach correctly handles cases where the DataFrames may have duplicate rows, rows in different orders, and/or columns in different orders.
For example:
a = spark.createDataFrame([('nick', 30), ('bob', 40)], ['name', 'age'])
b = spark.createDataFrame([(40, 'bob'), (30, 'nick')], ['age', 'name'])
c = spark.createDataFrame([('nick', 30), ('bob', 40), ('nick', 30)], ['name', 'age'])
a_prime = a.groupBy(sorted(a.columns)).count()
b_prime = b.groupBy(sorted(b.columns)).count()
c_prime = c.groupBy(sorted(c.columns)).count()
assert a_prime.subtract(b_prime).count() == b_prime.subtract(a_prime).count() == 0
assert a_prime.subtract(c_prime).count() != 0
This approach is quite expensive, but most of the expense is unavoidable given the need to perform a full diff. And this should scale fine as it doesn't require collecting anything locally. If you relax the constraint that the comparison should account for duplicate rows, then you can drop the groupBy() and just do the subtract(), which would probably speed things up notably.
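For reference, a minimal sketch of that relaxed, duplicate-insensitive variant in Scala, where except plays the role of PySpark's subtract (df1 and df2 are assumed to share the same column order):

// Hedged sketch: duplicate-insensitive comparison, no groupBy.
val equivalentIgnoringDuplicates =
  df1.except(df2).count() == 0 && df2.except(df1).count() == 0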
Answered by Powers
The spark-fast-tests library has two methods for making DataFrame comparisons (I'm the creator of the library):
The assertSmallDataFrameEquality method collects DataFrames on the driver node and makes the comparison:
def assertSmallDataFrameEquality(actualDF: DataFrame, expectedDF: DataFrame): Unit = {
  if (!actualDF.schema.equals(expectedDF.schema)) {
    throw new DataFrameSchemaMismatch(schemaMismatchMessage(actualDF, expectedDF))
  }
  if (!actualDF.collect().sameElements(expectedDF.collect())) {
    throw new DataFrameContentMismatch(contentMismatchMessage(actualDF, expectedDF))
  }
}
The assertLargeDataFrameEquality method compares DataFrames spread across multiple machines (the code is basically copied from spark-testing-base):
def assertLargeDataFrameEquality(actualDF: DataFrame, expectedDF: DataFrame): Unit = {
  if (!actualDF.schema.equals(expectedDF.schema)) {
    throw new DataFrameSchemaMismatch(schemaMismatchMessage(actualDF, expectedDF))
  }
  try {
    actualDF.rdd.cache
    expectedDF.rdd.cache
    val actualCount = actualDF.rdd.count
    val expectedCount = expectedDF.rdd.count
    if (actualCount != expectedCount) {
      throw new DataFrameContentMismatch(countMismatchMessage(actualCount, expectedCount))
    }
    val expectedIndexValue = zipWithIndex(actualDF.rdd)
    val resultIndexValue = zipWithIndex(expectedDF.rdd)
    val unequalRDD = expectedIndexValue
      .join(resultIndexValue)
      .filter {
        case (idx, (r1, r2)) =>
          !(r1.equals(r2) || RowComparer.areRowsEqual(r1, r2, 0.0))
      }
    val maxUnequalRowsToShow = 10
    assertEmpty(unequalRDD.take(maxUnequalRowsToShow))
  } finally {
    actualDF.rdd.unpersist()
    expectedDF.rdd.unpersist()
  }
}
assertSmallDataFrameEquality is faster for small DataFrame comparisons, and I've found it sufficient for my test suites.
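A minimal usage sketch, assuming the two assertion methods above are in scope and spark is an active SparkSession:

import spark.implicits._

val actualDF = Seq(("a", 1), ("b", 2)).toDF("letter", "number")
val expectedDF = Seq(("a", 1), ("b", 2)).toDF("letter", "number")

// Passes: schemas match and the collected rows are identical.
assertSmallDataFrameEquality(actualDF, expectedDF)
// Same check performed without collecting everything to the driver.
assertLargeDataFrameEquality(actualDF, expectedDF)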
Answered by user1442346
Java:
assert resultDs.union(answerDs).distinct().count() == resultDs.intersect(answerDs).count();
Answered by EnricoM
A scalable and easy way is to diff the two DataFrames and count the non-matching rows:
df1.diff(df2).where($"diff" =!= "N").count
If that number is not zero, then the two DataFrames are not equivalent.
The diff transformation is provided by spark-extension.
It identifies Inserted, Changed, Deleted, and uN-changed rows.
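Building on the same snippet, a sketch that breaks the result down by change type instead of producing a single count (assuming the diff column shown above):

// Summarize how many rows fall into each change category reported by diff.
val diffSummary = df1.diff(df2)
  .groupBy($"diff")   // change-type column produced by the diff transformation
  .count()
diffSummary.show()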
Answered by Herman van Hovell
You can do this using a little bit of deduplication in combination with a full outer join. The advantage of this approach is that it does not require you to collect results to the driver, and that it avoids running multiple jobs.
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
// Generate some random data.
def random(n: Int, s: Long) = {
  spark.range(n).select(
    (rand(s) * 10000).cast("int").as("a"),
    (rand(s + 5) * 1000).cast("int").as("b"))
}
val df1 = random(10000000, 34)
val df2 = random(10000000, 17)
// Move all the keys into a struct (to make handling nulls easy), deduplicate the given dataset
// and count the rows per key.
def dedup(df: Dataset[Row]): Dataset[Row] = {
  df.select(struct(df.columns.map(col): _*).as("key"))
    .groupBy($"key")
    .agg(count(lit(1)).as("row_count"))
}
// Deduplicate the inputs and join them using a full outer join. The result can contain
// the following things:
// 1. Both keys are not null (and thus equal), and the row counts are the same. The dataset
// is the same for the given key.
// 2. Both keys are not null (and thus equal), and the row counts are not the same. The dataset
// contains the same keys.
// 3. Only the right key is not null.
// 4. Only the left key is not null.
val joined = dedup(df1).as("l").join(dedup(df2).as("r"), $"l.key" === $"r.key", "full")
// Summarize the differences.
val summary = joined.select(
  count(when($"l.key".isNotNull && $"r.key".isNotNull && $"r.row_count" === $"l.row_count", 1)).as("left_right_same_rc"),
  count(when($"l.key".isNotNull && $"r.key".isNotNull && $"r.row_count" =!= $"l.row_count", 1)).as("left_right_different_rc"),
  count(when($"l.key".isNotNull && $"r.key".isNull, 1)).as("left_only"),
  count(when($"l.key".isNull && $"r.key".isNotNull, 1)).as("right_only"))
summary.show()
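To turn that summary into a single verdict, one possible follow-up using the column names defined above:

// The inputs are equivalent exactly when every mismatch counter is zero.
val row = summary.head()
val equivalent =
  row.getAs[Long]("left_right_different_rc") == 0L &&
  row.getAs[Long]("left_only") == 0L &&
  row.getAs[Long]("right_only") == 0L
println(s"DataFrames equivalent: $equivalent")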
Answered by Asher A
Try doing the following:
df1.except(df2).isEmpty
Answered by J. P
try {
    return ds1.union(ds2)
        .groupBy(columns(ds1, ds1.columns()))
        .count()
        .filter("count % 2 > 0")
        .count()
        == 0;
} catch (Exception e) {
    return false;
}

Column[] columns(Dataset<Row> ds, String... columnNames) {
    List<Column> l = new ArrayList<>();
    for (String cn : columnNames) {
        l.add(ds.col(cn));
    }
    return l.stream().toArray(Column[]::new);
}
The columns method is supplementary and can be replaced by any method that returns a Seq of columns.
Logic:
- Union both datasets; if the columns don't match, it will throw an exception and hence return false.
- If the columns match, then groupBy on all columns and add a count column. Now, every row has a count that is a multiple of 2 (even duplicate rows).
- Check whether any row has a count that is not divisible by 2; those are the extra rows.

