Disclaimer: This page is a Chinese-English parallel translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me). Original StackOverflow question: http://stackoverflow.com/questions/19202812/

Date: 2020-08-12 15:00:14  Source: igfitidea

How to use Asynchronous/Batch writes feature with Datastax Java driver

java cassandra datastax-java-driver

Asked by arsenal

I am planning to use the Datastax Java driver for writing to Cassandra. I am mainly interested in the Batch Writes and Asynchronous features of the driver, but I cannot find any tutorials that explain how to incorporate these features into my code below, which uses the Datastax Java driver.

/**
 * Performs an upsert of the specified attributes for the specified id.
 */
public void upsertAttributes(final String userId, final Map<String, String> attributes, final String columnFamily) {

    try {

        // make a sql here using the above input parameters.

        String sql = sqlPart1.toString()+sqlPart2.toString();

        DatastaxConnection.getInstance();
        PreparedStatement prepStatement = DatastaxConnection.getSession().prepare(sql);
        prepStatement.setConsistencyLevel(ConsistencyLevel.ONE);        

        BoundStatement query = prepStatement.bind(userId, attributes.values().toArray(new Object[attributes.size()]));

        DatastaxConnection.getSession().execute(query);

    } catch (InvalidQueryException e) {
        LOG.error("Invalid Query Exception in DatastaxClient::upsertAttributes "+e);
    } catch (Exception e) {
        LOG.error("Exception in DatastaxClient::upsertAttributes "+e);
    }
}

In the below code, I am creating a Connection to Cassandra nodes using Datastax Java driver.

/**
 * Creating Cassandra connection using Datastax Java driver
 *
 */
private DatastaxConnection() {

    try{
        builder = Cluster.builder();
        builder.addContactPoint("some_nodes");

        builder.poolingOptions().setCoreConnectionsPerHost(
                HostDistance.LOCAL,
                builder.poolingOptions().getMaxConnectionsPerHost(HostDistance.LOCAL));

        cluster = builder
                .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
                .withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
                .build();

        StringBuilder s = new StringBuilder();
        Set<Host> allHosts = cluster.getMetadata().getAllHosts();
        for (Host h : allHosts) {
            s.append("[");
            s.append(h.getDatacenter());
            s.append(h.getRack());
            s.append(h.getAddress());
            s.append("]");
        }
        System.out.println("Cassandra Cluster: " + s.toString());

        session = cluster.connect("testdatastaxks");

    } catch (NoHostAvailableException e) {
        e.printStackTrace();
        throw new RuntimeException(e);
    } catch (Exception e) {

    }
}

Can anybody help me add Batch writes or Asynchronous features to my code above? Thanks for the help.

I am running Cassandra 1.2.9

Accepted answer by Lyuben Todorov

For async, it's as simple as using the executeAsync function:

...
DatastaxConnection.getSession().executeAsync(query);
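
Calling executeAsync and discarding its return value means a failed write goes unnoticed. In the driver versions of this era the returned ResultSetFuture is a java.util.concurrent.Future, so the futures can be collected and checked later. The sketch below shows only that pattern using the JDK: the executor and its tasks are stand-ins for session.executeAsync(...) calls, and AsyncWriteTracker and awaitAll are hypothetical names, not part of the driver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AsyncWriteTracker {

    /** Blocks until every future has completed and returns how many of them failed. */
    static int awaitAll(List<? extends Future<?>> futures) throws InterruptedException {
        int failures = 0;
        for (Future<?> f : futures) {
            try {
                f.get(); // a failed write surfaces here as an ExecutionException
            } catch (ExecutionException e) {
                failures++;
                System.err.println("Async write failed: " + e.getCause());
            }
        }
        return failures;
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumption: this pool simulates asynchronous writes; with the driver,
        // each future would instead come from session.executeAsync(boundStatement).
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                final int id = i;
                pending.add(pool.submit(() -> {
                    if (id == 3) {
                        throw new IllegalStateException("simulated write timeout");
                    }
                    return "written-" + id;
                }));
            }
            System.out.println("Failed writes: " + awaitAll(pending));
        } finally {
            pool.shutdown();
        }
    }
}
```

Waiting on the futures after all writes have been submitted keeps the writes concurrent while still reporting errors; whether to retry or just log a failed write is left to the caller.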

For the batch, you need to build the whole batch as a single CQL string (I use plain string concatenation because the compiler knows how to optimize it really well):

String cql = "BEGIN BATCH "
           + "INSERT INTO test.prepared (id, col_1) VALUES (?,?); "
           + "INSERT INTO test.prepared (id, col_1) VALUES (?,?); "
           + "APPLY BATCH;";

DatastaxConnection.getInstance();
PreparedStatement prepStatement = DatastaxConnection.getSession().prepare(cql);
prepStatement.setConsistencyLevel(ConsistencyLevel.ONE);        

// this is where you need to be careful
// bind expects a comma separated list of values for all the params (?) above
// so for the above batch we need to supply 4 params:                     
BoundStatement query = prepStatement.bind(userId, "col1_val", userId_2, "col1_val_2");

DatastaxConnection.getSession().execute(query);
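
When the number of rows is not fixed, the batch string and its parameter list can be generated instead of written by hand. The following is a minimal sketch using only the JDK; BatchCql, buildBatch and flatten are hypothetical helper names, and the table and values are the illustrative ones from the snippet above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BatchCql {

    /** Repeats one parameterized INSERT rowCount times between BEGIN BATCH and APPLY BATCH. */
    static String buildBatch(String insertCql, int rowCount) {
        if (rowCount < 1) {
            throw new IllegalArgumentException("rowCount must be >= 1");
        }
        StringBuilder cql = new StringBuilder("BEGIN BATCH ");
        for (int i = 0; i < rowCount; i++) {
            cql.append(insertCql).append("; ");
        }
        return cql.append("APPLY BATCH;").toString();
    }

    /** Flattens the per-row values into one array, in the order of the (?) markers. */
    static Object[] flatten(List<Object[]> rows) {
        List<Object> values = new ArrayList<>();
        for (Object[] row : rows) {
            values.addAll(Arrays.asList(row));
        }
        return values.toArray();
    }

    public static void main(String[] args) {
        String insert = "INSERT INTO test.prepared (id, col_1) VALUES (?,?)";
        List<Object[]> rows = new ArrayList<>();
        rows.add(new Object[] {"user-1", "col1_val"});
        rows.add(new Object[] {"user-2", "col1_val_2"});

        String cql = buildBatch(insert, rows.size());
        Object[] params = flatten(rows);

        System.out.println(cql);
        System.out.println(Arrays.toString(params));
        // With the driver, these would be used as:
        //   session.prepare(cql).bind(params)
    }
}
```

Passing the flattened array as the only argument to bind lets Java spread it across the varargs parameter, so each element lines up with one (?) marker.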

On a side note, I think your binding of the statement might look something like this, assuming you change attributes to a list of maps where each map represents an update/insert inside the batch:

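// bind(...) takes one flat argument list, so collect each id and its values in order
List<Object> values = new ArrayList<Object>();
values.add(userId);
values.addAll(attributesList.get(0).values());
values.add(userId_2);
values.addAll(attributesList.get(1).values());
BoundStatement query = prepStatement.bind(values.toArray());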

Answer by cfeduke

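For the example provided in Lyuben's answer, setting certain attributes of a batch, such as Type.COUNTER (needed if you update counters), won't work when the batch is built from strings. Instead, you can add your bound prepared statements to a BatchStatement like so (note that BatchStatement requires version 2.0 or later of the driver and Cassandra 2.0 or later):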

final String insertQuery = "INSERT INTO test.prepared (id, col_1) VALUES (?,?);";
final PreparedStatement prepared = session.prepare(insertQuery);

final BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
batch.add(prepared.bind(userId1, "something"));
batch.add(prepared.bind(userId2, "another"));
batch.add(prepared.bind(userId3, "thing"));

session.executeAsync(batch);