{-# LANGUAGE CPP #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE KindSignatures #-}
module Haxl.Core.Fetch
( dataFetch
, dataFetchWithShow
, uncachedRequest
, cacheResult
, cacheResultWithShow
, cacheRequest
, performFetches
, performRequestStore
, ShowReq
) where
import Control.Concurrent.STM
import Control.Exception as Exception
import Control.Monad
import Data.Either
import Data.Hashable
import Data.IORef
import Data.Int
import Data.List
#if __GLASGOW_HASKELL__ < 804
import Data.Monoid
#endif
import Data.Proxy
import Data.Typeable
import Data.Text (Text)
import qualified Data.Text as Text
import Text.Printf
#ifdef PROFILING
import GHC.Stack
#endif
import Haxl.Core.DataSource
import Haxl.Core.DataCache as DataCache
import Haxl.Core.Exception
import Haxl.Core.Flags
import Haxl.Core.Monad
import Haxl.Core.Profile
import Haxl.Core.RequestStore
import Haxl.Core.ShowP
import Haxl.Core.Stats
import Haxl.Core.StateStore
import Haxl.Core.Util
data CacheResult u w a
= Uncached
(ResultVar a)
{-# UNPACK #-} !(IVar u w a)
| CachedNotFetched
{-# UNPACK #-} !(IVar u w a)
| Cached (ResultVal a w)
type ShowReq r a = (r a -> String, a -> String)
cachedWithInsert
:: forall r a u w.
(DataSource u r, Typeable (r a))
=> (r a -> String)
-> (r a -> IVar u w a -> DataCache (IVar u w) -> DataCache (IVar u w))
-> Env u w -> r a -> IO (CacheResult u w a)
cachedWithInsert showFn insertFn Env{..} req = do
cache <- readIORef cacheRef
let
doFetch = do
ivar <- newIVar
let !rvar = stdResultVar ivar completions submittedReqsRef flags
(Proxy :: Proxy r)
writeIORef cacheRef $! insertFn req ivar cache
return (Uncached rvar ivar)
case DataCache.lookup req cache of
Nothing -> doFetch
Just (IVar cr) -> do
e <- readIORef cr
case e of
IVarEmpty _ -> return (CachedNotFetched (IVar cr))
IVarFull r -> do
ifTrace flags 3 $ putStrLn $ case r of
ThrowIO{} -> "Cached error: " ++ showFn req
ThrowHaxl{} -> "Cached error: " ++ showFn req
Ok{} -> "Cached request: " ++ showFn req
return (Cached r)
stdResultVar
:: forall r a u w. (DataSourceName r, Typeable r)
=> IVar u w a
-> TVar [CompleteReq u w]
-> IORef ReqCountMap
-> Flags
-> Proxy r
-> ResultVar a
stdResultVar ivar completions ref flags p =
mkResultVar $ \r isChildThread -> do
allocs <- if isChildThread
then
getAllocationCounter
else
return 0
ifReport flags 1 $
atomicModifyIORef' ref (\m -> (subFromCountMap p 1 m, ()))
atomically $ do
cs <- readTVar completions
writeTVar completions (CompleteReq r ivar allocs : cs)
{-# INLINE stdResultVar #-}
logFetch :: Env u w -> (r a -> String) -> r a -> IO ()
#ifdef PROFILING
logFetch env showFn req = do
ifReport (flags env) 5 $ do
stack <- currentCallStack
modifyIORef' (statsRef env) $ \(Stats s) ->
Stats (FetchCall (showFn req) stack : s)
#else
logFetch _ _ _ = return ()
#endif
dataFetch :: (DataSource u r, Request r a) => r a -> GenHaxl u w a
dataFetch = dataFetchWithInsert show DataCache.insert
dataFetchWithShow
:: (DataSource u r, Eq (r a), Hashable (r a), Typeable (r a))
=> ShowReq r a
-> r a -> GenHaxl u w a
dataFetchWithShow (showReq, showRes) = dataFetchWithInsert showReq
(DataCache.insertWithShow showReq showRes)
dataFetchWithInsert
:: forall u w r a
. (DataSource u r, Eq (r a), Hashable (r a), Typeable (r a))
=> (r a -> String)
-> (r a -> IVar u w a -> DataCache (IVar u w) -> DataCache (IVar u w))
-> r a
-> GenHaxl u w a
dataFetchWithInsert showFn insertFn req =
GenHaxl $ \env@Env{..} -> do
res <- cachedWithInsert showFn insertFn env req
ifProfiling flags $ addProfileFetch env req
case res of
Uncached rvar ivar -> do
logFetch env showFn req
case schedulerHint userEnv :: SchedulerHint r of
SubmitImmediately -> do
(_,ios) <- performFetches 0 env
[BlockedFetches [BlockedFetch req rvar]]
when (not (null ios)) $
error "bad data source:SubmitImmediately but returns FutureFetch"
TryToBatch ->
modifyIORef' reqStoreRef $ \bs ->
addRequest (BlockedFetch req rvar) bs
return $ Blocked ivar (Cont (getIVar ivar))
CachedNotFetched ivar -> return $ Blocked ivar (Cont (getIVar ivar))
Cached r -> done r
uncachedRequest
:: forall a u w (r :: * -> *). (DataSource u r, Request r a)
=> r a -> GenHaxl u w a
uncachedRequest req = do
flg <- env flags
subRef <- env submittedReqsRef
if recording flg /= 0
then dataFetch req
else GenHaxl $ \Env{..} -> do
ivar <- newIVar
let !rvar = stdResultVar ivar completions subRef flg (Proxy :: Proxy r)
modifyIORef' reqStoreRef $ \bs ->
addRequest (BlockedFetch req rvar) bs
return $ Blocked ivar (Cont (getIVar ivar))
cacheResult :: Request r a => r a -> IO a -> GenHaxl u w a
cacheResult = cacheResultWithInsert show DataCache.insert
cacheResultWithShow
:: (Eq (r a), Hashable (r a), Typeable (r a))
=> ShowReq r a -> r a -> IO a -> GenHaxl u w a
cacheResultWithShow (showReq, showRes) = cacheResultWithInsert showReq
(DataCache.insertWithShow showReq showRes)
cacheResultWithInsert
:: Typeable (r a)
=> (r a -> String)
-> (r a -> IVar u w a -> DataCache (IVar u w) -> DataCache (IVar u w)) -> r a
-> IO a -> GenHaxl u w a
cacheResultWithInsert showFn insertFn req val = GenHaxl $ \env -> do
let !ref = cacheRef env
cache <- readIORef ref
case DataCache.lookup req cache of
Nothing -> do
eitherResult <- Exception.try val
case eitherResult of
Left e -> rethrowAsyncExceptions e
_ -> return ()
let result = eitherToResultThrowIO eitherResult
ivar <- newFullIVar result
writeIORef ref $! insertFn req ivar cache
done result
Just (IVar cr) -> do
e <- readIORef cr
case e of
IVarEmpty _ -> corruptCache
IVarFull r -> done r
where
corruptCache = raise . DataSourceError $ Text.concat
[ Text.pack (showFn req)
, " has a corrupted cache value: these requests are meant to"
, " return immediately without an intermediate value. Either"
, " the cache was updated incorrectly, or you're calling"
, " cacheResult on a query that involves a blocking fetch."
]
cacheRequest
:: Request req a => req a -> Either SomeException a -> GenHaxl u w ()
cacheRequest request result = GenHaxl $ \env -> do
cache <- readIORef (cacheRef env)
case DataCache.lookup request cache of
Nothing -> do
cr <- newFullIVar (eitherToResult result)
writeIORef (cacheRef env) $! DataCache.insert request cr cache
return (Done ())
_other -> raise $
DataSourceError "cacheRequest: request is already in the cache"
performRequestStore
:: forall u w. Int -> Env u w -> RequestStore u -> IO (Int, [IO ()])
performRequestStore n env reqStore =
performFetches n env (contents reqStore)
performFetches
:: forall u w. Int -> Env u w -> [BlockedFetches u] -> IO (Int, [IO ()])
performFetches n env@Env{flags=f, statsRef=sref} jobs = do
let !n' = n + length jobs
t0 <- getTimestamp
let
roundstats =
[ (dataSourceName (Proxy :: Proxy r), length reqs)
| BlockedFetches (reqs :: [BlockedFetch r]) <- jobs ]
ifTrace f 1 $
printf "Batch data fetch (%s)\n" $
intercalate (", "::String) $
map (\(name,num) -> printf "%d %s" num (Text.unpack name)) roundstats
ifTrace f 3 $
forM_ jobs $ \(BlockedFetches reqs) ->
forM_ reqs $ \(BlockedFetch r _) -> putStrLn (showp r)
let
applyFetch i (BlockedFetches (reqs :: [BlockedFetch r])) =
case stateGet (states env) of
Nothing ->
return (FetchToDo reqs (SyncFetch (mapM_ (setError e))))
where
e :: ShowP req => req a -> DataSourceError
e req = DataSourceError $ "data source not initialized: " <> dsName
<> ": "
<> Text.pack (showp req)
Just state ->
return
$ FetchToDo reqs
$ (if report f >= 2
then wrapFetchInStats sref dsName (length reqs)
else id)
$ wrapFetchInTrace i (length reqs) dsName
$ wrapFetchInCatch reqs
$ fetch state f (userEnv env)
where
dsName = dataSourceName (Proxy :: Proxy r)
fetches <- zipWithM applyFetch [n..] jobs
waits <- scheduleFetches fetches (submittedReqsRef env) (flags env)
t1 <- getTimestamp
let roundtime = fromIntegral (t1 - t0) / 1000000 :: Double
ifTrace f 1 $
printf "Batch data fetch done (%.2fs)\n" (realToFrac roundtime :: Double)
return (n', waits)
data FetchToDo where
FetchToDo
:: forall (req :: * -> *). (DataSourceName req, Typeable req)
=> [BlockedFetch req] -> PerformFetch req -> FetchToDo
wrapFetchInCatch :: [BlockedFetch req] -> PerformFetch req -> PerformFetch req
wrapFetchInCatch reqs fetch =
case fetch of
SyncFetch f ->
SyncFetch $ \reqs -> f reqs `Exception.catch` handler
AsyncFetch f ->
AsyncFetch $ \reqs io -> f reqs io `Exception.catch` handler
FutureFetch f ->
FutureFetch $ \reqs -> f reqs `Exception.catch` (
\e -> handler e >> return (return ()))
BackgroundFetch f ->
BackgroundFetch $ \reqs -> f reqs `Exception.catch` handler
where
handler :: SomeException -> IO ()
handler e = do
rethrowAsyncExceptions e
mapM_ (forceError e) reqs
forceError e (BlockedFetch _ rvar) =
putResult rvar (except e)
wrapFetchInStats
:: IORef Stats
-> Text
-> Int
-> PerformFetch req
-> PerformFetch req
wrapFetchInStats !statsRef dataSource batchSize perform = do
case perform of
SyncFetch f ->
SyncFetch $ \reqs -> do
fail_ref <- newIORef 0
(t0,t,alloc,_) <- statsForIO (f (map (addFailureCount fail_ref) reqs))
failures <- readIORef fail_ref
updateFetchStats t0 t alloc batchSize failures
AsyncFetch f -> do
AsyncFetch $ \reqs inner -> do
inner_r <- newIORef (0, 0)
fail_ref <- newIORef 0
let inner' = do
(_,t,alloc,_) <- statsForIO inner
writeIORef inner_r (t,alloc)
reqs' = map (addFailureCount fail_ref) reqs
(t0, totalTime, totalAlloc, _) <- statsForIO (f reqs' inner')
(innerTime, innerAlloc) <- readIORef inner_r
failures <- readIORef fail_ref
updateFetchStats t0 (totalTime - innerTime) (totalAlloc - innerAlloc)
batchSize failures
FutureFetch submit ->
FutureFetch $ \reqs -> do
fail_ref <- newIORef 0
let reqs' = map (addFailureCount fail_ref) reqs
(t0, submitTime, submitAlloc, wait) <- statsForIO (submit reqs')
return $ do
(_, waitTime, waitAlloc, _) <- statsForIO wait
failures <- readIORef fail_ref
updateFetchStats t0 (submitTime + waitTime) (submitAlloc + waitAlloc)
batchSize failures
BackgroundFetch io -> do
BackgroundFetch $ \reqs -> do
startTime <- getTimestamp
io (map (addTimer startTime) reqs)
where
statsForIO io = do
prevAlloc <- getAllocationCounter
(t0,t,a) <- time io
postAlloc <- getAllocationCounter
return (t0,t, fromIntegral $ prevAlloc - postAlloc, a)
addTimer t0 (BlockedFetch req (ResultVar fn)) =
BlockedFetch req $ ResultVar $ \result isChildThread -> do
t1 <- getTimestamp
allocs <- if isChildThread then getAllocationCounter else return 0
updateFetchStats t0 (t1 - t0)
(negate allocs)
1
(if isLeft result then 1 else 0)
fn result isChildThread
updateFetchStats
:: Timestamp -> Microseconds -> Int64 -> Int -> Int -> IO ()
updateFetchStats start time space batch failures = do
let this = FetchStats { fetchDataSource = dataSource
, fetchBatchSize = batch
, fetchStart = start
, fetchDuration = time
, fetchSpace = space
, fetchFailures = failures }
atomicModifyIORef' statsRef $ \(Stats fs) -> (Stats (this : fs), ())
addFailureCount :: IORef Int -> BlockedFetch r -> BlockedFetch r
addFailureCount ref (BlockedFetch req (ResultVar fn)) =
BlockedFetch req $ ResultVar $ \result isChildThread -> do
when (isLeft result) $ atomicModifyIORef' ref (\r -> (r+1,()))
fn result isChildThread
wrapFetchInTrace
:: Int
-> Int
-> Text
-> PerformFetch req
-> PerformFetch req
#ifdef EVENTLOG
wrapFetchInTrace i n dsName f =
case f of
SyncFetch io -> SyncFetch (wrapF "Sync" io)
AsyncFetch fio -> AsyncFetch (wrapF "Async" . fio . unwrapF "Async")
where
d = Text.unpack dsName
wrapF :: String -> IO a -> IO a
wrapF ty = bracket_ (traceEventIO $ printf "START %d %s (%d %s)" i d n ty)
(traceEventIO $ printf "STOP %d %s (%d %s)" i d n ty)
unwrapF :: String -> IO a -> IO a
unwrapF ty = bracket_ (traceEventIO $ printf "STOP %d %s (%d %s)" i d n ty)
(traceEventIO $ printf "START %d %s (%d %s)" i d n ty)
#else
wrapFetchInTrace _ _ _ f = f
#endif
time :: IO a -> IO (Timestamp,Microseconds,a)
time io = do
t0 <- getTimestamp
a <- io
t1 <- getTimestamp
return (t0, t1 - t0, a)
scheduleFetches :: [FetchToDo] -> IORef ReqCountMap -> Flags -> IO [IO ()]
scheduleFetches fetches ref flags = do
ifReport flags 1 $ sequence_
[ atomicModifyIORef' ref $
\m -> (addToCountMap (Proxy :: Proxy r) (length reqs) m, ())
| FetchToDo (reqs :: [BlockedFetch r]) _f <- fetches
]
fully_async_fetches
waits <- future_fetches
async_fetches sync_fetches
return waits
where
fully_async_fetches :: IO ()
fully_async_fetches = sequence_
[f reqs | FetchToDo reqs (BackgroundFetch f) <- fetches]
future_fetches :: IO [IO ()]
future_fetches = sequence
[f reqs | FetchToDo reqs (FutureFetch f) <- fetches]
async_fetches :: IO () -> IO ()
async_fetches = compose
[f reqs | FetchToDo reqs (AsyncFetch f) <- fetches]
sync_fetches :: IO ()
sync_fetches = sequence_
[f reqs | FetchToDo reqs (SyncFetch f) <- fetches]