现在的位置: 首页 > 综合 > 正文

MSMQ实现自定义序列化存储

2011年07月25日 ⁄ 综合 ⁄ 共 3728字 ⁄ 字号 评论关闭

在使用MSMQ的时候一般只会使用默认的XML序列化来对消息进行存储,但XML存储的缺点是序列化体积相对比较大和效率上有点低.其实.net提供非常简单的方式让我们实现不同序列化方式来存储MSMQ信息,如json,protobuf等.为了能够让开发人员实现自定义序列化的消息存储,.NET提供了IMessageFormatter这样一个接口,只需要简单地实现这个接口就可以对MSMQ的消息进行处理.以下讲解如何实现json和protobuf的messageformater.

IMessageFormatter

// 摘要:
    //     从“消息队列”消息体序列化或反序列化对象。
    [TypeConverter(typeof(MessageFormatterConverter))]
    public interface IMessageFormatter : ICloneable
    {
        // 摘要:
        //     在类中实现时,确定格式化程序是否可以反序列化消息的内容。
        //
        // 参数:
        //   message:
        //     要检查的 System.Messaging.Message。
        //
        // 返回结果:
        //     如果格式化程序可以反序列化消息,则为 true;否则为 false。
        bool CanRead(Message message);
        //
        // 摘要:
        //     在类中实现时,读取给定消息中的内容并创建包含该消息中的数据的对象。
        //
        // 参数:
        //   message:
        //     The System.Messaging.Message to deserialize.
        //
        // 返回结果:
        //     反序列化的消息。
        object Read(Message message);
        //
        // 摘要:
        //     在类中实现时,将对象序列化到消息体中。
        //
        // 参数:
        //   message:
        //     System.Messaging.Message,它将包含序列化的对象。
        //
        //   obj:
        //     要序列化到消息中的对象。
        void Write(Message message, object obj);
    }

接口非常简单,主要规范了MSMQ写入和读取的规则.

Json Formater

public class JsonFormater<T> : IMessageFormatter
    {

        public bool CanRead(Message message)
        {
            return message.BodyStream != null && message.BodyStream.Length > 0;
        }

        [ThreadStatic]
        private static byte[] mBuffer;

        public object Read(Message message)
        {
            if(mBuffer== null)
                mBuffer = new byte[4096];
            int count =(int)message.BodyStream.Length;
            message.BodyStream.Read(mBuffer, 0, count);
            return Newtonsoft.Json.JsonConvert.DeserializeObject(Encoding.UTF8.GetString(mBuffer, 0, count), typeof(T));

        }

        [System.ThreadStatic]
        private static System.IO.MemoryStream mStream;

        public void Write(Message message, object obj)
        {
            if (mStream == null)
                mStream = new System.IO.MemoryStream(4096);
            mStream.Position = 0;
            mStream.SetLength(4095);
            string value = Newtonsoft.Json.JsonConvert.SerializeObject(obj);
            int count = Encoding.UTF8.GetBytes(value, 0, value.Length, mStream.GetBuffer(), 0);
            mStream.SetLength(count);
            message.BodyStream = mStream;
        }

        public object Clone()
        {
            return this;
        }
    }

Protobuf Formater

public class ProtobufFormater<T> : IMessageFormatter
    {
        public bool CanRead(Message message)
        {
            return message.BodyStream != null && message.BodyStream.Length > 0;
        }

        public object Read(Message message)
        {
            return ProtoBuf.Meta.RuntimeTypeModel.Default.Deserialize(message.BodyStream, null, typeof(T));
        }

        [System.ThreadStatic]
        private static System.IO.MemoryStream mStream ;

        public void Write(Message message, object obj)
        {
            if (mStream == null)
                mStream = new System.IO.MemoryStream(4096);
            mStream.Position = 0;
            mStream.SetLength(0);
            ProtoBuf.Meta.RuntimeTypeModel.Default.Serialize(mStream, obj);
            message.BodyStream = mStream;
        }

        public object Clone()
        {
            return this;
        }
       
    }

使用Formater

使有自定义Formater比较简单,只需要指定MessageQueue的Formatter属性即可.

MessageQueue queue = new MessageQueue(@".\private$\Test");
            queue.Formatter = new JsonFormater<User>();

简单的测性能测试

针对json,protobuf这两种自定义序列化和默认的XML序列化性能上有多大差异,各自进行100000条写入和读取的耗时情况.

System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
            sw.Reset();
            sw.Start();
            for (int i = 0; i < 100000; i++)
            {
                queue.Send(user);
            }
            sw.Stop();
            Console.WriteLine("MSMQ send xml formater:" + sw.Elapsed.TotalMilliseconds + "ms");
            sw.Reset();
            sw.Start();
            for (int i = 0; i < 100000; i++)
            {
                User result = (User)queue.Receive().Body;
            }
            sw.Stop();
            Console.WriteLine("MSMQ receive xml formater:" + sw.Elapsed.TotalMilliseconds + "ms");

            queue.Formatter = new JsonFormater<User>();
            sw.Reset();
            sw.Start();
            for (int i = 0; i < 100000; i++)
            {
                queue.Send(user);
            }
            sw.Stop();
            Console.WriteLine("MSMQ send Json formater:" + sw.Elapsed.TotalMilliseconds + "ms");
            sw.Reset();
            sw.Start();
            for (int i = 0; i < 100000; i++)
            {
                User result = (User)queue.Receive().Body;
            }
            sw.Stop();
            Console.WriteLine("MSMQ receive json formater:" + sw.Elapsed.TotalMilliseconds + "ms");

            queue.Formatter = new ProtobufFormater<User>();
            sw.Reset();
            sw.Start();
            for (int i = 0; i < 100000; i++)
            {
                queue.Send(user);
            }
            sw.Stop();
            Console.WriteLine("MSMQ send Protobuf formater:" + sw.Elapsed.TotalMilliseconds+"ms");
            sw.Reset();
            sw.Start();
            for (int i = 0; i < 100000; i++)
            {
                User result = (User)queue.Receive().Body;
            }
            sw.Stop();
            Console.WriteLine("MSMQ receive Protobuf formater:" + sw.Elapsed.TotalMilliseconds + "ms");

测试结果

从测试来看还是protobuf效率上占优点:)

抱歉!评论已关闭.