Wrapping a sequence in a Stream in F#
I have a function that accepts a Stream. My data is in a large list, running into millions of items.
Is there a simple way to wrap a sequence in a Stream, so that the stream returns chunks of my sequence? One obvious approach is to implement my own stream class that returns chunks of the sequence. Something like:
    type SeqStream(sequence:seq<'a>) =
        inherit Stream()
        default x.Read(buf, offset, count) =
            // get next chunk
            // yield chunk
Is there a simpler way of doing it? I don't have the means to change the target function that accepts a stream, though.
I think your approach looks good. The only problem is that Stream is a relatively complicated class with quite a few members, and you probably don't want to implement most of them. If you pass it to code that uses some of the additional members, you'll need to make the implementation more complex. Anyway, a simple stream that implements only Read can look like this:
    open System.IO

    type SeqStream<'a>(sequence:seq<'a>, formatter:'a -> byte[]) =
        inherit Stream()
        // Keeps bytes that were read previously, but were not used
        let temp = ResizeArray<byte>()
        // Enumerator for reading data from the sequence
        let en = sequence.GetEnumerator()
        override x.Read(buffer, offset, size) =
            // Read next element and add it to temp until we have enough
            // data or until we reach the end of the sequence
            while temp.Count < size && en.MoveNext() do
                temp.AddRange(formatter(en.Current))
            // Copy data to the output & return count (may be less than
            // requested at the end of the sequence)
            let ret = min size temp.Count
            temp.CopyTo(0, buffer, offset, ret)
            temp.RemoveRange(0, ret)
            ret
        override x.Seek(offset, dir) = invalidOp "Seek"
        override x.Flush() = invalidOp "Flush"
        override x.SetLength(l) = invalidOp "SetLength"
        override x.Length = invalidOp "Length"
        override x.Position
            with get() = invalidOp "Position"
            and set(p) = invalidOp "Position"
        override x.Write(buffer, offset, size) = invalidOp "Write"
        override x.CanWrite = false
        override x.CanSeek = false
        override x.CanRead = true
Note that I added an additional parameter: a function that converts a value of the generic type to a byte array. There is no general way to convert an arbitrary value to bytes (you could use some form of serialization), so letting the caller supply the conversion is probably easier. For example, for integers you can write:
    let stream = new SeqStream<_>([ 1 .. 5 ], System.BitConverter.GetBytes)
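For element types other than integers, the formatter can be any function that returns a byte array. Here is a minimal sketch of two such functions. The names `lineFormatter` and `pairFormatter` are hypothetical and only for illustration, and the choice of UTF-8 with a trailing newline is an assumption about what the consuming code expects:

```fsharp
open System
open System.Text

// Hypothetical formatter: each string becomes one UTF-8 encoded line
let lineFormatter (s: string) : byte[] =
    Encoding.UTF8.GetBytes(s + "\n")

// Hypothetical formatter: an (int, float) pair becomes 4 + 8 bytes,
// written in the platform's byte order (see BitConverter.IsLittleEndian)
let pairFormatter (key: int, value: float) : byte[] =
    Array.append (BitConverter.GetBytes(key)) (BitConverter.GetBytes(value))
```

Either function could then be passed as the second constructor argument, e.g. `new SeqStream<_>(lines, lineFormatter)`, provided the code reading the stream expects that layout.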