{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}

-- | This library has 2 high level functions
--
-- * 'cursorPipe' to stream a query from a postgres DB
-- * 'connect' to handle a connection in @SafeT IO@
--
-- Here is how usage looks like, from a production excerpt for a table of
-- samples
--
-- @
--
-- mkDB :: ByteString
-- mkDB =
--   [qmb|
--     create table samples (
--       id uuid not null,
--       time bigint not null,
--       value double precision not null
--     );
--   |]
--
-- streamSamples
--   :: RunSession
--   -> Int64 -- ^ first sample time
--   -> Producer (UUID, Int64, Double) (SafeT IO) ()
-- streamSamples (RunSession run) from' =
--   yield from'
--     >-> cursorPipe
--       do run
--       do samplesEncoder
--       do samplesDecoder
--       do Cursor "sample_cursor"
--       do Template [qmb| select * from samples where time >= $1 order by time; |]
--       do 1000
--
-- samplesEncoder :: Params Int64
-- samplesEncoder = param $ E.nonNullable E.int8
--
-- samplesDecoder :: Row (UUID, Int64, Double)
-- samplesDecoder =
--   (,,)
--     <$> do column . D.nonNullable $ D.uuid
--     <*> do column . D.nonNullable $ D.int8
--     <*> do column . D.nonNullable $ D.float8
--
-- localConnect :: (DatabaseLog -> IO ()) -> SafeT IO RunSession
-- localConnect tracer =
--   connect tracer $
--     settings
--       "10.1.9.95"
--       5432
--       "postgres"
--       "postgres"
--       "postgres"
--
-- main :: IO ()
-- main = do
--   ts <- getPOSIXTime
--   runSafeT $ do
--     run' <- localConnect pPrint
--     runEffect $ streamSamples run' (floor ts - 600) >-> P.print
--
-- @
module Hasql.Pipes where

import qualified ByteString.TreeBuilder as TB
import Hasql.Connection (ConnectionError, Settings, acquire)
import qualified Hasql.Connection as HC
import Hasql.Decoders (Row, rowList)
import qualified Hasql.Decoders as D
import Hasql.Encoders (Params)
import qualified Hasql.Encoders as E
import Hasql.Session
import Hasql.Statement
import Pipes
import qualified Pipes as P
import Pipes.Safe
import qualified Pipes.Safe as P
import Protolude

-- | Cursor name.
--
-- The name is spliced verbatim into the generated SQL, so it must be a valid
-- SQL identifier and must never come from untrusted input.
newtype Cursor = Cursor ByteString
  deriving (IsString)

-- | Query to run.
--
-- The text is spliced verbatim after @DECLARE ... CURSOR FOR@; only the
-- statement parameters go through the encoder. Never build it from untrusted
-- input.
newtype Template = Template ByteString
  deriving (IsString)

-- | A statement declaring a @NO SCROLL@ cursor for a query template,
-- parametrized over the query parameters. The statement is not prepared.
declareCursor
  :: E.Params a -- ^ parameters encoding
  -> Cursor -- ^ cursor name
  -> Template -- ^ query template
  -> Statement a ()
declareCursor encoder (Cursor name) (Template template) =
  Statement sql' encoder D.noResult False
  where
    sql' =
      TB.toByteString $
        "DECLARE "
          <> TB.byteString name
          <> " NO SCROLL CURSOR FOR "
          <> TB.byteString template

-- | A statement to close the cursor.
closeCursor
  :: Cursor -- ^ cursor name
  -> Statement () ()
closeCursor (Cursor name) = Statement sql' E.noParams D.noResult True
  where
    sql' = "CLOSE " <> name

-- | A statement to fetch, moving forward, at most the given number of rows
-- from the cursor and decode them with the given result decoder.
fetchFromCursor
  :: Cursor -- ^ cursor name
  -> Batch -- ^ max number of rows to fetch
  -> D.Result result -- ^ result decoder
  -> Statement () result
fetchFromCursor (Cursor name) (Batch batch) decoder =
  Statement sql' E.noParams decoder True
  where
    sql' =
      TB.toByteString $
        "FETCH FORWARD "
          <> TB.asciiIntegral batch
          <> " FROM "
          <> TB.byteString name

-- | Open a transaction.
beginTransaction :: Session ()
beginTransaction = sql "BEGIN TRANSACTION"

-- | End the current transaction.
endTransaction :: Session ()
endTransaction = sql "END TRANSACTION"

-- | Number of rows.
newtype Batch = Batch Int
  deriving (Num)

-- | Stream rows for queries of the same template.
--
-- For every parameter value received from upstream: a transaction is opened,
-- the cursor is declared for the template with that value, rows are fetched
-- in batches and yielded downstream until a fetch returns no rows, and
-- finally the cursor is closed and the transaction ended.
cursorPipe
  :: (forall b. Session b -> IO b) -- ^ execute a session command
  -> Params z -- ^ query parameters encoders
  -> Row a -- ^ row decoders
  -> Cursor -- ^ desired cursor name
  -> Template -- ^ query template
  -> Batch -- ^ number of rows to fetch at each round trip
  -> Pipe z a (SafeT IO) ()
cursorPipe runS encoders decoders cursor template batch = forever $ do
  parameters <- await
  liftIO $ runS do
    beginTransaction
    statement parameters $ declareCursor encoders cursor template
  let -- close the cursor and end the transaction opened above
      closeAll = runS do
        statement () $ closeCursor cursor
        endTransaction
      -- keep fetching batches until the cursor is exhausted
      fetchAll = do
        rows <-
          liftIO . runS . statement () $
            fetchFromCursor cursor batch (rowList decoders)
        case rows of
          [] -> pure ()
          _ -> P.each rows >> fetchAll
  -- 'P.release' only un-registers a finalizer without running it, so a
  -- register/release pair would skip the cleanup when the fetch loop ends
  -- normally, leaving the cursor declared and the transaction open for the
  -- next parameter. 'P.finally' runs the cleanup exactly once, either right
  -- after the loop or when the enclosing 'SafeT' block is torn down.
  P.finally fetchAll closeAll

-- | Events reported to the tracer given to 'connect'.
data DatabaseLog
  = ConnectionReady
  | ConnectionClosed
  | ConnectionFailed ConnectionError
  | QueryFailed QueryError
  deriving (Show)

-- | A runner of sessions, as produced by 'connect'.
newtype RunSession = RunSession (forall a. Session a -> IO a)

-- | Acquire a connection whose release is tied to the surrounding 'SafeT'
-- block, and return a runner for sessions over it.
--
-- Failures are reported to the tracer and then turned into a 'panic': both
-- when the connection cannot be acquired and when a session run through the
-- returned 'RunSession' fails.
connect
  :: (DatabaseLog -> IO ()) -- ^ where to report connection and query events
  -> Settings -- ^ connection settings
  -> SafeT IO RunSession
connect tracer settings' = do
  acquired <- liftIO $ acquire settings'
  connection <- case acquired of
    Left e -> do
      liftIO $ tracer $ ConnectionFailed e
      panic "no connection"
    Right c -> pure c
  -- register the release before tracing, so that a throwing tracer cannot
  -- leak the freshly acquired connection
  void $ register $ do
    HC.release connection
    tracer ConnectionClosed
  liftIO $ tracer ConnectionReady
  pure $ RunSession \s -> do
    result <- run s connection
    case result of
      Left e -> tracer (QueryFailed e) >> panic "query failed"
      Right r -> pure r