现在的位置: 首页 > 综合 > 正文

jetty的SelectChannelConnector分析

2014年07月04日 ⁄ 综合 ⁄ 共 15319字 ⁄ 字号 评论关闭

以前的一篇文章曾经分析过jetty的socketConnector,其实它的实现相对来说是比较简单的了,但是由于它是阻塞的io,所以在性能上并不好。。一般情况下都推荐使用SelectChannelConnector来替换掉。。。也就是所谓的NioConnector

SelectChannelConnector的实现就要复杂的多了。。而且因为将底层的io揉在了一起。。。。所以感觉这一块的代码不是很干净。。。又在想自己是否可以将底层的I/O全部替换掉,换成netty的实现。。。。。


好了。。先来看看SelectChannelConnector的类图吧:



在这里将比较重要的关联关系也展现了出来。。。

SelectorManager,从名字就可以看出它用来管理selector,它内部可能会有多个SelectSet,其实一般情况下都只有一个,SelectSet里面则是真正的管理selector的运行,以及注册事件的维护。。。


首先我们来看看SelectChannelConnector的一些重要属性吧:

    private transient ServerSocketChannel _acceptChannel;  //服务器监听channel
    private long _lowResourcesConnections;
    private long _lowResourcesMaxIdleTime;   

    private SelectorManager _manager = new SelectorManager()  //创建selectManager对象
    {
        protected SocketChannel acceptChannel(SelectionKey key) throws IOException
        {
            // TODO handle max connections
            SocketChannel channel = ((ServerSocketChannel)key.channel()).accept();  //用于后去连接的channel
            if (channel==null)
                return null;
            channel.configureBlocking(false);  //将接收到的socketChannel设置为非阻塞的
            Socket socket = channel.socket();
            configure(socket);
            return channel;
        }

        public boolean dispatch(Runnable task) throws IOException
        {
            return getThreadPool().dispatch(task);  //将这个task放到线程池里面去运行
        }

        protected void endPointClosed(SelectChannelEndPoint endpoint)
        {
            // TODO handle max connections and low resources
            connectionClosed((HttpConnection)endpoint.getConnection());
        }

        protected void endPointOpened(SelectChannelEndPoint endpoint)
        {
            // TODO handle max connections and low resources
            connectionOpened((HttpConnection)endpoint.getConnection());
        }

        //用于创建httpconnection
        protected Connection newConnection(SocketChannel channel,SelectChannelEndPoint endpoint)
        {
            return SelectChannelConnector.this.newConnection(channel,endpoint);
        }
        //用于创建ConnectorEndPoint
        protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException
        {
            return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey);
        }
    };

首先最重要的当然是server端的监听器了。。。其次就是定义了SelectorManager,因为是一个抽象类,所以还具体的实现了一些他的方法,例如如何接收远程连接,如何dispatch任务等等。。。

我们知道,在server启动的时候,会启动内部的connector,那么我们这里来看看这个connector的启动过程吧:

    //connector的启动过程
    protected void doStart() throws Exception {
        _manager.setSelectSets(getAcceptors());   //设置selectSet的数量,也就是每一个监听器都一个selector
        _manager.setMaxIdleTime(getMaxIdleTime());  //设置最大空闲时间
        _manager.setLowResourcesConnections(getLowResourcesConnections());
        _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
        _manager.start();   //启动manager
        open();   //这里会建立serversocketchannel
        _manager.register(_acceptChannel);  //将serversocketrChannler注册到selector上面去,其实是放在这个select的change数组上面,在select真正开始之前它会被注册到selector上面去
        super.doStart();
    }

首先是设置selectmanager的selectset的数量,因为只有一个监听,所以值也就是1了。。。

然后设置连接的最大空闲时间。。。低水平的链接数量啥的,然后就是selectormanager的启动,

open是打开当前的监听器。。然后将这个监听器注册到selectorManager上,。。。最后才是父类的dostart

