现在的位置: 首页 > 综合 > 正文

Mina之session

2012年02月08日 ⁄ 综合 ⁄ 共 22109字 ⁄ 字号 评论关闭

http://www.cnblogs.com/ggzwtj/archive/2011/10/14/2212095.html

Mina之session

 

1、IoSession与底层的传输层类型无关,表示通信双端的连接。提供用户自定义属性,可以用于在过滤器和处理器之间交换用户自定义协议相关信息。每个会话都由一个Service来提供服务,同时有一个Handler负责此会话的I/O事件处理。最重要的两个方法就是read和write,这两个方法都是异步执行,如要真正完成必须在其结果上进行等待。关闭会话的方法close也是异步执行的,也就是应等待返回的CloseFuture,此外,还有另一种关闭方式closeOnFlush,它和close的区别是会先flush掉写请求队列中的请求数据,但同样是异步的。会话的读写类型是可配置的,在运行中可设置此端是否可读写。

  一个会话主要包括两方面的数据:属性映射图和写请求队列,这里使用工厂模式来为新创建的会话提供这些数据结构,定义如下:

public interface IoSessionDataStructureFactory {
//返回属性
IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception;
//返回写请求队列
WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception;
}

2、IoSessionConfig表示会话的配置信息,主要包括:读缓冲区大小,会话数据吞吐量,计算吞吐量的时间间隔,指定会话段的空闲时间,写请求操作超时时间等。这个里面有两个方法需要注意,如下:

/*
* 只有在IoSession的read方法可用的时候返回true。 如果可用,受到的消息
* 保存在BlockingQueue这样对那个客户端应用程序来说更方便取到消息。开
* 启这个对服务器没什么好处,并且可能造成内存漏洞,默认是不开启的。
*/
boolean isUseReadOperation();
/*
* 打开或关闭IoSession的read方法。
*/
void setUseReadOperation(boolean useReadOperation);

3、IoSessionInitializer定义了一个回调函数,用于把用户自定义的会话初始化行为剥离出来:

public interface IoSessionInitializer<T extends IoFuture> {
void initializeSession(IoSession session, T future);
}

4、IoSessionRecycler为一个无连接的传输服务提供回收现有会话的服务:

public interface IoSessionRecycler {
/**
* 一个虚假的recycler(并不回收任何session)。但是用这个可以使得所有session
* 的生命周期事件被fired
*/
static IoSessionRecycler NOOP = new IoSessionRecycler() {
public void put(IoSession session) {}
public IoSession recycle(SocketAddress localAddress,SocketAddress remoteAddress) {
return null;
}
public void remove(IoSession session) {}
};
/*
* 创建或写Iossion的时候被调用。
*/
void put(IoSession session);
/*
* 尝试获取一个被回收了的IoSession。
*/
IoSession recycle(SocketAddress localAddress, SocketAddress remoteAddress);
/*
* 会话被关闭的时候调用。
*/
void remove(IoSession session);
}

ExpiringSessionRecycler是IoSessionRecycler的一个实现,用来回收超时失效的会话:

private ExpiringMap<Object, IoSession> sessionMap;//待处理的会话
private ExpiringMap<Object, IoSession>.Expirer mapExpirer;//负责具体的回收工作

下面来看Key是什么样的:

    private Object generateKey(SocketAddress localAddress,SocketAddress remoteAddress) {
List<SocketAddress> key = new ArrayList<SocketAddress>(2);
key.add(remoteAddress);
key.add(localAddress);
return key;
}

ExpiringMap中保存了超过限制的对象和该对象的监听器,如下:

public class ExpiringMap<K, V> implements Map<K, V> {
public static final int DEFAULT_TIME_TO_LIVE = 60;
public static final int DEFAULT_EXPIRATION_INTERVAL = 1;
private static volatile int expirerCount = 1;
private final ConcurrentHashMap<K, ExpiringObject> delegate;
private final CopyOnWriteArrayList<ExpirationListener<V>> expirationListeners;
private final Expirer expirer;
}

其中的ExpiringObject表示一个超过限制的对象,是ExpiringMap的一个内部类,如下:

