现在的位置: 首页 > 综合 > 正文

apache-cassandra-1.2.0 批量插入样例

2013年08月09日 ⁄ 综合 ⁄ 共 5714字 ⁄ 字号 评论关闭
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.thrift.SuperColumn;
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

/**
 * Sample client that batch-inserts three rows into a Cassandra column family
 * through the Thrift API and then reads them back with a multiget slice.
 *
 * <p>The connection objects live in static fields that are assigned by
 * {@link #main(String[])}; the helper methods that talk to the server rely on
 * {@code main} having run first.
 */
public class TestBatch {
	private static TTransport transport;
	private static TProtocol protocol;
	private static Cassandra.Client client;

	private static final String KEY_SPACE = "testspace";

	private static final String COLUMN_FAMILY = "test_batch";

	private static final String KEY_1 = "9000";
	private static final String KEY_2 = "9001";
	private static final String KEY_3 = "9002";

	/** Charset name constant exposed for callers; not referenced inside this class. */
	public static final String UTF8 = "UTF8";

	/**
	 * Connects to localhost:9160, writes the sample rows in one batch, prints
	 * them, and closes the connection.
	 *
	 * @param args unused
	 * @throws Exception if connecting, writing or reading fails
	 */
	public static void main(String[] args) throws Exception {
		transport = new TFramedTransport(new TSocket("localhost", 9160));
		protocol = new TBinaryProtocol(transport);

		client = new Cassandra.Client(protocol);
		transport.open();
		try {
			client.set_keyspace(KEY_SPACE);

			// insert data
			Map<ByteBuffer, Map<String, List<Mutation>>> rowsMap = getMap();
			client.batch_mutate(rowsMap, ConsistencyLevel.ONE);

			// show data
			List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
			keys.add(toByteBuffer(KEY_1));
			keys.add(toByteBuffer(KEY_2));
			keys.add(toByteBuffer(KEY_3));
			showEntries(keys);
		} finally {
			// Release the socket even when one of the calls above throws.
			transport.close();
		}
	}

	/**
	 * Prints every column of the rows identified by {@code keys}.
	 *
	 * <p>Standard columns are printed as one line each (row key, 1-based
	 * position in the returned list, name, value and timestamp as a date).
	 * Entries that carry a super column instead are printed through
	 * {@link #printSuperColumn(SuperColumn)}; entries with neither are skipped.
	 *
	 * @param keys row keys to fetch
	 * @throws InvalidRequestException propagated from the Thrift call
	 * @throws UnavailableException propagated from the Thrift call
	 * @throws TimedOutException propagated from the Thrift call
	 * @throws TException propagated from the Thrift call
	 * @throws UnsupportedEncodingException if UTF-8 decoding is unavailable
	 */
	private static void showEntries(final List<ByteBuffer> keys) throws InvalidRequestException, UnavailableException, TimedOutException, TException, UnsupportedEncodingException {
		ColumnParent columnParent = new ColumnParent(COLUMN_FAMILY);

		// Empty start and finish bounds: no restriction on the column name range.
		SliceRange sliceRange = new SliceRange();
		sliceRange.setStart(new byte[0]);
		sliceRange.setFinish(new byte[0]);

		SlicePredicate slicePredicate = new SlicePredicate();
		slicePredicate.setSlice_range(sliceRange);

		Map<ByteBuffer, List<ColumnOrSuperColumn>> results = client.multiget_slice(keys, columnParent, slicePredicate, ConsistencyLevel.ONE);

		for (Map.Entry<ByteBuffer, List<ColumnOrSuperColumn>> entry : results.entrySet()) {
			// Decode the key once per row; formatting the ByteBuffer itself would
			// print its position/limit/capacity instead of the key text.
			String key = toString(entry.getKey());
			List<ColumnOrSuperColumn> list = entry.getValue();
			for (int i = 0; i < list.size(); i++) {
				ColumnOrSuperColumn result = list.get(i);
				Column col = result.column;
				if (col == null) {
					// Not a standard column: avoid a NullPointerException.
					if (result.super_column != null) {
						printSuperColumn(result.super_column);
					}
					continue;
				}
				System.out.printf("key:[%s] [%d] name:[%s] value:[%s] :[%s]\n", key, i + 1, toString(col.name), toString(col.value), new Date(col.timestamp));
			}
			System.out.println("--------------------");
		}
	}

	/**
	 * Builds the batch payload: the same three columns (username, password,
	 * email) for each of the three sample row keys.
	 *
	 * <p>All three rows share one column-family map instance, and every column
	 * carries the same timestamp.
	 *
	 * @return mutations keyed by row key, then by column family name
	 * @throws UnsupportedEncodingException if UTF-8 encoding is unavailable
	 */
	private static Map<ByteBuffer, Map<String, List<Mutation>>> getMap() throws UnsupportedEncodingException {

		List<Mutation> cslist = new ArrayList<Mutation>();

		// NOTE(review): milliseconds are used here and when displaying the
		// timestamp; Cassandra clients conventionally write microseconds -
		// confirm before mixing this data with other writers.
		long timestamp = System.currentTimeMillis();
		// Create the username column.
		cslist.add(toMutation("username", "mike", timestamp));
		// Create the password column.
		cslist.add(toMutation("password", "smj", timestamp));
		// Create the email column.
		cslist.add(toMutation("email", "zhang@163.com", timestamp));

		// column family -> mutations
		Map<String, List<Mutation>> columnFamilyMap = new HashMap<String, List<Mutation>>();
		columnFamilyMap.put(COLUMN_FAMILY, cslist);

		Map<ByteBuffer, Map<String, List<Mutation>>> rowsMap = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
		rowsMap.put(toByteBuffer(KEY_1), columnFamilyMap);
		rowsMap.put(toByteBuffer(KEY_2), columnFamilyMap);
		rowsMap.put(toByteBuffer(KEY_3), columnFamilyMap);

		return rowsMap;
	}

