现在的位置: 首页 > 综合 > 正文

python使用amqp的例子

2012年08月11日 ⁄ 综合 ⁄ 共 6665字 ⁄ 字号 评论关闭

web程序需要给IPHONE手机推送消息,移动端同事写好了一个LIB,但是这个LIB使用的是阻塞IO,在APPLE服务器返回前程序是阻塞的,他们用了多线程来解决这个问题,但是webpy运行在apache里,无法进行线程管理,所以就迫切需要一个异步的机制来解决这个问题,希望做到需要发送消息时。调用一个函数,把数据扔进管道或者消息队列后就立即返回,也不管数据是否真的出去了,它相当于生产者,再有一个程序从管道或者队列中读取,进行实际的发送,相当于消费者

但是python似乎只封装了IPC(UNIX/LINUX平台进程通信规范)的匿名管道,命名管道的API,并无消息队列,而且管道似乎不易操作,因此想使用第三方的消息队列工具,那么我老大进行了技术选型,最终选择了amqp,这个消息队列组件使用erlang编写,启动了一个socket服务器,程序通过socket进行入队和出队的操作,不过这个组件应该是提供了大量的库隐藏了通信的部分,使代码看起来就像调用函数进行队列操作

 

client.py  测试客户端 向队列写入消息

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#client.py

import sys
import time

import json
from amqplib import client_0_8 as amqp

conn = amqp.Connection(
    host="localhost:5672",
    userid="guest",
    password="guest",
    virtual_host="/",
    insist=False)
chan = conn.channel()

i = 0
while 1:
    #msg = amqp.Message('Message %d' % i)
    
    #笔记更新
    '''
    data = {
        "noteId" : 1,
        "recvUserId" : 2,
        "title" : "test",
        "updateUserName":"你好",
        "remindCount":10,
        "projectName":"11",
        "token":"b258f5e3809017e371009f32eba7d72fcf51165406ce011951811b53db15b414",
        "isPushed":0,
        "messageType":"NoteUpdated",
    }
    '''

    '''
    data = {
        "projectId": 1,
        "recvUserId": 2,
        "reqUserName": "testUser",
        "reqUserStatus": "pending",
        "remindCount": 10,
        "projectName":"11",
        "token":"b258f5e3809017e371009f32eba7d72fcf51165406ce011951811b53db15b414",
        "messageType":"Apply",
    }
    '''

    data = {
        "projectId": 1,
        "recvUserId": 2,
        "reqUserName": "testUser",
        "reqUserStatus": "",
        "remindCount": 10,
        "projectName": "11",
        "token":"b258f5e3809017e371009f32eba7d72fcf51165406ce011951811b53db15b414",
        "messageType":"Review",
    }

    s = json.dumps(data,ensure_ascii=False)
    print s
    msg = amqp.Message(s)

    msg.properties["delivery_mode"] = 2

    chan.basic_publish(msg,
        exchange="sorting_room",
        routing_key="testkey")
    i += 1
    time.sleep(1)
    break

chan.close()
conn.close()

 

 

server.py  读取队列

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from amqplib import client_0_8 as amqp
import process 

conn = amqp.Connection(
    host="localhost:5672",
    userid="guest",
    password="guest",
    virtual_host="/",
    insist=False)
chan = conn.channel()

chan.queue_declare(
    queue="po_box",
    durable=True,
    exclusive=False,
    auto_delete=False)
chan.exchange_declare(
    exchange="sorting_room",
    type="direct",
    durable=True,
    auto_delete=False,)

chan.queue_bind(
    queue="po_box",
    exchange="sorting_room",
    routing_key="testkey")

def recv_callback(msg):
    #TODO
    #print msg.body
    process.starup(msg.body)


chan.basic_consume(
    queue='po_box',
    no_ack=True,
    callback=recv_callback,
    consumer_tag="testtag")

while True:
    chan.wait()

#chan.basic_cancel("testtag")
#chan.close()
#conn.close()

 

process.py 业务逻辑模块

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import json
import time
import thread
from APNSWrapper import *


#---------------------------------------------------------------------------------------------
'''从外部的数据转换成为标准数据'''
def convert(args):
    print args
    data = json.loads(args)
    return data


'''线程入口'''
def threadMain(args):
    data = convert(args)
    msg = messagesFactory(data)
    msg.fillMessages(data)
    msg.push()    



#---------------------------------------------------------------------------------------------
'''业务入口'''
def startup(args):
    thread.start_new_thread(threadMain,(args,))



#---------------------------------------------------------------------------------------------
'''消息工厂'''
def messagesFactory(data):
    if data["messageType"] == "NoteUpdated":
        return NoteUpdateMessages()

    if data["messageType"] == "Apply":
        return ApplyMessages()

    if data["messageType"] == "Review":
        return ReviewMessages()    


