{-# LANGUAGE DataKinds #-}
{-# LANGUAGE Rank2Types #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeOperators #-}

module Cachix.Client.Push
  ( -- * Pushing a single path
    pushSingleStorePath,
    PushCache (..),
    PushStrategy (..),
    defaultWithXzipCompressor,
    defaultWithXzipCompressorWithLevel,

    -- * Pushing a closure of store paths
    pushClosure,
    mapConcurrentlyBounded,
  )
where

import qualified Cachix.Api as Api
import Cachix.Api.Error
import Cachix.Api.Signing (fingerprint, passthroughHashSink, passthroughHashSinkB16, passthroughSizeSink)
import Cachix.Client.Exception (CachixException (..))
import Cachix.Client.Secrets
import Cachix.Client.Servant
import Cachix.Client.Store (Store)
import qualified Cachix.Client.Store as Store
import qualified Cachix.Types.NarInfoCreate as Api
import Control.Concurrent.Async (mapConcurrently)
import qualified Control.Concurrent.QSem as QSem
import Control.Exception.Safe (MonadMask, throwM)
import Control.Monad.Trans.Resource (ResourceT)
import Control.Retry (RetryPolicy, RetryStatus, exponentialBackoff, limitRetries, recoverAll)
import Crypto.Sign.Ed25519
import qualified Data.ByteString.Base64 as B64
import Data.Conduit
import Data.Conduit.Lzma (compress)
import Data.Conduit.Process
import Data.IORef
import qualified Data.Set as Set
import qualified Data.Text as T
import Network.HTTP.Types (status401, status404)
import Protolude
import Servant.API
import Servant.Auth ()
import Servant.Auth.Client
import Servant.Client.Streaming hiding (ClientError)
import Servant.Conduit ()
import qualified System.Nix.Base32

data PushCache
  = PushCache
      { PushCache -> Text
pushCacheName :: Text,
        PushCache -> SigningKey
pushCacheSigningKey :: SigningKey,
        PushCache -> Token
pushCacheToken :: Token
      }

data PushStrategy m r
  = PushStrategy
      { -- | Called when a path is already in the cache.
        PushStrategy m r -> m r
onAlreadyPresent :: m r,
        PushStrategy m r -> RetryStatus -> Int64 -> m ()
onAttempt :: RetryStatus -> Int64 -> m (),
        PushStrategy m r -> m r
on401 :: m r,
        PushStrategy m r -> ClientError -> m r
onError :: ClientError -> m r,
        PushStrategy m r -> m r
onDone :: m r,
        PushStrategy m r
-> forall a.
   (ConduitM ByteString ByteString (ResourceT IO) () -> m a) -> m a
withXzipCompressor :: forall a. (ConduitM ByteString ByteString (ResourceT IO) () -> m a) -> m a,
        PushStrategy m r -> Bool
omitDeriver :: Bool
      }

defaultWithXzipCompressor :: forall m a. (ConduitM ByteString ByteString (ResourceT IO) () -> m a) -> m a
defaultWithXzipCompressor :: (ConduitM ByteString ByteString (ResourceT IO) () -> m a) -> m a
defaultWithXzipCompressor = ((ConduitM ByteString ByteString (ResourceT IO) () -> m a)
-> ConduitM ByteString ByteString (ResourceT IO) () -> m a
forall a b. (a -> b) -> a -> b
$ Maybe Int -> ConduitM ByteString ByteString (ResourceT IO) ()
forall (m :: * -> *).
MonadIO m =>
Maybe Int -> ConduitM ByteString ByteString m ()
compress (Int -> Maybe Int
forall a. a -> Maybe a
Just 2))

defaultWithXzipCompressorWithLevel :: Int -> forall m a. (ConduitM ByteString ByteString (ResourceT IO) () -> m a) -> m a
defaultWithXzipCompressorWithLevel :: Int
-> forall (m :: * -> *) a.
   (ConduitM ByteString ByteString (ResourceT IO) () -> m a) -> m a
defaultWithXzipCompressorWithLevel l :: Int
l = ((ConduitM ByteString ByteString (ResourceT IO) () -> m a)
-> ConduitM ByteString ByteString (ResourceT IO) () -> m a
forall a b. (a -> b) -> a -> b
$ Maybe Int -> ConduitM ByteString ByteString (ResourceT IO) ()
forall (m :: * -> *).
MonadIO m =>
Maybe Int -> ConduitM ByteString ByteString m ()
compress (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
l))

pushSingleStorePath ::
  (MonadMask m, MonadIO m) =>
  -- | cachix base url, connection manager, see 'Cachix.Client.URI.defaultCachixBaseUrl', 'Servant.Client.mkClientEnv'
  ClientEnv ->
  Store ->
  -- | details for pushing to cache
  PushCache ->
  -- | how to report results, (some) errors, and do some things
  PushStrategy m r ->
  -- | store path
  Text ->
  -- | r is determined by the 'PushStrategy'
  m r
pushSingleStorePath :: ClientEnv -> Store -> PushCache -> PushStrategy m r -> Text -> m r
pushSingleStorePath clientEnv :: ClientEnv
clientEnv _store :: Store
_store cache :: PushCache
cache cb :: PushStrategy m r
cb storePath :: Text
storePath = (RetryStatus -> m r) -> m r
forall (m :: * -> *) a.
(MonadIO m, MonadMask m) =>
(RetryStatus -> m a) -> m a
retryAll ((RetryStatus -> m r) -> m r) -> (RetryStatus -> m r) -> m r
forall a b. (a -> b) -> a -> b
$ \retrystatus :: RetryStatus
retrystatus -> do
  let storeHash :: Text
storeHash = (Text, Text) -> Text
forall a b. (a, b) -> a
fst ((Text, Text) -> Text) -> (Text, Text) -> Text
forall a b. (a -> b) -> a -> b
$ Text -> (Text, Text)
splitStorePath (Text -> (Text, Text)) -> Text -> (Text, Text)
forall a b. (a -> b) -> a -> b
$ Text -> Text
forall a b. StringConv a b => a -> b
toS Text
storePath
      name :: Text
name = PushCache -> Text
pushCacheName PushCache
cache
  -- Check if narinfo already exists
  -- TODO: query also cache.nixos.org? server-side?
  Either ClientError NoContent
res <-
    IO (Either ClientError NoContent)
-> m (Either ClientError NoContent)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either ClientError NoContent)
 -> m (Either ClientError NoContent))
