Setting up S3 for logs in Airflow (Python)
Disclaimer: this page reproduces a popular StackOverflow question and its answers under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/44780736/
setting up s3 for logs in airflow
Asked by HymanStat
I am using docker-compose to set up a scalable airflow cluster. I based my approach off of this Dockerfile https://hub.docker.com/r/puckel/docker-airflow/
My problem is getting the logs set up to write/read from s3. When a dag has completed I get an error like this
*** Log file isn't local.
*** Fetching here: http://ea43d4d49f35:8793/log/xxxxxxx/2017-06-26T11:00:00
*** Failed to fetch log file from worker.
*** Reading remote logs...
Could not read logs from s3://buckets/xxxxxxx/airflow/logs/xxxxxxx/2017-06-26T11:00:00
I set up a new section in the airflow.cfg file like this:
[MyS3Conn]
aws_access_key_id = xxxxxxx
aws_secret_access_key = xxxxxxx
aws_default_region = xxxxxxx
And then specified the s3 path in the remote logs section in airflow.cfg
remote_base_log_folder = s3://buckets/xxxx/airflow/logs
remote_log_conn_id = MyS3Conn
Did I set this up properly and there is a bug? Is there a recipe for success here that I am missing?
-- Update
I tried exporting in URI and JSON formats and neither seemed to work. I then exported the aws_access_key_id and aws_secret_access_key and then airflow started picking it up. Now I get this error in the worker logs
6/30/2017 6:05:59 PM INFO:root:Using connection to: s3
6/30/2017 6:06:00 PM ERROR:root:Could not read logs from s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PM ERROR:root:Could not write logs to s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00
6/30/2017 6:06:00 PM Logging into: /usr/local/airflow/logs/xxxxx/2017-06-30T23:45:00
-- Update
I found this link as well https://www.mail-archive.com/[email protected]/msg00462.html
I then shelled into one of my worker machines (separate from the webserver and scheduler) and ran this bit of code in Python:
import airflow
s3 = airflow.hooks.S3Hook('s3_conn')
s3.load_string('test', airflow.conf.get('core', 'remote_base_log_folder'))
I receive this error.
boto.exception.S3ResponseError: S3ResponseError: 403 Forbidden
I tried exporting several different types of AIRFLOW_CONN_ envs as explained in the connections section here https://airflow.incubator.apache.org/concepts.html and by other answers to this question.
s3://<AWS_ACCESS_KEY_ID>:<AWS_SECRET_ACCESS_KEY>@S3
{"aws_account_id":"<xxxxx>","role_arn":"arn:aws:iam::<xxxx>:role/<xxxxx>"}
{"aws_access_key_id":"<xxxxx>","aws_secret_access_key":"<xxxxx>"}
I have also exported AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY with no success.
These credentials are being stored in a database so once I add them in the UI they should be picked up by the workers but they are not able to write/read logs for some reason.
Accepted answer by Him
You need to set up the s3 connection through airflow UI. For this, you need to go to the Admin -> Connections tab on airflow UI and create a new row for your S3 connection.
An example configuration would be:
Conn Id: my_conn_S3
Conn Type: S3
Extra: {"aws_access_key_id":"your_aws_key_id", "aws_secret_access_key": "your_aws_secret_key"}
Answered by Arne Huang
UPDATE Airflow 1.10 makes logging a lot easier.
For s3 logging, set up the connection hook as per the above answer, and then simply add the following to airflow.cfg:
[core]
# Airflow can store logs remotely in AWS S3. Users must supply a remote
# location URL (starting with either 's3://...') and an Airflow connection
# id that provides access to the storage location.
remote_base_log_folder = s3://my-bucket/path/to/logs
remote_log_conn_id = MyS3Conn
# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False
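As an optional sanity check (again my addition, not from the original answer; the conn id matches the one defined above and the bucket name is a placeholder), the 1.10-style hook can be exercised directly:

# Airflow 1.10-style import; S3Hook builds on AwsHook and takes aws_conn_id.
from airflow.hooks.S3_hook import S3Hook

hook = S3Hook(aws_conn_id='MyS3Conn')
print(hook.check_for_bucket('my-bucket'))  # True if the credentials can see the bucket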
For gcs logging,
Install the gcp_api package first, like so: pip install apache-airflow[gcp_api].
Set up the connection hook as per the above answer
Add the following to airflow.cfg
[core]
# Airflow can store logs remotely in AWS S3. Users must supply a remote
# location URL (starting with either 's3://...') and an Airflow connection
# id that provides access to the storage location.
remote_logging = True
remote_base_log_folder = gs://my-bucket/path/to/logs
remote_log_conn_id = MyGCSConn
NOTE: As of Airflow 1.9 remote logging has been significantly altered. If you are using 1.9, read on.
Reference here
Complete Instructions:
Create a directory to store configs and place this so that it can be found in PYTHONPATH. One example is $AIRFLOW_HOME/config
Create empty files called $AIRFLOW_HOME/config/log_config.py and $AIRFLOW_HOME/config/__init__.py
Copy the contents of airflow/config_templates/airflow_local_settings.py into the log_config.py file that was just created in the step above.
Customize the following portions of the template:
Add this variable to the top of the file. Note the trailing slash:
S3_LOG_FOLDER = 's3://<bucket where logs should be persisted>/'
Rename DEFAULT_LOGGING_CONFIG to LOGGING_CONFIG:
LOGGING_CONFIG = ...
Add an S3TaskHandler to the 'handlers' block of the LOGGING_CONFIG variable:
's3.task': {
    'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
    'formatter': 'airflow.task',
    'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
    's3_log_folder': S3_LOG_FOLDER,
    'filename_template': FILENAME_TEMPLATE,
},
Update the airflow.task and airflow.task_runner blocks to use 's3.task' instead of 'file.task':
'loggers': {
    'airflow.task': {
        'handlers': ['s3.task'],
        ...
    },
    'airflow.task_runner': {
        'handlers': ['s3.task'],
        ...
    },
    'airflow': {
        'handlers': ['console'],
        ...
    },
}
Make sure a s3 connection hook has been defined in Airflow, as per the above answer. The hook should have read and write access to the s3 bucket defined above in S3_LOG_FOLDER.
Update $AIRFLOW_HOME/airflow.cfg to contain:
task_log_reader = s3.task
logging_config_class = log_config.LOGGING_CONFIG
remote_log_conn_id = <name of the s3 platform hook>
Restart the Airflow webserver and scheduler, and trigger (or wait for) a new task execution.
Verify that logs are showing up for newly executed tasks in the bucket you've defined.
Verify that the s3 storage viewer is working in the UI. Pull up a newly executed task, and verify that you see something like:
*** Reading remote log from gs://<bucket where logs should be persisted>/example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log.
[2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532
[2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py']
[2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor
[2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py
Answered by Niels Joaquin
(Updated as of Airflow 1.10.2)
Here's a solution if you don't use the admin UI.
My Airflow doesn't run on a persistent server ... (it gets launched afresh every day in a Docker container, on Heroku). I know I'm missing out on a lot of great features, but in my minimal setup I never touch the admin UI or the cfg file. Instead, I have to set Airflow-specific environment variables in a bash script, which overrides the .cfg file.
apache-airflow[s3]
First of all, you need the s3 subpackage installed to write your Airflow logs to S3. (boto3 works fine for the Python jobs within your DAGs, but the S3Hook depends on the s3 subpackage.)
One more side note: conda install doesn't handle this yet, so I have to do pip install apache-airflow[s3].
Environment variables
In a bash script, I set these core variables. Starting from these instructions, but using the naming convention AIRFLOW__{SECTION}__{KEY} for environment variables, I do:
export AIRFLOW__CORE__REMOTE_LOGGING=True
export AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucket/key
export AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_uri
export AIRFLOW__CORE__ENCRYPT_S3_LOGS=False
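To confirm Airflow actually picked these up (my addition, not from the original answer; it assumes you run it in the same environment where the exports above are set), you can read the effective values back:

# Environment variables of the form AIRFLOW__CORE__* override airflow.cfg,
# so these should print the values exported above.
from airflow.configuration import conf

print(conf.get('core', 'remote_base_log_folder'))
print(conf.get('core', 'remote_log_conn_id'))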
S3 connection ID
The s3_uri above is a connection ID that I made up. In Airflow, it corresponds to another environment variable, AIRFLOW_CONN_S3_URI. The value of that is your S3 path, which has to be in URI form. That's
s3://access_key:secret_key@bucket/key
Store this however you handle other sensitive environment variables.
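One practical wrinkle (my note, not part of the original answer): AWS secret keys often contain characters such as / or + that are not valid inside a URI, so they should be percent-encoded before being embedded in AIRFLOW_CONN_S3_URI. A minimal sketch with placeholder credentials:

# Percent-encode placeholder AWS credentials before building the connection URI.
from urllib.parse import quote_plus

access_key = "AKIAXXXXXXXXXXXXXXXX"   # placeholder access key id
secret_key = "abc/def+ghi"            # placeholder secret containing URI-unsafe characters

conn_uri = "s3://{}:{}@my-bucket/my/key".format(quote_plus(access_key), quote_plus(secret_key))
print(conn_uri)  # export this value as AIRFLOW_CONN_S3_URI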
With this configuration, Airflow will be able to write your logs to S3. They will follow the path of s3://bucket/key/dag/task_id/timestamp/1.log.
Appendix on upgrading from Airflow 1.8 to Airflow 1.10
I recently upgraded my production pipeline from Airflow 1.8 to 1.9, and then 1.10. Good news is that the changes are pretty tiny; the rest of the work was just figuring out nuances with the package installations (unrelated to the original question about S3 logs).
(1) First of all, I needed to upgrade to Python 3.6 with Airflow 1.9.
(2) The package name changed from airflow to apache-airflow with 1.9. You also might run into this in your pip install.
(3) The package psutil has to be in a specific version range for Airflow. You might encounter this when you're doing pip install apache-airflow.
(4) python3-dev headers are needed with Airflow 1.9+.
(5) Here are the substantive changes: export AIRFLOW__CORE__REMOTE_LOGGING=True is now required. And
(6) The logs have a slightly different path in S3, which I updated in the answer: s3://bucket/key/dag/task_id/timestamp/1.log.
But that's it! The logs did not work in 1.9, so I recommend just going straight to 1.10, now that it's available.
Answered by Paul Leclercq
To complete Arne's answer with the recent Airflow updates, you do not need to set task_log_reader to another value than the default one: task
If you follow the default logging template airflow/config_templates/airflow_local_settings.py, you can see since this commit (note the handler's name changed to 's3': {'task'... instead of s3.task) that the value of the remote folder (REMOTE_BASE_LOG_FOLDER) will replace the handler with the right one:
REMOTE_LOGGING = conf.get('core', 'remote_logging')

if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
    DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
    DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
    DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])
elif REMOTE_LOGGING and ELASTICSEARCH_HOST:
    DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch'])
More details on how to log to and read from S3: https://github.com/apache/incubator-airflow/blob/master/docs/howto/write-logs.rst#writing-logs-to-amazon-s3
Answered by diogoa
Just a side note to anyone following the very useful instructions in the above answer: if you stumble upon this issue: "ModuleNotFoundError: No module named 'airflow.utils.log.logging_mixin.RedirectStdHandler'" as referenced here (which happens when using airflow 1.9), the fix is simple: rather use this base template, https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/config_templates/airflow_local_settings.py (and follow all other instructions in the above answer).
The current template incubator-airflow/airflow/config_templates/airflow_local_settings.py present in the master branch contains a reference to the class "airflow.utils.log.s3_task_handler.S3TaskHandler", which is not present in the apache-airflow==1.9.0 python package. Hope this helps!
Answered by khanna
Phew! The motivation to keep nipping Airflow bugs in the bud is to confront this as a bunch of Python files XD. Here's my experience on this with apache-airflow==1.9.0.
First of all, there's simply no need to try airflow connections .......... --conn_extra etc., etc.
Just set your airflow.cfg as:
remote_logging = True
remote_base_log_folder = s3://dev-s3-main-ew2-dmg-immutable-potns/logs/airflow-logs/
encrypt_s3_logs = False
# Logging level
logging_level = INFO
fab_logging_level = WARN
# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class = log_config.LOGGING_CONFIG
remote_log_conn_id = s3://<ACCESS-KEY>:<SECRET-ID>@<MY-S3-BUCKET>/<MY>/<SUB>/<FOLDER>/
Keep the $AIRFLOW_HOME/config/__init__.py and $AIRFLOW_HOME/config/log_config.py files as above.
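A quick check (my addition, not from the original answer; it assumes $AIRFLOW_HOME/config is on PYTHONPATH as described earlier) that the custom logging config can actually be imported before restarting the services:

# Fails fast if log_config.py has a syntax error or is not on PYTHONPATH.
from log_config import LOGGING_CONFIG

print(LOGGING_CONFIG['handlers'].keys())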
The problem for me was a missing "boto3" package, which I could get to by:
vi /usr/lib/python3.6/site-packages/airflow/utils/log/s3_task_handler.py, then add import traceback, and in the line containing:
Could not create an S3Hook with connection id "%s". ' 'Please make sure that airflow[s3] is installed and ' 'the S3 connection exists.
doing a traceback.print_exc(), and well, it started cribbing about missing boto3!
Installed it and life was beautiful again!
Answered by Bertrand Paquet
Have it working with Airflow 1.10 in kube. I have the following env vars set:
AIRFLOW_CONN_LOGS_S3=s3://id:secret_uri_encoded@S3
AIRFLOW__CORE__REMOTE_LOGGING=True
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://xxxx/logs
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=logs_s3