
Note: this page is a translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you reuse or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/27276884/


What is shuffle read & shuffle write in Apache Spark

scala, apache-spark

Asked by blue-sky

In the screenshot below of the Spark admin UI running on port 8080:

(screenshot of the Spark web UI)

The "Shuffle Read" & "Shuffle Write" parameters are always empty for this code :

此代码的“随机读取”和“随机写入”参数始终为空:

import org.apache.spark.SparkContext

object first {
  println("Welcome to the Scala worksheet")

  val conf = new org.apache.spark.SparkConf()
    .setMaster("local")
    .setAppName("distances")
    .setSparkHome("C:\spark-1.1.0-bin-hadoop2.4\spark-1.1.0-bin-hadoop2.4")
    .set("spark.executor.memory", "2g")
  val sc = new SparkContext(conf)

  def euclDistance(userA: User, userB: User) = {

    val subElements = (userA.features zip userB.features) map {
      m => (m._1 - m._2) * (m._1 - m._2)
    }
    val summed = subElements.sum
    val sqRoot = Math.sqrt(summed)

    println("value is" + sqRoot)
    ((userA.name, userB.name), sqRoot)
  }

  case class User(name: String, features: Vector[Double])

  def createUser(data: String) = {

    val id = data.split(",")(0)
    val splitLine = data.split(",")

    val distanceVector = (splitLine.toList match {
      case h :: t => t
    }).map(m => m.toDouble).toVector

    User(id, distanceVector)

  }

  val dataFile = sc.textFile("c:\\data\\example.txt")
  val users = dataFile.map(m => createUser(m))
  val cart = users.cartesian(users)
  val distances = cart.map(m => euclDistance(m._1, m._2))
  //> distances  : org.apache.spark.rdd.RDD[((String, String), Double)] = MappedR
  //| DD[4] at map at first.scala:46
  val d = distances.collect

  d.foreach(println) //> ((a,a),0.0)
  //| ((a,b),0.0)
  //| ((a,c),1.0)
  //| ((a,),0.0)
  //| ((b,a),0.0)
  //| ((b,b),0.0)
  //| ((b,c),1.0)
  //| ((b,),0.0)
  //| ((c,a),1.0)
  //| ((c,b),1.0)
  //| ((c,c),0.0)
  //| ((c,),0.0)
  //| ((,a),0.0)
  //| ((,b),0.0)
  //| ((,c),0.0)
  //| ((,),0.0)

}

Why are "Shuffle Read" & "Shuffle Write" fields empty ? Can above code be tweaked in order to populate these fields so as to understand how

为什么“随机读取”和“随机写入”字段为空?可以调整上面的代码以填充这些字段以了解如何

Accepted answer by Soumya Simanta

I believe you have to run your application in cluster/distributed mode to see any shuffle read or write values. Typically, shuffles are triggered by a subset of Spark operations (e.g., groupBy, join, etc.).
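
For example, here is a minimal sketch of such a tweak (my illustration, not from the original post): re-keying the pairwise distances by the first user and aggregating them with reduceByKey introduces a wide dependency, so the job gets a stage boundary and the "Shuffle Read" / "Shuffle Write" columns should then show non-zero values (subject to the cluster-mode caveat above):

import org.apache.spark.SparkContext._   // pair-RDD functions (needed on Spark 1.x)

// Illustrative tweak: average distance per user, computed from the existing
// `distances` RDD of ((nameA, nameB), distance) pairs.
val avgDistancePerUser = distances
  .map { case ((nameA, _), dist) => (nameA, (dist, 1)) }            // narrow: stays in the same stage
  .reduceByKey { case ((d1, n1), (d2, n2)) => (d1 + d2, n1 + n2) }  // wide: shuffle boundary here
  .mapValues { case (total, count) => total / count }

avgDistancePerUser.collect().foreach(println)

Any keyed operation (groupByKey, reduceByKey, join, cogroup, ...) would have the same effect; reduceByKey is just a compact example here.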

Answer by taruxtin

Shuffling means the reallocation of data between multiple Spark stages. "Shuffle Write" is the sum of all serialized data written on all executors before transmitting (normally at the end of a stage), and "Shuffle Read" is the sum of all serialized data read on all executors at the beginning of a stage.

Your program has only one stage, triggered by the "collect" operation. No shuffling is required, because you only have a bunch of consecutive map operations, which are pipelined into a single stage.
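
As a rough illustration of that stage boundary (my sketch, not the answerer's code, reusing the question's sc): a chain of narrow transformations such as map and filter is pipelined into a single stage with no shuffle, while a keyed operation such as groupByKey ends one stage with a shuffle write and starts the next with a shuffle read:

import org.apache.spark.SparkContext._   // pair-RDD functions (needed on Spark 1.x)

// Narrow transformations only: pipelined into one stage, shuffle columns stay at zero.
sc.parallelize(1 to 100).map(x => x * x).filter(_ % 2 == 0).count()

// Wide transformation: groupByKey repartitions the data by key across tasks,
// so the first stage ends with a shuffle write and the second begins with a shuffle read.
sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3))).groupByKey().collect()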

Take a look at these slides: http://de.slideshare.net/colorant/spark-shuffle-introduction

It could also help to read chapter 5 of the original paper: http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf