-- Copyright (c) 2014-present, Facebook, Inc.
-- All rights reserved.
--
-- This source code is distributed under the terms of a BSD license,
-- found in the LICENSE file.

{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE DeriveDataTypeable #-}

-- | Defines 'runHaxl'.  Most users should import "Haxl.Core" instead.
--
module Haxl.Core.Run
  ( runHaxl
  , runHaxlWithWrites
  ) where

import Control.Concurrent.STM
import Control.Exception as Exception
import Control.Monad
import Data.IORef
import Text.Printf
import Unsafe.Coerce

import Haxl.Core.DataCache
import Haxl.Core.Exception
import Haxl.Core.Flags
import Haxl.Core.Monad
import Haxl.Core.Fetch
import Haxl.Core.Profile
import Haxl.Core.RequestStore as RequestStore
import Haxl.Core.Stats
import Haxl.Core.Util

import qualified Data.HashTable.IO as H

-- -----------------------------------------------------------------------------
-- runHaxl

-- | Runs a 'Haxl' computation in the given 'Env' and returns its result.
--
-- This is a thin wrapper around 'runHaxlWithWrites': it runs the
-- computation with 'runHaxlWithWrites' and keeps only the first component
-- of the resulting pair, discarding the @[w]@ component.
--
-- Note: to make multiple concurrent calls to 'runHaxl', each one must
-- have a separate 'Env'. A single 'Env' must /not/ be shared between
-- multiple concurrent calls to 'runHaxl', otherwise deadlocks or worse
-- will likely ensue.
--
-- However, multiple 'Env's may share a single 'StateStore', and thereby
-- use the same set of datasources.
runHaxl :: forall u w a. Env u w -> GenHaxl u w a -> IO a
runHaxl env haxl = fst <$> runHaxlWithWrites env haxl

runHaxlWithWrites :: forall u w a. Env u w -> GenHaxl u w a -> IO (a, [w])
runHaxlWithWrites :: Env u w -> GenHaxl u w a -> IO (a, [w])
runHaxlWithWrites env :: Env u w
env@Env{u
CallId
Maybe (DataCacheLookup w)
TVar [CompleteReq u w]
IORef CallId
IORef Profile
IORef Stats
IORef ReqCountMap
IORef (RequestStore u)
IORef (JobList u w)
IORef (WriteTree w)
HaxlDataCache u w
Flags
StateStore
ProfileCurrent
dataCacheFetchFallback :: forall u w. Env u w -> Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: forall u w. Env u w -> IORef (WriteTree w)
writeLogsRef :: forall u w. Env u w -> IORef (WriteTree w)
completions :: forall u w. Env u w -> TVar [CompleteReq u w]
submittedReqsRef :: forall u w. Env u w -> IORef ReqCountMap
runQueueRef :: forall u w. Env u w -> IORef (JobList u w)
reqStoreRef :: forall u w. Env u w -> IORef (RequestStore u)
states :: forall u w. Env u w -> StateStore
profRef :: forall u w. Env u w -> IORef Profile
profCurrent :: forall u w. Env u w -> ProfileCurrent
callIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsRef :: forall u w. Env u w -> IORef Stats
userEnv :: forall u w. Env u w -> u
flags :: forall u w. Env u w -> Flags
memoKey :: forall u w. Env u w -> CallId
memoCache :: forall u w. Env u w -> HaxlDataCache u w
dataCache :: forall u w. Env u w -> HaxlDataCache u w
dataCacheFetchFallback :: Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: IORef (WriteTree w)
writeLogsRef :: IORef (WriteTree w)
completions :: TVar [CompleteReq u w]
submittedReqsRef :: IORef ReqCountMap
runQueueRef :: IORef (JobList u w)
reqStoreRef :: IORef (RequestStore u)
states :: StateStore
profRef :: IORef Profile
profCurrent :: ProfileCurrent
callIdRef :: IORef CallId
statsBatchIdRef :: IORef CallId
statsRef :: IORef Stats
userEnv :: u
flags :: Flags
memoKey :: CallId
memoCache :: HaxlDataCache u w
dataCache :: HaxlDataCache u w
..} GenHaxl u w a
haxl = do
  result :: IVar u w a
result@IVar{ivarRef :: forall u w a. IVar u w a -> IORef (IVarContents u w a)
ivarRef = IORef (IVarContents u w a)
resultRef} <- IO (IVar u w a)
forall u w a. IO (IVar u w a)
newIVar -- where to put the final result
  String -> IO ()
ifTraceLog <- do
    if Flags -> CallId
trace Flags
flags CallId -> CallId -> Bool
forall a. Ord a => a -> a -> Bool
< CallId
3
    then (String -> IO ()) -> IO (String -> IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return ((String -> IO ()) -> IO (String -> IO ()))
-> (String -> IO ()) -> IO (String -> IO ())
forall a b. (a -> b) -> a -> b
$ \String
_ -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
    else do
      Timestamp
start <- IO Timestamp
getTimestamp
      (String -> IO ()) -> IO (String -> IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return ((String -> IO ()) -> IO (String -> IO ()))
-> (String -> IO ()) -> IO (String -> IO ())
forall a b. (a -> b) -> a -> b
$ \String
s -> do
        Timestamp
now <- IO Timestamp
getTimestamp
        let t :: Double
t = Timestamp -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Timestamp
now Timestamp -> Timestamp -> Timestamp
forall a. Num a => a -> a -> a
- Timestamp
start) Double -> Double -> Double
forall a. Fractional a => a -> a -> a
/ Double
1000.0 :: Double
        String -> Double -> String -> IO ()
forall r. PrintfType r => String -> r
printf String
"%.1fms: %s" Double
t (String
s :: String)
  let
    -- Run a job, and put its result in the given IVar
    schedule :: Env u w -> JobList u w -> GenHaxl u w b -> IVar u w b -> IO ()
    schedule :: Env u w -> JobList u w -> GenHaxl u w b -> IVar u w b -> IO ()
schedule env :: Env u w
env@Env{u
CallId
Maybe (DataCacheLookup w)
TVar [CompleteReq u w]
IORef CallId
IORef Profile
IORef Stats
IORef ReqCountMap
IORef (RequestStore u)
IORef (JobList u w)
IORef (WriteTree w)
HaxlDataCache u w
Flags
StateStore
ProfileCurrent
dataCacheFetchFallback :: Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: IORef (WriteTree w)
writeLogsRef :: IORef (WriteTree w)
completions :: TVar [CompleteReq u w]
submittedReqsRef :: IORef ReqCountMap
runQueueRef :: IORef (JobList u w)
reqStoreRef :: IORef (RequestStore u)
states :: StateStore
profRef :: IORef Profile
profCurrent :: ProfileCurrent
callIdRef :: IORef CallId
statsBatchIdRef :: IORef CallId
statsRef :: IORef Stats
userEnv :: u
flags :: Flags
memoKey :: CallId
memoCache :: HaxlDataCache u w
dataCache :: HaxlDataCache u w
dataCacheFetchFallback :: forall u w. Env u w -> Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: forall u w. Env u w -> IORef (WriteTree w)
writeLogsRef :: forall u w. Env u w -> IORef (WriteTree w)
completions :: forall u w. Env u w -> TVar [CompleteReq u w]
submittedReqsRef :: forall u w. Env u w -> IORef ReqCountMap
runQueueRef :: forall u w. Env u w -> IORef (JobList u w)
reqStoreRef :: forall u w. Env u w -> IORef (RequestStore u)
states :: forall u w. Env u w -> StateStore
profRef :: forall u w. Env u w -> IORef Profile
profCurrent :: forall u w. Env u w -> ProfileCurrent
callIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsRef :: forall u w. Env u w -> IORef Stats
userEnv :: forall u w. Env u w -> u
flags :: forall u w. Env u w -> Flags
memoKey :: forall u w. Env u w -> CallId
memoCache :: forall u w. Env u w -> HaxlDataCache u w
dataCache :: forall u w. Env u w -> HaxlDataCache u w
..} JobList u w
rq (GenHaxl Env u w -> IO (Result u w b)
run) ivar :: IVar u w b
ivar@IVar{ivarRef :: forall u w a. IVar u w a -> IORef (IVarContents u w a)
ivarRef = !IORef (IVarContents u w b)
ref} = do
      String -> IO ()
ifTraceLog (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> CallId -> String
forall r. PrintfType r => String -> r
printf String
"schedule: %d\n" (CallId
1 CallId -> CallId -> CallId
forall a. Num a => a -> a -> a
+ JobList u w -> CallId
forall u w. JobList u w -> CallId
lengthJobList JobList u w
rq)
      let {-# INLINE result #-}
          result :: ResultVal b w -> IO ()
result ResultVal b w
r = do
            IVarContents u w b
e <- IORef (IVarContents u w b) -> IO (IVarContents u w b)
forall a. IORef a -> IO a
readIORef IORef (IVarContents u w b)
ref
            case IVarContents u w b
e of
              IVarFull ResultVal b w
_ ->
                -- An IVar is typically only meant to be written to once
                -- so it would make sense to throw an error here. But there
                -- are legitimate use-cases for writing several times.
                -- (See Haxl.Core.Parallel)
                Env u w -> JobList u w -> IO ()
forall u w. Env u w -> JobList u w -> IO ()
reschedule Env u w
env JobList u w
rq
              IVarEmpty JobList u w
haxls -> do
                IORef (IVarContents u w b) -> IVarContents u w b -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (IVarContents u w b)
ref (ResultVal b w -> IVarContents u w b
forall u w a. ResultVal a w -> IVarContents u w a
IVarFull ResultVal b w
r)
                -- Have we got the final result now?
                if IORef (IVarContents u w b)
ref IORef (IVarContents u w b) -> IORef (IVarContents u w b) -> Bool
forall a. Eq a => a -> a -> Bool
== IORef (IVarContents u w a) -> IORef (IVarContents u w b)
forall a b. a -> b
unsafeCoerce IORef (IVarContents u w a)
resultRef
                        -- comparing IORefs of different types is safe, it's
                        -- pointer-equality on the MutVar#.
                   then
                     -- We have a result, but don't discard unfinished
                     -- computations in the run queue. See
                     -- Note [runHaxl and unfinished requests].
                     -- Nothing can depend on the final IVar, so haxls must
                     -- be empty.
                     case JobList u w
rq of
                       JobList u w
JobNil -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
                       JobList u w
_ -> IORef (JobList u w) -> (JobList u w -> JobList u w) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' IORef (JobList u w)
runQueueRef (JobList u w -> JobList u w -> JobList u w
forall u w. JobList u w -> JobList u w -> JobList u w
appendJobList JobList u w
rq)
                   else Env u w -> JobList u w -> IO ()
forall u w. Env u w -> JobList u w -> IO ()
reschedule Env u w
env (JobList u w -> JobList u w -> JobList u w
forall u w. JobList u w -> JobList u w -> JobList u w
appendJobList JobList u w
haxls JobList u w
rq)
      Either SomeException (Result u w b)
r <-
        if ReportFlag -> ReportFlags -> Bool
testReportFlag ReportFlag
ReportProfiling (ReportFlags -> Bool) -> ReportFlags -> Bool
forall a b. (a -> b) -> a -> b
$ Flags -> ReportFlags
report Flags
flags  -- withLabel unfolded
          then IO (Result u w b) -> IO (Either SomeException (Result u w b))
forall e a. Exception e => IO a -> IO (Either e a)
Exception.try (IO (Result u w b) -> IO (Either SomeException (Result u w b)))
-> IO (Result u w b) -> IO (Either SomeException (Result u w b))
forall a b. (a -> b) -> a -> b
$ (Env u w -> IO (Result u w b)) -> Env u w -> IO (Result u w b)
forall u w a.
(Env u w -> IO (Result u w a)) -> Env u w -> IO (Result u w a)
profileCont Env u w -> IO (Result u w b)
run Env u w
env
          else IO (Result u w b) -> IO (Either SomeException (Result u w b))
forall e a. Exception e => IO a -> IO (Either e a)
Exception.try (IO (Result u w b) -> IO (Either SomeException (Result u w b)))
-> IO (Result u w b) -> IO (Either SomeException (Result u w b))
forall a b. (a -> b) -> a -> b
$ Env u w -> IO (Result u w b)
run Env u w
env
      case Either SomeException (Result u w b)
r of
        Left SomeException
e -> do
          SomeException -> IO ()
rethrowAsyncExceptions SomeException
e
          ResultVal b w -> IO ()
result (SomeException -> ResultVal b w
forall a w. SomeException -> ResultVal a w
ThrowIO SomeException
e)
        Right (Done b
a) -> do
          WriteTree w
wt <- IORef (WriteTree w) -> IO (WriteTree w)
forall a. IORef a -> IO a
readIORef IORef (WriteTree w)
writeLogsRef
          ResultVal b w -> IO ()
result (b -> WriteTree w -> ResultVal b w
forall a w. a -> WriteTree w -> ResultVal a w
Ok b
a WriteTree w
wt)
        Right (Throw SomeException
ex) -> do
          WriteTree w
wt <- IORef (WriteTree w) -> IO (WriteTree w)
forall a. IORef a -> IO a
readIORef IORef (WriteTree w)
writeLogsRef
          ResultVal b w -> IO ()
result (SomeException -> WriteTree w -> ResultVal b w
forall a w. SomeException -> WriteTree w -> ResultVal a w
ThrowHaxl SomeException
ex WriteTree w
wt)
        Right (Blocked IVar u w b
i Cont u w b
fn) -> do
          Env u w -> GenHaxl u w b -> IVar u w b -> IVar u w b -> IO ()
forall u w b a.
Env u w -> GenHaxl u w b -> IVar u w b -> IVar u w a -> IO ()
addJob Env u w
env (Cont u w b -> GenHaxl u w b
forall u w a. Cont u w a -> GenHaxl u w a
toHaxl Cont u w b
fn) IVar u w b
ivar IVar u w b
i
          Env u w -> JobList u w -> IO ()
forall u w. Env u w -> JobList u w -> IO ()
reschedule Env u w
env JobList u w
rq

    -- Here we have a choice:
    --   - If the requestStore is non-empty, we could submit those
    --     requests right away without waiting for more.  This might
    --     be good for latency, especially if the data source doesn't
    --     support batching, or if batching is pessimal.
    --   - To optimise the batch sizes, we want to execute as much as
    --     we can and only submit requests when we have no more
    --     computation to do.
    --   - compromise: wait at least Nms for an outstanding result
    --     before giving up and submitting new requests.
    --
    -- For now we use the batching strategy in the scheduler, but
    -- individual data sources can request that their requests are
    -- sent eagerly by using schedulerHint.
    --
    reschedule :: Env u w -> JobList u w -> IO ()
    reschedule :: Env u w -> JobList u w -> IO ()
reschedule env :: Env u w
env@Env{u
CallId
Maybe (DataCacheLookup w)
TVar [CompleteReq u w]
IORef CallId
IORef Profile
IORef Stats
IORef ReqCountMap
IORef (RequestStore u)
IORef (JobList u w)
IORef (WriteTree w)
HaxlDataCache u w
Flags
StateStore
ProfileCurrent
dataCacheFetchFallback :: Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: IORef (WriteTree w)
writeLogsRef :: IORef (WriteTree w)
completions :: TVar [CompleteReq u w]
submittedReqsRef :: IORef ReqCountMap
runQueueRef :: IORef (JobList u w)
reqStoreRef :: IORef (RequestStore u)
states :: StateStore
profRef :: IORef Profile
profCurrent :: ProfileCurrent
callIdRef :: IORef CallId
statsBatchIdRef :: IORef CallId
statsRef :: IORef Stats
userEnv :: u
flags :: Flags
memoKey :: CallId
memoCache :: HaxlDataCache u w
dataCache :: HaxlDataCache u w
dataCacheFetchFallback :: forall u w. Env u w -> Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: forall u w. Env u w -> IORef (WriteTree w)
writeLogsRef :: forall u w. Env u w -> IORef (WriteTree w)
completions :: forall u w. Env u w -> TVar [CompleteReq u w]
submittedReqsRef :: forall u w. Env u w -> IORef ReqCountMap
runQueueRef :: forall u w. Env u w -> IORef (JobList u w)
reqStoreRef :: forall u w. Env u w -> IORef (RequestStore u)
states :: forall u w. Env u w -> StateStore
profRef :: forall u w. Env u w -> IORef Profile
profCurrent :: forall u w. Env u w -> ProfileCurrent
callIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsRef :: forall u w. Env u w -> IORef Stats
userEnv :: forall u w. Env u w -> u
flags :: forall u w. Env u w -> Flags
memoKey :: forall u w. Env u w -> CallId
memoCache :: forall u w. Env u w -> HaxlDataCache u w
dataCache :: forall u w. Env u w -> HaxlDataCache u w
..} JobList u w
haxls = do
      case JobList u w
haxls of
        JobList u w
JobNil -> do
          JobList u w
rq <- IORef (JobList u w) -> IO (JobList u w)
forall a. IORef a -> IO a
readIORef IORef (JobList u w)
runQueueRef
          case JobList u w
rq of
            JobList u w
JobNil -> Env u w -> IO ()
forall u w. Env u w -> IO ()
emptyRunQueue Env u w
env
            JobCons Env u w
env' GenHaxl u w a
a IVar u w a
b JobList u w
c -> do
              IORef (JobList u w) -> JobList u w -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (JobList u w)
runQueueRef JobList u w
forall u w. JobList u w
JobNil
              Env u w -> JobList u w -> GenHaxl u w a -> IVar u w a -> IO ()
forall u w b.
Env u w -> JobList u w -> GenHaxl u w b -> IVar u w b -> IO ()
schedule Env u w
env' JobList u w
c GenHaxl u w a
a IVar u w a
b
        JobCons Env u w
env' GenHaxl u w a
a IVar u w a
b JobList u w
c ->
          Env u w -> JobList u w -> GenHaxl u w a -> IVar u w a -> IO ()
forall u w b.
Env u w -> JobList u w -> GenHaxl u w b -> IVar u w b -> IO ()
schedule Env u w
env' JobList u w
c GenHaxl u w a
a IVar u w a
b

    emptyRunQueue :: Env u w -> IO ()
    emptyRunQueue :: Env u w -> IO ()
emptyRunQueue Env u w
env = do
      String -> IO ()
ifTraceLog (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> String
forall r. PrintfType r => String -> r
printf String
"emptyRunQueue\n"
      JobList u w
haxls <- Env u w -> IO (JobList u w)
forall u w. Env u w -> IO (JobList u w)
checkCompletions Env u w
env
      case JobList u w
haxls of
        JobList u w
JobNil -> Env u w -> IO ()
forall u w. Env u w -> IO ()
checkRequestStore Env u w
env
        JobList u w
_ -> Env u w -> JobList u w -> IO ()
forall u w. Env u w -> JobList u w -> IO ()
reschedule Env u w
env JobList u w
haxls

    checkRequestStore :: Env u w -> IO ()
    checkRequestStore :: Env u w -> IO ()
checkRequestStore env :: Env u w
env@Env{u
CallId
Maybe (DataCacheLookup w)
TVar [CompleteReq u w]
IORef CallId
IORef Profile
IORef Stats
IORef ReqCountMap
IORef (RequestStore u)
IORef (JobList u w)
IORef (WriteTree w)
HaxlDataCache u w
Flags
StateStore
ProfileCurrent
dataCacheFetchFallback :: Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: IORef (WriteTree w)
writeLogsRef :: IORef (WriteTree w)
completions :: TVar [CompleteReq u w]
submittedReqsRef :: IORef ReqCountMap
runQueueRef :: IORef (JobList u w)
reqStoreRef :: IORef (RequestStore u)
states :: StateStore
profRef :: IORef Profile
profCurrent :: ProfileCurrent
callIdRef :: IORef CallId
statsBatchIdRef :: IORef CallId
statsRef :: IORef Stats
userEnv :: u
flags :: Flags
memoKey :: CallId
memoCache :: HaxlDataCache u w
dataCache :: HaxlDataCache u w
dataCacheFetchFallback :: forall u w. Env u w -> Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: forall u w. Env u w -> IORef (WriteTree w)
writeLogsRef :: forall u w. Env u w -> IORef (WriteTree w)
completions :: forall u w. Env u w -> TVar [CompleteReq u w]
submittedReqsRef :: forall u w. Env u w -> IORef ReqCountMap
runQueueRef :: forall u w. Env u w -> IORef (JobList u w)
reqStoreRef :: forall u w. Env u w -> IORef (RequestStore u)
states :: forall u w. Env u w -> StateStore
profRef :: forall u w. Env u w -> IORef Profile
profCurrent :: forall u w. Env u w -> ProfileCurrent
callIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsRef :: forall u w. Env u w -> IORef Stats
userEnv :: forall u w. Env u w -> u
flags :: forall u w. Env u w -> Flags
memoKey :: forall u w. Env u w -> CallId
memoCache :: forall u w. Env u w -> HaxlDataCache u w
dataCache :: forall u w. Env u w -> HaxlDataCache u w
..} = do
      String -> IO ()
ifTraceLog (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> String
forall r. PrintfType r => String -> r
printf String
"checkRequestStore\n"
      RequestStore u
reqStore <- IORef (RequestStore u) -> IO (RequestStore u)
forall a. IORef a -> IO a
readIORef IORef (RequestStore u)
reqStoreRef
      if RequestStore u -> Bool
forall u. RequestStore u -> Bool
RequestStore.isEmpty RequestStore u
reqStore
        then Env u w -> IO ()
forall u w. Env u w -> IO ()
waitCompletions Env u w
env
        else do
          String -> IO ()
ifTraceLog (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> CallId -> String
forall r. PrintfType r => String -> r
printf String
"performFetches %d\n" (RequestStore u -> CallId
forall u. RequestStore u -> CallId
RequestStore.getSize RequestStore u
reqStore)
          IORef (RequestStore u) -> RequestStore u -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (RequestStore u)
reqStoreRef RequestStore u
forall u. RequestStore u
noRequests
          Env u w -> RequestStore u -> IO ()
forall u w. Env u w -> RequestStore u -> IO ()
performRequestStore Env u w
env RequestStore u
reqStore
          -- empty the cache if we're not caching.  Is this the best
          -- place to do it?  We do get to de-duplicate requests that
          -- happen simultaneously.
          Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Flags -> CallId
caching Flags
flags CallId -> CallId -> Bool
forall a. Eq a => a -> a -> Bool
== CallId
0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
            let DataCache HashTable TypeRep (SubCache (DataCacheItem u w))
dc = HaxlDataCache u w
dataCache
            (() -> (TypeRep, SubCache (DataCacheItem u w)) -> IO ())
-> () -> HashTable TypeRep (SubCache (DataCacheItem u w)) -> IO ()
forall (h :: * -> * -> * -> *) a k v.
HashTable h =>
(a -> (k, v) -> IO a) -> a -> IOHashTable h k v -> IO a
H.foldM (\()
_ (TypeRep
k, SubCache (DataCacheItem u w)
_) -> HashTable TypeRep (SubCache (DataCacheItem u w))
-> TypeRep -> IO ()
forall (h :: * -> * -> * -> *) k v.
(HashTable h, Eq k, Hashable k) =>
IOHashTable h k v -> k -> IO ()
H.delete HashTable RealWorld TypeRep (SubCache (DataCacheItem u w))
HashTable TypeRep (SubCache (DataCacheItem u w))
dc TypeRep
k) () HashTable RealWorld TypeRep (SubCache (DataCacheItem u w))
HashTable TypeRep (SubCache (DataCacheItem u w))
dc
          Env u w -> IO ()
forall u w. Env u w -> IO ()
emptyRunQueue Env u w
env

    checkCompletions :: Env u w -> IO (JobList u w)
    checkCompletions :: Env u w -> IO (JobList u w)
checkCompletions Env{u
CallId
Maybe (DataCacheLookup w)
TVar [CompleteReq u w]
IORef CallId
IORef Profile
IORef Stats
IORef ReqCountMap
IORef (RequestStore u)
IORef (JobList u w)
IORef (WriteTree w)
HaxlDataCache u w
Flags
StateStore
ProfileCurrent
dataCacheFetchFallback :: Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: IORef (WriteTree w)
writeLogsRef :: IORef (WriteTree w)
completions :: TVar [CompleteReq u w]
submittedReqsRef :: IORef ReqCountMap
runQueueRef :: IORef (JobList u w)
reqStoreRef :: IORef (RequestStore u)
states :: StateStore
profRef :: IORef Profile
profCurrent :: ProfileCurrent
callIdRef :: IORef CallId
statsBatchIdRef :: IORef CallId
statsRef :: IORef Stats
userEnv :: u
flags :: Flags
memoKey :: CallId
memoCache :: HaxlDataCache u w
dataCache :: HaxlDataCache u w
dataCacheFetchFallback :: forall u w. Env u w -> Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: forall u w. Env u w -> IORef (WriteTree w)
writeLogsRef :: forall u w. Env u w -> IORef (WriteTree w)
completions :: forall u w. Env u w -> TVar [CompleteReq u w]
submittedReqsRef :: forall u w. Env u w -> IORef ReqCountMap
runQueueRef :: forall u w. Env u w -> IORef (JobList u w)
reqStoreRef :: forall u w. Env u w -> IORef (RequestStore u)
states :: forall u w. Env u w -> StateStore
profRef :: forall u w. Env u w -> IORef Profile
profCurrent :: forall u w. Env u w -> ProfileCurrent
callIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsRef :: forall u w. Env u w -> IORef Stats
userEnv :: forall u w. Env u w -> u
flags :: forall u w. Env u w -> Flags
memoKey :: forall u w. Env u w -> CallId
memoCache :: forall u w. Env u w -> HaxlDataCache u w
dataCache :: forall u w. Env u w -> HaxlDataCache u w
..} = do
      String -> IO ()
ifTraceLog (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> String
forall r. PrintfType r => String -> r
printf String
"checkCompletions\n"
      [CompleteReq u w]
comps <- LogicBug -> STM [CompleteReq u w] -> IO [CompleteReq u w]
forall e a. Exception e => e -> STM a -> IO a
atomicallyOnBlocking (ReadingCompletionsFailedRun -> LogicBug
forall e. Exception e => e -> LogicBug
LogicBug ReadingCompletionsFailedRun
ReadingCompletionsFailedRun) (STM [CompleteReq u w] -> IO [CompleteReq u w])
-> STM [CompleteReq u w] -> IO [CompleteReq u w]
forall a b. (a -> b) -> a -> b
$ do
        [CompleteReq u w]
c <- TVar [CompleteReq u w] -> STM [CompleteReq u w]
forall a. TVar a -> STM a
readTVar TVar [CompleteReq u w]
completions
        TVar [CompleteReq u w] -> [CompleteReq u w] -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar [CompleteReq u w]
completions []
        [CompleteReq u w] -> STM [CompleteReq u w]
forall (m :: * -> *) a. Monad m => a -> m a
return [CompleteReq u w]
c
      case [CompleteReq u w]
comps of
        [] -> JobList u w -> IO (JobList u w)
forall (m :: * -> *) a. Monad m => a -> m a
return JobList u w
forall u w. JobList u w
JobNil
        [CompleteReq u w]
_ -> do
          String -> IO ()
ifTraceLog (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> CallId -> String
forall r. PrintfType r => String -> r
printf String
"%d complete\n" ([CompleteReq u w] -> CallId
forall (t :: * -> *) a. Foldable t => t a -> CallId
length [CompleteReq u w]
comps)
          let
              getComplete :: CompleteReq u w -> IO (JobList u w)
getComplete (CompleteReq ResultVal a w
a IVar{ivarRef :: forall u w a. IVar u w a -> IORef (IVarContents u w a)
ivarRef = !IORef (IVarContents u w a)
cr} Timestamp
allocs) = do
                Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Timestamp
allocs Timestamp -> Timestamp -> Bool
forall a. Ord a => a -> a -> Bool
< Timestamp
0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                  Timestamp
cur <- IO Timestamp
getAllocationCounter
                  Timestamp -> IO ()
setAllocationCounter (Timestamp
cur Timestamp -> Timestamp -> Timestamp
forall a. Num a => a -> a -> a
+ Timestamp
allocs)
                IVarContents u w a
r <- IORef (IVarContents u w a) -> IO (IVarContents u w a)
forall a. IORef a -> IO a
readIORef IORef (IVarContents u w a)
cr
                case IVarContents u w a
r of
                  IVarFull ResultVal a w
_ -> do
                    String -> IO ()
ifTraceLog (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> String
forall r. PrintfType r => String -> r
printf String
"existing result\n"
                    JobList u w -> IO (JobList u w)
forall (m :: * -> *) a. Monad m => a -> m a
return JobList u w
forall u w. JobList u w
JobNil
                    -- this happens if a data source reports a result,
                    -- and then throws an exception.  We call putResult
                    -- a second time for the exception, which comes
                    -- ahead of the original request (because it is
                    -- pushed on the front of the completions list) and
                    -- therefore overrides it.
                  IVarEmpty JobList u w
cv -> do
                    IORef (IVarContents u w a) -> IVarContents u w a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (IVarContents u w a)
cr (ResultVal a w -> IVarContents u w a
forall u w a. ResultVal a w -> IVarContents u w a
IVarFull ResultVal a w
a)
                    JobList u w -> IO (JobList u w)
forall (m :: * -> *) a. Monad m => a -> m a
return JobList u w
cv
          [JobList u w]
jobs <- (CompleteReq u w -> IO (JobList u w))
-> [CompleteReq u w] -> IO [JobList u w]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM CompleteReq u w -> IO (JobList u w)
forall u w. CompleteReq u w -> IO (JobList u w)
getComplete [CompleteReq u w]
comps
          JobList u w -> IO (JobList u w)
forall (m :: * -> *) a. Monad m => a -> m a
return ((JobList u w -> JobList u w -> JobList u w)
-> JobList u w -> [JobList u w] -> JobList u w
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr JobList u w -> JobList u w -> JobList u w
forall u w. JobList u w -> JobList u w -> JobList u w
appendJobList JobList u w
forall u w. JobList u w
JobNil [JobList u w]
jobs)

    waitCompletions :: Env u w -> IO ()
    waitCompletions :: Env u w -> IO ()
waitCompletions env :: Env u w
env@Env{u
CallId
Maybe (DataCacheLookup w)
TVar [CompleteReq u w]
IORef CallId
IORef Profile
IORef Stats
IORef ReqCountMap
IORef (RequestStore u)
IORef (JobList u w)
IORef (WriteTree w)
HaxlDataCache u w
Flags
StateStore
ProfileCurrent
dataCacheFetchFallback :: Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: IORef (WriteTree w)
writeLogsRef :: IORef (WriteTree w)
completions :: TVar [CompleteReq u w]
submittedReqsRef :: IORef ReqCountMap
runQueueRef :: IORef (JobList u w)
reqStoreRef :: IORef (RequestStore u)
states :: StateStore
profRef :: IORef Profile
profCurrent :: ProfileCurrent
callIdRef :: IORef CallId
statsBatchIdRef :: IORef CallId
statsRef :: IORef Stats
userEnv :: u
flags :: Flags
memoKey :: CallId
memoCache :: HaxlDataCache u w
dataCache :: HaxlDataCache u w
dataCacheFetchFallback :: forall u w. Env u w -> Maybe (DataCacheLookup w)
writeLogsRefNoMemo :: forall u w. Env u w -> IORef (WriteTree w)
writeLogsRef :: forall u w. Env u w -> IORef (WriteTree w)
completions :: forall u w. Env u w -> TVar [CompleteReq u w]
submittedReqsRef :: forall u w. Env u w -> IORef ReqCountMap
runQueueRef :: forall u w. Env u w -> IORef (JobList u w)
reqStoreRef :: forall u w. Env u w -> IORef (RequestStore u)
states :: forall u w. Env u w -> StateStore
profRef :: forall u w. Env u w -> IORef Profile
profCurrent :: forall u w. Env u w -> ProfileCurrent
callIdRef :: forall u w. Env u w -> IORef CallId
statsBatchIdRef :: forall u w. Env u w -> IORef CallId
statsRef :: forall u w. Env u w -> IORef Stats
userEnv :: forall u w. Env u w -> u
flags :: forall u w. Env u w -> Flags
memoKey :: forall u w. Env u w -> CallId
memoCache :: forall u w. Env u w -> HaxlDataCache u w
dataCache :: forall u w. Env u w -> HaxlDataCache u w
..} = do
      String -> IO ()
ifTraceLog (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> String
forall r. PrintfType r => String -> r
printf String
"waitCompletions\n"
      let
        wrapped :: STM a -> IO a
wrapped = LogicBug -> STM a -> IO a
forall e a. Exception e => e -> STM a -> IO a
atomicallyOnBlocking (ReadingCompletionsFailedRun -> LogicBug
forall e. Exception e => e -> LogicBug
LogicBug ReadingCompletionsFailedRun
ReadingCompletionsFailedRun)
        doWait :: IO ()
doWait = STM () -> IO ()
forall a. STM a -> IO a
wrapped (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
          [CompleteReq u w]
c <- TVar [CompleteReq u w] -> STM [CompleteReq u w]
forall a. TVar a -> STM a
readTVar TVar [CompleteReq u w]
completions
          Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when ([CompleteReq u w] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [CompleteReq u w]
c) STM ()
forall a. STM a
retry
        doWaitProfiled :: IO ()
doWaitProfiled = do
          Bool
queueEmpty <- [CompleteReq u w] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null ([CompleteReq u w] -> Bool) -> IO [CompleteReq u w] -> IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> STM [CompleteReq u w] -> IO [CompleteReq u w]
forall a. STM a -> IO a
wrapped (TVar [CompleteReq u w] -> STM [CompleteReq u w]
forall a. TVar a -> STM a
readTVar TVar [CompleteReq u w]
completions)
          Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
queueEmpty (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
            -- Double check the queue as we want to make sure that
            -- submittedReqsRef is copied before waiting on the queue but as a
            -- fast path do not want to copy it if the queue is empty.
            -- There is still a race opportunity as submittedReqsRef is
            -- decremented in whatever thread the completion happens, and so it
            -- is possible for waitingOn to be empty while queueEmpty2 is True.
            ReqCountMap
waitingOn <- IORef ReqCountMap -> IO ReqCountMap
forall a. IORef a -> IO a
readIORef IORef ReqCountMap
submittedReqsRef
            Bool
queueEmpty2 <- [CompleteReq u w] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null ([CompleteReq u w] -> Bool) -> IO [CompleteReq u w] -> IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> STM [CompleteReq u w] -> IO [CompleteReq u w]
forall a. STM a -> IO a
wrapped (TVar [CompleteReq u w] -> STM [CompleteReq u w]
forall a. TVar a -> STM a
readTVar TVar [CompleteReq u w]
completions)
            Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
queueEmpty2 (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
              Timestamp
start <- IO Timestamp
getTimestamp
              IO ()
doWait
              Timestamp
end <- IO Timestamp
getTimestamp
              let fw :: FetchStats
fw = FetchWait :: HashMap Text CallId -> Timestamp -> Timestamp -> FetchStats
FetchWait
                        { fetchWaitReqs :: HashMap Text CallId
fetchWaitReqs = ReqCountMap -> HashMap Text CallId
getSummaryMapFromRCMap ReqCountMap
waitingOn
                        , fetchWaitStart :: Timestamp
fetchWaitStart = Timestamp
start
                        , fetchWaitDuration :: Timestamp
fetchWaitDuration = (Timestamp
endTimestamp -> Timestamp -> Timestamp
forall a. Num a => a -> a -> a
-Timestamp
start)
                        }
              IORef Stats -> (Stats -> Stats) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' IORef Stats
statsRef ((Stats -> Stats) -> IO ()) -> (Stats -> Stats) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(Stats [FetchStats]
s) -> [FetchStats] -> Stats
Stats (FetchStats
fwFetchStats -> [FetchStats] -> [FetchStats]
forall a. a -> [a] -> [a]
:[FetchStats]
s)
      if ReportFlag -> ReportFlags -> Bool
testReportFlag ReportFlag
ReportFetchStats (ReportFlags -> Bool) -> ReportFlags -> Bool
forall a b. (a -> b) -> a -> b
$ Flags -> ReportFlags
report Flags
flags
        then IO ()
doWaitProfiled
        else IO ()
doWait
      Env u w -> IO ()
forall u w. Env u w -> IO ()
emptyRunQueue Env u w
env

  --
  Env u w -> JobList u w -> GenHaxl u w a -> IVar u w a -> IO ()
forall u w b.
Env u w -> JobList u w -> GenHaxl u w b -> IVar u w b -> IO ()
schedule Env u w
env JobList u w
forall u w. JobList u w
JobNil GenHaxl u w a
haxl IVar u w a
result
  IVarContents u w a
r <- IORef (IVarContents u w a) -> IO (IVarContents u w a)
forall a. IORef a -> IO a
readIORef IORef (IVarContents u w a)
resultRef
  IORef (WriteTree w) -> WriteTree w -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (WriteTree w)
writeLogsRef WriteTree w
forall w. WriteTree w
NilWrites
  WriteTree w
wtNoMemo <- IORef (WriteTree w)
-> (WriteTree w -> (WriteTree w, WriteTree w)) -> IO (WriteTree w)
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef (WriteTree w)
writeLogsRefNoMemo
    (\WriteTree w
old_wrts -> (WriteTree w
forall w. WriteTree w
NilWrites , WriteTree w
old_wrts))
  case IVarContents u w a
r of
    IVarEmpty JobList u w
_ -> CriticalError -> IO (a, [w])
forall e a. Exception e => e -> IO a
throwIO (Text -> CriticalError
CriticalError Text
"runHaxl: missing result")
    IVarFull (Ok a
a WriteTree w
wt) -> do
      (a, [w]) -> IO (a, [w])
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, WriteTree w -> [w]
forall w. WriteTree w -> [w]
flattenWT (WriteTree w
wt WriteTree w -> WriteTree w -> WriteTree w
forall w. WriteTree w -> WriteTree w -> WriteTree w
`appendWTs` WriteTree w
wtNoMemo))
    IVarFull (ThrowHaxl SomeException
e WriteTree w
_wt)  -> SomeException -> IO (a, [w])
forall e a. Exception e => e -> IO a
throwIO SomeException
e
      -- The written logs are discarded when there's a Haxl exception. We
      -- can change this behavior if we need to get access to partial logs.
    IVarFull (ThrowIO SomeException
e)  -> SomeException -> IO (a, [w])
forall e a. Exception e => e -> IO a
throwIO SomeException
e


{- Note [runHaxl and unfinished requests]

runHaxl returns immediately when the supplied computation has returned
a result.  This doesn't necessarily mean that the whole computation
graph has completed, however.  In particular, when using pAnd and pOr,
we might have created some data fetches that have not completed, but
weren't required, because the other branch of the pAnd/pOr subsumed
the result.

When runHaxl returns, it might be that:
- reqStoreRef contains some unsubmitted requests
- runQueueRef contains some jobs
- there are in-flight BackgroundFetch requests, that will return their
  results to the completions queue in due course.
- there are various unfilled IVars in the cache and/or memo tables

This should all be safe: we can even restart runHaxl with the same Env
after it has stopped, and the in-progress computations will
continue. But don't discard the contents of
reqStoreRef/runQueueRef/completions, because then we'll deadlock if we
discover one of the unfilled IVars in the cache or memo table.
-}

{- TODO: later
data SchedPolicy
  = SubmitImmediately
  | WaitAtLeast Int{-ms-}
  | WaitForAllPendingRequests
-}

-- | An exception used when reading completed requests from the
-- datasources fails.
--
-- Within this module it is wrapped in 'LogicBug' and handed to
-- 'atomicallyOnBlocking' around the STM transactions that read the
-- @completions@ queue.
--
-- NOTE(review): presumably this is what gets thrown when such a
-- transaction blocks indefinitely; confirm against the definition of
-- 'atomicallyOnBlocking'.
data ReadingCompletionsFailedRun = ReadingCompletionsFailedRun
  deriving Show

-- All 'Exception' methods use their defaults; the class only needs the
-- 'Show' instance derived above.
instance Exception ReadingCompletionsFailedRun