源码分析mycat1.6之网络篇----前后端连接交互设计(mycat命令处理流程)
日期: 2017-04-20 分类: 跨站数据测试 378次阅读
本章将从如下3个方面剖析mycat前后端交互原理:
1、mycat握手认证阶段、命令处理阶段切换
2、mycat前后端如何交互
3、后端连接及IO线程模型。
1、mycat握手认证阶段、命令处理阶段切换
从上篇mycat前端线程模型可以看到,mycat对应用程序来说将自己伪装成mysql服务器(实现mysql通信协议)接受客户端的命令,比如查询,更新等命令。mycat前端线程模型基于主从Reactor模式,我们再简单的回顾一下其处理流程:NioAcceptor接受一个客户端连接,封装成一个FrontedConnection,将其转发到NioReactor中,NioReactor接管其读写事件的处理,一个客户端连接一旦连接成功,mycat会向客户端发送握手包完成与客户端的握手认证协议过程,该过程是通过FrontedConnection的register方法(其实是channelRegister事件方法更容易理解)和FrontedConnection的初始handler(FrontedAuthenticator)来完成的。认证信息完成后,该FrontedConnection是如何切换到命令处理模式的呢?其核心关键想必在FrontedAuthenticator中实现:其关键点如下:
protected void success(AuthPacket auth) {
source.setAuthenticated(true);
source.setUser(auth.user);
source.setSchema(auth.database);
source.setCharsetIndex(auth.charsetIndex);
source.setHandler(new FrontendCommandHandler(source)); // @1
if (LOGGER.isInfoEnabled()) {
StringBuilder s = new StringBuilder();
s.append(source).append('\'').append(auth.user).append("' login success");
byte[] extra = auth.extra;
if (extra != null && extra.length > 0) {
s.append(",extra:").append(new String(extra));
}
LOGGER.info(s.toString());
}
ByteBuffer buffer = source.allocate();
source.write(source.writeToBuffer(AUTH_OK, buffer)); // @2
boolean clientCompress = Capabilities.CLIENT_COMPRESS==(Capabilities.CLIENT_COMPRESS & auth.clientFlags); // @3
boolean usingCompress= MycatServer.getInstance().getConfig().getSystem().getUseCompression()==1 ;
if(clientCompress&&usingCompress)
{
source.setSupportCompress(true);
}
}
代码@1,关键中的关键,从握手认证状态进入到命令处理阶段,就是改变一下FrontedConnection的handler,从上文的讲解已经知道,NIOSockerWR在读事件处理时,,每成功解一个mysql包,就会交给handler处理。切换成FrontendCommandHandler处理器,进入到命令执行阶段,就这么简单。
代码@2,发送OK报文给mysql客户端。
代码@3,从认证授权包得知客户端支持的权能标记,mycat只处理了是否支持压缩。
2、前端连接与后端连接交互
上面已经剖析了mycat是如何从握手认证协议阶段向命令执行模式转变的,前端连接与后端连接的交互入口在FrontendCommandHandler处理器,该handler处理客户端发送的命令报文,故接下来将重点讲解FrontendCommandHandler处理器,源码如下:
/**
* 前端命令处理器
*
* @author mycat
*/
public class FrontendCommandHandler implements NIOHandler
{
protected final FrontendConnection source;
protected final CommandCount commands;
public FrontendCommandHandler(FrontendConnection source)
{
this.source = source;
this.commands = source.getProcessor().getCommands();
}
@Override
public void handle(byte[] data)
{
if(source.getLoadDataInfileHandler()!=null&&source.getLoadDataInfileHandler().isStartLoadData())
{
MySQLMessage mm = new MySQLMessage(data);
int packetLength = mm.readUB3();
if(packetLength+4==data.length)
{
source.loadDataInfileData(data);
}
return;
}
switch (data[4])
{
case MySQLPacket.COM_INIT_DB:
commands.doInitDB();
source.initDB(data);
break;
case MySQLPacket.COM_QUERY:
commands.doQuery();
source.query(data);
break;
case MySQLPacket.COM_PING:
commands.doPing();
source.ping();
break;
case MySQLPacket.COM_QUIT:
commands.doQuit();
source.close("quit cmd");
break;
case MySQLPacket.COM_PROCESS_KILL:
commands.doKill();
source.kill(data);
break;
case MySQLPacket.COM_STMT_PREPARE:
commands.doStmtPrepare();
source.stmtPrepare(data);
break;
case MySQLPacket.COM_STMT_SEND_LONG_DATA:
commands.doStmtSendLongData();
source.stmtSendLongData(data);
break;
case MySQLPacket.COM_STMT_RESET:
commands.doStmtReset();
source.stmtReset(data);
break;
case MySQLPacket.COM_STMT_EXECUTE:
commands.doStmtExecute();
source.stmtExecute(data);
break;
case MySQLPacket.COM_STMT_CLOSE:
commands.doStmtClose();
source.stmtClose(data);
break;
case MySQLPacket.COM_HEARTBEAT:
commands.doHeartbeat();
source.heartbeat(data);
break;
default:
commands.doOther();
source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR,
"Unknown command");
}
}
}
核心代码讲解:CommandCount ,命令执行统计,每调用一次,相关命令计数增加1。然后将命令包转发给前端连接来处理。我们就以查询命令COM_QUERY来处理,跟踪为FrontendConnection的 query方法。至于为什么这里是用switch(data[4]),这是sql命令协议包结构决定的,如果不明白请看: 除特别声明,本站所有文章均为原创,如需转载请以超级链接形式注明出处:SmartCat's Blog
精华推荐