现在的位置: 首页 > 综合 > 正文

nio高并发编程

2018年12月10日 ⁄ 综合 ⁄ 共 11789字 ⁄ 字号 评论关闭

之前http://blog.csdn.net/sunmenggmail/article/details/8638480

已经整理过,这次是2.0版

参考:

http://daizuan.iteye.com/blog/1112909

http://daizuan.iteye.com/blog/1113471

http://www.cnblogs.com/pingh/archive/2013/07/30/3224990.html

http://www.cnblogs.com/ajian005/archive/2012/09/27/2753662.html(相当好,总结了开源框架)

陷阱1:处理事件忘记移除key

在select返回值大于0的情况下,循环处理

Selector.selectedKeys集合,每处理一个必须从Set中移除

复制代码
Iterator<SelectionKey> it=set.iterator();
    While(it.hasNext()){
    SelectionKey key=it.next();
    it.remove(); //切记移除
    „„处理事件
}
复制代码

 不移除的后果是本次的就绪的key集合下次会再次返回,导致无限循环,CPU消耗100%

 陷阱2:Selector返回的key集合非线程安全

Selector.selectedKeys/keys 返回的集合都是非线程安全的

Selector.selectedKeys返回的可移除

Selector.keys 不可变

对selected keys的处理必须单线程处理或者适当同步

陷阱3:正确注册Channel和更新interest
直接注册不可吗?

channel.register(selector, ops, attachment);

不是不可以,效率问题

至少加两次锁,锁竞争激烈

Channel本身的regLock,竞争几乎没有

Selector内部的key集合,竞争激烈

更好的方式:加入缓冲队列,等待注册,reactor单线程处理

复制代码
If(isReactorThread()){
    channel.register(selector,ops,attachment);
}
else{
    register.offer(newEvent(channel,ops,attachment));
    selector.wakeup();
}
复制代码

同样,SelectionKey.interest(ops)

在linux上会阻塞,需要获取selector内部锁做同步

在win32上不会阻塞

屏蔽平台差异,避免锁的激烈竞争,采用类似注册channel的方式:

复制代码
if (this.isReactorThread()) {
    key.interestOps(key.interestOps() | SelectionKey.OP_READ);
} 
else {
    this.register.offer(new Event(key,SelectionKey.OP_READ));
    selector.wakeup();
}
复制代码

 

陷阱4:正确处理OP_WRITE

OP_WRITE处理不当很容易导致CPU 100%

OP_WRITE触发条件:

   前提:interest了OP_WRITE

   触发条件:

        socket发送缓冲区可写

        远端关闭

        有错误发生

正确的处理方式:

   仅在已经连接的channel上注册

   仅在有数据可写的时候才注册

   触发之后立即取消注册,否则会继续触发导致循环

   处理完成后视情况决定是否继续注册

     没有完全写入,继续注册

     全部写入,无需注册

陷阱5:正确取消注册channel

SelectableChannel一旦注册将一直有效直到明确取消

怎么取消注册?

   channel.close(),内部会调用key.cancel()

   key.cancel();

   中断channel的读写所在线程引起的channel关闭

但是这样还不够!

   key.cancel()仅仅是将key加入cancelledKeys

   直到下一次select才真正处理

   并且channel的socketfd只有在真正取消注册后才会close(fd)

后果是什么?

  服务端,问题不大,select调用频繁

  客户端,通常只有一个连接,关闭channel之后,没有调用select就关闭了selector

  sockfd没有关闭,停留在CLOSE_WAIT状态

正确的处理方式,取消注册也应当作为事件交给reactor处理,及时wakeup做select

适当的时候调用selector.selectNow()

  Netty在超过256连接关闭的时候主动调用一次selectNow

复制代码
static final int CLEANUP_INTERVAL=256;
private boolean cleanUpCancelledKeys()throws IOException{
    if(cancelledKeys>=CLEANUP_INTERVAL){
        cancelledKeys=0;
        selector.selectNow();
        returntrue;
    }
    returnfalse;
}
//channel关闭的时候
channel.socket.close();
cancelledKeys++;
复制代码