private class ExpiringObject {
private K key;
private V value;
private long lastAccessTime;//上次访问时间
private final ReadWriteLock lastAccessTimeLock = new ReentrantReadWriteLock();
}

mapExpirer用来移除sessionMap上超过临界值的项,关键代码如下:

private void processExpires() {
long timeNow = System.currentTimeMillis();//当前时间
for (ExpiringObject o : delegate.values()) {
if (timeToLiveMillis <= 0) {
continue;
}
long timeIdle = timeNow - o.getLastAccessTime();
if (timeIdle >= timeToLiveMillis) {
delegate.remove(o.getKey());//移除
for (ExpirationListener<V> listener : expirationListeners) {
listener.expired(o.getValue());//终止监听?
}
}
}
}

启动关闭该县城都需要进行封锁机制。

5、Mina中的I/O事件类型如下:

public enum IoEventType {
SESSION_CREATED,
SESSION_OPENED,
SESSION_CLOSED,
MESSAGE_RECEIVED,
MESSAGE_SENT,
SESSION_IDLE,
EXCEPTION_CAUGHT,
WRITE,
CLOSE,
}

IoEvent表示一个I/O事件或者一个I/O请求,包括时间类型、所属会话、时间参数:

public class IoEvent implements Runnable {
private final IoEventType type;
private final IoSession session;
private final Object parameter;
public IoEvent(IoEventType type, IoSession session, Object parameter) {
//...
}
//根据事件类型向会话的过滤链上的众多监听者发出事件到来的信号。
public void fire() {
switch (getType()) {
case MESSAGE_RECEIVED:
getSession().getFilterChain().fireMessageReceived(getParameter());break;
case MESSAGE_SENT:
getSession().getFilterChain().fireMessageSent((WriteRequest) getParameter());break;
case WRITE:
getSession().getFilterChain().fireFilterWrite((WriteRequest) getParameter());break;
case CLOSE:
getSession().getFilterChain().fireFilterClose();break;
case EXCEPTION_CAUGHT:
getSession().getFilterChain().fireExceptionCaught((Throwable) getParameter());break;
case SESSION_IDLE:
getSession().getFilterChain().fireSessionIdle((IdleStatus) getParameter());break;
case SESSION_OPENED:
getSession().getFilterChain().fireSessionOpened();
break;
case SESSION_CREATED:
getSession().getFilterChain().fireSessionCreated();
break;
case SESSION_CLOSED:
getSession().getFilterChain().fireSessionClosed();
break;
default:
throw new IllegalArgumentException("Unknown event type: " + getType());
}
}
public String toString() {
if (getParameter() == null) {
return "[" + getSession() + "] " + getType().name();
}
return "[" + getSession() + "] " + getType().name() + ": "+ getParameter();
}
}

Mina的会话中,有三种类型的闲置状态:READER_IDLE读端空闲、WRITER_IDLE写端空闲、BOTH_IDLE读写都空闲。为了节省会话资源可以让用户设置当空闲超过一定时间后关闭会话,因为此会话可能在一段出现问题,从而导致另一端空闲超过太长时间。
6、DefaultIoSessionDataStructureFactory是IoSessionDataStructureFactory的一个默认的实现:

