-- | Streaming utilities built on top of the iteratee package.
--
-- The module re-exports "Data.Iteratee.Binary", "Data.Iteratee.Char",
-- "Data.Iteratee.IO" and "Data.Iteratee.Iteratee" and adds combinators for
-- grouping, mapping, filtering, folding and merging streams, progress
-- reporting, parser adapters, a small FIFO queue ('QQ') used for parallel
-- chunk processing, and helpers that enumerate input files.
--
-- NOTE(review): several definitions below use bang patterns; confirm that
-- the @BangPatterns@ extension is enabled for this module.
module Bio.Iteratee (
groupStreamBy,
groupStreamOn,
iGetString,
iterGet,
iterLoop,
iLookAhead,
headStream,
peekStream,
takeStream,
dropStream,
mapStreamM,
mapStreamM_,
filterStream,
filterStreamM,
foldStream,
foldStreamM,
zipStreams,
protectTerm,
concatMapStream,
concatMapStreamM,
mapMaybeStream,
parMapChunksIO,
progressNum,
progressPos,
I.mapStream,
I.takeWhileE,
I.tryHead,
I.isFinished,
I.heads,
I.breakE,
($==),
mBind, mBind_, ioBind, ioBind_,
ListLike,
MonadIO, MonadMask,
lift, liftIO,
(>=>), (<=<),
stdin, stdout, stderr,
enumAuxFile,
enumInputs,
enumDefaultInputs,
defaultBufSize,
Ordering'(..),
mergeSortStreams,
Enumerator',
Enumeratee',
mergeEnums',
QQ(..),
emptyQ,
lengthQ,
pushQ,
popQ,
cancelAll,
ParseError(..),
parserToIteratee,
stream2vector,
stream2vectorN,
Fd,
withFileFd,
module Data.Iteratee.Binary,
module Data.Iteratee.Char,
module Data.Iteratee.IO,
module Data.Iteratee.Iteratee
) where
import Bio.Base ( findAuxFile )
import Bio.Bam.Header
import Bio.Util.Numeric ( showNum )
import Control.Concurrent.Async ( Async, async, wait, cancel )
import Control.Monad
import Control.Monad.Catch
import Control.Monad.IO.Class
import Control.Monad.Trans.Class
import Data.Binary.Get
import Data.Bits ( shiftR )
import Data.Iteratee.Binary
import Data.Iteratee.Char
import Data.Iteratee.IO hiding ( defaultBufSize )
import Data.Iteratee.Iteratee hiding ( identity )
import Data.ListLike ( ListLike )
import Data.Monoid
import Data.Typeable
import System.IO ( stdin, stdout, stderr, hIsTerminalDevice )
import System.Environment ( getArgs )
import System.Mem ( performGC )
import System.Posix ( Fd, openFd, closeFd, OpenMode(..), defaultFileFlags )
import qualified Data.Attoparsec.ByteString as A
import qualified Data.ByteString.Char8 as S
import qualified Data.Iteratee as I
import qualified Data.ListLike as LL
import qualified Data.Vector.Generic as VG
import qualified Data.Vector.Generic.Mutable as VM
-- | Groups consecutive stream elements that share the same key.
--
-- The key of an element is @proj element@.  For the first element of every
-- group, @inner key@ is run to obtain a fresh inner iteratee; the run of
-- consecutive elements with an equal key is fed to it.  When the key changes
-- (or the input ends) the inner iteratee is run to completion and the pair
-- @(key, result)@ is passed downstream as a one-element list chunk.
groupStreamOn :: (Monad m, LL.ListLike l e, Eq t1, NullPoint l, Nullable l)
=> (e -> t1)
-> (t1 -> m (Iteratee l m t2))
-> Enumeratee l [(t1, t2)] m a
groupStreamOn proj inner = eneeCheckIfDonePass (icont . step)
where
-- 'step': no group is open.  EOF ends the enumeratee and is handed back as
-- the remaining stream.
step outer (EOF mx) = idone (liftI outer) $ EOF mx
step outer c@(Chunk as)
-- Empty chunk: nothing to look at yet, ask for more input.
| LL.null as = liftI $ step outer
-- Open a new group keyed by the first element and re-process the same chunk
-- with that group open.
| otherwise = let x = proj (LL.head as)
in lift (inner x) >>= \i -> step' x i outer c
-- 'step'': a group with key @c@ and inner iteratee @it@ is open.
step' c it outer (Chunk as)
| LL.null as = liftI $ step' c it outer
-- @l@ is the prefix of elements still belonging to the group, @r@ the rest.
| (l,r) <- LL.span ((==) c . proj) as, not (LL.null l) =
-- Inner iteratee already finished: keep its value; @l@ is not fed to it.
let od a _str = idoneM a $ EOF Nothing
-- Inner iteratee wants input and carries no exception: feed it @l@.
oc k Nothing = return $ k (Chunk l)
-- Inner iteratee carries an exception: leave it untouched.
oc k m = icontM k m
in lift (runIter it od oc) >>= \it' -> step' c it' outer (Chunk r)
-- Reached on EOF, or when the first element of the chunk has a different
-- key: close the group, emit @(key, result)@ and resume 'step' on the same
-- stream value if the outer iteratee still accepts input.
step' c it outer str =
lift (run it) >>= \b -> eneeCheckIfDone (`step` str) . outer $ Chunk [(c,b)]
-- | Groups consecutive stream elements using a binary predicate.
--
-- A group starts at the first element of the input.  Elements are added to
-- the open group while @cmp ref element@ holds, where @ref@ is initially the
-- first element of the group and is afterwards replaced by the first element
-- of each matched run.  For every group the action @inner@ is run to obtain
-- a fresh inner iteratee; when the group ends, the inner iteratee is run and
-- its result is passed downstream as a one-element list chunk.
groupStreamBy :: (Monad m, LL.ListLike l t, NullPoint l, Nullable l)
=> (t -> t -> Bool)
-> m (Iteratee l m t2)
-> Enumeratee l [t2] m a
groupStreamBy cmp inner = eneeCheckIfDonePass (icont . step)
where
-- 'step': no group is open.  EOF ends the enumeratee and is handed back as
-- the remaining stream.
step outer (EOF mx) = idone (liftI outer) $ EOF mx
step outer c@(Chunk as)
-- Empty chunk: ask for more input.
| LL.null as = liftI $ step outer
-- Open a group with the first element as reference and re-process the chunk.
| otherwise = lift inner >>= \i -> step' (LL.head as) i outer c
-- 'step'': a group with reference element @c@ and inner iteratee @it@ is open.
step' c it outer (Chunk as)
| LL.null as = liftI $ step' c it outer
-- @l@ is the prefix accepted by @cmp c@, @r@ is the remainder.
| (l,r) <- LL.span (cmp c) as, not (LL.null l) =
-- Inner iteratee already finished: keep its value; @l@ is not fed to it.
let od a _str = idoneM a $ EOF Nothing
-- Inner iteratee wants input and carries no exception: feed it @l@.
oc k Nothing = return $ k (Chunk l)
-- Inner iteratee carries an exception: leave it untouched.
oc k m = icontM k m
-- The reference element becomes the head of the run just consumed.
in lift (runIter it od oc) >>= \it' -> step' (LL.head l) it' outer (Chunk r)
-- Reached on EOF, or when the first element of the chunk is rejected by the
-- predicate: close the group, emit its result and resume 'step' on the same
-- stream value if the outer iteratee still accepts input.
step' _ it outer str =
lift (run it) >>= \b -> eneeCheckIfDone (`step` str) . outer $ Chunk [b]
-- | Enumeratee that passes on a limited number of elements.
--
-- Thin wrapper around 'I.take'; the argument is the element count handed to
-- it unchanged.
takeStream :: (Monad m, Nullable s, ListLike s el) => Int -> Enumeratee s s m a
takeStream count = I.take count
-- | Reads the next element of the stream.  Alias for 'I.head'.
--
-- NOTE(review): behaviour on an empty stream is whatever 'I.head' does
-- (presumably an EOF error) -- confirm against the iteratee documentation.
headStream :: ListLike s el => Iteratee s m el
headStream = I.head
-- | Looks at the next element of the stream, if there is one.  Alias for
-- 'I.peek'; 'mergeSortStreams' relies on it to inspect an element before
-- deciding whether to drop it.
peekStream :: ListLike s el => Iteratee s m (Maybe el)
peekStream = I.peek
-- | Skips elements of the stream.
--
-- Thin wrapper around 'I.drop'; the argument is the element count handed to
-- it unchanged.
dropStream :: (Nullable s, ListLike s el) => Int -> Iteratee s m ()
dropStream count = I.drop count
-- | Runs an iteratee without consuming input.
--
-- Every chunk handed to the wrapped iteratee is also appended to an
-- accumulator.  When the wrapped iteratee finishes, its own leftover is
-- discarded and the accumulated input is reported as the remaining stream,
-- so everything it has seen is offered again to whatever runs next.
iLookAhead :: Monoid s => Iteratee s m a -> Iteratee s m a
iLookAhead = go mempty
where
-- @acc@ holds all input seen so far.  On completion the result @x@ is
-- returned with @Chunk acc@ as leftover; otherwise the continuation is
-- wrapped by 'step' so that further input is recorded as well.
go acc it = Iteratee $ \od oc -> runIter it (\x _ -> od x (Chunk acc)) (oc . step acc)
-- Data chunk: remember it and pass it on to the wrapped continuation.
step acc k c@(Chunk str) = go (acc `mappend` str) (k c)
-- EOF: pass it on; if the wrapped iteratee then finishes, the leftover is
-- again the accumulated input rather than the EOF marker.
step acc k c@(EOF _) = Iteratee $ \od1 -> runIter (k c) (\x _ -> od1 x (Chunk acc))
-- | Collects exactly @n@ bytes from the stream into one strict 'S.ByteString'.
--
-- Chunks are accumulated (in reverse) until at least @n@ bytes are
-- available; the result is the first @n@ bytes and the unused tail of the
-- last chunk is handed back as leftover.  Asking for zero bytes returns
-- immediately without requesting input.  If EOF arrives before @n@ bytes
-- have been seen, the iteratee stays in a continuation state carrying the
-- exception produced by 'setEOF'.
iGetString :: Monad m => Int -> Iteratee S.ByteString m S.ByteString
iGetString 0 = idone S.empty (Chunk S.empty)
iGetString n = liftI $ step [] 0
  where
    -- @acc@: chunks received so far, newest first; @l@: their total length.
    step acc l c@(EOF _) = icont (step acc l) (Just $ setEOF c)
    step acc l (Chunk c)
        | l + S.length c >= n =
            -- Only the bytes still missing are taken from the final chunk.
            let missing = n - l
                r       = S.concat . reverse $ S.take missing c : acc
            in idone r (Chunk $ S.drop missing c)
        | otherwise = liftI $ step (c : acc) (l + S.length c)
