{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -Wno-orphans #-}
module Servant.Streamly
( StreamlyToSourceIO(..)
)
where
import Control.Monad.IO.Class ( MonadIO(..)
, liftIO
)
import Control.Monad.Trans.Resource ( ResourceT
, runResourceT
)
import qualified Streamly
import qualified Streamly.Prelude as Streamly
import qualified Servant.API.Stream as Servant
import qualified Servant.Types.SourceT as Servant
class StreamlyToSourceIO m where
streamlyToSourceIO :: Streamly.IsStream t => t m a -> Servant.SourceIO a
instance StreamlyToSourceIO IO where
streamlyToSourceIO :: t IO a -> SourceIO a
streamlyToSourceIO t IO a
stream = (forall b. (StepT IO a -> IO b) -> IO b) -> SourceIO a
forall (m :: * -> *) a.
(forall b. (StepT m a -> m b) -> m b) -> SourceT m a
Servant.SourceT
((StepT IO a -> IO b) -> StepT IO a -> IO b
forall a b. (a -> b) -> a -> b
$ SerialT IO a -> StepT IO a
forall a. SerialT IO a -> StepT IO a
transform (SerialT IO a -> StepT IO a) -> SerialT IO a -> StepT IO a
forall a b. (a -> b) -> a -> b
$ t IO a -> SerialT IO a
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
(m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
Streamly.adapt t IO a
stream)
where
transform :: SerialT IO a -> StepT IO a
transform = IO (StepT IO a) -> StepT IO a
forall (m :: * -> *) a. m (StepT m a) -> StepT m a
Servant.Effect (IO (StepT IO a) -> StepT IO a)
-> (SerialT IO a -> IO (StepT IO a)) -> SerialT IO a -> StepT IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> StepT IO a -> StepT IO a)
-> StepT IO a -> SerialT IO a -> IO (StepT IO a)
forall (m :: * -> *) a b.
Monad m =>
(a -> b -> b) -> b -> SerialT m a -> m b
Streamly.foldr a -> StepT IO a -> StepT IO a
forall (m :: * -> *) a. a -> StepT m a -> StepT m a
Servant.Yield StepT IO a
forall (m :: * -> *) a. StepT m a
Servant.Stop
instance StreamlyToSourceIO (ResourceT IO) where
streamlyToSourceIO :: t (ResourceT IO) a -> SourceIO a
streamlyToSourceIO t (ResourceT IO) a
stream = (forall b. (StepT IO a -> IO b) -> IO b) -> SourceIO a
forall (m :: * -> *) a.
(forall b. (StepT m a -> m b) -> m b) -> SourceT m a
Servant.SourceT
((StepT IO a -> IO b) -> StepT IO a -> IO b
forall a b. (a -> b) -> a -> b
$ SerialT (ResourceT IO) a -> StepT IO a
forall a. SerialT (ResourceT IO) a -> StepT IO a
transform (SerialT (ResourceT IO) a -> StepT IO a)
-> SerialT (ResourceT IO) a -> StepT IO a
forall a b. (a -> b) -> a -> b
$ t (ResourceT IO) a -> SerialT (ResourceT IO) a
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
(m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
Streamly.adapt t (ResourceT IO) a
stream)
where
transform :: SerialT (ResourceT IO) a -> StepT IO a
transform =
IO (StepT IO a) -> StepT IO a
forall (m :: * -> *) a. m (StepT m a) -> StepT m a
Servant.Effect (IO (StepT IO a) -> StepT IO a)
-> (SerialT (ResourceT IO) a -> IO (StepT IO a))
-> SerialT (ResourceT IO) a
-> StepT IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ResourceT IO (StepT IO a) -> IO (StepT IO a)
forall (m :: * -> *) a. MonadUnliftIO m => ResourceT m a -> m a
runResourceT (ResourceT IO (StepT IO a) -> IO (StepT IO a))
-> (SerialT (ResourceT IO) a -> ResourceT IO (StepT IO a))
-> SerialT (ResourceT IO) a
-> IO (StepT IO a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> StepT IO a -> StepT IO a)
-> StepT IO a
-> SerialT (ResourceT IO) a
-> ResourceT IO (StepT IO a)
forall (m :: * -> *) a b.
Monad m =>
(a -> b -> b) -> b -> SerialT m a -> m b
Streamly.foldr a -> StepT IO a -> StepT IO a
forall (m :: * -> *) a. a -> StepT m a -> StepT m a
Servant.Yield StepT IO a
forall (m :: * -> *) a. StepT m a
Servant.Stop
instance (StreamlyToSourceIO m, Streamly.IsStream t) => Servant.ToSourceIO a (t m a) where
toSourceIO :: t m a -> SourceIO a
toSourceIO = t m a -> SourceIO a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(StreamlyToSourceIO m, IsStream t) =>
t m a -> SourceIO a
streamlyToSourceIO
instance (Streamly.IsStream t) => Servant.FromSourceIO a (t IO a) where
fromSourceIO :: SourceIO a -> t IO a
fromSourceIO SourceIO a
src =
(IO (t IO a) -> IO (t IO a)) -> t IO (IO (t IO a)) -> t IO a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> m (t m b)) -> t m a -> t m b
Streamly.concatMapM IO (t IO a) -> IO (t IO a)
forall a. a -> a
id (t IO (IO (t IO a)) -> t IO a) -> t IO (IO (t IO a)) -> t IO a
forall a b. (a -> b) -> a -> b
$ IO (t IO a) -> t IO (IO (t IO a))
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
Streamly.yield (IO (t IO a) -> t IO (IO (t IO a)))
-> IO (t IO a) -> t IO (IO (t IO a))
forall a b. (a -> b) -> a -> b
$ SourceIO a -> (StepT IO a -> IO (t IO a)) -> IO (t IO a)
forall (m :: * -> *) a.
SourceT m a -> forall b. (StepT m a -> m b) -> m b
Servant.unSourceT SourceIO a
src StepT IO a -> IO (t IO a)
IsStream t => StepT IO a -> IO (t IO a)
go
where
go :: Streamly.IsStream t => Servant.StepT IO a -> IO (t IO a)
go :: StepT IO a -> IO (t IO a)
go StepT IO a
step = case StepT IO a
step of
StepT IO a
Servant.Stop -> t IO a -> IO (t IO a)
forall (m :: * -> *) a. Monad m => a -> m a
return t IO a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
Streamly.nil
Servant.Error String
e -> t IO a -> IO (t IO a)
forall (m :: * -> *) a. Monad m => a -> m a
return (t IO a -> IO (t IO a)) -> t IO a -> IO (t IO a)
forall a b. (a -> b) -> a -> b
$ IO a -> t IO a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
m a -> t m a
Streamly.yieldM (IO a -> t IO a) -> IO a -> t IO a
forall a b. (a -> b) -> a -> b
$ String -> IO a
forall a. HasCallStack => String -> a
error String
e
Servant.Skip StepT IO a
n -> StepT IO a -> IO (t IO a)
IsStream t => StepT IO a -> IO (t IO a)
go StepT IO a
n
Servant.Yield a
x StepT IO a
nextStep -> a -> t IO a -> t IO a
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a -> t m a
Streamly.cons a
x (t IO a -> t IO a) -> IO (t IO a) -> IO (t IO a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StepT IO a -> IO (t IO a)
IsStream t => StepT IO a -> IO (t IO a)
go StepT IO a
nextStep
Servant.Effect IO (StepT IO a)
nextStep -> IO (StepT IO a)
nextStep IO (StepT IO a) -> (StepT IO a -> IO (t IO a)) -> IO (t IO a)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= StepT IO a -> IO (t IO a)
IsStream t => StepT IO a -> IO (t IO a)
go
instance (Streamly.IsStream t) => Servant.FromSourceIO a (t (ResourceT IO) a) where
fromSourceIO :: SourceIO a -> t (ResourceT IO) a
fromSourceIO SourceIO a
src =
(ResourceT IO (t (ResourceT IO) a)
-> ResourceT IO (t (ResourceT IO) a))
-> t (ResourceT IO) (ResourceT IO (t (ResourceT IO) a))
-> t (ResourceT IO) a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> m (t m b)) -> t m a -> t m b
Streamly.concatMapM ResourceT IO (t (ResourceT IO) a)
-> ResourceT IO (t (ResourceT IO) a)
forall a. a -> a
id (t (ResourceT IO) (ResourceT IO (t (ResourceT IO) a))
-> t (ResourceT IO) a)
-> t (ResourceT IO) (ResourceT IO (t (ResourceT IO) a))
-> t (ResourceT IO) a
forall a b. (a -> b) -> a -> b
$ ResourceT IO (t (ResourceT IO) a)
-> t (ResourceT IO) (ResourceT IO (t (ResourceT IO) a))
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
Streamly.yield (ResourceT IO (t (ResourceT IO) a)
-> t (ResourceT IO) (ResourceT IO (t (ResourceT IO) a)))
-> ResourceT IO (t (ResourceT IO) a)
-> t (ResourceT IO) (ResourceT IO (t (ResourceT IO) a))
forall a b. (a -> b) -> a -> b
$ IO (t (ResourceT IO) a) -> ResourceT IO (t (ResourceT IO) a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (t (ResourceT IO) a) -> ResourceT IO (t (ResourceT IO) a))
-> IO (t (ResourceT IO) a) -> ResourceT IO (t (ResourceT IO) a)
forall a b. (a -> b) -> a -> b
$ SourceIO a
-> (StepT IO a -> IO (t (ResourceT IO) a))
-> IO (t (ResourceT IO) a)
forall (m :: * -> *) a.
SourceT m a -> forall b. (StepT m a -> m b) -> m b
Servant.unSourceT SourceIO a
src StepT IO a -> IO (t (ResourceT IO) a)
IsStream t => StepT IO a -> IO (t (ResourceT IO) a)
go
where
go :: Streamly.IsStream t => Servant.StepT IO a -> IO (t (ResourceT IO) a)
go :: StepT IO a -> IO (t (ResourceT IO) a)
go StepT IO a
step = case StepT IO a
step of
StepT IO a
Servant.Stop -> t (ResourceT IO) a -> IO (t (ResourceT IO) a)
forall (m :: * -> *) a. Monad m => a -> m a
return t (ResourceT IO) a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
Streamly.nil
Servant.Error String
e -> t (ResourceT IO) a -> IO (t (ResourceT IO) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (t (ResourceT IO) a -> IO (t (ResourceT IO) a))
-> t (ResourceT IO) a -> IO (t (ResourceT IO) a)
forall a b. (a -> b) -> a -> b
$ ResourceT IO a -> t (ResourceT IO) a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
m a -> t m a
Streamly.yieldM (ResourceT IO a -> t (ResourceT IO) a)
-> ResourceT IO a -> t (ResourceT IO) a
forall a b. (a -> b) -> a -> b
$ String -> ResourceT IO a
forall a. HasCallStack => String -> a
error String
e
Servant.Skip StepT IO a
n -> StepT IO a -> IO (t (ResourceT IO) a)
IsStream t => StepT IO a -> IO (t (ResourceT IO) a)
go StepT IO a
n
Servant.Yield a
x StepT IO a
nextStep -> a -> t (ResourceT IO) a -> t (ResourceT IO) a
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a -> t m a
Streamly.cons a
x (t (ResourceT IO) a -> t (ResourceT IO) a)
-> IO (t (ResourceT IO) a) -> IO (t (ResourceT IO) a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StepT IO a -> IO (t (ResourceT IO) a)
IsStream t => StepT IO a -> IO (t (ResourceT IO) a)
go StepT IO a
nextStep
Servant.Effect IO (StepT IO a)
nextStep -> IO (StepT IO a)
nextStep IO (StepT IO a)
-> (StepT IO a -> IO (t (ResourceT IO) a))
-> IO (t (ResourceT IO) a)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= StepT IO a -> IO (t (ResourceT IO) a)
IsStream t => StepT IO a -> IO (t (ResourceT IO) a)
go