{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}

-- | This library exposes two high-level functions:
--
--  * 'cursorPipe' to stream the rows of a query from a PostgreSQL database
--  * 'connect' to manage a connection inside 'SafeT IO'
-- 
-- Here is what usage looks like, in an excerpt from production code for a table of samples:
--
-- @
--
-- mkDB :: ByteString
-- mkDB =
--   [qmb|
--     create table samples (
--       id         uuid not null,
--       time       bigint not null,
--       value      double precision not null
--     );
--  |]
--
-- streamSamples
--   :: RunSession
--   -> Int64 -- ^ first sample time
--   -> Producer (UUID, Int64, Double) (SafeT IO) ()
-- streamSamples (RunSession run) from' =
--   yield from'
--     >-> cursorPipe
--       do run
--       do samplesEncoder
--       do samplesDecoder
--       do Cursor "sample_cursor"
--       do Template [qmb| select * from samples where time >= $1 order by time; |]
--       do 1000
--
-- samplesEncoder :: Params Int64
-- samplesEncoder = param $ E.nonNullable E.int8
--
-- samplesDecoder :: Row (UUID, Int64, Double)
-- samplesDecoder =
--   (,,)
--     <$> do column . D.nonNullable $ D.uuid
--     <*> do column . D.nonNullable $ D.int8
--     <*> do column . D.nonNullable $ D.float8
--
-- localConnect :: (DatabaseLog -> IO ()) -> SafeT IO RunSession
-- localConnect tracer = do
--   connect tracer $
--     settings
--       "10.1.9.95"
--       5432
--       "postgres"
--       "postgres"
--       "postgres"
--
-- main :: IO ()
-- main = do
--   ts <- getPOSIXTime
--   runSafeT $ do
--     run' <- localConnect pPrint
--     runEffect $ streamSamples run' (floor ts - 600) >-> P.print
--
-- 
-- @


module Hasql.Pipes where

import qualified ByteString.TreeBuilder as TB
import Hasql.Connection (acquire, ConnectionError, Settings)
import Hasql.Decoders (Row, rowList)
import qualified Hasql.Decoders as D
import Hasql.Encoders (Params)
import qualified Hasql.Encoders as E
import Hasql.Session
import Hasql.Statement
import Pipes
import qualified Pipes as P
import Pipes.Safe
import qualified Pipes.Safe as P
import Protolude
import qualified Hasql.Connection as HC

-- | Name of a server-side cursor.
--
-- The name is spliced verbatim (no quoting or escaping) into the @DECLARE@,
-- @CLOSE@ and @FETCH@ statements built by this module, so it has to be
-- acceptable to the server as written. With @OverloadedStrings@ the
-- 'IsString' instance lets it be written as a string literal.
newtype Cursor = Cursor ByteString deriving (IsString)

-- | Text of the query a cursor iterates over.
--
-- 'declareCursor' appends it verbatim after @DECLARE ... NO SCROLL CURSOR FOR@.
-- With @OverloadedStrings@ the 'IsString' instance lets it be written as a
-- string literal.
newtype Template = Template ByteString deriving (IsString)

-- | Build the statement that declares a cursor over a parametrized query.
--
-- The SQL sent is @DECLARE \<name\> NO SCROLL CURSOR FOR \<template\>@; the
-- statement returns no result and is created with the last 'Statement'
-- field set to 'False'.
declareCursor
  :: E.Params a -- ^ parameters encoding
  -> Cursor -- ^ cursor name
  -> Template -- ^ query template
  -> Statement a ()
declareCursor encoder (Cursor name) (Template template) =
  Statement declaration encoder D.noResult False
  where
    -- Pieces are concatenated as-is: neither the name nor the template is escaped.
    declaration =
      TB.toByteString . mconcat $
        [ "DECLARE "
        , TB.byteString name
        , " NO SCROLL CURSOR FOR "
        , TB.byteString template
        ]

-- | Build the statement that closes a cursor: @CLOSE \<name\>@.
--
-- It takes no parameters and returns no result.
closeCursor
  :: Cursor -- ^ cursor name
  -> Statement () ()
closeCursor (Cursor name) =
  Statement closing E.noParams D.noResult True
  where
    closing = TB.toByteString $ "CLOSE " <> TB.byteString name

-- | Build the statement that fetches the next rows from a cursor and decodes
-- them: @FETCH FORWARD \<batch\> FROM \<name\>@.
fetchFromCursor
  :: Cursor -- ^ cursor name
  -> Batch -- ^ max number of rows to fetch
  -> D.Result result -- ^ decoder for the fetched result
  -> Statement () result
