现在的位置: 首页 > 综合 > 正文

在.Net中使用异步

2011年08月03日 ⁄ 综合 ⁄ 共 13286字 ⁄ 字号 评论关闭

在写程序的过程中,我们可能会需要对某些功能实现异步操作,比如记录调用日志等。

提到异步,我们最容易想到的就是多线程:我们可以启动另外一个线程,把一部分工作交给另外一个线程去执行,而当前线程继续去做一些更加急迫的事情。这里的“把一部分工作交给另外一个线程取执行”,是通过将要执行的函数的函数入口地址告诉另外一个线程来实现的,当新的线程有了函数的入口地址,就可以调用该函数。

我们先来看一下怎样使用C#中的Thread类来实现异步。

使用Thread类异步执行一个方法
在C#中,Thread类是常用的用来启动线程的类:

        static void Main(string[] args)
        {
            Thread thread = new Thread(new ThreadStart(myStartingMethod));
            thread.Start();
        }

        static void myStartingMethod()
        {
        }
实际上,这里创建的ThreadStart对象,封装的就是方法“myStartingMethod”的入口地址。C#中通过Delegate对象,可以方便的封装函数入口地址。

而Delegate,实际上是用来描述函数定义的,比如上面提到的ThreadStart委托,他的声明如下:

public delegate void ThreadStart();
这句话声明了一个叫做ThreadStart的委托类型,而且该声明表示:ThredStart这个委托类型,只能封装“返回值为void、没有参数”的函数的入口地址。如果我们给ThreadStart类的构造函数传递的方法不符合,则会出错:

 

        static void Main(string[] args)
        {
            // 错误 “myStartingMethod”的重载均与委托“System.Threading.ThreadStart”不匹配
            Thread thread = new Thread(new ThreadStart(myStartingMethod));
            thread.Start();
        }

        static void myStartingMethod(int a)
        {
        }

实际上,我们在使用多线程时,要异步执行的函数往往会有一些参数。比如记录日志时,我们需要告诉另外一个线程日志的信息。

异步执行一个带参数的方法
因此,Thread类除了接受ThreadStart委托,还接受另外一个带参数的委托类型ParameterizedThreadStart:

        static void Main(string[] args)
        {
            Thread thread = new Thread(new ParameterizedThreadStart(myStartingMethod));
            thread.Start(null);
        }

        static void myStartingMethod(object threadData)
        {
            // do something.
        }

ParameterizedThreadStart 委托可以用来封装返回值为void、具有一个object类型参数的函数。这样,我们就可以往另外一个函数中传递参数了——只不过,如果要传递多个参数,我们必须将参数封装一下,弄到一个object对象中去。比如下面的例子中,本来我们需要传递两个整数的,但为了符合ParameterizedThreadStart的声明,我们需要改造一下函数:

        static void Main(string[] args)
        {
            Thread thread = new Thread(new ParameterizedThreadStart(myStartingMethod));
            MyStartingMethodParameterWarpper param = new MyStartingMethodParameterWarpper();
            param.X = 1;
            param.Y = 2;
            thread.Start(param);
        }

        static void myStartingMethod(object threadData)
        {
            MyStartingMethodParameterWarpper param = (MyStartingMethodParameterWarpper)threadData;
            int value = param.X + param.Y;
            // do something
        }

        public class MyStartingMethodParameterWarpper
        {
            public int X;
            public int Y;
        }

ParameterizedThreadStart委托必须与Thread.Start(Object) 方法一起使用——委托只是用来传递函数入口,但函数的参数是通过Thread.Start方法传递的。

另外需要注意的,从这里我们可以看到,这样的使用方法并不是类型安全的,我们无法保证myStartingMethod方法的参数threadData永远都是MyStartingMethodParameterWarpper 类型,因此我们还需要加上判断;另外这样实际上也加大了程序间的沟通成本:如果有人需要异步执行myStartingMethod方法,那么他就必须知道其参数的实际类型并保证参数传递正确,而这块编译器已经无法通过编译错误的方式通知你了。

怎样获得异步执行的结果?

