现在的位置: 首页 > 综合 > 正文

如何利用jgroups实现分布式环境下消息的接受和发送

2013年08月28日 ⁄ 综合 ⁄ 共 9976字 ⁄ 字号 评论关闭

       为了提高应用的性能,我们准备实现分布式cache,所以我特别研究了oscache关于分布式实现的部分.
       我们知道为了实现分布式环境下消息的通知,目前两种比较流行的做法是使用JavaGroups[http://www.jgroups.org]和JMS。这两种方式都在底层实现了广播发布消息。
       由于JGroups可以提供可靠的广播通信.所以我们准备采用JGroups.

     我自己写了一个JavaGroupBroadcastingManager.java类实现消息的管理(包括发送和接收),代码参考了oscache的相关代码,在其基础上进行了改进.

代码如下:
1、JavaGroupBroadcastingManager.java
package com.yz;

import com.opensymphony.oscache.base.FinalizationException;
import com.opensymphony.oscache.base.InitializationException;
import com.opensymphony.oscache.plugins.clustersupport.JavaGroupsBroadcastingListener;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.jgroups.Address;
import org.jgroups.Channel;

import org.jgroups.blocks.NotificationBus;

import java.io.Serializable;

import java.util.Properties;

/**
 * @author   yangzheng
 * @version  $Revision$
 * @since   2005-7-14
 */
public class JavaGroupBroadcastingManager
        implements NotificationBus.Consumer {
    private static final Log    log                   = LogFactory.getLog(JavaGroupsBroadcastingListener.class);
    private static final String BUS_NAME              = "OSCacheBus";
    private static final String CHANNEL_PROPERTIES    = "cache.cluster.properties";
    private static final String MULTICAST_IP_PROPERTY = "cache.cluster.multicast.ip";
    private NotificationBus     bus;

    /**
     * Initializes the broadcasting listener by starting up a JavaGroups notification
     * bus instance to handle incoming and outgoing messages.
     *
     */
    public synchronized void initialize(Properties config) throws InitializationException {
        String properties  = config.getProperty(CHANNEL_PROPERTIES);
        String multicastIP = config.getProperty(MULTICAST_IP_PROPERTY);

        if (log.isInfoEnabled()) {
            log.info("Starting a new JavaGroups broadcasting listener with properties="
                     + properties);
        }

        try {
            bus = new NotificationBus(BUS_NAME, properties);
            bus.start();
            bus.getChannel().setOpt(Channel.LOCAL, new Boolean(false));
            bus.setConsumer(this);
            log.info("JavaGroups clustering support started successfully");
        } catch (Exception e) {
            throw new InitializationException("Initialization failed: " + e);
        }
    }

    /**
     * Shuts down the JavaGroups being managed
     */
    public synchronized void finialize() throws FinalizationException {
        if (log.isInfoEnabled()) {
            log.info("JavaGroups shutting down...");
        }

        bus.stop();
        bus = null;

        if (log.isInfoEnabled()) {
            log.info("JavaGroups shutdown complete.");
        }
    }

    /**
     * Uses JavaGroups to broadcast the supplied notification message across the cluster.
     *
     */
    protected void sendNotification(Serializable message) {
        bus.sendNotification(message);
    }

    /**
     * Handles incoming notification messages from JavaGroups. This method should
     * never be called directly.
     *
     */
    public void handleNotification(Serializable serializable) {
        log.info("An cluster notification message received message " + serializable.toString()
                 + "). Notification ignored.");
    }

    /**
     * We are not using the caching, so we just return something that identifies
     * us. This method should never be called directly.
     */
    public Serializable getCache() {
        return "JavaGroupsBroadcastingListener: " + bus.getLocalAddress();
    }

    /**
     * A callback that is fired when a new member joins the cluster. This
     * method should never be called directly.
     *
     * @param address The address of the member who just joined.
     */
    public void memberJoined(Address address) {
        if (log.isInfoEnabled()) {
            log.info("A new member at address '" + address + "' has joined the cluster");
        }
    }

    /**
     * A callback that is fired when an existing member leaves the cluster.
     * This method should never be called directly.
     *
     * @param address The address of the member who left.
     */
    public void memberLeft(Address address) {
        if (log.isInfoEnabled()) {
            log.info("Member at address '" + address + "' left the cluster");
        }
    }
}

2、发送消息的程序:
package com.yz;

import java.io.FileInputStream;

import java.util.Properties;

/**
 * @author   yangzheng
 * @version  $Revision$
 * @since   2005-7-14
 */
public class TestJavaGroupBroadcastSend {
    public static void main(String[] args) throws Exception {
        JavaGroupBroadcastingManager javaGroupBroadcastingManager = new JavaGroupBroadcastingManager();
        Properties                   properties = new Properties();

        properties.load(new FileInputStream("javagroup.properties"));
        javaGroupBroadcastingManager.initialize(properties);

        String message = "hello world!";
        while (true) {
            Thread.sleep(1000);
            javaGroupBroadcastingManager.sendNotification(message);
        }
    }
}

3、接受消息的程序:
package com.yz;

import java.io.FileInputStream;
import java.util.Properties;

/**
 * @author   yangzheng
 * @version  $Revision$
 * @since   2005-7-14
 */
public class TestJavaGroupBroadcastReceive {
    public static void main(String[] args) throws Exception {
        JavaGroupBroadcastingManager javaGroupBroadcastingManager = new JavaGroupBroadcastingManager();
        Properties                   properties = new Properties();

        properties.load(new FileInputStream("javagroup.properties"));
        javaGroupBroadcastingManager.initialize(properties);
       
        Thread.sleep(100000000);
    }
}

4、配置文件:(基本上不用改动)
javagroup.properties
cache.cluster.properties=UDP(mcast_addr=231.12.21.132;mcast_port=45566;ip_ttl=32;/
mcast_send_buf_size=150000;mcast_recv_buf_size=80000):/
PING(timeout=2000;num_initial_members=3):/
MERGE2(min_interval=5000;max_interval=10000):/
FD_SOCK:VERIFY_SUSPECT(timeout=1500):/
pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):/
UNICAST(timeout=300,600,1200,2400):/
pbcast.STABLE(desired_avg_gossip=20000):/
FRAG(frag_size=8096;down_thread=false;up_thread=false):/
pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
cache.cluster.multicast.ip=231.12.21.132

