{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# OPTIONS_HADDOCK hide #-}
module Data.Array.Accelerate.Debug.Monitoring (
beginMonitoring,
initAccMetrics,
Processor(..),
withProcessor, addProcessorTime,
didAllocateBytesLocal, didAllocateBytesRemote,
didCopyBytesToRemote, didCopyBytesFromRemote,
increaseCurrentBytesRemote, decreaseCurrentBytesRemote,
setCurrentBytesNursery,
didRemoteGC,
didEvictBytes,
) where
#ifdef ACCELERATE_MONITORING
import Data.Atomic ( Atomic )
import qualified Data.Atomic as Atomic
import System.Metrics
import System.Metrics.Counter ( Counter )
import System.Metrics.Gauge ( Gauge )
import qualified System.Metrics.Counter as Counter
import qualified System.Metrics.Gauge as Gauge
import Control.Concurrent
import Control.Concurrent.Async
import Control.Monad
import Data.IORef
import Data.Text ( Text )
import Data.Time.Clock
import System.IO.Unsafe
import System.Remote.Monitoring
import Text.Printf
import qualified Data.HashMap.Strict as Map
#endif
import Data.Int
import Prelude
-- | Start an EKG monitoring server on @http://localhost:8000@ that serves the
-- Accelerate metrics built by 'initAccMetrics' together with the metrics
-- added by 'registerGcMetrics'. When monitoring is compiled out this is a
-- no-op.
--
-- Startup is best-effort: a failure is reported on stdout (including the
-- exception that caused it) rather than propagated to the caller.
beginMonitoring :: IO ()
#ifdef ACCELERATE_MONITORING
beginMonitoring = do
  store <- initAccMetrics
  registerGcMetrics store
  -- Run the server start-up, then wait 10ms, inside an async so that any
  -- exception raised there is captured by 'waitCatch' instead of escaping.
  -- NOTE(review): errors raised later inside the server's own thread may not
  -- surface here — confirm against ekg's 'forkServerWith'.
  r     <- withAsync (forkServerWith store "localhost" 8000 >> threadDelay 10000) waitCatch
  case r of
    Right _ -> printf "EKG monitor started at: http://localhost:8000\n"
    -- Report why start-up failed instead of silently discarding the exception.
    Left e  -> printf "Failed to start EKG monitor: %s\n" (show e)
#else
beginMonitoring = return ()
#endif
#ifndef ACCELERATE_MONITORING
-- | Monitoring support was not compiled in: running this action fails with an
-- explanatory 'error'.
initAccMetrics :: IO a
initAccMetrics = error "Data.Array.Accelerate: Monitoring is disabled. Reinstall package 'accelerate' with '-fekg' to enable it."
#else
-- | Create a fresh EKG 'Store' and register every Accelerate metric in it:
--
--   * @acc.load.*@: smoothed per-processor load, sampled via 'registerRate'
--   * @acc.gc.current_bytes_*@: gauges read from the global gauge cells
--   * the remaining @acc.gc.*@ entries: read from the global counter cells
--
-- Metrics are registered in the order listed in the tables below.
initAccMetrics :: IO Store
initAccMetrics = do
  store <- newStore
  forM_ loads    $ \(name, var)   -> registerRate    name (estimateProcessorLoad var) store
  forM_ gauges   $ \(name, gauge) -> registerGauge   name (Gauge.read gauge)          store
  forM_ counters $ \(name, cnt)   -> registerCounter name (Counter.read cnt)          store
  pure store
  where
    loads :: [(Text, Atomic)]
    loads =
      [ ("acc.load.llvm_native", _active_ns_llvm_native)
      , ("acc.load.llvm_ptx",    _active_ns_llvm_ptx)
      ]

    gauges :: [(Text, Gauge)]
    gauges =
      [ ("acc.gc.current_bytes_remote",  _current_bytes_remote)
      , ("acc.gc.current_bytes_nursery", _current_bytes_nursery)
      ]

    counters :: [(Text, Counter)]
    counters =
      [ ("acc.gc.bytes_allocated_local",       _total_bytes_allocated_local)
      , ("acc.gc.bytes_allocated_remote",      _total_bytes_allocated_remote)
      , ("acc.gc.bytes_copied_to_remote",      _total_bytes_copied_to_remote)
      , ("acc.gc.bytes_copied_from_remote",    _total_bytes_copied_from_remote)
      , ("acc.gc.bytes_evicted_from_remote",   _total_bytes_evicted_from_remote)
      , ("acc.gc.num_gcs",                     _num_remote_gcs)
      , ("acc.gc.num_lru_evict",               _num_evictions)
      ]

-- | Register a single gauge called @name@ whose value is produced by
-- @sample@. The sampler receives its own 'EMAState' reference, seeded with the
-- current time and zero instantaneous/average values.
registerRate :: Text -> (IORef EMAState -> IO Int64) -> Store -> IO ()
registerRate name sample store = do
  start <- getCurrentTime
  ref   <- newIORef (ES { old_time = start, old_inst = 0, old_avg = 0 })
  registerGroup (Map.fromList [(name, Gauge)]) (sample ref) store
#endif
-- | The processors whose busy time can be recorded with 'withProcessor' and
-- 'addProcessorTime'.
data Processor
  = Native  -- ^ LLVM native backend (reported as @acc.load.llvm_native@)
  | PTX     -- ^ LLVM PTX backend (reported as @acc.load.llvm_ptx@)
{-# INLINE withProcessor #-}
-- | Run an action and add its wall-clock running time to the given
-- processor's busy-time accumulator. The result is forced to weak head normal
-- form before the clock is stopped, so that evaluation is counted too. When
-- monitoring is compiled out this is just the action itself.
withProcessor :: Processor -> IO a -> IO a
#ifndef ACCELERATE_MONITORING
withProcessor _ = id
#else
withProcessor p =
  case p of
    Native -> withProcessor' _active_ns_llvm_native
    PTX    -> withProcessor' _active_ns_llvm_ptx

-- | Time @action@ with the wall clock and credit the elapsed time to @var@.
withProcessor' :: Atomic -> IO a -> IO a
withProcessor' var action = do
  started  <- getCurrentTime
  result   <- action >>= (return $!)
  finished <- getCurrentTime
  addProcessorTime' var (realToFrac (finished `diffUTCTime` started))
  return result
#endif
{-# INLINE addProcessorTime #-}
-- | Add @secs@ seconds of busy time to the given processor's accumulator.
-- A no-op when monitoring is compiled out.
addProcessorTime :: Processor -> Double -> IO ()
#ifndef ACCELERATE_MONITORING
addProcessorTime _ _ = return ()
#else
addProcessorTime Native = addProcessorTime' _active_ns_llvm_native
addProcessorTime PTX    = addProcessorTime' _active_ns_llvm_ptx

-- | Add a duration in seconds to a nanosecond accumulator.
--
-- Only strictly positive, finite durations are recorded. 'withProcessor'
-- measures with the wall clock ('getCurrentTime'), which can step backwards.
-- A negative value would decrement the shared accumulator, and rounding NaN or
-- Infinity to an integer yields an arbitrary value. Either would corrupt the
-- load estimate.
addProcessorTime' :: Atomic -> Double -> IO ()
addProcessorTime' var secs
  | secs > 0 && not (isInfinite secs) =   -- @NaN > 0@ is False, so NaN is skipped too
      let ns = round (secs * 1.0E9)
      in  void $ Atomic.add var ns
  | otherwise = return ()
#endif
-- | Record @n@ locally allocated bytes by adding them to the counter that
-- 'initAccMetrics' reports as @acc.gc.bytes_allocated_local@. A no-op when
-- monitoring is compiled out.
didAllocateBytesLocal :: Int64 -> IO ()
#ifndef ACCELERATE_MONITORING
didAllocateBytesLocal = const (pure ())
#else
didAllocateBytesLocal = Counter.add _total_bytes_allocated_local
#endif
-- | Record @n@ remotely allocated bytes by adding them to the counter that
-- 'initAccMetrics' reports as @acc.gc.bytes_allocated_remote@. A no-op when
-- monitoring is compiled out.
didAllocateBytesRemote :: Int64 -> IO ()
#ifndef ACCELERATE_MONITORING
didAllocateBytesRemote = \_ -> pure ()
#else
didAllocateBytesRemote = Counter.add _total_bytes_allocated_remote
#endif
{-# INLINE increaseCurrentBytesRemote #-}
-- | Raise the @acc.gc.current_bytes_remote@ gauge by @n@ bytes.
-- A no-op when monitoring is compiled out.
increaseCurrentBytesRemote :: Int64 -> IO ()
#ifndef ACCELERATE_MONITORING
increaseCurrentBytesRemote = const (pure ())
#else
increaseCurrentBytesRemote = Gauge.add _current_bytes_remote
#endif
{-# INLINE decreaseCurrentBytesRemote #-}
-- | Lower the @acc.gc.current_bytes_remote@ gauge by @n@ bytes.
-- A no-op when monitoring is compiled out.
decreaseCurrentBytesRemote :: Int64 -> IO ()
#ifndef ACCELERATE_MONITORING
decreaseCurrentBytesRemote = \_ -> pure ()
#else
decreaseCurrentBytesRemote = Gauge.subtract _current_bytes_remote
#endif
-- | Add @n@ to the @acc.gc.bytes_copied_to_remote@ counter.
-- A no-op when monitoring is compiled out.
didCopyBytesToRemote :: Int64 -> IO ()
#ifndef ACCELERATE_MONITORING
didCopyBytesToRemote = const (pure ())
#else
didCopyBytesToRemote = Counter.add _total_bytes_copied_to_remote
#endif
-- | Add @n@ to the @acc.gc.bytes_copied_from_remote@ counter.
-- A no-op when monitoring is compiled out.
didCopyBytesFromRemote :: Int64 -> IO ()
#ifndef ACCELERATE_MONITORING
didCopyBytesFromRemote = \_ -> pure ()
#else
didCopyBytesFromRemote = Counter.add _total_bytes_copied_from_remote
#endif
{-# INLINE setCurrentBytesNursery #-}
-- | Overwrite the @acc.gc.current_bytes_nursery@ gauge with @n@.
-- A no-op when monitoring is compiled out.
setCurrentBytesNursery :: Int64 -> IO ()
#ifndef ACCELERATE_MONITORING
setCurrentBytesNursery = const (pure ())
#else
setCurrentBytesNursery = Gauge.set _current_bytes_nursery
#endif
-- | Bump the @acc.gc.num_gcs@ counter by one.
-- A no-op when monitoring is compiled out.
didRemoteGC :: IO ()
#ifndef ACCELERATE_MONITORING
didRemoteGC = pure ()
#else
didRemoteGC =
  Counter.inc _num_remote_gcs
#endif
-- | Record one eviction of @n@ bytes: bump @acc.gc.num_lru_evict@ by one, then
-- add @n@ to @acc.gc.bytes_evicted_from_remote@. A no-op when monitoring is
-- compiled out.
didEvictBytes :: Int64 -> IO ()
#ifndef ACCELERATE_MONITORING
didEvictBytes = const (pure ())
#else
didEvictBytes n =
  Counter.inc _num_evictions >> Counter.add _total_bytes_evicted_from_remote n
#endif
#ifdef ACCELERATE_MONITORING
-- | State carried from one load sample to the next by 'estimateProcessorLoad'.
data EMAState = ES
  { old_time :: {-# UNPACK #-} !UTCTime   -- ^ wall-clock time of the previous sample
  , old_inst :: {-# UNPACK #-} !Double    -- ^ instantaneous load (percent) at the previous sample
  , old_avg  :: {-# UNPACK #-} !Double    -- ^ moving-average load (percent) after the previous sample
  }
-- | Sample a processor's load as a percentage, smoothed with 'ema'.
--
-- Reads and clears the busy-time accumulator @var@ (nanoseconds). It divides
-- that by the wall-clock time since the previous sample to get the
-- instantaneous load, folds the result into the moving average kept in @ref@,
-- and returns the rounded average.
--
-- If no wall-clock time has passed since the previous sample, or the clock
-- stepped backwards, no rate can be computed. Previously this produced
-- Infinity/NaN, which was then stored in the state and poisoned every later
-- average. In that case the accumulator is left untouched, so its busy time
-- counts toward the next interval. The timestamp is re-based and the previous
-- average is returned.
estimateProcessorLoad :: Atomic -> IORef EMAState -> IO Int64
estimateProcessorLoad !var !ref = do
  ES{..} <- readIORef ref
  time   <- getCurrentTime
  let elapsed_s = realToFrac (diffUTCTime time old_time) :: Double
  if elapsed_s <= 0
    then do
      writeIORef ref (ES time old_inst old_avg)
      return (round old_avg)
    else do
      -- Fetch the busy time accumulated since the last sample and reset it to
      -- zero in one atomic step (relies on 'Atomic.and' returning the prior
      -- value, as this code always has).
      sample <- Atomic.and var 0
      let
          active_ns  = fromIntegral sample
          elapsed_ns = 1.0E9 * elapsed_s
          new_inst   = 100 * (active_ns / elapsed_ns)                -- instantaneous load (%)
          new_avg    = ema 0.2 elapsed_s old_avg old_inst new_inst   -- smoothed load (%)
      writeIORef ref (ES time new_inst new_avg)
      return (round new_avg)
-- | Exponential moving average for an irregularly sampled time series.
--
-- Arguments: the time constant @alpha@ (in the same units as @dt@), the time
-- @dt@ elapsed since the previous sample, the previous average, the previous
-- sample and the new sample. With @a = dt / alpha@, @u = exp (-a)@ and
-- @v = (1 - u) / a@, the result is
--
-- > u * old_ema + (v - u) * old_sample + (1 - v) * new_sample
--
-- These weights are those of the EMA operator for inhomogeneous time series
-- that interpolates linearly between samples.
--
-- A non-positive @dt@ returns @old_ema@ unchanged. That is the limit of the
-- formula as @dt -> 0@ (u and v both tend to 1). Evaluating the formula
-- directly would give NaN at @dt == 0@ (v = 0/0) and extrapolate wildly for
-- negative @dt@.
ema :: Double -> Double -> Double -> Double -> Double -> Double
ema !alpha !dt !old_ema !old_sample !new_sample
  | dt <= 0   = old_ema
  | otherwise =
      let
          a = dt / alpha
          u = exp ( -a )
          v = ( 1 - u ) / a
      in
      (u * old_ema) + ((v-u) * old_sample) + ((1-v) * new_sample)
-- Process-wide busy-time accumulators in nanoseconds, one per 'Processor'
-- constructor. 'addProcessorTime'' adds to them and 'estimateProcessorLoad'
-- drains them.
--
-- Each is a top-level mutable cell created with 'unsafePerformIO'. The
-- NOINLINE pragma stops GHC from inlining the definition, which would create
-- a fresh cell at every use site. (GHC's 'unsafePerformIO' documentation also
-- recommends -fno-cse for modules holding several such globals.)
--
-- The former @_active_ns_cuda@ accumulator was removed. It was not exported,
-- no 'Processor' constructor mapped to it, and nothing read or wrote it.
{-# NOINLINE _active_ns_llvm_native #-}
_active_ns_llvm_native :: Atomic
_active_ns_llvm_native = unsafePerformIO (Atomic.new 0)

{-# NOINLINE _active_ns_llvm_ptx #-}
_active_ns_llvm_ptx :: Atomic
_active_ns_llvm_ptx = unsafePerformIO (Atomic.new 0)
-- Process-wide cells behind the counters and gauges that 'initAccMetrics'
-- registers and the did*/…CurrentBytes* functions update.
--
-- Each is created exactly once with 'unsafePerformIO' and marked NOINLINE, so
-- every reference denotes the same cell. Without the pragma, inlining would
-- allocate a fresh cell per use site.

_total_bytes_allocated_local, _total_bytes_allocated_remote,
  _total_bytes_copied_to_remote, _total_bytes_copied_from_remote,
  _total_bytes_evicted_from_remote, _num_remote_gcs, _num_evictions :: Counter

{-# NOINLINE _total_bytes_allocated_local #-}
_total_bytes_allocated_local = unsafePerformIO Counter.new

{-# NOINLINE _total_bytes_allocated_remote #-}
_total_bytes_allocated_remote = unsafePerformIO Counter.new

{-# NOINLINE _total_bytes_copied_to_remote #-}
_total_bytes_copied_to_remote = unsafePerformIO Counter.new

{-# NOINLINE _total_bytes_copied_from_remote #-}
_total_bytes_copied_from_remote = unsafePerformIO Counter.new

{-# NOINLINE _total_bytes_evicted_from_remote #-}
_total_bytes_evicted_from_remote = unsafePerformIO Counter.new

{-# NOINLINE _num_remote_gcs #-}
_num_remote_gcs = unsafePerformIO Counter.new

{-# NOINLINE _num_evictions #-}
_num_evictions = unsafePerformIO Counter.new

_current_bytes_remote, _current_bytes_nursery :: Gauge

{-# NOINLINE _current_bytes_remote #-}
_current_bytes_remote = unsafePerformIO Gauge.new

{-# NOINLINE _current_bytes_nursery #-}
_current_bytes_nursery = unsafePerformIO Gauge.new
#endif