现在的位置: 首页 > 综合 > 正文

ZeroMQ Lazy Pirate Pattern

2018年10月07日 ⁄ 综合 ⁄ 共 2253字 ⁄ 字号 评论关闭

 服务端:

 

#include <assert.h>
#include <iostream>
#include <tchar.h>
#include <zmq.h>
using namespace std;

#pragma comment(lib,"libzmq.lib")

void main(int argc,TCHAR*argv[])
{
	void *ctx;
	ctx = zmq_init(1);
	assert(ctx);

	void *s;
	s = zmq_socket(ctx,ZMQ_REP);
	assert(s);

	int rc;
	rc = zmq_bind(s,"tcp://127.0.0.1:6000");
	assert(0 == rc);

	int nCycles = 0;
	zmq_msg_t msg;
	while(1)
	{
		char *re_string = NULL;
		zmq_msg_init(&msg);
		zmq_recv(s,&msg,0);
		re_string = (char*)zmq_msg_data(&msg);
		cout<<"main recv : "<<re_string<<endl;
		zmq_msg_close(&msg);

		nCycles++;

		//接收四次数据后,让服务器退出,模拟服务器出故障
// 		if(nCycles > 4)
// 		{
// 			cout<<"Simulate a crash"<<endl;
// 			break;
// 		}
		//接收三次数据后,让服务器返回数据变慢,模拟客户端超时收不到数据
		if(nCycles > 3)
		{
			cout<<"Simulate CPU overload"<<endl;
			Sleep(2000);
		}

		cout<<"Normal "<<endl;

		zmq_msg_init_size(&msg,strlen(re_string)+1);
		memcpy(zmq_msg_data(&msg),re_string,strlen(re_string)+1);
		zmq_send(s,&msg,0);
		zmq_msg_close(&msg);
	}


	zmq_close(s);
	zmq_term(ctx);
}

 

客户端:

#include <assert.h>
#include <iostream>
#include <tchar.h>
#include <zmq.h>
using namespace std;

#pragma comment(lib,"libzmq.lib")

void main(int argc,TCHAR*argv[])
{
	void *ctx;
	ctx = zmq_init(1);
	assert(ctx);

	void *s;
	s = zmq_socket(ctx,ZMQ_REQ);
	assert(s);

	int rc;
	rc = zmq_connect(s,"tcp://127.0.0.1:6000");
	assert(0 == rc);

   
	int left_retries = 3;

	int nsequence = 0;
	zmq_msg_t  msg;
	while(left_retries)
	{
		char sd_string[20] = {0};
		sprintf(sd_string,"num: %d",nsequence++);
		zmq_msg_init_size(&msg,strlen(sd_string)+1);
		memcpy(zmq_msg_data(&msg),sd_string,strlen(sd_string)+1);
		zmq_send(s,&msg,0);
		zmq_msg_close(&msg);
		Sleep(1000);

		bool expect_reply = true;
 		while(expect_reply)
 		{
			zmq_pollitem_t items = {s,0,ZMQ_POLLIN,0};
			zmq_poll(&items,1,3000);

			if(items.revents & ZMQ_POLLIN)
			{
				char *re_string = NULL;
				zmq_msg_init(&msg);
				zmq_recv(s,&msg,0);
				re_string = (char*)zmq_msg_data(&msg);
				zmq_msg_close(&msg);

				if(0 == strcmp(sd_string,re_string))
				{
					cout<<"server reply ok : "<<re_string<<endl;
					left_retries = 3;
					expect_reply = false;
				}
				else
				{
					cout<<"recv error data from server"<<endl;
				}

 			}
			else if(--left_retries == 0)
			{
				cout<<"Server offline,abandonint"<<endl;
				expect_reply = false;
				break;
			}
			else
			{
				cout<<"no response from server,retrying..."<<endl;
				zmq_close(s);
				s = zmq_socket(ctx,ZMQ_REQ);
				assert(s);

				rc = zmq_connect(s,"tcp://127.0.0.1:6000");
				assert(0 == rc);

				zmq_msg_init_size(&msg,strlen(sd_string)+1);
				memcpy(zmq_msg_data(&msg),sd_string,strlen(sd_string)+1);
				zmq_send(s,&msg,0);
				zmq_msg_close(&msg);
			}
		}
		
	}


	zmq_close(s);
	zmq_term(ctx);
}

 

【上篇】
【下篇】

抱歉!评论已关闭.