module Hasql.Notifications
( notifyPool
, notify
, listen
, unlisten
, waitForNotifications
, PgIdentifier
, toPgIdentifier
, fromPgIdentifier
) where
import Hasql.Pool (Pool, UsageError, use)
import Hasql.Session (sql, run, statement)
import qualified Hasql.Session as S
import qualified Hasql.Statement as HST
import Hasql.Connection (Connection, withLibPQConnection)
import qualified Hasql.Decoders as HD
import qualified Hasql.Encoders as HE
import qualified Database.PostgreSQL.LibPQ as PQ
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.ByteString.Char8 (ByteString)
import Data.Functor.Contravariant (contramap)
import Control.Monad (void, forever)
import Control.Concurrent (threadWaitRead)
import Control.Exception (Exception, throw)
newtype PgIdentifier = PgIdentifier Text deriving (Int -> PgIdentifier -> ShowS
[PgIdentifier] -> ShowS
PgIdentifier -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [PgIdentifier] -> ShowS
$cshowList :: [PgIdentifier] -> ShowS
show :: PgIdentifier -> String
$cshow :: PgIdentifier -> String
showsPrec :: Int -> PgIdentifier -> ShowS
$cshowsPrec :: Int -> PgIdentifier -> ShowS
Show)
newtype FatalError = FatalError { FatalError -> String
fatalErrorMessage :: String }
deriving (Int -> FatalError -> ShowS
[FatalError] -> ShowS
FatalError -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [FatalError] -> ShowS
$cshowList :: [FatalError] -> ShowS
show :: FatalError -> String
$cshow :: FatalError -> String
showsPrec :: Int -> FatalError -> ShowS
$cshowsPrec :: Int -> FatalError -> ShowS
Show)
instance Exception FatalError
fromPgIdentifier :: PgIdentifier -> Text
fromPgIdentifier :: PgIdentifier -> Text
fromPgIdentifier (PgIdentifier Text
bs) = Text
bs
toPgIdentifier :: Text -> PgIdentifier
toPgIdentifier :: Text -> PgIdentifier
toPgIdentifier Text
x =
Text -> PgIdentifier
PgIdentifier forall a b. (a -> b) -> a -> b
$ Text
"\"" forall a. Semigroup a => a -> a -> a
<> Text -> Text
strictlyReplaceQuotes Text
x forall a. Semigroup a => a -> a -> a
<> Text
"\""
where
strictlyReplaceQuotes :: Text -> Text
strictlyReplaceQuotes :: Text -> Text
strictlyReplaceQuotes = Text -> Text -> Text -> Text
T.replace Text
"\"" (Text
"\"\"" :: Text)
notifyPool :: Pool
-> Text
-> Text
-> IO (Either UsageError ())
notifyPool :: Pool -> Text -> Text -> IO (Either UsageError ())
notifyPool Pool
pool Text
channel Text
mesg =
forall a. Pool -> Session a -> IO (Either UsageError a)
use Pool
pool (forall params result.
params -> Statement params result -> Session result
statement (Text
channel, Text
mesg) Statement (Text, Text) ()
callStatement)
where
callStatement :: Statement (Text, Text) ()
callStatement = forall a b.
ByteString -> Params a -> Result b -> Bool -> Statement a b
HST.Statement (ByteString
"SELECT pg_notify" forall a. Semigroup a => a -> a -> a
<> ByteString
"($1, $2)") Params (Text, Text)
encoder Result ()
HD.noResult Bool
False
encoder :: Params (Text, Text)
encoder = forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap forall a b. (a, b) -> a
fst (forall a. NullableOrNot Value a -> Params a
HE.param forall a b. (a -> b) -> a -> b
$ forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
HE.nonNullable Value Text
HE.text) forall a. Semigroup a => a -> a -> a
<> forall (f :: * -> *) a' a.
Contravariant f =>
(a' -> a) -> f a -> f a'
contramap forall a b. (a, b) -> b
snd (forall a. NullableOrNot Value a -> Params a
HE.param forall a b. (a -> b) -> a -> b
$ forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a
HE.nonNullable Value Text
HE.text)
notify :: Connection
-> PgIdentifier
-> Text
-> IO (Either S.QueryError ())
notify :: Connection -> PgIdentifier -> Text -> IO (Either QueryError ())
notify Connection
con PgIdentifier
channel Text
mesg =
forall a. Session a -> Connection -> IO (Either QueryError a)
run (ByteString -> Session ()
sql forall a b. (a -> b) -> a -> b
$ Text -> ByteString
T.encodeUtf8 (Text
"NOTIFY " forall a. Semigroup a => a -> a -> a
<> PgIdentifier -> Text
fromPgIdentifier PgIdentifier
channel forall a. Semigroup a => a -> a -> a
<> Text
", '" forall a. Semigroup a => a -> a -> a
<> Text
mesg forall a. Semigroup a => a -> a -> a
<> Text
"'")) Connection
con
listen :: Connection
-> PgIdentifier
-> IO ()
listen :: Connection -> PgIdentifier -> IO ()
listen Connection
con PgIdentifier
channel =
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. Connection -> (Connection -> IO a) -> IO a
withLibPQConnection Connection
con Connection -> IO ()
execListen
where
execListen :: Connection -> IO ()
execListen Connection
pqCon = forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ Connection -> ByteString -> IO (Maybe Result)
PQ.exec Connection
pqCon forall a b. (a -> b) -> a -> b
$ Text -> ByteString
T.encodeUtf8 forall a b. (a -> b) -> a -> b
$ Text
"LISTEN " forall a. Semigroup a => a -> a -> a
<> PgIdentifier -> Text
fromPgIdentifier PgIdentifier
channel
unlisten :: Connection
-> PgIdentifier
-> IO ()
unlisten :: Connection -> PgIdentifier -> IO ()
unlisten Connection
con PgIdentifier
channel =
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. Connection -> (Connection -> IO a) -> IO a
withLibPQConnection Connection
con Connection -> IO ()
execListen
where
execListen :: Connection -> IO ()
execListen Connection
pqCon = forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ Connection -> ByteString -> IO (Maybe Result)
PQ.exec Connection
pqCon forall a b. (a -> b) -> a -> b
$ Text -> ByteString
T.encodeUtf8 forall a b. (a -> b) -> a -> b
$ Text
"UNLISTEN " forall a. Semigroup a => a -> a -> a
<> PgIdentifier -> Text
fromPgIdentifier PgIdentifier
channel
waitForNotifications :: (ByteString -> ByteString -> IO())
-> Connection
-> IO ()
waitForNotifications :: (ByteString -> ByteString -> IO ()) -> Connection -> IO ()
waitForNotifications ByteString -> ByteString -> IO ()
sendNotification Connection
con =
forall a. Connection -> (Connection -> IO a) -> IO a
withLibPQConnection Connection
con forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a. Functor f => f a -> f ()
void forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> IO ()
pqFetch
where
pqFetch :: Connection -> IO ()
pqFetch Connection
pqCon = do
Maybe Notify
mNotification <- Connection -> IO (Maybe Notify)
PQ.notifies Connection
pqCon
case Maybe Notify
mNotification of
Maybe Notify
Nothing -> do
Maybe Fd
mfd <- Connection -> IO (Maybe Fd)
PQ.socket Connection
pqCon
case Maybe Fd
mfd of
Maybe Fd
Nothing -> forall a. String -> a
panic String
"Error checking for PostgreSQL notifications"
Just Fd
fd -> do
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ Fd -> IO ()
threadWaitRead Fd
fd
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ Connection -> IO Bool
PQ.consumeInput Connection
pqCon
Just Notify
notification ->
ByteString -> ByteString -> IO ()
sendNotification (Notify -> ByteString
PQ.notifyRelname Notify
notification) (Notify -> ByteString
PQ.notifyExtra Notify
notification)
panic :: String -> a
panic :: forall a. String -> a
panic String
a = forall a e. Exception e => e -> a
throw (String -> FatalError
FatalError String
a)