{-# LANGUAGE ExistentialQuantification, DeriveDataTypeable
, FlexibleInstances, MultiParamTypeClasses, OverloadedStrings, CPP #-}
module Transient.MapReduce
(
Distributable(..),distribute, getText,
getUrl, getFile,textUrl, textFile,
mapKeyB, mapKeyU, reduce,eval,
DDS(..),Partition(..),PartRef(..))
where
#ifdef ghcjs_HOST_OS
import Transient.Base
import Transient.Move hiding (pack)
import Transient.Logged
-- Browser (GHCJS) build: MapReduce cannot run in the browser, so every
-- entry point is stubbed out.  'reduce' and 'eval' stop the local
-- computation; the rest are 'undefined' and must never be invoked on
-- the client side.
reduce _ _ = local stop :: Loggable a => Cloud a
mapKeyB _ _= undefined
mapKeyU _ _= undefined
distribute _ = undefined
getText _ _ = undefined
textFile _ = undefined
getUrl _ _ = undefined
textUrl _ = undefined
getFile _ _ = undefined
eval _= local stop

-- Opaque placeholders so the export list still typechecks in the
-- browser build.
data Partition
data DDS= DDS
class Distributable
data PartRef a=PartRef a
#else
import Transient.Internals hiding (Ref)
import Transient.Move.Internals hiding (pack)
import Transient.Indeterminism
import Control.Applicative
import System.Random
import Control.Monad.IO.Class
import Control.Monad
import Data.Monoid
import Data.Typeable
import Data.List hiding (delete, foldl')
import Control.Exception
import Control.Concurrent
import Network.HTTP
import Data.TCache hiding (onNothing)
import Data.TCache.Defs
import Data.ByteString.Lazy.Char8 (pack,unpack)
import qualified Data.Map.Strict as M
import Control.Arrow (second)
import qualified Data.Vector.Unboxed as DVU
import qualified Data.Vector as DV
import Data.Hashable
import System.IO.Unsafe
import qualified Data.Foldable as F
import qualified Data.Text as Text
import Data.IORef
-- | A Distributed Data Set: a cloud computation yielding a reference to
-- data partitioned across the nodes of the cluster.
data DDS a= Loggable a => DDS (Cloud (PartRef a))

-- | Remote reference to a partition: the node that holds it, its path
-- (storage key) and whether it is persisted.
data PartRef a= Ref Node Path Save deriving (Typeable, Read, Show)

-- | A partition as stored at a node: location metadata plus payload.
data Partition a= Part Node Path Save a deriving (Typeable,Read,Show)

-- | True when the partition is persisted to disk, False for temporary.
type Save= Bool
instance Indexable (Partition a) where
  -- the storage key encodes both the path and the persistence flag
  key (Part _ string b _)= keyp string b

-- | Build a partition key: prefix "PartP@" for persisted partitions,
-- "PartT@" for temporary ones.  The 5th character ('P'/'T') is what
-- 'readResourceByKey' inspects to decide whether disk reads make sense.
keyp s True= "PartP@"++s :: String
keyp s False="PartT@"++s
-- | TCache persistence for partitions.  Only partitions flagged for
-- saving are ever written; reads succeed only for persisted ("PartP@")
-- keys and return Nothing for temporary ("PartT@") ones.
instance Loggable a => IResource (Partition a) where
  keyResource= key

  -- Character 4 of the key is 'P' (persisted) or 'T' (temporary); a
  -- temporary partition never exists on disk, so skip the read.
  readResourceByKey k= r
    where
      -- Phantom helper: fixes the element type of the result so that
      -- 'defPath' resolves to the right instance.  Never evaluated.
      typePart :: IO (Maybe a) -> a
      typePart = undefined
      r = if k !! 4 /= 'P'
            then return Nothing
            -- was: defaultReadByKey ... >>= return . fmap (read . unpack)
            else fmap (read . unpack) <$> defaultReadByKey (defPath (typePart r) ++ k)

  -- Write only when the partition's save flag is set.
  -- (was @unless (not save)@ — a double negation)
  writeResource s@(Part _ _ save _)=
    when save $ defaultWrite (defPath s ++ key s) (pack $ show s)
-- | Run a DDS just far enough to obtain the reference to its
-- distributed result, so it can be reused without recomputation.
eval :: DDS a -> Cloud (PartRef a)
eval (DDS mx) = mx

-- | TCache key under which a partition is stored.
type Path=String
-- NOTE(review): these method bodies are self-referential — the RHS
-- 'foldr' is the Foldable class method, which dispatches straight back
-- to this very instance, so any call (or any default derived from it,
-- e.g. F.foldl' as used by 'mapKey') diverges with <<loop>>.  A lawful
-- instance cannot be written because Data.Vector.Unboxed operations
-- need an 'Unbox' constraint that Foldable's method signatures cannot
-- carry; the instance appears to exist only to satisfy the Foldable
-- superclass of 'Distributable'.  Confirm the unboxed code path is
-- never actually folded at runtime.
instance F.Foldable DVU.Vector where
  {-# INLINE foldr #-}
  foldr = foldr
  {-# INLINE foldl #-}
  foldl = foldl
  {-# INLINE foldr1 #-}
  foldr1 = foldr1
  {-# INLINE foldl1 #-}
  foldl1 = foldl1
-- | Containers usable as distributed data: foldable, serializable
-- ('Loggable'), monoidal (for merging) and splittable into chunks.
class (F.Foldable c, Typeable c, Typeable a, Monoid (c a), Loggable (c a)) => Distributable c a where
  -- | Wrap one element in a container.
  singleton :: a -> c a
  -- | Split a container at the given position into two chunks.
  splitAt :: Int -> c a -> (c a, c a)
  -- | Build a container from a plain list.
  fromList :: [a] -> c a
-- | Boxed vectors as distributable containers.
instance (Loggable a) => Distributable DV.Vector a where
  singleton = DV.singleton
  splitAt= DV.splitAt
  fromList = DV.fromList
-- | Unboxed vectors as distributable containers (elements need Unbox).
instance (Loggable a,DVU.Unbox a) => Distributable DVU.Vector a where
  singleton= DVU.singleton
  splitAt= DVU.splitAt
  fromList= DVU.fromList
-- | 'mapKey' specialised to boxed vectors.
mapKeyB :: (Loggable a, Loggable b, Loggable k,Ord k)
        => (a -> (k,b))
        -> DDS (DV.Vector a)
        -> DDS (M.Map k(DV.Vector b))
mapKeyB= mapKey

-- | 'mapKey' specialised to unboxed vectors.
mapKeyU :: (Loggable a, DVU.Unbox a, Loggable b, DVU.Unbox b, Loggable k,Ord k)
        => (a -> (k,b))
        -> DDS (DVU.Vector a)
        -> DDS (M.Map k(DVU.Vector b))
mapKeyU= mapKey
-- | Map phase: at each node holding a partition, apply @f@ to every
-- element and group the results by key into a map of containers,
-- storing the grouped result as a new local partition.
mapKey :: (Distributable vector a,Distributable vector b, Loggable k,Ord k)
       => (a -> (k,b))
       -> DDS (vector a)
       -> DDS (M.Map k (vector b))
mapKey f (DDS mx)= DDS $ loggedc $ do
    refs <- mx
    process refs
  where
    -- Executed at the node that owns the partition: read the local
    -- data, transform/group it, and store a fresh partition, returning
    -- its reference.
    process (ref@(Ref node path sav))= runAt node $ local $ do
        xs <- getPartitionData ref
        (generateRef $ map1 f xs)

    -- Fold the container into a key-indexed map, merging values for
    -- equal keys via the container's Monoid (<>).
    -- NOTE(review): F.foldl' on an unboxed vector hits the divergent
    -- Foldable instance defined above — verify for the DVU code path.
    map1 f v= F.foldl' f1 M.empty v
      where
        f1 map x=
            let (k,r) = f x
            in M.insertWith (<>) k (Transient.MapReduce.singleton r) map
-- | Shuffle-phase message: a chunk of key/value pairs bound for a
-- reducer node, or the end-of-stream marker.
data ReduceChunk a= EndReduce | Reduce a deriving (Typeable, Read, Show)
-- | Global counter handing out a distinct mailbox identifier to each
-- 'reduce' invocation, so concurrent reductions do not mix messages.
-- NOINLINE is essential for the top-level unsafePerformIO/newIORef
-- idiom: without it GHC may inline the expression at each use site,
-- creating several independent counters instead of one shared one.
{-# NOINLINE boxids #-}
boxids= unsafePerformIO $ newIORef 0
-- | Reduce phase.  Allocates a fresh mailbox id, then runs two
-- cooperating streams: 'shuffler' folds each partition locally and
-- routes partial results to the node owning each key (by hash), while
-- 'reducer' listens at every node, merges incoming chunks with @red@
-- and yields the final map once every node has signalled 'EndReduce'.
reduce :: (Hashable k,Ord k, Distributable vector a, Loggable k,Loggable a)
       => (a -> a -> a) -> DDS (M.Map k (vector a)) ->Cloud (M.Map k a)
reduce red (dds@(DDS mx))= loggedc $ do
     -- fresh mailbox id so concurrent reduces use separate channels
     mboxid <- localIO $ atomicModifyIORef boxids $ \n -> let n'= n+1 in (n',n')
     nodes <- local getEqualNodes
     let lengthNodes = length nodes

         -- Producer side: fold and route partitions.  Never yields a
         -- value itself ('stop'); results flow through the mailboxes.
         shuffler nodes = do
           localIO $ threadDelay 100000  -- give reducers time to subscribe
           ref@(Ref node path sav) <- mx
           runAt node $ foldAndSend node nodes ref
           stop

         -- Bucket key/value pairs by destination node: the key's hash
         -- modulo the node count selects the reducer node index.
         groupByDestiny map = M.foldlWithKey' f M.empty map
          where
           f map k vs= M.insertWith (<>) (hash1 k) [(k,vs)] map
           hash1 k= abs $ hash k `rem` length nodes

         -- Fold the local partition and send each bucket to its node;
         -- when the partition is empty just broadcast EndReduce.
         foldAndSend node nodes ref= do
             pairs <- onAll $ getPartitionData1 ref
                             <|> return (error $ "DDS computed out of his node:"++ show ref )
             let mpairs = groupByDestiny pairs
             -- NOTE(review): this binding shadows Prelude.length below
             length <- local . return $ M.size mpairs
             let port2= nodePort node
             if length == 0 then sendEnd nodes else do
              nsent <- onAll $ liftIO $ newMVar 0
              -- one parallel branch per destination bucket
              (i,folded) <- local $ parallelize foldthem (M.assocs mpairs)
              n <- localIO $ modifyMVar nsent $ \r -> return (r+1, r+1)
              (runAt (nodes !! i) $ local $ putMailbox' mboxid (Reduce folded))
                  !> ("send",n,length,i,folded)
              -- once the last bucket is sent, tell every node we are done
              when (n == length) $ sendEnd nodes
              empty
           where
             -- pre-reduce the values of each key within one bucket
             foldthem (i,kvs)= async . return
                       $ (i,map (\(k,vs) -> (k,foldl1 red vs)) kvs)

         -- Broadcast the end-of-stream marker to every node.
         sendEnd nodes = onNodes nodes $ local
                       $ putMailbox' mboxid (EndReduce `asTypeOf` paramOf dds)

         -- Run an action at every node as parallel alternatives.
         onNodes nodes f = foldr (<|>) empty $ map (\n -> runAt n f) nodes

         -- Run an action at every node and combine results monoidally.
         sumNodes nodes f= foldr (<>) mempty $ map (\n -> runAt n f) nodes

         reducer nodes= sumNodes nodes reduce1

         -- One reducer per node: accumulate Reduce chunks into an MVar;
         -- after every node's EndReduce, return the accumulated map.
         reduce1 = local $ do
             reduceResults <- liftIO $ newMVar M.empty
             numberSent <- liftIO $ newMVar 0
             -- reactive: the code below re-runs per incoming message
             minput <- getMailbox' mboxid
             case minput of
               EndReduce -> do
                 n <- liftIO $ modifyMVar numberSent $ \r -> let r'= r+1 in return (r', r')
                 if n == lengthNodes
                   then do
                     cleanMailbox' mboxid (EndReduce `asTypeOf` paramOf dds)
                     r <- liftIO $ readMVar reduceResults
                     return r
                   else stop
               Reduce kvs -> do
                 -- merge each received (key,value) into the accumulator
                 let addIt (k,inp) = do
                       let input= inp `asTypeOf` atype dds
                       liftIO $ modifyMVar_ reduceResults
                         $ \map -> do
                             let maccum = M.lookup k map
                             return $ M.insert k (case maccum of
                                         Just accum -> red input accum
                                         Nothing -> input) map
                 mapM addIt (kvs `asTypeOf` paramOf' dds)
                     !> ("Received Reduce",kvs)
                 stop

     reducer nodes <|> shuffler nodes
  where
    -- Phantom helpers (never evaluated) that pin intermediate types to
    -- the DDS type parameters for the asTypeOf annotations above.
    atype ::DDS(M.Map k (vector a)) -> a
    atype = undefined
    paramOf :: DDS (M.Map k (vector a)) -> ReduceChunk [( k, a)]
    paramOf = undefined
    paramOf' :: DDS (M.Map k (vector a)) -> [( k, a)]
    paramOf' = undefined
parallelize f xs = foldr (<|>) empty $ map f xs
mparallelize f xs = loggedc $ foldr (<>) mempty $ map f xs
-- | Read the referenced partition from the local TCache store.  Calls
-- 'error' if the data is not present at this node.
getPartitionData :: Loggable a => PartRef a -> TransIO a
getPartitionData (Ref node path save) = Transient $ do
    mp <- (liftIO $ atomically
                  $ readDBRef
                  $ getDBRef
                  $ keyp path save)
                 `onNothing` error ("not found DDS data: "++ keyp path save)
    case mp of
      -- onNothing already unwrapped the Maybe, so this match is total
      (Part _ _ _ xs) -> return $ Just xs
-- | Read the referenced partition from the local TCache store; absence
-- yields Nothing (an empty Transient step) instead of an error.
getPartitionData1 :: Loggable a => PartRef a -> TransIO a
getPartitionData1 (Ref node path save) = Transient $ do
    found <- liftIO . atomically . readDBRef . getDBRef $ keyp path save
    -- strip the partition metadata, keeping only the payload (if any)
    return $ (\(Part _ _ _ contents) -> contents) <$> found
-- | Like 'getPartitionData' but in plain IO; calls 'error' when the
-- partition is not present locally.
getPartitionData2 :: Loggable a => PartRef a -> IO a
getPartitionData2 (Ref node path save) = do
    mp <- ( atomically
                  $ readDBRef
                  $ getDBRef
                  $ keyp path save)
                 `onNothing` error ("not found DDS data: "++ keyp path save)
    case mp of
      -- onNothing already unwrapped the Maybe, so this match is total
      (Part _ _ _ xs) -> return xs
-- | Run an action at a node with failover: on remote failure, look for
-- another node holding the chunk (duplicating it first if no replica
-- exists) and retry there.
-- NOTE(review): the case is non-exhaustive — an SMore or SDone result
-- would crash; confirm the remote action only yields SLast/SError.
-- NOTE(review): after asyncDuplicate, 'head nodes' still uses the old
-- (possibly empty) node list — looks like a latent partial-head bug.
runAtP :: Loggable a => Node -> (Path -> IO a) -> Path -> Cloud a
runAtP node f uuid= do
    r <- runAt node $ onAll . liftIO $ (SLast <$> f uuid) `catch` sendAnyError
    case r of
      SLast r -> return r
      SError e -> do
          nodes <- mclustered $ search uuid
          -- create a replica first when none exists yet
          when(length nodes < 1) $ asyncDuplicate node uuid
          runAtP ( head nodes) f uuid
-- | Find the nodes holding a replica of the given chunk.  Unimplemented:
-- always errors, so the failover path of 'runAtP' cannot succeed yet.
search uuid= error $ "chunk failover not yet defined. Lookin for: "++ uuid
-- | Copy the chunk file identified by @uuid@ from this node to the
-- first equal node different from @node@.
-- NOTE(review): 'head' is partial — diverges on a single-node cluster.
asyncDuplicate node uuid= do
    forkTo node
    nodes <- onAll getEqualNodes
    let node'= head $ nodes \\ [node]
    -- lazy readFile: content is streamed to the remote writeFile
    content <- onAll . liftIO $ readFile uuid
    runAt node' $ local $ liftIO $ writeFile uuid content
-- | Wrap any synchronous exception as an 'SError' stream element so it
-- can be shipped back to the caller instead of escaping.
sendAnyError :: SomeException -> IO (StreamData a)
sendAnyError = return . SError
-- | Turn a local container into a distributed data set, spreading its
-- chunks across the equal nodes of the cluster.
distribute :: (Loggable a, Distributable vector a ) => vector a -> DDS (vector a)
distribute xs = DDS (distribute' xs)
-- | Split the container into one chunk per node and move each chunk to
-- its node, returning the stream of partition references.
distribute' xs= loggedc $ do
    nodes <- local getEqualNodes
    let lnodes = length nodes
    -- chunk size: at least 1 even when there are fewer elements than nodes
    let size= case F.length xs `div` (length nodes) of 0 ->1 ; n -> n
        xss= split size lnodes 1 xs
    r <- distribute'' xss nodes
    return r
  where
    -- split into chunks of @n@ elements; the final chunk (when the
    -- counter s' reaches the node count s) keeps the whole remainder
    split n s s' xs | s==s' = [xs]
    split n s s' xs=
        let (h,t)= Transient.MapReduce.splitAt n xs
        in h : split n s (s'+1) t
-- | Move each chunk to its assigned node and store it there, returning
-- one partition reference per node (as parallel alternatives).
distribute'' :: (Loggable a, Distributable vector a)
             => [vector a] -> [Node] -> Cloud (PartRef (vector a))
distribute'' xss nodes =
    parallelize move $ zip nodes xss
  where
    -- Store the chunk at its target node and yield the reference.
    -- (The original bound the result only to immediately return it:
    -- @do { par <- generateRef xs; return par }@.)
    move (node, xs)= runAt node $ local $ generateRef xs
-- | Build a DDS from a String: every node partitions the same input
-- with @part@ and keeps only its own slice (node @i@ takes the i-th
-- chunk; the last node also takes any remainder).
getText :: (Loggable a, Distributable vector a) => (String -> [a]) -> String -> DDS (vector a)
getText part str= DDS $ loggedc $ do
    nodes <- local getEqualNodes
    let lnodes = length nodes
    parallelize (process lnodes) $ zip nodes [0..lnodes-1]
  where
    process lnodes (node,i)=
      runAt node $ local $ do
        let xs = part str
            -- chunk size is at least 1 even for tiny inputs
            size= case length xs `div` lnodes of 0 ->1 ; n -> n
            xss= Transient.MapReduce.fromList $
                  if i== lnodes-1 then drop (i* size) xs else take size $ drop (i * size) xs
        generateRef xss
-- | Distributed vector of the words fetched from a URL.
textUrl :: String -> DDS (DV.Vector Text.Text)
textUrl url = getUrl wordsAsText url
  where
    -- split the page body into Text words
    wordsAsText = fmap Text.pack . words
-- | Build a DDS by downloading the URL at every node and keeping the
-- slice of the partitioned body that corresponds to that node (the
-- last node also takes any remainder).
getUrl :: (Loggable a, Distributable vector a) => (String -> [a]) -> String -> DDS (vector a)
getUrl partitioner url= DDS $ do
    nodes <- local getEqualNodes
    let lnodes = length nodes
    parallelize (process lnodes) $ zip nodes [0..lnodes-1]
  where
    process lnodes (node,i)= runAt node $ local $ do
        -- each node downloads the full body independently
        r <- liftIO . simpleHTTP $ getRequest url
        body <- liftIO $ getResponseBody r
        let xs = partitioner body
            -- chunk size is at least 1 even for tiny inputs
            size= case length xs `div` lnodes of 0 ->1 ; n -> n
            xss= Transient.MapReduce.fromList $
                  if i== lnodes-1 then drop (i* size) xs else take size $ drop (i * size) xs
        generateRef xss
-- | Distributed vector of the words of a file that must be present at
-- every node of the cluster.
textFile :: String -> DDS (DV.Vector Text.Text)
textFile name = getFile wordsAsText name
  where
    -- split the file content into Text words
    wordsAsText = fmap Text.pack . words
-- | Build a DDS from a file that must exist at every node; each node
-- reads the whole file and keeps its slice of the partitioned content
-- (the last node also takes any remainder).
getFile :: (Loggable a, Distributable vector a) => (String -> [a]) -> String -> DDS (vector a)
getFile partitioner file= DDS $ do
    nodes <- local getEqualNodes
    let lnodes = length nodes
    parallelize (process lnodes) $ zip nodes [0..lnodes-1]
  where
    process lnodes (node, i)= runAt node $ local $ do
        content <- do
              c <- liftIO $ readFile file
              -- force the lazy read to completion so the handle closes
              length c `seq` return c
        let xs = partitioner content
            -- chunk size is at least 1 even for tiny inputs
            size= case length xs `div` lnodes of 0 ->1 ; n -> n
            xss= Transient.MapReduce.fromList $
                  if i== lnodes-1 then drop (i* size) xs else take size $ drop (i * size) xs
        generateRef xss
-- | Store a value as a new temporary (unsaved) partition in the local
-- TCache store under a random name, returning its reference.
generateRef :: Loggable a => a -> TransIO (PartRef a)
generateRef x= do
    node <- getMyNode
    liftIO $ do
       temp <- getTempName
       let reg= Part node temp False x
       atomically $ newDBRef reg
       (return $ getRef reg)
-- | Reference to a partition: its location metadata, payload dropped.
getRef (Part node path save _)= Ref node path save
-- | Generate a pseudo-random partition name of the form "DDSxxxxx"
-- (five random lowercase letters).
getTempName :: IO String
getTempName= do
    suffix <- replicateM 5 $ randomRIO ('a','z')
    return $ "DDS" ++ suffix
-- | Build a DDS from a streaming IO source: batch the elements arriving
-- within each @time@ window ('groupByTime') and distribute each batch
-- across the cluster.  Errors terminate the stream via 'error'.
streamDDS
  :: (Loggable a, Distributable vector a) =>
      Integer -> IO (StreamData a) -> DDS (vector a)
streamDDS time io= DDS $ do
     xs <- local . groupByTime time $ do
              r <- parallel io
              case r of
                  SDone -> empty          -- end of stream: no more elements
                  SLast x -> return x
                  SMore x -> return x
                  SError e -> error $ show e
     distribute' $ Transient.MapReduce.fromList xs
#endif