java elasticsearch java批量大小
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/16920902/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
elasticsearch java bulk batch size
提问by Justin
I want to use the elasticsearch bulk api using java and wondering how I can set the batch size.
我想使用 java 使用 elasticsearch 批量 api 并想知道如何设置批量大小。
Currently I am using it as:
目前我将它用作:
BulkRequestBuilder bulkRequest = getClient().prepareBulk();
while(hasMore) {
bulkRequest.add(getClient().prepareIndex(indexName, indexType, artist.getDocId()).setSource(json));
hasMore = checkHasMore();
}
BulkResponse bResp = bulkRequest.execute().actionGet();
//To check failures
log.info("Has failures? {}", bResp.hasFailures());
Any idea how I can set the bulk/batch size?
知道如何设置批量/批量大小吗?
回答by javanna
It mainly depends on the size of your documents, available resources on the client and the type of client (transport client or node client).
它主要取决于您的文档大小、客户端上的可用资源和客户端的类型(传输客户端或节点客户端)。
The node client is aware of the shards over the cluster and sends the documents directly to the nodes that hold the shards where they are supposed to be indexed. On the other hand the transport client is a normal client that sends its requests to a list of nodes in a round-robin fashion. The bulk request would be sent to one node then, which would become your gateway when indexing.
节点客户端知道集群上的分片,并将文档直接发送到保存分片的节点,它们应该被索引。另一方面,传输客户端是一个普通客户端,它以循环方式将其请求发送到节点列表。批量请求将被发送到一个节点,该节点将成为索引时的网关。
Since you're using the Java API, I would suggest you to have a look at the BulkProcessor
, which makes it much easier and flexibile to index in bulk. You can either define a maximum number of actions, a maximum size and a maximum time interval since the last bulk execution. It's going to execute the bulk automatically for you when needed. You can also set a maximum number of concurrent bulk requests.
由于您使用的是 Java API,我建议您查看BulkProcessor
,这使得批量索引变得更加容易和灵活。您可以定义自上次批量执行以来的最大操作数、最大大小和最大时间间隔。它会在需要时自动为您执行批量操作。您还可以设置并发批量请求的最大数量。
After you created the BulkProcessor
like this:
在你创建BulkProcessor
这样的之后:
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
logger.info("Going to execute new bulk composed of {} actions", request.numberOfActions());
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
logger.info("Executed bulk composed of {} actions", request.numberOfActions());
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.warn("Error executing bulk", failure);
}
}).setBulkActions(bulkSize).setConcurrentRequests(maxConcurrentBulk).build();
You just have to add your requests to it:
您只需要向其中添加您的请求:
bulkProcessor.add(indexRequest);
and close it at the end to flush any eventual requests that might have not been executed yet:
并在最后关闭它以刷新可能尚未执行的任何最终请求:
bulkProcessor.close();
To finally answer your question: the nice thing about the BulkProcessor
is also that it has sensible defaults: 5 MB of size, 1000 actions, 1 concurrent request, no flush interval (which might be useful to set).
最后回答您的问题:关于 的BulkProcessor
好处还在于它具有合理的默认值:5 MB 大小、1000 个操作、1 个并发请求、没有刷新间隔(设置可能有用)。
回答by code.rider
you need to count your bulk request builder when it hits your batch size limit then index them and flush older bulk builds . here is example of code
当您的批量请求构建器达到批量大小限制时,您需要对其进行计数,然后将它们编入索引并刷新旧的批量构建。这是代码示例
Settings settings = ImmutableSettings.settingsBuilder()
.put("cluster.name", "MyClusterName").build();
TransportClient client = new TransportClient(settings);
String hostname = "myhost ip";
int port = 9300;
client.addTransportAddress(new InetSocketTransportAddress(hostname, port));
BulkRequestBuilder bulkBuilder = client.prepareBulk();
BufferedReader br = new BufferedReader(new InputStreamReader(new DataInputStream(new FileInputStream("my_file_path"))));
long bulkBuilderLength = 0;
String readLine = "";
String index = "my_index_name";
String type = "my_type_name";
String id = "";
while((readLine = br.readLine()) != null){
id = somefunction(readLine);
String json = new ObjectMapper().writeValueAsString(readLine);
bulkBuilder.add(client.prepareIndex(index, type, id).setSource(json));
bulkBuilderLength++;
if(bulkBuilderLength % 1000== 0){
logger.info("##### " + bulkBuilderLength + " data indexed.");
BulkResponse bulkRes = bulkBuilder.execute().actionGet();
if(bulkRes.hasFailures()){
logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage());
}
bulkBuilder = client.prepareBulk();
}
}
br.close();
if(bulkBuilder.numberOfActions() > 0){
logger.info("##### " + bulkBuilderLength + " data indexed.");
BulkResponse bulkRes = bulkBuilder.execute().actionGet();
if(bulkRes.hasFailures()){
logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage());
}
bulkBuilder = client.prepareBulk();
}
hope this helps you thanks
希望这对你有帮助 谢谢