那么我们在这里先来看看selectmanager的启动吧:

    protected void doStart() throws Exception {
        _selectSet = new SelectSet[_selectSets];   //创建selectSets数组,一般情况下都是只有一个,毕竟一般都是只监听一个端口嘛
        for (int i=0;i<_selectSet.length;i++)
            _selectSet[i]= new SelectSet(i);  //创建

        super.doStart();
    }

其实没有太多的内容,无非就是根据监听器的数量来创建selectset,那么我们来看看这个selectset的创建和它的一些重要属性吧:

        private transient int _change;  //当前用哪一个change数组,0,1里面互换
        private transient List[] _changes;  //数组,用于存放需要注册到selector的channel或者任务什么的,反正就是需要一些改变的事情
        private transient Timeout _idleTimeout;  //空闲的timeout任务
        private transient int _nextSet;  //下一次应该放在哪一个set
        private transient Timeout _retryTimeout;
        private transient Selector _selector;  //nio的selector对象
        private transient int _setID;   
        private transient boolean _selecting;
        private transient int _jvmBug;
        
        /* ------------------------------------------------------------ */
        SelectSet(int acceptorID) throws Exception { //创建来的是acceptor的编号,其实一般都是1了
            _setID=acceptorID;

            _idleTimeout = new Timeout(this);  //创建空闲超时队列
            _idleTimeout.setDuration(getMaxIdleTime());  //
            _retryTimeout = new Timeout(this); //创建超市
            _retryTimeout.setDuration(0L);  //默认的保底持续时间0

            // create a selector;
            _selector = Selector.open();  //创建selector
            _changes = new ArrayList[] {new ArrayList(),new ArrayList()};  //changes变量,用于保存需要注册人的channel
            _change=0;
        }

这里可以看到比较重要的selector的创建了。。也就可以知道,真正的select的流程是在selectSet里面执行的。。。

这里比较有意思的是change数组,它其实是用于临时保存所有的一些改动,例如selectkey上面关心的事件啥的。。。所有的这些改动都会在select执行之前被更新。。。

好了,SelectManager的启动就先到这吧,

回到上面,来看看SelectConnecor的open方法吧:

   //这里用于打开监听,创建serversocketchannel,并且还要将其设置为非阻塞的
    public void open() throws IOException {
        synchronized(this) {
            if (_acceptChannel == null) {
                // Create a new server socket
                _acceptChannel = ServerSocketChannel.open();  //创建serversocketchannel

                // Bind the server socket to the local host and port
                _acceptChannel.socket().setReuseAddress(getReuseAddress());
                InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
                _acceptChannel.socket().bind(addr,getAcceptQueueSize());

                // Set to non blocking mode
                _acceptChannel.configureBlocking(false);  //设置为非阻塞的模式
                
            }
        }
    }

这里其实就可以看到首先是创建了serversocketChannel,然后设置了它的地址啥的。。最后还将其设置为非阻塞。。。那么接下来就应该看看它是如何被注册到selector上面去了。。。 _manager.register(_acceptChannel);

 _manager.register(_acceptChannel);
    //在selector上面注册ServerChannel,其实是将其放到SelectSet上面去
    public void register(ServerSocketChannel acceptChannel) throws IOException {
        int s=_set++; 
        s=s%_selectSets;
        SelectSet set=_selectSet[s];  //获取用于注册这个serversocketchanneld额selectSet
        set.addChange(acceptChannel);//将这个channel放到这个selectSet的change数组里面,待会会将其注册到selector上面
        set.wakeup();
    }

这里其实可以看到并没有真正的立即将这个channel注册到selector上面去,而是先将其放到selectSet的change数组。。那么待会看select的流程的时候,在select执行之前,它将会被注册到selector上面去。。。

好了,这里先暂时不看。。

