Spark Java Error: Size exceeds Integer.MAX_VALUE

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use or share it, but you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/28967111/

Spark Java Error: Size exceeds Integer.MAX_VALUE

Tags: java, python, apache-spark, distributed-computing, logistic-regression

Asked by peng

I am trying to use Spark for some simple machine learning tasks. I used PySpark with Spark 1.2.0 to solve a simple logistic regression problem. I have 1.2 million records for training, and I hashed the features of the records. When I set the number of hashed features to 1024, the program works fine, but when I set the number of hashed features to 16384, the program fails repeatedly with the following error:

Py4JJavaError: An error occurred while calling o84.trainLogisticRegressionModelWithSGD.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 4.0 failed 4 times, most recent failure: Lost task 1.3 in stage 4.0 (TID 9, workernode0.sparkexperience4a7.d5.internal.cloudapp.net): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun.apply(NettyBlockRpcServer.scala:57)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun.apply(NettyBlockRpcServer.scala:57)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
    at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:745)

    at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:156)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

This error happens when I train LogisticRegressionWithSGD after converting the data into LabeledPoint.

Does anyone have an idea about this?

My code is as follows (I am using an IPython Notebook for this):

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
from numpy import array
from sklearn.feature_extraction import FeatureHasher
from pyspark import SparkConf, SparkContext

sf = SparkConf().setAppName("test").set("spark.executor.memory", "50g").set("spark.cores.max", 30)
sc = SparkContext(conf=sf)

training_file = sc.textFile("train_small.txt")

def hash_feature(line):
    # Parse one tab-separated line: the first field is the label,
    # the remaining fields become "index_value" keys for the hasher.
    values = [0, dict()]
    for index, x in enumerate(line.strip("\n").split('\t')):
        if index == 0:
            values[0] = float(x)
        else:
            values[1][str(index) + "_" + x] = 1
    return values

n_feature = 2**14
hasher = FeatureHasher(n_features=n_feature)
training_file_hashed = training_file.map(lambda line: [hash_feature(line)[0], hasher.transform([hash_feature(line)[1]])])

def build_lable_points(line):
    # Expand the hashed sparse row into a dense list of n_feature floats.
    values = [0.0] * n_feature
    for index, value in zip(line[1].indices, line[1].data):
        values[index] = value
    return LabeledPoint(line[0], values)

parsed_training_data = training_file_hashed.map(lambda line: build_lable_points(line))
model = LogisticRegressionWithSGD.train(parsed_training_data)

The error happens when executing the last line.

Answer by Daniel Langdon

The Integer.MAX_VALUE restriction is on the size of a file being stored. 1.2M rows is not a big thing, so I'm not sure your problem is "the limits of Spark". More likely, some part of your work is creating something too big to be handled by any given executor.

I'm no Python coder, but when you "hashed the features of the records" you might be taking a very sparse set of records for a sample and creating a non-sparse array. That means a lot of memory for 16384 features, particularly when you do zip(line[1].indices, line[1].data). The only reason that doesn't run you out of memory right there is the shitload of it you seem to have configured (50G).
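For illustration, here is a minimal sketch (not code from the question; the helper name build_sparse_label_point is made up) of keeping each row sparse instead of expanding it into a dense list of n_feature floats:

from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint

def build_sparse_label_point(line):
    # line[0] is the label, line[1] is the 1-row scipy CSR matrix from FeatureHasher;
    # keep only the non-zero (index, value) pairs instead of 16384 floats per record.
    row = line[1]
    features = SparseVector(n_feature, {int(i): float(v) for i, v in zip(row.indices, row.data)})
    return LabeledPoint(line[0], features)

parsed_training_data = training_file_hashed.map(build_sparse_label_point)

MLlib's LogisticRegressionWithSGD accepts LabeledPoints with sparse feature vectors, so this should shrink each record considerably when most hashed features are zero.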

Another thing that might help is to increase the partitioning. So if you can't make your rows use less memory, at least you can try having fewer rows in any given task. Any temporary files being created are likely to depend on this, so you'll be less likely to hit file limits.
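As a rough sketch (the partition count of 200 is only an illustrative value, not a tuned one), you could either request more input partitions when reading the file or repartition the parsed RDD before training:

# Option 1: ask for more partitions when reading the input file
training_file = sc.textFile("train_small.txt", minPartitions=200)

# Option 2: repartition the parsed data so each task handles fewer rows
parsed_training_data = parsed_training_data.repartition(200)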



And, totally unrelated to the error but relevant for what you are trying to do:

16384 is indeed a big number of features. In the optimistic case where each one is just a boolean feature, you have a total of 2^16384 possible permutations to learn from, which is a huge number (try it here: https://defuse.ca/big-number-calculator.htm).
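To get a feel for how large 2^16384 is, a quick check in plain Python (purely illustrative):

# 2**16384 written out in decimal has almost five thousand digits
print(len(str(2 ** 16384)))   # 4933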

It is VERY, VERY likely that no algorithm will be able to learn a decision boundary with just 1.2M samples; you would probably need at least a few trillion trillion examples to make a dent in such a feature space. Machine learning has its limitations, so don't be surprised if you don't get better-than-random accuracy.

I would definitely recommend trying some sort of dimensionality reduction first!!
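Given that the question reports the job working with 1024 hashed features, the simplest form of dimensionality reduction here may just be hashing into a smaller space (the value below is only an example, not a recommendation for this dataset):

# Hash into a smaller feature space; the asker reports this size works fine
n_feature = 2 ** 10                      # 1024 instead of 16384
hasher = FeatureHasher(n_features=n_feature)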

Answer by Baptiste Wicht

At some point it tries to store the features, and 1.2M * 16384 is greater than Integer.MAX_VALUE, so you are trying to store more than the maximum number of features supported by Spark.
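A back-of-the-envelope check of that claim (plain arithmetic, nothing Spark-specific):

records = 1200000
features = 2 ** 14                 # 16384 hashed features
total_values = records * features  # 19,660,800,000 feature values
max_int = 2 ** 31 - 1              # Integer.MAX_VALUE = 2,147,483,647
print(total_values > max_int)      # True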

You're probably running into the limits of Apache Spark.

Answer by gsamaras

Increasing the number of partitions may cause Active tasks to show as a negative number in the Spark UI, which probably means that the number of partitions is too high.
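If you do tune the partitioning, it helps to check how many partitions you actually have before increasing the number further (a minimal sketch):

# Inspect the current partition count before deciding to repartition again
print(parsed_training_data.getNumPartitions())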
