如何从我的 Python Spark 脚本登录
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/25407550/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How do I log from my Python Spark script
提问by W.P. McNeill
I have a Python Spark program which I run with spark-submit. I want to put logging statements in it.
我有一个 Python Spark 程序,我用spark-submit. 我想在其中放入日志语句。
logging.info("This is an informative message.")
logging.debug("This is a debug message.")
I want to use the same logger that Spark is using so that the log messages come out in the same format and the level is controlled by the same configuration files. How do I do this?
我想使用 Spark 使用的同一个记录器,以便日志消息以相同的格式出现,并且级别由相同的配置文件控制。我该怎么做呢?
I've tried putting the loggingstatements in the code and starting out with a logging.getLogger(). In both cases I see Spark's log messages but not mine. I've been looking at the Python logging documentation, but haven't been able to figure it out from there.
我试过将这些logging语句放在代码中并以logging.getLogger(). 在这两种情况下,我都看到了 Spark 的日志消息,但没有看到我的。我一直在查看Python logging documentation,但无法从那里弄清楚。
Not sure if this is something specific to scripts submitted to Spark or just me not understanding how logging works.
不确定这是否是提交给 Spark 的脚本所特有的,或者只是我不了解日志记录的工作原理。
回答by CasualDemon
You need to get the logger for spark itself, by default getLogger()will return the logger for you own module. Try something like:
您需要获取 spark 本身的记录器,默认情况下getLogger()将为您自己的模块返回记录器。尝试类似:
logger = logging.getLogger('py4j')
logger.info("My test info statement")
It might also be 'pyspark'instead of 'py4j'.
它也可能'pyspark'代替'py4j'.
In case the function that you use in your spark program (and which does some logging) is defined in the same module as the main function it will give some serialization error.
如果您在 spark 程序中使用的函数(并执行一些日志记录)与主函数在同一模块中定义,则会出现一些序列化错误。
This is explained hereand an example by the same person is given here
I also tested this on spark 1.3.1
我也在 spark 1.3.1 上测试了这个
EDIT:
编辑:
To change logging from STDERRto STDOUTyou will have to remove the current StreamHandlerand add a new one.
要将日志记录从 更改为STDERR,STDOUT您必须删除当前记录StreamHandler并添加一个新记录。
Find the existing Stream Handler (This line can be removed when finished)
找到现有的Stream Handler(完成后可以删除此行)
print(logger.handlers)
# will look like [<logging.StreamHandler object at 0x7fd8f4b00208>]
There will probably only be a single one, but if not you will have to update position.
可能只有一个,但如果没有,您将不得不更新位置。
logger.removeHandler(logger.handlers[0])
Add new handler for sys.stdout
添加新的处理程序 sys.stdout
import sys # Put at top if not already there
sh = logging.StreamHandler(sys.stdout)
sh.setLevel(logging.DEBUG)
logger.addHandler(sh)
回答by Alex Q
You can get the logger from the SparkContext object:
您可以从 SparkContext 对象获取记录器:
log4jLogger = sc._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("pyspark script logger initialized")
回答by Pierre D
In my case, I am just happy to get my log messages added to the workers stderr, along with the usual spark log messages.
就我而言,我很高兴将我的日志消息与通常的 spark 日志消息一起添加到工作人员的标准错误中。
If that suits your needs, then the trick is to redirect the particular Python logger to stderr.
如果这适合您的需要,那么诀窍是将特定的 Python 记录器重定向到stderr.
For example, the following, inspired from this answer, works fine for me:
例如,受此答案启发的以下内容对我来说效果很好:
def getlogger(name, level=logging.INFO):
import logging
import sys
logger = logging.getLogger(name)
logger.setLevel(level)
if logger.handlers:
# or else, as I found out, we keep adding handlers and duplicate messages
pass
else:
ch = logging.StreamHandler(sys.stderr)
ch.setLevel(level)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
return logger
Usage:
用法:
def tst_log():
logger = getlogger('my-worker')
logger.debug('a')
logger.info('b')
logger.warning('c')
logger.error('d')
logger.critical('e')
...
Output (plus a few surrounding lines for context):
输出(加上几行上下文):
17/05/03 03:25:32 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 5.8 KB, free 319.2 MB)
2017-05-03 03:25:32,849 - my-worker - INFO - b
2017-05-03 03:25:32,849 - my-worker - WARNING - c
2017-05-03 03:25:32,849 - my-worker - ERROR - d
2017-05-03 03:25:32,849 - my-worker - CRITICAL - e
17/05/03 03:25:32 INFO PythonRunner: Times: total = 2, boot = -40969, init = 40971, finish = 0
17/05/03 03:25:32 INFO Executor: Finished task 7.0 in stage 20.0 (TID 213). 2109 bytes result sent to driver
回答by Jorge Sierra Carbonell
The key of interacting pyspark and java log4j is the jvm. This below is python code, the conf is missing the url, but this is about logging.
pyspark和java log4j交互的关键是jvm。下面是 python 代码,conf 缺少 url,但这是关于日志记录的。
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
my_jars = os.environ.get("SPARK_HOME")
myconf = SparkConf()
myconf.setMaster("local").setAppName("DB2_Test")
myconf.set("spark.jars","%s/jars/log4j-1.2.17.jar" % my_jars)
spark = SparkSession\
.builder\
.appName("DB2_Test")\
.config(conf = myconf) \
.getOrCreate()
Logger= spark._jvm.org.apache.log4j.Logger
mylogger = Logger.getLogger(__name__)
mylogger.error("some error trace")
mylogger.info("some info trace")
回答by vy32
We needed to log from the executors, not from the driver node. So we did the following:
我们需要从 executors 登录,而不是从 driver 节点登录。所以我们做了以下事情:
We created a
/etc/rsyslog.d/spark.confon all of the nodes (using a Bootstrap method with Amazon Elastic Map Reduceso that the Core nodes forwarded sysloglocal1` messages to the master node.On the Master node, we enabled the UDP and TCP syslog listeners, and we set it up so that all
localmessages got logged to/var/log/local1.log.We created a Python
loggingmodule Syslog logger in our map function.Now we can log with
logging.info(). ...
我们
/etc/rsyslog.d/spark.conf在所有节点上创建了一个(使用 Bootstrap 方法将 Amazon Elastic Map Reduceso that the Core nodes forwarded sysloglocal1` 消息发送到主节点。在主节点上,我们启用了 UDP 和 TCP 系统日志侦听器,并对其进行了设置,以便将所有
local消息记录到/var/log/local1.log.我们
logging在 map 函数中创建了一个 Python模块 Syslog 记录器。现在我们可以使用
logging.info(). ...
One of the things we discovered is that the same partition is being processed simultaneously on multiple executors. Apparently Spark does this all the time, when it has extra resources. This handles the case when an executor is mysteriously delayed or fails.
我们发现的一件事是同一分区在多个执行程序上同时处理。显然,当 Spark 有额外的资源时,它会一直这样做。这可以处理执行程序神秘延迟或失败的情况。
Logging in the mapfunctions has taught us a lot about how Spark works.
登录map函数已经教会了我们很多关于 Spark 是如何工作的。
回答by user12910640
import logging
# Logger
logging.basicConfig(format='%(asctime)s %(filename)s %(funcName)s %(lineno)d %(message)s')
logger = logging.getLogger('driver_logger')
logger.setLevel(logging.DEBUG)
Simplest way to log from pyspark !
从 pyspark 登录的最简单方法!

