{-# LANGUAGE CPP #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE DeriveDataTypeable #-}
module Haxl.Core.Fetch
( dataFetch
, dataFetchWithShow
, dataFetchWithInsert
, uncachedRequest
, cacheResult
, dupableCacheRequest
, cacheResultWithShow
, cacheRequest
, performFetches
, performRequestStore
, ShowReq
) where
import Control.Concurrent.STM
import Control.Exception as Exception
import Control.Monad
import Data.Either
import Data.Hashable
import Data.IORef
import Data.Int
import Data.List
#if __GLASGOW_HASKELL__ < 804
import Data.Monoid
#endif
import Data.Proxy
import Data.Typeable
import Data.Text (Text)
import Data.Kind (Type)
import qualified Data.Text as Text
import Text.Printf
#ifdef PROFILING
import GHC.Stack
#endif
import Haxl.Core.DataSource
import Haxl.Core.DataCache as DataCache
import Haxl.Core.Exception
import Haxl.Core.Flags
import Haxl.Core.Monad
import Haxl.Core.Profile
import Haxl.Core.RequestStore
import Haxl.Core.ShowP
import Haxl.Core.Stats
import Haxl.Core.StateStore
import Haxl.Core.Util
data CacheResult u w a
= Uncached
(ResultVar a)
{-# UNPACK #-} !(IVar u w a)
{-# UNPACK #-} !CallId
| CachedNotFetched
{-# UNPACK #-} !(IVar u w a)
{-# UNPACK #-} !CallId
| Cached (ResultVal a w)
{-# UNPACK #-} !CallId
type ShowReq r a = (r a -> String, a -> String)
cachedWithInsert
:: forall r a u w.
(DataSource u r, Typeable (r a))
=> (r a -> String)
-> (r a -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> Env u w -> r a -> IO (CacheResult u w a)
cachedWithInsert :: (r a -> String)
-> (r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> Env u w
-> r a
-> IO (CacheResult u w a)
cachedWithInsert r a -> String
showFn r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ()
insertFn env :: Env u w
env@Env{u
CallId
Maybe (DataCacheLookup w)
TVar [CompleteReq u w]
IORef CallId
IORef Profile
IORef Stats
IORef ReqCountMap
IORef (RequestStore u)
IORef (JobList u w)
IORef (WriteTree w)
DataCache (DataCacheItem u w)
Flags
StateStore
ProfileCurrent
dataCacheFetchFallback :: forall u w. Env u w -> Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: forall u w. Env u w -> IORef (WriteTree w)
writeLogsRef :: forall u w. Env u w -> IORef (WriteTree w)
completions :: forall u w. Env u w -> TVar [CompleteReq u w]
submittedReqsRef :: forall u w. Env u w -> IORef ReqCountMap
runQueueRef :: forall u w. Env u w -> IORef (JobList u w)
reqStoreRef :: forall u w. Env u w -> IORef (RequestStore u)
states :: forall u w. Env u w -> StateStore
profRef :: forall u w. Env u w -> IORef Profile
profCurrent :: forall u w. Env u w -> ProfileCurrent
callIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsRef :: forall u w. Env u w -> IORef Stats
userEnv :: forall u w. Env u w -> u
flags :: forall u w. Env u w -> Flags
memoKey :: forall u w. Env u w -> CallId
memoCache :: forall u w. Env u w -> HaxlDataCache u w
dataCache :: forall u w. Env u w -> HaxlDataCache u w
dataCacheFetchFallback :: Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: IORef (WriteTree w)
writeLogsRef :: IORef (WriteTree w)
completions :: TVar [CompleteReq u w]
submittedReqsRef :: IORef ReqCountMap
runQueueRef :: IORef (JobList u w)
reqStoreRef :: IORef (RequestStore u)
states :: StateStore
profRef :: IORef Profile
profCurrent :: ProfileCurrent
callIdRef :: IORef CallId
statsBatchIdRef :: IORef CallId
statsRef :: IORef Stats
userEnv :: u
flags :: Flags
memoKey :: CallId
memoCache :: DataCache (DataCacheItem u w)
dataCache :: DataCache (DataCacheItem u w)
..} r a
req = do
let
doFetch :: IO (CacheResult u w a)
doFetch = do
IVar u w a
ivar <- IO (IVar u w a)
forall u w a. IO (IVar u w a)
newIVar
CallId
k <- Env u w -> IO CallId
forall u w. Env u w -> IO CallId
nextCallId Env u w
env
let !rvar :: ResultVar a
rvar = Env u w -> IVar u w a -> Proxy r -> ResultVar a
forall (r :: * -> *) a u w.
(DataSourceName r, Typeable r) =>
Env u w -> IVar u w a -> Proxy r -> ResultVar a
stdResultVar Env u w
env IVar u w a
ivar (Proxy r
forall k (t :: k). Proxy t
Proxy :: Proxy r)
r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ()
insertFn r a
req (IVar u w a -> CallId -> DataCacheItem u w a
forall u w a. IVar u w a -> CallId -> DataCacheItem u w a
DataCacheItem IVar u w a
ivar CallId
k) DataCache (DataCacheItem u w)
dataCache
CacheResult u w a -> IO (CacheResult u w a)
forall (m :: * -> *) a. Monad m => a -> m a
return (ResultVar a -> IVar u w a -> CallId -> CacheResult u w a
forall u w a.
ResultVar a -> IVar u w a -> CallId -> CacheResult u w a
Uncached ResultVar a
rvar IVar u w a
ivar CallId
k)
Maybe (DataCacheItem u w a)
mbRes <- r a
-> DataCache (DataCacheItem u w)
-> IO (Maybe (DataCacheItem u w a))
forall (req :: * -> *) a (res :: * -> *).
Typeable (req a) =>
req a -> DataCache res -> IO (Maybe (res a))
DataCache.lookup r a
req DataCache (DataCacheItem u w)
dataCache
case Maybe (DataCacheItem u w a)
mbRes of
Maybe (DataCacheItem u w a)
Nothing -> IO (CacheResult u w a)
doFetch
Just (DataCacheItem i :: IVar u w a
i@IVar{ivarRef :: forall u w a. IVar u w a -> IORef (IVarContents u w a)
ivarRef = IORef (IVarContents u w a)
cr} CallId
k) -> do
IVarContents u w a
e <- IORef (IVarContents u w a) -> IO (IVarContents u w a)
forall a. IORef a -> IO a
readIORef IORef (IVarContents u w a)
cr
case IVarContents u w a
e of
IVarEmpty JobList u w
_ -> do
IVar u w a
ivar <- IVar u w a -> IO (IVar u w a)
forall u w a. IVar u w a -> IO (IVar u w a)
withCurrentCCS IVar u w a
i
CacheResult u w a -> IO (CacheResult u w a)
forall (m :: * -> *) a. Monad m => a -> m a
return (IVar u w a -> CallId -> CacheResult u w a
forall u w a. IVar u w a -> CallId -> CacheResult u w a
CachedNotFetched IVar u w a
ivar CallId
k)
IVarFull ResultVal a w
r -> do
Flags -> CallId -> IO () -> IO ()
forall (m :: * -> *) a. Monad m => Flags -> CallId -> m a -> m ()
ifTrace Flags
flags CallId
3 (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> IO ()
putStrLn (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ case ResultVal a w
r of
ThrowIO{} -> String
"Cached error: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ r a -> String
showFn r a
req
ThrowHaxl{} -> String
"Cached error: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ r a -> String
showFn r a
req
Ok{} -> String
"Cached request: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ r a -> String
showFn r a
req
CacheResult u w a -> IO (CacheResult u w a)
forall (m :: * -> *) a. Monad m => a -> m a
return (ResultVal a w -> CallId -> CacheResult u w a
forall u w a. ResultVal a w -> CallId -> CacheResult u w a
Cached ResultVal a w
r CallId
k)
stdResultVar
:: forall r a u w. (DataSourceName r, Typeable r)
=> Env u w
-> IVar u w a
-> Proxy r
-> ResultVar a
stdResultVar :: Env u w -> IVar u w a -> Proxy r -> ResultVar a
stdResultVar Env{u
CallId
Maybe (DataCacheLookup w)
TVar [CompleteReq u w]
IORef CallId
IORef Profile
IORef Stats
IORef ReqCountMap
IORef (RequestStore u)
IORef (JobList u w)
IORef (WriteTree w)
HaxlDataCache u w
Flags
StateStore
ProfileCurrent
dataCacheFetchFallback :: Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: IORef (WriteTree w)
writeLogsRef :: IORef (WriteTree w)
completions :: TVar [CompleteReq u w]
submittedReqsRef :: IORef ReqCountMap
runQueueRef :: IORef (JobList u w)
reqStoreRef :: IORef (RequestStore u)
states :: StateStore
profRef :: IORef Profile
profCurrent :: ProfileCurrent
callIdRef :: IORef CallId
statsBatchIdRef :: IORef CallId
statsRef :: IORef Stats
userEnv :: u
flags :: Flags
memoKey :: CallId
memoCache :: HaxlDataCache u w
dataCache :: HaxlDataCache u w
dataCacheFetchFallback :: forall u w. Env u w -> Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: forall u w. Env u w -> IORef (WriteTree w)
writeLogsRef :: forall u w. Env u w -> IORef (WriteTree w)
completions :: forall u w. Env u w -> TVar [CompleteReq u w]
submittedReqsRef :: forall u w. Env u w -> IORef ReqCountMap
runQueueRef :: forall u w. Env u w -> IORef (JobList u w)
reqStoreRef :: forall u w. Env u w -> IORef (RequestStore u)
states :: forall u w. Env u w -> StateStore
profRef :: forall u w. Env u w -> IORef Profile
profCurrent :: forall u w. Env u w -> ProfileCurrent
callIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsRef :: forall u w. Env u w -> IORef Stats
userEnv :: forall u w. Env u w -> u
flags :: forall u w. Env u w -> Flags
memoKey :: forall u w. Env u w -> CallId
memoCache :: forall u w. Env u w -> HaxlDataCache u w
dataCache :: forall u w. Env u w -> HaxlDataCache u w
..} IVar u w a
ivar Proxy r
p =
(Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a
forall a.
(Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a
mkResultVar ((Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a)
-> (Either SomeException a
-> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a
forall a b. (a -> b) -> a -> b
$ \Either SomeException a
r Bool
isChildThread Maybe DataSourceStats
_ -> do
Int64
allocs <- if Bool
isChildThread
then
IO Int64
getAllocationCounter
else
Int64 -> IO Int64
forall (m :: * -> *) a. Monad m => a -> m a
return Int64
0
LogicBug -> STM () -> IO ()
forall e a. Exception e => e -> STM a -> IO a
atomicallyOnBlocking
(ReadingCompletionsFailedFetch -> LogicBug
forall e. Exception e => e -> LogicBug
LogicBug (Text -> ReadingCompletionsFailedFetch
ReadingCompletionsFailedFetch (Proxy r -> Text
forall (req :: * -> *). DataSourceName req => Proxy req -> Text
dataSourceName Proxy r
p))) (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
[CompleteReq u w]
cs <- TVar [CompleteReq u w] -> STM [CompleteReq u w]
forall a. TVar a -> STM a
readTVar TVar [CompleteReq u w]
completions
TVar [CompleteReq u w] -> [CompleteReq u w] -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar [CompleteReq u w]
completions (ResultVal a w -> IVar u w a -> Int64 -> CompleteReq u w
forall u w a.
ResultVal a w -> IVar u w a -> Int64 -> CompleteReq u w
CompleteReq (Either SomeException a -> ResultVal a w
forall a w. Either SomeException a -> ResultVal a w
eitherToResult Either SomeException a
r) IVar u w a
ivar Int64
allocs CompleteReq u w -> [CompleteReq u w] -> [CompleteReq u w]
forall a. a -> [a] -> [a]
: [CompleteReq u w]
cs)
Flags -> ReportFlag -> IO () -> IO ()
forall (m :: * -> *) a.
Monad m =>
Flags -> ReportFlag -> m a -> m ()
ifReport Flags
flags ReportFlag
ReportOutgoneFetches (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
IORef ReqCountMap -> (ReqCountMap -> (ReqCountMap, ())) -> IO ()
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef ReqCountMap
submittedReqsRef (\ReqCountMap
m -> (Proxy r -> CallId -> ReqCountMap -> ReqCountMap
forall (r :: * -> *).
(DataSourceName r, Typeable r) =>
Proxy r -> CallId -> ReqCountMap -> ReqCountMap
subFromCountMap Proxy r
p CallId
1 ReqCountMap
m, ()))
{-# INLINE stdResultVar #-}
logFetch :: Env u w -> (r a -> String) -> r a -> CallId -> IO ()
#ifdef PROFILING
logFetch env showFn req fid = do
ifReport (flags env) ReportFetchStack $ do
stack <- currentCallStack
modifyIORef' (statsRef env) $ \(Stats s) ->
Stats (FetchCall (showFn req) stack fid : s)
#else
logFetch :: Env u w -> (r a -> String) -> r a -> CallId -> IO ()
logFetch Env u w
_ r a -> String
_ r a
_ CallId
_ = () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
#endif
calcFailure
:: forall u req a . DataSource u req
=> u
-> req a
-> Either SomeException a
-> FailureCount
calcFailure :: u -> req a -> Either SomeException a -> FailureCount
calcFailure u
_u req a
_r Right{} = FailureCount
forall a. Monoid a => a
mempty
calcFailure u
u req a
r (Left SomeException
e) = case u -> req a -> SomeException -> FailureClassification
forall u (req :: * -> *) a.
DataSource u req =>
u -> req a -> SomeException -> FailureClassification
classifyFailure u
u req a
r SomeException
e of
FailureClassification
StandardFailure -> FailureCount
forall a. Monoid a => a
mempty { failureCountStandard :: CallId
failureCountStandard = CallId
1 }
FailureClassification
IgnoredForStatsFailure -> FailureCount
forall a. Monoid a => a
mempty { failureCountIgnored :: CallId
failureCountIgnored = CallId
1 }
addFallbackFetchStats
:: forall u w req a . DataSource u req
=> Env u w
-> CallId
-> req a
-> ResultVal a w
-> IO ()
addFallbackFetchStats :: Env u w -> CallId -> req a -> ResultVal a w -> IO ()
addFallbackFetchStats Env{u
CallId
Maybe (DataCacheLookup w)
TVar [CompleteReq u w]
IORef CallId
IORef Profile
IORef Stats
IORef ReqCountMap
IORef (RequestStore u)
IORef (JobList u w)
IORef (WriteTree w)
HaxlDataCache u w
Flags
StateStore
ProfileCurrent
dataCacheFetchFallback :: Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: IORef (WriteTree w)
writeLogsRef :: IORef (WriteTree w)
completions :: TVar [CompleteReq u w]
submittedReqsRef :: IORef ReqCountMap
runQueueRef :: IORef (JobList u w)
reqStoreRef :: IORef (RequestStore u)
states :: StateStore
profRef :: IORef Profile
profCurrent :: ProfileCurrent
callIdRef :: IORef CallId
statsBatchIdRef :: IORef CallId
statsRef :: IORef Stats
userEnv :: u
flags :: Flags
memoKey :: CallId
memoCache :: HaxlDataCache u w
dataCache :: HaxlDataCache u w
dataCacheFetchFallback :: forall u w. Env u w -> Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: forall u w. Env u w -> IORef (WriteTree w)
writeLogsRef :: forall u w. Env u w -> IORef (WriteTree w)
completions :: forall u w. Env u w -> TVar [CompleteReq u w]
submittedReqsRef :: forall u w. Env u w -> IORef ReqCountMap
runQueueRef :: forall u w. Env u w -> IORef (JobList u w)
reqStoreRef :: forall u w. Env u w -> IORef (RequestStore u)
states :: forall u w. Env u w -> StateStore
profRef :: forall u w. Env u w -> IORef Profile
profCurrent :: forall u w. Env u w -> ProfileCurrent
callIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsRef :: forall u w. Env u w -> IORef Stats
userEnv :: forall u w. Env u w -> u
flags :: forall u w. Env u w -> Flags
memoKey :: forall u w. Env u w -> CallId
memoCache :: forall u w. Env u w -> HaxlDataCache u w
dataCache :: forall u w. Env u w -> HaxlDataCache u w
..} CallId
fid req a
req ResultVal a w
res = do
CallId
bid <- IORef CallId -> (CallId -> (CallId, CallId)) -> IO CallId
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef CallId
statsBatchIdRef ((CallId -> (CallId, CallId)) -> IO CallId)
-> (CallId -> (CallId, CallId)) -> IO CallId
forall a b. (a -> b) -> a -> b
$ \CallId
x -> (CallId
xCallId -> CallId -> CallId
forall a. Num a => a -> a -> a
+CallId
1,CallId
xCallId -> CallId -> CallId
forall a. Num a => a -> a -> a
+CallId
1)
Int64
start <- IO Int64
getTimestamp
let
dsName :: Text
dsName = Proxy req -> Text
forall (req :: * -> *). DataSourceName req => Proxy req -> Text
dataSourceName (Proxy req
forall k (t :: k). Proxy t
Proxy :: Proxy req)
FailureCount{CallId
failureCountIgnored :: CallId
failureCountStandard :: CallId
failureCountIgnored :: FailureCount -> CallId
failureCountStandard :: FailureCount -> CallId
..} = case ResultVal a w
res of
Ok{} -> FailureCount
forall a. Monoid a => a
mempty
(ThrowHaxl SomeException
e WriteTree w
_) -> u -> req a -> Either SomeException a -> FailureCount
forall u (req :: * -> *) a.
DataSource u req =>
u -> req a -> Either SomeException a -> FailureCount
calcFailure u
userEnv req a
req (SomeException -> Either SomeException a
forall a b. a -> Either a b
Left SomeException
e)
(ThrowIO SomeException
e) -> u -> req a -> Either SomeException a -> FailureCount
forall u (req :: * -> *) a.
DataSource u req =>
u -> req a -> Either SomeException a -> FailureCount
calcFailure u
userEnv req a
req (SomeException -> Either SomeException a
forall a b. a -> Either a b
Left SomeException
e)
this :: FetchStats
this = FetchStats :: Text
-> CallId
-> Int64
-> Int64
-> Int64
-> CallId
-> CallId
-> CallId
-> [CallId]
-> FetchStats
FetchStats { fetchDataSource :: Text
fetchDataSource = Text
dsName
, fetchBatchSize :: CallId
fetchBatchSize = CallId
1
, fetchStart :: Int64
fetchStart = Int64
start
, fetchDuration :: Int64
fetchDuration = Int64
0
, fetchSpace :: Int64
fetchSpace = Int64
0
, fetchFailures :: CallId
fetchFailures = CallId
failureCountStandard
, fetchIgnoredFailures :: CallId
fetchIgnoredFailures = CallId
failureCountIgnored
, fetchBatchId :: CallId
fetchBatchId = CallId
bid
, fetchIds :: [CallId]
fetchIds = [CallId
fid] }
IORef Stats -> (Stats -> (Stats, ())) -> IO ()
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef Stats
statsRef ((Stats -> (Stats, ())) -> IO ())
-> (Stats -> (Stats, ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(Stats [FetchStats]
fs) -> ([FetchStats] -> Stats
Stats (FetchStats
this FetchStats -> [FetchStats] -> [FetchStats]
forall a. a -> [a] -> [a]
: [FetchStats]
fs), ())
addFallbackResult
:: Env u w
-> ResultVal a w
-> IVar u w a
-> IO ()
addFallbackResult :: Env u w -> ResultVal a w -> IVar u w a -> IO ()
addFallbackResult Env{u
CallId
Maybe (DataCacheLookup w)
TVar [CompleteReq u w]
IORef CallId
IORef Profile
IORef Stats
IORef ReqCountMap
IORef (RequestStore u)
IORef (JobList u w)
IORef (WriteTree w)
HaxlDataCache u w
Flags
StateStore
ProfileCurrent
dataCacheFetchFallback :: Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: IORef (WriteTree w)
writeLogsRef :: IORef (WriteTree w)
completions :: TVar [CompleteReq u w]
submittedReqsRef :: IORef ReqCountMap
runQueueRef :: IORef (JobList u w)
reqStoreRef :: IORef (RequestStore u)
states :: StateStore
profRef :: IORef Profile
profCurrent :: ProfileCurrent
callIdRef :: IORef CallId
statsBatchIdRef :: IORef CallId
statsRef :: IORef Stats
userEnv :: u
flags :: Flags
memoKey :: CallId
memoCache :: HaxlDataCache u w
dataCache :: HaxlDataCache u w
dataCacheFetchFallback :: forall u w. Env u w -> Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: forall u w. Env u w -> IORef (WriteTree w)
writeLogsRef :: forall u w. Env u w -> IORef (WriteTree w)
completions :: forall u w. Env u w -> TVar [CompleteReq u w]
submittedReqsRef :: forall u w. Env u w -> IORef ReqCountMap
runQueueRef :: forall u w. Env u w -> IORef (JobList u w)
reqStoreRef :: forall u w. Env u w -> IORef (RequestStore u)
states :: forall u w. Env u w -> StateStore
profRef :: forall u w. Env u w -> IORef Profile
profCurrent :: forall u w. Env u w -> ProfileCurrent
callIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsRef :: forall u w. Env u w -> IORef Stats
userEnv :: forall u w. Env u w -> u
flags :: forall u w. Env u w -> Flags
memoKey :: forall u w. Env u w -> CallId
memoCache :: forall u w. Env u w -> HaxlDataCache u w
dataCache :: forall u w. Env u w -> HaxlDataCache u w
..} ResultVal a w
res IVar u w a
ivar = do
LogicBug -> STM () -> IO ()
forall e a. Exception e => e -> STM a -> IO a
atomicallyOnBlocking
(ReadingCompletionsFailedFetch -> LogicBug
forall e. Exception e => e -> LogicBug
LogicBug (Text -> ReadingCompletionsFailedFetch
ReadingCompletionsFailedFetch Text
"addFallbackResult")) (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
[CompleteReq u w]
cs <- TVar [CompleteReq u w] -> STM [CompleteReq u w]
forall a. TVar a -> STM a
readTVar TVar [CompleteReq u w]
completions
TVar [CompleteReq u w] -> [CompleteReq u w] -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar [CompleteReq u w]
completions (ResultVal a w -> IVar u w a -> Int64 -> CompleteReq u w
forall u w a.
ResultVal a w -> IVar u w a -> Int64 -> CompleteReq u w
CompleteReq ResultVal a w
res IVar u w a
ivar Int64
0 CompleteReq u w -> [CompleteReq u w] -> [CompleteReq u w]
forall a. a -> [a] -> [a]
: [CompleteReq u w]
cs)
dataFetch :: (DataSource u r, Request r a) => r a -> GenHaxl u w a
dataFetch :: r a -> GenHaxl u w a
dataFetch = (r a -> String)
-> (r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> r a
-> GenHaxl u w a
forall u w (r :: * -> *) a.
(DataSource u r, Eq (r a), Hashable (r a), Typeable (r a)) =>
(r a -> String)
-> (r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> r a
-> GenHaxl u w a
dataFetchWithInsert r a -> String
forall a. Show a => a -> String
show r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ()
forall (req :: * -> *) a (res :: * -> *).
(Hashable (req a), Typeable (req a), Eq (req a), Show (req a),
Show a) =>
req a -> res a -> DataCache res -> IO ()
DataCache.insert
dataFetchWithShow
:: (DataSource u r, Eq (r a), Hashable (r a), Typeable (r a))
=> ShowReq r a
-> r a -> GenHaxl u w a
dataFetchWithShow :: ShowReq r a -> r a -> GenHaxl u w a
dataFetchWithShow (r a -> String
showReq, a -> String
showRes) = (r a -> String)
-> (r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> r a
-> GenHaxl u w a
forall u w (r :: * -> *) a.
(DataSource u r, Eq (r a), Hashable (r a), Typeable (r a)) =>
(r a -> String)
-> (r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> r a
-> GenHaxl u w a
dataFetchWithInsert r a -> String
showReq
((r a -> String)
-> (a -> String)
-> r a
-> DataCacheItem u w a
-> DataCache (DataCacheItem u w)
-> IO ()
forall (req :: * -> *) a (res :: * -> *).
(Hashable (req a), Typeable (req a), Eq (req a)) =>
(req a -> String)
-> (a -> String) -> req a -> res a -> DataCache res -> IO ()
DataCache.insertWithShow r a -> String
showReq a -> String
showRes)
dataFetchWithInsert
:: forall u w r a
. (DataSource u r, Eq (r a), Hashable (r a), Typeable (r a))
=> (r a -> String)
-> (r a -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> r a
-> GenHaxl u w a
dataFetchWithInsert :: (r a -> String)
-> (r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> r a
-> GenHaxl u w a
dataFetchWithInsert r a -> String
showFn r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ()
insertFn r a
req =
(Env u w -> IO (Result u w a)) -> GenHaxl u w a
forall u w a. (Env u w -> IO (Result u w a)) -> GenHaxl u w a
GenHaxl ((Env u w -> IO (Result u w a)) -> GenHaxl u w a)
-> (Env u w -> IO (Result u w a)) -> GenHaxl u w a
forall a b. (a -> b) -> a -> b
$ \env :: Env u w
env@Env{u
CallId
Maybe (DataCacheLookup w)
TVar [CompleteReq u w]
IORef CallId
IORef Profile
IORef Stats
IORef ReqCountMap
IORef (RequestStore u)
IORef (JobList u w)
IORef (WriteTree w)
DataCache (DataCacheItem u w)
Flags
StateStore
ProfileCurrent
dataCacheFetchFallback :: Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: IORef (WriteTree w)
writeLogsRef :: IORef (WriteTree w)
completions :: TVar [CompleteReq u w]
submittedReqsRef :: IORef ReqCountMap
runQueueRef :: IORef (JobList u w)
reqStoreRef :: IORef (RequestStore u)
states :: StateStore
profRef :: IORef Profile
profCurrent :: ProfileCurrent
callIdRef :: IORef CallId
statsBatchIdRef :: IORef CallId
statsRef :: IORef Stats
userEnv :: u
flags :: Flags
memoKey :: CallId
memoCache :: DataCache (DataCacheItem u w)
dataCache :: DataCache (DataCacheItem u w)
dataCacheFetchFallback :: forall u w. Env u w -> Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: forall u w. Env u w -> IORef (WriteTree w)
writeLogsRef :: forall u w. Env u w -> IORef (WriteTree w)
completions :: forall u w. Env u w -> TVar [CompleteReq u w]
submittedReqsRef :: forall u w. Env u w -> IORef ReqCountMap
runQueueRef :: forall u w. Env u w -> IORef (JobList u w)
reqStoreRef :: forall u w. Env u w -> IORef (RequestStore u)
states :: forall u w. Env u w -> StateStore
profRef :: forall u w. Env u w -> IORef Profile
profCurrent :: forall u w. Env u w -> ProfileCurrent
callIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsRef :: forall u w. Env u w -> IORef Stats
userEnv :: forall u w. Env u w -> u
flags :: forall u w. Env u w -> Flags
memoKey :: forall u w. Env u w -> CallId
memoCache :: forall u w. Env u w -> HaxlDataCache u w
dataCache :: forall u w. Env u w -> HaxlDataCache u w
..} -> do
CacheResult u w a
res <- (r a -> String)
-> (r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> Env u w
-> r a
-> IO (CacheResult u w a)
forall (r :: * -> *) a u w.
(DataSource u r, Typeable (r a)) =>
(r a -> String)
-> (r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> Env u w
-> r a
-> IO (CacheResult u w a)
cachedWithInsert r a -> String
showFn r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ()
insertFn Env u w
env r a
req
case CacheResult u w a
res of
Uncached ResultVar a
rvar IVar u w a
ivar CallId
fid -> do
Env u w -> (r a -> String) -> r a -> CallId -> IO ()
forall u w (r :: * -> *) a.
Env u w -> (r a -> String) -> r a -> CallId -> IO ()
logFetch Env u w
env r a -> String
showFn r a
req CallId
fid
Flags -> IO () -> IO ()
forall (m :: * -> *) a. Monad m => Flags -> m a -> m ()
ifProfiling Flags
flags (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Env u w -> r a -> CallId -> Bool -> IO ()
forall (r :: * -> *) u w a.
(DataSourceName r, Eq (r a), Hashable (r a), Typeable (r a)) =>
Env u w -> r a -> CallId -> Bool -> IO ()
addProfileFetch Env u w
env r a
req CallId
fid Bool
False
let
blockedFetch :: BlockedFetch r
blockedFetch = r a -> ResultVar a -> BlockedFetch r
forall (r :: * -> *) a. r a -> ResultVar a -> BlockedFetch r
BlockedFetch r a
req ResultVar a
rvar
blockedFetchI :: BlockedFetchInternal
blockedFetchI = CallId -> BlockedFetchInternal
BlockedFetchInternal CallId
fid
submitFetch :: IO (Result u w a)
submitFetch = do
case u -> SchedulerHint r
forall u (req :: * -> *).
DataSource u req =>
u -> SchedulerHint req
schedulerHint u
userEnv :: SchedulerHint r of
SchedulerHint r
SubmitImmediately ->
Env u w -> [BlockedFetches u] -> IO ()
forall u w. Env u w -> [BlockedFetches u] -> IO ()
performFetches Env u w
env [[BlockedFetch r] -> [BlockedFetchInternal] -> BlockedFetches u
forall u (r :: * -> *).
DataSource u r =>
[BlockedFetch r] -> [BlockedFetchInternal] -> BlockedFetches u
BlockedFetches [BlockedFetch r
blockedFetch] [BlockedFetchInternal
blockedFetchI]]
SchedulerHint r
TryToBatch ->
IORef (RequestStore u)
-> (RequestStore u -> RequestStore u) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' IORef (RequestStore u)
reqStoreRef ((RequestStore u -> RequestStore u) -> IO ())
-> (RequestStore u -> RequestStore u) -> IO ()
forall a b. (a -> b) -> a -> b
$ \RequestStore u
bs ->
BlockedFetch r
-> BlockedFetchInternal -> RequestStore u -> RequestStore u
forall u (r :: * -> *).
DataSource u r =>
BlockedFetch r
-> BlockedFetchInternal -> RequestStore u -> RequestStore u
addRequest BlockedFetch r
blockedFetch BlockedFetchInternal
blockedFetchI RequestStore u
bs
Result u w a -> IO (Result u w a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Result u w a -> IO (Result u w a))
-> Result u w a -> IO (Result u w a)
forall a b. (a -> b) -> a -> b
$ IVar u w a -> Cont u w a -> Result u w a
forall u w a b. IVar u w b -> Cont u w a -> Result u w a
Blocked IVar u w a
ivar (IVar u w a -> Cont u w a
forall u w a. IVar u w a -> Cont u w a
Return IVar u w a
ivar)
case Maybe (DataCacheLookup w)
dataCacheFetchFallback of
Maybe (DataCacheLookup w)
Nothing -> IO (Result u w a)
submitFetch
Just (DataCacheLookup forall (req :: * -> *) a.
Typeable (req a) =>
req a -> IO (Maybe (ResultVal a w))
dcl) -> do
Maybe (ResultVal a w)
mbFallbackRes <- r a -> IO (Maybe (ResultVal a w))
forall (req :: * -> *) a.
Typeable (req a) =>
req a -> IO (Maybe (ResultVal a w))
dcl r a
req
case Maybe (ResultVal a w)
mbFallbackRes of
Maybe (ResultVal a w)
Nothing -> IO (Result u w a)
submitFetch
Just ResultVal a w
fallbackRes -> do
Env u w -> ResultVal a w -> IVar u w a -> IO ()
forall u w a. Env u w -> ResultVal a w -> IVar u w a -> IO ()
addFallbackResult Env u w
env ResultVal a w
fallbackRes IVar u w a
ivar
Flags -> ReportFlag -> IO () -> IO ()
forall (m :: * -> *) a.
Monad m =>
Flags -> ReportFlag -> m a -> m ()
ifReport Flags
flags ReportFlag
ReportFetchStats (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Env u w -> CallId -> r a -> ResultVal a w -> IO ()
forall u w (req :: * -> *) a.
DataSource u req =>
Env u w -> CallId -> req a -> ResultVal a w -> IO ()
addFallbackFetchStats
Env u w
env
CallId
fid
r a
req
ResultVal a w
fallbackRes
Result u w a -> IO (Result u w a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Result u w a -> IO (Result u w a))
-> Result u w a -> IO (Result u w a)
forall a b. (a -> b) -> a -> b
$ IVar u w a -> Cont u w a -> Result u w a
forall u w a b. IVar u w b -> Cont u w a -> Result u w a
Blocked IVar u w a
ivar (IVar u w a -> Cont u w a
forall u w a. IVar u w a -> Cont u w a
Return IVar u w a
ivar)
CachedNotFetched IVar u w a
ivar CallId
fid -> do
Flags -> IO () -> IO ()
forall (m :: * -> *) a. Monad m => Flags -> m a -> m ()
ifProfiling Flags
flags (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Env u w -> r a -> CallId -> Bool -> IO ()
forall (r :: * -> *) u w a.
(DataSourceName r, Eq (r a), Hashable (r a), Typeable (r a)) =>
Env u w -> r a -> CallId -> Bool -> IO ()
addProfileFetch Env u w
env r a
req CallId
fid Bool
True
Result u w a -> IO (Result u w a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Result u w a -> IO (Result u w a))
-> Result u w a -> IO (Result u w a)
forall a b. (a -> b) -> a -> b
$ IVar u w a -> Cont u w a -> Result u w a
forall u w a b. IVar u w b -> Cont u w a -> Result u w a
Blocked IVar u w a
ivar (IVar u w a -> Cont u w a
forall u w a. IVar u w a -> Cont u w a
Return IVar u w a
ivar)
Cached ResultVal a w
r CallId
fid -> do
Flags -> IO () -> IO ()
forall (m :: * -> *) a. Monad m => Flags -> m a -> m ()
ifProfiling Flags
flags (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Env u w -> r a -> CallId -> Bool -> IO ()
forall (r :: * -> *) u w a.
(DataSourceName r, Eq (r a), Hashable (r a), Typeable (r a)) =>
Env u w -> r a -> CallId -> Bool -> IO ()
addProfileFetch Env u w
env r a
req CallId
fid Bool
True
Env u w -> ResultVal a w -> IO (Result u w a)
forall u w a. Env u w -> ResultVal a w -> IO (Result u w a)
done Env u w
env ResultVal a w
r
uncachedRequest
:: forall a u w (r :: Type -> Type). (DataSource u r, Request r a)
=> r a -> GenHaxl u w a
uncachedRequest :: r a -> GenHaxl u w a
uncachedRequest r a
req = do
Flags
flg <- (Env u w -> Flags) -> GenHaxl u w Flags
forall u w a. (Env u w -> a) -> GenHaxl u w a
env Env u w -> Flags
forall u w. Env u w -> Flags
flags
if Flags -> CallId
recording Flags
flg CallId -> CallId -> Bool
forall a. Eq a => a -> a -> Bool
/= CallId
0
then r a -> GenHaxl u w a
forall u (r :: * -> *) a w.
(DataSource u r, Request r a) =>
r a -> GenHaxl u w a
dataFetch r a
req
else (Env u w -> IO (Result u w a)) -> GenHaxl u w a
forall u w a. (Env u w -> IO (Result u w a)) -> GenHaxl u w a
GenHaxl ((Env u w -> IO (Result u w a)) -> GenHaxl u w a)
-> (Env u w -> IO (Result u w a)) -> GenHaxl u w a
forall a b. (a -> b) -> a -> b
$ \e :: Env u w
e@Env{u
CallId
Maybe (DataCacheLookup w)
TVar [CompleteReq u w]
IORef CallId
IORef Profile
IORef Stats
IORef ReqCountMap
IORef (RequestStore u)
IORef (JobList u w)
IORef (WriteTree w)
HaxlDataCache u w
Flags
StateStore
ProfileCurrent
dataCacheFetchFallback :: Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: IORef (WriteTree w)
writeLogsRef :: IORef (WriteTree w)
completions :: TVar [CompleteReq u w]
submittedReqsRef :: IORef ReqCountMap
runQueueRef :: IORef (JobList u w)
reqStoreRef :: IORef (RequestStore u)
states :: StateStore
profRef :: IORef Profile
profCurrent :: ProfileCurrent
callIdRef :: IORef CallId
statsBatchIdRef :: IORef CallId
statsRef :: IORef Stats
userEnv :: u
flags :: Flags
memoKey :: CallId
memoCache :: HaxlDataCache u w
dataCache :: HaxlDataCache u w
dataCacheFetchFallback :: forall u w. Env u w -> Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: forall u w. Env u w -> IORef (WriteTree w)
writeLogsRef :: forall u w. Env u w -> IORef (WriteTree w)
completions :: forall u w. Env u w -> TVar [CompleteReq u w]
submittedReqsRef :: forall u w. Env u w -> IORef ReqCountMap
runQueueRef :: forall u w. Env u w -> IORef (JobList u w)
reqStoreRef :: forall u w. Env u w -> IORef (RequestStore u)
states :: forall u w. Env u w -> StateStore
profRef :: forall u w. Env u w -> IORef Profile
profCurrent :: forall u w. Env u w -> ProfileCurrent
callIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsRef :: forall u w. Env u w -> IORef Stats
userEnv :: forall u w. Env u w -> u
flags :: forall u w. Env u w -> Flags
memoKey :: forall u w. Env u w -> CallId
memoCache :: forall u w. Env u w -> HaxlDataCache u w
dataCache :: forall u w. Env u w -> HaxlDataCache u w
..} -> do
IVar u w a
ivar <- IO (IVar u w a)
forall u w a. IO (IVar u w a)
newIVar
CallId
k <- Env u w -> IO CallId
forall u w. Env u w -> IO CallId
nextCallId Env u w
e
let !rvar :: ResultVar a
rvar = Env u w -> IVar u w a -> Proxy r -> ResultVar a
forall (r :: * -> *) a u w.
(DataSourceName r, Typeable r) =>
Env u w -> IVar u w a -> Proxy r -> ResultVar a
stdResultVar Env u w
e IVar u w a
ivar (Proxy r
forall k (t :: k). Proxy t
Proxy :: Proxy r)
IORef (RequestStore u)
-> (RequestStore u -> RequestStore u) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' IORef (RequestStore u)
reqStoreRef ((RequestStore u -> RequestStore u) -> IO ())
-> (RequestStore u -> RequestStore u) -> IO ()
forall a b. (a -> b) -> a -> b
$ \RequestStore u
bs ->
BlockedFetch r
-> BlockedFetchInternal -> RequestStore u -> RequestStore u
forall u (r :: * -> *).
DataSource u r =>
BlockedFetch r
-> BlockedFetchInternal -> RequestStore u -> RequestStore u
addRequest (r a -> ResultVar a -> BlockedFetch r
forall (r :: * -> *) a. r a -> ResultVar a -> BlockedFetch r
BlockedFetch r a
req ResultVar a
rvar) (CallId -> BlockedFetchInternal
BlockedFetchInternal CallId
k) RequestStore u
bs
Result u w a -> IO (Result u w a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Result u w a -> IO (Result u w a))
-> Result u w a -> IO (Result u w a)
forall a b. (a -> b) -> a -> b
$ IVar u w a -> Cont u w a -> Result u w a
forall u w a b. IVar u w b -> Cont u w a -> Result u w a
Blocked IVar u w a
ivar (IVar u w a -> Cont u w a
forall u w a. IVar u w a -> Cont u w a
Return IVar u w a
ivar)
cacheResult :: Request r a => r a -> IO a -> GenHaxl u w a
cacheResult :: r a -> IO a -> GenHaxl u w a
cacheResult = (r a -> String)
-> (r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> r a
-> IO a
-> GenHaxl u w a
forall (r :: * -> *) a u w.
Typeable (r a) =>
(r a -> String)
-> (r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> r a
-> IO a
-> GenHaxl u w a
cacheResultWithInsert r a -> String
forall a. Show a => a -> String
show r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ()
forall (req :: * -> *) a (res :: * -> *).
(Hashable (req a), Typeable (req a), Eq (req a), Show (req a),
Show a) =>
req a -> res a -> DataCache res -> IO ()
DataCache.insert
cacheResultWithShow
:: (Eq (r a), Hashable (r a), Typeable (r a))
=> ShowReq r a -> r a -> IO a -> GenHaxl u w a
cacheResultWithShow :: ShowReq r a -> r a -> IO a -> GenHaxl u w a
cacheResultWithShow (r a -> String
showReq, a -> String
showRes) = (r a -> String)
-> (r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> r a
-> IO a
-> GenHaxl u w a
forall (r :: * -> *) a u w.
Typeable (r a) =>
(r a -> String)
-> (r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> r a
-> IO a
-> GenHaxl u w a
cacheResultWithInsert r a -> String
showReq
((r a -> String)
-> (a -> String)
-> r a
-> DataCacheItem u w a
-> DataCache (DataCacheItem u w)
-> IO ()
forall (req :: * -> *) a (res :: * -> *).
(Hashable (req a), Typeable (req a), Eq (req a)) =>
(req a -> String)
-> (a -> String) -> req a -> res a -> DataCache res -> IO ()
DataCache.insertWithShow r a -> String
showReq a -> String
showRes)
cacheResultWithInsert
:: Typeable (r a)
=> (r a -> String)
-> (r a -> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> r a
-> IO a
-> GenHaxl u w a
cacheResultWithInsert :: (r a -> String)
-> (r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ())
-> r a
-> IO a
-> GenHaxl u w a
cacheResultWithInsert r a -> String
showFn r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ()
insertFn r a
req IO a
val = (Env u w -> IO (Result u w a)) -> GenHaxl u w a
forall u w a. (Env u w -> IO (Result u w a)) -> GenHaxl u w a
GenHaxl ((Env u w -> IO (Result u w a)) -> GenHaxl u w a)
-> (Env u w -> IO (Result u w a)) -> GenHaxl u w a
forall a b. (a -> b) -> a -> b
$ \env :: Env u w
env@Env{u
CallId
Maybe (DataCacheLookup w)
TVar [CompleteReq u w]
IORef CallId
IORef Profile
IORef Stats
IORef ReqCountMap
IORef (RequestStore u)
IORef (JobList u w)
IORef (WriteTree w)
DataCache (DataCacheItem u w)
Flags
StateStore
ProfileCurrent
dataCacheFetchFallback :: Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: IORef (WriteTree w)
writeLogsRef :: IORef (WriteTree w)
completions :: TVar [CompleteReq u w]
submittedReqsRef :: IORef ReqCountMap
runQueueRef :: IORef (JobList u w)
reqStoreRef :: IORef (RequestStore u)
states :: StateStore
profRef :: IORef Profile
profCurrent :: ProfileCurrent
callIdRef :: IORef CallId
statsBatchIdRef :: IORef CallId
statsRef :: IORef Stats
userEnv :: u
flags :: Flags
memoKey :: CallId
memoCache :: DataCache (DataCacheItem u w)
dataCache :: DataCache (DataCacheItem u w)
dataCacheFetchFallback :: forall u w. Env u w -> Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: forall u w. Env u w -> IORef (WriteTree w)
writeLogsRef :: forall u w. Env u w -> IORef (WriteTree w)
completions :: forall u w. Env u w -> TVar [CompleteReq u w]
submittedReqsRef :: forall u w. Env u w -> IORef ReqCountMap
runQueueRef :: forall u w. Env u w -> IORef (JobList u w)
reqStoreRef :: forall u w. Env u w -> IORef (RequestStore u)
states :: forall u w. Env u w -> StateStore
profRef :: forall u w. Env u w -> IORef Profile
profCurrent :: forall u w. Env u w -> ProfileCurrent
callIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsRef :: forall u w. Env u w -> IORef Stats
userEnv :: forall u w. Env u w -> u
flags :: forall u w. Env u w -> Flags
memoKey :: forall u w. Env u w -> CallId
memoCache :: forall u w. Env u w -> HaxlDataCache u w
dataCache :: forall u w. Env u w -> HaxlDataCache u w
..} -> do
Maybe (DataCacheItem u w a)
mbRes <- r a
-> DataCache (DataCacheItem u w)
-> IO (Maybe (DataCacheItem u w a))
forall (req :: * -> *) a (res :: * -> *).
Typeable (req a) =>
req a -> DataCache res -> IO (Maybe (res a))
DataCache.lookup r a
req DataCache (DataCacheItem u w)
dataCache
case Maybe (DataCacheItem u w a)
mbRes of
Maybe (DataCacheItem u w a)
Nothing -> do
let
getResult :: IO (ResultVal a w)
getResult = do
Either SomeException a
eitherResult <- IO a -> IO (Either SomeException a)
forall e a. Exception e => IO a -> IO (Either e a)
Exception.try IO a
val
case Either SomeException a
eitherResult of
Left SomeException
e -> SomeException -> IO ()
rethrowAsyncExceptions SomeException
e
Either SomeException a
_ -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
ResultVal a w -> IO (ResultVal a w)
forall (m :: * -> *) a. Monad m => a -> m a
return (ResultVal a w -> IO (ResultVal a w))
-> ResultVal a w -> IO (ResultVal a w)
forall a b. (a -> b) -> a -> b
$ Either SomeException a -> ResultVal a w
forall a w. Either SomeException a -> ResultVal a w
eitherToResultThrowIO Either SomeException a
eitherResult
ResultVal a w
result <- case Maybe (DataCacheLookup w)
dataCacheFetchFallback of
Maybe (DataCacheLookup w)
Nothing -> IO (ResultVal a w)
getResult
Just (DataCacheLookup forall (req :: * -> *) a.
Typeable (req a) =>
req a -> IO (Maybe (ResultVal a w))
dcl) -> do
Maybe (ResultVal a w)
mbFallbackRes <- r a -> IO (Maybe (ResultVal a w))
forall (req :: * -> *) a.
Typeable (req a) =>
req a -> IO (Maybe (ResultVal a w))
dcl r a
req
IO (ResultVal a w)
-> (ResultVal a w -> IO (ResultVal a w))
-> Maybe (ResultVal a w)
-> IO (ResultVal a w)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe IO (ResultVal a w)
getResult ResultVal a w -> IO (ResultVal a w)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (ResultVal a w)
mbFallbackRes
IVar u w a
ivar <- ResultVal a w -> IO (IVar u w a)
forall a w u. ResultVal a w -> IO (IVar u w a)
newFullIVar ResultVal a w
result
CallId
k <- Env u w -> IO CallId
forall u w. Env u w -> IO CallId
nextCallId Env u w
env
r a
-> DataCacheItem u w a -> DataCache (DataCacheItem u w) -> IO ()
insertFn r a
req (IVar u w a -> CallId -> DataCacheItem u w a
forall u w a. IVar u w a -> CallId -> DataCacheItem u w a
DataCacheItem IVar u w a
ivar CallId
k) DataCache (DataCacheItem u w)
dataCache
Env u w -> ResultVal a w -> IO (Result u w a)
forall u w a. Env u w -> ResultVal a w -> IO (Result u w a)
done Env u w
env ResultVal a w
result
Just (DataCacheItem IVar{ivarRef :: forall u w a. IVar u w a -> IORef (IVarContents u w a)
ivarRef = IORef (IVarContents u w a)
cr} CallId
_) -> do
IVarContents u w a
e <- IORef (IVarContents u w a) -> IO (IVarContents u w a)
forall a. IORef a -> IO a
readIORef IORef (IVarContents u w a)
cr
case IVarContents u w a
e of
IVarEmpty JobList u w
_ -> Env u w -> DataSourceError -> IO (Result u w a)
forall e u w a. Exception e => Env u w -> e -> IO (Result u w a)
raise Env u w
env DataSourceError
corruptCache
IVarFull ResultVal a w
r -> Env u w -> ResultVal a w -> IO (Result u w a)
forall u w a. Env u w -> ResultVal a w -> IO (Result u w a)
done Env u w
env ResultVal a w
r
where
corruptCache :: DataSourceError
corruptCache = Text -> DataSourceError
DataSourceError (Text -> DataSourceError) -> Text -> DataSourceError
forall a b. (a -> b) -> a -> b
$ [Text] -> Text
Text.concat
[ String -> Text
Text.pack (r a -> String
showFn r a
req)
, Text
" has a corrupted cache value: these requests are meant to"
, Text
" return immediately without an intermediate value. Either"
, Text
" the cache was updated incorrectly, or you're calling"
, Text
" cacheResult on a query that involves a blocking fetch."
]
cacheRequest
:: Request req a => req a -> Either SomeException a -> GenHaxl u w ()
cacheRequest :: req a -> Either SomeException a -> GenHaxl u w ()
cacheRequest req a
request Either SomeException a
result = (Env u w -> IO (Result u w ())) -> GenHaxl u w ()
forall u w a. (Env u w -> IO (Result u w a)) -> GenHaxl u w a
GenHaxl ((Env u w -> IO (Result u w ())) -> GenHaxl u w ())
-> (Env u w -> IO (Result u w ())) -> GenHaxl u w ()
forall a b. (a -> b) -> a -> b
$ \e :: Env u w
e@Env{u
CallId
Maybe (DataCacheLookup w)
TVar [CompleteReq u w]
IORef CallId
IORef Profile
IORef Stats
IORef ReqCountMap
IORef (RequestStore u)
IORef (JobList u w)
IORef (WriteTree w)
HaxlDataCache u w
Flags
StateStore
ProfileCurrent
dataCacheFetchFallback :: Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: IORef (WriteTree w)
writeLogsRef :: IORef (WriteTree w)
completions :: TVar [CompleteReq u w]
submittedReqsRef :: IORef ReqCountMap
runQueueRef :: IORef (JobList u w)
reqStoreRef :: IORef (RequestStore u)
states :: StateStore
profRef :: IORef Profile
profCurrent :: ProfileCurrent
callIdRef :: IORef CallId
statsBatchIdRef :: IORef CallId
statsRef :: IORef Stats
userEnv :: u
flags :: Flags
memoKey :: CallId
memoCache :: HaxlDataCache u w
dataCache :: HaxlDataCache u w
dataCacheFetchFallback :: forall u w. Env u w -> Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: forall u w. Env u w -> IORef (WriteTree w)
writeLogsRef :: forall u w. Env u w -> IORef (WriteTree w)
completions :: forall u w. Env u w -> TVar [CompleteReq u w]
submittedReqsRef :: forall u w. Env u w -> IORef ReqCountMap
runQueueRef :: forall u w. Env u w -> IORef (JobList u w)
reqStoreRef :: forall u w. Env u w -> IORef (RequestStore u)
states :: forall u w. Env u w -> StateStore
profRef :: forall u w. Env u w -> IORef Profile
profCurrent :: forall u w. Env u w -> ProfileCurrent
callIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsRef :: forall u w. Env u w -> IORef Stats
userEnv :: forall u w. Env u w -> u
flags :: forall u w. Env u w -> Flags
memoKey :: forall u w. Env u w -> CallId
memoCache :: forall u w. Env u w -> HaxlDataCache u w
dataCache :: forall u w. Env u w -> HaxlDataCache u w
..} -> do
Maybe (DataCacheItem u w a)
mbRes <- req a -> HaxlDataCache u w -> IO (Maybe (DataCacheItem u w a))
forall (req :: * -> *) a (res :: * -> *).
Typeable (req a) =>
req a -> DataCache res -> IO (Maybe (res a))
DataCache.lookup req a
request HaxlDataCache u w
dataCache
case Maybe (DataCacheItem u w a)
mbRes of
Maybe (DataCacheItem u w a)
Nothing -> do
IVar u w a
cr <- ResultVal a w -> IO (IVar u w a)
forall a w u. ResultVal a w -> IO (IVar u w a)
newFullIVar (Either SomeException a -> ResultVal a w
forall a w. Either SomeException a -> ResultVal a w
eitherToResult Either SomeException a
result)
CallId
k <- Env u w -> IO CallId
forall u w. Env u w -> IO CallId
nextCallId Env u w
e
req a -> DataCacheItem u w a -> HaxlDataCache u w -> IO ()
forall (req :: * -> *) a (res :: * -> *).
(Hashable (req a), Typeable (req a), Eq (req a), Show (req a),
Show a) =>
req a -> res a -> DataCache res -> IO ()
DataCache.insert req a
request (IVar u w a -> CallId -> DataCacheItem u w a
forall u w a. IVar u w a -> CallId -> DataCacheItem u w a
DataCacheItem IVar u w a
cr CallId
k) HaxlDataCache u w
dataCache
Result u w () -> IO (Result u w ())
forall (m :: * -> *) a. Monad m => a -> m a
return (() -> Result u w ()
forall u w a. a -> Result u w a
Done ())
Maybe (DataCacheItem u w a)
_other -> Env u w -> DataSourceError -> IO (Result u w ())
forall e u w a. Exception e => Env u w -> e -> IO (Result u w a)
raise Env u w
e (DataSourceError -> IO (Result u w ()))
-> DataSourceError -> IO (Result u w ())
forall a b. (a -> b) -> a -> b
$
Text -> DataSourceError
DataSourceError Text
"cacheRequest: request is already in the cache"
dupableCacheRequest
:: Request req a => req a -> Either SomeException a -> GenHaxl u w ()
dupableCacheRequest :: req a -> Either SomeException a -> GenHaxl u w ()
dupableCacheRequest req a
request Either SomeException a
result = (Env u w -> IO (Result u w ())) -> GenHaxl u w ()
forall u w a. (Env u w -> IO (Result u w a)) -> GenHaxl u w a
GenHaxl ((Env u w -> IO (Result u w ())) -> GenHaxl u w ())
-> (Env u w -> IO (Result u w ())) -> GenHaxl u w ()
forall a b. (a -> b) -> a -> b
$ \e :: Env u w
e@Env{u
CallId
Maybe (DataCacheLookup w)
TVar [CompleteReq u w]
IORef CallId
IORef Profile
IORef Stats
IORef ReqCountMap
IORef (RequestStore u)
IORef (JobList u w)
IORef (WriteTree w)
HaxlDataCache u w
Flags
StateStore
ProfileCurrent
dataCacheFetchFallback :: Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: IORef (WriteTree w)
writeLogsRef :: IORef (WriteTree w)
completions :: TVar [CompleteReq u w]
submittedReqsRef :: IORef ReqCountMap
runQueueRef :: IORef (JobList u w)
reqStoreRef :: IORef (RequestStore u)
states :: StateStore
profRef :: IORef Profile
profCurrent :: ProfileCurrent
callIdRef :: IORef CallId
statsBatchIdRef :: IORef CallId
statsRef :: IORef Stats
userEnv :: u
flags :: Flags
memoKey :: CallId
memoCache :: HaxlDataCache u w
dataCache :: HaxlDataCache u w
dataCacheFetchFallback :: forall u w. Env u w -> Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: forall u w. Env u w -> IORef (WriteTree w)
writeLogsRef :: forall u w. Env u w -> IORef (WriteTree w)
completions :: forall u w. Env u w -> TVar [CompleteReq u w]
submittedReqsRef :: forall u w. Env u w -> IORef ReqCountMap
runQueueRef :: forall u w. Env u w -> IORef (JobList u w)
reqStoreRef :: forall u w. Env u w -> IORef (RequestStore u)
states :: forall u w. Env u w -> StateStore
profRef :: forall u w. Env u w -> IORef Profile
profCurrent :: forall u w. Env u w -> ProfileCurrent
callIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsRef :: forall u w. Env u w -> IORef Stats
userEnv :: forall u w. Env u w -> u
flags :: forall u w. Env u w -> Flags
memoKey :: forall u w. Env u w -> CallId
memoCache :: forall u w. Env u w -> HaxlDataCache u w
dataCache :: forall u w. Env u w -> HaxlDataCache u w
..} -> do
IVar u w a
cr <- ResultVal a w -> IO (IVar u w a)
forall a w u. ResultVal a w -> IO (IVar u w a)
newFullIVar (Either SomeException a -> ResultVal a w
forall a w. Either SomeException a -> ResultVal a w
eitherToResult Either SomeException a
result)
CallId
k <- Env u w -> IO CallId
forall u w. Env u w -> IO CallId
nextCallId Env u w
e
req a -> DataCacheItem u w a -> HaxlDataCache u w -> IO ()
forall (req :: * -> *) a (res :: * -> *).
(Hashable (req a), Typeable (req a), Eq (req a), Show (req a),
Show a) =>
req a -> res a -> DataCache res -> IO ()
DataCache.insert req a
request (IVar u w a -> CallId -> DataCacheItem u w a
forall u w a. IVar u w a -> CallId -> DataCacheItem u w a
DataCacheItem IVar u w a
cr CallId
k) HaxlDataCache u w
dataCache
Result u w () -> IO (Result u w ())
forall (m :: * -> *) a. Monad m => a -> m a
return (() -> Result u w ()
forall u w a. a -> Result u w a
Done ())
performRequestStore
:: forall u w. Env u w -> RequestStore u -> IO ()
performRequestStore :: Env u w -> RequestStore u -> IO ()
performRequestStore Env u w
env RequestStore u
reqStore =
Env u w -> [BlockedFetches u] -> IO ()
forall u w. Env u w -> [BlockedFetches u] -> IO ()
performFetches Env u w
env (RequestStore u -> [BlockedFetches u]
forall u. RequestStore u -> [BlockedFetches u]
contents RequestStore u
reqStore)
performFetches
:: forall u w. Env u w -> [BlockedFetches u] -> IO ()
performFetches :: Env u w -> [BlockedFetches u] -> IO ()
performFetches env :: Env u w
env@Env{flags :: forall u w. Env u w -> Flags
flags=Flags
f, statsRef :: forall u w. Env u w -> IORef Stats
statsRef=IORef Stats
sref, statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef=IORef CallId
sbref} [BlockedFetches u]
jobs = do
Int64
t0 <- IO Int64
getTimestamp
Flags -> CallId -> IO () -> IO ()
forall (m :: * -> *) a. Monad m => Flags -> CallId -> m a -> m ()
ifTrace Flags
f CallId
3 (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
[BlockedFetches u] -> (BlockedFetches u -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [BlockedFetches u]
jobs ((BlockedFetches u -> IO ()) -> IO ())
-> (BlockedFetches u -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(BlockedFetches [BlockedFetch r]
reqs [BlockedFetchInternal]
_) ->
[BlockedFetch r] -> (BlockedFetch r -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [BlockedFetch r]
reqs ((BlockedFetch r -> IO ()) -> IO ())
-> (BlockedFetch r -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(BlockedFetch r a
r ResultVar a
_) -> String -> IO ()
putStrLn (r a -> String
forall (f :: * -> *) a. ShowP f => f a -> String
showp r a
r)
let
applyFetch :: CallId -> BlockedFetches u -> IO FetchToDo
applyFetch CallId
i bfs :: BlockedFetches u
bfs@(BlockedFetches ([BlockedFetch r]
reqs :: [BlockedFetch r]) [BlockedFetchInternal]
_) =
case StateStore -> Maybe (State r)
forall (r :: * -> *). StateKey r => StateStore -> Maybe (State r)
stateGet (Env u w -> StateStore
forall u w. Env u w -> StateStore
states Env u w
env) of
Maybe (State r)
Nothing ->
FetchToDo -> IO FetchToDo
forall (m :: * -> *) a. Monad m => a -> m a
return ([BlockedFetch r] -> PerformFetch r -> FetchToDo
forall (req :: * -> *).
(DataSourceName req, Typeable req) =>
[BlockedFetch req] -> PerformFetch req -> FetchToDo
FetchToDo [BlockedFetch r]
reqs (([BlockedFetch r] -> IO ()) -> PerformFetch r
forall (req :: * -> *).
([BlockedFetch req] -> IO ()) -> PerformFetch req
SyncFetch ((BlockedFetch r -> IO ()) -> [BlockedFetch r] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ ((forall a. r a -> DataSourceError) -> BlockedFetch r -> IO ()
forall e (r :: * -> *).
Exception e =>
(forall a. r a -> e) -> BlockedFetch r -> IO ()
setError forall a. r a -> DataSourceError
forall (req :: * -> *) a. ShowP req => req a -> DataSourceError
e))))
where
e :: ShowP req => req a -> DataSourceError
e :: req a -> DataSourceError
e req a
req = Text -> DataSourceError
DataSourceError (Text -> DataSourceError) -> Text -> DataSourceError
forall a b. (a -> b) -> a -> b
$ Text
"data source not initialized: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
dsName
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
": "
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
Text.pack (req a -> String
forall (f :: * -> *) a. ShowP f => f a -> String
showp req a
req)
Just State r
state ->
FetchToDo -> IO FetchToDo
forall (m :: * -> *) a. Monad m => a -> m a
return (FetchToDo -> IO FetchToDo) -> FetchToDo -> IO FetchToDo
forall a b. (a -> b) -> a -> b
$ [BlockedFetch r] -> PerformFetch r -> FetchToDo
forall (req :: * -> *).
(DataSourceName req, Typeable req) =>
[BlockedFetch req] -> PerformFetch req -> FetchToDo
FetchToDo [BlockedFetch r]
reqs
(PerformFetch r -> FetchToDo) -> PerformFetch r -> FetchToDo
forall a b. (a -> b) -> a -> b
$ (if ReportFlag -> ReportFlags -> Bool
testReportFlag ReportFlag
ReportFetchStats (ReportFlags -> Bool) -> ReportFlags -> Bool
forall a b. (a -> b) -> a -> b
$ Flags -> ReportFlags
report Flags
f
then u
-> IORef Stats
-> IORef CallId
-> Text
-> CallId
-> BlockedFetches u
-> PerformFetch r
-> PerformFetch r
forall u (req :: * -> *).
DataSource u req =>
u
-> IORef Stats
-> IORef CallId
-> Text
-> CallId
-> BlockedFetches u
-> PerformFetch req
-> PerformFetch req
wrapFetchInStats
(Env u w -> u
forall u w. Env u w -> u
userEnv Env u w
env)
IORef Stats
sref
IORef CallId
sbref
Text
dsName
([BlockedFetch r] -> CallId
forall (t :: * -> *) a. Foldable t => t a -> CallId
length [BlockedFetch r]
reqs)
BlockedFetches u
bfs
else PerformFetch r -> PerformFetch r
forall a. a -> a
id)
(PerformFetch r -> PerformFetch r)
-> PerformFetch r -> PerformFetch r
forall a b. (a -> b) -> a -> b
$ CallId -> CallId -> Text -> PerformFetch r -> PerformFetch r
forall (req :: * -> *).
CallId -> CallId -> Text -> PerformFetch req -> PerformFetch req
wrapFetchInTrace CallId
i ([BlockedFetch r] -> CallId
forall (t :: * -> *) a. Foldable t => t a -> CallId
length [BlockedFetch r]
reqs) Text
dsName
(PerformFetch r -> PerformFetch r)
-> PerformFetch r -> PerformFetch r
forall a b. (a -> b) -> a -> b
$ [BlockedFetch r] -> PerformFetch r -> PerformFetch r
forall (req :: * -> *).
[BlockedFetch req] -> PerformFetch req -> PerformFetch req
wrapFetchInCatch [BlockedFetch r]
reqs
(PerformFetch r -> PerformFetch r)
-> PerformFetch r -> PerformFetch r
forall a b. (a -> b) -> a -> b
$ State r -> Flags -> u -> PerformFetch r
forall u (req :: * -> *).
DataSource u req =>
State req -> Flags -> u -> PerformFetch req
fetch State r
state Flags
f (Env u w -> u
forall u w. Env u w -> u
userEnv Env u w
env)
where
dsName :: Text
dsName = Proxy r -> Text
forall (req :: * -> *). DataSourceName req => Proxy req -> Text
dataSourceName (Proxy r
forall k (t :: k). Proxy t
Proxy :: Proxy r)
[FetchToDo]
fetches <- (CallId -> BlockedFetches u -> IO FetchToDo)
-> [CallId] -> [BlockedFetches u] -> IO [FetchToDo]
forall (m :: * -> *) a b c.
Applicative m =>
(a -> b -> m c) -> [a] -> [b] -> m [c]
zipWithM CallId -> BlockedFetches u -> IO FetchToDo
applyFetch [CallId
0..] [BlockedFetches u]
jobs
[FetchToDo] -> IORef ReqCountMap -> Flags -> IO ()
scheduleFetches [FetchToDo]
fetches (Env u w -> IORef ReqCountMap
forall u w. Env u w -> IORef ReqCountMap
submittedReqsRef Env u w
env) (Env u w -> Flags
forall u w. Env u w -> Flags
flags Env u w
env)
Int64
t1 <- IO Int64
getTimestamp
let roundtime :: Double
roundtime = Int64 -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64
t1 Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
- Int64
t0) Double -> Double -> Double
forall a. Fractional a => a -> a -> a
/ Double
1000000 :: Double
Flags -> CallId -> IO () -> IO ()
forall (m :: * -> *) a. Monad m => Flags -> CallId -> m a -> m ()
ifTrace Flags
f CallId
1 (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
String -> Double -> IO ()
forall r. PrintfType r => String -> r
printf String
"Batch data fetch done (%.4fs)\n" (Double -> Double
forall a b. (Real a, Fractional b) => a -> b
realToFrac Double
roundtime :: Double)
data FetchToDo where
FetchToDo
:: forall (req :: Type -> Type). (DataSourceName req, Typeable req)
=> [BlockedFetch req] -> PerformFetch req -> FetchToDo
wrapFetchInCatch :: [BlockedFetch req] -> PerformFetch req -> PerformFetch req
wrapFetchInCatch :: [BlockedFetch req] -> PerformFetch req -> PerformFetch req
wrapFetchInCatch [BlockedFetch req]
reqs PerformFetch req
fetch =
case PerformFetch req
fetch of
SyncFetch [BlockedFetch req] -> IO ()
f ->
([BlockedFetch req] -> IO ()) -> PerformFetch req
forall (req :: * -> *).
([BlockedFetch req] -> IO ()) -> PerformFetch req
SyncFetch (([BlockedFetch req] -> IO ()) -> PerformFetch req)
-> ([BlockedFetch req] -> IO ()) -> PerformFetch req
forall a b. (a -> b) -> a -> b
$ \[BlockedFetch req]
reqs -> [BlockedFetch req] -> IO ()
f [BlockedFetch req]
reqs IO () -> (SomeException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`Exception.catch` SomeException -> IO ()
handler
AsyncFetch [BlockedFetch req] -> IO () -> IO ()
f ->
([BlockedFetch req] -> IO () -> IO ()) -> PerformFetch req
forall (req :: * -> *).
([BlockedFetch req] -> IO () -> IO ()) -> PerformFetch req
AsyncFetch (([BlockedFetch req] -> IO () -> IO ()) -> PerformFetch req)
-> ([BlockedFetch req] -> IO () -> IO ()) -> PerformFetch req
forall a b. (a -> b) -> a -> b
$ \[BlockedFetch req]
reqs IO ()
io -> [BlockedFetch req] -> IO () -> IO ()
f [BlockedFetch req]
reqs IO ()
io IO () -> (SomeException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`Exception.catch` SomeException -> IO ()
handler
BackgroundFetch [BlockedFetch req] -> IO ()
f ->
([BlockedFetch req] -> IO ()) -> PerformFetch req
forall (req :: * -> *).
([BlockedFetch req] -> IO ()) -> PerformFetch req
BackgroundFetch (([BlockedFetch req] -> IO ()) -> PerformFetch req)
-> ([BlockedFetch req] -> IO ()) -> PerformFetch req
forall a b. (a -> b) -> a -> b
$ \[BlockedFetch req]
reqs -> [BlockedFetch req] -> IO ()
f [BlockedFetch req]
reqs IO () -> (SomeException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`Exception.catch` SomeException -> IO ()
handler
where
handler :: SomeException -> IO ()
handler :: SomeException -> IO ()
handler SomeException
e = do
SomeException -> IO ()
rethrowAsyncExceptions SomeException
e
(BlockedFetch req -> IO ()) -> [BlockedFetch req] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (SomeException -> BlockedFetch req -> IO ()
forall p (r :: * -> *). Exception p => p -> BlockedFetch r -> IO ()
forceError SomeException
e) [BlockedFetch req]
reqs
forceError :: p -> BlockedFetch r -> IO ()
forceError p
e (BlockedFetch r a
_ ResultVar a
rvar) =
ResultVar a -> Either SomeException a -> IO ()
forall a. ResultVar a -> Either SomeException a -> IO ()
putResult ResultVar a
rvar (p -> Either SomeException a
forall e a. Exception e => e -> Either SomeException a
except p
e)
data FailureCount = FailureCount
{ FailureCount -> CallId
failureCountStandard :: {-# UNPACK #-} !Int
, FailureCount -> CallId
failureCountIgnored :: {-# UNPACK #-} !Int
}
#if __GLASGOW_HASKELL__ >= 804
instance Semigroup FailureCount where
<> :: FailureCount -> FailureCount -> FailureCount
(<>) = FailureCount -> FailureCount -> FailureCount
forall a. Monoid a => a -> a -> a
mappend
#endif
instance Monoid FailureCount where
mempty :: FailureCount
mempty = CallId -> CallId -> FailureCount
FailureCount CallId
0 CallId
0
mappend :: FailureCount -> FailureCount -> FailureCount
mappend (FailureCount CallId
s1 CallId
i1) (FailureCount CallId
s2 CallId
i2)
= CallId -> CallId -> FailureCount
FailureCount (CallId
s1CallId -> CallId -> CallId
forall a. Num a => a -> a -> a
+CallId
s2) (CallId
i1CallId -> CallId -> CallId
forall a. Num a => a -> a -> a
+CallId
i2)
wrapFetchInStats
:: DataSource u req
=> u
-> IORef Stats
-> IORef Int
-> Text
-> Int
-> BlockedFetches u
-> PerformFetch req
-> PerformFetch req
wrapFetchInStats :: u
-> IORef Stats
-> IORef CallId
-> Text
-> CallId
-> BlockedFetches u
-> PerformFetch req
-> PerformFetch req
wrapFetchInStats
u
u
!IORef Stats
statsRef
!IORef CallId
batchIdRef
Text
dataSource
CallId
batchSize
(BlockedFetches [BlockedFetch r]
_reqs [BlockedFetchInternal]
reqsI)
PerformFetch req
perform = do
case PerformFetch req
perform of
SyncFetch [BlockedFetch req] -> IO ()
f ->
([BlockedFetch req] -> IO ()) -> PerformFetch req
forall (req :: * -> *).
([BlockedFetch req] -> IO ()) -> PerformFetch req
SyncFetch (([BlockedFetch req] -> IO ()) -> PerformFetch req)
-> ([BlockedFetch req] -> IO ()) -> PerformFetch req
forall a b. (a -> b) -> a -> b
$ \[BlockedFetch req]
reqs -> do
CallId
bid <- IO CallId
newBatchId
IORef FailureCount
fail_ref <- FailureCount -> IO (IORef FailureCount)
forall a. a -> IO (IORef a)
newIORef FailureCount
forall a. Monoid a => a
mempty
(Int64
t0,Int64
t,Int64
alloc,()
_) <- IO () -> IO (Int64, Int64, Int64, ())
forall c d. Num c => IO d -> IO (Int64, Int64, c, d)
statsForIO ([BlockedFetch req] -> IO ()
f ((BlockedFetch req -> BlockedFetch req)
-> [BlockedFetch req] -> [BlockedFetch req]
forall a b. (a -> b) -> [a] -> [b]
map (u -> IORef FailureCount -> BlockedFetch req -> BlockedFetch req
forall u (r :: * -> *).
DataSource u r =>
u -> IORef FailureCount -> BlockedFetch r -> BlockedFetch r
addFailureCount u
u IORef FailureCount
fail_ref)
(CallId -> [BlockedFetch req] -> [BlockedFetch req]
reqsWithFetchDsStats CallId
bid [BlockedFetch req]
reqs)))
FailureCount
failures <- IORef FailureCount -> IO FailureCount
forall a. IORef a -> IO a
readIORef IORef FailureCount
fail_ref
CallId
-> [CallId]
-> Int64
-> Int64
-> Int64
-> CallId
-> FailureCount
-> IO ()
updateFetchStats CallId
bid [CallId]
allFids Int64
t0 Int64
t Int64
alloc CallId
batchSize FailureCount
failures
AsyncFetch [BlockedFetch req] -> IO () -> IO ()
f -> do
([BlockedFetch req] -> IO () -> IO ()) -> PerformFetch req
forall (req :: * -> *).
([BlockedFetch req] -> IO () -> IO ()) -> PerformFetch req
AsyncFetch (([BlockedFetch req] -> IO () -> IO ()) -> PerformFetch req)
-> ([BlockedFetch req] -> IO () -> IO ()) -> PerformFetch req
forall a b. (a -> b) -> a -> b
$ \[BlockedFetch req]
reqs IO ()
inner -> do
CallId
bid <- IO CallId
newBatchId
IORef (Int64, Int64)
inner_r <- (Int64, Int64) -> IO (IORef (Int64, Int64))
forall a. a -> IO (IORef a)
newIORef (Int64
0, Int64
0)
IORef FailureCount
fail_ref <- FailureCount -> IO (IORef FailureCount)
forall a. a -> IO (IORef a)
newIORef FailureCount
forall a. Monoid a => a
mempty
let inner' :: IO ()
inner' = do
(Int64
_,Int64
t,Int64
alloc,()
_) <- IO () -> IO (Int64, Int64, Int64, ())
forall c d. Num c => IO d -> IO (Int64, Int64, c, d)
statsForIO IO ()
inner
IORef (Int64, Int64) -> (Int64, Int64) -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Int64, Int64)
inner_r (Int64
t,Int64
alloc)
reqs' :: [BlockedFetch req]
reqs' = (BlockedFetch req -> BlockedFetch req)
-> [BlockedFetch req] -> [BlockedFetch req]
forall a b. (a -> b) -> [a] -> [b]
map (u -> IORef FailureCount -> BlockedFetch req -> BlockedFetch req
forall u (r :: * -> *).
DataSource u r =>
u -> IORef FailureCount -> BlockedFetch r -> BlockedFetch r
addFailureCount u
u IORef FailureCount
fail_ref) [BlockedFetch req]
reqs
reqs'' :: [BlockedFetch req]
reqs'' = CallId -> [BlockedFetch req] -> [BlockedFetch req]
reqsWithFetchDsStats CallId
bid [BlockedFetch req]
reqs'
(Int64
t0, Int64
totalTime, Int64
totalAlloc, ()
_) <- IO () -> IO (Int64, Int64, Int64, ())
forall c d. Num c => IO d -> IO (Int64, Int64, c, d)
statsForIO ([BlockedFetch req] -> IO () -> IO ()
f [BlockedFetch req]
reqs'' IO ()
inner')
(Int64
innerTime, Int64
innerAlloc) <- IORef (Int64, Int64) -> IO (Int64, Int64)
forall a. IORef a -> IO a
readIORef IORef (Int64, Int64)
inner_r
FailureCount
failures <- IORef FailureCount -> IO FailureCount
forall a. IORef a -> IO a
readIORef IORef FailureCount
fail_ref
CallId
-> [CallId]
-> Int64
-> Int64
-> Int64
-> CallId
-> FailureCount
-> IO ()
updateFetchStats CallId
bid [CallId]
allFids Int64
t0 (Int64
totalTime Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
- Int64
innerTime)
(Int64
totalAlloc Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
- Int64
innerAlloc) CallId
batchSize FailureCount
failures
BackgroundFetch [BlockedFetch req] -> IO ()
io -> do
([BlockedFetch req] -> IO ()) -> PerformFetch req
forall (req :: * -> *).
([BlockedFetch req] -> IO ()) -> PerformFetch req
BackgroundFetch (([BlockedFetch req] -> IO ()) -> PerformFetch req)
-> ([BlockedFetch req] -> IO ()) -> PerformFetch req
forall a b. (a -> b) -> a -> b
$ \[BlockedFetch req]
reqs -> do
CallId
bid <- IO CallId
newBatchId
Int64
startTime <- IO Int64
getTimestamp
[BlockedFetch req] -> IO ()
io (CallId -> [BlockedFetch req] -> [BlockedFetch req]
reqsWithFetchDsStats CallId
bid
((BlockedFetch req -> BlockedFetchInternal -> BlockedFetch req)
-> [BlockedFetch req]
-> [BlockedFetchInternal]
-> [BlockedFetch req]
forall a b c. (a -> b -> c) -> [a] -> [b] -> [c]
zipWith (u
-> CallId
-> Int64
-> BlockedFetch req
-> BlockedFetchInternal
-> BlockedFetch req
forall p (r :: * -> *).
DataSource p r =>
p
-> CallId
-> Int64
-> BlockedFetch r
-> BlockedFetchInternal
-> BlockedFetch r
addTimer u
u CallId
bid Int64
startTime) [BlockedFetch req]
reqs [BlockedFetchInternal]
reqsI))
where
allFids :: [CallId]
allFids = (BlockedFetchInternal -> CallId)
-> [BlockedFetchInternal] -> [CallId]
forall a b. (a -> b) -> [a] -> [b]
map (\(BlockedFetchInternal CallId
k) -> CallId
k) [BlockedFetchInternal]
reqsI
newBatchId :: IO CallId
newBatchId = IORef CallId -> (CallId -> (CallId, CallId)) -> IO CallId
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef CallId
batchIdRef ((CallId -> (CallId, CallId)) -> IO CallId)
-> (CallId -> (CallId, CallId)) -> IO CallId
forall a b. (a -> b) -> a -> b
$ \CallId
x -> (CallId
xCallId -> CallId -> CallId
forall a. Num a => a -> a -> a
+CallId
1,CallId
xCallId -> CallId -> CallId
forall a. Num a => a -> a -> a
+CallId
1)
statsForIO :: IO d -> IO (Int64, Int64, c, d)
statsForIO IO d
io = do
Int64
prevAlloc <- IO Int64
getAllocationCounter
(Int64
t0,Int64
t,d
a) <- IO d -> IO (Int64, Int64, d)
forall a. IO a -> IO (Int64, Int64, a)
time IO d
io
Int64
postAlloc <- IO Int64
getAllocationCounter
(Int64, Int64, c, d) -> IO (Int64, Int64, c, d)
forall (m :: * -> *) a. Monad m => a -> m a
return (Int64
t0,Int64
t, Int64 -> c
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int64 -> c) -> Int64 -> c
forall a b. (a -> b) -> a -> b
$ Int64
prevAlloc Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
- Int64
postAlloc, d
a)
reqsWithFetchDsStats :: CallId -> [BlockedFetch req] -> [BlockedFetch req]
reqsWithFetchDsStats = \CallId
bid [BlockedFetch req]
reqs
-> (BlockedFetch req -> BlockedFetchInternal -> BlockedFetch req)
-> [BlockedFetch req]
-> [BlockedFetchInternal]
-> [BlockedFetch req]
forall a b c. (a -> b -> c) -> [a] -> [b] -> [c]
zipWith (CallId
-> BlockedFetch req -> BlockedFetchInternal -> BlockedFetch req
forall (r :: * -> *).
CallId -> BlockedFetch r -> BlockedFetchInternal -> BlockedFetch r
addFetchDatasourceStats CallId
bid) [BlockedFetch req]
reqs [BlockedFetchInternal]
reqsI
addTimer :: p
-> CallId
-> Int64
-> BlockedFetch r
-> BlockedFetchInternal
-> BlockedFetch r
addTimer
p
u
CallId
bid
Int64
t0
(BlockedFetch r a
req (ResultVar Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ()
fn))
(BlockedFetchInternal fid) =
r a -> ResultVar a -> BlockedFetch r
forall (r :: * -> *) a. r a -> ResultVar a -> BlockedFetch r
BlockedFetch r a
req (ResultVar a -> BlockedFetch r) -> ResultVar a -> BlockedFetch r
forall a b. (a -> b) -> a -> b
$ (Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a
forall a.
(Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a
ResultVar ((Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a)
-> (Either SomeException a
-> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a
forall a b. (a -> b) -> a -> b
$ \Either SomeException a
result Bool
isChildThread Maybe DataSourceStats
stats -> do
Int64
t1 <- IO Int64
getTimestamp
Int64
allocs <- if Bool
isChildThread then IO Int64
getAllocationCounter else Int64 -> IO Int64
forall (m :: * -> *) a. Monad m => a -> m a
return Int64
0
CallId
-> [CallId]
-> Int64
-> Int64
-> Int64
-> CallId
-> FailureCount
-> IO ()
updateFetchStats CallId
bid [CallId
fid] Int64
t0 (Int64
t1 Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
- Int64
t0)
(Int64 -> Int64
forall a. Num a => a -> a
negate Int64
allocs)
CallId
1
(p -> r a -> Either SomeException a -> FailureCount
forall u (req :: * -> *) a.
DataSource u req =>
u -> req a -> Either SomeException a -> FailureCount
calcFailure p
u r a
req Either SomeException a
result)
Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ()
fn Either SomeException a
result Bool
isChildThread Maybe DataSourceStats
stats
addFetchDatasourceStats
:: Int
-> BlockedFetch r
-> BlockedFetchInternal
-> BlockedFetch r
addFetchDatasourceStats :: CallId -> BlockedFetch r -> BlockedFetchInternal -> BlockedFetch r
addFetchDatasourceStats CallId
bid
(BlockedFetch r a
req (ResultVar Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ()
fn))
(BlockedFetchInternal CallId
fid) = r a -> ResultVar a -> BlockedFetch r
forall (r :: * -> *) a. r a -> ResultVar a -> BlockedFetch r
BlockedFetch r a
req (ResultVar a -> BlockedFetch r) -> ResultVar a -> BlockedFetch r
forall a b. (a -> b) -> a -> b
$ (Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a
forall a.
(Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a
ResultVar
((Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a)
-> (Either SomeException a
-> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a
forall a b. (a -> b) -> a -> b
$ \Either SomeException a
result Bool
isChildThread Maybe DataSourceStats
stats -> do
let mkStats :: DataSourceStats -> FetchStats
mkStats DataSourceStats
dss = FetchDataSourceStats :: CallId -> Text -> DataSourceStats -> CallId -> FetchStats
FetchDataSourceStats
{ fetchDsStatsCallId :: CallId
fetchDsStatsCallId = CallId
fid
, fetchDsStatsDataSource :: Text
fetchDsStatsDataSource = Text
dataSource
, fetchDsStatsStats :: DataSourceStats
fetchDsStatsStats = DataSourceStats
dss
, fetchBatchId :: CallId
fetchBatchId = CallId
bid
}
case Maybe DataSourceStats
stats of
Just DataSourceStats
dss -> IORef Stats -> (Stats -> (Stats, ())) -> IO ()
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef Stats
statsRef
((Stats -> (Stats, ())) -> IO ())
-> (Stats -> (Stats, ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(Stats [FetchStats]
fs) -> ([FetchStats] -> Stats
Stats (DataSourceStats -> FetchStats
mkStats DataSourceStats
dss FetchStats -> [FetchStats] -> [FetchStats]
forall a. a -> [a] -> [a]
: [FetchStats]
fs), ())
Maybe DataSourceStats
Nothing -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ()
fn Either SomeException a
result Bool
isChildThread Maybe DataSourceStats
stats
updateFetchStats
:: Int
-> [CallId]
-> Timestamp
-> Microseconds
-> Int64
-> Int
-> FailureCount
-> IO ()
updateFetchStats :: CallId
-> [CallId]
-> Int64
-> Int64
-> Int64
-> CallId
-> FailureCount
-> IO ()
updateFetchStats CallId
bid [CallId]
fids Int64
start Int64
time Int64
space CallId
batch FailureCount{CallId
failureCountIgnored :: CallId
failureCountStandard :: CallId
failureCountIgnored :: FailureCount -> CallId
failureCountStandard :: FailureCount -> CallId
..} = do
let this :: FetchStats
this = FetchStats :: Text
-> CallId
-> Int64
-> Int64
-> Int64
-> CallId
-> CallId
-> CallId
-> [CallId]
-> FetchStats
FetchStats { fetchDataSource :: Text
fetchDataSource = Text
dataSource
, fetchBatchSize :: CallId
fetchBatchSize = CallId
batch
, fetchStart :: Int64
fetchStart = Int64
start
, fetchDuration :: Int64
fetchDuration = Int64
time
, fetchSpace :: Int64
fetchSpace = Int64
space
, fetchFailures :: CallId
fetchFailures = CallId
failureCountStandard
, fetchIgnoredFailures :: CallId
fetchIgnoredFailures = CallId
failureCountIgnored
, fetchBatchId :: CallId
fetchBatchId = CallId
bid
, fetchIds :: [CallId]
fetchIds = [CallId]
fids }
IORef Stats -> (Stats -> (Stats, ())) -> IO ()
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef Stats
statsRef ((Stats -> (Stats, ())) -> IO ())
-> (Stats -> (Stats, ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(Stats [FetchStats]
fs) -> ([FetchStats] -> Stats
Stats (FetchStats
this FetchStats -> [FetchStats] -> [FetchStats]
forall a. a -> [a] -> [a]
: [FetchStats]
fs), ())
addFailureCount :: DataSource u r
=> u -> IORef FailureCount -> BlockedFetch r -> BlockedFetch r
addFailureCount :: u -> IORef FailureCount -> BlockedFetch r -> BlockedFetch r
addFailureCount u
u IORef FailureCount
ref (BlockedFetch r a
req (ResultVar Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ()
fn)) =
r a -> ResultVar a -> BlockedFetch r
forall (r :: * -> *) a. r a -> ResultVar a -> BlockedFetch r
BlockedFetch r a
req (ResultVar a -> BlockedFetch r) -> ResultVar a -> BlockedFetch r
forall a b. (a -> b) -> a -> b
$ (Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a
forall a.
(Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a
ResultVar ((Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a)
-> (Either SomeException a
-> Bool -> Maybe DataSourceStats -> IO ())
-> ResultVar a
forall a b. (a -> b) -> a -> b
$ \Either SomeException a
result Bool
isChildThread Maybe DataSourceStats
stats -> do
let addFailures :: FailureCount -> (FailureCount, ())
addFailures FailureCount
r = (FailureCount
r FailureCount -> FailureCount -> FailureCount
forall a. Semigroup a => a -> a -> a
<> u -> r a -> Either SomeException a -> FailureCount
forall u (req :: * -> *) a.
DataSource u req =>
u -> req a -> Either SomeException a -> FailureCount
calcFailure u
u r a
req Either SomeException a
result, ())
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Either SomeException a -> Bool
forall a b. Either a b -> Bool
isLeft Either SomeException a
result) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IORef FailureCount -> (FailureCount -> (FailureCount, ())) -> IO ()
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef FailureCount
ref FailureCount -> (FailureCount, ())
addFailures
Either SomeException a -> Bool -> Maybe DataSourceStats -> IO ()
fn Either SomeException a
result Bool
isChildThread Maybe DataSourceStats
stats
wrapFetchInTrace
:: Int
-> Int
-> Text
-> PerformFetch req
-> PerformFetch req
#ifdef EVENTLOG
wrapFetchInTrace i n dsName f =
case f of
SyncFetch io -> SyncFetch (wrapF "Sync" io)
AsyncFetch fio -> AsyncFetch (wrapF "Async" . fio . unwrapF "Async")
where
d = Text.unpack dsName
wrapF :: String -> IO a -> IO a
wrapF ty = bracket_ (traceEventIO $ printf "START %d %s (%d %s)" i d n ty)
(traceEventIO $ printf "STOP %d %s (%d %s)" i d n ty)
unwrapF :: String -> IO a -> IO a
unwrapF ty = bracket_ (traceEventIO $ printf "STOP %d %s (%d %s)" i d n ty)
(traceEventIO $ printf "START %d %s (%d %s)" i d n ty)
#else
wrapFetchInTrace :: CallId -> CallId -> Text -> PerformFetch req -> PerformFetch req
wrapFetchInTrace CallId
_ CallId
_ Text
_ PerformFetch req
f = PerformFetch req
f
#endif
time :: IO a -> IO (Timestamp,Microseconds,a)
time :: IO a -> IO (Int64, Int64, a)
time IO a
io = do
Int64
t0 <- IO Int64
getTimestamp
a
a <- IO a
io
Int64
t1 <- IO Int64
getTimestamp
(Int64, Int64, a) -> IO (Int64, Int64, a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Int64
t0, Int64
t1 Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
- Int64
t0, a
a)
scheduleFetches :: [FetchToDo] -> IORef ReqCountMap -> Flags -> IO ()
scheduleFetches :: [FetchToDo] -> IORef ReqCountMap -> Flags -> IO ()
scheduleFetches [FetchToDo]
fetches IORef ReqCountMap
ref Flags
flags = do
Flags -> ReportFlag -> IO () -> IO ()
forall (m :: * -> *) a.
Monad m =>
Flags -> ReportFlag -> m a -> m ()
ifReport Flags
flags ReportFlag
ReportOutgoneFetches (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ [IO ()] -> IO ()
forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, Monad m) =>
t (m a) -> m ()
sequence_
[ IORef ReqCountMap -> (ReqCountMap -> (ReqCountMap, ())) -> IO ()
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef ReqCountMap
ref ((ReqCountMap -> (ReqCountMap, ())) -> IO ())
-> (ReqCountMap -> (ReqCountMap, ())) -> IO ()
forall a b. (a -> b) -> a -> b
$
\ReqCountMap
m -> (Proxy req -> CallId -> ReqCountMap -> ReqCountMap
forall (r :: * -> *).
(DataSourceName r, Typeable r) =>
Proxy r -> CallId -> ReqCountMap -> ReqCountMap
addToCountMap (Proxy req
forall k (t :: k). Proxy t
Proxy :: Proxy r) ([BlockedFetch req] -> CallId
forall (t :: * -> *) a. Foldable t => t a -> CallId
length [BlockedFetch req]
reqs) ReqCountMap
m, ())
| FetchToDo ([BlockedFetch req]
reqs :: [BlockedFetch r]) PerformFetch req
_f <- [FetchToDo]
fetches
]
Flags -> CallId -> IO () -> IO ()
forall (m :: * -> *) a. Monad m => Flags -> CallId -> m a -> m ()
ifTrace Flags
flags CallId
1 (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> String -> IO ()
forall r. PrintfType r => String -> r
printf String
"Batch data fetch round: %s\n" (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$
String -> [String] -> String
forall a. [a] -> [[a]] -> [a]
intercalate (String
", "::String) ([String] -> String) -> [String] -> String
forall a b. (a -> b) -> a -> b
$
((CallId, String, String) -> String)
-> [(CallId, String, String)] -> [String]
forall a b. (a -> b) -> [a] -> [b]
map (\(CallId
c, String
n, String
ds) -> String -> String -> String -> CallId -> String
forall r. PrintfType r => String -> r
printf String
"%s %s %d" String
n String
ds CallId
c) [(CallId, String, String)]
stats
IO ()
fully_async_fetches
IO () -> IO ()
async_fetches IO ()
sync_fetches
where
fetchName :: forall req . PerformFetch req -> String
fetchName :: PerformFetch req -> String
fetchName (BackgroundFetch [BlockedFetch req] -> IO ()
_) = String
"background"
fetchName (AsyncFetch [BlockedFetch req] -> IO () -> IO ()
_) = String
"async"
fetchName (SyncFetch [BlockedFetch req] -> IO ()
_) = String
"sync"
srcName :: forall req . (DataSourceName req) => [BlockedFetch req] -> String
srcName :: [BlockedFetch req] -> String
srcName [BlockedFetch req]
_ = Text -> String
Text.unpack (Text -> String) -> Text -> String
forall a b. (a -> b) -> a -> b
$ Proxy req -> Text
forall (req :: * -> *). DataSourceName req => Proxy req -> Text
dataSourceName (Proxy req
forall k (t :: k). Proxy t
Proxy :: Proxy req)
stats :: [(CallId, String, String)]
stats = [([BlockedFetch req] -> CallId
forall (t :: * -> *) a. Foldable t => t a -> CallId
length [BlockedFetch req]
reqs, PerformFetch req -> String
forall (req :: * -> *). PerformFetch req -> String
fetchName PerformFetch req
f, [BlockedFetch req] -> String
forall (req :: * -> *).
DataSourceName req =>
[BlockedFetch req] -> String
srcName [BlockedFetch req]
reqs)
| FetchToDo [BlockedFetch req]
reqs PerformFetch req
f <- [FetchToDo]
fetches]
fully_async_fetches :: IO ()
fully_async_fetches :: IO ()
fully_async_fetches = [IO ()] -> IO ()
forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, Monad m) =>
t (m a) -> m ()
sequence_
[[BlockedFetch req] -> IO ()
f [BlockedFetch req]
reqs | FetchToDo [BlockedFetch req]
reqs (BackgroundFetch [BlockedFetch req] -> IO ()
f) <- [FetchToDo]
fetches]
async_fetches :: IO () -> IO ()
async_fetches :: IO () -> IO ()
async_fetches = [IO () -> IO ()] -> IO () -> IO ()
forall a. [a -> a] -> a -> a
compose
[[BlockedFetch req] -> IO () -> IO ()
f [BlockedFetch req]
reqs | FetchToDo [BlockedFetch req]
reqs (AsyncFetch [BlockedFetch req] -> IO () -> IO ()
f) <- [FetchToDo]
fetches]
sync_fetches :: IO ()
sync_fetches :: IO ()
sync_fetches = [IO ()] -> IO ()
forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, Monad m) =>
t (m a) -> m ()
sequence_
[[BlockedFetch req] -> IO ()
f [BlockedFetch req]
reqs | FetchToDo [BlockedFetch req]
reqs (SyncFetch [BlockedFetch req] -> IO ()
f) <- [FetchToDo]
fetches]
data ReadingCompletionsFailedFetch = ReadingCompletionsFailedFetch Text
deriving CallId -> ReadingCompletionsFailedFetch -> String -> String
[ReadingCompletionsFailedFetch] -> String -> String
ReadingCompletionsFailedFetch -> String
(CallId -> ReadingCompletionsFailedFetch -> String -> String)
-> (ReadingCompletionsFailedFetch -> String)
-> ([ReadingCompletionsFailedFetch] -> String -> String)
-> Show ReadingCompletionsFailedFetch
forall a.
(CallId -> a -> String -> String)
-> (a -> String) -> ([a] -> String -> String) -> Show a
showList :: [ReadingCompletionsFailedFetch] -> String -> String
$cshowList :: [ReadingCompletionsFailedFetch] -> String -> String
show :: ReadingCompletionsFailedFetch -> String
$cshow :: ReadingCompletionsFailedFetch -> String
showsPrec :: CallId -> ReadingCompletionsFailedFetch -> String -> String
$cshowsPrec :: CallId -> ReadingCompletionsFailedFetch -> String -> String
Show
instance Exception ReadingCompletionsFailedFetch