回到上面。。来看看父类的doStart方法干了什么事情吧:

    //用于当前组件的启动,这里还会启动线程,用于接收远程的连接请求
    protected void doStart() throws Exception {
        if (_server==null)
            throw new IllegalStateException("No server");
        //开启监听
        open();
        
        super.doStart();  //父类的启动,其实也就是abstractBuffer的启动
        
        if (_threadPool==null)
            _threadPool=_server.getThreadPool();  //这里相当于是获取线程池
        if (_threadPool!=_server.getThreadPool() && (_threadPool instanceof LifeCycle))
            ((LifeCycle)_threadPool).start();   //线程池的启动
        
        // Start selector thread
        synchronized(this)
        {
            _acceptorThread=new Thread[getAcceptors()];  //用于运行监听器的线程数组,当具体的在线程池中运行了以后才会具体设置线程,这里其实一般就是一个监听器

            for (int i=0;i<_acceptorThread.length;i++)
            {
                if (!_threadPool.dispatch(new Acceptor(i)))  //这里相当于是将监听器运行起来
                {
                    Log.warn("insufficient maxThreads configured for {}",this);
                    break;
                }
            }
        }
        
        Log.info("Started {}",this);
    }

这段代码就是我们以前分析过的了,它将会创建一个Acceptor对象,然后在threadPool里面调度它,而他要做的事情很简单,就是不断的执行在子类中实现的accept方法。。。。那么我们来看看SelectConnector中定义的accept方法吧:

    //将会有一个线程不断的调用这个方法,其实是执行select的流程。。
    public void accept(int acceptorID) throws IOException {
        _manager.doSelect(acceptorID);  //其实是调用manager来处理的,这里参数就是1 吧
    }

其实这里又是直接调用的selectManager的doSelect方法。。那么继续来看看吧:

    //这里其实实际是调用selectSet来处理select的,一般情况下一个ServerSocketChannle对应一个selectSet
    public void doSelect(int acceptorID) throws IOException {  //id就是对应要用select的set
        SelectSet[] sets= _selectSet;
        if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
            sets[acceptorID].doSelect();  //执行select
    }