-- | Repeatedly applies an iteratee-producing step to a state value until the
-- stream is finished, then returns the final state.
--
-- 'I.isFinished' is checked before every iteration, so the step is never
-- started on an exhausted stream.
iterLoop :: (Nullable s, Monad m) => (a -> Iteratee s m a) -> a -> Iteratee s m a
iterLoop body = loop
  where
    loop acc = I.isFinished >>= \finished ->
        if finished
            then return acc
            else body acc >>= loop
-- | Turns a "Data.Binary.Get" parser into an iteratee over strict
-- 'S.ByteString' chunks.
--
-- The parser is driven incrementally.  A parser failure becomes an iteratee
-- error built with 'iterStrExc'.  On EOF the decoder is told that no more
-- input follows; if it still wants input the error message is
-- @\"\<partial\>\"@.  Unconsumed bytes are returned as leftover.
iterGet :: Monad m => Get a -> Iteratee S.ByteString m a
iterGet = go . runGetIncremental
  where
    failWith msg = throwErr (iterStrExc msg)

    -- Dispatch on the current decoder state.
    go (Fail _ _ msg)  = failWith msg
    go (Done rest _ a) = idone a (Chunk rest)
    go (Partial feed)  = liftI (onInput feed)

    -- The decoder wants more input: supply a chunk or signal the end.
    onInput feed (Chunk s) = go (feed (Just s))
    onInput feed (EOF mx)  = atEnd mx (feed Nothing)

    -- Final decoder state after end of input has been signalled.
    atEnd _  (Fail _ _ msg) = failWith msg
    atEnd _  (Partial _)    = failWith "<partial>"
    atEnd mx (Done rest _ a)
        | S.null rest = idone a (EOF mx)
        | otherwise   = idone a (Chunk rest)
