(1) 异步(非阻塞)方式实现 SocketChannel(客户端)
package nonblock;

import java.net.*;
import java.nio.channels.*;
import java.nio.*;
import java.io.*;
import java.nio.charset.*;
import java.util.*;

/**
 * Non-blocking echo client.
 *
 * <p>A helper thread ({@link #receiveFromUser()}) reads lines from standard input and appends
 * them to {@code sendBuffer}. The thread running {@link #talk()} drives a {@link Selector} that
 * writes the buffered bytes to the server and prints every echoed line. All access to
 * {@code sendBuffer} is guarded by synchronizing on the buffer object itself.
 */
public class EchoClient {
  private SocketChannel socketChannel = null;
  // Shared between the stdin thread and the selector thread; also used as the lock object.
  private final ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
  // Kept in "append" state (position = end of data, limit = capacity) between receive() calls.
  private final ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
  private Charset charset = Charset.forName("GBK");
  private Selector selector;

  /**
   * Connects to port 8000 on the local host in blocking mode, then switches the channel to
   * non-blocking mode and opens the selector.
   *
   * @throws IOException if the connection or the selector cannot be set up; the channel is
   *     closed before the exception is rethrown
   */
  public EchoClient() throws IOException {
    socketChannel = SocketChannel.open();
    try {
      InetAddress ia = InetAddress.getLocalHost();
      InetSocketAddress isa = new InetSocketAddress(ia, 8000);
      socketChannel.connect(isa);
      socketChannel.configureBlocking(false);
      System.out.println("与服务器的连接建立成功");
      selector = Selector.open();
    } catch (IOException e) {
      try {
        socketChannel.close();
      } catch (IOException ex) {
        ex.printStackTrace();
      }
      throw e;
    }
  }

  public static void main(String args[]) throws IOException {
    final EchoClient client = new EchoClient();
    Thread receiver = new Thread() {
      @Override
      public void run() {
        client.receiveFromUser();
      }
    };
    receiver.start();
    client.talk();
  }