好吧,其实又是调用的selectSet的doSelect方法。。。那么我们来看看这个方法吧(它就是整个select的执行流程):

        public void doSelect() throws IOException  {
            SelectionKey key=null;
            
            try {
                List changes;
                synchronized (_changes)
                {
                    changes=_changes[_change]; //获取应该select的对象
                    _change=_change==0?1:0;  //在0,1两个数组里面轮换
                    _selecting=true;  //表示正在执行select
                }

                // Make any key changes required
                //这里遍历change数组里面的所有,将他们注册到selector上面去,或者是其他的类型,那么做相应的动作就好了
                //有serversocket的注册,key的事件更新,任务的调度啥的
                for (int i = 0; i < changes.size(); i++) {
                    try
                    {
                        Object o = changes.get(i);
                        if (o instanceof EndPoint) {  //一般在httpconnection的handle方法执行完毕之后,如果有需要,那么都会挂起doUpdateKey方法,用于在select之前更新注册的事件
                            SelectChannelEndPoint endpoint = (SelectChannelEndPoint)o;
                            endpoint.doUpdateKey();  //更新它的key,其实是更新它注册的事件
                        } else if (o instanceof Runnable)  {
                            dispatch((Runnable)o);   //执行这个runnable
                        } else if (o instanceof SocketChannel) {
                            // finish accepting/connecting this connection
                            SocketChannel channel=(SocketChannel)o;
                            Object att = changes.get(++i);  //获取attachMent

                            if (channel.isConnected())  {  //如果已经连接了,那么注册它的读取事件
                                key = channel.register(_selector,SelectionKey.OP_READ,att); //这里是注册读事件
                                SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);  //创建selectChannelConnector里面定义的ConnectorEndPoint
                                key.attach(endpoint);
                                endpoint.dispatch();  //它本来也是一个runnable,run方法是执行httpconnection的handle方法
                            }  else {  //注册它的connect事件
                                channel.register(_selector,SelectionKey.OP_CONNECT,att);
                            }

                        }  else if (o instanceof ServerSocketChannel)  { 
                            ServerSocketChannel channel = (ServerSocketChannel)o;  //如果是serversocketchannel,那么需要祖册它的accept事件
                            channel.register(getSelector(),SelectionKey.OP_ACCEPT);  //将当前的serversocketchannel注册到这个selector上面去
                        } else {
                            throw new IllegalArgumentException(o.toString());
                        }
                    } catch (CancelledKeyException e)  {
                        if (isRunning())
                            Log.warn(e);
                        else
                            Log.debug(e);
                    }
                }
                
                changes.clear();  //用于将change队列里面的需要注册的channel都清除,因为已经注册到上面去了,或者应该走的事情都已经做完了

                long idle_next = 0;
                long retry_next = 0;
                long now=System.currentTimeMillis();  //记录select之前的时间
                synchronized (this) {
                    _idleTimeout.setNow(now);  //设置timeOut队列的当前时间
                    _retryTimeout.setNow(now);
                    if (_lowResourcesConnections>0 && _selector.keys().size()>_lowResourcesConnections)
                        _idleTimeout.setDuration(_lowResourcesMaxIdleTime);
                    else 
                        _idleTimeout.setDuration(_maxIdleTime);
                    idle_next=_idleTimeout.getTimeToNext();  //获取下一个即将超时的时间还有多久
                    retry_next=_retryTimeout.getTimeToNext();
                }

               //这里用于计算select的阻塞时间
                long wait = 1000L;  // not getMaxIdleTime() as the now value of the idle timers needs to be updated.
                if (idle_next >= 0 && wait > idle_next)
                    wait = idle_next;
                if (wait > 0 && retry_next >= 0 && wait > retry_next)
                    wait = retry_next;
    
                // Do the select.
                if (wait > 10) {// TODO tune or configure this
                    long before=now;
                    int selected=_selector.select(wait);
                    now = System.currentTimeMillis();  //更新select之后的时间
                    _idleTimeout.setNow(now);  //重新设置timeout队列的时间
                    _retryTimeout.setNow(now);

                    // Look for JVM bug 
                    if (selected==0 && wait>0 && (now-before)<wait/2 && _selector.selectedKeys().size()==0)
                    {
                        if (_jvmBug++>5)  // TODO tune or configure this
                        {
                            // Probably JVM BUG!
                            
                            Iterator iter = _selector.keys().iterator();
                            while(iter.hasNext())
                            {
                                key = (SelectionKey) iter.next();
                                if (key.isValid()&&key.interestOps()==0)
                                {
                                    key.cancel();
                                }
                            }
                            try
                            {
                                Thread.sleep(20);  // tune or configure this
                            }
                            catch (InterruptedException e)
                            {
                                Log.ignore(e);
                            }
                        } 
                    }
                    else
                        _jvmBug=0;
                }  else  {
                    _selector.selectNow();
                    _jvmBug=0;
                }

                // have we been destroyed while sleeping\
                if (_selector==null || !_selector.isOpen())
                    return;

                // Look for things to do
                Iterator iter = _selector.selectedKeys().iterator();
                //遍历所有已经select出来的key
                while (iter.hasNext())  {
                    key = (SelectionKey) iter.next();                      
                    try  {
                        if (!key.isValid()) {
                            key.cancel();  //用于取消这个key的注册
                            SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
                            if (endpoint != null)
                                endpoint.doUpdateKey();
                            continue;
                        }

                        Object att = key.attachment();
                        if (att instanceof SelectChannelEndPoint) {
                            SelectChannelEndPoint endpoint = (SelectChannelEndPoint)att;
                            endpoint.dispatch();  //其实也就是调用SelectChannelEndPoint的run方法,其实也就是调用httpConnection的处理流程
                        } else if (key.isAcceptable()) {  //如果是accept事件
                            SocketChannel channel = acceptChannel(key); //这里相当于接收到一个socketChannel
                            if (channel==null)
                                continue;

                            channel.configureBlocking(false);  //设置为非阻塞

                            _nextSet=++_nextSet%_selectSet.length;  //计算下一个应该注册到哪一个selectSet上面去

                            if (_nextSet==_setID)  {  //一般也都是相等的
                            	//其实这里一般情况下selectSet的长度都是1,所以这里搞一个selector的负载均衡貌似也没啥用啊。。
                                SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ);  //将其注册到selector上面去
                                SelectChannelEndPoint endpoint=newEndPoint(channel,_selectSet[_nextSet],cKey);  //创建endPoint,也就是selectChannelConnector里面定义的ConnectorEndPoint
                                cKey.attach(endpoint);   //将这个endPoint附着在上面
                                if (endpoint != null)
                                    endpoint.dispatch();  //其实是执行http的流程处理
                            } else {
                                // nope - give it to another.
                                _selectSet[_nextSet].addChange(channel);  //到这个selectSet上面注册这个channel
                                _selectSet[_nextSet].wakeup();
                            }
                        }  else if (key.isConnectable()) {  //表示连接已经建立好了
                            // Complete a connection of a registered channel
                            SocketChannel channel = (SocketChannel)key.channel();
                            boolean connected=false;
                            try   {
                                connected=channel.finishConnect();  //完成连接
                            }
                            catch(Exception e)  {
                                connectionFailed(channel,e,att);
                            }
                            finally
                            {
                                if (connected) {  //表示连接已经建立了
                                    key.interestOps(SelectionKey.OP_READ);
                                    SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);  //这里用于创建selectChannelConnector里面定义的ConnectorEndPoint
                                    key.attach(endpoint);
                                    endpoint.dispatch();
                                }  else {
                                    key.cancel();  //取消这个key的注册
                                }
                            }
                        } else {
                            SocketChannel channel = (SocketChannel)key.channel();
                            SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);  //这里有多个地方都有创建endpoint的代码,可能也是因为注册的时候稍微混乱了一些不得不这样吧
                            key.attach(endpoint);
                            if (key.isReadable())
                                endpoint.dispatch();                           
                        }
                        key = null;
                    }
                    catch (CancelledKeyException e)
                    {
                        Log.ignore(e);
                    }
                    catch (Exception e)
                    {
                        if (isRunning())
                            Log.warn(e);
                        else
                            Log.ignore(e);

                        if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
                        {
                            key.interestOps(0);

                            key.cancel();
                        } 
                    }
                }
                
                // Everything always handled
                _selector.selectedKeys().clear();  //清除那些已经选出来的key

                // tick over the timers
                _idleTimeout.tick(now);  //执行timeout里面的超时
                _retryTimeout.tick(now);
                
            }
            catch (CancelledKeyException e)
            {
                Log.ignore(e);
            }
            finally
            {
                synchronized(this)
                {
                    _selecting=false;
                }
            }
        }

