{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- | = Acknowledgments
--
-- The functionality for the limits and getting the environment and database, in particular the idea
-- of specifying the read-only or read-write mode at the type level, was mostly obtained from the
-- [lmdb-simple](https://hackage.haskell.org/package/lmdb-simple) library.
module Streamly.External.LMDB
  ( -- * Environment

    -- | With LMDB, one first creates a so-called “environment,” which one can think of as a file or
    -- folder on disk.
    Environment,
    openEnvironment,
    isReadOnlyEnvironment,
    closeEnvironment,

    -- ** Mode
    Mode,
    ReadWrite,
    ReadOnly,

    -- ** Limits
    Limits (..),
    defaultLimits,
    gibibyte,
    tebibyte,

    -- * Database

    -- | After creating an environment, one creates within it one or more databases.
    Database,
    getDatabase,
    clearDatabase,
    closeDatabase,

    -- * Reading
    readLMDB,
    unsafeReadLMDB,

    -- ** Read-only transactions and cursors
    ReadOnlyTxn,
    beginReadOnlyTxn,
    abortReadOnlyTxn,
    Cursor,
    openCursor,
    closeCursor,

    -- ** Read options
    ReadOptions (..),
    defaultReadOptions,
    ReadDirection (..),

    -- * Writing
    writeLMDB,
    WriteOptions (..),
    defaultWriteOptions,
    OverwriteOptions (..),

    -- * Error types
    LMDB_Error (..),
    MDB_ErrCode (..),
  )
where

import Control.Concurrent (isCurrentThreadBound, myThreadId)
import Control.Concurrent.Async (asyncBound, wait)
import Control.Exception (Exception, catch, mask_, onException, throw, tryJust)
import Control.Monad (guard, unless, when)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.ByteString (ByteString, packCStringLen)
import Data.ByteString.Unsafe (unsafePackCStringLen, unsafeUseAsCStringLen)
import Data.Maybe (fromJust, isNothing)
import Data.Void (Void)
import Foreign (Ptr, alloca, free, malloc, nullPtr, peek)
import Foreign.C (Errno (Errno), eNOTDIR)
import Foreign.C.String (CStringLen)
import Foreign.Marshal.Utils (with)
import Foreign.Storable (poke)
import Streamly.External.LMDB.Internal (Database (..), Mode (..), ReadOnly, ReadWrite)
import Streamly.External.LMDB.Internal.Foreign
  ( LMDB_Error (..),
    MDB_ErrCode (..),
    MDB_cursor,
    MDB_env,
    MDB_txn,
    MDB_val (MDB_val, mv_data, mv_size),
    c_mdb_cursor_close,
    c_mdb_cursor_get,
    c_mdb_dbi_close,
    c_mdb_env_close,
    c_mdb_get,
    c_mdb_txn_abort,
    combineOptions,
    mdb_append,
    mdb_clear,
    mdb_create,
    mdb_cursor_open,
    mdb_dbi_open,
    mdb_env_create,
    mdb_env_open,
    mdb_env_set_mapsize,
    mdb_env_set_maxdbs,
    mdb_env_set_maxreaders,
    mdb_first,
    mdb_last,
    mdb_next,
    mdb_nooverwrite,
    mdb_nosubdir,
    mdb_notfound,
    mdb_notls,
    mdb_prev,
    mdb_put_,
    mdb_rdonly,
    mdb_set_range,
    mdb_txn_begin,
    mdb_txn_commit,
    throwLMDBErrNum,
  )
import Streamly.Internal.Data.Fold (Fold (Fold), Step (Partial))
import Streamly.Internal.Data.IOFinalizer (newIOFinalizer, runIOFinalizer)
import Streamly.Internal.Data.Stream.StreamD.Type (Step (Stop, Yield))
import Streamly.Internal.Data.Unfold (lmap)
import Streamly.Internal.Data.Unfold.Type (Unfold (Unfold))

-- | A handle to an LMDB environment: a wrapper around the raw @MDB_env@ pointer. The @mode@ type
-- parameter does not occur in the payload; it only tags the handle at the type level (it is
-- inspected by 'isReadOnlyEnvironment').
newtype Environment mode = Environment (Ptr MDB_env)

-- | Tells whether the given environment handle is tagged with a read-only mode. The answer is
-- derived purely from the @mode@ type parameter via 'isReadOnlyMode'; the underlying pointer is
-- not inspected.
isReadOnlyEnvironment :: Mode mode => Environment mode -> Bool
isReadOnlyEnvironment = isReadOnlyMode . mode
  where
    -- A type-level proxy only: it exists to select the 'Mode' instance.
    -- NOTE(review): this relies on 'isReadOnlyMode' never forcing its argument; confirm against
    -- the 'Mode' instances.
    mode :: Environment mode -> mode
    mode = undefined

-- | LMDB environments have various limits on the size and number of databases and concurrent
-- readers.
data Limits = Limits
  { -- | Memory map size, in bytes (also the maximum size of all databases).
    mapSize :: !Int,
    -- | Maximum number of named databases.
    maxDatabases :: !Int,
    -- | Maximum number of concurrent 'ReadOnly' transactions
    --   (also the number of slots in the lock table).
    maxReaders :: !Int
  }

-- | The default limits are 1 MiB map size, 0 named databases, and 126 concurrent readers. These can
-- be adjusted freely, and in particular the 'mapSize' may be set very large (limited only by
-- available address space). However, LMDB is not optimized for a large number of named databases so
-- 'maxDatabases' should be kept to a minimum.
--
-- The default 'mapSize' is intentionally small, and should be changed to something appropriate for
-- your application. It ought to be a multiple of the OS page size, and should be chosen as large as
-- possible to accommodate future growth of the database(s). Once set for an environment, this limit
-- cannot be reduced to a value smaller than the space already consumed by the environment, however
-- it can later be increased.
--
-- If you are going to use any named databases then you will need to change 'maxDatabases' to the
-- number of named databases you plan to use. However, you do not need to change this field if you
-- are only going to use the single main (unnamed) database.
defaultLimits :: Limits
defaultLimits =
  Limits
    { mapSize = 1024 * 1024, -- 1 MiB.
      maxDatabases = 0, -- Only the unnamed database.
      maxReaders = 126
    }

-- | A convenience constant for obtaining a 1 GiB map size.
gibibyte :: Int
gibibyte = 1024 * 1024 * 1024

-- | A convenience constant for obtaining a 1 TiB map size.
tebibyte :: Int
tebibyte = 1024 * 1024 * 1024 * 1024

-- | Open an LMDB environment in either 'ReadWrite' or 'ReadOnly' mode. The 'FilePath' argument may
-- be either a directory or a regular file, but it must already exist. If a regular file, an
-- additional file with "-lock" appended to the name is used for the reader lock table.
--
-- Note that an environment must have been opened in 'ReadWrite' mode at least once before it can be
-- opened in 'ReadOnly' mode.
--
-- An environment opened in 'ReadOnly' mode may still modify the reader lock table (except when the
-- filesystem is read-only, in which case no locks are used).
openEnvironment :: Mode mode => FilePath -> Limits -> IO (Environment mode)
openEnvironment path limits = do
  penv <- mdb_env_create

  -- Always use MDB_NOTLS; this is crucial for Haskell applications. (See
  -- https://github.com/LMDB/lmdb/blob/8d0cbbc936091eb85972501a9b31a8f86d4c51a7/libraries/liblmdb/lmdb.h#L615)
  let env = Environment penv :: Mode mode => Environment mode
      flags = mdb_notls : [mdb_rdonly | isReadOnlyEnvironment env]

  -- Recognizes the failure raised when @path@ is a regular file rather than a directory.
  let isNotDirectoryError :: LMDB_Error -> Bool
      isNotDirectoryError LMDB_Error {e_code = Left code}
        | Errno (fromIntegral code) == eNOTDIR = True
      isNotDirectoryError _ = False

  let configureAndOpen = do
        mdb_env_set_mapsize penv (mapSize limits)
        -- A limit of 0 named databases is left at LMDB's own default.
        when (maxDatabases limits /= 0) $ mdb_env_set_maxdbs penv (maxDatabases limits)
        mdb_env_set_maxreaders penv (maxReaders limits)

        -- First try to open the path as a directory; if that fails with ENOTDIR, retry treating
        -- the path as a regular file (MDB_NOSUBDIR).
        r <- tryJust (guard . isNotDirectoryError) $ mdb_env_open penv path (combineOptions flags)
        case r of
          Left _ -> mdb_env_open penv path (combineOptions $ mdb_nosubdir : flags)
          Right _ -> return ()

  -- The handle is never returned to the caller when setup fails, so it has to be released here;
  -- LMDB requires mdb_env_close() to be called after a failed mdb_env_open().
  configureAndOpen `onException` c_mdb_env_close penv

  return env

-- | Closes the given environment.
--
-- If you have merely a few dozen environments at most, there should be no need for this. (It is a
-- common practice with LMDB to create one’s environments once and reuse them for the remainder of
-- the program’s execution.) If you find yourself needing this, it is your responsibility to heed
-- the [documented
-- caveats](https://github.com/LMDB/lmdb/blob/8d0cbbc936091eb85972501a9b31a8f86d4c51a7/libraries/liblmdb/lmdb.h#L787).
--
-- In particular, you will probably, before calling this function, want to (a) use 'closeDatabase',
-- and (b) pass in precreated transactions and cursors to 'readLMDB' and 'unsafeReadLMDB' to make
-- sure there are no transactions or cursors still left to be cleaned up by the garbage collector.
-- (As an alternative to (b), one could try manually triggering the garbage collector.)
closeEnvironment :: (Mode mode) => Environment mode -> IO ()
closeEnvironment (Environment penv) =
  c_mdb_env_close penv

-- | Gets a database with the given name. When creating a database (i.e., getting it for the first
-- time), one must do so in 'ReadWrite' mode.
--
-- If only one database is desired within the environment, the name can be 'Nothing' (known as the
-- “unnamed database”).
--
-- If one or more named databases (a database with a 'Just' name) are desired, the 'maxDatabases' of
-- the environment’s limits should have been adjusted accordingly. The unnamed database will in this
-- case contain the names of the named databases as keys, which one is allowed to read but not
-- write.
getDatabase :: (Mode mode) => Environment mode -> Maybe String -> IO (Database mode)
getDatabase env@(Environment penv) name = do
  -- The transaction is read-only exactly when the environment is.
  ptxn <- mdb_txn_begin penv nullPtr (combineOptions $ [mdb_rdonly | isReadOnlyEnvironment env])
  -- The database can only be created (MDB_CREATE) from a read-write environment. If opening fails,
  -- abort the transaction so that it is not leaked; in read-write mode a leaked transaction would
  -- be a write transaction.
  dbi <-
    mdb_dbi_open ptxn name (combineOptions $ [mdb_create | not $ isReadOnlyEnvironment env])
      `onException` c_mdb_txn_abort ptxn
  -- No abort after a failed commit: LMDB frees the transaction handle in mdb_txn_commit().
  mdb_txn_commit ptxn
  return $ Database penv dbi

-- | Clears, i.e., removes all key-value pairs from, the given database.
clearDatabase :: (Mode mode) => Database mode -> IO ()
clearDatabase (Database penv dbi) =
  -- The whole write transaction runs on one bound thread, and we wait for it to finish.
  asyncBound
    ( do
        ptxn <- mdb_txn_begin penv nullPtr 0
        -- Abort on failure so that the write transaction is not left open.
        mdb_clear ptxn dbi `onException` c_mdb_txn_abort ptxn
        -- No abort after a failed commit: LMDB frees the transaction handle in mdb_txn_commit().
        mdb_txn_commit ptxn
    )
    >>= wait

-- | Closes the given database.
--
-- If you have merely a few dozen databases at most, there should be no need for this. (It is a
-- common practice with LMDB to create one’s databases once and reuse them for the remainder of the
-- program’s execution.) If you find yourself needing this, it is your responsibility to heed the
-- [documented
-- caveats](https://github.com/LMDB/lmdb/blob/8d0cbbc936091eb85972501a9b31a8f86d4c51a7/libraries/liblmdb/lmdb.h#L1200).
closeDatabase :: (Mode mode) => Database mode -> IO ()
closeDatabase (Database penv dbi) =
  c_mdb_dbi_close penv dbi

-- | Creates an unfold with which we can stream key-value pairs from the given database.
--
-- If an existing read-only transaction and cursor are not provided, a read-only transaction and
-- cursor are automatically created and kept open for the duration of the unfold; we suggest doing
-- this as a first option. However, if you find this to be a bottleneck (e.g., if you find upon
-- profiling that a significant time is being spent at @mdb_txn_begin@, or if you find yourself
-- having to increase 'maxReaders' in the environment’s limits because the transactions and cursors
-- are not being garbage collected fast enough), consider precreating a transaction and cursor using
-- 'beginReadOnlyTxn' and 'openCursor'.
--
-- In any case, bear in mind at all times LMDB’s [caveats regarding long-lived
-- transactions](https://github.com/LMDB/lmdb/blob/8d0cbbc936091eb85972501a9b31a8f86d4c51a7/libraries/liblmdb/lmdb.h#L107).
--
-- If you don’t want the overhead of intermediate @ByteString@s (on your way to your eventual data
-- structures), use 'unsafeReadLMDB' instead.
{-# INLINE readLMDB #-}
readLMDB ::
  (MonadIO m, Mode mode) =>
  Database mode ->
  Maybe (ReadOnlyTxn, Cursor) ->
  ReadOptions ->
  Unfold m Void (ByteString, ByteString)
readLMDB db mtxncurs ropts =
  -- 'packCStringLen' copies each key and value into a fresh 'ByteString'.
  unsafeReadLMDB db mtxncurs ropts packCStringLen packCStringLen

-- | Similar to 'readLMDB', except that the keys and values are not automatically converted into
-- Haskell @ByteString@s.
--
-- To ensure safety, make sure that the memory pointed to by the 'CStringLen' for each key/value
-- mapping function call is (a) only read (and not written to); and (b) not used after the mapping
-- function has returned. One way to transform the 'CStringLen's to your desired data structures is
-- to use 'Data.ByteString.Unsafe.unsafePackCStringLen'.
{-# INLINE unsafeReadLMDB #-}
unsafeReadLMDB ::
  (MonadIO m, Mode mode) =>
  Database mode ->
  Maybe (ReadOnlyTxn, Cursor) ->
  ReadOptions ->
  (CStringLen -> IO k) ->
  (CStringLen -> IO v) ->
  Unfold m Void (k, v)
unsafeReadLMDB (Database penv dbi) mtxncurs ropts kmap vmap =
  let -- The cursor operation for the first step and the one for every subsequent step.
      (firstOp, subsequentOp) = case (readDirection ropts, readStart ropts) of
        (Forward, Nothing) -> (mdb_first, mdb_next)
        (Forward, Just _) -> (mdb_set_range, mdb_next)
        (Backward, Nothing) -> (mdb_last, mdb_prev)
        (Backward, Just _) -> (mdb_set_range, mdb_prev)
      -- Ignores the unfold's actual input and feeds it a fixed seed instead.
      supply = lmap . const
   in supply firstOp $
        Unfold
          -- Step function. The state is (next cursor operation, cursor, key buffer, value buffer,
          -- finalizer releasing the buffers and any automatically created transaction/cursor).
          ( \(op, pcurs, pk, pv, ref) -> do
              rc <-
                liftIO $
                  if op == mdb_set_range && subsequentOp == mdb_prev
                    then do
                      -- Reverse MDB_SET_RANGE.
                      -- Copy the requested start key first, because the cursor call below
                      -- overwrites the key buffer.
                      kfst' <- peek pk
                      kfst <- packCStringLen (mv_data kfst', fromIntegral $ mv_size kfst')
                      rc <- c_mdb_cursor_get pcurs pk pv op
                      if rc /= 0 && rc == mdb_notfound
                        then -- No key >= the start key: begin at the last key.
                          c_mdb_cursor_get pcurs pk pv mdb_last
                        else
                          if rc == 0
                            then do
                              -- Landed on a key >= the start key. If it is not an exact match,
                              -- step back once to reach a key <= the start key.
                              k' <- peek pk
                              k <- unsafePackCStringLen (mv_data k', fromIntegral $ mv_size k')
                              if k /= kfst
                                then c_mdb_cursor_get pcurs pk pv mdb_prev
                                else return rc
                            else return rc
                    else c_mdb_cursor_get pcurs pk pv op

              found <-
                liftIO $
                  if rc /= 0 && rc /= mdb_notfound
                    then do
                      -- A genuine error: release resources before raising it.
                      runIOFinalizer ref
                      throwLMDBErrNum "mdb_cursor_get" rc
                    else return $ rc /= mdb_notfound

              if found
                then do
                  -- The mapping functions are run (and their results forced to WHNF) right away;
                  -- the buffers they see are rewritten by the next cursor operation.
                  !k <- liftIO $ (\x -> kmap (mv_data x, fromIntegral $ mv_size x)) =<< peek pk
                  !v <- liftIO $ (\x -> vmap (mv_data x, fromIntegral $ mv_size x)) =<< peek pv
                  return $ Yield (k, v) (subsequentOp, pcurs, pk, pv, ref)
                else do
                  runIOFinalizer ref
                  return Stop
          )
          -- Inject function: acquires the transaction, cursor and buffers with asynchronous
          -- exceptions masked, and registers the finalizer that releases them.
          ( \op -> do
              (pcurs, pk, pv, ref) <- liftIO $
                mask_ $ do
                  (ptxn, pcurs) <- case mtxncurs of
                    Nothing -> liftIO $ do
                      ptxn <- mdb_txn_begin penv nullPtr mdb_rdonly
                      -- Do not leak the fresh transaction (and its reader slot) if the cursor
                      -- cannot be opened.
                      pcurs <- mdb_cursor_open ptxn dbi `onException` c_mdb_txn_abort ptxn
                      return (ptxn, pcurs)
                    Just (ReadOnlyTxn ptxn, Cursor pcurs) ->
                      return (ptxn, pcurs)
                  pk <- liftIO malloc
                  pv <- liftIO malloc

                  -- For MDB_SET_RANGE, preload the key buffer with the requested start key.
                  -- NOTE(review): the raw pointer into the start key's 'ByteString' escapes
                  -- 'unsafeUseAsCStringLen' and is only read in the first step; presumably the
                  -- 'ByteString' stays reachable through 'ropts' until then -- confirm.
                  _ <- case readStart ropts of
                    Nothing -> return ()
                    Just k -> unsafeUseAsCStringLen k $ \(kp, kl) ->
                      poke pk (MDB_val (fromIntegral kl) kp)

                  ref <- liftIO . newIOFinalizer $ do
                    free pv >> free pk
                    -- A transaction and cursor supplied by the caller remain the caller's
                    -- responsibility; only the ones created above are disposed of here.
                    when (isNothing mtxncurs) $
                      -- There is no need to commit this read-only transaction.
                      c_mdb_cursor_close pcurs >> c_mdb_txn_abort ptxn
                  return (pcurs, pk, pv, ref)
              return (op, pcurs, pk, pv, ref)
          )

-- | A read-only LMDB transaction: a wrapper around the raw @MDB_txn@ pointer. Values are obtained
-- with 'beginReadOnlyTxn' and disposed of with 'abortReadOnlyTxn'.
newtype ReadOnlyTxn = ReadOnlyTxn (Ptr MDB_txn)

-- | Begins an LMDB read-only transaction for use with 'readLMDB' or 'unsafeReadLMDB'. It is your
-- responsibility to (a) use the transaction only on databases in the same environment, (b) make
-- sure that those databases were already obtained before the transaction was begun, and (c) dispose
-- of the transaction with 'abortReadOnlyTxn'.
beginReadOnlyTxn :: Environment mode -> IO ReadOnlyTxn
beginReadOnlyTxn (Environment penv) =
  -- A top-level (parentless) transaction begun with MDB_RDONLY.
  ReadOnlyTxn <$> mdb_txn_begin penv nullPtr mdb_rdonly

-- | Disposes of a read-only transaction created with 'beginReadOnlyTxn'.
abortReadOnlyTxn :: ReadOnlyTxn -> IO ()
abortReadOnlyTxn (ReadOnlyTxn ptxn) =
  c_mdb_txn_abort ptxn

-- | An LMDB cursor: a wrapper around the raw @MDB_cursor@ pointer. Values are obtained with
-- 'openCursor' and disposed of with 'closeCursor'.
newtype Cursor = Cursor (Ptr MDB_cursor)

-- | Opens a cursor for use with 'readLMDB' or 'unsafeReadLMDB'. It is your responsibility to (a)
-- make sure the cursor only gets used by a single 'readLMDB' or 'unsafeReadLMDB' @Unfold@ at the
-- same time (to be safe, one can open a new cursor for every 'readLMDB' or 'unsafeReadLMDB' call),
-- (b) make sure the provided database is within the environment on which the provided transaction
-- was begun, and (c) dispose of the cursor with 'closeCursor' (logically before 'abortReadOnlyTxn',
-- although the order doesn’t really matter for read-only transactions).
openCursor :: ReadOnlyTxn -> Database mode -> IO Cursor
openCursor (ReadOnlyTxn ptxn) (Database _ dbi) =
  -- Only the database handle is used; the environment pointer stored in the 'Database' is ignored.
  Cursor <$> mdb_cursor_open ptxn dbi

-- | Disposes of a cursor created with 'openCursor'.
closeCursor :: Cursor -> IO ()
closeCursor (Cursor pcurs) =
  c_mdb_cursor_close pcurs

-- | Options for 'readLMDB' and 'unsafeReadLMDB'.
data ReadOptions = ReadOptions
  { -- | Whether keys are iterated in 'Forward' or 'Backward' direction.
    readDirection :: !ReadDirection,
    -- | If 'Nothing', a forward [backward] iteration starts at the beginning [end] of the database.
    -- Otherwise, it starts at the first key that is greater [less] than or equal to the 'Just' key.
    readStart :: !(Maybe ByteString)
  }
  deriving (Show)

-- | By default, we start reading from the beginning of the database (i.e., from the smallest key).
defaultReadOptions :: ReadOptions
defaultReadOptions =
  ReadOptions
    { readDirection = Forward,
      readStart = Nothing
    }

-- | Direction of key iteration.
data ReadDirection = Forward | Backward deriving (Show)

-- | Policy applied by 'writeLMDB' to keys that are being written.
--
-- 'writeLMDB' passes LMDB's no-overwrite flag for 'OverwriteAllowSame' and 'OverwriteDisallow',
-- and omits it for 'OverwriteAllow'.
--
-- NOTE(review): presumably 'OverwriteAllowSame' tolerates rewriting an identical key-value pair
-- whereas 'OverwriteDisallow' rejects any already existing key; confirm against the full body of
-- 'writeLMDB'.
data OverwriteOptions = OverwriteAllow | OverwriteAllowSame | OverwriteDisallow deriving (Eq)

-- | Options for 'writeLMDB'.
data WriteOptions = WriteOptions
  { -- | Threshold at which 'writeLMDB' ends the current write transaction and starts a new one.
    -- Values below 1 are treated as 1. NOTE(review): the unit (bytes vs. number of pairs) is not
    -- visible from this declaration; confirm against 'writeLMDB'.
    writeTransactionSize :: !Int,
    -- | How 'writeLMDB' treats keys with respect to overwriting; see 'OverwriteOptions'.
    overwriteOptions :: !OverwriteOptions,
    -- | Whether 'writeLMDB' passes LMDB's append flag when writing.
    writeAppend :: !Bool
  }

-- | The default write options: a transaction size of 1, overwriting allowed ('OverwriteAllow'),
-- and no append mode.
defaultWriteOptions :: WriteOptions
defaultWriteOptions =
  WriteOptions
    { writeTransactionSize = 1,
      overwriteOptions = OverwriteAllow,
      writeAppend = False
    }

-- | A plain-message exception that is internal to this module (it is not in the export list).
-- 'writeLMDB' throws it when it detects that it is no longer running on its original thread.
newtype ExceptionString = ExceptionString String deriving (Show)

instance Exception ExceptionString

-- | Creates a fold with which we can stream key-value pairs into the given database.
--
-- It is the responsibility of the user to execute the fold on a bound thread. The fold throws an
-- exception if it is started on an unbound thread, or if it notices (during the first few pairs
-- or at the very end) that it is no longer running on the thread it was started on.
--
-- The fold currently cannot be used with a scan. (The plan is for this shortcoming to be remedied
-- with or after a future release of streamly that addresses the underlying issue.)
--
-- Please specify a suitable transaction size in the write options; the default of 1 (one write
-- transaction for each key-value pair) could yield suboptimal performance. The transaction size
-- is counted in key-value pairs, and values below 1 are treated as 1. One could try, e.g., a
-- number of pairs that amounts to roughly 100 KB per transaction and benchmark from there.
--
-- If LMDB rejects a pair, the error is rethrown after the currently open transaction has been
-- committed. The one exception is 'OverwriteAllowSame': an 'MDB_KEYEXIST' error is discarded when
-- the value already stored under the key equals the value being written.
{-# INLINE writeLMDB #-}
writeLMDB :: (MonadIO m) => Database ReadWrite -> WriteOptions -> Fold m (ByteString, ByteString) ()
writeLMDB (Database penv dbi) options =
  let -- Number of pairs written per transaction; always at least 1.
      txnSize = max 1 (writeTransactionSize options)
      overwriteOpt = overwriteOptions options
      -- Both OverwriteAllowSame and OverwriteDisallow ask LMDB not to overwrite; the difference
      -- between the two is handled in the error handler of the step function below.
      flags =
        combineOptions $
          [mdb_nooverwrite | overwriteOpt `elem` [OverwriteAllowSame, OverwriteDisallow]]
            ++ [mdb_append | writeAppend options]

      -- Begins a fresh write transaction and wraps its commit in a finalizer. Asynchronous
      -- exceptions are masked so that a started transaction is never left without its finalizer.
      beginTxn = mask_ $ do
        ptxnNew <- mdb_txn_begin penv nullPtr 0
        refNew <- newIOFinalizer $ mdb_txn_commit ptxnNew
        return (ptxnNew, refNew)
   in Fold
        -- Step. The state is: (thread the fold started on, number of thread checks done so far,
        -- number of pairs written in the open transaction, the open transaction (if any) together
        -- with the finalizer that commits it).
        ( \(threadId, iter, currChunkSz, mtxn) (k, v) -> do
            -- In the first few iterations, ascertain that we are still on the same (bound) thread.
            iter' <-
              liftIO $
                if iter < 3
                  then do
                    threadId' <- myThreadId
                    when (threadId' /= threadId) $
                      throw
                        (ExceptionString "Error: writeLMDB veered off the original bound thread")
                    return $ iter + 1
                  else return iter

            -- Decide which transaction this pair goes into. Matching on the state directly (as
            -- opposed to using fromJust) keeps this total.
            (currChunkSz', ptxn, ref) <-
              liftIO $ case mtxn of
                Just (ptxnOpen, refOpen)
                  -- The open transaction still has room; keep using it.
                  | currChunkSz < txnSize -> return (currChunkSz, ptxnOpen, refOpen)
                  -- The open transaction is full; commit it and start the next one.
                  | otherwise -> do
                      runIOFinalizer refOpen
                      (ptxnNew, refNew) <- beginTxn
                      return (0, ptxnNew, refNew)
                -- Nothing has been written yet; start the first transaction.
                Nothing -> do
                  (ptxnNew, refNew) <- beginTxn
                  return (0, ptxnNew, refNew)

            liftIO $
              unsafeUseAsCStringLen k $ \(kp, kl) -> unsafeUseAsCStringLen v $ \(vp, vl) ->
                catch
                  (mdb_put_ ptxn dbi kp (fromIntegral kl) vp (fromIntegral vl) flags)
                  ( \(e :: LMDB_Error) -> do
                      -- Discard error if OverwriteAllowSame was specified and the error from LMDB
                      -- was due to the exact same key-value pair already existing in the database.
                      ok <- with (MDB_val (fromIntegral kl) kp) $ \pk ->
                        alloca $ \pv -> do
                          rc <- c_mdb_get ptxn dbi pk pv
                          if rc == 0
                            then do
                              v' <- peek pv
                              -- This ByteString is only used for the comparison right below.
                              vbs <- unsafePackCStringLen (mv_data v', fromIntegral $ mv_size v')
                              return $
                                overwriteOpt == OverwriteAllowSame
                                  && e_code e == Right MDB_KEYEXIST
                                  && vbs == v
                            else return False
                      -- For any other error: commit the open transaction, then rethrow.
                      unless ok $ runIOFinalizer ref >> throw e
                  )
            return $ Partial (threadId, iter', currChunkSz' + 1, Just (ptxn, ref))
        )
        -- Initial state. Refuse to start unless we are on a bound thread.
        ( do
            isBound <- liftIO isCurrentThreadBound
            threadId <- liftIO myThreadId
            if isBound
              then return $ Partial (threadId, 0 :: Int, 0, Nothing)
              else throw $ ExceptionString "Error: writeLMDB should be executed on a bound thread"
        )
        -- This final part is incompatible with scans.
        ( \(threadId, _, _, mtxn) -> liftIO $ do
            threadId' <- myThreadId
            when (threadId' /= threadId) $
              throw
                (ExceptionString "Error: writeLMDB veered off the original bound thread at the end")
            -- Commit whatever is still pending in the last transaction.
            case mtxn of
              Nothing -> return ()
              Just (_, ref) -> runIOFinalizer ref
        )