现在的位置: 首页 > 综合 > 正文

MINA官网教程

2013年10月09日 ⁄ 综合 ⁄ 共 14503字 ⁄ 字号 评论关闭

简介: Apache MINA 2 是一个开发高性能和高可伸缩性网络应用程序的网络应用框架。它提供了一个抽象的事件驱动的异步 API,可以使用 TCP/IPUDP/IP、串口和虚拟机内部的管道等传输方式。Apache MINA 2 可以作为开发网络应用程序的一个良好基础。本文将介绍 Apache MINA 2 的基本概念和 API,包括 I/O 服务、I/O 会话、I/O 过滤器和 I/O 处理器。另外还将介绍如何使用状态机。本文包含简单的计算器服务和复杂的联机游戏两个示例应用。

Apache MINA 2 是一个开发高性能和高可伸缩性网络应用程序的网络应用框架。它提供了一个抽象的事件驱动的异步 API,可以使用 TCP/IPUDP/IP、串口和虚拟机内部的管道等传输方式。Apache MINA 2 可以作为开发网络应用程序的一个良好基础。下面将首先简单介绍一下 Apache MINA 2

Apache MINA 2 介绍

Apache MINA  Apache 基金会的一个开源项目,目前最新的版本是 2.0.0-RC1。本文中使用的版本是 2.0.0-M6。从参考资料中可以找到相关的下载信息。下面首先介绍基于 Apache MINA 的网络应用的一般架构。

基于 Apache MINA 的网络应用的架构

基于 Apache MINA 开发的网络应用,有着相似的架构。 1中给出了架构的示意图。

 1. 基于 Apache MINA 的网络应用的架构

 1所示,基于 Apache MINA 的网络应用有三个层次,分别是 I/O 服务、I/O 过滤器和 I/O 处理器:

I/O 服务:I/O 服务用来执行实际的 I/O 操作。Apache MINA 已经提供了一系列支持不同协议的 I/O 服务,如 TCP/IPUDP/IP、串口和虚拟机内部的管道等。开发人员也可以实现自己的 I/O 服务。

I/O 过滤器:I/O 服务能够传输的是字节流,而上层应用需要的是特定的对象与数据结构。I/O 过滤器用来完成这两者之间的转换。I/O 过滤器的另外一个重要作用是对输入输出的数据进行处理,满足横切的需求。多个 I/O 过滤器串联起来,形成 I/O 过滤器链。

I/O 处理器:I/O 处理器用来执行具体的业务逻辑。对接收到的消息执行特定的处理。

创建一个完整的基于 Apache MINA 的网络应用,需要分别构建这三个层次。Apache MINA 已经为 I/O 服务和 I/O 过滤器提供了不少的实现,因此这两个层次在大多数情况下可以使用已有的实现。I/O 处理器由于是与具体的业务相关的,一般来说都是需要自己来实现的。

事件驱动的 API

Apache MINA 提供的是事件驱动的 API。它把与网络相关的各种活动抽象成事件。网络应用只需要对其感兴趣的事件进行处理即可。事件驱动的 API 使得基于 Apache MINA 开发网络应用变得比较简单。应用不需要考虑与底层传输相关的具体细节,而只需要处理抽象的 I/O 事件。比如在实现一个服务端应用的时候,如果有新的连接进来,I/O 服务会产生sessionOpened这样一个事件。如果该应用需要在有连接打开的时候,执行某些特定的操作,只需要在 I/O 处理器中此事件处理方法sessionOpened中添加相应的代码即可。

在介绍 Apache MINA 中的基本概念的细节之前,首先通过一个简单的应用来熟悉上面提到的三个层次的具体职责。

回页首

从简单应用开始

在使用 Apache MINA 开发复杂的应用之前,首先将介绍一个简单的应用。通过此应用可以熟悉上面提到的三个层次,即 I/O 服务、I/O 过滤器和 I/O 处理器。该应用是一个简单的计算器服务,客户端发送要计算的表达式给服务器,服务器返回计算结果。比如客户端发送2+2,服务器返回4.0作为结果。

