{-# LANGUAGE BangPatterns, ScopedTypeVariables #-}

-- | The functionality for the limits and getting the environment and database were mostly
-- obtained from the [lmdb-simple](https://hackage.haskell.org/package/lmdb-simple) library.
module Streamly.External.LMDB
    (
    -- ** Types
    Database,
    Environment,
    Limits(..),
    LMDB_Error(..),
    MDB_ErrCode(..),
    Mode,
    ReadWrite,
    ReadOnly,
    ReadDirection(..),
    ReadOptions(..),
    OverwriteOptions(..),
    WriteOptions(..),

    -- ** Environment and database
    defaultLimits,
    openEnvironment,
    isReadOnlyEnvironment,
    getDatabase,

    -- ** Utility
    gibibyte,
    tebibyte,
    clearDatabase,

    -- ** Reading
    defaultReadOptions,
    readLMDB,
    unsafeReadLMDB,

    -- ** Writing
    defaultWriteOptions,
    writeLMDB) where

import Control.Concurrent (isCurrentThreadBound, myThreadId)
import Control.Concurrent.Async (asyncBound, wait)
import Control.Exception (Exception, catch, mask_, onException, throw, tryJust)
import Control.Monad (guard, unless, when)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.ByteString (ByteString, packCStringLen)
import Data.ByteString.Unsafe (unsafePackCStringLen, unsafeUseAsCStringLen)
import Data.Maybe (fromJust)
import Data.Void (Void)
import Foreign (Ptr, alloca, copyBytes, free, malloc, mallocBytes, nullPtr, peek)
import Foreign.C (Errno (Errno), eNOTDIR)
import Foreign.C.String (CStringLen)
import Foreign.Marshal.Utils (with)
import Foreign.Storable (poke)
import Streamly.Internal.Data.Fold (Fold (Fold))
import Streamly.Internal.Data.Stream.StreamD (newFinalizedIORef, runIORefFinalizer)
import Streamly.Internal.Data.Stream.StreamD.Type (Step (Stop, Yield))
import Streamly.Internal.Data.Unfold (supply)
import Streamly.Internal.Data.Unfold.Types (Unfold (Unfold))

import Streamly.External.LMDB.Internal
import Streamly.External.LMDB.Internal.Foreign

-- | A handle to an LMDB environment, wrapping the raw 'MDB_env' pointer.
--
-- The @mode@ parameter is a phantom type: it does not appear in the
-- representation and only records the mode at the type level.
newtype Environment mode = Environment (Ptr MDB_env)

-- | Tells whether the mode of the given environment is a read-only one.
--
-- The answer is derived from the @mode@ type parameter alone (via 'isReadOnlyMode');
-- the environment value itself is never inspected.
isReadOnlyEnvironment :: Mode mode => Environment mode -> Bool
isReadOnlyEnvironment = isReadOnlyMode . modeOf
    where
        -- A type-level witness only: it carries no value, so this relies on
        -- 'isReadOnlyMode' never evaluating its argument.
        modeOf :: Environment mode -> mode
        modeOf _ = undefined

-- | LMDB environments have various limits on the size and number of databases and concurrent readers.
data Limits = Limits
    { mapSize      :: !Int  -- ^ Memory map size, in bytes (also the maximum size of all databases).
    , maxDatabases :: !Int  -- ^ Maximum number of named databases.
    , maxReaders   :: !Int  -- ^ Maximum number of concurrent 'ReadOnly' transactions
                            --   (also the number of slots in the lock table).
    }

-- | The default limits are 1 MiB map size, 0 named databases, and 126 concurrent readers. These can be adjusted
-- freely, and in particular the 'mapSize' may be set very large (limited only by available address space). However,
-- LMDB is not optimized for a large number of named databases so 'maxDatabases' should be kept to a minimum.
--
-- The default 'mapSize' is intentionally small, and should be changed to something appropriate for your application.
-- It ought to be a multiple of the OS page size, and should be chosen as large as possible to accommodate future
-- growth of the database(s). Once set for an environment, this limit cannot be reduced to a value smaller than
-- the space already consumed by the environment, however it can later be increased.
--
-- If you are going to use any named databases then you will need to change 'maxDatabases'
-- to the number of named databases you plan to use. However, you do not need to change
-- this field if you are only going to use the single main (unnamed) database.
defaultLimits :: Limits
defaultLimits = Limits
    { mapSize      = 1024 * 1024  -- 1 MiB.
    , maxDatabases = 0
    , maxReaders   = 126
    }

-- | A convenience constant for obtaining a 1 GiB map size (2^30 bytes).
gibibyte :: Int
gibibyte = 1024 * 1024 * 1024

-- | A convenience constant for obtaining a 1 TiB map size (2^40 bytes).
--
-- NOTE(review): this value does not fit in an 'Int' narrower than 64 bits.
tebibyte :: Int
tebibyte = 1024 * 1024 * 1024 * 1024

-- | Open an LMDB environment in either 'ReadWrite' or 'ReadOnly' mode. The 'FilePath' argument
-- may be either a directory or a regular file, but it must already exist. If a regular file,
-- an additional file with "-lock" appended to the name is used for the reader lock table.
--
-- Note that an environment must have been opened in 'ReadWrite'
-- mode at least once before it can be opened in 'ReadOnly' mode.
--
-- An environment opened in 'ReadOnly' mode may still modify the reader lock table
-- (except when the filesystem is read-only, in which case no locks are used).
openEnvironment :: forall mode. Mode mode => FilePath -> Limits -> IO (Environment mode)
openEnvironment path limits = do
    penv <- mdb_env_create

    -- The limits are applied before the environment is opened.
    -- A 'maxDatabases' of 0 leaves LMDB's own setting untouched.
    mdb_env_set_mapsize penv (mapSize limits)
    let maxDbs = maxDatabases limits
    when (maxDbs /= 0) $ mdb_env_set_maxdbs penv maxDbs
    mdb_env_set_maxreaders penv (maxReaders limits)

    -- Always use MDB_NOTLS. The annotation on 'env' refers to the signature's
    -- @mode@ (ScopedTypeVariables), which is what lets us ask for its mode here.
    let env = Environment penv :: Environment mode
        flags = mdb_notls : [mdb_rdonly | isReadOnlyEnvironment env]

        -- True only for an errno-style error code equal to ENOTDIR.
        isNotDirectoryError :: LMDB_Error -> Bool
        isNotDirectoryError LMDB_Error { e_code = Left code } = Errno (fromIntegral code) == eNOTDIR
        isNotDirectoryError _                                 = False

    -- First try the path as given; if that fails with ENOTDIR, retry with MDB_NOSUBDIR
    -- added. Any other error propagates to the caller.
    -- NOTE(review): the environment handle does not appear to be closed when opening
    -- fails; no close binding is visible from here — confirm and release it if one exists.
    r <- tryJust (guard . isNotDirectoryError) $ mdb_env_open penv path (combineOptions flags)
    case r of
        Left ()  -> mdb_env_open penv path (combineOptions $ mdb_nosubdir : flags)
        Right () -> return ()

    return env

-- | Obtains a handle to a database in the given environment: the named database for
-- @'Just' name@, or whatever 'mdb_dbi_open' yields for 'Nothing'.
--
-- The handle is opened inside a short-lived transaction. For a read-only environment
-- that transaction is read-only and the database is not created; otherwise the
-- @mdb_create@ flag is passed.
--
-- If opening the database fails, the transaction is aborted before the
-- exception is rethrown, so that it is not left dangling.
getDatabase :: (Mode mode) => Environment mode -> Maybe String -> IO (Database mode)
getDatabase env@(Environment penv) name = do
    let readOnly = isReadOnlyEnvironment env
    ptxn <- mdb_txn_begin penv nullPtr (combineOptions [mdb_rdonly | readOnly])
    dbi <- mdb_dbi_open ptxn name (combineOptions [mdb_create | not readOnly])
        `onException` c_mdb_txn_abort ptxn
    mdb_txn_commit ptxn
    return $ Database penv dbi

-- | Clears, i.e., removes all key-value pairs from, the given database.
--
-- The work is run on a bound thread (via 'asyncBound') and this function waits for it
-- to finish. If clearing fails, the transaction is aborted before the exception
-- is rethrown, so that it is not left dangling.
clearDatabase :: (Mode mode) => Database mode -> IO ()
clearDatabase (Database penv dbi) = asyncBound clearInTxn >>= wait
    where
        clearInTxn = do
            ptxn <- mdb_txn_begin penv nullPtr 0
            mdb_clear ptxn dbi `onException` c_mdb_txn_abort ptxn
            mdb_txn_commit ptxn

-- | Direction of key iteration.
data ReadDirection
    = Forward   -- ^ Iterate with the cursor's \"next\" operation.
    | Backward  -- ^ Iterate with the cursor's \"previous\" operation.
    deriving (Show)

-- | Options controlling where a read starts and in which direction it proceeds.
data ReadOptions = ReadOptions
    { readDirection :: !ReadDirection
    -- | If 'Nothing', a forward [backward] iteration starts at the beginning [end] of the database.
    -- Otherwise, it starts at the first key that is greater [less] than or equal to the 'Just' key.
    , readStart     :: !(Maybe ByteString)
    } deriving (Show)

-- | The default read options: a 'Forward' iteration with no start key.
defaultReadOptions :: ReadOptions
defaultReadOptions = ReadOptions
    { readDirection = Forward
    , readStart = Nothing
    }

-- | Creates an unfold with which we can stream key-value pairs from the given database.
--
-- A read transaction is kept open for the duration of the unfold; one should therefore
-- bear in mind LMDB's [caveats regarding long-lived transactions](https://git.io/JJZE6).
--
-- If you don’t want the overhead of intermediate 'ByteString's (on your
-- way to your eventual data structures), use 'unsafeReadLMDB' instead.
{-# INLINE readLMDB #-}
readLMDB :: (MonadIO m, Mode mode) => Database mode -> ReadOptions -> Unfold m Void (ByteString, ByteString)
readLMDB db ropts =
    -- 'packCStringLen' copies, so the resulting keys and values are safe to keep.
    unsafeReadLMDB db ropts packCStringLen packCStringLen

-- | Creates an unfold with which we can stream key-value pairs from the given database.
--
-- A read transaction is kept open for the duration of the unfold; one should therefore
-- bear in mind LMDB's [caveats regarding long-lived transactions](https://git.io/JJZE6).
--
-- To ensure safety, make sure that the memory pointed to by the 'CStringLen' for each key/value mapping function
-- call is (a) only read (and not written to); and (b) not used after the mapping function has returned. One way to
-- transform the 'CStringLen's to your desired data structures is to use 'Data.ByteString.Unsafe.unsafePackCStringLen'.
{-# INLINE unsafeReadLMDB #-}
unsafeReadLMDB :: (MonadIO m, Mode mode)
               => Database mode
               -> ReadOptions
               -> (CStringLen -> IO k)
               -> (CStringLen -> IO v)
               -> Unfold m Void (k, v)
unsafeReadLMDB (Database penv dbi) ropts kmap vmap =
    -- The first cursor operation positions the cursor; every later step uses the second.
    let (firstOp, subsequentOp) = case (readDirection ropts, readStart ropts) of
            (Forward, Nothing)  -> (mdb_first, mdb_next)
            (Forward, Just _)   -> (mdb_set_range, mdb_next)
            (Backward, Nothing) -> (mdb_last, mdb_prev)
            (Backward, Just _)  -> (mdb_set_range, mdb_prev)
    in flip supply firstOp $ Unfold
        -- Step: run one cursor operation and yield the pair it lands on, if any.
        (\(op, pcurs, pk, pv, ref) -> do
            rc <- liftIO $
                if op == mdb_set_range && subsequentOp == mdb_prev then do
                    -- Reverse MDB_SET_RANGE. The range lookup lands on the first key that is
                    -- >= the start key, whereas a backward read wants the last key <= it.
                    -- Remember the requested key (copied, because the lookup overwrites pk).
                    kfst' <- peek pk
                    kfst <- packCStringLen (mv_data kfst', fromIntegral $ mv_size kfst')
                    rc0 <- c_mdb_cursor_get pcurs pk pv op
                    if rc0 /= 0 && rc0 == mdb_notfound then
                        -- Every key is smaller than the start key: begin at the last one.
                        c_mdb_cursor_get pcurs pk pv mdb_last
                    else if rc0 == 0 then do
                        k' <- peek pk
                        -- Only compared immediately, so no copy is needed.
                        k <- unsafePackCStringLen (mv_data k', fromIntegral $ mv_size k')
                        if k /= kfst then
                            -- Landed past the start key: step back once.
                            c_mdb_cursor_get pcurs pk pv mdb_prev
                        else
                            return rc0
                    else
                        return rc0
                else
                    c_mdb_cursor_get pcurs pk pv op

            found <- liftIO $
                if rc /= 0 && rc /= mdb_notfound then do
                    -- A real error: release the resources before throwing.
                    runIORefFinalizer ref
                    throwLMDBErrNum "mdb_cursor_get" rc
                else
                    return $ rc /= mdb_notfound

            if found then do
                -- The results are forced so that the mapping functions run now,
                -- before the cursor moves on.
                !k <- liftIO $ (\x -> kmap (mv_data x, fromIntegral $ mv_size x)) =<< peek pk
                !v <- liftIO $ (\x -> vmap (mv_data x, fromIntegral $ mv_size x)) =<< peek pv
                return $ Yield (k, v) (subsequentOp, pcurs, pk, pv, ref)
            else do
                runIORefFinalizer ref
                return Stop)
        -- Inject: acquire the transaction, the cursor and the key/value slots. Asynchronous
        -- exceptions are masked until the finalizer has been registered.
        (\op -> do
            (pcurs, pk, pv, ref) <- liftIO $ mask_ $ do
                ptxn <- mdb_txn_begin penv nullPtr mdb_rdonly
                -- Do not leave the transaction dangling if the cursor cannot be opened.
                pcurs <- mdb_cursor_open ptxn dbi `onException` c_mdb_txn_abort ptxn
                pk <- malloc
                pv <- malloc

                -- Copy the start key (if any) into memory we own. The pointer handed out by
                -- 'unsafeUseAsCStringLen' must not be used after its callback returns, yet the
                -- key is only read by the first cursor operation, which happens later.
                pstart <- case readStart ropts of
                    Nothing -> return nullPtr
                    Just k -> unsafeUseAsCStringLen k $ \(kp, kl) -> do
                        buf <- mallocBytes kl
                        copyBytes buf kp kl
                        poke pk (MDB_val (fromIntegral kl) buf)
                        return buf

                ref <- newFinalizedIORef $ do
                    -- Freeing a null pointer (no start key) does nothing.
                    free pv >> free pk >> free pstart
                    c_mdb_cursor_close pcurs
                    -- No need to commit this read-only transaction.
                    c_mdb_txn_abort ptxn
                return (pcurs, pk, pv, ref)
            return (op, pcurs, pk, pv, ref))

-- | How a write should treat a key that is already present in the database.
--
-- Both 'OverwriteAllowSame' and 'OverwriteDisallow' make 'writeLMDB' pass the
-- @mdb_nooverwrite@ flag; 'OverwriteAllow' does not.
data OverwriteOptions
    = OverwriteAllow
    | OverwriteAllowSame
    | OverwriteDisallow
    deriving (Eq)

-- | Options for 'writeLMDB'.
data WriteOptions = WriteOptions
    { writeTransactionSize :: !Int
    -- ^ Threshold at which the current write transaction is finished and a new one begun;
    --   values below 1 are treated as 1.
    --   NOTE(review): the unit (bytes vs. number of pairs) is not visible here — confirm.
    , overwriteOptions :: !OverwriteOptions
    -- ^ How to treat keys that already exist in the database.
    , writeAppend :: !Bool
    -- ^ Whether to pass the @mdb_append@ flag when writing.
    }

-- | The default write options: a transaction size of 1, overwriting allowed, and no appending.
defaultWriteOptions :: WriteOptions
defaultWriteOptions = WriteOptions
    { writeTransactionSize = 1
    , overwriteOptions = OverwriteAllow
    , writeAppend = False
    }


-- | An exception that carries nothing but a human-readable message.
newtype ExceptionString = ExceptionString String
    deriving (Show)

instance Exception ExceptionString

-- | Creates a fold with which we can stream key-value pairs into the given database.
--
-- It is the responsibility of the user to execute the fold on a bound thread.
--
-- The fold currently cannot be used with a scan. (The plan is for this shortcoming to be
-- remedied with or after a future release of streamly that addresses the underlying issue.)
--
-- Please specify a suitable transaction size in the write options; the default of 1 (one write transaction for each
-- key-value pair) could yield suboptimal performance. One could try, e.g., 100 KB chunks and benchmark from there.
{-# INLINE writeLMDB #-}
writeLMDB :: (MonadIO m) => Database ReadWrite -> WriteOptions -> Fold m (ByteString, ByteString) ()
writeLMDB (Database penv dbi) options =
    let -- Number of key-value pairs written per transaction; never less than one.
        txnSize = max 1 (writeTransactionSize options)
        overwriteOpt = overwriteOptions options

        -- Flags passed to every put. Both OverwriteAllowSame and OverwriteDisallow ask LMDB to
        -- refuse existing keys; OverwriteAllowSame is then relaxed in the error handler below.
        flags = combineOptions $
                    [mdb_nooverwrite | overwriteOpt `elem` [OverwriteAllowSame, OverwriteDisallow]]
                    ++ [mdb_append | writeAppend options]

        -- Begins a new transaction and pairs it with a reference whose finalizer commits it.
        -- mask_ keeps asynchronous exceptions from arriving between beginning the transaction
        -- and registering its commit action.
        beginTxn = mask_ $ do
            ptxn <- mdb_txn_begin penv nullPtr 0
            ref <- newFinalizedIORef $ mdb_txn_commit ptxn
            return (ptxn, ref)

    -- Fold state: (thread the fold started on, number of thread checks performed so far,
    -- number of pairs written in the current transaction, current transaction if any).
    -- The transaction is 'Just' exactly when at least one pair has been written.
    in Fold (\(threadId, iter, currChunkSz, mtxn) (k, v) -> do
        -- In the first few iterations, ascertain that we are still on the same (bound) thread.
        iter' <- liftIO $
            if iter < 3 then do
                threadId' <- myThreadId
                when (threadId' /= threadId) $
                    throw $ ExceptionString "Error: writeLMDB veered off the original bound thread"
                return $ iter + 1
            else
                return iter

        -- Reuse the open transaction while it has room; otherwise commit it (if any) and begin
        -- a new one. Matching on mtxn directly avoids partial functions: the first iteration
        -- (no transaction yet) and a full transaction both take the second branch.
        (currChunkSz', ptxn, ref) <- liftIO $ case mtxn of
            Just (ptxn, ref) | currChunkSz < txnSize ->
                return (currChunkSz, ptxn, ref)
            _ -> do
                mapM_ (runIORefFinalizer . snd) mtxn
                (ptxn, ref) <- beginTxn
                return (0, ptxn, ref)

        liftIO $ unsafeUseAsCStringLen k $ \(kp, kl) -> unsafeUseAsCStringLen v $ \(vp, vl) ->
            catch (mdb_put_ ptxn dbi kp (fromIntegral kl) vp (fromIntegral vl) flags)
                (\(e :: LMDB_Error) -> do
                    -- Discard error if OverwriteAllowSame was specified and the error from LMDB
                    -- was due to the exact same key-value pair already existing in the database.
                    -- The stored value is only looked up when both cheap conditions hold; in
                    -- every other case the error is rethrown regardless of what is stored.
                    ok <-
                        if overwriteOpt == OverwriteAllowSame && e_code e == Right MDB_KEYEXIST then
                            with (MDB_val (fromIntegral kl) kp) $ \pk ->
                                alloca $ \pv -> do
                                    rc <- c_mdb_get ptxn dbi pk pv
                                    if rc == 0 then do
                                        v' <- peek pv
                                        vbs <- unsafePackCStringLen
                                            (mv_data v', fromIntegral $ mv_size v')
                                        return $ vbs == v
                                    else
                                        return False
                        else
                            return False
                    -- On a genuine failure, run the transaction's commit action before
                    -- propagating the error.
                    unless ok $ runIORefFinalizer ref >> throw e)
        return (threadId, iter', currChunkSz' + 1, Just (ptxn, ref)))
    -- Initial state: refuse to start unless the current thread is bound, and remember the
    -- thread so that later steps can detect a switch.
    (liftIO $ do
        isBound <- isCurrentThreadBound
        threadId <- myThreadId
        unless isBound $
            throw $ ExceptionString "Error: writeLMDB should be executed on a bound thread"
        return (threadId, 0 :: Int, 0, Nothing))
    -- This final part is incompatible with scans.
    (\(threadId, _, _, mtxn) -> liftIO $ do
        threadId' <- myThreadId
        when (threadId' /= threadId) $
            throw $ ExceptionString "Error: writeLMDB veered off the original bound thread at the end"
        -- Commit whatever transaction is still open (none if the input was empty).
        mapM_ (runIORefFinalizer . snd) mtxn)