cassandra cqlsh代码分析

1.  cqlsh代码流程

1)   start入口

bin/cqlsh

-main-

->main(*read_options(sys.argv[1:],os.environ))

->def main(options, hostname, port):

->shell.cmdloop()

->cmdloop(self):  //进入了cmd loop

2)   select入口

bin/cqlsh

a)     def do_select(self, parsed):

tracing_was_enabled = self.tracing_enabled

ksname = parsed.get_binding(‘ksname‘)

stop_tracing =ksname == ‘system_traces‘ or (ksname is None and self.current_keyspace ==‘system_traces‘)

self.tracing_enabled = self.tracing_enabled and not stop_tracing

statement =parsed.extract_orig()

with_default_limit = parsed.get_binding(‘limit‘) is None

ifwith_default_limit:

statement = "%s LIMIT %d;" % (statement[:-1], DEFAULT_SELECT_LIMIT)

self.perform_statement(statement,with_default_limit=with_default_limit) #进入查询过程

self.tracing_enabled = tracing_was_enabled

可以使用tracing 开关,在shell命令行通过如下命令打开

b)      self.perform_statement(statement, decoder=ErrorHandlingSchemaDecoder, with_default_limit=with_default_limit);

c)      perform_statement_untraced(self, statement, decoder=None,with_default_limit=False)

untraced 的状态下解析代码;

d)     self.cursor.execute(statement, decoder=decoder)#执行代码;

->self.cursor= self.conn.cursor()

-> self.conn = cql.connect(hostname, port, user=username,password=password,

cql_version=cqlver, transport=transport); #cqlsh打开的时候连接集群

e)     import cql 是在 lib/cql-internal-only-1.4.1.zip 模块里面实现的;

f)       open connection.py ;

->def cursor(self):

if notself.open_socket:

raise ProgrammingError("Connection has been closed.")

curs =self.cursorclass(self)

curs.compression = self.compression

curs.consistency_level = self.consistency_level

return curs

g)     open cursor.py

-> def execute(self, cql_query,params={}, decoder=None, consistency_level=None):

# note that‘decoder‘ here is actually the decoder class, not the

# instance tobe used for decoding. bad naming, but it‘s in use now.

ifisinstance(cql_query, unicode):

raise ValueError("CQL query must be bytes, not unicode")

self.pre_execution_setup() #执行环节设置

prepared_q =self.prepare_inline(cql_query, params)

cl =consistency_level or self.consistency_level

response =self.get_response(prepared_q, cl)
 #获得执行结果

returnself.process_execution_results(response, decoder=decoder)

h)     下载代码需要深入分析

查找.get_response的分析过程