至此,我们只解决了传递参数的问题。
Thread类无法执行一个包含有返回值的函数。我们知道“int a = Math.Sum(1, 2)”是将Sum函数的返回结果复制给了变量a,但如果用了多线程,那么这个线程不知道将这个返回结果复制到哪里,因此接受这样的一个函数是没有意义的。于是产生了另外一个重要的问题:如果我想要知道一步执行的结果,也就是如果我的线程函数具有返回值,我应该怎样做呢?

解决的方法有很多种。

顺着刚才解决传递参数的思路,我们可能会想到:如果Thread类接受一个包含有一个object类型的输入参数和一个object类型的输出参数,不就可以了么?嗯,这个思路听起来不错。不过很不幸的是,MS并没有提供这个接口。

如此看来,我们是没法直接得到异步函数的执行结果了。

不过没关系,我们可以间接的得到——我们可以在线程函数内,把函数的返回值保存在一个约定好的地方,然后在主线程到那里去取就可以了!

因此,考虑到object对象是引用类型,我们可以返回值直接放在线程函数的参数中:        static void Main(string[] args)
        {
            Thread thread = new Thread(new ParameterizedThreadStart(myStartingMethod));
            MyStartingMethodParameterWarpper param = new MyStartingMethodParameterWarpper();
            param.X = 1;
            param.Y = 2;
            thread.Start(param);
            while(thread.ThreadState != ThreadState.Stopped)
            {
                Thread.Sleep(10);
            }
            Console.WriteLine(param.Value);
        }

        static void myStartingMethod(object threadData)
        {
            MyStartingMethodParameterWarpper param = (MyStartingMethodParameterWarpper)threadData;
            param.Value = param.X + param.Y;
        }

        public class MyStartingMethodParameterWarpper
        {
            public int X;
            public int Y;
            public int Value;
        }

 

回顾上面的封装函数参数、封装函数返回值的做法,我们的思路实际上是“将线程函数的参数、返回值封装在对象中”。而刚刚我们也提到了,ParameterizedThreadStart 委托和 Thread.Start(Object) 方法重载使得将数据传递给线程过程变得简单,但由于可以将任何对象传递给 Thread.Start(Object),因此这种方法并不是类型安全的。将数据传递给线程过程的一个更可靠的方法是将线程过程和数据字段都放入辅助对象:

 

        static void Main(string[] args)
        {
            MyClass obj = new MyClass();
            obj.X = 1;
            obj.Y = 2;
            Thread thread = new Thread(new ThreadStart(obj.myStartingMethod));
            thread.Start();
            while(thread.ThreadState != ThreadState.Stopped)
            {
                Thread.Sleep(10);
            }
            Console.WriteLine(obj.Value);
        }

        public class MyClass
        {
            public int X;
            public int Y;
            public int Value;

            public void myStartingMethod()
            ...{
                this.Value = this.X + this.Y;
            }
        }

怎样知道线程函数已经执行完毕
刚才在我们获取函数返回值时,都使用了一个While循环来等待线程函数执行完毕。但这种方式可能是不好的——假设我们启动一个线程,这个线程尝试去获得一个打开的数据库链接,而主程序需要在获得该连接后马上得到通知。看下面这段:

        static void Main(string[] args)
        ...{
            MyClass obj = new MyClass();
            Thread thread = new Thread(new ThreadStart(obj.MyStartingMethod));
            thread.Start();

            //
            if(!SomethingDone && thread.ThreadState == ThreadState.Stopped)
            ...{
                DoSomething();
            }

            // 事情1
            // .....
            if(!SomethingDone && thread.ThreadState == ThreadState.Stopped)
            ...{
                DoSomething();
            }

            // 事情2
            // .....
            if(!SomethingDone && thread.ThreadState == ThreadState.Stopped)
            ...{
                DoSomething();
            }

            // 事情3
            // .....
            if(!SomethingDone && thread.ThreadState == ThreadState.Stopped)
            ...{
                DoSomething();
            }

            // 事情4
            // .....
            if(!SomethingDone && thread.ThreadState == ThreadState.Stopped)
            ...{
                DoSomething();
            }

            // ......
        }

        static bool SomethingDone = false;
        static void DoSomething()
        {
            SomethingDone = true;
            // do something
        }

        public class MyClass
        {
            public OdbcConnection OpenConnection;

            public void MyStartingMethod()
            ...{
                this.OpenConnection = new OdbcConnection();
                // do something
                this.OpenConnection.Open();
            }
        }

