module Network.K8s.Application
( withK8sEndpoint
, Config(..)
, defConfig
, K8sChecks(..)
) where
import Control.Concurrent (killThread, threadDelay, forkIO)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception (finally, AsyncException, throwIO, fromException)
import Control.Monad
import Data.Foldable
import Network.HTTP.Types
import Network.Wai as Wai
import Network.Wai.Handler.Warp qualified as Warp
import Network.Wai.Middleware.Prometheus as Prometheus
-- | Settings for the Kubernetes probe endpoint.
data Config = Config
  { port :: Int
    -- ^ TCP port the probe HTTP server is started on.
  , maxTearDownPeriodSeconds :: Int
    -- ^ Teardown budget in seconds. A @/stop@ request waits at most this
    -- long for the teardown to be confirmed, and on exit at most half of
    -- it is spent waiting for the application thread to be cancelled.
  }
  deriving (Show)
-- | Default settings: port 10120 and a 30 second teardown period.
defConfig :: Config
defConfig =
  Config
    { port = 10120
    , maxTearDownPeriodSeconds = 30
    }
-- | Application-supplied callbacks backing the probe routes.
data K8sChecks = K8sChecks
  { runReadynessCheck :: IO Bool
    -- ^ Run by @/ready@ while the application is in the running state;
    -- 'True' is answered with 200, 'False' with 400.
  , runLivenessCheck :: IO Bool
    -- ^ Run by @/health@ in every state except startup; 'True' is
    -- answered with 200, 'False' with 400.
  }
-- | Lifecycle phase of the supervised application, shared between the
-- supervisor and the probe server through a 'TVar'.
data ApplicationState
  = ApplicationStarting (Async ())
    -- ^ The startup action is still running; the handle lets a stop
    -- request cancel it.
  | ApplicationRunning
    -- ^ Startup finished and the main action is running.
  | ApplicationTeardownConfirm (TVar Bool)
    -- ^ A stop was requested while running. The flag is set to 'True' by
    -- the next @/ready@ request, which is itself answered with a failure.
  | ApplicationTearingDown
    -- ^ Teardown is in progress; the supervisor stops the application.
-- | Run an application next to a Kubernetes probe HTTP endpoint.
--
-- The @startup@ action runs first; its result is handed to @action@, the
-- main body of the application. Meanwhile a probe server is started on
-- the configured 'port' and reports the lifecycle state.
--
-- The call returns once the application thread has finished or the state
-- has become 'ApplicationTearingDown'. An exception that ended the
-- application is rethrown, except for 'AsyncCancelled' and
-- 'AsyncException', which are treated as a regular stop.
--
-- On the way out the application thread is cancelled, waiting for it at
-- most half of 'maxTearDownPeriodSeconds'; afterwards the probe server is
-- cancelled from a forked thread so that the caller does not wait for it.
withK8sEndpoint
  :: Config
  -> K8sChecks
  -> IO a
  -> (a -> IO b)
  -> IO ()
withK8sEndpoint Config{..} k8s startup action =
  -- 'withAsync' instead of a bare 'async': if this function is left by an
  -- exception while startup is still running, the startup thread is
  -- cancelled instead of being leaked.
  withAsync startup $ \startup_handle -> do
    state_box <- newTVarIO $ ApplicationStarting (void startup_handle)
    let application = do
          x <- wait startup_handle
          atomically (switchToRunning state_box)
          action x
    withAsync application $ \server -> do
      -- NOTE(review): a failure of the probe server itself (for example
      -- the port being in use) is not observed here -- confirm that this
      -- is intended.
      k8s_server <- async $
        runK8sServiceEndpoint port maxTearDownPeriodSeconds k8s state_box server
      let -- Block until the application ends or teardown has been entered.
          awaitShutdown = do
            result <- atomically $ asum
              [ void <$> waitCatchSTM server
              , readTVar state_box >>= \case
                  ApplicationTearingDown -> pure (Right ())
                  _ -> retry
              ]
            case result of
              Left se
                | Just (_ :: AsyncCancelled) <- fromException se -> pure ()
                | Just (_ :: AsyncException) <- fromException se -> pure ()
                | otherwise -> throwIO se
              Right{} -> pure ()
          -- Half of the teardown period, in microseconds.
          half_interval = maxTearDownPeriodSeconds * 1_000_000 `div` 2
          -- Cancel the application, but stop waiting after half_interval.
          stopApplication = race_ (threadDelay half_interval) (cancel server)
      (awaitShutdown `finally` stopApplication)
        `finally` forkIO (cancel k8s_server)
