hasql-pipes-0.1.0.1: A pipe to stream a postgres database cursor in the hasql ecosystem

Safe HaskellNone
LanguageHaskell2010

Hasql.Pipes

Description

This library has 2 high level functions

  • cursorPipe to stream a query from postgres DB
  • connect to handle connection in 'SafeT IO'

Here is how usage looks like from a production excerpt for a table of samples

mkDB :: ByteString
mkDB =
  [qmb|
    create table samples (
      id         uuid not null,
      time       bigint not null,
      value      double precision not null,
    );
 |]

streamSamples
  :: RunSession
  -> Int64 -- ^ first sample time
  -> Producer (UUID, Int64, Double) (SafeT IO) ()
streamSamples (RunSession run) from' =
  yield from'
    >-> cursorPipe
      do run
      do samplesEncoder
      do samplesDecoder
      do Cursor "sample_cursor"
      do Template [qmb| select * from samples where time >= $1 order by time; |]
      do 1000

samplesEncoder :: Params Int64
samplesEncoder = param $ E.nonNullable E.int8

samplesDecoder :: Row (UUID, Int64, Double)
samplesDecoder =
  (,,)
    $ do column . D.nonNullable $ D.uuid
    * do column . D.nonNullable $ D.int8
    * do column . D.nonNullable $ D.float8

localConnect :: Trace IO DatabaseLog -> SafeT IO RunSession
localConnect tracer = do
  connect tracer $
    settings
      "10.1.9.95"
      5432
      "postgres"
      "postgres"
      "postgres"

main :: IO ()
main = do
  ts <- getPOSIXTime
  runSafeT $ do
    run' <- localConnect pPrint
    runEffect $ streamSamples run' (floor ts - 600) >-> P.print


Synopsis

Documentation

newtype Cursor Source #

cursor name

Constructors

Cursor ByteString 
Instances
IsString Cursor Source # 
Instance details

Defined in Hasql.Pipes

Methods

fromString :: String -> Cursor #

newtype Template Source #

query to run

Constructors

Template ByteString 
Instances
IsString Template Source # 
Instance details

Defined in Hasql.Pipes

declareCursor Source #

Arguments

:: Params a

paramenters encoding

-> Cursor

cursor name

-> Template

query template

-> Statement a () 

a statement to declare a cursor parametrized over some parameters

closeCursor Source #

Arguments

:: Cursor

cursor name

-> Statement () () 

a statement to close the cursor

fetchFromCursor Source #

Arguments

:: Cursor

cursor name

-> Batch

max number of rows to fetch

-> Result result

row decoders

-> Statement () result 

a statement to fetch given number of rows from cursor forward and apply decoders

newtype Batch Source #

number of rows

Constructors

Batch Int 
Instances
Num Batch Source # 
Instance details

Defined in Hasql.Pipes

cursorPipe Source #

Arguments

:: (forall b. Session b -> IO b)

execute a session command

-> Params z

query parameters encoders

-> Row a

row decoders

-> Cursor

desidered cursor name

-> Template

query template

-> Batch

number of rows to repeat fetching

-> Pipe z a (SafeT IO) () 

stream rows for queries of the same template

newtype RunSession Source #

Constructors

RunSession (forall a. Session a -> IO a)