// // experimental/impl/coro.hpp // ~~~~~~~~~~~~~~~~~~~~~~~~~~ // // Copyright (c) 2021-2023 Klemens D. Morgenstern // (klemens dot morgenstern at gmx dot net) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #ifndef ASIO_EXPERIMENTAL_IMPL_CORO_HPP #define ASIO_EXPERIMENTAL_IMPL_CORO_HPP #if defined(_MSC_VER) && (_MSC_VER >= 1200) # pragma once #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) #include "asio/detail/config.hpp" #include "asio/append.hpp" #include "asio/associated_cancellation_slot.hpp" #include "asio/bind_allocator.hpp" #include "asio/deferred.hpp" #include "asio/experimental/detail/coro_completion_handler.hpp" #include "asio/detail/push_options.hpp" namespace asio { namespace experimental { template struct coro; namespace detail { struct coro_cancellation_source { cancellation_slot slot; cancellation_state state; bool throw_if_cancelled_ = true; void reset_cancellation_state() { state = cancellation_state(slot); } template void reset_cancellation_state(ASIO_MOVE_ARG(Filter) filter) { state = cancellation_state(slot, ASIO_MOVE_CAST(Filter)(filter)); } template void reset_cancellation_state(ASIO_MOVE_ARG(InFilter) in_filter, ASIO_MOVE_ARG(OutFilter) out_filter) { state = cancellation_state(slot, ASIO_MOVE_CAST(InFilter)(in_filter), ASIO_MOVE_CAST(OutFilter)(out_filter)); } bool throw_if_cancelled() const { return throw_if_cancelled_; } void throw_if_cancelled(bool value) { throw_if_cancelled_ = value; } }; template struct coro_promise; template struct is_noexcept : std::false_type { }; template struct is_noexcept : std::false_type { }; template struct is_noexcept : std::true_type { }; template constexpr bool is_noexcept_v = is_noexcept::value; template struct coro_error; template <> struct coro_error { static asio::error_code invalid() { return asio::error::fault; } static asio::error_code cancelled() { return asio::error::operation_aborted; } static asio::error_code interrupted() { return asio::error::interrupted; } static asio::error_code done() { return asio::error::broken_pipe; } }; template <> struct coro_error { static std::exception_ptr invalid() { return std::make_exception_ptr( asio::system_error( coro_error::invalid())); } static std::exception_ptr cancelled() { return std::make_exception_ptr( asio::system_error( coro_error::cancelled())); } static std::exception_ptr interrupted() { return std::make_exception_ptr( asio::system_error( coro_error::interrupted())); } static std::exception_ptr done() { return std::make_exception_ptr( asio::system_error( coro_error::done())); } }; template struct coro_with_arg { using coro_t = Coroutine; T value; coro_t& coro; struct awaitable_t { T value; coro_t& coro; constexpr static bool await_ready() { return false; } template auto await_suspend(coroutine_handle> h) -> coroutine_handle<> { auto& hp = h.promise(); if constexpr (!coro_promise::is_noexcept) { if ((hp.cancel->state.cancelled() != cancellation_type::none) && hp.cancel->throw_if_cancelled_) { asio::detail::throw_error( asio::error::operation_aborted, "coro-cancelled"); } } if (hp.get_executor() == coro.get_executor()) { coro.coro_->awaited_from = h; coro.coro_->reset_error(); coro.coro_->input_ = std::move(value); coro.coro_->cancel = hp.cancel; return coro.coro_->get_handle(); } else { coro.coro_->awaited_from = dispatch_coroutine( asio::prefer(hp.get_executor(), execution::outstanding_work.tracked), [h]() mutable { h.resume(); }).handle; coro.coro_->reset_error(); coro.coro_->input_ = std::move(value); struct cancel_handler { using src = std::pair; std::shared_ptr st = std::make_shared(); cancel_handler(E e, coro_t& coro) : e(e), coro_(coro.coro_) { st->second.state = cancellation_state(st->second.slot = st->first.slot()); } E e; typename coro_t::promise_type* coro_; void operator()(cancellation_type ct) { asio::dispatch(e, [ct, st = st]() mutable { auto & [sig, state] = *st; sig.emit(ct); }); } }; if (hp.cancel->state.slot().is_connected()) { hp.cancel->state.slot().template emplace( coro.get_executor(), coro); } auto hh = detail::coroutine_handle< typename coro_t::promise_type>::from_promise(*coro.coro_); return dispatch_coroutine( coro.coro_->get_executor(), [hh]() mutable { hh.resume(); }).handle; } } auto await_resume() -> typename coro_t::result_type { coro.coro_->cancel = nullptr; coro.coro_->rethrow_if(); return std::move(coro.coro_->result_); } }; template auto async_resume(CompletionToken&& token) && { return coro.async_resume(std::move(value), std::forward(token)); } auto operator co_await() && { return awaitable_t{std::move(value), coro}; } }; template struct coro_promise_error; template <> struct coro_promise_error { std::exception_ptr error_; void reset_error() { error_ = std::exception_ptr{}; } void unhandled_exception() { error_ = std::current_exception(); } void rethrow_if() { if (error_) std::rethrow_exception(error_); } }; #if defined(__GNUC__) # pragma GCC diagnostic push # if defined(__clang__) # pragma GCC diagnostic ignored "-Wexceptions" # else # pragma GCC diagnostic ignored "-Wterminate" # endif #elif defined(_MSC_VER) # pragma warning(push) # pragma warning (disable:4297) #endif template <> struct coro_promise_error { void reset_error() { } void unhandled_exception() noexcept { throw; } void rethrow_if() { } }; #if defined(__GNUC__) # pragma GCC diagnostic pop #elif defined(_MSC_VER) # pragma warning(pop) #endif template struct yield_input { T& value; coroutine_handle<> awaited_from{noop_coroutine()}; bool await_ready() const noexcept { return false; } template coroutine_handle<> await_suspend(coroutine_handle) noexcept { return std::exchange(awaited_from, noop_coroutine()); } T await_resume() const noexcept { return std::move(value); } }; template <> struct yield_input { coroutine_handle<> awaited_from{noop_coroutine()}; bool await_ready() const noexcept { return false; } auto await_suspend(coroutine_handle<>) noexcept { return std::exchange(awaited_from, noop_coroutine()); } constexpr void await_resume() const noexcept { } }; struct coro_awaited_from { coroutine_handle<> awaited_from{noop_coroutine()}; auto final_suspend() noexcept { struct suspendor { coroutine_handle<> awaited_from; constexpr static bool await_ready() noexcept { return false; } auto await_suspend(coroutine_handle<>) noexcept { return std::exchange(awaited_from, noop_coroutine()); } constexpr static void await_resume() noexcept { } }; return suspendor{std::exchange(awaited_from, noop_coroutine())}; } ~coro_awaited_from() { awaited_from.resume(); }//must be on the right executor }; template struct coro_promise_exchange : coro_awaited_from { using result_type = coro_result_t; result_type result_; Input input_; auto yield_value(Yield&& y) { result_ = std::move(y); return yield_input{std::move(input_), std::exchange(awaited_from, noop_coroutine())}; } auto yield_value(const Yield& y) { result_ = y; return yield_input{std::move(input_), std::exchange(awaited_from, noop_coroutine())}; } void return_value(const Return& r) { result_ = r; } void return_value(Return&& r) { result_ = std::move(r); } }; template struct coro_promise_exchange : coro_awaited_from { using result_type = coro_result_t; result_type result_; auto yield_value(const YieldReturn& y) { result_ = y; return yield_input{std::exchange(awaited_from, noop_coroutine())}; } auto yield_value(YieldReturn&& y) { result_ = std::move(y); return yield_input{std::exchange(awaited_from, noop_coroutine())}; } void return_value(const YieldReturn& r) { result_ = r; } void return_value(YieldReturn&& r) { result_ = std::move(r); } }; template struct coro_promise_exchange : coro_awaited_from { using result_type = coro_result_t; result_type result_; auto yield_value(const Yield& y) { result_.template emplace<0>(y); return yield_input{std::exchange(awaited_from, noop_coroutine())}; } auto yield_value(Yield&& y) { result_.template emplace<0>(std::move(y)); return yield_input{std::exchange(awaited_from, noop_coroutine())}; } void return_value(const Return& r) { result_.template emplace<1>(r); } void return_value(Return&& r) { result_.template emplace<1>(std::move(r)); } }; template struct coro_promise_exchange : coro_awaited_from { using result_type = coro_result_t; result_type result_; Input input_; auto yield_value(Yield&& y) { result_ = std::move(y); return yield_input{input_, std::exchange(awaited_from, noop_coroutine())}; } auto yield_value(const Yield& y) { result_ = y; return yield_input{input_, std::exchange(awaited_from, noop_coroutine())}; } void return_void() { result_.reset(); } }; template struct coro_promise_exchange : coro_awaited_from { using result_type = coro_result_t; result_type result_; void yield_value(); void return_value(const Return& r) { result_ = r; } void return_value(Return&& r) { result_ = std::move(r); } }; template <> struct coro_promise_exchange : coro_awaited_from { void return_void() {} void yield_value(); }; template struct coro_promise_exchange : coro_awaited_from { using result_type = coro_result_t; result_type result_; auto yield_value(const Yield& y) { result_ = y; return yield_input{std::exchange(awaited_from, noop_coroutine())}; } auto yield_value(Yield&& y) { result_ = std::move(y); return yield_input{std::exchange(awaited_from, noop_coroutine())}; } void return_void() { result_.reset(); } }; template struct coro_promise final : coro_promise_allocator, coro_promise_error::is_noexcept>, coro_promise_exchange< typename coro_traits::yield_type, typename coro_traits::input_type, typename coro_traits::return_type> { using coro_type = coro; auto handle() { return coroutine_handle::from_promise(this); } using executor_type = Executor; executor_type executor_; std::optional cancel_source; coro_cancellation_source * cancel; using cancellation_slot_type = asio::cancellation_slot; cancellation_slot_type get_cancellation_slot() const noexcept { return cancel ? cancel->slot : cancellation_slot_type{}; } using allocator_type = typename std::allocator_traits>:: template rebind_alloc; using traits = coro_traits; using input_type = typename traits::input_type; using yield_type = typename traits::yield_type; using return_type = typename traits::return_type; using error_type = typename traits::error_type; using result_type = typename traits::result_type; constexpr static bool is_noexcept = traits::is_noexcept; auto get_executor() const -> Executor { return executor_; } auto get_handle() { return coroutine_handle::from_promise(*this); } template coro_promise(Executor executor, Args&&... args) noexcept : coro_promise_allocator( executor, std::forward(args)...), executor_(std::move(executor)) { } template coro_promise(First&& f, Executor executor, Args&&... args) noexcept : coro_promise_allocator( f, executor, std::forward(args)...), executor_(std::move(executor)) { } template coro_promise(First&& f, Context&& ctx, Args&&... args) noexcept : coro_promise_allocator( f, ctx, std::forward(args)...), executor_(ctx.get_executor()) { } template coro_promise(Context&& ctx, Args&&... args) noexcept : coro_promise_allocator( ctx, std::forward(args)...), executor_(ctx.get_executor()) { } auto get_return_object() { return coro{this}; } auto initial_suspend() noexcept { return suspend_always{}; } using coro_promise_exchange< typename coro_traits::yield_type, typename coro_traits::input_type, typename coro_traits::return_type>::yield_value; auto await_transform(this_coro::executor_t) const { struct exec_helper { const executor_type& value; constexpr static bool await_ready() noexcept { return true; } constexpr static void await_suspend(coroutine_handle<>) noexcept { } executor_type await_resume() const noexcept { return value; } }; return exec_helper{executor_}; } auto await_transform(this_coro::cancellation_state_t) const { struct exec_helper { const asio::cancellation_state& value; constexpr static bool await_ready() noexcept { return true; } constexpr static void await_suspend(coroutine_handle<>) noexcept { } asio::cancellation_state await_resume() const noexcept { return value; } }; assert(cancel); return exec_helper{cancel->state}; } // This await transformation resets the associated cancellation state. auto await_transform(this_coro::reset_cancellation_state_0_t) noexcept { struct result { detail::coro_cancellation_source * src_; bool await_ready() const noexcept { return true; } void await_suspend(coroutine_handle) noexcept { } auto await_resume() const { return src_->reset_cancellation_state(); } }; return result{cancel}; } // This await transformation resets the associated cancellation state. template auto await_transform( this_coro::reset_cancellation_state_1_t reset) noexcept { struct result { detail::coro_cancellation_source* src_; Filter filter_; bool await_ready() const noexcept { return true; } void await_suspend(coroutine_handle) noexcept { } auto await_resume() { return src_->reset_cancellation_state( ASIO_MOVE_CAST(Filter)(filter_)); } }; return result{cancel, ASIO_MOVE_CAST(Filter)(reset.filter)}; } // This await transformation resets the associated cancellation state. template auto await_transform( this_coro::reset_cancellation_state_2_t reset) noexcept { struct result { detail::coro_cancellation_source* src_; InFilter in_filter_; OutFilter out_filter_; bool await_ready() const noexcept { return true; } void await_suspend(coroutine_handle) noexcept { } auto await_resume() { return src_->reset_cancellation_state( ASIO_MOVE_CAST(InFilter)(in_filter_), ASIO_MOVE_CAST(OutFilter)(out_filter_)); } }; return result{cancel, ASIO_MOVE_CAST(InFilter)(reset.in_filter), ASIO_MOVE_CAST(OutFilter)(reset.out_filter)}; } // This await transformation determines whether cancellation is propagated as // an exception. auto await_transform(this_coro::throw_if_cancelled_0_t) noexcept requires (!is_noexcept) { struct result { detail::coro_cancellation_source* src_; bool await_ready() const noexcept { return true; } void await_suspend(coroutine_handle) noexcept { } auto await_resume() { return src_->throw_if_cancelled(); } }; return result{cancel}; } // This await transformation sets whether cancellation is propagated as an // exception. auto await_transform( this_coro::throw_if_cancelled_1_t throw_if_cancelled) noexcept requires (!is_noexcept) { struct result { detail::coro_cancellation_source* src_; bool value_; bool await_ready() const noexcept { return true; } void await_suspend(coroutine_handle) noexcept { } auto await_resume() { src_->throw_if_cancelled(value_); } }; return result{cancel, throw_if_cancelled.value}; } template auto await_transform(coro& kr) -> decltype(auto) { return kr; } template auto await_transform(coro&& kr) { return std::move(kr); } template auto await_transform(coro_with_arg&& kr) -> decltype(auto) { return std::move(kr); } template requires requires(T_ t) {{ t.async_wait(deferred) }; } auto await_transform(T_& t) -> decltype(auto) { return await_transform(t.async_wait(deferred)); } template auto await_transform(Op&& op, typename constraint::value>::type = 0) { if ((cancel->state.cancelled() != cancellation_type::none) && cancel->throw_if_cancelled_) { asio::detail::throw_error( asio::error::operation_aborted, "coro-cancelled"); } using signature = typename completion_signature_of::type; using result_type = detail::coro_completion_handler_type_t; using handler_type = typename detail::coro_completion_handler_type::template completion_handler; struct aw_t { Op op; std::optional result; constexpr static bool await_ready() { return false; } void await_suspend(coroutine_handle h) { std::move(op)(handler_type{h, result}); } auto await_resume() { if constexpr (is_noexcept) { if constexpr (std::tuple_size_v == 0u) return; else if constexpr (std::tuple_size_v == 1u) return std::get<0>(std::move(result).value()); else return std::move(result).value(); } else return detail::coro_interpret_result(std::move(result).value()); } }; return aw_t{std::move(op), {}}; } }; } // namespace detail template struct coro::awaitable_t { coro& coro_; constexpr static bool await_ready() { return false; } template auto await_suspend( detail::coroutine_handle> h) -> detail::coroutine_handle<> { auto& hp = h.promise(); if constexpr (!detail::coro_promise::is_noexcept) { if ((hp.cancel->state.cancelled() != cancellation_type::none) && hp.cancel->throw_if_cancelled_) { asio::detail::throw_error( asio::error::operation_aborted, "coro-cancelled"); } } if (hp.get_executor() == coro_.get_executor()) { coro_.coro_->awaited_from = h; coro_.coro_->cancel = hp.cancel; coro_.coro_->reset_error(); return coro_.coro_->get_handle(); } else { coro_.coro_->awaited_from = detail::dispatch_coroutine( asio::prefer(hp.get_executor(), execution::outstanding_work.tracked), [h]() mutable { h.resume(); }).handle; coro_.coro_->reset_error(); struct cancel_handler { std::shared_ptr> st = std::make_shared< std::pair>(); cancel_handler(E e, coro& coro) : e(e), coro_(coro.coro_) { st->second.state = cancellation_state( st->second.slot = st->first.slot()); } E e; typename coro::promise_type* coro_; void operator()(cancellation_type ct) { asio::dispatch(e, [ct, st = st]() mutable { auto & [sig, state] = *st; sig.emit(ct); }); } }; if (hp.cancel->state.slot().is_connected()) { hp.cancel->state.slot().template emplace( coro_.get_executor(), coro_); } auto hh = detail::coroutine_handle< detail::coro_promise>::from_promise( *coro_.coro_); return detail::dispatch_coroutine( coro_.coro_->get_executor(), [hh]() mutable { hh.resume(); }).handle; } } auto await_resume() -> result_type { coro_.coro_->cancel = nullptr; coro_.coro_->rethrow_if(); if constexpr (!std::is_void_v) return std::move(coro_.coro_->result_); } }; template struct coro::initiate_async_resume { typedef Executor executor_type; typedef Allocator allocator_type; typedef asio::cancellation_slot cancellation_slot_type; explicit initiate_async_resume(coro* self) : coro_(self->coro_) { } executor_type get_executor() const noexcept { return coro_->get_executor(); } allocator_type get_allocator() const noexcept { return coro_->get_allocator(); } template auto handle(E exec, WaitHandler&& handler, std::true_type /* error is noexcept */, std::true_type /* result is void */) //noexcept { return [this, coro = coro_, h = std::forward(handler), exec = std::move(exec)]() mutable { assert(coro); auto ch = detail::coroutine_handle::from_promise(*coro); assert(ch && !ch.done()); coro->awaited_from = post_coroutine(std::move(exec), std::move(h)); coro->reset_error(); ch.resume(); }; } template requires (!std::is_void_v) auto handle(E exec, WaitHandler&& handler, std::true_type /* error is noexcept */, std::false_type /* result is void */) //noexcept { return [coro = coro_, h = std::forward(handler), exec = std::move(exec)]() mutable { assert(coro); auto ch = detail::coroutine_handle::from_promise(*coro); assert(ch && !ch.done()); coro->awaited_from = detail::post_coroutine( exec, std::move(h), coro->result_).handle; coro->reset_error(); ch.resume(); }; } template auto handle(E exec, WaitHandler&& handler, std::false_type /* error is noexcept */, std::true_type /* result is void */) { return [coro = coro_, h = std::forward(handler), exec = std::move(exec)]() mutable { if (!coro) return asio::post(exec, asio::append(std::move(h), detail::coro_error::invalid())); auto ch = detail::coroutine_handle::from_promise(*coro); if (!ch) return asio::post(exec, asio::append(std::move(h), detail::coro_error::invalid())); else if (ch.done()) return asio::post(exec, asio::append(std::move(h), detail::coro_error::done())); else { coro->awaited_from = detail::post_coroutine( exec, std::move(h), coro->error_).handle; coro->reset_error(); ch.resume(); } }; } template auto handle(E exec, WaitHandler&& handler, std::false_type /* error is noexcept */, std::false_type /* result is void */) { return [coro = coro_, h = std::forward(handler), exec = std::move(exec)]() mutable { if (!coro) return asio::post(exec, asio::append(std::move(h), detail::coro_error::invalid(), result_type{})); auto ch = detail::coroutine_handle::from_promise(*coro); if (!ch) return asio::post(exec, asio::append(std::move(h), detail::coro_error::invalid(), result_type{})); else if (ch.done()) return asio::post(exec, asio::append(std::move(h), detail::coro_error::done(), result_type{})); else { coro->awaited_from = detail::post_coroutine( exec, std::move(h), coro->error_, coro->result_).handle; coro->reset_error(); ch.resume(); } }; } template void operator()(WaitHandler&& handler) { const auto exec = asio::prefer( get_associated_executor(handler, get_executor()), execution::outstanding_work.tracked); coro_->cancel = &coro_->cancel_source.emplace(); coro_->cancel->state = cancellation_state( coro_->cancel->slot = get_associated_cancellation_slot(handler)); asio::dispatch(get_executor(), handle(exec, std::forward(handler), std::integral_constant{}, std::is_void{})); } template void operator()(WaitHandler&& handler, Input&& input) { const auto exec = asio::prefer( get_associated_executor(handler, get_executor()), execution::outstanding_work.tracked); coro_->cancel = &coro_->cancel_source.emplace(); coro_->cancel->state = cancellation_state( coro_->cancel->slot = get_associated_cancellation_slot(handler)); asio::dispatch(get_executor(), [h = handle(exec, std::forward(handler), std::integral_constant{}, std::is_void{}), in = std::forward(input), coro = coro_]() mutable { coro->input_ = std::move(in); std::move(h)(); }); } private: typename coro::promise_type* coro_; }; } // namespace experimental } // namespace asio #include "asio/detail/pop_options.hpp" #endif // ASIO_EXPERIMENTAL_IMPL_CORO_HPP