现在的位置: 首页 > 综合 > 正文

Zeromq

2018年12月11日 ⁄ 综合 ⁄ 共 5922字 ⁄ 字号 评论关闭

Zeromq的资源:

Zeromq模式:

http://blog.codingnow.com/2011/02/zeromq_message_patterns.html

zeromq主页:

http://www.zeromq.org/

Zeromq Guild:

http://zguide.zeromq.org/page:all#Fixing-the-World

Zeromq 中文简介:

http://blog.csdn.net/program_think/article/details/6687076

Zero wiki:

http://en.wikipedia.org/wiki/%C3%98MQ

zeromq系列:

http://iyuan.iteye.com/blog/972949

Zeromq资源阅读:

ØMQ(Zeromq) 是一个更为高效的传输层

优势是:

1 程序接口库是一个并发框架

2 在集群和超级计算机上表现得比TCP更快

3 通过inproc, IPC, TCP, 和 multicast进行传播消息

4 通过发散,订阅,流水线,请求的方式连接

5 对于不定规模的多核消息传输应用使用异步IO

6 有非常大并且活跃的开源社区

7 支持30+的语言

8 支持多种系统

 

Zeromq定义为“史上最快的消息队列”

从网络通信的角度看,它处于会话层之上,应用层之下。

ØMQ (ZeroMQ, 0MQ, zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry whole messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N
with patterns like fanout, pub-sub, task distribution, and request-reply. It's fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has
a score of language APIs and runs on most operating systems. ØMQ is from iMatix and is LGPL open source.

Zeromq中传递的数据格式是由用户自己负责,就是说如果server发送的string是有带"\0"的,那么client就必须要知道有这个

 

Pub_Sub模式。

the subscriber will always miss the first messages that the publisher sends. This is because as the subscriber connects to the publisher (something that takes a small but non-zero time), the publisher may already be sending messages out.

在这种模式下很可能发布者刚启动时发布的数据出现丢失,原因是用zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的)。官网给了两个解决方案;1,发布者sleep一会再发送数据(这个被标注成愚蠢的);2,使用proxy。

Zeromq示例:

1 获取例子

git clone --depth=1 git://github.com/imatix/zguide.git

2 服务器端:

(当服务器收到消息的时候,服务器回复“World”)

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<?php
    /*
    
Hello World server
    
Binds REP socket to
tcp://
*:5555
    
Expects "Hello" from client, replies with "World"
    *
@author Ian Barber <ian(dot)barber(at)gmail(dot)com>
    */
     
    $context =
new ZMQContext(1);
     
    // 
Socket to talk to clients
    $responder =
new ZMQSocket($context,
ZMQ::SOCKET_REP);
    $responder->bind("tcp://*:5555");
     
    while(true)
{
        // 
Wait for next request from client
        $request =
$responder->recv();
        printf
(
"Received
request: [%s]\n"
,
$request);
     
        // 
Do some 'work'
        sleep
(1);
     
        // 
Send reply back to client
        $responder->send("World");   
 
}

3 客户端:

(客户端发送消息)

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?php
    /*
    
Hello World client
    
Connects REQ socket to
tcp://localhost:5555
    
Sends "Hello" to server, expects "World" back
    *
@author Ian Barber <ian(dot)barber(at)gmail(dot)com>
    */
     
    $context =
new ZMQContext();
     
    // 
Socket to talk to server
    echo "Connecting
to hello world server…\n"
;
    $requester =
new ZMQSocket($context,
ZMQ::SOCKET_REQ);
    $requester->connect("tcp://localhost:5555");
     
    for($request_nbr =
0;
$request_nbr !=
10;
$request_nbr++)
{
        printf
(
"Sending
request %d…\n"
,
$request_nbr);
        $requester->send("Hello");
         
        $reply =
$requester->recv();
        printf
(
"Received
reply %d: [%s]\n"
,
$request_nbr,
$reply);
 
}
?
1
 

天气气候订阅系统:(pub-sub)

1 server端:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<?php
    /*
    
Weather update server
    
Binds PUB socket to
tcp://
*:5556
    
Publishes random weather updates
    *
@author Ian Barber <ian(dot)barber(at)gmail(dot)com>
    */
     
    // 
Prepare our context and publisher
    $context =
new ZMQContext();
    $publisher =
$context->getSocket(ZMQ::SOCKET_PUB);
    $publisher->bind("tcp://*:5556");
    $publisher->bind("ipc://weather.ipc");
     
    while (true)
{
        // 
Get values that will fool the boss
        $zipcode     =
mt_rand(0, 100000);
        $temperature =
mt_rand(-80, 135);
        $relhumidity =
mt_rand(10, 60);
     
        // 
Send message to all subscribers
        $update =
sprintf (
"%05d
%d %d"
,
$zipcode,
$temperature,
$relhumidity);
        $publisher->send($update);
    }

2 client端:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<?php
    /*
    
Weather update client
    
Connects SUB socket to
tcp://localhost:5556
    
Collects weather updates and finds avg temp in zipcode
    *
@author Ian Barber <ian(dot)barber(at)gmail(dot)com>
    */
     
    $context =
new ZMQContext();
     
    // 
Socket to talk to server
    echo "Collecting
updates from weather server…"
,
PHP_EOL;
    $subscriber =
new ZMQSocket($context,
ZMQ::SOCKET_SUB);
    $subscriber->connect("tcp://localhost:5556");
     
    // 
Subscribe to zipcode, default is NYC, 10001
    $filter =
$_SERVER['argc']
> 1 ?
$_SERVER['argv'][1]
:
"10001";
    $subscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE,
$filter);
     
    // 
Process 100 updates
    $total_temp =
0;
    for ($update_nbr =
0;
$update_nbr <
100;
$update_nbr++)
{
        $string =
$subscriber->recv();
        sscanf
(
$string,
"%d
%d %d"
,
$zipcode,
$temperature,
$relhumidity);
        $total_temp +=
$temperature;
    }
    printf
(
"Average
temperature for zipcode '%s' was %dF\n"
,
        $filter,
(int) (
$total_temp /
$update_nbr));
?
1
------------------------
?
1
pub-sub的proxy模式:
?
1
图示是:

clip_image001_thumb[2]

Proxy节点的代码:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<?php
    /*
    
Weather proxy device
    *
@author Ian Barber <ian(dot)barber(at)gmail(dot)com>
    */
     
    $context =
new ZMQContext();
     
    // 
This is where the weather server sits
    $frontend =
new ZMQSocket($context,
ZMQ::SOCKET_SUB);
    $frontend->connect("tcp://192.168.55.210:5556");
     
    // 
This is our public endpoint for subscribers
    $backend =
new ZMQSocket($context,
ZMQ::SOCKET_PUB);
    $backend->bind("tcp://10.1.1.0:8100");
     
    // 
Subscribe on everything
    $frontend->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE,
"");
     
    // 
Shunt messages out to our own subscribers
    while(true)
{
        while(true)
{
            // 
Process all parts of the message
            $message =
$frontend->recv();
            $more =
$frontend->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
            $backend->send($message,
$more ?
ZMQ::SOCKOPT_SNDMORE : 0);
            if(!$more)
{
                break;
//
Last message part
            }
        }
 
}

其实就是proxy同时是作为pub又作为sub的

    ----------------------

作者:yjf512(轩脉刃)

出处:http://www.cnblogs.com/yjf512/

本文版权归yjf512和cnBlog共有,欢迎转载,但未经作者同意必须保留此段声明

http://www.cnblogs.com/yjf512/archive/2012/03/03/2378024.html

抱歉!评论已关闭.