{-# LANGUAGE ExistentialQuantification, DeriveDataTypeable
, FlexibleInstances, MultiParamTypeClasses, OverloadedStrings, CPP #-}
module Transient.MapReduce
(
Distributable(..),distribute, getText,
getUrl, getFile,textUrl, textFile,
mapKeyB, mapKeyU, reduce,eval,
DDS(..),Partition(..),PartRef(..))
where
#ifdef ghcjs_HOST_OS
import Transient.Base
import Transient.Move hiding (pack)
import Transient.Logged hiding (hash)
-- Placeholder API used when compiling with GHCJS (ghcjs_HOST_OS defined).
-- Every exported name exists so that client code can import this module,
-- but none of them implements map-reduce here: 'reduce' and 'eval' are
-- defined as @local stop@ and all the remaining functions are 'undefined'.
reduce _ _ = local stop :: Loggable a => Cloud a
mapKeyB _ _= undefined
mapKeyU _ _= undefined
distribute _ = undefined
getText _ _ = undefined
textFile _ = undefined
getUrl _ _ = undefined
textUrl _ = undefined
getFile _ _ = undefined
eval _= local stop
-- Placeholder types and class matching the names in the export list.
-- NOTE(review): these declarations have different shapes from the ones in
-- the non-GHCJS branch (e.g. 'DDS' takes no type parameter here) - confirm
-- that the export list entries such as @Partition(..)@ are accepted.
data Partition
data DDS= DDS
class Distributable
data PartRef a=PartRef a
#else
import Transient.Internals hiding (Ref)
import Transient.Mailboxes
import Transient.Parse
import Transient.Logged
import Transient.Move.Internals hiding (pack)
import Transient.Indeterminism
import Control.Applicative
import System.Random
import Control.Monad.State
import Control.Monad
import Data.Monoid
import Data.Typeable
import Data.Foldable
import Data.List hiding (delete, foldl')
import Control.Exception
import Control.Concurrent
import Network.HTTP
import Data.TCache hiding (onNothing)
import Data.TCache.Defs hiding (serialize,deserialize)
import Data.ByteString.Lazy.Char8 (pack,unpack, toStrict)
import Data.ByteString.Builder
import qualified Data.Map.Strict as M
import Control.Arrow (second)
import qualified Data.Vector.Unboxed as DVU
import qualified Data.Vector as DV
import Data.Hashable
import System.IO.Unsafe
import qualified Data.Foldable as F
import qualified Data.Text as Text
import Data.Text.Encoding
import Data.IORef
-- | A distributed data set: a 'Cloud' computation that yields references
-- ('PartRef') to the partitions that hold values of type @a@.
data DDS a= Loggable a => DDS (Cloud (PartRef a))
-- | Reference to a partition: the node, the path used to build its storage
-- key (see 'keyp') and the 'Save' flag. It does not carry the data itself.
data PartRef a= Ref Node Path Save deriving (Typeable, Read, Show)
-- | A partition: the same three fields as 'PartRef' plus the payload.
data Partition a= Part Node Path Save a deriving (Typeable,Read,Show)
-- | Whether a partition is written to permanent storage: 'writeResource'
-- only writes partitions whose flag is 'True'.
type Save= Bool
-- | Text is logged as its raw UTF-8 bytes and read back up to (not
-- including) the next @'/'@.
--
-- NOTE(review): 'serialize' does not escape @'/'@, so a text that contains
-- a slash would presumably be truncated by 'deserialize' - confirm.
instance Loggable Text.Text where
  serialize txt = byteString (encodeUtf8 txt)

  deserialize = decodeUtf8 . toStrict <$> tTakeWhile (/= '/')
-- | A partition reference is logged as its three fields separated by @'/'@:
-- @node/path/save@.
instance Typeable a => Loggable (PartRef a) where
  serialize (Ref node path save) =
    mconcat [serialize node, "/", serialize path, "/", serialize save]

  deserialize =
    Ref <$> delimited deserialize
        <*> delimited deserialize
        <*> deserialize
    where
      -- Run a field parser and then consume the separator that follows it.
      delimited field = field <* tChar '/'
-- | The storage key of a partition is derived from its path and its save
-- flag only (see 'keyp'); the node and the payload do not take part.
instance Indexable (Partition a) where
  key (Part _ path save _) = keyp path save
-- | Build the storage key for a partition path. Partitions flagged to be
-- saved get the prefix @PartP\@@, the others get @PartT\@@.
keyp :: String -> Bool -> String
keyp path saved = prefix ++ path
  where
    prefix
      | saved = "PartP@"
      | otherwise = "PartT@"
-- | Persistence of partitions through TCache.
--
-- Only partitions whose key carries the @PartP\@@ prefix (see 'keyp') are
-- read from or written to permanent storage; the others are reported as
-- absent on read and silently skipped on write.
instance Loggable a => IResource (Partition a) where
  keyResource = key

  readResourceByKey k = r
    where
      -- Type-level helper only: it is never evaluated, it just lets
      -- 'defPath' pick the instance of the partition type produced by 'r'.
      typePart :: IO (Maybe a) -> a
      typePart = undefined

      -- The fifth character of the key distinguishes persistent ('P') keys.
      -- Inspecting it with 'drop' instead of '!!' means a key shorter than
      -- five characters yields 'Nothing' instead of an index exception.
      r = case drop 4 k of
        ('P' : _) ->
          fmap (read . unpack)
            <$> defaultReadByKey (defPath (typePart r) ++ k)
        _ -> return Nothing

  writeResource s@(Part _ _ save _) =
    when save $ defaultWrite (defPath s ++ key s) (pack $ show s)
-- | Unwrap a 'DDS', giving the 'Cloud' computation that produces the
-- references to its partitions.
eval :: DDS a -> Cloud (PartRef a)
eval dds = case dds of
  DDS partitions -> partitions

-- | Name under which a partition is stored (see 'keyp').
type Path = String
-- Foldable instance for unboxed vectors, needed because 'Distributable'
-- has 'F.Foldable' as a superclass.
--
-- NOTE(review): each method is defined by the unqualified name of the same
-- method. With the imports of this module that name looks like it resolves
-- to the 'Foldable' method itself, which would make every fold over an
-- unboxed vector loop forever instead of calling the @DVU@ fold. The @DVU@
-- folds need an @Unbox@ constraint that a 'Foldable' instance cannot
-- provide, so this probably needs a redesign - confirm.
instance F.Foldable DVU.Vector where
{-# INLINE foldr #-}
foldr = foldr
{-# INLINE foldl #-}
foldl = foldl
{-# INLINE foldr1 #-}
foldr1 = foldr1
{-# INLINE foldl1 #-}
foldl1 = foldl1
-- | Containers whose content can be split into partitions and spread over
-- nodes. The container must be foldable, a monoid and loggable.
class (F.Foldable c, Monoid (c a), Loggable (c a),Typeable c, Typeable a) => Distributable c a where
-- | Container holding exactly one element.
singleton :: a -> c a
-- | Split a container in two at the given position.
splitAt :: Int -> c a -> (c a, c a)
-- | Build a container from a list.
fromList :: [a] -> c a
-- | Boxed vectors are logged as their length, a @'/'@, and then every
-- element preceded by a @'/'@. Reading takes the length and then that many
-- @'/'@-prefixed elements.
--
-- NOTE(review): 'serialize' emits two slashes between the length and the
-- first element, whereas the unboxed-vector instance emits one and both
-- use the same reader - one of the two formats is presumably wrong;
-- confirm against the behaviour of 'int'.
instance Loggable a => Loggable (DV.Vector a) where
  serialize vec = intDec (DV.length vec) <> "/" <> elems
    where
      elems = foldl' (\acc el -> acc <> "/" <> serialize el) mempty vec

  deserialize =
    int >>= \count -> DV.replicateM count (tChar '/' *> deserialize)
-- | Boxed vectors are distributable; every method delegates to the
-- function of the same name in "Data.Vector".
instance (Typeable a, Loggable a) => Distributable DV.Vector a where
  singleton x = DV.singleton x
  splitAt n vec = DV.splitAt n vec
  fromList xs = DV.fromList xs
-- | Unboxed vectors are logged as their length followed by every element
-- preceded by a @'/'@ (for instance @3/a/b/c@), which is exactly what
-- 'deserialize' reads back: the length, then that many @'/'@-prefixed
-- elements.
--
-- The output for non-empty vectors is unchanged from the previous
-- formulation, which indexed the first element and sliced the rest and
-- therefore raised an exception when asked to serialize an empty vector.
-- An empty vector is now logged as just its length.
instance (Loggable a, DVU.Unbox a) => Loggable (DVU.Vector a) where
  serialize v =
    intDec (DVU.length v)
      <> DVU.foldl' (\acc x -> acc <> "/" <> serialize x) mempty v

  deserialize = do
    len <- int
    DVU.replicateM len $ tChar '/' *> deserialize
-- | Unboxed vectors are distributable; every method delegates to the
-- function of the same name in "Data.Vector.Unboxed".
instance (Typeable a, Loggable a, DVU.Unbox a) => Distributable DVU.Vector a where
  singleton x = DVU.singleton x
  splitAt n vec = DVU.splitAt n vec
  fromList xs = DVU.fromList xs
-- | 'mapKey' specialised to boxed vectors: apply the function to every
-- element and group the resulting values by key.
mapKeyB :: (Typeable a, Loggable a, Typeable b, Loggable b, Typeable k, Loggable k, Ord k)
        => (a -> (k, b))
        -> DDS (DV.Vector a)
        -> DDS (M.Map k (DV.Vector b))
mapKeyB f dds = mapKey f dds
-- | 'mapKey' specialised to unboxed vectors: apply the function to every
-- element and group the resulting values by key.
mapKeyU :: (Typeable a, Loggable a, DVU.Unbox a, Typeable b, Loggable b, DVU.Unbox b, Typeable k, Loggable k, Ord k)
        => (a -> (k, b))
        -> DDS (DVU.Vector a)
        -> DDS (M.Map k (DVU.Vector b))
mapKeyU f dds = mapKey f dds
-- | Apply a key/value producing function to every element of every
-- partition. For each partition reference the work runs on the node named
-- in that reference: the partition is read there, its elements are grouped
-- by key into a map of containers, and the map is stored as a new
-- partition whose reference is the result.
mapKey :: (Distributable container a, Distributable container b, Typeable k, Loggable k, Ord k)
       => (a -> (k, b))
       -> DDS (container a)
       -> DDS (M.Map k (container b))
mapKey f (DDS mx) = DDS $ loggedc $ do
    refs <- mx
    process refs !> ("process",refs)
  where
    process ref@(Ref node _ _) = runAt node $ local $ do
      xs <- getPartitionData ref !> ("CMAP", ref,node)
      generateRef $ groupByKey xs

    -- Strict left fold that files every mapped value under its key.
    groupByKey elems = F.foldl' insertMapped M.empty elems

    -- A value arriving for an existing key is combined in front of the
    -- values already collected for that key.
    insertMapped acc x = M.insertWith (<>) k (Transient.MapReduce.singleton v) acc
      where
        (k, v) = f x
-- | Message exchanged through mailboxes during 'reduce': either a chunk of
-- partially reduced data ('Reduce') or the marker that a sender has
-- finished ('EndReduce').
data ReduceChunk a= EndReduce | Reduce a deriving (Typeable, Read, Show)
-- | Process-wide counter from which 'reduce' draws a fresh mailbox
-- identifier for every invocation.
--
-- The NOINLINE pragma is required for a global created with
-- 'unsafePerformIO': without it the compiler may inline the definition at
-- its use sites and create several independent counters.
{-# NOINLINE boxids #-}
boxids :: IORef Int
boxids = unsafePerformIO $ newIORef 0
-- Reduce a keyed distributed data set to a single map from key to value,
-- combining all the values of a key with the given binary function.
--
-- Two branches are composed with (<|>) at the end of the do block:
--   * the shuffler, which runs where each partition lives, folds the
--     values of every key locally, and sends the partial results to the
--     mailbox of the node chosen by hashing the key;
--   * the reducer, which runs on every node, accumulates the chunks that
--     arrive in its mailbox, and yields its accumulated map once it has
--     received as many EndReduce markers as there are nodes.
-- The maps yielded by the nodes are combined with (<>).
reduce :: (Hashable k,Ord k, Distributable container a, Typeable k, Loggable k, Typeable a, Loggable a)
=> (a -> a -> a) -> DDS (M.Map k (container a)) ->Cloud (M.Map k a)
reduce red (dds@(DDS mx))= loggedc $ do
-- Fresh mailbox identifier for this invocation, taken from the global counter.
mboxid <- localIO $ atomicModifyIORef boxids $ \n -> let n'= n+1 in (n',n')
nodes <- local getEqualNodes
let lengthNodes = length nodes
-- For every partition reference produced by the DDS, go to the node named
-- in the reference and fold-and-send its content. Ends with 'stop', so
-- this branch itself yields no result.
shuffler nodes = do
localIO $ threadDelay 100000
ref@(Ref node path sav) <- mx
runAt node $ do
localIO $ return () !> ("FOLDANDSEND","REF",ref, "runAt", node)
foldAndSend node nodes ref
stop
-- Regroup the (key, values) pairs of a partition by destination index.
groupByDestiny map = M.foldlWithKey' f M.empty map
where
f map k vs= M.insertWith (<>) (hash1 k) [(k,vs)] map
-- Destination index of a key: its hash reduced to the range of node
-- positions. NOTE(review): `rem` would fail with an empty node list.
hash1 k= abs $ hash k `rem` length nodes
-- Read the partition, fold the values of each key with 'red', and put one
-- 'Reduce' chunk per destination in the mailbox of the node at that index.
foldAndSend node nodes ref= do
pairs <- onAll $ getPartitionData1 ref
<|> return (error $ "DDS computed out of his node:"++ show ref )
let mpairs = groupByDestiny pairs
-- Number of destination groups (this binding shadows Prelude's 'length').
length <- local . return $ M.size mpairs
let port2= nodePort node
-- Nothing to send from this partition: only signal the end.
if length == 0 then sendEnd nodes else do
nsent <- onAll $ liftIO $ newMVar 0
(i,folded) <- local $ parallelize foldthem (M.assocs mpairs)
-- Count the groups handled so far.
n <- localIO $ modifyMVar nsent $ \r -> return (r+1, r+1) :: IO (Int,Int)
(runAt (nodes !! i) $ do
return () !> "JUST BEFORE PUTMAILBOX"
local $ (putMailbox' mboxid (Reduce folded `asTypeOf` paramOf dds))
!> ("PUTMAILBOX SENT ",n,length,i,folded))
-- After the last group, signal the end to every node.
when (n == length) $ sendEnd nodes
return ()
empty
where
-- One asynchronous task per destination: fold the values of each key.
foldthem (i,kvs)= async . return
$ (i,map (\(k,vs) -> (k,foldl1 red vs)) kvs)
-- Put an 'EndReduce' marker in the mailbox of every node.
sendEnd nodes = do
onNodes nodes $ local $
putMailbox' mboxid (EndReduce `asTypeOf` paramOf dds)
-- Run a computation on all the nodes, as alternatives.
onNodes nodes f = foldr (<|>) empty $ map (\n -> runAt n f ) nodes
-- Run a computation on all the nodes and combine the results with (<>).
sumNodes nodes f= foldr (<>) mempty $ map (\n -> runAt n f ) nodes
reducer nodes= sumNodes nodes reduce1
-- Consume the mailbox of this invocation on the current node.
-- NOTE(review): the two MVars are created in the same block that reads
-- the mailbox; whether they are shared by the handling of successive
-- messages depends on how Transient re-executes this block - confirm.
reduce1 =local $ do
reduceResults <- liftIO $ newMVar M.empty
numberSent <- liftIO $ newMVar 0
return () !> "GETMAILBOX"
minput <- getMailbox' mboxid
case minput of
-- End marker: count it, and when the count equals the number of nodes
-- delete the mailbox and return the accumulated map.
EndReduce -> do
return () !> "ENDREDUCE"
n <- liftIO $ modifyMVar numberSent $ \r -> let r'= r+1 in return (r', r')
if n == lengthNodes
!> ("END REDUCE RECEIVEDDDD",n, lengthNodes)
then do
deleteMailbox' mboxid (EndReduce `asTypeOf` paramOf dds)
r <- liftIO $ readMVar reduceResults
return r !> ("RETURNING",r)
else stop
-- Data chunk: merge every pair into the accumulated map, combining with
-- 'red' when the key is already present, then stop this thread.
Reduce kvs -> do
return () !> "REDUCE RECEIVEDDDDDDDDDDDD"
let addIt (k,inp) = do
let input= inp `asTypeOf` atype dds
liftIO $ modifyMVar_ reduceResults
$ \map -> do
let maccum = M.lookup k map
return $ M.insert k (case maccum of
Just accum -> red input accum
Nothing -> input) map
mapM addIt (kvs `asTypeOf` paramOf' dds)
!> ("RECEIVED REDUCEEEEEEEEEEEEE",kvs)
stop
r <- reducer nodes <|> shuffler nodes
localIO $ return () !> "RETRETRET"
return r
where
-- The three helpers below are never evaluated; they only fix types
-- through 'asTypeOf'.
atype ::DDS(M.Map k (container a)) -> a
atype = undefined
paramOf :: DDS (M.Map k (container a)) -> ReduceChunk [( k, a)]
paramOf = undefined
paramOf' :: DDS (M.Map k (container a)) -> [( k, a)]
paramOf' = undefined
-- | Apply a function to every element of a list and combine the resulting
-- computations as alternatives with '<|>', ending in 'empty'.
parallelize :: Alternative f => (a -> f b) -> [a] -> f b
parallelize f xs = foldr (<|>) empty [f x | x <- xs]
-- | Apply a function to every element of a list and combine the resulting
-- computations with '<>', starting from 'mempty'; the whole combination is
-- wrapped in 'loggedc'.
mparallelize f xs = loggedc (foldr (<>) mempty [f x | x <- xs])
-- | Read the content of the partition a reference points to, looking it up
-- by the key built from the path and save flag of the reference.
-- Calls 'error' when no partition is registered under that key.
getPartitionData :: (Typeable a, Loggable a) => PartRef a -> TransIO a
getPartitionData (Ref _ path save) = Transient $ do
    let dbKey = keyp path save
    Part _ _ _ xs <-
      liftIO (atomically (readDBRef (getDBRef dbKey)))
        `onNothing` error ("not found DDS data: " ++ dbKey)
    return (Just xs)
-- | Like 'getPartitionData', but a missing partition makes the computation
-- yield no result ('Nothing' inside 'Transient') instead of calling 'error'.
getPartitionData1 :: (Typeable a, Loggable a) => PartRef a -> TransIO a
getPartitionData1 (Ref _ path save) = Transient $ do
    found <- liftIO . atomically . readDBRef . getDBRef $ keyp path save
    maybe (return Nothing) (\(Part _ _ _ xs) -> return (Just xs)) found
-- | Plain 'IO' variant of 'getPartitionData': read the content of the
-- partition a reference points to. Calls 'error' when no partition is
-- registered under the key of the reference.
getPartitionData2 :: (Typeable a, Loggable a) => PartRef a -> IO a
getPartitionData2 (Ref _ path save) = do
    let dbKey = keyp path save
    Part _ _ _ xs <-
      atomically (readDBRef (getDBRef dbKey))
        `onNothing` error ("not found DDS data: " ++ dbKey)
    return xs
-- Run an IO action that takes a path on the given node. Any exception the
-- action throws is captured as 'SError' (see 'sendAnyError') and triggers
-- a retry on a node returned by 'search'.
--
-- NOTE(review): the failover path looks unfinished. 'search' currently
-- always calls 'error', and `head nodes` is evaluated even when the
-- preceding check found the list of nodes empty - confirm before relying
-- on it.
runAtP :: Loggable a => Node -> (Path -> IO a) -> Path -> Cloud a
runAtP node f uuid= do
r <- runAt node $ onAll . liftIO $ (SLast <$> f uuid) `catch` sendAnyError
case r of
SLast r -> return r
SError e -> do
nodes <- mclustered $ search uuid
when(length nodes < 1) $ asyncDuplicate node uuid
runAtP ( head nodes) f uuid
-- | Placeholder for locating a replica of a chunk: failover is not
-- implemented yet, so every call raises an error naming the chunk.
search uuid = error ("chunk failover not yet defined. Lookin for: " ++ uuid)
-- Copy the file named by the identifier to another node: after 'forkTo'
-- the given node, read the file and write it under the same name on the
-- first node of the cluster that is different from the given one.
--
-- NOTE(review): `head` fails when the given node is the only one known.
asyncDuplicate node uuid= do
forkTo node
nodes <- onAll getEqualNodes
let node'= head $ nodes \\ [node]
content <- onAll . liftIO $ readFile uuid
runAt node' $ local $ liftIO $ writeFile uuid content
-- | Exception handler that turns any exception into an 'SError' stream
-- element instead of letting it propagate.
sendAnyError :: SomeException -> IO (StreamData a)
sendAnyError = return . SError
-- | Turn a local container into a distributed data set by splitting it
-- among the nodes (see @distribute'@).
distribute :: (Loggable a, Distributable container a) => container a -> DDS (container a)
distribute xs = DDS (distribute' xs)
-- | Split a container into as many chunks as there are nodes and store one
-- chunk on each node. Every chunk holds the container length divided by
-- the number of nodes (at least one element), except the last chunk, which
-- takes whatever remains.
distribute' xs = loggedc $ do
    nodes <- local getEqualNodes
    let lnodes = length nodes
        size = max 1 (F.length xs `div` lnodes)
    distribute'' (chunks size lnodes 1 xs) nodes
  where
    -- Cut chunks of the given size until the chunk counter reaches the
    -- total; the remaining content becomes the final chunk.
    chunks size total idx rest
      | total == idx = [rest]
      | otherwise =
          let (h, t) = Transient.MapReduce.splitAt size rest
           in h : chunks size total (idx + 1) t
-- | Pair nodes with chunks positionally and, on each node, store its chunk
-- as a partition. The result is the reference to each stored partition.
distribute'' :: (Loggable a, Distributable container a)
             => [container a] -> [Node] -> Cloud (PartRef (container a))
distribute'' xss nodes = parallelize placeOn (zip nodes xss)
  where
    placeOn (target, chunk) =
      runAt target $ local $ do
        ref <- generateRef chunk
        return ref
          !> ("move", target,chunk)
-- | Build a distributed data set from a string. Every node applies the
-- partitioning function to the whole string and keeps only its own slice
-- of the resulting list: slices have the list length divided by the number
-- of nodes (at least one element), and the last node takes everything from
-- its offset to the end.
getText :: (Loggable a, Distributable container a) => (String -> [a]) -> String -> DDS (container a)
getText part str = DDS $ loggedc $ do
    nodes <- local getEqualNodes !> "getText"
    return () !> ("DISTRIBUTE TEXT IN NODES:",nodes)
    let lnodes = length nodes
    parallelize (process lnodes) $ zip nodes [0..lnodes-1]
  where
    process lnodes (node, i) =
      runAt node $ local $ do
        let xs = part str
            size = max 1 (length xs `div` lnodes)
            offset = i * size
            slice
              | i == lnodes - 1 = drop offset xs
              | otherwise = take size $ drop offset xs
        generateRef (Transient.MapReduce.fromList slice)
          !> "GETTEXT PROCESS"
-- | Distributed data set with the whitespace-separated words of the
-- document found at a URL.
textUrl :: String -> DDS (DV.Vector Text.Text)
textUrl url = getUrl wordsAsText url
  where
    wordsAsText = map Text.pack . words
-- | Build a distributed data set from the body of a URL. Every node
-- downloads the document itself, applies the partitioning function, and
-- keeps only its own slice of the resulting list: slices have the list
-- length divided by the number of nodes (at least one element), and the
-- last node takes everything from its offset to the end.
getUrl :: (Loggable a, Distributable container a) => (String -> [a]) -> String -> DDS (container a)
getUrl partitioner url = DDS $ do
    nodes <- local getEqualNodes
    let lnodes = length nodes
    parallelize (process lnodes) $ zip nodes [0..lnodes-1]
  where
    process lnodes (node, i) = runAt node $ local $ do
      response <- liftIO . simpleHTTP $ getRequest url
      body <- liftIO $ getResponseBody response
      let xs = partitioner body
          size = max 1 (length xs `div` lnodes)
          offset = i * size
          slice
            | i == lnodes - 1 = drop offset xs
            | otherwise = take size $ drop offset xs
      generateRef (Transient.MapReduce.fromList slice)
        !> "GETURL"
-- | Distributed data set with the whitespace-separated words of a file.
textFile :: String -> DDS (DV.Vector Text.Text)
textFile file = getFile wordsAsText file
  where
    wordsAsText = map Text.pack . words
-- | Build a distributed data set from a file. Every node reads the file
-- under the same name, applies the partitioning function, and keeps only
-- its own slice of the resulting list: slices have the list length divided
-- by the number of nodes (at least one element), and the last node takes
-- everything from its offset to the end.
getFile :: (Loggable a, Distributable container a) => (String -> [a]) -> String -> DDS (container a)
getFile partitioner file = DDS $ do
    nodes <- local getEqualNodes
    let lnodes = length nodes
    parallelize (process lnodes) $ zip nodes [0..lnodes-1]
  where
    process lnodes (node, i) = runAt node $ local $ do
      -- 'readFile' is lazy; taking the length forces the whole content to
      -- be read before going on.
      content <- liftIO (readFile file) >>= \c -> length c `seq` return c
      let xs = partitioner content
          size = max 1 (length xs `div` lnodes)
          offset = i * size
          slice
            | i == lnodes - 1 = drop offset xs
            | otherwise = take size $ drop offset xs
      generateRef (Transient.MapReduce.fromList slice)
        !> "GETFILE"
-- | Store a value as a new partition of the current node, under a freshly
-- generated name and with the save flag set, and return the reference to
-- that partition.
generateRef :: (Typeable a, Loggable a) => a -> TransIO (PartRef a)
generateRef x = do
    node <- getMyNode
    liftIO $ do
      name <- getTempName
      let part = Part node name True x
      _ <- atomically (newDBRef part)
      return (Ref node name True)
-- | Reference to a partition: its node, path and save flag, without the
-- payload.
getRef (Part node path save _) = Ref node path save
-- | Generate a partition name: @DDS@ followed by five random lowercase
-- letters.
getTempName :: IO String
getTempName = do
    suffix <- replicateM 5 (randomRIO ('a', 'z'))
    return ("DDS" ++ suffix)
-- | Build a distributed data set from a stream: the elements produced by
-- the IO action are collected with 'groupByTime' using the given time
-- value, and every resulting group is distributed among the nodes.
-- An 'SError' element aborts with 'error'; 'SDone' contributes nothing.
streamDDS
  :: (Loggable a, Distributable container a)
  => Int -> IO (StreamData a) -> DDS (container a)
streamDDS time io = DDS $ do
    batch <- local . groupByTime time $ parallel io >>= toBatch
    distribute' (Transient.MapReduce.fromList batch)
  where
    -- Turn one stream element into the list of values it contributes.
    toBatch item = case item of
      SMore x -> return [x]
      SLast x -> return [x]
      SDone -> empty
      SError e -> error (show e)
#endif