
Bulk-Loading Data into Cassandra with Binary Memtable

April 26, 2013

In the earlier article "Talking about Cassandra Clients" (《谈谈Cassandra的客户端》), we looked at how to import data into Cassandra with the Thrift API and higher-level wrappers such as Hector. When importing large volumes of data, however, that approach runs into problems such as failed inserts and timeouts. To work around them, we can try Binary Memtable.

The Cassandra wiki describes Binary Memtable as follows:

Binary Memtable is the name of Cassandra's bulk-load interface. It avoids several kinds of overhead associated with the normal Thrift API:

  • Converting to Thrift from the internal structures and back
  • Routing (copying) from a coordinator node to the replica nodes
  • Writing to the commitlog
  • Serializing the internal structures to on-disk format

The tradeoff you make is that it is considerably less convenient to use than Thrift:

  • You must use the StorageProxy API, only available as Java code
  • You must pre-serialize the rows yourself
  • The rows you send are not live for querying until a flush occurs (either normally because the Binary Memtable fills up, or because you request one with nodetool)
  • You must write an entire row at once

For a concrete implementation, see https://svn.apache.org/repos/asf/cassandra/trunk/contrib/bmt_example/. That example is built on Hadoop, though; if you don't have such an environment, you can refer to my test code instead:

package com.alibaba.dw.thrift.client;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;

/**
 * Minimal Binary Memtable bulk-load test, written against Cassandra 0.6.1.
 *
 * @author aaron.guop
 */
public class BinaryMemtableTest {

    public static void main(String[] args) throws IOException, InterruptedException {
        System.setProperty("storage-config", "D:\\apache-cassandra-0.6.1\\conf");

        // Join the ring as a fat client; no data is stored locally.
        StorageService.instance.initClient();

        // Wait until ring information is available for the target key.
        while (StorageService.instance.getNaturalEndpoints("Keyspace1", "bmt").isEmpty()) {
            Thread.sleep(1000);
        }

        doInsert();

        StorageService.instance.stopClient();
    }

    private static void doInsert() throws UnsupportedEncodingException {
        String keyspace = "Keyspace1";
        String cfName = "Member";
        String memberID = "bmt";

        /* Build the column family to be loaded. */
        ColumnFamily columnFamily = ColumnFamily.create(keyspace, cfName);

        String superColumnName = "SuperColumnName";
        String columnName = "ColumnName";
        String columnValue = "ColumnValue";
        long timestamp = 0;
        columnFamily.addColumn(
                new QueryPath(cfName, superColumnName.getBytes("UTF-8"),
                        columnName.getBytes("UTF-8")),
                columnValue.getBytes("UTF-8"), timestamp);

        /* Serialize the row and send it directly to every replica. */
        Message message = createMessage(keyspace, memberID, cfName, columnFamily);
        for (InetAddress endpoint : StorageService.instance.getNaturalEndpoints(keyspace, memberID)) {
            MessagingService.instance.sendOneWay(message, endpoint);
            System.out.println("Sent message to " + endpoint);
        }
    }

    public static Message createMessage(String keyspace, String key, String cfName,
                                        ColumnFamily columnFamily) {
        /*
         * The base column family exists only to get past validation; the
         * pre-serialized row data is carried inside a single column.
         */
        ColumnFamily baseColumnFamily = new ColumnFamily(cfName, "Standard",
                DatabaseDescriptor.getComparator(keyspace, cfName),
                DatabaseDescriptor.getSubComparator(keyspace, cfName));

        DataOutputBuffer bufOut = new DataOutputBuffer();
        Column column;

        try {
            // Pre-serialize the row into the on-disk (SSTable) format.
            ColumnFamily.serializer().serializeWithIndexes(columnFamily, bufOut);
            byte[] data = new byte[bufOut.getLength()];
            System.arraycopy(bufOut.getData(), 0, data, 0, bufOut.getLength());

            column = new Column(columnFamily.name().getBytes("UTF-8"), data, 0, false);
            baseColumnFamily.addColumn(column);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        RowMutation rm = new RowMutation(keyspace, key);
        rm.add(baseColumnFamily);

        try {
            // The BINARY verb routes the mutation through the Binary Memtable path.
            return rm.makeRowMutationMessage(StorageService.Verb.BINARY);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

Before running this code, note the following:

1. The machine running this program must not itself be running a Cassandra server.
2. In storage-conf.xml, list the addresses of the other Cassandra servers as seeds.
3. Use nodetool's getcompactionthreshold to read the current compaction thresholds, and record them so they can be restored after the import.
4. Use nodetool's setcompactionthreshold to set the thresholds to 0, disabling compaction during the load.
5. Run the import program.
6. Run nodetool's flush.
7. Use nodetool's setcompactionthreshold to restore the values recorded in step 3.

After these steps, the data has been imported into our Cassandra cluster.
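The nodetool part of the checklist above can be sketched as a shell session. The host address and the threshold values shown are placeholders, and exact flag syntax varies between Cassandra versions, so treat this as an outline rather than a literal transcript:

```shell
# Step 3: record the current min/max compaction thresholds.
nodetool -host 192.0.2.10 getcompactionthreshold

# Step 4: disable compaction for the duration of the bulk load.
nodetool -host 192.0.2.10 setcompactionthreshold 0 0

# Step 5: run the import program (BinaryMemtableTest above), then...

# Step 6: flush, so the Binary Memtable is written to disk and the rows become live.
nodetool -host 192.0.2.10 flush Keyspace1

# Step 7: restore the thresholds recorded in step 3 (e.g. the defaults, 4 and 32).
nodetool -host 192.0.2.10 setcompactionthreshold 4 32
```

Disabling compaction while loading avoids the cluster spending I/O merging SSTables mid-import; the final flush and threshold restore let it catch up afterwards.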

 

More articles on Cassandra: http://www.cnblogs.com/gpcuster/tag/Cassandra/