在实现此计算器的时候,首先需要考虑的是 I/O 服务。该计算器使用 TCP/IP 协议,需要在指定端口监听,接受客户端的连接。Apache MINA 提供了基于 Java NIO 的套接字实现,可以直接使用。其次要考虑的是 I/O 过滤器。I/O 过滤器过滤所有的 I/O 事件和请求,可以用来处理横切的需求,如记录日志、压缩等。最后就是 I/O 处理器。I/O 处理器用来处理业务逻辑。具体到该应用来说,就是在接收到消息之后,把该消息作为一个表达式来执行,并把结果发送回去。I/O 处理器需要实现org.apache.mina.core.service.IoHandler接口或者继承自org.apache.mina.core.service.IoHandlerAdapter该应用的 I/O 处理器的实现如清单 1所示。


清单 1. 计算器服务的 I/O 处理器CalculatorHandler

  public class CalculatorHandler extends IoHandlerAdapter { 

    private static final Logger LOGGER = LoggerFactory 

        .getLogger(CalculatorHandler.class); 

    private ScriptEngine jsEngine = null; 

    public CalculatorHandler() { 

        ScriptEngineManager sfm = new ScriptEngineManager(); 

        jsEngine = sfm.getEngineByName("JavaScript"); 

        if (jsEngine == null) { 

            throw new RuntimeException("找不到 JavaScript 引擎。"); 

        } 

    } 

    public void exceptionCaught(IoSession session, Throwable cause) 

        throws Exception { 

        LOGGER.warn(cause.getMessage(), cause); 

    } 

    public void messageReceived(IoSession session, Object message) 

        throws Exception { 

        String expression = message.toString(); 

        if ("quit".equalsIgnoreCase(expression.trim())) { 

            session.close(true); 

            return; 

        } 

        try { 

            Object result = jsEngine.eval(expression); 

            session.write(result.toString()); 

        } catch (ScriptException e) { 

            LOGGER.warn(e.getMessage(), e); 

            session.write("Wrong expression, try again."); 

        } 

    } 

}

   

清单 1中,messageReceivedIoHandler接口声明。当接收到新的消息的时候,该方法就会被调用。此处的逻辑是如果传入了“quit”,则通过session.close关闭当前连接;如果不是的话,就执行该表达式并把结果通过session.write发送回去。此处执行表达式用的是 JDK 6 中提供的 JavaScript 脚本引擎。此处使用到了 I/O 会话相关的方法,会在下面进行说明。

接下来只需要把 I/O 处理器和 I/O 过滤器配置到 I/O 服务上就可以了。具体的实现如清单 2所示。


清单 2. 计算器服务主程序 CalculatorServer

public class CalculatorServer { 

    private static final int PORT = 10010; 

    private static final Logger LOGGER = LoggerFactory 

        .getLogger(CalculatorServer.class); 

    public static void main(String[] args) throws IOException { 

        IoAcceptor acceptor = new NioSocketAcceptor(); 

        acceptor.getFilterChain().addLast("logger", new LoggingFilter()); 

        acceptor.getFilterChain().addLast( 

            "codec", 

            new ProtocolCodecFilter(new TextLineCodecFactory(Charset 

                .forName("UTF-8")))); 

        acceptor.setHandler(new CalculatorHandler()); 

        acceptor.bind(new InetSocketAddress(PORT)); 

        LOGGER.info("计算器服务已启动,端口是" + PORT); 

    } 

}

 

清单 2中,首先创建一个org.apache.mina.transport.socket.nio.NioSocketAcceptor的实例,由它提供 I/O 服务;接着获得该 I/O 服务的过滤器链,并添加两个新的过滤器,一个用来记录相关日志,另外一个用来在字节流和文本之间进行转换;最后配置 I/O 处理器。完成这些之后,通过bind方法来在特定的端口进行监听,接收连接。服务器启动之后,可以通过操作系统自带的 Telnet 工具来进行测试,如 2所示。在输入表达式之后,计算结果会出现在下面一行。


 2. 使用 Telnet 工具测试计算器服务

在介绍了简单的计算器服务这个应用之后,下面说明本文中会使用的复杂的联机游戏应用。

回页首

联机游戏示例说明

上一节中给出了一个简单的基于 Apache MINA 的网络应用的实现,可以用来熟悉基本的架构。而在实际开发中,网络应用都是有一定复杂度的。下面会以一个比较复杂的联机游戏作为示例来详细介绍 Apache MINA 的概念、API 和典型用法。

