Replace new line (\n) character in csv file - spark scala

Disclaimer: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute it to the original authors (not the translator). Original question: http://stackoverflow.com/questions/36990177/



Tags: scala, replace, apache-spark, character, newline

Asked by user3560220

Just to illustrate the problem, I have taken a small test-set CSV file. In the real scenario, however, the job has to handle more than a terabyte of data.


I have a CSV file whose columns are enclosed in quotes ("col1"). After the data import, one column contains the newline character (\n). This causes a lot of problems when I want to save the data as Hive tables.

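For illustration (this sample line is my own, not taken from the original post), a newline inside a quoted field turns one logical record into two physical lines, which is exactly what breaks naive line-based processing:

17,D73,525,1,"testing
",90,20.07.2011,null,F10,R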

My idea was to replace the \n character with the "|" pipe in Spark.


What I have achieved so far:


// 1. Read the CSV file
val test = sqlContext.load(
  "com.databricks.spark.csv",
  Map("path" -> "test_set.csv", "header" -> "true", "inferSchema" -> "true",
      "delimiter" -> ",", "quote" -> "\"", "escape" -> "\\", "parserLib" -> "univocity"))

// 2. Convert to a DataFrame
val dataframe = test.toDF()

// 3. Print the contents
dataframe.foreach(println)

// 4. Replace the newlines in column 4 -- not working for me
dataframe.map(row => {
  val row4 = row.getAs[String](4)
  val make = row4.replaceAll("[\r\n]", "|")
  (make)
}).collect().foreach(println)

Sample set:


(17 , D73 ,525, 1  ,testing\n    ,  90 ,20.07.2011 ,null ,F10 , R)
(17 , D73 ,526, 1  ,null         ,  89 ,20.07.2011 ,null ,F10 , R)
(17 , D73 ,529, 1  ,once \n again,  10 ,20.07.2011 ,null ,F10 , R)
(17 , D73 ,531, 1  ,test3\n      ,  10 ,20.07.2011 ,null ,F10 , R)

Expected result set:


(17 , D73 ,525, 1  ,testing|    ,  90 ,20.07.2011 ,null ,F10 , R)
(17 , D73 ,526, 1  ,null         ,  89 ,20.07.2011 ,null ,F10 , R)
(17 , D73 ,529, 1  ,once | again,  10 ,20.07.2011 ,null ,F10 , R)
(17 , D73 ,531, 1  ,test3|      ,  10 ,20.07.2011 ,null ,F10 , R)

What worked for me:


val rep = "\n123\n Main Street\n".replaceAll("[\r\n]", "|")
// rep: String = |123| Main Street|

But why am I not able to do this on a tuple basis?


val dataRDD = lines_wo_header.map(line => line.split(";")).map(row =>
  (row(0).toLong, row(1).toString,
   row(2).toLong, row(3).toLong,
   row(4).toString, row(5).toLong,
   row(6).toString, row(7).toString, row(8).toString, row(9).toString))

dataRDD.map(row => {
  val wert = row._5.replaceAll("[\r\n]", "|")
  (row._1, row._2, row._3, row._4, wert, row._6, row._7, row._8, row._9, row._10)
}).collect().foreach(println)

Spark --version 1.3.1


Answered by Daniel de Paula

If you can use Spark SQL 1.5 or higher, you may consider using the functions available for columns. Assuming you don't know (or don't have) the names of the columns, you can do it as in the following snippet:


val df = test.toDF()

import org.apache.spark.sql.functions._
val newDF = df.withColumn(df.columns(4), regexp_replace(col(df.columns(4)), "[\r\n]", "|"))

If you know the name of the column, you can replace df.columns(4) by its name in both occurrences.

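For example, if the affected column were named comment (a hypothetical name, since the real column names are not shown in the question), the same call would read:

import org.apache.spark.sql.functions._

// "comment" is a hypothetical column name, used only for illustration
val newDF = df.withColumn("comment", regexp_replace(col("comment"), "[\r\n]", "|"))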

I hope that helps. Cheers.


Answered by mrnakumar

My idea was to replace the \n character with the "|" pipe in Spark.


I tried the replaceAll method but it is not working. Here is an alternative to achieve the same:


val test = sq.load(
  "com.databricks.spark.csv",
  Map("path" -> "file:///home/veda/sample.csv", "header" -> "false", "inferSchema" -> "true",
      "delimiter" -> ",", "quote" -> "\"", "escape" -> "\\", "parserLib" -> "univocity"))

val dataframe = test.toDF()

val mapped = dataframe.map({ row =>
  val str = row.get(0).toString()
  val fnal = new StringBuilder(str)

  // replace newlines (a newline is a single character, so replace exactly one position)
  var newLineIndex = fnal.indexOf("\n")
  while (newLineIndex != -1) {
    fnal.replace(newLineIndex, newLineIndex + 1, "|")
    newLineIndex = fnal.indexOf("\n")
  }

  // replace carriage returns
  var cgIndex = fnal.indexOf("\r")
  while (cgIndex != -1) {
    fnal.replace(cgIndex, cgIndex + 1, "|")
    cgIndex = fnal.indexOf("\r")
  }

  fnal.toString() // the modified value
})

mapped.collect().foreach(println)

Note: You might want to move the duplicated replacement code into a separate function; a sketch follows below.

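A minimal sketch of such a helper (the name and signature are my own, not from the answer); it collapses every occurrence of target into replacement using the same StringBuilder approach as above:

def collapse(sb: StringBuilder, target: String, replacement: String): StringBuilder = {
  var idx = sb.indexOf(target)
  while (idx != -1) {
    sb.replace(idx, idx + target.length, replacement)
    idx = sb.indexOf(target, idx + replacement.length)
  }
  sb
}

// usage: collapse(collapse(new StringBuilder(str), "\n", "|"), "\r", "|").toString()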

Answered by Rahul Sharma

Multi-line support for CSV was added in Spark version 2.2 (see the JIRA ticket), and Spark 2.2 is not yet released.

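For reference, once Spark 2.2 is available, the built-in CSV reader should handle this directly. A minimal sketch (the SparkSession named spark and the multiLine option are assumptions about the 2.2 release, not part of the original answer):

// Spark 2.2+ only -- keeps quoted fields that span several physical lines intact
val df = spark.read
  .option("header", "true")
  .option("multiLine", "true")
  .option("quote", "\"")
  .option("escape", "\"")
  .csv("test_set.csv")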

I had faced the same issue and resolved it with the help of a Hadoop input format and reader.


Copy the InputFormat and reader classes from git and implement them like this:


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

//implementation

JavaPairRDD<LongWritable, Text> rdd =
        context.newAPIHadoopFile(path, FileCleaningInputFormat.class, null, null, new Configuration());
JavaRDD<String> inputWithMultiline = rdd.map(s -> s._2().toString());

Another solution: use CSVInputFormat from Apache Crunch to read the CSV file, then parse each CSV line using opencsv (a sketch of the opencsv step follows the Maven dependency below):


sparkContext.newAPIHadoopFile(path, CSVInputFormat.class, null, null, new Configuration()).map(s -> s._2().toString());

Apache Crunch Maven dependency:


<dependency>
    <groupId>org.apache.crunch</groupId>
    <artifactId>crunch-core</artifactId>
    <version>0.15.0</version>
</dependency>
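The opencsv step itself is not shown above; here is a hedged sketch of splitting one reconstructed record into fields with it (the com.opencsv package, CSVParser class and the sample record are assumptions for illustration, not taken from the answer):

import com.opencsv.CSVParser

// Parse a single logical CSV record into its fields
val parser = new CSVParser() // defaults: comma delimiter, double-quote character
val fields: Array[String] = parser.parseLine("17,D73,525,1,\"testing|\",90,20.07.2011,null,F10,R")
fields.foreach(println)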