How to use map-function in SPARK with Java
Note: This page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license, link to the original question, and attribute the original authors (not the translator): StackOverflow
Original question: http://stackoverflow.com/questions/26817940/
Asked by progNewbie
I am trying to read a CSV file in Spark, and I want to split the lines, which are comma-separated, so that I end up with an RDD of a two-dimensional array. I am very new to Spark.
I tried to do this:
public class SimpleApp
{
    public static void main(String[] args) throws Exception
    {
        String master = "local[2]";
        String csvInput = "/home/userName/Downloads/countrylist.csv";
        String csvOutput = "/home/userName/Downloads/countrylist";
        JavaSparkContext sc = new JavaSparkContext(master, "loadwholecsv", System.getenv("SPARK_HOME"), System.getenv("JARS"));
        JavaRDD<String> csvData = sc.textFile(csvInput, 1);

        JavaRDD<String> words = csvData.map(new Function<List<String>>() { //line 43
            @Override
            public List<String> call(String s) {
                return Arrays.asList(s.split("\\s*,\\s*"));
            }
        });

        words.saveAsTextFile(csvOutput);
    }
}
This should split the lines and return an ArrayList, but I am not sure about that. I get this error:
SimpleApp.java:[43,58] wrong number of type arguments; required 2
Accepted answer by Holden
So there are two small issues with the program. First, you probably want flatMap rather than map: since you are trying to return an RDD of words rather than an RDD of lists of words, flatMap flattens the result. The other is that our function class also requires the type of the input it is called on. I'd replace the JavaRDD<String> words ... line with:
JavaRDD<String> words = rdd.flatMap(
    new FlatMapFunction<String, String>() {
        public Iterable<String> call(String s) {
            return Arrays.asList(s.split("\\s*,\\s*"));
        }
    });
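A side note that is not part of the original answer: on Spark 2.x and later, the Java FlatMapFunction returns an Iterator instead of an Iterable, and with Java 8 lambdas the same split can be written more compactly. A minimal sketch, assuming csvData is the JavaRDD<String> from the question and a Spark 2.x runtime:

// Spark 2.x+ only: here flatMap expects a function that returns an Iterator.
JavaRDD<String> words = csvData.flatMap(
        s -> Arrays.asList(s.split("\\s*,\\s*")).iterator());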
Answered by Harish Pathak
This is what you should do...
//====== Using flatMap (RDD of words) ==============
JavaRDD<String> csvData = spark.textFile(GlobalConstants.STR_INPUT_FILE_PATH, 1);
JavaRDD<String> counts = csvData.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String s) {
        return Arrays.asList(s.split("\\s*,\\s*"));
    }
});

//====== Using map (RDD of lists of words) ==============
JavaRDD<String> csvData = spark.textFile(GlobalConstants.STR_INPUT_FILE_PATH, 1);
JavaRDD<List<String>> counts = csvData.map(new Function<String, List<String>>() {
    @Override
    public List<String> call(String s) {
        return Arrays.asList(s.split("\\s*,\\s*"));
    }
});

//=====================================
counts.saveAsTextFile(GlobalConstants.STR_OUTPUT_FILE_PATH);
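Putting the pieces together, a complete, minimal version of the original program using the flatMap variant might look like the sketch below. This is not from the original answers; it assumes a Spark 2.x runtime (where flatMap takes a function returning an Iterator) and reuses the placeholder paths from the question:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SimpleApp {
    public static void main(String[] args) {
        // Placeholder paths taken from the question; adjust to your environment.
        String csvInput = "/home/userName/Downloads/countrylist.csv";
        String csvOutput = "/home/userName/Downloads/countrylist";

        SparkConf conf = new SparkConf().setAppName("loadwholecsv").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> csvData = sc.textFile(csvInput, 1);

        // Split each comma-separated line into fields and flatten them into a single RDD of fields.
        JavaRDD<String> words = csvData.flatMap(
                s -> Arrays.asList(s.split("\\s*,\\s*")).iterator());

        words.saveAsTextFile(csvOutput);
        sc.stop();
    }
}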
Answered by Rzv Razvan
This is a sample of the code from the https://opencredo.com/data-analytics-using-cassandra-and-spark/ tutorial, in Java.
Scala code:
val includedStatuses = Set("COMPLETED", "REPAID")
val now = new Date();
sc.cassandraTable("cc", "cc_transactions")
  .select("customerid", "amount", "card", "status", "id")
  .where("id < minTimeuuid(?)", now)
  .filter(includedStatuses contains _.getString("status"))
  .keyBy(row => (row.getString("customerid"), row.getString("card")))
  .map { case (key, value) => (key, value.getInt("amount")) }
  .reduceByKey(_ + _)
  .map { case ((customerid, card), balance) => (customerid, card, balance, now) }
  .saveToCassandra("cc", "cc_balance", SomeColumns("customerid", "card", "balance", "updated_at"))
Java code:
SparkContextJavaFunctions functions = CassandraJavaUtil.javaFunctions(ProjectPropertie.context);
JavaRDD<Balance> balances = functions.cassandraTable(ProjectPropertie.KEY_SPACE, Transaction.TABLE_NAME)
        .select("customerid", "amount", "card", "status", "id")
        .where("id < minTimeuuid(?)", date)
        .filter(row -> row.getString("status").equals("COMPLETED"))
        .keyBy(row -> new Tuple2<>(row.getString("customerid"), row.getString("card")))
        .mapToPair(row -> new Tuple2<>(row._1, row._2.getInt("amount")))
        .reduceByKey((i1, i2) -> i1.intValue() + i2.intValue())
        .flatMap(new FlatMapFunction<Tuple2<Tuple2<String, String>, Integer>, Balance>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<Balance> call(Tuple2<Tuple2<String, String>, Integer> r) throws Exception {
                List<Balance> list = new ArrayList<Balance>();
                list.add(new Balance(r._1._1, r._1._2, r._2, reportDate));
                return list.iterator();
            }
        }).cache();
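The snippet assumes a Balance bean that is not shown in the answer. A minimal sketch of what such a class might look like (the field names are assumptions; the tutorial's actual class may differ):

import java.io.Serializable;
import java.util.Date;

// Hypothetical sketch of the Balance bean used above; not taken from the tutorial.
public class Balance implements Serializable {
    private final String customerid;
    private final String card;
    private final Integer balance;
    private final Date updatedAt;

    public Balance(String customerid, String card, Integer balance, Date updatedAt) {
        this.customerid = customerid;
        this.card = card;
        this.balance = balance;
        this.updatedAt = updatedAt;
    }

    public String getCustomerid() { return customerid; }
    public String getCard() { return card; }
    public Integer getBalance() { return balance; }
    public Date getUpdatedAt() { return updatedAt; }
}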
Here ProjectPropertie.context is the SparkContext. This is how you can get a SparkContext (you should use only one context per JVM):
SparkConf conf = new SparkConf(true).setAppName("App_name").setMaster("local[2]").set("spark.executor.memory", "1g")
.set("spark.cassandra.connection.host", "127.0.0.1,172.17.0.2")
.set("spark.cassandra.connection.port", "9042")
.set("spark.cassandra.auth.username", "cassandra")
.set("spark.cassandra.auth.password", "cassandra");
SparkContext context = new SparkContext(conf);
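As a side note (not from the original answer): if you want to work with the Java RDD API directly, the same SparkContext can be wrapped in a JavaSparkContext. A minimal sketch:

// Wrap the existing SparkContext for the Java-friendly RDD API
// (alternatively, build a JavaSparkContext directly from the same SparkConf).
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(context);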
For the data source I'm using Cassandra, where 172.17.0.2 is the Docker container in which my Cassandra node is running and 127.0.0.1 is the host (local in this case).
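The Java snippet above stops at cache() and never writes the result back, while the Scala version ends with saveToCassandra. With the connector's Java API, the cached balances RDD could be written back roughly as sketched below; this is an assumption based on the Scala code, and it presumes the Balance bean's properties map onto the cc_balance columns:

// Write the aggregated balances back to the cc.cc_balance table.
// Column mapping relies on the Balance bean's getters
// (customerid, card, balance, updatedAt -> updated_at).
CassandraJavaUtil.javaFunctions(balances)
        .writerBuilder("cc", "cc_balance", CassandraJavaUtil.mapToRow(Balance.class))
        .saveToCassandra();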