这部分的代码就属于比较关键的了,它执行了整个select的流程,

(1)在select之前,还先遍历了change数组,执行了一些更新的动作,前面说到的serversocketchannel的注册也将会在这里被搞定。。

(2)对于select出来的key,会对他们进行处理。。当然这里如果是SelectChannelEndPoint,那么直接就将其进行调度运行就好了。如果是accept事件,那么还需要accept的流程。。

(3)在这些搞定以后,还会进行超时的处理。。貌似所有的都是这么实现的吧。包括nginx都是在select之后进行超时的处理。。

这部分代码很多。。而且很重要。。。有很多的细节。。。这里就不一一解释了。。

先来看看accept吧:

else if (key.isAcceptable()) {  //如果是accept事件
                            SocketChannel channel = acceptChannel(key); //这里相当于接收到一个socketChannel
                            if (channel==null)
                                continue;

                            channel.configureBlocking(false);  //设置为非阻塞

                            _nextSet=++_nextSet%_selectSet.length;  //计算下一个应该注册到哪一个selectSet上面去

                            if (_nextSet==_setID)  {  //一般也都是相等的
                            	//其实这里一般情况下selectSet的长度都是1,所以这里搞一个selector的负载均衡貌似也没啥用啊。。
                                SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ);  //将其注册到selector上面去
                                SelectChannelEndPoint endpoint=newEndPoint(channel,_selectSet[_nextSet],cKey);  //创建endPoint,也就是selectChannelConnector里面定义的ConnectorEndPoint
                                cKey.attach(endpoint);   //将这个endPoint附着在上面
                                if (endpoint != null)
                                    endpoint.dispatch();  //其实是执行http的流程处理
                            } else {
                                // nope - give it to another.
                                _selectSet[_nextSet].addChange(channel);  //到这个selectSet上面注册这个channel
                                _selectSet[_nextSet].wakeup();
                            }

