Scala 中的 Apache Spark 日志记录
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/29208844/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Apache Spark logging within Scala
提问by Bogdan N
I am looking for a solution to be able to log additional data when executing code on Apache Spark Nodes that could help investigate later some issues that might appear during execution. Trying to use a traditional solution like for example com.typesafe.scalalogging.LazyLoggingfails because the log instance cannot be serialized on a distributed environment like Apache Spark.
我正在寻找一种解决方案,以便在 Apache Spark 节点上执行代码时能够记录附加数据,这有助于稍后调查在执行过程中可能出现的一些问题。尝试使用传统解决方案(例如)com.typesafe.scalalogging.LazyLogging失败,因为日志实例无法在分布式环境(如 Apache Spark)上序列化。
I've investigated this problem and for now the solution that I found was to use the org.apache.spark.Loggingtrait like this :
我已经调查了这个问题,现在我发现的解决方案是使用这样的org.apache.spark.Logging特征:
class SparkExample with Logging {
val someRDD = ...
someRDD.map {
rddElement => logInfo(s"$rddElement will be processed.")
doSomething(rddElement)
}
}
However it looks like the Logging trait is not a permanent solution for Apache Spark because it's marked as @DeveloperApiand the class documentation mentions:
但是看起来 Logging trait 不是 Apache Spark 的永久解决方案,因为它被标记为@DeveloperApi并且类文档提到:
This will likely be changed or removed in future releases.
这可能会在未来的版本中更改或删除。
I am wondering - are they any known logging solution that I can use and will allow me to log data when the RDDs are executed on Apache Spark nodes ?
我想知道 - 它们是否是我可以使用的任何已知日志记录解决方案,并且允许我在 Apache Spark 节点上执行 RDD 时记录数据?
@Later Edit: Some of the comments from below suggest to use Log4J. I've tried using Log4J but I'm still having issues when using logger from a Scala class (and not a Scala object). Here is my full code :
@Later Edit:下面的一些评论建议使用 Log4J。我已经尝试使用 Log4J,但是在使用 Scala 类(而不是 Scala 对象)中的记录器时仍然遇到问题。这是我的完整代码:
import org.apache.log4j.Logger
import org.apache.spark._
object Main {
def main(args: Array[String]) {
new LoggingTestWithRDD().doTest()
}
}
class LoggingTestWithRDD extends Serializable {
val log = Logger.getLogger(getClass.getName)
def doTest(): Unit = {
val conf = new SparkConf().setMaster("local[4]").setAppName("LogTest")
val spark = new SparkContext(conf)
val someRdd = spark.parallelize(List(1, 2, 3))
someRdd.map {
element =>
log.info(s"$element will be processed")
element + 1
}
spark.stop()
}
}
}
The exception that I'm seeing is :
我看到的例外是:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable -> Caused by: java.io.NotSerializableException: org.apache.log4j.Logger
线程“main” org.apache.spark.SparkException 中的异常:任务不可序列化 -> 由:java.io.NotSerializableException:org.apache.log4j.Logger
回答by florins
You can use Akhil's solution proposed in
https://www.mail-archive.com/[email protected]/msg29010.html.
I have used by myself and it works.
您可以使用
https://www.mail-archive.com/[email protected]/msg29010.html 中提出的 Akhil 解决方案。我自己用过,效果很好。
Akhil Das Mon, 25 May 2015 08:20:40 -0700
Try this way:object Holder extends Serializable { @transient lazy val log = Logger.getLogger(getClass.getName) } val someRdd = spark.parallelize(List(1, 2, 3)).foreach { element => Holder.log.info(element) }
Akhil Das Mon, 25 May 2015 08:20:40 -0700
试试这个:object Holder extends Serializable { @transient lazy val log = Logger.getLogger(getClass.getName) } val someRdd = spark.parallelize(List(1, 2, 3)).foreach { element => Holder.log.info(element) }
回答by Ryan Stack
Use Log4j 2.x. The core logger has been made serializable. Problem solved.
使用 Log4j 2.x。核心记录器已可序列化。问题解决了。
Jira discussion: https://issues.apache.org/jira/browse/LOG4J2-801
Jira 讨论:https: //issues.apache.org/jira/browse/LOG4J2-801
"org.apache.logging.log4j" % "log4j-api" % "2.x.x"
"org.apache.logging.log4j" % "log4j-core" % "2.x.x"
"org.apache.logging.log4j" %% "log4j-api-scala" % "2.x.x"
回答by ragazzojp
If you need some code to be executed before and after a map, filteror other RDDfunction, try to use mapPartition, where the underlying iterator is passed explicitely.
如果你需要一些代码来之前和之后执行map,filter或其他RDD功能,尽量使用mapPartition,在迭代器明确地传递底层。
Example:
例子:
val log = ??? // this gets captured and produced serialization error
rdd.map { x =>
log.info(x)
x+1
}
Becomes:
变成:
rdd.mapPartition { it =>
val log = ??? // this is freshly initialized in worker nodes
it.map { x =>
log.info(x)
x + 1
}
}
Every basic RDDfunction is always implemented with a mapPartition.
每个基本RDD功能总是用一个mapPartition.
Make sure to handle the partitioner explicitly and not to loose it: see Scaladoc, preservesPartitioningparameter, this is critical for performances.
确保明确处理分区器而不是松散它:请参阅 Scaladoc,preservesPartitioning参数,这对性能至关重要。
回答by khushbu kanojia
This is an old post but I want to provide my working solution which I just got after struggling a lot and still can be useful for others:
这是一篇旧帖子,但我想提供我的工作解决方案,这是我在挣扎了很多之后才得到的,但仍然对其他人有用:
I want to print rdd contents inside rdd.map function but getting Task Not Serializalable Error. This is my solution for this problem using scala static object which is extending java.io.Serializable:
我想在 rdd.map 函数中打印 rdd 内容,但得到Task Not Serializalable Error. 这是我使用扩展的 Scala 静态对象解决此问题的解决方案java.io.Serializable:
import org.apache.log4j.Level
object MyClass extends Serializable{
val log = org.apache.log4j.LogManager.getLogger("name of my spark log")
log.setLevel(Level.INFO)
def main(args:Array[String])
{
rdd.map(t=>
//Using object's logger here
val log =MyClass.log
log.INFO("count"+rdd.count)
)
}
}
回答by Karthik
val log = Logger.getLogger(getClass.getName),
You can use "log" to write logs . Also if you need change logger properties you need to have log4j.properties in /conf folder. By default we will have a template in that location.
您可以使用“日志”来写入日志。此外,如果您需要更改记录器属性,您需要在 /conf 文件夹中有 log4j.properties。默认情况下,我们将在该位置有一个模板。
回答by Thamme Gowda
Here is my solution :
这是我的解决方案:
I am using SLF4j (with Log4j binding), in my base class of every spark job I have something like this:
我正在使用 SLF4j(带有 Log4j 绑定),在我的每个火花作业的基类中,我都有这样的东西:
import org.slf4j.LoggerFactory
val LOG = LoggerFactory.getLogger(getClass)
Just before the place where I use LOGin distributed functional code, I copy logger reference to a local constant.
就在我LOG在分布式函数代码中使用的地方之前,我将记录器引用复制到局部常量。
val LOG = this.LOG
It worked for me!
它对我有用!