	/**
	 * Wraps a single name/value pair in a {@link Mutation} that sets a
	 * standard column.
	 *
	 * @param name column name, encoded as UTF-8
	 * @param value column value, encoded as UTF-8
	 * @param timestamp timestamp stored on the column
	 * @return a mutation carrying the new column
	 * @throws UnsupportedEncodingException if UTF-8 encoding is unavailable
	 */
	public static Mutation toMutation(final String name, final String value, final long timestamp) throws UnsupportedEncodingException {
		Mutation mutation = new Mutation();
		ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
		Column column = new Column();
		column.setName(ByteBufferUtil.bytes(name));
		column.setValue(ByteBufferUtil.bytes(value));
		column.setTimestamp(timestamp);
		columnOrSuperColumn.setColumn(column);

		mutation.setColumn_or_supercolumn(columnOrSuperColumn);
		return mutation;
	}

	/**
	 * Prints each sub-column of a super column as {@code name : value}, both
	 * decoded as UTF-8 text.
	 *
	 * @param superColumn super column whose sub-columns are printed
	 */
	public static void printSuperColumn(SuperColumn superColumn) {
		List<Column> list = superColumn.columns;
		for (Column c : list) {
			ByteBuffer name = c.name;
			ByteBuffer value = c.value;
			try {
				System.out.println(toString(name) + " : " + toString(value));
			} catch (UnsupportedEncodingException e) {
				e.printStackTrace();
			}
		}
	}

	/**
	 * Prints a column as {@code name : value}, decoding the value with
	 * {@link #toLong(ByteBuffer)}.
	 *
	 * @param c column to print
	 * @throws NumberFormatException if the value is not hexadecimal text
	 */
	public static void printColumn(Column c) {
		ByteBuffer name = c.name;
		ByteBuffer value = c.value;
		try {
			System.out.println(toString(name) + " : " + toLong(value));
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		}
	}

	/**
	 * Converts a string to a UTF-8 encoded buffer so it can be sent to Cassandra.
	 *
	 * @param value text to encode
	 * @return buffer wrapping the UTF-8 bytes of {@code value}
	 * @throws UnsupportedEncodingException if UTF-8 encoding is unavailable
	 */
	public static ByteBuffer toByteBuffer(String value) throws UnsupportedEncodingException {
		return ByteBuffer.wrap(value.getBytes("UTF-8"));
	}

	/**
	 * Converts a long to the bytes of its hexadecimal text form, as produced
	 * by {@link Long#toHexString(long)}. Despite the name, the result is a
	 * byte array, not a buffer.
	 *
	 * @param value number to encode
	 * @return UTF-8 bytes of the hexadecimal representation
	 * @throws UnsupportedEncodingException if UTF-8 encoding is unavailable
	 */
	public static byte[] toByteBuffer(long value) throws UnsupportedEncodingException {
		// Explicit charset, matching the other conversions in this class,
		// instead of the platform default.
		return Long.toHexString(value).getBytes("UTF-8");
	}

	/**
	 * Decodes the remaining bytes of a buffer as UTF-8 text.
	 *
	 * <p>The buffer's position is left untouched, so the same buffer can be
	 * decoded repeatedly and stays usable as a map key.
	 *
	 * @param buffer buffer to decode
	 * @return decoded text
	 * @throws UnsupportedEncodingException if UTF-8 decoding is unavailable
	 */
	public static String toString(ByteBuffer buffer) throws UnsupportedEncodingException {
		// Read through a duplicate: a relative get() advances the position.
		ByteBuffer view = buffer.duplicate();
		byte[] bytes = new byte[view.remaining()];
		view.get(bytes);
		return new String(bytes, "UTF-8");
	}

	/**
	 * Decodes a buffer holding hexadecimal text back into a long; the inverse
	 * of {@link #toByteBuffer(long)}.
	 *
	 * <p>Full-width values with the top bit set (what
	 * {@link Long#toHexString(long)} emits for negative numbers) are accepted
	 * and mapped back to the negative long.
	 *
	 * @param buffer buffer holding hexadecimal digits as UTF-8 text
	 * @return decoded number
	 * @throws UnsupportedEncodingException if UTF-8 decoding is unavailable
	 * @throws NumberFormatException if the text is not hexadecimal or does not fit in 64 bits
	 */
	public static long toLong(ByteBuffer buffer) throws UnsupportedEncodingException {
		String text = toString(buffer);
		java.math.BigInteger parsed = new java.math.BigInteger(text, 16);
		// Non-negative input may use all 64 bits (unsigned hex form); input
		// with an explicit minus sign must fit the signed range.
		int maxBits = parsed.signum() < 0 ? 63 : 64;
		if (parsed.bitLength() > maxBits) {
			throw new NumberFormatException("For input string: \"" + text + "\"");
		}
		return parsed.longValue();
	}
}

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;

/**
 * Small helper for turning strings into byte buffers.
 */
public class ByteBufferUtil {

	/**
	 * Encodes the given text as UTF-8 and wraps the bytes in a buffer.
	 *
	 * @param value text to encode
	 * @return buffer wrapping the UTF-8 bytes of {@code value}
	 * @throws UnsupportedEncodingException if UTF-8 encoding is unavailable
	 */
	public static ByteBuffer bytes(String value) throws UnsupportedEncodingException {
		final byte[] encoded = value.getBytes("UTF-8");
		return ByteBuffer.wrap(encoded);
	}

}

抱歉!评论已关闭.