#---------------------------------------------------------------------------------------------
'''消息基类'''
class Messages:
    def __init__(self):
        self.Messages = []

    def makeMessage(self,data):
        pass

    def fillMessages(self,data):
        message = self.makeMessage(data)
        self.Messages.append(message)

    def push(self):
        try:
            wrapper = APNSNotificationWrapper('/etc/ck.pem', True)

            for message in self.Messages:
                wrapper.append(message)

            wrapper.notify()
            print 'send success'
        except Exception, e:
            raise


#---------------------------------------------------------------------------------------------
'''笔记更新消息类'''
class NoteUpdateMessages(Messages):
    def makeMessage(self,data):
        token = data["token"]
        deviceToken = token.decode('hex')
        message = APNSNotification()
        message.token(deviceToken)
        for key in ['updateUserName','title']:
            if type(data[key]) is unicode:
                data[key] = data[key].encode('utf8')
        alert = '共%d篇更新 %s编辑了“%s”' % (int(data["remindCount"]), data["updateUserName"], data["title"])

        if data["isPushed"] == 0:
            message.alert(alert)
        message.badge(int(data["remindCount"]))
        message.sound('qing.caf')
    
        property1 = APNSProperty("NOTE_ID", int(data["noteId"]) )
        message.appendProperty(property1)
        property2 = APNSProperty("USER_ID", int(data["recvUserId"]) )
        message.appendProperty(property2)
        property3 = APNSProperty("REMIND_TIME", int(time.time()) )
        message.appendProperty(property3)
        message.appendProperty(APNSProperty("REMIND_TYPE", 1 ))
        return message


#---------------------------------------------------------------------------------------------
'''申请加入项目消息类'''
class ApplyMessages(Messages):
    def makeMessage(self,data):
        token = data["token"]
        deviceToken = token.decode('hex')
        message = APNSNotification()
        message.token(deviceToken)
        alertMsg = ''
        status = data["reqUserStatus"]
        if status == 'pending':
            alertMsg = '%s申请加入%s群组,请审批' % (data["reqUserName"].encode('utf-8'), data["projectName"].encode('utf-8'),)
        elif status == 'active':
            alertMsg = '%s已加入%s群组' % (data["reqUserName"].encode('utf-8'), data["projectName"].encode('utf-8'),)
        elif status == 'deny':
            alertMsg = '%s被拒绝加入%s群组' % (data["reqUserName"].encode('utf-8'), data["projectName"].encode('utf-8'),)
        elif status == 'removed':
            alertMsg = '%s已从%s群组移除' % (data["reqUserName"].encode('utf-8'), data["projectName"].encode('utf-8'),)

        message.alert(alertMsg)
        message.badge(int(data["remindCount"]))
        message.sound('qing.caf')
    
        message.appendProperty(APNSProperty("PROJECT_ID", int(data["projectId"]) ))
        message.appendProperty(APNSProperty("USER_ID", int(data["recvUserId"]) ))
        message.appendProperty(APNSProperty("REMIND_TIME", int(time.time()) ))
        message.appendProperty(APNSProperty("REMIND_TYPE", 3 ))
        message.appendProperty(APNSProperty("REMIND_MSG", alertMsg ))
        return message


#---------------------------------------------------------------------------------------------
'''审核结果消息类'''
class ReviewMessages(Messages):
    def makeMessage(self,data):
        token = data["token"]
        deviceToken = token.decode('hex')

        message = APNSNotification()
        message.token(deviceToken)
    
        alertMsg = ''
        status = data["reqUserStatus"]
        if status == 'active':
            alertMsg = '您已加入%s群组' % (data["projectName"].encode('utf-8'),)
        elif status == 'deny':
            alertMsg = '您被拒绝加入%s群组' % (data["projectName"].encode('utf-8'),)
        elif status == 'removed':
            alertMsg = '您已从%s群组移除' % (data["projectName"].encode('utf-8'),)

        message.alert(alertMsg)
        message.badge(int(data["remindCount"]))
        message.sound('qing.caf')
    
        message.appendProperty(APNSProperty("PROJECT_ID", int(data["projectId"]) ))
        message.appendProperty(APNSProperty("USER_ID", int(data["recvUserId"]) ))
        message.appendProperty(APNSProperty("REMIND_TIME", int(time.time()) ))
        message.appendProperty(APNSProperty("REMIND_TYPE", 2 ))
        message.appendProperty(APNSProperty("REMIND_MSG", alertMsg ))
        return message
    

抱歉!评论已关闭.