5、所需要的jar包
commons-logging-1.0.4.jar
jgroups-2.2.8.jar  concurrent.jar 属于jgroups的包

6、说明:
1、发送消息和接受消息的程序都需要调用JavaGroupBroadcastingManager.initialize()方法初始化jgroup。
2、运行环境的多台服务器要在同一个局域网内,同时hosts中不要将127.0.0.1写入,以便jgroup获得本机的ip,而不是获得127.0.0.1

7、程序运行的结果:
接受端:

Jul 14, 2005 1:29:09 PM com.yz.JavaGroupBroadcastingManager initialize
INFO: Starting a new JavaGroups broadcasting listener with properties=UDP(mcast_addr=231.12.21.132;mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):UNICAST(timeout=300,600,1200,2400):pbcast.STABLE(desired_avg_gossip=20000):FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)

Jul 14, 2005 1:29:12 PM org.jgroups.protocols.UDP createSockets
INFO: sockets will use interface 10.0.99.99
Jul 14, 2005 1:29:12 PM org.jgroups.protocols.UDP createSockets
INFO: socket information:
local_addr=10.0.99.99:33637, mcast_addr=231.12.21.132:45566, bind_addr=/10.0.99.99, ttl=32
sock: bound to 10.0.99.99:33637, receive buffer size=64000, send buffer size=32000
mcast_recv_sock: bound to 10.0.99.99:45566, send buffer size=131071, receive buffer size=80000
mcast_send_sock: bound to 10.0.99.99:33638, send buffer size=131071, receive buffer size=80000

-------------------------------------------------------
GMS: address is 10.0.99.99:33637
-------------------------------------------------------

Jul 14, 2005 1:29:14 PM com.yz.JavaGroupBroadcastingManager initialize
INFO: JavaGroups clustering support started successfully
Jul 14, 2005 1:29:14 PM com.yz.JavaGroupBroadcastingManager memberJoined
INFO: A new member at address '10.0.99.99:33617' has joined the cluster
Jul 14, 2005 1:29:14 PM com.yz.JavaGroupBroadcastingManager memberJoined
INFO: A new member at address '10.0.99.99:33637' has joined the cluster
Jul 14, 2005 1:30:24 PM com.yz.JavaGroupBroadcastingManager memberJoined
INFO: A new member at address '10.0.99.98:33648' has joined the cluster // 监控到发送端服务器加入cluster

Jul 14, 2005 1:30:25 PM com.yz.JavaGroupBroadcastingManager handleNotification  //接受到消息
INFO: An cluster notification message received message hello world!). Notification ignored.

发送端
Jul 14, 2005 1:20:15 PM com.yz.JavaGroupBroadcastingManager initialize
INFO: Starting a new JavaGroups broadcasting listener with properties=UDP(mcast_addr=231.12.21.132;mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):UNICAST(timeout=300,600,1200,2400):pbcast.STABLE(desired_avg_gossip=20000):FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
Jul 14, 2005 1:20:16 PM org.jgroups.protocols.UDP createSockets
INFO: sockets will use interface 10.0.99.98
Jul 14, 2005 1:20:16 PM org.jgroups.protocols.UDP createSockets
INFO: socket information:
local_addr=10.0.99.98:33648, mcast_addr=231.12.21.132:45566, bind_addr=/10.0.99.98, ttl=32
sock: bound to 10.0.99.98:33648, receive buffer size=64000, send buffer size=32000
mcast_recv_sock: bound to 10.0.99.98:45566, send buffer size=131071, receive buffer size=80000
mcast_send_sock: bound to 10.0.99.98:33649, send buffer size=131071, receive buffer size=80000

-------------------------------------------------------
GMS: address is 10.0.99.98:33648
-------------------------------------------------------
Jul 14, 2005 1:20:18 PM com.yz.JavaGroupBroadcastingManager initialize
INFO: JavaGroups clustering support started successfully
Jul 14, 2005 1:20:18 PM com.yz.JavaGroupBroadcastingManager memberJoined
INFO: A new member at address '10.0.99.99:33617' has joined the cluster
Jul 14, 2005 1:20:18 PM com.yz.JavaGroupBroadcastingManager memberJoined
INFO: A new member at address '10.0.99.99:33637' has joined the cluster
Jul 14, 2005 1:20:18 PM com.yz.JavaGroupBroadcastingManager memberJoined
INFO: A new member at address '10.0.99.98:33648' has joined the cluster  // 监控到接受端服务器加入cluster
Jul 14, 2005 1:20:27 PM com.yz.JavaGroupBroadcastingManager memberLeft
INFO: Member at address '10.0.99.99:33637' left the cluster   // 监控到接受端服务器的程序退出

现在程序已经可以正常运行,有了这个基础,分布式cache的实现指日可待.

抱歉!评论已关闭.