现在的位置: 首页 > 综合 > 正文

分布式任務分發模型

2012年09月01日 ⁄ 综合 ⁄ 共 2044字 ⁄ 字号 评论关闭

分布式任務分發模型
zeromq + gevent實現
dispatcher:分派任務
worker:  任務執行者
recver:  任務結果收集

       dispatcher(push)
      /        |          \
worker worker worker(pull, push to recver)
     \         |          /
           recver(pull)

dispatcher.py

import zmq  
import random  
import time  
  
context = zmq.Context()  
  
# Socket to send messages on  
sender = context.socket(zmq.PUSH)  
sender.bind("tcp://*:5557")  
  
print "Press Enter when the workers are ready: "  
_ = raw_input()  
print "Sending tasks to workers..."  
  
# The first message is "0" and signals start of batch  
#sender.send('0')  
  
# Initialize random number generator  
random.seed()  
  
# Send 100 tasks
#url = "http://172.17.9.9/PortalManager/image/net.jpg"
url = "http://google.com.hk"
total_msec = 0  
for task_nbr in range(10):
    if task_nbr == 0:
        sender.send("http://www.facebook.com/")
        #sender.send(url)
    else:
        sender.send(url)
    
print "Total expected cost: %s msec" % total_msec  

worker.py

import sys  
import time
from gevent import pool, queue
from gevent_zeromq import zmq
import gevent
import urllib2

from gevent import monkey
monkey.patch_all()
print "patch all"

context = zmq.Context()  
  
# Socket to receive messages on  
receiver = context.socket(zmq.PULL)  
receiver.connect("tcp://localhost:5557")  
  
# Socket to send messages to  
sender = context.socket(zmq.PUSH)  
sender.connect("tcp://localhost:5558")  
  
#taskpool = pool.Pool()  
qin = queue.Queue(0)
  
def down(url):
    f = urllib2.urlopen(url)  
    data = f.read()
    f.close()
    l = len(data)
    print "down:%s len:%d" % (url, l)
    # Do the work  
    #time.sleep(int(s)*0.001)    
    # Send results to sink  
    sender.send("down:%s len:%d" % (url, l))    

def do_job():
    while True:
        url = qin.get()
        try:
            down(url)
        except:
            print "error..."

def recv():
    while True:  
        url = receiver.recv()
        qin.put(url)
  
# Process tasks forever
g1 = gevent.spawn(recv)
g2 = gevent.spawn(do_job)
g3 = gevent.spawn(do_job)
g4 = gevent.spawn(do_job)

gevent.joinall([g1, g2, g4])
    
    

recver.py

import sys  
import time  
import zmq
 
context = zmq.Context()  
  
# Socket to receive messages on  
receiver = context.socket(zmq.PULL)  
receiver.bind("tcp://*:5558")  
  
# Wait for start of batch  
s = receiver.recv()  
  
# Start our clock now  
tstart = time.time()  
  
# Process 100 confirmations  
total_msec = 0  
for task_nbr in range(100):  
    s = receiver.recv()  
    print  s
  
# Calculate and report duration of batch  
tend = time.time()  
print "Total elapsed time: %d msec" % ((tend-tstart)*1000)  

【上篇】
【下篇】

抱歉!评论已关闭.