{-# LANGUAGE BangPatterns #-}
module Bio.PriorityQueue (
        Sizeable(..),
        PQ_Conf(..),

        PQ,
        withPQ,
        makePQ,
        deletePQ,
        enqueuePQ,
        dequeuePQ,
        getMinPQ,
        peekMinPQ,
        sizePQ
) where

import Data.Binary
import Data.IORef
import qualified Control.Exception as CE

-- | A Priority Queue that can fall back to external storage.  
-- 
-- Note that such a Priority Queue automatically gives rise to an
-- external sorting algorithm:  enqueue everything, dequeue until empty.
--
-- Whatever is to be stored in this queue needs to be in Binary, because
-- it may need to be moved to external storage on demand.  We also need
-- a way to estimate the memory consumption of an enqueued object.  When
-- constructing the queue, the maximum amount of RAM to consume is set.
-- Note that open input streams use memory for buffering, too.  
--
-- Enqueued objects are kept in an in memory heap until the memory
-- consumption becomes too high.  At that point, the whole heap is
-- sorted and dumped to external storage.  If necessary, the file to do
-- so is created and kept open.  The newly created stream is added to a
-- heap so that dequeueing objects amounts to performing a merge sort on
-- multiple external streams.  To conserve on file descriptors, we
-- concatenate multiple streams into a single file, then use pread(2) on
-- that as appropriate.  If too many streams are open (how do we set
-- that limit?), we do exactly that:  merge-sort all streams and the
-- in-memory heap into a single new stream.  One file is created for
-- each generation of streams, so that merging handles streams of
-- roughly equal length.
--
-- XXX  Truth be told, this queue isn't backed externally, and ignores
--      all limits.  It *is* a Priority Queue, though!
--
-- XXX  May want to add callbacks for significant events (new file,
--      massive merge, deletion of file?)
--
-- XXX  Need to track memory consumption of input buffers.
--
-- XXX  Need a way to decide when too many streams are open.  That point
--      is reached when seeking takes about as much time as reading
--      (which depends on buffer size and system characteristics), so
--      that an additional merge pass becomes economical.
--
-- XXX  These will be useful:
--          unix-bytestring:System.Posix.IO.ByteString.fdPread
--          temporary:System.IO.Temp.openBinaryTempFile
--          lz4:Codec.Compression.LZ4

-- Settings handed over when a queue is created.
-- NOTE(review): the 'makePQ' in this module discards its configuration
-- argument, so neither field has any effect yet.
data PQ_Conf = PQ_Conf
    { max_mb    :: Int       -- ^ memory limit (presumably megabytes, going by the field name; TODO confirm)
    , temp_path :: FilePath  -- ^ path to temporary files (a directory will be created)
    -- functions to report progress go here
    }

-- | Handle to a priority queue: a mutable reference holding the in-memory
-- skew heap together with an 'Int' that the queue operations maintain as
-- the number of enqueued elements.
newtype PQ a = PQ (IORef (SkewHeap a, Int))

-- | Things whose memory consumption can be estimated.  The queue is meant
-- to use this estimate to decide when to move data to external storage;
-- the queue operations in this module only demand the constraint and do
-- not call 'usedBytes' themselves.
class Sizeable a where
    -- | Estimated memory consumption of a value (in bytes, going by the
    -- method name).
    usedBytes :: a -> Int

-- | Creates an empty priority queue.  The queue is designed to create
-- files, and those are only cleaned up if 'deletePQ' is called.
--
-- At present the configuration argument is ignored: the new queue is an
-- empty in-memory heap with an element count of zero.
makePQ :: (Binary a, Ord a, Sizeable a) => PQ_Conf -> IO (PQ a)
makePQ _conf = do
    ref <- newIORef (Empty, 0)
    return (PQ ref)

-- | Disposes of a priority queue together with all temporary files that
-- belong to it.  Since nothing is stored externally yet, this currently
-- does nothing at all.
deletePQ :: PQ a -> IO ()
deletePQ (PQ _ref) = return ()

-- | Runs an action with a freshly created priority queue.  The queue is
-- built with 'makePQ' and handed to the action; 'deletePQ' is invoked
-- afterwards through 'CE.bracket', i.e. also when the action throws.
withPQ :: (Binary a, Ord a, Sizeable a) => PQ_Conf -> (PQ a -> IO b) -> IO b
withPQ conf action = CE.bracket (makePQ conf) deletePQ action

-- | Adds an element to the queue.
-- By design this may trigger the creation of a file or an enormous merge
-- of already created files; the current implementation only inserts into
-- the in-memory heap and increments the element count.
enqueuePQ :: (Binary a, Ord a, Sizeable a) => a -> PQ a -> IO ()
enqueuePQ item (PQ ref) = do
    (heap, count) <- readIORef ref
    -- Force both components before storing them, so that no chain of
    -- unevaluated inserts/increments piles up inside the reference.
    let !grown  = insert item heap
        !count' = count + 1
    writeIORef ref (grown, count')

-- | Removes the minimum element from the queue.
-- If the queue is already empty, nothing happens.  As a result of a
-- removal, it is possible that one or more files become empty and are
-- deleted.
dequeuePQ :: (Binary a, Ord a, Sizeable a ) => PQ a -> IO ()
dequeuePQ (PQ pq) = do
    (heap, count) <- readIORef pq
    case heap of
        -- 'dropMin' raises an error on an empty heap, so the empty queue
        -- must be handled here to honour the "nothing happens" contract.
        Empty      -> return ()
        Node _ _ _ -> do
            -- The heap is non-empty, so the count maintained by
            -- 'enqueuePQ' is at least one and cannot drop below zero.
            let !heap'  = dropMin heap
                !count' = count - 1
            writeIORef pq (heap', count')


-- | Looks at the minimum element without removing it.
-- Yields 'Nothing' for an empty queue, otherwise 'Just' the smallest
-- element currently enqueued.  The queue itself is left unchanged.
peekMinPQ :: (Binary a, Ord a, Sizeable a) => PQ a -> IO (Maybe a)
peekMinPQ (PQ ref) = do
    (heap, _) <- readIORef ref
    return (getMin heap)

-- | Fetches and removes the minimum element in one go.
-- Yields 'Nothing' and leaves the queue untouched if it is empty;
-- otherwise the smallest element is returned after 'dequeuePQ' has taken
-- it out of the queue.
getMinPQ :: (Binary a, Ord a, Sizeable a) => PQ a -> IO (Maybe a)
getMinPQ queue@(PQ ref) = do
    (heap, _) <- readIORef ref
    let top = getMin heap
    -- Only dequeue when there actually was a minimum to hand out.
    maybe (return ()) (const (dequeuePQ queue)) top
    return top

-- | Reports the element count stored alongside the heap, i.e. the number
-- of elements currently enqueued.
sizePQ :: (Binary a, Ord a, Sizeable a) => PQ a -> IO Int
sizePQ (PQ ref) = do
    (_, count) <- readIORef ref
    return count


-- | The in-memory heap, which is needed in any case.  It is a skew heap:
-- a binary tree whose root holds the smallest element, an ordering that
-- 'union' maintains when it combines two heaps.
data SkewHeap a
    = Empty                             -- ^ heap without any elements
    | Node a (SkewHeap a) (SkewHeap a)  -- ^ root element and its two sub-heaps
 
-- | Heap that contains exactly the given element.
singleton :: Ord a => a -> SkewHeap a
singleton item = Node item Empty Empty
 
-- | Merges two heaps into one.  The smaller of the two roots becomes the
-- new root; the other heap is merged into that root's former right child,
-- and the result is placed on the left while the former left child moves
-- to the right (the "skew" step).
union :: Ord a => SkewHeap a -> SkewHeap a -> SkewHeap a
union Empty other = other
union other Empty = other
union a@(Node ka la ra) b@(Node kb lb rb)
    | ka <= kb  = Node ka (union b ra) la
    | otherwise = Node kb (union a rb) lb
 
-- | Adds one element to a heap by merging a one-element heap into it.
insert :: Ord a => a -> SkewHeap a -> SkewHeap a
insert item = union (singleton item)
 
-- | The smallest element of a heap, which sits at its root, or 'Nothing'
-- if the heap is empty.
getMin :: Ord a => SkewHeap a -> Maybe a
getMin heap = case heap of
    Empty        -> Nothing
    Node top _ _ -> Just top
 
-- | Removes the root (the smallest element) by merging its two sub-heaps.
-- Partial: calling this on an empty heap raises an 'error', so callers
-- have to make sure the heap is non-empty.
dropMin :: Ord a => SkewHeap a -> SkewHeap a
dropMin heap = case heap of
    Empty               -> error "dropMin on empty queue... are you sure?!"
    Node _ lower higher -> union lower higher