module Haxl.Core.Monad (
GenHaxl (..), runHaxl,
env,
Env(..), caches, initEnvWithData, initEnv, emptyEnv,
throw, catch, catchIf, try, tryToHaxlException,
dataFetch, uncachedRequest,
cacheRequest, cacheResult, cachedComputation,
dumpCacheAsHaskell,
unsafeLiftIO, unsafeToHaxlException,
) where
import Haxl.Core.Types
import Haxl.Core.Show1
import Haxl.Core.StateStore
import Haxl.Core.Exception
import Haxl.Core.RequestStore
import Haxl.Core.Util
import Haxl.Core.DataCache as DataCache
import qualified Data.Text as Text
import Control.Exception (Exception(..), SomeException)
#if __GLASGOW_HASKELL__ >= 708
import Control.Exception (SomeAsyncException(..))
#endif
#if __GLASGOW_HASKELL__ >= 710
import Control.Exception (AllocationLimitExceeded(..))
#endif
import Control.Monad
import qualified Control.Exception as Exception
import Control.Applicative hiding (Const)
import GHC.Exts (IsString(..))
#if __GLASGOW_HASKELL__ < 706
import Prelude hiding (catch)
#endif
import Data.IORef
import Data.List
import Data.Monoid
import Data.Time
import qualified Data.HashMap.Strict as HashMap
import Text.Printf
import Text.PrettyPrint hiding ((<>))
import Control.Arrow (left)
#ifdef EVENTLOG
import Control.Exception (bracket_)
import Debug.Trace (traceEventIO)
#endif
data Env u = Env
{ cacheRef :: IORef (DataCache ResultVar)
, memoRef :: IORef (DataCache (MemoVar u))
, flags :: Flags
, userEnv :: u
, statsRef :: IORef Stats
, states :: StateStore
}
type Caches u = (IORef (DataCache ResultVar), IORef (DataCache (MemoVar u)))
caches :: Env u -> Caches u
caches env = (cacheRef env, memoRef env)
initEnvWithData :: StateStore -> u -> Caches u -> IO (Env u)
initEnvWithData states e (cref, mref) = do
sref <- newIORef emptyStats
return Env
{ cacheRef = cref
, memoRef = mref
, flags = defaultFlags
, userEnv = e
, states = states
, statsRef = sref
}
initEnv :: StateStore -> u -> IO (Env u)
initEnv states e = do
cref <- newIORef DataCache.empty
mref <- newIORef DataCache.empty
initEnvWithData states e (cref,mref)
emptyEnv :: u -> IO (Env u)
emptyEnv = initEnv stateEmpty
newtype GenHaxl u a = GenHaxl
{ unHaxl :: Env u -> IORef (RequestStore u) -> IO (Result u a) }
data Result u a
= Done a
| Throw SomeException
| Blocked (GenHaxl u a)
instance (Show a) => Show (Result u a) where
show (Done a) = printf "Done(%s)" $ show a
show (Throw e) = printf "Throw(%s)" $ show e
show Blocked{} = "Blocked"
instance Monad (GenHaxl u) where
return a = GenHaxl $ \_env _ref -> return (Done a)
GenHaxl m >>= k = GenHaxl $ \env ref -> do
e <- m env ref
case e of
Done a -> unHaxl (k a) env ref
Throw e -> return (Throw e)
Blocked cont -> return (Blocked (cont >>= k))
instance Functor (GenHaxl u) where
fmap f m = pure f <*> m
instance Applicative (GenHaxl u) where
pure = return
GenHaxl f <*> GenHaxl a = GenHaxl $ \env ref -> do
r <- f env ref
case r of
Throw e -> return (Throw e)
Done f' -> do
ra <- a env ref
case ra of
Done a' -> return (Done (f' a'))
Throw e -> return (Throw e)
Blocked a' -> return (Blocked (f' <$> a'))
Blocked f' -> do
ra <- a env ref
case ra of
Done a' -> return (Blocked (f' <*> return a'))
Throw e -> return (Blocked (f' <*> throw e))
Blocked a' -> return (Blocked (f' <*> a'))
runHaxl :: Env u -> GenHaxl u a -> IO a
#ifdef EVENTLOG
runHaxl env h = do
let go !n env (GenHaxl haxl) = do
traceEventIO "START computation"
ref <- newIORef noRequests
e <- haxl env ref
traceEventIO "STOP computation"
case e of
Done a -> return a
Throw e -> Exception.throw e
Blocked cont -> do
bs <- readIORef ref
writeIORef ref noRequests
traceEventIO "START performFetches"
n' <- performFetches n env bs
traceEventIO "STOP performFetches"
go n' env cont
traceEventIO "START runHaxl"
r <- go 0 env h
traceEventIO "STOP runHaxl"
return r
#else
runHaxl env (GenHaxl haxl) = do
ref <- newIORef noRequests
e <- haxl env ref
case e of
Done a -> return a
Throw e -> Exception.throw e
Blocked cont -> do
bs <- readIORef ref
writeIORef ref noRequests
void (performFetches 0 env bs)
runHaxl env cont
#endif
env :: (Env u -> a) -> GenHaxl u a
env f = GenHaxl $ \env _ref -> return (Done (f env))
throw :: (Exception e) => e -> GenHaxl u a
throw e = GenHaxl $ \_env _ref -> raise e
raise :: (Exception e) => e -> IO (Result u a)
raise = return . Throw . toException
catch :: Exception e => GenHaxl u a -> (e -> GenHaxl u a) -> GenHaxl u a
catch (GenHaxl m) h = GenHaxl $ \env ref -> do
r <- m env ref
case r of
Done a -> return (Done a)
Throw e | Just e' <- fromException e -> unHaxl (h e') env ref
| otherwise -> return (Throw e)
Blocked k -> return (Blocked (catch k h))
catchIf
:: Exception e => (e -> Bool) -> GenHaxl u a -> (e -> GenHaxl u a)
-> GenHaxl u a
catchIf cond haxl handler =
catch haxl $ \e -> if cond e then handler e else throw e
try :: Exception e => GenHaxl u a -> GenHaxl u (Either e a)
try haxl = (Right <$> haxl) `catch` (return . Left)
unsafeLiftIO :: IO a -> GenHaxl u a
unsafeLiftIO m = GenHaxl $ \_env _ref -> Done <$> m
unsafeToHaxlException :: GenHaxl u a -> GenHaxl u a
unsafeToHaxlException (GenHaxl m) = GenHaxl $ \env ref -> do
r <- m env ref `Exception.catch` \e -> return (Throw e)
case r of
Blocked c -> return (Blocked (unsafeToHaxlException c))
other -> return other
tryToHaxlException :: GenHaxl u a -> GenHaxl u (Either HaxlException a)
tryToHaxlException h = left asHaxlException <$> try (unsafeToHaxlException h)
data CacheResult a
= Uncached (ResultVar a)
| CachedNotFetched (ResultVar a)
| Cached (Either SomeException a)
cached :: (Request r a) => Env u -> r a -> IO (CacheResult a)
cached env req = do
cache <- readIORef (cacheRef env)
let
do_fetch = do
rvar <- newEmptyResult
writeIORef (cacheRef env) $! DataCache.insert req rvar cache
return (Uncached rvar)
case DataCache.lookup req cache of
Nothing -> do_fetch
Just rvar -> do
mb <- tryReadResult rvar
case mb of
Nothing -> return (CachedNotFetched rvar)
Just r -> do
ifTrace (flags env) 3 $ putStrLn $ case r of
Left _ -> "Cached error: " ++ show req
Right _ -> "Cached request: " ++ show req
return (Cached r)
dataFetch :: (DataSource u r, Request r a) => r a -> GenHaxl u a
dataFetch req = GenHaxl $ \env ref -> do
res <- cached env req
case res of
Uncached rvar -> do
modifyIORef' ref $ \bs -> addRequest (BlockedFetch req rvar) bs
return $ Blocked (continueFetch req rvar)
CachedNotFetched rvar -> return
$ Blocked (continueFetch req rvar)
Cached (Left ex) -> return (Throw ex)
Cached (Right a) -> return (Done a)
uncachedRequest :: (DataSource u r, Request r a) => r a -> GenHaxl u a
uncachedRequest req = GenHaxl $ \_env ref -> do
rvar <- newEmptyResult
modifyIORef' ref $ \bs -> addRequest (BlockedFetch req rvar) bs
return $ Blocked (continueFetch req rvar)
continueFetch
:: (DataSource u r, Request r a, Show a)
=> r a -> ResultVar a -> GenHaxl u a
continueFetch req rvar = GenHaxl $ \_env _ref -> do
m <- tryReadResult rvar
case m of
Nothing -> raise . DataSourceError $
textShow req <> " did not set contents of result var"
Just r -> done r
cacheResult :: (Request r a) => r a -> IO a -> GenHaxl u a
cacheResult req val = GenHaxl $ \env _ref -> do
cachedResult <- cached env req
case cachedResult of
Uncached rvar -> do
result <- Exception.try val
putResult rvar result
case result of
Left e -> do rethrowAsyncExceptions e; done result
_other -> done result
Cached result -> done result
CachedNotFetched _ -> corruptCache
where
corruptCache = raise . DataSourceError $ Text.concat
[ textShow req
, " has a corrupted cache value: these requests are meant to"
, " return immediately without an intermediate value. Either"
, " the cache was updated incorrectly, or you're calling"
, " cacheResult on a query that involves a blocking fetch."
]
rethrowAsyncExceptions :: SomeException -> IO ()
rethrowAsyncExceptions e
#if __GLASGOW_HASKELL__ >= 708
| Just SomeAsyncException{} <- fromException e = Exception.throw e
#endif
#if __GLASGOW_HASKELL__ >= 710
| Just AllocationLimitExceeded{} <- fromException e = Exception.throw e
#endif
| otherwise = return ()
cacheRequest
:: (Request req a) => req a -> Either SomeException a -> GenHaxl u ()
cacheRequest request result = GenHaxl $ \env _ref -> do
res <- cached env request
case res of
Uncached rvar -> do
putResult rvar result
return $ Done ()
_other -> raise $
DataSourceError "cacheRequest: request is already in the cache"
instance IsString a => IsString (GenHaxl u a) where
fromString s = return (fromString s)
performFetches :: forall u. Int -> Env u -> RequestStore u -> IO Int
performFetches n env reqs = do
let f = flags env
sref = statsRef env
jobs = contents reqs
!n' = n + length jobs
t0 <- getCurrentTime
let
roundstats =
[ (dataSourceName (getReq reqs), length reqs)
| BlockedFetches reqs <- jobs ]
where
getReq :: [BlockedFetch r] -> r a
getReq = undefined
ifTrace f 1 $
printf "Batch data fetch (%s)\n" $
intercalate (", "::String) $
map (\(name,num) -> printf "%d %s" num (Text.unpack name)) roundstats
ifTrace f 3 $
forM_ jobs $ \(BlockedFetches reqs) ->
forM_ reqs $ \(BlockedFetch r _) -> putStrLn (show1 r)
let
applyFetch (i, BlockedFetches (reqs :: [BlockedFetch r])) =
case stateGet (states env) of
Nothing ->
return (SyncFetch (mapM_ (setError (const e)) reqs))
where req :: r a; req = undefined
e = DataSourceError $
"data source not initialized: " <> dataSourceName req
Just state ->
return $ wrapFetchInTrace i (length reqs)
(dataSourceName (undefined :: r a))
$ wrapFetchInCatch reqs
$ fetch state f (userEnv env) reqs
fetches <- mapM applyFetch $ zip [n..] jobs
times <-
if report f >= 2
then do
(refs, timedfetches) <- mapAndUnzipM wrapFetchInTimer fetches
scheduleFetches timedfetches
mapM (fmap Just . readIORef) refs
else do
scheduleFetches fetches
return $ repeat Nothing
let dsroundstats = HashMap.fromList
[ (name, DataSourceRoundStats { dataSourceFetches = fetches
, dataSourceTime = time
})
| ((name, fetches), time) <- zip roundstats times]
t1 <- getCurrentTime
let roundtime = realToFrac (diffUTCTime t1 t0) :: Double
ifReport f 1 $
modifyIORef' sref $ \(Stats rounds) ->
Stats (RoundStats (microsecs roundtime) dsroundstats: rounds)
ifTrace f 1 $
printf "Batch data fetch done (%.2fs)\n" (realToFrac roundtime :: Double)
return n'
wrapFetchInCatch :: [BlockedFetch req] -> PerformFetch -> PerformFetch
wrapFetchInCatch reqs fetch =
case fetch of
SyncFetch io ->
SyncFetch (io `Exception.catch` handler)
AsyncFetch fio ->
AsyncFetch (\io -> fio io `Exception.catch` handler)
where
handler :: SomeException -> IO ()
handler e = do
rethrowAsyncExceptions e
mapM_ (forceError e) reqs
forceError e (BlockedFetch _ rvar) = do
void $ tryTakeResult rvar
putResult rvar (except e)
wrapFetchInTimer :: PerformFetch -> IO (IORef Microseconds, PerformFetch)
wrapFetchInTimer f = do
r <- newIORef 0
case f of
SyncFetch io -> return (r, SyncFetch (time io >>= writeIORef r))
AsyncFetch f -> do
inner_r <- newIORef 0
return (r, AsyncFetch $ \inner -> do
total <- time (f (time inner >>= writeIORef inner_r))
inner_t <- readIORef inner_r
writeIORef r (total inner_t))
wrapFetchInTrace :: Int -> Int -> Text.Text -> PerformFetch -> PerformFetch
#ifdef EVENTLOG
wrapFetchInTrace i n dsName f =
case f of
SyncFetch io -> SyncFetch (wrapF "Sync" io)
AsyncFetch fio -> AsyncFetch (wrapF "Async" . fio . unwrapF "Async")
where
d = Text.unpack dsName
wrapF :: String -> IO a -> IO a
wrapF ty = bracket_ (traceEventIO $ printf "START %d %s (%d %s)" i d n ty)
(traceEventIO $ printf "STOP %d %s (%d %s)" i d n ty)
unwrapF :: String -> IO a -> IO a
unwrapF ty = bracket_ (traceEventIO $ printf "STOP %d %s (%d %s)" i d n ty)
(traceEventIO $ printf "START %d %s (%d %s)" i d n ty)
#else
wrapFetchInTrace _ _ _ f = f
#endif
time :: IO () -> IO Microseconds
time io = do
t0 <- getCurrentTime
io
t1 <- getCurrentTime
return . microsecs . realToFrac $ t1 `diffUTCTime` t0
microsecs :: Double -> Microseconds
microsecs t = round (t * 10^(6::Int))
scheduleFetches :: [PerformFetch] -> IO()
scheduleFetches fetches = async_fetches sync_fetches
where
async_fetches :: IO () -> IO ()
async_fetches = compose [f | AsyncFetch f <- fetches]
sync_fetches :: IO ()
sync_fetches = sequence_ [io | SyncFetch io <- fetches]
newtype MemoVar u a = MemoVar (IORef (MemoStatus u a))
data MemoStatus u a
= MemoInProgress (RoundId u) (GenHaxl u a)
| MemoDone (Either SomeException a)
type RoundId u = IORef (RequestStore u)
cachedComputation
:: forall req u a. (Request req a)
=> req a -> GenHaxl u a -> GenHaxl u a
cachedComputation req haxl = GenHaxl $ \env ref -> do
cache <- readIORef (memoRef env)
case DataCache.lookup req cache of
Nothing -> do
memovar <- newIORef (MemoInProgress ref haxl)
writeIORef (memoRef env) $! DataCache.insert req (MemoVar memovar) cache
run memovar haxl env ref
Just (MemoVar memovar) -> do
status <- readIORef memovar
case status of
MemoDone r -> done r
MemoInProgress round cont
| round == ref -> return (Blocked (retryMemo req))
| otherwise -> run memovar cont env ref
where
retryMemo req =
cachedComputation req (throw (CriticalError "retryMemo"))
run memovar cont env ref = do
e <- unHaxl cont env ref
case e of
Done a -> complete memovar (Right a)
Throw e -> complete memovar (Left e)
Blocked cont -> do
writeIORef memovar (MemoInProgress ref cont)
return (Blocked (retryMemo req))
complete memovar r = do
writeIORef memovar (MemoDone r)
done r
done :: Either SomeException a -> IO (Result u a)
done = return . either Throw Done
dumpCacheAsHaskell :: GenHaxl u String
dumpCacheAsHaskell = do
ref <- env cacheRef
entries <- unsafeLiftIO $ readIORef ref >>= showCache
let
mk_cr (req, res) =
text "cacheRequest" <+> parens (text req) <+> parens (result res)
result (Left e) = text "except" <+> parens (text (show e))
result (Right s) = text "Right" <+> parens (text s)
return $ show $
text "loadCache :: GenHaxl u ()" $$
text "loadCache = do" $$
nest 2 (vcat (map mk_cr (concatMap snd entries))) $$
text ""