现在的位置: 首页 > 综合 > 正文

[ZeroMQ] multipart message

2018年10月07日 ⁄ 综合 ⁄ 共 3206字 ⁄ 字号 评论关闭

数据可能很长,比如一个大文件,所以可以将数据拆分成多个 message(multipart message)来发送,而接收方在逻辑上仍把这些 message 合并成一个整体来处理。

写了个小程序,fclient 上传一个文件给 fserver。
 
FServer端:
#include <assert.h>
#include <iostream>
#include <tchar.h>
#include <zmq.h>
using namespace std;

#pragma comment(lib,"libzmq.lib")


void recv_file(void *s)
{
	zmq_msg_t msg;

	//Recv
	int rc;
	rc = zmq_msg_init(&msg);
	assert(0 == rc);

	rc = zmq_recv(s,&msg,0);

	const char *re_string;
	re_string = (const char*)zmq_msg_data(&msg);
	FILE *fp = fopen(re_string,"wb+");
	if(!fp)
	{
		cout<<"文件打开失败."<<endl;
		return;
	}

	zmq_msg_close(&msg);

    //Send
	const char* se_string = "ok";
	rc = zmq_msg_init_size(&msg,strlen(se_string)+1);
	assert(0 == rc);

	memcpy(zmq_msg_data(&msg),se_string,strlen(se_string)+1);

    rc = zmq_send(s,&msg,0);
	assert(0 == rc);

	zmq_msg_close(&msg);


	//Recv
	INT64 more;
	size_t more_size;
	while(1)
	{
		rc = zmq_msg_init(&msg);
		assert(0 == rc);

		rc = zmq_recv(s,&msg,0);
		assert(0 == rc);
 
// 		re_string = (const char*)zmq_msg_data(&msg);
// 		cout<<re_string<<endl;
		fwrite((const char*)zmq_msg_data(&msg),zmq_msg_size(&msg),1,fp);

		zmq_msg_close(&msg);

		zmq_getsockopt(s,ZMQ_RCVMORE,&more,&more_size);
		if(!more)
			break;
	}


	//Send
	rc = zmq_msg_init_size(&msg,strlen(se_string)+1);
	assert(0 == rc);

	memcpy(zmq_msg_data(&msg),se_string,strlen(se_string)+1);

	rc = zmq_send(s,&msg,0);
	assert(0 == rc);

	zmq_msg_close(&msg);

    //Close file
	fclose(fp);
	puts("done.");
}


// Server entry point: creates a ZeroMQ context with one I/O thread, binds a
// REP socket to tcp://*:5555 and handles uploads one after another, forever.
// Declared as `int main(int, char*[])`: `void main` is not standard C++, and a
// TCHAR-based argv turns into wchar_t under UNICODE builds, which is not a
// valid signature for `main`.
int main(int argc,char*argv[])
{
	// Command-line arguments are not used.
	(void)argc;
	(void)argv;

	void *ctx = zmq_init(1);
	assert(ctx);

	void *s = zmq_socket(ctx,ZMQ_REP);
	assert(s);

	int rc = zmq_bind(s,"tcp://*:5555");
	assert(0 == rc);

	// Each iteration serves exactly one complete file upload.
	while(1)
	{
		recv_file(s);
	}
}

FClient端:

#include <assert.h>
#include <iostream>
#include <tchar.h>
#include <zmq.h>
using namespace std;

#pragma comment(lib,"libzmq.lib")

// Size in bytes of the buffer allocated for each chunk read from the file;
// each chunk is sent as one part of the multipart message.
#define ONE_BUFFER_SIZE 1024

// Release callback passed to zmq_msg_init_data: logs "MyFree" on stdout and
// frees the malloc'ed chunk buffer. `hint` is not used.
void myFree(void *data,void *hint)
{
	(void)hint;

	cout << "MyFree";
	cout.put('\n');
	cout.flush();

	free(data);
}

// Client entry point: uploads the local file "111.bmp" to the server at
// tcp://127.0.0.1:5555 over a REQ socket.
//   1. send the NUL-terminated file name and print the server's reply,
//   2. send the file contents as one multipart message, one part per
//      ONE_BUFFER_SIZE-byte chunk,
//   3. print the server's final reply.
// Returns 1 if the file cannot be opened, 0 otherwise.
int main(int argc,char*argv[])
{
	// Command-line arguments are not used.
	(void)argc;
	(void)argv;

	const char* se_string = "111.bmp";
	FILE *fp = fopen(se_string,"rb");
	if(!fp)
	{
		cout<<"打开文件失败."<<endl;
		return 1;
	}

	//Init ctx and socket
	void *ctx = zmq_init(1);
	assert(ctx);

	void *s = zmq_socket(ctx,ZMQ_REQ);
	assert(s);

	int rc = zmq_connect(s,"tcp://127.0.0.1:5555");
	assert(0 == rc);

	//Send: file name, including the terminating NUL
	zmq_msg_t msg;
	rc = zmq_msg_init_size(&msg,strlen(se_string)+1);
	assert(0 == rc);

	memcpy(zmq_msg_data(&msg),se_string,strlen(se_string)+1);

	rc = zmq_send(s,&msg,0);
	assert(0 == rc);

	zmq_msg_close(&msg);

	//Recv: acknowledgement of the file name
	rc = zmq_msg_init(&msg);
	assert(0 == rc);

	rc = zmq_recv(s,&msg,0);
	assert(0 == rc);

	const char* re_string = (const char*)zmq_msg_data(&msg);
	cout<<re_string<<endl;

	zmq_msg_close(&msg);

	//Send: file contents, one message part per chunk
	while(1)
	{
		// A fresh buffer per chunk: ownership passes to the message, which
		// releases it through myFree once the data is no longer needed.
		char *pBuf = (char*)malloc(ONE_BUFFER_SIZE);
		assert(pBuf);

		const size_t n = fread(pBuf,1,ONE_BUFFER_SIZE,fp);

		// A short read means end of file (or a read error): this is the
		// final part, so it is sent without ZMQ_SNDMORE.
		const bool last_part = (n < ONE_BUFFER_SIZE);
		if(last_part)
			cout<<"client < 1024"<<endl;

		// Send exactly the n bytes that were read; using the full buffer
		// size here would append uninitialized bytes to the uploaded file.
		rc = zmq_msg_init_data(&msg,pBuf,n,myFree,NULL);
		assert(0 == rc);

		rc = zmq_send(s,&msg,last_part ? 0 : ZMQ_SNDMORE);
		assert(0 == rc);

		zmq_msg_close(&msg);

		if(last_part)
			break;
	}

	//Recv: final acknowledgement
	rc = zmq_msg_init(&msg);
	assert(0 == rc);

	rc = zmq_recv(s,&msg,0);
	assert(0 == rc);

	re_string = (const char*)zmq_msg_data(&msg);
	cout<<re_string<<endl;

	zmq_msg_close(&msg);

	//Close ctx and socket
	zmq_close(s);
	zmq_term(ctx);

	fclose(fp);
	puts("done.client");
	return 0;
}

抱歉!评论已关闭.