{-# LANGUAGE CPP                    #-}
{-# LANGUAGE DataKinds              #-}
{-# LANGUAGE FlexibleContexts       #-}
{-# LANGUAGE FlexibleInstances      #-}
{-# LANGUAGE GADTs                  #-}
{-# LANGUAGE MultiParamTypeClasses  #-}
{-# LANGUAGE RankNTypes             #-}
{-# LANGUAGE RecordWildCards        #-}
{-# LANGUAGE ScopedTypeVariables    #-}
{-# LANGUAGE TypeApplications       #-}
{-# LANGUAGE TypeFamilyDependencies #-}
{-# LANGUAGE TypeOperators          #-}
{-# LANGUAGE UndecidableInstances   #-}

-- |
-- Module      :  Network.Ethereum.Contract.Event.MultiFilter
-- Copyright   :  FOAM team <http://foam.space> 2018
-- License     :  Apache-2.0
--
-- Maintainer  :  mail@akru.me
-- Stability   :  experimental
-- Portability :  unportable
--
-- Parallel multiple event filters.
--

module  Network.Ethereum.Contract.Event.MultiFilter
    (
    -- * The @MultiFilter@ type
      MultiFilter(..)
    , minStartBlock
    , minEndBlock
    , modifyMultiFilter

    -- * With geth filters
    , multiEvent
    , multiEventMany

    -- * Without geth filters
    , multiEventNoFilter
    , multiEventManyNoFilter

    -- * Re-exports
    , Handlers
    , Handler(..)
    , Rec(..)
    ) where

import           Control.Concurrent                     (threadDelay)
import           Control.Monad                          (forM, void, when)
import           Control.Monad.IO.Class                 (MonadIO (..))
import           Control.Monad.Trans.Class              (lift)
import           Control.Monad.Trans.Reader             (ReaderT (..))
import           Data.List                              (sortOn)
import           Data.Machine                           (MachineT, asParts,
                                                         autoM, await,
                                                         construct, final,
                                                         repeatedly, runT,
                                                         unfoldPlan, (~>))
import           Data.Machine.Plan                      (PlanT, stop, yield)
import           Data.Maybe                             (catMaybes, fromJust,
                                                         listToMaybe)
import           Data.Tagged                            (Tagged (..))
import           Data.Vinyl                             (Rec ((:&), RNil),
                                                         RecApplicative)
import           Data.Vinyl.CoRec                       (CoRec (..), Field,
                                                         FoldRec, Handler (H),
                                                         Handlers, coRecToRec,
                                                         firstField, match,
                                                         onField)
import           Data.Vinyl.Functor                     (Compose (..),
                                                         Identity (..))
#if MIN_VERSION_vinyl(0,10,0)
import           Data.Vinyl                             (RPureConstrained)
#else
import           Data.Proxy                             (Proxy (..))
import           Data.Vinyl.TypeLevel                   (AllAllSat)
#endif

import           Data.Solidity.Event                    (DecodeEvent (..))
import qualified Network.Ethereum.Api.Eth               as Eth
import           Network.Ethereum.Api.Types             (Change (..),
                                                         DefaultBlock (..),
                                                         Filter (..), Quantity)
import Network.JsonRpc.TinyClient (JsonRpc(..))
import           Network.Ethereum.Contract.Event.Common

--------------------------------------------------------------------------------
-- MultiFilters
--------------------------------------------------------------------------------

-- | A heterogeneous list of event filters, indexed by the type-level list
-- @es@ of the event types the individual filters select.
data MultiFilter (es :: [*]) where
  -- | The empty collection of filters.
  NilFilters :: MultiFilter '[]
  -- | Prepend a filter for event type @e@ to an existing collection.
  (:?) :: Filter e -> MultiFilter es -> MultiFilter (e ': es)

-- ':?' associates to the right, so @f1 :? f2 :? NilFilters@ needs no parentheses.
infixr 5 :?

-- | The smallest 'filterToBlock' among all filters in the collection.
--
-- An empty collection yields 'Pending'.
--
-- NOTE(review): using 'Pending' as the base case assumes it is the greatest
-- 'DefaultBlock' under its 'Ord' instance -- confirm against the type.
minEndBlock
  :: MultiFilter es
  -> DefaultBlock
minEndBlock NilFilters = Pending
minEndBlock (f :? fs)  = filterToBlock f `min` minEndBlock fs

-- | The smallest 'filterFromBlock' among all filters in the collection.
--
-- An empty collection yields 'Pending'.
--
-- NOTE(review): using 'Pending' as the base case assumes it is the greatest
-- 'DefaultBlock' under its 'Ord' instance -- confirm against the type.
minStartBlock
  :: MultiFilter es
  -> DefaultBlock
minStartBlock NilFilters = Pending
minStartBlock (f :? fs)  = filterFromBlock f `min` minStartBlock fs

-- | Apply the same transformation to every filter in the collection.
--
-- The transformation must work for any event type, which is why it is passed
-- as a rank-2 function; the shape and type index of the collection are kept.
modifyMultiFilter
  :: (forall e. Filter e -> Filter e)
  -> MultiFilter es
  -> MultiFilter es
modifyMultiFilter _ NilFilters = NilFilters
modifyMultiFilter h (f :? fs)  = h f :? modifyMultiFilter h fs


-- | Run the handlers over the events selected by the given filters.
--
-- This is 'multiEventMany' with a window size of @0@.
multiEvent
  :: ( PollFilters es
     , QueryAllLogs es
     , MapHandlers m es (WithChange es)
#if MIN_VERSION_vinyl(0,10,0)
     , RPureConstrained HasLogIndex (WithChange es)
#else
     , AllAllSat '[HasLogIndex] (WithChange es)
#endif
     , RecApplicative (WithChange es)
     , JsonRpc m
     )
  => MultiFilter es
  -> Handlers es (ReaderT Change m EventAction)
  -> m ()
multiEvent fltrs = multiEventMany fltrs 0

-- | State threaded through 'multiFilterStream' while it walks a block range
-- one window at a time.
data MultiFilterStreamState es =
  MultiFilterStreamState
    { mfssCurrentBlock       :: Quantity
      -- ^ First block of the next window to be emitted.
    , mfssInitialMultiFilter :: MultiFilter es
      -- ^ The filters every emitted window is derived from.
    , mfssWindowSize         :: Integer
      -- ^ Number of blocks added to the current block to get a window's end.
    }


-- | Run the handlers over the events selected by the given filters, fetching
-- past logs in windows of the given size.
--
-- The function works in two phases:
--
-- 1. Starting at the block resolved from 'minStartBlock', logs are replayed
--    through 'playMultiLogs' and fed to the handlers.
--
-- 2. Unless a handler returned 'TerminateEvent', or the last handled block
--    already reached the block resolved from 'minEndBlock', filters are
--    opened with 'openMultiFilter' and polled with 'pollMultiFilter',
--    starting right after the last handled block. If the replay phase handled
--    nothing, polling starts from the original start block.
multiEventMany
  :: ( PollFilters es
     , QueryAllLogs es
     , MapHandlers m es (WithChange es)
#if MIN_VERSION_vinyl(0,10,0)
     , RPureConstrained HasLogIndex (WithChange es)
#else
     , AllAllSat '[HasLogIndex] (WithChange es)
#endif
     , RecApplicative (WithChange es)
     , JsonRpc m
     )
  => MultiFilter es
  -> Integer
  -> Handlers es (ReaderT Change m EventAction)
  -> m ()
multiEventMany fltrs window handlers = do
    start <- mkBlockNumber (minStartBlock fltrs)
    let initState =
          MultiFilterStreamState { mfssCurrentBlock = start
                                 , mfssInitialMultiFilter = fltrs
                                 , mfssWindowSize = window
                                 }
    mLastProcessed <- reduceMultiEventStream (playMultiLogs initState) handlers
    case mLastProcessed of
      -- Nothing was handled during the replay: poll from the very beginning.
      Nothing -> startPollingFrom start
      Just (act, lastBlock) -> do
        end <- mkBlockNumber (minEndBlock fltrs)
        -- Only keep going if nobody asked to stop and there are blocks left.
        when (act /= TerminateEvent && lastBlock < end) $
          startPollingFrom (lastBlock + 1)
  where
    -- Open the filters with their from-block moved to the given block and
    -- feed whatever they report to the handlers; the final state is dropped.
    startPollingFrom fromBlock = do
      let fltrs' = modifyMultiFilter (\f -> f { filterFromBlock = BlockWithNumber fromBlock }) fltrs
      fIds <- openMultiFilter fltrs'
      void $ reduceMultiEventStream (pollMultiFilter fIds (minEndBlock fltrs')) handlers

-- | Produce a stream of 'MultiFilter's that together cover the block range
-- from 'mfssCurrentBlock' up to the block resolved from 'minEndBlock' of the
-- initial filters, one window per emitted value.
--
-- Each emitted collection is the initial one with every filter's from- and
-- to-block overwritten by the bounds of the current window. The end block is
-- resolved once, before the first window is emitted.
multiFilterStream :: JsonRpc m
                  => MultiFilterStreamState es
                  -> MachineT m k (MultiFilter es)
multiFilterStream initialPlan =
  unfoldPlan initialPlan $ \s -> do
    end <- lift . mkBlockNumber . minEndBlock . mfssInitialMultiFilter $ initialPlan
    filterPlan end s
  where
    -- Emit windows until the current block passes the end block; the plan
    -- only ever finishes through 'stop'.
    filterPlan :: JsonRpc m
               => Quantity
               -> MultiFilterStreamState es
               -> PlanT k (MultiFilter es) m (MultiFilterStreamState es)
    filterPlan end st
      | current > end = stop
      | otherwise = do
          let -- A negative window would make the current block stall or move
              -- backwards and the stream would never end, so clamp it to 0.
              window = fromInteger (max 0 (mfssWindowSize st))
              upTo = min end (current + window)
              h :: forall e. Filter e -> Filter e
              h f = f { filterFromBlock = BlockWithNumber current
                      , filterToBlock = BlockWithNumber upTo
                      }
          yield (modifyMultiFilter h (mfssInitialMultiFilter st))
          -- The next window starts right after the one just emitted.
          filterPlan end st { mfssCurrentBlock = upTo + 1 }
      where
        current = mfssCurrentBlock st

-- | Embed a 'Field' over @ts@ into a 'Field' over the longer list @t ': ts@.
--
-- The value is spread into a record of optional slots with 'coRecToRec', an
-- empty slot for @t@ is put in front, and 'firstField' picks the populated
-- slot back out of the widened record.
weakenCoRec
  :: ( RecApplicative ts
     , FoldRec (t ': ts) (t ': ts)
     )
  => Field ts
  -> Field (t ': ts)
weakenCoRec field =
    -- NOTE(review): 'fromJust' assumes 'coRecToRec' always populates a slot,
    -- so that 'firstField' cannot come back empty -- confirm against vinyl.
    fromJust (firstField widened)
  where
    widened = Compose Nothing :& coRecToRec field

-- | Wrap every event type of a type-level list in 'FilterChange'.
--
-- The family is injective (@es' -> es@), so the list of event types can be
-- recovered from the list of wrapped types.
type family WithChange (es :: [*]) = (es' :: [*]) | es' -> es where
  WithChange '[] = '[]
  WithChange (e : es) = FilterChange e : WithChange es

-- | Collections of event types whose filters can all be queried for logs.
class QueryAllLogs (es :: [*]) where
    -- | Query the logs for every filter in the collection and return the
    -- results as one list, each entry tagged with the event type it is for.
    queryAllLogs :: JsonRpc m => MultiFilter es -> m [Field (WithChange es)]

instance QueryAllLogs '[] where
  -- With no filters there is nothing to query.
  queryAllLogs NilFilters = pure []

instance forall e i ni es.
  ( DecodeEvent i ni e
  , QueryAllLogs es
  , RecApplicative (WithChange es)
  , FoldRec (FilterChange e : WithChange es) (WithChange es)
  ) => QueryAllLogs (e:es) where

  -- Fetch and decode the logs of the head filter, recurse on the remaining
  -- filters, and return the head's changes followed by the (widened) rest.
  queryAllLogs (f :? fs) = do
    rawChanges <- Eth.getLogs f
    -- The explicit @e pins the event type the raw changes are decoded into.
    headChanges <- liftIO (mkFilterChanges @_ @_ @e rawChanges)
    tailChanges <- queryAllLogs fs
    pure $ map (CoRec . Identity) headChanges <> map weakenCoRec tailChanges

-- | Values that may carry a position made of two 'Quantity' components;
-- 'sortChanges' orders changes by this pair.
class HasLogIndex a where
  -- | The position of the value, or 'Nothing' if it is not available.
  getLogIndex :: a -> Maybe (Quantity, Quantity)

instance HasLogIndex (FilterChange e) where
  -- The position is the pair (block number, log index) of the raw change;
  -- it is 'Nothing' as soon as either component is missing.
  getLogIndex change =
      (,) <$> changeBlockNumber raw <*> changeLogIndex raw
    where
      raw = filterChangeRawChange change

-- | Sort changes of possibly different event types by their 'getLogIndex'.
--
-- Ordering is the 'Ord' instance of @Maybe (Quantity, Quantity)@, so changes
-- without a position come first; 'sortOn' keeps equal elements in their
-- original relative order.
sortChanges
#if MIN_VERSION_vinyl(0,10,0)
  :: ( RPureConstrained HasLogIndex es
#else
  :: ( AllAllSat '[HasLogIndex] es
#endif
     , RecApplicative es
     )
  => [Field es]
  -> [Field es]
sortChanges changes =
  -- 'onField' applies 'getLogIndex' to whichever alternative a change holds;
  -- how the constraint is passed differs between vinyl versions.
#if MIN_VERSION_vinyl(0,10,0)
  let sorterProj change = onField @HasLogIndex getLogIndex change
#else
  let sorterProj change = onField (Proxy @'[HasLogIndex]) getLogIndex change
#endif
  in sortOn sorterProj changes

-- | Convert a record of user handlers over the event types @es@ into a
-- record of handlers over the types @es'@.
class MapHandlers m es es' where
  -- | Turn handlers running in @ReaderT Change m@ into handlers running
  -- directly in @m@ that may return an 'EventAction' paired with a 'Quantity'.
  mapHandlers
    :: Handlers es (ReaderT Change m EventAction)
    -> Handlers es' (m (Maybe (EventAction, Quantity)))

instance Monad m => MapHandlers m '[] '[] where
  -- No handlers in, no handlers out.
  mapHandlers RNil = RNil

instance
  ( Monad m
  , MapHandlers m es es'
  ) => MapHandlers m (e : es) (FilterChange e : es') where

  -- Wrap the head handler so that it accepts a whole 'FilterChange': the
  -- decoded event is the argument, the raw change is the reader environment.
  -- The resulting action is paired with the raw change's block number, and
  -- the result is 'Nothing' when the raw change has no block number.
  mapHandlers (H f :& fs) = H f' :& mapHandlers fs
    where
      f' change = do
        let raw = filterChangeRawChange change
        act <- runReaderT (f (filterChangeEvent change)) raw
        return ((,) act <$> changeBlockNumber raw)


-- | Feed every batch of changes from the stream to the handlers and return
-- the last @(action, block number)@ pair that was produced.
--
-- Processing stops after the first result whose action is 'TerminateEvent';
-- that result is then the one returned. 'Nothing' is returned when no handled
-- change produced a result (results without a block number are dropped).
--
-- NOTE(review): the handlers are run over a whole batch before the results
-- are inspected, so changes that follow a 'TerminateEvent' within the same
-- batch are still handled -- confirm this is intended.
reduceMultiEventStream
  :: ( Monad m
     , MapHandlers m es (WithChange es)
     )
  => MachineT m k [Field (WithChange es)]
  -> Handlers es (ReaderT Change m EventAction)
  -> m (Maybe (EventAction, Quantity))
reduceMultiEventStream filterChanges handlers = fmap listToMaybe . runT $
       filterChanges
    ~> autoM (processChanges handlers)
    ~> asParts
    ~> runWhile (\(act, _) -> act /= TerminateEvent)
    ~> final
  where
    -- Pass values through while the predicate holds; the first value that
    -- fails it is still passed on, and then the stream is ended.
    runWhile p = repeatedly $ do
      v <- await
      yield v
      when (not (p v)) stop
    -- Dispatch every change of a batch to the handler for its event type and
    -- keep the results that are present.
    processChanges handlers' changes =
      let mapped = mapHandlers handlers'
      in catMaybes <$> forM changes (\fc -> match fc mapped)

-- | 'playMultiLogs' streams the windows produced by 'multiFilterStream',
-- runs 'queryAllLogs' on each emitted 'MultiFilter', and sorts every
-- resulting batch with 'sortChanges'.
playMultiLogs
  :: forall es k m.
     ( QueryAllLogs es
#if MIN_VERSION_vinyl(0,10,0)
     , RPureConstrained HasLogIndex (WithChange es)
#else
     , AllAllSat '[HasLogIndex] (WithChange es)
#endif
     , RecApplicative (WithChange es)
     , JsonRpc m
     )
  => MultiFilterStreamState es
  -> MachineT m k [Field (WithChange es)]
playMultiLogs s =
  -- Sorting is per batch: one batch is the result of querying one window.
  sortChanges <$> (multiFilterStream s ~> autoM queryAllLogs)

-- | Heterogeneous list of filter ids indexed by the list of event types @es@:
-- each 'Quantity' is 'Tagged' with the event type whose filter it identifies.
data TaggedFilterIds (es :: [*]) where
    -- | No filter ids.
    TaggedFilterNil :: TaggedFilterIds '[]
    -- | Filter id for event @e@, followed by the ids for the remaining events.
    TaggedFilterCons :: Tagged e Quantity -> TaggedFilterIds es -> TaggedFilterIds (e : es)

-- | Operations over a whole 'MultiFilter' at once, defined by induction on
-- the list of event types @es@.
class PollFilters (es :: [*]) where
    -- | Produce one tagged filter id per filter in the 'MultiFilter'.
    openMultiFilter :: JsonRpc m => MultiFilter es -> m (TaggedFilterIds es)
    -- | Collect the changes reported for every filter id into a single list.
    checkMultiFilter :: JsonRpc m => TaggedFilterIds es -> m [Field (WithChange es)]
    -- | Release every filter id in the list.
    closeMultiFilter :: JsonRpc m => TaggedFilterIds es -> m ()

-- | Base case: with no event types there is nothing to open, poll or close.
instance PollFilters '[] where
    openMultiFilter _ = pure TaggedFilterNil
    checkMultiFilter _ = pure []
    closeMultiFilter _ = pure ()

-- | Inductive case: handle the head filter for event @e@, then recurse into
-- the remaining filters @es@.
instance forall e i ni es.
  ( DecodeEvent i ni e
  , PollFilters es
  , RecApplicative (WithChange es)
  , FoldRec (FilterChange e : WithChange es) (WithChange es)
  ) => PollFilters (e:es) where
  -- Create a filter for the head via 'Eth.newFilter', then open the rest.
  openMultiFilter (f :? fs) = do
    fId <- Eth.newFilter f
    fsIds <- openMultiFilter fs
    pure $ TaggedFilterCons (Tagged fId) fsIds

  -- Fetch the head filter's changes, decode them as event @e@, and put them
  -- in front of the (weakened) changes of the remaining filters.
  checkMultiFilter (TaggedFilterCons (Tagged fId) fsIds) = do
    changes <- Eth.getFilterChanges fId
    filterChanges <- liftIO . mkFilterChanges @_ @_ @e $ changes
    filterChanges' <- checkMultiFilter @es fsIds
    pure $ map (CoRec . Identity) filterChanges <> map weakenCoRec filterChanges'

  -- Uninstall the head filter (the returned 'Bool' is ignored), then the rest.
  closeMultiFilter (TaggedFilterCons (Tagged fId) fsIds) = do
    _ <- Eth.uninstallFilter fId
    closeMultiFilter fsIds


-- | Polls the given filter ids until the current block number exceeds the
-- target block, yielding one sorted batch of changes per poll.
--
-- Once the block number reported by 'Eth.blockNumber' is greater than the
-- target block, the filters are closed with 'closeMultiFilter' and the
-- machine stops.
pollMultiFilter
  :: ( PollFilters es
     , RecApplicative (WithChange es)
#if MIN_VERSION_vinyl(0,10,0)
     , RPureConstrained HasLogIndex (WithChange es)
#else
     , AllAllSat '[HasLogIndex] (WithChange es)
#endif
     , JsonRpc m
     )
  => TaggedFilterIds es
  -- ^ Ids of the filters to poll.
  -> DefaultBlock
  -- ^ Target block; polling ends once the chain is past it.
  -> MachineT m k [Field (WithChange es)]
pollMultiFilter is = construct . pollPlan is
  where
    pollPlan (fIds :: TaggedFilterIds es) end = do
      bn <- lift Eth.blockNumber
      if BlockWithNumber bn > end
        then do
          _ <- lift $ closeMultiFilter fIds
          stop
        else do
          -- 'threadDelay' takes microseconds: wait one second between polls.
          liftIO $ threadDelay 1000000
          changes <- lift $ sortChanges <$> checkMultiFilter fIds
          yield changes
          pollPlan fIds end

--------------------------------------------------------------------------------

-- | Same as 'multiEventManyNoFilter' with a window size of @0@.
multiEventNoFilter
  :: ( QueryAllLogs es
     , MapHandlers m es (WithChange es)
#if MIN_VERSION_vinyl(0,10,0)
     , RPureConstrained HasLogIndex (WithChange es)
#else
     , AllAllSat '[HasLogIndex] (WithChange es)
#endif
     , RecApplicative (WithChange es)
     , JsonRpc m
     )
  => MultiFilter es
  -> Handlers es (ReaderT Change m EventAction)
  -> m ()
multiEventNoFilter fltrs = multiEventManyNoFilter fltrs 0


-- | Run the handlers over the events matched by a 'MultiFilter' using log
-- queries ('playMultiLogs' / 'playNewMultiLogs') rather than node-side filters.
--
-- The stream is first played from 'minStartBlock' with the given window size.
-- What happens next depends on the result of that first pass:
--
-- * 'Nothing': a second stream is started from the same start block with a
--   window size of @0@.
-- * @'Just' (act, lastBlock)@: unless @act@ is 'TerminateEvent' or
--   @lastBlock@ has reached 'minEndBlock', a second stream is started from
--   @lastBlock + 1@ with a window size of @0@.
multiEventManyNoFilter
  :: ( QueryAllLogs es
     , MapHandlers m es (WithChange es)
#if MIN_VERSION_vinyl(0,10,0)
     , RPureConstrained HasLogIndex (WithChange es)
#else
     , AllAllSat '[HasLogIndex] (WithChange es)
#endif
     , RecApplicative (WithChange es)
     , JsonRpc m
     )
  => MultiFilter es
  -- ^ Filters to run.
  -> Integer
  -- ^ Window size used for the first pass.
  -> Handlers es (ReaderT Change m EventAction)
  -- ^ One handler per event type.
  -> m ()
multiEventManyNoFilter fltrs window handlers = do
    start <- mkBlockNumber $ minStartBlock fltrs
    let initState =
          MultiFilterStreamState { mfssCurrentBlock = start
                                 , mfssInitialMultiFilter = fltrs
                                 , mfssWindowSize = window
                                 }
    mLastProcessedFilterState <- reduceMultiEventStream (playMultiLogs initState) handlers
    case mLastProcessedFilterState of
      Nothing ->
        let pollingFilterState =
              MultiFilterStreamState { mfssCurrentBlock = start
                                     , mfssInitialMultiFilter = fltrs
                                     , mfssWindowSize = 0
                                     }
        in void $ reduceMultiEventStream (playNewMultiLogs pollingFilterState) handlers
      Just (act, lastBlock) -> do
        end <- mkBlockNumber . minEndBlock $ fltrs
        when (act /= TerminateEvent && lastBlock < end) $
          let pollingFilterState =
                MultiFilterStreamState { mfssCurrentBlock = lastBlock + 1
                                       , mfssInitialMultiFilter = fltrs
                                       , mfssWindowSize = 0
                                       }
          in void $ reduceMultiEventStream (playNewMultiLogs pollingFilterState) handlers

-- | Stream of 'MultiFilter's covering successive block ranges.
--
-- Each step calls 'pollTillBlockProgress' with the current block, yields the
-- initial filters restricted to the range from the current block to the
-- block number it returned, and continues from the block after that. The
-- stream stops once the current block is past 'minEndBlock' of the initial
-- filters.
newMultiFilterStream :: JsonRpc m
                     => MultiFilterStreamState es
                     -> MachineT m k (MultiFilter es)
newMultiFilterStream initialPlan =
  unfoldPlan initialPlan $ \s -> do
    let end = minEndBlock . mfssInitialMultiFilter $ initialPlan
    filterPlan end s
  where
    filterPlan :: JsonRpc m
               => DefaultBlock
               -> MultiFilterStreamState es
               -> PlanT k (MultiFilter es) m (MultiFilterStreamState es)
    filterPlan end initialState@MultiFilterStreamState{..} = do
      if BlockWithNumber mfssCurrentBlock > end
        then stop
        else do
          newestBlockNumber <- lift . pollTillBlockProgress $ mfssCurrentBlock
          -- Rank-2 helper: narrows any filter to the block range of this step.
          let h :: forall e. Filter e -> Filter e
              h f = f { filterFromBlock = BlockWithNumber mfssCurrentBlock
                      , filterToBlock = BlockWithNumber newestBlockNumber
                      }
          yield (modifyMultiFilter h mfssInitialMultiFilter)
          filterPlan end initialState { mfssCurrentBlock = newestBlockNumber + 1 }

-- | Like 'playMultiLogs', but driven by 'newMultiFilterStream': every
-- 'MultiFilter' it produces is passed to 'queryAllLogs', and each resulting
-- batch of changes is emitted after passing through 'sortChanges'.
playNewMultiLogs
  :: forall es k m.
     ( QueryAllLogs es
#if MIN_VERSION_vinyl(0,10,0)
     , RPureConstrained HasLogIndex (WithChange es)
#else
     , AllAllSat '[HasLogIndex] (WithChange es)
#endif
     , RecApplicative (WithChange es)
     , JsonRpc m
     )
  => MultiFilterStreamState es
  -- ^ Initial state handed to 'newMultiFilterStream'.
  -> MachineT m k [Field (WithChange es)]
playNewMultiLogs s = fmap sortChanges $
     newMultiFilterStream s
  ~> autoM queryAllLogs