{--*-Mode:haskell;coding:utf-8;tab-width:4;c-basic-offset:4;indent-tabs-mode:()-*- ex: set ft=haskell fenc=utf-8 sts=4 ts=4 sw=4 et nomod: -} {- BSD LICENSE Copyright (c) 2017, Michael Truog All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: * Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. * All advertising materials mentioning features or use of this software must display the following acknowledgment: This product includes software developed by Michael Truog * The name of the author may not be used to endorse or promote products derived from this software without specific prior written permission THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -} module Foreign.CloudI ( Instance.RequestType(..) , Instance.Source , Instance.Response(..) , Instance.Callback , Instance.T , transIdNull , invalidInputError , messageDecodingError , terminateError , Exception , api , threadCount , subscribe , subscribeCount , unsubscribe , sendAsync , sendSync , mcastAsync , forward_ , forwardAsync , forwardSync , return_ , returnAsync , returnSync , recvAsync , processIndex , processCount , processCountMax , processCountMin , prefix , timeoutInitialize , timeoutAsync , timeoutSync , timeoutTerminate , poll , threadCreate , threadsWait , infoKeyValueParse ) where import Prelude hiding (init,length) import Data.Bits (shiftL,(.|.)) import Data.Maybe (fromMaybe) import Data.Typeable (Typeable) import qualified Control.Exception as Exception import qualified Control.Concurrent as Concurrent import qualified Data.Array.IArray as IArray import qualified Data.Binary.Get as Get import qualified Data.ByteString as ByteString import qualified Data.ByteString.Builder as Builder import qualified Data.ByteString.Char8 as Char8 import qualified Data.ByteString.Lazy as LazyByteString import qualified Data.List as List import qualified Data.Map.Strict as Map import qualified Data.Monoid as Monoid import qualified Data.Sequence as Sequence import qualified Data.Time.Clock as Clock import qualified Data.Time.Clock.POSIX as POSIX (getPOSIXTime) import qualified Data.Word as Word import qualified Foreign.C.Types as C import qualified Foreign.Erlang as Erlang import qualified Foreign.CloudI.Instance as Instance import qualified System.IO as SysIO import qualified System.IO.Unsafe as Unsafe import qualified System.Posix.Env as POSIX (getEnv) type Array = IArray.Array type Builder = Builder.Builder type ByteString = ByteString.ByteString type Get = Get.Get type Handle = SysIO.Handle type LazyByteString = LazyByteString.ByteString type Map = Map.Map type RequestType = Instance.RequestType type SomeException = Exception.SomeException type Source = Instance.Source type ThreadId = Concurrent.ThreadId type Word32 = Word.Word32 messageInit :: Word32 messageInit = 1 messageSendAsync :: Word32 messageSendAsync = 2 messageSendSync :: Word32 messageSendSync = 3 messageRecvAsync :: Word32 messageRecvAsync = 4 messageReturnAsync :: Word32 messageReturnAsync = 5 messageReturnSync :: Word32 messageReturnSync = 6 messageReturnsAsync :: Word32 messageReturnsAsync = 7 messageKeepalive :: Word32 messageKeepalive = 8 messageReinit :: Word32 messageReinit = 9 messageSubscribeCount :: Word32 messageSubscribeCount = 10 messageTerm :: Word32 messageTerm = 11 data Message = MessageSend ( RequestType, ByteString, ByteString, ByteString, ByteString, Int, Int, ByteString, Source) | MessageKeepalive transIdNull :: ByteString transIdNull = Char8.pack $ List.replicate 16 '\0' invalidInputError :: String invalidInputError = "Invalid Input" messageDecodingError :: String messageDecodingError = "Message Decoding Error" terminateError :: String terminateError = "Terminate" data Exception s = ReturnSync (Instance.T s) | ReturnAsync (Instance.T s) | ForwardSync (Instance.T s) | ForwardAsync (Instance.T s) deriving (Show, Typeable) instance Typeable s => Exception.Exception (Exception s) printException :: String -> IO () printException str = SysIO.hPutStrLn SysIO.stderr ("Exception: " ++ str) printError :: String -> IO () printError str = SysIO.hPutStrLn SysIO.stderr ("Error: " ++ str) data CallbackResult s = ReturnI (ByteString, ByteString, s, Instance.T s) | ForwardI (ByteString, ByteString, ByteString, Int, Int, s, Instance.T s) | Finished (Instance.T s) type Result a = Either String a api :: Typeable s => Int -> s -> IO (Result (Instance.T s)) api threadIndex state = do SysIO.hSetEncoding SysIO.stdout SysIO.utf8 SysIO.hSetBuffering SysIO.stdout SysIO.LineBuffering SysIO.hSetEncoding SysIO.stderr SysIO.utf8 SysIO.hSetBuffering SysIO.stderr SysIO.LineBuffering protocolValue <- POSIX.getEnv "CLOUDI_API_INIT_PROTOCOL" bufferSizeValue <- POSIX.getEnv "CLOUDI_API_INIT_BUFFER_SIZE" case (protocolValue, bufferSizeValue) of (Just protocol, Just bufferSizeStr) -> let bufferSize = read bufferSizeStr :: Int fd = C.CInt $ fromIntegral (threadIndex + 3) useHeader = protocol /= "udp" timeoutTerminate' = 1000 initTerms = Erlang.OtpErlangAtom (Char8.pack "init") in case Erlang.termToBinary initTerms (-1) of Left err -> return $ Left $ show err Right initBinary -> do api0 <- Instance.make state protocol fd useHeader bufferSize timeoutTerminate' send api0 initBinary result <- pollRequest api0 (-1) False case result of Left err -> return $ Left err Right (_, api1) -> return $ Right api1 (_, _) -> return $ Left invalidInputError threadCount :: IO (Result Int) threadCount = do threadCountValue <- POSIX.getEnv "CLOUDI_API_INIT_THREAD_COUNT" case threadCountValue of Nothing -> return $ Left invalidInputError Just threadCountStr -> return $ Right (read threadCountStr :: Int) subscribe :: Instance.T s -> ByteString -> Instance.Callback s -> IO (Result (Instance.T s)) subscribe api0 pattern f = let subscribeTerms = Erlang.OtpErlangTuple [ Erlang.OtpErlangAtom (Char8.pack "subscribe") , Erlang.OtpErlangString pattern] in case Erlang.termToBinary subscribeTerms (-1) of Left err -> return $ Left $ show err Right subscribeBinary -> do send api0 subscribeBinary return $ Right $ Instance.callbacksAdd api0 pattern f subscribeCount :: Typeable s => Instance.T s -> ByteString -> IO (Result (Int, Instance.T s)) subscribeCount api0 pattern = let subscribeCountTerms = Erlang.OtpErlangTuple [ Erlang.OtpErlangAtom (Char8.pack "subscribe_count") , Erlang.OtpErlangString pattern] in case Erlang.termToBinary subscribeCountTerms (-1) of Left err -> return $ Left $ show err Right subscribeCountBinary -> do send api0 subscribeCountBinary result <- pollRequest api0 (-1) False case result of Left err -> return $ Left err Right (_, api1@Instance.T{Instance.subscribeCount = count}) -> return $ Right (count, api1) unsubscribe :: Instance.T s -> ByteString -> IO (Result (Instance.T s)) unsubscribe api0 pattern = let unsubscribeTerms = Erlang.OtpErlangTuple [ Erlang.OtpErlangAtom (Char8.pack "unsubscribe") , Erlang.OtpErlangString pattern] in case Erlang.termToBinary unsubscribeTerms (-1) of Left err -> return $ Left $ show err Right unsubscribeBinary -> do send api0 unsubscribeBinary return $ Right $ Instance.callbacksRemove api0 pattern sendAsync :: Typeable s => Instance.T s -> ByteString -> ByteString -> Maybe Int -> Maybe ByteString -> Maybe Int -> IO (Result (ByteString, Instance.T s)) sendAsync api0@Instance.T{ Instance.timeoutAsync = timeoutAsync' , Instance.priorityDefault = priorityDefault} name request timeoutOpt requestInfoOpt priorityOpt = let timeout = fromMaybe timeoutAsync' timeoutOpt requestInfo = fromMaybe ByteString.empty requestInfoOpt priority = fromMaybe priorityDefault priorityOpt sendAsyncTerms = Erlang.OtpErlangTuple [ Erlang.OtpErlangAtom (Char8.pack "send_async") , Erlang.OtpErlangString name , Erlang.OtpErlangBinary requestInfo , Erlang.OtpErlangBinary request , Erlang.OtpErlangInteger timeout , Erlang.OtpErlangInteger priority] in case Erlang.termToBinary sendAsyncTerms (-1) of Left err -> return $ Left $ show err Right sendAsyncBinary -> do send api0 sendAsyncBinary result <- pollRequest api0 (-1) False case result of Left err -> return $ Left err Right (_, api1@Instance.T{Instance.transId = transId}) -> return $ Right (transId, api1) sendSync :: Typeable s => Instance.T s -> ByteString -> ByteString -> Maybe Int -> Maybe ByteString -> Maybe Int -> IO (Result (ByteString, ByteString, ByteString, Instance.T s)) sendSync api0@Instance.T{ Instance.timeoutSync = timeoutSync' , Instance.priorityDefault = priorityDefault} name request timeoutOpt requestInfoOpt priorityOpt = let timeout = fromMaybe timeoutSync' timeoutOpt requestInfo = fromMaybe ByteString.empty requestInfoOpt priority = fromMaybe priorityDefault priorityOpt sendSyncTerms = Erlang.OtpErlangTuple [ Erlang.OtpErlangAtom (Char8.pack "send_sync") , Erlang.OtpErlangString name , Erlang.OtpErlangBinary requestInfo , Erlang.OtpErlangBinary request , Erlang.OtpErlangInteger timeout , Erlang.OtpErlangInteger priority] in case Erlang.termToBinary sendSyncTerms (-1) of Left err -> return $ Left $ show err Right sendSyncBinary -> do send api0 sendSyncBinary result <- pollRequest api0 (-1) False case result of Left err -> return $ Left err Right (_, api1@Instance.T{ Instance.responseInfo = responseInfo , Instance.response = response , Instance.transId = transId}) -> return $ Right (responseInfo, response, transId, api1) mcastAsync :: Typeable s => Instance.T s -> ByteString -> ByteString -> Maybe Int -> Maybe ByteString -> Maybe Int -> IO (Result (Array Int ByteString, Instance.T s)) mcastAsync api0@Instance.T{ Instance.timeoutAsync = timeoutAsync' , Instance.priorityDefault = priorityDefault} name request timeoutOpt requestInfoOpt priorityOpt = let timeout = fromMaybe timeoutAsync' timeoutOpt requestInfo = fromMaybe ByteString.empty requestInfoOpt priority = fromMaybe priorityDefault priorityOpt mcastAsyncTerms = Erlang.OtpErlangTuple [ Erlang.OtpErlangAtom (Char8.pack "mcast_async") , Erlang.OtpErlangString name , Erlang.OtpErlangBinary requestInfo , Erlang.OtpErlangBinary request , Erlang.OtpErlangInteger timeout , Erlang.OtpErlangInteger priority] in case Erlang.termToBinary mcastAsyncTerms (-1) of Left err -> return $ Left $ show err Right mcastAsyncBinary -> do send api0 mcastAsyncBinary result <- pollRequest api0 (-1) False case result of Left err -> return $ Left err Right (_, api1@Instance.T{Instance.transIds = transIds}) -> return $ Right (transIds, api1) forward_ :: Typeable s => Instance.T s -> Instance.RequestType -> ByteString -> ByteString -> ByteString -> Int -> Int -> ByteString -> Source -> IO () forward_ api0 Instance.ASYNC = forwardAsync api0 forward_ api0 Instance.SYNC = forwardSync api0 forwardAsyncI :: Instance.T s -> ByteString -> ByteString -> ByteString -> Int -> Int -> ByteString -> Source -> IO (Result (Instance.T s)) forwardAsyncI api0@Instance.T{ Instance.requestTimeoutAdjustment = requestTimeoutAdjustment , Instance.requestTimer = requestTimer , Instance.requestTimeout = requestTimeout} name responseInfo response timeout priority transId pid = do timeoutNew <- if requestTimeoutAdjustment && timeout == requestTimeout then timeoutAdjustment requestTimer timeout else return timeout let forwardTerms = Erlang.OtpErlangTuple [ Erlang.OtpErlangAtom (Char8.pack "forward_async") , Erlang.OtpErlangString name , Erlang.OtpErlangBinary responseInfo , Erlang.OtpErlangBinary response , Erlang.OtpErlangInteger timeoutNew , Erlang.OtpErlangInteger priority , Erlang.OtpErlangBinary transId , Erlang.OtpErlangPid pid] case Erlang.termToBinary forwardTerms (-1) of Left err -> return $ Left $ show err Right forwardBinary -> do send api0 forwardBinary return $ Right api0 forwardAsync :: Typeable s => Instance.T s -> ByteString -> ByteString -> ByteString -> Int -> Int -> ByteString -> Source -> IO () forwardAsync api0 name responseInfo response timeout priority transId pid = do result <- forwardAsyncI api0 name responseInfo response timeout priority transId pid case result of Left err -> error err Right api1 -> Exception.throwIO $ ForwardAsync api1 forwardSyncI :: Instance.T s -> ByteString -> ByteString -> ByteString -> Int -> Int -> ByteString -> Source -> IO (Result (Instance.T s)) forwardSyncI api0@Instance.T{ Instance.requestTimeoutAdjustment = requestTimeoutAdjustment , Instance.requestTimer = requestTimer , Instance.requestTimeout = requestTimeout} name responseInfo response timeout priority transId pid = do timeoutNew <- if requestTimeoutAdjustment && timeout == requestTimeout then timeoutAdjustment requestTimer timeout else return timeout let forwardTerms = Erlang.OtpErlangTuple [ Erlang.OtpErlangAtom (Char8.pack "forward_sync") , Erlang.OtpErlangString name , Erlang.OtpErlangBinary responseInfo , Erlang.OtpErlangBinary response , Erlang.OtpErlangInteger timeoutNew , Erlang.OtpErlangInteger priority , Erlang.OtpErlangBinary transId , Erlang.OtpErlangPid pid] case Erlang.termToBinary forwardTerms (-1) of Left err -> return $ Left $ show err Right forwardBinary -> do send api0 forwardBinary return $ Right api0 forwardSync :: Typeable s => Instance.T s -> ByteString -> ByteString -> ByteString -> Int -> Int -> ByteString -> Source -> IO () forwardSync api0 name responseInfo response timeout priority transId pid = do result <- forwardSyncI api0 name responseInfo response timeout priority transId pid case result of Left err -> error err Right api1 -> Exception.throwIO $ ForwardSync api1 return_ :: Typeable s => Instance.T s -> Instance.RequestType -> ByteString -> ByteString -> ByteString -> ByteString -> Int -> ByteString -> Source -> IO () return_ api0 Instance.ASYNC = returnAsync api0 return_ api0 Instance.SYNC = returnSync api0 returnAsyncI :: Instance.T s -> ByteString -> ByteString -> ByteString -> ByteString -> Int -> ByteString -> Source -> IO (Result (Instance.T s)) returnAsyncI api0@Instance.T{ Instance.requestTimeoutAdjustment = requestTimeoutAdjustment , Instance.requestTimer = requestTimer , Instance.requestTimeout = requestTimeout} name pattern responseInfo response timeout transId pid = do timeoutNew <- if requestTimeoutAdjustment && timeout == requestTimeout then timeoutAdjustment requestTimer timeout else return timeout let returnTerms = Erlang.OtpErlangTuple [ Erlang.OtpErlangAtom (Char8.pack "return_async") , Erlang.OtpErlangString name , Erlang.OtpErlangString pattern , Erlang.OtpErlangBinary responseInfo , Erlang.OtpErlangBinary response , Erlang.OtpErlangInteger timeoutNew , Erlang.OtpErlangBinary transId , Erlang.OtpErlangPid pid] case Erlang.termToBinary returnTerms (-1) of Left err -> return $ Left $ show err Right returnBinary -> do send api0 returnBinary return $ Right api0 returnAsync :: Typeable s => Instance.T s -> ByteString -> ByteString -> ByteString -> ByteString -> Int -> ByteString -> Source -> IO () returnAsync api0 name pattern responseInfo response timeout transId pid = do result <- returnAsyncI api0 name pattern responseInfo response timeout transId pid case result of Left err -> error err Right api1 -> Exception.throwIO $ ReturnAsync api1 returnSyncI :: Instance.T s -> ByteString -> ByteString -> ByteString -> ByteString -> Int -> ByteString -> Source -> IO (Result (Instance.T s)) returnSyncI api0@Instance.T{ Instance.requestTimeoutAdjustment = requestTimeoutAdjustment , Instance.requestTimer = requestTimer , Instance.requestTimeout = requestTimeout} name pattern responseInfo response timeout transId pid = do timeoutNew <- if requestTimeoutAdjustment && timeout == requestTimeout then timeoutAdjustment requestTimer timeout else return timeout let returnTerms = Erlang.OtpErlangTuple [ Erlang.OtpErlangAtom (Char8.pack "return_sync") , Erlang.OtpErlangString name , Erlang.OtpErlangString pattern , Erlang.OtpErlangBinary responseInfo , Erlang.OtpErlangBinary response , Erlang.OtpErlangInteger timeoutNew , Erlang.OtpErlangBinary transId , Erlang.OtpErlangPid pid] case Erlang.termToBinary returnTerms (-1) of Left err -> return $ Left $ show err Right returnBinary -> do send api0 returnBinary return $ Right api0 returnSync :: Typeable s => Instance.T s -> ByteString -> ByteString -> ByteString -> ByteString -> Int -> ByteString -> Source -> IO () returnSync api0 name pattern responseInfo response timeout transId pid = do result <- returnSyncI api0 name pattern responseInfo response timeout transId pid case result of Left err -> error err Right api1 -> Exception.throwIO $ ReturnSync api1 recvAsync :: Typeable s => Instance.T s -> Maybe Int -> Maybe ByteString -> Maybe Bool -> IO (Result (ByteString, ByteString, ByteString, Instance.T s)) recvAsync api0@Instance.T{Instance.timeoutSync = timeoutSync'} timeoutOpt transIdOpt consumeOpt = let timeout = fromMaybe timeoutSync' timeoutOpt transId = fromMaybe transIdNull transIdOpt consume = fromMaybe True consumeOpt recvAsyncTerms = Erlang.OtpErlangTuple [ Erlang.OtpErlangAtom (Char8.pack "recv_async") , Erlang.OtpErlangInteger timeout , Erlang.OtpErlangBinary transId , Erlang.OtpErlangAtomBool consume] in case Erlang.termToBinary recvAsyncTerms (-1) of Left err -> return $ Left $ show err Right recvAsyncBinary -> do send api0 recvAsyncBinary result <- pollRequest api0 (-1) False case result of Left err -> return $ Left err Right (_, api1@Instance.T{ Instance.responseInfo = responseInfo , Instance.response = response , Instance.transId = transId'}) -> return $ Right (responseInfo, response, transId', api1) processIndex :: Instance.T s -> Int processIndex Instance.T{Instance.processIndex = processIndex'} = processIndex' processCount :: Instance.T s -> Int processCount Instance.T{Instance.processCount = processCount'} = processCount' processCountMax :: Instance.T s -> Int processCountMax Instance.T{Instance.processCountMax = processCountMax'} = processCountMax' processCountMin :: Instance.T s -> Int processCountMin Instance.T{Instance.processCountMin = processCountMin'} = processCountMin' prefix :: Instance.T s -> ByteString prefix Instance.T{Instance.prefix = prefix'} = prefix' timeoutInitialize :: Instance.T s -> Int timeoutInitialize Instance.T{Instance.timeoutInitialize = timeoutInitialize'} = timeoutInitialize' timeoutAsync :: Instance.T s -> Int timeoutAsync Instance.T{Instance.timeoutAsync = timeoutAsync'} = timeoutAsync' timeoutSync :: Instance.T s -> Int timeoutSync Instance.T{Instance.timeoutSync = timeoutSync'} = timeoutSync' timeoutTerminate :: Instance.T s -> Int timeoutTerminate Instance.T{Instance.timeoutTerminate = timeoutTerminate'} = timeoutTerminate' nullResponse :: RequestType -> ByteString -> ByteString -> ByteString -> ByteString -> Int -> Int -> ByteString -> Source -> s -> Instance.T s -> IO (Instance.Response s) nullResponse _ _ _ _ _ _ _ _ _ state api0 = return $ Instance.Null (state, api0) callback :: Typeable s => Instance.T s -> (RequestType, ByteString, ByteString, ByteString, ByteString, Int, Int, ByteString, Source) -> IO (Result (Instance.T s)) callback api0@Instance.T{ Instance.state = state , Instance.callbacks = callbacks , Instance.requestTimeoutAdjustment = requestTimeoutAdjustment} (requestType, name, pattern, requestInfo, request, timeout, priority, transId, pid) = do api1 <- if requestTimeoutAdjustment then do requestTimer <- POSIX.getPOSIXTime return api0{ Instance.requestTimer = requestTimer , Instance.requestTimeout = timeout} else return api0 let (callbackF, callbacksNew) = case Map.lookup pattern callbacks of Nothing -> (nullResponse, callbacks) Just functionQueue -> let f = Sequence.index functionQueue 0 functionQueueNew = (Sequence.|>) (Sequence.drop 0 functionQueue) f in (f, Map.insert pattern functionQueueNew callbacks) api2 = api1{Instance.callbacks = callbacksNew} empty = ByteString.empty callbackResultValue <- Exception.try $ case requestType of Instance.ASYNC -> do callbackResultAsyncValue <- Exception.try $ callbackF requestType name pattern requestInfo request timeout priority transId pid state api2 case callbackResultAsyncValue of Left (ReturnSync api3) -> do printException "Synchronous Call Return Invalid" return $ Finished api3 Left (ReturnAsync api3) -> return $ Finished api3 Left (ForwardSync api3) -> do printException "Synchronous Call Forward Invalid" return $ Finished api3 Left (ForwardAsync api3) -> return $ Finished api3 Right (Instance.ResponseInfo (v0, v1, v2, v3)) -> return $ ReturnI (v0, v1, v2, v3) Right (Instance.Response (v0, v1, v2)) -> return $ ReturnI (empty, v0, v1, v2) Right (Instance.Forward (v0, v1, v2, v3, v4)) -> return $ ForwardI (v0, v1, v2, timeout, priority, v3, v4) Right (Instance.Forward_ (v0, v1, v2, v3, v4, v5, v6)) -> return $ ForwardI (v0, v1, v2, v3, v4, v5, v6) Right (Instance.Null (v0, v1)) -> return $ ReturnI (empty, empty, v0, v1) Right (Instance.NullError (err, v0, v1)) -> do printError err return $ ReturnI (empty, empty, v0, v1) Instance.SYNC -> do callbackResultSyncValue <- Exception.try $ callbackF requestType name pattern requestInfo request timeout priority transId pid state api2 case callbackResultSyncValue of Left (ReturnSync api3) -> return $ Finished api3 Left (ReturnAsync api3) -> do printException "Asynchronous Call Return Invalid" return $ Finished api3 Left (ForwardSync api3) -> return $ Finished api3 Left (ForwardAsync api3) -> do printException "Asynchronous Call Forward Invalid" return $ Finished api3 Right (Instance.ResponseInfo (v0, v1, v2, v3)) -> return $ ReturnI (v0, v1, v2, v3) Right (Instance.Response (v0, v1, v2)) -> return $ ReturnI (empty, v0, v1, v2) Right (Instance.Forward (v0, v1, v2, v3, v4)) -> return $ ForwardI (v0, v1, v2, timeout, priority, v3, v4) Right (Instance.Forward_ (v0, v1, v2, v3, v4, v5, v6)) -> return $ ForwardI (v0, v1, v2, v3, v4, v5, v6) Right (Instance.Null (v0, v1)) -> return $ ReturnI (empty, empty, v0, v1) Right (Instance.NullError (err, v0, v1)) -> do printError err return $ ReturnI (empty, empty, v0, v1) callbackResultType <- case callbackResultValue of Left exception -> do printException $ show (exception :: Exception.SomeException) return $ Finished api2 Right callbackResult -> return $ callbackResult case requestType of Instance.ASYNC -> case callbackResultType of Finished api4 -> return $ Right api4 ReturnI (responseInfo, response, state', api4) -> returnAsyncI api4{Instance.state = state'} name pattern responseInfo response timeout transId pid ForwardI (name', requestInfo', request', timeout', priority', state', api4) -> forwardAsyncI api4{Instance.state = state'} name' requestInfo' request' timeout' priority' transId pid Instance.SYNC -> case callbackResultType of Finished api4 -> return $ Right api4 ReturnI (responseInfo, response, state', api4) -> returnSyncI api4{Instance.state = state'} name pattern responseInfo response timeout transId pid ForwardI (name', requestInfo', request', timeout', priority', state', api4) -> forwardSyncI api4{Instance.state = state'} name' requestInfo' request' timeout' priority' transId pid handleEvents :: [Message] -> Instance.T s -> Bool -> Word32 -> Get ([Message], Instance.T s) handleEvents messages api0 external cmd0 = do cmd <- if cmd0 == 0 then Get.getWord32host else return cmd0 case () of _ | cmd == messageTerm -> let api1 = api0{ Instance.terminate = True , Instance.timeout = Just False} in if external then return ([], api1) else fail terminateError | cmd == messageReinit -> do processCount' <- Get.getWord32host timeoutAsync' <- Get.getWord32host timeoutSync' <- Get.getWord32host priorityDefault <- Get.getInt8 requestTimeoutAdjustment <- Get.getWord8 let api1 = Instance.reinit api0 processCount' timeoutAsync' timeoutSync' priorityDefault requestTimeoutAdjustment empty <- Get.isEmpty if not empty then handleEvents messages api1 external 0 else return (messages, api1) | cmd == messageKeepalive -> do let messagesNew = MessageKeepalive:messages empty <- Get.isEmpty if not empty then handleEvents messagesNew api0 external 0 else return (messagesNew, api0) | otherwise -> fail messageDecodingError pollRequestDataGet :: [Message] -> Instance.T s -> Bool -> Get ([Message], Instance.T s) pollRequestDataGet messages api0 external = do cmd <- Get.getWord32host case () of _ | cmd == messageInit -> do processIndex' <- Get.getWord32host processCount' <- Get.getWord32host processCountMax' <- Get.getWord32host processCountMin' <- Get.getWord32host prefixSize <- Get.getWord32host prefix' <- Get.getByteString $ fromIntegral prefixSize - 1 Get.skip 1 timeoutInitialize' <- Get.getWord32host timeoutAsync' <- Get.getWord32host timeoutSync' <- Get.getWord32host timeoutTerminate' <- Get.getWord32host priorityDefault <- Get.getInt8 requestTimeoutAdjustment <- Get.getWord8 let api1 = Instance.init api0 processIndex' processCount' processCountMax' processCountMin' prefix' timeoutInitialize' timeoutAsync' timeoutSync' timeoutTerminate' priorityDefault requestTimeoutAdjustment empty <- Get.isEmpty if not empty then handleEvents messages api1 external 0 else return (messages, api1) | cmd == messageSendAsync || cmd == messageSendSync -> do nameSize <- Get.getWord32host name <- Get.getByteString $ fromIntegral nameSize - 1 Get.skip 1 patternSize <- Get.getWord32host pattern <- Get.getByteString $ fromIntegral patternSize - 1 Get.skip 1 requestInfoSize <- Get.getWord32host requestInfo <- Get.getByteString $ fromIntegral requestInfoSize Get.skip 1 requestSize <- Get.getWord32host request <- Get.getByteString $ fromIntegral requestSize Get.skip 1 timeout <- Get.getWord32host priority <- Get.getInt8 transId <- Get.getByteString 16 pidSize <- Get.getWord32host pidData <- Get.getLazyByteString $ fromIntegral pidSize empty <- Get.isEmpty case Erlang.binaryToTerm pidData of Left err -> fail $ show err Right (Erlang.OtpErlangPid (pid)) -> let requestType = if cmd == messageSendAsync then Instance.ASYNC else -- cmd == messageSendSync Instance.SYNC messagesNew = (MessageSend ( requestType, name, pattern, requestInfo, request, fromIntegral timeout, fromIntegral priority, transId, pid)):messages in if not empty then handleEvents messagesNew api0 external 0 else return (messagesNew, api0) Right _ -> fail messageDecodingError | cmd == messageRecvAsync || cmd == messageReturnSync -> do responseInfoSize <- Get.getWord32host responseInfo <- Get.getByteString $ fromIntegral responseInfoSize responseSize <- Get.getWord32host response <- Get.getByteString $ fromIntegral responseSize transId <- Get.getByteString 16 empty <- Get.isEmpty let api1 = Instance.setResponse api0 responseInfo response transId if not empty then handleEvents messages api1 external 0 else return (messages, api1) | cmd == messageReturnAsync -> do transId <- Get.getByteString 16 empty <- Get.isEmpty let api1 = Instance.setTransId api0 transId if not empty then handleEvents messages api1 external 0 else return (messages, api1) | cmd == messageReturnsAsync -> do transIdCount <- Get.getWord32host transIds <- Get.getByteString $ 16 * (fromIntegral transIdCount) empty <- Get.isEmpty let api1 = Instance.setTransIds api0 transIds transIdCount if not empty then handleEvents messages api1 external 0 else return (messages, api1) | cmd == messageSubscribeCount -> do subscribeCount' <- Get.getWord32host empty <- Get.isEmpty let api1 = Instance.setSubscribeCount api0 subscribeCount' if not empty then handleEvents messages api1 external 0 else return (messages, api1) | cmd == messageTerm -> handleEvents messages api0 external cmd | cmd == messageReinit -> do processCount' <- Get.getWord32host timeoutAsync' <- Get.getWord32host timeoutSync' <- Get.getWord32host priorityDefault <- Get.getInt8 requestTimeoutAdjustment <- Get.getWord8 let api1 = Instance.reinit api0 processCount' timeoutAsync' timeoutSync' priorityDefault requestTimeoutAdjustment empty <- Get.isEmpty if not empty then pollRequestDataGet messages api1 external else return (messages, api1) | cmd == messageKeepalive -> do let messagesNew = MessageKeepalive:messages empty <- Get.isEmpty if not empty then pollRequestDataGet messagesNew api0 external else return (messagesNew, api0) | otherwise -> fail messageDecodingError pollRequestDataProcess :: Typeable s => [Message] -> Instance.T s -> IO (Result (Instance.T s)) pollRequestDataProcess [] api0 = return $ Right api0 pollRequestDataProcess (message:messages) api0 = case message of MessageSend callbackData -> do callbackResult <- callback api0 callbackData case callbackResult of Left err -> return $ Left err Right api1 -> pollRequestDataProcess messages api1 MessageKeepalive -> let aliveTerms = Erlang.OtpErlangAtom (Char8.pack "keepalive") in case Erlang.termToBinary aliveTerms (-1) of Left err -> return $ Left $ show err Right aliveBinary -> do send api0 aliveBinary pollRequestDataProcess messages api0 pollRequestData :: Typeable s => Instance.T s -> Bool -> LazyByteString -> IO (Result (Instance.T s)) pollRequestData api0 external dataIn = case Get.runGetOrFail (pollRequestDataGet [] api0 external) dataIn of Left (_, _, err) -> return $ Left err Right (_, _, (messages, api1)) -> pollRequestDataProcess (List.reverse messages) api1 pollRequestLoop :: Typeable s => Instance.T s -> Int -> Bool -> Clock.NominalDiffTime -> IO (Result (Bool, Instance.T s)) pollRequestLoop api0@Instance.T{ Instance.socketHandle = socketHandle , Instance.bufferRecvSize = bufferRecvSize} timeout external pollTimer = do inputAvailable <- if bufferRecvSize > 0 then return True else SysIO.hWaitForInput socketHandle timeout if not inputAvailable then return $ Right (True, api0{Instance.timeout = Nothing}) else do (dataIn, _, api1) <- recv api0 dataResult <- pollRequestData api1 external dataIn case dataResult of Left err -> return $ Left err Right api2@Instance.T{Instance.timeout = Just result} -> return $ Right (result, api2{Instance.timeout = Nothing}) Right api2 -> do (pollTimerNew, timeoutNew) <- if timeout <= 0 then return (0, timeout) else timeoutAdjustmentPoll pollTimer timeout if timeout == 0 then return $ Right (True, api2{Instance.timeout = Nothing}) else pollRequestLoop api2 timeoutNew external pollTimerNew pollRequestLoopBegin :: Typeable s => Instance.T s -> Int -> Bool -> Clock.NominalDiffTime -> IO (Result (Bool, Instance.T s)) pollRequestLoopBegin api0 timeout external pollTimer = do result <- Exception.try (pollRequestLoop api0 timeout external pollTimer) case result of Left exception -> return $ Left $ show (exception :: SomeException) Right success -> return success pollRequest :: Typeable s => Instance.T s -> Int -> Bool -> IO (Result (Bool, Instance.T s)) pollRequest api0@Instance.T{ Instance.initializationComplete = initializationComplete , Instance.terminate = terminate} timeout external = if terminate then return $ Right (False, api0) else do pollTimer <- if timeout <= 0 then return 0 else POSIX.getPOSIXTime if external && not initializationComplete then let pollingTerms = Erlang.OtpErlangAtom (Char8.pack "polling") in case Erlang.termToBinary pollingTerms (-1) of Left err -> return $ Left $ show err Right pollingBinary -> do send api0 pollingBinary pollRequestLoopBegin api0{Instance.initializationComplete = True} timeout external pollTimer else pollRequestLoopBegin api0 timeout external pollTimer poll :: Typeable s => Instance.T s -> Int -> IO (Result (Bool, Instance.T s)) poll api0 timeout = pollRequest api0 timeout True send :: Instance.T s -> LazyByteString -> IO () send Instance.T{ Instance.useHeader = True , Instance.socketHandle = socketHandle} binary = do let total = fromIntegral (LazyByteString.length binary) :: Word32 LazyByteString.hPut socketHandle (Builder.toLazyByteString $ Builder.word32BE total `Monoid.mappend` Builder.lazyByteString binary) send Instance.T{ Instance.useHeader = False , Instance.socketHandle = socketHandle} binary = do LazyByteString.hPut socketHandle binary recvBuffer :: Builder -> Int -> Int -> Handle -> Int -> IO (Builder, Int) recvBuffer bufferRecv bufferRecvSize recvSize socketHandle bufferSize | recvSize <= bufferRecvSize = return (bufferRecv, bufferRecvSize) | otherwise = do fragment <- ByteString.hGetNonBlocking socketHandle bufferSize recvBuffer (bufferRecv `Monoid.mappend` Builder.byteString fragment) (bufferRecvSize + ByteString.length fragment) recvSize socketHandle bufferSize recvBufferAll :: Builder -> Int -> Handle -> Int -> IO (Builder, Int) recvBufferAll bufferRecv bufferRecvSize socketHandle bufferSize = do fragment <- ByteString.hGetNonBlocking socketHandle bufferSize let fragmentSize = ByteString.length fragment bufferRecvNew = bufferRecv `Monoid.mappend` Builder.byteString fragment bufferRecvSizeNew = bufferRecvSize + fragmentSize if fragmentSize == bufferSize then recvBufferAll bufferRecvNew bufferRecvSizeNew socketHandle bufferSize else return (bufferRecv, bufferRecvSize) recv :: Instance.T s -> IO (LazyByteString, Int, Instance.T s) recv api0@Instance.T{ Instance.useHeader = True , Instance.socketHandle = socketHandle , Instance.bufferSize = bufferSize , Instance.bufferRecv = bufferRecv , Instance.bufferRecvSize = bufferRecvSize} = do (bufferRecvHeader, bufferRecvHeaderSize) <- recvBuffer bufferRecv bufferRecvSize 4 socketHandle bufferSize let header0 = Builder.toLazyByteString bufferRecvHeader Just (byte0, header1) = LazyByteString.uncons header0 Just (byte1, header2) = LazyByteString.uncons header1 Just (byte2, header3) = LazyByteString.uncons header2 Just (byte3, headerRemaining) = LazyByteString.uncons header3 total = fromIntegral $ (fromIntegral byte0 :: Word32) `shiftL` 24 .|. (fromIntegral byte1 :: Word32) `shiftL` 16 .|. (fromIntegral byte2 :: Word32) `shiftL` 8 .|. (fromIntegral byte3 :: Word32) :: Int (bufferRecvAll, bufferRecvAllSize) <- recvBuffer (Builder.lazyByteString headerRemaining) (bufferRecvHeaderSize - 4) total socketHandle bufferSize let bufferRecvAllStr = Builder.toLazyByteString bufferRecvAll (bufferRecvData, bufferRecvNew) = LazyByteString.splitAt (fromIntegral total) bufferRecvAllStr return $ ( bufferRecvData , total , api0{ Instance.bufferRecv = Builder.lazyByteString bufferRecvNew , Instance.bufferRecvSize = bufferRecvAllSize - total}) recv api0@Instance.T{ Instance.useHeader = False , Instance.socketHandle = socketHandle , Instance.bufferSize = bufferSize , Instance.bufferRecv = bufferRecv , Instance.bufferRecvSize = bufferRecvSize} = do (bufferRecvAll, bufferRecvAllSize) <- recvBufferAll bufferRecv bufferRecvSize socketHandle bufferSize return $ ( Builder.toLazyByteString bufferRecvAll , bufferRecvAllSize , api0{ Instance.bufferRecv = Monoid.mempty , Instance.bufferRecvSize = 0}) timeoutAdjustment :: Clock.NominalDiffTime -> Int -> IO (Int) timeoutAdjustment t0 timeout = do t1 <- POSIX.getPOSIXTime if t1 <= t0 then return timeout else let elapsed = floor ((t1 - t0) * 1000) :: Integer timeoutValue = fromIntegral timeout :: Integer in if elapsed >= timeoutValue then return 0 else return $ fromIntegral $ timeoutValue - elapsed timeoutAdjustmentPoll :: Clock.NominalDiffTime -> Int -> IO (Clock.NominalDiffTime, Int) timeoutAdjustmentPoll t0 timeout = do t1 <- POSIX.getPOSIXTime if t1 <= t0 then return (t1, timeout) else let elapsed = floor ((t1 - t0) * 1000) :: Integer timeoutValue = fromIntegral timeout :: Integer in if elapsed >= timeoutValue then return (t1, 0) else return (t1, fromIntegral $ timeoutValue - elapsed) threadList :: Concurrent.MVar [Concurrent.MVar ()] threadList = Unsafe.unsafePerformIO (Concurrent.newMVar []) threadCreate :: (Int -> IO ()) -> Int -> IO ThreadId threadCreate f threadIndex = do thread <- Concurrent.newEmptyMVar threads <- Concurrent.takeMVar threadList Concurrent.putMVar threadList (thread:threads) threadCreateFork threadIndex (f threadIndex) (\_ -> Concurrent.putMVar thread ()) threadCreateFork :: Int -> IO a -> (Either SomeException a -> IO ()) -> IO ThreadId threadCreateFork threadIndex action afterF = -- similar to Concurrent.forkFinally Exception.mask $ \restore -> Concurrent.forkOn threadIndex (Exception.try (restore action) >>= afterF) threadsWait :: IO () threadsWait = do threads <- Concurrent.takeMVar threadList case threads of [] -> return () done:remaining -> do Concurrent.putMVar threadList remaining Concurrent.takeMVar done threadsWait textKeyValueParse :: ByteString -> Map ByteString [ByteString] textKeyValueParse text = let loop m [] = m loop m [v] = if v == ByteString.empty then m else error "not text_pairs" loop m (k:(v:l')) = case Map.lookup k m of Nothing -> loop (Map.insert k [v] m) l' Just v' -> loop (Map.insert k (v:v') m) l' in loop Map.empty (Char8.split '\0' text) infoKeyValueParse :: ByteString -> Map ByteString [ByteString] infoKeyValueParse = textKeyValueParse