{-# LANGUAGE ExistentialQuantification, DeriveDataTypeable , FlexibleInstances, MultiParamTypeClasses, OverloadedStrings, CPP #-}

-- | Map-reduce style distributed computing on top of Transient\/Cloud.
-- A 'DDS' (distributed data set) is partitioned among the nodes returned by
-- @getEqualNodes@; 'mapKeyB'\/'mapKeyU' and 'reduce' operate on those
-- partitions in place, Spark-style.
module Transient.MapReduce
   (
   Distributable(..),distribute, getText,
   getUrl, getFile,textUrl, textFile,
   mapKeyB, mapKeyU, reduce,eval,
   -- * internals
   DDS(..),Partition(..),PartRef(..))
 where

#ifdef ghcjs_HOST_OS

import Transient.Base
import Transient.Move hiding (pack)
import Transient.Logged hiding (hash)

-- dummy Transient.MapReduce module,
-- Browser (GHCJS) nodes do not run map-reduce work: every primitive is a
-- stub so that code shared between client and server still type-checks.
reduce _ _ = local stop :: Loggable a => Cloud a
mapKeyB _ _= undefined
mapKeyU _ _= undefined
distribute _ = undefined
getText _ _ = undefined
textFile _ = undefined
getUrl _ _ = undefined
textUrl _ = undefined
getFile _ _ = undefined
eval _= local stop

data Partition
data DDS= DDS
class Distributable
data PartRef a=PartRef a

#else

import Transient.Internals hiding (Ref)
import Transient.Mailboxes
import Transient.Parse
import Transient.Logged
import Transient.Move.Internals hiding (pack)
import Transient.Indeterminism
import Control.Applicative
import System.Random
import Control.Monad.State
import Control.Monad
import Data.Monoid
import Data.Typeable
import Data.Foldable
import Data.List hiding (delete, foldl')
import Control.Exception
import Control.Concurrent
--import Data.Time.Clock
import Network.HTTP
import Data.TCache hiding (onNothing)
import Data.TCache.Defs hiding (serialize,deserialize)
import Data.ByteString.Lazy.Char8 (pack,unpack, toStrict)
import Data.ByteString.Builder
import qualified Data.Map.Strict as M
import Control.Arrow (second)
import qualified Data.Vector.Unboxed as DVU
import qualified Data.Vector as DV
import Data.Hashable
import System.IO.Unsafe
import qualified Data.Foldable as F
import qualified Data.Text as Text
import Data.Text.Encoding
import Data.IORef

-- | a DDS contains a distrib.
-- computation which return a (non-deterministic/stream/set of)
-- link/s to the generated chunk/s of data, in different nodes, thanks to the non-deterministic
-- and multithreaded nature of the Transient/Cloud comp.
data DDS a= Loggable a => DDS (Cloud (PartRef a))

-- | a link to a chunk of data, located in a node
data PartRef a= Ref Node Path Save deriving (Typeable, Read, Show)

-- | the chunk of data loaded in memory
data Partition a= Part Node Path Save a deriving (Typeable,Read,Show)

-- | whether the partition is persisted by TCache ('True') or kept transient
-- ('False'); it selects the key prefix in 'keyp' and gates 'writeResource'.
type Save= Bool

instance Loggable Text.Text where
   serialize t= byteString (encodeUtf8 t)
   -- a serialized Text is delimited by the next '/' in the input stream
   deserialize= decodeUtf8 . toStrict <$> tTakeWhile (/= '/')

instance Typeable a => Loggable (PartRef a) where
   serialize (Ref node path save)= serialize node <> "/" <> serialize path <> "/" <> serialize save
   deserialize= Ref <$> (deserialize <* slash) <*> (deserialize <* slash) <*> deserialize
      where slash= tChar '/'

-- | TCache key of a partition: \"PartP\@\" (persistent) or \"PartT\@\"
-- (transient) followed by its path.
instance Indexable (Partition a) where
   key (Part _ string b _)= keyp string b

keyp s True= "PartP@"++s :: String
keyp s False="PartT@"++s

instance Loggable a => IResource (Partition a) where
   keyResource= key
   -- Only persistent partitions (key prefix \"PartP\@\", i.e. 'P' at index 4
   -- of the key) are read back from disk; transient ones yield Nothing.
   readResourceByKey k= r
      where
      -- type-level helper: hands 'defaultReadByKey' the element type of r
      typePart :: IO (Maybe a) -> a
      typePart= undefined
      r= if k !! 4 /= 'P' then return Nothing
         else defaultReadByKey (defPath (typePart r) ++ k) >>= return . fmap (read . unpack)
   -- was `unless (not save)`: `when save` states the intent directly
   writeResource (s@(Part _ _ save _))=
        when save $ defaultWrite (defPath s ++ key s) (pack $ show s)

-- | expose the cloud computation that yields the reference to the data
eval :: DDS a -> Cloud (PartRef a)
eval (DDS mx)= mx

type Path= String

-- NOTE(review): these method bindings are self-referential — inside the
-- instance the right-hand `foldr` is the Foldable class method, whose
-- DVU.Vector instance is this very definition, so calling any of them
-- appears to loop forever.  They cannot simply be `DVU.foldr` etc. because
-- those need an `Unbox` constraint that `Foldable`'s signatures do not
-- provide.  TODO confirm the intended definitions; left byte-identical here.
instance F.Foldable DVU.Vector where
   {-# INLINE foldr #-}
   foldr= foldr
   {-# INLINE foldl #-}
   foldl= foldl
   {-# INLINE foldr1 #-}
   foldr1= foldr1
   {-# INLINE foldl1 #-}
   foldl1= foldl1

--foldlIt' :: V.Unbox a => (b -> a -> b) -> b -> V.Vector a -> b
--foldlIt' f z0 xs= V.foldr f' id xs z0
--      where f' x k z = k $!
-- f z x
--
--foldlIt1 :: V.Unbox a => (a -> a -> a) -> V.Vector a -> a
--foldlIt1 f xs = fromMaybe (error "foldl1: empty structure")
--                (V.foldl mf Nothing xs)
--      where
--      mf m y = Just (case m of
--                       Nothing -> y
--                       Just x  -> f x y)

-- | containers that can hold a 'DDS' partition: they can be built from
-- lists, split into chunks, and merged back together ('Monoid').
class (F.Foldable c, Monoid (c a), Loggable (c a),Typeable c, Typeable a) => Distributable c a where
   singleton :: a -> c a
   splitAt :: Int -> c a -> (c a, c a)
   fromList :: [a] -> c a

-- Wire format shared by both vector instances: the length, then each element
-- preceded by a '/' separator, e.g. "3/a/b/c".  'deserialize' reads exactly
-- that: an int, then @len@ repetitions of ('/' followed by one element).
instance Loggable a => Loggable (DV.Vector a) where
   -- BUG FIX: the previous version emitted an extra '/' between the length
   -- and the first element ("3//a/b/c"), which 'deserialize' does not accept
   -- (the unboxed instance below already produced the single-slash format).
   serialize v= intDec (DV.length v) <> foldl' (\s x -> s <> "/" <> serialize x) mempty v
   deserialize= do
      len <- int
      DV.replicateM len $ tChar '/' *> deserialize

instance (Typeable a, Loggable a) => Distributable DV.Vector a where
   singleton= DV.singleton
   splitAt= DV.splitAt
   fromList= DV.fromList

instance (Loggable a,DVU.Unbox a) => Loggable (DVU.Vector a) where
   -- BUG FIX: the previous version indexed @v DVU.! 0@ and sliced from 1,
   -- crashing on an empty vector; a plain strict fold emits the same
   -- "len(/elem)*" output and handles the empty case ("0").
   serialize v= intDec (DVU.length v) <> DVU.foldl' (\s x -> s <> "/" <> serialize x) mempty v
   deserialize= do
      len <- int
      DVU.replicateM len $ tChar '/' *> deserialize

instance (Typeable a, Loggable a, DVU.Unbox a) => Distributable DVU.Vector a where
   singleton= DVU.singleton
   splitAt= DVU.splitAt
   fromList= DVU.fromList

-- | perform a map and partition the result with different keys using boxed vectors
-- The final result will be used by reduce.
mapKeyB :: (Typeable a, Loggable a, Typeable b,Loggable b, Typeable k, Loggable k,Ord k)
        => (a -> (k,b)) -> DDS (DV.Vector a) -> DDS (M.Map k(DV.Vector b))
mapKeyB= mapKey

-- | perform a map and partition the result with different keys using unboxed vectors
-- The final result will be used by reduce.
mapKeyU :: (Typeable a, Loggable a, DVU.Unbox a, Typeable b, Loggable b, DVU.Unbox b, Typeable k, Loggable k,Ord k)
        => (a -> (k,b)) -> DDS (DVU.Vector a) -> DDS (M.Map k(DVU.Vector b))
mapKeyU= mapKey

-- | perform a map and partition the result with different keys.
-- The final result will be used by reduce.
mapKey :: (Distributable container a,Distributable container b, Typeable k, Loggable k,Ord k)
       => (a -> (k,b)) -> DDS (container a) -> DDS (M.Map k (container b))
mapKey f (DDS mx)= DDS $ loggedc $ do
   refs <- mx
   process refs !> ("process",refs)
   where
--   process :: Partition a -> Cloud [Partition b]
   -- run at the node that owns the chunk: load it, map+group it locally,
   -- store the result there and return the new reference
   process (ref@(Ref node path sav))= runAt node $ local $ do
        xs <- getPartitionData ref !> ("CMAP", ref,node)
        (generateRef $ map1 f xs)

-- map1 :: (Ord k, F.Foldable container) => (a -> (k,b)) -> container a -> M.Map k(container b)
-- apply f to every element and group the results by key, merging with (<>)
map1 f v= F.foldl' f1 M.empty v
   where
   f1 map x= let (k,r) = f x
             in M.insertWith (<>) k (Transient.MapReduce.singleton r) map

{-
map :: (Distributable container a,Distributable container b, Loggable k,Ord k)
    => (a -> b) -> DDS (container a) -> DDS (container b)
map f (DDS mx)= DDS $ loggedc $ do
   refs <- mx
   process refs  -- !> ("process",refs)
   where
-- process :: Partition a -> Cloud [Partition b]
   process (ref@(Ref node path sav))= runAt node $ local $ do
        xs <- getPartitionData ref  -- !> ("CMAP", ref,node)
        (generateRef $ map1 f xs)   // xxx !> "MAP"

map1 :: (Ord k, F.Foldable container) => (a -> b) -> container a -> container b
map1 f v= F.foldl' f1 M.empty v
   where
   f1 map x= let r = f x in M.insertWith (<>) k (Transient.MapReduce.singleton r) map
-}

-- | message protocol of the shuffle phase: chunks of folded pairs, then one
-- 'EndReduce' per sending node
data ReduceChunk a= EndReduce | Reduce a deriving (Typeable, Read, Show)

-- global counter used to give each 'reduce' invocation its own mailbox id
boxids= unsafePerformIO $ newIORef (0 :: Int)

-- | reduce the map produced by 'mapKeyB'/'mapKeyU' with a binary operation.
-- Each node folds its local chunks, sends each key group to the node chosen
-- by hashing the key (the shuffle), and every node merges what it receives
-- until it has seen one 'EndReduce' from each of the participating nodes.
reduce :: (Hashable k,Ord k, Distributable container a, Typeable k, Loggable k, Typeable a, Loggable a)
       => (a -> a -> a) -> DDS (M.Map k (container a)) -> Cloud (M.Map k a)
reduce red (dds@(DDS mx))= loggedc $ do
   -- fresh mailbox id so concurrent reduces do not mix their messages
   mboxid <- localIO $ atomicModifyIORef boxids $ \n -> let n'= n+1 in (n',n')
   nodes <- local getEqualNodes
   -- return () !> ("REDUCE NODES=", nodes)
   let lengthNodes = length nodes

       -- producer side: for each chunk reference produced by the map phase,
       -- fold and send its contents from the node that owns it; never
       -- yields a value itself (ends in stop)
       shuffler nodes = do
          localIO $ threadDelay 100000
          ref@(Ref node path sav) <- mx    -- return the resulting blocks of the map
          runAt node $ do
             localIO $ return () !> ("FOLDANDSEND","REF",ref, "runAt", node)
             foldAndSend node nodes ref
          stop

--     groupByDestiny :: (Hashable k, Distributable container a) => M.Map k (container a) -> M.Map Int [(k ,container a)]
       -- bucket each key group by the index of its destination node
       groupByDestiny map = M.foldlWithKey' f M.empty map
          where
--        f :: M.Map Int [(k ,container a)] -> k -> container a -> M.Map Int [(k ,container a)]
          f map k vs= M.insertWith (<>) (hash1 k) [(k,vs)] map
          hash1 k= abs $ hash k `rem` length nodes

--     foldAndSend :: (Hashable k, Distributable container a)=> (Int,[(k,container a)]) -> Cloud ()
       -- fold the local chunk per key and mail each bucket to its node;
       -- after the last bucket has been sent, broadcast EndReduce
       foldAndSend node nodes ref= do
           pairs <- onAll $ getPartitionData1 ref
                      <|> return (error $ "DDS computed out of his node:"++ show ref )
           let mpairs = groupByDestiny pairs
           -- NOTE: this binding shadows Prelude.length below
           length <- local . return $ M.size mpairs
           let port2= nodePort node   -- unused; kept as in source
           if length == 0 then sendEnd nodes
           else do
              nsent <- onAll $ liftIO $ newMVar 0
              -- one thread per destination bucket, via parallelize/async
              (i,folded) <- local $ parallelize foldthem (M.assocs mpairs)
              n <- localIO $ modifyMVar nsent $ \r -> return (r+1, r+1) :: IO (Int,Int)
              -- sourcenode <- local getMyNode  -- XXX remove, debug only
              --localIO $ return () !> ("PUTMAILBOX TOSEND from",sourcenode,n,length,i,folded)
              (runAt (nodes !! i) $ do
                  return () !> "JUST BEFORE PUTMAILBOX"
                  local $ (putMailbox' mboxid (Reduce folded `asTypeOf` paramOf dds)) !> ("PUTMAILBOX SENT ",n,length,i,folded))
              -- only the thread that sent the last bucket broadcasts the end
              when (n == length) $ sendEnd nodes
              return ()
              empty
           where
           foldthem (i,kvs)= async . return
                 $ (i,map (\(k,vs) -> (k,foldl1 red vs)) kvs)

       sendEnd nodes = do
           -- node <- local getMyNode  -- XXX remove. debug only
           onNodes nodes $ local $ putMailbox' mboxid (EndReduce `asTypeOf` paramOf dds)
           -- !> ("PUTMAILBOX ENDREDUCE FROM", node))

       onNodes nodes f = foldr (<|>) empty $ map (\n -> runAt n f ) nodes

       sumNodes nodes f= foldr (<>) mempty $ map (\n -> runAt n f ) nodes

       reducer nodes= sumNodes nodes reduce1   -- a reduce1 process in each node, get the results and mappend them

--     reduce1 :: (Ord k) => Cloud (M.Map k v)
       -- consumer side: accumulate Reduce messages into a local map; emit
       -- the accumulated map only after EndReduce arrived from every node
       reduce1 =local $ do
           reduceResults <- liftIO $ newMVar M.empty
           numberSent <- liftIO $ newMVar 0
           return () !> "GETMAILBOX"
           minput <- getMailbox' mboxid   -- get the chunk once it arrives to the mailbox
           case minput of
             EndReduce -> do
                return () !> "ENDREDUCE"
                n <- liftIO $ modifyMVar numberSent $ \r -> let r'= r+1 in return (r', r')
                if n == lengthNodes !> ("END REDUCE RECEIVEDDDD",n, lengthNodes)
                  then do
                    deleteMailbox' mboxid (EndReduce `asTypeOf` paramOf dds)
                    r <- liftIO $ readMVar reduceResults
                    return r !> ("RETURNING",r)
                  else stop
             Reduce kvs -> do
                return () !> "REDUCE RECEIVEDDDDDDDDDDDD"
                let addIt (k,inp) = do
                      let input= inp `asTypeOf` atype dds
                      liftIO $ modifyMVar_ reduceResults $ \map -> do
                         let maccum = M.lookup k map
                         return $ M.insert k (case maccum of
                                   Just accum -> red input accum
                                   Nothing -> input) map
                mapM addIt (kvs `asTypeOf` paramOf' dds) !> ("RECEIVED REDUCEEEEEEEEEEEEE",kvs)
                stop

   -- race the consumers against the producers; only reduce1 yields values
   r <- reducer nodes <|> shuffler nodes
   localIO $ return () !> "RETRETRET"
   return r
   where
   -- type-level helpers, never evaluated
   atype ::DDS(M.Map k (container a)) -> a
   atype = undefined -- type level
   paramOf :: DDS (M.Map k (container a)) -> ReduceChunk [( k, a)]
   paramOf = undefined -- type level
   paramOf' :: DDS (M.Map k (container a)) -> [( k, a)]
   paramOf' = undefined -- type level

-- parallelize :: Loggable b => (a -> Cloud b) -> [a] -> Cloud b
-- run f over every element as alternative (racing/streaming) branches
parallelize f xs = foldr (<|>) empty $ map f xs

-- like parallelize but combines results monoidally, preserving the log
mparallelize f xs = loggedc $ foldr (<>) mempty $ map f xs

-- | fetch the partition data behind a reference from the local TCache store;
-- errors out if this node does not hold it
getPartitionData :: (Typeable a, Loggable a) => PartRef a -> TransIO a
getPartitionData (Ref node path save) = Transient $ do
    mp <- (liftIO $ atomically $ readDBRef $ getDBRef $ keyp path save)
            `onNothing` error ("not found DDS data: "++ keyp path save)
    case mp of
      (Part _ _ _ xs) -> return $ Just xs

-- | like 'getPartitionData' but fails silently (empty) instead of erroring
getPartitionData1 :: (Typeable a, Loggable a) => PartRef a -> TransIO a
getPartitionData1 (Ref node path save) = Transient $ do
    mp <- liftIO $ atomically $ readDBRef $ getDBRef $ keyp path save
    case mp of
      Just (Part _ _ _ xs) -> return $ Just xs
      Nothing -> return Nothing

-- | IO variant of 'getPartitionData'
getPartitionData2 :: (Typeable a,Loggable a) => PartRef a -> IO a
getPartitionData2 (Ref node path save) = do
    mp <- ( atomically $ readDBRef $ getDBRef $ keyp path save)
            `onNothing` error ("not found DDS data: "++ keyp path save)
    case mp of
      (Part _ _ _ xs) -> return xs

-- In case of node failure, a clustered search for the path is launched;
-- if only one node has it, it is copied to another one and that node is
-- set as the reference in Part
runAtP :: Loggable a => Node -> (Path -> IO a) -> Path -> Cloud a
runAtP node f uuid= do
   r <- runAt node $ onAll . liftIO $ (SLast <$> f uuid) `catch` sendAnyError
   case r of
     SLast r -> return r
     SError e -> do
         nodes <- mclustered $ search uuid
         when(length nodes < 1) $ asyncDuplicate node uuid
         runAtP ( head nodes) f uuid

search uuid= error $ "chunk failover not yet defined. Lookin for: "++ uuid

-- copy the chunk file to another equal node as a crude failover measure
asyncDuplicate node uuid= do
    forkTo node
    nodes <- onAll getEqualNodes
    let node'= head $ nodes \\ [node]
    content <- onAll . liftIO $ readFile uuid
    runAt node' $ local $ liftIO $ writeFile uuid content

sendAnyError :: SomeException -> IO (StreamData a)
sendAnyError e= return $ SError e

-- | distribute a container of values among many nodes.
-- If the container is static and sharable, better use the get* primitives
-- since each node will load the data independently.
-- (definition continues with distribute' on the next source line)
distribute :: (Loggable a, Distributable container a ) => container a -> DDS (container a)
distribute = DDS .
   -- continuation of 'distribute' above
   distribute'

-- split the container into one chunk per node and ship each chunk to its node
distribute' xs= loggedc $ do
   nodes <- local getEqualNodes   -- !> "DISTRIBUTE"
   let lnodes = length nodes
   let size= case F.length xs `div` (length nodes) of 0 ->1 ; n -> n
       xss= split size lnodes 1 xs   -- !> size
   r <- distribute'' xss nodes
   return r
   where
   -- cut xs into lnodes pieces of `size` elements; the last piece keeps the rest
   split n s s' xs | s==s' = [xs]
   split n s s' xs=
      let (h,t)= Transient.MapReduce.splitAt n xs
      in h : split n s (s'+1) t

-- store each chunk at its node and stream back the partition references
distribute'' :: (Loggable a, Distributable container a) => [container a] -> [Node] -> Cloud (PartRef (container a))
distribute'' xss nodes =
   parallelize move $ zip nodes xss   -- !> show xss
   where
   move (node, xs)= runAt node $ local $ do
        par <- generateRef xs
        return par !> ("move", node,xs)

-- | input data from a text that must be static and shared by all the nodes.
-- The function parameter partition the text in words
getText :: (Loggable a, Distributable container a) => (String -> [a]) -> String -> DDS (container a)
getText part str= DDS $ loggedc $ do
   nodes <- local getEqualNodes !> "getText"
   return () !> ("DISTRIBUTE TEXT IN NODES:",nodes)
   let lnodes = length nodes
   parallelize (process lnodes) $ zip nodes [0..lnodes-1]
   where
   -- node i keeps the i-th slice of the parsed input; the last node takes
   -- whatever remains so no element is dropped
   process lnodes (node,i)= runAt node $ local $ do
      let xs = part str
          size= case length xs `div` lnodes of 0 ->1 ; n -> n
          xss= Transient.MapReduce.fromList $
                if i== lnodes-1 then drop (i* size) xs
                                else take size $ drop (i * size) xs
      generateRef xss !> "GETTEXT PROCESS"

-- | get the worlds of an URL
textUrl :: String -> DDS (DV.Vector Text.Text)
textUrl= getUrl (map Text.pack . words)

-- | generate a DDS from the content of a URL.
-- The first parameter is a function that divide the text in words
getUrl :: (Loggable a, Distributable container a) => (String -> [a]) -> String -> DDS (container a)
getUrl partitioner url= DDS $ do
   nodes <- local getEqualNodes   -- !> "DISTRIBUTE"
   let lnodes = length nodes
   parallelize (process lnodes) $ zip nodes [0..lnodes-1]   -- !> show xss
   where
   -- every node downloads the URL itself and keeps only its own slice
   process lnodes (node,i)= runAt node $ local $ do
      r <- liftIO . simpleHTTP $ getRequest url
      body <- liftIO $ getResponseBody r
      let xs = partitioner body
          size= case length xs `div` lnodes of 0 ->1 ; n -> n
          xss= Transient.MapReduce.fromList $
                if i== lnodes-1 then drop (i* size) xs
                                else take size $ drop (i * size) xs
      generateRef xss !> "GETURL"

-- | get the words of a file
textFile :: String -> DDS (DV.Vector Text.Text)
textFile= getFile (map Text.pack . words)

-- | generate a DDS from a file. All the nodes must access the file with the same path
-- the first parameter is the parser that generates elements from the content
getFile :: (Loggable a, Distributable container a) => (String -> [a]) -> String -> DDS (container a)
getFile partitioner file= DDS $ do
   nodes <- local getEqualNodes   -- !> "DISTRIBUTE"
   let lnodes = length nodes
   parallelize (process lnodes) $ zip nodes [0..lnodes-1]   -- !> show xss
   where
   process lnodes (node, i)= runAt node $ local $ do
      -- force the whole (lazily read) file so the handle is not kept open
      content <- do
           c <- liftIO $ readFile file
           length c `seq` return c
      let xs = partitioner content
          size= case length xs `div` lnodes of 0 ->1 ; n -> n
          xss= Transient.MapReduce.fromList $
                if i== lnodes-1 then drop (i* size) xs
                                else take size $ drop (i * size) xs
      generateRef xss !> "GETFILE"

-- | store a value as a new persistent partition on this node and return a
-- reference to it
generateRef :: (Typeable a, Loggable a) => a -> TransIO (PartRef a)
generateRef x= do
    node <- getMyNode
    liftIO $ do
       temp <- getTempName
       -- True: persist the partition (pass False to keep it transient only)
       let reg= Part node temp True x
       atomically $ newDBRef reg
       -- syncCache
       (return $ getRef reg)   -- !> ("generateRef",reg,node)

getRef (Part n t s x)= Ref n t s

-- | random five-letter name, prefixed with "DDS", for a new partition
getTempName :: IO String
getTempName= ("DDS" ++) <$> replicateM 5 (randomRIO ('a','z'))

-------------- Distributed Datasource Streams ---------

-- | produce a stream of DDS's that can be map-reduced. Similar to spark streams.
-- each interval of time,a new DDS is produced.(to be tested)
streamDDS :: (Loggable a, Distributable container a) => Int -> IO (StreamData a) -> DDS (container a)
streamDDS time io= DDS $ do
   xs <- local . groupByTime time $ do
            r <- parallel io
            case r of
                SDone -> empty
                SLast x -> return [x]
                SMore x -> return [x]
                SError e -> error $ show e
   distribute' $ Transient.MapReduce.fromList xs

#endif