Copyright | (c) Adam Conner-Sax 2019 |
---|---|
License | BSD-3-Clause |
Maintainer | adam_conner_sax@yahoo.com |
Stability | experimental |
Safe Haskell | None |
Language | Haskell2010 |
map-reduce engine (fold builder) using Streamly
streams as its intermediate and return type.
Notes:
1. These are polymorphic in the return stream type, though the streams do have to be serial
when groupBy
is called.
So you have to specify the stream type in the call, or it has to be inferable from the use of the result.
2. There is a concurrent engine here, one that uses Streamly's concurrency features to map over the stream. I've not been able to verify that this is faster on an appropriate task with appropriate runtime settings.
Synopsis
- streamlyEngine :: (Foldable g, Functor g) => (forall z. SerialT Identity (k, z) -> SerialT Identity (k, g z)) -> MapReduceFold y k c (SerialT Identity) x d
- streamlyEngineM :: (IsStream t, Monad m, MonadAsync m, Traversable g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFoldM m y k c (t m) x d
- concurrentStreamlyEngine :: forall tIn tOut m g y k c x d. (IsStream tIn, IsStream tOut, MonadAsync m, Foldable g, Functor g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFold y k c (tOut m) x d
- toStreamlyFold :: Monad m => Fold a b -> Fold m a b
- toStreamlyFoldM :: FoldM m a b -> Fold m a b
- resultToList :: (Monad m, IsStream t) => t m a -> m [a]
- concatStream :: (Monad m, Monoid a) => SerialT m a -> m a
- concatStreamFold :: Monoid b => Fold a (SerialT Identity b) -> Fold a b
- concatStreamFoldM :: (Monad m, Monoid b, IsStream t) => FoldM m a (t m b) -> FoldM m a b
- concatConcurrentStreamFold :: (Monad m, Monoid b, IsStream t) => Fold a (t m b) -> FoldM m a b
- groupByHashableKey :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c)
- groupByOrderedKey :: (Monad m, Ord k) => SerialT m (k, c) -> SerialT m (k, Seq c)
- groupByHashableKeyST :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c)
- groupByDiscriminatedKey :: (Monad m, Grouping k) => SerialT m (k, c) -> SerialT m (k, Seq c)
- data SerialT (m :: Type -> Type) a
- data WSerialT (m :: Type -> Type) a
- data AheadT (m :: Type -> Type) a
- data AsyncT (m :: Type -> Type) a
- data WAsyncT (m :: Type -> Type) a
- data ParallelT (m :: Type -> Type) a
- type MonadAsync (m :: Type -> Type) = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
- class (forall (m :: Type -> Type) a. MonadAsync m => Semigroup (t m a), forall (m :: Type -> Type) a. MonadAsync m => Monoid (t m a), forall (m :: Type -> Type). Monad m => Functor (t m), forall (m :: Type -> Type). MonadAsync m => Applicative (t m)) => IsStream (t :: (Type -> Type) -> Type -> Type)
Engines
streamlyEngine :: (Foldable g, Functor g) => (forall z. SerialT Identity (k, z) -> SerialT Identity (k, g z)) -> MapReduceFold y k c (SerialT Identity) x d Source #
map-reduce-fold builder returning a SerialT Identity d
result
streamlyEngineM :: (IsStream t, Monad m, MonadAsync m, Traversable g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFoldM m y k c (t m) x d Source #
effectful map-reduce-fold engine returning an (IsStream t => t m d) result. The MonadAsync constraint here more or less requires us to run in IO, or something IO-like.
concurrentStreamlyEngine :: forall tIn tOut m g y k c x d. (IsStream tIn, IsStream tOut, MonadAsync m, Foldable g, Functor g) => (forall z. SerialT m (k, z) -> SerialT m (k, g z)) -> MapReduceFold y k c (tOut m) x d Source #
possibly (depending on chosen stream types) concurrent map-reduce-fold builder returning an (IsStream t, MonadAsync m) => t m d
result
Streamly Combinators
toStreamlyFold :: Monad m => Fold a b -> Fold m a b Source #
convert a Control.Foldl Fold into a Streamly.Data.Fold fold
toStreamlyFoldM :: FoldM m a b -> Fold m a b Source #
convert a Control.Foldl FoldM into a Streamly.Data.Fold fold
Result Extraction
resultToList :: (Monad m, IsStream t) => t m a -> m [a] Source #
make a stream into an (effectful) []
concatStreamFold :: Monoid b => Fold a (SerialT Identity b) -> Fold a b Source #
mappend everything in a pure Streamly fold
concatStreamFoldM :: (Monad m, Monoid b, IsStream t) => FoldM m a (t m b) -> FoldM m a b Source #
mappend everything in an effectful Streamly fold.
concatConcurrentStreamFold :: (Monad m, Monoid b, IsStream t) => Fold a (t m b) -> FoldM m a b Source #
mappend everything in a concurrent Streamly fold.
groupBy Functions
groupByHashableKey :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c) Source #
Group streamly stream of (k,c)
by hashable
key.
NB: this function uses the fact that SerialT m
is a monad
groupByOrderedKey :: (Monad m, Ord k) => SerialT m (k, c) -> SerialT m (k, Seq c) Source #
Group streamly stream of (k,c)
by ordered key.
NB: this function uses the fact that SerialT m
is a monad
groupByHashableKeyST :: (Monad m, Hashable k, Eq k) => SerialT m (k, c) -> SerialT m (k, Seq c) Source #
Group streamly stream of (k,c)
by hashable
key. Uses mutable hashtables running in the ST monad.
NB: this function uses the fact that SerialT m
is a monad
groupByDiscriminatedKey :: (Monad m, Grouping k) => SerialT m (k, c) -> SerialT m (k, Seq c) Source #
Group streamly stream of (k,c)
by key with instance of Grouping from http://hackage.haskell.org/package/discrimination.
NB: this function uses the fact that SerialT m
is a monad
Re-Exports
data SerialT (m :: Type -> Type) a #
The Semigroup
operation for SerialT
behaves like a regular append
operation. Therefore, when a <> b
is evaluated, stream a
is evaluated
first until it exhausts and then stream b
is evaluated. In other words,
the elements of stream b
are appended to the elements of stream a
. This
operation can be used to fold an infinite lazy container of streams.
import Streamly
import qualified Streamly.Prelude as S
main = (S.toList . serially
$ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print
[1,2,3,4]
The Monad
instance runs the monadic continuation for each
element of the stream, serially.
main = S.drain . serially
$ do
x <- return 1 <> return 2
S.yieldM $ print x
1 2
SerialT
nests streams serially in a depth first manner.
main = S.drain . serially
$ do
x <- return 1 <> return 2
y <- return 3 <> return 4
S.yieldM $ print (x, y)
(1,3) (1,4) (2,3) (2,4)
We call the monadic code being run for each element of the stream a monadic
continuation. In the imperative paradigm we can think of this composition as
nested for
loops and the monadic continuation is the body of the loop. The
loop iterates for all elements of the stream.
Note that the behavior and semantics of SerialT
, including Semigroup
and Monad
instances are exactly like Haskell lists except that SerialT
can contain effectful actions while lists are pure.
In the code above, the serially
combinator can be omitted as the default
stream type is SerialT
.
Since: streamly-0.2.0
Instances
data WSerialT (m :: Type -> Type) a #
The Semigroup
operation for WSerialT
interleaves the elements from the
two streams. Therefore, when a <> b
is evaluated, stream a
is evaluated
first to produce the first element of the combined stream and then stream
b
is evaluated to produce the next element of the combined stream, and
then we go back to evaluating stream a
and so on. In other words, the
elements of stream a
are interleaved with the elements of stream b
.
Note that evaluation of a <> b <> c
does not schedule a
, b
and c
with equal priority. This expression is equivalent to a <> (b <> c)
,
therefore, it fairly interleaves a
with the result of b <> c
. For
example, S.fromList [1,2] <> S.fromList [3,4] <> S.fromList [5,6] ::
WSerialT Identity Int
would result in [1,3,2,5,4,6]. In other words, the
leftmost stream gets the same scheduling priority as the rest of the
streams taken together. The same is true for each subexpression on the right.
Note that this operation cannot be used to fold a container of infinite streams as the state that it needs to maintain is proportional to the number of streams.
The W
in the name stands for wide
or breadth wise scheduling in
contrast to the depth wise scheduling behavior of SerialT
.
import Streamly
import qualified Streamly.Prelude as S
main = (S.toList . wSerially
$ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print
[1,3,2,4]
Similarly, the Monad
instance interleaves the iterations of the
inner and the outer loop, nesting loops in a breadth first manner.
main = S.drain . wSerially
$ do
x <- return 1 <> return 2
y <- return 3 <> return 4
S.yieldM $ print (x, y)
(1,3) (2,3) (1,4) (2,4)
Since: streamly-0.2.0
Instances
data AheadT (m :: Type -> Type) a #
The Semigroup
operation for AheadT
appends two streams. The combined
stream behaves like a single stream with the actions from the second stream
appended to the first stream. The combined stream is evaluated in the
speculative style. This operation can be used to fold an infinite lazy
container of streams.
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent
main = do
  xs <- S.toList . aheadly $ (p 1 |: p 2 |: nil) <> (p 3 |: p 4 |: nil)
  print xs
  where p n = threadDelay 1000000 >> return n
[1,2,3,4]
Any exceptions generated by a constituent stream are propagated to the output stream.
The monad instance of AheadT
may run each monadic continuation (bind)
concurrently in a speculative manner, performing side effects in a partially
ordered manner but producing the outputs in an ordered manner like
SerialT
.
main = S.drain . aheadly
$ do
n <- return 3 <> return 2 <> return 1
S.yieldM $ do
threadDelay (n * 1000000)
myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3
Since: streamly-0.3.0
Instances
MonadTrans AheadT | |
Defined in Streamly.Internal.Data.Stream.Ahead | |
IsStream AheadT | |
Defined in Streamly.Internal.Data.Stream.Ahead | |
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AheadT m) | |
Defined in Streamly.Internal.Data.Stream.Ahead | |
(MonadState s m, MonadAsync m) => MonadState s (AheadT m) | |
(MonadReader r m, MonadAsync m) => MonadReader r (AheadT m) | |
MonadAsync m => Monad (AheadT m) | |
Monad m => Functor (AheadT m) | |
(Monad m, MonadAsync m) => Applicative (AheadT m) | |
(MonadIO m, MonadAsync m) => MonadIO (AheadT m) | |
Defined in Streamly.Internal.Data.Stream.Ahead | |
(MonadThrow m, MonadAsync m) => MonadThrow (AheadT m) | |
Defined in Streamly.Internal.Data.Stream.Ahead | |
MonadAsync m => Semigroup (AheadT m a) | |
MonadAsync m => Monoid (AheadT m a) | |
data AsyncT (m :: Type -> Type) a #
The Semigroup
operation (<>
) for AsyncT
merges two streams
concurrently with priority given to the first stream. In s1 <> s2 <> s3
...
the streams s1, s2 and s3 are scheduled for execution in that order.
Multiple scheduled streams may be executed concurrently and the elements
generated by them are served to the consumer as and when they become
available. This behavior is similar to the scheduling and execution behavior
of actions in a single async stream.
Since only a finite number of streams are executed concurrently, this operation can be used to fold an infinite lazy container of streams.
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent
main = (S.toList . asyncly
$ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print
[1,2,3,4]
Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.
Similarly, the monad instance of AsyncT
may run each iteration
concurrently based on demand. More concurrent iterations are started only
if the previous iterations are not able to produce enough output for the
consumer.
main = drain . asyncly
$ do
n <- return 3 <> return 2 <> return 1
S.yieldM $ do
threadDelay (n * 1000000)
myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3
Since: streamly-0.1.0
Instances
MonadTrans AsyncT | |
Defined in Streamly.Internal.Data.Stream.Async | |
IsStream AsyncT | |
Defined in Streamly.Internal.Data.Stream.Async | |
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m) | |
Defined in Streamly.Internal.Data.Stream.Async | |
(MonadState s m, MonadAsync m) => MonadState s (AsyncT m) | |
(MonadReader r m, MonadAsync m) => MonadReader r (AsyncT m) | |
MonadAsync m => Monad (AsyncT m) | |
Monad m => Functor (AsyncT m) | |
(Monad m, MonadAsync m) => Applicative (AsyncT m) | |
(MonadIO m, MonadAsync m) => MonadIO (AsyncT m) | |
Defined in Streamly.Internal.Data.Stream.Async | |
(MonadThrow m, MonadAsync m) => MonadThrow (AsyncT m) | |
Defined in Streamly.Internal.Data.Stream.Async | |
MonadAsync m => Semigroup (AsyncT m a) | |
MonadAsync m => Monoid (AsyncT m a) | |
data WAsyncT (m :: Type -> Type) a #
WAsyncT
is similar to WSerialT
but with concurrent execution.
The Semigroup
operation (<>
) for WAsyncT
merges two streams
concurrently interleaving the actions from both the streams. In s1
<> s2 <> s3 ...
, the individual actions from streams s1
, s2
and s3
are scheduled for execution in a round-robin fashion. Multiple scheduled
actions may be executed concurrently, the results from concurrent executions
are consumed in the order in which they become available.
The W
in the name stands for wide
or breadth wise scheduling in
contrast to the depth wise scheduling behavior of AsyncT
.
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent
main = (S.toList . wAsyncly
. maxThreads 1 $ (S.fromList [1,2]) <> (S.fromList [3,4])) >>= print
[1,3,2,4]
For this example, we are using maxThreads 1
so that concurrent thread
scheduling does not affect the results and make them unpredictable. Let's
now take a more general example:
main = (S.toList . wAsyncly
. maxThreads 1 $ (S.fromList [1,2,3]) <> (S.fromList [4,5,6]) <> (S.fromList [7,8,9])) >>= print
[1,4,2,7,5,3,8,6,9]
This is how the execution of the above stream proceeds:
- The scheduler queue is initialized with
[S.fromList [1,2,3], (S.fromList [4,5,6]) <> (S.fromList [7,8,9])]
assuming the head of the queue is represented by the leftmost item.
- S.fromList [1,2,3]
is executed, yielding the element 1
and putting [2,3]
at the back of the scheduler queue. The scheduler queue now looks like
[(S.fromList [4,5,6]) <> (S.fromList [7,8,9]), S.fromList [2,3]]
.
- Now
(S.fromList [4,5,6]) <> (S.fromList [7,8,9])
is picked up for execution, S.fromList [7,8,9]
is added at the back of the queue and S.fromList [4,5,6]
is executed, yielding the element 4
and adding S.fromList [5,6]
at the back of the queue. The queue now looks like
[S.fromList [2,3], S.fromList [7,8,9], S.fromList [5,6]]
.
- Note that the scheduler queue expands by one more stream component in
every pass because one more
<>
is broken down into two components. At this point there are no more
<>
operations to be broken down further and the queue has reached its maximum size. Now these streams are scheduled in round-robin fashion yielding
[2,7,5,3,8,6,9]
.
As we see above, in a right associated expression composed with <>
, only
one <>
operation is broken down into two components in one execution,
therefore, if we have n
streams composed using <>
it will take n
scheduler passes to expand the whole expression. By the time the n-th
component is added to the scheduler queue, the first component would have
received n
scheduler passes.
Since all streams get interleaved, this operation is not suitable for
folding an infinite lazy container of infinite size streams. However, if
the streams are small, the streams on the left may get finished before more
streams are added to the scheduler queue from the right side of the
expression, so it may be possible to fold an infinite lazy container of
streams. For example, if the streams are of size n
then at most n
streams would be in the scheduler queue at a time.
Note that WSerialT
and WAsyncT
differ in their scheduling behavior,
therefore the output of WAsyncT
even with a single thread of execution is
not the same as that of WSerialT
. See notes in WSerialT
for details about
its scheduling behavior.
Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.
Similarly, the Monad
instance of WAsyncT
runs all iterations fairly
concurrently using a round robin scheduling.
main = drain . wAsyncly
$ do
n <- return 3 <> return 2 <> return 1
S.yieldM $ do
threadDelay (n * 1000000)
myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3
Since: streamly-0.2.0
Instances
MonadTrans WAsyncT | |
Defined in Streamly.Internal.Data.Stream.Async | |
IsStream WAsyncT | |
Defined in Streamly.Internal.Data.Stream.Async | |
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m) | |
Defined in Streamly.Internal.Data.Stream.Async | |
(MonadState s m, MonadAsync m) => MonadState s (WAsyncT m) | |
(MonadReader r m, MonadAsync m) => MonadReader r (WAsyncT m) | |
MonadAsync m => Monad (WAsyncT m) | |
Monad m => Functor (WAsyncT m) | |
(Monad m, MonadAsync m) => Applicative (WAsyncT m) | |
Defined in Streamly.Internal.Data.Stream.Async | |
(MonadIO m, MonadAsync m) => MonadIO (WAsyncT m) | |
Defined in Streamly.Internal.Data.Stream.Async | |
(MonadThrow m, MonadAsync m) => MonadThrow (WAsyncT m) | |
Defined in Streamly.Internal.Data.Stream.Async | |
MonadAsync m => Semigroup (WAsyncT m a) | |
MonadAsync m => Monoid (WAsyncT m a) | |
data ParallelT (m :: Type -> Type) a #
Async composition with strict concurrent execution of all streams.
The Semigroup
instance of ParallelT
executes both the streams
concurrently without any delay or without waiting for the consumer demand
and merges the results as they arrive. If the consumer does not consume
the results, they are buffered up to a configured maximum, controlled by the
maxBuffer
primitive. If the buffer becomes full the concurrent tasks will
block until there is space in the buffer.
Both WAsyncT
and ParallelT
, evaluate the constituent streams fairly in a
round robin fashion. The key difference is that WAsyncT
might wait for the
consumer demand before it executes the tasks whereas ParallelT
starts
executing all the tasks immediately without waiting for the consumer demand.
For WAsyncT
the maxThreads
limit applies whereas for ParallelT
it does
not apply. In other words, WAsyncT
can be lazy whereas ParallelT
is
strict.
ParallelT
is useful for cases when the streams are required to be
evaluated simultaneously irrespective of how the consumer consumes them e.g.
when we want to race two tasks and want to start both strictly at the same
time or if we have timers in the parallel tasks and our results depend on
the timers being started at the same time. If we do not have such
requirements then AsyncT
or AheadT
are recommended as they can be more
efficient than ParallelT
.
main = (toList . parallely
$ (fromFoldable [1,2]) <> (fromFoldable [3,4])) >>= print
[1,3,2,4]
When streams with more than one element are merged, it yields whichever
stream yields first without any bias, unlike the Async
style streams.
Any exceptions generated by a constituent stream are propagated to the output stream. The output and exceptions from a single stream are guaranteed to arrive in the same order in the resulting stream as they were generated in the input stream. However, the relative ordering of elements from different streams in the resulting stream can vary depending on scheduling and generation delays.
Similarly, the Monad
instance of ParallelT
runs all iterations
of the loop concurrently.
import Streamly
import qualified Streamly.Prelude as S
import Control.Concurrent
main = drain . parallely
$ do
n <- return 3 <> return 2 <> return 1
S.yieldM $ do
threadDelay (n * 1000000)
myThreadId >>= \tid -> putStrLn (show tid ++ ": Delay " ++ show n)
ThreadId 40: Delay 1
ThreadId 39: Delay 2
ThreadId 38: Delay 3
Note that parallel composition can only combine a finite number of streams as it needs to retain state for each unfinished stream.
Since: 0.7.0 (maxBuffer applies to ParallelT streams)
Since: 0.1.0
Instances
MonadTrans ParallelT | |
Defined in Streamly.Internal.Data.Stream.Parallel | |
IsStream ParallelT | |
Defined in Streamly.Internal.Data.Stream.Parallel | |
(MonadBase b m, Monad m, MonadAsync m) => MonadBase b (ParallelT m) | |
Defined in Streamly.Internal.Data.Stream.Parallel | |
(MonadState s m, MonadAsync m) => MonadState s (ParallelT m) | |
(MonadReader r m, MonadAsync m) => MonadReader r (ParallelT m) | |
MonadAsync m => Monad (ParallelT m) | |
Monad m => Functor (ParallelT m) | |
(Monad m, MonadAsync m) => Applicative (ParallelT m) | |
Defined in Streamly.Internal.Data.Stream.Parallel | |
(MonadIO m, MonadAsync m) => MonadIO (ParallelT m) | |
Defined in Streamly.Internal.Data.Stream.Parallel | |
(MonadThrow m, MonadAsync m) => MonadThrow (ParallelT m) | |
Defined in Streamly.Internal.Data.Stream.Parallel | |
MonadAsync m => Semigroup (ParallelT m a) | |
MonadAsync m => Monoid (ParallelT m a) | |
type MonadAsync (m :: Type -> Type) = (MonadIO m, MonadBaseControl IO m, MonadThrow m) #
A monad that can perform concurrent or parallel IO operations. Streams
that can be composed concurrently require the underlying monad to be
MonadAsync
.
Since: streamly-0.1.0
class (forall (m :: Type -> Type) a. MonadAsync m => Semigroup (t m a), forall (m :: Type -> Type) a. MonadAsync m => Monoid (t m a), forall (m :: Type -> Type). Monad m => Functor (t m), forall (m :: Type -> Type). MonadAsync m => Applicative (t m)) => IsStream (t :: (Type -> Type) -> Type -> Type) #
Class of types that can represent a stream of elements of some type a
in
some monad m
.
Since: streamly-0.2.0
Instances
IsStream AheadT | |
Defined in Streamly.Internal.Data.Stream.Ahead | |
IsStream AsyncT | |
Defined in Streamly.Internal.Data.Stream.Async | |
IsStream WAsyncT | |
Defined in Streamly.Internal.Data.Stream.Async | |
IsStream ParallelT | |
Defined in Streamly.Internal.Data.Stream.Parallel | |
IsStream SerialT | |
Defined in Streamly.Internal.Data.Stream.Serial | |
IsStream WSerialT | |
Defined in Streamly.Internal.Data.Stream.Serial | |
IsStream ZipSerialM | |
Defined in Streamly.Internal.Data.Stream.Zip toStream :: forall (m :: Type -> Type) a. ZipSerialM m a -> Stream m a # fromStream :: forall (m :: Type -> Type) a. Stream m a -> ZipSerialM m a # consM :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a # (|:) :: MonadAsync m => m a -> ZipSerialM m a -> ZipSerialM m a # | |
IsStream ZipAsyncM | |
Defined in Streamly.Internal.Data.Stream.Zip | |
IsStream Stream | |
Defined in Streamly.Internal.Data.Stream.StreamK.Type |