现在的位置: 首页 > 综合 > 正文

NIO不错的

2019年05月22日 ⁄ 综合 ⁄ 共 9945字 ⁄ 字号 评论关闭

欢迎大家讨论,我也是接触时间不长,有问题欢迎大家指正。欢迎转载,转载请注明出处

楔子

最近在研究JAVA NIO的相关知识,发现网上多是三种类型的研究文章,一是单Reactor单Selector,二是主从Reactor单Selector,三就是无Reactor单Selector,有一篇是一个Selector绑定两个地址的文章。但是随着链接数增多,单Selector肯定不能满足对于系统性能的要求,所以必然会出现多个selector配合工作的情况。可是我找了很久,也只找到一篇讨论Mina和Grizzly的多selector的实现,并没有自己去实现一个简单的多selector的例子(文章地址:点击打开链接)。这些天闲来无事,写成一个多selector配合工作的例子,与大家共享。本实例只是关注如何进行多selector配合工作,例子有很多不完美的地方,比如对于包拆分和包合并的判断处理还没有实现,如果错误之处,还请大家指正。

背景

直接来。最基本的Java NIO流程图,如下:
我们知道,如果连接数增多,并且读写量很大,那么服务器的压力会非常大。于是大师们研究出使用reactor模式来搞定。于是出现了下面的实现:
(图片来源:点击打开链接
Reactor负责Select,如果发现是OP_ACCEPT,则交给acceptor处理,如果发现是读写,则交给Thread Pool来处理。读写新开了线程,不会阻塞整个主线程的运行。我们今天的例子就是从这个架构发展而来。

实例

多selector,顾名思义,多个selector来配合完成NIO的操作。这样做的好处是可以最大限度的榨取服务器的资源。我们使用一个selector来处理accept的工作,完成绑定,监听,一个selector完成读写,发送的工作。我们称这个版本为V1.1。
首先来看Server.Java,这是Server端main函数的所在地,代码如下:
 
package com.jjzhk.NIOPractice.MultiSelector;
import java.io.IOException;
public class Server {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
int port = 8008;
new Thread(new ServerReactor(port)).start();
}
}
 来自CODE的代码片

Server.java

主要是新开一个ServerReactor的线程,主要负责初始化和绑定。

来看ServerReactor.java:
 
package com.jjzhk.NIOPractice.MultiSelector;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
public class ServerReactor implements Runnable {
static Logger logger = Logger.getLogger(ServerReactor.class);
private SelectorProvider selectorProvider = SelectorProvider.provider();
private ServerSocketChannel serverSocketChannel;
public ServerReactor(int port) throws IOException {
serverSocketChannel = selectorProvider.openServerSocketChannel(); //ServerSocketChannel.open();
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress("localhost", port), 1024);
serverSocketChannel.configureBlocking(false);
logger.debug(String.format("Server : Server Start.----%d", port));
}
public void run() {
try {
new ServerDispatcher(serverSocketChannel, SelectorProvider.provider()).execute();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
 来自CODE的代码片

ServerReactor.java

首先创建了ServerSocketChannel,然后将localhost和8008端口绑定在channel上,然后设置为非阻塞接着创建了ServerDispatcher对象,调用Execute。

 
package com.jjzhk.NIOPractice.MultiSelector;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Set;
import java.util.logging.Level;
import org.apache.log4j.Logger;
public class ServerDispatcher{
private ServerSocketChannel ssc;
private Selector[] selectors = new Selector[3];
private SelectorProvider selectorProvider;
private final static Logger logger = Logger.getLogger(ServerDispatcher.class);
public ServerDispatcher(ServerSocketChannel ssc, SelectorProvider selectorProvider) throws IOException {
this.ssc = ssc;
this.selectorProvider = selectorProvider;
for (int i = 0; i < 3; i++)
{
selectors[i] = selectorProvider.openSelector();
}
}
public Selector getAcceptSelector()
{
return selectors[0];
}
public Selector getReadSelector()
{
return selectors[1];
}
public Selector getWriteSelector()
{
return selectors[1];
}
public void execute() throws IOException
{
SocketHandler r = new SocketAcceptHandler(this, ssc, getAcceptSelector());
new Thread(r).start();
r = new SocketReadHandler(this, ssc, getReadSelector());
new Thread(r).start();
r = new SocketWriteHandler(this, ssc, getWriteSelector());
new Thread(r).start();
}
}
 来自CODE的代码片

ServerDispatcher.java

大家可以看到,ServerDispatcher管理着3个selector,然后启动了三个线程,ServerAcceptHandler,ServerReadHandler和ServerWriteHandler。accept使用了一个selector,read和write使用了一个selector。

这三个类继承自SocketHandler类,我们来看这个基类的方法:

 
package com.jjzhk.NIOPractice.MultiSelector;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.log4j.Logger;
public abstract class SocketHandler implements Runnable{
protected Selector selector;
protected SocketChannel socketChannel = null;
protected ServerSocketChannel serverSocketChannel;
protected ServerDispatcher dispatcher;
protected final static Logger logger = Logger.getLogger(SocketHandler.class);
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();;
public SocketHandler(ServerDispatcher dispatcher, ServerSocketChannel sc, Selector selector) throws IOException{
this.selector = selector;
this.serverSocketChannel = sc;
this.dispatcher = dispatcher;
}
public abstract void runnerExecute(int readyKeyOps) throws IOException;
public final void run()
{
while(true)
{
readWriteLock.readLock().lock();
try {
int keyOps = this.Select();
runnerExecute(keyOps);
Thread.sleep(1000);
} catch (Exception e) {
// TODO: handle exception
logger.debug(e.getMessage());
}
finally {
readWriteLock.readLock().unlock();
}
}
}
private int Select() throws IOException
{
int keyOps = this.selector.selectNow();
boolean flag = keyOps > 0;
if (flag)
{
Set<SelectionKey >readyKeySet = selector.selectedKeys();
Iterator<SelectionKey> iterator = readyKeySet.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
keyOps = key.readyOps();
if (keyOps == SelectionKey.OP_READ || keyOps == SelectionKey.OP_WRITE)
{
socketChannel = (SocketChannel)key.channel();
socketChannel.configureBlocking(false);
}
}
}
return keyOps;
}
}
 来自CODE的代码片

SocketHandler.java
大家看到这里,就会比较清楚了,每一个线程启动后,都会调用selector.selectNow方法来监听SelectionKey,找到了以后,就会进行处理。可是怎么叫accept处理OP_ACCEPT, read处理OP_READ,Write处理OP_WRITE呢。我都是放到了具体的子类的runnerExecute里面进行判断。
 
package com.jjzhk.NIOPractice.MultiSelector;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import org.apache.log4j.Logger;
public class SocketAcceptHandler extends SocketHandler{
private static final Logger logger = Logger.getLogger(SocketAcceptHandler.class);
public SocketAcceptHandler(ServerDispatcher dispatcher, ServerSocketChannel sc, Selector selector)
throws IOException {
super(dispatcher, sc, selector);
serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT, this);
}
@Override
public void runnerExecute(int readyKeyOps) throws IOException {
// TODO Auto-generated method stub
if (readyKeyOps == SelectionKey.OP_ACCEPT)
{
socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
logger.debug("Server accept");
socketChannel.register(dispatcher.getReadSelector(), SelectionKey.OP_READ);
}
}
}
 来自CODE的代码片