该联机游戏支持两个人进行俄罗斯方块的对战。这个游戏借鉴了 QQ 火拼俄罗斯。用户在启动客户端之后,需要输入一个昵称进行注册。用户可以在游戏大厅中查看当前已注册的所有其它用户。当前用户可以选择另外的一个用户发送游戏邀请。邀请被接受之后就可以开始进行对战。在游戏过程中,当前用户可以看到对方的游戏状态,即方块的情况。该游戏的运行效果如 3所示。


 3. 联机游戏示例运行效果图

下面开始以这个应用为例来具体介绍 Apache MINA 中的基本概念。先从 I/O 服务开始。

回页首

I/O 服务

I/O 服务用来执行真正的 I/O 操作,以及管理 I/O 会话。根据所使用的数据传输方式的不同,有不同的 I/O 服务的实现。由于 I/O 服务执行的是输入和输出两种操作,实际上有两种具体的子类型。一种称为“I/O 接受器(I/O acceptor,用来接受连接,一般用在服务器的实现中;另外一种称为“I/O 连接器(I/O connector,用来发起连接,一般用在客户端的实现中。对应在 Apache MINA 中的实现,org.apache.mina.core.service.IoService I/O 服务的接口,而继承自它的接口org.apache.mina.core.service.IoAcceptororg.apache.mina.core.service.IoConnector则分别表示 I/O 接受器和 I/O 连接器。IoService接口提供的重要方法如 1所示。


 1. IoService 中的重要方法

方法

说明

setHandler(IoHandler handler)

设置 I/O 处理器。该 I/O 处理器会负责处理该 I/O 服务所管理的所有 I/O 会话产生的 I/O 事件。

getFilterChain()

获取 I/O 过滤器链,可以对 I/O 过滤器进行管理,包括添加和删除 I/O 过滤器。

getManagedSessions()

获取该 I/O 服务所管理的 I/O 会话。

下面具体介绍 I/O 接受器和 I/O 连接器。

I/O 接受器

I/O 接受器用来接受连接,与对等体(客户端)进行通讯,并发出相应的 I/O 事件交给 I/O 处理器来处理。使用 I/O 接受器的时候,只需要调用bind方法并指定要监听的套接字地址。当不再接受连接的时候,调用unbind停止监听即可。关于 I/O 接受器的具体用法,可以参考清单 2中给出的计算器服务的实现。

I/O 连接器

I/O 连接器用来发起连接,与对等体(服务器)进行通讯,并发出相应的 I/O 事件交给 I/O 处理器来处理。使用 I/O 连接器的时候,只需要调用connect方法连接指定的套接字地址。另外可以通过setConnectTimeoutMillis设置连接超时时间(毫秒数)。

清单 3中给出了使用 I/O 连接器的一个示例。


清单 3. I/O 连接器示例

SocketConnector connector = new NioSocketConnector(); 

connector.setConnectTimeoutMillis(CONNECT_TIMEOUT); 

connector.getFilterChain().addLast("logger", new LoggingFilter()); 

connector.getFilterChain().addLast("protocol", 

    new ProtocolCodecFilter(new TetrisCodecFactory())); 

ConnectFuture connectFuture = connector.connect(new InetSocketAddress(host, port)); 

connectFuture.awaitUninterruptibly(); 

清单 3中,首先创建一个 Java NIO 的套接字连接器NioSocketConnector的实例,接着设置超时时间。再添加了 I/O 过滤器之后,通过connect方法连接到指定的地址和端口即可。

在介绍完 I/O 服务之后,下面介绍 I/O 会话。

回页首

I/O 会话

I/O 会话表示一个活动的网络连接,与所使用的传输方式无关。I/O 会话可以用来存储用户自定义的与应用相关的属性。这些属性通常用来保存应用的状态信息,还可以用来在 I/O 过滤器和 I/O 处理器之间交换数据。I/O 会话在作用上类似于 Servlet 规范中的 HTTP 会话。

Apache MINA  I/O 会话实现的接口是org.apache.mina.core.session.IoSession。该接口中比较重要的方法如 2所示。


 2. IoSession 中的重要方法

方法

说明

close(boolean immediately)

关闭当前连接。如果参数immediately为true的话,连接会等到队列中所有的数据发送请求都完成之后才关闭;否则的话就立即关闭。

getAttribute(Object key)

从 I/O 会话中获取键为key的用户自定义的属性。

setAttribute(Object key, Object value)

将键为key,值为value的用户自定义的属性存储到 I/O 会话中。

removeAttribute(Object key)

从 I/O 会话中删除键为key的用户自定义的属性。

write(Object message)

将消息对象message发送到当前连接的对等体。该方法是异步的,当消息被真正发送到对等体的时候,IoHandler.messageSent(IoSession,Object)会被调用。如果需要的话,也可以等消息真正发送出去之后再继续执行后续操作。

在介绍完 I/O 会话之后,下面介绍 I/O 过滤器。

回页首

I/O 过滤器

 I/O 服务发送过来的所有 I/O 事件和请求,在到达 I/O 处理器之前,会先由 I/O 过滤器链中的 I/O 过滤器进行处理。Apache MINA 中的过滤器与 Servlet 规范中的过滤器是类似的。过滤器可以在很多情况下使用,比如记录日志、性能分析、访问控制、负载均衡和消息转换等。过滤器非常适合满足网络应用中各种横切的非功能性需求。在一个基于 Apache MINA 的网络应用中,一般存在多个过滤器。这些过滤器互相串联,形成链条,称为过滤器链。每个过滤器依次对传入的 I/O 事件进行处理。当前过滤器完成处理之后,由过滤器链中的下一个过滤器继续处理。当前过滤器也可以不调用下一个过滤器,而提前结束,这样 I/O 事件就不会继续往后传递。比如负责用户认证的过滤器,如果遇到未认证的对等体发出的 I/O 事件,则会直接关闭连接。这可以保证这些事件不会通过此过滤器到达 I/O 处理器。

Apache MINA  I/O 过滤器都实现org.apache.mina.core.filterchain.IoFilter接口。一般来说,不需要完整实现IOFilter接口,只需要继承 Apache MINA 提供的适配器org.apache.mina.core.filterchain.IoFilterAdapter,并覆写所需的事件过滤方法即可,其它方法的默认实现是不做任何处理,而直接把事件转发到下一个过滤器。

IoFilter 接口详细说明

IoFilter接口提供了 15 个方法。这 15 个方法大致分成两类,一类是与过滤器的生命周期相关的,另外一类是用来过滤 I/O 事件的。第一类方法如 3所示。


 3. IoFilter 中与过滤器的生命周期相关的方法

方法

说明

init()

当过滤器第一次被添加到过滤器链中的时候,此方法被调用。用来完成过滤器的初始化工作。

onPreAdd(IoFilterChain parent, String name, IoFilter.NextFilter nextFilter)

当过滤器即将被添加到过滤器链中的时候,此方法被调用。

onPostAdd(IoFilterChain parent, String name, IoFilter.NextFilter nextFilter)

当过滤器已经被添加到过滤器链中之后,此方法被调用。

onPreRemove(IoFilterChain parent, String name, IoFilter.NextFilter nextFilter)

当过滤器即将被从过滤器链中删除的时候,此方法被调用。

onPostRemove(IoFilterChain parent, String name, IoFilter.NextFilter nextFilter)

当过滤器已经被从过滤器链中删除的时候,此方法被调用。

destroy()

当过滤器不再需要的时候,它将被销毁,此方法被调用。

 3中给出的方法中,参数parent表示包含此过滤器的过滤器链,参数name表示过滤器的名称,参数nextFilter表示过滤器链中的下一个过滤器。

第二类方法如 4所示。


 4. IoFilter 中过滤 I/O 事件的方法

方法

说明

filterClose(IoFilter.NextFilter nextFilter, IoSession session)

过滤对IoSession的close方法的调用。

filterWrite(IoFilter.NextFilter nextFilter, IoSession session, WriteRequest writeRequest)

过滤对IoSession的write方法的调用。

exceptionCaught(IoFilter.NextFilter nextFilter, IoSession session, Throwable cause)

过滤对IoHandler的exceptionCaught方法的调用。

messageReceived(IoFilter.NextFilter nextFilter, IoSession session, Object message)

过滤对IoHandler的messageReceived方法的调用。

messageSent(IoFilter.NextFilter nextFilter, IoSession session, WriteRequest writeRequest)

过滤对IoHandler的messageSent方法的调用。

sessionClosed(IoFilter.NextFilter nextFilter, IoSession session)

过滤对IoHandler的sessionClosed方法的调用。

sessionCreated(IoFilter.NextFilter nextFilter, IoSession session)

过滤对IoHandler的sessionCreated方法的调用。

sessionIdle(IoFilter.NextFilter nextFilter, IoSession session, IdleStatus status)

过滤对IoHandler的sessionIdle方法的调用。

sessionOpened(IoFilter.NextFilter nextFilter, IoSession session)

过滤对IoHandler的sessionOpened方法的调用。

对于 4中给出的与 I/O 事件相关的方法,它们都有一个参数是nextFilter,表示过滤器链中的下一个过滤器。如果当前过滤器完成处理之后,可以通过调用nextFilter中的方法,把 I/O 事件传递到下一个过滤器。如果当前过滤器不调用nextFilter中的方法的话,该 I/O 事件就不能继续往后传递。另外一个共同的参数是session,用来表示当前的 I/O 会话,可以用来发送消息给对等体。下面通过具体的实例来说明过滤器的实现。

BlacklistFilter

BlacklistFilter Apache MINA 自带的一个过滤器实现,其功能是阻止来自特定地址的连接,即所谓的黑名单功能。BlacklistFilter继承自IoFilterAdapter,并覆写了IoHandler相关的方法。清单 4中给出了部分实现。


清单 4. 阻止来自特定地址连接的 BlacklistFilter

public void messageReceived(NextFilter nextFilter, IoSession session, Object message) { 

    if (!isBlocked(session)) { 

        nextFilter.messageReceived(session, message); 

    } else { 

        blockSession(session); 

    } 

private void blockSession(IoSession session) { 

    session.close(true); 

}

清单 4messageReceived方法的实现中,首先通过isBlocked来判断当前连接是否应该被阻止,如果不是的话,则通过nextFilter.messageReceived把该 I/O 事件传递到下一个过滤器;否则的话,则通过blockSession来阻止当前连接。

使用 ProtocolCodecFilter

ProtocolCodecFilter用来在字节流和消息对象之间互相转换。当该过滤器接收到字节流的时候,需要首先判断消息的边界,然后把表示一条消息的字节提取出来,通过一定的逻辑转换成消息对象,再把消息对象往后传递,交给 I/O 处理器来执行业务逻辑。这个过程称为解码。与解码对应的是编码过程。在编码的时候,过滤器接收到的是消息对象,通过与解码相反的逻辑,把消息对象转换成字节,并反向传递,交给 I/O 服务来执行 I/O 操作。

编码解码中的一个重要问题是如何在字节流中判断消息的边界。通常来说,有三种办法解决这个问题:

使用固定长度的消息。这种方式实现起来比较简单,只需要每次读取特定数量的字节即可。

使用固定长度的消息头来指明消息主体的长度。比如每个消息开始的 4 个字节的值表示了后面紧跟的消息主体的长度。只需要首先读取该长度,再读取指定数量的字节即可。

使用分隔符。消息之间通过特定模式的分隔符来分隔。每次只要遇到该模式的字节,就表示到了一个消息的末尾。

具体到示例应用来说,客户端和服务器之间的通信协议比较复杂,有不同种类的消息。每种消息的格式都不相同,同类消息的内容也不尽相同。因此,使用固定长度的消息头来指明消息主体的长度就成了最好的选择。

示例应用中的每种消息主体由两部分组成,第一部分是固定长度的消息类别名称,第二部分是每种消息的主体内容。 4中给出了示例应用中一条完整的消息的结构。


 4. 示例应用中消息的结构

AbstractTetrisCommand用来描述联机游戏示例应用中的消息。它是一个抽象类,是所有具体消息的基类。其具体实现如清单 5所示。


清单 5. 联机游戏示例应用中的消息 AbstractTetrisCommand

public abstract class AbstractTetrisCommand implements TetrisCommand {         

    public abstract String getName(); 

    public abstract byte[] bodyToBytes() throws Exception; 

        

    public abstract void bodyFromBytes(byte[] bytes) throws Exception; 

    public byte[] toBytes() throws Exception { 

        byte[] body = bodyToBytes(); 

        int commandNameLength = Constants.COMMAND_NAME_LENGTH; 

        int len = commandNameLength + body.length; 

        byte[] bytes = new byte[len]; 

        String name = StringUtils.rightPad(getName(), commandNameLength, 

            Constants.COMMAND_NAME_PAD_CHAR); 

        name = name.substring(0, commandNameLength); 

        System.arraycopy(name.getBytes(), 0, bytes, 0, commandNameLength); 

        System.arraycopy(body, 0, bytes, commandNameLength, body.length); 

        return bytes; 

    } 

}

清单 5所示,AbstractTetrisCommand中定义了 3 个抽象方法:getNamebodyToBytesbodyFromBytes,分别用来获取消息的名称、把消息的主体转换成字节数组和从字节数组中构建消息。bodyToBytes对应于前面提到的编码过程,而bodyFromBytes对应于解码过程。每种具体的消息都应该实现这 3 个方法。AbstractTetrisCommand中的方法toBytes封装了把消息的主体转换成字节数组的逻辑,在字节数组中,首先是长度固定为Constants.COMMAND_NAME_LENGTH的消息类别名称,紧接着是每种消息特定的主体内容,由bodyToBytes方法来生成。

在介绍完示例应用中的消息格式之后,下面将讨论具体的编码解码过程。编码过程由编码器来完成,编码器需要实现org.apache.mina.filter.codec.ProtocolEncoder接口,一般来说继承自org.apache.mina.filter.codec.ProtocolEncoderAdapter并覆写所需的方法即可。清单 6中给出了示例应用中消息编码器CommandEncoder的实现。


清单 6. 联机游戏示例应用中消息编码器 CommandEncoder

public class CommandEncoder extends ProtocolEncoderAdapter { 

    public void encode(IoSession session, Object message, 

        ProtocolEncoderOutput out) throws Exception { 

        AbstractTetrisCommand command = (AbstractTetrisCommand) message; 

        byte[] bytes = command.toBytes(); 

        IoBuffer buf = IoBuffer.allocate(bytes.length, false); 

                

        buf.setAutoExpand(true); 

        buf.putInt(bytes.length); 

        buf.put(bytes); 

                

        buf.flip(); 

        out.write(buf); 

    } 

}

清单 6中,encode方法封装了编码的逻辑。由于AbstractTetrisCommandtoBytes已经完成了到字节数组的转换,encode方法直接使用即可。首先写入消息主体字节数组的长度,再是字节数组本身,就完成了编码的过程。

与编码过程相比,解码过程要相对复杂一些。具体的实现如清单 7所示。


清单 7. 联机游戏示例应用中消息解码器 CommandDecoder

public class CommandDecoder extends CumulativeProtocolDecoder { 

    protected boolean doDecode(IoSession session, IoBuffer in, 

            ProtocolDecoderOutput out) throws Exception { 

        if (in.prefixedDataAvailable(4, Constants.MAX_COMMAND_LENGTH)) { 

            int length = in.getInt(); 

            byte[] bytes = new byte[length]; 

            in.get(bytes); 

            int commandNameLength = Constants.COMMAND_NAME_LENGTH; 

            byte[] cmdNameBytes = new byte[commandNameLength]; 

            System.arraycopy(bytes, 0, cmdNameBytes, 0, commandNameLength); 

            String cmdName = StringUtils.trim(new String(cmdNameBytes)); 

            AbstractTetrisCommand command = TetrisCommandFactory 

                .newCommand(cmdName); 

            if (command != null) { 

                byte[] cmdBodyBytes = new byte[length - commandNameLength]; 

                System.arraycopy(bytes, commandNameLength, cmdBodyBytes, 0, 

                    length - commandNameLength); 

                command.bodyFromBytes(cmdBodyBytes); 

                out.write(command); 

            } 

            return true; 

        } else { 

            return false; 

        } 

    } 

}

清单 7中可以看到,解码器CommandDecoder继承自CumulativeProtocolDecoder。这是 Apache MINA 提供的一个帮助类,它会自动缓存所有已经接收到的数据,直到编码器认为可以开始进行编码。这样在实现自己的编码器的时候,就只需要考虑如何判断消息的边界即可。如果一条消息的后续数据还没有接收到,CumulativeProtocolDecoder会自动进行缓存。在之前提到过,解码过程的一个重要问题是判断消息的边界。对于固定长度的消息来说,只

抱歉!评论已关闭.