i)      $ grep ‘get_response‘ ./*

./cursor.py: response =self.get_response(prepared_q, cl)

./cursor.py: response =self.get_response_prepared(prepared_query, params, cl)

./native.py: def get_response(self, query,consistency_level):

./native.py: def get_response_prepared(self,prepared_query, params, consistency_level):

./thrifteries.py: def get_response(self,cql_query, consistency_level):  #经过测试,该文件实现了前面的功能;

./thrifteries.py: def get_response_prepared(self,prepared_query, params, consistency_level):

j)      ./thrifteries.py: defget_response(self, cql_query, consistency_level):

def get_response(self, cql_query,consistency_level):

compressed_q, compress = self.compress_query_text(cql_query)  //语句进行了压缩

print"thrift_getresponse"

cl =getattr(ConsistencyLevel, consistency_level)

if self.use_cql3_methods:   #cassandra
默认为cql3.0

doquery = self._connection.client.execute_cql3_query 
 #获得执行方法

return self.handle_cql_execution_errors(doquery, compressed_q, compress,cl)  //

else:   #查看方法执行过程

doquery = self._connection.client.execute_cql_query

return self.handle_cql_execution_errors(doquery, compressed_q, compress)

k)     def handle_cql_execution_errors(self, executor, *args, **kwargs):

try:

return executor(*args, **kwargs) //调用executor方法执行出来结果,所有需要关注executor方法

exceptInvalidRequestException, ire:

raise cql.ProgrammingError("Bad Request: %s" % ire.why)

exceptSchemaDisagreementException, sde:

raise cql.IntegrityError("Schema versions disagree, (try againlater).")

exceptUnavailableException:

raise cql.OperationalError("Unable to complete request: one or "

"more nodes were unavailable.")

exceptTimedOutException:

raise cql.OperationalError("Request did not complete within rpc_timeout.")

exceptTApplicationException, tapp:

l)       doquery =self._connection.client.execute_prepared_cql3_query

[ lib]$ grep -r ‘prepare_cql3_query‘./cql-1.4.1/*

./cql-1.4.1/cql/thrifteries.py: doquery =self._connection.client.prepare_cql3_query

./cql-1.4.1/cql/cassandra/Cassandra.py: defprepare_cql3_query(self, query, compression):

./cql-1.4.1/cql/cassandra/Cassandra.py: defprepare_cql3_query(self, query, compression):

./cql-1.4.1/cql/cassandra/Cassandra.py:self.send_prepare_cql3_query(query, compression)

./cql-1.4.1/cql/cassandra/Cassandra.py: returnself.recv_prepare_cql3_query()

./cql-1.4.1/cql/cassandra/Cassandra.py: defsend_prepare_cql3_query(self, query, compression):

./cql-1.4.1/cql/cassandra/Cassandra.py:self._oprot.writeMessageBegin(‘prepare_cql3_query‘, TMessageType.CALL,self._seqid)

./cql-1.4.1/cql/cassandra/Cassandra.py: args =prepare_cql3_query_args()

./cql-1.4.1/cql/cassandra/Cassandra.py: def recv_prepare_cql3_query(self,):

./cql-1.4.1/cql/cassandra/Cassandra.py: result =prepare_cql3_query_result()

./cql-1.4.1/cql/cassandra/Cassandra.py: raiseTApplicationException(TApplicationException.MISSING_RESULT,"prepare_cql3_query failed: unknown result");

./cql-1.4.1/cql/cassandra/Cassandra.py:self._processMap["prepare_cql3_query"] =Processor.process_prepare_cql3_query

./cql-1.4.1/cql/cassandra/Cassandra.py: defprocess_prepare_cql3_query(self, seqid, iprot, oprot):

./cql-1.4.1/cql/cassandra/Cassandra.py: args =prepare_cql3_query_args()

./cql-1.4.1/cql/cassandra/Cassandra.py: result =prepare_cql3_query_result()

./cql-1.4.1/cql/cassandra/Cassandra.py:result.success = self._handler.prepare_cql3_query(args.query, args.compression)

./cql-1.4.1/cql/cassandra/Cassandra.py:oprot.writeMessageBegin("prepare_cql3_query", TMessageType.REPLY,seqid)

./cql-1.4.1/cql/cassandra/Cassandra.py:classprepare_cql3_query_args:

./cql-1.4.1/cql/cassandra/Cassandra.py:oprot.writeStructBegin(‘prepare_cql3_query_args‘)

./cql-1.4.1/cql/cassandra/Cassandra.py:classprepare_cql3_query_result:

./cql-1.4.1/cql/cassandra/Cassandra.py:oprot.writeStructBegin(‘prepare_cql3_query_result‘)

m)     open cql-1.4.1/cql/cassandra/Cassandra.py

def execute_cql3_query(self, query,compression, consistency):

"""

Parameters:

- query

- compression

- consistency

"""

print"execute_cql3_query(self, query, compression, consistency)"

self.send_execute_cql3_query(query, compression, consistency)  #发送请求

returnself.recv_execute_cql3_query()  #接受请求

n)       def send_execute_cql3_query(self,query, compression, consistency):

self._oprot.writeMessageBegin(‘execute_cql3_query‘, TMessageType.CALL,self._seqid)

args =execute_cql3_query_args()

args.query = query

args.compression =compression

args.consistency =consistency

args.write(self._oprot)

self._oprot.writeMessageEnd()

self._oprot.trans.flush()

消息内容如下

|   string           |       32bit      |  32bit     |

|  execute_cql3_query‘|TMessageType.CAL| self._seqid |

-------------------args----------------------------------

end

将请求类型发送到thrift端口,有固定的通信协议。

o)      grep -r ‘writeMessage‘  ./*

./lib/thrift/protocol/TProtocol.py:  def writeMessageBegin(self, name, ttype,seqid):

./lib/thrift/protocol/TProtocol.py:  def writeMessageEnd(self):

./lib/thrift/protocol/TJSONProtocol.py: defwriteMessageBegin(self, name, request_type, seqid):

./lib/thrift/protocol/TJSONProtocol.py:  def writeMessageEnd(self):

./lib/thrift/protocol/TJSONProtocol.py: defwriteMessageBegin(self, name, request_type, seqid):

./lib/thrift/protocol/TJSONProtocol.py:    def writeMessageEnd(self):

./lib/thrift/protocol/TBinaryProtocol.py:  def writeMessageBegin(self, name, type,seqid):

./lib/thrift/protocol/TBinaryProtocol.py:  def writeMessageEnd(self):

使用了thrift接口写入端口信息

p)          vim./lib/thrift/protocol/TBinaryProtocol.py

def writeMessageBegin(self, name, type,seqid):

if self.strictWrite:

self.writeI32(TBinaryProtocol.VERSION_1 | type)

self.writeString(name)

self.writeI32(seqid)

else:

self.writeString(name)

self.writeByte(type)

self.writeI32(seqid)

q)      vimlib/cql-1.4.1/cql/cassandra/Cassandra.py

def write(self,oprot):

if oprot.__class__ ==TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None andfastbinary is n

ot None:

oprot.trans.write(fastbinary.encode_binary(self, (self.__class__,self.thrift_spec)))

return

oprot.writeStructBegin(‘execute_cql3_query_args‘)

if self.query is not None:

oprot.writeFieldBegin(‘query‘, TType.STRING, 1)

oprot.writeString(self.query)

oprot.writeFieldEnd()

if self.compression is notNone:

oprot.writeFieldBegin(‘compression‘, TType.I32, 2)

oprot.writeI32(self.compression)

oprot.writeFieldEnd()

if self.consistency is notNone:

oprot.writeFieldBegin(‘consistency‘, TType.I32, 3)

oprot.writeI32(self.consistency)

oprot.writeFieldEnd()

oprot.writeFieldStop()

oprot.writeStructEnd()

时间: 2024-10-16 03:20:37

cassandra cqlsh代码分析的相关文章

java代码分析及分析工具

java代码分析及分析工具 一个项目从搭建开始,开发的初期往往思路比较清晰,代码也比较清晰.随着时间的推移,业务越来越复杂.代码也就面临着耦合,冗余,甚至杂乱,到最后谁都不敢碰. 作为一个互联网电子商务网站的业务支撑系统,业务复杂不言而喻.从09年开始一直沿用到现在,中间代码经过了多少人的手,留下了多少的坑,已经记不清楚了,谁也说不清了. 代码的维护成本越来越高.代码已经急需做调整和改善.最近项目组专门设立了一个小组,利用业余时间做代码分析的工作,目标对核心代码进行分析并进行设计重构. 代码分析

Java静态代码分析工具Infer

Java静态代码分析工具Infer 作者:chszs,转载需注明.博客主页:http://blog.csdn.net/chszs 一.Infer介绍 Infer是Facebook最新开源的静态程序分析工具,用于在发布移动应用之前对代码进行分析,找出潜在的问题.目前Facebook使用此工具分析Facebook的App,包括Android.iOS.Facebook Messenger和Instagram等. Facebook称该工具帮助其每个月检查出应用潜在的数百个Bug,例如一些空指针访问.资源

$*和[email protected]之间区别代码分析

#!/bin/bash set 'apple pie' pears peaches for i in $*           /*单引号被去掉,循环单个字符输出*/ do echo $i done [[email protected] Ex_14.02-14.31]# sh 14-14-1 apple pie pears peaches -------------------------------------------------------------- #!/bin/bash set

