F# image manipulation performance problem

I am currently trying to improve the performance of an F# program to make it as fast as its C# equivalent. The program applies a filter array to a buffer of pixels. Memory is always accessed through pointers.

Here is the C# code which is applied to each pixel of an image:

unsafe private static byte getPixelValue(byte* buffer, double* filter, int filterLength, double filterSum)
{
    double sum = 0.0;
    for (int i = 0; i < filterLength; ++i)
    {
        sum += (*buffer) * (*filter);
        ++buffer;
        ++filter;
    }

    sum = sum / filterSum;

    if (sum > 255) return 255;
    if (sum < 0) return 0;
    return (byte) sum;
}

The F# code looks like this and takes three times as long as the C# program:

let getPixelValue (buffer:nativeptr<byte>) (filterData:nativeptr<float>) filterLength filterSum : byte =

    let rec accumulatePixel (acc:float) (buffer:nativeptr<byte>) (filter:nativeptr<float>) i = 
        if i > 0 then
            let newAcc = acc + (float (NativePtr.read buffer) * (NativePtr.read filter))
            accumulatePixel newAcc (NativePtr.add buffer 1) (NativePtr.add filter 1) (i-1)
        else
            acc

    let acc = (accumulatePixel 0.0 buffer filterData filterLength) / filterSum                

    match acc with
        | _ when acc > 255.0 -> 255uy
        | _ when acc < 0.0 -> 0uy
        | _ -> byte acc

Using mutable variables and a for loop in F# gives the same speed as using recursion. All projects are configured to build in Release mode with code optimization turned on.

How could the performance of the F# version be improved?

EDIT:

The bottleneck seems to be in (NativePtr.get buffer offset). If I replace this code with a fixed value and also replace the corresponding code in the C# version with a fixed value, I get about the same speed for both programs. In fact, in C# the speed does not change at all, but in F# it makes a huge difference.

Can this behaviour possibly be changed or is it rooted deeply in the architecture of F#?

EDIT 2:

I refactored the code again to use for-loops. The execution speed remains the same:

let mutable acc = 0.0
let mutable f = filterData
let mutable b = tBuffer
for i in 1 .. filter.FilterLength do
    acc <- acc + (float (NativePtr.read b)) * (NativePtr.read f)
    f <- NativePtr.add f 1
    b <- NativePtr.add b 1

If I compare the IL code of a version that uses (NativePtr.read b) with another version that is identical except that it uses the fixed value 111uy instead of reading through the pointer, only the following lines in the IL code change:

111uy has IL-Code ldc.i4.s 0x6f (0.3 seconds)

(NativePtr.read b) has IL-Code lines ldloc.s b and ldobj uint8 (1.4 seconds)

For comparison: C# does the filtering in 0.4 seconds.

The fact that reading the filter does not impact performance, while reading from the image buffer does, is somewhat confusing. Before I filter a line of the image, I copy the line into a buffer that has the length of a line. That is why the read operations are not spread all over the image but stay within this buffer, which has a size of about 800 bytes.


If we look at the actual IL code of the inner loop, which traverses both buffers in parallel, as generated by the C# compiler (relevant part):

L_0017: ldarg.0 
L_0018: ldc.i4.1 
L_0019: conv.i 
L_001a: add 
L_001b: starg.s buffer
L_001d: ldarg.1 
L_001e: ldc.i4.8 
L_001f: conv.i 
L_0020: add 

and by the F# compiler:

L_0017: ldc.i4.1 
L_0018: conv.i 
L_0019: sizeof uint8
L_001f: mul 
L_0020: add 
L_0021: ldarg.2 
L_0022: ldc.i4.1 
L_0023: conv.i 
L_0024: sizeof float64
L_002a: mul 
L_002b: add

we'll notice that the C# code uses only the add instruction, while F# needs both mul and add. On each step we only need to increment the pointers (by sizeof byte and sizeof float respectively), not compute an address of the form addrBase + n * sizeof(T), so the mul in the F# version is unnecessary (the offset it multiplies is always 1).

The reason is that C# defines the ++ operator for pointers, while F# provides only the add : nativeptr<'T> -> int -> nativeptr<'T> function:

[<NoDynamicInvocation>]
let inline add (x : nativeptr<'a>) (n:int) : nativeptr<'a> = to_nativeint x + nativeint n * (# "sizeof !0" type('a) : nativeint #) |> of_nativeint

So it's not "rooted deeply" in F#, it's just that module NativePtr lacks inc and dec functions.

Btw, I suspect the above sample could be written in a more concise manner if the arguments were passed as arrays instead of raw pointers.

UPDATE:

The following code gives only about a 1% speed-up, even though it seems to generate IL very similar to the C# version:

let getPixelValue (buffer:nativeptr<byte>) (filterData:nativeptr<float>) filterLength filterSum : byte =

    let rec accumulatePixel (acc:float) (buffer:nativeptr<byte>) (filter:nativeptr<float>) i = 
        if i > 0 then
            let newAcc = acc + (float (NativePtr.read buffer) * (NativePtr.read filter))
            accumulatePixel newAcc (NativePtr.ofNativeInt <| (NativePtr.toNativeInt buffer) + (nativeint 1)) (NativePtr.ofNativeInt <| (NativePtr.toNativeInt filter) + (nativeint 8)) (i-1)
        else
            acc

    let acc = (accumulatePixel 0.0 buffer filterData filterLength) / filterSum                

    match acc with
        | _ when acc > 255.0 -> 255uy
        | _ when acc < 0.0 -> 0uy
        | _ -> byte acc

Another thought: it might also depend on the number of calls to getPixelValue your test does (F# splits this function into two methods while C# does it in one).

Could you post your testing code here?

Regarding arrays: I'd expect the code to be at least more concise (and not unsafe).
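
To make the array suggestion concrete, here is a minimal sketch of what an array-based version might look like. The name getPixelValueArr and the choice to take the filter length from the array are assumptions made for illustration, not code from the question, and no claim is made here about how its speed compares with the pointer versions.

```fsharp
// Hypothetical array-based variant of getPixelValue (illustration only).
// Assumes buffer holds at least filter.Length elements.
let getPixelValueArr (buffer: byte[]) (filter: float[]) (filterSum: float) : byte =
    let mutable sum = 0.0
    for i in 0 .. filter.Length - 1 do
        // Multiply each pixel byte by the matching filter coefficient.
        sum <- sum + float buffer.[i] * filter.[i]
    let value = sum / filterSum
    // Clamp to the byte range, as in the original function.
    if value > 255.0 then 255uy
    elif value < 0.0 then 0uy
    else byte value

// Example: averaging three pixels with a box filter.
printfn "%d" (getPixelValueArr [| 10uy; 20uy; 30uy |] [| 1.0; 1.0; 1.0 |] 3.0)
```

Bounds checks replace the unsafe pointer arithmetic here, so whether this is fast enough would have to be measured.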

UPDATE #2:

Looks like the actual bottleneck here is byte->float conversion.

C#:

L_0003: ldarg.1 
L_0004: ldind.u1 
L_0005: conv.r8 

F#:

L_000c: ldarg.1 
L_000d: ldobj uint8
L_0012: conv.r.un 
L_0013: conv.r8 

For some reason F# uses the following path: byte->float32->float64, while C# does only byte->float64. I am not sure why that is, but with the following hack my F# version runs at the same speed as C# on gradbot's test sample (BTW, thanks gradbot for the test!):

let inline preadConvert (p : nativeptr<byte>) = (# "conv.r8" (# "ldobj !0" type (byte) p : byte #) : float #)
let inline pinc (x : nativeptr<'a>) : nativeptr<'a> = NativePtr.toNativeInt x + (# "sizeof !0" type('a) : nativeint #) |> NativePtr.ofNativeInt

let rec accumulatePixel_ed (acc, buffer, filter,  i) = 
        if i > 0 then
            accumulatePixel_ed
                (acc + (preadConvert buffer) * (NativePtr.read filter),
                (pinc buffer),
                (pinc filter),
                (i-1))
        else
            acc

Results:

    adrian 6374985677.162810 1408.870900 ms
   gradbot 6374985677.162810 1218.908200 ms
        C# 6374985677.162810 227.832800 ms
 C# Offset 6374985677.162810 224.921000 ms
   mutable 6374985677.162810 1254.337300 ms
     ed'ka 6374985677.162810 227.543100 ms

LAST UPDATE: It turned out that we can achieve the same speed even without any hacks:

let rec accumulatePixel_ed_last (acc, buffer, filter,  i) = 
        if i > 0 then
            accumulatePixel_ed_last
                (acc + (float << int16 <| NativePtr.read buffer) * (NativePtr.read filter),
                (NativePtr.add buffer 1),
                (NativePtr.add filter 1),
                (i-1))
        else
            acc

All we need to do is convert the byte into, say, int16 and then into float. This way the 'costly' conv.r.un instruction is avoided.
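
As a quick sanity check, the detour through int16 should not change the computed values, since every byte value fits in an int16. The sketch below only compares results; the helper names viaDirect and viaInt16 are made up for this example. Whether the two paths still differ in speed is likely to depend on the F# compiler and runtime version in use, so that part would need to be measured.

```fsharp
// Direct conversion: per the prim-types.fs excerpt, the path that emitted conv.r.un.
let viaDirect (b: byte) : float = float b

// Widen to int16 first, then convert, as in accumulatePixel_ed_last.
let viaInt16 (b: byte) : float = float (int16 b)

// Compare both conversions over every possible byte value.
let allEqual =
    [ 0 .. 255 ]
    |> List.forall (fun i -> viaDirect (byte i) = viaInt16 (byte i))

printfn "conversions agree for all byte values: %b" allEqual
```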

PS Relevant conversion code from "prim-types.fs" :

let inline float (x: ^a) = 
    (^a : (static member ToDouble : ^a -> float) (x))
     when ^a : float     = (# "" x  : float #)
     when ^a : float32   = (# "conv.r8" x  : float #)
     // [skipped]
     when ^a : int16     = (# "conv.r8" x  : float #)
     // [skipped]
     when ^a : byte       = (# "conv.r.un conv.r8" x  : float #)
     when ^a : decimal    = (System.Convert.ToDouble((# "" x : decimal #))) 


How does this compare? It has fewer calls to NativePtr.

let getPixelValue (buffer:nativeptr<byte>) (filterData:nativeptr<float>) filterLength filterSum : byte =
    let accumulatePixel (acc:float) (buffer:nativeptr<byte>) (filter:nativeptr<float>) length = 
        let rec accumulate acc offset = 
            if offset < length then
                let newAcc = acc + (float (NativePtr.get buffer offset) * (NativePtr.get filter offset))
                accumulate newAcc (offset + 1)
            else
                acc
        accumulate acc 0

    let acc = (accumulatePixel 0.0 buffer filterData filterLength) / filterSum                

    match acc with
        | _ when acc > 255.0 -> 255uy
        | _ when acc < 0.0 -> 0uy
        | _ -> byte acc

F# source code of NativePtr.

[<NoDynamicInvocation>]
[<CompiledName("AddPointerInlined")>]
let inline add (x : nativeptr<'T>) (n:int) : nativeptr<'T> = toNativeInt x + nativeint n * (# "sizeof !0" type('T) : nativeint #) |> ofNativeInt

[<NoDynamicInvocation>]
[<CompiledName("GetPointerInlined")>]
let inline get (p : nativeptr<'T>) n = (# "ldobj !0" type ('T) (add p n) : 'T #) 


My results on a larger test.

    adrian 6374730426.098020 1561.102500 ms
   gradbot 6374730426.098020 1842.768000 ms
        C# 6374730426.098020  150.793500 ms
 C# Offset 6374730426.098020  150.318900 ms
   mutable 6374730426.098020 1446.616700 ms

F# test code

open Microsoft.FSharp.NativeInterop
open System.Runtime.InteropServices
open System.Diagnostics

open AccumulatePixel
#nowarn "9"

let test size fn =
    let bufferByte = Marshal.AllocHGlobal(size * 4)
    let bufferFloat = Marshal.AllocHGlobal(size * 8)

    let bi = NativePtr.ofNativeInt bufferByte
    let bf = NativePtr.ofNativeInt bufferFloat

    let random = System.Random()

    // Fill indices 0 .. size - 1; index `size` would write past the end of bufferFloat.
    for i in 0 .. size - 1 do
        NativePtr.set bi i (byte <| random.Next() % 256)
        NativePtr.set bf i (random.NextDouble())

    let duration (f, name) =
        let stopWatch = Stopwatch.StartNew()
        let time = f(0.0, bi, bf, size)
        stopWatch.Stop()
        printfn "%10s %f %f ms" name time stopWatch.Elapsed.TotalMilliseconds

    List.iter duration fn

    Marshal.FreeHGlobal bufferFloat
    Marshal.FreeHGlobal bufferByte

let rec accumulatePixel_adrian (acc, buffer, filter, i) = 
    if i > 0 then
        let newAcc = acc + (float (NativePtr.read buffer) * (NativePtr.read filter))
        accumulatePixel_adrian (newAcc, (NativePtr.add buffer 1), (NativePtr.add filter 1), (i - 1))
    else
        acc

let accumulatePixel_gradbot (acc, buffer, filter, length) = 
    let rec accumulate acc offset = 
        if offset < length then
            let newAcc = acc + (float (NativePtr.get buffer offset) * (NativePtr.get filter offset))
            accumulate newAcc (offset + 1)
        else
            acc
    accumulate acc 0

let accumulatePixel_mutable (acc, buffer, filter, length) = 
    let mutable acc = 0.0
    let mutable f = filter
    let mutable b = buffer
    for i in 1 .. length do
        acc <- acc + (float (NativePtr.read b)) * (NativePtr.read f)
        f <- NativePtr.add f 1
        b <- NativePtr.add b 1
    acc

[
    accumulatePixel_adrian, "adrian";
    accumulatePixel_gradbot, "gradbot";
    AccumulatePixel.getPixelValue, "C#";
    AccumulatePixel.getPixelValueOffset, "C# Offset";
    accumulatePixel_mutable, "mutable";
]
|> test 100000000

System.Console.ReadLine() |> ignore

C# test code

namespace AccumulatePixel
{
    public class AccumulatePixel
    {
        unsafe public static double getPixelValue(double sum, byte* buffer, double* filter, int filterLength)
        {
            for (int i = 0; i < filterLength; ++i)
            {
                sum += (*buffer) * (*filter);
                ++buffer;
                ++filter;
            }

            return sum;
        }

        unsafe public static double getPixelValueOffset(double sum, byte* buffer, double* filter, int filterLength)
        {
            for (int i = 0; i < filterLength; ++i)
            {
                sum += buffer[i] * filter[i];
            }

            return sum;
        }
    }
}