cassandra client in Java——cassandra总结(五)

  cassandra client是基于Apache Thrift这个RPC框架来进行客户端和服务器的通信。  

  首先到$CASSANDRA_HOME\lib目录下导入apache-cassandra-thrift-2.1.11.jar,libthrift-0.9.2.jar这两个包,然后再导入单元测试JUnit包。

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.thrift.TException;  

import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.thrift.SuperColumn;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.junit.Test;

public class CassandraClient {
    /*
     * 对cassandra数据库的基础操作
     * */

    private static String host = "127.0.0.1";
    private static int port = 9160;
    private static String keyspace = "cocoon";
    //暴露client供外界批量插入数据
    public static Cassandra.Client client = null;
    private static ThreadLocal<TTransport> ttrans = new ThreadLocal<TTransport>(); 

    //打开数据库连接
    public static TTransport openConnection(){
        TTransport tTransport = ttrans.get();
        if( tTransport == null ){
            tTransport = new TFramedTransport(new TSocket(host, port));
            TProtocol tProtocol = new TBinaryProtocol(tTransport);
            client = new Cassandra.Client(tProtocol);
            try {
                tTransport.open();
                client.set_keyspace(keyspace);
                ttrans.set(tTransport);
                System.out.println(tTransport);
            } catch (TTransportException e) {
                e.printStackTrace();
            } catch (InvalidRequestException e) {
                e.printStackTrace();
            } catch (TException e) {
                e.printStackTrace();
            }
        }
        return tTransport;
    }

    //关闭数据库连接
    public static void closeConnection(){
        TTransport tTransport = ttrans.get();
        ttrans.set(null);
        if( tTransport != null && tTransport.isOpen() )
            tTransport.close();
    }

    //测试线程局部变量
    @Test
    public void testThreadLocal() throws Exception{
        TTransport t1 = CassandraClient.openConnection();
        System.out.println(t1);
        TTransport t2 = CassandraClient.openConnection();
        System.out.println(t2);

        new Thread(){
            public void run(){
                TTransport t3 = CassandraClient.openConnection();
                System.out.println(t3);
                CassandraClient.closeConnection();
            }
        }.start();

        Thread.sleep(100);
        CassandraClient.closeConnection();
        System.out.println(t1.isOpen());
    }

    /*插入一个supercolumn(for super column family)
     * @param columns column的map集合
     * */
    public void insertSuperColumn(String superColumnFamily, String key, String superName, Map<String, String> columns)
            throws UnsupportedEncodingException, InvalidRequestException,
            UnavailableException, TimedOutException, TException{
        openConnection();
        Map<ByteBuffer, Map<String, List<Mutation>>> map;
        map = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
        List<Mutation> list = new ArrayList<Mutation>();
        SuperColumn superColumn = new SuperColumn();
        superColumn.setName(CassandraClient.toByteBuffer(superName));
        Set<String> columnNames = columns.keySet();
        for(String columnName: columnNames) {
            Column c = new Column();
            c.setName(CassandraClient.toByteBuffer(columnName));
            c.setValue(CassandraClient.toByteBuffer(columns.get(columnName)));
            c.setTimestamp(System.currentTimeMillis());
            superColumn.addToColumns(c);
        }
        ColumnOrSuperColumn cos = new ColumnOrSuperColumn();
        cos.super_column = superColumn;
        Mutation mutation = new Mutation();
        mutation.column_or_supercolumn = cos;
        list.add(mutation);
        Map<String,List<Mutation>> supers = new HashMap<String, List<Mutation>>();
        supers.put(superColumnFamily, list);
        map.put(toByteBuffer(key), supers);
        client.batch_mutate(map, ConsistencyLevel.ONE);
        closeConnection();
    }

    //插入一个column(for standard column family)
    public void insertColumn(String columnFamily, String key, String columnName, String columnValue) throws UnsupportedEncodingException{
        openConnection();
        ColumnParent parent = new ColumnParent(columnFamily);
        if( client != null ) {
            Column column = new Column( toByteBuffer(columnName) );
            column.setValue(toByteBuffer(columnValue));
            long timestamp = System.currentTimeMillis();
            column.setTimestamp(timestamp);
            try {
                client.insert(toByteBuffer(key), parent, column, ConsistencyLevel.ONE);
            } catch (InvalidRequestException e) {
                e.printStackTrace();
            } catch (UnavailableException e) {
                e.printStackTrace();
            } catch (TimedOutException e) {
                e.printStackTrace();
            } catch (TException e) {
                e.printStackTrace();
            }
            closeConnection();
        }
    }

