Consuming a custom stream (IEnumerable<T>)
Do I need to add something in the custom stream itself to notify that all the data have been read?
You can, but that wouldn't help in the WCF scenario where the received Stream
is a different class.
There are two standard (official, by design) ways of determining the end of the Stream
data:
(1) ReadByte returning -1
Returns
The unsigned byte cast to an Int32, or -1 if at the end of the stream.
(2) Read returning 0 when called with count > 0
Returns
The total number of bytes read into the buffer. This can be less than the number of bytes requested if that many bytes are not currently available, or zero (0) if the end of the stream has been reached.
Unfortunately, both of them consume the current byte (advance the position), which will break the deserializer.
What are the possible solutions?
First, you could implement a serialization/deserialization format (protocol) that lets you know whether there are more elements to deserialize. For instance, List<T>
stores Count
before its elements, T[]
stores Length
before its elements, etc. Since the EnumerableStream<T>
does not know the count in advance, one simple solution is to emit a single marker byte before each element:
private bool SerializeNext()
{
    if (!_source.MoveNext())
        return false;

    _buf.Enqueue(1); // <-- marker byte: one more element follows

    foreach (var b in _serializer(_source.Current))
        _buf.Enqueue(b);

    return true;
}
This would allow you to use
while (stream.ReadByte() != -1)
{
// ...
}
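For illustration, here is a self-contained sketch of that marker-byte protocol. The length-prefixed string serializer below stands in for the question's _serializer delegate; all names are illustrative, not the original code:

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;

public class MarkerDemo
{
    // Hypothetical element format: 4-byte little-endian length + UTF-8 bytes.
    public static string ReadOne(Stream s)
    {
        var lenBuf = new byte[4];
        s.Read(lenBuf, 0, 4);
        int len = BitConverter.ToInt32(lenBuf, 0);
        var data = new byte[len];
        s.Read(data, 0, len);
        return Encoding.UTF8.GetString(data);
    }

    public static IEnumerable<string> ReadAll(Stream s)
    {
        // ReadByte consumes only the marker, leaving the stream
        // positioned at the start of the next serialized element.
        while (s.ReadByte() != -1)
            yield return ReadOne(s);
    }

    public static void Main()
    {
        var ms = new MemoryStream();
        foreach (var item in new[] { "foo", "bar" })
        {
            ms.WriteByte(1); // marker byte before each element
            var bytes = Encoding.UTF8.GetBytes(item);
            ms.Write(BitConverter.GetBytes(bytes.Length), 0, 4);
            ms.Write(bytes, 0, bytes.Length);
        }
        ms.Position = 0;
        Console.WriteLine(string.Join(",", ReadAll(ms))); // foo,bar
    }
}
```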
Second, if you want to keep the current format, a more general solution is to implement a custom stream that wraps another stream and exposes a PeekByte
method with the same semantics as the standard ReadByte
, but without consuming the current byte:
public class SequentialStream : Stream
{
    private Stream source;
    private bool leaveOpen;
    private int? nextByte; // read-ahead buffer: null = empty, -1 = end of stream

    public SequentialStream(Stream source, bool leaveOpen = false)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (!source.CanRead) throw new ArgumentException("Non readable source.", nameof(source));
        this.source = source;
        this.leaveOpen = leaveOpen;
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing && !leaveOpen)
            source.Dispose();
        base.Dispose(disposing);
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

    // Same semantics as ReadByte, but does not consume the current byte.
    public int PeekByte()
    {
        if (nextByte == null)
            nextByte = source.ReadByte();
        return nextByte.Value;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (count <= 0) return 0;
        if (nextByte != null)
        {
            if (nextByte.Value < 0) return 0;
            // Deliver the buffered byte first, then read the rest from the source.
            buffer[offset] = (byte)nextByte.Value;
            if (count > 1)
            {
                int read = source.Read(buffer, offset + 1, count - 1);
                if (read == 0)
                    nextByte = -1;
                else
                    nextByte = null;
                return read + 1;
            }
            else
            {
                nextByte = null;
                return 1;
            }
        }
        else
        {
            int read = source.Read(buffer, offset, count);
            if (read == 0)
                nextByte = -1;
            return read;
        }
    }
}
This basically implements a read-only, forward-only stream with zero- or one-byte read-ahead functionality.
Usage looks like this:
using (var stream = new SequentialStream(GetStream(ListToSend)))
{
// ...
while (stream.PeekByte() != -1)
{
// ...
}
// ...
}
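To see the peek semantics in isolation, here is a minimal self-contained sketch of the same one-byte read-ahead idea (a stripped-down illustration, not a replacement for the full SequentialStream above):

```csharp
using System;
using System.IO;

// Minimal sketch: buffer one byte from ReadByte so the
// end-of-stream test does not consume element data.
public class PeekableReader
{
    private readonly Stream source;
    private int? nextByte;

    public PeekableReader(Stream source) { this.source = source; }

    public int PeekByte()
    {
        if (nextByte == null)
            nextByte = source.ReadByte();
        return nextByte.Value;
    }

    public int ReadByte()
    {
        int b = PeekByte();
        nextByte = null; // hand the buffered byte out exactly once
        return b;
    }

    public static void Main()
    {
        var r = new PeekableReader(new MemoryStream(new byte[] { 42, 7 }));
        Console.WriteLine(r.PeekByte()); // 42, not consumed
        Console.WriteLine(r.ReadByte()); // 42, the same byte
        Console.WriteLine(r.ReadByte()); // 7
        Console.WriteLine(r.PeekByte()); // -1, end of stream
    }
}
```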
P.S. What about
I also want to know why when I put a break point in the read function, the buffer size is changing randomly.
It's not random. BinaryFormatter
internally uses BinaryReader
to read typed values like Int32
, Byte
, String
etc., passing the desired size as count
, e.g. 4, 1, or the number of the string's encoded bytes (which it knows because it stores that length in the stream before the actual data and reads it back before reading the data itself).
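A small self-contained demonstration of those fixed-size typed reads; the sizes below follow the documented BinaryWriter format (a string is written as a 7-bit-encoded length prefix followed by its UTF-8 bytes):

```csharp
using System;
using System.IO;

// BinaryWriter/BinaryReader work in typed values of known sizes,
// which is why the `count` passed to Read varies per call:
// 4 bytes for an Int32, 1 for a Byte, prefix + encoded bytes for a String.
public class TypedReadDemo
{
    public static void Main()
    {
        var ms = new MemoryStream();
        var w = new BinaryWriter(ms);
        w.Write(123);        // Int32: 4 bytes
        w.Write((byte)7);    // Byte: 1 byte
        w.Write("hello");    // String: 1-byte length prefix + 5 UTF-8 bytes
        Console.WriteLine(ms.Length); // 4 + 1 + (1 + 5) = 11

        ms.Position = 0;
        var r = new BinaryReader(ms);
        Console.WriteLine(r.ReadInt32());  // 123
        Console.WriteLine(r.ReadByte());   // 7
        Console.WriteLine(r.ReadString()); // hello
    }
}
```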
First off, you can simply serialize the List<List<string>>
itself. Demo here. This eliminates the need for a specialized class to read the stream, and it potentially renders this answer moot. The only reason to stream items one at a time is a potentially very large dataset; that case needs a different implementation, which is what the following solution could address.
The following answer (and your code) requires that the client reading the stream has the EnumerableStream
class.
Do I need to add something in the custom stream itself to notify that all the data have been read?
Yes. You need to implement a new property that tells you whether there is another T to read, or repurpose Length.
public bool HasMore { get { return _buf.Any() || SerializeNext(); } }
or
public override long Length { get { return (_buf.Any() || SerializeNext()) ? 1 : 0; } }
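As a minimal self-contained illustration of the HasMore idea, the following toy class mirrors the shape of the question's EnumerableStream<T> (it is a sketch, not the original class):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy queue-backed "stream" that serializes the next element on demand;
// names mirror the question's EnumerableStream<T> for illustration only.
public class MiniEnumerableStream<T>
{
    private readonly IEnumerator<T> _source;
    private readonly Func<T, IEnumerable<byte>> _serializer;
    private readonly Queue<byte> _buf = new Queue<byte>();

    public MiniEnumerableStream(IEnumerable<T> source, Func<T, IEnumerable<byte>> serializer)
    {
        _source = source.GetEnumerator();
        _serializer = serializer;
    }

    private bool SerializeNext()
    {
        if (!_source.MoveNext()) return false;
        foreach (var b in _serializer(_source.Current)) _buf.Enqueue(b);
        return true;
    }

    // The property from the answer: true while there is buffered data
    // or at least one more element to serialize.
    public bool HasMore { get { return _buf.Any() || SerializeNext(); } }

    public int ReadByte() => HasMore ? _buf.Dequeue() : -1;

    public static void Main()
    {
        var s = new MiniEnumerableStream<int>(new[] { 1, 2, 3 }, x => new[] { (byte)x });
        var read = new List<int>();
        while (s.HasMore) read.Add(s.ReadByte());
        Console.WriteLine(string.Join(",", read)); // 1,2,3
    }
}
```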
I feel like this whole solution could be cleaned up into an IEnumerable<T>
stream reader. However, this works.
Here is the adjusted and working fiddle. Note that I also cleaned it up a bit; the static class named the same as the other class was causing me a headache ;). Also, I would switch to byte[]
rather than List<byte>
.
Is it because the format of the deserializer and serialiser are not the same (I don't think so).
No.
I also want to know why when I put a break point in the read function, the buffer size is changing randomly.
The buffer _buf
should be the size of the current item serialized. This could vary per item.
I would prefer not to wrap the code with a try and catch, I want a clean solution that does not crash.
You are wise to not take the approach of just swallowing the exception, but instead understanding how to make it work as intended.