Index a pandas dataframe into Elasticsearch without elasticsearch-py

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me), citing the original source: http://stackoverflow.com/questions/41440791/

python, pandas, elasticsearch

Asked by Dirk

I would like to index a bunch of large pandas dataframes (some million rows and 50 columns) into Elasticsearch.

When looking for examples on how to do this, most people will use elasticsearch-py's bulk helper method, passing it an instance of the Elasticsearch class which handles the connection as well as a list of dictionaries which is created with pandas' dataframe.to_dict(orient='records') method. Metadata can be inserted into the dataframe beforehand as new columns, e.g. df['_index'] = 'my_index' etc.
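
For reference, the conventional route looks roughly like the sketch below; the host, index name, and client version are assumptions here, and the point of this question is to avoid this dependency altogether.

from elasticsearch import Elasticsearch, helpers
import pandas as pd

df = pd.DataFrame.from_records([{'a': 1, 'b': 2}, {'a': 3, 'b': 4}])
df['_index'] = 'my_index'  # metadata added beforehand as an ordinary column

es = Elasticsearch('http://localhost:9200')  # assumed local cluster
# the bulk helper treats keys starting with '_' as action metadata, the rest as the document body
helpers.bulk(es, df.to_dict(orient='records'))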

However, I have reasons not to use the elasticsearch-py library and would like to talk to the Elasticsearch bulk API directly, e.g. via requests or another convenient HTTP library. Besides, df.to_dict() is very slow on large dataframes, unfortunately, and converting a dataframe to a list of dicts which is then serialized to JSON by elasticsearch-py sounds like unnecessary overhead when there is something like dataframe.to_json() which is pretty fast even on large dataframes.

What would be an easy and quick approach to getting a pandas dataframe into the format required by the bulk API? I think a step in the right direction is using dataframe.to_json() as follows:

>>> import pandas as pd
>>> df = pd.DataFrame.from_records([{'a': 1, 'b': 2}, {'a': 3, 'b': 4}, {'a': 5, 'b': 6}])
>>> df
   a  b
0  1  2
1  3  4
2  5  6
>>> df.to_json(orient='records', lines=True)
'{"a":1,"b":2}\n{"a":3,"b":4}\n{"a":5,"b":6}'

This is now a newline-separated JSON string; however, it is still lacking the metadata. What would be a performant way to get it in there?

Edit: For completeness, a metadata JSON document would look like this:

{"index": {"_index": "my_index", "_type": "my_type"}}

Hence, in the end the whole JSON expected by the bulk API would look like this (with an additional linebreak after the last line):

{"index": {"_index": "my_index", "_type": "my_type"}}
{"a":1,"b":2}
{"index": {"_index": "my_index", "_type": "my_type"}}
{"a":3,"b":4}
{"index": {"_index": "my_index", "_type": "my_type"}}
{"a":5,"b":6}

Accepted answer by Dirk

Meanwhile I have found multiple ways to do this with at least reasonable speed:

import json
import pandas as pd
import requests

# df is a dataframe or dataframe chunk coming from your reading logic
df['_id'] = df['column_1'] + '_' + df['column_2'] # or whatever makes your _id
df_as_json = df.to_json(orient='records', lines=True)

final_json_string = ''
for json_document in df_as_json.split('\n'):
    jdict = json.loads(json_document)
    metadata = json.dumps({'index': {'_id': jdict['_id']}})
    jdict.pop('_id')
    final_json_string += metadata + '\n' + json.dumps(jdict) + '\n'

headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
r = requests.post('http://elasticsearch.host:9200/my_index/my_type/_bulk', data=final_json_string, headers=headers, timeout=60) 

Instead of using pandas' to_json() method, one could also use to_dict() as follows. This was slightly slower in my tests, but not by much:

dicts = df.to_dict(orient='records')
final_json_string = ''
for document in dicts:
    metadata = {"index": {"_id": document["_id"]}}
    document.pop('_id')
    final_json_string += json.dumps(metadata) + '\n' + json.dumps(document) + '\n'

When running this on large datasets, one can save a couple of minutes by replacing Python's default json library with ujson or rapidjson: install the package, then import ujson as json or import rapidjson as json, respectively.
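
A minimal sketch of that import swap (assuming at least one of the optional packages is installed; both expose dumps/loads calls compatible with the usage above):

try:
    import rapidjson as json  # from the python-rapidjson package
except ImportError:
    try:
        import ujson as json
    except ImportError:
        import json  # fall back to the standard library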

An even bigger speedup can be achieved by replacing the sequential execution of the steps with a parallel one, so that reading and converting do not stop while requests is waiting for Elasticsearch to process all documents and return a response. This could be done via Threading, Multiprocessing, Asyncio, Task Queues, ... but this is out of the scope of this question.
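
As a rough illustration of the threading variant only (the endpoint, worker count, and the way the bulk bodies are produced are placeholders):

from concurrent.futures import ThreadPoolExecutor
import requests

def post_bulk(body):
    # one bulk request per pre-built newline-delimited JSON body
    return requests.post('http://elasticsearch.host:9200/my_index/my_type/_bulk',
                         data=body,
                         headers={'Content-type': 'application/json', 'Accept': 'text/plain'},
                         timeout=60)

bodies = []  # fill with bulk bodies built as above, e.g. one per dataframe chunk
with ThreadPoolExecutor(max_workers=4) as pool:
    responses = list(pool.map(post_bulk, bodies))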

If you happen to find an approach that makes the to-JSON conversion even faster, let me know.

Answered by Ali Mirzaei

This function inserts a pandas dataframe into Elasticsearch (chunk by chunk):

import json
import requests

def insertDataframeIntoElastic(dataFrame, index='index', typ='test', server='http://localhost:9200',
                               chunk_size=2000):
    headers = {'content-type': 'application/x-ndjson', 'Accept-Charset': 'UTF-8'}
    records = dataFrame.to_dict(orient='records')
    # one action per record: a metadata line followed by the document itself
    actions = ['{ "index" : { "_index" : "%s", "_type" : "%s"} }\n' % (index, typ) + json.dumps(records[j])
               for j in range(len(records))]
    i = 0
    while i < len(actions):
        serverAPI = server + '/_bulk'
        # send at most chunk_size actions per bulk request; the body must end with a newline
        data = '\n'.join(actions[i:min([i + chunk_size, len(actions)])])
        data = data + '\n'
        r = requests.post(serverAPI, data=data, headers=headers)
        print(r.content)
        i = i + chunk_size
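
A hypothetical call, assuming a local Elasticsearch and a dataframe df already loaded:

insertDataframeIntoElastic(df, index='my_index', typ='my_type', chunk_size=5000)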