现在的位置: 首页 > 综合 > 正文

解释一下粘包和拆包的原理

2019年05月23日 ⁄ 综合 ⁄ 共 5433字 ⁄ 字号 评论关闭

着重强调一下:缓冲区的复用,当满足解析为一条消息的时候在解析,不行break

package com.sof.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

import com.sof.bas.Bytes2util;
import com.sof.bas.Util2Bytes;

final public class Handler implements Runnable {

	ByteBuffer input = ByteBuffer.allocate(1024);
	final SocketChannel socket;
	final SelectionKey sk;

	static final int MESSAGE_LENGTH_HEAD = 4;
	byte[] head = new byte[4];
	int bodylen = -1;

	public Handler(Selector selector, SocketChannel socket) throws IOException {
		this.socket = socket;
		socket.configureBlocking(false);
		sk = socket.register(selector, 0);
		sk.attach(this);
		sk.interestOps(SelectionKey.OP_READ);
		selector.wakeup();
	}

	public void run() {
		try {
			read();
		} catch (IOException ex) {
			try {
				socket.close();
			} catch (IOException e) {
				e.printStackTrace();
			}

			sk.cancel();
		}
	}

	public synchronized void read() throws IOException {
		socket.read(input);
		input.flip();

		// 读取数据的原则: 要么读取一个完整的包头,要么读取一个完整包体。不满足这两种情况,不对ByteBuffer进行任何的get操作
		// 但是要注意可能发生上次读取了一个完整的包头,下次读才读取一个完整包体情况。
		// 所以包头部分必须用类的成员变量进行暂时的存储,当完整读取包头和包体后,在给业务处理部分。

		while (input.remaining() > 0) {
			if (bodylen < 0) // 还没有生成完整的包头部分,
								// 该变量初始值为-1,并且在拼凑一个完整的消息包以后,再将该值设置为-1
			{
				if (input.remaining() >= MESSAGE_LENGTH_HEAD) // ByteBuffer缓冲区的字节数够拼凑一个包头
				{
					input.get(head, 0, 4);
					bodylen = Util2Bytes.bytes2bigint(head);
					System.out.println(bodylen);
				} else// ByteBuffer缓冲区的字节数不够拼凑一个包头,什么操作都不做,退出这次处理,继续等待
				{

					break;
				}
			} else if (bodylen > 0) // 包头部分已经完整生成.
			{
				if (input.remaining() >= bodylen) // 缓冲区的内容够一个包体部分
				{
					byte[] body = new byte[bodylen];
					input.get(body, 0, bodylen);
					byte[] headandbody = new byte[MESSAGE_LENGTH_HEAD + bodylen];
					bodylen = -1;
					System.out.println(new String(body));
				} else // /缓冲区的内容不够一个包体部分,继续等待,跳出循环等待下次再出发该函数
				{
					System.out.println("5: remain=" + input.remaining()
							+ " bodylen=" + bodylen);
					break;
				}
			} else if (bodylen == 0) // 没有包体部分,仅仅有包头的情况
			{
				byte[] headandbody = new byte[MESSAGE_LENGTH_HEAD + bodylen];
				System.arraycopy(head, 0, headandbody, 0, head.length);
				Bytes2util.outputHex(headandbody, 16);
				bodylen = -1;
			}
		}

		sk.interestOps(SelectionKey.OP_READ);
	}
}

package com.sof.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.logging.Logger;

import com.sof.nio.Handler;

public class Reactor implements Runnable {

	final Selector selector;
	final ServerSocketChannel serverSocket;

	public Reactor(String ip, int port) throws IOException {
		selector = Selector.open();
		serverSocket = ServerSocketChannel.open();
		serverSocket.socket().bind(new InetSocketAddress(port));
		serverSocket.configureBlocking(false);
		SelectionKey sk = serverSocket.register(selector,
				SelectionKey.OP_ACCEPT);
		sk.attach(new Acceptor());
	}

	public void run() {
		try {
			while (!Thread.interrupted()) {

				selector.select();
				Set<SelectionKey> keys = selector.selectedKeys();
				if (keys.size() == 0) {

					continue;
				}

				for (SelectionKey key : keys) {
					if (key.isAcceptable()) {
						System.out.println("isAcceptable");
					} else if (key.isReadable()) {
						System.out.println("isReadable");
					} else if (key.isWritable()) {
						System.out.println("isWritable");
					} else {
						System.out.println("wu");
					}
					dispatch((SelectionKey) key);
				}
				keys.clear();
			}
		} catch (IOException ex) {

			ex.printStackTrace();
		}
	}