-> IO (Either ClientError NoContent)
-> m (Either ClientError NoContent)
forall a b. (a -> b) -> a -> b
$ (ClientM NoContent -> ClientEnv -> IO (Either ClientError NoContent)
forall a.
NFData a =>
ClientM a -> ClientEnv -> IO (Either ClientError a)
`runClientM` ClientEnv
clientEnv) (ClientM NoContent -> IO (Either ClientError NoContent))
-> ClientM NoContent -> IO (Either ClientError NoContent)
forall a b. (a -> b) -> a -> b
$
      BinaryCacheAPI (AsClientT ClientM)
-> Token -> NarInfoC -> ClientM NoContent
forall route.
BinaryCacheAPI route
-> route :- (CachixAuth :> (Capture "narinfo" NarInfoC :> Head))
Api.narinfoHead
        (Text -> BinaryCacheAPI (AsClientT ClientM)
cachixBCClient Text
name)
        (PushCache -> Token
pushCacheToken PushCache
cache)
        (Text -> NarInfoC
Api.NarInfoC Text
storeHash)
  case Either ClientError NoContent
res of
    Right NoContent -> PushStrategy m r -> m r
forall (m :: * -> *) r. PushStrategy m r -> m r
onAlreadyPresent PushStrategy m r
cb -- we're done as store path is already in the cache
    Left err :: ClientError
err
      | ClientError -> Status -> Bool
isErr ClientError
err Status
status404 -> ClientEnv
-> Store
-> PushCache
-> PushStrategy m r
-> Text
-> RetryStatus
-> m r
forall (m :: * -> *) r.
(MonadMask m, MonadIO m) =>
ClientEnv
-> Store
-> PushCache
-> PushStrategy m r
-> Text
-> RetryStatus
-> m r
uploadStorePath ClientEnv
clientEnv Store
_store PushCache
cache PushStrategy m r
cb Text
storePath RetryStatus
retrystatus
      | ClientError -> Status -> Bool
isErr ClientError
err Status
status401 -> PushStrategy m r -> m r
forall (m :: * -> *) r. PushStrategy m r -> m r
on401 PushStrategy m r
cb
      | Bool
otherwise -> PushStrategy m r -> ClientError -> m r
forall (m :: * -> *) r. PushStrategy m r -> ClientError -> m r
onError PushStrategy m r
cb ClientError
err

uploadStorePath ::
  (MonadMask m, MonadIO m) =>
  -- | cachix base url, connection manager, see 'Cachix.Client.URI.defaultCachixBaseUrl', 'Servant.Client.mkClientEnv'
  ClientEnv ->
  Store ->
  -- | details for pushing to cache
  PushCache ->
  -- | how to report results, (some) errors, and do some things
  PushStrategy m r ->
  -- | store path
  Text ->
  RetryStatus ->
  -- | r is determined by the 'PushStrategy'
  m r
