以前的一篇文章曾经分析过jetty的socketConnector,其实它的实现相对来说是比较简单的了,但是由于它是阻塞的io,所以在性能上并不好。。一般情况下都推荐使用SelectChannelConnector来替换掉。。。也就是所谓的NioConnector
SelectChannelConnector的实现就要复杂的多了。。而且因为将底层的io揉在了一起。。。。所以感觉这一块的代码不是很干净。。。又在想自己是否可以将底层的I/O全部替换掉,换成netty的实现。。。。。
好了。。先来看看SelectChannelConnector的类图吧:
在这里将比较重要的关联关系也展现了出来。。。
SelectorManager,从名字就可以看出它用来管理selector,它内部可能会有多个SelectSet,其实一般情况下都只有一个,SelectSet里面则是真正的管理selector的运行,以及注册事件的维护。。。
首先我们来看看SelectChannelConnector的一些重要属性吧:
private transient ServerSocketChannel _acceptChannel; //服务器监听channel private long _lowResourcesConnections; private long _lowResourcesMaxIdleTime; private SelectorManager _manager = new SelectorManager() //创建selectManager对象 { protected SocketChannel acceptChannel(SelectionKey key) throws IOException { // TODO handle max connections SocketChannel channel = ((ServerSocketChannel)key.channel()).accept(); //用于后去连接的channel if (channel==null) return null; channel.configureBlocking(false); //将接收到的socketChannel设置为非阻塞的 Socket socket = channel.socket(); configure(socket); return channel; } public boolean dispatch(Runnable task) throws IOException { return getThreadPool().dispatch(task); //将这个task放到线程池里面去运行 } protected void endPointClosed(SelectChannelEndPoint endpoint) { // TODO handle max connections and low resources connectionClosed((HttpConnection)endpoint.getConnection()); } protected void endPointOpened(SelectChannelEndPoint endpoint) { // TODO handle max connections and low resources connectionOpened((HttpConnection)endpoint.getConnection()); } //用于创建httpconnection protected Connection newConnection(SocketChannel channel,SelectChannelEndPoint endpoint) { return SelectChannelConnector.this.newConnection(channel,endpoint); } //用于创建ConnectorEndPoint protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException { return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey); } };
首先最重要的当然是server端的监听器了。。。其次就是定义了SelectorManager,因为是一个抽象类,所以还具体的实现了一些他的方法,例如如何接收远程连接,如何dispatch任务等等。。。
我们知道,在server启动的时候,会启动内部的connector,那么我们这里来看看这个connector的启动过程吧:
//connector的启动过程 protected void doStart() throws Exception { _manager.setSelectSets(getAcceptors()); //设置selectSet的数量,也就是每一个监听器都一个selector _manager.setMaxIdleTime(getMaxIdleTime()); //设置最大空闲时间 _manager.setLowResourcesConnections(getLowResourcesConnections()); _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime()); _manager.start(); //启动manager open(); //这里会建立serversocketchannel _manager.register(_acceptChannel); //将serversocketrChannler注册到selector上面去,其实是放在这个select的change数组上面,在select真正开始之前它会被注册到selector上面去 super.doStart(); }
首先是设置selectmanager的selectset的数量,因为只有一个监听,所以值也就是1了。。。
然后设置连接的最大空闲时间。。。低水平的链接数量啥的,然后就是selectormanager的启动,
open是打开当前的监听器。。然后将这个监听器注册到selectorManager上,。。。最后才是父类的dostart
那么我们在这里先来看看selectmanager的启动吧:
protected void doStart() throws Exception { _selectSet = new SelectSet[_selectSets]; //创建selectSets数组,一般情况下都是只有一个,毕竟一般都是只监听一个端口嘛 for (int i=0;i<_selectSet.length;i++) _selectSet[i]= new SelectSet(i); //创建 super.doStart(); }
其实没有太多的内容,无非就是根据监听器的数量来创建selectset,那么我们来看看这个selectset的创建和它的一些重要属性吧:
private transient int _change; //当前用哪一个change数组,0,1里面互换 private transient List[] _changes; //数组,用于存放需要注册到selector的channel或者任务什么的,反正就是需要一些改变的事情 private transient Timeout _idleTimeout; //空闲的timeout任务 private transient int _nextSet; //下一次应该放在哪一个set private transient Timeout _retryTimeout; private transient Selector _selector; //nio的selector对象 private transient int _setID; private transient boolean _selecting; private transient int _jvmBug; /* ------------------------------------------------------------ */ SelectSet(int acceptorID) throws Exception { //创建来的是acceptor的编号,其实一般都是1了 _setID=acceptorID; _idleTimeout = new Timeout(this); //创建空闲超时队列 _idleTimeout.setDuration(getMaxIdleTime()); // _retryTimeout = new Timeout(this); //创建超市 _retryTimeout.setDuration(0L); //默认的保底持续时间0 // create a selector; _selector = Selector.open(); //创建selector _changes = new ArrayList[] {new ArrayList(),new ArrayList()}; //changes变量,用于保存需要注册人的channel _change=0; }
这里可以看到比较重要的selector的创建了。。也就可以知道,真正的select的流程是在selectSet里面执行的。。。
这里比较有意思的是change数组,它其实是用于临时保存所有的一些改动,例如selectkey上面关心的事件啥的。。。所有的这些改动都会在select执行之前被更新。。。
好了,SelectManager的启动就先到这吧,
回到上面,来看看SelectConnecor的open方法吧:
//这里用于打开监听,创建serversocketchannel,并且还要将其设置为非阻塞的 public void open() throws IOException { synchronized(this) { if (_acceptChannel == null) { // Create a new server socket _acceptChannel = ServerSocketChannel.open(); //创建serversocketchannel // Bind the server socket to the local host and port _acceptChannel.socket().setReuseAddress(getReuseAddress()); InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort()); _acceptChannel.socket().bind(addr,getAcceptQueueSize()); // Set to non blocking mode _acceptChannel.configureBlocking(false); //设置为非阻塞的模式 } } }
这里其实就可以看到首先是创建了serversocketChannel,然后设置了它的地址啥的。。最后还将其设置为非阻塞。。。那么接下来就应该看看它是如何被注册到selector上面去了。。。 _manager.register(_acceptChannel);
_manager.register(_acceptChannel);
//在selector上面注册ServerChannel,其实是将其放到SelectSet上面去 public void register(ServerSocketChannel acceptChannel) throws IOException { int s=_set++; s=s%_selectSets; SelectSet set=_selectSet[s]; //获取用于注册这个serversocketchanneld额selectSet set.addChange(acceptChannel);//将这个channel放到这个selectSet的change数组里面,待会会将其注册到selector上面 set.wakeup(); }
这里其实可以看到并没有真正的立即将这个channel注册到selector上面去,而是先将其放到selectSet的change数组。。那么待会看select的流程的时候,在select执行之前,它将会被注册到selector上面去。。。
好了,这里先暂时不看。。
回到上面。。来看看父类的doStart方法干了什么事情吧:
//用于当前组件的启动,这里还会启动线程,用于接收远程的连接请求 protected void doStart() throws Exception { if (_server==null) throw new IllegalStateException("No server"); //开启监听 open(); super.doStart(); //父类的启动,其实也就是abstractBuffer的启动 if (_threadPool==null) _threadPool=_server.getThreadPool(); //这里相当于是获取线程池 if (_threadPool!=_server.getThreadPool() && (_threadPool instanceof LifeCycle)) ((LifeCycle)_threadPool).start(); //线程池的启动 // Start selector thread synchronized(this) { _acceptorThread=new Thread[getAcceptors()]; //用于运行监听器的线程数组,当具体的在线程池中运行了以后才会具体设置线程,这里其实一般就是一个监听器 for (int i=0;i<_acceptorThread.length;i++) { if (!_threadPool.dispatch(new Acceptor(i))) //这里相当于是将监听器运行起来 { Log.warn("insufficient maxThreads configured for {}",this); break; } } } Log.info("Started {}",this); }
这段代码就是我们以前分析过的了,它将会创建一个Acceptor对象,然后在threadPool里面调度它,而他要做的事情很简单,就是不断的执行在子类中实现的accept方法。。。。那么我们来看看SelectConnector中定义的accept方法吧:
//将会有一个线程不断的调用这个方法,其实是执行select的流程。。 public void accept(int acceptorID) throws IOException { _manager.doSelect(acceptorID); //其实是调用manager来处理的,这里参数就是1 吧 }
其实这里又是直接调用的selectManager的doSelect方法。。那么继续来看看吧:
//这里其实实际是调用selectSet来处理select的,一般情况下一个ServerSocketChannle对应一个selectSet public void doSelect(int acceptorID) throws IOException { //id就是对应要用select的set SelectSet[] sets= _selectSet; if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null) sets[acceptorID].doSelect(); //执行select }
好吧,其实又是调用的selectSet的doSelect方法。。。那么我们来看看这个方法吧(它就是整个select的执行流程):
public void doSelect() throws IOException { SelectionKey key=null; try { List changes; synchronized (_changes) { changes=_changes[_change]; //获取应该select的对象 _change=_change==0?1:0; //在0,1两个数组里面轮换 _selecting=true; //表示正在执行select } // Make any key changes required //这里遍历change数组里面的所有,将他们注册到selector上面去,或者是其他的类型,那么做相应的动作就好了 //有serversocket的注册,key的事件更新,任务的调度啥的 for (int i = 0; i < changes.size(); i++) { try { Object o = changes.get(i); if (o instanceof EndPoint) { //一般在httpconnection的handle方法执行完毕之后,如果有需要,那么都会挂起doUpdateKey方法,用于在select之前更新注册的事件 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)o; endpoint.doUpdateKey(); //更新它的key,其实是更新它注册的事件 } else if (o instanceof Runnable) { dispatch((Runnable)o); //执行这个runnable } else if (o instanceof SocketChannel) { // finish accepting/connecting this connection SocketChannel channel=(SocketChannel)o; Object att = changes.get(++i); //获取attachMent if (channel.isConnected()) { //如果已经连接了,那么注册它的读取事件 key = channel.register(_selector,SelectionKey.OP_READ,att); //这里是注册读事件 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key); //创建selectChannelConnector里面定义的ConnectorEndPoint key.attach(endpoint); endpoint.dispatch(); //它本来也是一个runnable,run方法是执行httpconnection的handle方法 } else { //注册它的connect事件 channel.register(_selector,SelectionKey.OP_CONNECT,att); } } else if (o instanceof ServerSocketChannel) { ServerSocketChannel channel = (ServerSocketChannel)o; //如果是serversocketchannel,那么需要祖册它的accept事件 channel.register(getSelector(),SelectionKey.OP_ACCEPT); //将当前的serversocketchannel注册到这个selector上面去 } else { throw new IllegalArgumentException(o.toString()); } } catch (CancelledKeyException e) { if (isRunning()) Log.warn(e); else Log.debug(e); } } changes.clear(); //用于将change队列里面的需要注册的channel都清除,因为已经注册到上面去了,或者应该走的事情都已经做完了 long idle_next = 0; long retry_next = 0; long now=System.currentTimeMillis(); //记录select之前的时间 synchronized (this) { _idleTimeout.setNow(now); //设置timeOut队列的当前时间 _retryTimeout.setNow(now); if (_lowResourcesConnections>0 && _selector.keys().size()>_lowResourcesConnections) _idleTimeout.setDuration(_lowResourcesMaxIdleTime); else _idleTimeout.setDuration(_maxIdleTime); idle_next=_idleTimeout.getTimeToNext(); //获取下一个即将超时的时间还有多久 retry_next=_retryTimeout.getTimeToNext(); } //这里用于计算select的阻塞时间 long wait = 1000L; // not getMaxIdleTime() as the now value of the idle timers needs to be updated. if (idle_next >= 0 && wait > idle_next) wait = idle_next; if (wait > 0 && retry_next >= 0 && wait > retry_next) wait = retry_next; // Do the select. if (wait > 10) {// TODO tune or configure this long before=now; int selected=_selector.select(wait); now = System.currentTimeMillis(); //更新select之后的时间 _idleTimeout.setNow(now); //重新设置timeout队列的时间 _retryTimeout.setNow(now); // Look for JVM bug if (selected==0 && wait>0 && (now-before)<wait/2 && _selector.selectedKeys().size()==0) { if (_jvmBug++>5) // TODO tune or configure this { // Probably JVM BUG! Iterator iter = _selector.keys().iterator(); while(iter.hasNext()) { key = (SelectionKey) iter.next(); if (key.isValid()&&key.interestOps()==0) { key.cancel(); } } try { Thread.sleep(20); // tune or configure this } catch (InterruptedException e) { Log.ignore(e); } } } else _jvmBug=0; } else { _selector.selectNow(); _jvmBug=0; } // have we been destroyed while sleeping\ if (_selector==null || !_selector.isOpen()) return; // Look for things to do Iterator iter = _selector.selectedKeys().iterator(); //遍历所有已经select出来的key while (iter.hasNext()) { key = (SelectionKey) iter.next(); try { if (!key.isValid()) { key.cancel(); //用于取消这个key的注册 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment(); if (endpoint != null) endpoint.doUpdateKey(); continue; } Object att = key.attachment(); if (att instanceof SelectChannelEndPoint) { SelectChannelEndPoint endpoint = (SelectChannelEndPoint)att; endpoint.dispatch(); //其实也就是调用SelectChannelEndPoint的run方法,其实也就是调用httpConnection的处理流程 } else if (key.isAcceptable()) { //如果是accept事件 SocketChannel channel = acceptChannel(key); //这里相当于接收到一个socketChannel if (channel==null) continue; channel.configureBlocking(false); //设置为非阻塞 _nextSet=++_nextSet%_selectSet.length; //计算下一个应该注册到哪一个selectSet上面去 if (_nextSet==_setID) { //一般也都是相等的 //其实这里一般情况下selectSet的长度都是1,所以这里搞一个selector的负载均衡貌似也没啥用啊。。 SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ); //将其注册到selector上面去 SelectChannelEndPoint endpoint=newEndPoint(channel,_selectSet[_nextSet],cKey); //创建endPoint,也就是selectChannelConnector里面定义的ConnectorEndPoint cKey.attach(endpoint); //将这个endPoint附着在上面 if (endpoint != null) endpoint.dispatch(); //其实是执行http的流程处理 } else { // nope - give it to another. _selectSet[_nextSet].addChange(channel); //到这个selectSet上面注册这个channel _selectSet[_nextSet].wakeup(); } } else if (key.isConnectable()) { //表示连接已经建立好了 // Complete a connection of a registered channel SocketChannel channel = (SocketChannel)key.channel(); boolean connected=false; try { connected=channel.finishConnect(); //完成连接 } catch(Exception e) { connectionFailed(channel,e,att); } finally { if (connected) { //表示连接已经建立了 key.interestOps(SelectionKey.OP_READ); SelectChannelEndPoint endpoint = newEndPoint(channel,this,key); //这里用于创建selectChannelConnector里面定义的ConnectorEndPoint key.attach(endpoint); endpoint.dispatch(); } else { key.cancel(); //取消这个key的注册 } } } else { SocketChannel channel = (SocketChannel)key.channel(); SelectChannelEndPoint endpoint = newEndPoint(channel,this,key); //这里有多个地方都有创建endpoint的代码,可能也是因为注册的时候稍微混乱了一些不得不这样吧 key.attach(endpoint); if (key.isReadable()) endpoint.dispatch(); } key = null; } catch (CancelledKeyException e) { Log.ignore(e); } catch (Exception e) { if (isRunning()) Log.warn(e); else Log.ignore(e); if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid()) { key.interestOps(0); key.cancel(); } } } // Everything always handled _selector.selectedKeys().clear(); //清除那些已经选出来的key // tick over the timers _idleTimeout.tick(now); //执行timeout里面的超时 _retryTimeout.tick(now); } catch (CancelledKeyException e) { Log.ignore(e); } finally { synchronized(this) { _selecting=false; } } }
这部分的代码就属于比较关键的了,它执行了整个select的流程,
(1)在select之前,还先遍历了change数组,执行了一些更新的动作,前面说到的serversocketchannel的注册也将会在这里被搞定。。
(2)对于select出来的key,会对他们进行处理。。当然这里如果是SelectChannelEndPoint,那么直接就将其进行调度运行就好了。如果是accept事件,那么还需要accept的流程。。
(3)在这些搞定以后,还会进行超时的处理。。貌似所有的都是这么实现的吧。包括nginx都是在select之后进行超时的处理。。
这部分代码很多。。而且很重要。。。有很多的细节。。。这里就不一一解释了。。
先来看看accept吧:
else if (key.isAcceptable()) { //如果是accept事件 SocketChannel channel = acceptChannel(key); //这里相当于接收到一个socketChannel if (channel==null) continue; channel.configureBlocking(false); //设置为非阻塞 _nextSet=++_nextSet%_selectSet.length; //计算下一个应该注册到哪一个selectSet上面去 if (_nextSet==_setID) { //一般也都是相等的 //其实这里一般情况下selectSet的长度都是1,所以这里搞一个selector的负载均衡貌似也没啥用啊。。 SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ); //将其注册到selector上面去 SelectChannelEndPoint endpoint=newEndPoint(channel,_selectSet[_nextSet],cKey); //创建endPoint,也就是selectChannelConnector里面定义的ConnectorEndPoint cKey.attach(endpoint); //将这个endPoint附着在上面 if (endpoint != null) endpoint.dispatch(); //其实是执行http的流程处理 } else { // nope - give it to another. _selectSet[_nextSet].addChange(channel); //到这个selectSet上面注册这个channel _selectSet[_nextSet].wakeup(); }
这里调用acceptChannel方法来获取远程的链接,然后还要将其注册到selectSet上面去。。还要对这个channel进行包装,生成endpoint。。。
这个acceptChannel的方法的实现如下:
protected SocketChannel acceptChannel(SelectionKey key) throws IOException { // TODO handle max connections SocketChannel channel = ((ServerSocketChannel)key.channel()).accept(); //用于后去连接的channel if (channel==null) return null; channel.configureBlocking(false); //将接收到的socketChannel设置为非阻塞的 Socket socket = channel.socket(); configure(socket); return channel; }
没啥意思,无非就是调用accept方法,获取链接,然后对获取的链接进行初始化,设置非阻塞什么的。。
这里newEndPoint方法用于创建endPoint,来看看它的实现:
//用于创建ConnectorEndPoint protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException { return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey); }
其实也就是创建ConnectorEndPoint,它的定义在SelectChannelConnector里面。。。另外在创建的过程中,其父类SelectChannelEndPoint还会创建httpConnection
这里就不详细的来讲ConnectorEndPoint的实现了。。。在前面我们可以看到在select中对于SelectChannelConnector的处理就是直接调用其dispatch方法。。。
//调度当前这个endpoint的运行,其实是激活http的处理流程 void dispatch() throws IOException { boolean dispatch_done = true; try { if (dispatch(_manager.isDelaySelectKeyUpdate())) { dispatch_done= false; dispatch_done = _manager.dispatch((Runnable)this); //相当于将交给selectorManager来处理,其实就是交给线程池里面去运行,其实是执行httpconnection的handle的方法 } } finally { if (!dispatch_done) { //执行失败 Log.warn("dispatch failed!"); undispatch(); } } }
这里就是在线程池中调度当前的runnable,
//当有数据可以读的时候,会执行这个 public void run() { try { _connection.handle(); //httpConnection的方法啊 } catch (ClosedChannelException e) { Log.ignore(e); } catch (EofException e) { Log.debug("EOF", e); try{close();} catch(IOException e2){Log.ignore(e2);} } catch (HttpException e) { Log.debug("BAD", e); try{close();} catch(IOException e2){Log.ignore(e2);} } catch (Throwable e) { Log.warn("handle failed", e); try{close();} catch(IOException e2){Log.ignore(e2);} } finally { undispatch(); //因为是非阻塞的,所以需要从线程池里面移除,并且还要进行key的更新,有可能需要加入新的事件 } }
到这里就很熟悉了吧。。就进入了http的处理流程。。因为这里的connection其实就是httpConnection。。。
由此。。整个SelectChannelConnector的运行流程就算比较的清晰了。。。。。
首先将监听器注册到selector上,。。。当有新的连接进来之后,获取新的channel,然后将其进行包装。。变成ConnectorEndPoint,将其注册到selector上面。。。
当selector感应到上面的事件之后,就直接进行httpConnection的处理流程。。。。
同时这里也感觉整个这部分I/O的处理不是很干净。。。。如果能够用netty将下面的这部分I/O替换掉就好多了。。当然这个还是有一定难度的。。。