How to use Asynchronous/Batch writes feature with Datastax Java driver
Original source: http://stackoverflow.com/questions/19202812/
Note: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors on StackOverflow.
Asked by arsenal
I am planning to use the Datastax Java driver for writing to Cassandra. I am mainly interested in the Batch Writes and Asynchronous features of the driver, but I have not been able to find any tutorials that explain how to incorporate these features into my code below, which uses the Datastax Java driver.
/**
 * Performs an upsert of the specified attributes for the specified id.
 */
public void upsertAttributes(final String userId, final Map<String, String> attributes, final String columnFamily) {
    try {
        // make a sql here using the above input parameters.
        String sql = sqlPart1.toString()+sqlPart2.toString();
        DatastaxConnection.getInstance();
        PreparedStatement prepStatement = DatastaxConnection.getSession().prepare(sql);
        prepStatement.setConsistencyLevel(ConsistencyLevel.ONE);
        BoundStatement query = prepStatement.bind(userId, attributes.values().toArray(new Object[attributes.size()]));
        DatastaxConnection.getSession().execute(query);
    } catch (InvalidQueryException e) {
        LOG.error("Invalid Query Exception in DatastaxClient::upsertAttributes "+e);
    } catch (Exception e) {
        LOG.error("Exception in DatastaxClient::upsertAttributes "+e);
    }
}
In the code below, I create a connection to the Cassandra nodes using the Datastax Java driver.
/**
 * Creating Cassandra connection using Datastax Java driver
 *
 */
private DatastaxConnection() {
    try{
        builder = Cluster.builder();
        builder.addContactPoint("some_nodes");
        builder.poolingOptions().setCoreConnectionsPerHost(
                HostDistance.LOCAL,
                builder.poolingOptions().getMaxConnectionsPerHost(HostDistance.LOCAL));
        cluster = builder
                .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
                .withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
                .build();
        StringBuilder s = new StringBuilder();
        Set<Host> allHosts = cluster.getMetadata().getAllHosts();
        for (Host h : allHosts) {
            s.append("[");
            s.append(h.getDatacenter());
            s.append(h.getRack());
            s.append(h.getAddress());
            s.append("]");
        }
        System.out.println("Cassandra Cluster: " + s.toString());
        session = cluster.connect("testdatastaxks");
    } catch (NoHostAvailableException e) {
        e.printStackTrace();
        throw new RuntimeException(e);
    } catch (Exception e) {
    }
}
Can anybody help me add batch writes or asynchronous features to the code above? Thanks for the help.
I am running Cassandra 1.2.9.
Accepted answer by Lyuben Todorov
For async, it's as simple as using the executeAsync function:
...
DatastaxConnection.getSession().executeAsync(query);
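Note that the call above discards the future that executeAsync returns, so a failed write would go unnoticed. In the driver versions I am aware of, that return value is a ResultSetFuture, which is also a java.util.concurrent.Future, so one option is to collect the futures and wait on them later. Here is a minimal, driver-independent sketch using only the JDK. AsyncWriteAwaiter and awaitAll are hypothetical names, not part of the driver, and CompletableFuture merely stands in for the driver's futures in the demo:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper, not part of the Datastax driver.
final class AsyncWriteAwaiter {

    /**
     * Blocks until every future has completed (or the thread is interrupted)
     * and returns the failures that were observed, in encounter order.
     */
    static List<Throwable> awaitAll(List<? extends Future<?>> futures) {
        List<Throwable> failures = new ArrayList<>();
        for (Future<?> future : futures) {
            try {
                future.get();
            } catch (ExecutionException e) {
                // The write itself failed; keep the underlying cause.
                failures.add(e.getCause());
            } catch (CancellationException e) {
                failures.add(e);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting on the rest.
                Thread.currentThread().interrupt();
                failures.add(e);
                break;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        // Stand-ins for the futures returned by session.executeAsync(...).
        List<Future<?>> pending = new ArrayList<>();
        pending.add(CompletableFuture.completedFuture("ok"));
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("simulated write failure"));
        pending.add(failed);

        for (Throwable t : awaitAll(pending)) {
            System.out.println("Write failed: " + t.getMessage());
        }
    }
}
```

With the real driver you would add each result of executeAsync(query) to the list and call awaitAll once all writes have been issued, assuming the futures are compatible as described above.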
For the batch, you need to build the query (I use strings because the compiler knows how to optimize string concatenation really well):
String cql = "BEGIN BATCH ";
cql += "INSERT INTO test.prepared (id, col_1) VALUES (?,?); ";
cql += "INSERT INTO test.prepared (id, col_1) VALUES (?,?); ";
cql += "APPLY BATCH; ";
DatastaxConnection.getInstance();
PreparedStatement prepStatement = DatastaxConnection.getSession().prepare(cql);
prepStatement.setConsistencyLevel(ConsistencyLevel.ONE);
// this is where you need to be careful
// bind expects a comma separated list of values for all the params (?) above
// so for the above batch we need to supply 4 params:
BoundStatement query = prepStatement.bind(userId, "col1_val", userId_2, "col1_val_2");
DatastaxConnection.getSession().execute(query);
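If the number of rows in the batch is not fixed, the same CQL string can be generated in a loop instead of being written out by hand. This is only a sketch: BatchCql and buildBatch are hypothetical names, and it assumes every statement in the batch is the same parameterized INSERT or UPDATE:

```java
// Hypothetical helper, not part of the Datastax driver.
final class BatchCql {

    /**
     * Repeats one parameterized statement `count` times inside
     * BEGIN BATCH ... APPLY BATCH. The statement is expected to be passed
     * without a trailing semicolon.
     */
    static String buildBatch(String statement, int count) {
        if (count < 1) {
            throw new IllegalArgumentException("count must be at least 1");
        }
        StringBuilder cql = new StringBuilder("BEGIN BATCH ");
        for (int i = 0; i < count; i++) {
            cql.append(statement).append("; ");
        }
        return cql.append("APPLY BATCH;").toString();
    }

    public static void main(String[] args) {
        String insert = "INSERT INTO test.prepared (id, col_1) VALUES (?,?)";
        System.out.println(buildBatch(insert, 2));
    }
}
```

The resulting string has count times as many ? placeholders as the single statement, so the call to bind must supply that many values in the same order.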
On a side note, I think your binding of the statement might look something like this, assuming you change attributes to a list of maps where each map represents an update/insert inside the batch:
BoundStatement query = prepStatement.bind(userId,
        attributesList.get(0).values().toArray(new Object[attributesList.get(0).size()]),
        userId_2,
        attributesList.get(1).values().toArray(new Object[attributesList.get(1).size()]));
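One thing to watch with the binding above: bind takes varargs, and when Java sees arrays mixed with other arguments it passes each array as a single value rather than spreading its elements. A safer approach may be to flatten everything into one Object[] first. The sketch below uses only the JDK; BatchBindValues and flatten are hypothetical names, and it assumes each map iterates its values in the same order as the columns in the CQL (true for a LinkedHashMap filled in column order, not guaranteed for a HashMap):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the Datastax driver.
final class BatchBindValues {

    /**
     * Produces one flat array: id 1, its attribute values, id 2, its
     * attribute values, and so on, matching the order of the ? placeholders.
     */
    static Object[] flatten(List<String> userIds, List<Map<String, String>> attributesList) {
        if (userIds.size() != attributesList.size()) {
            throw new IllegalArgumentException("need exactly one attribute map per user id");
        }
        List<Object> values = new ArrayList<>();
        for (int i = 0; i < userIds.size(); i++) {
            values.add(userIds.get(i));
            // Relies on the map's iteration order matching the column order.
            values.addAll(attributesList.get(i).values());
        }
        return values.toArray();
    }

    public static void main(String[] args) {
        Map<String, String> first = new LinkedHashMap<>();
        first.put("col_1", "col1_val");
        Map<String, String> second = new LinkedHashMap<>();
        second.put("col_1", "col1_val_2");

        Object[] params = flatten(Arrays.asList("user-1", "user-2"), Arrays.asList(first, second));
        System.out.println(Arrays.toString(params));
    }
}
```

Because the result is a plain Object[], passing it directly as prepStatement.bind(params) should spread it across the varargs parameters.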
Answer by cfeduke
For the example provided in Lyuben's answer, setting certain attributes of a batch, such as Type.COUNTER (if you need to update counters), won't work when using strings. Instead, you can arrange your prepared statements in a batch like so:
final String insertQuery = "INSERT INTO test.prepared (id, col_1) VALUES (?,?);";
final PreparedStatement prepared = session.prepare(insertQuery);
final BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
batch.add(prepared.bind(userId1, "something"));
batch.add(prepared.bind(userId2, "another"));
batch.add(prepared.bind(userId3, "thing"));
session.executeAsync(batch);