服务端:
#include <assert.h> #include <iostream> #include <tchar.h> #include <zmq.h> using namespace std; #pragma comment(lib,"libzmq.lib") void main(int argc,TCHAR*argv[]) { void *ctx; ctx = zmq_init(1); assert(ctx); void *s; s = zmq_socket(ctx,ZMQ_REP); assert(s); int rc; rc = zmq_bind(s,"tcp://127.0.0.1:6000"); assert(0 == rc); int nCycles = 0; zmq_msg_t msg; while(1) { char *re_string = NULL; zmq_msg_init(&msg); zmq_recv(s,&msg,0); re_string = (char*)zmq_msg_data(&msg); cout<<"main recv : "<<re_string<<endl; zmq_msg_close(&msg); nCycles++; //接收四次数据后,让服务器退出,模拟服务器出故障 // if(nCycles > 4) // { // cout<<"Simulate a crash"<<endl; // break; // } //接收三次数据后,让服务器返回数据变慢,模拟客户端超时收不到数据 if(nCycles > 3) { cout<<"Simulate CPU overload"<<endl; Sleep(2000); } cout<<"Normal "<<endl; zmq_msg_init_size(&msg,strlen(re_string)+1); memcpy(zmq_msg_data(&msg),re_string,strlen(re_string)+1); zmq_send(s,&msg,0); zmq_msg_close(&msg); } zmq_close(s); zmq_term(ctx); }
客户端:
#include <assert.h> #include <iostream> #include <tchar.h> #include <zmq.h> using namespace std; #pragma comment(lib,"libzmq.lib") void main(int argc,TCHAR*argv[]) { void *ctx; ctx = zmq_init(1); assert(ctx); void *s; s = zmq_socket(ctx,ZMQ_REQ); assert(s); int rc; rc = zmq_connect(s,"tcp://127.0.0.1:6000"); assert(0 == rc); int left_retries = 3; int nsequence = 0; zmq_msg_t msg; while(left_retries) { char sd_string[20] = {0}; sprintf(sd_string,"num: %d",nsequence++); zmq_msg_init_size(&msg,strlen(sd_string)+1); memcpy(zmq_msg_data(&msg),sd_string,strlen(sd_string)+1); zmq_send(s,&msg,0); zmq_msg_close(&msg); Sleep(1000); bool expect_reply = true; while(expect_reply) { zmq_pollitem_t items = {s,0,ZMQ_POLLIN,0}; zmq_poll(&items,1,3000); if(items.revents & ZMQ_POLLIN) { char *re_string = NULL; zmq_msg_init(&msg); zmq_recv(s,&msg,0); re_string = (char*)zmq_msg_data(&msg); zmq_msg_close(&msg); if(0 == strcmp(sd_string,re_string)) { cout<<"server reply ok : "<<re_string<<endl; left_retries = 3; expect_reply = false; } else { cout<<"recv error data from server"<<endl; } } else if(--left_retries == 0) { cout<<"Server offline,abandonint"<<endl; expect_reply = false; break; } else { cout<<"no response from server,retrying..."<<endl; zmq_close(s); s = zmq_socket(ctx,ZMQ_REQ); assert(s); rc = zmq_connect(s,"tcp://127.0.0.1:6000"); assert(0 == rc); zmq_msg_init_size(&msg,strlen(sd_string)+1); memcpy(zmq_msg_data(&msg),sd_string,strlen(sd_string)+1); zmq_send(s,&msg,0); zmq_msg_close(&msg); } } } zmq_close(s); zmq_term(ctx); }