陷阱6:同时注册OP_ACCPET和OP_READ,同时注册OP_CONNECT和OP_WRITE

在底层来说,只有两种事件:read和write

Java NIO还引入了OP_ACCEPT和OP_CONNECT

  OP_ACCEPT、OP_READ == Read

  OP_CONNECT、OP_WRITE == Write

同时注册OP_ACCEPT和OP_READ ,或者同时注册OP_CONNECT和OP_WRITE在不同平台上产生错误的行为,避免这样做!

陷阱7:正确处理connect

SocketChannel.connect方法在非阻塞模式下可能返回false,切记判断返回值

    如果是loopback连接,可能直接返回true,表示连接成功

    返回false,后续处理

       注册channel到selector,监听OP_CONNECT事件

       在OP_CONNECT触发后,调用SocketChannel.finishConnect成功后,连接才真正建立

陷阱:

    没有判断connect返回值

    没有调用finishConnect

    在OP_CONNECT触发后,没有移除OP_CONNECT,导致SelectionKey一直处于就绪状态,空耗CPU

       OP_CONNECT只能在还没有连接的channel上注册

忠告

尽量不要尝试实现自己的nio框架,除非有经验丰富的工程师

尽量使用经过广泛实践的开源NIO框架Mina、Netty3、xSocket

尽量使用最新稳定版JDK

遇到问题的时候,也许你可以先看下java的bug database

elector自身是线程安全的,而他的key set却不是。在一次选择发生的过程中,对于key的关心事件的修改要等到下一次select的时候才会生效。 另外,key和其代表的channel有可能在任何时候被cancel和close。因此存在于key
set中的key并不代表其key是有效的,也不代表其channel是open的。如果key有可能被其他的线程取消或关闭channel,程序必须小 心的同步检查这些条件。 


阻塞了的select可以通过调用selector的wakeup方法来唤醒。

http://blog.csdn.net/cutesource/article/details/6192016

如何正确使用NIO来构架网络服务器一直是最近思考的一个问题,于是乎分析了一下Jetty、Tomcat和Mina有关NIO的源码,发现大伙都基于类似的方式,我感觉这应该算是NIO构架网络服务器的经典模式,并基于这种模式写了个小小网络服务器,压力测试了一下,效果还不错。废话不多说,先看看三者是如何使用NIO的。

Jetty Connector的实现

先看看有关类图:

其中:

SelectChannelConnector负责组装各组件

SelectSet负责侦听客户端请求

SelectChannelEndPoint负责IO的读和写

HttpConnection负责逻辑处理

在整个服务端处理请求的过程可以分为三个阶段,时序图如下所示:

阶段一:监听并建立连接

这一过程主要是启动一个线程负责accept新连接,监听到后分配给相应的SelectSet,分配的策略就是轮询。

阶段二:监听客户端的请求

这一过程主要是启动多个线程(线程数一般为服务器CPU的个数),让SelectSet监听所管辖的channel队列,每个SelectSet维护一个Selector,这个Selector监听队列里所有的channel,一旦有读事件,从线程池里拿线程去做处理请求

阶段三:处理请求

这一过程就是每次客户端请求的数据处理过程,值得注意的是为了不让后端的业务处理阻碍Selector监听新的请求,就多线程来分隔开监听请求和处理请求两个阶段。

由此可以大致总结出Jetty有关NIO使用的模式,如下图所示:

最核心就是把三件不同的事情隔离开,并用不同规模的线程去处理,最大限度地利用NIO的异步和通知特性


下面再来看看Tomcat是如何使用NIO来构架Connector这块

先看看Tomcat Connector这块的类图:

其中:

NioEndpoint负责组装各部件

Acceptor负责监听新连接,并把连接交给Poller

