Young87

SmartCat's Blog

So happy to code my life!

游戏开发交流QQ群号60398951

当前位置:首页 >跨站数据测试

源码研读Mycat1.6之网络篇---前端线程模型(应用程序与mycat交互篇)上篇

Mycat 使用的线程模型也是基于 Reactor 主从设计模式,其设计类图如下:

  1. NIOAcceptor,在Reactor主从模式中扮演Acceptor与主Reactor角色,主要承担客户端的连接事件(accept)。
  2. NIOReactorPool,从Reacotr池,acceptor监听客户端连接后,将从该Reactor池中随机选取一个Reactor进行注册读、写事件。
  3. NioReactor,从Reactor,处理读写事件。
  4. NioProcessor,先可以这样理解,连接的容器,管理者,每一个连接都会绑定到一个NioProcessor中。
  5. AbstractConnection,Mycat对SocketChannel的业务抽象,可以看出一条数据库连接。Accecptor监听客户端连接后,包装成Connection,体现出封装性。
  6. SocketWR,处理网络IO读写,主要职责从通道解析协议包,转换为byte[],供Handler处理。
  7. NioHandler,具体IO业务处理,在这里不包括命令的解析处理。具有在连接章节重点讨论。

1、源码分析NIOAcceptor

1.1 重要属性与构造方法详解

private static finalAcceptIdGenerator ID_GENERATOR = new AcceptIdGenerator();
    private final int port;
    private final Selector selector;
    private final ServerSocketChannel serverChannel;
    private final FrontendConnectionFactory factory;
    private long acceptCount;
    private final NIOReactorPool reactorPool;
public NIOAcceptor(String name, String bindIp,int port, 
            FrontendConnectionFactory factory, NIOReactorPool reactorPool)
            throws IOException {
        super.setName(name);
        this.port = port;
        this.selector = Selector.open();
        this.serverChannel = ServerSocketChannel.open();
        this.serverChannel.configureBlocking(false);
        /** 设置TCP属性 */
        serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);           //@1
        serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 1024 * 16 * 2);   //@2
        // backlog=100 
        serverChannel.bind(new InetSocketAddress(bindIp, port), 100);                          //@3     
        this.serverChannel.register(selector, SelectionKey.OP_ACCEPT);                          //@4
        this.factory = factory;
        this.reactorPool = reactorPool;
    }

重要属性一览:

  • port : 监听端口,例如默认mycat服务端口:8066,管理端口9066
  • selector : 选择器,主要关心OP_ACCEPT
  • factory : 前端连接工厂,用于将java.nio.SocketChannel包装成NioConnection。
  • reactorPool:从Reactor池。

代码@1,设置Socket选项,如果端口关闭,马上可再次使用。

代码@2,设置Socket的接收缓存区大小,32K。

代码@3,端口绑定。

代码@4,注册OP_ACCEPT事件。

1.2 run方法详解

public void run() {
        final Selector tSelector = this.selector;
        for (;;) {
            ++acceptCount;
            try {
                tSelector.select(1000L);
                Set<SelectionKey> keys = tSelector.selectedKeys();
                try {
                    for (SelectionKey key : keys) {
                        if (key.isValid() && key.isAcceptable()) {
                            accept();    //@1
                        } else {
                            key.cancel();
                        }
                    }
                } finally {
                    keys.clear();
                }
            } catch (Exception e) {
                LOGGER.warn(getName(), e);
            }
        }
    }

利用select()的多路复用技术,监听客户端的连接事件,并处理,重点关注代码@1,accept()方法的实现:

private void accept() {
        SocketChannel channel = null;
        try {
            channel = serverChannel.accept();
            channel.configureBlocking(false);
            FrontendConnection c = factory.make(channel);      //@1
            c.setAccepted(true);
            c.setId(ID_GENERATOR.getId());
            NIOProcessor processor = (NIOProcessor) MycatServer.getInstance()
                    .nextProcessor();
            c.setProcessor(processor);      //@2

            NIOReactor reactor = reactorPool.getNextReactor();    
            reactor.postRegister(c);                                                    //@3

        } catch (Exception e) {
            LOGGER.warn(getName(), e);
            closeChannel(channel);
        }
    }

代码@1,将客户端的Socket连接,包装成Mycat的业务对象,Connection,Connection工厂的作用主要是初始化Socket相关参数,以及为通道关联相关的业务处理器(Handler)。稍候重点浏览一下工厂相关的代码。

代码@2,每个通道从Mycat服务器的NioProcessor池中获取一个,在通道的整个生命周期中,只会与一个NioProcessor打交道。

代码@3,从从Reactor池中轮询获取一个Reacotr,注册该通道的读事件。

1.2.1 关于代码@1,Connection工厂代码如下




