Python 从 PySpark 连接到 S3 数据

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/32155617/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-19 11:07:00  来源:igfitidea点击:

Connect to S3 data from PySpark

pythonhadoopamazon-s3apache-sparkpyspark

提问by Elon Musk

I am trying to read a JSON file, from Amazon s3, to create a spark context and use it to process the data.

我正在尝试从 Amazon s3 读取 JSON 文件以创建 spark 上下文并使用它来处理数据。

Spark is basically in a docker container. So putting files in docker path is also PITA. Hence pushed it to S3.

Spark 基本上是在一个 docker 容器中。所以把文件放在 docker path 也是 PITA。因此将其推送到 S3。

The code below explains rest of the stuff.

下面的代码解释了其余的内容。

from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("first")
sc = SparkContext(conf=conf)

config_dict = {"fs.s3n.awsAccessKeyId":"**",
               "fs.s3n.awsSecretAccessKey":"**"}

bucket = "nonamecpp"
prefix = "dataset.json"
filename = "s3n://{}/{}".format(bucket, prefix)
rdd = sc.hadoopFile(filename,
                    'org.apache.hadoop.mapred.TextInputFormat',
                    'org.apache.hadoop.io.Text',
                    'org.apache.hadoop.io.LongWritable',
                    conf=config_dict)

I get the following error -

我收到以下错误 -

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-2-b94543fb0e8e> in <module>()
      9                     'org.apache.hadoop.io.Text',
     10                     'org.apache.hadoop.io.LongWritable',
---> 11                     conf=config_dict)
     12 

/usr/local/spark/python/pyspark/context.pyc in hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter, valueConverter, conf, batchSize)
    558         jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
    559                                               valueClass, keyConverter, valueConverter,
--> 560                                               jconf, batchSize)
    561         return RDD(jrdd, self)
    562 

/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.hadoopFile.
: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
    at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:73)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
    at org.apache.hadoop.fs.s3native.$Proxy20.initialize(Unknown Source)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:272)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
    at org.apache.hadoop.fs.FileSystem.access0(FileSystem.java:89)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:304)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:205)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:203)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:205)
    at org.apache.spark.rdd.RDD$$anonfun$partitions.apply(RDD.scala:203)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1060)
    at org.apache.spark.rdd.RDD.first(RDD.scala:1093)
    at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202)
    at org.apache.spark.api.python.PythonRDD$.hadoopFile(PythonRDD.scala:543)
    at org.apache.spark.api.python.PythonRDD.hadoopFile(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:744)

I have clearly provided aswSecretAccessKey and awsAccessId. Whats going wrong?

我已经明确提供了 aswSecretAccessKey 和 awsAccessId。怎么了?

采纳答案by Franzi

I've solved adding --packages org.apache.hadoop:hadoop-aws:2.7.1into spark-submit command.

我已经解决了添加--packages org.apache.hadoop:hadoop-aws:2.7.1到 spark-submit 命令的问题。

It will download all hadoop missing packages that will allow you to execute spark jobs with S3.

它将下载所有 hadoop 丢失的包,这些包将允许您使用 S3 执行 spark 作业。

Then in your job you need to set your AWS credentials like:

然后在您的工作中,您需要设置您的 AWS 凭证,例如:

sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", aws_id)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", aws_key)

Other option about setting your credentials is define them into spark/conf/spark-env:

关于设置凭据的其他选项是将它们定义到 spark/conf/spark-env 中:

#!/usr/bin/env bash
AWS_ACCESS_KEY_ID='xxxx'
AWS_SECRET_ACCESS_KEY='xxxx'

SPARK_WORKER_CORES=1 # to set the number of cores to use on this machine
SPARK_WORKER_MEMORY=1g # to set how much total memory workers have to give executors (e.g. 1000m, 2g)
SPARK_EXECUTOR_INSTANCES=10 #, to set the number of worker processes per node

More info:

更多信息:

回答by yardstick17

I would suggest going through this link.

我建议通过这个链接

In my case, I used Instance profile credentialsto access s3 data.

就我而言,我使用实例配置文件凭据来访问 s3 数据。

Instance profile credentials– used on EC2 instances, and delivered through the Amazon EC2 metadata service. The AWS SDK for Java uses the InstanceProfileCredentialsProvider to load these credentials.

Note

Instance profile credentials are used only if AWS_CONTAINER_CREDENTIALS_RELATIVE_URI is not set. See EC2ContainerCredentialsProviderWrapper for more information.

实例配置文件凭证 – 在 EC2 实例上使用,并通过 Amazon EC2 元数据服务交付。适用于 Java 的 AWS 开发工具包使用 InstanceProfileCredentialsProvider 加载这些凭证。

笔记

仅当未设置 AWS_CONTAINER_CREDENTIALS_RELATIVE_URI 时才使用实例配置文件凭证。有关更多信息,请参阅 EC2ContainerCredentialsProviderWrapper。

For pyspark, I use setting to access s3 content.

对于 pyspark,我使用设置来访问 s3 内容。

def get_spark_context(app_name):
    # configure
    conf = pyspark.SparkConf()

    # init & return
    sc = pyspark.SparkContext.getOrCreate(conf=conf)

    # s3a config
    sc._jsc.hadoopConfiguration().set('fs.s3a.endpoint',
                                      's3.eu-central-1.amazonaws.com')
    sc._jsc.hadoopConfiguration().set(
        'fs.s3a.aws.credentials.provider',
        'com.amazonaws.auth.InstanceProfileCredentialsProvider,'
        'com.amazonaws.auth.profile.ProfileCredentialsProvider'
    )

    return pyspark.SQLContext(sparkContext=sc)

More on spark context here.

更多关于火花上下文在这里

Please refer thisfor type S3 access.

请参考类型S3访问。