module Database.MongoDB.GridFS
( Bucket
, files, chunks
, File
, document, bucket
, openDefaultBucket
, openBucket
, findFile
, findOneFile
, fetchFile
, deleteFile
, sourceFile
, sinkFile
)
where
import Control.Applicative((<$>))
import Control.Monad(when)
import Control.Monad.IO.Class
import Control.Monad.Trans(MonadTrans, lift)
import Data.Conduit
import Data.Digest.Pure.MD5
import Data.Int
import Data.Tagged(Tagged, untag)
import Data.Text(Text, append)
import Data.Time.Clock(getCurrentTime)
import Database.MongoDB
import Prelude
import qualified Data.Bson as B
import qualified Data.ByteString as S
import qualified Data.ByteString.Lazy as L
-- | Number of bytes stored per chunk document by 'sinkFile': 2^18 = 256 KiB.
defaultChunkSize :: Int64
defaultChunkSize = 2 ^ (18 :: Int)
-- | Size of one MD5 input block in bytes (MD5 works on 512-bit blocks).
md5BlockSizeInBytes :: Int
md5BlockSizeInBytes = 512 `div` 8
-- | A GridFS bucket, identified by the names of its two backing collections.
data Bucket = Bucket
  { files  :: Text -- ^ collection holding one metadata document per file
  , chunks :: Text -- ^ collection holding the file contents, split into chunks
  }
-- | Open the bucket named @fs@, i.e. the collections @fs.files@ and
-- @fs.chunks@ (see 'openBucket').
openDefaultBucket :: (Monad m, MonadIO m) => Action m Bucket
openDefaultBucket = openBucket defaultBucketName
  where
    defaultBucketName = "fs"
-- | Open the bucket whose collections are @name.files@ and @name.chunks@.
--
-- Before returning, ensures an ascending index on @(filename, uploadDate)@
-- in the files collection, and a unique index (with @iDropDups@ set) on
-- @(files_id, n)@ in the chunks collection.
openBucket :: (Monad m, MonadIO m) => Text -> Action m Bucket
openBucket name = do
  ensureIndex filesIndex
  ensureIndex (chunksIndex { iUnique = True, iDropDups = True })
  return (Bucket filesColl chunksColl)
  where
    filesColl  = name `append` ".files"
    chunksColl = name `append` ".chunks"
    ascending field = field =: (1 :: Int)
    filesIndex  = index filesColl [ascending "filename", ascending "uploadDate"]
    chunksIndex = index chunksColl [ascending "files_id", ascending "n"]
-- | A stored file: the bucket it lives in plus its document from the
-- bucket's files collection.
data File = File
  { bucket   :: Bucket   -- ^ bucket containing the file
  , document :: Document -- ^ the file's document in the files collection
  }
-- | Fetch the contents of chunk number @i@ of a file.
--
-- Looks up the chunk document whose @files_id@ equals the file's @_id@ and
-- whose @n@ equals @i@, and returns the bytes of its @data@ field. Returns
-- 'Nothing' when no such chunk document exists; 'sourceFile' relies on this
-- to detect the end of the file. The file document must have an @_id@
-- ('B.look' fails in the monad otherwise).
-- NOTE(review): the field is read with 'at', so a chunk lacking a 'Binary'
-- @data@ field is presumably an error rather than 'Nothing' — confirm.
getChunk :: (Monad m, MonadIO m) => File -> Int -> Action m (Maybe S.ByteString)
getChunk (File bkt doc) i = do
  fid <- B.look "_id" doc
  found <- findOne (select ["files_id" := fid, "n" =: i] (chunks bkt))
  case found of
    Nothing -> return Nothing
    Just chunkDoc ->
      case at "data" chunkDoc of
        Binary bytes -> return (Just bytes)