上面的代码,虽然我们在每次执行一个代码段后就判断线程有没有执行完,但实际上仍然不是及时的——仍然无法保证在函数执行完后就第一时间就启动了函数DoSomething,因为每个代码段执行过程中也许消耗了很长时间,而在这段时间内另一个线程早就执行完了。

这样的主动轮询的方法,实在是比较累,而且及时性也不好。

那么,Thread类接受了一个函数入口地址,线程在启动后就会去执行这个函数。那么,假设我们给线程多传递一个函数入口地址,叫线程在执行完线程函数之后就马上执行这个函数,那我们岂不是。。。就能第一时间得知函数已经执行完了?想法很好。看我们来改造:

        static void Main(string[] args)
        {
            MyClass obj = new MyClass();
            obj.X = 1;
            obj.Y = 2;
            obj.OnMyStartingMethodCompleted = new MyStartingMethodCompleteCallback(WriteResult);
            Thread thread = new Thread(new ThreadStart(obj.MyStartingMethod));
            thread.Start();

            // wait for process exit
        }

        static void WriteResult(MyClass sender)
        {
            Console.WriteLine(sender.Value);
        }

        public delegate void MyStartingMethodCompleteCallback(MyClass sender);

        public class MyClass
        {
            public int X;
            public int Y;
            public int Value;
            public MyStartingMethodCompleteCallback OnMyStartingMethodCompleted;

            public void MyStartingMethod()
            {
                this.Value = this.X + this.Y;

                // 函数已经执行完了,调用另外一个函数。
                this.OnMyStartingMethodCompleted(this);
            }
        }

注意线程方法MyStartingMethod的最后一句,这里实际上就是执行了委托对象OnMyStartingMethodCompleted中所封装的那个函数入。当然为此我们专门定义了一个表示方法MyStartingMethod已经执行完毕的一个委托MyStartingMethodCompleteCallback,他没有返回值,只有一个参数就是方法MyStartingMethod所属的对象。

当然,这里的通知,我们也可以使用Event来实现。不过event的实现方法偶就不写了,,今天写的好累。剩下的事情,就留给大家自己搞吧。

下篇:

在上一篇文章中,我们探讨了使用Thread类实现异步的方法。

在整个过程中,可以发现Delegate这个东西出现了很多次。而仔细研究Delegate,我们发现每一个Delegate类型都自动产生了Invoke、BeginInvoke、EndInvoke等方法。而BeginInvoke、EndInvoke这两个方法,我们马上就可以猜到这是用来实现异步的~~

那么我们现在就看一下怎样使用委托来实现异步。

Delegate的BeginInvoke、EndInvoke两个方法,是编译器自动生成的,专门用来实现异步,这里是MSDN中关于这两个方法的说明:

异步委托提供以异步方式调用同步方法的能力。当同步调用一个委托时,“Invoke”方法直接对当前线程调用目标方法。如果编译器支持异步委托,则它将生成“Invoke”方法以及“BeginInvoke”和“EndInvoke”方法。如果调用“BeginInvoke”方法,则公共语言运行库 (CLR) 将对请求进行排队并立即返回到调用方。将对来自线程池的线程调用该目标方法。提交请求的原始线程自由地继续与目标方法并行执行,该目标方法是对线程池线程运行的。如果在对“BeginInvoke”方法的调用中指定了回调方法,则当目标方法返回时将调用该回调方法。在回调方法中,“EndInvoke”方法获取返回值和所有输入/输出参数。如果在调用“BeginInvoke”时未指定任何回调方法,则可以从调用“BeginInvoke”的线程中调用“EndInvoke”。

