Loading data from MongoDB into Python in parallel with pandas
Note: the content below is taken from a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/44073393/
Parallelizing loading data from MongoDB into python
Asked by wl2776
All documents in my collection in MongoDB have the same fields. My goal is to load them into Python into a pandas.DataFrame or dask.DataFrame.
I'd like to speed up the loading procedure by parallelizing it. My plan is to spawn several processes or threads. Each process would load a chunk of the collection, and these chunks would then be merged together.
How do I do it correctly with MongoDB?
I have tried a similar approach with PostgreSQL. My initial idea was to use SKIP and LIMIT in SQL queries. It failed, since each cursor, opened for each particular query, started reading the data table from the beginning and just skipped the specified number of rows. So I had to create an additional column containing record numbers, and specify ranges of these numbers in queries.
In contrast, MongoDB assigns a unique ObjectId to each document. However, I've found that it is impossible to subtract one ObjectId from another; they can only be compared with ordering operations: less than, greater than, and equal.
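Even without arithmetic on ObjectIds, their ordering is enough to express half-open range filters. A small illustration, assuming collection is an existing pymongo Collection:

import pymongo

# boundary ids; here simply the smallest and largest _id in the collection,
# but in practice they would come from a partitioning step like the one below
first_id = collection.find_one(sort=[('_id', pymongo.ASCENDING)])['_id']
last_id = collection.find_one(sort=[('_id', pymongo.DESCENDING)])['_id']

# ObjectIds support <, > and ==, so a half-open range query works per chunk
chunk_cursor = collection.find({'_id': {'$gte': first_id, '$lt': last_id}})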
Also, pymongo returns a cursor object that supports indexing and has some methods that seem useful for my task, such as count and limit.
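For completeness, those cursor operations look roughly like this (note that Cursor.count() is deprecated in newer PyMongo releases in favor of Collection.count_documents):

cursor = collection.find({})
total = cursor.count()                           # number of matching documents
page = collection.find({}).skip(100).limit(50)   # a 50-document "page"
one_doc = collection.find({})[100]               # cursors also support indexing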
The MongoDB connector for Spark accomplishes this task somehow. Unfortunately, I'm not familiar with Scala, so it's hard for me to find out how they do it.
So, what is the correct way to load data from Mongo into Python in parallel?
Up to now, I've come up with the following solution:
import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed
# import other modules.

collection = get_mongo_collection()
cursor = collection.find({ })

def process_document(in_doc):
    out_doc = in_doc  # process doc keys and values here
    return pd.DataFrame(out_doc)

df = dd.from_delayed( (delayed(process_document)(d) for d in cursor) )
However, it looks like dask.dataframe.from_delayed internally creates a list from the passed generator, effectively loading the whole collection in a single thread.
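One way to make the limitation visible is to group documents into batches before wrapping them in delayed: the per-batch processing then runs in parallel, but the cursor is still drained by this single generator, so the reads themselves are not parallelized. A sketch, reusing the imports and cursor from the snippet above and assuming the raw documents can be fed to pd.DataFrame directly:

from itertools import islice

def batched(cursor, size):
    # yield lists of up to `size` raw documents pulled from the single cursor
    while True:
        batch = list(islice(cursor, size))
        if not batch:
            return
        yield batch

def process_batch(docs):
    # any per-document cleaning would go here
    return pd.DataFrame(docs)

# from_delayed still consumes this generator eagerly, i.e. all documents
# are fetched through the one cursor in the calling thread
df = dd.from_delayed([delayed(process_batch)(b) for b in batched(cursor, 10000)])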
Update. I've found in the docs that the skip method of pymongo.Cursor also starts from the beginning of the collection, as in PostgreSQL. The same page suggests implementing pagination logic in the application. The solutions I've found so far use a sorted _id for this. However, they also store the last seen _id, which implies that they also work in a single thread.
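For reference, that single-threaded pagination pattern over a sorted _id looks roughly like this:

import pymongo

def paginate(collection, query, page_size=1000):
    # walk the collection in _id order, remembering the last seen _id
    last_id = None
    while True:
        q = dict(query)
        if last_id is not None:
            q['_id'] = {'$gt': last_id}
        docs = list(collection.find(q).sort('_id', pymongo.ASCENDING).limit(page_size))
        if not docs:
            break
        yield docs
        last_id = docs[-1]['_id']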
Update2. I've found the code of the partitioner in the official MongoDB Spark connector: https://github.com/mongodb/mongo-spark/blob/7c76ed1821f70ef2259f8822d812b9c53b6f2b98/src/main/scala/com/mongodb/spark/rdd/partitioner/MongoPaginationPartitioner.scala#L32
It looks like this partitioner initially reads the key field from all documents in the collection and calculates ranges of values.
Update3: My incomplete solution.
It doesn't work; it gets an exception from pymongo, because dask seems to treat the Collection object incorrectly:
/home/user/.conda/envs/MBA/lib/python2.7/site-packages/dask/delayed.pyc in <genexpr>(***failed resolving arguments***)
81 return expr, {}
82 if isinstance(expr, (Iterator, list, tuple, set)):
---> 83 args, dasks = unzip((to_task_dask(e) for e in expr), 2)
84 args = list(args)
85 dsk = sharedict.merge(*dasks)
/home/user/.conda/envs/MBA/lib/python2.7/site-packages/pymongo/collection.pyc in __next__(self)
2342
2343 def __next__(self):
-> 2344 raise TypeError("'Collection' object is not iterable")
2345
2346 next = __next__
TypeError: 'Collection' object is not iterable
What raises the exception:
def process_document(in_doc, other_arg):
    # custom processing of incoming records
    return out_doc

def compute_id_ranges(collection, query={}, partition_size=50):
    cur = collection.find(query, {'_id': 1}).sort('_id', pymongo.ASCENDING)
    id_ranges = [cur[0]['_id']]
    count = 1
    for r in cur:
        count += 1
        if count > partition_size:
            id_ranges.append(r['_id'])
            count = 0
    id_ranges.append(r['_id'])
    return zip(id_ranges[:len(id_ranges)-1], id_ranges[1:])

def load_chunk(id_pair, collection, query={}, projection=None):
    q = query
    q.update({"_id": {"$gte": id_pair[0], "$lt": id_pair[1]}})
    cur = collection.find(q, projection)
    return pd.DataFrame([process_document(d, other_arg) for d in cur])

def parallel_load(*args, **kwargs):
    collection = kwargs['collection']
    query = kwargs.get('query', {})
    projection = kwargs.get('projection', None)
    id_ranges = compute_id_ranges(collection, query)
    dfs = [delayed(load_chunk)(ir, collection, query, projection) for ir in id_ranges]
    df = dd.from_delayed(dfs)
    return df

collection = connect_to_mongo_and_return_collection_object(credentials)
# df = parallel_load(collection=collection)
id_ranges = compute_id_ranges(collection)
dedf = delayed(load_chunk)(id_ranges[0], collection)
load_chunk runs perfectly when called directly. However, calling delayed(load_chunk)( blah-blah-blah ) fails with the exception mentioned above.
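A possible workaround (a sketch, not tested against the exact versions above): do not hand the Collection object to dask at all. Pass plain connection parameters into the delayed function and open the client inside each task, so the task graph only contains strings and ObjectIds. The hypothetical load_chunk_by_params below would replace load_chunk, reusing id_ranges and the dask imports from above; the database and collection names are placeholders:

from pymongo import MongoClient

def load_chunk_by_params(id_pair, mongo_uri, db_name, coll_name, query=None, projection=None):
    # each task builds its own client; a pymongo Collection looks like an iterator
    # to dask and raises the TypeError above when dask walks the delayed arguments
    coll = MongoClient(mongo_uri)[db_name][coll_name]
    q = dict(query or {})
    q['_id'] = {'$gte': id_pair[0], '$lt': id_pair[1]}
    docs = list(coll.find(q, projection))
    return pd.DataFrame(docs)  # per-document processing would go here

dfs = [delayed(load_chunk_by_params)(ir, 'mongodb://localhost:27017', '<db_name>', '<collection_name>')
       for ir in id_ranges]
df = dd.from_delayed(dfs)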
Accepted answer by Ali Abul Hawa
I was looking into pymongo parallelization and this is what worked for me. It took my humble gaming laptop nearly 100 minutes to process my MongoDB collection of 40 million documents. The CPU was 100% utilised and I had to turn on the AC :)
I used the skip and limit functions to split the collection into batches, then assigned each batch to a process. The code is written for Python 3:
import multiprocessing
from pymongo import MongoClient

def your_function(something):
    <...>
    return result

def process_cursor(skip_n, limit_n):
    print('Starting process', skip_n//limit_n, '...')
    collection = MongoClient().<db_name>.<collection_name>
    cursor = collection.find({}).skip(skip_n).limit(limit_n)
    for doc in cursor:
        <do your magic>
        # for example:
        result = your_function(doc['your_field'])  # do some processing on each document
        # update that document by adding the result into a new field
        collection.update_one({'_id': doc['_id']}, {'$set': {'<new_field_eg>': result}})
    print('Completed process', skip_n//limit_n, '...')

if __name__ == '__main__':
    n_cores = 7                  # number of splits (logical cores of the CPU - 1)
    collection_size = 40126904   # your collection size
    batch_size = round(collection_size/n_cores+0.5)
    skips = range(0, n_cores*batch_size, batch_size)

    processes = [multiprocessing.Process(target=process_cursor, args=(skip_n, batch_size)) for skip_n in skips]

    for process in processes:
        process.start()

    for process in processes:
        process.join()
The last split will have a larger limit than the number of remaining documents, but that won't raise an error.
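If the goal is to build one DataFrame rather than update documents in place, as in the original question, the same skip/limit split can be fed to a multiprocessing.Pool and the partial frames concatenated afterwards. A sketch, with the database and collection names as placeholders:

import multiprocessing
import pandas as pd
from pymongo import MongoClient

def load_batch(args):
    skip_n, limit_n = args
    # each worker opens its own client; MongoClient objects should not be shared across forks
    collection = MongoClient()['<db_name>']['<collection_name>']
    return pd.DataFrame(list(collection.find({}).skip(skip_n).limit(limit_n)))

if __name__ == '__main__':
    n_cores = 7
    collection_size = 40126904
    batch_size = round(collection_size / n_cores + 0.5)
    with multiprocessing.Pool(n_cores) as pool:
        frames = pool.map(load_batch, [(s, batch_size) for s in range(0, n_cores * batch_size, batch_size)])
    df = pd.concat(frames, ignore_index=True)

Note that each partial frame is pickled back to the parent process, so for very large collections this transfer can dominate the run time.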
Answer by wl2776
"Read the mans, thery're rulez" :)
pymongo.Collection has a method parallel_scan that returns a list of cursors.
UPDATE. This function can do the job if the collection does not change too often and the queries are always the same (my case). One could just store query results in different collections and run parallel scans.
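A sketch of that approach. Note that parallel_scan exists in PyMongo 3.x and relies on the server-side parallelCollectionScan command, which was removed in MongoDB 4.2 (and the method itself was removed in PyMongo 4), so this only applies to older deployments:

from concurrent.futures import ThreadPoolExecutor
import pandas as pd

def drain(cursor):
    # each cursor returned by parallel_scan covers a disjoint part of the collection
    return pd.DataFrame(list(cursor))

cursors = collection.parallel_scan(4)   # ask the server for up to 4 cursors
with ThreadPoolExecutor(max_workers=len(cursors)) as pool:
    frames = list(pool.map(drain, cursors))
df = pd.concat(frames, ignore_index=True)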