C#: Sequential processing of asynchronous tasks

Notice: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license, cite the original URL and authors, and attribute it to the original authors (not me). Original StackOverflow question: http://stackoverflow.com/questions/14630770/

Time: 2020-08-10 12:30:37  Source: igfitidea

Sequential processing of asynchronous tasks

Tags: c#, .net, .net-4.0, task-parallel-library

Asked by Daniel Hilgarth

Assume the following synchronous code:

try
{
    Foo();
    Bar();
    Fubar();
    Console.WriteLine("All done");
}
catch(Exception e) // For illustration purposes only. Catch specific exceptions!
{
    Console.WriteLine(e);
}

Now assume all these methods have an Async counterpart and I have to use those for some reason, so simply wrapping the whole thing in a new task is not an option.
How would I achieve the same behavior?
What I mean with "same" is:

  1. Execute a handler for the exception, if one is thrown.
  2. Stop execution of the following methods, if an exception is thrown.

The only thing I was able to come up with is horrible:

var fooTask = FooAsync();
fooTask.ContinueWith(t => HandleError(t.Exception),
                     TaskContinuationOptions.OnlyOnFaulted);
fooTask.ContinueWith(
    fooDone => // distinct names: a nested lambda parameter may not reuse an enclosing one
    {
        var barTask = BarAsync();
        barTask.ContinueWith(t => HandleError(t.Exception),
                             TaskContinuationOptions.OnlyOnFaulted);
        barTask.ContinueWith(
            barDone =>
            {
                var fubarTask = FubarAsync();
                fubarTask.ContinueWith(t => HandleError(t.Exception),
                                       TaskContinuationOptions.OnlyOnFaulted);
                fubarTask.ContinueWith(
                    t => Console.WriteLine("All done"),
                    TaskContinuationOptions.OnlyOnRanToCompletion);
            },
            TaskContinuationOptions.OnlyOnRanToCompletion);
    },
    TaskContinuationOptions.OnlyOnRanToCompletion);

Please note:

  • I need a solution that works with .NET 4, so async/await is out of the question. However, if it would work with async/await, feel free to show how.
  • I don't need to use the TPL. If it is impossible with the TPL another approach would be OK, maybe with Reactive Extensions?

Accepted answer by Stephen Cleary

Here's how it would work with async:

try
{
    await FooAsync();
    await BarAsync();
    await FubarAsync();
    Console.WriteLine("All done");
}
catch(Exception e) // For illustration purposes only. Catch specific exceptions!
{
    Console.WriteLine(e);
}

This would work on .NET 4.0 if you installed the (prerelease) Microsoft.Bcl.Async package.
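
To make the control flow concrete, here is a minimal self-contained sketch. The three step methods and the `Log` list are hypothetical stand-ins for the question's `FooAsync`, `BarAsync` and `FubarAsync`, not code from the original post, and the sketch assumes a compiler that understands `async`/`await` (so not the stock VS2010 compiler).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AwaitSequenceDemo
{
    // Records which steps actually ran, so the ordering can be inspected afterwards.
    public static readonly List<string> Log = new List<string>();

    // Hypothetical stand-ins for the question's methods; the second one fails.
    static async Task FooAsync()
    {
        await Task.Factory.StartNew(() => { });
        Log.Add("Foo");
    }

    static async Task BarAsync()
    {
        await Task.Factory.StartNew(() => { });
        throw new InvalidOperationException("Bar failed");
    }

    static async Task FubarAsync()
    {
        await Task.Factory.StartNew(() => { });
        Log.Add("Fubar");
    }

    public static async Task RunAsync()
    {
        Log.Clear();
        try
        {
            await FooAsync();
            await BarAsync();    // throws, so the two lines below never run
            await FubarAsync();
            Log.Add("All done");
        }
        catch (InvalidOperationException e)
        {
            // await rethrows the original exception, not an AggregateException.
            Log.Add("Handled: " + e.Message);
        }
    }
}
```

After `AwaitSequenceDemo.RunAsync()` completes, `Log` should hold "Foo" followed by "Handled: Bar failed", which matches both requirements from the question: the handler runs and the remaining steps are skipped.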

Since you're stuck on VS2010, you can use a variant of Stephen Toub's Then:

public static Task Then(this Task first, Func<Task> next)
{
  var tcs = new TaskCompletionSource<object>();
  first.ContinueWith(_ =>
  {
    if (first.IsFaulted) tcs.TrySetException(first.Exception.InnerExceptions);
    else if (first.IsCanceled) tcs.TrySetCanceled();
    else
    {
      try
      {
        next().ContinueWith(t =>
        {
          if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions);
          else if (t.IsCanceled) tcs.TrySetCanceled();
          else tcs.TrySetResult(null);
        }, TaskContinuationOptions.ExecuteSynchronously);
      }
      catch (Exception exc) { tcs.TrySetException(exc); }
    }
  }, TaskContinuationOptions.ExecuteSynchronously);
  return tcs.Task; 
}

You can use it as such:

var task = FooAsync().Then(() => BarAsync()).Then(() => FubarAsync());
task.ContinueWith(t =>
{
  if (t.IsFaulted)
  {
    var e = t.Exception.InnerException;
    // exception handling
  }
  else if (t.IsCanceled)
  {
    // t.Exception is null for a canceled task; handle cancellation here
  }
  else
  {
    Console.WriteLine("All done");
  }
}, TaskContinuationOptions.ExecuteSynchronously);
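
As a rough end-to-end check of the chaining behaviour, the sketch below repeats the `Then` extension so that it compiles on its own and runs three steps where the second one fails. The `step` and `fail` helpers and the `log` list are illustrative assumptions, not part of the original answer.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ThenDemo
{
    // Same shape as the Then extension above, repeated so the sample is self-contained.
    public static Task Then(this Task first, Func<Task> next)
    {
        var tcs = new TaskCompletionSource<object>();
        first.ContinueWith(_ =>
        {
            if (first.IsFaulted) tcs.TrySetException(first.Exception.InnerExceptions);
            else if (first.IsCanceled) tcs.TrySetCanceled();
            else
            {
                try
                {
                    next().ContinueWith(t =>
                    {
                        if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions);
                        else if (t.IsCanceled) tcs.TrySetCanceled();
                        else tcs.TrySetResult(null);
                    }, TaskContinuationOptions.ExecuteSynchronously);
                }
                catch (Exception exc) { tcs.TrySetException(exc); }
            }
        }, TaskContinuationOptions.ExecuteSynchronously);
        return tcs.Task;
    }

    public static List<string> Run()
    {
        var log = new List<string>();

        // Hypothetical steps: each returns an already started task.
        Func<string, Task> step = name => Task.Factory.StartNew(() => log.Add(name));
        Action fail = () => { throw new InvalidOperationException("Bar failed"); };

        Task chain = step("Foo")
            .Then(() => Task.Factory.StartNew(fail))
            .Then(() => step("Fubar"));   // never invoked because the previous step faulted

        Task done = chain.ContinueWith(t =>
        {
            if (t.IsFaulted) log.Add("Handled: " + t.Exception.InnerException.Message);
            else if (t.IsCanceled) log.Add("Canceled");
            else log.Add("All done");
        });

        done.Wait();   // blocking only to keep the demo simple
        return log;
    }
}
```

Because `Then` forwards the inner exceptions rather than the wrapping `AggregateException`, the final continuation sees the original `InvalidOperationException` as `t.Exception.InnerException` no matter how many links the chain has.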


Using Rx, it would look like this (assuming you don't have the async methods already exposed as IObservable<Unit>):

FooAsync().ToObservable()
    .SelectMany(_ => BarAsync().ToObservable())
    .SelectMany(_ => FubarAsync().ToObservable())
    .Subscribe(_ => { Console.WriteLine("All done"); },
        e => { Console.WriteLine(e); });

I think. I'm not an Rx master, by any means. :)

Answer by Chris Sinclair

Now, I haven't really used the TPL much, so this is just a stab in the dark. And given what @Servy mentioned, perhaps this won't run completely asynchronously. But I figured I'd post it, and if it's way off the mark, you can downvote me to oblivion or I can have it deleted (or we can just fix what needs fixing).

public void RunAsync(Action onComplete, Action<Exception> errorHandler, params Action[] actions)
{
    if (actions.Length == 0)
    {
        //what to do when no actions/tasks provided?
        onComplete();
        return;
    }

    List<Task> tasks = new List<Task>(actions.Length);
    foreach(var action in actions)
    {
        Task task = new Task(action);
        task.ContinueWith(t => errorHandler(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
        tasks.Add(task);
    }

    //last task calls onComplete
    tasks[actions.Length - 1].ContinueWith(t => onComplete(), TaskContinuationOptions.OnlyOnRanToCompletion);

    //wire all tasks to execute the next one, except of course, the last task
    for (int i = 0; i <= actions.Length - 2; i++)
    {
        var nextTask = tasks[i + 1];
        tasks[i].ContinueWith(t => nextTask.Start(), TaskContinuationOptions.OnlyOnRanToCompletion);
    }

    tasks[0].Start();
}

And it would have usage like:

RunAsync(() => Console.WriteLine("All done"),
            ex => Console.WriteLine(ex),
            Foo,
            Bar,
            Fubar);

Thoughts? Downvotes? :)

(I definitely prefer async/await though)

EDIT: Based on your comments to take Func<Task>, would this be a proper implementation?

public void RunAsync(Action onComplete, Action<Exception> errorHandler, params Func<Task>[] actions)
{
    if (actions.Length == 0)
    {
        //what to do when no actions/tasks provided?
        onComplete();
        return;
    }

    List<Task> tasks = new List<Task>(actions.Length);
    foreach (var action in actions)
    {
        Func<Task> nextActionFunc = action;
        Task task = new Task(() =>
        {
            var nextTask = nextActionFunc();
            nextTask.ContinueWith(t => errorHandler(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
            nextTask.Start();
        });
        tasks.Add(task);
    }

    //last task calls onComplete
    tasks[actions.Length - 1].ContinueWith(t => onComplete(), TaskContinuationOptions.OnlyOnRanToCompletion);

    //wire all tasks to execute the next one, except of course, the last task
    for (int i = 0; i <= actions.Length - 2; i++)
    {
        var nextTask = tasks[i + 1];
        tasks[i].ContinueWith(t => nextTask.Start(), TaskContinuationOptions.OnlyOnRanToCompletion);
    }

    tasks[0].Start();
}

Answer by Lee

You should be able to create a method to combine two tasks, and only start the second if the first succeeds.

public static Task Then(this Task parent, Task next)
{
    TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
    parent.ContinueWith(pt =>
    {
        if (pt.IsFaulted)
        {
            tcs.SetException(pt.Exception.InnerException);
        }
        else
        {
            next.ContinueWith(nt =>
            {
                if (nt.IsFaulted)
                {
                    tcs.SetException(nt.Exception.InnerException);
                }
                else { tcs.SetResult(null); }
            });
            next.Start();
        }
    });
    return tcs.Task;
}

You can then chain tasks together:

Task outer = FooAsync()
    .Then(BarAsync())
    .Then(FubarAsync());

outer.ContinueWith(t => {
    if(t.IsFaulted) {
        //handle exception
    }
});

If your tasks are started immediately you can just wrap them in a Func:

public static Task Then(this Task parent, Func<Task> nextFunc)
{
    TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
    parent.ContinueWith(pt =>
    {
        if (pt.IsFaulted)
        {
            tcs.SetException(pt.Exception.InnerException);
        }
        else
        {
            Task next = nextFunc();
            next.ContinueWith(nt =>
            {
                if (nt.IsFaulted)
                {
                    tcs.SetException(nt.Exception.InnerException);
                }
                else { tcs.SetResult(null); }
            });
        }
    });
    return tcs.Task;
}

Answer by Daniel Hilgarth

Just for the sake of completeness, that's how I would implement the helper method suggested by Chris Sinclair:

public void RunSequential(Action onComplete, Action<Exception> errorHandler,
                          params Func<Task>[] actions)
{
    RunSequential(onComplete, errorHandler,
                  actions.AsEnumerable().GetEnumerator());
}

public void RunSequential(Action onComplete, Action<Exception> errorHandler,
                          IEnumerator<Func<Task>> actions)
{
    if(!actions.MoveNext())
    {
        onComplete();
        return;
    }

    var task = actions.Current();
    task.ContinueWith(t => errorHandler(t.Exception),
                      TaskContinuationOptions.OnlyOnFaulted);
    task.ContinueWith(t => RunSequential(onComplete, errorHandler, actions),
                      TaskContinuationOptions.OnlyOnRanToCompletion);
}

This ensures that each subsequent task is only requested when the previous one completed successfully.
It assumes that the Func<Task> returns an already running task.
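
The distinction between an already running ("hot") task and an unstarted ("cold") one matters for several of the helpers on this page, so here is a small sketch of it. The class and method names are made up for illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class HotColdTaskDemo
{
    // A task built with the constructor is "cold": it stays in the Created
    // state, and its continuations never run, until Start() is called.
    public static TaskStatus ColdStatus()
    {
        var cold = new Task(() => { });
        return cold.Status;
    }

    // A task from Task.Factory.StartNew is "hot": it is already scheduled.
    // Calling Start() on it is an error.
    public static bool StartingHotTaskThrows()
    {
        Task hot = Task.Factory.StartNew(() => { });
        hot.Wait();
        try
        {
            hot.Start();
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }
}
```

In practice this means a helper that calls `Start()` on the task it receives only works with cold tasks, while a helper that merely attaches continuations, like the one above, needs the task to be hot (or started by someone else) for anything to happen.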

Answer by Servy

What you have here is essentially a ForEachAsync. You want to run each async item, sequentially, but with some error handling support. Here is one such implementation:

public static Task ForEachAsync(IEnumerable<Func<Task>> tasks)
{
    var tcs = new TaskCompletionSource<bool>();

    Task currentTask = Task.FromResult(false);

    foreach (Func<Task> function in tasks)
    {
        currentTask.ContinueWith(t => tcs.TrySetException(t.Exception.InnerExceptions)
            , TaskContinuationOptions.OnlyOnFaulted);
        currentTask.ContinueWith(t => tcs.TrySetCanceled()
                , TaskContinuationOptions.OnlyOnCanceled);
        Task<Task> continuation = currentTask.ContinueWith(t => function()
            , TaskContinuationOptions.OnlyOnRanToCompletion);
        currentTask = continuation.Unwrap();
    }

    currentTask.ContinueWith(t => tcs.TrySetException(t.Exception.InnerExceptions)
            , TaskContinuationOptions.OnlyOnFaulted);
    currentTask.ContinueWith(t => tcs.TrySetCanceled()
            , TaskContinuationOptions.OnlyOnCanceled);
    currentTask.ContinueWith(t => tcs.TrySetResult(true)
            , TaskContinuationOptions.OnlyOnRanToCompletion);

    return tcs.Task;
}

I added in support for canceled tasks as well, just to be more general and because it took so little to do.

It adds each task as a continuation of the previous task, and all along the line it ensures that any exceptions result in the final task's exception being set.

Here is an example usage:

public static Task FooAsync()
{
    Console.WriteLine("Started Foo");
    return Task.Delay(1000)
        .ContinueWith(t => Console.WriteLine("Finished Foo"));
}

public static Task BarAsync()
{
    return Task.Factory.StartNew(() => { throw new Exception(); });
}

private static void Main(string[] args)
{
    List<Func<Task>> list = new List<Func<Task>>();

    list.Add(() => FooAsync());
    list.Add(() => FooAsync());
    list.Add(() => FooAsync());
    list.Add(() => FooAsync());
    list.Add(() => BarAsync());

    Task task = ForEachAsync(list);

    task.ContinueWith(t => Console.WriteLine(t.Exception.ToString())
        , TaskContinuationOptions.OnlyOnFaulted);
    task.ContinueWith(t => Console.WriteLine("Done!")
        , TaskContinuationOptions.OnlyOnRanToCompletion);
}