现在的位置: 首页 > 综合 > 正文

.[ZeroMQ] messaging pattern — publish/subscribe

2018年10月07日 ⁄ 综合 ⁄ 共 2855字 ⁄ 字号 评论关闭

 

ZeroMQ 中规定了几个pattern用法,除此之外,不能乱用。
publish/subscribe, 一个one-to-many的消息发布。一个publisher,多个subscriber。
如果pub发布消息的时候,sub没有连接上来,则此消息,sub是收不到的。
注意:subscribe 初始化的时候一定要设置filter,否则收不到任何消息。
 
ServerPub端:
#include <assert.h>
#include <iostream>
#include <tchar.h>
#include <zmq.h>
using namespace std;

#pragma comment(lib,"libzmq.lib")

static int g_send(void *socket,char *str)
{
	zmq_msg_t msg;
	int rc;

	rc = zmq_msg_init_size(&msg,strlen(str)+1);
	assert(0 == rc);

	memcpy(zmq_msg_data(&msg),str,strlen(str)+1);

	rc = zmq_send(socket,&msg,0);
	assert(0 == rc);

	zmq_msg_close(&msg);

	return rc;
}

static char* g_recv(void *socket)
{
	zmq_msg_t msg;
	int rc;

	rc = zmq_msg_init(&msg);
	assert(0 == rc);

	rc = zmq_recv(socket,&msg,0);
	assert(0 == rc);

	int nSize = zmq_msg_size(&msg);
	char *pBuf = new char[nSize+1];
	memcpy(pBuf,zmq_msg_data(&msg),nSize);
	pBuf[nSize] = 0;

	zmq_msg_close(&msg);

	return pBuf;
}

void sub_thread(void *ctx)
{
	void *subsocket = zmq_socket(ctx,ZMQ_SUB);
	zmq_connect(subsocket,"inproc://in-pub");
	zmq_setsockopt(subsocket,ZMQ_SUBSCRIBE,"",0); //no filter

	cout<<"begin in-sub"<<endl;
	while(1)
	{
		char *pStr = g_recv(subsocket);
		cout<<"in-sub:"<<pStr<<endl;
		free(pStr);
	}
}

void main(int argc,TCHAR*argv[])
{
	void *ctx;
	ctx = zmq_init(1);
	assert(ctx);

	void *pubsocket;
	pubsocket = zmq_socket(ctx,ZMQ_PUB);
	assert(pubsocket);

	zmq_bind(pubsocket,"inproc://in-pub");
	zmq_bind(pubsocket,"tcp://127.0.0.1:8888");

	HANDLE hThread;
	hThread = CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)sub_thread,ctx,0,NULL);

	cout<<"begin pu"<<endl;
	int i=0;
	while(1)
	{
		char msg[80];
		sprintf(msg,"num %d",i++);
		g_send(pubsocket,msg);
		Sleep(2000);
	}

	zmq_close(pubsocket);
	zmq_term(ctx);

}

ClientSub端:

#include <assert.h>
#include <iostream>
#include <tchar.h>
#include <zmq.h>
using namespace std;

#pragma comment(lib,"libzmq.lib")

static int g_send(void *socket,char *str)
{
	zmq_msg_t msg;
	int rc;

	rc = zmq_msg_init_size(&msg,strlen(str)+1);
	assert(0 == rc);

	memcpy(zmq_msg_data(&msg),str,strlen(str)+1);

	rc = zmq_send(socket,&msg,0);
	assert(0 == rc);

	zmq_msg_close(&msg);

	return rc;
}

static char* g_recv(void *socket)
{
	zmq_msg_t msg;
	int rc;

	rc = zmq_msg_init(&msg);
	assert(0 == rc);

	rc = zmq_recv(socket,&msg,0);
	assert(0 == rc);

	int nSize = zmq_msg_size(&msg);
	char *pBuf = new char[nSize+1];
	memcpy(pBuf,zmq_msg_data(&msg),nSize);
	pBuf[nSize] = 0;

	zmq_msg_close(&msg);

	return pBuf;
}

void sub_thread(void *ctx)
{
	void *subsocket;
	subsocket = zmq_socket(ctx,ZMQ_SUB);
	assert(subsocket);

	zmq_connect(subsocket,"tcp://127.0.0.1:8888");
	zmq_setsockopt(subsocket,ZMQ_SUBSCRIBE,"",0);

	while(1)
	{
		char *pStr = g_recv(subsocket);
		cout<<"client in-sub:"<<pStr<<endl;
		free(pStr);
	}
}

void main(int argc,TCHAR*argv[])
{
	void *ctx;
	ctx = zmq_init(1);
	assert(ctx);

	void *subsocket;
	subsocket = zmq_socket(ctx,ZMQ_SUB);
	assert(subsocket);

	HANDLE hThread;
	hThread = CreateThread(NULL,1,(LPTHREAD_START_ROUTINE)sub_thread,ctx,0,NULL);

	zmq_connect(subsocket,"tcp://127.0.0.1:8888");
	zmq_setsockopt(subsocket,ZMQ_SUBSCRIBE,"",0);

	cout<<"begin sub"<<endl;
	while(1)
	{
		char *pStr = g_recv(subsocket);
		cout<<"client sub:"<<pStr<<endl;
		free(pStr);
	}

	zmq_close(subsocket);
	zmq_term(ctx);
}

抱歉!评论已关闭.