Poller负责监听所管辖的channel队列,并把请求交给SocketProcessor处理

SocketProcessor负责数据处理,并把请求传递给后端业务处理模块

在整个服务端处理请求的过程可以分为三个阶段,时序图如下所示:

阶段一:监听并建立连接

这一阶段主要是Acceptor监听新连接,并轮询取一个Poller ,把连接交付给Poller

阶段二:监听客户端的请求

这一过程主要是让每个Poller监听所管辖的channel队列,select到新请求后交付给SocketProcessor处理

阶段三:处理请求

这一过程就是从多线程执行SocketProcessor,做数据和业务处理

于是乎我们发现抛开具体代码细节,Tomcat和Jetty在NIO的使用方面是非常一致的,采用的模式依然是下图:


Mina框架

最后我们再看看NIO方面最著名的框架Mina,抛开Mina有关session和处理链条等方面的设计,单单挑出前端网络层处理来看,也采用的是与Jetty和Tomcat类似的模式,只不过它做了些简化,它没有隔开请求侦听和请求处理两个阶段,因此,宏观上看它只分为两个阶段。

先看看它的类图:

其中:

SocketAcceptor起线程调用SocketAcceptor.Work负责新连接侦听,并交给SocketIoProcessor处理

SocketIoProcessor起线程调用SocketIoProcessor.Work负责侦听所管辖的channel队列, select到新请求后交给IoFilterChain处理

IoFilterChain组装了mina的处理链条

在整个服务端处理请求的过程可以分为两个阶段,时序图如下所示:

阶段一:监听并建立连接

阶段二:监听并处理客户端的请求

 

总结来看Jetty、tomcat和Mina,我们也大概清楚了该如何基于NIO来构架网络服务器,通过这个提炼出来的模式,我写了个很简单的NIO Server,在保持连接的情况下,可以很轻松的保持6万连接(由于有65535连接限制),并能在负载只有3左右的情况下(4核),承担3到4万的TPS请求(当然做的事情很简单,仅仅是把buffer转化为自定义协议的包,然后再把包转为buffer写到客户端)。因此简单地实践一下可以证明这个模式的有效性,不妨再看看这个图,希望对大伙以后写server有用:

安装这个架构,写了个粗略的版本,以后有机会一定要看看jetty等是怎么优雅的实现的

//server

import java.io.IOException;
import java.net.InetSocketAddress;  
import java.net.ServerSocket;  
import java.nio.ByteBuffer;  
import java.nio.channels.SelectionKey;  
import java.nio.channels.Selector;  
import java.nio.channels.ServerSocketChannel;  
import java.nio.channels.SocketChannel;  
import java.nio.charset.Charset;  
import java.util.*;
import java.util.concurrent.*;

public class Server {

	private ConcurrentLinkedQueue<SelectionKey> m_conn = new ConcurrentLinkedQueue<SelectionKey>(); 
	private ConcurrentLinkedQueue<SelectionKey> m_req = new ConcurrentLinkedQueue<SelectionKey>();
	private final int m_processNum = 3;
	private final int m_worksNum = 3;
	private final int m_port = 3562;
	private ServerSocketChannel channel ;
	private boolean connQuEpt = true;
	private boolean reqQuEpt = true;
	private Selector selector;//for connection
	private List<Selector> m_reqSelector = new ArrayList<Selector>();
	
	
	public void listen() throws IOException{
		selector = Selector.open();
		channel = ServerSocketChannel.open();
		channel.configureBlocking(false);
		channel.socket().bind(new InetSocketAddress(m_port));
		channel.register(selector, SelectionKey.OP_ACCEPT);
		
		new Thread(new ConnectionHander()).start();
		
		//new Thread(new RequestManager()).start();
		creatRequestHanders();
		
		new Thread(new ProcessManager()).start();
		
	}
	
	/*class RequestManager implements Runnable {
		private ExecutorService m_reqPool;
		public RequestManager() {
			m_reqPool = Executors.newFixedThreadPool(m_processNum, new RequestThreadFactor());
		}
		public void run() {
			while (true) {
				
			}
		}
	}*/
	