infixl 1 `mBind`
-- | Runs an action of the base monad and passes its result to a function
-- that yields an iteratee.  The action is executed when the resulting
-- iteratee is run.
mBind :: Monad m => m a -> (a -> Iteratee s m b) -> Iteratee s m b
mBind m f = Iteratee $ \onDone onCont -> do
    a <- m
    runIter (f a) onDone onCont
infixl 1 `mBind_`
-- | Runs an action of the base monad for its effect only, then continues
-- with the given iteratee.
mBind_ :: Monad m => m a -> Iteratee s m b -> Iteratee s m b
mBind_ m b = Iteratee (\onDone onCont -> m >> runIter b onDone onCont)
infixl 1 `ioBind`
-- | Like 'mBind', but the action is an 'IO' action that is lifted into the
-- base monad with 'liftIO' first.
ioBind :: MonadIO m => IO a -> (a -> Iteratee s m b) -> Iteratee s m b
ioBind m f = liftIO m `mBind` f
infixl 1 `ioBind_`
-- | Like 'mBind_', but the action is an 'IO' action that is lifted into the
-- base monad with 'liftIO' first.
ioBind_ :: MonadIO m => IO a -> Iteratee s m b -> Iteratee s m b
ioBind_ m b = liftIO m `mBind_` b
infixl 1 $==
-- | Composes a header-passing enumerator with an enumeratee.
--
-- The header produced by the enumerator is given to the consumer; the
-- iteratee obtained is wrapped by the enumeratee, fed by the enumerator, and
-- the outer layer is finally 'run', leaving the inner iteratee.
($==) :: Monad m => Enumerator' hdr input m (Iteratee output m result)
      -> Enumeratee input output m result
      -> Enumerator' hdr output m result
