{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
module Hasql.Pipes where
import qualified ByteString.TreeBuilder as TB
import Hasql.Connection (acquire, ConnectionError, Settings)
import Hasql.Decoders (Row, rowList)
import qualified Hasql.Decoders as D
import Hasql.Encoders (Params)
import qualified Hasql.Encoders as E
import Hasql.Session
import Hasql.Statement
import Pipes
import qualified Pipes as P
import Pipes.Safe
import qualified Pipes.Safe as P
import Protolude
import qualified Hasql.Connection as HC
-- | Name of a server-side cursor.
--
-- The wrapped bytes are spliced verbatim into the @DECLARE@, @FETCH@ and
-- @CLOSE@ statements built in this module. The 'IsString' instance lets a
-- cursor name be written as a string literal.
newtype Cursor = Cursor ByteString deriving (IsString)
-- | SQL text of the query a cursor is declared for.
--
-- 'declareCursor' appends the wrapped bytes verbatim after
-- @DECLARE … NO SCROLL CURSOR FOR@. The 'IsString' instance lets a template
-- be written as a string literal.
newtype Template = Template ByteString deriving (IsString)
-- | Build the statement that declares a @NO SCROLL@ cursor over a query.
--
-- The cursor name and the query template are spliced verbatim into the SQL
-- text; values of type @a@ are passed as statement parameters through the
-- given encoder. The statement decodes no result and is constructed with
-- its prepared flag set to 'False'.
declareCursor
  :: E.Params a
  -> Cursor
  -> Template
  -> Statement a ()
declareCursor encoder (Cursor name) (Template template) =
  Statement declaration encoder D.noResult False
  where
    -- Full SQL text: header naming the cursor, followed by the query.
    declaration = TB.toByteString (header <> query)
    header = "DECLARE " <> TB.byteString name <> " NO SCROLL CURSOR FOR "
    query = TB.byteString template
-- | Build the parameterless statement @CLOSE \<name\>@ for the given cursor.
--
-- The statement decodes no result and is constructed with its prepared flag
-- set to 'True'.
closeCursor
  :: Cursor
  -> Statement () ()
closeCursor (Cursor name) =
  Statement ("CLOSE " <> name) E.noParams D.noResult True
-- | Build the parameterless statement
-- @FETCH FORWARD \<batch\> FROM \<name\>@ for the given cursor.
--
-- The fetched rows are decoded with the supplied result decoder. The
-- statement is constructed with its prepared flag set to 'True'.
fetchFromCursor
  :: Cursor
  -> Batch
  -> D.Result result
  -> Statement () result
fetchFromCursor (Cursor name) (Batch batch) decoder =
  Statement fetchSql E.noParams decoder True
  where
    -- Full SQL text: how many rows to fetch, then which cursor to read.
    fetchSql = TB.toByteString (amount <> source)
    amount = "FETCH FORWARD " <> TB.asciiIntegral batch
    source = " FROM " <> TB.byteString name
-- | Session that sends @BEGIN TRANSACTION@ to the server.
beginTransaction :: Session ()
beginTransaction = sql command
  where
    command = "BEGIN TRANSACTION"
-- | Session that sends @END TRANSACTION@ to the server.
endTransaction :: Session ()
endTransaction = sql command
  where
    command = "END TRANSACTION"
-- | Number of rows requested by each @FETCH FORWARD@ issued against a cursor.
--
-- The 'Num' instance lets a batch size be written as an integer literal.
newtype Batch = Batch Int deriving (Num)
-- | A pipe that, for every parameter value received from upstream, streams
-- the rows of the templated query downstream by reading a server-side cursor
-- in batches.
--
-- For each awaited value it:
--
-- 1. begins a transaction and declares the cursor, passing the awaited value
--    as the statement parameters;
-- 2. registers a finalizer that closes the cursor and ends the transaction,
--    so the cleanup still runs if the enclosing 'SafeT' block is closed
--    while rows are being streamed;
-- 3. repeatedly fetches a batch of rows and yields them one by one, until a
--    fetch returns no rows;
-- 4. deregisters the finalizer and runs the cleanup immediately, so the next
--    awaited value can open a fresh transaction and declare the same cursor
--    name again.
--
-- Arguments, in order: the session runner, the parameter encoder, the row
-- decoder, the cursor name, the query template and the batch size.
--
-- The pipe loops forever; it only stops when upstream stops producing.
cursorPipe
  :: (forall b. Session b -> IO b)
  -> Params z
  -> Row a
  -> Cursor
  -> Template
  -> Batch
  -> Pipe z a (SafeT IO) ()
cursorPipe runS encoders decoders cursor template batch = forever $ do
  parameters <- await
  liftIO $ runS do
    beginTransaction
    statement parameters $ declareCursor encoders cursor template
  -- Cleanup shared by the finalizer and the normal completion path.
  let closeAndEnd = runS do
        statement () $ closeCursor cursor
        endTransaction
  finalizer <- lift $ register closeAndEnd
  let loop = do
        xs <-
          liftIO $
            runS $
              statement () $
                fetchFromCursor cursor batch $
                  rowList decoders
        case xs of
          -- An empty batch means the cursor is exhausted.
          [] -> pure ()
          _ -> P.each xs >> loop
  loop
  -- 'P.release' only removes the finalizer from the 'SafeT' block; it does
  -- not run it. Deregister first so the cleanup cannot run twice, then run
  -- it explicitly so the cursor is closed and the transaction ended before
  -- the next iteration.
  lift $ P.release finalizer
  liftIO closeAndEnd
-- | Events reported to the tracer callback given to 'connect'.
--
-- * 'ConnectionReady': the connection was acquired successfully.
-- * 'ConnectionClosed': the registered finalizer released the connection.
-- * 'ConnectionFailed': acquiring the connection failed with the carried
--   error.
-- * 'QueryFailed': running a session failed with the carried error.
data DatabaseLog
  = ConnectionReady
  | ConnectionClosed
  | ConnectionFailed ConnectionError
  | QueryFailed QueryError
  deriving (Show)
-- | A function that runs any 'Session' in 'IO', whatever its result type.
--
-- Wrapping the rank-2 function in a newtype allows it to be returned from a
-- monadic action such as 'connect'.
newtype RunSession = RunSession (forall a. Session a -> IO a)
-- | Acquire a database connection and return a runner for sessions on it.
--
-- The tracer receives 'ConnectionReady' once the connection is acquired. A
-- finalizer is registered in the 'SafeT' block that releases the connection
-- and then traces 'ConnectionClosed'.
--
-- If the connection cannot be acquired, 'ConnectionFailed' is traced and the
-- action panics with @"no connection"@. If a session run through the
-- returned 'RunSession' fails, 'QueryFailed' is traced and the runner panics
-- with @"query failed"@.
connect :: (DatabaseLog -> IO ()) -> Settings -> SafeT IO RunSession
connect tracer settings' = do
  connection <- liftIO (acquire settings') >>= either refuse pure
  liftIO $ tracer ConnectionReady
  void . register $ do
    liftIO $ HC.release connection
    tracer ConnectionClosed
  pure $ RunSession \session ->
    run session connection
      >>= either (\e -> tracer (QueryFailed e) >> panic "query failed") pure
  where
    -- Report the acquisition error, then abort.
    refuse e = do
      liftIO $ tracer $ ConnectionFailed e
      panic "no connection"