现在的位置: 首页 > 综合 > 正文

Java NIO 实践经验总结

2014年02月04日 ⁄ 综合 ⁄ 共 7512字 ⁄ 字号 评论关闭

总的来讲,Java NIO 就是一个调用select的无限循环。如果有消息接收或者发送,或者连接关闭或者打开,select都会返回这个事件。但是,在写selector循环时,要注意以下几件事情:

1.发送事件是因为一个packet过大,一次没有发送出去。需要分两次发时,会激活这个事件
2.如果所有的channel都从selector注销了,那么select函数会阻塞。需要重新注册一个channel并且调用selector.wakeup()重新激活。
3.注册channel的线程必须是select循环所在的线程。
4.一个selector支持1000多个channel,如果连接很多,最好用round robin的方式,采用多个selector
5.SelectionKey的attachment可以存放自定义的Object
6.如果发现了一个读事件,最好把channel从selector里注销,然后再读取。读取完毕,再注册回selector
7.当从channel中读取字节后,可以把不完整的消息放回buffer中或者attachment中
8.Windows和Linux在server socket注册不太一样,一个要先注册后启动循环,一个是先启动循环后注册。

例子程序,基本涵盖了文章所有的关键点

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
import java.nio.channels.SelectableChannel;

import java.net.ServerSocket;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SocketChannelDemo

{
 public static int PORT_NUMBER = 8888;

 public static int MAX_SOCKETCOUNT = 1000;

 public static int MAX_SELECTOR = 200;

 public static int i = 0;

 private ServerSocketChannel serverChannel;

 private ServerSocket serverSocket;

 private int dispatcherPointer = 0;

 private ExecutorService exec =Executors.newFixedThreadPool(20);

 /** container of dispatcher */
 private Vector dispatcherList = new Vector(SocketChannelDemo.MAX_SELECTOR);

 public static Long sessionId = new Long(0);

 public static void main(String[] args) throws Exception {
  SocketChannelDemo server = new SocketChannelDemo();
  server.init(args);
 }

 public void init(String[] argv) throws Exception {
  int port = PORT_NUMBER;

  if (argv.length > 0) {
   port = Integer.parseInt(argv[0]);
  }

  System.out.println("Listening on port " + port);
  
  serverChannel = ServerSocketChannel.open();
  
  serverSocket = serverChannel.socket();

  serverSocket.bind(new InetSocketAddress(port));

  serverChannel.configureBlocking(false);

  DispatchThread dt = new DispatchThread();
  serverChannel.register(dt.selector,SelectionKey.OP_ACCEPT);
  dt.start();
  dispatcherList.add(dt);

  //register((SelectableChannel)serverChannel, SelectionKey.OP_ACCEPT);
  
  dt.selector.wakeup();
 }

 public void register(SelectableChannel channel, int ops)
   throws Exception {

  if (channel == null) {
   return;
  }

  DispatchThread st = null;
  int pointer = dispatcherPointer;

  // round-robin approach
  st = (DispatchThread) dispatcherList.get(pointer);
  if (st.socketCount <= (SocketChannelDemo.MAX_SOCKETCOUNT))
   st.toRegister(channel, ops);
  else {
   pointer++;
   System.out.println("move pointer:"+pointer);
   if (pointer >= SocketChannelDemo.MAX_SELECTOR)
    pointer = 0;

   if (pointer >= dispatcherList.size()) {
    System.out.println("new Dispath thread");
    st = new DispatchThread();    
    st.start();
    dispatcherList.add(st);
   } else {
    st = (DispatchThread) dispatcherList.get(pointer);    
   }
   dispatcherPointer = pointer;
   st.toRegister(channel, ops);   
  }

 }

 class DispatchThread extends Thread {
  private Selector selector;
  public int socketCount = 0;

  private List registryPool = Collections.synchronizedList(new LinkedList());
  private List tempPool = Collections.synchronizedList(new LinkedList());

  public DispatchThread() {

   try {
    selector = Selector.open();
   } catch (IOException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
   }
  }

  public void addChannel() {
   if (registryPool.isEmpty())
    return;

   tempPool.clear();
   synchronized (registryPool) {
    tempPool.addAll(registryPool);
    registryPool.clear();
   }

   for (Iterator it = tempPool.iterator(); it.hasNext();) {
    try {
     SelectableChannel ch = (SelectableChannel) it.next();
     if (ch.isOpen())
      ch.register(selector, SelectionKey.OP_READ);
    } catch (Exception exp) {
     exp.printStackTrace();
     continue;
    }
    it.remove();
   }
  }

  public void run()
  {
   while (true) {
    addChannel();
    int n = 0;
    try {
     n = selector.select();
    } catch (IOException e1) {
     // TODO Auto-generated catch block
     e1.printStackTrace();
    }
//System.out.println("1 -------------");
    if (n == 0) {
     continue;
    }

    
    Iterator it = selector.selectedKeys().iterator();

    while (it.hasNext()) {
     SelectionKey key = (SelectionKey) it.next();
     it.remove();
     try {
      
      if (key.isAcceptable()) {
       
       ServerSocketChannel server = (ServerSocketChannel) key
         .channel();
       SocketChannel channel = server.accept();
       register(channel, SelectionKey.OP_READ);
       // doWork(channel);
      }
      if (key.isReadable()) {
       key.interestOps(0);
       processData(key, selector);
      }
     } catch (Exception e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
     }
    }
    //System.out.println("2 -------------");
    addChannel();
   }
  }

  protected void toRegister(SelectableChannel channel, int ops) {
   try {
    if (channel == null) {
     return;
    }
    
    if (channel.isRegistered()) {
     //SelectionKey sk = channel.keyFor(selector);
     registryPool.add(channel);
     selector.wakeup();
    } else {
     this.socketCount++;
     channel.configureBlocking(false);
     registryPool.add(channel);
     //channel.register(selector, ops);
     selector.wakeup();
    }
   } catch (IOException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
   }

  }

  protected void processData(SelectionKey key, Selector selector2)
    throws Exception {
   System.out.println("-------------" + i++);
   //if ( (SocketChannel)key.channel()).
   SleepTime command = new SleepTime(key, this);
   //command.start();
   exec.execute(command);
  }

 }

}

