Copyright | (c) Gree, Inc. 2013 |
---|---|
License | MIT-style |
Maintainer | Kiyoshi Ikehara <kiyoshi.ikehara@gree.net> |
Stability | experimental |
Portability | portable |
Safe Haskell | None |
Language | Haskell98 |
Haskell JobQueue is a library for building a job scheduler with a priority queue. The state of a job is stored in a backend database such as Apache Zookeeper or another highly reliable message queue system.
Unit
Unit represents each state in an entire state machine. Units are described as value
constructors in Haskell code.
A unit by itself is not executable. To execute it through the job queue system, extra
information such as a job identifier and a scheduled time is needed. An instance of a unit
is wrapped in a job and stored in the job queue together with that information.
The code shown below describes how to define a Unit.
data JobUnit = HelloStep | WorldStep deriving (Show, Read)

instance Unit JobUnit where
In this case, you define the JobUnit type with two states, HelloStep and WorldStep. This is the entire state machine of your job queue system. You can define nested or child state machines by defining more complex data types, as long as they are serializable with the read and show functions.
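To make the serialization requirement concrete, the following sketch round-trips a nested unit type through show and read using only base. The RepairUnit and ClusterUnit types and the encodeUnit and decodeUnit helpers are hypothetical names chosen for illustration; the Unit instance is left out so that the snippet does not depend on the package.

```haskell
import Text.Read (readMaybe)

-- Hypothetical child state machine (not part of the library).
data RepairUnit = CheckDisk | ReplaceDisk Int
  deriving (Show, Read, Eq)

-- Hypothetical parent state machine that embeds the child one.
data ClusterUnit = Inspect String | Repair RepairUnit | Done
  deriving (Show, Read, Eq)

-- Serialize a unit to the String form a backend could store.
encodeUnit :: ClusterUnit -> String
encodeUnit = show

-- Parse a stored String back; Nothing signals malformed input.
decodeUnit :: String -> Maybe ClusterUnit
decodeUnit = readMaybe
```

Any type built this way keeps the property that decodeUnit (encodeUnit u) gives back Just u, which is what lets a unit survive a trip through the backend.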
For more information, see Network.JobQueue.Class.
Job
Each task executed by state machines (such as checking server state or repairing a
cluster) is called a job.
A job is described as a particular state of a state machine. Each state only does one thing (especially for modifying operations). This prevents jobs ending in a failure state, which the state machine is unable to handle.
You don't have to know the internal data structure of a job, but you need to understand it when you write action code.
For more information, see Network.JobQueue.Job.
Environment
Each unit can contain information used in the action of the state. In many cases, however, some information is used by almost all states, and it is convenient to have a global data set that is accessible from every state's action.
For this reason, you can define a global data structure called the environment. The environment can be retrieved using the getEnv function in the action monad.
env <- getEnv
For more information, see Network.JobQueue.Class.
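The idea of an environment that every action can read can be pictured with a small hand-rolled reader monad. This is only a model: Action, getEnvSketch, AppEnv and describeTarget are hypothetical names, and the library's own action monad does more than this sketch shows.

```haskell
-- A hand-rolled reader monad standing in for the action monad.
newtype Action e a = Action { runAction :: e -> a }

instance Functor (Action e) where
  fmap f (Action g) = Action (f . g)

instance Applicative (Action e) where
  pure x = Action (const x)
  Action f <*> Action g = Action (\e -> f e (g e))

instance Monad (Action e) where
  Action g >>= k = Action (\e -> runAction (k (g e)) e)

-- Counterpart of getEnv in this model: return the whole environment.
getEnvSketch :: Action e e
getEnvSketch = Action id

-- Hypothetical environment shared by every state's action.
data AppEnv = AppEnv { clusterName :: String, retryLimit :: Int }

-- An action that reads shared settings instead of carrying them in the unit.
describeTarget :: Action AppEnv String
describeTarget = do
  env <- getEnvSketch
  return (clusterName env ++ " (retries: " ++ show (retryLimit env) ++ ")")
```

Evaluating runAction describeTarget (AppEnv "alpha" 3) yields "alpha (retries: 3)". The units stay small because settings common to all states live in the environment rather than in each constructor.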
Action
An action is a function that is called with a unit. You can define actions with the "process" function.
let withJobQueue = buildJobQueue loc name $ do
      process $ \WorldStep -> commitIO (putStrLn "world") >> fin
      process $ \HelloStep -> commitIO (putStr "hello, ") >> next WorldStep
In general, an action does the following things:
- check if the precondition of the state is satisfied or not
- do the action associated with the state
- check the postcondition and return the next state.
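The three steps above can be sketched as a pure function over a toy model of the outside world. Outcome, Step, Server and runStep are hypothetical names used only for this illustration; in the library the corresponding control flow is expressed with functions such as next, fin and abort inside the action monad.

```haskell
-- Hypothetical outcome of one action in this model.
data Outcome s = Next s | Finished | Aborted String
  deriving (Show, Eq)

-- Hypothetical states of a small repair workflow.
data Step = CheckServer | RestartServer
  deriving (Show, Read, Eq)

-- Hypothetical view of the external system that the action inspects.
data Server = Server { reachable :: Bool, healthy :: Bool }

-- One action: check the precondition, do the work, check the postcondition.
runStep :: Server -> Step -> (Server, Outcome Step)
runStep srv CheckServer
  | not (reachable srv) = (srv, Aborted "server unreachable")  -- precondition failed
  | healthy srv         = (srv, Finished)                      -- nothing to repair
  | otherwise           = (srv, Next RestartServer)            -- hand over to the next state
runStep srv RestartServer
  | not (reachable srv) = (srv, Aborted "server unreachable")  -- precondition failed
  | otherwise =
      let srv' = srv { healthy = True }                        -- the single modifying operation
      in if healthy srv'                                       -- postcondition
           then (srv', Finished)
           else (srv', Aborted "restart did not help")
```

Each state performs at most one modifying operation, so a failure always leaves the machine in a state it knows how to describe.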
For more information, see Network.JobQueue.Action.
- buildJobQueue :: (Env e, Unit a) => String -> String -> JobM e a () -> (JobQueue e a -> IO ()) -> IO ()
- runJobQueue :: (Env e, Unit a) => e -> String -> String -> JobM e a () -> IO ()
- data Job a = StopTheWorld
- data JobState
- class (Read a, Show a, Desc a, Eq a) => Unit a where
- getPriority :: a -> Int
- getRecovery :: a -> a
- data ActionM e a b
- data (Env e, Unit a) => JobM e a b
- data JobActionState e a
- data Alert
- process :: (Env e, Unit a) => (a -> ActionM e a ()) -> JobM e a ()
- createJob :: Unit a => JobState -> a -> IO (Job a)
- fin :: (Env e, Unit a) => ActionM e a ()
- none :: (Env e, Unit a) => ActionM e a ()
- next :: (Env e, Unit a) => a -> ActionM e a ()
- fork :: (Env e, Unit a) => a -> ActionM e a ()
- forkInTime :: (Env e, Unit a) => NominalDiffTime -> a -> ActionM e a ()
- forkOnTime :: (Env e, Unit a) => UTCTime -> a -> ActionM e a ()
- abort :: (Env e, Unit a) => Alert -> String -> ActionM e a b
- getEnv :: (Env e, Unit a) => ActionM e a e
- param :: (ParamEnv e, Unit a, Read b) => (String, String) -> ActionM e a b
- logMsg :: (Env e, Unit a) => Alert -> String -> ActionM e a ()
- commitIO :: (Env e, Unit a) => IO b -> ActionM e a b
- module Network.JobQueue.Class
- module Network.JobQueue.JobQueue
Documentation
buildJobQueue
:: (Env e, Unit a) | |
=> String | locator (ex."zookeeper://192.168.0.1/myapp") |
-> String | queue name (ex. "/jobqueue") |
-> JobM e a () | job construction function |
-> (JobQueue e a -> IO ()) -> IO () | job queue executor |
Build a function of type (JobQueue e a -> IO ()) -> IO (), that is, a function that takes a job queue executor (JobQueue e a -> IO ()) as its first parameter.
The following code executes jobs as long as the queue is not empty.
main' loc name = do
  let withJobQueue = buildJobQueue loc name $ do
        process $ \WorldStep -> commitIO (putStrLn "world") >> fin
        process $ \HelloStep -> commitIO (putStr "hello, ") >> next WorldStep
  withJobQueue $ loop (initJobEnv loc name [])
  where
    loop env jq = do
      executeJob jq env
      count <- countJobQueue jq
      when (count > 0) $ loop env jq
The following code registers a job with initial state.
main' loc name = do
  let withJobQueue = buildJobQueue loc name $ do
        process $ \WorldStep -> commitIO (putStrLn "world") >> fin
        process $ \HelloStep -> commitIO (putStr "hello, ") >> next WorldStep
  withJobQueue $ \jq -> scheduleJob jq HelloStep
runJobQueue
:: (Env e, Unit a) | |
=> e | |
-> String | locator (ex."zookeeper://192.168.0.1/myapp") |
-> String | queue name (ex. "/jobqueue") |
-> JobM e a () | job construction function |
-> IO () |
Run a job queue while there is at least one job in the queue.
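The run-until-empty behaviour can be pictured with a pure model in which the queue is a plain list. This is a sketch under simplifying assumptions (no backend, no priorities, no scheduling times); step and drain are hypothetical helpers, and JobUnit is redeclared so that the snippet is self-contained.

```haskell
-- Units from the hello/world example, redeclared for self-containment.
data JobUnit = HelloStep | WorldStep
  deriving (Show, Read, Eq)

-- In this model an action returns its output and the units to enqueue next.
step :: JobUnit -> (String, [JobUnit])
step HelloStep = ("hello, ", [WorldStep])  -- plays the role of `next WorldStep`
step WorldStep = ("world", [])             -- plays the role of `fin`

-- Execute jobs from the front of the queue until none are left.
drain :: [JobUnit] -> String
drain []             = ""
drain (unit : queue) =
  let (out, followUps) = step unit
  in out ++ drain (queue ++ followUps)
```

With a single HelloStep in the queue, drain [HelloStep] evaluates to "hello, world", and the loop stops because WorldStep enqueues nothing further.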
Job control block. A Job consists of State, Unit, CTime, OnTime, Id, Group, and Priority.
- State - takes one of 5 states (initialized, runnable, running, aborted and finished)
- Unit - an instance of Unit class, which is specified by type parameter of Job data type
- CTime - creation time
- OnTime - the time at which this job starts
- Id - Identifier of this job
- Group - Group ID of this job
- Priority - the priority of this job
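As a rough picture of the fields listed above, the following record mirrors them one to one. It is not the library's Job type: JobSketch, StateSketch, wrapUnit and isDue are hypothetical, and times are plain Integers (for example epoch seconds) so that the sketch needs nothing beyond base.

```haskell
-- Hypothetical mirror of the five job states listed above.
data StateSketch = Initialized | Runnable | Running | Aborted | Finished
  deriving (Show, Read, Eq)

-- Hypothetical record with one field per item of the job control block.
data JobSketch a = JobSketch
  { jobState    :: StateSketch
  , jobUnit     :: a
  , jobCTime    :: Integer  -- creation time
  , jobOnTime   :: Integer  -- time at which the job starts
  , jobId       :: Int
  , jobGroup    :: Int
  , jobPriority :: Int
  } deriving (Show, Read, Eq)

-- Wrap a bare unit with the extra information the queue needs.
-- The job starts in the Initialized state and is due immediately.
wrapUnit :: Integer -> Int -> Int -> Int -> a -> JobSketch a
wrapUnit now ident grp prio unit = JobSketch
  { jobState    = Initialized
  , jobUnit     = unit
  , jobCTime    = now
  , jobOnTime   = now
  , jobId       = ident
  , jobGroup    = grp
  , jobPriority = prio
  }

-- A job is due once its start time has been reached.
isDue :: Integer -> JobSketch a -> Bool
isDue now job = jobOnTime job <= now
```

Because the record derives Show and Read, the whole control block can be stored as text as long as the unit type is itself serializable.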
class (Read a, Show a, Desc a, Eq a) => Unit a where
Unit class
Minimal complete definition: Nothing
getPriority :: a -> Int
Define the priority of a unit.
getRecovery :: a -> a
Define the recovery state of a unit.
data ActionM e a b
MonadError ActionError (ActionM e a) | |
Monad (ActionM e a) | |
Functor (ActionM e a) | |
MonadIO (ActionM e a) | |
MonadReader (ActionEnv e a) (ActionM e a) |
data (Env e, Unit a) => JobM e a b
Monad (JobM e a) | |
Functor (JobM e a) | |
MonadIO (JobM e a) | |
MonadState (JobActionState e a) (JobM e a) |
data JobActionState e a
Default (JobActionState e a) | |
MonadState (JobActionState e a) (JobM e a) |
process :: (Env e, Unit a) => (a -> ActionM e a ()) -> JobM e a ()
Declare a function that accepts a unit and executes its action if possible.
none :: (Env e, Unit a) => ActionM e a ()
If the unit passed by the job queue system cannot be processed by the action function, the function should call this.
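One way to picture how several declared actions share a queue is a list of handlers, each of which either handles a unit or declines it. This is a model only, not the library's dispatch mechanism: Handler, worldHandler, helloHandler and dispatch are hypothetical, and returning Nothing stands in for calling none.

```haskell
import Data.Maybe (listToMaybe, mapMaybe)

-- Units from the hello/world example, redeclared for self-containment.
data JobUnit = HelloStep | WorldStep
  deriving (Show, Read, Eq)

-- In this model a handler returns Nothing where a real action would call `none`.
type Handler a r = a -> Maybe r

worldHandler :: Handler JobUnit String
worldHandler WorldStep = Just "world"
worldHandler _         = Nothing

helloHandler :: Handler JobUnit String
helloHandler HelloStep = Just "hello, "
helloHandler _         = Nothing

-- Try the declared handlers in order and keep the first result.
dispatch :: [Handler a r] -> a -> Maybe r
dispatch handlers unit = listToMaybe (mapMaybe ($ unit) handlers)
```

Here dispatch [worldHandler, helloHandler] HelloStep evaluates to Just "hello, ", while dispatch [worldHandler] HelloStep evaluates to Nothing because no handler accepts the unit.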
next :: (Env e, Unit a) => a -> ActionM e a ()
Move to the next state immediately. After the execution of the action, the job being processed is moved to the given state. The next action is invoked immediately and can continue to work without being interrupted by another job.
forkInTime :: (Env e, Unit a) => NominalDiffTime -> a -> ActionM e a ()
Create a job with a unit and schedule it to start after the given delay.
forkOnTime :: (Env e, Unit a) => UTCTime -> a -> ActionM e a ()
Create a job with a unit and schedule it at a specific time.
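The difference between a relative delay and an absolute start time can be shown with the time package alone. The sketch assumes, as the two signatures suggest, that a NominalDiffTime delay corresponds to an absolute UTCTime obtained by adding it to the current time; startTimeAfter, referenceTime and ninetySecondsLater are hypothetical names, and how the library computes the start time internally is not shown here.

```haskell
import Data.Time.Calendar (fromGregorian)
import Data.Time.Clock (NominalDiffTime, UTCTime (..), addUTCTime)

-- Turn a relative delay into an absolute start time.
startTimeAfter :: NominalDiffTime -> UTCTime -> UTCTime
startTimeAfter delay now = addUTCTime delay now

-- A fixed reference instant so the example does not depend on the clock.
referenceTime :: UTCTime
referenceTime = UTCTime (fromGregorian 2013 1 1) 0

-- Numeric literals of type NominalDiffTime denote seconds,
-- so this is the reference instant plus 90 seconds.
ninetySecondsLater :: UTCTime
ninetySecondsLater = startTimeAfter 90 referenceTime
```

Note that a NominalDiffTime literal is measured in seconds, so a delay of 90 means one and a half minutes.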
abort :: (Env e, Unit a) => Alert -> String -> ActionM e a b
Abort the execution of a state machine. If a critical problem is found and there is a need to switch to the failure state, call this function with a human-readable message.
param :: (ParamEnv e, Unit a, Read b) => (String, String) -> ActionM e a b
Get a parameter value by key from the environment within an action. This is a special function for ParamEnv.
logMsg :: (Env e, Unit a) => Alert -> String -> ActionM e a ()
Send a message to the syslog daemon.
commitIO :: (Env e, Unit a) => IO b -> ActionM e a b
Do a dirty I/O action to the external system. If it doesn't change the state of the external system, you can use liftIO instead.
module Network.JobQueue.Class
module Network.JobQueue.JobQueue