	void creatRequestHanders() {
		try {
		for (int i = 0; i < m_processNum; ++i) {
			Selector slt = Selector.open();
			m_reqSelector.add(slt);
			RequestHander req = new RequestHander();
			req.setSelector(slt);
			new Thread(req).start();
		}
		}
		catch(IOException e) {
			e.printStackTrace();
		}
	}
	
	class ProcessManager implements Runnable {
		private ExecutorService m_workPool;
		public ProcessManager() {
			m_workPool = Executors.newFixedThreadPool(m_worksNum);
		}
		public void run() {
			SelectionKey key;
			while(true) {
				//太消耗cpu//应该要加一个wait,但是这样就有锁了
				while((key = m_req.poll()) !=null) {
					ProcessRequest preq = new ProcessRequest();
					preq.setKey(key);
					m_workPool.execute(preq);
				}
			}
		}
	}
 	
	
	/*class RequestThread extends Thread {
		private Selector selector;
		public  RequestThread(Runnable r) {
			super(r);
			try {
			selector = Selector.open();
			}
			catch(IOException e) {
				e.printStackTrace();
				//todo
			}
		}
	}
	
	class RequestThreadFactor implements ThreadFactory {
		public Thread newThread(Runnable r) {
			return new RequestThread(r);
		}
	}*/
	
	//监视请求连接
	class ConnectionHander implements Runnable {
		
		int idx = 0;
		@Override
		public void run() {
			
			System.out.println("listenning to connection");
			while (true) {
				
				try {
					selector.select();
					Set<SelectionKey> selectKeys = selector.selectedKeys();
					Iterator<SelectionKey> it = selectKeys.iterator();
					
					while (it.hasNext()) {
						SelectionKey key = it.next();
						it.remove();
						
						m_conn.add(key);
						int num = m_reqSelector.size();
						m_reqSelector.get(idx).wakeup();//防止监听request的进程都在堵塞中
						idx =(idx + 1)%num;
					}
					
					
				}
				catch(IOException e) {
					e.printStackTrace();
				}
				
			}
		}
	}
	
	//监视读操作
	class RequestHander implements Runnable {
		private Selector selector;
		public void setSelector(Selector slt) {
			selector = slt;
		}
		public void run() {
			try {
			SelectionKey key;
			System.out.println(Thread.currentThread() + "listenning to request");
			while (true) {
				selector.select();
				while((key = m_conn.poll()) != null) {
					ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
					SocketChannel sc = ssc.accept();//接受一个连接
					sc.configureBlocking(false);
					sc.register(selector, SelectionKey.OP_READ);
					System.out.println(Thread.currentThread() + "a connected line");
				}
				
				Set<SelectionKey> keys = selector.selectedKeys();
				Iterator<SelectionKey> it = keys.iterator();
				while(it.hasNext()) {
					SelectionKey keytmp = it.next();
					it.remove();
					if (keytmp.isReadable()) {
						m_req.add(keytmp);
					}
					
				}
			}
			}
			catch(IOException e) {
				e.printStackTrace();
			}
		}
	}
	
	//读数据并进行处理和发送返回
	class ProcessRequest implements Runnable {
		SelectionKey key;
		public void setKey(SelectionKey key) {
			this.key = key;
		}
		public void run() {
			ByteBuffer buffer = ByteBuffer.allocate(1024);
			SocketChannel sc = (SocketChannel) key.channel();
			String msg = null;
			try{
				int readBytes = 0;
				int ret;
				try{
				while((ret = sc.read(buffer)) > 0) {
					
				}
				}
				catch(IOException e) {
					
				}
				finally {
					buffer.flip();
				}
				if (readBytes > 0) {
					msg = Charset.forName("utf-8").decode(buffer).toString();
					buffer = null;
				}
			}
			finally {
				if(buffer != null)
					buffer.clear();
			}
			try {
			System.out.println("server received [ " + msg +"] from client address : " + sc.getRemoteAddress());
			Thread.sleep(2000);
			sc.write(ByteBuffer.wrap((msg + " server response ").getBytes(Charset.forName("utf-8"))));
			}
			catch(Exception e) {
				
			}
			
			
		}
	}
	public static void main(String[] args) {
		// TODO Auto-generated method stub
		Server server = new Server();
		try {
		server.listen();
		}
		catch(IOException e) {
			
		}
	}

}