这里调用acceptChannel方法来获取远程的链接,然后还要将其注册到selectSet上面去。。还要对这个channel进行包装,生成endpoint。。。

这个acceptChannel的方法的实现如下:

        protected SocketChannel acceptChannel(SelectionKey key) throws IOException
        {
            // TODO handle max connections
            SocketChannel channel = ((ServerSocketChannel)key.channel()).accept();  //用于后去连接的channel
            if (channel==null)
                return null;
            channel.configureBlocking(false);  //将接收到的socketChannel设置为非阻塞的
            Socket socket = channel.socket();
            configure(socket);
            return channel;
        }

没啥意思,无非就是调用accept方法,获取链接,然后对获取的链接进行初始化,设置非阻塞什么的。。

这里newEndPoint方法用于创建endPoint,来看看它的实现:

        //用于创建ConnectorEndPoint
        protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException
        {
            return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey);
        }

其实也就是创建ConnectorEndPoint,它的定义在SelectChannelConnector里面。。。另外在创建的过程中,其父类SelectChannelEndPoint还会创建httpConnection

这里就不详细的来讲ConnectorEndPoint的实现了。。。在前面我们可以看到在select中对于SelectChannelConnector的处理就是直接调用其dispatch方法。。。

    //调度当前这个endpoint的运行,其实是激活http的处理流程
    void dispatch() throws IOException {
        boolean dispatch_done = true;
        try {
            if (dispatch(_manager.isDelaySelectKeyUpdate())) {
                dispatch_done= false;
                dispatch_done = _manager.dispatch((Runnable)this);  //相当于将交给selectorManager来处理,其实就是交给线程池里面去运行,其实是执行httpconnection的handle的方法
            }
        }
        finally {
            if (!dispatch_done) { //执行失败
                Log.warn("dispatch failed!");
                undispatch();
            }
        }
    }

这里就是在线程池中调度当前的runnable,

    //当有数据可以读的时候,会执行这个
    public void run() {
        try {
            _connection.handle();  //httpConnection的方法啊
        } catch (ClosedChannelException e) {
            Log.ignore(e);
        }  catch (EofException e) {
            Log.debug("EOF", e);
            try{close();}
            catch(IOException e2){Log.ignore(e2);}
        } catch (HttpException e)  {
            Log.debug("BAD", e);
            try{close();}
            catch(IOException e2){Log.ignore(e2);}
        }  catch (Throwable e)  {
            Log.warn("handle failed", e);
            try{close();}
            catch(IOException e2){Log.ignore(e2);}
        } finally {
            undispatch();  //因为是非阻塞的,所以需要从线程池里面移除,并且还要进行key的更新,有可能需要加入新的事件
        }
    }

到这里就很熟悉了吧。。就进入了http的处理流程。。因为这里的connection其实就是httpConnection。。。


由此。。整个SelectChannelConnector的运行流程就算比较的清晰了。。。。。

首先将监听器注册到selector上,。。。当有新的连接进来之后,获取新的channel,然后将其进行包装。。变成ConnectorEndPoint,将其注册到selector上面。。。

当selector感应到上面的事件之后,就直接进行httpConnection的处理流程。。。。


同时这里也感觉整个这部分I/O的处理不是很干净。。。。如果能够用netty将下面的这部分I/O替换掉就好多了。。当然这个还是有一定难度的。。。

抱歉!评论已关闭.