uploadStorePath :: ClientEnv
-> Store
-> PushCache
-> PushStrategy m r
-> Text
-> RetryStatus
-> m r
uploadStorePath clientEnv :: ClientEnv
clientEnv store :: Store
store cache :: PushCache
cache cb :: PushStrategy m r
cb storePath :: Text
storePath retrystatus :: RetryStatus
retrystatus = do
  let (storeHash :: Text
storeHash, storeSuffix :: Text
storeSuffix) = Text -> (Text, Text)
splitStorePath (Text -> (Text, Text)) -> Text -> (Text, Text)
forall a b. (a -> b) -> a -> b
$ Text -> Text
forall a b. StringConv a b => a -> b
toS Text
storePath
      name :: Text
name = PushCache -> Text
pushCacheName PushCache
cache
  IORef Integer
narSizeRef <- IO (IORef Integer) -> m (IORef Integer)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef Integer) -> m (IORef Integer))
-> IO (IORef Integer) -> m (IORef Integer)
forall a b. (a -> b) -> a -> b
$ Integer -> IO (IORef Integer)
forall a. a -> IO (IORef a)
newIORef 0
  IORef Integer
fileSizeRef <- IO (IORef Integer) -> m (IORef Integer)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef Integer) -> m (IORef Integer))
-> IO (IORef Integer) -> m (IORef Integer)
forall a b. (a -> b) -> a -> b
$ Integer -> IO (IORef Integer)
forall a. a -> IO (IORef a)
newIORef 0
  IORef ByteString
narHashRef <- IO (IORef ByteString) -> m (IORef ByteString)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef ByteString) -> m (IORef ByteString))
-> IO (IORef ByteString) -> m (IORef ByteString)
forall a b. (a -> b) -> a -> b
$ ByteString -> IO (IORef ByteString)
forall a. a -> IO (IORef a)
newIORef ("" :: ByteString)
  IORef ByteString
fileHashRef <- IO (IORef ByteString) -> m (IORef ByteString)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef ByteString) -> m (IORef ByteString))
-> IO (IORef ByteString) -> m (IORef ByteString)
forall a b. (a -> b) -> a -> b
$ ByteString -> IO (IORef ByteString)
forall a. a -> IO (IORef a)
newIORef ("" :: ByteString)
  ByteString
normalized <- IO ByteString -> m ByteString
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ByteString -> m ByteString) -> IO ByteString -> m ByteString
forall a b. (a -> b) -> a -> b
$ Store -> ByteString -> IO ByteString
Store.followLinksToStorePath Store
store (ByteString -> IO ByteString) -> ByteString -> IO ByteString
forall a b. (a -> b) -> a -> b
$ Text -> ByteString
forall a b. StringConv a b => a -> b
toS Text
storePath
  ForeignPtr (Ref ValidPathInfo)
pathinfo <- IO (ForeignPtr (Ref ValidPathInfo))
-> m (ForeignPtr (Ref ValidPathInfo))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (ForeignPtr (Ref ValidPathInfo))
 -> m (ForeignPtr (Ref ValidPathInfo)))
-> IO (ForeignPtr (Ref ValidPathInfo))
-> m (ForeignPtr (Ref ValidPathInfo))
forall a b. (a -> b) -> a -> b
$ Store -> ByteString -> IO (ForeignPtr (Ref ValidPathInfo))
Store.queryPathInfo Store
store ByteString
normalized
  -- stream store path as xz compressed nar file
  let cmd :: CreateProcess
cmd = FilePath -> [FilePath] -> CreateProcess
proc "nix-store" ["--dump", Text -> FilePath
forall a b. StringConv a b => a -> b
toS Text
storePath]
      storePathSize :: Int64
      storePathSize :: Int64
storePathSize = ForeignPtr (Ref ValidPathInfo) -> Int64
Store.validPathInfoNarSize ForeignPtr (Ref ValidPathInfo)
pathinfo
  PushStrategy m r -> RetryStatus -> Int64 -> m ()
forall (m :: * -> *) r.
PushStrategy m r -> RetryStatus -> Int64 -> m ()
onAttempt PushStrategy m r
cb RetryStatus
retrystatus Int64
storePathSize
  (ClosedStream, stdoutStream :: ConduitM () ByteString (ResourceT IO) ()
stdoutStream, Inherited, cph :: StreamingProcessHandle
cph) <- IO
  (ClosedStream, ConduitM () ByteString (ResourceT IO) (), Inherited,
   StreamingProcessHandle)
-> m (ClosedStream, ConduitM () ByteString (ResourceT IO) (),
      Inherited, StreamingProcessHandle)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO
   (ClosedStream, ConduitM () ByteString (ResourceT IO) (), Inherited,
    StreamingProcessHandle)
 -> m (ClosedStream, ConduitM () ByteString (ResourceT IO) (),
       Inherited, StreamingProcessHandle))