《linux 内核完全剖析》 keyboard.S 部分代码分析(key_map)

keyboard.S 部分代码分析(key_map) keyboard中间有这么一段,我一开始没看明白,究竟啥意思 key_map: .byte 0,27 .ascii "1234567890-=" .byte 127,9 .ascii "qwertyuiop[]" .byte 13,0 .ascii "asdfghjkl;'" .byte '`,0 .ascii "\\zxcvbnm,./" .byte 0,'*,0,32

20145234黄斐《网络对抗技术》实验四,恶意代码分析

恶意代码 概述 恶意代码是指故意编制或设置的.对网络或系统会产生威胁或潜在威胁的计算机代码.最常见的恶意代码有计算机病毒(简称病毒).特洛伊木马(简称木马).计算机蠕虫(简称蠕虫).后门.逻辑炸弹等. 特征: 恶意的目的,获取靶机权限.用户隐私等 本身是计算机程序,可以执行,并作用于靶机 通过执行发生作用,一般来说不运行是没问题的 恶意代码分析 在大多数情况下,进行恶意代码分析时,我们将只有恶意代码的可执行文件本身,而这些文件并不是我们人类可读的.为了了解这些文件的意义,你需要使用各种工具和技巧

20145326蔡馨熠《网络对抗》——恶意代码分析