fetchFromCursor (Cursor name) (Batch batch) decoder =
  Statement fetching E.noParams decoder True
  where
    fetching =
      TB.toByteString . mconcat $
        [ "FETCH FORWARD "
        , TB.asciiIntegral batch
        , " FROM "
        , TB.byteString name
        ]

-- | Session that issues @BEGIN TRANSACTION@.
beginTransaction :: Session ()
beginTransaction = sql opening
  where
    opening :: ByteString
    opening = "BEGIN TRANSACTION"

-- | Session that issues @END TRANSACTION@.
endTransaction :: Session ()
endTransaction = sql closing
  where
    closing :: ByteString
    closing = "END TRANSACTION"

-- | Maximum number of rows requested by each @FETCH FORWARD@ that
-- 'fetchFromCursor' builds. The 'Num' instance lets it be written as a
-- numeric literal.
newtype Batch = Batch Int deriving (Num)

-- | Stream the rows of a cursor-backed query, once for every set of
-- parameters received from upstream.
--
-- For each value awaited from upstream the pipe:
--
--  1. begins a transaction and declares the cursor for the template, using
--     the awaited value as the query parameters;
--  2. registers a finalizer in the surrounding 'SafeT' block that closes the
--     cursor and ends the transaction, so both are cleaned up if the
--     pipeline stops before the cursor is exhausted;
--  3. fetches up to @batch@ rows at a time and yields them downstream until
--     a fetch returns no rows;
--  4. deregisters the finalizer and runs the cleanup immediately, so that
--     the next awaited value can declare the same cursor name in a fresh
--     transaction.
cursorPipe
  :: (forall b. Session b -> IO b) -- ^ execute a session command
  -> Params z -- ^ query parameters encoders
  -> Row a -- ^ row decoders
  -> Cursor -- ^ desired cursor name
  -> Template -- ^ query template
  -> Batch -- ^ max number of rows requested by each fetch
  -> Pipe z a (SafeT IO) ()
cursorPipe runS encoders decoders cursor template batch = forever $ do
  parameters <- await
  void $
    liftIO $ runS do
      beginTransaction
      statement parameters $ declareCursor encoders cursor template
  let closeAndEnd :: IO ()
      closeAndEnd = runS do
        statement () $ closeCursor cursor
        endTransaction
  -- Covers early termination: the finalizer fires when the 'SafeT' block ends.
  transaction <- lift $ register closeAndEnd
  let loop = do
        xs <-
          liftIO $
            runS $
              statement () $
                fetchFromCursor cursor batch $
                  rowList decoders
        case xs of
          [] -> pure ()
          _ -> P.each xs >> loop
  loop
  -- 'P.release' only removes the finalizer from the 'SafeT' registry, it does
  -- not run it. Deregister first, so the cleanup cannot run a second time
  -- when the 'SafeT' block ends, then run it explicitly; otherwise the cursor
  -- and the transaction would stay open and the next DECLARE would fail.
  lift $ P.release transaction
  liftIO closeAndEnd

-- | Events reported to the tracer passed to 'connect'.
data DatabaseLog
  = ConnectionReady -- ^ a connection was acquired
  | ConnectionClosed -- ^ the connection was released by the registered finalizer
  | ConnectionFailed ConnectionError -- ^ acquiring the connection failed
  | QueryFailed QueryError -- ^ running a session returned an error
  deriving (Show)

-- | A function that runs any 'Session' in 'IO', as returned by 'connect'.
-- The rank-2 field lets one value run sessions of every result type.
newtype RunSession = RunSession (forall a. Session a -> IO a)

-- | Acquire a connection and return a session runner bound to it.
--
-- The tracer is notified of every step: 'ConnectionReady' once the
-- connection is acquired, 'ConnectionFailed' when acquisition fails,
-- 'QueryFailed' when a session run through the returned 'RunSession' fails,
-- and 'ConnectionClosed' after the finalizer registered in the surrounding
-- 'SafeT' block has released the connection.
--
-- Both failures are fatal: after notifying the tracer the code calls 'panic'.
connect :: (DatabaseLog -> IO ()) -> Settings -> SafeT IO RunSession
connect tracer settings' = do
  connection <- liftIO (acquire settings') >>= either noConnection pure
  liftIO $ tracer ConnectionReady
  -- Release the connection, then report it, when the 'SafeT' block ends.
  _ <- register $ HC.release connection >> tracer ConnectionClosed
  pure $ RunSession \session ->
    run session connection
      >>= either
        (\failure -> tracer (QueryFailed failure) >> panic "query failed")
        pure
  where
    noConnection failure = do
      liftIO . tracer $ ConnectionFailed failure
      panic "no connection"