import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.cassandra.thrift.Cassandra; import org.apache.cassandra.thrift.Column; import org.apache.cassandra.thrift.ColumnOrSuperColumn; import org.apache.cassandra.thrift.ColumnParent; import org.apache.cassandra.thrift.ConsistencyLevel; import org.apache.cassandra.thrift.InvalidRequestException; import org.apache.cassandra.thrift.Mutation; import org.apache.cassandra.thrift.SlicePredicate; import org.apache.cassandra.thrift.SliceRange; import org.apache.cassandra.thrift.SuperColumn; import org.apache.cassandra.thrift.TimedOutException; import org.apache.cassandra.thrift.UnavailableException; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; public class TestBatch { private static TTransport transport; private static TProtocol protocol; private static Cassandra.Client client; private static final String KEY_SPACE = "testspace"; private static final String COLUMN_FAMILY = "test_batch"; private static final String KEY_1 = "9000"; private static final String KEY_2 = "9001"; private static final String KEY_3 = "9002"; public static final String UTF8 = "UTF8"; public static void main(String[] args) throws Exception { transport = new TFramedTransport(new TSocket("localhost", 9160)); protocol = new TBinaryProtocol(transport); client = new Cassandra.Client(protocol); transport.open(); client.set_keyspace(KEY_SPACE); // insert data Map<ByteBuffer, Map<String, List<Mutation>>> rowsMap = getMap(); client.batch_mutate(rowsMap, ConsistencyLevel.ONE); // show data showEntries(new ArrayList<ByteBuffer>() { { add(toByteBuffer(KEY_1)); 
add(toByteBuffer(KEY_2)); add(toByteBuffer(KEY_3)); } }); transport.close(); } /** * 显示所有记录 * @throws InvalidRequestException * @throws UnavailableException * @throws TimedOutException * @throws TException * @throws UnsupportedEncodingException */ private static void showEntries(final List<ByteBuffer> keys) throws InvalidRequestException, UnavailableException, TimedOutException, TException, UnsupportedEncodingException { ColumnParent columnParent = new ColumnParent(COLUMN_FAMILY); SliceRange sliceRange = new SliceRange(); sliceRange.setStart(new byte[0]); sliceRange.setFinish(new byte[0]); SlicePredicate slicePredicate = new SlicePredicate(); slicePredicate.setSlice_range(sliceRange); Map<ByteBuffer, List<ColumnOrSuperColumn>> results = client.multiget_slice(keys, columnParent, slicePredicate, ConsistencyLevel.ONE); for (Map.Entry<ByteBuffer, List<ColumnOrSuperColumn>> entry : results.entrySet()) { ByteBuffer key = entry.getKey(); List<ColumnOrSuperColumn> list = entry.getValue(); for (int i = 0; i < list.size(); i++) { ColumnOrSuperColumn result = list.get(i); Column col = result.column; System.out.printf("key:[%s] [%d] name:[%s] value:[%s] :[%s]\n", key, i + 1, toString(col.name), toString(col.value), new Date(col.timestamp)); } System.out.println("--------------------"); } } private static Map<ByteBuffer, Map<String, List<Mutation>>> getMap() throws Exception { List<Mutation> cslist = new ArrayList<Mutation>(); long timestamp = System.currentTimeMillis(); // Create the username column. cslist.add(toMutation("username", "mike", timestamp)); // Create the password column. cslist.add(toMutation("password", "smj", timestamp)); // Create the email column. 
cslist.add(toMutation("email", "zhang@163.com", timestamp)); // 列族 Map<String, List<Mutation>> columnFamilyMap = new HashMap<String, List<Mutation>>(); columnFamilyMap.put(COLUMN_FAMILY, cslist); Map<ByteBuffer, Map<String, List<Mutation>>> rowsMap = new HashMap<ByteBuffer, Map<String, List<Mutation>>>(); rowsMap.put(toByteBuffer(KEY_1), columnFamilyMap); rowsMap.put(toByteBuffer(KEY_2), columnFamilyMap); rowsMap.put(toByteBuffer(KEY_3), columnFamilyMap); return rowsMap; } /** * * @param name * @param value * @param timestamp * @return * @throws UnsupportedEncodingException */ public static Mutation toMutation(final String name, final String value, final long timestamp) throws UnsupportedEncodingException { Mutation mutation = new Mutation(); ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn(); Column column = new Column(); column.setName(ByteBufferUtil.bytes(name)); column.setValue(ByteBufferUtil.bytes(value)); column.setTimestamp(timestamp); columnOrSuperColumn.setColumn(column); mutation.setColumn_or_supercolumn(columnOrSuperColumn); return mutation; } public static void printSuperColumn(SuperColumn superColumn) { List<Column> list = superColumn.columns; for (Column c : list) { ByteBuffer name = c.name; ByteBuffer value = c.value; try { System.out.println(toString(name) + " : " + toString(value)); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } } public static void printColumn(Column c) { ByteBuffer name = c.name; ByteBuffer value = c.value; try { System.out.println(toString(name) + " : " + toLong(value)); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } /* * 将String转换为bytebuffer,以便插入cassandra */ public static ByteBuffer toByteBuffer(String value) throws UnsupportedEncodingException { return ByteBuffer.wrap(value.getBytes("UTF-8")); } /* * 将String转换为bytebuffer,以便插入cassandra */ public static byte[] toByteBuffer(long value) throws UnsupportedEncodingException { return Long.toHexString(value).getBytes(); } /* * 
将bytebuffer转换为String */ public static String toString(ByteBuffer buffer) throws UnsupportedEncodingException { byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); return new String(bytes, "UTF-8"); } /* * 将bytebuffer转换为String */ public static long toLong(ByteBuffer buffer) throws UnsupportedEncodingException { return Long.parseLong(toString(buffer), 16); } }
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;

/**
 * Helper for turning text into the {@link ByteBuffer} form used for column names and values.
 */
public class ByteBufferUtil {

    /** Name of the charset used to encode strings. */
    private static final String ENCODING = "UTF-8";

    /**
     * Encodes the given text as UTF-8 and wraps the resulting bytes in a buffer.
     *
     * @param value text to encode
     * @return a buffer wrapping the encoded bytes
     * @throws UnsupportedEncodingException if the named charset is not supported
     */
    public static ByteBuffer bytes(String value) throws UnsupportedEncodingException {
        final byte[] encoded = value.getBytes(ENCODING);
        return ByteBuffer.wrap(encoded);
    }
}