How do I read a large CSV file with the Scala Stream class?

Source: StackOverflow, http://stackoverflow.com/questions/4255021/. This content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors.

Tags: scala, csv, streaming, large-files

Asked by Jan Willem Tulp

How do I read a large CSV file (> 1 GB) with a Scala Stream? Do you have a code example? Or would you use a different way to read a large CSV file without loading it into memory first?

Answered by Kevin Wright

Just use Source.fromFile(...).getLines, as you already stated.

That returns an Iterator, which is already lazy. (You'd use a Stream as a lazy collection when you want previously retrieved values to be memoized, so you can read them again.)

If you're getting memory problems, then the problem lies in what you're doing after getLines. Any operation like toList, which forces a strict collection, will cause the problem.
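
As a minimal sketch of that point, the snippet below aggregates over the lines without ever building a collection of them. The sample file, the helper name countCity, and the column layout are assumptions made up for illustration; only Source.fromFile and getLines come from the answer itself.

```scala
import scala.io.Source
import java.nio.file.Files

// Hypothetical sample file so the sketch runs on its own.
val samplePath = Files.createTempFile("sample", ".csv")
Files.write(
  samplePath,
  "city,zip\nIrving,75039\nDallas,75240\nIrving,75039\n".getBytes("UTF-8")
)

// Counts the rows whose first column equals `city`.
// getLines() returns an Iterator, so no list of all lines is ever built.
def countCity(file: java.io.File, city: String): Int = {
  val source = Source.fromFile(file)
  try {
    source.getLines()
      .drop(1) // skip the header line
      .count(line => line.split(",")(0) == city)
  } finally source.close()
}

val irvingRows = countCity(samplePath.toFile, "Irving")
```

Replacing count with toList at the same position would pull every line into memory at once, which is the situation the answer warns about.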

Answered by soc

I hope you don't mean Scala's collection.immutable.Stream when you say Stream. This is not what you want. Stream is lazy, but it memoizes the elements it has already computed.
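
A small sketch of what memoization means here, using a counter (the name evaluations is made up for this example) to show that a second pass over the same Stream does not recompute anything. Note that Stream is deprecated since Scala 2.13 in favour of LazyList, so this may produce deprecation warnings on newer compilers.

```scala
// Counts how many times the mapping function actually runs.
var evaluations = 0

// A Stream caches (memoizes) every element it has computed so far.
val numbers: Stream[Int] =
  Stream.from(1).map { n => evaluations += 1; n * 2 }

val firstPass  = numbers.take(3).toList // forces the first three elements
val secondPass = numbers.take(3).toList // served from the cached elements
```

The cached elements stay reachable for as long as numbers is referenced. For a file with millions of lines, that means every line read so far is kept in memory, which an Iterator avoids.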

I don't know what you plan to do, but just reading the file line-by-line should work very well without using high amounts of memory.

getLines should evaluate lazily and should not crash (as long as your file does not have more than 2^32 lines, afaik). If it does crash, ask on #scala or file a bug ticket (or do both).

Answered by chaotic3quilibrium

If you want to process a large file line by line without loading its entire contents into memory at once, you can use the Iterator returned by scala.io.Source.

I have a small function, tryProcessSource, (containing two sub-functions) which I use for exactly these types of use-cases. The function takes up to four parameters, of which only the first is required. The other parameters have sane default values provided.

Here's the function profile (full function implementation is at the bottom):

def tryProcessSource(
  file: File,
  parseLine: (Int, String) => Option[List[String]] =
    (index, unparsedLine) => Some(List(unparsedLine)),
  filterLine: (Int, List[String]) => Option[Boolean] =
    (index, parsedValues) => Some(true),
  retainValues: (Int, List[String]) => Option[List[String]] =
    (index, parsedValues) => Some(parsedValues)
): Try[List[List[String]]] = {
  ???
}

The first parameter, file: File, is required. It can be any valid instance of java.io.File that points to a line-oriented text file, such as a CSV.

The second parameter, parseLine: (Int, String) => Option[List[String]], is optional. If provided, it must be a function that takes two input parameters, index: Int and unparsedLine: String, and returns an Option[List[String]]. The function may return a Some wrapping a List[String] of the valid column values, or it may return None, which aborts the entire streaming process early. If this parameter is not provided, the default (index, line) => Some(List(line)) is used, which returns the entire line as a single String value.

The third parameter, filterLine: (Int, List[String]) => Option[Boolean], is optional. If provided, it must be a function that takes two input parameters, index: Int and parsedValues: List[String], and returns an Option[Boolean]. The function may return a Some wrapping a Boolean that indicates whether this particular line should be included in the output, or it may return None, which aborts the entire streaming process early. If this parameter is not provided, the default (index, values) => Some(true) is used, which includes all lines.

The fourth and final parameter, retainValues: (Int, List[String]) => Option[List[String]], is optional. If provided, it must be a function that takes two input parameters, index: Int and parsedValues: List[String], and returns an Option[List[String]]. The function may return a Some wrapping a List[String] that holds some subset and/or alteration of the existing column values, or it may return None, which aborts the entire streaming process early. If this parameter is not provided, the default (index, values) => Some(values) is used, which returns the values produced by the second parameter, parseLine, unchanged.
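
To make the shape of such a callback concrete, here is a sketch of a value that could be passed as retainValues. The name keepCityAndZip, the choice of columns, and the decision to abort on short rows are assumptions for illustration and are not part of the original answer.

```scala
// Hypothetical retainValues callback: keeps only the city and zip columns.
// Returning None signals an early abort, here used for malformed rows.
val keepCityAndZip: (Int, List[String]) => Option[List[String]] =
  (index, parsedValues) =>
    if (parsedValues.length >= 5)
      Some(List(parsedValues(2), parsedValues(4)))
    else
      None

// Applied directly to one parsed row to show the result shape.
val reduced = keepCityAndZip(1, List("100 Main Str", "", "Irving", "TX", "75039"))
val aborted = keepCityAndZip(2, List("not enough columns"))
```

Because the callback is an ordinary function value, it can be exercised on a single row like this before it is handed to tryProcessSource.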

Consider a file with the following contents (4 lines):

street,street2,city,state,zip
100 Main Str,,Irving,TX,75039
231 Park Ave,,Irving,TX,75039
1400 Beltline Rd,Apt 312,Dallas,Tx,75240


The following calling profile...

val tryLinesDefaults =
  tryProcessSource(new File("path/to/file.csv"))

...results in this output for tryLinesDefaults (the unaltered contents of the file):

Success(
  List(
    List("street,street2,city,state,zip"),
    List("100 Main Str,,Irving,TX,75039"),
    List("231 Park Ave,,Irving,TX,75039"),
    List("1400 Beltline Rd,Apt 312,Dallas,Tx,75240")
  )
)


The following calling profile...

val tryLinesParseOnly =
  tryProcessSource(
      new File("path/to/file.csv")
    , parseLine =
        (index, unparsedLine) => Some(unparsedLine.split(",").toList)
  )

...results in this output for tryLinesParseOnly (each line parsed into the individual column values):

Success(
  List(
    List("street","street2","city","state","zip"),
    List("100 Main Str","","Irving","TX","75039"),
    List("231 Park Ave","","Irving","TX","75039"),
    List("1400 Beltline Rd","Apt 312","Dallas","Tx","75240")
  )
)
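
One caveat about the split(",") parser used in the call above: String.split drops trailing empty strings, so a row whose last columns are empty yields fewer values than the header has. Passing a negative limit keeps them. The sample row below is made up for illustration; neither variant handles quoted fields that contain commas.

```scala
// Hypothetical row whose last column (zip) is empty.
val row = "1400 Beltline Rd,Apt 312,Dallas,Tx,"

// Default split: the trailing empty field is removed.
val dropped = row.split(",").toList

// Negative limit: trailing empty fields are preserved.
val kept = row.split(",", -1).toList
```

With the negative limit, index-based access such as parsedValues(4) remains valid even when the last column is blank.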


The following calling profile...

val tryLinesIrvingTxNoHeader =
  tryProcessSource(
      new File("C:/Users/Jim/Desktop/test.csv")
    , parseLine =
        (index, unparsedLine) => Some(unparsedLine.split(",").toList)
    , filterLine =
        (index, parsedValues) =>
          Some(
            (index != 0) && //skip header line
            (parsedValues(2).toLowerCase == "Irving".toLowerCase) && //only Irving
            (parsedValues(3).toLowerCase == "Tx".toLowerCase)
          )
  )

...results in this output for tryLinesIrvingTxNoHeader (each line parsed into the individual column values, no header, and only the two rows in Irving, Tx):

Success(
  List(
    List("100 Main Str","","Irving","TX","75039"),
    List("231 Park Ave","","Irving","TX","75039")
  )
)


Here's the entire tryProcessSource function implementation:

import scala.io.Source
import scala.util.Try

import java.io.File

def tryProcessSource(
  file: File,
  parseLine: (Int, String) => Option[List[String]] =
    (index, unparsedLine) => Some(List(unparsedLine)),
  filterLine: (Int, List[String]) => Option[Boolean] =
    (index, parsedValues) => Some(true),
  retainValues: (Int, List[String]) => Option[List[String]] =
    (index, parsedValues) => Some(parsedValues)
): Try[List[List[String]]] = {
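  // Runs `transfer` on the source and always closes the source afterwards.
  def usingSource[S <: Source, R](source: S)(transfer: S => R): Try[R] =
    try {Try(transfer(source))} finally {source.close()}
  // Consumes the lines one at a time; every recursive call is in tail position.
  @scala.annotation.tailrec
  def recursive(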
    remaining: Iterator[(String, Int)],
    accumulator: List[List[String]],
    isEarlyAbort: Boolean =
      false
  ): List[List[String]] = {
    if (isEarlyAbort || !remaining.hasNext)
      accumulator
    else {
      val (line, index) =
        remaining.next()
      parseLine(index, line) match {
        case Some(values) =>
          filterLine(index, values) match {
            case Some(keep) =>
              if (keep)
                retainValues(index, values) match {
                  case Some(valuesNew) =>
                    recursive(remaining, valuesNew :: accumulator) //capture values
                  case None =>
                    recursive(remaining, accumulator, isEarlyAbort = true) //early abort
                }
              else
                recursive(remaining, accumulator) //discard row
            case None =>
              recursive(remaining, accumulator, isEarlyAbort = true) //early abort
          }
        case None =>
          recursive(remaining, accumulator, isEarlyAbort = true) //early abort
      }
    }
  }
  Try(Source.fromFile(file)).flatMap(
    bufferedSource =>
      usingSource(bufferedSource) {
        source =>
          recursive(source.getLines().buffered.zipWithIndex, Nil).reverse
      }
  )
}

While this solution is relatively succinct, it took me considerable time and many refactoring passes before I was finally able to get to here. Please let me know if you see any ways it might be improved.

UPDATE: I have just asked about the issue below as its own StackOverflow question, and it now has an answer fixing the error mentioned below.

I had the idea to make this even more generic by changing the retainValues parameter to transformLine, using the new generic function definition below. However, I keep getting the highlighted error in IntelliJ, "Expression of type Some[List[String]] doesn't conform to expected type Option[A]", and I wasn't able to figure out how to change the default value so that the error goes away.

def tryProcessSource2[A <: AnyRef](
  file: File,
  parseLine: (Int, String) => Option[List[String]] =
    (index, unparsedLine) => Some(List(unparsedLine)),
  filterLine: (Int, List[String]) => Option[Boolean] =
    (index, parsedValues) => Some(true),
  transformLine: (Int, List[String]) => Option[A] =
    (index, parsedValues) => Some(parsedValues)
): Try[List[A]] = {
  ???
}

Any assistance on how to make this work would be greatly appreciated.
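
For readers hitting the same error, here is one possible workaround as a sketch. It is not necessarily the fix given in the separate StackOverflow question. The default argument is checked against Option[A] for an arbitrary A, so a value of type Some[List[String]] cannot be accepted. Making the generic parameter mandatory and offering a non-generic wrapper for the unchanged case sidesteps that. The names transformRows and transformRowsUnchanged are hypothetical, and the sketch works on already-parsed rows and returns None on any abort, which is simpler than tryProcessSource.

```scala
// Generic core: the caller must supply transformLine, so A is inferred from it.
// Simplification: returns None if any row asks for an abort.
def transformRows[A](
  rows: List[List[String]],
  transformLine: (Int, List[String]) => Option[A]
): Option[List[A]] = {
  val transformed =
    rows.zipWithIndex.map { case (values, index) => transformLine(index, values) }
  if (transformed.forall(_.isDefined)) Some(transformed.flatten) else None
}

// Non-generic wrapper that plays the role of the old default value.
def transformRowsUnchanged(rows: List[List[String]]): Option[List[List[String]]] =
  transformRows[List[String]](rows, (_, values) => Some(values))

val parsedRows  = List(List("Irving", "TX"), List("Dallas", "Tx"))
val unchanged   = transformRowsUnchanged(parsedRows)
val columnCount = transformRows[Int](parsedRows, (_, values) => Some(values.length))
```

The design choice is to give up the default argument on the generic method, since a default cannot be typed for every possible A, and to restore the convenience with a second, fully typed entry point.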
