Original URL: http://stackoverflow.com/questions/20288770/
Warning: these are provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): Stack Overflow
How to use Bulk API to store the keywords in ES by using Python
Asked by chengji18
I have to store some messages in Elasticsearch, integrated with my Python program. Right now, the way I store a message is:
d={"message":"this is message"}
for index_nr in range(1,5):
ElasticSearchAPI.addToIndex(index_nr, d)
print d
That means if I have 10 messages, I have to repeat my code 10 times. So what I want to do is make a script or batch file instead. I've checked the Elasticsearch Guide, and the Bulk API can be used. The format should be something like below:
{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "type1", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "type1", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} }
{ "doc" : {"field2" : "value2"} }
What I did is:
{"index":{"_index":"test1","_type":"message","_id":"1"}}
{"message":"it is red"}
{"index":{"_index":"test2","_type":"message","_id":"2"}}
{"message":"it is green"}
I also use the curl tool to store the doc:
$ curl -s -XPOST localhost:9200/_bulk --data-binary @message.json
Now I want to use my Python code to store the file to Elasticsearch.
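(For reference: a minimal sketch of sending an already-prepared bulk file like message.json above through the low-level Python client, assuming the elasticsearch package is installed and ES is reachable on localhost:9200; the answers below show the helper-based approaches.)

from elasticsearch import Elasticsearch

es = Elasticsearch()

# Read the newline-delimited bulk body prepared above and send it in one request
with open("message.json") as f:
    response = es.bulk(body=f.read())

print(response["errors"])  # False if every action succeeded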
Answered by Justina Chen
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch import helpers

es = Elasticsearch()

actions = [
    {
        "_index": "tickets-index",
        "_type": "tickets",
        "_id": j,
        "_source": {
            "any": "data" + str(j),
            "timestamp": datetime.now()}
    }
    for j in range(0, 10)
]

helpers.bulk(es, actions)
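A hedged follow-up, not part of the original answer: helpers.bulk() returns a tuple whose first element is the number of successfully executed actions, so the last call above can be captured and checked:

success, errors = helpers.bulk(es, actions)
print("Indexed %d documents, %d errors" % (success, len(errors)))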
Answered by Diolor
Although @justinachen's code helped me start with py-elasticsearch, after looking in the source code let me suggest a simple improvement:
from datetime import datetime
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()
j = 0
actions = []
while (j <= 10):
    action = {
        "_index": "tickets-index",
        "_type": "tickets",
        "_id": j,
        "_source": {
            "any": "data" + str(j),
            "timestamp": datetime.now()
        }
    }
    actions.append(action)
    j += 1

helpers.bulk(es, actions)
helpers.bulk() already does the segmentation for you, and by segmentation I mean the chunks sent to the server each time. If you want to reduce the size of the sent chunks, do: helpers.bulk(es, actions, chunk_size=100)
Some handy info to get started:
helpers.bulk() is just a wrapper of helpers.streaming_bulk, but the first accepts a list, which makes it handy.
helpers.streaming_bulk is based on Elasticsearch.bulk(), so you do not need to worry about what to choose.
So in most cases, helpers.bulk() should be all you need.
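If you do want per-document feedback, a minimal sketch of helpers.streaming_bulk (reusing the same action dicts as above; chunk_size and raise_on_error are optional parameters) could look like this:

# streaming_bulk yields one (ok, item) tuple per action as each chunk is sent
for ok, item in helpers.streaming_bulk(es, actions, chunk_size=100, raise_on_error=False):
    if not ok:
        print("Failed action: %s" % item)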
Answered by Ethan
(The other approaches mentioned in this thread use a Python list for the ES update, which is not a good solution today, especially when you need to add millions of rows of data to ES.)
A better approach is using Python generators: process gigs of data without running out of memory or compromising much on speed.
Below is an example snippet from a practical use case: adding data from an nginx log file to ES for analysis.
from elasticsearch import Elasticsearch, helpers

def decode_nginx_log(_nginx_fd):
    for each_line in _nginx_fd:
        # Filter out the below from each log line
        remote_addr = ...
        timestamp = ...
        ...

        # Index for elasticsearch. Typically timestamp.
        idx = ...

        es_fields_keys = ('remote_addr', 'timestamp', 'url', 'status')
        es_fields_vals = (remote_addr, timestamp, url, status)

        # We return a dict holding values from each line
        es_nginx_d = dict(zip(es_fields_keys, es_fields_vals))

        # Return the row on each iteration
        yield idx, es_nginx_d   # <- Note the usage of 'yield'


def es_add_bulk(nginx_file):
    # The nginx file can be gzip or just text. Open it appropriately as _nginx_fd.
    ...

    es = Elasticsearch(hosts=[{'host': 'localhost', 'port': 9200}])

    # NOTE the (...) round brackets. This is for a generator.
    k = ({
        "_index": "nginx",
        "_type": "logs",
        "_id": idx,
        "_source": es_nginx_d,
    } for idx, es_nginx_d in decode_nginx_log(_nginx_fd))

    helpers.bulk(es, k)

# Now, just run it.
es_add_bulk('./nginx.1.log.gz')
This skeleton demonstrates the usage of generators. You can use this even on a bare machine if you need to, and you can go on expanding on it to tailor it to your needs quickly.
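As one hedged sketch of the "open it appropriately" step (open_nginx_log is a hypothetical helper, not part of the original answer), the file handle could be produced like this:

import gzip

def open_nginx_log(path):
    # Hypothetical helper: open gzip-compressed or plain-text logs transparently
    if path.endswith('.gz'):
        return gzip.open(path, 'rt')
    return open(path, 'r')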
Python Elasticsearch reference here.
Answered by Rafal Enden
There are two options which I can think of at the moment:
1. Define index name and document type with each entity:
es_client = Elasticsearch()

body = []
for entry in entries:
    body.append({'index': {'_index': index, '_type': 'doc', '_id': entry['id']}})
    body.append(entry)

response = es_client.bulk(body=body)
2. Provide the default index and document type with the method:
es_client = Elasticsearch()

body = []
for entry in entries:
    body.append({'index': {'_id': entry['id']}})
    body.append(entry)

response = es_client.bulk(index='my_index', doc_type='doc', body=body)
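In both options the return value is the standard bulk response dict; a quick hedged sketch of checking it for per-item failures (assuming 'index' actions as above):

# 'errors' is True if at least one action failed; inspect 'items' for details
if response['errors']:
    failed = [item for item in response['items'] if item['index'].get('error')]
    print(failed)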
Works with:
ES version: 6.4.0
ES python lib: 6.3.1