  /**
   * Reads lines from standard input and queues each one, terminated by CRLF, for sending.
   * Stops after the line {@code bye} or at end of input.
   */
  public void receiveFromUser() {
    try {
      BufferedReader localReader = new BufferedReader(new InputStreamReader(System.in));
      String msg = null;
      while ((msg = localReader.readLine()) != null) {
        enqueue(encode(msg + "\r\n"));
        if (msg.equals("bye")) {
          break;
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  /**
   * Copies {@code data} into {@code sendBuffer} in pieces that fit, so a long line (or input
   * arriving faster than it is sent) can no longer raise {@code BufferOverflowException}.
   * Gives up once the channel has been closed, because nothing would drain the buffer then.
   */
  private void enqueue(ByteBuffer data) {
    while (data.hasRemaining() && socketChannel.isOpen()) {
      synchronized (sendBuffer) {
        int n = Math.min(data.remaining(), sendBuffer.remaining());
        if (n > 0) {
          ByteBuffer chunk = data.slice();
          chunk.limit(n);
          sendBuffer.put(chunk);
          data.position(data.position() + n);
        }
      }
      if (data.hasRemaining()) {
        Thread.yield(); // buffer full: let the selector thread write some of it out
      }
    }
  }

  /**
   * Runs the selector loop: reads echoed data when the channel is readable and flushes
   * {@code sendBuffer} when it is writable. Returns once the channel has been closed or
   * {@code select()} reports no ready keys.
   *
   * <p>NOTE(review): OP_WRITE stays registered for the whole session, so {@code select()}
   * returns whenever the socket is writable, even if there is nothing to send.
   *
   * @throws IOException if registering the channel or selecting fails
   */
  public void talk() throws IOException {
    socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    // Stop once the channel is closed; select() on a selector with no keys would block forever.
    while (socketChannel.isOpen() && selector.select() > 0) {
      Set<SelectionKey> readyKeys = selector.selectedKeys();
      Iterator<SelectionKey> it = readyKeys.iterator();
      while (it.hasNext()) {
        SelectionKey key = null;
        try {
          key = it.next();
          it.remove();

          if (key.isReadable()) {
            receive(key);
          }
          if (key.isValid() && key.isWritable()) {
            send(key);
          }
        } catch (IOException e) {
          e.printStackTrace();
          try {
            if (key != null) {
              key.cancel();
              key.channel().close();
            }
          } catch (Exception ex) {
            ex.printStackTrace();
          }
        }
      } // #while
    } // #while
  }

  /**
   * Writes as much of {@code sendBuffer} as the channel accepts and keeps the rest queued.
   *
   * @param key selection key whose channel is writable
   * @throws IOException if the write fails
   */
  public void send(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();
    synchronized (sendBuffer) {
      sendBuffer.flip(); // limit = end of queued data, position = 0
      try {
        socketChannel.write(sendBuffer);
      } finally {
        // Always return the buffer to "append" state, even if the write failed.
        sendBuffer.compact();
      }
    }
  }

  /**
   * Reads available bytes from the server and prints every complete line received so far.
   * When the line {@code echo:bye} arrives the connection and selector are closed and the JVM
   * exits.
   *
   * @param key selection key whose channel is readable
   * @throws EOFException if the server closed the connection
   * @throws IOException if reading fails or a single line does not fit into the receive buffer
   */
  public void receive(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();
    int bytesRead = socketChannel.read(receiveBuffer);

    // Drain every complete line, not just the first one: several echoes may arrive together.
    for (;;) {
      receiveBuffer.flip();
      String receiveData = decode(receiveBuffer);
      int newline = receiveData.indexOf("\n");
      if (newline == -1) {
        // Incomplete line: restore "append" state so the next read() adds to the partial data.
        receiveBuffer.position(receiveBuffer.limit());
        receiveBuffer.limit(receiveBuffer.capacity());
        break;
      }

      String outputData = receiveData.substring(0, newline + 1);
      System.out.print(outputData);
      if (outputData.equals("echo:bye\r\n")) {
        key.cancel();
        socketChannel.close();
        System.out.println("关闭与服务器的连接");
        selector.close();
        System.exit(0);
      }

      // Drop the bytes of the printed line; its byte length is found by re-encoding it.
      ByteBuffer temp = encode(outputData);
      receiveBuffer.position(Math.min(temp.limit(), receiveBuffer.limit()));
      receiveBuffer.compact();
    }

    if (bytesRead == -1) {
      throw new EOFException("server closed the connection");
    }
    if (!receiveBuffer.hasRemaining()) {
      throw new IOException(
          "received line does not fit into " + receiveBuffer.capacity() + " bytes");
    }
  }

  /** Decodes the remaining bytes of {@code buffer} with the client's charset. */
  public String decode(ByteBuffer buffer) {
    CharBuffer charBuffer = charset.decode(buffer);
    return charBuffer.toString();
  }

  /** Encodes {@code str} with the client's charset. */
  public ByteBuffer encode(String str) {
    return charset.encode(str);
  }
}
服务器端实现
package nonblock;

import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.net.*;
import java.util.*;

/**
 * Single-threaded non-blocking echo server.
 *
 * <p>One {@link Selector} handles accepting connections as well as reading from and writing to
 * every client. Each client key carries a 1024-byte {@link ByteBuffer} attachment that
 * accumulates received bytes until a complete line can be echoed back.
 */
public class EchoServer {
  private Selector selector = null;
  private ServerSocketChannel serverSocketChannel = null;
  private int port = 8000;
  private Charset charset = Charset.forName("GBK");

  /**
   * Opens the selector and binds a non-blocking server channel to the configured port.
   *
   * @throws IOException if the selector or server socket cannot be opened or bound
   */
  public EchoServer() throws IOException {
    selector = Selector.open();
    serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.socket().setReuseAddress(true);
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.socket().bind(new InetSocketAddress(port));
    System.out.println("服务器启动");
  }

  /**
   * Runs the selector loop, dispatching accept, read and write events.
   *
   * @throws IOException if registering the server channel or selecting fails
   */
  public void service() throws IOException {
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    while (selector.select() > 0) {
      Set<SelectionKey> readyKeys = selector.selectedKeys();
      Iterator<SelectionKey> it = readyKeys.iterator();
      while (it.hasNext()) {
        SelectionKey key = null;
        try {
          key = it.next();
          it.remove();

          if (key.isAcceptable()) {
            acceptClient((ServerSocketChannel) key.channel());
          }
          // A handler may cancel the key, so re-check validity before each readiness test.
          if (key.isValid() && key.isReadable()) {
            receive(key);
          }
          if (key.isValid() && key.isWritable()) {
            send(key);
          }
        } catch (IOException e) {
          e.printStackTrace();
          try {
            if (key != null) {
              key.cancel();
              key.channel().close();
            }
          } catch (Exception ex) {
            ex.printStackTrace();
          }
        }
      } // #while
    } // #while
  }

  /**
   * Accepts one pending connection and registers it for read and write events. A failure while
   * configuring the new connection closes only that connection, not the listening socket.
   */
  private void acceptClient(ServerSocketChannel ssc) throws IOException {
    SocketChannel socketChannel = ssc.accept();
    if (socketChannel == null) {
      return; // non-blocking accept: the pending connection is already gone
    }
    try {
      System.out.println("接收到客户连接,来自:"
          + socketChannel.socket().getInetAddress()
          + ":" + socketChannel.socket().getPort());
      socketChannel.configureBlocking(false);
      ByteBuffer buffer = ByteBuffer.allocate(1024);
      socketChannel.register(
          selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
    } catch (IOException e) {
      e.printStackTrace();
      try {
        socketChannel.close();
      } catch (IOException ex) {
        ex.printStackTrace();
      }
    }
  }

  /**
   * Echoes the first complete line held in the key's buffer, prefixed with {@code echo:}, and
   * removes it from the buffer. Closes the connection after the line {@code bye}, or when the
   * client has finished sending and no complete line is left.
   *
   * @param key client selection key with a {@link ByteBuffer} attachment
   * @throws IOException if writing to the client fails
   */
  public void send(SelectionKey key) throws IOException {
    ByteBuffer buffer = (ByteBuffer) key.attachment();
    SocketChannel socketChannel = (SocketChannel) key.channel();
    buffer.flip(); // limit = end of data, position = 0
    String data = decode(buffer);
    if (!containsLine(data)) {
      // Leave position at the end of the data so receive() appends after it.
      buffer.position(buffer.limit());
      if ((key.interestOps() & SelectionKey.OP_READ) == 0) {
        // receive() saw end-of-stream and nothing echoable remains.
        closeClient(key, socketChannel);
      }
      return;
    }
    String outputData = data.substring(0, data.indexOf("\n") + 1);
    System.out.print(outputData);
    ByteBuffer outputBuffer = encode("echo:" + outputData);
    while (outputBuffer.hasRemaining()) {
      socketChannel.write(outputBuffer);
    }

    // Drop the echoed line; its byte length is found by re-encoding it.
    ByteBuffer temp = encode(outputData);
    buffer.position(Math.min(temp.limit(), buffer.limit()));
    buffer.compact();

    if (outputData.equals("bye\r\n")) {
      closeClient(key, socketChannel);
    }
  }

  /**
   * Reads up to 32 bytes from the client and appends them to the key's buffer, never more than
   * the buffer can hold. On end-of-stream the key stops listening for reads so that
   * {@link #send(SelectionKey)} can flush what is buffered and then close the connection.
   *
   * @param key client selection key with a {@link ByteBuffer} attachment
   * @throws IOException if reading fails or the client sent a line larger than the buffer
   */
  public void receive(SelectionKey key) throws IOException {
    ByteBuffer buffer = (ByteBuffer) key.attachment();
    SocketChannel socketChannel = (SocketChannel) key.channel();
    buffer.limit(buffer.capacity());

    if (!buffer.hasRemaining()) {
      // Full buffer: fine if send() can still drain a line, fatal if the line never ends.
      ByteBuffer pending = buffer.duplicate();
      pending.flip();
      if (!containsLine(decode(pending))) {
        throw new IOException("request line exceeds " + buffer.capacity() + " bytes");
      }
      return; // unread bytes stay in the socket until send() frees space
    }

    ByteBuffer readBuff = ByteBuffer.allocate(Math.min(32, buffer.remaining()));
    int bytesRead = socketChannel.read(readBuff);
    if (bytesRead == -1) {
      key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
      return;
    }
    readBuff.flip();
    buffer.put(readBuff);
  }

  /** Returns whether {@code data} holds a CRLF-terminated line that send() can echo. */
  private boolean containsLine(String data) {
    return data.indexOf("\r\n") != -1;
  }

  /** Cancels the key, closes the client channel and logs the disconnect. */
  private void closeClient(SelectionKey key, SocketChannel socketChannel) throws IOException {
    key.cancel();
    socketChannel.close();
    System.out.println("关闭与客户的连接");
  }

  /** Decodes the remaining bytes of {@code buffer} with the server's charset. */
  public String decode(ByteBuffer buffer) {
    CharBuffer charBuffer = charset.decode(buffer);
    return charBuffer.toString();
  }

  /** Encodes {@code str} with the server's charset. */
  public ByteBuffer encode(String str) {
    return charset.encode(str);
  }

  public static void main(String args[]) throws Exception {
    EchoServer server = new EchoServer();
    server.service();
  }
}
(2) 同步(阻塞)方式实现原型
package block;

import java.net.*;
import java.nio.channels.*;
import java.nio.*;
import java.io.*;
import java.nio.charset.*;

/**
 * Blocking echo client: sends each line typed on standard input to the server and prints the
 * reply, until the user types {@code bye}, input ends, or the server closes the connection.
 */
public class EchoClient {
  private SocketChannel socketChannel = null;

  /**
   * Connects to port 8000 on the local host.
   *
   * @throws IOException if the connection cannot be established; the channel is closed before
   *     the exception is rethrown
   */
  public EchoClient() throws IOException {
    socketChannel = SocketChannel.open();
    try {
      InetAddress ia = InetAddress.getLocalHost();
      InetSocketAddress isa = new InetSocketAddress(ia, 8000);
      socketChannel.connect(isa);
    } catch (IOException e) {
      try {
        socketChannel.close();
      } catch (IOException ex) {
        ex.printStackTrace();
      }
      throw e;
    }
    System.out.println("与服务器的连接建立成功");
  }

  public static void main(String args[]) throws IOException {
    new EchoClient().talk();
  }

  /** Wraps the socket's output stream in an auto-flushing writer. */
  private PrintWriter getWriter(Socket socket) throws IOException {
    OutputStream socketOut = socket.getOutputStream();
    return new PrintWriter(socketOut, true);
  }

  /** Wraps the socket's input stream in a buffered reader. */
  private BufferedReader getReader(Socket socket) throws IOException {
    InputStream socketIn = socket.getInputStream();
    return new BufferedReader(new InputStreamReader(socketIn));
  }

  /**
   * Runs the request/reply loop and closes the channel when it ends. I/O errors inside the
   * loop are printed rather than propagated.
   *
   * @throws IOException declared for interface compatibility; not thrown by the current body
   */
  public void talk() throws IOException {
    try {
      BufferedReader br = getReader(socketChannel.socket());
      PrintWriter pw = getWriter(socketChannel.socket());
      BufferedReader localReader = new BufferedReader(new InputStreamReader(System.in));
      String msg = null;
      while ((msg = localReader.readLine()) != null) {
        pw.println(msg);
        // PrintWriter swallows IOExceptions; checkError() is the only way to notice them.
        if (pw.checkError()) {
          System.out.println("服务器已关闭连接");
          break;
        }

        String reply = br.readLine();
        if (reply == null) {
          // End of stream: stop instead of printing "null" for every further line.
          System.out.println("服务器已关闭连接");
          break;
        }
        System.out.println(reply);

        if (msg.equals("bye")) {
          break;
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      try {
        socketChannel.close();
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }
}
package block;

import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;

/**
 * Blocking echo server that hands every accepted connection to a fixed-size thread pool.
 */
public class EchoServer {
  private int port = 8000;
  private ServerSocketChannel serverSocketChannel = null;
  private ExecutorService executorService;
  private static final int POOL_MULTIPLE = 4;

  /**
   * Creates the worker pool (available processors times {@code POOL_MULTIPLE} threads) and
   * binds the server channel to the configured port.
   *
   * @throws IOException if the server channel cannot be opened or bound
   */
  public EchoServer() throws IOException {
    int poolSize = Runtime.getRuntime().availableProcessors() * POOL_MULTIPLE;
    executorService = Executors.newFixedThreadPool(poolSize);

    serverSocketChannel = ServerSocketChannel.open();
    ServerSocket listener = serverSocketChannel.socket();
    listener.setReuseAddress(true);
    listener.bind(new InetSocketAddress(port));
    System.out.println("服务器启动");
  }

  /** Accepts connections forever, submitting one {@link Handler} per client to the pool. */
  public void service() {
    for (;;) {
      try {
        SocketChannel client = serverSocketChannel.accept();
        executorService.execute(new Handler(client));
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }

  public static void main(String args[]) throws IOException {
    EchoServer server = new EchoServer();
    server.service();
  }
}

/**
 * Serves a single client: echoes every received line back until the client sends
 * {@code bye} or closes the connection.
 */
class Handler implements Runnable {
  private SocketChannel socketChannel;

  public Handler(SocketChannel socketChannel) {
    this.socketChannel = socketChannel;
  }

  public void run() {
    handle(socketChannel);
  }

  /**
   * Runs the echo loop on the given channel and closes the channel afterwards.
   *
   * @param socketChannel connected client channel
   */
  public void handle(SocketChannel socketChannel) {
    try {
      Socket socket = socketChannel.socket();
      System.out.println("接收到客户连接,来自: "
          + socket.getInetAddress() + ":" + socket.getPort());

      // Reader first, then an auto-flushing writer, both over the socket's streams.
      BufferedReader in =
          new BufferedReader(new InputStreamReader(socket.getInputStream()));
      PrintWriter out = new PrintWriter(socket.getOutputStream(), true);

      for (String line = in.readLine(); line != null; line = in.readLine()) {
        System.out.println(line);
        out.println(echo(line));
        if (line.equals("bye")) {
          break;
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      if (socketChannel != null) {
        try {
          socketChannel.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }

  /** Returns the reply for {@code msg}: the message prefixed with {@code echo:}. */
  public String echo(String msg) {
    return "echo:" + msg;
  }
}
(3) 综合的例子:阻塞方式接受连接,非阻塞方式收发数据
package thread2;

import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.net.*;
import java.util.*;

/**
 * Two-threaded echo server: one thread accepts connections in blocking mode
 * ({@link #accept()}), another serves all clients through a {@link Selector} in non-blocking
 * mode ({@link #service()}).
 *
 * <p>The {@code gate} lock coordinates the two threads: the accepting thread holds it while
 * waking the selector and registering a new channel, and the selecting thread passes through
 * it before every {@code select()} call.
 */
public class EchoServer {
  private Selector selector = null;
  private ServerSocketChannel serverSocketChannel = null;
  private int port = 8000;
  private Charset charset = Charset.forName("GBK");
  private final Object gate = new Object();

  /**
   * Opens the selector and binds a (blocking) server channel to the configured port.
   *
   * @throws IOException if the selector or server socket cannot be opened or bound
   */
  public EchoServer() throws IOException {
    selector = Selector.open();
    serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.socket().setReuseAddress(true);
    serverSocketChannel.socket().bind(new InetSocketAddress(port));
    System.out.println("服务器启动");
  }

  /**
   * Accepts connections until the listening channel is closed. Each accepted channel is made
   * non-blocking and registered for read and write events with a fresh 1024-byte buffer; a
   * channel whose setup fails is closed instead of being leaked.
   */
  public void accept() {
    for (;;) {
      SocketChannel socketChannel = null;
      try {
        socketChannel = serverSocketChannel.accept();
        System.out.println("接收到客户连接,来自:"
            + socketChannel.socket().getInetAddress()
            + ":" + socketChannel.socket().getPort());
        socketChannel.configureBlocking(false);

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        synchronized (gate) {
          // Wake the selecting thread so it cannot block this registration.
          selector.wakeup();
          socketChannel.register(
              selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
        }
      } catch (IOException e) {
        e.printStackTrace();
        if (socketChannel != null) {
          try {
            socketChannel.close();
          } catch (IOException ex) {
            ex.printStackTrace();
          }
        }
        if (!serverSocketChannel.isOpen()) {
          return; // listener is gone; retrying would only spin on the same exception
        }
      }
    }
  }

  /**
   * Runs the selector loop, dispatching read and write events for registered clients.
   *
   * @throws IOException if selecting fails
   */
  public void service() throws IOException {
    for (;;) {
      // Wait here while the accepting thread is in the middle of a registration.
      synchronized (gate) {}
      int n = selector.select();

      if (n == 0) {
        continue;
      }
      Set<SelectionKey> readyKeys = selector.selectedKeys();
      Iterator<SelectionKey> it = readyKeys.iterator();
      while (it.hasNext()) {
        SelectionKey key = null;
        try {
          key = it.next();
          it.remove();
          if (key.isReadable()) {
            receive(key);
          }
          // receive()/send() may cancel the key, so re-check validity first.
          if (key.isValid() && key.isWritable()) {
            send(key);
          }
        } catch (IOException e) {
          e.printStackTrace();
          try {
            if (key != null) {
              key.cancel();
              key.channel().close();
            }
          } catch (Exception ex) {
            ex.printStackTrace();
          }
        }
      } // #while
    } // #for
  }

  /**
   * Echoes the first complete line held in the key's buffer, prefixed with {@code echo:}, and
   * removes it from the buffer. Closes the connection after the line {@code bye}, or when the
   * client has finished sending and no complete line is left.
   *
   * @param key client selection key with a {@link ByteBuffer} attachment
   * @throws IOException if writing to the client fails
   */
  public void send(SelectionKey key) throws IOException {
    ByteBuffer buffer = (ByteBuffer) key.attachment();
    SocketChannel socketChannel = (SocketChannel) key.channel();
    buffer.flip(); // limit = end of data, position = 0
    String data = decode(buffer);
    if (!containsLine(data)) {
      // Leave position at the end of the data so receive() appends after it.
      buffer.position(buffer.limit());
      if ((key.interestOps() & SelectionKey.OP_READ) == 0) {
        // receive() saw end-of-stream and nothing echoable remains.
        closeClient(key, socketChannel);
      }
      return;
    }
    String outputData = data.substring(0, data.indexOf("\n") + 1);
    System.out.print(outputData);
    ByteBuffer outputBuffer = encode("echo:" + outputData);
    while (outputBuffer.hasRemaining()) {
      socketChannel.write(outputBuffer);
    }

    // Drop the echoed line; its byte length is found by re-encoding it.
    ByteBuffer temp = encode(outputData);
    buffer.position(Math.min(temp.limit(), buffer.limit()));
    buffer.compact();

    if (outputData.equals("bye\r\n")) {
      closeClient(key, socketChannel);
    }
  }

  /**
   * Reads up to 32 bytes from the client and appends them to the key's buffer, never more than
   * the buffer can hold. On end-of-stream the key stops listening for reads so that
   * {@link #send(SelectionKey)} can flush what is buffered and then close the connection.
   *
   * @param key client selection key with a {@link ByteBuffer} attachment
   * @throws IOException if reading fails or the client sent a line larger than the buffer
   */
  public void receive(SelectionKey key) throws IOException {
    ByteBuffer buffer = (ByteBuffer) key.attachment();
    SocketChannel socketChannel = (SocketChannel) key.channel();
    buffer.limit(buffer.capacity());

    if (!buffer.hasRemaining()) {
      // Full buffer: fine if send() can still drain a line, fatal if the line never ends.
      ByteBuffer pending = buffer.duplicate();
      pending.flip();
      if (!containsLine(decode(pending))) {
        throw new IOException("request line exceeds " + buffer.capacity() + " bytes");
      }
      return; // unread bytes stay in the socket until send() frees space
    }

    ByteBuffer readBuff = ByteBuffer.allocate(Math.min(32, buffer.remaining()));
    int bytesRead = socketChannel.read(readBuff);
    if (bytesRead == -1) {
      key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
      return;
    }
    readBuff.flip();
    buffer.put(readBuff);
  }

  /** Returns whether {@code data} holds a newline-terminated line that send() can echo. */
  private boolean containsLine(String data) {
    return data.indexOf("\n") != -1;
  }

  /** Cancels the key, closes the client channel and logs the disconnect. */
  private void closeClient(SelectionKey key, SocketChannel socketChannel) throws IOException {
    key.cancel();
    socketChannel.close();
    System.out.println("关闭与客户的连接");
  }

  /** Decodes the remaining bytes of {@code buffer} with the server's charset. */
  public String decode(ByteBuffer buffer) {
    CharBuffer charBuffer = charset.decode(buffer);
    return charBuffer.toString();
  }

  /** Encodes {@code str} with the server's charset. */
  public ByteBuffer encode(String str) {
    return charset.encode(str);
  }

  public static void main(String args[]) throws Exception {
    final EchoServer server = new EchoServer();
    Thread accept = new Thread() {
      @Override
      public void run() {
        server.accept();
      }
    };
    accept.start();
    server.service();
  }
}