
Note: this page is a mirror/translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/23995040/


Write to multiple outputs by key Spark - one Spark job

Tags: scala, hadoop, output, hdfs, apache-spark

Asked by samthebest

How can you write to multiple outputs dependent on the key using Spark in a single job?


Related: Write to multiple outputs by key Scalding Hadoop, one MapReduce Job


E.g.


sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.writeAsMultiple(prefix, compressionCodecOption)

would ensure cat prefix/1 is


a
b

and cat prefix/2 would be


c

EDIT: I've recently added a new answer that includes full imports, pimp and compression codec, see https://stackoverflow.com/a/46118044/1586965, which may be helpful in addition to the earlier answers.


Accepted answer by samthebest

This includes the requested codec, the necessary imports, and the requested pimp (implicit class wrapper).


import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

// TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless
implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) {
  def writeAsMultiple(prefix: String, codec: String,
                      keyName: String = "key")
                     (implicit sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._

    rdd.toDF(keyName, "_2").write.partitionBy(keyName)
    .format("text").option("codec", codec).save(prefix)
  }
}

val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")

One subtle difference to the OP is that it will prefix <keyName>= to the directory names. E.g.


myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")

Would give:


prefix/key=1/part-00000
prefix/key=2/part-00000

where prefix/key=1/part-00000 would contain the lines a and b, and prefix/key=2/part-00000 would contain the line c.


And

myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo")

Would give:


prefix/foo=1/part-00000
prefix/foo=2/part-00000

It should be clear how to edit for parquet.

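For instance, a possible parquet variant (my sketch, not part of the original answer) only swaps the output format; note that parquet's compression option takes a short codec name such as "snappy" or "gzip" rather than a Hadoop codec class:

// Hypothetical parquet variant; requires import sqlContext.implicits._ for toDF
myRdd.toDF("key", "_2").write.partitionBy("key")
  .option("compression", "snappy").parquet("prefix-parquet")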

Finally, below is an example for Dataset, which is perhaps nicer than using tuples.


import org.apache.spark.sql.Dataset

implicit class PimpedDataset[T](dataset: Dataset[T]) {
  def writeAsMultiple(prefix: String, codec: String, field: String): Unit = {
    dataset.write.partitionBy(field)
    .format("text").option("codec", codec).save(prefix)
  }
}
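
A hypothetical usage sketch (the case class, the spark session and the sample data are my assumptions, not part of the original answer; with the text format the single remaining non-partition column must be a string):

// Assumed setup for illustration only
case class Record(key: String, line: String)
import spark.implicits._
val ds: Dataset[Record] = Seq(Record("1", "a"), Record("1", "b"), Record("2", "c")).toDS()
ds.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "key")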

Answer by Nick Chammas

If you use Spark 1.4+, this has become much, much easier thanks to the DataFrame API. (DataFrames were introduced in Spark 1.3, but partitionBy(), which we need, was introduced in 1.4.)


If you're starting out with an RDD, you'll first need to convert it to a DataFrame:


// Requires import sqlContext.implicits._ for the toDF conversion below
val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie")))
val people_df = people_rdd.toDF("number", "name")

In Python, this same code is:


people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")])
people_df = people_rdd.toDF(["number", "name"])

Once you have a DataFrame, writing to multiple outputs based on a particular key is simple. What's more -- and this is the beauty of the DataFrame API -- the code is pretty much the same across Python, Scala, Java and R:


people_df.write.partitionBy("number").text("people")
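
For example, the Scala equivalent is essentially the same line (a sketch, assuming people_df was built as in the Scala snippet above):

people_df.write.partitionBy("number").text("people")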

And you can easily use other output formats if you want:


people_df.write.partitionBy("number").json("people-json")
people_df.write.partitionBy("number").parquet("people-parquet")

In each of these examples, Spark will create a subdirectory for each of the keys that we've partitioned the DataFrame on:


people/
  _SUCCESS
  number=1/
    part-abcd
    part-efgh
  number=2/
    part-abcd
    part-efgh

Answer by zhang zhan

I would do it like this, which is scalable:


import org.apache.hadoop.io.NullWritable

import org.apache.spark._
import org.apache.spark.SparkContext._

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = 
    NullWritable.get()

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = 
    key.asInstanceOf[String]
}

object Split {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Split" + args(1))
    val sc = new SparkContext(conf)
    sc.textFile("input/path")
    .map(a => (k, v)) // Your own implementation
    .partitionBy(new HashPartitioner(num)) // num = number of partitions you want (see note below)
    .saveAsHadoopFile("output/path", classOf[String], classOf[String],
      classOf[RDDMultipleTextOutputFormat])
    sc.stop()
  }
}

I just saw a similar answer above, but actually we don't need custom partitioners. MultipleTextOutputFormat will create a file for each key. It is OK for multiple records with the same key to fall into the same partition.


new HashPartitioner(num), where num is the number of partitions you want. If you have a large number of distinct keys, you can set num high; that way each partition will not open too many HDFS file handles.


Answer by Daniel Darabos

If you potentially have many values for a given key, I think the scalable solution is to write out one file per key per partition. Unfortunately there is no built-in support for this in Spark, but we can whip something up.


sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
  .mapPartitionsWithIndex { (p, it) =>
    val outputs = new MultiWriter(p.toString)
    for ((k, v) <- it) {
      outputs.write(k.toString, v)
    }
    outputs.close
    Nil.iterator
  }
  .foreach((x: Nothing) => ()) // To trigger the job.

// This one is Local, but you could write one for HDFS
class MultiWriter(suffix: String) {
  private val writers = collection.mutable.Map[String, java.io.PrintWriter]()
  def write(key: String, value: Any) = {
    if (!writers.contains(key)) {
      val f = new java.io.File("output/" + key + "/" + suffix)
      f.getParentFile.mkdirs
      writers(key) = new java.io.PrintWriter(f)
    }
    writers(key).println(value)
  }
  def close = writers.values.foreach(_.close)
}

(Replace PrintWriter with your choice of distributed filesystem operation.)

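A minimal sketch of what an HDFS-backed variant might look like (my assumption, not from the original answer; it presumes the Hadoop configuration is reachable on the executors and uses an illustrative output path):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical HDFS-backed writer: one file per key per partition, same shape as MultiWriter above
class HdfsMultiWriter(suffix: String) {
  private val fs = FileSystem.get(new Configuration())
  private val writers = collection.mutable.Map[String, java.io.PrintWriter]()
  def write(key: String, value: Any): Unit = {
    if (!writers.contains(key)) {
      // FileSystem.create builds missing parent directories
      writers(key) = new java.io.PrintWriter(fs.create(new Path("output/" + key + "/" + suffix)))
    }
    writers(key).println(value)
  }
  def close(): Unit = writers.values.foreach(_.close())
}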

This makes a single pass over the RDD and performs no shuffle. It gives you one directory per key, with a number of files inside each.


Answer by Thamme Gowda

I was in need of the same thing in Java. Posting my translation of Zhang Zhan's Scala answer for Spark Java API users:


import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;


class RDDMultipleTextOutputFormat<A, B> extends MultipleTextOutputFormat<A, B> {

    @Override
    protected String generateFileNameForKeyValue(A key, B value, String name) {
        return key.toString();
    }
}

public class Main {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Split Job")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        String[] strings = {"Abcd", "Azlksd", "whhd", "wasc", "aDxa"};
        sc.parallelize(Arrays.asList(strings))
                // The first character of the string is the key
                .mapToPair(s -> new Tuple2<>(s.substring(0,1).toLowerCase(), s))
                .saveAsHadoopFile("output/", String.class, String.class,
                        RDDMultipleTextOutputFormat.class);
        sc.stop();
    }
}

Answer by douglaz

I have a similar need and found a way. But it has one drawback (which is not a problem for my case): you need to re-partition your data with one partition per output file.


Partitioning in this way generally requires knowing beforehand how many files the job will output, and finding a function that maps each key to its partition.


First let's create our MultipleTextOutputFormat-based class:


import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T , V] {
  override def generateFileNameForKeyValue(key: T, value: V, leaf: String) = {
    key.toString
  }
  override protected def generateActualKey(key: T, value: V) = {
    null
  }
}

With this class Spark will get a key from a partition (the first/last, I guess) and name the file with this key, so it's not good to mix multiple keys on the same partition.


For your example, you will require a custom partitioner. This will do the job:


import org.apache.spark.Partitioner

class IdentityIntPartitioner(maxKey: Int) extends Partitioner {
  def numPartitions = maxKey

  def getPartition(key: Any): Int = key match {
    case i: Int if i < maxKey => i
  }
}

Now let's put everything together:


val rdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"), (7, "d"), (7, "e")))

// You need to know the max number of partitions (files) beforehand
// In this case we want one partition per key and we have 3 keys,
// with the biggest key being 7, so 10 will be large enough
val partitioner = new IdentityIntPartitioner(10)

val prefix = "hdfs://.../prefix"

val partitionedRDD = rdd.partitionBy(partitioner)

partitionedRDD.saveAsHadoopFile(prefix,
    classOf[Integer], classOf[String], classOf[KeyBasedOutput[Integer, String]])

This will generate 3 files under prefix (named 1, 2 and 7), processing everything in one pass.


As you can see, you need some knowledge about your keys to be able to use this solution.


For me it was easier because I needed one output file for each key hash and the number of files was under my control, so I could use the stock HashPartitioner to do the trick.


Answer by maasg

saveAsTextFile() and saveAsHadoopFile(...) are implemented based on the RDD data, specifically by the method PairRDD.saveAsHadoopDataset, which takes the data from the PairRDD where it's executed. I see two possible options: if your data is relatively small in size, you could save some implementation time by grouping over the RDD, creating a new RDD from each collection and using that RDD to write the data. Something like this:


val byKey = dataRDD.groupByKey().collect()
val rddByKey = byKey.map { case (k, v) => k -> sc.makeRDD(v.toSeq) }
rddByKey.foreach { case (k, rdd) => rdd.saveAsTextFile(prefix + k) }

Note that it will not work for large datasets because the materialization of the iterator at v.toSeq might not fit in memory.


The other option I see, and actually the one I'd recommend in this case is: roll your own, by directly calling the hadoop/hdfs api.

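A rough sketch of that roll-your-own approach (my assumption, not part of the original answer; it presumes string values, that each key's group fits in executor memory, and an illustrative prefix path):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Write each key's values straight to HDFS from the executors, one file per key
dataRDD.groupByKey().foreachPartition { it =>
  val fs = FileSystem.get(new Configuration())
  for ((k, values) <- it) {
    val out = fs.create(new Path(prefix + k))
    values.foreach(v => out.writeBytes(v + "\n"))
    out.close()
  }
}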

Here's a discussion I started while researching this question: How to create RDDs from another RDD?


Answer by shanmuga

I had a similar use case where I split the input file on Hadoop HDFS into multiple files based on a key (1 file per key). Here is my Scala code for Spark:


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

val hadoopconf = new Configuration();
val fs = FileSystem.get(hadoopconf);

@serializable object processGroup {
    def apply(groupName:String, records:Iterable[String]): Unit = {
        val outFileStream = fs.create(new Path("/output_dir/"+groupName))
        for( line <- records ) {
                outFileStream.writeUTF(line+"\n")
            }
        outFileStream.close()
    }
}
val infile = sc.textFile("input_file")
val dateGrouped = infile.groupBy( _.split(",")(0))
dateGrouped.foreach( (x) => processGroup(x._1, x._2))

I have grouped the records based on the key. The values for each key are written to a separate file.


Answer by dalin qin

Good news for Python users: if you have multiple columns and you want to save all the other (non-partition) columns in CSV format, this will fail if you use the "text" method as in Nick Chammas' suggestion.


people_df.write.partitionBy("number").text("people") 

The error message is "AnalysisException: u'Text data source supports only a single column, and you have 2 columns.;'"


In Spark 2.0.0 (my test environment is HDP's Spark 2.0.0) the "com.databricks.spark.csv" package is now integrated, and it allows us to save a text file partitioned by only one column; see the example below:


people_rdd = sc.parallelize([(1,"2016-12-26", "alice"),
                             (1,"2016-12-25", "alice"),
                             (1,"2016-12-25", "tom"), 
                             (1, "2016-12-25","bob"), 
                             (2,"2016-12-26" ,"charlie")])
df = people_rdd.toDF(["number", "date","name"])

df.coalesce(1).write.partitionBy("number").mode("overwrite").format('com.databricks.spark.csv').options(header='false').save("people")

[root@namenode people]# tree
.
├── number=1
│   └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
├── number=2
│   └── part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
└── _SUCCESS

[root@namenode people]# cat number\=1/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
2016-12-26,alice
2016-12-25,alice
2016-12-25,tom
2016-12-25,bob
[root@namenode people]# cat number\=2/part-r-00000-6bd1b9a8-4092-474a-9ca7-1479a98126c2.csv
2016-12-26,charlie

In my Spark 1.6.1 environment, the code didn't throw any error; however, only one file was generated, not partitioned into two folders.


Hope this can help.


Answer by jeanr

I had a similar use case. I resolved it in Java by writing two custom classes implementing MultipleTextOutputFormat and RecordWriter.


My input was a JavaPairRDD<String, List<String>> and I wanted to store it in a file named by its key, with all the lines contained in its value.


Here is the code for my MultipleTextOutputFormat implementation:


import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.hadoop.util.Progressable;

class RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> {

    @Override
    protected String generateFileNameForKeyValue(K key, V value, String name) {
        return key.toString(); //The return will be used as file name
    }

    /** The following 4 functions are only for visibility purposes                 
    (they are used in the class MyRecordWriter) **/
    protected String generateLeafFileName(String name) {
        return super.generateLeafFileName(name);
    }

    protected V generateActualValue(K key, V value) {
        return super.generateActualValue(key, value);
    }

    protected String getInputFileBasedOutputFileName(JobConf job,     String name) {
        return super.getInputFileBasedOutputFileName(job, name);
        }

    protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException {
        return super.getBaseRecordWriter(fs, job, name, arg3);
    }

    /** Use my custom RecordWriter **/
    @Override
    public RecordWriter<K, V> getRecordWriter(final FileSystem fs, final JobConf job, String name, final Progressable arg3) throws IOException {
        final String myName = this.generateLeafFileName(name);
        return new MyRecordWriter<K, V>(this, fs, job, arg3, myName);
    }
} 

Here is the code for my RecordWriter implementation.


import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;

class MyRecordWriter<K, V> implements RecordWriter<K, V> {

    private RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat;
    private final FileSystem fs;
    private final JobConf job;
    private final Progressable arg3;
    private String myName;

    TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap();

    MyRecordWriter(RDDMultipleTextOutputFormat<K, V> rddMultipleTextOutputFormat, FileSystem fs, JobConf job, Progressable arg3, String myName) {
        this.rddMultipleTextOutputFormat = rddMultipleTextOutputFormat;
        this.fs = fs;
        this.job = job;
        this.arg3 = arg3;
        this.myName = myName;
    }

    @Override
    public void write(K key, V value) throws IOException {
        String keyBasedPath = rddMultipleTextOutputFormat.generateFileNameForKeyValue(key, value, myName);
        String finalPath = rddMultipleTextOutputFormat.getInputFileBasedOutputFileName(job, keyBasedPath);
        Object actualValue = rddMultipleTextOutputFormat.generateActualValue(key, value);
        RecordWriter rw = this.recordWriters.get(finalPath);
        if(rw == null) {
            rw = rddMultipleTextOutputFormat.getBaseRecordWriter(fs, job, finalPath, arg3);
            this.recordWriters.put(finalPath, rw);
        }
        List<String> lines = (List<String>) actualValue;
        for (String line : lines) {
            rw.write(null, line);
        }
    }

    @Override
    public void close(Reporter reporter) throws IOException {
        Iterator keys = this.recordWriters.keySet().iterator();

        while(keys.hasNext()) {
            RecordWriter rw = (RecordWriter)this.recordWriters.get(keys.next());
            rw.close(reporter);
        }

        this.recordWriters.clear();
    }
}

Most of the code is exactly the same as in FileOutputFormat. The only difference is these few lines:


List<String> lines = (List<String>) actualValue;
for (String line : lines) {
    rw.write(null, line);
}

These lines allowed me to write each line of my input List<String> to the file. The first argument of the write function is set to null in order to avoid writing the key on each line.


To finish, I only need to make this call to write my files:


javaPairRDD.saveAsHadoopFile(path, String.class, List.class, RDDMultipleTextOutputFormat.class);