此片文章得到微软并行计算平台的Cristina Manu的热心帮助和大力支持,在此表示感谢。

Cristina Manu is SDET in Microsoft, working at Parallel Computing Platform, the author of paper "WORKFLOW AND PARALLELEXTENSIONS IN .NET FRAMEWORK 4".

上面我们讲到要做一个定制化的移动多并发的工作流活动,实现对微软并行架构的核心组件System.Threading.Tasks.Task的封装,用户只需要通过拖拽这个工作流活动实例到Parallel Activity,绑定输入输出参数和要执行的任务,就可以实现业务流程的并行处理。我们给将要构造的活动命名为AsyncParallelActivity,那么如何实现呢?

关于这个系列的基础技术WF4和微软并行计算的方案我们在此不做详细说明,如果需要我们会后续补上,因为介绍它们暂时不是我们这个系列的重点。

设计这样一个组件,我们要处理以下问题。

1)由于输入参数的不确定性(设计者现在不可能知道用户的业务数据模型,我们必须用泛型来解决输入数据多样性问题。),因此该组建必须支持泛型。

2)用户处理逻辑必须封装,我们的组件不能支持任意类型的处理函数,因为用户处理逻辑必然与输入参数类型相关联。

  (1)我们先实现无数据返回的业务封装,在此我们可以使用.net framework 2.0中的Action<T>泛型委托,来看一下它的函数签名

     public delegate void Action<in T>(T obj)

    返回类型为void,符合我们的需求

  (2) 有返回数据类型的业务逻辑的封装我们下面再解决。

那么对于第二个系列里面的代码,我们改写为:

/// <summary>
/// wrapper class for System.Threading.Tasks.tASK, extended from AsyncCodeActivity
/// </summary>
/// <typeparam name="T"></typeparam>
public class AsyncParallelActivity<T> : AsyncCodeActivity
{
/// <summary>
/// Input argument for business object with type of T
/// </summary>
[RequiredArgument]
public InOutArgument<T> Data
{
get;
set;
}

/// <summary>
/// Input argument for business action, as Action<T>
/// </summary>
[RequiredArgument]
public InArgument<Action<T>> Function
{
get;
set;
}

protected override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
{
var data
= Data.Get(context);
var function
= Function.Get(context);
if (data == null) throw new ArgumentNullException("Data");
if (function == null) throw new ArgumentException("Function");
// use Task to wrap the action
Task worker = Task.Factory.StartNew((o) =>
{
function(data);
}, state);
worker.ContinueWith(task
=> callback(task));
return worker;
}

protected override void EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
{

}

我们来测试一下我们的基于并行计算的异步并发工作流活动。

(1)首先定义一个OrderService,OrderService 封装了用户业务逻辑,它有两个方法,分别处理Order信息拷贝到Part和Vehicle数据库,这两个操作分别需要5秒。

如果串行处理,总共需要10秒。并行则总时间为5秒(不考虑其他开销)。

public class OrderService
{
public void DoPartReplication(Order order)
{
order.Id
= Guid.NewGuid();
Thread.Sleep(
5000);

}

public void DoVehicleReplication(Order order)
{
order.Customer
= "Customer 123";
Thread.Sleep(
5000);
}
}

下面我们定义一个新的简化的订单处理流程。

(1)首先我们定义一个新的Workflow,命名为Workflow1

 在这个新的workflow里面拖拽一个FlowChart,然后在FlowChart这个根活动里面定义两个变量。OrderInProcess和OrderService,

分别为Order和OrderService的实例,如下图所示。

(2)双击Parallel,添加AsyncParallelActivity实例(添加前请先编译项目,新的工作流活动就会出现在Toolbox 的最上面)

我们拖拽两个AsyncParallelActivity,设定泛型参数为Order类型。

然后设置这两个实例的输入参数(微软工作流里面还是仅支持VB.net,对与同我一样只熟悉C#的有点小杯具)

Data=OrderInProcess

Function=New Action(Of Order)(AddressOf OrderService.DoPartReplication)

Data=OrderInProcess

Function=New Action(Of Order)(AddressOf OrderService.DoVehicleReplication),

然后修改main里面的代码。

static void Main(string[] args)
{
//InProcess Call workflow
DateTime dtStart = DateTime.Now;
Console.WriteLine(
"workflow start time {0}", dtStart);
WorkflowInvoker.Invoke(
new Workflow1());
DateTime dtEnd
= DateTime.Now;
Console.WriteLine(
"workflow end time {0}", dtEnd);
Console.WriteLine(
"total time elapsed {0}", (dtEnd - dtStart).Seconds);
Console.Read();

}

来看执行结果(为了比较性能,请执行编译后的exe文件,不要在Visual Studio里面运行原代码)

可以看到两个业务流程(分别修改OrderInprocess的Id和客户信息)都得到了执行,而且总共时间只有5秒,并发成功!!

下一章节我们将完善现在这个例子,并且最终抛弃微软的Parallel Activity(因为它是个假的并行工作流活动!)

-胡以谦 于 2011.5.28 

作者: 胡以谦 发表于 2011-05-28 13:57 原文链接

推荐.NET配套的通用数据层ORM框架:CYQ.Data 通用数据层框架