现在的位置: 首页 > 综合 > 正文

ZeroMQ(java)之异步req/rep

2014年07月04日 ⁄ 综合 ⁄ 共 2641字 ⁄ 字号 评论关闭

前面见到过的Request和Response类型的socket都是同步的:

例如Request,必须先要执行send,将请求发送出去,接着执行recv等待返回。。。

这种最大的好处就是编程代码逻辑简单,坏处就是伸缩性比较差。。。

于是就有了实现异步Request/Response的需求。。。那接下来来看看在ZeroMQ中是如何实现的吧:



这个是官方的guid文档上面给出的构原理,可以从图中看到这里面全是采用的异步类型的socket,Router与Dealer,

这里用Dealer来充当client,同时用于处理请求的worker也是dealer类型的,

哎,其实ZeroMQ只是提供了一种基础的通信模式,可以在此基础上灵活的使用,满足各种条件的要求,这里就和直接将上面的实现代码贴出来吧:

package asyncReqRep;


import java.util.ArrayList;
import java.util.List;

import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;

public class Main {
	public static class Broker{
		private ZMQ.Socket front;
		private ZMQ.Socket back;
		private ZMQ.Context context;
		
		public Broker() {
			this.context = ZMQ.context(1);
			this.front = this.context.socket(ZMQ.ROUTER);
			this.back = this.context.socket(ZMQ.DEALER);
		}
		
		public void start() {
			new Thread(new Runnable(){

				public void run() {
					
					// TODO Auto-generated method stub
					front.bind("ipc://front");
					back.bind("ipc://back");
					ZMQ.Poller poller = new ZMQ.Poller(2);
					ZMQ.PollItem fItem = new ZMQ.PollItem(front, ZMQ.Poller.POLLIN);
					ZMQ.PollItem bItem =new ZMQ.PollItem(back, ZMQ.Poller.POLLIN);
					poller.register(fItem);
					poller.register(bItem);
					
					while (!Thread.currentThread().isInterrupted()) {
						
						poller.poll();
						if (fItem.isReadable()) {
							ZMsg msg = ZMsg.recvMsg(fItem.getSocket());
							msg.send(back);
						}
						if (bItem.isReadable()) {
							ZMsg msg = ZMsg.recvMsg(bItem.getSocket());
							msg.send(front);
						}
					}
				}
				
			}).start();
		}
	}
	
	public static class Client {
		private ZMQ.Context context;
		private ZMQ.Socket socket;
		
		public Client() {
			this.context = ZMQ.context(1);
			this.socket = context.socket(ZMQ.DEALER);
		}
		
		public void start() {
			new Thread(new Runnable(){

				public void run() {
					// TODO Auto-generated method stub
					socket.connect("ipc://front");
					for (int i = 0; i < 5; i++) {
						String now = "hello" + i;
						socket.send(now.getBytes(), 0);
					}
					for (int i = 0; i < 5; i++) {
						String back = new String(socket.recv(0));
						System.out.println("recv response is : " + back);
					}
					
				}
				
			}).start();
		}
	}
	
	
	public static class Worker {
		private ZMQ.Context context;
		private ZMQ.Socket socket;
		private List<ZMsg> requests;
		
		public Worker() {
			this.context = ZMQ.context(1);
			this.socket = context.socket(ZMQ.DEALER);
			this.requests = new ArrayList<ZMsg>(); 
		}
		
		public void start() {
			new Thread(new Runnable(){

				public void run() {
					// TODO Auto-generated method stub
					socket.connect("ipc://back");
					for (int i = 0; i < 5; i++) {
						ZMsg msg = ZMsg.recvMsg(socket);
						requests.add(msg);
					}
					for (int i = 0; i < 5; i++) {
						ZMsg msg = requests.remove(0);
						ZFrame request = msg.removeLast();
						String now = new String(request.getData());
						System.out.println("recv request : " + now);
						ZFrame out = new ZFrame("world" + i);
						msg.addLast(out);
						msg.send(socket);
						
					}
					
				}
				
			}).start();
		}
	}
	
	public static void main(String args[]) {
		Worker worker = new Worker();
		Client client = new Client();
		Broker broker = new Broker();
		broker.start();
		worker.start();
		client.start();
	}
}

官方的guid的内容很多。。。介绍了各种各样的用法,接下来不打算再看guid了,觉得这些用法以后可能实际场合遇到了再去查也不迟。。。

接下来就开始看ZeroMQ的源码实现吧,当然看的是java版的。。。

抱歉!评论已关闭.