关于ServerConnectionFactory,这里有个非常重要的作用,就是设置相关Handler,比如ServerQueryHandler

,LoadDataInfileHandler,ServerPrepareHandler。从这里可以看出,Hander是线程通道安全的,线程安全的。

 

由代码@3处引出下一个主角,NioReactor,NioReactor将接管通道余下的所有关于读、写操作。

2、源码研读NIOReactor

在NIOAcceptor中,轮询获取一个NIOReactor,调用其postRegister()方法,将连接注册到事件分发器上(Selector),接下来从postRegister方法开始分析。

2.1 postRegister(AbstractConnection c)

final void postRegister(AbstractConnection c) {
        reactorR.registerQueue.offer(c);
        reactorR.selector.wakeup();
}

mycat Reactor使用的是ConcurrentLikedQueue,无界队列,该方法将连接加入到任务队列尾部,然后唤醒selector.select()方法。

2.2 run 方法详解

@Override
        public void run() {
            final Selector selector = this.selector;
            Set<SelectionKey> keys = null;
            for (;;) {
                ++reactCount;
                try {
                    selector.select(500L);
                    register(selector);                                                      //@1
                    keys = selector.selectedKeys();
                    for (SelectionKey key : keys) {
                        AbstractConnection con = null;
                        try {
                            Object att = key.attachment();
                            if (att != null) {
                                con = (AbstractConnection) att;
                                if (key.isValid() && key.isReadable()) {
                                    try {
                                        con.asynRead();                                      // @2
                                    } catch (IOException e) {
                                        con.close("program err:" + e.toString());
                                        continue;
                                    } catch (Exception e) {
                                        LOGGER.warn("caught err:", e);
                                        con.close("program err:" + e.toString());
                                        continue;
                                    }
                                }
                                if (key.isValid() && key.isWritable()) {
                                    con.doNextWriteCheck();                        //@3
                                }
                            } else {
                                key.cancel();                                              //@4
                            }
                        } catch (CancelledKeyException e) {
                            if (LOGGER.isDebugEnabled()) {
                                LOGGER.debug(con + " socket key canceled");
                            }
                        } catch (Exception e) {
                            LOGGER.warn(con + " " + e);
                        } catch (final Throwable e){
                            // Catch exceptions such as OOM and close connection if exists
                            //so that the reactor can keep running!
                            // @author Uncle-pan
                            // @since 2016-03-30
                            if(con != null){
                                con.close("Bad: "+e);
                            }
                            LOGGER.error("caught err: ", e);
                            continue;
                        }
                    }
                } catch (Exception e) {
                    LOGGER.warn(name, e);
                } catch (final Throwable e){
                    // Catch exceptions such as OOM so that the reactor can keep running!
                    // @author Uncle-pan
                    // @since 2016-03-30
                    LOGGER.error("caught err: ", e);
                } finally {
                    if (keys != null) {
                        keys.clear();
                    }

                }
            }
        }

NIOReactor是mycat线程模型的关键,而NioReactor的关键又是run方法,特别是对读写事件的处理。

代码@1,将主Reactor新接收的连接(SocketChannel)注册到从Reactor,用来监听读、写事件。

代码@2,读事件处理。

代码@3,写事件处理逻辑。

代码@4,如果键无效,则取消该键,该键关联的通道将会被关闭。

2.2.1 关于run()方法代码@1详解

private void register(Selector selector) {
            AbstractConnection c = null;
            if (registerQueue.isEmpty()) {       //@1
                return;
            }
            while ((c = registerQueue.poll()) != null) {     //@2
                try {
                    ((NIOSocketWR) c.getSocketWR()).register(selector);   //@3
                    c.register();                                                                    //@4
                } catch (Exception e) {
                    c.close("register err" + e.toString());
                }
            }
        }

代码@1,如果没有新的连接,直接跳过注册这一步。

代码@2,将任务队列中的SocketChannel注册到该Selector上。

代码@3,NIOSocketWR,该类为通道的IO处理类,主要实现asynRead()、doNextWriteCheck()方法。直接调用通道的注册方法。

NIOSocketWR的register方法:

public void register(Selector selector) throws IOException {
        try {
            processKey = channel.register(selector, SelectionKey.OP_READ, con);
        } finally {
            if (con.isClosed.get()) {
                clearSelectionKey();
            }
        }
    }

代码@4,调用连接的register方法,此处,理解为通道在注册读写事件成功后的事件方法,或钩子方法更加容易理解,诸如这样定义: channelRegister()。mycat作为一个数据库中间件,基于代理模式,对于应用来说,mycat伪装成一个mysql数据库,对于后端真实的mysql,又充当一个客户端的角色。所以,在客户端注册到Reactor后,需要通过该SocketChannel发送心跳包与应用的mysql client进行交互。

mysql客户端与mysql服务端交互步骤为:

