{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-|
Module      : TimeSeries.Routing
Copyright   : (C) 2013 Parallel Scientific Labs, LLC
License     : GPL-2
Stability   : experimental
Portability : non-portable

Data routing for time series analysis.

-}
module TimeSeries.Routing where

import Control.Monad.State
import Control.Monad.Writer
import Data.List (intersperse)
import Data.Word
import Data.IntMap (IntMap)
import Data.Sequence (Seq)
import System.IO
import qualified Data.Foldable as F
import qualified Data.IntMap as IM
import qualified Data.Sequence as Seq

import TimeSeries.Window (Window, RandomVector(..), Size)
import qualified TimeSeries.Window as W
import qualified TimeSeries.Correlation as Crr

-- --------------------------------------------------------------------------
-- * Types

-- | Configuration values for detecting correlations from
-- uncooperative time series data.
--
-- This data type shall relate to bootstrapping in future, but at the
-- moment, nothing related.
--
data Config = Config
  { -- | Number of random elements in sketch.
    sketchSize :: Int
    -- | Number of subsequences for grouping sketch.
  , sketchGroups :: Word8
    -- | Cutoff value between of correlation, between 0 to 1.
  , corrCutoff :: Double
    -- | Size of sliding window.
  , swSize :: Word64
    -- | Size of basic window.
  , bwSize :: Word64
    -- | Seed for random vectors.
  , randSeed :: Integer
    -- | Implementation.
  , impls :: Implementation
    -- | To show trace output or not.
  , verbose :: Bool
  } deriving (Eq, Show)

-- | Representing basic windows as 'IntMap' of 'Seq' of 'Window's.
--
-- Key of outer 'IntMap' is ID for concurrent input stream, number of
-- keys matches to number of concurrent input stream. Inner 'Seq'
-- is indexed basic window within sliding window, number of elements
-- matches to /nb/, where `nb = sw / bw`, /sw/ is sliding window size
-- and /bw/ is basic window size.
--
type BasicWindows = IntMap (Seq Window)

-- | Summary of sliding window.
--
-- We don't need to maintain whole basic windows when sketch
-- implementation is the only concern. To show correlation values with
-- direct function, preserving the whole basic window contents. From
-- \"3.5 The Issues in Implementation section\":
--
-- /... We need to maintain/
--
-- * ∑i=0,nb-1(sum(Xbwⁱ))
--
-- * ∑i=0,nb-1(sum((Xbwⁱ)²))
--
-- /for a sliding window .../
--
data Summary = Summary
  { -- | Sum of elements in sliding window.
    -- In other words, @∑i=0,nb-1(sum(Xbwⁱ))@.
    smrSum :: Double
    -- | Sum of squared elements in sliding window.
    -- In other words, @∑i=0,nb-1(sum((Xbwⁱ)²))@.
  , smrSumSq :: Double
    -- | Sketch vector, size of this window matches to sketch size.
  , smrSketch :: Window
    -- | Pre-sketches. \"/Pre/\" means that those sketch values
    -- before applying dot product with control vector.
  , smrPreSketches :: [Window]
    -- | Summaries for basic windows, length of 'Seq' is /nb/, where
    -- @nb = sliding_window_size / basic_window_size@.
  , smrBasicWindows :: Seq BWSummary
  } deriving (Eq, Show)

-- | Summary of basic window.
--
-- From \"3.5 The Issues in Implementation section\":
--
-- /... We need to maintain/
--
-- * Xbwⁱ⋅R
--
-- * sum(Xbwⁱ)
--
-- * sum((Xbwⁱ)²)
--
-- /for each basic window./
--
data BWSummary = BWSummary
  { -- | Sketch of basic window, currently not in use.
    bwSketch :: Window
    -- | Sum of elements in basic window.
  , bwsSum :: Double
    -- | Sum of squared elements in basic window.
  , bwsSumSq :: Double
  } deriving (Eq, Show)

{-
XXX: Are those fields in 'Summary' and 'BWSummry' enough?
Update whenever found more needs....
-}

-- | State stored in analysis system.
--
-- Later this shall relate to memory data in hardware implementation,
-- but at the moment its 100% Haskell data type and values.
--
data SysState = SysState
  { sysBasicWindows :: BasicWindows
  , sysRandomVectors :: IntMap RandomVector
  , sysAppendCount :: Word64
  , sysCurrentTime :: Word64
  , sysNumberOfBasicWindows :: Word64
  , sysIsReadyToTell :: Bool
  , sysSummaries :: IntMap Summary
  , sysConfig :: Config
  } deriving (Eq, Show)

-- XXX: There are more data to hold in state, more parameters to configure.
-- Just enough to starting the main loop ....

