现在的位置: 首页 > 综合 > 正文

MSMQ 大文件分包收发(带头消息、监听)

2013年02月09日 ⁄ 综合 ⁄ 共 8074字 ⁄ 字号 评论关闭

消息队列类

 

代码

using System;
using System.Collections.Generic;
using System.Text;
using System.Messaging;
using System.IO;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;

using System.Xml;
using System.Threading;

namespace TestMSMQ01
{
    
/// <summary>
/// Transfers files through a transactional MSMQ queue by splitting them into
/// fixed-size chunks. Each transfer starts with an XML header message that
/// carries the total byte length and the total message count (header included),
/// followed by the chunks, each serialized as a <c>byte[]</c> with
/// <see cref="BinaryMessageFormatter"/>.
/// </summary>
public class MessageManager
{
    /// <summary>Label prefix put on every message of a transfer; the message index is appended.</summary>
    private const string ChunkLabelPrefix = "大文件分包队列";

    /// <summary>Maximum payload size, in bytes, of one chunk message.</summary>
    private const int ChunkSize = 65000;

    /// <summary>Pause, in milliseconds, between two polls of the queue in <see cref="ReceiveMsgB"/>.</summary>
    private const int PollIntervalMilliseconds = 1000;

    /// <summary>How long <see cref="ReceiveMsg"/> waits for the next message before treating the queue as drained.</summary>
    private static readonly TimeSpan DrainTimeout = TimeSpan.FromSeconds(1);

    private string path = string.Empty;

    // Stored from the constructor; no member of this class currently reads it.
    private string fileName = string.Empty;

    private MessageQueue queue = null;

    /// <summary>
    /// Creates the queue at <paramref name="path"/> if it does not exist yet and opens it.
    /// </summary>
    /// <param name="path">MSMQ queue path.</param>
    /// <param name="fileName">File name associated with this manager (stored only).</param>
    public MessageManager(string path, string fileName)
    {
        this.path = path;
        this.fileName = fileName;
        this.CreateQueue(path);
        this.queue = new MessageQueue(this.path);
    }

    /// <summary>Creates a transactional queue at <paramref name="path"/> unless one already exists.</summary>
    private void CreateQueue(string path)
    {
        if (MessageQueue.Exists(path))
        {
            return;
        }

        MessageQueue.Create(path, true);
    }

