module Interpreter.Lib.Concurrency where import Control.Concurrent import Control.Concurrent.STM (atomically) import Control.Concurrent.STM.TChan import Control.Concurrent.STM.TMVar import Control.Exception (AsyncException(..), SomeException, catch, toException) import Control.Monad.IO.Class import Control.Monad.State.Strict import Data.Coerce import Interpreter.Common import Interpreter.Interpreter builtInLaunchThread :: BuiltInFnWithDoc '[ '("callback_thread", Callback), '("callback_arg", Value) ] builtInLaunchThread ((coerce -> (processCb :: Callback)) :> (coerce -> threadArg) :> EmptyArgs) = do istate <- get liftIO $ do resultRef <- newEmptyTMVarIO threadId <- forkIO $ flip catch (asynExHandler resultRef) $ do (r, _) <- flip runStateT istate (evaluateCallback processCb [threadArg]) atomically $ putTMVar resultRef $ case r of Just r' -> Right r' Nothing -> Left (toException MissingProcedureReturn) pure $ Just $ ThreadRef $ ThreadInfo threadId resultRef where asynExHandler :: TMVar (Either SomeException Value) -> AsyncException -> IO () asynExHandler ref e = atomically $ putTMVar ref $ Left $ toException e builtInKillThread :: BuiltInFnWithDoc '[ '("thread_result", ThreadInfo) ] builtInKillThread ((coerce -> (ThreadInfo threadId _)) :> EmptyArgs) = do liftIO $ killThread threadId pure Nothing builtInAwait :: BuiltInFnWithDoc '[ '("thread_result", ThreadInfo) ] builtInAwait ((coerce -> (ThreadInfo _ pmvar)) :> EmptyArgs) = do void $ liftIO $ atomically $ readTMVar pmvar pure Nothing builtInAwaitResult :: BuiltInFnWithDoc '[ '("thread_result", ThreadInfo) ] builtInAwaitResult ((coerce -> (ThreadInfo _ pmvar)) :> EmptyArgs) = (liftIO $ atomically $ readTMVar pmvar) >>= \case Right x -> pure $ Just x Left e -> throwErr e builtInNewChannel :: BuiltInFnWithDoc '[] builtInNewChannel _ = (pure . Channel . ChannelRef) <$> liftIO newTChanIO builtInWriteChannel :: BuiltInFnWithDoc '[ '("channel_ref", ChannelRef), '("value", Value)] builtInWriteChannel ((coerce -> (ChannelRef chan)) :> (coerce -> val) :> _) = do liftIO $ atomically $ writeTChan chan val pure Nothing builtInReadChannel :: BuiltInFnWithDoc '[ '("channel_ref", ChannelRef)] builtInReadChannel ((coerce -> (ChannelRef chan)) :> _) = Just <$> (liftIO $ atomically $ readTChan chan) builtInNewRef :: BuiltInFnWithDoc '[ '("init_value", Value) ] builtInNewRef ((coerce -> (v :: Value)) :> EmptyArgs) = do ref <- liftIO $ newTMVarIO v pure $ Just $ Ref $ MutableRef ref builtInWriteRef :: BuiltInFnWithDoc '[ '("ref", MutableRef), '("new_value", Value) ] builtInWriteRef ((coerce -> (MutableRef ref)) :> (coerce -> v) :> EmptyArgs) = do void $ liftIO $ atomically $ swapTMVar ref v pure Nothing builtInReadRef :: BuiltInFnWithDoc '[ '("ref", MutableRef) ] builtInReadRef ((coerce -> (MutableRef ref)) :> EmptyArgs) = do v <- liftIO $ atomically $ readTMVar ref pure $ Just v builtInModifyRef :: BuiltInFnWithDoc '[ '("ref", MutableRef), '("callback", Callback) ] builtInModifyRef ((coerce -> (MutableRef ref)) :> (coerce -> callback) :> EmptyArgs) = do v <- liftIO $ atomically $ takeTMVar ref evaluateCallback callback [v] >>= \case Just v' -> liftIO $ atomically $ putTMVar ref v' Nothing -> throwErr MissingProcedureReturn pure Nothing