其中,BeginInvoke用来启动异步,与Thread类不同的是这里的异步使用CLR管理的。BeginInvoke方法的最后两个参数总是一个AsyncCallback委托对象和一个object类型,其中AsyncCallback委托就是当异步执行完成时将要被调用的函数入口,也就是上一篇中用来实现“在异步完成时通知我”这个功能的。而最后一个object类型,则是用来传递参数的,其实与上一篇中ParameterizedThreadStart委托的参数是类似的——不过他们还是有着明显的区别:使用ParameterizedThreadStart委托时永远只能接受一个object类型的参数,因此如果原本要异步执行的函数具有多个参数,必须进行封装;而使用BeginInvoke方法则不同,编译器生成的BeginInvoke方法前面几个参数(除了最后两个)的类型跟声明委托时的参数个数和类型完全相同,这样就不必再封装参数了,最后一个object参数只是一个补充的参数,一般情况下是不需要的:        
private void DoMain(string cmd, string[] args)
        {
            SumDelegate handle = new SumDelegate(this.Sum);
            IAsyncResult ar = handle.BeginInvoke(1, 2, null, null);
        }

        public delegate int SumDelegate(int x, int y);

        public int Sum(int x, int y)
        {
            return x + y;
        }

我们可以看到,在调用BeginInvoke的时候,方法的后面两个参数就是对应的AsyncCallback和object参数,这里因为我们没有用到这个回调和参数,就都传递了null;而BeginInvoke的前面两个方法,就对应的是Sum函数的两个参数x和y。因此,这个BeginInvoke方法还在代码编译的时候就帮我们检查了函数的输入参数个数以及类型。

当使用Thread类时,我们可以通过判断Thread类的ThreadStatus来判断线程是否已经执行结束。而如果用Delegate.BeginInvoke方法,我们则需要根据其返回的一个IAsyncResult对象的IsCompleted属性来获取“异步操作是否已完成的指示”:当这个属性变成True时,就表示异步已经执行结束:
        private void DoMain(string cmd, string[] args)
        {
            SumDelegate handle = new SumDelegate(this.Sum);
            IAsyncResult ar = handle.BeginInvoke(1, 2, null, null);
            while(!ar.IsCompleted)
            {
                Thread.Sleep(10);
            }
            // 异步已经执行完毕
        }

        public delegate int SumDelegate(int x, int y);

        public int Sum(int x, int y)
        {
            return x + y;
        }

当然,前面我们提到,BeginInvoke方法总是会接收一个AsyncCallback类型的委托,当异步执行完毕后,CLR就会自动调用这个委托封装的函数。因此,我们还可以通过这个委托来接受异步已经完成的通知:         private void DoMain(string cmd, string[] args)
        {
            SumDelegate handle = new SumDelegate(this.Sum);
            AsyncCallback callback = new AsyncCallback(this.OnSumCompleted);
            IAsyncResult ar = handle.BeginInvoke(1, 2, callback, null);
        }

        public delegate int SumDelegate(int x, int y);

        public int Sum(int x, int y)
        {
            return x + y;
        }

        public void OnSumCompleted(IAsyncResult ar)
        {
            // 异步已经执行完毕
            Debug.Assert(ar.IsCompleted);
        }
注意这里,当向BeginInvoke传入的AsyncCallback被执行时,IAsyncResult对象的IsCompleted属性一定是True。另外,BeginInvoke方法传递的最后一个object参数,实际上就是保存在了IAsyncResult的AsyncState属性中。

上面已经提到了两种等待异步调用执行完毕的方法:主动轮询 和 异步执行完毕时执行回调方法。除了这两种方法,我们还可以通过EndInvoke方法来直接阻塞线程(并不是每次都会阻塞,这个我们下面再讲)直到异步执行完成:         private void DoMain(string cmd, string[] args)
        {
            SumDelegate handle = new SumDelegate(this.Sum);
            AsyncCallback callback = new AsyncCallback(this.OnSumCompleted);
            IAsyncResult ar = handle.BeginInvoke(1, 2, null, null);
            int value = handle.EndInvoke(ar);
            Debug.Assert(value == 3);
        }

        public delegate int SumDelegate(int x, int y);

        public int Sum(int x, int y)
        {
            return x + y;
        }
