Python: running a pyspark script on EMR

Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/23302184/

Date: 2020-08-19 02:42:13  Source: igfitidea

running pyspark script on EMR

python apache-spark

Asked by Matt

I currently automate my Apache Spark PySpark scripts on clusters of EC2 instances using Spark's preconfigured ./ec2 directory. For automation and scheduling purposes, I would like to use the Boto EMR module to send scripts up to the cluster.


I was able to bootstrap and install Spark on an EMR cluster. I am also able to launch a script on EMR by using my local machine's version of pyspark and setting the master like so:


$: MASTER=spark://<insert EMR master node of cluster here> ./bin/pyspark <myscriptname.py>

However, this requires me to run that script locally, and thus I am not able to fully leverage Boto's ability to 1) start the cluster, 2) add the script steps, and 3) stop the cluster. I've found examples using script-runner.sh and EMR "step" commands for spark-shell (Scala), but I assume there is an easier way to do this with the Python module (pyspark). Thanks so much in advance!

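For reference, a minimal sketch of that three-part flow, assuming the newer boto3 EMR client rather than the legacy Boto EMR module, and a release-label cluster (emr-4.x or later) where Spark is installed as an application and command-runner.jar is available; the region, names, instance settings, and S3 paths are placeholders:

import boto3

emr = boto3.client("emr", region_name="us-east-1")

# 1) start the cluster with Spark installed
cluster = emr.run_job_flow(
    Name="pyspark-cluster",
    ReleaseLabel="emr-4.7.2",
    Applications=[{"Name": "Spark"}],
    Instances={
        "MasterInstanceType": "m3.xlarge",
        "SlaveInstanceType": "m3.xlarge",
        "InstanceCount": 3,
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
)
cluster_id = cluster["JobFlowId"]

# 2) add the pyspark script as a step
emr.add_job_flow_steps(
    JobFlowId=cluster_id,
    Steps=[{
        "Name": "MyPysparkStep",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["spark-submit", "--deploy-mode", "cluster",
                     "s3://my-bucket/myscriptname.py"],
        },
    }],
)

# 3) stop the cluster once the job is done
# (in practice, poll emr.describe_step() until the step completes before terminating)
emr.terminate_job_flows(JobFlowIds=[cluster_id])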

Answered by hamed

This might be helpful, though it does not use boto.


Use the AWS CLI to create the cluster and add a step (the Spark job) to it.


1) Create the cluster:


aws emr create-cluster --name "Spark cluster" --ami-version 3.8 --applications Name=Spark --ec2-attributes KeyName=ir --log-uri s3://Path/logs --instance-type m3.xlarge  --instance-count 1 --use-default-roles 

2) Add a step (the Spark job). Note that your Python script should be stored on the master node (in this case it is in /home/hadoop/spark).


aws emr add-steps --cluster-id j-xxxxxxx --steps Name=Spark,Jar=s3://eu-west-1.elasticmapreduce/libs/script-runner/script-runner.jar,Args=[/home/hadoop/spark/bin/spark-submit,--deploy-mode,client,/home/hadoop/spark/myscript.py],ActionOnFailure=CONTINUE

You can also combine the two steps into one: create the cluster, run the job, and terminate the cluster.

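With the AWS CLI that means passing --steps and --auto-terminate to create-cluster; here is the same idea sketched with boto3 instead (bucket, key name, and script path are placeholders), where KeepJobFlowAliveWhenNoSteps=False takes the place of --auto-terminate:

import boto3

emr = boto3.client("emr", region_name="eu-west-1")

emr.run_job_flow(
    Name="Spark cluster",
    ReleaseLabel="emr-4.7.2",
    Applications=[{"Name": "Spark"}],
    LogUri="s3://my-bucket/logs",
    Instances={
        "MasterInstanceType": "m3.xlarge",
        "InstanceCount": 1,
        "Ec2KeyName": "my-key",
        # shut the cluster down automatically once all steps have finished
        "KeepJobFlowAliveWhenNoSteps": False,
    },
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
    Steps=[{
        "Name": "Spark job",
        "ActionOnFailure": "TERMINATE_CLUSTER",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["spark-submit", "--deploy-mode", "cluster",
                     "s3://my-bucket/myscript.py"],
        },
    }],
)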

A few notes: 1) I have tried multiple ways to read the script from S3, but no luck :(


so I ended up copying it to the node using either boto or the AWS CLI. 2) Since I was testing on a single node in EMR, the deploy mode in the step is client; for a multi-node cluster you should change that to cluster.

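As for copying the script onto the master node (note 1), one option is to submit the copy itself as an EMR step. A hedged boto3 sketch of that, with placeholder bucket, paths, and cluster id, and assuming a release-label cluster (emr-4.x or later) where command-runner.jar and the AWS CLI are available on the node, rather than the AMI 3.8 cluster used above:

import boto3

emr = boto3.client("emr", region_name="eu-west-1")

# Steps run on the master node, so this aws s3 cp lands the script exactly
# where the later spark-submit step expects to find it.
emr.add_job_flow_steps(
    JobFlowId="j-xxxxxxx",
    Steps=[{
        "Name": "Copy script to master node",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["aws", "s3", "cp",
                     "s3://my-bucket/myscript.py",
                     "/home/hadoop/spark/myscript.py"],
        },
    }],
)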

Answered by user1462351

You need to change the deploy-mode to cluster (instead of client) to access the script from S3.


Answered by Dmitry Deryabin

Here is a great example of how it needs to be configured. Browse to "A quick example" for the Python code.


However, in order to make things work in emr-4.7.2, a few tweaks had to be made, so here is an AWS CLI command that worked for me:


aws emr add-steps --cluster-id <Your EMR cluster id> --steps Type=spark,Name=TestJob,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=true,s3a://your-source-bucket/code/pythonjob.py,s3a://your-source-bucket/data/data.csv,s3a://your-destination-bucket/test-output/],ActionOnFailure=CONTINUE

And here are the contents of the pythonjob.py file:


from __future__ import print_function
import sys

from pyspark import SparkContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: testjob <input path> <output path>", file=sys.stderr)
        sys.exit(-1)
    sc = SparkContext(appName="MyTestJob")
    # read the raw CSV lines from the input path (e.g. an s3a:// location)
    dataTextAll = sc.textFile(sys.argv[1])
    # split each row, keep (string key, float value) pairs, and sum the values per key
    dataRDD = dataTextAll.map(lambda x: x.split(",")) \
                         .map(lambda y: (str(y[0]), float(y[1]))) \
                         .reduceByKey(lambda a, b: a + b)
    # write the aggregated result to the output path
    dataRDD.saveAsTextFile(sys.argv[2])
    sc.stop()

It reads the data.csv file from S3, splits every row, converts the first value to a string and the second to a float, groups by the first value and sums the values in the second column, and writes the result back to S3.


A few comments:


  • I've decided to leave spark.yarn.submit.waitAppCompletion=true so that I can monitor job execution in the console.
  • Input and output paths (sys.argv[1] and sys.argv[2] respectively) are passed to the script as part of the job submission (the Args section of the add-steps command).
  • Be aware that you must use the s3a:// URI scheme instead of s3n:// or s3:// for Hadoop 2.7+ when configuring your job.
  • If your cluster is in a VPC, you need to create a VPC Endpoint for Amazon S3 if you intend to read/write from there in your EMR jobs (a sketch follows this list).
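
For the VPC note above, a minimal boto3 sketch of creating the S3 gateway endpoint; the VPC id, route table id, and region are placeholders:

import boto3

ec2 = boto3.client("ec2", region_name="eu-west-1")

# A gateway endpoint for S3 lets EMR nodes in private subnets reach S3 without
# a NAT gateway; attach it to the route table(s) used by the cluster's subnet.
ec2.create_vpc_endpoint(
    VpcId="vpc-xxxxxxxx",
    ServiceName="com.amazonaws.eu-west-1.s3",
    RouteTableIds=["rtb-xxxxxxxx"],
)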