客户端首先发起TCP连接,连接服务端,TCP经过三次握手协议之后,建立可靠的传输通道。

  1. 服务器发送服务器版本号,协议版本号,鉴权相关信息,服务器权能标识等等。
  2. 客户端收到握手包后向服务端发送登录验证报文,主要包括用户名,数据库名,密码(密文)
  3. 服务器向客户端发送认证结果报文(OK Package或 Error Package)或其他响应结果。

连接建立后(也就是TCP链路建立后,需要发送握手包给客户端(一般mysql客户端为应用的jdbc驱动连接))

故接下来重点关注一下 FrontendConnection的register方法。

@Override
 public void register() throws IOException {
  if (!isClosed.get()) {
   // 生成认证数据
   byte[] rand1 = RandomUtil.randomBytes(8);
   byte[] rand2 = RandomUtil.randomBytes(12);
   // 保存认证数据
   byte[] seed = new byte[rand1.length + rand2.length];
   System.arraycopy(rand1, 0, seed, 0, rand1.length);
   System.arraycopy(rand2, 0, seed, rand1.length, rand2.length);
   this.seed = seed;
   // 发送握手数据包
   boolean useHandshakeV10 = MycatServer.getInstance().getConfig().getSystem().getUseHandshakeV10() == 1;
   if(useHandshakeV10) {
    HandshakeV10Packet hs = new HandshakeV10Packet();
    hs.packetId = 0;
    hs.protocolVersion = Versions.PROTOCOL_VERSION;
    hs.serverVersion = Versions.SERVER_VERSION;
    hs.threadId = id;
    hs.seed = rand1;
    hs.serverCapabilities = getServerCapabilities();
    hs.serverCharsetIndex = (byte) (charsetIndex & 0xff);
    hs.serverStatus = 2;
    hs.restOfScrambleBuff = rand2;
    hs.write(this);
   } else {
    HandshakePacket hs = new HandshakePacket();
    hs.packetId = 0;
    hs.protocolVersion = Versions.PROTOCOL_VERSION;
    hs.serverVersion = Versions.SERVER_VERSION;
    hs.threadId = id;
    hs.seed = rand1;
    hs.serverCapabilities = getServerCapabilities();
    hs.serverCharsetIndex = (byte) (charsetIndex & 0xff);
    hs.serverStatus = 2;
    hs.restOfScrambleBuff = rand2;
    hs.write(this);
   }
   // asynread response
   this.asynRead();
  }
 }

该方法的主要目的就是发送mysql握手协议包给客户端,然后客户端收到握手协议包后,发送登陆认证报文给服务器,服务器验证授权信息,通过,则mysql服务端,客户端进入到命运发送应答模式。当然,本方法还有两个关键点,一个是AbstractConnection.writer方法,this.asynRerad()方法,这个两个方法就是对写事件、读事件的处理逻辑,本文暂不关注具体读写事件的处理逻辑,后续文章再重点剖析,在这里,首先要明白的是IO的读,写事件是在IO线程中处理的,也就是NioReactor线程中(Selector所在线程),一个key一个key的处理。大家是否有这样的疑问,既然服务端发送了握手包给客户端,那在哪里处理客户端发送的登陆认证数据包呢?首先,处理客户端的请求的入口肯定是在读事件的处理上,也就是AbstractConnection的asynRead入口中,一路跟踪代码AbstractConnection的handler引起了我的注意,hander.handler(byte[] data);再看FrontendConnection的构造方法,就看出了端倪:

public FrontendConnection(NetworkChannel channel) throws IOException {
        super(channel);
        InetSocketAddress localAddr = (InetSocketAddress) channel.getLocalAddress();
        InetSocketAddress remoteAddr = null;
        if (channel instanceof SocketChannel) {
            remoteAddr = (InetSocketAddress) ((SocketChannel) channel).getRemoteAddress();    

        } else if (channel instanceof AsynchronousSocketChannel) {
            remoteAddr = (InetSocketAddress) ((AsynchronousSocketChannel) channel).getRemoteAddress();
        }

        this.host = remoteAddr.getHostString();
        this.port = localAddr.getPort();
        this.localPort = remoteAddr.getPort();
        this.handler = new FrontendAuthenticator(this);     //@1
    }

代码@1,是关键,FrontendConnection的handler是FrontendAuthenticator,该类主要是从输入流中获取客户端发送的登录验证报文,解析,并进行权限验证。如果授权验证成功,mycat与mysql客户端进入到命令响应模式。关于mysql握手认证协议包,具体情况博文:

除特别声明,本站所有文章均为原创,如需转载请以超级链接形式注明出处:SmartCat's Blog

上一篇: 源码分析mycat1.6之网络篇---前端线程模型下篇(读写事件篇)

下一篇: 从.properties文件中获取配置数据的方法小结

精华推荐