mycat服务启动{管理模块启动过程}

mycat启动的时候启动了三个模块

1:NIOConnector(负责链接mysql数据库,连接池以数据库为准不以链接字符串为准),

1:NIOAcceptor,ManagerConnectionFactory(管理模块,默认端口9066)

2:NIOAcceptor,ServerConnectionFactory(mysql服务模块,默认端口8066)

这里介绍下管理模块的启动流程

顺序图

NIO和AIO

mycat分别实现了NIO和AIO,由于linux当前没有真正实现AIO这里主要介绍NIO的流程。

NIO的Reactor与AIO的Proactor两种模式的场景区别:
下面是Reactor的做法:
1. 等待事件响应 (Reactor job)
2. 分发 “Ready-to-Read” 事件给用户句柄 ( Reactor job)
3. 读数据 (user handler job)
4. 处理数据( user handler job)
下面再来看看真正意义的异步模式Proactor是如何做的:
1. 等待事件响应 (Proactor job)
2. 读数据 (Proactor job)
3. 分发 “Read-Completed” 事件给用户句柄 (Proactor job)
4. 处理数据(user handler job)

mycat的NIO实现

Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。
Selector可以监听四种不同类型的事件:
- Connect
- Accept
- Read
- Write
这四种事件用SelectionKey的四个常量来表示:
- SelectionKey.OP_CONNECT
- SelectionKey.OP_ACCEPT
- SelectionKey.OP_READ
- SelectionKey.OP_WRITE
前面已经说了,NIO采用的Reactor模式:例如汽车是乘客访问的主体(Reactor),乘客上车后,到售票员(acceptor)处登记,之后乘客便可以休息睡觉去了,当到达乘客所要到达的目的地后,售票员将其唤醒即可。

核心顺序

mycat管理端的启动流程

1:new ManagerConnectionFactory extends FrontendConnectionFactory

2:new NIOReactorPool,new NIOReactor,new RW中new ConcurrentLinkedQueue<AbstractConnection>()而AbstractConnection中new NIOSocketWR

3:new NIOAcceptor中向反应堆中注册了OP_ACCEPT,该类继承了Thread然后start启动

accept

			channel = serverChannel.accept();
			channel.configureBlocking(false);
			FrontendConnection c = factory.make(channel);
			c.setAccepted(true);
			c.setId(ID_GENERATOR.getId());
			NIOProcessor processor = (NIOProcessor) MycatServer.getInstance()
					.nextProcessor();
			c.setProcessor(processor);

			LOGGER.info("accept");

			NIOReactor reactor = reactorPool.getNextReactor();
			reactor.postRegister(c);

factory.make(channel):最终构造了ManagerQueryHandler(管理命令解析器)和FrontendAuthenticator(mycat权限解析器)

reactor.postRegister(c):把当前链接添加到reactor的registerQueue中并唤醒reactor的selector

read

