C# .NET 异步流读/写
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/1540658/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
.NET Asynchronous stream read/write
提问by XpiritO
I have been trying to solve this "Concurrent Programming" exam exercise (in C#):
我一直在尝试解决这个“并发编程”考试练习(在 C# 中):
Knowing that
Stream
class containsint Read(byte[] buffer, int offset, int size)
andvoid Write(byte[] buffer, int offset, int size)
methods, implement in C# theNetToFile
method that copies all data received fromNetworkStream net
instance to theFileStream file
instance. To do the transfer, use asynchronous reads and synchronous writes, avoiding one thread to be blocked during read operations. The transfer ends when thenet
read operation returns value 0. To simplify, it is not necessary to support controlled cancel of the operation.
知道
Stream
类包含int Read(byte[] buffer, int offset, int size)
和void Write(byte[] buffer, int offset, int size)
方法,在 C# 中实现将NetToFile
所有从NetworkStream net
实例接收到的数据复制到FileStream file
实例的方法。要进行传输,请使用异步读取和同步写入,避免在读取操作期间阻塞一个线程。当net
读操作返回值 0时传输结束。为简化起见,不需要支持操作的受控取消。
void NetToFile(NetworkStream net, FileStream file);
I've been trying to solve this exercise, but I'm struggling with a question related with the question itself. But first, here is my code:
我一直在尝试解决这个练习,但我正在为一个与问题本身相关的问题而苦苦挣扎。但首先,这是我的代码:
public static void NetToFile(NetworkStream net, FileStream file) {
byte[] buffer = new byte[4096]; // buffer with 4 kB dimension
int offset = 0; // read/write offset
int nBytesRead = 0; // number of bytes read on each cycle
IAsyncResult ar;
do {
// read partial content of net (asynchronously)
ar = net.BeginRead(buffer,offset,buffer.Length,null,null);
// wait until read is completed
ar.AsyncWaitHandle.WaitOne();
// get number of bytes read on each cycle
nBytesRead = net.EndRead(ar);
// write partial content to file (synchronously)
fs.Write(buffer,offset,nBytesRead);
// update offset
offset += nBytesRead;
}
while( nBytesRead > 0);
}
The question I have is that, in the question statement, is said:
我的问题是,在问题陈述中,说:
To do the transfer, use asynchronous reads and synchronous writes, avoiding one thread to be blocked during read operations
进行传输,使用异步读取和同步写入,避免在读取操作期间阻塞一个线程
I'm not really sure if my solution accomplishes what is wanted in this exercise, because I'm using AsyncWaitHandle.WaitOne()
to wait until the asynchronous read completes.
我不确定我的解决方案是否能完成本练习中的要求,因为我习惯于AsyncWaitHandle.WaitOne()
等待异步读取完成。
On the other side, I'm not really figuring out what is meant to be a "non-blocking" solution in this scenario, as the FileStream
write is meant to be made synchronously... and to do that, I have to wait until NetworkStream
read completes to proceed with the FileStream
writing, isn't it?
另一方面,我并没有真正弄清楚在这种情况下什么是“非阻塞”解决方案,因为FileStream
写入是同步进行的……为此,我必须等到NetworkStream
read 完成继续FileStream
写入,不是吗?
Can you, please, help me out with this?
你能帮我解决这个问题吗?
[ EDIT 1 ] Using callbacksolution
[编辑 1]使用回调解决方案
Ok, if I understood what Mitchel Sellersand willvvreplied, I've been counseled to use a callback method to turn this into a "non-blocking" solution. Here is my code, then:
好的,如果我理解Mitchel Sellers和willvv 的回答,我已经被建议使用回调方法将其转变为“非阻塞”解决方案。这是我的代码,然后:
byte[] buffer; // buffer
public static void NetToFile(NetworkStream net, FileStream file) {
// buffer with same dimension as file stream data
buffer = new byte[file.Length];
//start asynchronous read
net.BeginRead(buffer,0,buffer.Length,OnEndRead,net);
}
//asynchronous callback
static void OnEndRead(IAsyncResult ar) {
//NetworkStream retrieve
NetworkStream net = (NetworkStream) ar.IAsyncState;
//get number of bytes read
int nBytesRead = net.EndRead(ar);
//write content to file
//... and now, how do I write to FileStream instance without
//having its reference??
//fs.Write(buffer,0,nBytesRead);
}
As you may have noticed, I'm stuck on the callback method, as I don't have a reference to the FileStream
instance where I want to invoke the "Write(...)" method.
您可能已经注意到,我被困在回调方法上,因为我没有对FileStream
要调用“Write(...)”方法的实例的引用。
Additionally, this is not a thread-safe solution, as the byte[]
field is exposed and may be shared among concurrent NetToFile
invocations. I don't know how to solve this problem without exposing this byte[]
field in the outer-scope... and I'm almost sure it may not be exposed this way.
此外,这不是线程安全的解决方案,因为该byte[]
字段是公开的并且可能在并发NetToFile
调用之间共享。我不知道如何解决这个问题而不byte[]
在外部范围中公开这个字段......而且我几乎可以肯定它可能不会以这种方式公开。
I don't want to use a lambda or anonymous method solution, because that's not in the curriculum of "Concurrent Programing" course.
我不想使用 lambda 或匿名方法解决方案,因为这不在“并发编程”课程的课程中。
采纳答案by bendewey
You are going to need to use the callback from the NetStream read to handle this. And frankly it might be easier to wrap the copying logic into its own class so that you can maintain the instance of the active Streams.
您将需要使用来自 NetStream 读取的回调来处理此问题。坦率地说,将复制逻辑包装到它自己的类中可能更容易,以便您可以维护活动流的实例。
This is how I'd approach it (not tested):
这就是我的处理方式(未测试):
public class Assignment1
{
public static void NetToFile(NetworkStream net, FileStream file)
{
var copier = new AsyncStreamCopier(net, file);
copier.Start();
}
public static void NetToFile_Option2(NetworkStream net, FileStream file)
{
var completedEvent = new ManualResetEvent(false);
// copy as usual but listen for completion
var copier = new AsyncStreamCopier(net, file);
copier.Completed += (s, e) => completedEvent.Set();
copier.Start();
completedEvent.WaitOne();
}
/// <summary>
/// The Async Copier class reads the input Stream Async and writes Synchronously
/// </summary>
public class AsyncStreamCopier
{
public event EventHandler Completed;
private readonly Stream input;
private readonly Stream output;
private byte[] buffer = new byte[4096];
public AsyncStreamCopier(Stream input, Stream output)
{
this.input = input;
this.output = output;
}
public void Start()
{
GetNextChunk();
}
private void GetNextChunk()
{
input.BeginRead(buffer, 0, buffer.Length, InputReadComplete, null);
}
private void InputReadComplete(IAsyncResult ar)
{
// input read asynchronously completed
int bytesRead = input.EndRead(ar);
if (bytesRead == 0)
{
RaiseCompleted();
return;
}
// write synchronously
output.Write(buffer, 0, bytesRead);
// get next
GetNextChunk();
}
private void RaiseCompleted()
{
if (Completed != null)
{
Completed(this, EventArgs.Empty);
}
}
}
}
回答by willvv
You're right, what you're doing is basically synchronous reading, because you use the WaitOne() method and it just stops the execution until the data is ready, that's basically the same as doing it using Read() instead of BeginRead() and EndRead().
你是对的,你所做的基本上是同步读取,因为你使用 WaitOne() 方法,它只是停止执行,直到数据准备好,这与使用 Read() 而不是 BeginRead() 基本相同) 和 EndRead()。
What you have to do, is use the callback argument in the BeginRead() method, with it, you define a callback method (or a lambda expression), this method will be invoked when the information has been read (in the callback method you have to check for the end of the stream, and write to the output stream), this way you won't be blocking the main thread (you won't need the WaitOne() nor the EndRead().
你要做的,就是在 BeginRead() 方法中使用回调参数,用它,你定义一个回调方法(或一个 lambda 表达式),这个方法将在信息被读取时调用(在回调方法中你必须检查流的结尾,并写入输出流),这样您就不会阻塞主线程(您不需要 WaitOne() 或 EndRead()。
Hope this helps.
希望这可以帮助。
回答by Nicholas Carey
Even though it goes against the grain to help people with their homework, given that this is more than a year old, here's the proper way to accomplish this. All you need to overlapyour read/write operations — no spawning of additional threads, or anything else is required.
尽管帮助人们完成家庭作业是违背常理的,但鉴于这已经一年多了,以下是完成此任务的正确方法。所有你需要重复你的读/写操作-没有产卵的额外的线程,或其他任何东西是必需的。
public static class StreamExtensions
{
private const int DEFAULT_BUFFER_SIZE = short.MaxValue ; // +32767
public static void CopyTo( this Stream input , Stream output )
{
input.CopyTo( output , DEFAULT_BUFFER_SIZE ) ;
return ;
}
public static void CopyTo( this Stream input , Stream output , int bufferSize )
{
if ( !input.CanRead ) throw new InvalidOperationException( "input must be open for reading" );
if ( !output.CanWrite ) throw new InvalidOperationException( "output must be open for writing" );
byte[][] buf = { new byte[bufferSize] , new byte[bufferSize] } ;
int[] bufl = { 0 , 0 } ;
int bufno = 0 ;
IAsyncResult read = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ;
IAsyncResult write = null ;
while ( true )
{
// wait for the read operation to complete
read.AsyncWaitHandle.WaitOne() ;
bufl[bufno] = input.EndRead(read) ;
// if zero bytes read, the copy is complete
if ( bufl[bufno] == 0 )
{
break ;
}
// wait for the in-flight write operation, if one exists, to complete
// the only time one won't exist is after the very first read operation completes
if ( write != null )
{
write.AsyncWaitHandle.WaitOne() ;
output.EndWrite(write) ;
}
// start the new write operation
write = output.BeginWrite( buf[bufno] , 0 , bufl[bufno] , null , null ) ;
// toggle the current, in-use buffer
// and start the read operation on the new buffer.
//
// Changed to use XOR to toggle between 0 and 1.
// A little speedier than using a ternary expression.
bufno ^= 1 ; // bufno = ( bufno == 0 ? 1 : 0 ) ;
read = input.BeginRead( buf[bufno] , 0 , buf[bufno].Length , null , null ) ;
}
// wait for the final in-flight write operation, if one exists, to complete
// the only time one won't exist is if the input stream is empty.
if ( write != null )
{
write.AsyncWaitHandle.WaitOne() ;
output.EndWrite(write) ;
}
output.Flush() ;
// return to the caller ;
return ;
}
public static async Task CopyToAsync( this Stream input , Stream output )
{
await input.CopyToAsync( output , DEFAULT_BUFFER_SIZE ) ;
return;
}
public static async Task CopyToAsync( this Stream input , Stream output , int bufferSize )
{
if ( !input.CanRead ) throw new InvalidOperationException( "input must be open for reading" );
if ( !output.CanWrite ) throw new InvalidOperationException( "output must be open for writing" );
byte[][] buf = { new byte[bufferSize] , new byte[bufferSize] } ;
int[] bufl = { 0 , 0 } ;
int bufno = 0 ;
Task<int> read = input.ReadAsync( buf[bufno] , 0 , buf[bufno].Length ) ;
Task write = null ;
while ( true )
{
await read ;
bufl[bufno] = read.Result ;
// if zero bytes read, the copy is complete
if ( bufl[bufno] == 0 )
{
break;
}
// wait for the in-flight write operation, if one exists, to complete
// the only time one won't exist is after the very first read operation completes
if ( write != null )
{
await write ;
}
// start the new write operation
write = output.WriteAsync( buf[bufno] , 0 , bufl[bufno] ) ;
// toggle the current, in-use buffer
// and start the read operation on the new buffer.
//
// Changed to use XOR to toggle between 0 and 1.
// A little speedier than using a ternary expression.
bufno ^= 1; // bufno = ( bufno == 0 ? 1 : 0 ) ;
read = input.ReadAsync( buf[bufno] , 0 , buf[bufno].Length );
}
// wait for the final in-flight write operation, if one exists, to complete
// the only time one won't exist is if the input stream is empty.
if ( write != null )
{
await write;
}
output.Flush();
// return to the caller ;
return;
}
}
Cheers.
干杯。
回答by Kenzi
Wow, these are all very complex! Here's my async solution, and it's just one function. The Read() and BeginWrite() both run at the same time.
哇,这些都是很复杂的!这是我的异步解决方案,它只是一个功能。Read() 和 BeginWrite() 同时运行。
/// <summary>
/// Copies a stream.
/// </summary>
/// <param name="source">The stream containing the source data.</param>
/// <param name="target">The stream that will receive the source data.</param>
/// <remarks>
/// This function copies until no more can be read from the stream
/// and does not close the stream when done.<br/>
/// Read and write are performed simultaneously to improve throughput.<br/>
/// If no data can be read for 60 seconds, the copy will time-out.
/// </remarks>
public static void CopyStream(Stream source, Stream target)
{
// This stream copy supports a source-read happening at the same time
// as target-write. A simpler implementation would be to use just
// Write() instead of BeginWrite(), at the cost of speed.
byte[] readbuffer = new byte[4096];
byte[] writebuffer = new byte[4096];
IAsyncResult asyncResult = null;
for (; ; )
{
// Read data into the readbuffer. The previous call to BeginWrite, if any,
// is executing in the background..
int read = source.Read(readbuffer, 0, readbuffer.Length);
// Ok, we have read some data and we're ready to write it, so wait here
// to make sure that the previous write is done before we write again.
if (asyncResult != null)
{
// This should work down to ~0.01kb/sec
asyncResult.AsyncWaitHandle.WaitOne(60000);
target.EndWrite(asyncResult); // Last step to the 'write'.
if (!asyncResult.IsCompleted) // Make sure the write really completed.
throw new IOException("Stream write failed.");
}
if (read <= 0)
return; // source stream says we're done - nothing else to read.
// Swap the read and write buffers so we can write what we read, and we can
// use the then use the other buffer for our next read.
byte[] tbuf = writebuffer;
writebuffer = readbuffer;
readbuffer = tbuf;
// Asynchronously write the data, asyncResult.AsyncWaitHandle will
// be set when done.
asyncResult = target.BeginWrite(writebuffer, 0, read, null, null);
}
}
回答by Shrike
It's strange that no one mentioned TPL.
Here's very nice post by PFX team (Stephen Toub) about how to implement concurrent async stream copy. The post contains out-dated refenrece to samples so here's corrent one:
Get Parallel Extensions Extras from code.msdnthen
奇怪的是没有人提到TPL。
这是 PFX 团队 (Stephen Toub) 关于如何实现并发异步流复制的非常好的帖子。该帖子包含对示例的过时参考,因此这里是相应的参考:
Get Parallel Extensions Extras from code.msdnthen
var task = sourceStream.CopyStreamToStreamAsync(destinationStream);
// do what you want with the task, for example wait when it finishes:
task.Wait();
Also consider using J.Richer's AsyncEnumerator.
还可以考虑使用 J.Richer 的AsyncEnumerator。
回答by John Leidegren
I doubt this is the fastest code (there's some overhead from the .NET Task abstraction) but I do think it's a cleanerapproach to the whole async copy thing.
我怀疑这是最快的代码(有一些来自 .NET 任务抽象的开销),但我确实认为这是对整个异步复制事情的更清晰的方法。
I needed a CopyTransformAsync
where I could pass a delegate to do something as chunks were passed through the copy operation. e.g. compute a message digest while copying. That's why I got interested in rolling my own option.
我需要一个CopyTransformAsync
可以传递委托来做某事的地方,因为块是通过复制操作传递的。例如,在复制时计算消息摘要。这就是为什么我对滚动我自己的选项感兴趣。
Findings:
发现:
- CopyToAsync bufferSize is sensitive (a large buffer is required)
- FileOptions.Asynchronous -> makes it horrendously slow (not sure exactly why that is)
- The bufferSize of the FileStream objects can be smaller (it's not that important)
- The
Serial
test is clearly the fastest and most resource intensive
- CopyToAsync bufferSize 敏感(需要大缓冲区)
- FileOptions.Asynchronous -> 让它慢得可怕(不确定为什么会这样)
- FileStream 对象的 bufferSize 可以更小(这并不重要)
- 该
Serial
测试显然是最快和最密集的资源
Here's what I've found and the complete source codefor the program I used to test this. On my machine, these tests were run on a SSD disk and is the equivalent of a file copy. Normally, you'd not want to use this for just copying files, instead when you have a network stream (which is what my use case is), that's when you'd wanna use something like this.
这是我找到的内容以及我用来测试它的程序的完整源代码。在我的机器上,这些测试是在 SSD 磁盘上运行的,相当于文件副本。通常,您不希望仅将其用于复制文件,而是当您有网络流(这是我的用例)时,您就会想要使用这样的东西。
4K buffer
Serial... in 0.474s
CopyToAsync... timed out
CopyToAsync (Asynchronous)... timed out
CopyTransformAsync... timed out
CopyTransformAsync (Asynchronous)... timed out
8K buffer
Serial... in 0.344s
CopyToAsync... timed out
CopyToAsync (Asynchronous)... timed out
CopyTransformAsync... in 1.116s
CopyTransformAsync (Asynchronous)... timed out
40K buffer
Serial... in 0.195s
CopyToAsync... in 0.624s
CopyToAsync (Asynchronous)... timed out
CopyTransformAsync... in 0.378s
CopyTransformAsync (Asynchronous)... timed out
80K buffer
Serial... in 0.190s
CopyToAsync... in 0.355s
CopyToAsync (Asynchronous)... in 1.196s
CopyTransformAsync... in 0.300s
CopyTransformAsync (Asynchronous)... in 0.886s
160K buffer
Serial... in 0.432s
CopyToAsync... in 0.252s
CopyToAsync (Asynchronous)... in 0.454s
CopyTransformAsync... in 0.447s
CopyTransformAsync (Asynchronous)... in 0.555s
Here you can see the Process Explorer, performance graph as the test is run. Basically each top(in the lower of the three graphs) is the start of the serial test. You can clearly see how the throughput increases dramatically as the buffer size grows. It would appear as if it plans out somewhere around 80K which is what the .NET framework CopyToAsync
method uses, internally.
在这里您可以看到 Process Explorer,测试运行时的性能图。基本上每个顶部(在三张图中的下部)都是串行测试的开始。您可以清楚地看到吞吐量如何随着缓冲区大小的增加而急剧增加。看起来好像它计划在 80K 左右,这是 .NET 框架CopyToAsync
方法在内部使用的。
The nice thing here is that the final implementation wasn't that complicated:
这里的好处是最终的实现并没有那么复杂:
static Task CompletedTask = ((Task)Task.FromResult(0));
static async Task CopyTransformAsync(Stream inputStream
, Stream outputStream
, Func<ArraySegment<byte>, ArraySegment<byte>> transform = null
)
{
var temp = new byte[bufferSize];
var temp2 = new byte[bufferSize];
int i = 0;
var readTask = inputStream
.ReadAsync(temp, 0, bufferSize)
.ConfigureAwait(false);
var writeTask = CompletedTask.ConfigureAwait(false);
for (; ; )
{
// synchronize read
int read = await readTask;
if (read == 0)
{
break;
}
if (i++ > 0)
{
// synchronize write
await writeTask;
}
var chunk = new ArraySegment<byte>(temp, 0, read);
// do transform (if any)
if (!(transform == null))
{
chunk = transform(chunk);
}
// queue write
writeTask = outputStream
.WriteAsync(chunk.Array, chunk.Offset, chunk.Count)
.ConfigureAwait(false);
// queue read
readTask = inputStream
.ReadAsync(temp2, 0, bufferSize)
.ConfigureAwait(false);
// swap buffer
var temp3 = temp;
temp = temp2;
temp2 = temp3;
}
await writeTask; // complete any lingering write task
}
This method of interleaving the read/write despite the huge buffers is somewhere between 18% faster than the BCL CopyToAsync
.
尽管有巨大的缓冲区,但这种交错读/写的方法比 BCL 快 18% CopyToAsync
。
Out of curiosity, I did change the async calls to typical begin/end async pattern calls and that did not improve the situation one bit, it made it worse. For all I like to bash on the Task abstraction overhead, they do some nifty things when you write you code with the async/await keywords and it is much nicer to read that code!
出于好奇,我确实将异步调用更改为典型的开始/结束异步模式调用,但这并没有改善情况,反而使情况变得更糟。对于我喜欢抨击任务抽象开销的所有内容,当您使用 async/await 关键字编写代码时,它们会做一些漂亮的事情,并且阅读该代码会更好!