    /*获取key对应的column集合(for standard column family)
     * @return column的map集合
     * */
    public HashMap<String, String> getColumns(String columnFamily, String key) throws InvalidRequestException, UnavailableException, TimedOutException, UnsupportedEncodingException, TException{
        openConnection();
        ColumnParent parent = new ColumnParent(columnFamily);
        if( client != null ) {
            SlicePredicate predicate = new SlicePredicate();
            //定义查询的columnName范围(begin~end),正反方向,数目(columnName在数据库中是排好序的,所以有正方向查询,反方向查询)
            SliceRange sliceRange = new SliceRange(toByteBuffer(""), toByteBuffer(""), false,100);
            predicate.setSlice_range(sliceRange);
            List<ColumnOrSuperColumn> results = client.get_slice(toByteBuffer(key), parent, predicate, ConsistencyLevel.ONE);
            if( results == null )
                return null;
            HashMap<String, String> map = new HashMap<String, String>();
            for (ColumnOrSuperColumn result : results)
            {
                Column column = result.column;
                map.put(byteBufferToString(column.name), byteBufferToString(column.value));
                //System.out.println(byteBufferToString(column.name) + " : " + byteBufferToString(column.value) );
            }
            closeConnection();
            return map;
        }
        return null;
    }

    /*获取key对应的superColumn集合(for super column family)
     * @return superColumn的map集合
     * */
    public HashMap<String, Map<String, String>> getSuperColumns(String superColumnFamily, String key) throws InvalidRequestException, UnavailableException, TimedOutException, UnsupportedEncodingException, TException{
        openConnection();
        ColumnParent parent = new ColumnParent(superColumnFamily);
        if( client != null ) {
            SlicePredicate predicate = new SlicePredicate();
            //定义查询的superColumn.key范围,正反方向,数目(superColumn.key在数据库中是排好序的,所以有正方向查询,反方向查询)
            SliceRange sliceRange = new SliceRange(toByteBuffer(""), toByteBuffer(""), false, 100);
            predicate.setSlice_range(sliceRange);
            List<ColumnOrSuperColumn> results = client.get_slice(toByteBuffer(key), parent, predicate, ConsistencyLevel.ONE);
            if( results == null )
                return null;
            HashMap<String, Map<String, String>> supers = new HashMap<String, Map<String,String>>();
            for (ColumnOrSuperColumn result : results)
            {
                SuperColumn superColumn = result.super_column;
                Map<String, String> columns = new HashMap<String, String>();
                for( Column column : superColumn.columns){
                    columns.put(byteBufferToString(column.name), byteBufferToString(column.value));
                }
                supers.put(byteBufferToString(superColumn.name), columns);
                //System.out.println(byteBufferToString(column.name) + " : " + byteBufferToString(column.value) );
            }
            closeConnection();
            return supers;
        }
        return null;
    }