	void dispatch(SelectionKey k) {
		Runnable r = (Runnable) (k.attachment());
		if (r != null) {
			r.run();
		}
	}

	public class Acceptor implements Runnable {
		public synchronized void run() {
			try {
				SocketChannel c = serverSocket.accept();

				if (c != null) {
					new Handler(selector, c);
				}
			} catch (IOException ex) {
				ex.printStackTrace();
			}
		}
	}

	public static void main(String[] args) {
		try {
			new Thread(new Reactor(null, 9999)).start();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

package com.sof.bas;

public class Bytes2util {

	public static byte[] biginttobytes(int value) {
		byte[] stream = new byte[4];
		for (int i = 0; i < 4; i++) {
			stream[i] = (byte) ((value & (0xFF << (4 - i - 1) * 8)) >> ((4 - i - 1) * 8));
		}
		outputHex(stream, 16);
		return stream;
	}

	public static byte[] bigshorttobytes(short value) {
		byte stream[] = new byte[2];
		for (int i = 0; i < 2; i++) {
			stream[i] = (byte) ((value & (0xFF << (2 - i - 1) * 8)) >> ((2 - i - 1) * 8));
		}
		outputHex(stream, 16);
		return stream;
	}

	public static byte[] smallinttobytes(int value) {
		byte stream[] = new byte[4];
		for (int i = 0; i < 4; i++) {
			stream[4 - i - 1] = (byte) ((value & (0xFF << (4 - i - 1) * 8)) >> ((4 - i - 1) * 8));
		}
		outputHex(stream, 16);
		return stream;
	}

	public static byte[] smallshorttobytes(short value) {
		byte stream[] = new byte[2];
		for (int i = 0; i < 2; i++) {
			stream[2 - i - 1] = (byte) ((value & (0xFF << (2 - i - 1) * 8)) >> ((2 - i - 1) * 8));
		}
		outputHex(stream, 16);
		return stream;
	}

	public static void outputHex(byte[] stream, int number) {
		String content = "stream display, length=" + stream.length + "\n";
		for (int i = 0; i < stream.length; i++) {
			if (i / number != 0 && i % number == 0) {
				content += "\n";
			}
			String tempstr = Integer.toHexString(stream[i] & 0xFF)
					.toUpperCase();
			if (tempstr.length() == 1)
				tempstr = "0" + tempstr;
			content += tempstr + " ";
		}
	}
}

package com.sof.bas;

public class Util2Bytes
{
	public static int bytes2smallint(byte stream[])
	{
		int value = 0;
		int temp = 0;
		for (int i = 3; i >= 0; i--)
		{
			if ((stream[i]) >= 0)
			{
				temp = stream[i];
			}
			else
			{
				temp = stream[i] + 256;
			}
			temp <<= (i * 8);
			value += temp;
		}
		return value;
	}

	public static short bytes2smallshort(byte stream[])
	{
		short value = 0;
		int temp = 0;
		for (int i = 1; i >= 0; i--)
		{
			if ((stream[i]) >= 0)
			{
				temp = stream[i];
			}
			else
			{
				temp = stream[i] + 256;
			}
			temp <<= (i * 8);
			value += temp;
		}
		return value;
	}

	public static int bytes2bigint(byte stream[])
	{
		int value = 0;
		int temp = 0;
		for (int i = 0; i < 4; i++)
		{
			if ((stream[i]) >= 0)
			{
				temp = stream[i];
			}
			else
			{
				temp = stream[i] + 256;
			}
			temp <<= ((4 - i - 1) * 8);
			value += temp;
		}
		return value;
	}

	public static short bytes2bigshort(byte stream[])
	{
		short value = 0;
		int temp = 0;
		for (int i = 0; i < 2; i++)
		{
			if ((stream[i]) >= 0)
			{
				temp = stream[i];
			}
			else
			{
				temp = stream[i] + 256;
			}
			temp <<= ((2 - i - 1) * 8);
			value += temp;
		}
		return value;
	}
}

抱歉!评论已关闭.