-- | Every file in the bucket whose files document matches the selector,
-- reading the whole cursor.
findFile :: MonadIO m => Bucket -> Selector -> Action m [File]
findFile bkt sel = fmap (File bkt) <$> (find (select sel (files bkt)) >>= rest)
-- | The first file whose files document matches the selector, or 'Nothing'
-- if none matches.
findOneFile :: MonadIO m => Bucket -> Selector -> Action m (Maybe File)
findOneFile bkt sel = fmap (File bkt) <$> findOne (select sel (files bkt))
-- | Like 'findOneFile', but uses 'fetch', which presumably fails in 'Action'
-- when no document matches (TODO confirm against the driver docs).
fetchFile :: MonadIO m => Bucket -> Selector -> Action m File
fetchFile bkt sel = File bkt <$> fetch (select sel (files bkt))
-- | Remove a file: first its document from the files collection, then every
-- chunk whose @files_id@ is the file's @_id@.
deleteFile :: (MonadIO m) => File -> Action m ()
deleteFile (File bkt doc) = do
  fid <- B.look "_id" doc
  let matching key coll = select [key := fid] coll
  delete (matching "_id" (files bkt))
  delete (matching "files_id" (chunks bkt))
-- | Insert one chunk document (@files_id@, index @n@, binary @data@) into the
-- bucket's chunks collection.
putChunk :: (Monad m, MonadIO m) => Bucket -> ObjectId -> Int -> L.ByteString -> Action m ()
putChunk bkt fid n bytes = insert_ (chunks bkt) chunkDoc
  where
    chunkDoc =
      [ "files_id" =: fid
      , "n"        =: n
      , "data"     =: Binary (L.toStrict bytes)
      ]
-- | Stream a file's contents, yielding chunks 0, 1, 2, … in order until the
-- first chunk index for which no chunk document exists.
sourceFile :: (Monad m, MonadIO m) => File -> Producer (Action m) S.ByteString
sourceFile file = loop 0
  where
    loop n = do
      next <- lift (getChunk file n)
      maybe (return ()) (\bytes -> yield bytes >> loop (n + 1)) next
-- | Running state of an upload driven by 'sinkFile'.
--
-- The fields are strict. Each writer is evaluated when the next step
-- pattern-matches on it, so the running size and MD5 state are computed as
-- input arrives. Left lazy, they build thunk chains that hold on to every
-- uploaded piece until 'finalizeFile', keeping the whole upload in memory.
data FileWriter = FileWriter
  { fwChunkSize  :: !Int64        -- ^ bytes per stored chunk
  , fwBucket     :: !Bucket       -- ^ bucket being written to
  , fwFilesId    :: !ObjectId     -- ^ @_id@ of the files document / @files_id@ of each chunk
  , fwChunkIndex :: !Int          -- ^ @n@ of the next chunk to write
  , fwSize       :: !Int64        -- ^ total bytes received so far
  , fwAcc        :: !L.ByteString -- ^ received bytes not yet written as a chunk
  , fwMd5Context :: !MD5Context   -- ^ MD5 state over the bytes hashed so far
  , fwMd5acc     :: !L.ByteString -- ^ received bytes not yet fed to the MD5 state
  }
-- | Complete an upload started by 'sinkFile'.
--
-- If any bytes are still buffered, writes them as the last chunk. Then it
-- inserts the files document and returns it as a 'File'. The document holds
-- the @_id@, total length, upload time, MD5 (rendered with 'show'), chunk
-- size and filename.
finalizeFile :: (Monad m, MonadIO m) => Text -> FileWriter -> Action m File
finalizeFile filename writer = do
  when (not (L.null leftover)) $
    putChunk (fwBucket writer) (fwFilesId writer) (fwChunkIndex writer) leftover
  uploadedAt <- liftIO getCurrentTime
  let filesDoc =
        [ "_id"        =: fwFilesId writer
        , "length"     =: fwSize writer
        , "uploadDate" =: uploadedAt
        , "md5"        =: show digest
        , "chunkSize"  =: fwChunkSize writer
        , "filename"   =: filename
        ]
  insert_ (files (fwBucket writer)) filesDoc
  return (File (fwBucket writer) filesDoc)
  where
    leftover = fwAcc writer
    digest = finalizeMD5 (fwMd5Context writer) (L.toStrict (fwMd5acc writer))