在NIOReactor的registerQueue为空的时候run循环空运转,当上一步把accept的链接放到队列的时候则

			for (;;) {

				++reactCount;
				try {
					selector.select(500L);
					register(selector);
					keys = selector.selectedKeys();
					for (SelectionKey key : keys) {
						AbstractConnection con = null;
						try {
							Object att = key.attachment();
							if (att != null) {
								con = (AbstractConnection) att;
								if (key.isValid() && key.isReadable()) {
									try {
										con.asynRead();
									} catch (IOException e) {
                                        con.close("program err:" + e.toString());
										continue;
									} catch (Exception e) {
										LOGGER.debug("caught err:", e);
										con.close("program err:" + e.toString());
										continue;
									}
								}
								if (key.isValid() && key.isWritable()) {
									con.doNextWriteCheck();
								}
							} else {
								key.cancel();
							}
                        } catch (CancelledKeyException e) {
                            if (LOGGER.isDebugEnabled()) {
                                LOGGER.debug(con + " socket key canceled");
                            }
                        } catch (Exception e) {
                            LOGGER.warn(con + " " + e);
                        }
					}
				} catch (Exception e) {
					LOGGER.warn(name, e);
				} finally {
					if (keys != null) {
						keys.clear();
					}

				}

register(selector);也即

((NIOSocketWR) c.getSocketWR()).register(selector); 注册OP_READ事件
c.register();即FrontendConnection的register发送握手数据包

con.asynRead();即NIOSocketWR的asynRead即

	public void asynRead() throws IOException {
		LOGGER.info("asynRead");
		ByteBuffer theBuffer = con.readBuffer;
		if (theBuffer == null) {
			theBuffer = con.processor.getBufferPool().allocate();
			con.readBuffer = theBuffer;
		}
		int got = channel.read(theBuffer);
		con.onReadData(got);

	}

con.onReadData(got);即AbstractConnection的onReadData这里拆包得到完成的数据包后调用

handler.handle(data);也即FrontendAuthenticator的handle在这里check user;check password;check schema如果失败则将失败信息写入缓冲区,如果成功

则把AbstractConnection的默认hander从FrontendAuthenticator换成FrontendCommandHandler等待接下来的处理(比如show命令等,

以上的处理是发生在输入mysql -utest -ptest -h10.97.177.83 -P9066时)

认证完成后下一次的handler.handle(data)则使用FrontendCommandHandler的handle来处理也即

    public void handle(byte[] data)
    {
        if(source.getLoadDataInfileHandler()!=null&&source.getLoadDataInfileHandler().isStartLoadData())
        {
            MySQLMessage mm = new MySQLMessage(data);
            int  packetLength = mm.readUB3();
            if(packetLength+4==data.length)
            {
                source.loadDataInfileData(data);
            }
            return;
        }
        switch (data[4])
        {
            case MySQLPacket.COM_INIT_DB:
                commands.doInitDB();
                source.initDB(data);
                break;
            case MySQLPacket.COM_QUERY:
                commands.doQuery();
                source.query(data);
                break;
            case MySQLPacket.COM_PING:
                commands.doPing();
                source.ping();
                break;
            case MySQLPacket.COM_QUIT:
                commands.doQuit();
                source.close("quit cmd");
                break;
            case MySQLPacket.COM_PROCESS_KILL:
                commands.doKill();
                source.kill(data);
                break;
            case MySQLPacket.COM_STMT_PREPARE:
                commands.doStmtPrepare();
                source.stmtPrepare(data);
                break;
            case MySQLPacket.COM_STMT_EXECUTE:
                commands.doStmtExecute();
                source.stmtExecute(data);
                break;
            case MySQLPacket.COM_STMT_CLOSE:
                commands.doStmtClose();
                source.stmtClose(data);
                break;
            case MySQLPacket.COM_HEARTBEAT:
                commands.doHeartbeat();
                source.heartbeat(data);
                break;
            default:
                     commands.doOther();
                     source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR,
                             "Unknown command");

        }
    }

source.query(data);即queryHandler.query(sql);这里的queryHandler是ManagerQueryHandler即

    public void query(String sql) {
        ManagerConnection c = this.source;
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(new StringBuilder().append(c).append(sql).toString());
        }
        int rs = ManagerParse.parse(sql);
        switch (rs & 0xff) {
            case ManagerParse.SELECT:
                SelectHandler.handle(sql, c, rs >>> SHIFT);
                break;
            case ManagerParse.SET:
                c.write(c.writeToBuffer(OkPacket.OK, c.allocate()));
                break;
            case ManagerParse.SHOW:
                ShowHandler.handle(sql, c, rs >>> SHIFT);
                break;
            case ManagerParse.SWITCH:
                SwitchHandler.handler(sql, c, rs >>> SHIFT);
                break;
            case ManagerParse.KILL_CONN:
                KillConnection.response(sql, rs >>> SHIFT, c);
                break;
            case ManagerParse.OFFLINE:
                Offline.execute(sql, c);
                break;
            case ManagerParse.ONLINE:
                Online.execute(sql, c);
                break;
            case ManagerParse.STOP:
                StopHandler.handle(sql, c, rs >>> SHIFT);
                break;
            case ManagerParse.RELOAD:
                ReloadHandler.handle(sql, c, rs >>> SHIFT);
                break;
            case ManagerParse.ROLLBACK:
                RollbackHandler.handle(sql, c, rs >>> SHIFT);
                break;
            case ManagerParse.CLEAR:
                ClearHandler.handle(sql, c, rs >>> SHIFT);
                break;
            case ManagerParse.CONFIGFILE:
                ConfFileHandler.handle(sql, c);
                break;
            case ManagerParse.LOGFILE:
                ShowServerLog.handle(sql, c);
                break;
            default:
                c.writeErrMessage(ErrorCode.ER_YES, "Unsupported statement");
        }
    }

总结

mycat的网络处理逻辑上是通过队列加上后台线程来实现了accept和read的解耦从而实现了高性能,但是代码写的就不敢恭维。

时间: 2024-10-28 22:04:29

mycat服务启动{管理模块启动过程}的相关文章

启动期间的内存管理之初始化过程概述----Linux内存管理(九)

日期 内核版本 架构 作者 GitHub CSDN 2016-06-14 Linux-4.7 X86 & arm gatieme LinuxDeviceDrivers Linux内存管理 在内存管理的上下文中, 初始化(initialization)可以有多种含义. 在许多CPU上, 必须显式设置适用于Linux内核的内存模型. 例如在x86_32上需要切换到保护模式, 然后内核才能检测到可用内存和寄存器. 而我们今天要讲的boot阶段就是系统初始化阶段使用的内存分配器. 1 前景回顾 1.1

