源码研读Mycat1.6之网络篇---前端线程模型(应用程序与mycat交互篇)上篇
日期: 2017-04-10 分类: 跨站数据测试 282次阅读
Mycat 使用的线程模型也是基于 Reactor 主从设计模式,其设计类图如下:
- NIOAcceptor,在Reactor主从模式中扮演Acceptor与主Reactor角色,主要承担客户端的连接事件(accept)。
- NIOReactorPool,从Reacotr池,acceptor监听客户端连接后,将从该Reactor池中随机选取一个Reactor进行注册读、写事件。
- NioReactor,从Reactor,处理读写事件。
- NioProcessor,先可以这样理解,连接的容器,管理者,每一个连接都会绑定到一个NioProcessor中。
- AbstractConnection,Mycat对SocketChannel的业务抽象,可以看出一条数据库连接。Accecptor监听客户端连接后,包装成Connection,体现出封装性。
- SocketWR,处理网络IO读写,主要职责从通道解析协议包,转换为byte[],供Handler处理。
- NioHandler,具体IO业务处理,在这里不包括命令的解析处理。具有在连接章节重点讨论。
1、源码分析NIOAcceptor
1.1 重要属性与构造方法详解
private static finalAcceptIdGenerator ID_GENERATOR = new AcceptIdGenerator();
private final int port;
private final Selector selector;
private final ServerSocketChannel serverChannel;
private final FrontendConnectionFactory factory;
private long acceptCount;
private final NIOReactorPool reactorPool;
public NIOAcceptor(String name, String bindIp,int port,
FrontendConnectionFactory factory, NIOReactorPool reactorPool)
throws IOException {
super.setName(name);
this.port = port;
this.selector = Selector.open();
this.serverChannel = ServerSocketChannel.open();
this.serverChannel.configureBlocking(false);
/** 设置TCP属性 */
serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); //@1
serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 1024 * 16 * 2); //@2
// backlog=100
serverChannel.bind(new InetSocketAddress(bindIp, port), 100); //@3
this.serverChannel.register(selector, SelectionKey.OP_ACCEPT); //@4
this.factory = factory;
this.reactorPool = reactorPool;
}
重要属性一览:
- port : 监听端口,例如默认mycat服务端口:8066,管理端口9066
- selector : 选择器,主要关心OP_ACCEPT
- factory : 前端连接工厂,用于将java.nio.SocketChannel包装成NioConnection。
- reactorPool:从Reactor池。
代码@1,设置Socket选项,如果端口关闭,马上可再次使用。
代码@2,设置Socket的接收缓存区大小,32K。
代码@3,端口绑定。
代码@4,注册OP_ACCEPT事件。
1.2 run方法详解
public void run() {
final Selector tSelector = this.selector;
for (;;) {
++acceptCount;
try {
tSelector.select(1000L);
Set<SelectionKey> keys = tSelector.selectedKeys();
try {
for (SelectionKey key : keys) {
if (key.isValid() && key.isAcceptable()) {
accept(); //@1
} else {
key.cancel();
}
}
} finally {
keys.clear();
}
} catch (Exception e) {
LOGGER.warn(getName(), e);
}
}
}
利用select()的多路复用技术,监听客户端的连接事件,并处理,重点关注代码@1,accept()方法的实现:
private void accept() {
SocketChannel channel = null;
try {
channel = serverChannel.accept();
channel.configureBlocking(false);
FrontendConnection c = factory.make(channel); //@1
c.setAccepted(true);
c.setId(ID_GENERATOR.getId());
NIOProcessor processor = (NIOProcessor) MycatServer.getInstance()
.nextProcessor();
c.setProcessor(processor); //@2
NIOReactor reactor = reactorPool.getNextReactor();
reactor.postRegister(c); //@3
} catch (Exception e) {
LOGGER.warn(getName(), e);
closeChannel(channel);
}
}
代码@1,将客户端的Socket连接,包装成Mycat的业务对象,Connection,Connection工厂的作用主要是初始化Socket相关参数,以及为通道关联相关的业务处理器(Handler)。稍候重点浏览一下工厂相关的代码。
代码@2,每个通道从Mycat服务器的NioProcessor池中获取一个,在通道的整个生命周期中,只会与一个NioProcessor打交道。
代码@3,从从Reactor池中轮询获取一个Reacotr,注册该通道的读事件。
1.2.1 关于代码@1,Connection工厂代码如下
关于ServerConnectionFactory,这里有个非常重要的作用,就是设置相关Handler,比如ServerQueryHandler
,LoadDataInfileHandler,ServerPrepareHandler。从这里可以看出,Hander是线程通道安全的,线程安全的。
由代码@3处引出下一个主角,NioReactor,NioReactor将接管通道余下的所有关于读、写操作。
2、源码研读NIOReactor
在NIOAcceptor中,轮询获取一个NIOReactor,调用其postRegister()方法,将连接注册到事件分发器上(Selector),接下来从postRegister方法开始分析。
2.1 postRegister(AbstractConnection c)
final void postRegister(AbstractConnection c) {
reactorR.registerQueue.offer(c);
reactorR.selector.wakeup();
}
mycat Reactor使用的是ConcurrentLikedQueue,无界队列,该方法将连接加入到任务队列尾部,然后唤醒selector.select()方法。
2.2 run 方法详解
@Override
public void run() {
final Selector selector = this.selector;
Set<SelectionKey> keys = null;
for (;;) {
++reactCount;
try {
selector.select(500L);
register(selector); //@1
keys = selector.selectedKeys();
for (SelectionKey key : keys) {
AbstractConnection con = null;
try {
Object att = key.attachment();
if (att != null) {
con = (AbstractConnection) att;
if (key.isValid() && key.isReadable()) {
try {
con.asynRead(); // @2
} catch (IOException e) {
con.close("program err:" + e.toString());
continue;
} catch (Exception e) {
LOGGER.warn("caught err:", e);
con.close("program err:" + e.toString());
continue;
}
}
if (key.isValid() && key.isWritable()) {
con.doNextWriteCheck(); //@3
}
} else {
key.cancel(); //@4
}
} catch (CancelledKeyException e) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(con + " socket key canceled");
}
} catch (Exception e) {
LOGGER.warn(con + " " + e);
} catch (final Throwable e){
// Catch exceptions such as OOM and close connection if exists
//so that the reactor can keep running!
// @author Uncle-pan
// @since 2016-03-30
if(con != null){
con.close("Bad: "+e);
}
LOGGER.error("caught err: ", e);
continue;
}
}
} catch (Exception e) {
LOGGER.warn(name, e);
} catch (final Throwable e){
// Catch exceptions such as OOM so that the reactor can keep running!
// @author Uncle-pan
// @since 2016-03-30
LOGGER.error("caught err: ", e);
} finally {
if (keys != null) {
keys.clear();
}
}
}
}
NIOReactor是mycat线程模型的关键,而NioReactor的关键又是run方法,特别是对读写事件的处理。
代码@1,将主Reactor新接收的连接(SocketChannel)注册到从Reactor,用来监听读、写事件。
代码@2,读事件处理。
代码@3,写事件处理逻辑。
代码@4,如果键无效,则取消该键,该键关联的通道将会被关闭。
2.2.1 关于run()方法代码@1详解
private void register(Selector selector) {
AbstractConnection c = null;
if (registerQueue.isEmpty()) { //@1
return;
}
while ((c = registerQueue.poll()) != null) { //@2
try {
((NIOSocketWR) c.getSocketWR()).register(selector); //@3
c.register(); //@4
} catch (Exception e) {
c.close("register err" + e.toString());
}
}
}
代码@1,如果没有新的连接,直接跳过注册这一步。
代码@2,将任务队列中的SocketChannel注册到该Selector上。
代码@3,NIOSocketWR,该类为通道的IO处理类,主要实现asynRead()、doNextWriteCheck()方法。直接调用通道的注册方法。
NIOSocketWR的register方法:
public void register(Selector selector) throws IOException {
try {
processKey = channel.register(selector, SelectionKey.OP_READ, con);
} finally {
if (con.isClosed.get()) {
clearSelectionKey();
}
}
}
代码@4,调用连接的register方法,此处,理解为通道在注册读写事件成功后的事件方法,或钩子方法更加容易理解,诸如这样定义: channelRegister()。mycat作为一个数据库中间件,基于代理模式,对于应用来说,mycat伪装成一个mysql数据库,对于后端真实的mysql,又充当一个客户端的角色。所以,在客户端注册到Reactor后,需要通过该SocketChannel发送心跳包与应用的mysql client进行交互。
mysql客户端与mysql服务端交互步骤为:
客户端首先发起TCP连接,连接服务端,TCP经过三次握手协议之后,建立可靠的传输通道。
- 服务器发送服务器版本号,协议版本号,鉴权相关信息,服务器权能标识等等。
- 客户端收到握手包后向服务端发送登录验证报文,主要包括用户名,数据库名,密码(密文)
- 服务器向客户端发送认证结果报文(OK Package或 Error Package)或其他响应结果。
连接建立后(也就是TCP链路建立后,需要发送握手包给客户端(一般mysql客户端为应用的jdbc驱动连接))
故接下来重点关注一下 FrontendConnection的register方法。
@Override
public void register() throws IOException {
if (!isClosed.get()) {
// 生成认证数据
byte[] rand1 = RandomUtil.randomBytes(8);
byte[] rand2 = RandomUtil.randomBytes(12);
// 保存认证数据
byte[] seed = new byte[rand1.length + rand2.length];
System.arraycopy(rand1, 0, seed, 0, rand1.length);
System.arraycopy(rand2, 0, seed, rand1.length, rand2.length);
this.seed = seed;
// 发送握手数据包
boolean useHandshakeV10 = MycatServer.getInstance().getConfig().getSystem().getUseHandshakeV10() == 1;
if(useHandshakeV10) {
HandshakeV10Packet hs = new HandshakeV10Packet();
hs.packetId = 0;
hs.protocolVersion = Versions.PROTOCOL_VERSION;
hs.serverVersion = Versions.SERVER_VERSION;
hs.threadId = id;
hs.seed = rand1;
hs.serverCapabilities = getServerCapabilities();
hs.serverCharsetIndex = (byte) (charsetIndex & 0xff);
hs.serverStatus = 2;
hs.restOfScrambleBuff = rand2;
hs.write(this);
} else {
HandshakePacket hs = new HandshakePacket();
hs.packetId = 0;
hs.protocolVersion = Versions.PROTOCOL_VERSION;
hs.serverVersion = Versions.SERVER_VERSION;
hs.threadId = id;
hs.seed = rand1;
hs.serverCapabilities = getServerCapabilities();
hs.serverCharsetIndex = (byte) (charsetIndex & 0xff);
hs.serverStatus = 2;
hs.restOfScrambleBuff = rand2;
hs.write(this);
}
// asynread response
this.asynRead();
}
}
该方法的主要目的就是发送mysql握手协议包给客户端,然后客户端收到握手协议包后,发送登陆认证报文给服务器,服务器验证授权信息,通过,则mysql服务端,客户端进入到命运发送应答模式。当然,本方法还有两个关键点,一个是AbstractConnection.writer方法,this.asynRerad()方法,这个两个方法就是对写事件、读事件的处理逻辑,本文暂不关注具体读写事件的处理逻辑,后续文章再重点剖析,在这里,首先要明白的是IO的读,写事件是在IO线程中处理的,也就是NioReactor线程中(Selector所在线程),一个key一个key的处理。大家是否有这样的疑问,既然服务端发送了握手包给客户端,那在哪里处理客户端发送的登陆认证数据包呢?首先,处理客户端的请求的入口肯定是在读事件的处理上,也就是AbstractConnection的asynRead入口中,一路跟踪代码AbstractConnection的handler引起了我的注意,hander.handler(byte[] data);再看FrontendConnection的构造方法,就看出了端倪:
public FrontendConnection(NetworkChannel channel) throws IOException {
super(channel);
InetSocketAddress localAddr = (InetSocketAddress) channel.getLocalAddress();
InetSocketAddress remoteAddr = null;
if (channel instanceof SocketChannel) {
remoteAddr = (InetSocketAddress) ((SocketChannel) channel).getRemoteAddress();
} else if (channel instanceof AsynchronousSocketChannel) {
remoteAddr = (InetSocketAddress) ((AsynchronousSocketChannel) channel).getRemoteAddress();
}
this.host = remoteAddr.getHostString();
this.port = localAddr.getPort();
this.localPort = remoteAddr.getPort();
this.handler = new FrontendAuthenticator(this); //@1
}
代码@1,是关键,FrontendConnection的handler是FrontendAuthenticator,该类主要是从输入流中获取客户端发送的登录验证报文,解析,并进行权限验证。如果授权验证成功,mycat与mysql客户端进入到命令响应模式。关于mysql握手认证协议包,具体情况博文: 除特别声明,本站所有文章均为原创,如需转载请以超级链接形式注明出处:SmartCat's Blog
精华推荐