{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE UndecidableInstances #-}
module Mealstrom.PostgresJSONStore(
PostgresJSONStore,
mkStore,
_fsmRead,
_fsmCreate,
_fsmUpdate,
_batchConversion
) where
import Control.Exception (handle,SomeException)
import Control.Monad (void)
import Database.PostgreSQL.Simple as PGS
import Database.PostgreSQL.Simple.FromRow
import Database.PostgreSQL.Simple.ToField
import Database.PostgreSQL.Simple.Transaction
import Database.PostgreSQL.Simple.Types
import Data.Aeson
import qualified Data.ByteString.Char8 as DBSC8
import Data.Int (Int64)
import Data.Maybe (listToMaybe)
import Data.Pool
import Data.Text
import Data.Time
import Data.Typeable hiding (Proxy)
import GHC.Generics
import Database.PostgreSQL.Simple.FromField (FromField (fromField),
fromJSONField,
Conversion)
import Mealstrom.FSM
import Mealstrom.FSMStore
import Mealstrom.WALStore
-- | Handle to a PostgreSQL-backed store.
data PostgresJSONStore = PostgresJSONStore
    { storeConnPool :: Pool Connection
      -- ^ Pool from which every operation in this module borrows its connection.
    , storeName     :: Text
      -- ^ Name of the FSM table; the WAL table uses the same name with a
      --   @Wal@ suffix appended.
    }
-- | 'FSMStore' implementation that delegates to the underscore-prefixed
-- functions of this module.
instance ( FromJSON s, FromJSON e, FromJSON a
         , ToJSON s, ToJSON e, ToJSON a
         , Typeable s, Typeable e, Typeable a
         , MealyInstance k s e a
         ) => FSMStore PostgresJSONStore k s e a where
    -- Only the current state of the stored machine is handed back; the rest
    -- of the instance read by '_fsmRead' is dropped.
    fsmRead st k p =
        fmap (currState . machine) <$> Mealstrom.PostgresJSONStore._fsmRead st k p
    fsmCreate = Mealstrom.PostgresJSONStore._fsmCreate
    fsmUpdate = Mealstrom.PostgresJSONStore._fsmUpdate
-- | 'WALStore' implementation. Each method forwards to the module-level
-- function of the same name; the module qualifier tells the two apart.
instance (FSMKey k) => WALStore PostgresJSONStore k where
    walUpsertIncrement st k = Mealstrom.PostgresJSONStore.walUpsertIncrement st k
    walDecrement st k       = Mealstrom.PostgresJSONStore.walDecrement st k
    walScan st cutoff       = Mealstrom.PostgresJSONStore.walScan st cutoff
-- | Build a connection pool around the given connection-opening action.
-- Connections are released with 'close'.
--
-- The three numeric settings are named after the corresponding
-- @createPool@ parameters (stripe count, idle time, maximum resources per
-- stripe) — confirm against the resource-pool version in use.
givePool :: IO Connection -> IO (Pool Connection)
givePool creator = createPool creator close stripes idleTime maxPerStripe
  where
    stripes      = 1
    idleTime     = 10
    maxPerStripe = 20
-- | Look up the instance stored under key @k@.
--
-- Runs inside a serializable transaction on a pooled connection and yields
-- 'Nothing' when the query returns no row. The proxy argument is not
-- inspected.
_fsmRead :: ( FromJSON s, FromJSON e, FromJSON a
            , Typeable s, Typeable e, Typeable a
            , MealyInstance k s e a
            )
         => PostgresJSONStore
         -> k
         -> Proxy k s e a
         -> IO (Maybe (Instance k s e a))
_fsmRead st k _p =
    withResource (storeConnPool st) $ \conn ->
        withTransactionSerializable conn $
            listToMaybe <$> _getValue conn (storeName st) (toText k)
-- | Insert a new instance into the FSM table.
--
-- Returns 'Nothing' when the insert succeeds, or @'Just' msg@ holding the
-- rendered exception when anything thrown while borrowing the connection or
-- running the transaction is caught.
--
-- The statement issued by '_postValue' is a plain @INSERT@ without a
-- conflict clause, so an already existing key is presumably reported through
-- this error path — confirm against the database in use.
--
-- NOTE(review): the handler matches 'SomeException', which also covers
-- asynchronous exceptions delivered to the calling thread.
_fsmCreate :: forall k s e a .
              ( ToJSON s, ToJSON e, ToJSON a
              , Typeable s, Typeable e, Typeable a
              , MealyInstance k s e a
              )
           => PostgresJSONStore
           -> Instance k s e a
           -> IO (Maybe String)
_fsmCreate st i = handle failed runInsert
  where
    -- 'Prelude.show' is qualified on purpose: "Data.Text" is imported
    -- unqualified in this module, so a bare @show@ turns ambiguous with text
    -- releases that export their own @show@. This mirrors the qualified
    -- @Prelude.null@ already used in '_fsmUpdate'.
    failed :: SomeException -> IO (Maybe String)
    failed e = return (Just (Prelude.show e))

    runInsert :: IO (Maybe String)
    runInsert =
        withResource (storeConnPool st) $ \conn ->
            withTransactionSerializable conn $ do
                void $ _postValue conn (storeName st) (toText $ key i) (machine i)
                return Nothing
-- | Apply a transformer to the machine stored under key @k@ and write the
-- result back, all within one serializable transaction.
--
-- The row is fetched with @SELECT … FOR UPDATE@ (see '_getValueForUpdate').
--
-- Result:
--
-- * 'MealyError' when no row exists for the key (the transformer is not run),
-- * 'Done' when the transformed machine has an empty outbox,
-- * 'Pending' otherwise.
_fsmUpdate :: forall k s e a .
              ( FromJSON s, FromJSON e, FromJSON a
              , ToJSON s, ToJSON e, ToJSON a
              , Typeable s, Typeable e, Typeable a
              , MealyInstance k s e a
              )
           => PostgresJSONStore
           -> k
           -> MachineTransformer s e a
           -> IO MealyStatus
_fsmUpdate st k t =
    withResource (storeConnPool st) $ \conn ->
        withTransactionSerializable conn $ do
            rows <- _getValueForUpdate conn (storeName st) (toText k) :: IO [Instance k s e a]
            case rows of
                []          -> return MealyError
                -- Only the first row matters; @id@ is the table's primary key.
                (found : _) -> do
                    newMachine <- t (machine found)
                    void (_postOrUpdateValue conn (storeName st) (toText k) newMachine)
                    return $ if Prelude.null (outbox newMachine) then Done else Pending
-- | Create the WAL table under the given name unless it already exists.
-- Columns: @id@ (text primary key), @date@ (timestamptz) and @count@ (int).
-- The result is whatever 'PGS.execute' reports for the statement.
_createWalTable :: Connection -> Text -> IO Int64
_createWalTable conn name = PGS.execute conn ddl (Only (Identifier name))
  where
    ddl :: Query
    ddl = "CREATE TABLE IF NOT EXISTS ? ( id TEXT PRIMARY KEY, date timestamptz NOT NULL, count int NOT NULL )"
-- | Run the '_walIncrement' statement for the given key via '_walExecute'.
walUpsertIncrement :: (FSMKey k) => PostgresJSONStore -> k -> IO ()
walUpsertIncrement st k = _walExecute st k _walIncrement
-- | Run the '_walDecrement' statement for the given key via '_walExecute'.
walDecrement :: (FSMKey k) => PostgresJSONStore -> k -> IO ()
walDecrement st k = _walExecute st k _walDecrement
-- | Execute a WAL statement for key @k@ inside a serializable transaction.
--
-- The statement's four placeholders are filled, in order, with: the WAL
-- table identifier, the key rendered as text, the current time, and the WAL
-- table identifier again. The affected-row count is discarded.
_walExecute :: (FSMKey k) => PostgresJSONStore -> k -> Query -> IO ()
_walExecute st k q =
    withResource (storeConnPool st) $ \conn ->
        withTransactionSerializable conn $ do
            now <- getCurrentTime
            void $ PGS.execute conn q (walTable, toText k, now, walTable)
  where
    -- WAL table name: the store name with "Wal" appended.
    walTable = Identifier (storeName st `append` "Wal")
-- | Upsert statement passed to '_walExecute' by 'walUpsertIncrement'.
-- Inserts a row with count 1; when a row with the same @id@ already exists,
-- its count is incremented by one and its date is replaced by the date of
-- the attempted insert. Placeholders, in order: table, id, date, table.
_walIncrement :: Query
_walIncrement = "INSERT INTO ? VALUES (?,?,1) ON CONFLICT (id) DO UPDATE SET count = ?.count + 1, date = EXCLUDED.date"
-- | Upsert statement passed to '_walExecute' by 'walDecrement'.
-- Inserts a row with count 0; when a row with the same @id@ already exists,
-- its count is decremented by one and its date is left untouched.
-- Placeholders, in order: table, id, date, table.
_walDecrement :: Query
_walDecrement = "INSERT INTO ? VALUES (?,?,0) ON CONFLICT (id) DO UPDATE SET count = ?.count - 1"
-- | List WAL entries whose date is older than @cutoff@ before now and whose
-- count is still positive.
--
-- @cutoff@ is converted to a 'NominalDiffTime' numerically, so it is
-- interpreted as a number of seconds. The query runs inside a serializable
-- transaction on a pooled connection.
walScan :: (FSMKey k) => PostgresJSONStore -> Int -> IO [WALEntry k]
walScan st cutoff = do
    now <- getCurrentTime
    let age      = fromIntegral cutoff :: NominalDiffTime
        deadline = addUTCTime (negate age) now
        walTable = Identifier (storeName st `append` "Wal")
    withResource (storeConnPool st) $ \conn ->
        withTransactionSerializable conn $
            PGS.query conn "SELECT * FROM ? WHERE date < ? AND count > 0" (walTable, deadline)
-- | Open a store: build the connection pool, make sure the FSM table
-- (@name@) and the WAL table (@name@ with @Wal@ appended) exist, and return
-- the handle.
--
-- NOTE(review): the connection string is converted with the Char8 @pack@,
-- which keeps only the low 8 bits of every character.
mkStore :: String -> Text -> IO PostgresJSONStore
mkStore connStr name = do
    pool <- givePool (PGS.connectPostgreSQL (DBSC8.pack connStr))
    void $ withResource pool (\conn -> _createFsmTable conn name)
    void $ withResource pool (\conn -> _createWalTable conn (append name "Wal"))
    return (PostgresJSONStore pool name)
-- | Create the FSM table under the given name unless it already exists.
-- Columns: @id@ (text primary key) and @data@ (jsonb).
-- The result is whatever 'PGS.execute' reports for the statement.
_createFsmTable :: Connection -> Text -> IO Int64
_createFsmTable conn name = PGS.execute conn ddl (Only (Identifier name))
  where
    ddl :: Query
    ddl = "CREATE TABLE IF NOT EXISTS ? ( id text PRIMARY KEY, data jsonb NOT NULL)"
-- | Fetch every row of table @tbl@ whose @id@ equals @k@.
_getValue :: (FromRow v) => Connection -> Text -> Text -> IO [v]
_getValue c tbl k = PGS.query c sql (Identifier tbl, k)
  where
    sql :: Query
    sql = "SELECT * FROM ? WHERE id = ?"
-- | Like '_getValue', but the statement carries a @FOR UPDATE@ clause.
_getValueForUpdate :: (FromRow v) => Connection -> Text -> Text -> IO [v]
_getValueForUpdate c tbl k = PGS.query c sql (Identifier tbl, k)
  where
    sql :: Query
    sql = "SELECT * FROM ? WHERE id = ? FOR UPDATE"
-- | Insert @(k, v)@ into table @tbl@; if a row with the same @id@ already
-- exists, overwrite its @data@ column with the new value instead.
_postOrUpdateValue :: (ToField v) => Connection -> Text -> Text -> v -> IO Int64
_postOrUpdateValue c tbl k v = PGS.execute c sql (Identifier tbl, k, v)
  where
    sql :: Query
    sql = "INSERT INTO ? VALUES (?,?) ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data"
-- | Insert @(k, v)@ into table @tbl@. The statement has no conflict clause.
_postValue :: (ToField v) => Connection -> Text -> Text -> v -> IO Int64
_postValue c tbl k v = PGS.execute c sql (Identifier tbl, k, v)
  where
    sql :: Query
    sql = "INSERT INTO ? VALUES (?,?)"
-- | Delete the rows of table @tbl@ whose @id@ equals @k@.
-- Not referenced by any other definition in the visible part of this module.
_deleteValue :: (ToField k) => Connection -> Text -> k -> IO Int64
_deleteValue c tbl k = PGS.execute c sql (Identifier tbl, k)
  where
    sql :: Query
    sql = "DELETE FROM ? WHERE id = ?"
-- | Fetch the rows of table @tbl@ whose @data@ column satisfies
-- @data \@> q@, with @q@ supplied as a text parameter.
-- Not referenced by any other definition in the visible part of this module.
_queryValue :: (FromRow v) => Connection -> Text -> Text -> IO [v]
_queryValue c tbl q = PGS.query c sql (Identifier tbl, q)
  where
    sql :: Query
    sql = "SELECT * FROM ? WHERE data @> ?"
-- | Read every @id@ of table @tbl@ and convert each one with 'fromText'.
--
-- The table is named by the @tbl@ argument, not by the store's own name;
-- the store only supplies the connection pool. No explicit transaction is
-- opened here.
_getKeys :: forall k . (FSMKey k) => PostgresJSONStore -> Text -> IO [k]
_getKeys st tbl =
    withResource (storeConnPool st) $ \conn -> do
        rows <- PGS.query conn "SELECT id FROM ?" (Only (Identifier tbl)) :: IO [Only Text]
        return [fromText t | Only t <- rows]
-- | Run '_fsmUpdate' with an identity transformer over every key found in
-- table @tbl@, so each stored machine is read and written back unchanged.
--
-- Keys are listed from @tbl@, while '_fsmUpdate' operates on the store's own
-- table. The status returned for each key is discarded, and the proxy
-- argument only fixes the type parameters.
_batchConversion :: forall k s e a .
                    ( FromJSON s, FromJSON e, FromJSON a
                    , ToJSON s, ToJSON e, ToJSON a
                    , Typeable s, Typeable e, Typeable a
                    , MealyInstance k s e a
                    )
                 => PostgresJSONStore
                 -> Text
                 -> Proxy k s e a
                 -> IO ()
_batchConversion st tbl _p = _getKeys st tbl >>= mapM_ rewrite
  where
    unchanged :: MachineTransformer s e a
    unchanged = return

    rewrite :: k -> IO MealyStatus
    rewrite k = _fsmUpdate st k unchanged
-- JSON instances for the machine and message types.
-- The instance bodies are empty, so the classes' default method
-- implementations are used (presumably the Generic-based defaults, which
-- would need Generic instances for 'Machine' and 'Msg' declared elsewhere —
-- TODO confirm).
instance (ToJSON s, ToJSON e, ToJSON a) => ToJSON (Machine s e a)
instance (FromJSON s, FromJSON e, FromJSON a) => FromJSON (Machine s e a)
instance (ToJSON e) => ToJSON (Msg e)
instance (FromJSON e) => FromJSON (Msg e)
-- | Decode a two-column row into an 'Instance'. The columns are consumed in
-- order; with the FSM table layout these are @id@ followed by @data@.
instance ( Typeable s, Typeable e, Typeable a
         , FromJSON s, FromJSON e, FromJSON a
         , FSMKey k
         ) => FromRow (Instance k s e a) where
    fromRow = do
        ident   <- field
        payload <- field
        return (Instance ident payload)
-- | A machine is read from a JSON-typed column by delegating to
-- 'fromJSONField'.
instance ( Typeable s, Typeable e, Typeable a
         , FromJSON s, FromJSON e, FromJSON a
         ) => FromField (Machine s e a) where
    fromField fld raw = fromJSONField fld raw
-- | A machine is written as a JSON parameter by delegating to
-- 'toJSONField'.
instance ( Typeable s, Typeable e, Typeable a
         , ToJSON s, ToJSON e, ToJSON a
         ) => ToField (Machine s e a) where
    toField m = toJSONField m
-- | Any 'FSMKey' is written as the text produced by 'toText'.
--
-- NOTE(review): the instance head is a bare type variable, so this instance
-- overlaps with every other 'ToField' instance in scope.
instance {-# OVERLAPS #-} (FSMKey k) => ToField k where
    toField = toField . toText
-- | Any 'FSMKey' is read by parsing the column as 'Text' and converting the
-- result with 'fromText'.
--
-- NOTE(review): the instance head is a bare type variable, so this instance
-- overlaps with every other 'FromField' instance in scope.
instance {-# OVERLAPS #-} (FSMKey k) => FromField k where
    fromField f mdata = fromText <$> (fromField f mdata :: Conversion Text)
-- | Decode a three-column row into a 'WALEntry'. The columns are consumed in
-- order; with the WAL table layout these are @id@, @date@ and @count@.
instance (FSMKey k) => FromRow (WALEntry k) where
    fromRow = do
        entryId <- field
        stamp   <- field
        pending <- field
        return (WALEntry entryId stamp pending)
-- Standalone-derived instances for 'WALEntry', whose declaration is not part
-- of the visible source (presumably imported from Mealstrom.WALStore).
-- NOTE(review): recent GHC releases supply 'Typeable' for every type
-- automatically, so the second declaration is probably redundant there —
-- confirm before removing.
deriving instance (FSMKey k) => Generic (WALEntry k)
deriving instance (FSMKey k) => Typeable (WALEntry k)