
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow. Original URL: http://stackoverflow.com/questions/33558755/

Date: 2020-10-22 07:46:49  Source: igfitidea

Matrix Multiplication in Apache Spark

java, scala, apache-spark, rdd, apache-spark-mllib

Asked by Jigar

I am trying to perform matrix multiplication using Apache Spark and Java.

I have 2 main questions:

  1. How to create RDD that can represent matrix in Apache Spark?
  2. How to multiply two such RDDs?

Answered by zero323

It all depends on the input data and dimensions, but generally speaking what you want is not an RDD but one of the distributed data structures from org.apache.spark.mllib.linalg.distributed. At the moment it provides four different implementations of DistributedMatrix:

  • IndexedRowMatrix - can be created directly from an RDD[IndexedRow], where IndexedRow consists of a row index and an org.apache.spark.mllib.linalg.Vector

    import org.apache.spark.mllib.linalg.{Vectors, Matrices}
    import org.apache.spark.mllib.linalg.distributed.{IndexedRowMatrix,
      IndexedRow}
    
    val rows = sc.parallelize(Seq(
      (0L, Array(1.0, 0.0, 0.0)),
      (1L, Array(0.0, 1.0, 0.0)),
      (2L, Array(0.0, 0.0, 1.0)))
    ).map{case (i, xs) => IndexedRow(i, Vectors.dense(xs))}
    
    val indexedRowMatrix = new IndexedRowMatrix(rows)
    
  • RowMatrix - similar to IndexedRowMatrix, but without meaningful row indices. Can be created directly from an RDD[org.apache.spark.mllib.linalg.Vector]

    import org.apache.spark.mllib.linalg.distributed.RowMatrix
    
    val rowMatrix = new RowMatrix(rows.map(_.vector))      
    
  • BlockMatrix - can be created from an RDD[((Int, Int), Matrix)], where the first element of the tuple contains the coordinates of the block and the second one is a local org.apache.spark.mllib.linalg.Matrix

    import org.apache.spark.mllib.linalg.distributed.BlockMatrix

    val eye = Matrices.sparse(
      3, 3, Array(0, 1, 2, 3), Array(0, 1, 2), Array(1.0, 1.0, 1.0))
    
    val blocks = sc.parallelize(Seq(
       ((0, 0), eye), ((1, 1), eye), ((2, 2), eye)))
    
    val blockMatrix = new BlockMatrix(blocks, 3, 3, 9, 9)
    
  • CoordinateMatrix - can be created from an RDD[MatrixEntry], where MatrixEntry consists of a row index, a column index and a value.

    import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix,
      MatrixEntry}
    
    val entries = sc.parallelize(Seq(
       (0, 0, 3.0), (2, 0, -5.0), (3, 2, 1.0),
       (4, 1, 6.0), (6, 2, 2.0), (8, 1, 4.0))
    ).map{case (i, j, v) => MatrixEntry(i, j, v)}
    
    val coordinateMatrix = new CoordinateMatrix(entries, 9, 3)
    
The first two implementations support multiplication by a local Matrix:

val localMatrix = Matrices.dense(3, 2, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))

indexedRowMatrix.multiply(localMatrix).rows.collect
// Array(IndexedRow(0,[1.0,4.0]), IndexedRow(1,[2.0,5.0]),
//   IndexedRow(2,[3.0,6.0]))

and the third one can be multiplied by another BlockMatrix, as long as the number of columns per block in this matrix matches the number of rows per block of the other matrix. CoordinateMatrix doesn't support multiplication, but it is pretty easy to create and to transform into the other types of distributed matrices:

blockMatrix.multiply(coordinateMatrix.toBlockMatrix(3, 3))
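As a sketch of those conversions, the CoordinateMatrix defined above (9x3 in the example) can be turned into each of the other distributed representations using the standard mllib conversion methods:

```scala
// Standard conversions between distributed matrix types; each returns a
// new distributed matrix backed by an RDD (nothing is collected to the driver).
val asBlock       = coordinateMatrix.toBlockMatrix(3, 3) // 9x3, split into 3x3 blocks
val asIndexedRows = coordinateMatrix.toIndexedRowMatrix()
val asRows        = coordinateMatrix.toRowMatrix()

// BlockMatrix also supports transpose, so a Gram-style product is possible:
val gram = asBlock.transpose.multiply(asBlock) // (3x9) * (9x3) = 3x3
```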

Each type has its own strengths and weaknesses, and there are some additional factors to consider when you use sparse or dense elements (Vectors or block Matrices). Multiplying by a local matrix is usually preferable, since it doesn't require expensive shuffling.
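To illustrate the sparse/dense distinction for element types, the mllib factory methods can build either representation of the same values (the numbers here are arbitrary example data):

```scala
import org.apache.spark.mllib.linalg.{Vectors, Matrices}

// A dense vector stores every entry; a sparse one stores the size
// plus (index, value) pairs for the non-zero entries only.
val dense  = Vectors.dense(1.0, 0.0, 3.0)
val sparse = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))

// The same distinction exists for local matrices (sparse uses CSC storage:
// column pointers, row indices, values).
val denseM  = Matrices.dense(2, 2, Array(1.0, 0.0, 0.0, 1.0))
val sparseM = Matrices.sparse(2, 2, Array(0, 1, 2), Array(0, 1), Array(1.0, 1.0))
```

The two forms are numerically equivalent, but their storage footprint (and hence shuffle size) can differ dramatically for mostly-zero data.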

You can find more details about each type in the MLlib Data Types guide.