-> IO
     (ClosedStream, ConduitM () ByteString (ResourceT IO) (), Inherited,
      StreamingProcessHandle)
-> m (ClosedStream, ConduitM () ByteString (ResourceT IO) (),
      Inherited, StreamingProcessHandle)
forall a b. (a -> b) -> a -> b
$ CreateProcess
-> IO
     (ClosedStream, ConduitM () ByteString (ResourceT IO) (), Inherited,
      StreamingProcessHandle)
forall (m :: * -> *) stdin stdout stderr.
(MonadIO m, InputSource stdin, OutputSink stdout,
 OutputSink stderr) =>
CreateProcess -> m (stdin, stdout, stderr, StreamingProcessHandle)
streamingProcess CreateProcess
cmd
  PushStrategy m r
-> forall a.
   (ConduitM ByteString ByteString (ResourceT IO) () -> m a) -> m a
forall (m :: * -> *) r.
PushStrategy m r
-> forall a.
   (ConduitM ByteString ByteString (ResourceT IO) () -> m a) -> m a
withXzipCompressor PushStrategy m r
cb ((ConduitM ByteString ByteString (ResourceT IO) () -> m r) -> m r)
-> (ConduitM ByteString ByteString (ResourceT IO) () -> m r) -> m r
forall a b. (a -> b) -> a -> b
$ \xzCompressor :: ConduitM ByteString ByteString (ResourceT IO) ()
xzCompressor -> do
    let stream' :: ConduitM () ByteString (ResourceT IO) ()
stream' =
          ConduitM () ByteString (ResourceT IO) ()
stdoutStream
            ConduitM () ByteString (ResourceT IO) ()
