ZeroMQ 规定了几种固定的消息模式(pattern),socket 必须按照这些模式配对使用,不能随意混用。
publish/subscribe:一对多(one-to-many)的消息发布模式,一个 publisher 对应多个 subscriber。
如果pub发布消息的时候,sub没有连接上来,则此消息,sub是收不到的。
注意:subscriber 初始化的时候一定要通过 zmq_setsockopt 设置 ZMQ_SUBSCRIBE 过滤器(filter),否则收不到任何消息;把 filter 设为空字符串(长度为 0)表示订阅所有消息。
ServerPub端:
#include <assert.h> #include <iostream> #include <tchar.h> #include <zmq.h> using namespace std; #pragma comment(lib,"libzmq.lib") static int g_send(void *socket,char *str) { zmq_msg_t msg; int rc; rc = zmq_msg_init_size(&msg,strlen(str)+1); assert(0 == rc); memcpy(zmq_msg_data(&msg),str,strlen(str)+1); rc = zmq_send(socket,&msg,0); assert(0 == rc); zmq_msg_close(&msg); return rc; } static char* g_recv(void *socket) { zmq_msg_t msg; int rc; rc = zmq_msg_init(&msg); assert(0 == rc); rc = zmq_recv(socket,&msg,0); assert(0 == rc); int nSize = zmq_msg_size(&msg); char *pBuf = new char[nSize+1]; memcpy(pBuf,zmq_msg_data(&msg),nSize); pBuf[nSize] = 0; zmq_msg_close(&msg); return pBuf; } void sub_thread(void *ctx) { void *subsocket = zmq_socket(ctx,ZMQ_SUB); zmq_connect(subsocket,"inproc://in-pub"); zmq_setsockopt(subsocket,ZMQ_SUBSCRIBE,"",0); //no filter cout<<"begin in-sub"<<endl; while(1) { char *pStr = g_recv(subsocket); cout<<"in-sub:"<<pStr<<endl; free(pStr); } } void main(int argc,TCHAR*argv[]) { void *ctx; ctx = zmq_init(1); assert(ctx); void *pubsocket; pubsocket = zmq_socket(ctx,ZMQ_PUB); assert(pubsocket); zmq_bind(pubsocket,"inproc://in-pub"); zmq_bind(pubsocket,"tcp://127.0.0.1:8888"); HANDLE hThread; hThread = CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)sub_thread,ctx,0,NULL); cout<<"begin pu"<<endl; int i=0; while(1) { char msg[80]; sprintf(msg,"num %d",i++); g_send(pubsocket,msg); Sleep(2000); } zmq_close(pubsocket); zmq_term(ctx); }
ClientSub端:
#include <assert.h> #include <iostream> #include <tchar.h> #include <zmq.h> using namespace std; #pragma comment(lib,"libzmq.lib") static int g_send(void *socket,char *str) { zmq_msg_t msg; int rc; rc = zmq_msg_init_size(&msg,strlen(str)+1); assert(0 == rc); memcpy(zmq_msg_data(&msg),str,strlen(str)+1); rc = zmq_send(socket,&msg,0); assert(0 == rc); zmq_msg_close(&msg); return rc; } static char* g_recv(void *socket) { zmq_msg_t msg; int rc; rc = zmq_msg_init(&msg); assert(0 == rc); rc = zmq_recv(socket,&msg,0); assert(0 == rc); int nSize = zmq_msg_size(&msg); char *pBuf = new char[nSize+1]; memcpy(pBuf,zmq_msg_data(&msg),nSize); pBuf[nSize] = 0; zmq_msg_close(&msg); return pBuf; } void sub_thread(void *ctx) { void *subsocket; subsocket = zmq_socket(ctx,ZMQ_SUB); assert(subsocket); zmq_connect(subsocket,"tcp://127.0.0.1:8888"); zmq_setsockopt(subsocket,ZMQ_SUBSCRIBE,"",0); while(1) { char *pStr = g_recv(subsocket); cout<<"client in-sub:"<<pStr<<endl; free(pStr); } } void main(int argc,TCHAR*argv[]) { void *ctx; ctx = zmq_init(1); assert(ctx); void *subsocket; subsocket = zmq_socket(ctx,ZMQ_SUB); assert(subsocket); HANDLE hThread; hThread = CreateThread(NULL,1,(LPTHREAD_START_ROUTINE)sub_thread,ctx,0,NULL); zmq_connect(subsocket,"tcp://127.0.0.1:8888"); zmq_setsockopt(subsocket,ZMQ_SUBSCRIBE,"",0); cout<<"begin sub"<<endl; while(1) { char *pStr = g_recv(subsocket); cout<<"client sub:"<<pStr<<endl; free(pStr); } zmq_close(subsocket); zmq_term(ctx); }