
How to Eliminate Cost Centres in String Traversals and List Comprehensions

I'm implementing a motif-finding algorithm from the domain of bioinformatics using Haskell. I won't go into the details of the algorithm other than to say it's a branch-and-bound median string search. I had planned on making my implementation more interesting by implementing a concurrent approach (and later an STM approach) in order to get a multicore speedup, but after compiling with the following flags

$ ghc -prof -auto-all -O2 -fllvm -threaded -rtsopts --make main 

and printing the profile I saw something interesting (and perhaps obvious):

COST CENTRE      entries  %time %alloc  
hammingDistance  34677951  47.6   14.7  
motifs           4835446   43.8   71.1  

It's clear that a remarkable speedup could be gained without going anywhere near multicore programming (although that's been done and I just need to find some good test data and sort out Criterion for that).

Anyway, both of these functions are purely functional and in no way concurrent. They're also doing quite simple stuff, so I was surprised that they took so much time. Here's the code for them:

data NukeTide = A | T | C | G deriving (Read, Show, Eq, Ord, Enum)

type Motif = [NukeTide] 

hammingDistance :: Motif -> Motif -> Int
hammingDistance [] [] = 0
hammingDistance xs [] = 0 -- optimistic
hammingDistance [] ys = 0 -- optimistic
hammingDistance (x:xs) (y:ys) = case (x == y) of
    True  -> hammingDistance xs ys
    False -> 1 + hammingDistance xs ys

motifs :: Int -> [a] -> [[a]]
motifs n nukeTides = [ take n $ drop k nukeTides | k <- [0..length nukeTides - n] ]

Note that of the two arguments to hammingDistance, I can actually assume that xs is going to be x long and that ys is going to be less than or equal to that, if that opens up room for improvements.

As you can see, hammingDistance calculates the Hamming distance between two motifs, which are lists of nucleotides. The motifs function takes a number and a list and returns all the substrings of that length, e.g.:

> motifs 3 "hello world"
["hel","ell","llo","lo ","o w"," wo","wor","orl","rld"]

Since the algorithmic processes involved are so simple I can't think of a way to optimize this further. I do however have two guesses as to where I should be headed:

  1. hammingDistance: The data types I'm using (NukeTide and []) are slow/clumsy. This is just a guess, since I'm not familiar with their implementations, but I think defining my own datatype, although more legible, probably involves more overhead than I intend. Also, pattern matching is foreign to me; I don't know whether it is trivial or costly.
  2. motifs: If I'm reading this correctly, about 70% of all memory allocations are done by motifs, and I'd assume that has to be garbage collected at some point. Again, the all-purpose list might be slowing me down, or the list comprehension might, since its cost is very unclear to me.

Does anybody have any advice on the usual procedure here? If data types are the problem, would arrays be the right answer? (I've heard they come in boxes)

Thanks for the help.

Edit: It just occurred to me that it might be useful if I describe the manner in which these two functions are called:

totalDistance :: Motif -> Int
totalDistance motif = sum $ map (minimum . map (hammingDistance motif) . motifs l) dna

This function is the result of another function, and is passed around nodes in a tree. At each node in the tree an evaluation of the nucleotide (of length <= n, that is if == n then it is a leaf node) is done, using totalDistance to score the node. From then on it's your typical branch and bound algorithm.

Edit: John asked that I post the change I made, which virtually eliminated the cost of motifs:

scoreFunction :: DNA -> Int -> (Motif -> Int)
scoreFunction dna l = totalDistance
    where
        -- The sum of the minimum hamming distance in each line of dna
        -- is given by totalDistance motif
        totalDistance motif = sum $ map (minimum . map (hammingDistance motif)) possibleMotifs
        possibleMotifs = map (motifs l) dna -- Previously this was computed in the line above

I didn't make it clear in my original post, but scoreFunction is only called once, and the result is passed around in a tree traversal/branch and bound and used to evaluate nodes. Recomputing motifs at every step of the way, in retrospect, isn't one of the brightest things I've done.


Your definition of motifs looks like it's doing a lot more traversing than necessary because each application of drop has to traverse the list from the beginning. I would implement it using Data.List.tails instead:

motifs2 :: Int -> [a] -> [[a]]
motifs2 n nukeTides = map (take n) $ take count $ tails nukeTides
  where count = length nukeTides - n + 1

A quick comparison in GHCi shows the difference (using sum . map length to force evaluation):

*Main> let xs = concat (replicate 10000 [A, T, C, G])
(0.06 secs, 17914912 bytes)
*Main> sum . map length $ motifs 5 xs
199980
(3.47 secs, 56561208 bytes)
*Main> sum . map length $ motifs2 5 xs
199980
(0.15 secs, 47978952 bytes)
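
If the extra length pass in motifs2 bothers you, here is a sketch of a variant that avoids it. The name motifs3 is hypothetical (it is not from the question), and the sketch assumes n >= 1. It zips the tails against the list with its first n - 1 elements dropped, so the traversal ends exactly when fewer than n elements remain:

```haskell
import Data.List (tails)

-- Hypothetical variant (not from the original post): all windows of length n.
-- Zipping against `drop (n - 1) xs` ends the traversal as soon as fewer than
-- n elements remain, so no separate `length` pass is needed.
-- Assumes n >= 1.
motifs3 :: Int -> [a] -> [[a]]
motifs3 n xs = zipWith (\t _ -> take n t) (tails xs) (drop (n - 1) xs)
```

I have not benchmarked this against motifs2, so treat it as an alternative to measure rather than a guaranteed improvement.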


Your definition of hammingDistance is probably much less efficient than it could be.

hammingDistance (x:xs) (y:ys) = case (x == y) of
    True  -> hammingDistance xs ys
    False -> 1 + hammingDistance xs ys

Because of Haskell's laziness, this will be expanded to (in the worst case):

(1 + (1 + (1 + ...)))

which will exist as a chain of thunks (allocated on the heap, and consuming stack space when finally forced), getting reduced only when its value is demanded. Whether this is actually a problem depends on the call site, compiler options, etc., so it's often good practice to write your code in a form which avoids this issue altogether.

A common solution is to create a tail-recursive form with a strict accumulator, but in this case you could use higher-order functions, like this:

hammingDistance :: Motif -> Motif -> Int
hammingDistance xs ys = length . filter (uncurry (/=)) $ zip xs ys  -- count the mismatches

Here's the tail-recursive implementation, for comparison:

hammingDistance :: Motif -> Motif -> Int
hammingDistance xs ys = go 0 xs ys
  where
    go !acc [] [] = acc
    go !acc xs [] = acc -- optimistic
    go !acc [] ys = acc -- optimistic
    go !acc (x:xs) (y:ys) = case (x == y) of
      True  -> go acc xs ys
      False -> go (acc+1) xs ys

This uses the BangPatterns extension to force the accumulator to be strictly evaluated, otherwise it would have the same problem as your current definition.
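
If you would rather not enable an extension, roughly the same strict accumulation is available through Data.List.foldl'. This is only a sketch: hammingDistanceFold is a name made up for illustration, and zip truncates to the shorter list, which matches the "optimistic" cases above.

```haskell
import Data.List (foldl')

data NukeTide = A | T | C | G deriving (Show, Eq)

-- Hypothetical name, for illustration only.
-- foldl' forces the Int accumulator at every step, so no chain of
-- (1 + (1 + ...)) thunks builds up. zip stops at the shorter list.
hammingDistanceFold :: [NukeTide] -> [NukeTide] -> Int
hammingDistanceFold xs ys = foldl' step 0 (zip xs ys)
  where
    step acc (x, y)
      | x == y    = acc
      | otherwise = acc + 1
```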

To directly answer some of your other questions:

  1. Pattern matching is trivial
  2. Whether you should use lists or arrays depends mostly on how the data is created and how it's consumed. For this case, it's possible that lists may be the best type. In particular, if your lists are all consumed as they're created, and you don't ever need the whole list in memory, they should be fine. If you do retain lists in memory though, they have a lot of space overhead.

Usage patterns

I think the way you use these functions does some extra work as well:

(minimum . map (hammingDistance motif) . motifs l)

Since you only need the minimum hammingDistance, you may be calculating a lot of extra values which aren't necessary. I can think of two solutions to this:

Option 1. Define a new function hammingDistanceThresh :: Motif -> Int -> Motif -> Int, which stops when it exceeds the threshold. The slightly odd type ordering is to facilitate using it in a fold, like this:

let motifs' = motifs l line  -- line: one DNA string from dna (name assumed here)
in foldl' (hammingDistanceThresh motif) (hammingDistance motif $ head motifs') (tail motifs')
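
Here is a minimal sketch of what hammingDistanceThresh could look like, given the type above. The body is my own guess at an implementation, and minDistance is a hypothetical helper. It also seeds the fold with maxBound rather than head/tail, which is a swapped-in choice that avoids the partial functions:

```haskell
{-# LANGUAGE BangPatterns #-}
import Data.List (foldl')

data NukeTide = A | T | C | G deriving (Show, Eq)
type Motif = [NukeTide]

-- Returns min thresh (distance between motif and candidate), but gives up
-- on a candidate as soon as the running count reaches thresh.
hammingDistanceThresh :: Motif -> Int -> Motif -> Int
hammingDistanceThresh motif thresh = go 0 motif
  where
    go !acc _ _ | acc >= thresh = thresh   -- cannot beat the current best
    go !acc (x:xs) (y:ys)
      | x == y    = go acc xs ys
      | otherwise = go (acc + 1) xs ys
    go !acc _ _ = acc                      -- one list ran out: "optimistic"

-- Hypothetical helper: smallest distance from motif to any candidate.
-- Yields maxBound for an empty candidate list.
minDistance :: Motif -> [Motif] -> Int
minDistance motif = foldl' (hammingDistanceThresh motif) maxBound
```

The accumulator of the fold doubles as the threshold, so each later candidate is compared only until it is known to be no better than the best one seen so far.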

Option 2. If you define a lazy natural number type, you can use that instead of Ints for the result of hammingDistance. Then only as much of the hamming distance as necessary will be calculated.
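
To make Option 2 concrete, here is a small sketch with a Peano-style number type. Nat, hammingLazy, minNat, minimumNat and toInt are all names invented for this example, and it is not tuned for speed:

```haskell
data NukeTide = A | T | C | G deriving (Show, Eq)
type Motif = [NukeTide]

-- Peano-style natural number. The constructors are lazy, so a distance can
-- be inspected one unit at a time.
data Nat = Z | S Nat

-- Each mismatch contributes one S, produced only when someone asks for it.
hammingLazy :: Motif -> Motif -> Nat
hammingLazy (x:xs) (y:ys)
  | x == y    = hammingLazy xs ys
  | otherwise = S (hammingLazy xs ys)
hammingLazy _ _ = Z

-- Walks both numbers in lockstep and stops as soon as either one ends,
-- so the larger number is never evaluated past the smaller one.
minNat :: Nat -> Nat -> Nat
minNat Z _ = Z
minNat _ Z = Z
minNat (S a) (S b) = S (minNat a b)

-- Minimum of a non-empty list of lazy naturals.
minimumNat :: [Nat] -> Nat
minimumNat = foldr1 minNat

toInt :: Nat -> Int
toInt Z     = 0
toInt (S n) = 1 + toInt n
```

With these definitions the scoring step would read something like toInt . minimumNat . map (hammingLazy motif). Whether the constructor overhead pays for itself compared with Ints is something only a benchmark can tell you.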

One final note: using -auto-all will very frequently generate much slower code than other profiling options. I would suggest you try using just -auto first, and then adding manual SCC annotations if necessary.
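
A manual annotation is just a pragma in front of an expression. The sketch below uses an arbitrary cost-centre label, "hamming_zip", and the pragma only has an effect when compiling with -prof. (In later GHC versions the automatic flags were renamed to the -fprof-auto family.)

```haskell
data NukeTide = A | T | C | G deriving (Show, Eq)

-- Only the annotated expression gets its own cost centre, labelled
-- "hamming_zip" (an arbitrary name chosen for this example).
hammingDistance :: [NukeTide] -> [NukeTide] -> Int
hammingDistance xs ys =
  {-# SCC "hamming_zip" #-} length (filter (uncurry (/=)) (zip xs ys))
```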


Right... I could not resist going to the limit and wrote a bare-metal-ish packed-bits implementation:

{-# language TypeSynonymInstances #-}
{-# language BangPatterns #-}

import Data.Bits
import Data.Word


data NukeTide = A | T | C | G deriving (Read, Show, Eq, Ord, Enum)

type UnpackedMotif = [NukeTide] 

type PackageType = Word32
nukesInPackage = 16 :: Int
allSetMask = complement 0 :: PackageType


-- Be careful to have length of motif == nukesInPackage here!
packNukesToWord :: UnpackedMotif -> PackageType
packNukesToWord = packAt 0
    where packAt _ [] = 0
          packAt i (m:ml) =    (b0 m .&. bit i)
                           .|. (b1 m .&. bit (i+1))
                           .|. packAt (i+2) ml
          b0 A = 0
          b0 T = allSetMask
          b0 C = 0
          b0 G = allSetMask
          b1 A = 0
          b1 T = 0
          b1 C = allSetMask
          b1 G = allSetMask

unpackNukesWord :: PackageType -> UnpackedMotif
unpackNukesWord = unpackNNukesFromWord nukesInPackage

unpackNNukesFromWord :: Int -> PackageType -> UnpackedMotif
unpackNNukesFromWord = unpackN
    where unpackN 0 _ = []
          unpackN i w = (nukeOf $ w .&. r2Mask):(unpackN (i-1) $ w`shiftR`2)
          nukeOf bs
           | bs == 0      = A
           | bs == bit 0  = T
           | bs == bit 1  = C
           | otherwise    = G
          r2Mask = (bit 1 .|. bit 0) :: PackageType


data PackedMotif = PackedMotif { motifPackets::[PackageType]
                               , nukesInLastPack::Int        }
 -- note nukesInLastPack will never be zero; motifPackets must be [] to represent empty motifs.
packNukes :: UnpackedMotif -> PackedMotif
packNukes m = case remain of
               [] -> PackedMotif [packNukesToWord takeN] (length takeN)
               r  -> prAppend (packNukesToWord takeN) (packNukes r)
    where (takeN, remain) = splitAt nukesInPackage m
          prAppend w (PackedMotif l i) = PackedMotif (w:l) i

unpackNukes :: PackedMotif -> UnpackedMotif
unpackNukes (PackedMotif l i) = unpack l i
  where unpack [l] i = unpackNNukesFromWord i l
        unpack (l:ls) i = unpackNukesWord l ++ unpack ls i
        unpack [] _ = []

instance Show PackedMotif where
  show = show . unpackNukes



class Nukes a where
  pLength :: a -> Int
  shiftLN1 :: a -> a
  hammingDistance :: a -> a -> Int
  motifs :: Int -> a -> [a]

instance Nukes PackageType where
  pLength _ = nukesInPackage
  shiftLN1 = (`shiftR`2)
  hammingDistance !x !y = fromIntegral $ abt (x `xor` y)
      where abt !b = bbt(b.&.a0Mask .|. ((b.&.a1Mask) `shiftR` 1))
            bbt !b = sbt $ (b.&.r16Mask) + (b `shiftR` nukesInPackage)
            sbt !b = (r2Mask .&. b)             + (r2Mask .&. (b`shiftR`2))
                   + (r2Mask .&. (b`shiftR`4))  + (r2Mask .&. (b`shiftR`6))
                   + (r2Mask .&. (b`shiftR`8))  + (r2Mask .&. (b`shiftR`10))
                   + (r2Mask .&. (b`shiftR`12)) + (r2Mask .&. (b`shiftR`14))
            a0Mask = 0x55555555 :: PackageType
            a1Mask = 0xAAAAAAAA :: PackageType
            r16Mask = 0xFFFF :: PackageType
            r2Mask = 0x3 :: PackageType
  motifs 0 _ = []
  motifs l x = x : motifs (l-1) (shiftLN1 x)


maskNukesBut :: Int -> PackageType -> PackageType
maskNukesBut i = ( ( allSetMask `shiftR` (2*(nukesInPackage - i)) ) .&.)

instance Nukes PackedMotif where
  pLength (PackedMotif (x:xs) ix) = nukesInPackage * (length xs) + ix
  pLength _ = 0
  shiftLN1 ξ@(PackedMotif [] _) = ξ
  shiftLN1 (PackedMotif [x] ix) | ix>1       = PackedMotif [x`shiftR`2] (ix-1)
                                | otherwise  = PackedMotif [] nukesInPackage
  shiftLN1 (PackedMotif (x:x':xs) ix)
        = PackedMotif (( shiftLN1 x .|. pnext ):sxs) resLMod
      where sxs = motifPackets $ shiftLN1 (PackedMotif (x':xs) ix)
            pnext = shiftL (x'.&.0x3) 30
            resLMod = if ix > 1 then (ix-1) else nukesInPackage
  hammingDistance xs ys = go 0 xs ys
    where
      go :: Int -> PackedMotif -> PackedMotif -> Int
      go !acc (PackedMotif [x] ix) (PackedMotif [y] iy)
       | ix > iy    = acc + (hammingDistance y $ maskNukesBut iy x)
       | otherwise  = acc + (hammingDistance x $ maskNukesBut ix y)
      go !acc (PackedMotif [x] ix) (PackedMotif (y:ys) iy)
        = acc + (hammingDistance x $ maskNukesBut ix y)
      go !acc (PackedMotif (x:xs) ix) (PackedMotif [y] iy)
        = acc + (hammingDistance y $ maskNukesBut iy x)
      go !acc (PackedMotif (x:xs) ix) (PackedMotif (y:ys) iy)
        = go (acc + hammingDistance x y) (PackedMotif xs ix) (PackedMotif ys iy)
      go !acc _ _ = acc
  motifs l ξ
     | l>0        = fShfts (min nukesInPackage $ pLength ξ + 1 - l) ξ >>= ct
     | otherwise  = []
    where fShfts k χ | k > 0      = χ : fShfts (k-1) (shiftLN1 χ)
                     | otherwise  = []
          ct (PackedMotif ys iy) = case remain of
                [] -> if (length takeN - 1) * nukesInPackage + iy >= l
                       then [PackedMotif takeN lMod] else []
                _  -> PackedMotif takeN lMod : ct(PackedMotif (tail ys) iy)
            where (takeN, remain) = splitAt lQuot ys
          (lQuot,lMod) = case l `quotRem` nukesInPackage of
                   (i,0) -> (i, nukesInPackage)
                   (i,m) -> (i+1, m)

To use it with plain UnpackedMotif = [NukeTide] values, convert them with the packNukes function, e.g.

*BioNuke0> motifs 23 $ packNukes $ take 27 $ cycle [A,T,G,C,A]
[[A,T,G,C,A,A,T,G,C,A,A,T,G,C,A,A,T,G,C,A,A,T,G],[T,G,C,A,A,T,G,C,A,A,T,G,C,A,A,T,G,C,A,A,T,G,C],[G,C,A,A,T,G,C,A,A,T,G,C,A,A,T,G,C,A,A,T,G,C,A],[C,A,A,T,G,C,A,A,T,G,C,A,A,T,G,C,A,A,T,G,C,A,A],[A,A,T,G,C,A,A,T,G,C,A,A,T,G,C,A,A,T,G,C,A,A,T]]

*BioNuke0> hammingDistance (packNukes [A,T,G,C,A,A,T,G]) (packNukes [A,T,C,C,A,T,G])
3

*BioNuke0> map (hammingDistance (packNukes $ take 52 $ cycle [A,T,C,C,A,T,G])) (motifs 52 $ packNukes $ take 523 $ cycle [A,T,C,C,A,T,G])
[0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44]

I haven't compared the performance to the original version yet, but it should be quite a bit faster than any algebraic-datatype implementation. Plus, it readily offers a space-efficient storage format.
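
As a possible simplification of the per-word distance, here is a sketch that leans on popCount from Data.Bits. It assumes the same layout as above (16 nucleotides per Word32, two bits each); hammingWord is a hypothetical name, and I have not measured it against the shift-and-add version:

```haskell
import Data.Bits (popCount, shiftR, xor, (.&.), (.|.))
import Data.Word (Word32)

-- Hypothetical alternative for a single packed word.
-- Assumes 16 nucleotides per Word32, 2 bits per nucleotide.
hammingWord :: Word32 -> Word32 -> Int
hammingWord x y = popCount collapsed
  where
    -- A non-zero 2-bit field in d marks a mismatching nucleotide.
    d = x `xor` y
    -- OR each field's high bit into its low bit, then keep only the low
    -- bits: exactly one set bit per mismatching field.
    collapsed = (d .|. (d `shiftR` 1)) .&. 0x55555555
```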