(enum $== enee) iter = enum (\hdr -> enee (iter hdr)) >>= run
-- | Merges two header-passing enumerators into one.
--
-- The first enumerator supplies the header seen by the consumer.  The second
-- enumerator runs inside the iteratee fed by the first; its header selects
-- the merging enumeratee, which wraps the (lifted) consumer.  The layer of
-- the second stream is run off before the result is returned.
mergeEnums' :: (Nullable s2, Nullable s1, Monad m)
            => Enumerator' hi s1 m a
            -> Enumerator' ho s2 (Iteratee s1 m) a
            -> (ho -> Enumeratee s2 s1 (Iteratee s1 m) a)
            -> Enumerator' hi s1 m a
mergeEnums' e1 e2 etee i = e1 withOuterHeader
  where
    -- Consumer for the first stream: drive the second enumerator, then run it.
    withOuterHeader hi = e2 (withInnerHeader hi) >>= run
    -- Consumer for the second stream: merge into the lifted final consumer.
    withInnerHeader hi ho = joinI (etee ho (ilift lift (i hi)))
-- | Maps every input element to a chunk of output elements and passes these
-- chunks downstream one at a time.
--
-- After each emitted chunk the downstream iteratee is checked; if it is
-- done, processing stops and the unprocessed tail of the current input chunk
-- is left over.
concatMapStream :: (Monad m, ListLike s a, NullPoint s, ListLike t b) => (a -> t) -> Enumeratee s t m r
concatMapStream f = eneeCheckIfDone (liftI . go)
  where
    go k (EOF mx) = idone (liftI k) (EOF mx)
    go k (Chunk xs)
        | LL.null xs = liftI (go k)
        | otherwise  =
            -- Emit the image of the first element, then continue with the rest.
            eneeCheckIfDone
                (\k' -> go k' (Chunk (LL.tail xs)))
                (k (Chunk (f (LL.head xs))))
-- | Monadic variant of 'concatMapStream': every input element is mapped, by
-- an action in the base monad, to a chunk of output elements which is then
-- passed downstream.
--
-- After each emitted chunk the downstream iteratee is checked; if it is
-- done, processing stops and the unprocessed tail of the current input chunk
-- is left over.
concatMapStreamM :: (Monad m, ListLike s a, NullPoint s, ListLike t b) => (a -> m t) -> Enumeratee s t m r
concatMapStreamM f = eneeCheckIfDone (liftI . go)
  where
    go k (EOF mx) = idone (liftI k) (EOF mx)
    go k (Chunk xs)
        | LL.null xs = liftI (go k)
        | otherwise  =
            -- Run the action for the first element, emit its result, then
            -- continue with the remaining elements.
            mBind (f (LL.head xs)) $ \ys ->
                eneeCheckIfDone
                    (\k' -> go k' (Chunk (LL.tail xs)))
                    (k (Chunk ys))
