现在的位置: 首页 > 综合 > 正文

线程杂谈3

2013年08月14日 ⁄ 综合 ⁄ 共 7327字 ⁄ 字号 评论关闭

(************************************************

(* Subject:   线程杂谈3

(* Author: linzhenqun()

(* Time:      2006-3-25

(* Blog:      http://blog.csdn.net/linzhengqun

(* E-mail: linzhengqun@163.com

(************************************************

 

前言

我在写完线程杂谈2之后,本来不再打算写关于线程的文章了,但由于项目中时时要与线程打交道,所以于实践中又领悟了一些技巧,于是又有了此篇。

 

学习Windows的消息循环

    我在做Call Center项目时,负责一个邮件服务器程序,座席端软件可以通过该邮件服务器收取邮件,也可以通过它发送邮件。发送邮件的时候我开始的设计是这样的,一个座席发送一封邮件过来,邮件服务器收到这封邮件后即启动一个线程负责将它发送出去,但这样做是有严重的性能问题的,假设如果有十个座席同时发送邮件,则邮件服务器必须启动十个线程,如果十个座席每个人同时发送几封邮件,邮件服务器即必须启动几十个线程,显然这样做是不符合实际的。

    有没有办法解决这个瓶颈呢,正当我苦苦思索的时候,想到了Windows的消息队列和消息循环,对于每一个应用程序,Windows都为它维护一个消息队列,当由于键盘鼠标等硬件事件发生时,windows将相应的消息结构加入到应用程序的消息队列中。如果我们写过Windows的程序,就知道它的入口函数中必须有一个循环,不断地从消息队列中取出消息,然后发送至处理该消息函数中。

    这样的技术很好的解决了并发性带来的问题,使得每个动作都必须排队,那么发送邮件其实也可以用这样的技术来解决:程序运行的过程中,有一个负责发送邮件的工作线程,它一直循环从发送队列中取出发送邮件的简要信息,程序根据这个信息从数据库中取出邮件发送出去。不过这里得注意线程同步的问题,有可能在将发送信息加入队列的同时,线程正在取队列,所以要用一个临界区保证不会发生竞争条件。

    下面这个方法的示例代码:

unit Unit1;

interface

uses
  Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
  Dialogs, StdCtrls;

type
  //模拟邮件在数据库的信息
  PSendRec =^TSendRec;
  TSendRec = record
    SendID: string;
  end;
  TSendEvent = procedure(SendRec: PSendRec) of object;
  TSendTread = class(TThread)
  private
    FLock: TRTLCriticalSection;  
//声明一个临界区变量
    FSendQue: TList; 
//发送结构的队列
    FSendRec: PSendRec;
    FSendEvent: TSendEvent;
    procedure Lock;
    procedure UnLock;
    procedure ClearQueue; 
//清除队列
    procedure SendAction(SendRec: PSendRec); 
//模拟发送的动作
  protected
    procedure Execute; override;
    procedure DoSend;
  public
    constructor Create(Suspend: Boolean);
    destructor Destroy; override;
    
// 将一个发送结构加入队列
    procedure AddToQueue(SendRec: PSendRec);
    
// 从队列中取出一个发送结构
    function PopFromQueue: PSendRec;
    property SendEvent: TSendEvent read FSendEvent write FSendEvent;
  end;

  TForm1 = class(TForm)
    btnSend: TButton;
    edtSendID: TEdit;
    edtSendResult: TEdit;
    procedure FormCreate(Sender: TObject);
    procedure FormDestroy(Sender: TObject);
    procedure btnSendClick(Sender: TObject);
  private
    { Private declarations }
    SendThread: TSendTread;
    procedure OnSend(SendRec: PSendRec);
  public
    
{ Public declarations }
  end;

var
  Form1: TForm1;

implementation

{$R *.dfm}

{ TSendTread }

procedure TSendTread.AddToQueue(SendRec: PSendRec);
begin
  Lock;
  try
    FSendQue.Add(SendRec);
  finally
    UnLock;
  end;
end;

procedure TSendTread.ClearQueue;
var
  i: Integer;
begin
  for i := 0 to FSendQue.Count - 1 do
    Dispose(FSendQue[i]);
  FSendQue.Clear;
end;

constructor TSendTread.Create(Suspend: Boolean);
begin
  inherited Create(Suspend);
  InitializeCriticalSection(FLock);
  FSendQue := TList.Create;
end;

destructor TSendTread.Destroy;
begin
  //下面的技术在以前的文章已经说过了
  Terminate;
  WaitFor;

  ClearQueue;
  FSendQue.Free;
  DeleteCriticalSection(FLock);
  inherited;
end;

procedure TSendTread.DoSend;
begin
  if Assigned(FSendEvent) then
    FSendEvent(FSendRec);
end;

procedure TSendTread.Execute;
var
  SendRec: PSendRec;
begin
  while not Terminated do
  begin
    SendRec := PopFromQueue;
    if SendRec <> nil then
      SendAction(SendRec);
    Sleep(50); //稍作休息,避免占用CPU过多
  end;
end;

procedure TSendTread.Lock;
begin
  EnterCriticalSection(FLock);
end;

function TSendTread.PopFromQueue: PSendRec;
begin
  Result := nil;
  Lock;
  try
    if FSendQue.Count > 0 then
    begin
      Result := FSendQue[
0];
      FSendQue.Delete(
0);
    end;
  finally
    UnLock;
  end;
end;

procedure TSendTread.SendAction(SendRec: PSendRec);
begin
  FSendRec := SendRec;
  Synchronize(DoSend);
  Dispose(SendRec);
  Sleep(500);
end;

procedure TSendTread.UnLock;
begin
  LeaveCriticalSection(FLock);
end;

procedure TForm1.FormCreate(Sender: TObject);
begin
  edtSendID.Text := '0';
  SendThread := TSendTread.Create(True);
  SendThread.SendEvent := OnSend;
  SendThread.Resume;
end;

procedure TForm1.FormDestroy(Sender: TObject);
begin
  SendThread.Free;
end;

procedure TForm1.OnSend(SendRec: PSendRec);
begin
  //接收事件,显示已经处理完的ID
  edtSendResult.Text := SendRec^.SendID;
end;

procedure TForm1.btnSendClick(Sender: TObject);
var
  SendRec: PSendRec;
  i: Integer;
begin
  New(SendRec);
  SendRec^.SendID := edtSendID.Text;
  //重生成一个ID,递增
  i := StrToInt(edtSendID.Text);
  Inc(i);
  edtSendID.Text := IntToStr(i);
  SendThread.AddToQueue(SendRec);
end;

end.

代码中用SendRec模拟发送的结构,里面只是一个简单的SendID,线程类中有AddToQueuePopFromQueue两个方法,分别是将一个结构加进队列尾和从队列头取出一个结构,这两个方法用LockUnLock将操作锁起来,成为一个原子操作防止竞争条件的出现。而Execute的操作就是不断的循环从队列取结构,如果队列不为空,将取出的结构传递给SendAction方法,我们可以假定这个方法就是发送邮件的方法,为了显示效果,我特别在该方法中向外发布一个事件,以该结构为参数,回调完事件后,即可将该结构的内存清除。

再看主窗体,程序一开始就创建了线程类,用按钮模拟发送邮件的操作,快速连续的按BtnSend,将产生一个个发送结构,并赋给一个唯一的ID,然后进加线程的发送队列中。这时候线程检测到队列中有数据,马上处理并向主界面发送事件,主界面在事件中显示了该结构的ID

这个过程很有趣,无论我们怎样疯狂的点击按钮,edtSendResult总是有条不紊地显示结构ID

 

用并发的队列提高效率

    有经验的程序也许会看出来,用上面的方法虽然可以保证程序的性能,但效率可是低了很多,如果同时有一百封邮件在队列中,假如每发送一封邮件平均用时1秒,则第一百封邮件要过一分钟钟才能被发送,这显然实时性不够。程序有时就是这样,需要在各个方面作一个权衡,好象能量守恒定律一样,如果动能增加了则势能就减少了。我们可以平衡这种极端性,在保证程序的稳定性能同时,也要提高程序的效率。

    有什么办法呢,还是要用多线程来做,假设有一个线程池类,每一个线程维护一个有限的队列,如果一个线程的队列达到最大值时,就会将结构加到另一个线程的队列中,线程池类管理线程,如果线程数不足,它会自动生成新的线程提供使用,这类似于内存页的管理技术。

    在主程序中我们只和线程池类打交道,假设这个线程池为TsendTrdPool,将每一个SendID传送进TsendTrdPool的一个方法,同时要传进一个回调函数,TsendTrdPool会将其挂到线程类中,这样界面便可以显示Send的结果了。

    声明一个MaxQueLen常量,定义发送队列最大的长度,对于线程类来说,只需要在AddToQueue中加一个限制,如果队列已经达到MaxQueLen,则增加失败,上面的线程类实现代码不必作过多的修改,只需将AddToQueue改成下面的样子:

function TSendTread.AddToQueue(SendRec: PSendRec): Boolean;
begin
  Lock;
  try
    Result := FSendQue.Count < MaxQueLen;
    if Result then
      FSendQue.Add(SendRec);
  finally
    UnLock;
  end;
end;

我们假设MaxQueLen10,接下来重点实现TsendTrdPool,且看下面的代码:

type
  ... 
  TSendTrdPool = class
  private
    FSendTrdList: TList;
    
//清除发送线程
    procedure ClearSendThreadList;
    
//创建一个新的线程类
    function CreateNewThread: TSendTread;
    function GetCount: Integer;
  public
    
//加进发送记录,并传进一个回调函数
    procedure AddSendRec(SendRec: PSendRec; ASendEvent: TSendEvent);
    constructor Create;
    destructor Destroy; override;
    property Count: Integer read GetCount;
  end;

implementation
...
{ TSendTrdPool }

procedure TSendTrdPool.AddSendRec(SendRec: PSendRec;
  ASendEvent: TSendEvent);
var
  Succ: Boolean;
  i: Integer;
  SendThread: TSendTread;
begin
  Succ := False;
  for i := 0 to FSendTrdList.Count - 1 do
  begin
    SendThread := TSendTread(FSendTrdList[i]);
    SendThread.Lock;
    try
      if SendThread.AddToQueue(SendRec) then
      begin
        Succ := True;
        SendThread.SendEvent := ASendEvent;
        Break;
      end;
    finally
      SendThread.UnLock;
    end;
  end;
  if not Succ then
  begin
    SendThread := CreateNewThread;
    SendThread.SendEvent := ASendEvent;
    SendThread.AddToQueue(SendRec);
  end;
end;

procedure TSendTrdPool.ClearSendThreadList;
var
  i: Integer;
begin
  for i := 0 to FSendTrdList.Count - 1 do
    TSendTread(FSendTrdList[i]).Free;
end;

constructor TSendTrdPool.Create;
begin
  FSendTrdList := TList.Create;
end;

function TSendTrdPool.CreateNewThread: TSendTread;
begin
  Result := TSendTread.Create(False);
  FSendTrdList.Add(Result);
end;

destructor TSendTrdPool.Destroy;
begin
  ClearSendThreadList;
  FSendTrdList.Free;
  inherited;
end;

function TSendTrdPool.GetCount: Integer;
begin
  Result := FSendTrdList.Count;
end;

end.

这个类保存一个发送线程列表,初始化时这个列表为0,当AddSendRec被调用时,它会把一个发送结构和事件尝试加进列表中的某个线程,如果加入失败,表明所有线程的发送队列均已达到最大值,此时线程池类自动增加一个新的线程,并将发送结构加进这个类中。具体可看上面的实现代码,其中有一点要注意,如果增加了的线程将不会被消毁,只有到线程池类被消毁时,所有线程才被消毁。

现在我们来看看主界面的反应,在主窗体创建时生成一个TsendTrdPool,程序结束时消毁它,界面有一个按钮,其事件代码如下:

procedure TForm1.btnSendClick(Sender: TObject);
var
  SendRec: PSendRec;
  i: Integer;
begin
  for i := 

抱歉!评论已关闭.