20145326蔡馨熠<网络对抗>--恶意代码分析 1.实验后回答问题 (1)如果在工作中怀疑一台主机上有恶意代码,但只是猜想,所以想监控下系统一天天的到底在干些什么.请设计下你想监控的操作有哪些,用什么方法来监控.. 需要监控什么? 系统中各种程序.文件的行为. 还需要注意是否会出现权限更改的行为. 注册表. 是否有可疑进程. 如果有网络连接的情况,需要注意这个过程中的IP地址与端口. 用什么来监控? 最先想到的肯定是使用wireshark抓包了,再进行进一步分析. Sysinternals

代码分析—“CA0052 没有选择要分析的目标”(VS2012)

情况: 1.未采用代码分析时程序正常编译 2.采用代码分析,会提示"没有选择分析目标"或"未加载制定版本的程序集"...的错误 分析: 是由于代码分析依赖程序集的强签名,包括版本 解决方案: 1.修改代码分析工具的配置项: FxCopCmd.exe.config里节点AssemblyReferenceResolveMode的Value值StrongName修改为StrongNameIgnoringVersion或None 2.修改当前分析的项目: .csproj增加

常用 Java 静态代码分析工具的分析与比较

转载自: http://www.oschina.net/question/129540_23043 简介: 本文首先介绍了静态代码分析的基本概念及主要技术,随后分别介绍了现有 4 种主流 Java 静态代码分析工具 (Checkstyle,FindBugs,PMD,Jtest),最后从功能.特性等方面对它们进行分析和比较,希望能够帮助 Java 软件开发人员了解静态代码分析工具,并选择合适的工具应用到软件开发中. 引言 在 Java 软件开发过程中,开发团队往往要花费大量的时间和精力发现并修改代

驱动相关的内核代码分析

arch\arm\include\asm\Io.h #define __raw_readl(a) (__chk_io_ptr(a), *(volatile unsigned int __force   *)(a)) #define __raw_writel(v,a) (__chk_io_ptr(a), *(volatile unsigned int __force   *)(a) = (v)) 注:(volatile unsigned int __force   *)指针强制转换为unsigne