-> ConduitM ByteString ByteString (ResourceT IO) ()
-> ConduitM () ByteString (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| IORef Integer -> ConduitM ByteString ByteString (ResourceT IO) ()
forall (m :: * -> *).
MonadIO m =>
IORef Integer -> ConduitT ByteString ByteString m ()
passthroughSizeSink IORef Integer
narSizeRef
            ConduitM ByteString ByteString (ResourceT IO) ()
-> ConduitM ByteString ByteString (ResourceT IO) ()
-> ConduitM ByteString ByteString (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| IORef ByteString
-> ConduitM ByteString ByteString (ResourceT IO) ()
forall (m :: * -> *).
MonadIO m =>
IORef ByteString -> ConduitT ByteString ByteString m ()
passthroughHashSink IORef ByteString
narHashRef
            ConduitM ByteString ByteString (ResourceT IO) ()
-> ConduitM ByteString ByteString (ResourceT IO) ()
-> ConduitM ByteString ByteString (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| ConduitM ByteString ByteString (ResourceT IO) ()
xzCompressor
            ConduitM ByteString ByteString (ResourceT IO) ()
-> ConduitM ByteString ByteString (ResourceT IO) ()
-> ConduitM ByteString ByteString (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| IORef Integer -> ConduitM ByteString ByteString (ResourceT IO) ()
forall (m :: * -> *).
MonadIO m =>
IORef Integer -> ConduitT ByteString ByteString m ()
passthroughSizeSink IORef Integer
fileSizeRef
            ConduitM ByteString ByteString (ResourceT IO) ()
-> ConduitM ByteString ByteString (ResourceT IO) ()
-> ConduitM ByteString ByteString (ResourceT IO) ()
forall (m :: * -> *) a b c r.
Monad m =>
ConduitM a b m () -> ConduitM b c m r -> ConduitM a c m r
.| IORef ByteString
-> ConduitM ByteString ByteString (ResourceT IO) ()
forall (m :: * -> *).
MonadIO m =>
IORef ByteString -> ConduitT ByteString ByteString m ()
passthroughHashSinkB16 IORef ByteString
fileHashRef
    let subdomain :: FilePath
subdomain =
          -- TODO: multipart
          if (Int64 -> Double
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int64
storePathSize Double -> Double -> Double
forall a. Fractional a => a -> a -> a
/ (1024 Double -> Double -> Double
forall a. Num a => a -> a -> a
* 1024) :: Double) Double -> Double -> Bool
forall a. Ord a => a -> a -> Bool
> 100
            then "api"
            else Text -> FilePath
forall a b. StringConv a b => a -> b
toS Text
name
        newClientEnv :: ClientEnv
newClientEnv =
          ClientEnv
clientEnv
            { baseUrl :: BaseUrl
baseUrl = (ClientEnv -> BaseUrl
baseUrl ClientEnv
clientEnv) {baseUrlHost :: FilePath
baseUrlHost = FilePath
subdomain FilePath -> FilePath -> FilePath
forall a. Semigroup a => a -> a -> a
<> "." FilePath -> FilePath -> FilePath
forall a. Semigroup a => a -> a -> a
<> BaseUrl -> FilePath
baseUrlHost (ClientEnv -> BaseUrl
baseUrl ClientEnv
clientEnv)}
            }
    (NoContent
_ :: NoContent) <-
      IO NoContent -> m NoContent
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
        (IO NoContent -> m NoContent) -> IO NoContent -> m NoContent
forall a b. (a -> b) -> a -> b
$ (ClientM NoContent
-> ClientEnv
-> (Either ClientError NoContent -> IO NoContent)
-> IO NoContent
forall a b.
ClientM a -> ClientEnv -> (Either ClientError a -> IO b) -> IO b
`withClientM` ClientEnv
newClientEnv)
          (BinaryCacheStreamingAPI (AsClientT ClientM)
-> ConduitM () ByteString (ResourceT IO) () -> ClientM NoContent
forall route.
BinaryCacheStreamingAPI route
-> route
   :- ("nar"
       :> (StreamBody
             NoFraming XNixNar (ConduitM () ByteString (ResourceT IO) ())
           :> Post '[JSON] NoContent))
Api.createNar (Text -> BinaryCacheStreamingAPI (AsClientT ClientM)
cachixBCStreamingClient Text
name) ConduitM () ByteString (ResourceT IO) ()
stream')
        ((Either ClientError NoContent -> IO NoContent) -> IO NoContent)
-> (Either ClientError NoContent -> IO NoContent) -> IO NoContent
forall a b. (a -> b) -> a -> b
$ Either ClientError NoContent -> IO NoContent
forall exc (m :: * -> *) a.
(Exception exc, MonadThrow m) =>
Either exc a -> m a
escalate
          (Either ClientError NoContent -> IO NoContent)
-> (NoContent -> IO NoContent)
-> Either ClientError NoContent
-> IO NoContent
forall (m :: * -> *) a b c.
Monad m =>
(a -> m b) -> (b -> m c) -> a -> m c
>=> \NoContent -> do
            ExitCode
exitcode <- StreamingProcessHandle -> IO ExitCode
forall (m :: * -> *).
MonadIO m =>
StreamingProcessHandle -> m ExitCode
waitForStreamingProcess StreamingProcessHandle
cph
            Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ExitCode
exitcode ExitCode -> ExitCode -> Bool
forall a. Eq a => a -> a -> Bool
/= ExitCode
ExitSuccess) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ CachixException -> IO ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM (CachixException -> IO ()) -> CachixException -> IO ()
forall a b. (a -> b) -> a -> b
$ ExitCode -> Text -> CachixException
NarStreamingError ExitCode
exitcode (Text -> CachixException) -> Text -> CachixException
forall a b. (a -> b) -> a -> b
$ CreateProcess -> Text
forall a b. (Show a, StringConv FilePath b) => a -> b
show CreateProcess
cmd
            NoContent -> IO NoContent
forall (m :: * -> *) a. Monad m => a -> m a
return NoContent
NoContent
    (NoContent
_ :: NoContent) <- IO NoContent -> m NoContent
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO NoContent -> m NoContent) -> IO NoContent -> m NoContent
forall a b. (a -> b) -> a -> b
$ do
      Integer
narSize <- IORef Integer -> IO Integer
forall a. IORef a -> IO a
readIORef IORef Integer
narSizeRef
      Text
narHash <- ("sha256:" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<>) (Text -> Text) -> (ByteString -> Text) -> ByteString -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Text
System.Nix.Base32.encode (ByteString -> Text) -> IO ByteString -> IO Text
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef ByteString -> IO ByteString
forall a. IORef a -> IO a
readIORef IORef ByteString
narHashRef
      ByteString
narHashNix <- ForeignPtr (Ref ValidPathInfo) -> IO ByteString
Store.validPathInfoNarHash ForeignPtr (Ref ValidPathInfo)
pathinfo
      Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Text
narHash Text -> Text -> Bool
forall a. Eq a => a -> a -> Bool
/= ByteString -> Text
forall a b. StringConv a b => a -> b
toS ByteString
narHashNix) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ CachixException -> IO ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM (CachixException -> IO ()) -> CachixException -> IO ()
forall a b. (a -> b) -> a -> b
$ Text -> CachixException
NarHashMismatch "Nar hash mismatch between nix-store --dump and nix db"
      ByteString
fileHash <- IORef ByteString -> IO ByteString
forall a. IORef a -> IO a
readIORef IORef ByteString
fileHashRef
      Integer
fileSize <- IORef Integer -> IO Integer
forall a. IORef a -> IO a
readIORef IORef Integer
fileSizeRef
      Text
deriver <-
        if PushStrategy m r -> Bool
forall (m :: * -> *) r. PushStrategy m r -> Bool
omitDeriver PushStrategy m r
cb
          then Text -> IO Text
forall (f :: * -> *) a. Applicative f => a -> f a
pure Text
Store.unknownDeriver
          else ByteString -> Text
forall a b. StringConv a b => a -> b
toS (ByteString -> Text) -> IO ByteString -> IO Text
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> ForeignPtr (Ref ValidPathInfo) -> IO ByteString
Store.validPathInfoDeriver ForeignPtr (Ref ValidPathInfo)
pathinfo
      PathSet
referencesPathSet <- ForeignPtr (Ref ValidPathInfo) -> IO PathSet
Store.validPathInfoReferences ForeignPtr (Ref ValidPathInfo)
pathinfo
      [Text]
references <- [Text] -> [Text]
forall a. Ord a => [a] -> [a]
sort ([Text] -> [Text]) -> IO [Text] -> IO [Text]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (ByteString -> IO Text) -> PathSet -> IO [Text]
forall a. (ByteString -> IO a) -> PathSet -> IO [a]
Store.traversePathSet (Text -> IO Text
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Text -> IO Text) -> (ByteString -> Text) -> ByteString -> IO Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Text
forall a b. StringConv a b => a -> b
toS) PathSet
referencesPathSet
      let fp :: ByteString
fp = Text -> Text -> Integer -> [Text] -> ByteString
fingerprint Text
storePath Text
narHash Integer
narSize [Text]
references
          sig :: Signature
sig = SecretKey -> ByteString -> Signature
dsign (SigningKey -> SecretKey
signingSecretKey (SigningKey -> SecretKey) -> SigningKey -> SecretKey
forall a b. (a -> b) -> a -> b
$ PushCache -> SigningKey
pushCacheSigningKey PushCache
cache) ByteString
fp
          nic :: NarInfoCreate
nic =
            NarInfoCreate :: Text
-> Text
-> Text
-> Integer
-> Text
-> Integer
-> [Text]
-> Text
-> Text
-> NarInfoCreate
Api.NarInfoCreate
              { cStoreHash :: Text
Api.cStoreHash = Text
storeHash,
                cStoreSuffix :: Text
Api.cStoreSuffix = Text
storeSuffix,
                cNarHash :: Text
Api.cNarHash = Text
narHash,
                cNarSize :: Integer
Api.cNarSize = Integer
narSize,
                cFileSize :: Integer
Api.cFileSize = Integer
fileSize,
                cFileHash :: Text
Api.cFileHash = ByteString -> Text
forall a b. StringConv a b => a -> b
toS ByteString
fileHash,
                cReferences :: [Text]
Api.cReferences = (Text -> Text) -> [Text] -> [Text]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Int -> Text -> Text
T.drop 11) [Text]
references,
                cDeriver :: Text
Api.cDeriver =
                  if Text
deriver Text -> Text -> Bool
forall a. Eq a => a -> a -> Bool
== Text
Store.unknownDeriver
                    then Text
deriver
                    else Int -> Text -> Text
T.drop 11 Text
deriver,
                cSig :: Text
Api.cSig = ByteString -> Text
forall a b. StringConv a b => a -> b
toS (ByteString -> Text) -> ByteString -> Text
forall a b. (a -> b) -> a -> b
$ ByteString -> ByteString
B64.encode (ByteString -> ByteString) -> ByteString -> ByteString
forall a b. (a -> b) -> a -> b
$ Signature -> ByteString
unSignature Signature
sig
              }
      Either NarInfoInvalid () -> IO ()
forall exc (m :: * -> *) a.
(Exception exc, MonadThrow m) =>
Either exc a -> m a
escalate (Either NarInfoInvalid () -> IO ())
-> Either NarInfoInvalid () -> IO ()
forall a b. (a -> b) -> a -> b
$ NarInfoCreate -> Either NarInfoInvalid ()
Api.isNarInfoCreateValid NarInfoCreate
nic
      -- Upload narinfo with signature
      Either ClientError NoContent -> IO NoContent
forall exc (m :: * -> *) a.
(Exception exc, MonadThrow m) =>
Either exc a -> m a
escalate (Either ClientError NoContent -> IO NoContent)
-> (ClientM NoContent -> IO (Either ClientError NoContent))
-> ClientM NoContent
-> IO NoContent
forall (m :: * -> *) b c a.
Monad m =>
(b -> m c) -> (a -> m b) -> a -> m c
<=< (ClientM NoContent -> ClientEnv -> IO (Either ClientError NoContent)
forall a.
NFData a =>
ClientM a -> ClientEnv -> IO (Either ClientError a)
`runClientM` ClientEnv
clientEnv) (ClientM NoContent -> IO NoContent)
-> ClientM NoContent -> IO NoContent
forall a b. (a -> b) -> a -> b
$
        BinaryCacheAPI (AsClientT ClientM)
-> NarInfoC -> NarInfoCreate -> ClientM NoContent
forall route.
BinaryCacheAPI route
-> route
   :- (Capture "narinfo" NarInfoC
       :> (ReqBody '[JSON] NarInfoCreate :> Post '[JSON] NoContent))
Api.createNarinfo
          (Text -> BinaryCacheAPI (AsClientT ClientM)
cachixBCClient Text
name)
          (Text -> NarInfoC
Api.NarInfoC Text
storeHash)
          NarInfoCreate
nic
    PushStrategy m r -> m r
forall (m :: * -> *) r. PushStrategy m r -> m r
onDone PushStrategy m r
cb

-- Catches all exceptions except skipAsyncExceptions
retryAll :: (MonadIO m, MonadMask m) => (RetryStatus -> m a) -> m a
retryAll :: (RetryStatus -> m a) -> m a
retryAll = RetryPolicyM m -> (RetryStatus -> m a) -> m a
forall (m :: * -> *) a.
(MonadIO m, MonadMask m) =>
RetryPolicyM m -> (RetryStatus -> m a) -> m a
recoverAll RetryPolicyM m
RetryPolicy
defaultRetryPolicy
  where
    defaultRetryPolicy :: RetryPolicy
    defaultRetryPolicy :: RetryPolicyM m
defaultRetryPolicy =
      Int -> RetryPolicy
exponentialBackoff 100000 RetryPolicyM m -> RetryPolicyM m -> RetryPolicyM m
forall a. Semigroup a => a -> a -> a
<> Int -> RetryPolicy
limitRetries 3

-- | Push an entire closure
--
-- Note: 'onAlreadyPresent' will be called less often in the future.
pushClosure ::
  (MonadIO m, MonadMask m) =>
  -- | Traverse paths, responsible for bounding parallel processing of paths
  --
  -- For example: @'mapConcurrentlyBounded' 4@
  (forall a b. (a -> m b) -> [a] -> m [b]) ->
  -- | See 'pushSingleStorePath'
  ClientEnv ->
  Store ->
  PushCache ->
  (Text -> PushStrategy m r) ->
  -- | Initial store paths
  [Text] ->
  -- | Every @r@ per store path of the entire closure of store paths
  m [r]
pushClosure :: (forall a b. (a -> m b) -> [a] -> m [b])
-> ClientEnv
-> Store
-> PushCache
-> (Text -> PushStrategy m r)
-> [Text]
-> m [r]
pushClosure traversal :: forall a b. (a -> m b) -> [a] -> m [b]
traversal clientEnv :: ClientEnv
clientEnv store :: Store
store pushCache :: PushCache
pushCache pushStrategy :: Text -> PushStrategy m r
pushStrategy inputStorePaths :: [Text]
inputStorePaths = do
  -- Get the transitive closure of dependencies
  [Text]
paths <-
    IO [Text] -> m [Text]
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [Text] -> m [Text]) -> IO [Text] -> m [Text]
forall a b. (a -> b) -> a -> b
$ do
      PathSet
inputs <- IO PathSet
Store.newEmptyPathSet
      [Text] -> (Text -> IO ()) -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ [Text]
inputStorePaths ((Text -> IO ()) -> IO ()) -> (Text -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \path :: Text
path -> do
        ByteString
normalized <- Store -> ByteString -> IO ByteString
Store.followLinksToStorePath Store
store (Text -> ByteString
encodeUtf8 Text
path)
        ByteString -> PathSet -> IO ()
Store.addToPathSet ByteString
normalized PathSet
inputs
      PathSet
closure <- Store -> ClosureParams -> PathSet -> IO PathSet
Store.computeFSClosure Store
store ClosureParams
Store.defaultClosureParams PathSet
inputs
      (ByteString -> IO Text) -> PathSet -> IO [Text]
forall a. (ByteString -> IO a) -> PathSet -> IO [a]
Store.traversePathSet (Text -> IO Text
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Text -> IO Text) -> (ByteString -> Text) -> ByteString -> IO Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Text
forall a b. StringConv a b => a -> b
toSL) PathSet
closure
  -- Check what store paths are missing
  -- TODO: query also cache.nixos.org? server-side?
  [Text]
missingHashesList <-
    (RetryStatus -> m [Text]) -> m [Text]
forall (m :: * -> *) a.
(MonadIO m, MonadMask m) =>
(RetryStatus -> m a) -> m a
retryAll ((RetryStatus -> m [Text]) -> m [Text])
-> (RetryStatus -> m [Text]) -> m [Text]
forall a b. (a -> b) -> a -> b
$ \_ ->
      Either ClientError [Text] -> m [Text]
forall exc (m :: * -> *) a.
(Exception exc, MonadThrow m) =>
Either exc a -> m a
escalate
        (Either ClientError [Text] -> m [Text])
-> m (Either ClientError [Text]) -> m [Text]
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< IO (Either ClientError [Text]) -> m (Either ClientError [Text])
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
          ( (ClientM [Text] -> ClientEnv -> IO (Either ClientError [Text])
forall a.
NFData a =>
ClientM a -> ClientEnv -> IO (Either ClientError a)
`runClientM` ClientEnv
clientEnv) (ClientM [Text] -> IO (Either ClientError [Text]))
-> ClientM [Text] -> IO (Either ClientError [Text])
forall a b. (a -> b) -> a -> b
$
              BinaryCacheAPI (AsClientT ClientM)
-> Token -> [Text] -> ClientM [Text]
forall route.
BinaryCacheAPI route
-> route
   :- (CachixAuth
       :> ("narinfo"
           :> (Summary
                 "Given a list of store hashes, return a list of those that are missing"
               :> (ReqBody '[JSON] [Text] :> Post '[JSON] [Text]))))
Api.narinfoBulk
                (Text -> BinaryCacheAPI (AsClientT ClientM)
cachixBCClient (PushCache -> Text
pushCacheName PushCache
pushCache))
                (PushCache -> Token
pushCacheToken PushCache
pushCache)
                ((Text, Text) -> Text
forall a b. (a, b) -> a
fst ((Text, Text) -> Text) -> (Text -> (Text, Text)) -> Text -> Text
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> (Text, Text)
splitStorePath (Text -> Text) -> [Text] -> [Text]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Text]
paths)
          )
  let missingHashes :: Set Text
