{-# LANGUAGE CPP, TypeFamilies #-}
#if __GLASGOW_HASKELL__ >= 800
{-# OPTIONS_GHC -Wno-orphans #-}
#else
{-# OPTIONS_GHC -fno-warn-orphans #-}
#endif
module Bio.Streaming
    ( MonadIO(..)
    , MonadMask
    , ByteStream

    , streamFile
    , streamHandle
    , streamInput
    , streamInputs
    , withOutputFile

    , UnwantedTerminal(..)
    , protectTerm
    , psequence
    , progressGen
    , progressNum
    , progressPos

    , mergeStreams
    , mergeStreamsBy
    , mergeStreamsOn

    , module Streaming
    , module Streaming.Prelude )
  where

import Bio.Bam.Header
import Bio.Prelude
import Bio.Streaming.Bytes
import Bio.Util.Numeric                     ( showNum )
import Streaming                     hiding ( (<>) )
import Streaming.Internal                   ( Stream(..) )
import Streaming.Prelude                    ( each )
import System.IO                            ( hIsTerminalDevice )

import qualified Streaming.Prelude      as Q

-- | Orphan instance: a 'Stream' over a 'PrimMonad' base is itself a
-- 'PrimMonad'.  A primitive action is run in the base monad @m@ and
-- lifted into the stream, so both share the same state token type.
instance (Functor f, PrimMonad m) => PrimMonad (Stream f m) where
    type PrimState (Stream f m) = PrimState m
    primitive = lift . primitive

{- | Default buffer size in elements.

Since we often want to merge many files, a read should take more time
than a seek.  Assuming a rotating hard drive, this sets the sensible
buffer size to somewhat more than one MB.  A smaller buffer size would
surely work on SSDs, but the large buffer doesn't hurt either.
-}
defaultBufSize :: Int
defaultBufSize = 2*1024*1024    -- 2 * 2^20 = 2097152

-- | Opens the named file in binary mode for reading and hands its
-- contents, as a 'ByteStream', to the continuation.  The handle is
-- acquired and released through 'bracket', so it is closed when the
-- continuation returns or throws; the stream therefore has to be
-- consumed inside the continuation.
streamFile :: (MonadIO m, MonadMask m) => FilePath -> (ByteStream m () -> m r) -> m r
streamFile f k = bracket (liftIO $ openBinaryFile f ReadMode) (liftIO . hClose) (k . streamHandle)
{-# INLINE streamFile #-}

-- | Streams the contents of a 'Handle' by calling 'hGetContentsN' with
-- 'defaultBufSize' as its size argument.
streamHandle :: MonadIO m => Handle -> ByteStream m ()
streamHandle = hGetContentsN defaultBufSize
{-# INLINE streamHandle #-}

-- | Reads 'stdin' if the filename is \"-\", else reads the named file
-- via 'streamFile'.  The resulting 'ByteStream' is passed to the
-- continuation.
streamInput :: (MonadIO m, MonadMask m) => FilePath -> (ByteStream m () -> m r) -> m r
streamInput "-" k = k (streamHandle stdin)
streamInput  f  k = streamFile f k
{-# INLINE streamInput #-}

{- | Reads multiple inputs in sequence.

Only one file is opened at a time, so they must also be consumed in
sequence.  The filename \"-\" refers to stdin; if no filenames are
given, stdin is read.

Each named file is opened when its 'ByteStream' starts running and is
closed only after that 'ByteStream' has been run to its end; no
exception handler is installed around it.
-}
streamInputs :: MonadIO m => [FilePath] -> (Stream (ByteStream m) m () -> r) -> r
streamInputs [] k = k $ yields (streamHandle stdin)
streamInputs fs k = k $ mapM_ go fs
  where
    -- one layer of the outer stream per input
    go "-" = yields (streamHandle stdin)
    go  f  = yields $ do h <- liftIO $ openBinaryFile f ReadMode
                         streamHandle h
                         liftIO $ hClose h
{-# INLINE streamInputs #-}

-- | Exception thrown by 'protectTerm' when 'stdout' is a terminal device.
data UnwantedTerminal = UnwantedTerminal deriving (Typeable, Show)
-- | 'displayException' yields a fixed, human readable message.
instance Exception UnwantedTerminal where
    displayException _ = "cowardly refusing to write binary data to terminal"

{- | Protects the terminal from binary junk.

If @s@ is a 'Stream', then @protectTerm s@ first checks whether 'stdout'
is a terminal device.  If it is, 'UnwantedTerminal' is thrown; otherwise
the result behaves like @s@.  This is most usefully composed with
functions that might otherwise write binary data to an interactive
terminal.
-}
protectTerm :: (Functor f, MonadIO m) => Stream f m r -> Stream f m r
protectTerm str = do
    t <- liftIO $ hIsTerminalDevice stdout
    when t . liftIO . throwM $ UnwantedTerminal
    str
{-# INLINE protectTerm #-}

{- | Like 'Streaming.Prelude.sequence', but parallel.

This runs each element of a stream of actions.  A configurable number of
actions are buffered and run asynchronously, each in its own thread.
Results are emitted in the order of the input stream.  If an action
throws, the exception is rethrown at the point where its result would
have been emitted.  A buffer size smaller than one behaves like one.
-}
psequence :: MonadIO m => Int -> Stream (Of (IO a)) m b -> Stream (Of a) m b
psequence np = go emptyQ
  where
    -- if the queue is full, wait for the head element to complete
    -- (the test is '>=' rather than '==' so that a non-positive 'np'
    -- cannot make the queue grow without bound; for positive 'np' the
    -- queue never exceeds 'np', so both tests agree)
    go !qq s = case popQ qq of
        Just (a,qq') | lengthQ qq >= np -> reap a >>= wrap . (:> go qq' s)
        _                               -> lift (inspect s) >>= go' qq

    -- if we have room for input, we get input
    go' !qq (Right (k :> s)) = liftIO (spawn k) >>= \a -> go (pushQ a qq) s
    go' !qq (Left         r) = goE r qq

    -- input ended, empty the queue
    goE r !qq = case popQ qq of
        Nothing      -> pure r
        Just (a,qq') -> reap a >>= wrap . (:> goE r qq')

    -- runs the action in a new thread; its result or its exception is
    -- delivered through the returned 'MVar'
    spawn :: IO a -> IO (MVar (Either SomeException a))
    spawn k = newEmptyMVar                  >>= \mv ->
              forkIO (try k >>= putMVar mv) >>
              return mv

    -- blocks until the action has finished, rethrowing its exception
    reap mv = liftIO (takeMVar mv) >>= either (liftIO . throwM) return


-- A very simple queue data type:  @QQ l f b@ holds the length @l@, the
-- front list @f@ (next element to be popped first) and the back list
-- @b@ (most recently pushed element first).
-- Invariants: q = QQ l f b --> l == length f + length b
--                          --> null f ==> l == 0
-- The second invariant is what allows 'popQ' to report an empty queue
-- by looking at the front list only.

data QQ a = QQ !Int [a] [a]

-- The queue without elements.
emptyQ :: QQ a
emptyQ = QQ 0 [] []

-- Number of elements, read from the cached length.
lengthQ :: QQ a -> Int
lengthQ (QQ l _ _) = l

-- Appends an element at the end.  If the front list is empty, the
-- element goes to the front so the front is non-empty afterwards.
pushQ :: a -> QQ a -> QQ a
pushQ a (QQ l [] b) = QQ (l+1) (reverse (a:b)) []
pushQ a (QQ l  f b) = QQ (l+1) f (a:b)

-- Removes the oldest element, or yields 'Nothing' if the front list is
-- empty.  When the front list runs out, the reversed back list becomes
-- the new front.
popQ :: QQ a -> Maybe (a, QQ a)
popQ (QQ _ [    ] _) = Nothing
popQ (QQ l [ a  ] b) = Just (a, QQ (l-1) (reverse b) [])
popQ (QQ l (a:fs) b) = Just (a, QQ (l-1) fs b)


-- | Merges two streams using the 'Ord' instance of their elements.
-- This is 'mergeStreamsBy' applied to 'compare'.
mergeStreams :: (Monad m, Ord a)
             => Stream (Of a) m r -> Stream (Of a) m s -> Stream (Of a) m (r, s)
mergeStreams = mergeStreamsBy compare
{-# INLINE mergeStreams #-}

-- | Merges two streams, comparing elements by the key the given
-- function extracts from them.  This is 'mergeStreamsBy' applied to
-- @'comparing' f@.
mergeStreamsOn :: (Monad m, Ord b)
               => (a -> b) -> Stream (Of a) m r -> Stream (Of a) m s -> Stream (Of a) m (r, s)
mergeStreamsOn f = mergeStreamsBy (comparing f)
{-# INLINE mergeStreamsOn #-}

-- | Merges two streams using the supplied comparison function.
--
-- At each step the heads of both streams are compared and the smaller
-- one is emitted; if they compare equal, the element of the first
-- stream is emitted first (the merge is left-biased).  Once one stream
-- has ended, the remainder of the other is passed through.  The results
-- of both streams are returned as a pair.
mergeStreamsBy :: Monad m
               => (a -> a -> Ordering)
               -> Stream (Of a) m r -> Stream (Of a) m s -> Stream (Of a) m (r, s)
mergeStreamsBy cmp = go
  where
    go str0 str1 = case str0 of
      -- first stream finished: drain the second, pairing up the results
      Return r0         -> (\r1 -> (r0, r1)) <$> str1
      -- run effects of the first stream until it exposes an element
      Effect m          -> Effect $ liftM (\str -> go str str1) m
      Step (a :> rest0) -> case str1 of
        -- second stream finished: drain the first, head element included
        Return r1         -> (\r0 -> (r0, r1)) <$> str0
        Effect m          -> Effect $ liftM (go str0) m
        Step (b :> rest1) -> case cmp a b of
          LT -> Step (a :> go rest0 str1)
          EQ -> Step (a :> go rest0 str1) -- left-biased
          GT -> Step (b :> go str0 rest1)
{-# INLINABLE mergeStreamsBy #-}

-- | A general progress indicator that logs some message after a set
-- number of records have passed through.
--
-- The message function receives the running count of the current record
-- (starting at one) and the record itself.  A message is logged whenever
-- the count is a multiple of the interval; an interval of zero disables
-- these messages instead of dividing by zero.  The records themselves
-- are passed on unchanged.  When the stream ends, an empty string is
-- logged (presumably to clear the progress display).
progressGen :: MonadLog m => (Int -> a -> String) -> Int -> Q.Stream (Q.Of a) m r -> Q.Stream (Q.Of a) m r
progressGen msg sz = go 0
  where
    -- 'n' is the number of records seen so far
    go   !n = lift . Q.next >=> either fin (step $ succ n)
    step !n (a,s) = do when (sz /= 0 && n `mod` sz == 0) . lift . logString_ $ msg n a
                       Q.cons a (go n s)
    fin r = r <$ lift (logString_ "")

-- | A simple progress indicator that logs the number of records.  The
-- logged message is the given text, a space, and the record count
-- formatted by 'showNum'.
progressNum :: MonadLog m => String -> Int -> Q.Stream (Q.Of a) m r -> Q.Stream (Q.Of a) m r
progressNum msg = progressGen (\n _ -> msg ++ " " ++ showNum n)

-- | A simple progress indicator that logs a position every set number
-- of passed records.  The first argument extracts a reference sequence
-- and a position from a record; the logged message is the given text, a
-- space, the name of the reference looked up in the 'Refs', a colon,
-- and the position formatted by 'showNum'.
progressPos :: MonadLog m
            => (a -> (Refseq, Int)) -> String -> Refs -> Int
            -> Q.Stream (Q.Of a) m r -> Q.Stream (Q.Of a) m r
progressPos f msg refs =
    progressGen $ \_ a -> let (!rs1, !po1) = f a
                              !nm = unpack . sq_name $ getRef refs rs1
                          in msg ++ " " ++ nm ++ ":" ++ showNum po1