1. cqlsh代码流程
1) start入口
bin/cqlsh
-main-
->main(*read_options(sys.argv[1:],os.environ))
->def main(options, hostname, port):
->shell.cmdloop()
->cmdloop(self): //进入了cmd loop
2) select入口
bin/cqlsh
a) def do_select(self, parsed):
tracing_was_enabled = self.tracing_enabled
ksname = parsed.get_binding(‘ksname‘)
stop_tracing =ksname == ‘system_traces‘ or (ksname is None and self.current_keyspace ==‘system_traces‘)
self.tracing_enabled = self.tracing_enabled and not stop_tracing
statement =parsed.extract_orig()
with_default_limit = parsed.get_binding(‘limit‘) is None
ifwith_default_limit:
statement = "%s LIMIT %d;" % (statement[:-1], DEFAULT_SELECT_LIMIT)
self.perform_statement(statement,with_default_limit=with_default_limit) #进入查询过程
self.tracing_enabled = tracing_was_enabled
可以使用tracing 开关,在shell命令行通过如下命令打开
b) self.perform_statement(statement, decoder=ErrorHandlingSchemaDecoder, with_default_limit=with_default_limit);
c) perform_statement_untraced(self, statement, decoder=None,with_default_limit=False)
untraced 的状态下解析代码;
d) self.cursor.execute(statement, decoder=decoder)#执行代码;
->self.cursor= self.conn.cursor()
-> self.conn = cql.connect(hostname, port, user=username,password=password,
cql_version=cqlver, transport=transport); #cqlsh打开的时候连接集群
e) import cql 是在 lib/cql-internal-only-1.4.1.zip 模块里面实现的;
f) open connection.py ;
->def cursor(self):
if notself.open_socket:
raise ProgrammingError("Connection has been closed.")
curs =self.cursorclass(self)
curs.compression = self.compression
curs.consistency_level = self.consistency_level
return curs
g) open cursor.py
-> def execute(self, cql_query,params={}, decoder=None, consistency_level=None):
# note that‘decoder‘ here is actually the decoder class, not the
# instance tobe used for decoding. bad naming, but it‘s in use now.
ifisinstance(cql_query, unicode):
raise ValueError("CQL query must be bytes, not unicode")
self.pre_execution_setup() #执行环节设置
prepared_q =self.prepare_inline(cql_query, params)
cl =consistency_level or self.consistency_level
response =self.get_response(prepared_q, cl)
#获得执行结果
returnself.process_execution_results(response, decoder=decoder)
h) 下载代码需要深入分析
查找.get_response的分析过程
i) $ grep ‘get_response‘ ./*
./cursor.py: response =self.get_response(prepared_q, cl)
./cursor.py: response =self.get_response_prepared(prepared_query, params, cl)
./native.py: def get_response(self, query,consistency_level):
./native.py: def get_response_prepared(self,prepared_query, params, consistency_level):
./thrifteries.py: def get_response(self,cql_query, consistency_level): #经过测试,该文件实现了前面的功能;
./thrifteries.py: def get_response_prepared(self,prepared_query, params, consistency_level):
j) ./thrifteries.py: defget_response(self, cql_query, consistency_level):
def get_response(self, cql_query,consistency_level):
compressed_q, compress = self.compress_query_text(cql_query) //语句进行了压缩
print"thrift_getresponse"
cl =getattr(ConsistencyLevel, consistency_level)
if self.use_cql3_methods: #cassandra
默认为cql3.0
doquery = self._connection.client.execute_cql3_query
#获得执行方法
return self.handle_cql_execution_errors(doquery, compressed_q, compress,cl) //
else: #查看方法执行过程
doquery = self._connection.client.execute_cql_query
return self.handle_cql_execution_errors(doquery, compressed_q, compress)
k) def handle_cql_execution_errors(self, executor, *args, **kwargs):
try:
return executor(*args, **kwargs) //调用executor方法执行出来结果,所有需要关注executor方法
exceptInvalidRequestException, ire:
raise cql.ProgrammingError("Bad Request: %s" % ire.why)
exceptSchemaDisagreementException, sde:
raise cql.IntegrityError("Schema versions disagree, (try againlater).")
exceptUnavailableException:
raise cql.OperationalError("Unable to complete request: one or "
"more nodes were unavailable.")
exceptTimedOutException:
raise cql.OperationalError("Request did not complete within rpc_timeout.")
exceptTApplicationException, tapp:
l) doquery =self._connection.client.execute_prepared_cql3_query
[ lib]$ grep -r ‘prepare_cql3_query‘./cql-1.4.1/*
./cql-1.4.1/cql/thrifteries.py: doquery =self._connection.client.prepare_cql3_query
./cql-1.4.1/cql/cassandra/Cassandra.py: defprepare_cql3_query(self, query, compression):
./cql-1.4.1/cql/cassandra/Cassandra.py: defprepare_cql3_query(self, query, compression):
./cql-1.4.1/cql/cassandra/Cassandra.py:self.send_prepare_cql3_query(query, compression)
./cql-1.4.1/cql/cassandra/Cassandra.py: returnself.recv_prepare_cql3_query()
./cql-1.4.1/cql/cassandra/Cassandra.py: defsend_prepare_cql3_query(self, query, compression):
./cql-1.4.1/cql/cassandra/Cassandra.py:self._oprot.writeMessageBegin(‘prepare_cql3_query‘, TMessageType.CALL,self._seqid)
./cql-1.4.1/cql/cassandra/Cassandra.py: args =prepare_cql3_query_args()
./cql-1.4.1/cql/cassandra/Cassandra.py: def recv_prepare_cql3_query(self,):
./cql-1.4.1/cql/cassandra/Cassandra.py: result =prepare_cql3_query_result()
./cql-1.4.1/cql/cassandra/Cassandra.py: raiseTApplicationException(TApplicationException.MISSING_RESULT,"prepare_cql3_query failed: unknown result");
./cql-1.4.1/cql/cassandra/Cassandra.py:self._processMap["prepare_cql3_query"] =Processor.process_prepare_cql3_query
./cql-1.4.1/cql/cassandra/Cassandra.py: defprocess_prepare_cql3_query(self, seqid, iprot, oprot):
./cql-1.4.1/cql/cassandra/Cassandra.py: args =prepare_cql3_query_args()
./cql-1.4.1/cql/cassandra/Cassandra.py: result =prepare_cql3_query_result()
./cql-1.4.1/cql/cassandra/Cassandra.py:result.success = self._handler.prepare_cql3_query(args.query, args.compression)
./cql-1.4.1/cql/cassandra/Cassandra.py:oprot.writeMessageBegin("prepare_cql3_query", TMessageType.REPLY,seqid)
./cql-1.4.1/cql/cassandra/Cassandra.py:classprepare_cql3_query_args:
./cql-1.4.1/cql/cassandra/Cassandra.py:oprot.writeStructBegin(‘prepare_cql3_query_args‘)
./cql-1.4.1/cql/cassandra/Cassandra.py:classprepare_cql3_query_result:
./cql-1.4.1/cql/cassandra/Cassandra.py:oprot.writeStructBegin(‘prepare_cql3_query_result‘)
m) open cql-1.4.1/cql/cassandra/Cassandra.py
def execute_cql3_query(self, query,compression, consistency):
"""
Parameters:
- query
- compression
- consistency
"""
print"execute_cql3_query(self, query, compression, consistency)"
self.send_execute_cql3_query(query, compression, consistency) #发送请求
returnself.recv_execute_cql3_query() #接受请求
n) def send_execute_cql3_query(self,query, compression, consistency):
self._oprot.writeMessageBegin(‘execute_cql3_query‘, TMessageType.CALL,self._seqid)
args =execute_cql3_query_args()
args.query = query
args.compression =compression
args.consistency =consistency
args.write(self._oprot)
self._oprot.writeMessageEnd()
self._oprot.trans.flush()
消息内容如下
| string | 32bit | 32bit |
| execute_cql3_query‘|TMessageType.CAL| self._seqid |
-------------------args----------------------------------
end
将请求类型发送到thrift端口,有固定的通信协议。
o) grep -r ‘writeMessage‘ ./*
./lib/thrift/protocol/TProtocol.py: def writeMessageBegin(self, name, ttype,seqid):
./lib/thrift/protocol/TProtocol.py: def writeMessageEnd(self):
./lib/thrift/protocol/TJSONProtocol.py: defwriteMessageBegin(self, name, request_type, seqid):
./lib/thrift/protocol/TJSONProtocol.py: def writeMessageEnd(self):
./lib/thrift/protocol/TJSONProtocol.py: defwriteMessageBegin(self, name, request_type, seqid):
./lib/thrift/protocol/TJSONProtocol.py: def writeMessageEnd(self):
./lib/thrift/protocol/TBinaryProtocol.py: def writeMessageBegin(self, name, type,seqid):
./lib/thrift/protocol/TBinaryProtocol.py: def writeMessageEnd(self):
使用了thrift接口写入端口信息
p) vim./lib/thrift/protocol/TBinaryProtocol.py
def writeMessageBegin(self, name, type,seqid):
if self.strictWrite:
self.writeI32(TBinaryProtocol.VERSION_1 | type)
self.writeString(name)
self.writeI32(seqid)
else:
self.writeString(name)
self.writeByte(type)
self.writeI32(seqid)
q) vimlib/cql-1.4.1/cql/cassandra/Cassandra.py
def write(self,oprot):
if oprot.__class__ ==TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None andfastbinary is n
ot None:
oprot.trans.write(fastbinary.encode_binary(self, (self.__class__,self.thrift_spec)))
return
oprot.writeStructBegin(‘execute_cql3_query_args‘)
if self.query is not None:
oprot.writeFieldBegin(‘query‘, TType.STRING, 1)
oprot.writeString(self.query)
oprot.writeFieldEnd()
if self.compression is notNone:
oprot.writeFieldBegin(‘compression‘, TType.I32, 2)
oprot.writeI32(self.compression)
oprot.writeFieldEnd()
if self.consistency is notNone:
oprot.writeFieldBegin(‘consistency‘, TType.I32, 3)
oprot.writeI32(self.consistency)
oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()