missingHashes = [Text] -> Set Text
forall a. Ord a => [a] -> Set a
Set.fromList [Text]
missingHashesList
      missingPaths :: [Text]
missingPaths = (Text -> Bool) -> [Text] -> [Text]
forall a. (a -> Bool) -> [a] -> [a]
filter (\path :: Text
path -> Text -> Set Text -> Bool
forall a. Ord a => a -> Set a -> Bool
Set.member ((Text, Text) -> Text
forall a b. (a, b) -> a
fst (Text -> (Text, Text)
splitStorePath Text
path)) Set Text
missingHashes) [Text]
paths
  -- TODO: make pool size configurable, on beefier machines this could be doubled
  (Text -> m r) -> [Text] -> m [r]
forall a b. (a -> m b) -> [a] -> m [b]
traversal (\path :: Text
path -> (RetryStatus -> m r) -> m r
forall (m :: * -> *) a.
(MonadIO m, MonadMask m) =>
(RetryStatus -> m a) -> m a
retryAll ((RetryStatus -> m r) -> m r) -> (RetryStatus -> m r) -> m r
forall a b. (a -> b) -> a -> b
$ \retrystatus :: RetryStatus
retrystatus -> ClientEnv
-> Store
-> PushCache
-> PushStrategy m r
-> Text
-> RetryStatus
-> m r
forall (m :: * -> *) r.
(MonadMask m, MonadIO m) =>
ClientEnv
-> Store
-> PushCache
-> PushStrategy m r
-> Text
-> RetryStatus
-> m r
uploadStorePath ClientEnv
clientEnv Store
store PushCache
pushCache (Text -> PushStrategy m r
pushStrategy Text
path) Text
path RetryStatus
retrystatus) [Text]
missingPaths

