{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Streamly.External.LMDB
(
Environment,
openEnvironment,
isReadOnlyEnvironment,
closeEnvironment,
Mode,
ReadWrite,
ReadOnly,
Limits (..),
defaultLimits,
gibibyte,
tebibyte,
Database,
getDatabase,
clearDatabase,
closeDatabase,
readLMDB,
unsafeReadLMDB,
ReadOnlyTxn,
beginReadOnlyTxn,
abortReadOnlyTxn,
Cursor,
openCursor,
closeCursor,
ReadOptions (..),
defaultReadOptions,
ReadDirection (..),
writeLMDB,
WriteOptions (..),
defaultWriteOptions,
OverwriteOptions (..),
LMDB_Error (..),
MDB_ErrCode (..),
)
where
import Control.Concurrent (isCurrentThreadBound, myThreadId)
import Control.Concurrent.Async (asyncBound, wait)
import Control.Exception (Exception, catch, mask_, throw, tryJust)
import Control.Monad (guard, unless, when)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.ByteString (ByteString, packCStringLen)
import Data.ByteString.Unsafe (unsafePackCStringLen, unsafeUseAsCStringLen)
import Data.Maybe (fromJust, isNothing)
import Data.Void (Void)
import Foreign (Ptr, alloca, free, malloc, nullPtr, peek)
import Foreign.C (Errno (Errno), eNOTDIR)
import Foreign.C.String (CStringLen)
import Foreign.Marshal.Utils (with)
import Foreign.Storable (poke)
import Streamly.External.LMDB.Internal (Database (..), Mode (..), ReadOnly, ReadWrite)
import Streamly.External.LMDB.Internal.Foreign
import Streamly.Internal.Data.Fold (Fold (Fold), Step (Partial))
import Streamly.Internal.Data.IOFinalizer (newIOFinalizer, runIOFinalizer)
import Streamly.Internal.Data.Stream.StreamD.Type (Step (Stop, Yield))
import Streamly.Internal.Data.Unfold (lmap)
import Streamly.Internal.Data.Unfold.Type (Unfold (Unfold))
newtype Environment mode = Environment (Ptr MDB_env)
isReadOnlyEnvironment :: Mode mode => Environment mode -> Bool
isReadOnlyEnvironment :: forall mode. Mode mode => Environment mode -> Bool
isReadOnlyEnvironment = forall a. Mode a => a -> Bool
isReadOnlyMode forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall mode. Environment mode -> mode
mode
where
mode :: Environment mode -> mode
mode :: forall mode. Environment mode -> mode
mode = forall a. HasCallStack => a
undefined
data Limits = Limits
{
Limits -> Int
mapSize :: !Int,
Limits -> Int
maxDatabases :: !Int,
Limits -> Int
maxReaders :: !Int
}
defaultLimits :: Limits
defaultLimits :: Limits
defaultLimits =
Limits
{ mapSize :: Int
mapSize = Int
1024 forall a. Num a => a -> a -> a
* Int
1024,
maxDatabases :: Int
maxDatabases = Int
0,
maxReaders :: Int
maxReaders = Int
126
}
gibibyte :: Int
gibibyte :: Int
gibibyte = Int
1024 forall a. Num a => a -> a -> a
* Int
1024 forall a. Num a => a -> a -> a
* Int
1024
tebibyte :: Int
tebibyte :: Int
tebibyte = Int
1024 forall a. Num a => a -> a -> a
* Int
1024 forall a. Num a => a -> a -> a
* Int
1024 forall a. Num a => a -> a -> a
* Int
1024
openEnvironment :: Mode mode => FilePath -> Limits -> IO (Environment mode)
openEnvironment :: forall mode.
Mode mode =>
FilePath -> Limits -> IO (Environment mode)
openEnvironment FilePath
path Limits
limits = do
Ptr MDB_env
penv <- IO (Ptr MDB_env)
mdb_env_create
Ptr MDB_env -> Int -> IO ()
mdb_env_set_mapsize Ptr MDB_env
penv (Limits -> Int
mapSize Limits
limits)
let maxDbs :: Int
maxDbs = Limits -> Int
maxDatabases Limits
limits in forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
maxDbs forall a. Eq a => a -> a -> Bool
/= Int
0) forall a b. (a -> b) -> a -> b
$ Ptr MDB_env -> Int -> IO ()
mdb_env_set_maxdbs Ptr MDB_env
penv Int
maxDbs
Ptr MDB_env -> Int -> IO ()
mdb_env_set_maxreaders Ptr MDB_env
penv (Limits -> Int
maxReaders Limits
limits)
let env :: Environment mode
env = forall mode. Ptr MDB_env -> Environment mode
Environment Ptr MDB_env
penv :: Mode mode => Environment mode
flags :: [CUInt]
flags = CUInt
mdb_notls forall a. a -> [a] -> [a]
: [CUInt
mdb_rdonly | forall mode. Mode mode => Environment mode -> Bool
isReadOnlyEnvironment Environment mode
env]
let isNotDirectoryError :: LMDB_Error -> Bool
isNotDirectoryError :: LMDB_Error -> Bool
isNotDirectoryError LMDB_Error {e_code :: LMDB_Error -> Either Int MDB_ErrCode
e_code = Left Int
code}
| CInt -> Errno
Errno (forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
code) forall a. Eq a => a -> a -> Bool
== Errno
eNOTDIR = Bool
True
isNotDirectoryError LMDB_Error
_ = Bool
False
Either () ()
r <- forall e b a.
Exception e =>
(e -> Maybe b) -> IO a -> IO (Either b a)
tryJust (forall (f :: * -> *). Alternative f => Bool -> f ()
guard forall b c a. (b -> c) -> (a -> b) -> a -> c
. LMDB_Error -> Bool
isNotDirectoryError) forall a b. (a -> b) -> a -> b
$ Ptr MDB_env -> FilePath -> CUInt -> IO ()
mdb_env_open Ptr MDB_env
penv FilePath
path ([CUInt] -> CUInt
combineOptions [CUInt]
flags)
case Either () ()
r of
Left ()
_ -> Ptr MDB_env -> FilePath -> CUInt -> IO ()
mdb_env_open Ptr MDB_env
penv FilePath
path ([CUInt] -> CUInt
combineOptions forall a b. (a -> b) -> a -> b
$ CUInt
mdb_nosubdir forall a. a -> [a] -> [a]
: [CUInt]
flags)
Right ()
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
forall (m :: * -> *) a. Monad m => a -> m a
return Environment mode
env
closeEnvironment :: (Mode mode) => Environment mode -> IO ()
closeEnvironment :: forall mode. Mode mode => Environment mode -> IO ()
closeEnvironment (Environment Ptr MDB_env
penv) =
Ptr MDB_env -> IO ()
c_mdb_env_close Ptr MDB_env
penv
getDatabase :: (Mode mode) => Environment mode -> Maybe String -> IO (Database mode)
getDatabase :: forall mode.
Mode mode =>
Environment mode -> Maybe FilePath -> IO (Database mode)
getDatabase env :: Environment mode
env@(Environment Ptr MDB_env
penv) Maybe FilePath
name = do
Ptr MDB_txn
ptxn <- Ptr MDB_env -> Ptr MDB_txn -> CUInt -> IO (Ptr MDB_txn)
mdb_txn_begin Ptr MDB_env
penv forall a. Ptr a
nullPtr ([CUInt] -> CUInt
combineOptions forall a b. (a -> b) -> a -> b
$ [CUInt
mdb_rdonly | forall mode. Mode mode => Environment mode -> Bool
isReadOnlyEnvironment Environment mode
env])
MDB_dbi_t
dbi <- Ptr MDB_txn -> Maybe FilePath -> CUInt -> IO MDB_dbi_t
mdb_dbi_open Ptr MDB_txn
ptxn Maybe FilePath
name ([CUInt] -> CUInt
combineOptions forall a b. (a -> b) -> a -> b
$ [CUInt
mdb_create | Bool -> Bool
not forall a b. (a -> b) -> a -> b
$ forall mode. Mode mode => Environment mode -> Bool
isReadOnlyEnvironment Environment mode
env])
Ptr MDB_txn -> IO ()
mdb_txn_commit Ptr MDB_txn
ptxn
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall mode. Ptr MDB_env -> MDB_dbi_t -> Database mode
Database Ptr MDB_env
penv MDB_dbi_t
dbi
clearDatabase :: (Mode mode) => Database mode -> IO ()
clearDatabase :: forall mode. Mode mode => Database mode -> IO ()
clearDatabase (Database Ptr MDB_env
penv MDB_dbi_t
dbi) =
forall a. IO a -> IO (Async a)
asyncBound
( do
Ptr MDB_txn
ptxn <- Ptr MDB_env -> Ptr MDB_txn -> CUInt -> IO (Ptr MDB_txn)
mdb_txn_begin Ptr MDB_env
penv forall a. Ptr a
nullPtr CUInt
0
Ptr MDB_txn -> MDB_dbi_t -> IO ()
mdb_clear Ptr MDB_txn
ptxn MDB_dbi_t
dbi
Ptr MDB_txn -> IO ()
mdb_txn_commit Ptr MDB_txn
ptxn
)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a. Async a -> IO a
wait
closeDatabase :: (Mode mode) => Database mode -> IO ()
closeDatabase :: forall mode. Mode mode => Database mode -> IO ()
closeDatabase (Database Ptr MDB_env
penv MDB_dbi_t
dbi) =
Ptr MDB_env -> MDB_dbi_t -> IO ()
c_mdb_dbi_close Ptr MDB_env
penv MDB_dbi_t
dbi
{-# INLINE readLMDB #-}
readLMDB ::
(MonadIO m, Mode mode) =>
Database mode ->
Maybe (ReadOnlyTxn, Cursor) ->
ReadOptions ->
Unfold m Void (ByteString, ByteString)
readLMDB :: forall (m :: * -> *) mode.
(MonadIO m, Mode mode) =>
Database mode
-> Maybe (ReadOnlyTxn, Cursor)
-> ReadOptions
-> Unfold m Void (ByteString, ByteString)
readLMDB Database mode
db Maybe (ReadOnlyTxn, Cursor)
mtxncurs ReadOptions
ropts = forall (m :: * -> *) mode k v.
(MonadIO m, Mode mode) =>
Database mode
-> Maybe (ReadOnlyTxn, Cursor)
-> ReadOptions
-> (CStringLen -> IO k)
-> (CStringLen -> IO v)
-> Unfold m Void (k, v)
unsafeReadLMDB Database mode
db Maybe (ReadOnlyTxn, Cursor)
mtxncurs ReadOptions
ropts CStringLen -> IO ByteString
packCStringLen CStringLen -> IO ByteString
packCStringLen
{-# INLINE unsafeReadLMDB #-}
unsafeReadLMDB ::
(MonadIO m, Mode mode) =>
Database mode ->
Maybe (ReadOnlyTxn, Cursor) ->
ReadOptions ->
(CStringLen -> IO k) ->
(CStringLen -> IO v) ->
Unfold m Void (k, v)
unsafeReadLMDB :: forall (m :: * -> *) mode k v.
(MonadIO m, Mode mode) =>
Database mode
-> Maybe (ReadOnlyTxn, Cursor)
-> ReadOptions
-> (CStringLen -> IO k)
-> (CStringLen -> IO v)
-> Unfold m Void (k, v)
unsafeReadLMDB (Database Ptr MDB_env
penv MDB_dbi_t
dbi) Maybe (ReadOnlyTxn, Cursor)
mtxncurs ReadOptions
ropts CStringLen -> IO k
kmap CStringLen -> IO v
vmap =
let (MDB_dbi_t
firstOp, MDB_dbi_t
subsequentOp) = case (ReadOptions -> ReadDirection
readDirection ReadOptions
ropts, ReadOptions -> Maybe ByteString
readStart ReadOptions
ropts) of
(ReadDirection
Forward, Maybe ByteString
Nothing) -> (MDB_dbi_t
mdb_first, MDB_dbi_t
mdb_next)
(ReadDirection
Forward, Just ByteString
_) -> (MDB_dbi_t
mdb_set_range, MDB_dbi_t
mdb_next)
(ReadDirection
Backward, Maybe ByteString
Nothing) -> (MDB_dbi_t
mdb_last, MDB_dbi_t
mdb_prev)
(ReadDirection
Backward, Just ByteString
_) -> (MDB_dbi_t
mdb_set_range, MDB_dbi_t
mdb_prev)
(Ptr MDB_env -> Ptr MDB_txn -> CUInt -> IO (Ptr MDB_txn)
txn_begin, Ptr MDB_txn -> MDB_dbi_t -> IO (Ptr MDB_cursor)
cursor_open, Ptr MDB_cursor
-> Ptr MDB_val -> Ptr MDB_val -> MDB_dbi_t -> IO CInt
cursor_get, Ptr MDB_cursor -> IO ()
cursor_close, Ptr MDB_txn -> IO ()
txn_abort) =
if ReadOptions -> Bool
readUnsafeFFI ReadOptions
ropts
then
( Ptr MDB_env -> Ptr MDB_txn -> CUInt -> IO (Ptr MDB_txn)
mdb_txn_begin_unsafe,
Ptr MDB_txn -> MDB_dbi_t -> IO (Ptr MDB_cursor)
mdb_cursor_open_unsafe,
Ptr MDB_cursor
-> Ptr MDB_val -> Ptr MDB_val -> MDB_dbi_t -> IO CInt
c_mdb_cursor_get_unsafe,
Ptr MDB_cursor -> IO ()
c_mdb_cursor_close_unsafe,
Ptr MDB_txn -> IO ()
c_mdb_txn_abort_unsafe
)
else
( Ptr MDB_env -> Ptr MDB_txn -> CUInt -> IO (Ptr MDB_txn)
mdb_txn_begin,
Ptr MDB_txn -> MDB_dbi_t -> IO (Ptr MDB_cursor)
mdb_cursor_open,
Ptr MDB_cursor
-> Ptr MDB_val -> Ptr MDB_val -> MDB_dbi_t -> IO CInt
c_mdb_cursor_get,
Ptr MDB_cursor -> IO ()
c_mdb_cursor_close,
Ptr MDB_txn -> IO ()
c_mdb_txn_abort
)
supply :: c -> Unfold m c b -> Unfold m a b
supply = forall a c (m :: * -> *) b.
(a -> c) -> Unfold m c b -> Unfold m a b
lmap forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. a -> b -> a
const
in forall {c} {m :: * -> *} {b} {a}. c -> Unfold m c b -> Unfold m a b
supply MDB_dbi_t
firstOp forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) a b s.
(s -> m (Step s b)) -> (a -> m s) -> Unfold m a b
Unfold
( \(MDB_dbi_t
op, Ptr MDB_cursor
pcurs, Ptr MDB_val
pk, Ptr MDB_val
pv, IOFinalizer
ref) -> do
CInt
rc <-
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$
if MDB_dbi_t
op forall a. Eq a => a -> a -> Bool
== MDB_dbi_t
mdb_set_range Bool -> Bool -> Bool
&& MDB_dbi_t
subsequentOp forall a. Eq a => a -> a -> Bool
== MDB_dbi_t
mdb_prev
then do
MDB_val
kfst' <- forall a. Storable a => Ptr a -> IO a
peek Ptr MDB_val
pk
ByteString
kfst <- CStringLen -> IO ByteString
packCStringLen (MDB_val -> Ptr CChar
mv_data MDB_val
kfst', forall a b. (Integral a, Num b) => a -> b
fromIntegral forall a b. (a -> b) -> a -> b
$ MDB_val -> CSize
mv_size MDB_val
kfst')
CInt
rc <- Ptr MDB_cursor
-> Ptr MDB_val -> Ptr MDB_val -> MDB_dbi_t -> IO CInt
cursor_get Ptr MDB_cursor
pcurs Ptr MDB_val
pk Ptr MDB_val
pv MDB_dbi_t
op
if CInt
rc forall a. Eq a => a -> a -> Bool
/= CInt
0 Bool -> Bool -> Bool
&& CInt
rc forall a. Eq a => a -> a -> Bool
== CInt
mdb_notfound
then Ptr MDB_cursor
-> Ptr MDB_val -> Ptr MDB_val -> MDB_dbi_t -> IO CInt
cursor_get Ptr MDB_cursor
pcurs Ptr MDB_val
pk Ptr MDB_val
pv MDB_dbi_t
mdb_last
else
if CInt
rc forall a. Eq a => a -> a -> Bool
== CInt
0
then do
MDB_val
k' <- forall a. Storable a => Ptr a -> IO a
peek Ptr MDB_val
pk
ByteString
k <- CStringLen -> IO ByteString
unsafePackCStringLen (MDB_val -> Ptr CChar
mv_data MDB_val
k', forall a b. (Integral a, Num b) => a -> b
fromIntegral forall a b. (a -> b) -> a -> b
$ MDB_val -> CSize
mv_size MDB_val
k')
if ByteString
k forall a. Eq a => a -> a -> Bool
/= ByteString
kfst
then Ptr MDB_cursor
-> Ptr MDB_val -> Ptr MDB_val -> MDB_dbi_t -> IO CInt
cursor_get Ptr MDB_cursor
pcurs Ptr MDB_val
pk Ptr MDB_val
pv MDB_dbi_t
mdb_prev
else forall (m :: * -> *) a. Monad m => a -> m a
return CInt
rc
else forall (m :: * -> *) a. Monad m => a -> m a
return CInt
rc
else Ptr MDB_cursor
-> Ptr MDB_val -> Ptr MDB_val -> MDB_dbi_t -> IO CInt
cursor_get Ptr MDB_cursor
pcurs Ptr MDB_val
pk Ptr MDB_val
pv MDB_dbi_t
op
Bool
found <-
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$
if CInt
rc forall a. Eq a => a -> a -> Bool
/= CInt
0 Bool -> Bool -> Bool
&& CInt
rc forall a. Eq a => a -> a -> Bool
/= CInt
mdb_notfound
then do
forall (m :: * -> *). MonadIO m => IOFinalizer -> m ()
runIOFinalizer IOFinalizer
ref
forall noReturn. FilePath -> CInt -> IO noReturn
throwLMDBErrNum FilePath
"mdb_cursor_get" CInt
rc
else forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ CInt
rc forall a. Eq a => a -> a -> Bool
/= CInt
mdb_notfound
if Bool
found
then do
!k
k <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ (\MDB_val
x -> CStringLen -> IO k
kmap (MDB_val -> Ptr CChar
mv_data MDB_val
x, forall a b. (Integral a, Num b) => a -> b
fromIntegral forall a b. (a -> b) -> a -> b
$ MDB_val -> CSize
mv_size MDB_val
x)) forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. Storable a => Ptr a -> IO a
peek Ptr MDB_val
pk
!v
v <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ (\MDB_val
x -> CStringLen -> IO v
vmap (MDB_val -> Ptr CChar
mv_data MDB_val
x, forall a b. (Integral a, Num b) => a -> b
fromIntegral forall a b. (a -> b) -> a -> b
$ MDB_val -> CSize
mv_size MDB_val
x)) forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall a. Storable a => Ptr a -> IO a
peek Ptr MDB_val
pv
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield (k
k, v
v) (MDB_dbi_t
subsequentOp, Ptr MDB_cursor
pcurs, Ptr MDB_val
pk, Ptr MDB_val
pv, IOFinalizer
ref)
else do
forall (m :: * -> *). MonadIO m => IOFinalizer -> m ()
runIOFinalizer IOFinalizer
ref
forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop
)
( \MDB_dbi_t
op -> do
(Ptr MDB_cursor
pcurs, Ptr MDB_val
pk, Ptr MDB_val
pv, IOFinalizer
ref) <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$
forall a. IO a -> IO a
mask_ forall a b. (a -> b) -> a -> b
$ do
(Ptr MDB_txn
ptxn, Ptr MDB_cursor
pcurs) <- case Maybe (ReadOnlyTxn, Cursor)
mtxncurs of
Maybe (ReadOnlyTxn, Cursor)
Nothing -> do
Ptr MDB_txn
ptxn <- Ptr MDB_env -> Ptr MDB_txn -> CUInt -> IO (Ptr MDB_txn)
txn_begin Ptr MDB_env
penv forall a. Ptr a
nullPtr CUInt
mdb_rdonly
Ptr MDB_cursor
pcurs <- Ptr MDB_txn -> MDB_dbi_t -> IO (Ptr MDB_cursor)
cursor_open Ptr MDB_txn
ptxn MDB_dbi_t
dbi
forall (m :: * -> *) a. Monad m => a -> m a
return (Ptr MDB_txn
ptxn, Ptr MDB_cursor
pcurs)
Just (ReadOnlyTxn Ptr MDB_txn
ptxn, Cursor Ptr MDB_cursor
pcurs) ->
forall (m :: * -> *) a. Monad m => a -> m a
return (Ptr MDB_txn
ptxn, Ptr MDB_cursor
pcurs)
Ptr MDB_val
pk <- forall a. Storable a => IO (Ptr a)
malloc
Ptr MDB_val
pv <- forall a. Storable a => IO (Ptr a)
malloc
()
_ <- case ReadOptions -> Maybe ByteString
readStart ReadOptions
ropts of
Maybe ByteString
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just ByteString
k -> forall a. ByteString -> (CStringLen -> IO a) -> IO a
unsafeUseAsCStringLen ByteString
k forall a b. (a -> b) -> a -> b
$ \(Ptr CChar
kp, Int
kl) ->
forall a. Storable a => Ptr a -> a -> IO ()
poke Ptr MDB_val
pk (CSize -> Ptr CChar -> MDB_val
MDB_val (forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
kl) Ptr CChar
kp)
IOFinalizer
ref <- forall (m :: * -> *) a. MonadIO m => IO a -> m IOFinalizer
newIOFinalizer forall a b. (a -> b) -> a -> b
$ do
forall a. Ptr a -> IO ()
free Ptr MDB_val
pv forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall a. Ptr a -> IO ()
free Ptr MDB_val
pk
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall a. Maybe a -> Bool
isNothing Maybe (ReadOnlyTxn, Cursor)
mtxncurs) forall a b. (a -> b) -> a -> b
$
Ptr MDB_cursor -> IO ()
cursor_close Ptr MDB_cursor
pcurs forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Ptr MDB_txn -> IO ()
txn_abort Ptr MDB_txn
ptxn
forall (m :: * -> *) a. Monad m => a -> m a
return (Ptr MDB_cursor
pcurs, Ptr MDB_val
pk, Ptr MDB_val
pv, IOFinalizer
ref)
forall (m :: * -> *) a. Monad m => a -> m a
return (MDB_dbi_t
op, Ptr MDB_cursor
pcurs, Ptr MDB_val
pk, Ptr MDB_val
pv, IOFinalizer
ref)
)
newtype ReadOnlyTxn = ReadOnlyTxn (Ptr MDB_txn)
beginReadOnlyTxn :: Environment mode -> IO ReadOnlyTxn
beginReadOnlyTxn :: forall mode. Environment mode -> IO ReadOnlyTxn
beginReadOnlyTxn (Environment Ptr MDB_env
penv) = Ptr MDB_txn -> ReadOnlyTxn
ReadOnlyTxn forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Ptr MDB_env -> Ptr MDB_txn -> CUInt -> IO (Ptr MDB_txn)
mdb_txn_begin Ptr MDB_env
penv forall a. Ptr a
nullPtr CUInt
mdb_rdonly
abortReadOnlyTxn :: ReadOnlyTxn -> IO ()
abortReadOnlyTxn :: ReadOnlyTxn -> IO ()
abortReadOnlyTxn (ReadOnlyTxn Ptr MDB_txn
ptxn) = Ptr MDB_txn -> IO ()
c_mdb_txn_abort Ptr MDB_txn
ptxn
newtype Cursor = Cursor (Ptr MDB_cursor)
openCursor :: ReadOnlyTxn -> Database mode -> IO Cursor
openCursor :: forall mode. ReadOnlyTxn -> Database mode -> IO Cursor
openCursor (ReadOnlyTxn Ptr MDB_txn
ptxn) (Database Ptr MDB_env
_ MDB_dbi_t
dbi) =
Ptr MDB_cursor -> Cursor
Cursor forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Ptr MDB_txn -> MDB_dbi_t -> IO (Ptr MDB_cursor)
mdb_cursor_open Ptr MDB_txn
ptxn MDB_dbi_t
dbi
closeCursor :: Cursor -> IO ()
closeCursor :: Cursor -> IO ()
closeCursor (Cursor Ptr MDB_cursor
pcurs) =
Ptr MDB_cursor -> IO ()
c_mdb_cursor_close Ptr MDB_cursor
pcurs
data ReadOptions = ReadOptions
{ ReadOptions -> ReadDirection
readDirection :: !ReadDirection,
ReadOptions -> Maybe ByteString
readStart :: !(Maybe ByteString),
ReadOptions -> Bool
readUnsafeFFI :: !Bool
}
deriving (Int -> ReadOptions -> ShowS
[ReadOptions] -> ShowS
ReadOptions -> FilePath
forall a.
(Int -> a -> ShowS) -> (a -> FilePath) -> ([a] -> ShowS) -> Show a
showList :: [ReadOptions] -> ShowS
$cshowList :: [ReadOptions] -> ShowS
show :: ReadOptions -> FilePath
$cshow :: ReadOptions -> FilePath
showsPrec :: Int -> ReadOptions -> ShowS
$cshowsPrec :: Int -> ReadOptions -> ShowS
Show)
defaultReadOptions :: ReadOptions
defaultReadOptions :: ReadOptions
defaultReadOptions =
ReadOptions
{ readDirection :: ReadDirection
readDirection = ReadDirection
Forward,
readStart :: Maybe ByteString
readStart = forall a. Maybe a
Nothing,
readUnsafeFFI :: Bool
readUnsafeFFI = Bool
False
}
data ReadDirection = Forward | Backward deriving (Int -> ReadDirection -> ShowS
[ReadDirection] -> ShowS
ReadDirection -> FilePath
forall a.
(Int -> a -> ShowS) -> (a -> FilePath) -> ([a] -> ShowS) -> Show a
showList :: [ReadDirection] -> ShowS
$cshowList :: [ReadDirection] -> ShowS
show :: ReadDirection -> FilePath
$cshow :: ReadDirection -> FilePath
showsPrec :: Int -> ReadDirection -> ShowS
$cshowsPrec :: Int -> ReadDirection -> ShowS
Show)
data OverwriteOptions
=
OverwriteAllow
|
OverwriteAllowSame
|
OverwriteDisallow
deriving (OverwriteOptions -> OverwriteOptions -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: OverwriteOptions -> OverwriteOptions -> Bool
$c/= :: OverwriteOptions -> OverwriteOptions -> Bool
== :: OverwriteOptions -> OverwriteOptions -> Bool
$c== :: OverwriteOptions -> OverwriteOptions -> Bool
Eq)
data WriteOptions = WriteOptions
{
WriteOptions -> Int
writeTransactionSize :: !Int,
WriteOptions -> OverwriteOptions
writeOverwriteOptions :: !OverwriteOptions,
WriteOptions -> Bool
writeAppend :: !Bool,
WriteOptions -> Bool
writeUnsafeFFI :: !Bool
}
defaultWriteOptions :: WriteOptions
defaultWriteOptions :: WriteOptions
defaultWriteOptions =
WriteOptions
{ writeTransactionSize :: Int
writeTransactionSize = Int
1,
writeOverwriteOptions :: OverwriteOptions
writeOverwriteOptions = OverwriteOptions
OverwriteAllow,
writeAppend :: Bool
writeAppend = Bool
False,
writeUnsafeFFI :: Bool
writeUnsafeFFI = Bool
False
}
newtype ExceptionString = ExceptionString String deriving (Int -> ExceptionString -> ShowS
[ExceptionString] -> ShowS
ExceptionString -> FilePath
forall a.
(Int -> a -> ShowS) -> (a -> FilePath) -> ([a] -> ShowS) -> Show a
showList :: [ExceptionString] -> ShowS
$cshowList :: [ExceptionString] -> ShowS
show :: ExceptionString -> FilePath
$cshow :: ExceptionString -> FilePath
showsPrec :: Int -> ExceptionString -> ShowS
$cshowsPrec :: Int -> ExceptionString -> ShowS
Show)
instance Exception ExceptionString
{-# INLINE writeLMDB #-}
writeLMDB :: (MonadIO m) => Database ReadWrite -> WriteOptions -> Fold m (ByteString, ByteString) ()
writeLMDB :: forall (m :: * -> *).
MonadIO m =>
Database ReadWrite
-> WriteOptions -> Fold m (ByteString, ByteString) ()
writeLMDB (Database Ptr MDB_env
penv MDB_dbi_t
dbi) WriteOptions
wopts =
let txnSize :: Int
txnSize = forall a. Ord a => a -> a -> a
max Int
1 (WriteOptions -> Int
writeTransactionSize WriteOptions
wopts)
overwriteOpt :: OverwriteOptions
overwriteOpt = WriteOptions -> OverwriteOptions
writeOverwriteOptions WriteOptions
wopts
flags :: CUInt
flags =
[CUInt] -> CUInt
combineOptions forall a b. (a -> b) -> a -> b
$
[CUInt
mdb_nooverwrite | OverwriteOptions
overwriteOpt forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [OverwriteOptions
OverwriteAllowSame, OverwriteOptions
OverwriteDisallow]]
forall a. [a] -> [a] -> [a]
++ [CUInt
mdb_append | WriteOptions -> Bool
writeAppend WriteOptions
wopts]
(Ptr MDB_env -> Ptr MDB_txn -> CUInt -> IO (Ptr MDB_txn)
txn_begin, Ptr MDB_txn -> IO ()
txn_commit, Ptr MDB_txn
-> MDB_dbi_t
-> Ptr CChar
-> CSize
-> Ptr CChar
-> CSize
-> CUInt
-> IO ()
put_, Ptr MDB_txn -> MDB_dbi_t -> Ptr MDB_val -> Ptr MDB_val -> IO CInt
get) =
if WriteOptions -> Bool
writeUnsafeFFI WriteOptions
wopts
then (Ptr MDB_env -> Ptr MDB_txn -> CUInt -> IO (Ptr MDB_txn)
mdb_txn_begin_unsafe, Ptr MDB_txn -> IO ()
mdb_txn_commit_unsafe, Ptr MDB_txn
-> MDB_dbi_t
-> Ptr CChar
-> CSize
-> Ptr CChar
-> CSize
-> CUInt
-> IO ()
mdb_put_unsafe_, Ptr MDB_txn -> MDB_dbi_t -> Ptr MDB_val -> Ptr MDB_val -> IO CInt
c_mdb_get_unsafe)
else (Ptr MDB_env -> Ptr MDB_txn -> CUInt -> IO (Ptr MDB_txn)
mdb_txn_begin, Ptr MDB_txn -> IO ()
mdb_txn_commit, Ptr MDB_txn
-> MDB_dbi_t
-> Ptr CChar
-> CSize
-> Ptr CChar
-> CSize
-> CUInt
-> IO ()
mdb_put_, Ptr MDB_txn -> MDB_dbi_t -> Ptr MDB_val -> Ptr MDB_val -> IO CInt
c_mdb_get)
in forall (m :: * -> *) a b s.
(s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> Fold m a b
Fold
( \(ThreadId
threadId, Int
iter, Int
currChunkSz, Maybe (Ptr MDB_txn, IOFinalizer)
mtxn) (ByteString
k, ByteString
v) -> do
Int
iter' <-
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$
if Int
iter forall a. Ord a => a -> a -> Bool
< Int
3
then do
ThreadId
threadId' <- IO ThreadId
myThreadId
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ThreadId
threadId' forall a. Eq a => a -> a -> Bool
/= ThreadId
threadId) forall a b. (a -> b) -> a -> b
$
forall a e. Exception e => e -> a
throw
(FilePath -> ExceptionString
ExceptionString FilePath
"Error: writeLMDB veered off the original bound thread")
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Int
iter forall a. Num a => a -> a -> a
+ Int
1
else forall (m :: * -> *) a. Monad m => a -> m a
return Int
iter
Int
currChunkSz' <-
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$
if Int
currChunkSz forall a. Ord a => a -> a -> Bool
>= Int
txnSize
then do
let (Ptr MDB_txn
_, IOFinalizer
ref) = forall a. HasCallStack => Maybe a -> a
fromJust Maybe (Ptr MDB_txn, IOFinalizer)
mtxn
forall (m :: * -> *). MonadIO m => IOFinalizer -> m ()
runIOFinalizer IOFinalizer
ref
forall (m :: * -> *) a. Monad m => a -> m a
return Int
0
else forall (m :: * -> *) a. Monad m => a -> m a
return Int
currChunkSz
(Ptr MDB_txn
ptxn, IOFinalizer
ref) <-
if Int
currChunkSz' forall a. Eq a => a -> a -> Bool
== Int
0
then forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$
forall a. IO a -> IO a
mask_ forall a b. (a -> b) -> a -> b
$ do
Ptr MDB_txn
ptxn <- Ptr MDB_env -> Ptr MDB_txn -> CUInt -> IO (Ptr MDB_txn)
txn_begin Ptr MDB_env
penv forall a. Ptr a
nullPtr CUInt
0
IOFinalizer
ref <- forall (m :: * -> *) a. MonadIO m => IO a -> m IOFinalizer
newIOFinalizer forall a b. (a -> b) -> a -> b
$ Ptr MDB_txn -> IO ()
txn_commit Ptr MDB_txn
ptxn
forall (m :: * -> *) a. Monad m => a -> m a
return (Ptr MDB_txn
ptxn, IOFinalizer
ref)
else forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. HasCallStack => Maybe a -> a
fromJust Maybe (Ptr MDB_txn, IOFinalizer)
mtxn
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$
forall a. ByteString -> (CStringLen -> IO a) -> IO a
unsafeUseAsCStringLen ByteString
k forall a b. (a -> b) -> a -> b
$ \(Ptr CChar
kp, Int
kl) -> forall a. ByteString -> (CStringLen -> IO a) -> IO a
unsafeUseAsCStringLen ByteString
v forall a b. (a -> b) -> a -> b
$ \(Ptr CChar
vp, Int
vl) ->
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
catch
(Ptr MDB_txn
-> MDB_dbi_t
-> Ptr CChar
-> CSize
-> Ptr CChar
-> CSize
-> CUInt
-> IO ()
put_ Ptr MDB_txn
ptxn MDB_dbi_t
dbi Ptr CChar
kp (forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
kl) Ptr CChar
vp (forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
vl) CUInt
flags)
( \(LMDB_Error
e :: LMDB_Error) -> do
Bool
ok <- forall a b. Storable a => a -> (Ptr a -> IO b) -> IO b
with (CSize -> Ptr CChar -> MDB_val
MDB_val (forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
kl) Ptr CChar
kp) forall a b. (a -> b) -> a -> b
$ \Ptr MDB_val
pk ->
forall a b. Storable a => (Ptr a -> IO b) -> IO b
alloca forall a b. (a -> b) -> a -> b
$ \Ptr MDB_val
pv -> do
CInt
rc <- Ptr MDB_txn -> MDB_dbi_t -> Ptr MDB_val -> Ptr MDB_val -> IO CInt
get Ptr MDB_txn
ptxn MDB_dbi_t
dbi Ptr MDB_val
pk Ptr MDB_val
pv
if CInt
rc forall a. Eq a => a -> a -> Bool
== CInt
0
then do
MDB_val
v' <- forall a. Storable a => Ptr a -> IO a
peek Ptr MDB_val
pv
ByteString
vbs <- CStringLen -> IO ByteString
unsafePackCStringLen (MDB_val -> Ptr CChar
mv_data MDB_val
v', forall a b. (Integral a, Num b) => a -> b
fromIntegral forall a b. (a -> b) -> a -> b
$ MDB_val -> CSize
mv_size MDB_val
v')
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$
OverwriteOptions
overwriteOpt forall a. Eq a => a -> a -> Bool
== OverwriteOptions
OverwriteAllowSame
Bool -> Bool -> Bool
&& LMDB_Error -> Either Int MDB_ErrCode
e_code LMDB_Error
e forall a. Eq a => a -> a -> Bool
== forall a b. b -> Either a b
Right MDB_ErrCode
MDB_KEYEXIST
Bool -> Bool -> Bool
&& ByteString
vbs forall a. Eq a => a -> a -> Bool
== ByteString
v
else forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
ok forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *). MonadIO m => IOFinalizer -> m ()
runIOFinalizer IOFinalizer
ref forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall a e. Exception e => e -> a
throw LMDB_Error
e
)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. s -> Step s b
Partial (ThreadId
threadId, Int
iter', Int
currChunkSz' forall a. Num a => a -> a -> a
+ Int
1, forall a. a -> Maybe a
Just (Ptr MDB_txn
ptxn, IOFinalizer
ref))
)
( do
Bool
isBound <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO Bool
isCurrentThreadBound
ThreadId
threadId <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ThreadId
myThreadId
if Bool
isBound
then forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. s -> Step s b
Partial (ThreadId
threadId, Int
0 :: Int, Int
0, forall a. Maybe a
Nothing)
else forall a e. Exception e => e -> a
throw forall a b. (a -> b) -> a -> b
$ FilePath -> ExceptionString
ExceptionString FilePath
"Error: writeLMDB should be executed on a bound thread"
)
( \(ThreadId
threadId, Int
_, Int
_, Maybe (Ptr MDB_txn, IOFinalizer)
mtxn) -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
ThreadId
threadId' <- IO ThreadId
myThreadId
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ThreadId
threadId' forall a. Eq a => a -> a -> Bool
/= ThreadId
threadId) forall a b. (a -> b) -> a -> b
$
forall a e. Exception e => e -> a
throw
(FilePath -> ExceptionString
ExceptionString FilePath
"Error: writeLMDB veered off the original bound thread at the end")
case Maybe (Ptr MDB_txn, IOFinalizer)
mtxn of
Maybe (Ptr MDB_txn, IOFinalizer)
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
Just (Ptr MDB_txn
_, IOFinalizer
ref) -> forall (m :: * -> *). MonadIO m => IOFinalizer -> m ()
runIOFinalizer IOFinalizer
ref
)