-- | Applies a partial mapping to every element of the stream: elements
-- mapped to 'Nothing' are dropped, the payloads of 'Just' results are kept
-- in their original order.  Works chunk by chunk via 'mapChunks'.
mapMaybeStream :: (Monad m, ListLike s a, NullPoint s, ListLike t b) => (a -> Maybe b) -> Enumeratee s t m r
mapMaybeStream f = mapChunks mm
  where
    mm chunk = LL.foldr keep LL.empty chunk
    -- Prepend the mapped value if there is one, otherwise keep the rest as is.
    keep a rest = maybe rest (\b -> LL.cons b rest) (f a)
-- | Keeps only the stream elements that satisfy the predicate; implemented
-- by filtering every chunk with 'LL.filter'.
filterStream :: (Monad m, ListLike s a, NullPoint s) => (a -> Bool) -> Enumeratee s s m r
filterStream p = mapChunks (LL.filter p)
-- | Keeps only the stream elements for which the monadic predicate returns
-- 'True'.
--
-- The predicate is run once per element, in stream order, and the kept
-- elements are passed on in their original order.  Each chunk is processed
-- as a whole via 'mapChunksM'.
filterStreamM :: (Monad m, ListLike s a, Nullable s, NullPoint s) => (a -> m Bool) -> Enumeratee s s m r
filterStreamM k = mapChunksM (go id)
  where
    -- @acc@ is a difference list of the elements kept so far.  New elements
    -- are composed on the right (@acc . cons x@) so the final chunk keeps
    -- the input order; composing on the left would reverse it.
    go acc s
        | LL.null s = return $! acc LL.empty
        | otherwise = do
            let x = LL.head s
            keep <- k x
            go (if keep then acc . LL.cons x else acc) (LL.tail s)
