数据可能很长,比如一个大文件,所以可以将数据分成多个 message 来发送,但逻辑上知道这些 message 应该合并成一个处理。
写了个小程序,fclient 上传一个文件给 fserver。
FSever端:
#include <assert.h> #include <iostream> #include <tchar.h> #include <zmq.h> using namespace std; #pragma comment(lib,"libzmq.lib") void recv_file(void *s) { zmq_msg_t msg; //Recv int rc; rc = zmq_msg_init(&msg); assert(0 == rc); rc = zmq_recv(s,&msg,0); const char *re_string; re_string = (const char*)zmq_msg_data(&msg); FILE *fp = fopen(re_string,"wb+"); if(!fp) { cout<<"文件打开失败."<<endl; return; } zmq_msg_close(&msg); //Send const char* se_string = "ok"; rc = zmq_msg_init_size(&msg,strlen(se_string)+1); assert(0 == rc); memcpy(zmq_msg_data(&msg),se_string,strlen(se_string)+1); rc = zmq_send(s,&msg,0); assert(0 == rc); zmq_msg_close(&msg); //Recv INT64 more; size_t more_size; while(1) { rc = zmq_msg_init(&msg); assert(0 == rc); rc = zmq_recv(s,&msg,0); assert(0 == rc); // re_string = (const char*)zmq_msg_data(&msg); // cout<<re_string<<endl; fwrite((const char*)zmq_msg_data(&msg),zmq_msg_size(&msg),1,fp); zmq_msg_close(&msg); zmq_getsockopt(s,ZMQ_RCVMORE,&more,&more_size); if(!more) break; } //Send rc = zmq_msg_init_size(&msg,strlen(se_string)+1); assert(0 == rc); memcpy(zmq_msg_data(&msg),se_string,strlen(se_string)+1); rc = zmq_send(s,&msg,0); assert(0 == rc); zmq_msg_close(&msg); //Close file fclose(fp); puts("done."); } void main(int argc,TCHAR*argv[]) { void *ctx; ctx = zmq_init(1); assert(ctx); void *s; s = zmq_socket(ctx,ZMQ_REP); assert(s); int rc; rc = zmq_bind(s,"tcp://*:5555"); assert(0 == rc); while(1) { recv_file(s); } }
FClient端:
#include <assert.h> #include <iostream> #include <tchar.h> #include <zmq.h> using namespace std; #pragma comment(lib,"libzmq.lib") #define ONE_BUFFER_SIZE 1024 void myFree(void *data,void *hint) { cout<<"MyFree"<<endl; free(data); } void main(int argc,char*argv[]) { const char* se_string = "111.bmp"; FILE *fp = fopen(se_string,"rb"); if(!fp) { cout<<"打开文件失败."<<endl; return; } //Init ctx and socket void *ctx; ctx = zmq_init(1); assert(ctx); void *s; s = zmq_socket(ctx,ZMQ_REQ); assert(s); int rc; rc = zmq_connect(s,"tcp://127.0.0.1:5555"); assert(0 == rc); //Send zmq_msg_t msg; rc = zmq_msg_init_size(&msg,strlen(se_string)+1); assert(0 == rc); memcpy(zmq_msg_data(&msg),se_string,strlen(se_string)+1); rc = zmq_send(s,&msg,0); assert(0 == rc); zmq_msg_close(&msg); //Recv rc = zmq_msg_init(&msg); assert(0 == rc); rc = zmq_recv(s,&msg,0); assert(0 == rc); const char* re_string; re_string = (const char*)zmq_msg_data(&msg); cout<<re_string<<endl; zmq_msg_close(&msg); //Send char *pBuf; int n = 0; while(1) { pBuf = (char*)malloc(ONE_BUFFER_SIZE); n = fread(pBuf,1,ONE_BUFFER_SIZE,fp); if(n < ONE_BUFFER_SIZE) { cout<<"client < 1024"<<endl; // rc = zmq_msg_init_size(&msg,strlen(se_string)+1); // assert(0 == rc); // // memcpy(zmq_msg_data(&msg),se_string,strlen(se_string)+1); rc = zmq_msg_init_data(&msg,pBuf,ONE_BUFFER_SIZE,myFree,NULL); assert(0 == rc); rc = zmq_send(s,&msg,0); assert(0 == rc); zmq_msg_close(&msg); break; } // rc = zmq_msg_init_size(&msg,strlen(se_string)+1); // assert(0 == rc); // // memcpy(zmq_msg_data(&msg),se_string,strlen(se_string)+1); rc = zmq_msg_init_data(&msg,pBuf,ONE_BUFFER_SIZE,myFree,NULL); assert(0 == rc); rc = zmq_send(s,&msg,ZMQ_SNDMORE); assert(0 == rc); zmq_msg_close(&msg); } //Recv rc = zmq_msg_init(&msg); assert(0 == rc); rc = zmq_recv(s,&msg,0); assert(0 == rc); re_string = (const char*)zmq_msg_data(&msg); cout<<re_string<<endl; zmq_msg_close(&msg); //Close ctx and socket zmq_close(s); zmq_term(ctx); fclose(fp); puts("done.client"); }