public class DefaultIoSessionDataStructureFactory implements IoSessionDataStructureFactory {
public IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception {
return new DefaultIoSessionAttributeMap();
}
public WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception {
return new DefaultWriteRequestQueue();
}
private static class DefaultIoSessionAttributeMap implements IoSessionAttributeMap {
private final Map<Object, Object> attributes = Collections.synchronizedMap(new HashMap<Object, Object>(4));
public DefaultIoSessionAttributeMap() {
super();
}
public Object getAttribute(IoSession session, Object key, Object defaultValue) {
if (key == null) {
throw new NullPointerException("key");
}
Object answer = attributes.get(key);
if (answer == null) {
return defaultValue;
}
return answer;
}

public Object setAttribute(IoSession session, Object key, Object value) {
if (key == null) {
throw new NullPointerException("key");
}
if (value == null) {
return attributes.remove(key);
}
return attributes.put(key, value);
}
public Object setAttributeIfAbsent(IoSession session, Object key, Object value) {
if (key == null) {
throw new NullPointerException("key");
}
if (value == null) {
return null;
}

Object oldValue;
synchronized (attributes) {
oldValue = attributes.get(key);
if (oldValue == null) {
attributes.put(key, value);
}
}
return oldValue;
}
public Object removeAttribute(IoSession session, Object key) {
if (key == null) {
throw new NullPointerException("key");
}
return attributes.remove(key);
}
public boolean removeAttribute(IoSession session, Object key, Object value) {
if (key == null) {
throw new NullPointerException("key");
}
if (value == null) {
return false;
}
synchronized (attributes) {
if (value.equals(attributes.get(key))) {
attributes.remove(key);
return true;
}
}
return false;
}
public boolean replaceAttribute(IoSession session, Object key, Object oldValue, Object newValue) {
synchronized (attributes) {
Object actualOldValue = attributes.get(key);
if (actualOldValue == null) {
return false;
}
if (actualOldValue.equals(oldValue)) {
attributes.put(key, newValue);
return true;
}
return false;
}
}
public boolean containsAttribute(IoSession session, Object key) {
return attributes.containsKey(key);
}
public Set<Object> getAttributeKeys(IoSession session) {
synchronized (attributes) {
return new HashSet<Object>(attributes.keySet());
}
}
public void dispose(IoSession session) throws Exception {}
}
private static class DefaultWriteRequestQueue implements WriteRequestQueue {
//一个队列存储传入的写请求
private final Queue<WriteRequest> q = new CircularQueue<WriteRequest>(16);
public DefaultWriteRequestQueue() {
super();
}
//...
}
}

7、AbstractIoSession是IoSession的一个抽象实现类,如下:

private IoSessionAttributeMap attributes;//会话属性
private WriteRequestQueue writeRequestQueue;//写请求队列
private WriteRequest currentWriteRequest;//当前写请求

下面是关闭的时候涉及的成员:

//要结束当前会话时发送写请求CLOSE_REQUEST
private static final WriteRequest CLOSE_REQUEST =
new DefaultWriteRequest(new Object());
//在连接关闭时状态被设置为closed
private final CloseFuture closeFuture = new DefaultCloseFuture(this);
//关闭的监听器
private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER =
new IoFutureListener<CloseFuture>() {
public void operationComplete(CloseFuture future) {
AbstractIoSession session = (AbstractIoSession) future.getSession();
session.scheduledWriteBytes.set(0);
session.scheduledWriteMessages.set(0);
session.readBytesThroughput = 0;
session.readMessagesThroughput = 0;
session.writtenBytesThroughput = 0;
session.writtenMessagesThroughput = 0;
}
};

close和closeOnFlush都是异步操作的,区别是前者立即关闭连接,后者是在写请求队列中放入一个CLOSE_REQUEST并将其即时刷新储蓄,若要真正等到关闭完成,需要调用方法在返回的CloseFuture等待。下面是close的代码:

    public final CloseFuture close() {
synchronized (lock) {
if (isClosing()) {
return closeFuture;
}

closing = true;
}
getFilterChain().fireFilterClose();//fire出关闭事件
return closeFuture;
}

下面是closeOnFlush的代码:

    private final CloseFuture closeOnFlush() {
getWriteRequestQueue().offer(this, CLOSE_REQUEST);
getProcessor().flush(this);
return closeFuture;
}

对于读的情况,下面是取得读的数据队列:

    //返回可被读取数据队列