-- | Result of correlation analysis.
--
-- Inspired from output found in \"statStream\". See:
--
-- * <http://cs.nyu.edu/cs/faculty/shasha/papers/statstream/doc.html>
--
data CorrResult = CorrResult
  { -- | Index of time series input A.
    crIndexA :: Int
    -- | Index of time series input B.
  , crIndexB :: Int
    -- | Start time of base window.
  , crStartTime :: Word64
    -- | 'crTimeStart' + (size of base window)
  , crEndTime :: Word64
    -- | A value between -1 to 1.
  , crValue :: Double
  } deriving (Eq, Show)

-- | Implementations.
--
-- As for a prototype, used to analyse and compre resulting values for
-- different implementations.
--
data Implementation
  = Direct -- ^ Direct correlation computation.
  | Sketch -- ^ Sketch based computation.
  deriving (Eq, Show, Read)

-- | Loop for analysing correlation of input data and updating
-- analysis results and windowed data.
newtype Loop a =
  Loop {unLoop :: StateT SysState (Writer [Either String CorrResult]) a}
  deriving
    ( Functor, Monad, MonadState SysState
    , MonadWriter [Either String CorrResult]
    )

-- --------------------------------------------------------------------------
-- * Looping actions
--
-- $ Looping with 'State' and 'Writer'.
--

-- | The main loop of time series analysis.
--
-- Specifying number of concurrent time series data in this function.
-- In hardware implementation, this may relates to fixed value, which
-- possibly been configured at the time of code generation.
--
loop ::
  Handle        -- ^ Where to show results.
  -> Config     -- ^ Configuration values used for computation.
  -> [[Double]] -- ^ Input values.
  -> IO ()