    /// <summary>
    /// Drains the queue and writes the raw body stream of every message, in order,
    /// to <paramref name="fileName"/>. The target file is created or truncated.
    /// </summary>
    /// <param name="fileName">Path of the file to write.</param>
    /// <returns>
    /// <c>true</c> when the queue was drained without error (no message arrived within
    /// the drain timeout); <c>false</c> when any error occurred.
    /// </returns>
    public bool ReceiveMsg(string fileName)
    {
        try
        {
            using (MessageQueue source = new MessageQueue(this.path))
            using (FileStream output = new FileStream(fileName, FileMode.Create, FileAccess.Write, FileShare.None))
            {
                byte[] buffer = new byte[ChunkSize];

                while (true)
                {
                    Message msg;
                    try
                    {
                        // Receive() without a timeout blocks forever on an empty queue,
                        // so a bounded wait is the only way this loop can end.
                        msg = source.Receive(DrainTimeout);
                    }
                    catch (MessageQueueException ex)
                    {
                        if (ex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                        {
                            break; // Queue is empty: done.
                        }

                        throw;
                    }

                    using (msg)
                    {
                        CopyStream(msg.BodyStream, output, buffer);
                    }
                }
            }

            return true;
        }
        catch (Exception ex)
        {
            System.Diagnostics.Trace.TraceError("MessageManager.ReceiveMsg failed: {0}", ex);
            return false;
        }
    }

    /// <summary>
    /// Sends the file at <paramref name="fileName"/> to the queue inside a single
    /// MSMQ transaction: one XML header message followed by the file content in
    /// chunks of at most <see cref="ChunkSize"/> bytes. Each chunk after the first
    /// carries the previous chunk's message id as its correlation id.
    /// </summary>
    /// <param name="fileName">Path of the file to send.</param>
    /// <returns><c>true</c> when the transaction was committed; <c>false</c> when sending failed and the transaction was rolled back.</returns>
    public bool SendMsgB(string fileName)
    {
        using (MessageQueueTransaction transaction = new MessageQueueTransaction())
        {
            transaction.Begin();
            try
            {
                using (FileStream input = new FileStream(fileName, FileMode.Open, FileAccess.Read))
                {
                    long length = input.Length;

                    // Computed in 64-bit arithmetic so files larger than 2 GB do not overflow.
                    long chunkCount = (length + ChunkSize - 1) / ChunkSize;

                    // The header's <count> is the number of messages in the transfer, header included.
                    string headerBody = string.Format(
                        "<msg><length>{0}</length><count>{1}</count></msg>",
                        length.ToString(),
                        (chunkCount + 1).ToString());

                    using (Message header = new Message(headerBody, new XmlMessageFormatter(new Type[] { typeof(string) })))
                    {
                        header.Label = ChunkLabelPrefix + "0";
                        this.queue.Send(header, transaction);
                    }

                    long sent = 0;
                    int index = 0;
                    string previousId = string.Empty;
                    byte[] chunk = new byte[ChunkSize];

                    while (sent < length)
                    {
                        index++;

                        // The last chunk may be shorter; the message body is the whole
                        // array, so the array must be exactly the payload size.
                        int size = (int)Math.Min(ChunkSize, length - sent);
                        if (chunk.Length != size)
                        {
                            chunk = new byte[size];
                        }

                        FillBuffer(input, chunk);

                        using (Message part = new Message(chunk, new BinaryMessageFormatter()))
                        {
                            part.Label = ChunkLabelPrefix + index.ToString();
                            if (previousId.Length > 0)
                            {
                                part.CorrelationId = previousId;
                            }

                            this.queue.Send(part, transaction);
                            previousId = part.Id;
                        }

                        sent += size;
                    }
                }

                transaction.Commit();
                return true;
            }
            catch (Exception ex)
            {
                // Roll back so a partially sent file never becomes visible to receivers.
                if (transaction.Status == MessageQueueTransactionStatus.Pending)
                {
                    transaction.Abort();
                }

                System.Diagnostics.Trace.TraceError("MessageManager.SendMsgB failed: {0}", ex);
                return false;
            }
        }
    }

    /// <summary>
    /// Listener loop: once per poll interval, tries to receive one complete transfer
    /// (header plus chunks) and, when one is available, hands the reassembled bytes to
    /// <c>DataProcess</c>. This method never returns; run it on a dedicated thread.
    /// </summary>
    public void ReceiveMsgB()
    {
        while (true)
        {
            try
            {
                Thread.Sleep(PollIntervalMilliseconds);

                byte[] data = this.ReceiveFile();
                this.SaveReceivedFile(data);
            }
            catch (ThreadInterruptedException)
            {
                // NOTE(review): an interrupt only cuts the current sleep short and the
                // loop keeps running, as before. Confirm whether it should stop the listener.
            }
            catch (MessageQueueException ex)
            {
                // A timeout just means the queue was empty on this poll.
                if (ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
                {
                    System.Diagnostics.Trace.TraceError("MessageManager.ReceiveMsgB queue error: {0}", ex);
                }
            }
            catch (Exception ex)
            {
                // Keep listening, but do not hide the failure.
                System.Diagnostics.Trace.TraceError("MessageManager.ReceiveMsgB failed: {0}", ex);
            }
        }
    }

    /// <summary>
    /// Receives one transfer from the queue and returns the reassembled file content.
    /// Throws <see cref="MessageQueueException"/> with <c>IOTimeout</c> when no header
    /// message is waiting.
    /// </summary>
    private byte[] ReceiveFile()
    {
        byte[] data;
        int count;

        // Zero timeout: check for a header without blocking; the caller paces the polling.
        using (Message header = this.queue.Receive(TimeSpan.Zero))
        {
            header.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });

            XmlDocument headerXml = new XmlDocument();
            headerXml.LoadXml(header.Body.ToString());

            long size = Convert.ToInt64(headerXml.SelectSingleNode("//msg/length").InnerText);
            count = Convert.ToInt32(headerXml.SelectSingleNode("//msg/count").InnerText);
            data = new byte[size];
        }

        // SECURITY NOTE(review): BinaryFormatter deserializes whatever is in the queue;
        // only use this with queues whose senders are trusted.
        IFormatter formatter = new BinaryFormatter();
        long offset = 0;

        // The header was message 1 of `count`; the remaining count - 1 messages are chunks.
        for (int i = 1; i < count; i++)
        {
            // Blocks until the next chunk arrives.
            using (Message part = this.queue.Receive())
            {
                byte[] bytes = (byte[])formatter.Deserialize(part.BodyStream);
                bytes.CopyTo(data, offset);
                offset += bytes.Length;
            }
        }

        return data;
    }

