How to read partitioned parquet files from S3 using pyarrow in Python

Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must likewise follow the CC BY-SA license and attribute it to the original authors (not me): StackOverflow. Original: http://stackoverflow.com/questions/45082832/



Tags: python, parquet, fastparquet, arrow-python, python-s3fs

Asked by stormfield

I am looking for ways to read data from multiple partitioned directories in S3 using Python.


data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet
data_folder/serial_number=2/cur_date=27-12-2012/asdsdfsd0324324.snappy.parquet


pyarrow's ParquetDataset module has the capability to read from partitions. So I have tried the following code:


>>> import pandas as pd
>>> import pyarrow.parquet as pq
>>> import s3fs
>>> a = "s3://my_bucker/path/to/data_folder/"
>>> dataset = pq.ParquetDataset(a)

It threw the following error:


Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
    self.metadata_path) = _make_manifest(path_or_paths, self.fs)
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 601, in _make_manifest
    .format(path))
OSError: Passed non-file path: s3://my_bucker/path/to/data_folder/

Based on the pyarrow documentation, I tried using s3fs as the file system, i.e.:


>>> dataset = pq.ParquetDataset(a,filesystem=s3fs)

Which throws the following error:


Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
    self.metadata_path) = _make_manifest(path_or_paths, self.fs)
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 583, in _make_manifest
    if is_string(path_or_paths) and fs.isdir(path_or_paths):
AttributeError: module 's3fs' has no attribute 'isdir'

I am limited to using an ECS cluster, hence Spark/PySpark is not an option.


Is there a way to easily read the parquet files, in Python, from such partitioned directories in S3? I feel that listing all the directories and then reading them is not good practice, as suggested in this link. I would need to convert the read data to a pandas DataFrame for further processing and hence prefer options related to fastparquet or pyarrow. I am open to other options in Python as well.


Accepted answer by stormfield

I managed to get this working with the latest release of fastparquet & s3fs. Below is the code for the same:


import s3fs
import fastparquet as fp
s3 = s3fs.S3FileSystem()
fs = s3fs.core.S3FileSystem()

#mybucket/data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet 
s3_path = "mybucket/data_folder/*/*/*.parquet"
all_paths_from_s3 = fs.glob(path=s3_path)

myopen = s3.open
#use s3fs as the filesystem
fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen)
#convert to pandas dataframe
df = fp_obj.to_pandas()

Credit to Martin for pointing me in the right direction via our conversation.


NB: This would be slower than using pyarrow, based on the benchmark below. I will update my answer once s3fs support is implemented in pyarrow via ARROW-1213.


I did a quick benchmark on individual iterations with pyarrow and on a list of files sent as a glob to fastparquet. fastparquet with s3fs is faster than pyarrow + my hackish code. But I reckon pyarrow + s3fs will be faster once implemented.


The code & benchmarks are below:


>>> def test_pq():
...     for current_file in list_parquet_files:
...         f = fs.open(current_file)
...         df = pq.read_table(f).to_pandas()
...         # following code is to extract the serial_number & cur_date values so that we can add them to the dataframe
...         #probably not the best way to split :)
...         elements_list=current_file.split('/')
...         for item in elements_list:
...             if item.find(date_partition) != -1:
...                 current_date = item.split('=')[1]
...             elif item.find(dma_partition) != -1:
...                 current_dma = item.split('=')[1]
...         df['serial_number'] = current_dma
...         df['cur_date'] = current_date
...         list_.append(df)
...     frame = pd.concat(list_)
...
>>> timeit.timeit('test_pq()',number =10,globals=globals())
12.078817503992468

>>> def test_fp():
...     fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen)
...     df = fp_obj.to_pandas()

>>> timeit.timeit('test_fp()',number =10,globals=globals())
2.961556333000317

Update 2019


After all the PRs, issues such as Arrow-2038 & fastparquet PR#182 have been resolved.


Read parquet files using Pyarrow


# pip install pyarrow
# pip install s3fs

>>> import s3fs
>>> import pyarrow.parquet as pq
>>> fs = s3fs.S3FileSystem()

>>> bucket = 'your-bucket-name'
>>> path = 'directory_name' # if it's a directory, omit the trailing /
>>> bucket_uri = f's3://{bucket}/{path}'
>>> bucket_uri
's3://your-bucket-name/directory_name'

>>> dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
>>> table = dataset.read()
>>> df = table.to_pandas() 

Read parquet files using Fast parquet


# pip install s3fs
# pip install fastparquet

>>> import s3fs
>>> import fastparquet as fp
>>> fs = s3fs.S3FileSystem()
>>> myopen = fs.open

>>> bucket = 'your-bucket-name'
>>> path = 'directory_name'
>>> root_dir_path = f'{bucket}/{path}'
# the first two wildcards represent the 1st and 2nd partition columns of your data, & so forth
>>> s3_path = f"{root_dir_path}/*/*/*.parquet"
>>> all_paths_from_s3 = fs.glob(path=s3_path)

>>> fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen, root=root_dir_path)
>>> df = fp_obj.to_pandas()

Quick benchmarks


This is probably not the best way to benchmark it. Please read the blog post for a thorough benchmark.


#pyarrow
>>> import timeit
>>> def test_pq():
...     dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
...     table = dataset.read()
...     df = table.to_pandas()
...
>>> timeit.timeit('test_pq()',number =10,globals=globals())
1.2677053569998407

#fastparquet
>>> def test_fp():
...     fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen, root=root_dir_path)
...     df = fp_obj.to_pandas()

>>> timeit.timeit('test_fp()',number =10,globals=globals())
2.931876824000028

Further reading regarding Pyarrow's speed



Answered by Wes McKinney

Let's discuss in https://issues.apache.org/jira/browse/ARROW-1213 and https://issues.apache.org/jira/browse/ARROW-1119. We must add some code to allow pyarrow to recognize the s3fs filesystem and add a shim / compatibility class to conform S3FS's slightly different filesystem API to pyarrow's.
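
Purely as an illustration (not part of the original answer, and not pyarrow's actual implementation, which later shipped as S3FSWrapper), here is a minimal sketch of the kind of shim being described: a class that wraps an s3fs.S3FileSystem instance and exposes the methods a consumer such as pyarrow might call, normalizing "s3://" URIs along the way. The class and method names are assumptions for illustration only.

import s3fs

class S3FSCompatShim:
    """Hypothetical adapter: expose isdir/isfile/open on top of s3fs."""

    def __init__(self, fs=None):
        self.fs = fs or s3fs.S3FileSystem()

    @staticmethod
    def _strip_scheme(path):
        # s3fs works with 'bucket/key' paths rather than full 's3://bucket/key' URIs
        return path[len("s3://"):] if path.startswith("s3://") else path

    def isdir(self, path):
        return self.fs.isdir(self._strip_scheme(path))

    def isfile(self, path):
        return self.fs.isfile(self._strip_scheme(path))

    def open(self, path, mode="rb"):
        return self.fs.open(self._strip_scheme(path), mode)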


Answered by efbbrown

This issue was resolved in this pull request in 2017.


For those who want to read parquet from S3 using only pyarrow, here is an example:


import s3fs
import pyarrow.parquet as pq
from pyarrow.filesystem import S3FSWrapper

fs = s3fs.S3FileSystem()
bucket = "your-bucket"
path = "your-path"

# Python 3.6 or later
p_dataset = pq.ParquetDataset(
    f"s3://{bucket}/{path}",
    filesystem=fs
)
df = p_dataset.read().to_pandas()

# Pre-python 3.6
p_dataset = pq.ParquetDataset(
    "s3://{0}/{1}".format(bucket, path),
    filesystem=fs
)
df = p_dataset.read().to_pandas()
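
As a side note (not part of the original answer), ParquetDataset also accepts a filters argument in recent pyarrow versions, which lets you read only matching partition directories. A sketch, assuming the partition layout and an example bucket path similar to the question's; adjust the value type (string vs int) to how your partition keys are read:

import s3fs
import pyarrow.parquet as pq

fs = s3fs.S3FileSystem()

# Only read partitions where serial_number == "1"
dataset = pq.ParquetDataset(
    "s3://my_bucket/path/to/data_folder",
    filesystem=fs,
    filters=[("serial_number", "=", "1")],
)
df = dataset.read().to_pandas()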

Answered by Statmonger

For those of you who want to read in only parts of a partitioned parquet file, pyarrow accepts a list of keys as well as just the partial directory path to read in all parts of the partition. This method is especially useful for organizations who have partitioned their parquet datasets in a meaningful way, for example by year or country, allowing users to specify which parts of the file they need. This will reduce costs in the long run, as AWS charges per byte when reading in datasets.


# Read in user specified partitions of a partitioned parquet file 

import s3fs
import pyarrow.parquet as pq
s3 = s3fs.S3FileSystem()

keys = ['keyname/blah_blah/part-00000-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet'\
         ,'keyname/blah_blah/part-00001-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet'\
         ,'keyname/blah_blah/part-00002-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet'\
         ,'keyname/blah_blah/part-00003-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet']

bucket = 'bucket_yada_yada_yada'

# Add s3 prefix and bucket name to all keys in list
parq_list=[]
for key in keys:
    parq_list.append('s3://'+bucket+'/'+key)

# Create your dataframe
df = pq.ParquetDataset(parq_list, filesystem=s3).read_pandas(columns=['Var1','Var2','Var3']).to_pandas()

Answered by Vincent Claes

For Python 3.6+, AWS has a library called aws-data-wrangler that helps with the integration between Pandas/S3/Parquet.


To install, do:


pip install awswrangler

To read partitioned parquet from S3 using awswrangler 1.x.x and above, do:


import awswrangler as wr
df = wr.s3.read_parquet(path="s3://my_bucket/path/to/data_folder/", dataset=True)

By setting dataset=True, awswrangler expects partitioned parquet files. It will read all the individual parquet files from your different partitions below the S3 key you specify in the path.

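As a follow-up note (not part of the original answer), recent awswrangler versions also accept a partition_filter callable on read_parquet, so only matching partitions are downloaded. A sketch, assuming the partition names and an example bucket path similar to the question's:

import awswrangler as wr

# Only read partitions whose cur_date equals "20-12-2012".
# partition_filter receives a dict of partition values as strings.
df = wr.s3.read_parquet(
    path="s3://my_bucket/path/to/data_folder/",
    dataset=True,
    partition_filter=lambda x: x["cur_date"] == "20-12-2012",
)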