-- | Applies a monadic function to every element of the stream.  Each chunk
-- is transformed with 'LL.mapM' and passed on via 'mapChunksM'.
mapStreamM :: (Monad m, ListLike (s el) el, ListLike (s el') el', NullPoint (s el), Nullable (s el), LooseMap s el el')
           => (el -> m el') -> Enumeratee (s el) (s el') m a
mapStreamM f = mapChunksM (LL.mapM f)
-- | Runs a monadic action on every element of the stream and discards the
-- results.  Each chunk is traversed with 'LL.mapM_' via 'mapChunksM_'.
mapStreamM_ :: (Monad m, Nullable s, ListLike s el) => (el -> m b) -> Iteratee s m ()
mapStreamM_ f = mapChunksM_ (LL.mapM_ f)
-- | Monadic left fold over all elements of the stream.
--
-- The step function receives the accumulator and the next element; chunks
-- are consumed element by element, front to back, via 'foldChunksM'.
foldStreamM :: (Monad m, Nullable s, ListLike s a) => (b -> a -> m b) -> b -> Iteratee s m b
foldStreamM k = foldChunksM consume
  where
    consume acc chunk
        | LL.null chunk = return acc
        | otherwise     = do
            acc' <- k acc (LL.head chunk)
            consume acc' (LL.tail chunk)
-- | Pure left fold over all elements of the stream.  Every chunk is folded
-- with 'LL.foldl'' and the accumulator is forced before the next chunk is
-- requested.
foldStream :: (Monad m, Nullable s, ListLike s a) => (b -> a -> b) -> b -> Iteratee s m b
foldStream f = foldChunksM foldChunk
  where
    foldChunk acc chunk = return $! LL.foldl' f acc chunk
-- | Feeds the same stream to two iteratees and returns both results as a
-- pair.  Thin wrapper around 'I.zip'.
zipStreams :: (Nullable s, ListLike s el, Monad m)
           => Iteratee s m a -> Iteratee s m b -> Iteratee s m (a, b)
zipStreams left right = I.zip left right
-- | An enumerator that first produces a header of type @hdr@: the consumer
-- is a function from that header to the iteratee to be fed.
type Enumerator' hdr out m r = (hdr -> Iteratee out m r) -> m (Iteratee out m r)

-- | An enumeratee that first produces a header of type @hdr@: the inner
-- consumer is a function from that header to the inner iteratee.
type Enumeratee' hdr inp out m r = (hdr -> Iteratee out m r) -> Iteratee inp m (Iteratee out m r)
-- | Locates an auxiliary file with 'findAuxFile' and runs the iteratee over
-- its contents using 'fileDriver'.
enumAuxFile :: (MonadIO m, MonadMask m) => FilePath -> Iteratee S.ByteString m a -> m a
enumAuxFile fp it = do
    resolved <- liftIO (findAuxFile fp)
    fileDriver it resolved
-- | Enumerates the inputs named by the program's command line arguments, as
-- interpreted by 'enumInputs'.
enumDefaultInputs :: (MonadIO m, MonadMask m) => Enumerator S.ByteString m a
enumDefaultInputs it0 = do
    args <- liftIO getArgs
    enumInputs args it0
-- | Enumerates a list of input files one after another.
--
-- An empty list means standard input.  Within a non-empty list the name
-- @\"-\"@ also stands for standard input; every other name is opened as a
-- file.  All sources are read with 'defaultBufSize'.
enumInputs :: (MonadIO m, MonadMask m) => [FilePath] -> Enumerator S.ByteString m a
enumInputs paths
    | null paths = enumHandle defaultBufSize stdin
    | otherwise  = foldr (\p rest -> source p >=> rest) return paths
  where
    source "-" = enumHandle defaultBufSize stdin
    source p   = enumFile defaultBufSize p
-- | Buffer size in bytes used by the input enumerators of this module
-- (2 MiB).
defaultBufSize :: Int
defaultBufSize = 2 * 1024 ^ (2 :: Int)
-- | Result of comparing the heads of two streams in 'mergeSortStreams'.
data Ordering' a
    = Less      -- ^ The element of the first stream is emitted.
    | Equal a   -- ^ Both elements are consumed and the carried value is emitted.
    | NotLess   -- ^ The element of the second stream is emitted.
-- | Merges two streams under control of a comparison function.
--
-- One stream is the input of this enumeratee, the other is the input of the
-- underlying iteratee monad.  At every step the heads of both streams are
-- inspected; depending on the comparison one of them (or, for 'Equal', both)
-- is consumed and one element is passed downstream.  When one stream is
-- exhausted the other is copied through; when both are exhausted the
-- enumeratee finishes.
mergeSortStreams :: (Monad m, ListLike s a, Nullable s) => (a -> a -> Ordering' a) -> Enumeratee s s (Iteratee s m) b
mergeSortStreams comp = eneeCheckIfDone step
  where
    step out = do
        mx <- peekStream
        my <- lift peekStream
        case (mx, my) of
            (Nothing, Nothing) -> idone (liftI out) (EOF Nothing)
            (Just x,  Nothing) -> I.drop 1 >> emit x
            (Nothing, Just y)  -> lift (I.drop 1) >> emit y
            (Just x,  Just y)  -> case comp x y of
                Less    -> I.drop 1 >> emit x
                NotLess -> lift (I.drop 1) >> emit y
                Equal z -> I.drop 1 >> lift (I.drop 1) >> emit z
      where
        -- Pass a single element downstream and continue unless it is done.
        emit e = eneeCheckIfDone step (out (Chunk (LL.singleton e)))
-- | Maps an 'IO' function over the chunks of a stream, running up to @np@
-- applications concurrently while preserving chunk order.
--
-- Every incoming chunk is started as an 'Async' and appended to a FIFO
-- queue.  Once the queue holds @np@ entries, the oldest one is awaited and
-- its result passed downstream before more input is requested.  At EOF the
-- function is applied once more to the empty chunk, and the queue is then
-- drained in order.  If the downstream iteratee reports an exception, all
-- pending computations are cancelled.
parMapChunksIO :: (MonadIO m, Nullable s) => Int -> (s -> IO t) -> Enumeratee s t m a
parMapChunksIO np f = eneeCheckIfDonePass (go emptyQ)
where
-- 'go': decide what to do after downstream has been checked.
-- Downstream carries an exception: cancel everything that is in flight and
-- pass the exception on with an empty queue.
go !qq k (Just e) = cancelAll qq >> icont (go' emptyQ k) (Just e)
go !qq k Nothing = case popQ qq of
-- Queue is full: wait for the oldest result and hand it downstream.
Just (a,qq') | lengthQ qq == np -> liftIO (wait a) >>= eneeCheckIfDonePass (go qq') . k . Chunk
-- Queue has room: ask for more input.
_ -> liftI $ go' qq k
-- 'go'': handle the next piece of input.
-- EOF: start one final computation on the empty chunk, then drain.
go' !qq k (EOF mx) = do a <- liftIO (async (f empty))
goE mx (pushQ a qq) k Nothing
-- Data: start the computation for this chunk and enqueue it.
go' !qq k (Chunk c) = do a <- liftIO (async (f c))
go (pushQ a qq) k Nothing
-- 'goE': input is exhausted; deliver the remaining results in order.
goE _ !qq k (Just e) = cancelAll qq >> icont (go' emptyQ k) (Just e)
goE mx !qq k Nothing = case popQ qq of
-- Nothing left: finish, handing back the original EOF.
Nothing -> idone (liftI k) (EOF mx)
Just (a,qq') -> liftIO (wait a) >>= eneeCheckIfDonePass (goE mx qq') . k . Chunk
-- | Guards an iteratee that writes binary data to standard output.
--
-- If 'stdout' is a terminal device the result is an 'error' call; otherwise
-- the given iteratee is returned unchanged.
protectTerm :: (Nullable s, MonadIO m) => Iteratee s m a -> Iteratee s m a
protectTerm itr =
    liftIO (hIsTerminalDevice stdout) >>= \onTerminal ->
        if onTerminal
            then error "cowardly refusing to write binary data to terminal"
            else itr
-- | Passes the stream through unchanged while reporting a running element
-- count.
--
-- Whenever the count crosses a multiple of 65536, a status line consisting
-- of an erase-line escape, the message, the formatted count and a carriage
-- return is handed to the output action.
progressNum :: (MonadIO m, Nullable s, NullPoint s, ListLike s a)
            => String -> (String -> IO ()) -> Enumeratee s s m b
progressNum msg put = eneeCheckIfDonePass (icont . go 0)
  where
    go !_    k (EOF mx)   = idone (liftI k) (EOF mx)
    go !seen k (Chunk as) = do
        let !seen'  = seen + LL.length as
            crossed = seen `div` 65536 /= seen' `div` 65536
        -- NOTE(review): the number shown is the count before this chunk.
        when crossed $
            liftIO (put ("\27[K" ++ msg ++ showNum seen ++ "\r"))
        eneeCheckIfDonePass (icont . go seen') (k (Chunk as))
-- | Passes the stream through unchanged while reporting the genomic position
-- of the data being processed.
--
-- The position of a chunk is taken from its first element using the supplied
-- projection.  A status line (erase-line escape, message, reference name,
-- colon, formatted position, carriage return) is emitted whenever the
-- reference sequence changes or the position moves into a different block of
-- @2^19@ bases.
progressPos :: (MonadIO m, ListLike s a, NullPoint s)
            => (a -> (Refseq, Int)) -> String -> (String -> IO ()) -> Refs -> Enumeratee s s m b
progressPos f msg put refs = eneeCheckIfDonePass (icont . go invalidRefseq 0)
  where
    go !_   !_   k (EOF mx) = idone (liftI k) (EOF mx)
    go !rs0 !po0 k c@(Chunk as)
        | LL.null as = liftI $ go rs0 po0 k
        | otherwise  = report `ioBind_` eneeCheckIfDonePass (icont . go rs1 po1) (k c)
      where
        (!rs1, !po1) = f (LL.head as)
        -- Did we change reference or enter a new 2^19 block?
        moved  = rs1 /= rs0 || po1 `shiftR` 19 /= po0 `shiftR` 19
        label  = S.unpack (sq_name (getRef refs rs1)) ++ ":"
        report = when moved (put ("\27[K" ++ msg ++ label ++ showNum po1 ++ "\r"))
-- | A simple FIFO queue: the element count, a front list (oldest element
-- first) and a back list (newest element first).
--
-- 'pushQ' and 'popQ' maintain the invariant that the front list is empty
-- only if the whole queue is empty.
data QQ a = QQ !Int [a] [a]

-- | The empty queue.
emptyQ :: QQ a
emptyQ = QQ 0 [] []

-- | Number of elements in the queue; read from the stored count.
lengthQ :: QQ a -> Int
lengthQ (QQ l _ _) = l

-- | Appends an element at the back of the queue.
pushQ :: a -> QQ a -> QQ a
-- Front list empty: rebuild it so the invariant holds.
pushQ a (QQ l [] b) = QQ (l + 1) (reverse (a : b)) []
pushQ a (QQ l f  b) = QQ (l + 1) f (a : b)

-- | Removes the oldest element.  Returns 'Nothing' for the empty queue,
-- otherwise the element together with the remaining queue.
popQ :: QQ a -> Maybe (a, QQ a)
-- Last element of the front list: the reversed back list becomes the front.
popQ (QQ l [a]      b) = Just (a, QQ (l - 1) (reverse b) [])
popQ (QQ l (a : fs) b) = Just (a, QQ (l - 1) fs b)
popQ (QQ _ []       _) = Nothing
-- | Cancels every asynchronous computation held in the queue: first those
-- in the front list, then those in the back list.
cancelAll :: MonadIO m => QQ (Async a) -> m ()
cancelAll (QQ _ ff bb) = liftIO (mapM_ cancel ff >> mapM_ cancel bb)
-- | Exception describing a failed parse, as raised by 'parserToIteratee'.
data ParseError = ParseError
    { errorContexts :: [String]  -- ^ Context list reported by the parser.
    , errorMessage  :: String    -- ^ Message reported by the parser.
    }
    deriving (Show, Typeable)

instance Exception ParseError
-- | Converts an attoparsec parser into an iteratee over strict
-- 'S.ByteString' chunks.
--
-- Input chunks are fed to the parser until it succeeds or fails.  A parser
-- failure is reported as a 'ParseError'; a parser that still wants input at
-- the end of the stream yields an 'EofException'; an exception carried by
-- the EOF marker is passed through.  Input not consumed by the parser is
-- returned as leftover.
parserToIteratee :: (Monad m) => A.Parser a -> Iteratee S.ByteString m a
parserToIteratee p = icont (feedStream (A.parse p)) Nothing
  where
    parseFailure ctx dsc = throwErr (toException (ParseError ctx dsc))

    -- EOF that carries an exception: propagate it.
    feedStream _ (EOF (Just e)) = throwErr e
    -- Clean EOF: tell the parser that no more input follows.
    feedStream k (EOF Nothing) =
        case A.feed (k S.empty) S.empty of
            A.Fail _ ctx dsc -> parseFailure ctx dsc
            A.Partial _      -> throwErr (toException EofException)
            A.Done rest v
                | S.null rest -> idone v (EOF Nothing)
                | otherwise   -> idone v (Chunk rest)
    feedStream k (Chunk s)
        -- Empty chunks are skipped so they are not taken for end of input.
        | S.null s  = icont (feedStream k) Nothing
        | otherwise =
            case k s of
                A.Fail _ ctx dsc -> parseFailure ctx dsc
                A.Partial k'     -> icont (feedStream k') Nothing
                A.Done rest v    -> idone v (Chunk rest)
-- | Reads at most @n@ elements from the stream into a vector.
--
-- A mutable vector of size @n@ is allocated up front and filled element by
-- element; if the stream ends early, the vector is truncated to the number
-- of elements actually read before it is frozen.
stream2vectorN :: (MonadIO m, ListLike s a, Nullable s, VG.Vector v a) => Int -> Iteratee s m (v a)
stream2vectorN n = do
    mv     <- liftIO (VM.new n)
    filled <- fill mv 0
    liftIO (VG.unsafeFreeze (VM.take filled mv))
  where
    -- Returns the number of slots written.
    fill mv i
        | i == n    = return n
        | otherwise = I.tryHead >>= maybe (return i) store
      where
        store a = liftIO (VM.write mv i a) >> fill mv (i + 1)
-- | Reads the whole stream into a vector.
--
-- Starts with a mutable vector of 1024 slots and doubles it whenever it is
-- full.  A garbage collection is requested every 0x10000 elements.  The
-- result is truncated to the number of elements read and then frozen.
stream2vector :: (MonadIO m, ListLike s a, Nullable s, VG.Vector v a) => Iteratee s m (v a)
stream2vector = do
    mv0 <- liftIO (VM.new 1024)
    collect 0 mv0
  where
    collect !i !mv = do
        next <- I.tryHead
        case next of
            Nothing -> liftIO (VG.unsafeFreeze (VM.take i mv))
            Just a  -> do
                -- Grow by the current length when the vector is full.
                mv' <- if VM.length mv == i
                           then liftIO (VM.grow mv (VM.length mv))
                           else return mv
                when (i `rem` 0x10000 == 0) (liftIO performGC)
                liftIO (VM.write mv' i a)
                collect (i + 1) mv'
-- | Opens a file read-only, passes its file descriptor to the given action
-- and closes the descriptor afterwards.  'bracket' ensures the descriptor is
-- released even if the action throws.
withFileFd :: (MonadIO m, MonadMask m) => FilePath -> (Fd -> m a) -> m a
withFileFd filepath iter = bracket acquire release iter
  where
    acquire    = liftIO (openFd filepath ReadOnly Nothing defaultFileFlags)
    release fd = liftIO (closeFd fd)