centOS7服务管理与启动流程

centOS7服务管理与启动流程 centOS7启动流程 systemd简介 unit对象 unit类型 特性 service unit文件格式 service unit file文件通常由三部分组成 unit段的常用选项 Service段的常用选项 Install段的常用选项 管理服务 管理系统服务 服务查看 chkconfig命令的对应关系 其他命令 服务状态 systemctl示例 运行级别 运行级别与target的对照 运行级别的切换 CentOS7引导顺序 设置简单的内核参数 简单的启

交通银行 Java Socket 服务启动 管理 WINDOWS 版

按照交通银行提供的无界面启动方法试验了很多次,都没有成功,所以自己动手用C# 知识写了一个. 小工具可以判断 交通银行 JAVA SOCKET 服务是否启动,并可以启动/关闭服务 主要代码如下: 判断服务是否启动 引用 :using System.Management; SelectQuery selectQuery = new SelectQuery(“select * from Win32_Process where Name = ‘java.exe’”); object cmdLine =

python爬虫主要就是五个模块:爬虫启动入口模块,URL管理器存放已经爬虫的URL和待爬虫URL列表,html下载器,html解析器,html输出器 同时可以掌握到urllib2的使用、bs4(BeautifulSoup)页面解析器、re正则表达式、urlparse、python基础知识回顾(set集合操作)等相关内容。

本次python爬虫百步百科,里面详细分析了爬虫的步骤,对每一步代码都有详细的注释说明,可通过本案例掌握python爬虫的特点: 1.爬虫调度入口(crawler_main.py) # coding:utf-8from com.wenhy.crawler_baidu_baike import url_manager, html_downloader, html_parser, html_outputer print "爬虫百度百科调度入口" # 创建爬虫类class SpiderMai

Ubuntu管理开机启动服务项 -- 图形界面的Boot-up Manager

有时学习时安装的服务太多,比如mysql.mongodb.redis.apache.nginx等等,它们都是默认开机启动的,如果不想让它们开机启动,用到时再自己手工启动怎么办呢? 使用sysv-rc-conf确实是一个不错的选择,但在暂时不了解服务启动的层级细节,又只需要一次过全关掉的情况下,用sysv-rc-conf未免过于繁琐. 好在我们还有图形界面下的工具Boot-up Manager,即bum 1 sudo apt-get install bum 安装之后以root身份运行,就可以直接对

linux 学习15 16 启动管理,备份和恢复

第十五讲 启动管理 15. 1 CentOS 6.x 启动管理 //此处指6.3 15.1.1 系统运行级别 1.运行级别 运行级别 含 义 0 关机 1 单用户模式,可以想象为windows的安全模式,主要用于系统修复 //linux 有另外的安全模式 2 不完全的命令行模式,不含NFS服务 //NFS,linux之间文件共享 3 完全的命令行模式,就是标准字符界面 //就是我们当前使用的 4 系统保留 5 图形模式 6 重启动 2.运行级别命令 [[email protected] ~]#

Openstack学习笔记之——Neutron-server服务加载与启动源码分析(三)

本文是在学习Openstack的过程中整理和总结,由于时间和个人能力有限,错误之处在所难免,欢迎指正! 在Neutron-server服务加载与启动源码分析(二)中搞定模块功能的扩展和加载,我们就回到Neutron-server服务加载与启动源码分析(一)中的_run_wsgi函数 <span style="font-size:14px;">def _run_wsgi(app_name): app = config.load_paste_app(app_name) ifno

Linux学习笔记(22) Linux启动管理

1. 系统运行级别 运行级别 含义 0 关机 1 单用户模式,可想象为windows的安全模式,主要用于系统修复 2 不完全的命令行模式,不含NFS服务 3 完全的命令行模式,就是标准字符界面 4 系统保留 5 图形模式 6 重启动 (1) 运行级别命令 runlevel #查看运行级别命令 N表示进入3前面的级别 init 运行级别 #改变运行级别命令 (2) 系统默认运行级别 在配置文件/etc/inittab中进行修改即可 id:3:initdefault: #系统开机后直接进入哪个运行级

关于sql server 2008过期导致 MSSQLSERVER服务就无法启动,手动启动就报告错误代码17051。

1.基本现象:MSSQLSERVER服务就无法启动,手动启动就报告17051错误. 2.解决办法: 第一步:进入SQL2008配置工具中的安装中心, 第二步:再进入维护界面,选择版本升级, 第三步:进入产品密钥,输入密钥 Developer: PTTFM-X467G-P7RH2-3Q6CG-4DMYB Enterprise: JD8Y6-HQG69-P9H84-XDTPG-34MBB 第四步:一直点下一步,直到升级完毕. 最后用key升级成功后即可启动MSSQLSERVER服务. 另外,关于连接