{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE KindSignatures #-}
module Haxl.Core.Fetch
( dataFetch
, dataFetchWithShow
, uncachedRequest
, cacheResult
, cacheResultWithShow
, cacheRequest
, performFetches
, performRequestStore
, ShowReq
) where
import Control.Concurrent.STM
import Control.Exception as Exception
import Control.Monad
import Data.Either
import Data.Hashable
import Data.IORef
import Data.Int
import Data.List
#if __GLASGOW_HASKELL__ < 804
import Data.Monoid
import Data.Proxy
import Data.Typeable
import Data.Text (Text)
import qualified Data.Text as Text
import Text.Printf
import GHC.Stack
import Haxl.Core.DataSource
import Haxl.Core.DataCache as DataCache
import Haxl.Core.Exception
import Haxl.Core.Flags
import Haxl.Core.Monad
import Haxl.Core.Profile
import Haxl.Core.RequestStore
import Haxl.Core.ShowP
import Haxl.Core.Stats
import Haxl.Core.StateStore
import Haxl.Core.Util
data CacheResult u w a
= Uncached
(ResultVar a)
{-# UNPACK #-} !(IVar u w a)
| CachedNotFetched
{-# UNPACK #-} !(IVar u w a)
| Cached (ResultVal a w)
type ShowReq r a = (r a -> String, a -> String)
:: forall r a u w.
(DataSource u r, Typeable (r a))
=> (r a -> String)
-> (r a -> IVar u w a -> DataCache (IVar u w) -> DataCache (IVar u w))
-> Env u w -> r a -> IO (CacheResult u w a)
cachedWithInsert showFn insertFn Env{..} req = do
cache <- readIORef cacheRef
doFetch = do
ivar <- newIVar
let !rvar = stdResultVar ivar completions submittedReqsRef flags
(Proxy :: Proxy r)
writeIORef cacheRef $! insertFn req ivar cache
return (Uncached rvar ivar)
case DataCache.lookup req cache of
Nothing -> doFetch
Just (IVar cr) -> do
e <- readIORef cr
case e of
IVarEmpty _ -> return (CachedNotFetched (IVar cr))
IVarFull r -> do
ifTrace flags 3 $ putStrLn $ case r of
ThrowIO{} -> "Cached error: " ++ showFn req
ThrowHaxl{} -> "Cached error: " ++ showFn req
Ok{} -> "Cached request: " ++ showFn req
return (Cached r)
:: forall r a u w. (DataSourceName r, Typeable r)
=> IVar u w a
-> TVar [CompleteReq u w]
-> IORef ReqCountMap
-> Flags
-> Proxy r
-> ResultVar a
stdResultVar ivar completions ref flags p =
mkResultVar $ \r isChildThread -> do
allocs <- if isChildThread
return 0
ifReport flags 1 $
atomicModifyIORef' ref (\m -> (subFromCountMap p 1 m, ()))
atomically $ do
cs <- readTVar completions
writeTVar completions (CompleteReq r ivar allocs : cs)
{-# INLINE stdResultVar #-}
logFetch :: Env u w -> (r a -> String) -> r a -> IO ()
logFetch env showFn req = do
ifReport (flags env) 5 $ do
stack <- currentCallStack
modifyIORef' (statsRef env) $ \(Stats s) ->
Stats (FetchCall (showFn req) stack : s)
logFetch _ _ _ = return ()
dataFetch :: (DataSource u r, Request r a) => r a -> GenHaxl u w a
dataFetch = dataFetchWithInsert show DataCache.insert
:: (DataSource u r, Eq (r a), Hashable (r a), Typeable (r a))
=> ShowReq r a
-> r a -> GenHaxl u w a
dataFetchWithShow (showReq, showRes) = dataFetchWithInsert showReq
(DataCache.insertWithShow showReq showRes)
:: forall u w r a
. (DataSource u r, Eq (r a), Hashable (r a), Typeable (r a))
=> (r a -> String)
-> (r a -> IVar u w a -> DataCache (IVar u w) -> DataCache (IVar u w))
-> r a
-> GenHaxl u w a
dataFetchWithInsert showFn insertFn req =
GenHaxl $ \env@Env{..} -> do
res <- cachedWithInsert showFn insertFn env req
ifProfiling flags $ addProfileFetch env req
case res of
Uncached rvar ivar -> do
logFetch env showFn req
case schedulerHint userEnv :: SchedulerHint r of
SubmitImmediately -> do
(_,ios) <- performFetches 0 env
[BlockedFetches [BlockedFetch req rvar]]
when (not (null ios)) $
error "bad data source:SubmitImmediately but returns FutureFetch"
TryToBatch ->
modifyIORef' reqStoreRef $ \bs ->
addRequest (BlockedFetch req rvar) bs
return $ Blocked ivar (Cont (getIVar ivar))
CachedNotFetched ivar -> return $ Blocked ivar (Cont (getIVar ivar))
Cached r -> done r
:: forall a u w (r :: * -> *). (DataSource u r, Request r a)
=> r a -> GenHaxl u w a
uncachedRequest req = do
flg <- env flags
subRef <- env submittedReqsRef
if recording flg /= 0
then dataFetch req
else GenHaxl $ \Env{..} -> do
ivar <- newIVar
let !rvar = stdResultVar ivar completions subRef flg (Proxy :: Proxy r)
modifyIORef' reqStoreRef $ \bs ->
addRequest (BlockedFetch req rvar) bs
return $ Blocked ivar (Cont (getIVar ivar))
cacheResult :: Request r a => r a -> IO a -> GenHaxl u w a
cacheResult = cacheResultWithInsert show DataCache.insert
:: (Eq (r a), Hashable (r a), Typeable (r a))
=> ShowReq r a -> r a -> IO a -> GenHaxl u w a
cacheResultWithShow (showReq, showRes) = cacheResultWithInsert showReq
(DataCache.insertWithShow showReq showRes)
:: Typeable (r a)
=> (r a -> String)
-> (r a -> IVar u w a -> DataCache (IVar u w) -> DataCache (IVar u w)) -> r a
-> IO a -> GenHaxl u w a
cacheResultWithInsert showFn insertFn req val = GenHaxl $ \env -> do
let !ref = cacheRef env
cache <- readIORef ref
case DataCache.lookup req cache of
Nothing -> do
eitherResult <- Exception.try val
case eitherResult of
Left e -> rethrowAsyncExceptions e
_ -> return ()
let result = eitherToResultThrowIO eitherResult
ivar <- newFullIVar result
writeIORef ref $! insertFn req ivar cache
done result
Just (IVar cr) -> do
e <- readIORef cr
case e of
IVarEmpty _ -> corruptCache
IVarFull r -> done r
corruptCache = raise . DataSourceError $ Text.concat
[ Text.pack (showFn req)
, " has a corrupted cache value: these requests are meant to"
, " return immediately without an intermediate value. Either"
, " the cache was updated incorrectly, or you're calling"
, " cacheResult on a query that involves a blocking fetch."
:: Request req a => req a -> Either SomeException a -> GenHaxl u w ()
cacheRequest request result = GenHaxl $ \env -> do
cache <- readIORef (cacheRef env)
case DataCache.lookup request cache of
Nothing -> do
cr <- newFullIVar (eitherToResult result)
writeIORef (cacheRef env) $! DataCache.insert request cr cache
return (Done ())
_other -> raise $
DataSourceError "cacheRequest: request is already in the cache"
:: forall u w. Int -> Env u w -> RequestStore u -> IO (Int, [IO ()])
performRequestStore n env reqStore =
performFetches n env (contents reqStore)
:: forall u w. Int -> Env u w -> [BlockedFetches u] -> IO (Int, [IO ()])
performFetches n env@Env{flags=f, statsRef=sref} jobs = do
let !n' = n + length jobs
t0 <- getTimestamp
roundstats =
[ (dataSourceName (Proxy :: Proxy r), length reqs)
| BlockedFetches (reqs :: [BlockedFetch r]) <- jobs ]
ifTrace f 1 $
printf "Batch data fetch (%s)\n" $
intercalate (", "::String) $
map (\(name,num) -> printf "%d %s" num (Text.unpack name)) roundstats
ifTrace f 3 $
forM_ jobs $ \(BlockedFetches reqs) ->
forM_ reqs $ \(BlockedFetch r _) -> putStrLn (showp r)
applyFetch i (BlockedFetches (reqs :: [BlockedFetch r])) =
case stateGet (states env) of
Nothing ->
return (FetchToDo reqs (SyncFetch (mapM_ (setError e))))
e :: ShowP req => req a -> DataSourceError
e req = DataSourceError $ "data source not initialized: " <> dsName
<> ": "
<> Text.pack (showp req)
Just state ->
$ FetchToDo reqs
$ (if report f >= 2
then wrapFetchInStats sref dsName (length reqs)
else id)
$ wrapFetchInTrace i (length reqs) dsName
$ wrapFetchInCatch reqs
$ fetch state f (userEnv env)
dsName = dataSourceName (Proxy :: Proxy r)
fetches <- zipWithM applyFetch [n..] jobs
waits <- scheduleFetches fetches (submittedReqsRef env) (flags env)
t1 <- getTimestamp
let roundtime = fromIntegral (t1 - t0) / 1000000 :: Double
ifTrace f 1 $
printf "Batch data fetch done (%.2fs)\n" (realToFrac roundtime :: Double)
return (n', waits)
data FetchToDo where
:: forall (req :: * -> *). (DataSourceName req, Typeable req)
=> [BlockedFetch req] -> PerformFetch req -> FetchToDo
wrapFetchInCatch :: [BlockedFetch req] -> PerformFetch req -> PerformFetch req
wrapFetchInCatch reqs fetch =
case fetch of
SyncFetch f ->
SyncFetch $ \reqs -> f reqs `Exception.catch` handler
AsyncFetch f ->
AsyncFetch $ \reqs io -> f reqs io `Exception.catch` handler
FutureFetch f ->
FutureFetch $ \reqs -> f reqs `Exception.catch` (
\e -> handler e >> return (return ()))
BackgroundFetch f ->
BackgroundFetch $ \reqs -> f reqs `Exception.catch` handler
handler :: SomeException -> IO ()
handler e = do
rethrowAsyncExceptions e
mapM_ (forceError e) reqs
forceError e (BlockedFetch _ rvar) =
putResult rvar (except e)
:: IORef Stats
-> Text
-> Int
-> PerformFetch req
-> PerformFetch req
wrapFetchInStats !statsRef dataSource batchSize perform = do
case perform of
SyncFetch f ->
SyncFetch $ \reqs -> do
fail_ref <- newIORef 0
(t0,t,alloc,_) <- statsForIO (f (map (addFailureCount fail_ref) reqs))
failures <- readIORef fail_ref
updateFetchStats t0 t alloc batchSize failures
AsyncFetch f -> do
AsyncFetch $ \reqs inner -> do
inner_r <- newIORef (0, 0)
fail_ref <- newIORef 0
let inner' = do
(_,t,alloc,_) <- statsForIO inner
writeIORef inner_r (t,alloc)
reqs' = map (addFailureCount fail_ref) reqs
(t0, totalTime, totalAlloc, _) <- statsForIO (f reqs' inner')
(innerTime, innerAlloc) <- readIORef inner_r
failures <- readIORef fail_ref
updateFetchStats t0 (totalTime - innerTime) (totalAlloc - innerAlloc)
batchSize failures
FutureFetch submit ->
FutureFetch $ \reqs -> do
fail_ref <- newIORef 0
let reqs' = map (addFailureCount fail_ref) reqs
(t0, submitTime, submitAlloc, wait) <- statsForIO (submit reqs')
return $ do
(_, waitTime, waitAlloc, _) <- statsForIO wait
failures <- readIORef fail_ref
updateFetchStats t0 (submitTime + waitTime) (submitAlloc + waitAlloc)
batchSize failures
BackgroundFetch io -> do
BackgroundFetch $ \reqs -> do
startTime <- getTimestamp
io (map (addTimer startTime) reqs)
statsForIO io = do
prevAlloc <- getAllocationCounter
(t0,t,a) <- time io
postAlloc <- getAllocationCounter
return (t0,t, fromIntegral $ prevAlloc - postAlloc, a)
addTimer t0 (BlockedFetch req (ResultVar fn)) =
BlockedFetch req $ ResultVar $ \result isChildThread -> do
t1 <- getTimestamp
allocs <- if isChildThread then getAllocationCounter else return 0
updateFetchStats t0 (t1 - t0)
(negate allocs)
(if isLeft result then 1 else 0)
fn result isChildThread
:: Timestamp -> Microseconds -> Int64 -> Int -> Int -> IO ()
updateFetchStats start time space batch failures = do
let this = FetchStats { fetchDataSource = dataSource
, fetchBatchSize = batch
, fetchStart = start
, fetchDuration = time
, fetchSpace = space
, fetchFailures = failures }
atomicModifyIORef' statsRef $ \(Stats fs) -> (Stats (this : fs), ())
addFailureCount :: IORef Int -> BlockedFetch r -> BlockedFetch r
addFailureCount ref (BlockedFetch req (ResultVar fn)) =
BlockedFetch req $ ResultVar $ \result isChildThread -> do
when (isLeft result) $ atomicModifyIORef' ref (\r -> (r+1,()))
fn result isChildThread
:: Int
-> Int
-> Text
-> PerformFetch req
-> PerformFetch req
wrapFetchInTrace i n dsName f =
case f of
SyncFetch io -> SyncFetch (wrapF "Sync" io)
AsyncFetch fio -> AsyncFetch (wrapF "Async" . fio . unwrapF "Async")
d = Text.unpack dsName
wrapF :: String -> IO a -> IO a
wrapF ty = bracket_ (traceEventIO $ printf "START %d %s (%d %s)" i d n ty)
(traceEventIO $ printf "STOP %d %s (%d %s)" i d n ty)
unwrapF :: String -> IO a -> IO a
unwrapF ty = bracket_ (traceEventIO $ printf "STOP %d %s (%d %s)" i d n ty)
(traceEventIO $ printf "START %d %s (%d %s)" i d n ty)
wrapFetchInTrace _ _ _ f = f
time :: IO a -> IO (Timestamp,Microseconds,a)
time io = do
t0 <- getTimestamp
a <- io
t1 <- getTimestamp
return (t0, t1 - t0, a)
scheduleFetches :: [FetchToDo] -> IORef ReqCountMap -> Flags -> IO [IO ()]
scheduleFetches fetches ref flags = do
ifReport flags 1 $ sequence_
[ atomicModifyIORef' ref $
\m -> (addToCountMap (Proxy :: Proxy r) (length reqs) m, ())
| FetchToDo (reqs :: [BlockedFetch r]) _f <- fetches
waits <- future_fetches
async_fetches sync_fetches
return waits
fully_async_fetches :: IO ()
fully_async_fetches = sequence_
[f reqs | FetchToDo reqs (BackgroundFetch f) <- fetches]
future_fetches :: IO [IO ()]
future_fetches = sequence
[f reqs | FetchToDo reqs (FutureFetch f) <- fetches]
async_fetches :: IO () -> IO ()
async_fetches = compose
[f reqs | FetchToDo reqs (AsyncFetch f) <- fetches]
sync_fetches :: IO ()
sync_fetches = sequence_
[f reqs | FetchToDo reqs (SyncFetch f) <- fetches]