mapConcurrentlyBounded :: Traversable t => Int -> (a -> IO b) -> t a -> IO (t b)
mapConcurrentlyBounded :: Int -> (a -> IO b) -> t a -> IO (t b)
mapConcurrentlyBounded bound :: Int
bound action :: a -> IO b
action items :: t a
items = do
  QSem
qs <- Int -> IO QSem
QSem.newQSem Int
bound
  let wrapped :: a -> IO b
wrapped x :: a
x = IO () -> IO () -> IO b -> IO b
forall a b c. IO a -> IO b -> IO c -> IO c
bracket_ (QSem -> IO ()
QSem.waitQSem QSem
qs) (QSem -> IO ()
QSem.signalQSem QSem
qs) (a -> IO b
action a
x)
  (a -> IO b) -> t a -> IO (t b)
forall (t :: * -> *) a b.
Traversable t =>
(a -> IO b) -> t a -> IO (t b)
mapConcurrently a -> IO b
wrapped t a
items

-------------------
-- Private terms --
splitStorePath :: Text -> (Text, Text)
splitStorePath :: Text -> (Text, Text)
splitStorePath storePath :: Text
storePath =
  (Int -> Text -> Text
T.take 32 (Int -> Text -> Text
T.drop 11 Text
storePath), Int -> Text -> Text
T.drop 44 Text
storePath)