{-# LANGUAGE CPP #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}

-- | More efficient query execution functions for @beam-postgres@. These
-- functions use the @conduit@ package, to execute @beam-postgres@ statements in
-- an arbitrary 'MonadIO'. These functions may be more efficient for streaming
-- operations than 'MonadBeam'.
module Database.Beam.Postgres.Conduit
  ( streamingRunSelect
  , runInsert
  , streamingRunInsertReturning
  , runUpdate
  , streamingRunUpdateReturning
  , runDelete
  , streamingRunDeleteReturning
  , executeStatement
  , streamingRunQueryReturning
  -- * Deprecated streaming variants
  , runSelect
  , runInsertReturning
  , runUpdateReturning
  , runDeleteReturning
  , runQueryReturning
  ) where

import           Database.Beam hiding (runInsert, runUpdate, runDelete)
import           Database.Beam.Postgres.Connection
import           Database.Beam.Postgres.Full
import           Database.Beam.Postgres.Syntax
import           Database.Beam.Postgres.Types

import           Control.Concurrent.MVar (takeMVar, putMVar)
import           Control.Exception.Base (bracket, throwIO)
import           Control.Exception.Lifted (finally)
import qualified Control.Exception.Lifted as Lifted
import qualified Control.Concurrent.MVar.Lifted as Lifted
import           Control.Monad.Trans.Control (MonadBaseControl)

import qualified Database.PostgreSQL.LibPQ as Pg hiding
  (Connection, escapeStringConn, escapeIdentifier, escapeByteaConn, exec)
import qualified Database.PostgreSQL.LibPQ as Pq
import qualified Database.PostgreSQL.Simple as Pg
import qualified Database.PostgreSQL.Simple.Internal as Pg
import           Database.PostgreSQL.Simple.Internal (connectionHandle)
import qualified Database.PostgreSQL.Simple.Types as Pg (Query(..))

import qualified Conduit as C
import           Data.Int (Int64)
import           Data.Maybe (fromMaybe)
#if !MIN_VERSION_base(4, 11, 0)
import           Data.Semigroup
#endif

import qualified Control.Monad.Fail as Fail

#if MIN_VERSION_conduit(1,3,0)
#define CONDUIT_TRANSFORMER C.ConduitT
#else
#define CONDUIT_TRANSFORMER C.ConduitM
#endif

-- * @SELECT@

-- | Run a PostgreSQL @SELECT@ statement in any 'C.MonadResource'.
streamingRunSelect :: ( C.MonadResource m, Fail.MonadFail m, FromBackendRow Postgres a )
                   => Pg.Connection -> SqlSelect Postgres a
                   -> CONDUIT_TRANSFORMER () a m ()
streamingRunSelect :: forall (m :: * -> *) a.
(MonadResource m, MonadFail m, FromBackendRow Postgres a) =>
Connection -> SqlSelect Postgres a -> ConduitT () a m ()
streamingRunSelect Connection
conn (SqlSelect (PgSelectSyntax PgSyntax
syntax)) =
  forall (m :: * -> *) r.
(MonadResource m, MonadFail m, FromBackendRow Postgres r) =>
Connection -> PgSyntax -> ConduitT () r m ()
streamingRunQueryReturning Connection
conn PgSyntax
syntax

-- | Run a PostgreSQL @SELECT@ statement in any 'MonadIO'.
runSelect :: ( MonadIO m, Fail.MonadFail m, MonadBaseControl IO m, FromBackendRow Postgres a )
          => Pg.Connection -> SqlSelect Postgres a
          -> (CONDUIT_TRANSFORMER () a m () -> m b) -> m b
runSelect :: forall (m :: * -> *) a b.
(MonadIO m, MonadFail m, MonadBaseControl IO m,
 FromBackendRow Postgres a) =>
Connection
-> SqlSelect Postgres a -> (ConduitT () a m () -> m b) -> m b
runSelect Connection
conn (SqlSelect (PgSelectSyntax PgSyntax
syntax)) ConduitT () a m () -> m b
withSrc =
  forall (m :: * -> *) r b.
(MonadIO m, MonadFail m, MonadBaseControl IO m, Functor m,
 FromBackendRow Postgres r) =>
Connection -> PgSyntax -> (ConduitT () r m () -> m b) -> m b
runQueryReturning Connection
conn PgSyntax
syntax ConduitT () a m () -> m b
withSrc
{-# DEPRECATED runSelect "Use streamingRunSelect" #-}


-- * @INSERT@

-- | Run a PostgreSQL @INSERT@ statement in any 'MonadIO'. Returns the number of
-- rows affected.
runInsert :: MonadIO m
          => Pg.Connection -> SqlInsert Postgres tbl -> m Int64
runInsert :: forall (m :: * -> *) (tbl :: (* -> *) -> *).
MonadIO m =>
Connection -> SqlInsert Postgres tbl -> m Int64
runInsert Connection
_ SqlInsert Postgres tbl
SqlInsertNoRows = forall (f :: * -> *) a. Applicative f => a -> f a
pure Int64
0
runInsert Connection
conn (SqlInsert TableSettings tbl
_ (PgInsertSyntax PgSyntax
i)) =
  forall (m :: * -> *).
MonadIO m =>
Connection -> PgSyntax -> m Int64
executeStatement Connection
conn PgSyntax
i

-- | Run a PostgreSQL @INSERT ... RETURNING ...@ statement in any 'C.MonadResource' and
-- get a 'C.Source' of the newly inserted rows.
streamingRunInsertReturning :: ( C.MonadResource m, Fail.MonadFail m, FromBackendRow Postgres a )
                            => Pg.Connection
                            -> PgInsertReturning a
                            -> CONDUIT_TRANSFORMER () a m ()
streamingRunInsertReturning :: forall (m :: * -> *) a.
(MonadResource m, MonadFail m, FromBackendRow Postgres a) =>
Connection -> PgInsertReturning a -> ConduitT () a m ()
streamingRunInsertReturning Connection
_ PgInsertReturning a
PgInsertReturningEmpty = forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
streamingRunInsertReturning Connection
conn (PgInsertReturning PgSyntax
i) =
    forall (m :: * -> *) r.
(MonadResource m, MonadFail m, FromBackendRow Postgres r) =>
Connection -> PgSyntax -> ConduitT () r m ()
streamingRunQueryReturning Connection
conn PgSyntax
i

-- | Run a PostgreSQL @INSERT ... RETURNING ...@ statement in any 'MonadIO' and
-- get a 'C.Source' of the newly inserted rows.
runInsertReturning :: ( MonadIO m, Fail.MonadFail m, MonadBaseControl IO m, FromBackendRow Postgres a )
                   => Pg.Connection
                   -> PgInsertReturning a
                   -> (CONDUIT_TRANSFORMER () a m () -> m b)
                   -> m b
runInsertReturning :: forall (m :: * -> *) a b.
(MonadIO m, MonadFail m, MonadBaseControl IO m,
 FromBackendRow Postgres a) =>
Connection
-> PgInsertReturning a -> (ConduitT () a m () -> m b) -> m b
runInsertReturning Connection
_ PgInsertReturning a
PgInsertReturningEmpty ConduitT () a m () -> m b
withSrc = ConduitT () a m () -> m b
withSrc (forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
runInsertReturning Connection
conn (PgInsertReturning PgSyntax
i) ConduitT () a m () -> m b
withSrc =
    forall (m :: * -> *) r b.
(MonadIO m, MonadFail m, MonadBaseControl IO m, Functor m,
 FromBackendRow Postgres r) =>
Connection -> PgSyntax -> (ConduitT () r m () -> m b) -> m b
runQueryReturning Connection
conn PgSyntax
i ConduitT () a m () -> m b
withSrc
{-# DEPRECATED runInsertReturning "Use streamingRunInsertReturning" #-}

-- * @UPDATE@

-- | Run a PostgreSQL @UPDATE@ statement in any 'MonadIO'. Returns the number of
-- rows affected.
runUpdate :: MonadIO m
          => Pg.Connection -> SqlUpdate Postgres tbl -> m Int64
runUpdate :: forall (m :: * -> *) (tbl :: (* -> *) -> *).
MonadIO m =>
Connection -> SqlUpdate Postgres tbl -> m Int64
runUpdate Connection
_ SqlUpdate Postgres tbl
SqlIdentityUpdate = forall (f :: * -> *) a. Applicative f => a -> f a
pure Int64
0
runUpdate Connection
conn (SqlUpdate TableSettings tbl
_ (PgUpdateSyntax PgSyntax
i)) =
    forall (m :: * -> *).
MonadIO m =>
Connection -> PgSyntax -> m Int64
executeStatement Connection
conn PgSyntax
i

-- | Run a PostgreSQL @UPDATE ... RETURNING ...@ statement in any 'C.MonadResource' and
-- get a 'C.Source' of the newly updated rows.
streamingRunUpdateReturning :: ( C.MonadResource m, Fail.MonadFail m, FromBackendRow Postgres a)
                            => Pg.Connection
                            -> PgUpdateReturning a
                            -> CONDUIT_TRANSFORMER () a m ()
streamingRunUpdateReturning :: forall (m :: * -> *) a.
(MonadResource m, MonadFail m, FromBackendRow Postgres a) =>
Connection -> PgUpdateReturning a -> ConduitT () a m ()
streamingRunUpdateReturning Connection
_ PgUpdateReturning a
PgUpdateReturningEmpty = forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
streamingRunUpdateReturning Connection
conn (PgUpdateReturning PgSyntax
u) =
  forall (m :: * -> *) r.
(MonadResource m, MonadFail m, FromBackendRow Postgres r) =>
Connection -> PgSyntax -> ConduitT () r m ()
streamingRunQueryReturning Connection
conn PgSyntax
u

-- | Run a PostgreSQL @UPDATE ... RETURNING ...@ statement in any 'MonadIO' and
-- get a 'C.Source' of the newly updated rows.
runUpdateReturning :: ( MonadIO m, Fail.MonadFail m, MonadBaseControl IO m, FromBackendRow Postgres a)
                   => Pg.Connection
                   -> PgUpdateReturning a
                   -> (CONDUIT_TRANSFORMER () a m () -> m b)
                   -> m b
runUpdateReturning :: forall (m :: * -> *) a b.
(MonadIO m, MonadFail m, MonadBaseControl IO m,
 FromBackendRow Postgres a) =>
Connection
-> PgUpdateReturning a -> (ConduitT () a m () -> m b) -> m b
runUpdateReturning Connection
_ PgUpdateReturning a
PgUpdateReturningEmpty ConduitT () a m () -> m b
withSrc = ConduitT () a m () -> m b
withSrc (forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
runUpdateReturning Connection
conn (PgUpdateReturning PgSyntax
u) ConduitT () a m () -> m b
withSrc =
  forall (m :: * -> *) r b.
(MonadIO m, MonadFail m, MonadBaseControl IO m, Functor m,
 FromBackendRow Postgres r) =>
Connection -> PgSyntax -> (ConduitT () r m () -> m b) -> m b
runQueryReturning Connection
conn PgSyntax
u ConduitT () a m () -> m b
withSrc
{-# DEPRECATED runUpdateReturning "Use streamingRunUpdateReturning" #-}

-- * @DELETE@

-- | Run a PostgreSQL @DELETE@ statement in any 'MonadIO'. Returns the number of
-- rows affected.
runDelete :: MonadIO m
          => Pg.Connection -> SqlDelete Postgres tbl
          -> m Int64
runDelete :: forall (m :: * -> *) (tbl :: (* -> *) -> *).
MonadIO m =>
Connection -> SqlDelete Postgres tbl -> m Int64
runDelete Connection
conn (SqlDelete TableSettings tbl
_ (PgDeleteSyntax PgSyntax
d)) =
    forall (m :: * -> *).
MonadIO m =>
Connection -> PgSyntax -> m Int64
executeStatement Connection
conn PgSyntax
d

-- | Run a PostgreSQl @DELETE ... RETURNING ...@ statement in any
-- 'C.MonadResource' and get a 'C.Source' of the deleted rows.
streamingRunDeleteReturning :: ( C.MonadResource m, Fail.MonadFail m, FromBackendRow Postgres a )
                            => Pg.Connection -> PgDeleteReturning a
                            -> CONDUIT_TRANSFORMER () a m ()
streamingRunDeleteReturning :: forall (m :: * -> *) a.
(MonadResource m, MonadFail m, FromBackendRow Postgres a) =>
Connection -> PgDeleteReturning a -> ConduitT () a m ()
streamingRunDeleteReturning Connection
conn (PgDeleteReturning PgSyntax
d) =
  forall (m :: * -> *) r.
(MonadResource m, MonadFail m, FromBackendRow Postgres r) =>
Connection -> PgSyntax -> ConduitT () r m ()
streamingRunQueryReturning Connection
conn PgSyntax
d

-- | Run a PostgreSQl @DELETE ... RETURNING ...@ statement in any
-- 'MonadIO' and get a 'C.Source' of the deleted rows.
runDeleteReturning :: ( MonadIO m, Fail.MonadFail m, MonadBaseControl IO m, FromBackendRow Postgres a )
                   => Pg.Connection -> PgDeleteReturning a
                   -> (CONDUIT_TRANSFORMER () a m () -> m b) -> m b
runDeleteReturning :: forall (m :: * -> *) a b.
(MonadIO m, MonadFail m, MonadBaseControl IO m,
 FromBackendRow Postgres a) =>
Connection
-> PgDeleteReturning a -> (ConduitT () a m () -> m b) -> m b
runDeleteReturning Connection
conn (PgDeleteReturning PgSyntax
d) ConduitT () a m () -> m b
withSrc =
  forall (m :: * -> *) r b.
(MonadIO m, MonadFail m, MonadBaseControl IO m, Functor m,
 FromBackendRow Postgres r) =>
Connection -> PgSyntax -> (ConduitT () r m () -> m b) -> m b
runQueryReturning Connection
conn PgSyntax
d ConduitT () a m () -> m b
withSrc
{-# DEPRECATED runDeleteReturning "Use streamingRunDeleteReturning" #-}

-- * Convenience functions

-- | Run any DML statement. Return the number of rows affected
executeStatement ::  MonadIO m => Pg.Connection -> PgSyntax -> m Int64
executeStatement :: forall (m :: * -> *).
MonadIO m =>
Connection -> PgSyntax -> m Int64
executeStatement Connection
conn PgSyntax
x =
  forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
    ByteString
syntax <- Connection -> PgSyntax -> IO ByteString
pgRenderSyntax Connection
conn PgSyntax
x
    Connection -> Query -> IO Int64
Pg.execute_ Connection
conn (ByteString -> Query
Pg.Query ByteString
syntax)


-- | Runs any query that returns a set of values
streamingRunQueryReturning
  :: ( C.MonadResource m, Fail.MonadFail m, FromBackendRow Postgres r )
  => Pg.Connection -> PgSyntax
  -> CONDUIT_TRANSFORMER () r m ()
streamingRunQueryReturning :: forall (m :: * -> *) r.
(MonadResource m, MonadFail m, FromBackendRow Postgres r) =>
Connection -> PgSyntax -> ConduitT () r m ()
streamingRunQueryReturning (conn :: Connection
conn@Pg.Connection {MVar Connection
connectionHandle :: MVar Connection
connectionHandle :: Connection -> MVar Connection
connectionHandle}) PgSyntax
x = do
  ByteString
syntax <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Connection -> PgSyntax -> IO ByteString
pgRenderSyntax Connection
conn PgSyntax
x
  -- We need to own the connection for the duration of the conduit's
  -- lifetime, since it will be in a streaming state until we clean up
  forall (m :: * -> *) a i o r.
MonadResource m =>
IO a -> (a -> IO ()) -> (a -> ConduitT i o m r) -> ConduitT i o m r
C.bracketP
    (forall a. MVar a -> IO a
takeMVar MVar Connection
connectionHandle)
    (forall a. MVar a -> a -> IO ()
putMVar MVar Connection
connectionHandle)
    (\Connection
conn' -> do
      Bool
success <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$
        if Connection -> Bool
Pg.isNullConnection Connection
conn'
        then forall e a. Exception e => e -> IO a
throwIO SqlError
Pg.disconnectedError
        else Connection -> ByteString -> IO Bool
Pg.sendQuery Connection
conn' ByteString
syntax

      if Bool
success
        then do
          Bool
singleRowModeSet <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Connection -> IO Bool
Pg.setSingleRowMode Connection
conn'
          if Bool
singleRowModeSet
            then
              forall (m :: * -> *) a i o r.
MonadResource m =>
IO a -> (a -> IO ()) -> (a -> ConduitT i o m r) -> ConduitT i o m r
C.bracketP
                (forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
                (\()
_ -> Connection -> IO ()
gracefulShutdown Connection
conn')
                (\()
_ -> forall (m :: * -> *) r i.
(MonadFail m, FromBackendRow Postgres r, MonadIO m) =>
Connection -> Connection -> Maybe [Field] -> ConduitT i r m ()
streamResults Connection
conn Connection
conn' forall a. Maybe a
Nothing)
            else forall (m :: * -> *) a. MonadFail m => String -> m a
Fail.fail String
"Could not enable single row mode"
        else do
          ByteString
errMsg <- forall a. a -> Maybe a -> a
fromMaybe ByteString
"No libpq error provided" forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Connection -> IO (Maybe ByteString)
Pg.errorMessage Connection
conn')
          forall (m :: * -> *) a. MonadFail m => String -> m a
Fail.fail (forall a. Show a => a -> String
show ByteString
errMsg))

streamResults :: (Fail.MonadFail m, FromBackendRow Postgres r, MonadIO m) => Pg.Connection -> Pq.Connection -> Maybe [Pg.Field] -> C.ConduitT i r m ()
streamResults :: forall (m :: * -> *) r i.
(MonadFail m, FromBackendRow Postgres r, MonadIO m) =>
Connection -> Connection -> Maybe [Field] -> ConduitT i r m ()
streamResults (conn :: Connection
conn@Pg.Connection {MVar Connection
connectionHandle :: MVar Connection
connectionHandle :: Connection -> MVar Connection
connectionHandle}) Connection
conn' Maybe [Field]
fields = do
  Maybe Result
nextRow <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Connection -> IO (Maybe Result)
Pg.getResult Connection
conn')
  case Maybe Result
nextRow of
    Maybe Result
Nothing -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    Just Result
row ->
      forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Result -> IO ExecStatus
Pg.resultStatus Result
row) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>=
      \case
        ExecStatus
Pg.SingleTuple ->
          do [Field]
fields' <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall b a. b -> (a -> b) -> Maybe a -> b
maybe (Result -> IO [Field]
getFields Result
row) forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe [Field]
fields)
             Either BeamRowReadError r
parsedRow <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket
               (forall a. MVar a -> a -> IO ()
putMVar MVar Connection
connectionHandle Connection
conn')
               (\()
_ -> forall a. MVar a -> IO a
takeMVar MVar Connection
connectionHandle)
               (\()
_ -> forall a.
Connection
-> Row
-> Result
-> [Field]
-> FromBackendRowM Postgres a
-> IO (Either BeamRowReadError a)
runPgRowReader Connection
conn Row
0 Result
row [Field]
fields' forall be a. FromBackendRow be a => FromBackendRowM be a
fromBackendRow)
             case Either BeamRowReadError r
parsedRow of
               Left BeamRowReadError
err -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall a. Connection -> Result -> String -> IO a
bailEarly Connection
conn' Result
row (String
"Could not read row: " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show BeamRowReadError
err))
               Right r
parsedRow' ->
                 do forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
C.yield r
parsedRow'
                    forall (m :: * -> *) r i.
(MonadFail m, FromBackendRow Postgres r, MonadIO m) =>
Connection -> Connection -> Maybe [Field] -> ConduitT i r m ()
streamResults Connection
conn Connection
conn' (forall a. a -> Maybe a
Just [Field]
fields')
        ExecStatus
Pg.TuplesOk -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Connection -> IO ()
finishQuery Connection
conn')
        ExecStatus
Pg.EmptyQuery -> forall (m :: * -> *) a. MonadFail m => String -> m a
Fail.fail String
"No query"
        ExecStatus
Pg.CommandOk -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
        status :: ExecStatus
status@ExecStatus
Pg.BadResponse -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall a. ByteString -> Result -> ExecStatus -> IO a
Pg.throwResultError ByteString
"streamResults" Result
row ExecStatus
status)
        status :: ExecStatus
status@ExecStatus
Pg.NonfatalError -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall a. ByteString -> Result -> ExecStatus -> IO a
Pg.throwResultError ByteString
"streamResults" Result
row ExecStatus
status)
        status :: ExecStatus
status@ExecStatus
Pg.FatalError -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall a. ByteString -> Result -> ExecStatus -> IO a
Pg.throwResultError ByteString
"streamResults" Result
row ExecStatus
status)
        ExecStatus
_ -> do Maybe ByteString
errMsg <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Result -> IO (Maybe ByteString)
Pg.resultErrorMessage Result
row)
                forall (m :: * -> *) a. MonadFail m => String -> m a
Fail.fail (String
"Postgres error: " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show Maybe ByteString
errMsg)

bailEarly :: Pq.Connection -> Pg.Result -> String -> IO a
bailEarly :: forall a. Connection -> Result -> String -> IO a
bailEarly Connection
conn' Result
row String
errorString = do
  Result -> IO ()
Pg.unsafeFreeResult Result
row
  Connection -> IO ()
cancelQuery Connection
conn'
  forall (m :: * -> *) a. MonadFail m => String -> m a
Fail.fail String
errorString

cancelQuery :: Pq.Connection -> IO ()
cancelQuery :: Connection -> IO ()
cancelQuery Connection
conn' = do
  Maybe Cancel
cancel <- Connection -> IO (Maybe Cancel)
Pg.getCancel Connection
conn'
  case Maybe Cancel
cancel of
    Maybe Cancel
Nothing -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    Just Cancel
cancel' -> do
      Either ByteString ()
res <- Cancel -> IO (Either ByteString ())
Pg.cancel Cancel
cancel'
      case Either ByteString ()
res of
        Right () -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Connection -> IO ()
finishQuery Connection
conn')
        Left ByteString
err -> forall (m :: * -> *) a. MonadFail m => String -> m a
Fail.fail (String
"Could not cancel: " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show ByteString
err)

finishQuery :: Pq.Connection -> IO ()
finishQuery :: Connection -> IO ()
finishQuery Connection
conn' = do
  Maybe Result
nextRow <- Connection -> IO (Maybe Result)
Pg.getResult Connection
conn'
  case Maybe Result
nextRow of
    Maybe Result
Nothing -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    Just Result
_ -> Connection -> IO ()
finishQuery Connection
conn'

gracefulShutdown :: Pq.Connection -> IO ()
gracefulShutdown :: Connection -> IO ()
gracefulShutdown Connection
conn' = do
  TransactionStatus
sts <- Connection -> IO TransactionStatus
Pg.transactionStatus Connection
conn'
  case TransactionStatus
sts of
    TransactionStatus
Pg.TransIdle -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    TransactionStatus
Pg.TransInTrans -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    TransactionStatus
Pg.TransInError -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    TransactionStatus
Pg.TransUnknown -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    TransactionStatus
Pg.TransActive -> Connection -> IO ()
cancelQuery Connection
conn'

-- | Runs any query that returns a set of values
runQueryReturning
  :: ( MonadIO m, Fail.MonadFail m, MonadBaseControl IO m, Functor m, FromBackendRow Postgres r )
  => Pg.Connection -> PgSyntax
  -> (CONDUIT_TRANSFORMER () r m () -> m b)
  -> m b
runQueryReturning :: forall (m :: * -> *) r b.
(MonadIO m, MonadFail m, MonadBaseControl IO m, Functor m,
 FromBackendRow Postgres r) =>
Connection -> PgSyntax -> (ConduitT () r m () -> m b) -> m b
runQueryReturning (conn :: Connection
conn@Pg.Connection {MVar Connection
connectionHandle :: MVar Connection
connectionHandle :: Connection -> MVar Connection
connectionHandle}) PgSyntax
x ConduitT () r m () -> m b
withSrc = do
  ByteString
syntax <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Connection -> PgSyntax -> IO ByteString
pgRenderSyntax Connection
conn PgSyntax
x

  forall (m :: * -> *) a b c.
MonadBaseControl IO m =>
m a -> (a -> m b) -> (a -> m c) -> m c
Lifted.bracket
    (forall (m :: * -> *) a. MonadBase IO m => MVar a -> m a
Lifted.takeMVar MVar Connection
connectionHandle)
    (forall (m :: * -> *) a. MonadBase IO m => MVar a -> a -> m ()
Lifted.putMVar MVar Connection
connectionHandle)
    (\Connection
conn' -> do
      Bool
success <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Connection -> ByteString -> IO Bool
Pg.sendQuery Connection
conn' ByteString
syntax
      if Bool
success
        then do
          Bool
singleRowModeSet <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Connection -> IO Bool
Pg.setSingleRowMode Connection
conn')
          if Bool
singleRowModeSet
             then ConduitT () r m () -> m b
withSrc (forall (m :: * -> *) r i.
(MonadFail m, FromBackendRow Postgres r, MonadIO m) =>
Connection -> Connection -> Maybe [Field] -> ConduitT i r m ()
streamResults Connection
conn Connection
conn' forall a. Maybe a
Nothing) forall (m :: * -> *) a b.
MonadBaseControl IO m =>
m a -> m b -> m a
`finally` (forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Connection -> IO ()
gracefulShutdown Connection
conn')
             else forall (m :: * -> *) a. MonadFail m => String -> m a
Fail.fail String
"Could not enable single row mode"
        else do
          ByteString
errMsg <- forall a. a -> Maybe a -> a
fromMaybe ByteString
"No libpq error provided" forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Connection -> IO (Maybe ByteString)
Pg.errorMessage Connection
conn')
          forall (m :: * -> *) a. MonadFail m => String -> m a
Fail.fail (forall a. Show a => a -> String
show ByteString
errMsg))
{-# DEPRECATED runQueryReturning "Use streamingRunQueryReturning" #-}