    /*获取key,columnName对应的columnValue(for standard column family)
     * @return String
     * */
    public String getColumnValue(String columnFamily, String key, String columnName){
        try {
            ColumnPath path = new ColumnPath(columnFamily);
            path.setColumn(toByteBuffer(columnName));
            try {
                openConnection();
                String result =  new String( client.get(toByteBuffer(key), path, ConsistencyLevel.ONE).getColumn().getValue() );
                closeConnection();
                return result;
            } catch (InvalidRequestException e) {
                e.printStackTrace();
            } catch (NotFoundException e) {
                e.printStackTrace();
            } catch (UnavailableException e) {
                e.printStackTrace();
            } catch (TimedOutException e) {
                e.printStackTrace();
            } catch (TException e) {
                e.printStackTrace();
            }
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }    

    //String类型转化为ByteBuffer(UTF-8编码)
    public static ByteBuffer toByteBuffer(String value) throws UnsupportedEncodingException  {
        return ByteBuffer.wrap(value.getBytes("UTF-8"));
    }

    //ByteBuffer类型转化为String(UTF-8编码)
    public static String byteBufferToString(ByteBuffer byteBuffer) throws UnsupportedEncodingException{
        byte[] bytes = new byte[byteBuffer.remaining()];
        byteBuffer.get(bytes);
        return new String(bytes,"UTF-8");
    }
}

批量插入数据调用client.batch_mutate()方法,standard和super类型的数据都可以批量插入,insertSuperColumn()方法中有用到。

需要注意的是在插入大量数据时,如果选择一条一条插入,打开一次数据库的连接就够了,如果调用上面的方法,每一条数据都打开一次数据库的连接会快速耗尽客户端使用的端口号(短暂端口号),短暂端口号的范围为49152 ~ 65535(即2^15+(2^16-2^15)/2 ~ 2^16-1),我们在编程时要避免大量使用短暂端口号。

时间: 2025-01-18 10:10:31

cassandra client in Java——cassandra总结(五)的相关文章

Java实验报告五:Java网络编程及安全

Java实验报告五:Java网络编程及安全                                                                                                      20135315  宋宸宁 一.实验内容 1.掌握Socket程序的编写: 2.掌握密码技术的使用: 3.设计安全传输系统. 二.实验步骤 1. 基于Java Socket实现安全传输 2. 基于TCP实现客户端和服务器,结对编程一人负责客户端,一人负责

elasticsearch系列七:ES Java客户端-Elasticsearch Java client(ES Client 简介、Java REST Client、Java Client、Spring Data Elasticsearch)

一.ES Client 简介 1. ES是一个服务,采用C/S结构 2. 回顾 ES的架构 3. ES支持的客户端连接方式 3.1 REST API ,端口 9200 这种连接方式对应于架构图中的RESTful style API这一层,这种客户端的连接方式是RESTful风格的,使用http的方式进行连接 3.2 Transport 连接 端口 9300 这种连接方式对应于架构图中的Transport这一层,这种客户端连接方式是直接连接ES的节点,使用TCP的方式进行连接 4. ES提供了多种

java入门第五步之数据库项目实战【转】

在真正进入代码编写前些进行一些工具的准备: 1.保证有一个可用的数据库,这里我用sql server 2000为例,2.拥有一个ide,如ecelise或myeclipse等,这里我使用的是myeclipse 8.5 3.数据库连接的架包: 这里数据库的安装就不说了,如果你安装的sql server 2000的话,你在使用jdbc进行连接时还需要打上sp3补丁: 一切准备就绪后我们就开始进入正题了: 1.首先我们在file--->New---->Web Project(也可以再Package

Java设计模式(五)外观模式 桥梁模式

(九)外观模式 外观模式为子系统提供一个接口,便于使用.解决了类与类之间关系的,外观模式将类之间的关系放在一个 Facade 类中,降低了类类之间的耦合度,该模式不涉及接口. class CPU { public void startup(){ System.out.println("cpu start"); } public void shutdown(){ System.out.println("cpu stop"); } } class Memory { pu

Java中的五种单例模式实现方法

[代码] Java中的五种单例模式实现方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 package s

跟老杨学java系列(五) JDK的安装与配置

跟老杨学java系列(五) JDK的安装与配置 提示:本节内容对于java入门是非常关键的,对于刚接触java的同学一定要认真学习,欢迎大家留言探讨技术问题.其他问题概不回复. (书接上回)上节课程我们简单介绍了java项目的开发过程及常用的开发工具,这节课我们详细讲解一下JDK的安装与配置.根据上一节的学习,我们知道编写完java代码后,需要先对java代码进行编译,然后再执行.而java程序的编译与执行都是通过JDK来完成的.所以做java开发,首先我们需要学会安装和配置JDK.下面我们就来

Java 多线程(五) 多线程的同步

Java 多线程(五) 多线程的同步 为什么要引入同步机制 在多线程环境中,可能会有两个甚至更多的线程试图同时访问一个有限的资源.必须对这种潜在资源冲突进行预防. 解决方法:在线程使用一个资源时为其加锁即可. 访问资源的第一个线程为其加上锁以后,其他线程便不能再使用那个资源,除非被解锁. 程序实例 用一个取钱的程序例子,来说明为什么需要引入同步. 在使用同步机制前,整体程序如下: public class FetchMoneyTest { public static void main(Stri

201671010140. 2016-2017-2 《Java程序设计》java学习第五周

java学习第五周心得体会        本周,是Java学习第五周,随着时间推移,随着课本内容的推进,我们接触到的程序也开始变得越来越复杂,不再是二三章那些用来练手的小程序了,这一点,在我们的例题运行注解上就可以感受到,程序的长度,涉及到的知识越来越多.        本周学习主要关注超类与子类的关系--继承,继承是面向对象最显著的一个特性,从已有的类(超类)中派生出新的类(子类),新的类能继承已有类的数据属性和行为,并能扩展新的能力. Java继承是使用已存在的类的定义作为基础建立新类的技术

Cassandra基本介绍(2) - Cassandra概述

上一节我们介绍RDBMS遇到的问题,这一节我们将介绍Cassandra以及Cassandra是否可以解决此问题. 通过此章节,我们将学习到: 什么是Cassandra Cassandra数据的Hash分布 Cassandra在CAP中的权衡 Cassandra复制 Cassandra可调一致性 Cassandra多数据中心 什么是Cassandra Apache Cassandra是一个开源的.分布式.无中心.弹性可扩展.高可用.容错.一致性可调.面向列的数据库,它基于Amazon Dynamo