开发者

Wrapping a sequence in a Stream in F#

I have a function that accepts a Stream. My data is in a large list, running into millions of items.

Is there a simple way I can wrap a sequence in a Stream, returning chunks of my sequence in the stream? One obvious approach is to implement my own stream class that returns chunks of the sequence. Something like :

type SeqStream(sequence:seq<'a>) = 
    inherit Stream()
    default x.Read(buf, offset, count) =
        // get next chunk
        // yield chunk

Is there a simpler way of doing it? I don't 开发者_JAVA技巧have the means to change the target function that accepts a stream though.


I think that your approach looks good. The only problem is that Stream is a relatively complicated class that has quite a few members and you probably don't want to implement most of them - if you want to pass it to some code that uses some of the additional members, you'll need to make the implementation more complex. Anyway, a simple stream that implements only Read can look like this:

type SeqStream<'a>(sequence:seq<'a>, formatter:'a -> byte[]) =
  inherit Stream()

  // Keeps bytes that were read previously, but were not used    
  let temp = ResizeArray<_>() 
  // Enumerator for reading data from the sequence
  let en = sequence.GetEnumerator()

  override x.Read(buffer, offset, size) = 
    // Read next element and add it to temp until we have enough
    // data or until we reach the end of the sequence
    while temp.Count < size && en.MoveNext() do
      temp.AddRange(formatter(en.Current))

    // Copy data to the output & return count (may be less then 
    // required (at the end of the sequence)
    let ret = min size temp.Count
    temp.CopyTo(0, buffer, offset, ret)
    temp.RemoveRange(0, ret)
    ret

  override x.Seek(offset, dir) = invalidOp "Seek"
  override x.Flush() = invalidOp "Flush"
  override x.SetLength(l) = invalidOp "SetLength"
  override x.Length = invalidOp "Length"
  override x.Position 
    with get() = invalidOp "Position"
    and set(p) = invalidOp "Position"
  override x.Write(buffer, offset, size) = invalidOp "Write"
  override x.CanWrite = false
  override x.CanSeek = false
  override x.CanRead = true

Note that I added an additional parameter - a function to convert value of the generic type to a byte array. In general, it is difficult to convert anything to bytes (you could use some serialization), so this is probably easier. For example, for integers you can write:

let stream = new SeqStream<_>([ 1 .. 5 ], System.BitConverter.GetBytes)
0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