    /// <summary>
    /// Reads the output directory and extension from <c>mq.xml</c> in the application
    /// base directory, builds a timestamp-based file name and passes the data to
    /// <c>DataProcess</c>.
    /// </summary>
    private void SaveReceivedFile(byte[] data)
    {
        XmlDocument config = new XmlDocument();
        config.Load(AppDomain.CurrentDomain.BaseDirectory + "mq.xml");

        string p = config.SelectSingleNode("//mq/files/file/path").InnerText;
        string ext = config.SelectSingleNode("//mq/files/file/ext").InnerText;
        string name = DateTime.Now.ToString("yyyyMMddHHmmss") + ext;

        // DataProcess is defined outside this file; presumably the first call stores the
        // received bytes and the second converts them to an .xls file — TODO confirm.
        DataProcess.EnFileContentToFile(p, name, data);
        DataProcess.DeFileToFile(p, name, ".xls");
    }

    /// <summary>Removes every message from the queue.</summary>
    public void ClearMsg()
    {
        this.queue.Purge();
    }

    /// <summary>
    /// Number of messages currently in the queue, obtained by enumerating them.
    /// </summary>
    public int Counts
    {
        get
        {
            // Start at zero so an empty queue reports 0.
            int messageCounter = 0;

            using (MessageEnumerator objMess = this.queue.GetMessageEnumerator2())
            {
                while (objMess.MoveNext())
                {
                    messageCounter++;
                }
            }

            return messageCounter;
        }
    }

    /// <summary>Copies everything left in <paramref name="source"/> to <paramref name="destination"/>.</summary>
    private static void CopyStream(Stream source, Stream destination, byte[] buffer)
    {
        int read;
        while ((read = source.Read(buffer, 0, buffer.Length)) > 0)
        {
            destination.Write(buffer, 0, read);
        }
    }

    /// <summary>
    /// Reads from <paramref name="source"/> until <paramref name="buffer"/> is full;
    /// a single Read call may legitimately return fewer bytes than requested.
    /// </summary>
    private static void FillBuffer(Stream source, byte[] buffer)
    {
        int filled = 0;
        while (filled < buffer.Length)
        {
            int read = source.Read(buffer, filled, buffer.Length - filled);
            if (read == 0)
            {
                throw new EndOfStreamException("The file ended before the expected number of bytes was read.");
            }

            filled += read;
        }
    }
}
}

 

 

 

线程调用

 

// Build the manager for the queue, then run its listener (ReceiveMsgB) on a separate thread.
MessageManager mm = new MessageManager(path, name);

            Thread thMsg = new Thread(new ThreadStart(mm.ReceiveMsgB));
            thMsg.Start();

 

 

抱歉!评论已关闭.