简单实现C#异步操作
在.net4.0以后异步操作,并行计算变得异常简单,但是由于公司项目开发基于.net3.5所以无法用到4.0的并行计算以及Task等异步编程。因此,为了以后更方便的进行异步方式的开发,我封装实现了异步编程框架,通过BeginInvoke、EndInvoke的方式实现异步编程。
一、框架结构
整个框架包括四个部分
1、基类抽象Opeartor
我把每个异步执行过程称为一个Operate,因此需要一个Opeartor去执行
2、FuncAsync
异步的Func
3、ActionAsync
异步的Action
4、Asynchorus
对ActionAsync和FuncAsync的封装
Operator
Operator是一个抽象类,实现了IOperationAsync和IContinueWithAsync两个接口。
IOperationAsync实现了异步操作,IContinueWithAsync实现了类似于Task的ContinueWith方法,在当前异步操作完成后继续进行的操作
IOperationAsync接口详解
publicinterfaceIOperationAsync { IAsyncResultInvoke(); voidWait(); voidCompletedCallBack(IAsyncResultar); voidCatchException(Exceptionexception); }
- Invoke():异步方法的调用
- Wait():等待异步操作执行
- CompletedCallBack():操作完成回调
- CatchException():抓取异常
IContinueWithAsync接口详情
publicinterfaceIContinueWithAsync { OperatorPrevious{get;set;} OperatorNext{get;set;} OperatorContinueWithAsync(Actionaction); OperatorContinueWithAsync<TParameter>(Action<TParameter>action,TParameterparameter); }
Previous:前一个操作
Next:下一个操作
ContinueWithAsync():异步继续操作
publicabstractclassOperator:IOperationAsync,IContinueWithAsync { publicIAsyncResultMiddle; publicreadonlystringId; publicExceptionException{get;privateset;} publicOperatorPrevious{get;set;} publicOperatorNext{get;set;} protectedOperator() { Id=Guid.NewGuid().ToString(); } publicabstractIAsyncResultInvoke(); protectedvoidSetAsyncResult(IAsyncResultresult) { this.Middle=result; } publicvirtualvoidWait() { if(!Middle.IsCompleted)Middle.AsyncWaitHandle.WaitOne(); } publicvirtualvoidCompletedCallBack(IAsyncResultar) { } publicvoidCatchException(Exceptionexception) { this.Exception=exception; } protectedOperatorContinueAsync() { if(Next!=null)Next.Invoke(); returnNext; } publicvirtualOperatorContinueWithAsync(Actionaction) { Next=newActionAsync(action); Next.Previous=this; returnNext; } publicvirtualOperatorContinueWithAsync<TParameter>(Action<TParameter>action,TParameterparameter) { Next=newActionAsync<TParameter>(action,parameter); Next.Previous=this; returnNext; } publicvirtualOperatorContinueWithAsync<TResult>(Func<TResult>func) { Next=newFuncAsync<TResult>(); Next.Previous=this; returnNext; } publicvirtualOperatorContinueWithAsync<TParameter,TResult>(Func<TParameter,TResult>func, TParameterparameter) { Next=newFuncAsync<TParameter,TResult>(func,parameter); Next.Previous=this; returnNext; } }
无返回异步操作
ActionAsync
publicclassActionAsync:Operator { privatereadonlyAction_action; protectedActionAsync() { } publicActionAsync(Actionaction) :this() { this._action=action; } publicoverrideIAsyncResultInvoke() { varmiddle=_action.BeginInvoke(CompletedCallBack,null); SetAsyncResult(middle); returnmiddle; } publicoverridevoidCompletedCallBack(IAsyncResultar) { try { _action.EndInvoke(ar); } catch(Exceptionexception) { this.CatchException(exception); } ContinueAsync(); } } publicclassActionAsync<T>:ActionAsync { publicTResult; privatereadonlyAction<T>_action1; protectedreadonlyTParameter1; publicActionAsync() { } publicActionAsync(Tparameter) { this.Parameter1=parameter; } publicActionAsync(Action<T>action,Tparameter) { this._action1=action; this.Parameter1=parameter; } publicoverrideIAsyncResultInvoke() { varresult=_action1.BeginInvoke(Parameter1,CompletedCallBack,null); SetAsyncResult(result); returnresult; } publicoverridevoidCompletedCallBack(IAsyncResultar) { try { _action1.EndInvoke(ar); } catch(Exceptionexception) { this.CatchException(exception); } ContinueAsync(); } }
有返回异步
FuncAsync实现了IFuncOperationAsync接口
IFuncOperationAsync
publicinterfaceIFuncOperationAsync<T> { voidSetResult(Tresult); TGetResult(); }
- SetResult(Tresult):异步操作完成设置返回值
- GetResult():获取返回值
1)、FuncAsync
publicclassFuncAsync<TResult>:Operator,IFuncOperationAsync<TResult> { privateTResult_result; publicTResultResult { get { if(!Middle.IsCompleted||_result==null) { _result=GetResult(); } return_result; } } privatereadonlyFunc<TResult>_func1; publicFuncAsync() { } publicFuncAsync(Func<TResult>func) { this._func1=func; } publicoverrideIAsyncResultInvoke() { varresult=_func1.BeginInvoke(CompletedCallBack,null); SetAsyncResult(result); returnresult; } publicoverridevoidCompletedCallBack(IAsyncResultar) { try { varresult=_func1.EndInvoke(ar); SetResult(result); } catch(Exceptionexception) { this.CatchException(exception); SetResult(default(TResult)); } ContinueAsync(); } publicvirtualTResultGetResult() { Wait(); returnthis._result; } publicvoidSetResult(TResultresult) { _result=result; } } publicclassFuncAsync<T1,TResult>:FuncAsync<TResult> { protectedreadonlyT1Parameter1; privatereadonlyFunc<T1,TResult>_func2; publicFuncAsync(Func<T1,TResult>action,T1parameter1) :this(parameter1) { this._func2=action; } protectedFuncAsync(T1parameter1) :base() { this.Parameter1=parameter1; } publicoverrideIAsyncResultInvoke() { varresult=_func2.BeginInvoke(Parameter1,CompletedCallBack,null); SetAsyncResult(result); returnresult; } publicoverridevoidCompletedCallBack(IAsyncResultar) { try { varresult=_func2.EndInvoke(ar); SetResult(result); } catch(Exceptionexception) { CatchException(exception); SetResult(default(TResult)); } ContinueAsync(); } }
Asynchronous异步操作封装
ActionAsync和FuncAsync为异步操作打下了基础,接下来最重要的工作就是通过这两个类执行我们的异步操作,为此我封装了一个异步操作类
主要封装了以下几个部分:
- WaitAll(IEnumerable<Operator>operations):等待所有操作执行完毕
- WaitAny(IEnumerable<Operator>operations):等待任意操作执行完毕
- ActionAsync
- FuncAsync
- ContinueWithAction
- ContinueWithFunc
后面四个包含若干个重载,这里只是笼统的代表一个类型的方法
WaitAll
publicstaticvoidWaitAll(IEnumerable<Operator>operations) { foreach(var@operatorinoperations) { @operator.Wait(); } }
WaitAny
publicstaticvoidWaitAny(IEnumerable<Operator>operations) { while(operations.All(o=>!o.Middle.IsCompleted)) Thread.Sleep(100); }
等待时间可以自定义
ActionInvoke
publicstaticOperatorInvoke(Actionaction) { Operatoroperation=newActionAsync(action); operation.Invoke(); returnoperation; } publicstaticOperatorInvoke<T>(Action<T>action,Tparameter) { Operatoroperation=newActionAsync<T>(action,parameter); operation.Invoke(); returnoperation; } publicstaticOperatorInvoke<T1,T2>(Action<T1,T2>action,T1parameter1,T2parameter2) { Operatoroperation=newActionAsync<T1,T2>(action,parameter1,parameter2); operation.Invoke(); returnoperation; }
FuncInvoke
publicstaticOperatorInvoke<TResult>(Func<TResult>func) { Operatoroperation=newFuncAsync<TResult>(func); operation.Invoke(); returnoperation; } publicstaticOperatorInvoke<TParameter,TResult>(Func<TParameter,TResult>func,TParameterparameter) { TParameterparam=parameter; Operatoroperation=newFuncAsync<TParameter,TResult>(func,param); operation.Invoke(); returnoperation; } publicstaticOperatorInvoke<T1,T2,TResult>(Func<T1,T2,TResult>func,T1parameter1,T2parameter2) { Operatoroperation=newFuncAsync<T1,T2,TResult>(func,parameter1,parameter2); operation.Invoke(); returnoperation; }
ContinueWithAction
publicstaticOperatorContinueWithAsync(IEnumerable<Operator>operators,Actionaction) { returnInvoke(WaitAll,operators) .ContinueWithAsync(action); } publicstaticOperatorContinueWithAsync<TParameter>(IEnumerable<Operator>operators,Action<TParameter>action,TParameterparameter) { returnInvoke(WaitAll,operators) .ContinueWithAsync(action,parameter); }
ContinueWithFunc
publicstaticOperatorContinueWithAsync<TResult>(IEnumerable<Operator>operators,Func<TResult>func) { returnInvoke(WaitAll,operators) .ContinueWithAsync(func); } publicstaticOperatorContinueWithAsync<TParameter,TResult>(IEnumerable<Operator>operators, Func<TParameter,TResult>func,TParameterparameter) { returnInvoke(WaitAll,operators) .ContinueWithAsync(func,parameter); }
这里有个bug当调用ContinueWithAsync后无法调用Wait等待,本来Wait需要从前往后等待每个异步操作,但是测试了下不符合预期结果。不过理论上来说应该无需这样操作,ContinueWithAsync只是为了当上一个异步操作执行完毕时继续执行的异步操作,若要等待,那不如两个操作放到一起,最后再等待依然可以实现。
前面的都是单步异步操作的调用,若需要对某集合进行某个方法的异步操作,可以foreach遍历
publicvoidForeachAsync(IEnumerbale<string>parameters) { foreach(stringpinparameters) { Asynchronous.Invoke(Tast,p); } } publicvoidTest(stringparameter) { //TODO:做一些事 }
每次都需要去手写foreach,比较麻烦,因此实现类似于PLinq的并行计算方法实在有必要,不过有一点差别,PLinq是采用多核CPU进行并行计算,而我封装的仅仅遍历集合进行异步操作而已
ForeachAction
publicstaticIEnumerable<Operator>Foreach<TParameter>(IEnumerable<TParameter>items,Action<TParameter>action) { returnitems.Select(t=>Invoke(action,t)).ToList(); }
ForeachFunc
publicstaticIEnumerable<Operator>Foreach<TParameter,TResult>(IEnumerable<TParameter>items,Func<TParameter,TResult>func) { returnitems.Select(parameter=>Invoke(func,parameter)).ToList(); }
如何使用
无返回值异步方法调用
publicvoidDoSomeThing() { //TODO: }
通过Asynchronous.Invoke(DoSomeThing)执行
publicvoidDoSomeThing(stringparameter) { //TODO: }
通过Asynchronous.Invoke(DoSomeThing,parameter)执行
有返回值异步方法调用
publicstringDoSomeThing() { //TODO: }
通过Asynchronous.Invoke(()=>DoSomeThing())执行
publicstringDoSomeThing(stringparameter) { //TODO: }
通过Asynchronous.Invoke(()=>DoSomeThing(parameter))执行,或者也可以传入参数通过Asynchronous.Invoke(p=>DoSomeThing(p),parameter)
无返回值Foreach
publicvoidTest { int[]parameters={1,2,3,4,5}; Asynchronous.Foreach(parameters,Console.WriteLine); }
有返回值Foreach
publicvoidTest { int[]parameters={1,2,3,4,5}; varoperators=Asynchronous.Foreach(parameters,p=>p*2); Asynchrous.WaitAll(operators); Asynchronous.Foreach(operators.Cast<FuncAsync<int,int>>(), p=>Console.WriteLine(p.Result)); }
首先将集合每个值扩大2倍,然后输出
异步执行完再执行
publicvoidTest { int[]parameters={1,2,3,4,5}; varoperators=Asynchronous.Foreach(parameters,p=>p*2); Asynchrous.ContinueWithAsync(operators,Console.WriteLine,"执行完成"); }
每次执行完继续执行
可能有时候我们需要遍历一个集合,每个元素处理完成后我们需要输出XX处理完成
publicvoidTest { int[]parameters={1,2,3,4,5}; varoperators=Asynchronous.Foreach(parameters,p=>p*2); Asynchronous.Foreach(operators,o=>{ o.ContinueWithAsync(()={ //每个元素执行完时执行 if(o.Exception!=null) { //之前执行时产生未处理的异常,这里可以捕获到 } }); }); }
可以实现链式异步操作
publicvoidChain() { Asynchronous.Invoke(Console.WriteLine,1) .ContinueWithAsync(Console.WriteLine,2) .ContinueWithAsync(Console.WriteLine,3) }
这样会按步骤输出1,2,3
结束语
以上只是列出了部分重载方法,其他重载方法无非就是加参数,本质实际是一样的。
希望对大家的学习有所帮助,在这祝大家新年快乐,新的一年大家一起努力。