当调用EndInvoke时,必须把BeginInvoke返回的IAsyncResult对象作为参数传递,这样EndInvoke才可以通过IAsyncResult对象得知要等待哪个方法异步执行完毕。因为在BeginInvoke返回的IAsyncResult中,属性AsyncWaitHandle指示了用于等待异步执行完毕的一个句柄。如果你调用了很多次BeginInvoke,就会启动很多个异步任务,每次调用返回的IAsyncResult就会对应的保存了不同的句柄。另外,这里可以看到,EndInvoke方法的返回结果,实际上就是我们在定义SumDelegate委托时声明的返回值类型,这个也是编译器自动帮我们生成的。

那么,我们刚才提到EndInvoke方法“并不是每次都会阻塞”。为什么呢?原因很简单:在EndInvoke方法内部,首先会判断IAsyncResult.IsCompleted属性,如果为True,则直接返回执行结果,否则调用AsyncWaitHandle这个句柄的WaitOne方法,这个方法“阻止当前线程,直到当前的 WaitHandle 收到异步调用已经结束的信号”,然后返回执行结果。

因此,与之对应的,我们还有另外一个方法来等待异步执行结束,那就是我们直接访问AsyncWaitHandle:
        private void DoMain(string cmd, string[] args)
        {
            SumDelegate handle = new SumDelegate(this.Sum);
            AsyncCallback callback = new AsyncCallback(this.OnSumCompleted);
            IAsyncResult ar = handle.BeginInvoke(1, 2, null, null);
            if(!ar.IsCompleted)
            {
                ar.AsyncWaitHandle.WaitOne();
            }
            // 异步调用已结束。
            Debug.Assert(ar.IsCompleted);
        }

        public delegate int SumDelegate(int x, int y);

        public int Sum(int x, int y)
        {
            return x + y;
        }

实际上,这个方式跟EndInvoke是完全相同的。

这下我们应该明白刚才所说的“并不是每次都会阻塞”了吧?没错:当ar.IsCompleted为True时,就会直接返回函数执行结果,否则才会调用WaitHandle的WaitOne来阻塞线程。

通过Delegate对象,我们可以使得我们的类更方便的支持异步方法。就好像刚才的类里面,我们有个Sum方法,然后通过定义一个可以接受这个函数的Delegate,然后用户就可以使用这个Delegate、AsyncCallback、IAsyncResult等对象来实现异步了。

那么我们可不可以为客户封装的更简单一点呢?就好像FileStream类,就有Read、BeginRead、EndRead三个方法,非常简单好用。很明显的,FileStream对象是封装了对Delegate对象的BeginInvoke、EndInvoke方法的调用。那么我们怎样去实现这样的效果呢?

下面,我们利用实现一个支持异步调用的一个类,这个类有个用于同步执行的Sum函数,和一个异步执行的BeginSum、EndSum函数:

    public class MyClass1
    {
        private delegate int SumDelegate(int a, int b);

        private SumDelegate _sumHandler;

        public MyClass1()
        {
            this._sumHandler = new SumDelegate(this.Sum);
        }

        public int Sum(int a, int b)
        {
            return a + b;
        }

        public IAsyncResult BeginSum(int a, int b, AsyncCallback callback, object stateObject)
        {
            return this._sumHandler.BeginInvoke(a, b, callback, stateObject);
        }

        public int EndSum(IAsyncResult asyncResult)
        {
            return this._sumHandler.EndInvoke(asyncResult);
        }
    }

注意这个类的内部,声明了一个私有的委托类型“SumDelegate”,以及一个类型为SumDelegate的私有变量。我们把对这个委托的BeginInvoke、EndInvoke的调用,分别封装在了BeginSum、EndSum中。这样,用户在异步调用Sum方法时,就不用为了封装Sum函数而声明一个新的委托了。

本篇文章来源于 精品文章收藏网-IT技术宝藏 原文链接:http://www.5i-net.cn/article/html/2008-04/1313p2.html

抱歉!评论已关闭.