前面见到过的Request和Response类型的socket都是同步的:
例如Request,必须先要执行send,将请求发送出去,接着执行recv等待返回。。。
这种最大的好处就是编程代码逻辑简单,坏处就是伸缩性比较差。。。
于是就有了实现异步Request/Response的需求。。。那接下来来看看在ZeroMQ中是如何实现的吧:
这个是官方的guid文档上面给出的构原理,可以从图中看到这里面全是采用的异步类型的socket,Router与Dealer,
这里用Dealer来充当client,同时用于处理请求的worker也是dealer类型的,
哎,其实ZeroMQ只是提供了一种基础的通信模式,可以在此基础上灵活的使用,满足各种条件的要求,这里就和直接将上面的实现代码贴出来吧:
package asyncReqRep; import java.util.ArrayList; import java.util.List; import org.zeromq.ZFrame; import org.zeromq.ZMQ; import org.zeromq.ZMsg; public class Main { public static class Broker{ private ZMQ.Socket front; private ZMQ.Socket back; private ZMQ.Context context; public Broker() { this.context = ZMQ.context(1); this.front = this.context.socket(ZMQ.ROUTER); this.back = this.context.socket(ZMQ.DEALER); } public void start() { new Thread(new Runnable(){ public void run() { // TODO Auto-generated method stub front.bind("ipc://front"); back.bind("ipc://back"); ZMQ.Poller poller = new ZMQ.Poller(2); ZMQ.PollItem fItem = new ZMQ.PollItem(front, ZMQ.Poller.POLLIN); ZMQ.PollItem bItem =new ZMQ.PollItem(back, ZMQ.Poller.POLLIN); poller.register(fItem); poller.register(bItem); while (!Thread.currentThread().isInterrupted()) { poller.poll(); if (fItem.isReadable()) { ZMsg msg = ZMsg.recvMsg(fItem.getSocket()); msg.send(back); } if (bItem.isReadable()) { ZMsg msg = ZMsg.recvMsg(bItem.getSocket()); msg.send(front); } } } }).start(); } } public static class Client { private ZMQ.Context context; private ZMQ.Socket socket; public Client() { this.context = ZMQ.context(1); this.socket = context.socket(ZMQ.DEALER); } public void start() { new Thread(new Runnable(){ public void run() { // TODO Auto-generated method stub socket.connect("ipc://front"); for (int i = 0; i < 5; i++) { String now = "hello" + i; socket.send(now.getBytes(), 0); } for (int i = 0; i < 5; i++) { String back = new String(socket.recv(0)); System.out.println("recv response is : " + back); } } }).start(); } } public static class Worker { private ZMQ.Context context; private ZMQ.Socket socket; private List<ZMsg> requests; public Worker() { this.context = ZMQ.context(1); this.socket = context.socket(ZMQ.DEALER); this.requests = new ArrayList<ZMsg>(); } public void start() { new Thread(new Runnable(){ public void run() { // TODO Auto-generated method stub socket.connect("ipc://back"); for (int i = 0; i < 5; i++) { ZMsg msg = ZMsg.recvMsg(socket); requests.add(msg); } for (int i = 0; i < 5; i++) { ZMsg msg = requests.remove(0); ZFrame request = msg.removeLast(); String now = new String(request.getData()); System.out.println("recv request : " + now); ZFrame out = new ZFrame("world" + i); msg.addLast(out); msg.send(socket); } } }).start(); } } public static void main(String args[]) { Worker worker = new Worker(); Client client = new Client(); Broker broker = new Broker(); broker.start(); worker.start(); client.start(); } }
官方的guid的内容很多。。。介绍了各种各样的用法,接下来不打算再看guid了,觉得这些用法以后可能实际场合遇到了再去查也不迟。。。
接下来就开始看ZeroMQ的源码实现吧,当然看的是java版的。。。