We now turn to the proof of the server. The whole file is included for the sake of
completeness. Note in particular that the whole state of the server is defined as
a single data type called {\tt State}. We will use this fact to prove
invariants on the whole server state.
module Parry.Server (
) where
import Control.Concurrent
import Control.Exception as E
import Control.Monad
import Control.Concurrent.MSem as Sem
import Network
import System.IO
import System.Directory
import Data.List
import Data.Time.Format()
import Data.Time.Clock.POSIX
#ifdef UNIX
import System.Posix.Signals
#endif
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as LB
import qualified Data.Map as M
import qualified Data.Set as S
import GHC.Generics
import Data.Binary
import Codec.Crypto.RSA.Pure
import Parry.Protocol
import Parry.Util
class Exhaustive j where
depth :: j->Int
class Result j r where
data State j r=State {
jobs::S.Set (Int,j),
ongoing::M.Map Integer (HostName,PublicKey,j,Double,Double),
unemployed::S.Set Integer,
} deriving (Show,Read,Generic)
instance (Binary j,Binary r)=>Binary (State j r)
In a server state {\tt st}, the \emph{current job} of a client is the job
registered in the {\tt ongoing} field of {\tt st}.
We call a client \emph{valid} if both of the following conditions hold:
\item \label{valid:newjobs}
Its {\tt NewJobs} messages contain all the results in subjobs of its
current job that have been completely explored, and the subjobs of its current
job that have not been completely explored, divided into three fields: the
results it has found, its next current job, and other subjobs.
\item \label{valid:jobdone}
It does not send a {\tt JobDone} message before the task representing its
current job is completely explored.
The main function, {\tt answer}, keeps track of the clients. We now prove the
following Lemma:
If {\tt st} is a state of the server containing (in the union of {\tt jobs st}
and {\tt ongoing st}) jobs representing all the tasks that have not yet been explored,
and for any job {\tt j}, {\tt j} and {\tt kill j} represent the same task,
then for any message {\tt m} sent by a valid client and all values of {\tt t}
and {\tt host}, {\tt answer t host st m} (the Haskell syntax for ``the value
of function {\tt answer}, called with arguments {\tt t}, {\tt host}, {\tt st} and
{\tt m}'')
is a pair
$(\mathtt{st'},\mathtt{m'})$, where {\tt st'} is a state of the server
containing the roots of all subtrees that have not yet been explored ({\tt m'}
is the message to be sent to the client).
Moreover, all results sent by the clients are added to the server state using
the {\tt addResult} function.
We prove it case by case, following the definition of {\tt answer}.
answer::(Exhaustive j,Result j r,Eq j,Ord j,Binary j)=>
Double->String->State j r->ClientMessage j
->(State j r,ServerMessage j)
answer t host st (GetJob num key)=
case M.lookup num (ongoing st) of
Just (ho,key0,j0,t0,_)->
If the client is registered as an ``ongoing'' job, we can simply send it the job
it is supposed to be working on. In this case, the invariant is still
maintained, as we do not change its recorded current job (here, we only update the
time at which we last saw this client).
if ho==host && key0==key then
(st { ongoing=M.insert num (ho,key,j0,t0,t) (ongoing st) },
Job (not $ S.null $ unemployed st) j0)
Else, client {\tt num} is not in the map of ongoing jobs.
If there are no more jobs to be done:
\item if there are no more jobs being worked on, we do not modify the state, and
we tell the client to stop (with a {\tt Finished} message).
\item else, we simply record that client as ``unemployed''. The next time a client
reports its state, it will be asked to share its current job. This does not
change the jobs registered in the server's state anyway.
if S.null (jobs st) then
if M.null (ongoing st) then
(st { unemployed=S.insert num (unemployed st) },Die)
Else, if there are still jobs to be done, we pick any such job (using {\tt
S.deleteFindMin}). According to the documentation of Haskell's {\tt Data.Set}
module, $\mathtt{jobs\ st}$ is equal to
$\{\mathtt{h}\}\cup\mathtt{nextJobs}$. Therefore, since {\tt num} is not a
member of {\tt ongoing st}, the returned state contains, in the union of its
{\tt ongoing} and {\tt jobs} fields, exactly the same jobs as in {\tt st}.
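The step above can be checked on a small model. The following is only a sketch under simplifying assumptions: the type {\tt Model} and the functions {\tt pending}, {\tt assign} and {\tt preserved} are hypothetical names introduced for illustration, jobs are plain strings, and all fields of {\tt State} other than {\tt jobs} and {\tt ongoing} are left out.

```haskell
import qualified Data.Map as M
import qualified Data.Set as S

-- Simplified, hypothetical model of the server state: only the two fields
-- the argument talks about, with jobs represented by plain strings.
data Model = Model
  { mJobs    :: S.Set (Int, String)   -- queued jobs, tagged with their depth
  , mOngoing :: M.Map Integer String  -- client number -> current job
  } deriving (Eq, Show)

-- Every job known to the server, whether queued or assigned to a client.
pending :: Model -> S.Set String
pending m = S.map snd (mJobs m) `S.union` S.fromList (M.elems (mOngoing m))

-- Model of the GetJob case for a client that has no ongoing job.
-- S.deleteFindMin is partial, so the empty queue is handled separately.
assign :: Integer -> Model -> Model
assign num m
  | S.null (mJobs m) = m
  | otherwise =
      let ((_, h), rest) = S.deleteFindMin (mJobs m)
      in m { mJobs = rest, mOngoing = M.insert num h (mOngoing m) }

-- The property claimed in the text, for a client not yet in the map.
preserved :: Integer -> Model -> Bool
preserved num m =
  M.member num (mOngoing m) || pending (assign num m) == pending m
```

The guard on {\tt M.member} mirrors the hypothesis that {\tt num} is not a member of {\tt ongoing st}: without it, {\tt M.insert} could overwrite a job that is recorded nowhere else.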
let ((_,h),nextJobs)=S.deleteFindMin (jobs st)
shareIt=killed h>0
(st { jobs=nextJobs,
unemployed=S.delete num (unemployed st),
ongoing=M.insert num (host,key,h,t,t) (ongoing st) },
Job shareIt h)
Another message the server can receive is the {\tt NewJobs} message, when
clients reshare their work:
In this case, the client sends its number {\tt clientId}, the job
{\tt currentJob} it was given, the job {\tt nextJob} that it will now work
on, a list {\tt newJobs} of jobs that need to be shared, and a list {\tt jobResults} of results.
We can think of this message as equivalent to \emph{``I, valid client {\tt clientId j},
hereby RSA-certify that job {\tt currentJob j} you gave me has subjobs
{\tt newJobs j}, and results {\tt jobResults j}''}.
answer t host st j@(NewJobs {})=
case M.lookup (clientId j) (ongoing st) of
Just (ho,key,j0,t0,_)->
if host==ho && j0==currentJob j then
(st { jobs=foldl' (\s x->S.insert (depth x,x) s) (jobs st) (newJobs j),
ongoing=M.insert (clientId j)
(host,key,nextJob j,t0,t)
(ongoing st),
results=foldl' (addResult host) (results st) (jobResults j) }, Ack)
(st, Die)
If the client is not registered as an ``ongoing job'', this message is ignored,
the state is not modified, and the client is sent the {\tt Die} message.
Else, we assumed that this {\tt NewJobs} message can only be sent by a valid
client. Therefore, it contains all subjobs of its current job that have not
been explored, along with the job it will start working on, and the list of all
results that have been found during the exploration of the other subjobs of its
current job. Since all these subjobs are stored in the {\tt jobs} field of the
state, and the {\tt ongoing} field is updated with the client's new current job,
our claim still holds.
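The bookkeeping behind this step can be written as a set equation. The notation is introduced here only for illustration and is not part of the original development: write $U(x)$ for the set of tasks under a job $x$ that have not been completely explored, $j_0$ for the current job of the client, and $\mathtt{st'}$ for the state returned by {\tt answer}. Condition~\ref{valid:newjobs} of validity gives
\[
U(j_0) \;=\; U(\mathtt{nextJob\ j}) \;\cup \bigcup_{x\in\mathtt{newJobs\ j}} U(x).
\]
Since {\tt st'} differs from {\tt st} only by replacing $j_0$ with $\mathtt{nextJob\ j}$ in {\tt ongoing} and adding $\mathtt{newJobs\ j}$ to {\tt jobs}, substituting this equality yields
\[
\bigcup_{x\in\mathtt{jobs\ st'}} U(x) \;\cup \bigcup_{x\in\mathtt{ongoing\ st'}} U(x)
\;=\;
\bigcup_{x\in\mathtt{jobs\ st}} U(x) \;\cup \bigcup_{x\in\mathtt{ongoing\ st}} U(x),
\]
where the unions over {\tt ongoing} range over the current jobs recorded in that map.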
answer _ _ st j@(JobDone {})=
case M.lookup (clientId j) (ongoing st) of
Just (host,_,j0,_,_)->
if j0==currentJob j then
(st { ongoing=M.delete (clientId j) (ongoing st),
results=foldl' (addResult host) (results st) (jobResults j),
solved=solved st+1 }, Ack)
In this case, if the client is not registered as an ongoing job, we do not
modify the state. Else, we can safely delete the corresponding job from the
state, and add its results to the state's results field: indeed, since we
assumed that this message is sent by a valid client, that job has been explored
completely. The intuitive version of this message is \emph{``I, valid client
{\tt clientId j}, hereby RSA-certify that I have explored job {\tt currentJob j}
completely, and that it contains exactly results {\tt jobResults j}''}.
The last case of {\tt answer} is when the client sends an ``Alive'' message:
answer t host st (Alive num)=
case M.lookup num (ongoing st) of
Just (ho,key,j,t0,_)->
if ho==host && (S.null (unemployed st) || (not $ S.null $ jobs st)) then
(st { ongoing=M.insert num (ho,key,j,t0,t) (ongoing st) },Ack)
In this case, the set of jobs is not modified, and hence our claim holds.
Our next task is to prove {\tt reply}, the network interface to the {\tt answer}
function. We first need hypotheses on how this interface works, and especially
how the messages are written and read at the ends of the connection.
A client is \emph{fluent} if the messages it sends on the network are of exactly
two kinds:
\item Messages with a single line containing exactly {\tt Hello}.
\item Messages with two lines:
\item the first line is the encoding via {\tt encode16l} of $m$, where $m$ is
the encoding via {\tt encode} of a constructor of the {\tt ClientMessage} type.
\item the second line is the RSA signature, using the client's private key,
of $m$.
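As an illustration of the two shapes of message described above, here is a minimal sketch that classifies a raw transmission by its line structure only. The type {\tt Wire} and the function {\tt classify} are hypothetical names, not part of the library, and the sketch deliberately checks neither the {\tt encode16l} encoding nor the RSA signature.

```haskell
-- Hypothetical classification of a raw client transmission into the two
-- kinds of messages a fluent client may send.
data Wire
  = Hello                 -- a single line containing exactly "Hello"
  | Signed String String  -- payload line, then signature line
  deriving (Eq, Show)

-- Only the line structure is inspected; decoding and signature checking
-- are out of scope for this sketch.
classify :: String -> Maybe Wire
classify raw = case lines raw of
  ["Hello"]      -> Just Hello
  [payload, sig] -> Just (Signed payload sig)
  _              -> Nothing
```

Anything that is neither a lone {\tt Hello} line nor exactly two lines is rejected, which is the sense in which fluency restricts what the server has to parse.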
\begin{lemma} \label{lem:reply} If all the clients that have their public key in
{\tt authorizedKeys st}, where {\tt st} is the state of the server, are valid
and fluent, and {\tt st} contains all the jobs that have not been completely
explored (in the {\tt ongoing} and {\tt jobs} fields), then the same holds after one
run of {\tt reply}, assuming that $\mathtt{decode}\circ\mathtt{encode}$ (from
Haskell's {\tt Data.Binary} module) is the identity, and
$\mathtt{decode16}\circ\mathtt{encode16l}$ (from module {\tt Parry.Util}) is the
identity.
\end{lemma}
We prove this invariant on the code of the {\tt reply} function, which handles
every connection to our server.\vspace{1em}
reply::(Binary j,Exhaustive j,Result j r,Ord j)=>
MVar (State j r) -> Handle -> HostName -> IO ()
reply state rhandle host=(do
l<-B.hGetLine rhandle
if l==B.pack "Hello" then
modifyMVar_ state $ \st->do
LB.hPutStrLn rhandle $ encode16l $ encode $ newId st
return $ st { newId=newId st+1 }
When the first line is the initial {\tt Hello} message, the claim holds: indeed,
the only field of the server state that is modified is the {\tt newId} one,
which represents the first unused client number.
In all other cases, we do the following:
else do
st<-withMVar state return
sig<-B.hGetLine rhandle
let dec=LB.fromStrict $ decode16 l
msg=decode dec
num=case msg of
GetJob x _->x
JobDone x _ _->x
NewJobs x _ _ _ _->x
Alive x->x
key=case msg of
GetJob _ pub->
if any (==pub) (authorizedKeys st) then
Just pub
_->(case M.lookup num $ ongoing st of
Just (_,pub,_,_,_)->Just pub
We will now verify the message signature, using either the public key registered
for this client in the {\tt ongoing} field of the server's state, or the public
key sent by the client itself, in the case of the {\tt GetJob} message (if that
key is registered in the {\tt authorizedKeys} field of the server state):
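The choice of key described above can be sketched in isolation as follows. This is a simplified model, not the library's code: {\tt Key} is a string standing in for an RSA public key, {\tt Msg} collapses all non-{\tt GetJob} messages into one hypothetical constructor {\tt Other}, and returning {\tt Nothing} for an unauthorized key is an assumption about the branches not shown here.

```haskell
import qualified Data.Map as M

-- Stand-in for an RSA public key (assumption made for this sketch).
type Key = String

-- Hypothetical reduction of the client messages: GetJob carries a key,
-- every other message only carries the client number.
data Msg
  = GetJob Integer Key
  | Other Integer
  deriving (Eq, Show)

-- Pick the key against which the signature will be checked.
--   * GetJob: the key sent by the client, if it is authorized.
--   * otherwise: the key registered for this client number, if any.
selectKey :: [Key] -> M.Map Integer Key -> Msg -> Maybe Key
selectKey authorized _ (GetJob _ pub)
  | pub `elem` authorized = Just pub
  | otherwise             = Nothing
selectKey _ registered (Other num) = M.lookup num registered
```

In this model a message is only ever checked against a key that is authorized or that was registered earlier; a {\tt Nothing} result corresponds to answering {\tt Die} without looking at the signature.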
message<-case key of
Nothing->return Die
Just pub->
case verify pub dec (LB.fromStrict $ decode16 sig) of
Right True->do
modifyMVar state $ \st0->
let (!a,!b)=answer (realToFrac t) host st0 msg in
return (a,b)
_->return Die
LB.hPutStrLn rhandle (encode16l $ encode message)
Since we assumed that $\mathtt{decode}\circ\mathtt{encode}$ and
$\mathtt{decode16}\circ\mathtt{encode16l}$ are both the identity function,
variable {\tt msg} contains the message sent by the client. The signature is
only checked against a key that is in the {\tt authorizedKeys} field of the
server state, or that was recorded in {\tt ongoing} by an earlier {\tt GetJob}
message, so by hypothesis the client is valid. Since the only call modifying the
state in this branch is a call to {\tt answer}, we can conclude using Lemma
\ref{lem:answer} that the invariant is maintained by the {\tt reply} function.
The last piece of server code that we need to prove is the {\tt cleanupThread}
function, whose aim is to collect the jobs of dead machines. This function is
needed in practice, especially on standard clusters where walltimes are small
compared to the duration of the task.
If the {\tt kill} function, defined on jobs, does not change the task
represented by the job, and {\tt state} is a server state containing (in the
{\tt ongoing} and {\tt jobs} fields) all the jobs that have not been explored,
then the same holds after one run of {\tt cleanupThread state}.
In the following function, the state is only modified by partitioning the {\tt
ongoing st} map into two maps {\tt a} and {\tt b}, and adding all the jobs of
{\tt a} to the {\tt jobs st} set, possibly calling {\tt kill} on some of
them. Therefore, the set of tasks represented by jobs in the union of {\tt jobs st} and
{\tt ongoing st} is not modified.
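The partition argument can be illustrated on a reduced state. This is a sketch under stated assumptions: {\tt requeue} and {\tt allJobs} are hypothetical names, jobs are plain strings, the {\tt ongoing} map only records a job and the time the client was last seen, and the call to {\tt kill} is omitted since it is assumed not to change the task.

```haskell
import qualified Data.Map as M
import qualified Data.Set as S

-- Hypothetical reduced state: a queue of jobs, and for each client number
-- its current job together with the time it was last seen.
type Reduced = (S.Set String, M.Map Integer (String, Double))

-- Move the jobs of clients that have been silent for more than `timeout`
-- seconds back into the queue, and keep the other clients as they are.
requeue :: Double -> Double -> Reduced -> Reduced
requeue now timeout (queue, ongoing) =
  let (dead, alive) = M.partition (\(_, seen) -> now - seen > timeout) ongoing
      queue'        = M.foldl' (\s (job, _) -> S.insert job s) queue dead
  in (queue', alive)

-- All jobs recorded in the reduced state, queued or assigned.
allJobs :: Reduced -> S.Set String
allJobs (queue, ongoing) =
  queue `S.union` S.fromList (map fst (M.elems ongoing))
```

Because {\tt M.partition} splits the map into two disjoint parts that together contain every entry, each job ends up either in the new queue or in the remaining map, so {\tt allJobs} returns the same set before and after {\tt requeue}.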
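cleanupThread::(Ord j,Exhaustive j)=>MVar (State j r)->IO ()
cleanupThread state=do {
  t_<-getPOSIXTime;
  let { t=realToFrac t_ };
  modifyMVar_ state $ \st0->do {
    -- a: clients not seen for more than 600 seconds; b: all the others.
    let { (a,b)=M.partition (\(_,_,_,_,t1)->(t-t1) > 600) (ongoing st0);
          -- Put the jobs of a back into the queue, calling kill on those
          -- that were started more than 3600 seconds ago.
          st=st0 { jobs=
                      M.foldl' (\set (_,_,job,t0,_)->
                                 S.insert (depth job,
                                           if t-t0 > 3600 then
                                             kill job
                                           else job) set)
                      (jobs st0)
                      a,
                   ongoing=b }
        };
    return st
    };
  threadDelay 30000000;
  cleanupThread state
  }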
Finally, the entry point to our server library is the {\tt server} function:
\item all tasks that have not been completely explored are represented by jobs
in the {\tt ongoing} and {\tt jobs} fields of the {\tt state} argument to {\tt server},
\item all clients that sign their messages with a private RSA key whose corresponding
public key is in the {\tt state} variable are valid and fluent,
\item $\mathtt{decode}\circ\mathtt{encode}$ and
$\mathtt{decode16}\circ\mathtt{encode16l}$ are both the identity function,
then after any number of messages received by the server, variable {\tt state}
still contains jobs representing all the tasks that have not been completely explored,
in the union of its {\tt ongoing} and {\tt jobs} fields.
Indeed, {\tt server} only calls functions that maintain this
invariant: {\tt reply}, by Lemma \ref{lem:reply}, and {\tt cleanupThread}, by
Lemma \ref{lem:cleanupThread}.
server::(Ord j, Binary j, Exhaustive j, Result j r)=>
Config->MVar (State j r)->IO ()
server config state=withSocketsDo $ do {
#ifdef UNIX
installHandler sigPIPE Ignore Nothing;
#endif
threads<-Sem.new $ maxThreads config;
_<-forkIO $ cleanupThread state;
forever $ do {
E.catch (bracket (listenOn (port config)) sClose $
\sock->forever $ do {
bracket (do { (s,a,_)<-accept sock; wait threads; return (s,a) })
(\(s,_)->do { signal threads; hClose s})
(\(s,a)->reply state s a)
(\e->let _=e::SomeException in appendFile (logFile config) (show e++"\n"));
threadDelay 100000;
data Config=Config {
defaultConfig=Config { port=PortNumber 5129, maxThreads=20, logFile="parry.err" }
initState::(Exhaustive j,Ord j,Result j r)=>[j]->r->IO (MVar (State j r))
initState initial r0=
newMVar $ State { jobs=foldl (\s j->S.insert (depth j,j) s) S.empty initial,
authorizedKeys=[] };
stateFromFile::(Binary r,Binary j,Exhaustive j, Result j r,Ord j)=>FilePath->[j]->r->IO (MVar (State j r))
stateFromFile f initial r0=do {
e<-doesFileExist f;
state<-if e then do {
st<-decodeFile f;
newMVar st
} else initState initial r0;
return state
saveThread::(Binary r,Binary j)=>FilePath->Int->MVar (State j r)->IO ()
saveThread f del state=
let { save=do {
e<-doesFileExist f;
if e then renameFile f (f++".last") else return ();
withMVar state $ \st->LB.writeFile f $ encode st;
threadDelay del;
in save