现在的位置: 首页 > 综合 > 正文

如何利用python dbus来发送一个信号

2013年09月29日 ⁄ 综合 ⁄ 共 2185字 ⁄ 字号 评论关闭

简介

dbus用于进程间通信,可以降低不同程序间的耦合性,dbus的原理同分布式计算很象,其中一个介绍原理的网址为:官方的
文档

用python来操作dbus很方便,python-dbus的教程: 教程

 

一些官方例子: 例子

简单过程

1.首先要从dbus.service.Object继承,这样才可以输出方法和信号,同时调用dbus.service.Object来初始化bus类
型(Session bus or System bus),以及 对象路径

 class
Msg(dbus.service.Object):
    def __init__(self,bus,object_path):
       
dbus.service.Object.__init__(self,bus,object_path)

 

2.输出信号,先修饰,信号要传递的参数有signature确定,然后再定义信号函数,信号函数体本身没多大意义,有意义
的只在于函数体的参数,在dbus中的信号名就是这个信号函数的名字

 @dbus.service.signal(dbus_interface=MSG_INTERFACE_URI,
                        
signature='as')           #发送了一个可变数组,但数组的类型要一致,这里都是string
    def
msg_signal(self,msg_list):
        print "exported signal:
",msg_list   #这个没有意义

 

3..定义一个发送信号的函数,注意要返回True,否则如果调用timeout_add的时候,它执行了一次就会停下来,
不会重复执行,因为timeout_add碰到False的时候就会停止执行

发送信号,其实也就只是调用刚才修饰的信号函数而已

  def construct_msg(self):
       
timeStamp = time.strftime(TIMEFORMAT)
       
self.msg_signal(["1111",timeStamp,"This is the content","1 2 3"])
       
return True

 

4.连接到bus,注意在连接前要先选好loop的类型,否则不让连接

 DBusGMainLoop(set_as_default=True) 
#选好loop的类型
    bus = dbus.SessionBus()
    aMsg =
Msg(bus,MSG_OBJ_PATH)    #将对象输出到bus中

   
gobject.timeout_add(1000,aMsg.construct_msg)  #定时发送信号,知道其中的函数返回False为止
   
loop = gobject.MainLoop()
    loop.run()

完整程序

 #!/usr/bin/python

import
dbus
import dbus.service
from dbus.mainloop.glib import
DBusGMainLoop
import time
import gobject

MSG_OBJ_PATH =
"/com/example/msg"
MSG_INTERFACE_URI = "com.example.msg"

TIMEFORMAT
= "%H:%M:%S"

class Msg(dbus.service.Object):
    def
__init__(self,bus,object_path):
       
dbus.service.Object.__init__(self,bus,object_path)

   
@dbus.service.method(dbus_interface=MSG_INTERFACE_URI,
                        
in_signature='', out_signature='s')
    def say_hello(self):
       
return "hello, exported method"

   
@dbus.service.signal(dbus_interface=MSG_INTERFACE_URI,
                        
signature='as')
    def msg_signal(self,msg_list):
        print
"exported signal: ",msg_list

    def construct_msg(self):
       
timeStamp = time.strftime(TIMEFORMAT)
       
self.msg_signal(["1111",timeStamp,"This is the content","1 2 3"])
       
return True

if __name__ == "__main__":
   
DBusGMainLoop(set_as_default=True)
    bus = dbus.SessionBus()
   
aMsg = Msg(bus,MSG_OBJ_PATH)

   
gobject.timeout_add(1000,aMsg.construct_msg)
    loop =
gobject.MainLoop()
    loop.run()

抱歉!评论已关闭.