// // experimental/impl/parallel_group.hpp // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ // // Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #ifndef ASIO_IMPL_EXPERIMENTAL_PARALLEL_GROUP_HPP #define ASIO_IMPL_EXPERIMENTAL_PARALLEL_GROUP_HPP #if defined(_MSC_VER) && (_MSC_VER >= 1200) # pragma once #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) #include "asio/detail/config.hpp" #include #include #include #include #include #include "asio/associated_cancellation_slot.hpp" #include "asio/detail/recycling_allocator.hpp" #include "asio/detail/type_traits.hpp" #include "asio/dispatch.hpp" #include "asio/detail/push_options.hpp" namespace asio { namespace experimental { namespace detail { // Stores the result from an individual asynchronous operation. template struct parallel_group_op_result { public: parallel_group_op_result() : has_value_(false) { } parallel_group_op_result(parallel_group_op_result&& other) : has_value_(other.has_value_) { if (has_value_) new (&u_.value_) T(std::move(other.get())); } ~parallel_group_op_result() { if (has_value_) u_.value_.~T(); } T& get() noexcept { return u_.value_; } template void emplace(Args&&... args) { new (&u_.value_) T(std::forward(args)...); has_value_ = true; } private: union u { u() {} ~u() {} char c_; T value_; } u_; bool has_value_; }; // Proxy completion handler for the group of parallel operatations. Unpacks and // concatenates the individual operations' results, and invokes the user's // completion handler. template struct parallel_group_completion_handler { typedef typename decay< typename prefer_result< typename associated_executor::type, execution::outstanding_work_t::tracked_t >::type >::type executor_type; parallel_group_completion_handler(Handler&& h) : handler_(std::move(h)), executor_( asio::prefer( asio::get_associated_executor(handler_), execution::outstanding_work.tracked)) { } executor_type get_executor() const noexcept { return executor_; } void operator()() { this->invoke(asio::detail::make_index_sequence()); } template void invoke(asio::detail::index_sequence) { this->invoke(std::tuple_cat(std::move(std::get(args_).get())...)); } template void invoke(std::tuple&& args) { this->invoke(std::move(args), asio::detail::index_sequence_for()); } template void invoke(std::tuple&& args, asio::detail::index_sequence) { std::move(handler_)(completion_order_, std::move(std::get(args))...); } Handler handler_; executor_type executor_; std::array completion_order_{}; std::tuple< parallel_group_op_result< typename parallel_op_signature_as_tuple< typename completion_signature_of::type >::type >... > args_{}; }; // Shared state for the parallel group. template struct parallel_group_state { parallel_group_state(Condition&& c, Handler&& h) : cancellation_condition_(std::move(c)), handler_(std::move(h)) { } // The number of operations that have completed so far. Used to determine the // order of completion. std::atomic completed_{0}; // The non-none cancellation type that resulted from a cancellation condition. // Stored here for use by the group's initiating function. std::atomic cancel_type_{cancellation_type::none}; // The number of cancellations that have been requested, either on completion // of the operations within the group, or via the cancellation slot for the // group operation. Initially set to the number of operations to prevent // cancellation signals from being emitted until after all of the group's // operations' initiating functions have completed. std::atomic cancellations_requested_{sizeof...(Ops)}; // The number of operations that are yet to complete. Used to determine when // it is safe to invoke the user's completion handler. std::atomic outstanding_{sizeof...(Ops)}; // The cancellation signals for each operation in the group. asio::cancellation_signal cancellation_signals_[sizeof...(Ops)]; // The cancellation condition is used to determine whether the results from an // individual operation warrant a cancellation request for the whole group. Condition cancellation_condition_; // The proxy handler to be invoked once all operations in the group complete. parallel_group_completion_handler handler_; }; // Handler for an individual operation within the parallel group. template struct parallel_group_op_handler { typedef asio::cancellation_slot cancellation_slot_type; parallel_group_op_handler( std::shared_ptr > state) : state_(std::move(state)) { } cancellation_slot_type get_cancellation_slot() const noexcept { return state_->cancellation_signals_[I].slot(); } template void operator()(Args... args) { // Capture this operation into the completion order. state_->handler_.completion_order_[state_->completed_++] = I; // Determine whether the results of this operation require cancellation of // the whole group. cancellation_type_t cancel_type = state_->cancellation_condition_(args...); // Capture the result of the operation into the proxy completion handler. std::get(state_->handler_.args_).emplace(std::move(args)...); if (cancel_type != cancellation_type::none) { // Save the type for potential use by the group's initiating function. state_->cancel_type_ = cancel_type; // If we are the first operation to request cancellation, emit a signal // for each operation in the group. if (state_->cancellations_requested_++ == 0) for (std::size_t i = 0; i < sizeof...(Ops); ++i) if (i != I) state_->cancellation_signals_[i].emit(cancel_type); } // If this is the last outstanding operation, invoke the user's handler. if (--state_->outstanding_ == 0) asio::dispatch(std::move(state_->handler_)); } std::shared_ptr > state_; }; // Handler for an individual operation within the parallel group that has an // explicitly specified executor. template struct parallel_group_op_handler_with_executor : parallel_group_op_handler { typedef parallel_group_op_handler base_type; typedef asio::cancellation_slot cancellation_slot_type; typedef Executor executor_type; parallel_group_op_handler_with_executor( std::shared_ptr > state, executor_type ex) : parallel_group_op_handler(std::move(state)) { cancel_proxy_ = &this->state_->cancellation_signals_[I].slot().template emplace(this->state_, std::move(ex)); } cancellation_slot_type get_cancellation_slot() const noexcept { return cancel_proxy_->signal_.slot(); } executor_type get_executor() const noexcept { return cancel_proxy_->executor_; } // Proxy handler that forwards the emitted signal to the correct executor. struct cancel_proxy { cancel_proxy( std::shared_ptr > state, executor_type ex) : state_(std::move(state)), executor_(std::move(ex)) { } void operator()(cancellation_type_t type) { if (auto state = state_.lock()) { asio::cancellation_signal* sig = &signal_; asio::dispatch(executor_, [state, sig, type]{ sig->emit(type); }); } } std::weak_ptr > state_; asio::cancellation_signal signal_; executor_type executor_; }; cancel_proxy* cancel_proxy_; }; // Helper to launch an operation using the correct executor, if any. template struct parallel_group_op_launcher { template static void launch(Op& op, const std::shared_ptr >& state) { typedef typename associated_executor::type ex_type; ex_type ex = asio::get_associated_executor(op); std::move(op)( parallel_group_op_handler_with_executor(state, std::move(ex))); } }; // Specialised launcher for operations that specify no executor. template struct parallel_group_op_launcher::asio_associated_executor_is_unspecialised, void >::value >::type> { template static void launch(Op& op, const std::shared_ptr >& state) { std::move(op)( parallel_group_op_handler(state)); } }; template struct parallel_group_cancellation_handler { parallel_group_cancellation_handler( std::shared_ptr > state) : state_(std::move(state)) { } void operator()(cancellation_type_t cancel_type) { // If we are the first place to request cancellation, i.e. no operation has // yet completed and requested cancellation, emit a signal for each // operation in the group. if (cancel_type != cancellation_type::none) if (auto state = state_.lock()) if (state->cancellations_requested_++ == 0) for (std::size_t i = 0; i < sizeof...(Ops); ++i) state->cancellation_signals_[i].emit(cancel_type); } std::weak_ptr > state_; }; template void parallel_group_launch(Condition cancellation_condition, Handler handler, std::tuple& ops, asio::detail::index_sequence) { // Get the user's completion handler's cancellation slot, so that we can allow // cancellation of the entire group. typename associated_cancellation_slot::type slot = asio::get_associated_cancellation_slot(handler); // Create the shared state for the operation. typedef parallel_group_state state_type; std::shared_ptr state = std::allocate_shared( asio::detail::recycling_allocator(), std::move(cancellation_condition), std::move(handler)); // Initiate each individual operation in the group. int fold[] = { 0, ( parallel_group_op_launcher::launch(std::get(ops), state), 0 )... }; (void)fold; // Check if any of the operations has already requested cancellation, and if // so, emit a signal for each operation in the group. if ((state->cancellations_requested_ -= sizeof...(Ops)) > 0) for (auto& signal : state->cancellation_signals_) signal.emit(state->cancel_type_); // Register a handler with the user's completion handler's cancellation slot. if (slot.is_connected()) slot.template emplace< parallel_group_cancellation_handler< Condition, Handler, Ops...> >(state); } // Proxy completion handler for the ranged group of parallel operatations. // Unpacks and recombines the individual operations' results, and invokes the // user's completion handler. template struct ranged_parallel_group_completion_handler { typedef typename decay< typename prefer_result< typename associated_executor::type, execution::outstanding_work_t::tracked_t >::type >::type executor_type; typedef typename parallel_op_signature_as_tuple< typename completion_signature_of::type >::type op_tuple_type; typedef parallel_group_op_result op_result_type; ranged_parallel_group_completion_handler(Handler&& h, std::size_t size, const Allocator& allocator) : handler_(std::move(h)), executor_( asio::prefer( asio::get_associated_executor(handler_), execution::outstanding_work.tracked)), allocator_(allocator), completion_order_(size, 0, ASIO_REBIND_ALLOC(Allocator, std::size_t)(allocator)), args_(ASIO_REBIND_ALLOC(Allocator, op_result_type)(allocator)) { for (std::size_t i = 0; i < size; ++i) args_.emplace_back(); } executor_type get_executor() const noexcept { return executor_; } void operator()() { this->invoke( asio::detail::make_index_sequence< std::tuple_size::value>()); } template void invoke(asio::detail::index_sequence) { typedef typename parallel_op_signature_as_tuple< typename ranged_parallel_group_signature< typename completion_signature_of::type, Allocator >::raw_type >::type vectors_type; // Construct all result vectors using the supplied allocator. vectors_type vectors{ typename std::tuple_element::type( ASIO_REBIND_ALLOC(Allocator, int)(allocator_))...}; // Reserve sufficient space in each of the result vectors. int reserve_fold[] = { 0, ( std::get(vectors).reserve(completion_order_.size()), 0 )... }; (void)reserve_fold; // Copy the results from all operations into the result vectors. for (std::size_t idx = 0; idx < completion_order_.size(); ++idx) { int pushback_fold[] = { 0, ( std::get(vectors).push_back( std::move(std::get(args_[idx].get()))), 0 )... }; (void)pushback_fold; } std::move(handler_)(completion_order_, std::move(std::get(vectors))...); } Handler handler_; executor_type executor_; Allocator allocator_; std::vector completion_order_; std::deque args_; }; // Shared state for the parallel group. template struct ranged_parallel_group_state { ranged_parallel_group_state(Condition&& c, Handler&& h, std::size_t size, const Allocator& allocator) : cancellations_requested_(size), outstanding_(size), cancellation_signals_( ASIO_REBIND_ALLOC(Allocator, asio::cancellation_signal)(allocator)), cancellation_condition_(std::move(c)), handler_(std::move(h), size, allocator) { for (std::size_t i = 0; i < size; ++i) cancellation_signals_.emplace_back(); } // The number of operations that have completed so far. Used to determine the // order of completion. std::atomic completed_{0}; // The non-none cancellation type that resulted from a cancellation condition. // Stored here for use by the group's initiating function. std::atomic cancel_type_{cancellation_type::none}; // The number of cancellations that have been requested, either on completion // of the operations within the group, or via the cancellation slot for the // group operation. Initially set to the number of operations to prevent // cancellation signals from being emitted until after all of the group's // operations' initiating functions have completed. std::atomic cancellations_requested_; // The number of operations that are yet to complete. Used to determine when // it is safe to invoke the user's completion handler. std::atomic outstanding_; // The cancellation signals for each operation in the group. std::deque cancellation_signals_; // The cancellation condition is used to determine whether the results from an // individual operation warrant a cancellation request for the whole group. Condition cancellation_condition_; // The proxy handler to be invoked once all operations in the group complete. ranged_parallel_group_completion_handler handler_; }; // Handler for an individual operation within the parallel group. template struct ranged_parallel_group_op_handler { typedef asio::cancellation_slot cancellation_slot_type; ranged_parallel_group_op_handler( std::shared_ptr > state, std::size_t idx) : state_(std::move(state)), idx_(idx) { } cancellation_slot_type get_cancellation_slot() const noexcept { return state_->cancellation_signals_[idx_].slot(); } template void operator()(Args... args) { // Capture this operation into the completion order. state_->handler_.completion_order_[state_->completed_++] = idx_; // Determine whether the results of this operation require cancellation of // the whole group. cancellation_type_t cancel_type = state_->cancellation_condition_(args...); // Capture the result of the operation into the proxy completion handler. state_->handler_.args_[idx_].emplace(std::move(args)...); if (cancel_type != cancellation_type::none) { // Save the type for potential use by the group's initiating function. state_->cancel_type_ = cancel_type; // If we are the first operation to request cancellation, emit a signal // for each operation in the group. if (state_->cancellations_requested_++ == 0) for (std::size_t i = 0; i < state_->cancellation_signals_.size(); ++i) if (i != idx_) state_->cancellation_signals_[i].emit(cancel_type); } // If this is the last outstanding operation, invoke the user's handler. if (--state_->outstanding_ == 0) asio::dispatch(std::move(state_->handler_)); } std::shared_ptr > state_; std::size_t idx_; }; // Handler for an individual operation within the parallel group that has an // explicitly specified executor. template struct ranged_parallel_group_op_handler_with_executor : ranged_parallel_group_op_handler { typedef ranged_parallel_group_op_handler< Condition, Handler, Op, Allocator> base_type; typedef asio::cancellation_slot cancellation_slot_type; typedef Executor executor_type; ranged_parallel_group_op_handler_with_executor( std::shared_ptr > state, executor_type ex, std::size_t idx) : ranged_parallel_group_op_handler( std::move(state), idx) { cancel_proxy_ = &this->state_->cancellation_signals_[idx].slot().template emplace(this->state_, std::move(ex)); } cancellation_slot_type get_cancellation_slot() const noexcept { return cancel_proxy_->signal_.slot(); } executor_type get_executor() const noexcept { return cancel_proxy_->executor_; } // Proxy handler that forwards the emitted signal to the correct executor. struct cancel_proxy { cancel_proxy( std::shared_ptr > state, executor_type ex) : state_(std::move(state)), executor_(std::move(ex)) { } void operator()(cancellation_type_t type) { if (auto state = state_.lock()) { asio::cancellation_signal* sig = &signal_; asio::dispatch(executor_, [state, sig, type]{ sig->emit(type); }); } } std::weak_ptr > state_; asio::cancellation_signal signal_; executor_type executor_; }; cancel_proxy* cancel_proxy_; }; template struct ranged_parallel_group_cancellation_handler { ranged_parallel_group_cancellation_handler( std::shared_ptr > state) : state_(std::move(state)) { } void operator()(cancellation_type_t cancel_type) { // If we are the first place to request cancellation, i.e. no operation has // yet completed and requested cancellation, emit a signal for each // operation in the group. if (cancel_type != cancellation_type::none) if (auto state = state_.lock()) if (state->cancellations_requested_++ == 0) for (std::size_t i = 0; i < state->cancellation_signals_.size(); ++i) state->cancellation_signals_[i].emit(cancel_type); } std::weak_ptr > state_; }; template void ranged_parallel_group_launch(Condition cancellation_condition, Handler handler, Range&& range, const Allocator& allocator) { // Get the user's completion handler's cancellation slot, so that we can allow // cancellation of the entire group. typename associated_cancellation_slot::type slot = asio::get_associated_cancellation_slot(handler); // The type of the asynchronous operation. typedef typename std::decay())>::type op_type; // Create the shared state for the operation. typedef ranged_parallel_group_state state_type; std::shared_ptr state = std::allocate_shared( asio::detail::recycling_allocator(), std::move(cancellation_condition), std::move(handler), range.size(), allocator); std::size_t idx = 0; for (auto&& op : std::forward(range)) { typedef typename associated_executor::type ex_type; ex_type ex = asio::get_associated_executor(op); std::move(op)( ranged_parallel_group_op_handler_with_executor< ex_type, Condition, Handler, op_type, Allocator>( state, std::move(ex), idx++)); } // Check if any of the operations has already requested cancellation, and if // so, emit a signal for each operation in the group. if ((state->cancellations_requested_ -= range.size()) > 0) for (auto& signal : state->cancellation_signals_) signal.emit(state->cancel_type_); // Register a handler with the user's completion handler's cancellation slot. if (slot.is_connected()) slot.template emplace< ranged_parallel_group_cancellation_handler< Condition, Handler, op_type, Allocator> >(state); } } // namespace detail } // namespace experimental template