一、前言
libcassandra是Cassandra官方推出的C/C++ API库。与thrift接口(另一个API库)相比,其接口更丰富,对类型匹配更细致。
通过实践,私下也觉得libcassandra比thrift接口更好用。当然这只是个人观点。
有关libcassandra的特点、安装等,在另一篇随笔里已有介绍,本篇把相应代码示例补上。
二、环境
创建keyspace、table如下:
create keyspace hs WITH replication={‘class‘:‘SimpleStrategy‘, ‘replication_factor‘:2}; use hs; create columnfamily SAMPLE( ID int, GID bigint, DATA blob, PRIMARY KEY(ID) );
三、示例代码
为简便起见,这里只写了Insert和select的示例,update和delete的示例与insert类似。
#include "cassandra.h" #include <string.h> #include <stdio.h> class LibCassDemo { private: CassFuture *m_pConnFuture; CassCluster *m_pCluster; CassPrepared *m_pInsertStmt = NULL; CassPrepared *m_pSelectStmt = NULL; public: CassSession *m_pSession; //其实该成员变量也应是私有的,为偷懒改成公共的了 public: LibCassDemo() {} ~LibCassDemo() { disconnect(); } //连接Cassandra bool connect() { if (m_bConnect) return true; m_pCluster = cass_cluster_new(); m_pSession = cass_session_new(); cass_cluster_set_contact_points(m_pCluster, "localhost"); cass_cluster_set_credentials(m_pCluster, "cassandra", "cassandra"); m_pConnFuture = cass_session_connect(m_pSession, m_pCluster); if (cass_future_error_code(m_pConnFuture)!=CASS_OK) //错误处理 { const char* message; size_t message_length; cass_future_error_message(m_pConnFuture, &message, &message_length); fprintf(stderr, "Unable to connect: ‘%.*s‘\n", (int)message_length, message); return false; } printf("Connect Successful!\n"); m_bConnect = true; return m_bConnect; } //断开Cassandra void disconnect() { if (!m_bConnect) return; //关闭session并释放局部变量 CassFuture *pCloseFuture = cass_session_close(m_pSession); cass_future_wait(pCloseFuture); cass_future_free(pCloseFuture); //释放所有成员变量 if (m_pInsertStmt) cass_prepared_free(m_pInsertStmt); if (m_pSelectStmt) cass_prepared_free(m_pSelectStmt); cass_future_free(m_pConnFuture); cass_cluster_free(m_pCluster); cass_session_free(m_pSession); m_bConnect = false; printf("Disonnect Successful!\n"); } //相同的INSERT语句只须Prepare一次即可 CassPrepared * buildInsertStmt() { string cql; if (m_pInsertStmt==NULL) { cql = "INSERT INTO SAMPLE (ID, GID, DATA) VALUES (?, ?, ?)"; CassFuture *pPrepareFuture = cass_session_prepare(m_pSession, cql.c_str()); if (cass_future_error_code(pPrepareFuture)!=CASS_OK) //错误处理 { const char* message; size_t message_length; cass_future_error_message(pPrepareFuture, &message, &message_length); fprintf(stderr, "Unable to prepare Insert Statement: ‘%.*s‘\n", (int)message_length, message); } else m_pInsertStmt = (CassPrepared*)cass_future_get_prepared(pPrepareFuture); cass_future_free(pPrepareFuture); } return m_pInsertStmt; } //同样的,相同的SELECT语句也只须Prepare一次 CassPrepared * buildSelectStmt() { string cql; if (m_pSelectStmt==NULL) { cql = "SELECT ID, GID, DATA FROM SAMPLE WHERE ID=? "; CassFuture *pPrepareFuture = cass_session_prepare(m_pSession, cql.c_str()); if (cass_future_error_code(pPrepareFuture)!=CASS_OK) //错误处理 { const char* message; size_t message_length; cass_future_error_message(pPrepareFuture, &message, &message_length); fprintf(stderr, "Unable to prepare Select Statement: ‘%.*s‘\n", (int)message_length, message); } else m_pSelectStmt = (CassPrepared*)cass_future_get_prepared(pPrepareFuture); cass_future_free(pPrepareFuture); } return m_pSelectStmt; } //获取int型数据 int32_t getInt32FromRow(const CassRow *row, const uint16_t index) { int32_t i32; CassValue *value = (CassValue*)cass_row_get_column(row, index); cass_value_get_int32(value, &i32); return i32; } //获取bigint型数据 int64_t getInt64FromRow(const CassRow* row, const uint16_t index) { int64_t i64; CassValue *value = (CassValue*)cass_row_get_column(row, index); cass_value_get_int64(value, &i64); return i64; } //获取blob型数据 CassError getStringFromRow(const CassRow *row, const uint16_t index, UCHAR** data, int32_t &len) { const cass_byte_t* buf; size_t sz; CassValue *value = (CassValue*)cass_row_get_column(row, index); CassError cer = cass_value_get_bytes(value, &buf, &sz); if (cer==CASS_OK) { *data = (UCHAR *)malloc(sz +1); memcpy(*data, buf, sz); *(*data+sz+1)=‘\0‘; len = (int32_t)sz; } return cer; } } int main (int argc, char *argv[]) { //初始化 LibCassDemo *cass = new LibCassDemo(); cass->connect(); UCHAR primData[128]; //Init primData here ... //写入数据 CassPrepared *prepared = cass->buildInsertStmt(); CassBatch *batch = cass_batch_new(CASS_BATCH_TYPE_LOGGED); //批量方式 for (int i=0; i<10; i++) { CassStatement * stmt = cass_prepared_bind(prepared); cass_statement_set_consistency(stmt, CASS_CONSISTENCY_QUORUM); cass_statement_bind_int32(stmt, 0, i); //第一个是0 cass_statement_bind_int64(stmt, 1, (i<<8 + i)); cass_statement_bind_bytes(stmt, 2, primData, 128); cass_batch_add_statement(batch, stmt); cass_statement_free(stmt); } CassFuture *batchFuture = cass_session_execute_batch(cass->m_pSession, batch); if (cass_future_error_code(batchFuture)!=CASS_OK) //错误处理 { const char* message; size_t message_length; cass_future_error_message(batchFuture, &message, &message_length); fprintf(stderr, "Unable to execute BATCH: ‘%.*s‘\n", (int)message_length, message); } cass_future_free(batchFuture); cass_batch_free(batch); //读取数据 prepared = cass->buildSelectStmt(); stmt = cass_prepared_bind(prepared); int id=7; cass_statement_bind_int32(stmt, 0, id); CassFuture *readFuture = cass_session_execute(cass->m_pSession, stmt); CassResult *result = (CassResult*)cass_future_get_result(readFuture); //从CassFuture获取查询结果 CassIterator *iter = cass_iterator_from_result(result); //转换为CassIterator while(cass_iterator_next(iter)) { CassRow *row = (CassRow*)cass_iterator_get_row(iter); //CassRow和CassError对象无须释放 printf("ID=%d, GID=%ld, ", cass->getInt32FromRow(row, 0), cass->getInt64FromRow(row, 1)); UCHAR *pData = (UCHAR *)malloc(128); int len; cass->getStringFromRow(row, 2, &pData, len); printf("DATA is :%s\n", pData); free(pData); } cass_iterator_free(iter); cass_result_free(result); cass_future_free(readFuture); cass_statement_free(stmt); //释放资源 delete cass; }
时间: 2024-11-09 02:54:38