Converting JavaRDD to DataFrame in Spark Java

Disclaimer: this page is a Chinese-English translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute it to the original authors (not the translator). Original question: http://stackoverflow.com/questions/41302666/

Tags: java, apache-spark, hadoop, apache-spark-sql

Asked by Satish Karuturi

I am trying to process a log file. First I read the log file and split it as per my requirement, saving each column into a separate JavaRDD. Now I need to convert these JavaRDDs to DataFrames for future operations. This is the code I have tried so far:

         SparkConf conf = new SparkConf().setAppName("AuctionBid").setMaster("local");
         JavaSparkContext sc = new JavaSparkContext(conf);
         JavaRDD<String> diskfile = sc.textFile("/Users/karuturi/Downloads/log.txt");
         JavaRDD<String> urlrdd=diskfile.flatMap(line -> Arrays.asList(line.split("\t")[0]));
         System.out.println(urlrdd.take(1));
         SQLContext sql = new SQLContext(sc);

and this is how I am trying to convert the JavaRDD into a DataFrame:

DataFrame fileDF = sqlContext.createDataFrame(urlRDD, Model.class);

But the above line is not working. I am confused about Model.class.

Can anyone suggest a solution?

Thanks.

Accepted answer by mrsrinivas

Imports:

import java.io.Serializable;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
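
The snippets below use a SparkSession named spark (Spark 2.x). A minimal sketch of creating one for a local run (the app name is reused from the question and is only a placeholder):

SparkSession spark = SparkSession.builder()
    .appName("AuctionBid")
    .master("local")
    .getOrCreate();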

Create a POJO class for the URL. I'd recommend writing one for the whole log line, with url, date, time, method, target, etc. as members.

public static class Url implements Serializable {
  private String value;

  public String getValue() {
    return value;
  }

  public void setValue(String value) {
    this.value = value;
  }
}  

Create an RDD of Url objects from a text file

JavaRDD<Url> urlsRDD = spark.read()
  .textFile("/Users/karuturi/Downloads/log.txt")
  .javaRDD()
  .map(new Function<String, Url>() {
    @Override
    public Url call(String line) throws Exception {
      String[] parts = line.split("\t");
      Url url = new Url();
      url.setValue(parts[0].replaceAll("\\[", "")); // escape '[' since replaceAll takes a regex
      return url;
    }
  });

Create DataFrame from RDD

Dataset<Row> urlsDF = spark.createDataFrame(urlsRDD, Url.class);
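
To sanity-check the result, something like this prints the schema inferred from the Url bean and the first few rows:

urlsDF.printSchema();
urlsDF.show(5);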

Related answers: RDD to DataFrame - Spark 2.0, RDD to DataFrame - Spark 1.6

Answer by Akash Sethi

You can read the file directly using sqlContext.

Use the read method of sqlContext.

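A minimal sketch of what that could look like for the log file from the question (assuming Spark 1.6, where read().text() loads each line into a single column named value; logDF is just an illustrative name):

SQLContext sqlContext = new SQLContext(sc);
DataFrame logDF = sqlContext.read().text("/Users/karuturi/Downloads/log.txt");
logDF.show(5);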

For more info you can follow this link

https://spark.apache.org/docs/1.6.0/sql-programming-guide.html#creating-dataframes

Or you can import the implicits:

import sqlContext.implicits._

Then use the toDF() method on the RDD to convert it into a DataFrame.

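In Java there is no implicits import, so toDF() is not available directly on a JavaRDD; a rough Java equivalent (a sketch assuming Spark 2.x, a SparkSession named spark, and the urlrdd from the question) goes through a Dataset first:

import org.apache.spark.sql.Encoders;

// wrap the JavaRDD<String> in a Dataset<String>, then name the single column
Dataset<String> urlDS = spark.createDataset(urlrdd.rdd(), Encoders.STRING());
Dataset<Row> urlDF = urlDS.toDF("url");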

Answer by FaigB

Just flatMap your data according to the 7-column table and use the code snippet below:

String[] columns = new String[]{"column1","column2","column3","column4","column5","column6","column7"};
List<String> tableColumns = Arrays.asList(columns);

StructType schema = createSchema(tableColumns);

    public StructType createSchema(List<String> tableColumns){

        List<StructField> fields  = new ArrayList<StructField>();
        for(String column : tableColumns){         

                fields.add(DataTypes.createStructField(column, DataTypes.StringType, true));            

        }
        return DataTypes.createStructType(fields);
    }

sqlContext.createDataFrame(urlRDD, schema);
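
Note that createDataFrame(rdd, schema) expects a JavaRDD<Row> whose rows match the seven-column schema, so the urlRDD passed here has to hold Row objects rather than plain strings. A minimal sketch of building such an RDD from the diskfile RDD in the question (assuming each log line has at least seven tab-separated fields):

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

JavaRDD<Row> urlRDD = diskfile.map(line -> {
    String[] parts = line.split("\t");
    // keep the first seven fields, in the same order as the schema columns
    return RowFactory.create(parts[0], parts[1], parts[2], parts[3], parts[4], parts[5], parts[6]);
});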

Answer by Assaf Mendelson

You can do something like this (I am converting on the fly from Scala, so excuse any typos):

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

JavaRDD<Row> rowRDD = urlrdd.map(new Function<String, Row>() {
    @Override
    public Row call(String record) throws Exception {
        return RowFactory.create(record);
    }
});
// now you wish to create the target schema. This is basically a list of
// fields (each field would be a column) which you are adding to a StructType
List<StructField> fields = new ArrayList<>();
StructField field = DataTypes.createStructField("url", DataTypes.StringType, true);
fields.add(field);
StructType schema = DataTypes.createStructType(fields);

// now you can create the dataframe:
DataFrame df= sqlContext.createDataFrame(rowRDD, schema);    

A couple additional notes:

  • Why are you flatMapping when you are only taking the first element? You could have simply used map:

    JavaRDD<String> urlrdd = diskfile.map(line -> line.split("\t")[0]);

  • I assume in real life you would want to remove the '[' from the url (you can easily do this in the map).

  • If you are moving to Spark 2.0 or later, then instead of sqlContext you should be using a SparkSession (spark).

  • You can create a single DataFrame with all columns. You can do this by adding all the fields to the schema (i.e. instead of adding just one field, add all of them). Instead of using urlrdd, use diskfile and do the split inside the "public Row call" creation. This would be something like this:

    JavaRDD<Row> rowRDD = diskfile.map(new Function<String, Row>() {
        @Override
        public Row call(String record) throws Exception {
            String[] recs = record.split("\t");
            return RowFactory.create(recs[0], recs[1], ...);
        }
    });

  • You can create it directly: just use

    sqlContext.read().option("sep", "\t").schema(schema).csv(filename)