-- | Finish the MD5 computation, given the bytes not yet fed to the context.
--
-- The longest prefix of @pending@ that is a whole number of
-- 'md5BlockSizeInBytes' blocks goes through 'md5Update'. Only the remainder
-- (shorter than one block) is passed to 'md5Finalize', which the original
-- code notes can only handle up to one block.
finalizeMD5 :: MD5Context -> S.ByteString -> MD5Digest
finalizeMD5 ctx pending = md5Finalize ctx' tailBytes
  where
    len = S.length pending
    -- Length of the whole-block prefix. The previous code had `l r` here,
    -- which applied an Int to an Int and did not compile.
    wholeLen = len - len `mod` md5BlockSizeInBytes
    (wholeBlocks, tailBytes) = S.splitAt wholeLen pending
    ctx' = md5Update ctx wholeBlocks
-- | Feed one piece of upload input into the writer.
--
-- The input is appended to the MD5 buffer, and whole MD5 blocks are folded
-- into the running context. It is also appended to the chunk buffer, and
-- every complete 'fwChunkSize'-byte chunk is written with 'putChunk',
-- advancing the chunk index. Bytes that do not fill a whole chunk stay in
-- 'fwAcc' for the next call or for 'finalizeFile'.
writeChunks :: (Monad m, MonadIO m) => FileWriter -> L.ByteString -> Action m FileWriter
writeChunks (FileWriter chunkSize bucket files_id i size acc md5context md5acc) input =
  -- Force the running size and MD5 state up front. Left lazy, they chain
  -- across calls and keep every piece of input reachable until
  -- 'finalizeFile', i.e. the whole upload stays in memory.
  size' `seq` md5context' `seq` flush i (acc `L.append` input)
  where
    size' = size + L.length input
    (md5context', md5acc') = feedMD5 (md5acc `L.append` input)

    -- Hash as many whole MD5 units as are buffered; keep the remainder.
    feedMD5 buffered
      | L.length buffered < md5BlockLength = (md5context, buffered)
      | otherwise =
          let wholeLen = md5BlockLength * (L.length buffered `div` md5BlockLength)
              (hashed, leftover) = L.splitAt wholeLen buffered
          in (md5Update md5context (L.toStrict hashed), leftover)

    -- NOTE(review): crypto-api's 'blockLength' is presumably expressed in bits
    -- (512 for MD5). Any multiple of the 64-byte MD5 block is valid input to
    -- 'md5Update', so this only sets how much is buffered — confirm.
    md5BlockLength :: Int64
    md5BlockLength = fromIntegral $ untag (blockLength :: Tagged MD5Digest Int)

    -- Write every complete chunk in @pending@, then return the updated writer.
    flush n pending
      | L.length pending < chunkSize =
          return (FileWriter chunkSize bucket files_id n size' pending md5context' md5acc')
      | otherwise = do
          let (full, remaining) = L.splitAt chunkSize pending
          putChunk bucket files_id n full
          flush (n + 1) remaining
-- | Conduit sink that uploads its input as a new file named @filename@.
--
-- A freshly generated ObjectId becomes the file's @_id@. Input is stored in
-- chunks of 'defaultChunkSize' bytes. When the input ends, the files
-- document is written and the resulting 'File' is returned.
sinkFile :: (Monad m, MonadIO m) => Bucket -> Text -> Consumer S.ByteString (Action m) File
sinkFile bucket filename = do
  oid <- liftIO genObjectId
  consume (FileWriter defaultChunkSize bucket oid 0 0 L.empty md5InitialContext L.empty)
  where
    consume writer =
      await >>= maybe
        (lift (finalizeFile filename writer))
        (\piece -> lift (writeChunks writer (L.fromStrict piece)) >>= consume)