Calculate average using Spark Scala
Original question: http://stackoverflow.com/questions/44987336/
Note: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors on StackOverflow.
Asked by akrockz
How do I calculate the average salary per location in Spark Scala from the two data sets below?
File1.csv (column 4 is salary)
Ram, 30, Engineer, 40000
Bala, 27, Doctor, 30000
Hari, 33, Engineer, 50000
Siva, 35, Doctor, 60000
File2.csv (column 2 is location)
Hari, Bangalore
Ram, Chennai
Bala, Bangalore
Siva, Chennai
The files above are not sorted. I need to join them and find the average salary per location. I tried the code below but could not get it to work.
val salary = sc.textFile("File1.csv").map(e => e.split(","))
val location = sc.textFile("File2.csv").map(e.split(","))
val joined = salary.map(e=>(e(0),e(3))).join(location.map(e=>(e(0),e(1)))
val joinedData = joined.sortByKey()
val finalData = joinedData.map(v => (v._1,v._2._1._1,v._2._2))
val aggregatedDF = finalData.map(e=> e.groupby(e(2)).agg(avg(e(1))))
aggregatedDF.repartition(1).saveAsTextFile("output.txt")
Please help with code and a sample of what the output would look like.
Many thanks.
Answered by Leo C
You can read the CSV files as DataFrames, then join and group them to get the averages:
val df1 = spark.read.csv("/path/to/file1.csv").toDF(
"name", "age", "title", "salary"
)
val df2 = spark.read.csv("/path/to/file2.csv").toDF(
"name", "location"
)
import org.apache.spark.sql.functions._
val dfAverage = df1.join(df2, Seq("name")).
groupBy(df2("location")).agg(avg(df1("salary")).as("average")).
select("location", "average")
dfAverage.show
+-----------+-------+
|   location|average|
+-----------+-------+
|  Bangalore|40000.0|
|    Chennai|50000.0|
+-----------+-------+
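To make the join and grouping concrete, here is a minimal plain-Scala sketch (no Spark) of the same computation on local collections. The object name `AverageSketch` and its members are hypothetical and only mirror the sample data from the question; it is an illustration of the logic, not a replacement for the DataFrame code.

```scala
// Plain-Scala sketch (no Spark) of what join + groupBy + avg computes.
// AverageSketch and its members are hypothetical names used for illustration.
object AverageSketch {
  // (name, salary) pairs taken from File1.csv in the question
  val salaries: Seq[(String, Int)] =
    Seq("Ram" -> 40000, "Bala" -> 30000, "Hari" -> 50000, "Siva" -> 60000)

  // (name, location) pairs taken from File2.csv in the question
  val locations: Seq[(String, String)] =
    Seq("Hari" -> "Bangalore", "Ram" -> "Chennai", "Bala" -> "Bangalore", "Siva" -> "Chennai")

  def averageByLocation(
      salaries: Seq[(String, Int)],
      locations: Seq[(String, String)]
  ): Map[String, Double] = {
    val locationOf = locations.toMap
    // Inner join on name: keep only names present in both inputs.
    val joined = for {
      (name, salary) <- salaries
      location <- locationOf.get(name)
    } yield (location, salary)
    // Group by location and average the salaries in each group.
    joined.groupBy(_._1).map { case (location, rows) =>
      location -> rows.map(_._2).sum.toDouble / rows.size
    }
  }
}
```

Calling `AverageSketch.averageByLocation(AverageSketch.salaries, AverageSketch.locations)` yields 40000.0 for Bangalore and 50000.0 for Chennai, matching the table above.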
[UPDATE] For calculating average dimensions:
// file1.csv:
Ram,30,Engineer,40000,600*200
Bala,27,Doctor,30000,800*400
Hari,33,Engineer,50000,700*300
Siva,35,Doctor,60000,600*200
// file2.csv
Hari,Bangalore
Ram,Chennai
Bala,Bangalore
Siva,Chennai
val df1 = spark.read.csv("/path/to/file1.csv").toDF(
"name", "age", "title", "salary", "dimensions"
)
val df2 = spark.read.csv("/path/to/file2.csv").toDF(
"name", "location"
)
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
val dfAverage = df1.join(df2, Seq("name")).
groupBy(df2("location")).
agg(
avg(split(df1("dimensions"), "\\*").getItem(0).cast(IntegerType)).as("avg_length"),
avg(split(df1("dimensions"), "\\*").getItem(1).cast(IntegerType)).as("avg_width")
).
select(
$"location", $"avg_length", $"avg_width",
concat($"avg_length", lit("*"), $"avg_width").as("avg_dimensions")
)
dfAverage.show
+---------+----------+---------+--------------+
| location|avg_length|avg_width|avg_dimensions|
+---------+----------+---------+--------------+
|Bangalore|     750.0|    350.0|   750.0*350.0|
|  Chennai|     600.0|    200.0|   600.0*200.0|
+---------+----------+---------+--------------+
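The split pattern is a regular expression, so the literal `*` has to be escaped. The following plain-Scala sketch (no Spark) shows the same parsing and averaging on local strings; `DimensionSketch`, `parse`, and `average` are hypothetical names introduced only for this illustration.

```scala
// Plain-Scala sketch of splitting "length*width" strings and averaging each part.
// DimensionSketch is a hypothetical helper, not part of Spark.
object DimensionSketch {
  // String.split takes a regex, so "*" is escaped as "\\*".
  def parse(dimensions: String): (Int, Int) = {
    val parts = dimensions.trim.split("\\*")
    (parts(0).trim.toInt, parts(1).trim.toInt)
  }

  // Averages lengths and widths separately; assumes a non-empty input.
  def average(dimensions: Seq[String]): (Double, Double) = {
    val parsed = dimensions.map(parse)
    val avgLength = parsed.map(_._1).sum.toDouble / parsed.size
    val avgWidth = parsed.map(_._2).sum.toDouble / parsed.size
    (avgLength, avgWidth)
  }
}
```

For the two Bangalore rows, `DimensionSketch.average(Seq("800*400", "700*300"))` gives `(750.0, 350.0)`, in line with the table above.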
Answered by Assaf Mendelson
I would use DataFrames. First read the files, for example:
val salary = spark.read.option("header", "true").csv("File1.csv")
val location = spark.read.option("header", "true").csv("File2.csv")
If the files have no header row, set the option to "false" and replace the default column names, either with withColumnRenamed or with toDF as in the code below.
val salary = spark.read.option("header", "false").csv("File1.csv").toDF("name", "age", "job", "salary")
val location = spark.read.option("header", "false").csv("File2.csv").toDF("name", "location")
Now do the join:
val joined = salary.join(location, "name")
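Lastly, compute the average:
import org.apache.spark.sql.functions.avg
// Named avgSalary so the value does not shadow the avg function.
val avgSalary = joined.groupBy("location").agg(avg($"salary"))
To save the result:
avgSalary.repartition(1).write.csv("output.csv")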
Answered by Raphael Roth
I would use the DataFrame API. This should work:
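// split returns an Array, so rows are matched with an Array pattern;
// trim removes the spaces that follow the commas in the sample files.
val salary = sc.textFile("File1.csv")
  .map(e => e.split(",").map(_.trim))
  .map { case Array(name, _, _, salary) => (name, salary) }
  .toDF("name", "salary")
val location = sc.textFile("File2.csv")
  .map(e => e.split(",").map(_.trim))
  .map { case Array(name, location) => (name, location) }
  .toDF("name", "location")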
import org.apache.spark.sql.functions._
salary
.join(location,Seq("name"))
.groupBy($"location")
.agg(
avg($"salary").as("avg_salary")
)
.repartition(1)
.write.csv("output.csv")
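Because `split` returns an `Array[String]`, a line can be destructured with an `Array` pattern, and a fallback case avoids a `MatchError` on malformed rows. Below is a small plain-Scala sketch of that idea; `ParseSketch` and `parseSalaryLine` are hypothetical names, and skipping bad rows (rather than failing) is an assumption made for this illustration.

```scala
import scala.util.Try

// Plain-Scala sketch of parsing one File1.csv line defensively.
// ParseSketch is a hypothetical helper, not part of Spark.
object ParseSketch {
  // Returns Some((name, salary)) for a well-formed 4-column line, None otherwise.
  def parseSalaryLine(line: String): Option[(String, Int)] =
    line.split(",").map(_.trim) match {
      case Array(name, _, _, salary) => Try(salary.toInt).toOption.map(name -> _)
      case _                         => None
    }
}
```

With an RDD of lines, such a function could be applied through `flatMap` so that unparseable rows are dropped before the join.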
Answered by Harald Gliebe
You could do something like this:
val salary = sc.textFile("File1.csv").map(_.split(",").map(_.trim))
val location = sc.textFile("File2.csv").map(_.split(",").map(_.trim))
val joined = salary.map(e=>(e(0),e(3).toInt)).join(location.map(e=>(e(0),e(1))))
val locSalary = joined.map(v => (v._2._2, v._2._1))
val averages = locSalary.aggregateByKey((0,0))((t,e) => (t._1 + 1, t._2 + e),
(t1,t2) => (t1._1 + t2._1, t1._2 + t2._2)).mapValues(t => t._2/t._1)
Then averages.take(10) will give:
res5: Array[(String, Int)] = Array((Chennai,50000), (Bangalore,40000))
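Note that the result type is `Int`, because `t._2/t._1` is integer division and any fractional part is truncated. The plain-Scala sketch below (no Spark) mirrors the same (count, sum) accumulator with a local fold and divides as `Double` instead; `AggregateSketch` and `averages` are hypothetical names, and using a `Long` sum is an assumption added to reduce overflow risk.

```scala
// Plain-Scala sketch of the (count, sum) accumulator used with aggregateByKey,
// returning Double averages. AggregateSketch is a hypothetical helper.
object AggregateSketch {
  def averages(pairs: Seq[(String, Int)]): Map[String, Double] = {
    // Fold every (key, value) pair into a running (count, sum) per key.
    val totals = pairs.foldLeft(Map.empty[String, (Int, Long)]) {
      case (acc, (key, value)) =>
        val (count, sum) = acc.getOrElse(key, (0, 0L))
        acc.updated(key, (count + 1, sum + value))
    }
    // Convert to Double before dividing so the fraction is kept.
    totals.map { case (key, (count, sum)) => key -> sum.toDouble / count }
  }
}
```

For example, `AggregateSketch.averages(Seq("Chennai" -> 40000, "Chennai" -> 60001))` gives 50000.5 for Chennai, where integer division would give 50000.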

