传统的服务器编程,来一个用户请求,则服务器生成一个线程处理这个请求,就是单线程单用户的模式,并且是同步io的
java nio的实现是,由一个线程来处理所有的用户请求,这个线程是通过多路复用技术实现的,
while(true){
int nKeys=selector.select();
....
......
}
只要有多个线程的io有一个已经处于ready状态,nkey就返回大于0,这时再判断是什么操作,是读还是写操作。
再高性能点的程序是,如果是读操作,就从读线程池中,申请一个读线程处理,写操作也同样处理
但是,用来接收用户请求的只是这个单线程selector
有疑惑的地方是:selector中有注册集合,ready集合,删除ready集合是否也会删除注册的集合?
http://www.ibm.com/developerworks/cn/java/l-niosvr/
http://hi.baidu.com/personnel/item/0911daebc57f30c3baf37d2c
看着代码讲解比较容易理解
/** * 《构建高性能的大型分布式Java应用》 * 书中的示例代码 * 版权所有 2008---2009 */ package book.chapter1.tcpnio; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.*; /** * 描述:基于Java NIO实现的tcp服务器端 * * @author bluedavy * 创建时间: 2008-12-2 */ public class Server { public static String oper(int op) { StringBuilder sb = new StringBuilder(); if ( (op & SelectionKey.OP_ACCEPT) != 0) sb.append("accept "); if ( (op & SelectionKey.OP_READ) != 0 ) sb.append("read "); return sb.toString(); } public static String typekey(SelectionKey key) { StringBuilder sb = new StringBuilder("interest:"); sb.append(oper(key.interestOps())); sb.append("\nready:"); sb.append(oper(key.readyOps())); sb.append("\n"); // if (key.isAcceptable()) // return "accept"; // if (key.isConnectable()) // return "connect"; // if (key.isReadable()) // return "read"; // if (key.isWritable()) // return "writer"; return sb.toString(); } public static void main(String[] args) throws Exception{ int port=9527; Selector selector=Selector.open(); ServerSocketChannel ssc=ServerSocketChannel.open();//创建一个channel ServerSocket serverSocket=ssc.socket();//由channel创建一个socket serverSocket.bind(new InetSocketAddress(port));//socket肯定要监听一个port System.out.println("Server listen on port: "+port); ssc.configureBlocking(false);//channel是非堵塞 ssc.register(selector, SelectionKey.OP_ACCEPT);//channel向selector注册,由selector进行监听 int ind = 0; while(true){ System.out.println("befor select"); System.out.println("key"); for (SelectionKey key : selector.keys()) { System.out.println(typekey(key)); } System.out.println(); int nKeys=selector.select(5000);//如果没有io,则堵塞1000毫秒就返回了;如果有io,就立即返回 //Thread.sleep(2000); System.out.println(nKeys); System.out.println("after select"); System.out.println("key"); for (SelectionKey key : selector.keys()) { System.out.println(typekey(key)); } System.out.println(); if(nKeys>0){ System.out.println("selector key"); Iterator it = selector.selectedKeys().iterator(); //for (SelectionKey key : selector.selectedKeys()) { while (it.hasNext()) { SelectionKey key =(SelectionKey) it.next(); it.remove(); System.out.println(typekey(key)); if(key.isAcceptable()){ ServerSocketChannel server=(ServerSocketChannel) key.channel(); SocketChannel sc=server.accept(); if(sc==null){ continue; } sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ); } else if(key.isReadable()){ ByteBuffer buffer=ByteBuffer.allocate(1024); SocketChannel sc=(SocketChannel) key.channel(); int readBytes=0; String message=null; try{ int ret; try{ while((ret=sc.read(buffer))>0){ readBytes+=ret; } } catch(Exception e){ readBytes=0; // IGNORE } finally{ buffer.flip(); } if(readBytes>0){ message=Charset.forName("UTF-8").decode(buffer).toString(); buffer = null; } } finally{ if(buffer!=null){ buffer.clear(); } } if(readBytes>0){ System.out.println("Message from client: "+ message); if("quit".equalsIgnoreCase(message.trim())){ sc.close(); selector.close(); System.out.println("Server has been shutdown!"); System.exit(0); } String outMessage="Server response:"+message; sc.write(Charset.forName("UTF-8").encode(outMessage)); } } } System.out.println(); //selector.selectedKeys().clear(); //System.out.println(ind++); }//if (key > 0) }//while(true) } }
在54,首先定义一个server通道,然后将这个server通道注册到selector,注意在selector中是通过键来保存通道和通道需要监听的IO操作的,如果再注册另一个通道,在selector中又会生成一个键。
server通道只用来监听accept操作,即客户发出的连接请求,一旦收到这个连接请求,则通过server通路的accept操作,生成对于这个客户通信的新的通路,并将这个通路和监听的read操作注册到selector中,selector自动为这个通路生成一个键。主要selector是通过键来管理通道和其监听。
但是有个问题,如果有成千上万个客户发起连接请求,也会在selector中生成同样数目的键,监听可能更多数目的io操作。
我想到三个方法,一是,服务器在接收消息和发送消息之后就将对于这个客户的连接的通路给关闭;二是,每隔一段时间就清理一些许久不用的连接通路;三,如果服务器收到了客户端关闭连接的消息(可能是通信双方规定的关闭协议),就应该把注册的键取消
下面开始进入nio最纠结的地方,就是监听。
首先要清楚,在键中有两个int,表示两个操作集合。interest集合,ready集合。
在选择器中有三个集合,注册键集合,选择键集合,取消键集合
其中,选择键集合和取消键集合均是注册键集合的子集,并且均是注册键集合中键的引用的拷贝,就是说对一个键的状态的修改,如果这个键也在其他三个集合中,则三个集合中这个键均改变。
调用键的cancel方法即可以将键拷贝一份到取消集合中,但是他还没有被注销,而是在下次调用select时,检查到取消集合中的键,然后才从另外两个集合中删除。
69行的select步骤是:
1. 检查到取消集合中的键,然后才从另外两个集合中删除。这个步骤之后,取消集合为空
2. 检查注册集合中键的interest集合,如果没有操作就绪,select()是堵塞,select(n)是堵塞n时间,selectnow是立即返回。一旦某个通道的io就绪,则:
1. 如果通道的键还没有处于选择集合中,那么这个键的ready集合被清空,然后将当前通道已经准备好的操作的比特掩码将被设置,然后将键拷贝到选择集合
2. 如果通道的键已经在选择集合时,这个键的ready集合不会被清空,而是直接将操作的比特掩码设置,注意到键的ready是积累的。
select返回的是ready集被修改的key的数量。但是,要注意的是,如果在上次select时,已经将这个键放入选择集合中,并且在这次的select时,这个键又有同样的操作就绪,但是键的ready集合没有变化,所以返回0
所以在处理选择集合时,要将选择集合的键清空。(清空选择集合不会对注册集合产生影响)
如果将84行注释,发生的事情将会很诡异。
首先启动server
程序进入循环,这时只有一个server通路的键,并且interest集合只有accept,而ready集合null
过段时间启动client
选择器发现客户的连接请求,这时键的ready集合被设为accept,并且选择集合为空,所以要将这个键拷贝到选择集合中。
处理选择集合的键,只有一个键,这时再建立新的连接通路,并在选择器中注册read操作,注意由于注释了84行,server键有一份拷贝在选择集。
这时,线程在循环中,等待连接通路的read就绪。
从客户端发送消息,服务器中的选择器检测到连接通路的read就绪,发现连接通路的键没有在选择集合中,就将这个键拷贝到选择集合中。
然后,开始处理选择集合中的键,注意,这时选择集合有两个键,一个是server键,一个是连接通路键
如果先处理server键,在88行,由于实际上这个键的accept操作并没有就绪,所以返回null
接着处理连接通路键,从95行开始,处理从客户端接收的消息,并发送。
注意,此时,在选择器中的选择集合中有两个键,一个是server键,一个是连接通路键。
再从客户端发送消息,
选择器发现连接通路的键的read操作就绪,但是这个键已经在选择集合了,所以只是将read操作的掩码叠加在之前的ready集合,可以想象为与操作。由于,上次已经是read操作,所以与之后,选择器并没有发现ready集合有变化,所以select操作返回0.
再次进入61行,循环,到69行时,发现连接通路的键的read操作处于就绪状态,所以再次进行与操作,select还是返回为0。从此,就完全进入死循环状态。
所以,在处理选择集合的键时,要注意清空选择集合。
client的代码,其实没有必要用nio,用堵塞io即可
/** * 《构建高性能的大型分布式Java应用》 * 书中的示例代码 * 版权所有 2008---2009 */ package book.chapter1.tcpnio; import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; /** * 描述:基于java NIO实现的tcp client * * @author bluedavy * 创建时间: 2008-12-2 */ public class Client { public static void main(String[] args) throws Exception{ int port=9527; SocketChannel channel=SocketChannel.open(); channel.configureBlocking(false); SocketAddress target=new InetSocketAddress("127.0.0.1",port); channel.connect(target); Selector selector=Selector.open(); channel.register(selector, SelectionKey.OP_CONNECT); BufferedReader systemIn=new BufferedReader(new InputStreamReader(System.in)); while(true){ if(channel.isConnected()){ String command=systemIn.readLine(); channel.write(Charset.forName("UTF-8").encode(command)); if(command==null || "quit".equalsIgnoreCase(command.trim())){ systemIn.close(); channel.close(); selector.close(); System.out.println("Client quit!"); System.exit(0); } } int nKeys=selector.select(1000); if(nKeys>0){ for (SelectionKey key : selector.selectedKeys()) { if(key.isConnectable()){ SocketChannel sc=(SocketChannel) key.channel(); sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ); sc.finishConnect(); } else if(key.isReadable()){ ByteBuffer buffer=ByteBuffer.allocate(1024); SocketChannel sc=(SocketChannel) key.channel(); int readBytes=0; try{ int ret=0; try{ while((ret=sc.read(buffer))>0){ readBytes+=ret; } } finally{ buffer.flip(); } if(readBytes>0){ System.out.println(Charset.forName("UTF-8").decode(buffer).toString()); buffer = null; } } finally{ if(buffer!=null){ buffer.clear(); } } } } selector.selectedKeys().clear(); } } } }
主要还是对TCP的机制不是很清楚,导致在理解nio上出现较大的困难。下面有清晰版的程序
1. 对于普通的socket连接如下图所示
第一次握手,客户connect 向服务器发送syn j,然后进入堵塞
第二次握手,服务器accept接收这次连接,然后发送 syn k, ack j+1给客户,进入堵塞
第三次握手,客户收到服务器accept发送的消息,然后先服务器发送ack k+1,从connect返回;服务器收到客户的消息,服务器也从accept返回
经过三次握手之后,服务器和客户可以进行数据通信,一般而言是长连接吧,至于要发送心跳之类的,以后再进行探索。
对于nio而言,也是要经过这三次握手之后,才能进行数据通信,其优势在于,三次握手期间和传输数据期间,是非堵塞的模式,提高cpu的运行效率
客户端代码 25行,channel.connect(target),客户向服务器发送建立连接请求,但是这里是非堵塞的,直接返回。有极少的概率,能立即建立好连接,并返回,一般只是向服务器的channel发送个请求,然后false返回。这时就客户就要将这次连接的channel作为一个Key放入selector中进行监听。
服务器代码66行, selector.select(5000); 服务器监听的channel上有个io建立连接的请求,所以,select从堵塞中返回,这时就要依次检查selector监听的所有key,确定是哪个channel上有io请求。
83行SocketChannel sc = server.accept(); 服务器创建新channel专门用于监听和传输数据与客户,并发送请求确认信息给客户,这里相当于第二次握手
客户端代码55行sc.finishConnect(),相当于第三次握手,并把确认信息发送给服务器。
此时,服务器和客户端的连接已经建立,并且都在监视自己channel的io read请求
这时,双方可以相互发送数据。
注意,一旦对方发送数据后,己方检查到channel的io read已经就绪,就一定要从channel中读出数据,否则对于己方而言,将会一直有个io read的事件,select总是立即就返回,从而进入死循环
长连接和短连接的区别:
如下代码是长连接,但是还缺少心跳机制。
如果是短连接的话,一旦select监听到channel的io事件,处理好这个key之后,就用key.cancel,这样在下次select时,就会把key从注册集合中删除
import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.*; public class SimpleServer { public static String oper(int op) { StringBuilder sb = new StringBuilder(); if ( (op & SelectionKey.OP_ACCEPT) != 0) sb.append("accept "); if ( (op & SelectionKey.OP_READ) != 0 ) sb.append("read "); return sb.toString(); } public static String typekey(SelectionKey key) { StringBuilder sb = new StringBuilder("interest:"); sb.append(oper(key.interestOps())); sb.append("\tready:"); sb.append(oper(key.readyOps())); return sb.toString(); } public static void main(String[] args) throws Exception{ // TODO Auto-generated method stub ByteBuffer buffer = ByteBuffer.allocate(1024); int port = 9321; Selector selector = Selector.open(); ServerSocketChannel ssc = ServerSocketChannel.open(); System.out.println("main serversocketchannel: " + ssc.hashCode()); ServerSocket serverSocket=ssc.socket();//由channel创建一个socket serverSocket.bind(new InetSocketAddress(port));//socket肯定要监听一个port System.out.println("Server listen on port: "+port); ssc.configureBlocking(false);//channel是非堵塞 SelectionKey regKey = ssc.register(selector, SelectionKey.OP_ACCEPT);//channel向selector注册,由selector进行监听 System.out.println(regKey); int ind = 0; while(true) { System.out.println("**********************************************************"); System.out.println("the " + (ind++) +"th:"); System.out.println("befor select, the keys in selector are :"); for (SelectionKey key : selector.keys()) { System.out.println(key + ":"); System.out.println(typekey(key)); } System.out.println(); System.out.println("befor select, the keys in selected set are :"); for(SelectionKey key: selector.selectedKeys()) { System.out.println(key + ":"); System.out.println(typekey(key)); } System.out.println(); int nKeys=selector.select(5000);//如果没有io,则堵塞1000毫秒就返回了;如果有io,就立即返回 Thread.sleep(2000); //Thread.sleep(2000); System.out.println(nKeys); System.out.println("after select"); if (nKeys > 0) { System.out.println("selected key are:"); Iterator it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey) it.next(); System.out.println(key); //it.remove();//直接放在最后clear if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); //System.out.println("ServerSocketChannel " + server.hashCode()); SocketChannel sc = server.accept();//TCP连接的第二步。如果注释此句,那么对于客户端发起的建立连接的请求,就一直不能应答,所以,建立连接的请求会一直触发上面的selector //创建新的channel, 用以数据传输 sc.configureBlocking(false); //sc.write(Charset.forName("utf-8").encode("the " + ind + "th message from server")); sc.register(selector, SelectionKey.OP_READ);//开始监听客户端的信息,既然连接已建立,肯定也可以主动发送消息 //System.out.println("socketChannel " + sc.hashCode() +" from " + sc.getRemoteAddress()); } else if(key.isReadable()) { buffer.clear(); SocketChannel sc = (SocketChannel) key.channel(); int rd = 0; int rdbytes = 0; while ((rd = sc.read(buffer)) >0) { rdbytes += rd; } //rdbytes = sc.read(buffer); buffer.flip(); System.out.println(rdbytes); String rev = Charset.forName("utf-8").decode(buffer).toString(); if (rdbytes > 0) { System.out.println("receive from "+sc.getRemoteAddress() +" "+rev); } buffer.clear(); sc.write(Charset.forName("utf-8").encode("response " + rev)); } } selector.selectedKeys().clear(); } } } }
import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; public class SimpleClient { public static void main(String[] args) throws Exception{ // TODO Auto-generated method stub ByteBuffer buffer = ByteBuffer.allocate(1024); Selector selector = null; int port=9321; SocketChannel channel=SocketChannel.open(); System.out.println(channel.hashCode()); channel.configureBlocking(false); SocketAddress target=new InetSocketAddress("127.0.0.1",port); boolean isConn = channel.connect(target);//如果是堵塞模式的话,要等到第二次握手的时候,才返回,就是说这时候连接已经建立好了,开始传输数据;如果是非堵塞模式,不管是不是建立TCP三次握手成功,就立即返回 if (isConn) { System.out.println("client connection is success");//TCP 3 handshakes is finish } else { System.out.println("cannot establish connection, so register"); selector=Selector.open(); channel.register(selector, SelectionKey.OP_CONNECT); } int id = 0; while(true) { System.out.println("************************************************"); System.out.println("the " + id +"th :"); if (channel.isConnected()) {//连接已经建立,就可以直接发送数据//这里的连接可能是由connect直接建立的,也可能是由selector监听 System.out.println("channel is connected"); channel.write(Charset.forName("utf-8").encode("the " + id + "th message from channel" )); } int nkeys = selector.select();// if (nkeys > 0) { for (SelectionKey key : selector.selectedKeys()) { if (key.isConnectable()) {//这可以看做是tcp连接的第二步,服务器同意建立连接 System.out.println("key connectable"); SocketChannel sc = (SocketChannel) key.channel();//不知道每次的channel是否是相同的 System.out.println("socketchannel " + sc.hashCode()); sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ); sc.finishConnect();//这是tcp连接的第三步,客户端确认连接 } else if (key.isReadable()) { buffer.clear(); SocketChannel sc = (SocketChannel) key.channel(); int rd = 0; int rdbytes = 0; while ((rd = sc.read(buffer)) >0) { rdbytes += rd; } buffer.flip(); if (rdbytes > 0) { System.out.println(Charset.forName("utf-8").decode(buffer).toString()); } buffer.clear(); } } selector.selectedKeys().clear(); } ++id; } } }