{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
module Network.Ethereum.Contract.Event (
EventAction(..)
, event
, event'
, eventMany'
) where
import Control.Concurrent (threadDelay)
import Control.Monad (forM, void, when)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Reader (ReaderT (..))
import Data.Either (rights)
import Data.Machine (MachineT, asParts, autoM,
await, construct, final,
mapping, repeatedly, runT,
unfoldPlan, (~>))
import Data.Machine.Plan (PlanT, stop, yield)
import Data.Maybe (listToMaybe)
import Control.Concurrent.Async (Async)
import Network.Ethereum.ABI.Event (DecodeEvent (..))
import qualified Network.Ethereum.Web3.Eth as Eth
import Network.Ethereum.Web3.Provider (Web3, forkWeb3)
import Network.Ethereum.Web3.Types (BlockNumber (..), Change (..),
DefaultBlock (..), Filter (..),
FilterId)
data EventAction = ContinueEvent
| TerminateEvent
deriving (Show, Eq)
event :: DecodeEvent i ni e
=> Filter e
-> (e -> ReaderT Change Web3 EventAction)
-> Web3 (Async ())
event fltr = forkWeb3 . event' fltr
event' :: DecodeEvent i ni e
=> Filter e
-> (e -> ReaderT Change Web3 EventAction)
-> Web3 ()
event' fltr = eventMany' fltr 0
eventMany' :: DecodeEvent i ni e
=> Filter e
-> Integer
-> (e -> ReaderT Change Web3 EventAction)
-> Web3 ()
eventMany' fltr window handler = do
start <- mkBlockNumber $ filterFromBlock fltr
let initState = FilterStreamState { fssCurrentBlock = start
, fssInitialFilter = fltr
, fssWindowSize = window
}
mLastProcessedFilterState <- reduceEventStream (playLogs initState) handler
case mLastProcessedFilterState of
Nothing -> startPolling fltr {filterFromBlock = BlockWithNumber start}
Just (act, lastBlock) -> do
end <- mkBlockNumber . filterToBlock $ fltr
when (act /= TerminateEvent && lastBlock < end) $
let pollingFromBlock = lastBlock + 1
in startPolling fltr {filterFromBlock = BlockWithNumber pollingFromBlock}
where
startPolling fltr' = do
filterId <- Eth.newFilter fltr'
let pollTo = filterToBlock fltr'
void $ reduceEventStream (pollFilter filterId pollTo) handler
reduceEventStream :: Monad m
=> MachineT m k [FilterChange a]
-> (a -> ReaderT Change m EventAction)
-> m (Maybe (EventAction, BlockNumber))
reduceEventStream filterChanges handler = fmap listToMaybe . runT $
filterChanges
~> autoM (processChanges handler)
~> asParts
~> runWhile (\(act, _) -> act /= TerminateEvent)
~> final
where
runWhile p = repeatedly $ do
v <- await
if p v
then yield v
else yield v >> stop
processChanges :: Monad m
=> (a -> ReaderT Change m EventAction)
-> [FilterChange a]
-> m [(EventAction, BlockNumber)]
processChanges handler' changes =
forM changes $ \FilterChange{..} -> do
act <- flip runReaderT filterChangeRawChange $
handler' filterChangeEvent
return (act, changeBlockNumber filterChangeRawChange)
data FilterChange a = FilterChange { filterChangeRawChange :: Change
, filterChangeEvent :: a
}
playLogs :: DecodeEvent i ni e
=> FilterStreamState e
-> MachineT Web3 k [FilterChange e]
playLogs s = filterStream s
~> autoM Eth.getLogs
~> mapping mkFilterChanges
pollFilter :: forall i ni e k . DecodeEvent i ni e
=> FilterId
-> DefaultBlock
-> MachineT Web3 k [FilterChange e]
pollFilter i = construct . pollPlan i
where
pollPlan :: FilterId -> DefaultBlock -> PlanT k [FilterChange e] Web3 ()
pollPlan fid end = do
bn <- lift $ Eth.blockNumber
if BlockWithNumber bn > end
then do
_ <- lift $ Eth.uninstallFilter fid
stop
else do
liftIO $ threadDelay 1000000
changes <- lift $ Eth.getFilterChanges fid
yield $ mkFilterChanges changes
pollPlan fid end
mkFilterChanges :: DecodeEvent i ni e
=> [Change]
-> [FilterChange e]
mkFilterChanges = rights
. fmap (\c@Change{..} -> FilterChange c <$> decodeEvent c)
data FilterStreamState e =
FilterStreamState { fssCurrentBlock :: BlockNumber
, fssInitialFilter :: Filter e
, fssWindowSize :: Integer
}
filterStream :: FilterStreamState e
-> MachineT Web3 k (Filter e)
filterStream initialPlan = unfoldPlan initialPlan filterPlan
where
filterPlan :: FilterStreamState e -> PlanT k (Filter e) Web3 (FilterStreamState e)
filterPlan initialState@FilterStreamState{..} = do
end <- lift . mkBlockNumber $ filterToBlock fssInitialFilter
if fssCurrentBlock > end
then stop
else do
let to' = newTo end fssCurrentBlock fssWindowSize
filter' = fssInitialFilter { filterFromBlock = BlockWithNumber fssCurrentBlock
, filterToBlock = BlockWithNumber to'
}
yield filter'
filterPlan $ initialState { fssCurrentBlock = succBn to' }
succBn :: BlockNumber -> BlockNumber
succBn (BlockNumber bn) = BlockNumber $ bn + 1
newTo :: BlockNumber -> BlockNumber -> Integer -> BlockNumber
newTo upper (BlockNumber current) window = min upper . BlockNumber $ current + window
mkBlockNumber :: DefaultBlock -> Web3 BlockNumber
mkBlockNumber bm = case bm of
BlockWithNumber bn -> return bn
Earliest -> return 0
_ -> Eth.blockNumber