// // experimental/detail/impl/channel_service.hpp // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ // // Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #ifndef ASIO_EXPERIMENTAL_DETAIL_IMPL_CHANNEL_SERVICE_HPP #define ASIO_EXPERIMENTAL_DETAIL_IMPL_CHANNEL_SERVICE_HPP #if defined(_MSC_VER) && (_MSC_VER >= 1200) # pragma once #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) #include "asio/detail/push_options.hpp" namespace asio { namespace experimental { namespace detail { template inline channel_service::channel_service(execution_context& ctx) : asio::detail::execution_context_service_base(ctx), mutex_(), impl_list_(0) { } template inline void channel_service::shutdown() { // Abandon all pending operations. asio::detail::op_queue ops; asio::detail::mutex::scoped_lock lock(mutex_); base_implementation_type* impl = impl_list_; while (impl) { ops.push(impl->waiters_); impl = impl->next_; } } template inline void channel_service::construct( channel_service::base_implementation_type& impl, std::size_t max_buffer_size) { impl.max_buffer_size_ = max_buffer_size; impl.receive_state_ = block; impl.send_state_ = max_buffer_size ? buffer : block; // Insert implementation into linked list of all implementations. asio::detail::mutex::scoped_lock lock(mutex_); impl.next_ = impl_list_; impl.prev_ = 0; if (impl_list_) impl_list_->prev_ = &impl; impl_list_ = &impl; } template template void channel_service::destroy( channel_service::implementation_type& impl) { cancel(impl); base_destroy(impl); } template template void channel_service::move_construct( channel_service::implementation_type& impl, channel_service::implementation_type< Traits, Signatures...>& other_impl) { impl.max_buffer_size_ = other_impl.max_buffer_size_; impl.receive_state_ = other_impl.receive_state_; other_impl.receive_state_ = block; impl.send_state_ = other_impl.send_state_; other_impl.send_state_ = other_impl.max_buffer_size_ ? buffer : block; impl.buffer_move_from(other_impl); // Insert implementation into linked list of all implementations. asio::detail::mutex::scoped_lock lock(mutex_); impl.next_ = impl_list_; impl.prev_ = 0; if (impl_list_) impl_list_->prev_ = &impl; impl_list_ = &impl; } template template void channel_service::move_assign( channel_service::implementation_type& impl, channel_service& other_service, channel_service::implementation_type< Traits, Signatures...>& other_impl) { cancel(impl); if (this != &other_service) { // Remove implementation from linked list of all implementations. asio::detail::mutex::scoped_lock lock(mutex_); if (impl_list_ == &impl) impl_list_ = impl.next_; if (impl.prev_) impl.prev_->next_ = impl.next_; if (impl.next_) impl.next_->prev_= impl.prev_; impl.next_ = 0; impl.prev_ = 0; } impl.max_buffer_size_ = other_impl.max_buffer_size_; impl.receive_state_ = other_impl.receive_state_; other_impl.receive_state_ = block; impl.send_state_ = other_impl.send_state_; other_impl.send_state_ = other_impl.max_buffer_size_ ? buffer : block; impl.buffer_move_from(other_impl); if (this != &other_service) { // Insert implementation into linked list of all implementations. asio::detail::mutex::scoped_lock lock(other_service.mutex_); impl.next_ = other_service.impl_list_; impl.prev_ = 0; if (other_service.impl_list_) other_service.impl_list_->prev_ = &impl; other_service.impl_list_ = &impl; } } template inline void channel_service::base_destroy( channel_service::base_implementation_type& impl) { // Remove implementation from linked list of all implementations. asio::detail::mutex::scoped_lock lock(mutex_); if (impl_list_ == &impl) impl_list_ = impl.next_; if (impl.prev_) impl.prev_->next_ = impl.next_; if (impl.next_) impl.next_->prev_= impl.prev_; impl.next_ = 0; impl.prev_ = 0; } template inline std::size_t channel_service::capacity( const channel_service::base_implementation_type& impl) const ASIO_NOEXCEPT { typename Mutex::scoped_lock lock(impl.mutex_); return impl.max_buffer_size_; } template inline bool channel_service::is_open( const channel_service::base_implementation_type& impl) const ASIO_NOEXCEPT { typename Mutex::scoped_lock lock(impl.mutex_); return impl.send_state_ != closed; } template template void channel_service::reset( channel_service::implementation_type& impl) { cancel(impl); typename Mutex::scoped_lock lock(impl.mutex_); impl.receive_state_ = block; impl.send_state_ = impl.max_buffer_size_ ? buffer : block; impl.buffer_clear(); } template template void channel_service::close( channel_service::implementation_type& impl) { typedef typename implementation_type::traits_type traits_type; typedef typename implementation_type::payload_type payload_type; typename Mutex::scoped_lock lock(impl.mutex_); if (impl.receive_state_ == block) { while (channel_operation* op = impl.waiters_.front()) { impl.waiters_.pop(); traits_type::invoke_receive_closed( complete_receive( static_cast*>(op))); } } impl.send_state_ = closed; if (impl.receive_state_ != buffer) impl.receive_state_ = closed; } template template void channel_service::cancel( channel_service::implementation_type& impl) { typedef typename implementation_type::traits_type traits_type; typedef typename implementation_type::payload_type payload_type; typename Mutex::scoped_lock lock(impl.mutex_); while (channel_operation* op = impl.waiters_.front()) { if (impl.send_state_ == block) { impl.waiters_.pop(); static_cast*>(op)->cancel(); } else { impl.waiters_.pop(); traits_type::invoke_receive_cancelled( complete_receive( static_cast*>(op))); } } if (impl.receive_state_ == waiter) impl.receive_state_ = block; if (impl.send_state_ == waiter) impl.send_state_ = impl.max_buffer_size_ ? buffer : block; } template template void channel_service::cancel_by_key( channel_service::implementation_type& impl, void* cancellation_key) { typedef typename implementation_type::traits_type traits_type; typedef typename implementation_type::payload_type payload_type; typename Mutex::scoped_lock lock(impl.mutex_); asio::detail::op_queue other_ops; while (channel_operation* op = impl.waiters_.front()) { if (op->cancellation_key_ == cancellation_key) { if (impl.send_state_ == block) { impl.waiters_.pop(); static_cast*>(op)->cancel(); } else { impl.waiters_.pop(); traits_type::invoke_receive_cancelled( complete_receive( static_cast*>(op))); } } else { impl.waiters_.pop(); other_ops.push(op); } } impl.waiters_.push(other_ops); if (impl.waiters_.empty()) { if (impl.receive_state_ == waiter) impl.receive_state_ = block; if (impl.send_state_ == waiter) impl.send_state_ = impl.max_buffer_size_ ? buffer : block; } } template inline bool channel_service::ready( const channel_service::base_implementation_type& impl) const ASIO_NOEXCEPT { typename Mutex::scoped_lock lock(impl.mutex_); return impl.receive_state_ != block; } template template bool channel_service::try_send( channel_service::implementation_type& impl, ASIO_MOVE_ARG(Args)... args) { typedef typename implementation_type::payload_type payload_type; typename Mutex::scoped_lock lock(impl.mutex_); switch (impl.send_state_) { case block: { return false; } case buffer: { impl.buffer_push(Message(0, ASIO_MOVE_CAST(Args)(args)...)); impl.receive_state_ = buffer; if (impl.buffer_size() == impl.max_buffer_size_) impl.send_state_ = block; return true; } case waiter: { payload_type payload(Message(0, ASIO_MOVE_CAST(Args)(args)...)); channel_receive* receive_op = static_cast*>(impl.waiters_.front()); impl.waiters_.pop(); receive_op->complete(ASIO_MOVE_CAST(payload_type)(payload)); if (impl.waiters_.empty()) impl.send_state_ = impl.max_buffer_size_ ? buffer : block; return true; } case closed: default: { return false; } } } template template std::size_t channel_service::try_send_n( channel_service::implementation_type& impl, std::size_t count, ASIO_MOVE_ARG(Args)... args) { typedef typename implementation_type::payload_type payload_type; typename Mutex::scoped_lock lock(impl.mutex_); if (count == 0) return 0; switch (impl.send_state_) { case block: return 0; case buffer: case waiter: break; case closed: default: return 0; } payload_type payload(Message(0, ASIO_MOVE_CAST(Args)(args)...)); for (std::size_t i = 0; i < count; ++i) { switch (impl.send_state_) { case block: { return i; } case buffer: { i += impl.buffer_push_n(count - i, ASIO_MOVE_CAST(payload_type)(payload)); impl.receive_state_ = buffer; if (impl.buffer_size() == impl.max_buffer_size_) impl.send_state_ = block; return i; } case waiter: { channel_receive* receive_op = static_cast*>(impl.waiters_.front()); impl.waiters_.pop(); receive_op->complete(payload); if (impl.waiters_.empty()) impl.send_state_ = impl.max_buffer_size_ ? buffer : block; break; } case closed: default: { return i; } } } return count; } template template void channel_service::start_send_op( channel_service::implementation_type& impl, channel_send::payload_type>* send_op) { typedef typename implementation_type::payload_type payload_type; typename Mutex::scoped_lock lock(impl.mutex_); switch (impl.send_state_) { case block: { impl.waiters_.push(send_op); if (impl.receive_state_ == block) impl.receive_state_ = waiter; return; } case buffer: { impl.buffer_push(send_op->get_payload()); impl.receive_state_ = buffer; if (impl.buffer_size() == impl.max_buffer_size_) impl.send_state_ = block; send_op->immediate(); break; } case waiter: { channel_receive* receive_op = static_cast*>(impl.waiters_.front()); impl.waiters_.pop(); receive_op->complete(send_op->get_payload()); if (impl.waiters_.empty()) impl.send_state_ = impl.max_buffer_size_ ? buffer : block; send_op->immediate(); break; } case closed: default: { send_op->close(); break; } } } template template bool channel_service::try_receive( channel_service::implementation_type& impl, ASIO_MOVE_ARG(Handler) handler) { typedef typename implementation_type::payload_type payload_type; typename Mutex::scoped_lock lock(impl.mutex_); switch (impl.receive_state_) { case block: { return false; } case buffer: { payload_type payload(impl.buffer_front()); if (channel_send* send_op = static_cast*>(impl.waiters_.front())) { impl.buffer_pop(); impl.buffer_push(send_op->get_payload()); impl.waiters_.pop(); send_op->complete(); } else { impl.buffer_pop(); if (impl.buffer_size() == 0) impl.receive_state_ = (impl.send_state_ == closed) ? closed : block; impl.send_state_ = (impl.send_state_ == closed) ? closed : buffer; } lock.unlock(); asio::detail::non_const_lvalue handler2(handler); channel_handler::type>( ASIO_MOVE_CAST(payload_type)(payload), handler2.value)(); return true; } case waiter: { channel_send* send_op = static_cast*>(impl.waiters_.front()); payload_type payload = send_op->get_payload(); impl.waiters_.pop(); send_op->complete(); if (impl.waiters_.front() == 0) impl.receive_state_ = (impl.send_state_ == closed) ? closed : block; lock.unlock(); asio::detail::non_const_lvalue handler2(handler); channel_handler::type>( ASIO_MOVE_CAST(payload_type)(payload), handler2.value)(); return true; } case closed: default: { return false; } } } template template void channel_service::start_receive_op( channel_service::implementation_type& impl, channel_receive::payload_type>* receive_op) { typedef typename implementation_type::traits_type traits_type; typedef typename implementation_type::payload_type payload_type; typename Mutex::scoped_lock lock(impl.mutex_); switch (impl.receive_state_) { case block: { impl.waiters_.push(receive_op); if (impl.send_state_ != closed) impl.send_state_ = waiter; return; } case buffer: { payload_type payload( ASIO_MOVE_CAST(payload_type)(impl.buffer_front())); if (channel_send* send_op = static_cast*>(impl.waiters_.front())) { impl.buffer_pop(); impl.buffer_push(send_op->get_payload()); impl.waiters_.pop(); send_op->complete(); } else { impl.buffer_pop(); if (impl.buffer_size() == 0) impl.receive_state_ = (impl.send_state_ == closed) ? closed : block; impl.send_state_ = (impl.send_state_ == closed) ? closed : buffer; } receive_op->immediate(ASIO_MOVE_CAST(payload_type)(payload)); break; } case waiter: { channel_send* send_op = static_cast*>(impl.waiters_.front()); payload_type payload = send_op->get_payload(); impl.waiters_.pop(); send_op->complete(); if (impl.waiters_.front() == 0) impl.receive_state_ = (impl.send_state_ == closed) ? closed : block; receive_op->immediate(ASIO_MOVE_CAST(payload_type)(payload)); break; } case closed: default: { traits_type::invoke_receive_closed( complete_receive(receive_op)); break; } } } } // namespace detail } // namespace experimental } // namespace asio #include "asio/detail/pop_options.hpp" #endif // ASIO_EXPERIMENTAL_DETAIL_IMPL_CHANNEL_SERVICE_HPP