Java: How to run Spark code in Airflow?
Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license, cite the original URL, and attribute it to the original authors (not me): StackOverflow.
Original source: http://stackoverflow.com/questions/39827804/
How to run Spark code in Airflow?
Asked by Ruslan Lomov
Hello people of the Earth!
I'm using Airflow to schedule and run Spark tasks.
All I have found so far are Python DAGs that Airflow can manage.
DAG example:
spark_count_lines.py
import logging
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime

args = {
    'owner': 'airflow',
    'start_date': datetime(2016, 4, 17),
    'provide_context': True,
}

dag = DAG(
    'spark_count_lines',
    start_date=datetime(2016, 4, 17),
    schedule_interval='@hourly',
    default_args=args,
)

def run_spark(**kwargs):
    import pyspark
    sc = pyspark.SparkContext()
    df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt')
    logging.info('Number of lines in people.txt = {0}'.format(df.count()))
    sc.stop()

t_main = PythonOperator(
    task_id='call_spark',
    dag=dag,
    python_callable=run_spark,
)
The problem is that I'm not good at Python and some of my tasks are written in Java. My question is: how do I run a Spark Java jar in a Python DAG? Or maybe there is some other way to do it? I found spark-submit: http://spark.apache.org/docs/latest/submitting-applications.html
But I don't know how to connect everything together. Maybe someone has used it before and has a working example. Thank you for your time!
Accepted answer by zero323
You should be able to use BashOperator. Keeping the rest of your code as is, import the required class and system packages:
from airflow.operators.bash_operator import BashOperator
import os
import sys
set required paths:
os.environ['SPARK_HOME'] = '/path/to/spark/root'
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))
and add the operator:
spark_task = BashOperator(
    task_id='spark_java',
    bash_command='spark-submit --class {{ params.class }} {{ params.jar }}',
    params={'class': 'MainClassName', 'jar': '/path/to/your.jar'},
    dag=dag
)
You can easily extend this to provide additional arguments using Jinja templates.
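For instance, a minimal sketch of such an extension, also passing the Spark master and the built-in Airflow template variable ds (the execution date) as an application argument; the class name, jar path, and master URL are placeholders:

spark_task = BashOperator(
    task_id='spark_java',
    bash_command='spark-submit --class {{ params.class }} --master {{ params.master }} {{ params.jar }} {{ ds }}',
    params={
        'class': 'MainClassName',    # placeholder: your Java main class
        'jar': '/path/to/your.jar',  # placeholder: your application jar
        'master': 'local[*]',        # placeholder: your Spark master
    },
    dag=dag
)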
You can of course adjust this for a non-Spark scenario by replacing bash_command with a template suitable for your case, for example:
bash_command = 'java -jar {{ params.jar }}'
and adjusting params.
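Putting that together, a minimal sketch of the plain-Java variant (the jar path is a placeholder):

java_task = BashOperator(
    task_id='plain_java',
    bash_command='java -jar {{ params.jar }}',
    params={'jar': '/path/to/your.jar'},  # placeholder: your runnable jar
    dag=dag
)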
Answer by Tagar
Airflow, as of version 1.8 (released today), has:
- SparkSqlOperator - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/spark_sql_operator.py
  (SparkSqlHook code - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py)
- SparkSubmitOperator - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/spark_submit_operator.py
  (SparkSubmitHook code - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py)
Notice that these two new Spark operators/hooks are in the "contrib" branch as of version 1.8, so they are not (well) documented.
So you can use SparkSubmitOperator to submit your Java code for Spark execution.
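A minimal sketch of what that could look like; the class name, jar path, and arguments are placeholders, it assumes a configured 'spark_default' connection and an existing dag object, and exact parameter availability may vary between Airflow versions:

from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

spark_java_submit = SparkSubmitOperator(
    task_id='spark_java_submit',
    conn_id='spark_default',            # assumes a configured Spark connection
    java_class='MainClassName',         # placeholder: your Java main class
    application='/path/to/your.jar',    # placeholder: your application jar
    application_args=['arg1', 'arg2'],  # optional arguments passed to main()
    dag=dag
)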
Answer by CTiPKA
Here is an example of SparkSubmitOperator usage for Spark 2.3.1 on Kubernetes (a minikube instance):
"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.models import Variable
from datetime import datetime, timedelta
default_args = {
'owner': '[email protected]',
'depends_on_past': False,
'start_date': datetime(2018, 7, 27),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
'end_date': datetime(2018, 7, 29),
}
dag = DAG(
'tutorial_spark_operator', default_args=default_args, schedule_interval=timedelta(1))
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
print_path_env_task = BashOperator(
task_id='print_path_env',
bash_command='echo $PATH',
dag=dag)
spark_submit_task = SparkSubmitOperator(
task_id='spark_submit_job',
conn_id='spark_default',
java_class='com.ibm.cdopoc.DataLoaderDB2COS',
application='local:///opt/spark/examples/jars/cppmpoc-dl-0.1.jar',
total_executor_cores='1',
executor_cores='1',
executor_memory='2g',
num_executors='2',
name='airflowspark-DataLoaderDB2COS',
verbose=True,
driver_memory='1g',
conf={
'spark.DB_URL': 'jdbc:db2://dashdb-dal13.services.dal.bluemix.net:50001/BLUDB:sslConnection=true;',
'spark.DB_USER': Variable.get("CEDP_DB2_WoC_User"),
'spark.DB_PASSWORD': Variable.get("CEDP_DB2_WoC_Password"),
'spark.DB_DRIVER': 'com.ibm.db2.jcc.DB2Driver',
'spark.DB_TABLE': 'MKT_ATBTN.MERGE_STREAM_2000_REST_API',
'spark.COS_API_KEY': Variable.get("COS_API_KEY"),
'spark.COS_SERVICE_ID': Variable.get("COS_SERVICE_ID"),
'spark.COS_ENDPOINT': 's3-api.us-geo.objectstorage.softlayer.net',
'spark.COS_BUCKET': 'data-ingestion-poc',
'spark.COS_OUTPUT_FILENAME': 'cedp-dummy-table-cos2',
'spark.kubernetes.container.image': 'ctipka/spark:spark-docker',
'spark.kubernetes.authenticate.driver.serviceAccountName': 'spark'
},
dag=dag,
)
t1.set_upstream(print_path_env_task)
spark_submit_task.set_upstream(t1)
The code uses credentials stored in Airflow Variables (see the sketch below for one way they can be seeded).
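A sketch of seeding those Variables from a one-off script; the values here are placeholders, and in practice you would set the real credentials through the Airflow UI or CLI instead:

from airflow.models import Variable

# Placeholder values only -- replace with real credentials outside version control.
Variable.set("CEDP_DB2_WoC_User", "db2-username")
Variable.set("CEDP_DB2_WoC_Password", "db2-password")
Variable.set("COS_API_KEY", "cos-api-key")
Variable.set("COS_SERVICE_ID", "cos-service-id")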
Also, you need to create a new Spark connection or edit the existing 'spark_default' connection, adding the extra dictionary {"queue": "root.default", "deploy-mode": "cluster", "spark-home": "", "spark-binary": "spark-submit", "namespace": "default"}, for example as sketched below.
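One way to do that programmatically is a sketch like the following; the Kubernetes master URL is a placeholder, and the same settings can be entered through the Airflow UI (Admin -> Connections) instead:

from airflow import settings
from airflow.models import Connection

# Create or update the 'spark_default' connection used by SparkSubmitOperator.
session = settings.Session()
conn = session.query(Connection).filter(Connection.conn_id == 'spark_default').first()
if conn is None:
    conn = Connection(conn_id='spark_default', conn_type='spark')
    session.add(conn)
conn.host = 'k8s://https://<kubernetes-api-host>:<port>'  # placeholder master URL
conn.extra = ('{"queue": "root.default", "deploy-mode": "cluster", '
              '"spark-home": "", "spark-binary": "spark-submit", "namespace": "default"}')
session.commit()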