// // experimental/impl/co_composed.hpp // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ // // Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #ifndef ASIO_IMPL_EXPERIMENTAL_CO_COMPOSED_HPP #define ASIO_IMPL_EXPERIMENTAL_CO_COMPOSED_HPP #if defined(_MSC_VER) && (_MSC_VER >= 1200) # pragma once #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) #include "asio/detail/config.hpp" #include #include #include #include "asio/associated_cancellation_slot.hpp" #include "asio/associator.hpp" #include "asio/async_result.hpp" #include "asio/cancellation_state.hpp" #include "asio/detail/composed_work.hpp" #include "asio/detail/recycling_allocator.hpp" #include "asio/detail/throw_error.hpp" #include "asio/detail/type_traits.hpp" #include "asio/error.hpp" #if defined(ASIO_HAS_STD_COROUTINE) # include #else // defined(ASIO_HAS_STD_COROUTINE) # include #endif // defined(ASIO_HAS_STD_COROUTINE) #if defined(ASIO_ENABLE_HANDLER_TRACKING) # if defined(ASIO_HAS_SOURCE_LOCATION) # include "asio/detail/source_location.hpp" # endif // defined(ASIO_HAS_SOURCE_LOCATION) #endif // defined(ASIO_ENABLE_HANDLER_TRACKING) #include "asio/detail/push_options.hpp" namespace asio { namespace experimental { namespace detail { #if defined(ASIO_HAS_STD_COROUTINE) using std::coroutine_handle; using std::suspend_always; using std::suspend_never; #else // defined(ASIO_HAS_STD_COROUTINE) using std::experimental::coroutine_handle; using std::experimental::suspend_always; using std::experimental::suspend_never; #endif // defined(ASIO_HAS_STD_COROUTINE) using asio::detail::composed_io_executors; using asio::detail::composed_work; using asio::detail::composed_work_guard; using asio::detail::get_composed_io_executor; using asio::detail::make_composed_io_executors; using asio::detail::recycling_allocator; using asio::detail::throw_error; 
// NOTE(review): in this text the contents of angle brackets (template
// parameter lists, template arguments, cast target types) appear to be
// missing, e.g. `template class co_composed_state;` and `static_cast(p)`, and
// many definitions are collapsed onto one line -- presumably an extraction
// artefact; confirm against the upstream header before relying on the exact
// declarations below.
//
// This line holds, in order:
//  - Forward declarations of co_composed_state, co_composed_handler_base and
//    co_composed_promise.
//  - co_composed_returns: an empty class.
//  - co_composed_on_suspend: a function pointer fn_ plus its argument arg_,
//    both null by default. Code further down invokes fn_(arg_) when fn_ is
//    non-null.
//  - co_composed_completion: a std::tuple subclass whose constructor forwards
//    its arguments to the tuple.
//  - co_composed_state_return_overload (specialised on R(Args...)):
//    on_cancellation_complete_with() stores its arguments as a tuple in the
//    derived state's return_value_ and registers, via
//    cancellation_on_suspend_fn(), a callback that moves the handler and the
//    stored result into locals, destroys a temporary co_composed_handler_base
//    bound to the promise (whose destructor calls promise.destroy()), and
//    only then applies the handler to the result.
//  - The start of co_composed_state_return, which derives from one
//    co_composed_state_return_overload per signature (body on the next line).
template class co_composed_state; template class co_composed_handler_base; template class co_composed_promise; template class co_composed_returns { }; struct co_composed_on_suspend { void (*fn_)(void*) = nullptr; void* arg_ = nullptr; }; template struct co_composed_completion : std::tuple { template co_composed_completion(U&&... u) noexcept : std::tuple(std::forward(u)...) { } }; template class co_composed_state_return_overload; template class co_composed_state_return_overload< Executors, Handler, Return, R(Args...)> { public: using derived_type = co_composed_state; using promise_type = co_composed_promise; using return_type = std::tuple; void on_cancellation_complete_with(Args... args) { derived_type& state = *static_cast(this); state.return_value_ = std::make_tuple(std::move(args)...); state.cancellation_on_suspend_fn( [](void* p) { auto& promise = *static_cast(p); co_composed_handler_base composed_handler(promise); Handler handler(std::move(promise.state().handler_)); return_type result( std::move(std::get(promise.state().return_value_))); co_composed_handler_base(std::move(composed_handler)); std::apply(std::move(handler), std::move(result)); }); } }; template class co_composed_state_return; template class co_composed_state_return< Executors, Handler, co_composed_returns> : public co_composed_state_return_overload, Signatures>... 
// Body of co_composed_state_return: re-exports every overload's
// on_cancellation_complete_with and holds return_value_, a std::variant over
// the overloads' return_type.
//
// co_composed_state_default_cancellation_on_suspend_impl: fn() yields the
// function pointer used further down as the default value of
// cancellation_on_suspend_fn_.
//  - With no signatures left, fn() returns nullptr.
//  - For a generic R(Args...) signature, fn() defers to the remaining
//    signatures.
//  - Two further specialisations return, when the is_constructible check
//    holds for every argument type, a callback that moves the handler out of
//    the state, destroys the coroutine via a temporary
//    co_composed_handler_base, and invokes the handler with either
//    asio::error_code(asio::error::operation_aborted) or an exception_ptr
//    wrapping asio::system_error(operation_aborted, "co_await"), followed by
//    Args{}...; otherwise they defer to the remaining signatures.
//    NOTE(review): the specialisation arguments are not visible here;
//    presumably these match signatures whose first parameter is
//    asio::error_code / std::exception_ptr -- confirm.
{ public: using co_composed_state_return_overload, Signatures>::on_cancellation_complete_with...; private: template friend class co_composed_promise_return_overload; template friend class co_composed_state_return_overload; std::variant, Signatures>::return_type...> return_value_; }; template struct co_composed_state_default_cancellation_on_suspend_impl; template struct co_composed_state_default_cancellation_on_suspend_impl< Executors, Handler, Return> { static constexpr void (*fn())(void*) { return nullptr; } }; template struct co_composed_state_default_cancellation_on_suspend_impl< Executors, Handler, Return, R(Args...), Signatures...> { static constexpr void (*fn())(void*) { return co_composed_state_default_cancellation_on_suspend_impl< Executors, Handler, Return, Signatures...>::fn(); } }; template struct co_composed_state_default_cancellation_on_suspend_impl { using promise_type = co_composed_promise; using return_type = std::tuple; static constexpr void (*fn())(void*) { if constexpr ((is_constructible::value && ...)) { return [](void* p) { auto& promise = *static_cast(p); co_composed_handler_base composed_handler(promise); Handler handler(std::move(promise.state().handler_)); co_composed_handler_base(std::move(composed_handler)); std::move(handler)( asio::error_code(asio::error::operation_aborted), Args{}...); }; } else { return co_composed_state_default_cancellation_on_suspend_impl< Executors, Handler, Return, Signatures...>::fn(); } } }; template struct co_composed_state_default_cancellation_on_suspend_impl { using promise_type = co_composed_promise; using return_type = std::tuple; static constexpr void (*fn())(void*) { if constexpr ((is_constructible::value && ...)) { return [](void* p) { auto& promise = *static_cast(p); co_composed_handler_base composed_handler(promise); Handler handler(std::move(promise.state().handler_)); co_composed_handler_base(std::move(composed_handler)); std::move(handler)( std::make_exception_ptr( asio::system_error( 
// Remainder of the exception_ptr variant of fn() started on the previous
// line.
//
// co_composed_state_default_cancellation_on_suspend: for co_composed_returns,
// inherits fn() from the _impl template above.
//
// co_composed_state_cancellation (primary template): owns a
// cancellation_state_ and two flags.
//  - get_cancellation_slot()/get_cancellation_state()/cancelled() read
//    cancellation_state_.
//  - reset_cancellation_state(...) rebuilds cancellation_state_ from the
//    cancellation slot associated with the derived state's handler(),
//    optionally with one filter (passed twice) or with separate in/out
//    filters.
//  - clear_cancellation_slot() clears the state's slot.
//  - throw_if_cancelled()/complete_if_cancelled() get or set the flags.
asio::error::operation_aborted, "co_await")), Args{}...); }; } else { return co_composed_state_default_cancellation_on_suspend_impl< Executors, Handler, Return, Signatures...>::fn(); } } }; template struct co_composed_state_default_cancellation_on_suspend; template struct co_composed_state_default_cancellation_on_suspend< Executors, Handler, co_composed_returns> : co_composed_state_default_cancellation_on_suspend_impl, Signatures...> { }; template class co_composed_state_cancellation { public: using cancellation_slot_type = cancellation_slot; cancellation_slot_type get_cancellation_slot() const noexcept { return cancellation_state_.slot(); } cancellation_state get_cancellation_state() const noexcept { return cancellation_state_; } void reset_cancellation_state() { cancellation_state_ = cancellation_state( (get_associated_cancellation_slot)( static_cast*>( this)->handler())); } template void reset_cancellation_state(Filter filter) { cancellation_state_ = cancellation_state( (get_associated_cancellation_slot)( static_cast*>( this)->handler()), filter, filter); } template void reset_cancellation_state(InFilter&& in_filter, OutFilter&& out_filter) { cancellation_state_ = cancellation_state( (get_associated_cancellation_slot)( static_cast*>( this)->handler()), std::forward(in_filter), std::forward(out_filter)); } cancellation_type_t cancelled() const noexcept { return cancellation_state_.cancelled(); } void clear_cancellation_slot() noexcept { cancellation_state_.slot().clear(); } [[nodiscard]] bool throw_if_cancelled() const noexcept { return throw_if_cancelled_; } void throw_if_cancelled(bool b) noexcept { throw_if_cancelled_ = b; } [[nodiscard]] bool complete_if_cancelled() const noexcept { return complete_if_cancelled_; } void complete_if_cancelled(bool b) noexcept { complete_if_cancelled_ = b; } private: template friend class co_composed_promise; template friend class co_composed_state_return_overload; void cancellation_on_suspend_fn(void (*fn)(void*)) { 
// Private part of co_composed_state_cancellation:
//  - cancellation_on_suspend_fn(fn) replaces the stored callback.
//  - check_for_cancellation_on_transform() calls
//    throw_error(operation_aborted, "co_await") when throw_if_cancelled_ is
//    set and cancelled() is non-zero.
//  - check_for_cancellation_on_suspend(promise) returns false after resetting
//    work_ and installing the stored callback (with &promise as argument)
//    into *on_suspend_ when complete_if_cancelled_ is set, cancelled() is
//    non-zero and a callback exists; otherwise it returns true.
//  - Defaults: throw_if_cancelled_ = false, complete_if_cancelled_ = true,
//    callback = co_composed_state_default_cancellation_on_suspend::fn().
//
// Constrained specialisation of co_composed_state_cancellation: selected by a
// requires-clause comparing asio_associated_cancellation_slot_is_unspecialised
// with void. All operations are no-ops: cancelled() returns
// cancellation_type::none, both flag getters return false and
// check_for_cancellation_on_suspend() always returns true.
//
// Start of co_composed_state, which derives from co_composed_state_return and
// co_composed_state_cancellation.
cancellation_on_suspend_fn_ = fn; } void check_for_cancellation_on_transform() { if (throw_if_cancelled_ && !!cancelled()) throw_error(asio::error::operation_aborted, "co_await"); } bool check_for_cancellation_on_suspend( co_composed_promise& promise) noexcept { if (complete_if_cancelled_ && !!cancelled() && cancellation_on_suspend_fn_) { promise.state().work_.reset(); promise.state().on_suspend_->fn_ = cancellation_on_suspend_fn_; promise.state().on_suspend_->arg_ = &promise; return false; } return true; } cancellation_state cancellation_state_; void (*cancellation_on_suspend_fn_)(void*) = co_composed_state_default_cancellation_on_suspend< Executors, Handler, Return>::fn(); bool throw_if_cancelled_ = false; bool complete_if_cancelled_ = true; }; template requires is_same< typename associated_cancellation_slot< Handler, cancellation_slot >::asio_associated_cancellation_slot_is_unspecialised, void>::value class co_composed_state_cancellation { public: void reset_cancellation_state() { } template void reset_cancellation_state(Filter) { } template void reset_cancellation_state(InFilter&&, OutFilter&&) { } cancellation_type_t cancelled() const noexcept { return cancellation_type::none; } void clear_cancellation_slot() noexcept { } [[nodiscard]] bool throw_if_cancelled() const noexcept { return false; } void throw_if_cancelled(bool) noexcept { } [[nodiscard]] bool complete_if_cancelled() const noexcept { return false; } void complete_if_cancelled(bool) noexcept { } private: template friend class co_composed_promise; template friend class co_composed_state_return_overload; void cancellation_on_suspend_fn(void (*)(void*)) { } void check_for_cancellation_on_transform() noexcept { } bool check_for_cancellation_on_suspend( co_composed_promise&) noexcept { return true; } }; template class co_composed_state : public co_composed_state_return, public co_composed_state_cancellation { public: using io_executor_type = typename composed_work_guard< typename 
// Rest of co_composed_state:
//  - The constructor stores the executors in work_, the handler in handler_
//    and the address of on_suspend, then calls
//    reset_cancellation_state(enable_terminal_cancellation()).
//  - get_io_executor() returns work_.head_.get_executor().
//  - complete(args...) wraps its arguments in a co_composed_completion; the
//    requires-clause checks a call expression with the forwarded arguments.
//  - handler() exposes handler_ by const reference.
//
// co_composed_handler_cancellation: get_cancellation_slot() forwards to
// promise().state().get_cancellation_slot(); the constrained specialisation
// is empty.
//
// co_composed_handler_base: holds a pointer p_ to the promise. Moving
// transfers the pointer and nulls the source. The destructor calls
// p_->destroy() if p_ is still non-null. resume() starts here and continues
// on the next line.
composed_work::head_type>::executor_type; template co_composed_state(composed_io_executors&& executors, H&& h, co_composed_on_suspend& on_suspend) : work_(std::move(executors)), handler_(std::forward(h)), on_suspend_(&on_suspend) { this->reset_cancellation_state(enable_terminal_cancellation()); } io_executor_type get_io_executor() const noexcept { return work_.head_.get_executor(); } template [[nodiscard]] co_composed_completion complete(Args&&... args) requires requires { declval()(std::forward(args)...); } { return co_composed_completion(std::forward(args)...); } const Handler& handler() const noexcept { return handler_; } private: template friend class co_composed_handler_base; template friend class co_composed_promise; template friend class co_composed_promise_return_overload; template friend class co_composed_state_cancellation; template friend class co_composed_state_return_overload; template friend struct co_composed_state_default_cancellation_on_suspend_impl; composed_work work_; Handler handler_; co_composed_on_suspend* on_suspend_; }; template class co_composed_handler_cancellation { public: using cancellation_slot_type = cancellation_slot; cancellation_slot_type get_cancellation_slot() const noexcept { return static_cast< const co_composed_handler_base*>( this)->promise().state().get_cancellation_slot(); } }; template requires is_same< typename associated_cancellation_slot< Handler, cancellation_slot >::asio_associated_cancellation_slot_is_unspecialised, void>::value class co_composed_handler_cancellation { }; template class co_composed_handler_base : public co_composed_handler_cancellation { public: co_composed_handler_base( co_composed_promise& p) noexcept : p_(&p) { } co_composed_handler_base(co_composed_handler_base&& other) noexcept : p_(std::exchange(other.p_, nullptr)) { } ~co_composed_handler_base() { if (p_) [[unlikely]] p_->destroy(); } co_composed_promise& promise() const noexcept { return *p_; } protected: void resume(void* result) { 
// Body of co_composed_handler_base::resume(): p_ is exchanged for nullptr and
// the promise is resumed with a reference to p_, the result pointer and a
// local co_composed_on_suspend; afterwards on_suspend.fn_ is invoked with
// on_suspend.arg_ if it was set.
//
// co_composed_handler specialisations (the objects that await_transform,
// further down, passes to the awaited operation):
//  - Generic form: operator() packs the arguments into a result_type tuple
//    and resumes with its address. on_resume() returns nothing, the single
//    element, or the whole tuple (moved) for 0, 1 or more Args.
//  - error_code form: stores (ec, args tuple); on_resume() calls
//    throw_error(ec) before returning the values as above.
//  - exception_ptr form: begins here, continues on the next line.
co_composed_on_suspend on_suspend{}; std::exchange(p_, nullptr)->resume(p_, result, on_suspend); if (on_suspend.fn_) on_suspend.fn_(on_suspend.arg_); } private: co_composed_promise* p_; }; template class co_composed_handler; template class co_composed_handler : public co_composed_handler_base { public: using co_composed_handler_base::co_composed_handler_base; using result_type = std::tuple::type...>; template void operator()(T&&... args) { result_type result(std::forward(args)...); this->resume(&result); } static auto on_resume(void* result) { auto& args = *static_cast(result); if constexpr (sizeof...(Args) == 0) return; else if constexpr (sizeof...(Args) == 1) return std::move(std::get<0>(args)); else return std::move(args); } }; template class co_composed_handler : public co_composed_handler_base { public: using co_composed_handler_base::co_composed_handler_base; using args_type = std::tuple::type...>; using result_type = std::tuple; template void operator()(const asio::error_code& ec, T&&... args) { result_type result(ec, args_type(std::forward(args)...)); this->resume(&result); } static auto on_resume(void* result) { auto& [ec, args] = *static_cast(result); throw_error(ec); if constexpr (sizeof...(Args) == 0) return; else if constexpr (sizeof...(Args) == 1) return std::move(std::get<0>(args)); else return std::move(args); } }; template class co_composed_handler : public co_composed_handler_base { public: using co_composed_handler_base::co_composed_handler_base; using args_type = std::tuple::type...>; using result_type = std::tuple; template void operator()(std::exception_ptr ex, T&&... 
// Rest of the exception_ptr form of co_composed_handler: stores
// (ex, args tuple); on_resume() rethrows ex if it is non-null, then returns
// the values as in the other forms.
//
// co_composed_promise_return:
//  - One specialisation provides final_suspend() returning suspend_never and
//    a no-op return_void().
//  - The specialisation with Signatures... derives from one
//    co_composed_promise_return_overload per signature, re-exports their
//    return_value overloads, and has final_suspend() return suspend_always.
//
// co_composed_promise_return_overload::return_value(value): moves value into
// state().return_value_, resets work_, and installs an on-suspend callback
// (arg_ = this) that moves the handler and result into locals, destroys the
// coroutine through a temporary co_composed_handler_base and applies the
// handler to the result.
//
// Start of co_composed_promise; its first operator new continues on the next
// line.
args) { result_type result(std::move(ex), args_type(std::forward(args)...)); this->resume(&result); } static auto on_resume(void* result) { auto& [ex, args] = *static_cast(result); if (ex) std::rethrow_exception(ex); if constexpr (sizeof...(Args) == 0) return; else if constexpr (sizeof...(Args) == 1) return std::move(std::get<0>(args)); else return std::move(args); } }; template class co_composed_promise_return; template class co_composed_promise_return> { public: auto final_suspend() noexcept { return suspend_never(); } void return_void() noexcept { } }; template class co_composed_promise_return_overload; template class co_composed_promise_return_overload< Executors, Handler, Return, R(Args...)> { public: using derived_type = co_composed_promise; using return_type = std::tuple; void return_value(std::tuple&& value) { derived_type& promise = *static_cast(this); promise.state().return_value_ = std::move(value); promise.state().work_.reset(); promise.state().on_suspend_->arg_ = this; promise.state().on_suspend_->fn_ = [](void* p) { auto& promise = *static_cast(p); co_composed_handler_base composed_handler(promise); Handler handler(std::move(promise.state().handler_)); return_type result( std::move(std::get(promise.state().return_value_))); co_composed_handler_base(std::move(composed_handler)); std::apply(std::move(handler), std::move(result)); }; } }; template class co_composed_promise_return> : public co_composed_promise_return_overload, Signatures>... { public: auto final_suspend() noexcept { return suspend_always(); } using co_composed_promise_return_overload, Signatures>::return_value...; private: template friend class co_composed_promise_return_overload; }; template class co_composed_promise : public co_composed_promise_return { public: template void* operator new(std::size_t size, co_composed_state& state, Args&&...) 
// co_composed_promise allocation:
//  - operator new(size, state, ...) gets an allocator from
//    get_associated_allocator(state.handler_, recycling_allocator()),
//    allocates blocks(sizeof(allocator_type)) + blocks(size) blocks,
//    placement-constructs the allocator at the start of the allocation and
//    returns the address just past the blocks reserved for it.
//  - The overload with a leading C&& parameter forwards to the one above.
//  - operator delete(ptr, size) steps back to the start of the allocation,
//    moves the stored allocator out, destroys the stored object and
//    deallocates the same number of blocks.
// Both constructors only bind state_ to the supplied state (the second one
// finishes on the next line).
{ block_allocator_type allocator( (get_associated_allocator)(state.handler_, recycling_allocator())); block* base_ptr = std::allocator_traits::allocate( allocator, blocks(sizeof(allocator_type)) + blocks(size)); new (static_cast(base_ptr)) allocator_type(std::move(allocator)); return base_ptr + blocks(sizeof(allocator_type)); } template void* operator new(std::size_t size, C&&, co_composed_state& state, Args&&...) { return co_composed_promise::operator new(size, state); } void operator delete(void* ptr, std::size_t size) { block* base_ptr = static_cast(ptr) - blocks(sizeof(allocator_type)); allocator_type* allocator_ptr = std::launder( static_cast(static_cast(base_ptr))); block_allocator_type block_allocator(std::move(*allocator_ptr)); allocator_ptr->~allocator_type(); std::allocator_traits::deallocate(block_allocator, base_ptr, blocks(sizeof(allocator_type)) + blocks(size)); } template co_composed_promise( co_composed_state& state, Args&&...) : state_(state) { } template co_composed_promise(C&&, co_composed_state& state, Args&&...) 
// co_composed_promise, continued:
//  - destroy() destroys the coroutine obtained from from_promise(*this).
//  - resume(owner, result, on_suspend) points state_.on_suspend_ at
//    on_suspend, clears the cancellation slot, records &owner and result, and
//    resumes the coroutine.
//  - get_return_object() returns void; initial_suspend() returns
//    suspend_never.
//  - unhandled_exception() writes this back through owner_ (if set) and
//    rethrows.
//  - await_transform(op): defines a local awaitable whose await_ready() is
//    always false. Its await_suspend(), when
//    check_for_cancellation_on_suspend() returns true, installs an on-suspend
//    callback that invokes op_ with a co_composed_handler bound to the
//    promise (body continues on the next line). A source_location
//    parameter/member is added when both ASIO_ENABLE_HANDLER_TRACKING and
//    ASIO_HAS_SOURCE_LOCATION are defined.
// NOTE(review): the preprocessor directives and `//` comments below sit in
// the middle of a single text line; as written, the first `//` would comment
// out the remainder of that line -- presumably the original line breaks were
// lost.
: state_(state) { } void destroy() noexcept { coroutine_handle::from_promise(*this).destroy(); } void resume(co_composed_promise*& owner, void* result, co_composed_on_suspend& on_suspend) { state_.on_suspend_ = &on_suspend; state_.clear_cancellation_slot(); owner_ = &owner; result_ = result; coroutine_handle::from_promise(*this).resume(); } co_composed_state& state() noexcept { return state_; } void get_return_object() noexcept { } auto initial_suspend() noexcept { return suspend_never(); } void unhandled_exception() { if (owner_) *owner_ = this; throw; } template auto await_transform(Op&& op #if defined(ASIO_ENABLE_HANDLER_TRACKING) # if defined(ASIO_HAS_SOURCE_LOCATION) , asio::detail::source_location location = asio::detail::source_location::current() # endif // defined(ASIO_HAS_SOURCE_LOCATION) #endif // defined(ASIO_ENABLE_HANDLER_TRACKING) ) { class [[nodiscard]] awaitable { public: awaitable(Op&& op, co_composed_promise& promise #if defined(ASIO_ENABLE_HANDLER_TRACKING) # if defined(ASIO_HAS_SOURCE_LOCATION) , const asio::detail::source_location& location # endif // defined(ASIO_HAS_SOURCE_LOCATION) #endif // defined(ASIO_ENABLE_HANDLER_TRACKING) ) : op_(std::forward(op)), promise_(promise) #if defined(ASIO_ENABLE_HANDLER_TRACKING) # if defined(ASIO_HAS_SOURCE_LOCATION) , location_(location) # endif // defined(ASIO_HAS_SOURCE_LOCATION) #endif // defined(ASIO_ENABLE_HANDLER_TRACKING) { } constexpr bool await_ready() const noexcept { return false; } void await_suspend(coroutine_handle) { if (promise_.state_.check_for_cancellation_on_suspend(promise_)) { promise_.state_.on_suspend_->arg_ = this; promise_.state_.on_suspend_->fn_ = [](void* p) { #if defined(ASIO_ENABLE_HANDLER_TRACKING) # if defined(ASIO_HAS_SOURCE_LOCATION) ASIO_HANDLER_LOCATION(( static_cast(p)->location_.file_name(), static_cast(p)->location_.line(), static_cast(p)->location_.function_name())); # endif // defined(ASIO_HAS_SOURCE_LOCATION) #endif // defined(ASIO_ENABLE_HANDLER_TRACKING) 
// Rest of await_transform(): await_resume() returns
// co_composed_handler::on_resume(promise_.result_); the awaitable keeps op_ as
// Op&& and a reference to the promise. After the class definition,
// check_for_cancellation_on_transform() is called and the awaitable returned.
//
// yield_value(result): returns an awaitable whose await_ready() is always
// false. Its await_suspend() resets work_ and installs an on-suspend callback
// that moves the handler out of the state, moves the completion values into a
// tuple, destroys the coroutine through a temporary co_composed_handler_base
// and applies the handler to the tuple. await_resume() does nothing.
//
// Private helpers: `block` is a union of std::max_align_t and a char array
// aligned to allocator_type; blocks(size) rounds size up to a whole number of
// blocks.
std::forward(static_cast(p)->op_)( co_composed_handler>( static_cast(p)->promise_)); }; } } auto await_resume() { return co_composed_handler>::on_resume(promise_.result_); } private: Op&& op_; co_composed_promise& promise_; #if defined(ASIO_ENABLE_HANDLER_TRACKING) # if defined(ASIO_HAS_SOURCE_LOCATION) asio::detail::source_location location_; # endif // defined(ASIO_HAS_SOURCE_LOCATION) #endif // defined(ASIO_ENABLE_HANDLER_TRACKING) }; state_.check_for_cancellation_on_transform(); return awaitable{std::forward(op), *this #if defined(ASIO_ENABLE_HANDLER_TRACKING) # if defined(ASIO_HAS_SOURCE_LOCATION) , location # endif // defined(ASIO_HAS_SOURCE_LOCATION) #endif // defined(ASIO_ENABLE_HANDLER_TRACKING) }; } template auto yield_value(co_composed_completion&& result) { class [[nodiscard]] awaitable { public: awaitable(co_composed_completion&& result, co_composed_promise& promise) : result_(std::move(result)), promise_(promise) { } constexpr bool await_ready() const noexcept { return false; } void await_suspend(coroutine_handle) { promise_.state_.work_.reset(); promise_.state_.on_suspend_->arg_ = this; promise_.state_.on_suspend_->fn_ = [](void* p) { awaitable& a = *static_cast(p); co_composed_handler_base composed_handler(a.promise_); Handler handler(std::move(a.promise_.state_.handler_)); std::tuple::type...> result( std::move(static_cast>(a.result_))); co_composed_handler_base(std::move(composed_handler)); std::apply(std::move(handler), std::move(result)); }; } void await_resume() noexcept { } private: co_composed_completion result_; co_composed_promise& promise_; }; return awaitable{std::move(result), *this}; } private: using allocator_type = associated_allocator_t>; union block { std::max_align_t max_align; alignas(allocator_type) char pad[alignof(allocator_type)]; }; using block_allocator_type = typename std::allocator_traits ::template rebind_alloc; static constexpr std::size_t blocks(std::size_t size) { return (size + sizeof(block) - 1) / sizeof(block); } 
// Data members of co_composed_promise: state_ (reference), owner_ and result_
// (both null by default).
//
// initiate_co_composed: stores the implementation and the executors.
//  - get_executor() returns executors_.head_.
//  - operator()(handler, init_args...) creates a local co_composed_on_suspend,
//    calls the implementation with a co_composed_state built from the
//    executors, the handler and on_suspend, followed by init_args, and then
//    runs on_suspend.fn_(on_suspend.arg_) if fn_ was set. The const& overload
//    passes implementation_/executors_ as they are; the && overload moves
//    them.
// make_initiate_co_composed(): builds an initiate_co_composed from the
// implementation and executors.
//
// co_composed(implementation, io_objects_or_executors...): declared after
// namespace detail is closed; maps each trailing argument through
// detail::get_composed_io_executor, combines the results with
// detail::make_composed_io_executors and returns the
// make_initiate_co_composed result.
//
// The trailing `#if !defined(GENERATING_DOCUMENTATION) template` begins a
// declaration that is cut off in this excerpt.
co_composed_state& state_; co_composed_promise** owner_ = nullptr; void* result_ = nullptr; }; template class initiate_co_composed { public: using executor_type = typename composed_io_executors::head_type; template initiate_co_composed(I&& impl, composed_io_executors&& executors) : implementation_(std::forward(impl)), executors_(std::move(executors)) { } executor_type get_executor() const noexcept { return executors_.head_; } template void operator()(Handler&& handler, InitArgs&&... init_args) const & { using handler_type = typename decay::type; using returns_type = co_composed_returns; co_composed_on_suspend on_suspend{}; implementation_( co_composed_state( executors_, std::forward(handler), on_suspend), std::forward(init_args)...); if (on_suspend.fn_) on_suspend.fn_(on_suspend.arg_); } template void operator()(Handler&& handler, InitArgs&&... init_args) && { using handler_type = typename decay::type; using returns_type = co_composed_returns; co_composed_on_suspend on_suspend{}; std::move(implementation_)( co_composed_state( std::move(executors_), std::forward(handler), on_suspend), std::forward(init_args)...); if (on_suspend.fn_) on_suspend.fn_(on_suspend.arg_); } private: Implementation implementation_; composed_io_executors executors_; }; template inline initiate_co_composed make_initiate_co_composed(Implementation&& implementation, composed_io_executors&& executors) { return initiate_co_composed< typename decay::type, Executors, Signatures...>( std::forward(implementation), std::move(executors)); } } // namespace detail template inline auto co_composed(Implementation&& implementation, IoObjectsOrExecutors&&... io_objects_or_executors) { return detail::make_initiate_co_composed( std::forward(implementation), detail::make_composed_io_executors( detail::get_composed_io_executor( std::forward( io_objects_or_executors))...)); } } // namespace experimental #if !defined(GENERATING_DOCUMENTATION) template