{- | Client inner-loop

     This function is generally only needed if you are adding a new communication channel.
-}
processRemoteState :: IsAcidic st =>
                      IO CommChannel -- ^ (re-)connect function
                   -> IO (AcidState st)
processRemoteState reconnect
  = do cmdQueue    <- atomically newTQueue
       ccTMV       <- atomically newEmptyTMVar
       isClosed    <- newIORef False

       let actor :: Command -> IO (MVar Response)
           actor command =
               do debugStrLn "actor: begin."
                  readIORef isClosed >>= flip when (throwIO AcidStateClosed)
                  ref <- newEmptyMVar
                  atomically $ writeTQueue cmdQueue (command, ref)
                  debugStrLn "actor: end."
                  return ref

           expireQueue listenQueue =
               do mCallback <- atomically $ tryReadTQueue listenQueue
                  case mCallback of
                    Nothing         -> return ()
                    (Just callback) ->
                        do callback ConnectionError
                           expireQueue listenQueue

           handleReconnect :: SomeException -> IO ()
           handleReconnect e
             = case fromException e of
                 (Just ThreadKilled) ->
                     do debugStrLn "handleReconnect: ThreadKilled. Not attempting to reconnect."
                        return ()
                 _ ->
                   do debugStrLn "handleReconnect begin."
                      tmv <- atomically $ tryTakeTMVar ccTMV
                      case tmv of
                        Nothing ->
                            do debugStrLn "handleReconnect: error handling already in progress."
                               debugStrLn "handleReconnect end."
                               return ()
                        (Just (oldCC, oldListenQueue, oldListenerTID)) ->
                            do thisTID <- myThreadId
                               when (thisTID /= oldListenerTID) (killThread oldListenerTID)
                               ccClose oldCC
                               expireQueue oldListenQueue
                               cc <- reconnect
                               listenQueue <- atomically newTQueue
                               listenerTID <- forkIO $ listener cc listenQueue
                               atomically $ putTMVar ccTMV (cc, listenQueue, listenerTID)
                               debugStrLn "handleReconnect end."
                               return ()

           listener :: CommChannel -> TQueue (Response -> IO ()) -> IO ()
           listener cc listenQueue
             = getResponse Strict.empty `catch` handleReconnect
               where
                 getResponse leftover =
                     do debugStrLn "listener: listening for Response."
                        let go inp = case inp of
                                   Fail msg _     -> error msg
                                   Partial cont   -> do debugStrLn "listener: ccGetSome"
                                                        bs <- ccGetSome cc 1024
                                                        go (cont bs)
                                   Done resp rest -> do debugStrLn "listener: getting callback"
                                                        callback <- atomically $ readTQueue listenQueue
                                                        debugStrLn "listener: passing Response to callback"
                                                        callback (resp :: Response)
                                                        return rest
                        rest <- go (runGetPartial get leftover) -- `catch` (\e -> do handleReconnect e
                                                                --                   throwIO e
                                                                 --        )
                        getResponse rest

           actorThread :: IO ()
           actorThread = forever $
             do debugStrLn "actorThread: waiting for something to do."
                (cc, cmd) <- atomically $
                  do (cmd, ref)        <- readTQueue cmdQueue
                     (cc, listenQueue, _) <- readTMVar ccTMV
                     writeTQueue listenQueue (putMVar ref)
                     return (cc, cmd)
                debugStrLn "actorThread: sending command."
                ccPut cc (encode cmd) `catch` handleReconnect
                debugStrLn "actorThread: sent."
                return ()

           shutdown :: ThreadId -> IO ()
           shutdown actorTID =
               do debugStrLn "shutdown: update isClosed IORef to True."
                  writeIORef isClosed True
                  debugStrLn "shutdown: killing actor thread."
                  killThread actorTID
                  debugStrLn "shutdown: taking ccTMV."
                  (cc, listenQueue, listenerTID) <- atomically $ takeTMVar ccTMV -- FIXME: or should this by tryTakeTMVar
                  debugStrLn "shutdown: killing listener thread."
                  killThread listenerTID
                  debugStrLn "shutdown: expiring listen queue."
                  expireQueue  listenQueue
                  debugStrLn "shutdown: closing connection."
                  ccClose cc
                  return ()

       cc <- reconnect
       listenQueue <- atomically newTQueue

       actorTID    <- forkIO actorThread
       listenerTID <- forkIO $ listener cc listenQueue

       atomically $ putTMVar ccTMV (cc, listenQueue, listenerTID)

       return (toAcidState $ RemoteState actor (shutdown actorTID))