-- | Serve the probe routes on the given port with Warp.
--
-- Routes, matched on the request path only:
--
-- * @/started@  - 400 while starting, 200 in every other state.
-- * @/ready@    - 400 while starting or tearing down; while running the
--   answer follows 'runReadynessCheck'. A request that arrives while a
--   teardown awaits confirmation sets the confirmation flag.
-- * @/health@   - 400 while starting; otherwise follows 'runLivenessCheck'.
-- * @/stop@     - starts the teardown in a background thread and answers
--   200 immediately.
-- * @/_metrics@ - delegated to 'metricsApp'.
--
-- Anything else is answered with 404.
runK8sServiceEndpoint
  :: Int
  -> Int
  -> K8sChecks
  -> TVar ApplicationState
  -> Async void
  -> IO ()
runK8sServiceEndpoint port teardown_time_seconds K8sChecks{..} state_box server =
  Warp.run port $ \req resp -> do
    let -- All probe answers are short plain-text bodies.
        plain status body =
          resp $ responseLBS status [(hContentType, "text/plain")] body
        -- Translate a check result into 200 or 400 with the given bodies.
        verdict okBody failBody passed
          | passed = plain status200 okBody
          | otherwise = plain status400 failBody
        currentState = readTVarIO state_box
    case Wai.pathInfo req of
      ["started"] -> currentState >>= \case
        ApplicationStarting{} -> plain status400 "starting"
        ApplicationRunning{} -> plain status200 "ok"
        _ -> plain status200 "tearing down"
      ["ready"] -> currentState >>= \case
        ApplicationStarting{} -> plain status400 "starting"
        ApplicationRunning{} ->
          runReadynessCheck >>= verdict "running" "not running"
        ApplicationTeardownConfirm confirmed -> do
          -- This failing answer is what confirms the pending teardown.
          atomically $ writeTVar confirmed True
          plain status400 "tearing down"
        ApplicationTearingDown{} -> plain status400 "tearing down"
      ["health"] -> currentState >>= \case
        ApplicationStarting{} -> plain status400 "starting"
        _ -> runLivenessCheck >>= verdict "running" "unhealthy"
      ["stop"] -> do
        -- The teardown runs in the background so the request is answered
        -- straight away.
        _ <- async $ do
          deadline <- registerDelay (teardown_time_seconds * 1_000_000)
          join $ atomically $ switchToTeardown deadline state_box
          killThread $ asyncThreadId server
        plain status200 "tearing down"
      ["_metrics"] -> metricsApp req resp
      _ -> plain status404 "Not found"
-- | Begin a teardown and return the follow-up 'IO' action the caller must
-- run after the transaction commits.
--
-- * Starting: the state becomes 'ApplicationTearingDown' and the follow-up
--   cancels the startup thread.
-- * Running: the state becomes 'ApplicationTeardownConfirm' with a fresh
--   flag; the follow-up waits until that flag or the @timeout@ flag is
--   'True', then sets 'ApplicationTearingDown'.
-- * Awaiting confirmation already: same follow-up on the existing flag.
-- * Tearing down already: nothing to do.
switchToTeardown :: TVar Bool -> TVar ApplicationState -> STM (IO ())
switchToTeardown timeout state = do
  current <- readTVar state
  case current of
    ApplicationStarting startupThread -> do
      writeTVar state ApplicationTearingDown
      pure (cancel startupThread)
    ApplicationRunning{} -> do
      confirmed <- newTVar False
      writeTVar state (ApplicationTeardownConfirm confirmed)
      pure (finishAfter confirmed)
    ApplicationTeardownConfirm confirmed ->
      pure (finishAfter confirmed)
    ApplicationTearingDown ->
      pure (pure ())
  where
    -- Wait for the confirmation or the timeout, whichever comes first,
    -- then enter the final teardown state.
    finishAfter confirmed = do
      atomically $
        (readTVar confirmed >>= check) `orElse` (readTVar timeout >>= check)
      atomically $ writeTVar state ApplicationTearingDown
-- | Mark the application as running, but only if it is still starting.
-- Every other state is left as it is, so a teardown requested during
-- startup is not overwritten.
switchToRunning :: TVar ApplicationState -> STM ()
switchToRunning state = do
  current <- readTVar state
  case current of
    ApplicationStarting{} -> writeTVar state ApplicationRunning
    ApplicationRunning{} -> pure ()
    ApplicationTeardownConfirm{} -> pure ()
    ApplicationTearingDown -> pure ()