private Queue<ReadFuture> getReadyReadFutures() {
Queue<ReadFuture> readyReadFutures = (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
//如果是第一次读取数据
if (readyReadFutures == null) {
//构造一个新的读数据队列
readyReadFutures = new CircularQueue<ReadFuture>();
Queue<ReadFuture> oldReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(READY_READ_FUTURES_KEY, readyReadFutures);
if (oldReadyReadFutures != null) {
readyReadFutures = oldReadyReadFutures;
}
}
return readyReadFutures;
}

读数据的过程:

    public final ReadFuture read() {
//配置不允许读数据
if (!getConfig().isUseReadOperation()) {
throw new IllegalStateException("useReadOperation is not enabled.");
}
//获得已经被许可的可被读数据队列
Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
ReadFuture future;
synchronized (readyReadFutures) {
future = readyReadFutures.poll();
if (future != null) {
//如果关联的会话已关闭,通知读者
if (future.isClosed()) {
readyReadFutures.offer(future);
}
} else {
future = new DefaultReadFuture(this);
//将数据出入等待读的队列
getWaitingReadFutures().offer(future);
}
}
return future;
}

写数据的过程:

    //IoBuffer、文件、文件部分区域
public WriteFuture write(Object message, SocketAddress remoteAddress) {
if (message == null) {
throw new NullPointerException("message");
}
//如果没有远端地址
if (!getTransportMetadata().isConnectionless() && remoteAddress != null) {
throw new UnsupportedOperationException();
}
//如果会话已被关闭
if (isClosing() || !isConnected()) {
WriteFuture future = new DefaultWriteFuture(this);
WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
WriteException writeException = new WriteToClosedSessionException(request);
future.setException(writeException);
return future;
}
FileChannel openedFileChannel = null;
try {
if (message instanceof IoBuffer && !((IoBuffer) message).hasRemaining()) {//如果是空消息
throw new IllegalArgumentException("message is empty. Forgot to call flip()?");
} else if (message instanceof FileChannel) {//文件的某一区域
FileChannel fileChannel = (FileChannel) message;
message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
} else if (message instanceof File) {//要发送的是文件
File file = (File) message;
openedFileChannel = new FileInputStream(file).getChannel();//打开文件通道
message = new DefaultFileRegion(openedFileChannel, 0, openedFileChannel.size());
}
} catch (IOException e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
return DefaultWriteFuture.newNotWrittenFuture(this, e);
}
//构造写请求,通过过滤器链发送出去
WriteFuture writeFuture = new DefaultWriteFuture(this);
WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
IoFilterChain filterChain = getFilterChain();
filterChain.fireFilterWrite(writeRequest);

//如果打开文件通道应该在完成时关闭通道
if (openedFileChannel != null) {
final FileChannel finalChannel = openedFileChannel;
writeFuture.addListener(new IoFutureListener<WriteFuture>() {
public void operationComplete(WriteFuture future) {
try {
finalChannel.close();
} catch (IOException e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
}
}
});
}
return writeFuture;
}
 

1、IoSession与底层的传输层类型无关,表示通信双端的连接。提供用户自定义属性,可以用于在过滤器和处理器之间交换用户自定义协议相关信息。每个会话都由一个Service来提供服务,同时有一个Handler负责此会话的I/O事件处理。最重要的两个方法就是read和write,这两个方法都是异步执行,如要真正完成必须在其结果上进行等待。关闭会话的方法close也是异步执行的,也就是应等待返回的CloseFuture,此外,还有另一种关闭方式closeOnFlush,它和close的区别是会先flush掉写请求队列中的请求数据,但同样是异步的。会话的读写类型是可配置的,在运行中可设置此端是否可读写。

  一个会话主要包括两方面的数据:属性映射图和写请求队列,这里使用工厂模式来为新创建的会话提供这些数据结构,定义如下:

public interface IoSessionDataStructureFactory {
//返回属性
IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception;
//返回写请求队列
WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception;
}

2、IoSessionConfig表示会话的配置信息,主要包括:读缓冲区大小,会话数据吞吐量,计算吞吐量的时间间隔,指定会话段的空闲时间,写请求操作超时时间等。这个里面有两个方法需要注意,如下:

/*
* 只有在IoSession的read方法可用的时候返回true。 如果可用,受到的消息
* 保存在BlockingQueue这样对那个客户端应用程序来说更方便取到消息。开
* 启这个对服务器没什么好处,并且可能造成内存漏洞,默认是不开启的。
*/
boolean isUseReadOperation();
/*
* 打开或关闭IoSession的read方法。
*/
void setUseReadOperation(boolean useReadOperation);

3、IoSessionInitializer定义了一个回调函数,用于把用户自定义的会话初始化行为剥离出来:

public interface IoSessionInitializer<T extends IoFuture> {
void initializeSession(IoSession session, T future);
}

4、IoSessionRecycler为一个无连接的传输服务提供回收现有会话的服务:

public interface IoSessionRecycler {
/**
* 一个虚假的recycler(并不回收任何session)。但是用这个可以使得所有session
* 的生命周期事件被fired
*/
static IoSessionRecycler NOOP = new IoSessionRecycler() {
public void put(IoSession session) {}
public IoSession recycle(SocketAddress localAddress,SocketAddress remoteAddress) {
return null;
}
public void remove(IoSession session) {}
};
/*
* 创建或写Iossion的时候被调用。
*/
void put(IoSession session);
/*
* 尝试获取一个被回收了的IoSession。
*/
IoSession recycle(SocketAddress localAddress, SocketAddress remoteAddress);
/*
* 会话被关闭的时候调用。
*/
void remove(IoSession session);
}

ExpiringSessionRecycler是IoSessionRecycler的一个实现,用来回收超时失效的会话:

private ExpiringMap<Object, IoSession> sessionMap;//待处理的会话
private ExpiringMap<Object, IoSession>.Expirer mapExpirer;//负责具体的回收工作

下面来看Key是什么样的:

    private Object generateKey(SocketAddress localAddress,SocketAddress remoteAddress) {
List<SocketAddress> key = new ArrayList<SocketAddress>(2);
key.add(remoteAddress);
key.add(localAddress);
return key;
}

ExpiringMap中保存了超过限制的对象和该对象的监听器,如下:

public class ExpiringMap<K, V> implements Map<K, V> {
public static final int DEFAULT_TIME_TO_LIVE = 60;
public static final int DEFAULT_EXPIRATION_INTERVAL = 1;
private static volatile int expirerCount = 1;
private final ConcurrentHashMap<K, ExpiringObject> delegate;
private final CopyOnWriteArrayList<ExpirationListener<V>> expirationListeners;
private final Expirer expirer;
}

其中的ExpiringObject表示一个超过限制的对象,是ExpiringMap的一个内部类,如下:

private class ExpiringObject {
private K key;
private V value;
private long lastAccessTime;//上次访问时间
private final ReadWriteLock lastAccessTimeLock = new ReentrantReadWriteLock();
}

mapExpirer用来移除sessionMap上超过临界值的项,关键代码如下:

private void processExpires() {
long timeNow = System.currentTimeMillis();//当前时间
for (ExpiringObject o : delegate.values()) {
if (timeToLiveMillis <= 0) {
continue;
}
long timeIdle = timeNow - o.getLastAccessTime();
if (timeIdle >= timeToLiveMillis) {
delegate.remove(o.getKey());//移除
for (ExpirationListener<V> listener : expirationListeners) {
listener.expired(o.getValue());//终止监听?
}
}
}
}

启动关闭该县城都需要进行封锁机制。

5、Mina中的I/O事件类型如下:

public enum IoEventType {
SESSION_CREATED,
SESSION_OPENED,
SESSION_CLOSED,
MESSAGE_RECEIVED,
MESSAGE_SENT,
SESSION_IDLE,
EXCEPTION_CAUGHT,
WRITE,
CLOSE,
}

IoEvent表示一个I/O事件或者一个I/O请求,包括时间类型、所属会话、时间参数:

public class IoEvent implements Runnable {
private final IoEventType type;
private final IoSession session;
private final Object parameter;
public IoEvent(IoEventType type, IoSession session, Object parameter) {
//...
}
//根据事件类型向会话的过滤链上的众多监听者发出事件到来的信号。
public void fire() {
switch (getType()) {
case MESSAGE_RECEIVED:
getSession().getFilterChain().fireMessageReceived(getParameter());break;
case MESSAGE_SENT:
getSession().getFilterChain().fireMessageSent((WriteRequest) getParameter());break;
case WRITE:
getSession().getFilterChain().fireFilterWrite((WriteRequest) getParameter());break;
case CLOSE:
getSession().getFilterChain().fireFilterClose();break;
case EXCEPTION_CAUGHT:
getSession().getFilterChain().fireExceptionCaught((Throwable) getParameter());break;
case SESSION_IDLE:
getSession().getFilterChain().fireSessionIdle((IdleStatus) getParameter());break;
case SESSION_OPENED:
getSession().getFilterChain().fireSessionOpened();
break;
case SESSION_CREATED:
getSession().getFilterChain().fireSessionCreated();
break;
case SESSION_CLOSED:
getSession().getFilterChain().fireSessionClosed();
break;
default:
throw new IllegalArgumentException("Unknown event type: " + getType());
}
}
public String toString() {
if (getParameter() == null) {
return "[" + getSession() + "] " + getType().name();
}
return "[" + getSession() + "] " + getType().name() + ": "+ getParameter();
}
}

Mina的会话中,有三种类型的闲置状态:READER_IDLE读端空闲、WRITER_IDLE写端空闲、BOTH_IDLE读写都空闲。为了节省会话资源可以让用户设置当空闲超过一定时间后关闭会话,因为此会话可能在一段出现问题,从而导致另一端空闲超过太长时间。
6、DefaultIoSessionDataStructureFactory是IoSessionDataStructureFactory的一个默认的实现:

public class DefaultIoSessionDataStructureFactory implements IoSessionDataStructureFactory {
public IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception {
return new DefaultIoSessionAttributeMap();
}
public WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception {
return new DefaultWriteRequestQueue();
}
private static class DefaultIoSessionAttributeMap implements IoSessionAttributeMap {
private final Map<Object, Object> attributes = Collections.synchronizedMap(new HashMap<Object, Object>(4));
public DefaultIoSessionAttributeMap() {
super();
}
public Object getAttribute(IoSession session, Object key, Object defaultValue) {
if (key == null) {
throw new NullPointerException("key");
}
Object answer = attributes.get(key);
if (answer == null) {
return defaultValue;
}
return answer;
}

public Object setAttribute(IoSession session, Object key, Object value) {
if (key == null) {
throw new NullPointerException("key");
}
if (value == null) {
return attributes.remove(key);
}
return attributes.put(key, value);
}
public Object setAttributeIfAbsent(IoSession session, Object key, Object value) {
if (key == null) {
throw new NullPointerException("key");
}
if (value == null) {
return null;
}

Object oldValue;
synchronized (attributes) {
oldValue = attributes.get(key);
if (oldValue == null) {
attributes.put(key, value);
}
}
return oldValue;
}
public Object removeAttribute(IoSession session, Object key) {
if (key == null) {
throw new NullPointerException("key");
}
return attributes.remove(key);
}
public boolean removeAttribute(IoSession session, Object key, Object value) {
if (key == null) {
throw new NullPointerException("key");
}
if (value == null) {
return false;
}
synchronized (attributes) {
if (value.equals(attributes.get(key))) {
attributes.remove(key);
return true;
}
}
return false;
}
public boolean replaceAttribute(IoSession session, Object key, Object oldValue, Object newValue) {
synchronized (attributes) {
Object actualOldValue = attributes.get(key);
if (actualOldValue == null) {
return false;
}
if (actualOldValue.equals(oldValue)) {
attributes.put(key, newValue);
return true;
}
return false;
}
}
public boolean containsAttribute(IoSession session, Object key) {
return attributes.containsKey(key);
}
public Set<Object> getAttributeKeys(IoSession session) {
synchronized (attributes) {
return new HashSet<Object>(attributes.keySet());
}
}
public void dispose(IoSession session) throws Exception {}
}
private static class DefaultWriteRequestQueue implements WriteRequestQueue {
//一个队列存储传入的写请求
private final Queue<WriteRequest> q = new CircularQueue<WriteRequest>(16);
public DefaultWriteRequestQueue() {
super();
}
//...
}
}

7、AbstractIoSession是IoSession的一个抽象实现类,如下:

private IoSessionAttributeMap attributes;//会话属性
private WriteRequestQueue writeRequestQueue;//写请求队列
private WriteRequest currentWriteRequest;//当前写请求

下面是关闭的时候涉及的成员:

//要结束当前会话时发送写请求CLOSE_REQUEST
private static final WriteRequest CLOSE_REQUEST =
new DefaultWriteRequest(new Object());
//在连接关闭时状态被设置为closed
private final CloseFuture closeFuture = new DefaultCloseFuture(this);
//关闭的监听器
private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER =
new IoFutureListener<CloseFuture>() {
public void operationComplete(CloseFuture future) {
AbstractIoSession session = (AbstractIoSession) future.getSession();
session.scheduledWriteBytes.set(0);
session.scheduledWriteMessages.set(0);
session.readBytesThroughput = 0;
session.readMessagesThroughput = 0;
session.writtenBytesThroughput = 0;
session.writtenMessagesThroughput = 0;
}
};

close和closeOnFlush都是异步操作的,区别是前者立即关闭连接,后者是在写请求队列中放入一个CLOSE_REQUEST并将其即时刷新储蓄,若要真正等到关闭完成,需要调用方法在返回的CloseFuture等待。下面是close的代码:

    public final CloseFuture close() {
synchronized (lock) {
if (isClosing()) {
return closeFuture;
}

closing = true;
}
getFilterChain().fireFilterClose();//fire出关闭事件
return closeFuture;
}

下面是closeOnFlush的代码:

    private final CloseFuture closeOnFlush() {
getWriteRequestQueue().offer(this, CLOSE_REQUEST);
getProcessor().flush(this);
return closeFuture;
}

对于读的情况,下面是取得读的数据队列:

    //返回可被读取数据队列
private Queue<ReadFuture> getReadyReadFutures() {
Queue<ReadFuture> readyReadFutures = (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
//如果是第一次读取数据
if (readyReadFutures == null) {
//构造一个新的读数据队列
readyReadFutures = new CircularQueue<ReadFuture>();
Queue<ReadFuture> oldReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(READY_READ_FUTURES_KEY, readyReadFutures);
if (oldReadyReadFutures != null) {
readyReadFutures = oldReadyReadFutures;
}
}
return readyReadFutures;
}

读数据的过程:

    public final ReadFuture read() {
//配置不允许读数据
if (!getConfig().isUseReadOperation()) {
throw new IllegalStateException("useReadOperation is not enabled.");
}
//获得已经被许可的可被读数据队列
Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
ReadFuture future;
synchronized (readyReadFutures) {
future = readyReadFutures.poll();
if (future != null) {
//如果关联的会话已关闭,通知读者
if (future.isClosed()) {
readyReadFutures.offer(future);
}
} else {
future = new DefaultReadFuture(this);
//将数据出入等待读的队列
getWaitingReadFutures().offer(future);
}
}
return future;
}

写数据的过程:

    //IoBuffer、文件、文件部分区域
public WriteFuture write(Object message, SocketAddress remoteAddress) {
if (message == null) {
throw new NullPointerException("message");
}
//如果没有远端地址
if (!getTransportMetadata().isConnectionless() && remoteAddress != null) {
throw new UnsupportedOperationException();
}
//如果会话已被关闭
if (isClosing() || !isConnected()) {
WriteFuture future = new DefaultWriteFuture(this);
WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
WriteException writeException = new WriteToClosedSessionException(request);
future.setException(writeException);
return future;
}
FileChannel openedFileChannel = null;
try {
if (message instanceof IoBuffer && !((IoBuffer) message).hasRemaining()) {//如果是空消息
throw new IllegalArgumentException("message is empty. Forgot to call flip()?");
} else if (message instanceof FileChannel) {//文件的某一区域
FileChannel fileChannel = (FileChannel) message;
message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
} else if (message instanceof File) {//要发送的是文件
File file = (File) message;
openedFileChannel = new FileInputStream(file).getChannel();//打开文件通道
message = new DefaultFileRegion(openedFileChannel, 0, openedFileChannel.size());
}
} catch (IOException e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
return DefaultWriteFuture.newNotWrittenFuture(this, e);
}
//构造写请求,通过过滤器链发送出去
WriteFuture writeFuture = new DefaultWriteFuture(this);
WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
IoFilterChain filterChain = getFilterChain();
filterChain.fireFilterWrite(writeRequest);

//如果打开文件通道应该在完成时关闭通道
if (openedFileChannel != null) {
final FileChannel finalChannel = openedFileChannel;
writeFuture.addListener(new IoFutureListener<WriteFuture>() {
public void operationComplete(WriteFuture future) {
try {
finalChannel.close();
} catch (IOException e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
}
}
});
}
return writeFuture;
}

抱歉!评论已关闭.