//client

package javatest;

import java.io.IOException;
import java.net.InetSocketAddress;  
import java.net.ServerSocket;  
import java.nio.ByteBuffer;  
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;  
import java.nio.channels.Selector;  
import java.nio.channels.ServerSocketChannel;  
import java.nio.channels.SocketChannel;  
import java.nio.charset.Charset;  
import java.util.*;

public class Client implements Runnable {
	// 空闲计数器,如果空闲超过10次,将检测server是否中断连接.
	private static int idleCounter = 0;
	private Selector selector;
	private SocketChannel socketChannel;
	private ByteBuffer temp = ByteBuffer.allocate(1024);

	public static void main(String[] args) throws IOException {
		Client client= new Client();
		new Thread(client).start();
		//client.sendFirstMsg();
	}
	
	public Client() throws IOException {
		// 同样的,注册闹钟.
		this.selector = Selector.open();
		
		// 连接远程server
		socketChannel = SocketChannel.open();
		// 如果快速的建立了连接,返回true.如果没有建立,则返回false,并在连接后出发Connect事件.
		Boolean isConnected = socketChannel.connect(new InetSocketAddress("localhost", 3562));
		socketChannel.configureBlocking(false);
		SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
		
		if (isConnected) {
			this.sendFirstMsg();
		} else {
			// 如果连接还在尝试中,则注册connect事件的监听. connect成功以后会出发connect事件.
		    key.interestOps(SelectionKey.OP_CONNECT);
		}
	}
	
	public void sendFirstMsg() throws IOException {
		String msg = "Hello NIO.";
		socketChannel.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8"))));
	}

	@Override
	public void run() {
        while (true) {
			try {
				// 阻塞,等待事件发生,或者1秒超时. num为发生事件的数量.
				int num = this.selector.select(1000);
				if (num ==0) {
					idleCounter ++;
					if(idleCounter >10) {
						// 如果server断开了连接,发送消息将失败.
						try {
						    this.sendFirstMsg();
						} catch(ClosedChannelException e) {
							e.printStackTrace();
							this.socketChannel.close();
							return;
						}
					}
					continue;
				} else {
					idleCounter = 0;
				}
				Set<SelectionKey> keys = this.selector.selectedKeys();
				Iterator<SelectionKey> it = keys.iterator();
				while (it.hasNext()) {
					SelectionKey key = it.next();
					it.remove();
					if (key.isConnectable()) {
						// socket connected
						SocketChannel sc = (SocketChannel)key.channel();
						if (sc.isConnectionPending()) {
						    sc.finishConnect();
						}
						// send first message;
						this.sendFirstMsg();
					}
					if (key.isReadable()) {
						// msg received.
						SocketChannel sc = (SocketChannel)key.channel();
						this.temp = ByteBuffer.allocate(1024);
						int count = sc.read(temp);
						if (count<0) {
							sc.close();
							continue;
						}
						// 切换buffer到读状态,内部指针归位.
						temp.flip();
						String msg = Charset.forName("UTF-8").decode(temp).toString();
						System.out.println("Client received ["+msg+"] from server address:" + sc.getRemoteAddress());
						
						Thread.sleep(1000);
						// echo back.
						sc.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8"))));
						
						// 清空buffer
						temp.clear();
					}
				}
			} catch (IOException e) {
				e.printStackTrace();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

}

抱歉!评论已关闭.