SocketAcceptHandler.java
 
package com.jjzhk.NIOPractice.MultiSelector;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import org.apache.log4j.Logger;
public class SocketReadHandler extends SocketHandler{
static Logger logger = Logger.getLogger(SocketReadHandler.class);
private SelectionKey selectionKey;
private int BLOCK = 4096;
private ByteBuffer receivebuffer = ByteBuffer.allocate(BLOCK);
public SocketReadHandler(ServerDispatcher dispatcher, ServerSocketChannel sc, Selector selector) throws IOException{
super(dispatcher, sc, selector);
}
@Override
public void runnerExecute(int readyKeyOps) throws IOException {
// TODO Auto-generated method stub
int count = 0;
if (SelectionKey.OP_READ == readyKeyOps)
{
receivebuffer.clear();
count = socketChannel.read(receivebuffer);
if (count > 0) {
logger.debug("Server : Readable.");
receivebuffer.flip();
byte[] bytes = new byte[receivebuffer.remaining()];
receivebuffer.get(bytes);
String body = new String(bytes, "UTF-8");
logger.debug("Server : Receive :" + body);
socketChannel.register(dispatcher.getWriteSelector(), SelectionKey.OP_WRITE);
}
}
}
}
 来自CODE的代码片

SocketReadHandler.java
 
package com.jjzhk.NIOPractice.MultiSelector;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import org.apache.log4j.Logger;
public class SocketWriteHandler extends SocketHandler{
static Logger logger = Logger.getLogger(SocketReadHandler.class);
private int BLOCK = 4096;
private ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);
private static int Index = 1;
public SocketWriteHandler(ServerDispatcher dispatcher, ServerSocketChannel sc, Selector selector) throws IOException{
super(dispatcher, sc, selector);
}
@Override
public void runnerExecute(int readyKeyOps) throws IOException {
// TODO Auto-generated method stub
if (readyKeyOps == SelectionKey.OP_WRITE)
{
logger.debug("Server : Writable.");
String data = String.format("%d", Index);
byte[] req = data.getBytes();
sendbuffer.clear();
logger.debug(String.format("Server : Write %s", data));
sendbuffer = ByteBuffer.allocate(req.length);
sendbuffer.put(req);
sendbuffer.flip();
socketChannel.write(sendbuffer);
Index++;
socketChannel.register(dispatcher.getReadSelector(), SelectionKey.OP_READ);
}
}
}
 来自CODE的代码片

SocketWriteHandler.java

扩展

         首先,可以将Read和Write的Selector分开,这样理论上会更快,但是会出现一个问题。那就是服务器处理完OP_ACCEPT和第一次的read之后,就会一直处理写,直到接收到客户端的数据才开始读,这样客户端读取的东西就是错误的。我们需要加一个变量来控制,如果上一次是写,那么必须处理完读之后,才能开始第二次的写。
         其次,我们可以改造ServerDispatcher,现在维护的是3个selector,其实,应该维护3个SocketHandler,每个Handler里面自己创建新的selector,或者维护两个SocketHandler,一个accept,一个Read/Write,那么这就是什么的架构?这就是Grizzly的核心架构。如果我们维护两个线程池,第一个池子里面 只有一个线程--accept,第二个池子里面,放入读写的Handler,那么这就是Mina或者是Netty的基础架构。对于Netty来说,accept就是父类的handler,读写Handler就是子类的handler。

参考及代码

代码地址:https://github.com/JJZHK/NIOPractice.git
参考了以下图书及文章:
1. 《Netty权威指南》
2.http://www.iteye.com/topic/482269
3.http://blog.csdn.net/thomescai/article/details/6865406

抱歉!评论已关闭.