// Runnable run = new Runnable() {
class SleepTime extends Thread {
 private static String setupResponse1 = "RTSP/1.0 200 OK/r/nCSeq:220/r/n";

 private static String setupResponse2 = "/r/nVersion:1.9/r/n/r/n";

 private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

 private SelectionKey key;

 private SocketChannelDemo.DispatchThread disp;

 // private static Logger log = Logger.getLogger(SleepTime.class);
 public SleepTime(SelectionKey key1, SocketChannelDemo.DispatchThread disp1) {
  key = key1;
  disp = disp1;
 }

 public void run() {

  SocketChannel socketChannel = (SocketChannel) key.channel();
  int count = 0;

  buffer.clear();

  try {
   while ((count = socketChannel.read(buffer)) > 0) {
    //buffer.flip();

    int position = buffer.position();

    byte[] data = new byte[position];

    buffer.clear();

    buffer.get(data, 0, data.length);

    StringBuffer sbBuff = new StringBuffer(new String(data));

    //System.out.print(sbBuff.toString());
    //handle request
    StringBuffer sb = new StringBuffer();
    if (sbBuff.toString().indexOf("SETUP") > -1) {
     sb.append(setupResponse1);
     synchronized (SocketChannelDemo.sessionId) {
      SocketChannelDemo.sessionId = Long
        .valueOf(SocketChannelDemo.sessionId
          .longValue() + 1);
     }

     sb.append("Session:" + SocketChannelDemo.sessionId);
     sb.append(setupResponse2);
    } else if (sbBuff.toString().indexOf("Session:") > -1) {
     int begin = sbBuff.toString().indexOf("Session:");
     sb.append(setupResponse1);
     //find out the sessionId.
     sb.append(sbBuff.toString().substring(
       begin,
       begin
         + sbBuff.toString().substring(begin)
           .indexOf("/r/n")));
     sb.append(setupResponse2);
    }

    // send the data, don′t assume it goes all at once
    buffer.clear();
    buffer.put(sb.toString().getBytes());
    buffer.flip();
    socketChannel.write(buffer);
    // System.out.println("/r/nWrite:/r/n"+sb.toString());
    buffer.clear();
    disp.toRegister(socketChannel, SelectionKey.OP_READ);
   }

   if (count < 0) {
    
    //System.out.println("*************Socket closed");
    disp.socketCount--;
    socketChannel.close();
   }

  } catch (IOException e) {
   // TODO Auto-generated catch block
   // e.printStackTrace();
  }
 }
}

 

 

抱歉!评论已关闭.