1.实体类
package org.senssic.mina; import java.io.Serializable; public class Person implements Serializable { private static final long serialVersionUID = 1L; private String name; private int age; private String address; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public String getAddress() { return address; } public void setAddress(String address) { this.address = address; } @Override public String toString() { // TODO Auto-generated method stub return "姓名:" + name + "年龄:" + age + "地址:" + address; } }
2.服务端类
package org.senssic.mina; import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; class myServiceHandler extends IoHandlerAdapter { // 当一个客户端接入时候 @Override public void sessionOpened(IoSession session) throws Exception { System.out.println(session.getRemoteAddress() + "--->链接了"); } // 当客户端关闭时候 @Override public void sessionClosed(IoSession session) throws Exception { System.out.println(session.getRemoteAddress() + "--->断开了"); } @Override public void messageReceived(IoSession session, Object message) throws Exception { String str = message.toString(); System.out.println("发送过来的消息是" + str); session.write(str); } @Override public void exceptionCaught(IoSession session, Throwable cause) { System.out.println("服务端报异常了,异常原因" + cause); } @Override public void messageSent(IoSession session, Object message) throws Exception { System.out.println("服务端发送信息成功...,发送信息为:" + message.toString()); session.close(true);// 短连接,服务器发送成功后主动与客户端断开连接(一般提倡从服务端断开连接) } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { System.out.println("服务端进入空闲状态..."); } } public class MinaTest { public static void main(String[] args) throws Exception { // 1.编写IoAcceptor IoAcceptor acceptor = new NioSocketAcceptor(); acceptor.getSessionConfig().setReadBufferSize(2048);// 读取数据缓冲区的大小 acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);// 读写通道10内误操作就进入空闲状态 // 2.编写过滤器 // 主意此处使用时mina的Object编码解码, // 客户端如果使用其他语言则必须遵循TCP报文编写 acceptor.getFilterChain().addLast("logger", new LoggingFilter()); acceptor.getFilterChain().addLast("senssicChin", new ProtocolCodecFilter(new ObjectSerializationCodecFactory())); // 设置自定义的编码解码过滤器 // 添加过滤器 // acceptor.getFilterChain().addLast( // "codec", // new ProtocolCodecFilter(new MyTextLineCodecFactory(Charset // .forName("utf-8"), "\r\n"))); acceptor.setHandler(new myServiceHandler()); acceptor.bind(new InetSocketAddress(9988)); System.out.println("服务端启动成功... 端口号为:" + 9988); } }
3.客户端类
package org.senssic.mina; import java.net.InetSocketAddress; import org.apache.mina.core.service.IoConnector; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.nio.NioSocketConnector; //IoHandlerAdapter方法的说明 //sessionCreated:当一个新的连接建立时,由I/O processor thread调用; //sessionOpened:当连接打开是调用; //messageReceived:当接收了一个消息时调用; //messageSent:当一个消息被(IoSession#write)发送出去后调用; //sessionIdle:当连接进入空闲状态时调用; //sessionClosed:当连接关闭时调用; //exceptionCaught:当实现IoHandler的类抛出异常时调用; class MainServiceHandler extends IoHandlerAdapter { @Override public void messageReceived(IoSession session, Object message) throws Exception { System.out.println("服务端发过来的消息" + message.toString()); // session.write(str); } @Override public void sessionOpened(IoSession session) throws Exception { Person person = new Person(); person.setAddress("安徽省合肥市"); person.setAge(20); person.setName("senssic"); session.write(person); } } public class MinaClient { public static void main(String[] args) throws Exception { // 1.编写IoConnector 客户端 IoConnector conner = new NioSocketConnector(); conner.setConnectTimeoutMillis(30000);// 设置超时时间 // 2.编写过滤器 conner.getFilterChain().addLast("senssicChina", new ProtocolCodecFilter(new ObjectSerializationCodecFactory())); conner.getFilterChain().addBefore("senssicChina", "loggerchina", new LoggingFilter()); // 设置自定义的编码解码过滤器 // conner.getFilterChain().addLast( // "codec", // new ProtocolCodecFilter(new MyTextLineCodecFactory(Charset // .forName("utf-8"), "\r\n"))); conner.setHandler(new MainServiceHandler()); conner.connect(new InetSocketAddress(9988)); // 由于connect访问服务器时候为异步执行的方法即立即返回无论连接是否成功想要使用同步获取IoSession可以做如下操作 // ConnectFuture future = conner.connect(new InetSocketAddress(9988)); // future.awaitUninterruptibly();// 同步 // IoSession session = future.getSession(); // session.getCloseFuture().awaitUninterruptibly();// 等待连接断开 // conner.dispose(); } }
4.编码类
package org.senssic.mina; import java.nio.charset.Charset; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolEncoder; import org.apache.mina.filter.codec.ProtocolEncoderOutput; public class MyTextLineCodecEncoder implements ProtocolEncoder { private Charset charset; // 编码格式 private String delimiter; // 文本分隔符 public MyTextLineCodecEncoder(Charset charset, String delimiter) { this.charset = charset; this.delimiter = delimiter; } @Override public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { if (delimiter == null || "".equals(delimiter)) { // 如果文本换行符未指定,使用默认值 delimiter = "\r\n"; } if (charset == null) { charset = Charset.forName("utf-8"); } String value = message.toString(); IoBuffer buf = IoBuffer.allocate(value.length()).setAutoExpand(true); buf.putString(value, charset.newEncoder()); // 真实数据 buf.putString(delimiter, charset.newEncoder()); // 文本换行符 buf.flip(); out.write(buf); } @Override public void dispose(IoSession session) throws Exception { } }
5.解码类
package org.senssic.mina; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; public class MyTextLineCodecDecoder implements ProtocolDecoder { private Charset charset; // 编码格式 private String delimiter; // 文本分隔符 private IoBuffer delimBuf; // 文本分割符匹配的变量 // 定义常量值,作为每个IoSession中保存解码任务的key值 private static String CONTEXT = MyTextLineCodecDecoder.class.getName() + ".context"; // 构造函数,必须指定Charset和文本分隔符 public MyTextLineCodecDecoder(Charset charset, String delimiter) { this.charset = charset; this.delimiter = delimiter; } @Override public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { Context ctx = getContext(session); if (delimiter == null || "".equals(delimiter)) { // 如果文本换行符未指定,使用默认值 delimiter = "\r\n"; } if (charset == null) { charset = Charset.forName("utf-8"); } decodeNormal(ctx, in, out); } // 从IoSession中获取Context对象 private Context getContext(IoSession session) { Context ctx; ctx = (Context) session.getAttribute(CONTEXT); if (ctx == null) { ctx = new Context(); session.setAttribute(CONTEXT, ctx); } return ctx; } // 解码 private void decodeNormal(Context ctx, IoBuffer in, ProtocolDecoderOutput out) throws CharacterCodingException { // 取出未完成任务中已经匹配的文本换行符的个数 int matchCount = ctx.getMatchCount(); // 设置匹配文本换行符的IoBuffer变量 if (delimBuf == null) { IoBuffer tmp = IoBuffer.allocate(2).setAutoExpand(true); tmp.putString(delimiter, charset.newEncoder()); tmp.flip(); delimBuf = tmp; } int oldPos = in.position(); // 解码的IoBuffer中数据的原始信息 int oldLimit = in.limit(); while (in.hasRemaining()) { // 变量解码的IoBuffer byte b = in.get(); if (delimBuf.get(matchCount) == b) { // 匹配第matchCount位换行符成功 matchCount++; if (matchCount == delimBuf.limit()) { // 当前匹配到字节个数与文本换行符字节个数相同,匹配结束 int pos = in.position(); // 获得当前匹配到的position(position前所有数据有效) in.limit(pos); in.position(oldPos); // position回到原始位置 ctx.append(in); // 追加到Context对象未完成数据后面 in.limit(oldLimit); // in中匹配结束后剩余数据 in.position(pos); IoBuffer buf = ctx.getBuf(); buf.flip(); buf.limit(buf.limit() - matchCount);// 去掉匹配数据中的文本换行符 try { out.write(buf.getString(ctx.getDecoder())); // 输出解码内容 } finally { buf.clear(); // 释放缓存空间 } oldPos = pos; matchCount = 0; } } else { // 如果matchCount==0,则继续匹配 // 如果matchCount>0,说明没有匹配到文本换行符的中的前一个匹配成功字节的下一个字节, // 跳转到匹配失败字符处,并置matchCount=0,继续匹配 in.position(in.position() - matchCount); matchCount = 0; // 匹配成功后,matchCount置空 } } // 把in中未解码内容放回buf中 in.position(oldPos); ctx.append(in); ctx.setMatchCount(matchCount); } @Override public void dispose(IoSession session) throws Exception { } @Override public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { } // 内部类,保存IoSession解码时未完成的任务 private class Context { // IoBuffer的简单理解,它就是个可变长度的byte数组! // 1. static IoBuffer allocate(int capacity,boolean useDirectBuffer) // 创建IoBuffer实例,第一个参数指定初始化容量,第二个参数指定使用直接缓冲区还是JAVA 内存堆的缓存区,默认为false。 // 2.IoBuffer setAutoExpand(boolean autoExpand) // 这个方法设置IoBuffer 为自动扩展容量,也就是前面所说的长度可变,那么可以看出长度可变这个特性默认是不开启的。 // 3.IoBuffer flip() // limit=position , // position=0,重置mask,为了读取做好准备,一般是结束buf操作,将buf写入输出流时调用;这个必须要调用,否则极有可能position!=limit,导致position后面没有数据;每次写入数据到输出流时,必须确保position=limit。 // 4.IoBuffer clear()与IoBuffer reset() // clear:limit=capacity , // position=0,重置mark;它是不清空数据,但从头开始存放数据做准备---相当于覆盖老数据。 // reset就是清空数据 // 5. int remaining()与boolean hasRemaining() // 这两个方法一般是在调用了flip()后使用的,remaining()是返回limt-position的值!hasRemaining()则是判断当前是否有数据,返回position // < limit的boolean值! private final CharsetDecoder decoder; private final IoBuffer buf; // 保存真实解码内容 private int matchCount = 0; // 匹配到的文本换行符个数 private Context() { decoder = charset.newDecoder(); buf = IoBuffer.allocate(80).setAutoExpand(true); } // 重置 public void reset() { matchCount = 0; decoder.reset(); } // 追加数据 public void append(IoBuffer in) { getBuf().put(in); } // ======get/set方法===================== public CharsetDecoder getDecoder() { return decoder; } public IoBuffer getBuf() { return buf; } public int getMatchCount() { return matchCount; } public void setMatchCount(int matchCount) { this.matchCount = matchCount; } } // end class Context; }
6.编码解码工厂类
package org.senssic.mina; import java.nio.charset.Charset; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFactory; import org.apache.mina.filter.codec.ProtocolDecoder; import org.apache.mina.filter.codec.ProtocolEncoder; public class MyTextLineCodecFactory implements ProtocolCodecFactory { private final Charset charset; // 编码格式 private final String delimiter; // 文本分隔符 public MyTextLineCodecFactory(Charset charset, String delimiter) { this.charset = charset; this.delimiter = delimiter; } @Override public ProtocolDecoder getDecoder(IoSession session) throws Exception { return new MyTextLineCodecDecoder(charset, delimiter); } @Override public ProtocolEncoder getEncoder(IoSession session) throws Exception { return new MyTextLineCodecEncoder(charset, delimiter); } }
使用到的jar包
1.mina-core-2.0.7.jar
2.slf4j-api-1.6.6.jar
3.slf4j-simple-1.6.6.jar