C# 如何使用等待在自定义 TaskScheduler 上运行任务?
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/15428604/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How to run a Task on a custom TaskScheduler using await?
提问by Stephane Delcroix
I have some methods returning Task<T>
on which I can await
at will. I'd like to have those Tasks executed on a custom TaskScheduler
instead of the default one.
我有一些方法Task<T>
可以随意返回await
。我希望在自定义TaskScheduler
而不是默认任务上执行这些任务。
var task = GetTaskAsync ();
await task;
I know I can create a new TaskFactory (new CustomScheduler ())
and do a StartNew ()
from it, but StartNew ()
takes an action and create the Task
, and I already have the Task
(returned behind the scenes by a TaskCompletionSource
)
我知道我可以创建一个新的TaskFactory (new CustomScheduler ())
并StartNew ()
从中做一个,但是StartNew ()
采取行动并创建Task
,我已经有了Task
(由 a 在幕后返回TaskCompletionSource
)
How can I specify my own TaskScheduler
for await
?
如何指定我自己的TaskScheduler
for await
?
回答by sanosdole
After the comments it looks like you want to control the scheduler on which the code after the await is run.
在评论之后,您似乎想要控制等待后运行代码的调度程序。
The compile creates a continuation from the await that runs on the current SynchronizationContext by default. So your best shot is to set up the SynchronizationContext
before calling await.
默认情况下,编译从在当前 SynchronizationContext 上运行的 await 创建一个延续。所以你最好的办法是设置SynchronizationContext
before 调用 await。
There are some ways to await a specific context. See Configure Awaitfrom Jon Skeet, especially the part about SwitchTo, for more information on how to implement something like this.
有一些方法可以等待特定的上下文。请参阅Jon Skeet 的ConfigureAwait,尤其是关于 SwitchTo 的部分,以获取有关如何实现此类内容的更多信息。
EDIT: The SwitchTo method from TaskEx has been removed, as it was too easy to misuse. See the MSDN Forumfor reasons.
编辑:来自 TaskEx 的 SwitchTo 方法已被删除,因为它太容易被误用。有关原因,请参阅MSDN 论坛。
回答by Stephen Cleary
I think what you really want is to do a Task.Run
, but with a custom scheduler. StartNew
doesn't work intuitively with asynchronous methods; Stephen Toub has a great blog post about the differences between Task.Run
and TaskFactory.StartNew
.
我认为你真正想要的是做一个Task.Run
, 但使用自定义调度程序。StartNew
不能直观地使用异步方法;Stephen Toub 有一篇很棒的博客文章,介绍了Task.Run
和之间的区别TaskFactory.StartNew
。
So, to create your own custom Run
, you can do something like this:
因此,要创建自己的 custom Run
,您可以执行以下操作:
private static readonly TaskFactory myTaskFactory = new TaskFactory(
CancellationToken.None, TaskCreationOptions.DenyChildAttach,
TaskContinuationOptions.None, new MyTaskScheduler());
private static Task RunOnMyScheduler(Func<Task> func)
{
return myTaskFactory.StartNew(func).Unwrap();
}
private static Task<T> RunOnMyScheduler<T>(Func<Task<T>> func)
{
return myTaskFactory.StartNew(func).Unwrap();
}
private static Task RunOnMyScheduler(Action func)
{
return myTaskFactory.StartNew(func);
}
private static Task<T> RunOnMyScheduler<T>(Func<T> func)
{
return myTaskFactory.StartNew(func);
}
Then you can execute synchronous orasynchronous methods on your custom scheduler.
然后,您可以在自定义调度程序上执行同步或异步方法。
回答by Adam Davidson
The TaskCompletionSource<T>.Task
is constructed without any action and the scheduler
is assigned on the first call to ContinueWith(...)
(from Asynchronous Programming with the Reactive Framework and the Task Parallel Library — Part 3).
该TaskCompletionSource<T>.Task
构造无任何动作和调度分配到第一次调用ContinueWith(...)
(从与反应框架和任务并行库异步编程-第3部分)。
Thankfully you can customize the await behavior slightly by implementing your own class deriving from INotifyCompletion
and then using it in a pattern similar to await SomeTask.ConfigureAwait(false)
to configure the scheduler that the task should start using in the OnCompleted(Action continuation)
method (from await anything;).
值得庆幸的是,您可以通过实现自己的派生类来稍微自定义 await 行为INotifyCompletion
,然后以类似于await SomeTask.ConfigureAwait(false)
配置任务应该在OnCompleted(Action continuation)
方法中开始使用的调度程序的模式使用它(来自await nothing;)。
Here is the usage:
这是用法:
TaskCompletionSource<object> source = new TaskCompletionSource<object>();
public async Task Foo() {
// Force await to schedule the task on the supplied scheduler
await SomeAsyncTask().ConfigureScheduler(scheduler);
}
public Task SomeAsyncTask() { return source.Task; }
Here is a simple implementation of ConfigureScheduler
using a Task extension method with the important part in OnCompleted
:
这是ConfigureScheduler
使用 Task 扩展方法的一个简单实现,其重要部分为OnCompleted
:
public static class TaskExtension {
public static CustomTaskAwaitable ConfigureScheduler(this Task task, TaskScheduler scheduler) {
return new CustomTaskAwaitable(task, scheduler);
}
}
public struct CustomTaskAwaitable {
CustomTaskAwaiter awaitable;
public CustomTaskAwaitable(Task task, TaskScheduler scheduler) {
awaitable = new CustomTaskAwaiter(task, scheduler);
}
public CustomTaskAwaiter GetAwaiter() { return awaitable; }
public struct CustomTaskAwaiter : INotifyCompletion {
Task task;
TaskScheduler scheduler;
public CustomTaskAwaiter(Task task, TaskScheduler scheduler) {
this.task = task;
this.scheduler = scheduler;
}
public void OnCompleted(Action continuation) {
// ContinueWith sets the scheduler to use for the continuation action
task.ContinueWith(x => continuation(), scheduler);
}
public bool IsCompleted { get { return task.IsCompleted; } }
public void GetResult() { }
}
}
Here's a working sample that will compile as a console application:
这是一个可以编译为控制台应用程序的工作示例:
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
namespace Example {
class Program {
static TaskCompletionSource<object> source = new TaskCompletionSource<object>();
static TaskScheduler scheduler = new CustomTaskScheduler();
static void Main(string[] args) {
Console.WriteLine("Main Started");
var task = Foo();
Console.WriteLine("Main Continue ");
// Continue Foo() using CustomTaskScheduler
source.SetResult(null);
Console.WriteLine("Main Finished");
}
public static async Task Foo() {
Console.WriteLine("Foo Started");
// Force await to schedule the task on the supplied scheduler
await SomeAsyncTask().ConfigureScheduler(scheduler);
Console.WriteLine("Foo Finished");
}
public static Task SomeAsyncTask() { return source.Task; }
}
public struct CustomTaskAwaitable {
CustomTaskAwaiter awaitable;
public CustomTaskAwaitable(Task task, TaskScheduler scheduler) {
awaitable = new CustomTaskAwaiter(task, scheduler);
}
public CustomTaskAwaiter GetAwaiter() { return awaitable; }
public struct CustomTaskAwaiter : INotifyCompletion {
Task task;
TaskScheduler scheduler;
public CustomTaskAwaiter(Task task, TaskScheduler scheduler) {
this.task = task;
this.scheduler = scheduler;
}
public void OnCompleted(Action continuation) {
// ContinueWith sets the scheduler to use for the continuation action
task.ContinueWith(x => continuation(), scheduler);
}
public bool IsCompleted { get { return task.IsCompleted; } }
public void GetResult() { }
}
}
public static class TaskExtension {
public static CustomTaskAwaitable ConfigureScheduler(this Task task, TaskScheduler scheduler) {
return new CustomTaskAwaitable(task, scheduler);
}
}
public class CustomTaskScheduler : TaskScheduler {
protected override IEnumerable<Task> GetScheduledTasks() { yield break; }
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { return false; }
protected override void QueueTask(Task task) {
TryExecuteTask(task);
}
}
}
回答by Papay
Can you fit for this method call:
你能适应这个方法调用吗:
await Task.Factory.StartNew(
() => { /* to do what you need */ },
CancellationToken.None, /* you can change as you need */
TaskCreationOptions.None, /* you can change as you need */
customScheduler);
回答by Theodor Zoulias
There is no way to embed rich async features into a custom TaskScheduler
. This class was not designed with async
/await
in mind. The standard way to use a custom TaskScheduler
is as an argument to the Task.Factory.StartNew
method. This method does not understand async delegates. It is possible to provide an async delegate, but it is treated as any other delegate that returns some result. To get the actual awaited result of the async delegate one must call Unwrap()
to the task returned. This is not the problem though. The problem is that the TaskScheduler
infrastructure does not treat the async delegate as a single unit of work. It split each task into multiple mini-tasks (using every await
as a separator), and each mini-task is processed individually. This severely restricts the asynchronous functionality that can be implemented on top of this class. As an example here is a custom TaskScheduler
that is intended to queue the supplied tasks one at a time (to throttle the concurrency in other words):
无法将丰富的异步功能嵌入到自定义TaskScheduler
. 这个类的设计没有考虑async
/ await
。使用自定义的标准方法TaskScheduler
是作为Task.Factory.StartNew
方法的参数。此方法不理解异步委托。可以提供异步委托,但它被视为返回某些结果的任何其他委托。要获得异步委托的实际等待结果,必须调用Unwrap()
返回的任务。不过这不是问题。问题在于TaskScheduler
基础架构不会将异步委托视为单个工作单元。它将每个任务拆分为多个小任务(使用每个await
作为分隔符),并且每个小任务都是单独处理的。这严重限制了可以在此类之上实现的异步功能。作为一个例子,这里是一个习惯TaskScheduler
,旨在一次将提供的任务排队(换句话说,以限制并发性):
public class MyTaskScheduler : TaskScheduler
{
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1);
protected async override void QueueTask(Task task)
{
await _semaphore.WaitAsync();
try
{
await Task.Run(() => base.TryExecuteTask(task));
await task;
}
finally
{
_semaphore.Release();
}
}
protected override bool TryExecuteTaskInline(Task task,
bool taskWasPreviouslyQueued) => false;
protected override IEnumerable<Task> GetScheduledTasks() { yield break; }
}
The SemaphoreSlim
should ensure that only one Task
would run at a time. Unfortunately it doesn't work. The semaphore is released prematurely, because the Task
passed in the call QueueTask(task)
is not the task that represents the whole work of the async delegate, but only the part until the first await
. The other parts are passed to the TryExecuteTaskInline
method. There is no way to correlate these task-parts, because no identifier or other mechanism is provided. Here is what happens in practice:
本SemaphoreSlim
应确保只有一个Task
会同时运行。不幸的是它不起作用。信号量过早释放,因为Task
传入的调用QueueTask(task)
不是代表异步委托全部工作的任务,而只是第一个await
. 其他部分传递给TryExecuteTaskInline
方法。无法关联这些任务部分,因为没有提供标识符或其他机制。以下是实践中发生的事情:
var taskScheduler = new MyTaskScheduler();
var tasks = Enumerable.Range(1, 5).Select(n => Task.Factory.StartNew(async () =>
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Item {n} Started");
await Task.Delay(1000);
Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Item {n} Finished");
}, default, TaskCreationOptions.None, taskScheduler))
.Select(t => t.Unwrap())
.ToArray();
Task.WaitAll(tasks);
Output:
输出:
05:29:58.346 Item 1 Started
05:29:58.358 Item 2 Started
05:29:58.358 Item 3 Started
05:29:58.358 Item 4 Started
05:29:58.358 Item 5 Started
05:29:59.358 Item 1 Finished
05:29:59.374 Item 5 Finished
05:29:59.374 Item 4 Finished
05:29:59.374 Item 2 Finished
05:29:59.374 Item 3 Finished
05:29:58.346项目1入门
05:29:58.358项目2入门
05:29:58.358项目3入门
05:29:58.358项目4入门
05:29:58.358项目5入门
05:29:59.358项目1成品
05: 29:59.374 项目 5 已完成
05:29:59.374 项目 4 已完成
05:29:59.374 项目 2 已完成
05:29:59.374 项目 3 已完成
Disaster, all tasks are queued at once.
灾难,所有任务同时排队。
Conclusion:Customizing the TaskScheduler
class is not the way to go when advanced async features are required.
结论:TaskScheduler
当需要高级异步功能时,自定义类不是要走的路。