/************************************************************************************************** |
§案例场景:
1、服务系统负责接收外部调用请求,考虑性能问题,将此请求转发为异步方式执行。
即: 通过MQ将原始请求发送至后台消息处理服务,完成业务逻辑异步Processing。
2、消息处理过程中,存在异常失败情况,需要进行二级队列Process
3、另外考虑到消息的完整性、一致性,需要通过事务来控制消息处理。
§分析设计:
通过命令方式,将消息的发送者、接受者、调用者分别委派到场景中的各实例对象,各部分单一职责的方式来处理,另外对于并发的消息命令、特殊命令,可以指定不同的Receiver,处理不同的业务Path.
§设计实现:
1、 设计类图
MessageReceiver 执行消息处理操作(Receiver)
MessageCommand 消息命令(可将命令与消息分开)
IMessageInvoker 调用消息的处理方法(Invoker)
ThreadClient 创建消息并绑定消息调用者
2、 代码实现
//ThreadClient.cs /// <summary> /// 创建消息并绑定消息调用者 /// </summary> public class ThreadClient { public static List<Thread> _threadArray = new List<Thread>(); /// <summary> /// 队列参数设置 /// </summary> public QueueSettings QueueSettings { get; set; } public Thread Initialize(QueueSettings setting) { QueueSettings = setting; Thread thread = new Thread(new ThreadStart(this.Start)); return thread; } /// <summary> /// 启动接收消息线程 /// </summary> public void Start() { MessageReceiver receiver = new MessageReceiver(QueueSettings); Command command = new MessageCommand(receiver); IMessageInvoker invoker = new IMessageInvoker(); invoker.SetCommand(command); invoker.ExecuteCommand(); } }
IMessageInvoker.cs
/// <summary> /// 调用消息的处理方法 /// </summary> public class IMessageInvoker { private Command command; public void SetCommand(Command command) { this.command = command; } public void ExecuteCommand() { command.Execute(); } }
MessageReceiver.cs
/// <summary> /// 执行消息处理操作 /// </summary> public class MessageReceiver { /// <summary> /// 队列参数设置 /// </summary> public QueueSettings QueueSettings { get; set; } private DepositOrderInfo _depositInfo = null; public MessageReceiver(QueueSettings setting) { QueueSettings = setting; } public int Process(object data) { #region 验证基本参数 #endregion #region 填充订单明细 #endregion #region 保存订单 #endregion return Consts.Success; } }
MessageCommand.cs
/// <summary> /// 消息命令 /// </summary> public class MessageCommand : Command { MessageReceiver _receiver; string queuePath = string.Empty; public MessageCommand(MessageReceiver receiver) : base(receiver) { _receiver = receiver; } /// <summary> /// 处理消息 /// </summary> public override void Execute() { QueueSettings = _receiver.QueueSettings; while (true) { try { queuePath = QueueHelper.GetQueuePath(QueueSettings.QueueLevel); if (!String.IsNullOrEmpty(queuePath) && SDMSMQ.GetAllMessageCount(queuePath) > 0) { //接收事务型消息 //Process } Thread.Sleep(QueueSettings.Interval); } catch (Exception ex) { #region 发送到二级队列 #endregion #region 发送到监控系统 #endregion } } } }
Command.cs
/// <summary> /// 各类消息命令基类 /// </summary> public abstract class Command { protected MessageReceiver receiver; protected QueueSettings QueueSettings; public Command(MessageReceiver receiver) { this.receiver = receiver; } public abstract void Execute(); }
调用示例
public static List<Thread> _threadArray = new List<Thread>(); //一级队列线程 Thread thread1 = new ThreadClient().Initialize(new QueueSettings { QueueLevel = QueueLevel.First, Interval = Configs.GetConfig<int>("Queue1Interval") }); thread1.Start(); _threadArray.Add(thread1);
§总结:
当然,通过指定不同的命令,Process不同的消息未必要通过命令者,灵活应用其他类型模式也可以完成此过程。本例主要针对命令者给出Demo。
另外上述接受者构造器中入参QueueSettings,很多时候可作为命令中介适配器:
public class CommandAdapterInfo { List<CommandAdapterInfo> CommandList = null; public CommandAdapterInfo Create() { Init(); return this.CommandList.Find(delegate(CommandAdapterInfo _adapter) { return _adapter.CommandType == this.CommandType; }); } void Init() { this.CommandList = new List<CommandAdapterInfo>(); //提现 CommandAdapterInfo adapter = new CommandAdapterInfo(); adapter.CommandType = CommanderType.Commission; adapter.Receiver = new CommissionMessageReceiver(adapter); adapter.Command = new CommissionMessageCommand(adapter.Receiver); adapter.Invoker = new CommissionMessageInvoker(); this.CommandList.Add(adapter); //委托付款到卡 CommandAdapterInfo adapterPayToCard = new CommandAdapterInfo(); adapterPayToCard.CommandType = CommanderType.PayToCard; adapterPayToCard.Receiver = new CommissionMessageReceiver(adapterPayToCard); adapterPayToCard.Command = new CommissionMessageCommand(adapterPayToCard.Receiver); adapterPayToCard.Invoker = new CommissionMessageInvoker(); this.CommandList.Add(adapterPayToCard); } /// <summary> /// 命令类型 /// </summary> public CommanderType CommandType { get; set; } /// <summary> /// 命令接收者 /// </summary> public MessageReceiver Receiver { get; set; } /// <summary> /// 命令执行 /// </summary> public Command Command { get; set; } /// <summary> /// 调用者 /// </summary> public MessageInvoker Invoker { get; set; } }