loop hdl conf ins0 = go st0 ins0 where
  nTimeSeries = length (head ins0)
  st0 = initialSysState conf nTimeSeries
  go st ins = case ins of
    []     -> putStrLn "done."
    ds:dss -> do
      let ((_,st'),res) = runLoop (step conf ds) st
      mapM_ (hPutStrLn hdl . either id simpleCorrResult) res
      go st' dss

-- | Unwrap 'Loop', run internal 'State' and 'Writer'.
runLoop ::
  Loop a -> SysState -> ((a, SysState), [Either String CorrResult])
runLoop (Loop lp) = runWriter . runStateT lp

-- | Single step to take with new input data.
--
-- Handle state management and if found any, report analysis result.
--
step :: Config -> [Double] -> Loop ()
step Config{..} values = do

  -- What we do with new inputs.
  updateStates values

  -- Notify correlation value.
  -- Some of the state may relate to recursive values in FPGA registers,
  -- some may relate to contents of external memory, in hardware
  -- implementation.
  notify <- sysIsReadyToTell `fmap` get

  -- In later phase, 'tell' below should be a response to external
  -- call for getting analysis result.
  when notify $ do

    -- Update summaries used for standardization of sketch distances,
    -- and shift basic windows when basic window is full state.
    updateSummariesAndShift

    -- ... And get state.
    SysState{..} <- get

    -- Computation of correlation with current system state.
    let results = corrPermute
          sysCurrentTime bwSize swSize sysBasicWindows sysSummaries
    tell $ concatMap (map Right . filterImpl impls corrCutoff) results

    -- For tracing.
    when verbose $ do

      -- Differences of direct correlation and sketch correlation.
      -- tell [ Left "\nDiffs:"
      --      , let diff :: Double -> Double -> String
      --            diff x y = printf "%.22f" (abs (x-y))
      --            ds = map (uncurry (diff `on` crValue)) results
      --        in  Left (unlines $ map ("  " ++) ds)
      --      ]

      -- Averages and variances of sketch.
      tell [ Left "\nIntMap (avg,var) of sketches:"
           , let av :: Summary -> (Double,Double)
                 av Summary{..} = (W.average smrSketch, W.variance smrSketch)
             in  Left (IM.showTree $ IM.map av sysSummaries)
           ]


-- | Filter results with implementation and cutoff value.
filterImpl ::
  Implementation
  -- ^ Which result to choose.
  -> Double
  -- ^ Cutoff value.
  -> (CorrResult,CorrResult)
  -- ^ (Direct correlation result, sketch correlation result).
  -> [CorrResult]
filterImpl impl coff (d,s) = case impl of
  Direct | abs (crValue d) > coff -> [d]
  Sketch | abs (crValue s) > coff -> [s]
  _                               -> []

-- | Compute correlations.
corrPermute ::
  Word64
  -- ^ System current time.
  -- ... What is the time format used in hardware implementation?
  -> Word64
  -- ^ Basic window size.
  -> Word64
  -- ^ Sliding window size.
  -> BasicWindows
  -- ^ Basic windows.
  -> IntMap Summary
  -- ^ 'Summary' for each input stream.
  -> [(CorrResult,CorrResult)]
  -- ^ Direct result and sketch result.
corrPermute time bwsz swsz im smrs =
  let im' = IM.toList $ slidingWindow swsz im
      f (mykey,mywin) = concatMap g im' where
        g (otherkey,otherwin)
          | mykey < otherkey = [(directCrr,sketchCrr)]
          | otherwise        = []
          where
            directCrr = crr $ Crr.direct mywin otherwin
            sketchCrr = crr $ Crr.sketch mysketch othersketch
            crr = CorrResult mykey otherkey startTime endTime
            startTime = time - bwsz + 1
            endTime = time
            _mytup = (mysketch,mysum,mysumsq)
            mysum = smrSum (smrs IM.! mykey)
            mysumsq = smrSumSq (smrs IM.! mykey)
            _othertup = (othersketch,othersum,othersumsq)
            othersum = smrSum (smrs IM.! otherkey)
            othersumsq = smrSumSq (smrs IM.! otherkey)
            -- sw' = fromIntegral swsz
            mysketch = smrSketch (smrs IM.! mykey)
            othersketch = smrSketch (smrs IM.! otherkey)
  in  concatMap f im'

-- | Struct sliding windows from basic windows.
--
-- This function is not needed in hardware implementation, should used
-- for direct calculation only.
--
slidingWindow :: Word64 -> BasicWindows -> IntMap Window
slidingWindow swsz bw = fmap (F.foldr W.append (W.empty swsz)) bw

-- | Adding new elements to windows, removing old elements.
updateStates :: [Double] -> Loop ()
updateStates values =
  modify $ \st@SysState{..} ->
    let values' =
          IM.fromList . zip [0..] . map (Seq.singleton . W.singleton) $
          values
        count = (sysAppendCount+1) `mod` bwSize sysConfig
        shifting = count == (bwSize sysConfig - 1)
        bwin = IM.unionWith consBW values' sysBasicWindows
        vmap = IM.fromList $ zip [0..] values
        -- Sum and sum of squares in head basic window are updated in
        -- smry'.
        smry' = IM.mapWithKey f sysSummaries
        f k s@Summary{..} =
          let v = vmap IM.! k
              sbw0 = Seq.index smrBasicWindows 0
              sbw0' = sbw0
                { bwsSum = bwsSum sbw0 + v
                , bwsSumSq = bwsSumSq sbw0 + v^(2::Int)
                }
              sbw' = Seq.update 0 sbw0' smrBasicWindows
          in  s {smrBasicWindows = sbw'}
    in  st { sysBasicWindows = bwin
           , sysAppendCount = count
           , sysIsReadyToTell = shifting
           , sysCurrentTime = sysCurrentTime + 1
           , sysSummaries = smry'
           }

-- | Append first argument to second argument.
consBW ::
  Seq Window -- ^ Singleton 'Seq' of singleton 'Window'.
  -> Seq Window -- ^ Basic windows in system status.
  -> Seq Window
consBW a b = b' where
  a0 = Seq.index a 0
  b0 = Seq.index b 0
  b' = W.append a0 b0 Seq.<| Seq.drop 1 b

-- | Update sum of elements in sliding window, and sum of squared
-- elements in sliding window, and shift contents of basic windows.
updateSummariesAndShift :: Loop ()
updateSummariesAndShift = modify $ \st@SysState{..} ->
  let -- Popping out last element, pushing size /bw/ zero filled window to
      -- store incoming data.
      bws = IM.map (rotateSeq (W.empty (bwSize sysConfig))) sysBasicWindows

      -- Word64 to Int conversion.
      nb = fromIntegral sysNumberOfBasicWindows

      -- Updating sums and sum of squares.
      smry' = IM.mapWithKey f sysSummaries
      f k s = s { smrSum = sum'
                , smrSumSq = sumsq'
                , smrBasicWindows = smrbws'
                , smrSketch = sketch'
                , smrPreSketches = preSketches'
                }
        where
          -- Sum and sum of squares, of elements in sliding window.
          sum' = smrSum s - bwsSum bwsmrynb + bwsSum bwsmry0
          sumsq' = smrSumSq s - bwsSumSq bwsmrynb + bwsSumSq bwsmry0

          -- Basic window summaries.
          bwsmry0 = Seq.index smrbws 0
          bwsmrynb = Seq.index smrbws (nb-1)
          smrbws = smrBasicWindows s
          bws' = sysBasicWindows IM.! k

          -- Sketch of sliding window (Naively standardized)
          -- Sums and sums of squares are not used ....
          sketch' = W.fromList [(xi-mu)/sigma|xi<-sks]
          sks = zipWith W.dotp cs preSketches'

          mu = W.average $ W.fromList sks
          sigma = W.variance $ W.fromList sks

          -- Using these 'mu' and 'sigma', symmetrical shape is
          -- preserved but scaling is inappropriate...
          --
          -- mu = (sum'/swSize')
          -- sigma = (sumsq'/swSize') - mu^(2::Int)
          -- swSize' = fromIntegral (swSize sysConfig)

          -- Summaries of basic windows.
          smrbws' = rotateSeq iniBW smrbws
          iniBW = initialBWSummary (fromIntegral $ sketchSize sysConfig)

          -- Pre-sketches, stored for next looping step.
          preSketches' :: [Window]
          preSketches' =
            zipWith (\a b -> W.append (W.singleton a) b)
              (W.toList $ W.dotps urs bw0) (smrPreSketches s)

          -- New basic window.
          bw0 :: Window
          bw0 = Seq.index bws' 0

          -- Unit random vectors.
          urs :: [Window]
          urs = map unitRV . IM.elems $ sysRandomVectors

          -- Control vectors.
          cs :: [Window]
          cs = map controlRV . IM.elems $ sysRandomVectors

  in  st { sysBasicWindows = bws
         , sysSummaries = smry'
         }

-- | Cons given element to given 'Seq' and remove last element in
-- 'Seq'.
rotateSeq :: a -> Seq a -> Seq a
rotateSeq x sq = x Seq.<| Seq.take (Seq.length sq - 1) sq

{-
XXX:

* Do 'incremental approach' described in Chapter 4 of
'High Performance Algorithms for ... ' pdf to update state.

* Use partitioned sketches, with parameters: N, d, c, and f.

* Make code closer to model hardware implementation.

* Asynchrnous correlation (i.e. correlations between time shifted
windows) are not computed.

-}

-- --------------------------------------------------------------------------
-- * Initial values

-- | Initial state.
--
-- Other than 'Config', number of concurrent time series input data is
-- passed.
--
initialSysState :: Config -> Int -> SysState
initialSysState conf@Config{..} numTimeSeries = SysState
  { sysBasicWindows = initialBasicWindows bwSize nb numTimeSeries
  , sysRandomVectors = randomVectors randSeed bwSize swSize sketchSize
  , sysAppendCount = 0
  , sysNumberOfBasicWindows = swSize `div` bwSize
  , sysIsReadyToTell = False
  , sysCurrentTime = 0
  , sysSummaries = initialSummaries conf numTimeSeries
  , sysConfig = conf
  } where nb = swSize `div` bwSize

initialSlidingWindows :: Size -> Int -> IntMap Window
initialSlidingWindows sz n = IM.fromList $ zip [0..n-1] (repeat $ W.empty sz)

initialBasicWindows :: Size -> Size -> Int -> BasicWindows
initialBasicWindows sz nb n = IM.fromList $ zip [0..n-1] (repeat sw0) where
  sw0 = Seq.fromList (replicate (fromIntegral nb) bw0)
  bw0 = W.empty sz

initialSummaries ::
  Config -- ^ Main configuration.
  -> Int -- ^ Number of concurrent input streams.
  -> IntMap Summary
initialSummaries Config{..} nt = IM.fromList $ zip [0..nt-1] smrs where
  smrs = repeat $ Summary
    { smrSum = 0
    , smrSumSq = 0
    , smrSketch = W.empty (fromIntegral sketchSize)
    , smrPreSketches = replicate sketchSize (W.empty nb)
    , smrBasicWindows = Seq.fromList . replicate nb . initialBWSummary $
                        fromIntegral sketchSize
    }
  nb :: Num a => a
  nb = fromIntegral (swSize `div` bwSize)

-- | Initial basic window summary data.
initialBWSummary ::
  Size -- ^ Sketch size.
  -> BWSummary
initialBWSummary d = BWSummary
  { bwSketch = W.empty d
  , bwsSum = 0
  , bwsSumSq = 0
  }

-- | Initial random vectors.
randomVectors ::
  Integer -- ^ Random seed.
  -> Size -- ^ Basic window size.
  -> Size -- ^ Sliding window size.
  -> Int  -- ^ Sketch size.
  -> IntMap RandomVector
randomVectors seed bw sw d = IM.fromList wins where
  wins = zip ks rs
  ks = [0..d-1]
  rs = [W.randomVector (fromIntegral k + seed) bw nb | k <- ks]
  nb = fromIntegral (sw `div` bw)

-- --------------------------------------------------------------------------
-- * Pretty printer
--
-- $ Actually, not much pretty.

-- | Simplified string representation of 'CorrResult'.
--
-- >>> simpleCorrResult (CorrResult 1 2 10 20 0.5)
-- "1, 2, 10, 20, 0.5"
--
simpleCorrResult :: CorrResult -> String
simpleCorrResult CorrResult{..} = concat $ intersperse ", "
   [ show crIndexA, show crIndexB
   , show crStartTime, show crEndTime
   , show crValue
   ]