//
// detail/impl/io_uring_service.ipp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP
#define ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include "asio/detail/config.hpp"

#if defined(ASIO_HAS_IO_URING)

#include <cstddef>
#include <sys/eventfd.h>
#include "asio/detail/io_uring_service.hpp"
#include "asio/detail/reactor_op.hpp"
#include "asio/detail/scheduler.hpp"
#include "asio/detail/throw_error.hpp"
#include "asio/error.hpp"

#include "asio/detail/push_options.hpp"

namespace asio {
namespace detail {

io_uring_service::io_uring_service(asio::execution_context& ctx)
  : execution_context_service_base<io_uring_service>(ctx),
    scheduler_(use_service<scheduler>(ctx)),
    mutex_(ASIO_CONCURRENCY_HINT_IS_LOCKING(
          REACTOR_REGISTRATION, scheduler_.concurrency_hint())),
    outstanding_work_(0),
    submit_sqes_op_(this),
    pending_sqes_(0),
    pending_submit_sqes_op_(false),
    shutdown_(false),
    timeout_(),
    registration_mutex_(mutex_.enabled()),
    reactor_(use_service<reactor>(ctx)),
    reactor_data_(),
    event_fd_(-1)
{
  reactor_.init_task();
  init_ring();
  register_with_reactor();
}

io_uring_service::~io_uring_service()
{
  if (ring_.ring_fd != -1)
    ::io_uring_queue_exit(&ring_);
  if (event_fd_ != -1)
    ::close(event_fd_);
}

void io_uring_service::shutdown()
{
  mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  lock.unlock();

  op_queue<operation> ops;

  // Cancel all outstanding operations.
  while (io_object* io_obj = registered_io_objects_.first())
  {
    for (int i = 0; i < max_ops; ++i)
    {
      if (!io_obj->queues_[i].op_queue_.empty())
      {
        ops.push(io_obj->queues_[i].op_queue_);
        if (::io_uring_sqe* sqe = get_sqe())
          ::io_uring_prep_cancel(sqe, &io_obj->queues_[i], 0);
      }
    }
    io_obj->shutdown_ = true;
    registered_io_objects_.free(io_obj);
  }

  // Cancel the timeout operation.
  if (::io_uring_sqe* sqe = get_sqe())
    ::io_uring_prep_cancel(sqe, &timeout_, IOSQE_IO_DRAIN);
  submit_sqes();

  // Wait for all completions to come back.
  for (; outstanding_work_ > 0; --outstanding_work_)
  {
    ::io_uring_cqe* cqe = 0;
    if (::io_uring_wait_cqe(&ring_, &cqe) != 0)
      break;
  }

  timer_queues_.get_all_timers(ops);

  scheduler_.abandon_operations(ops);
}
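// Fork support: before a fork, all in-flight operations are cancelled and
// their completions drained so that no kernel-side state is carried across
// the fork. The parent then re-arms its timeout and eventfd registrations,
// while the child discards the old ring and creates a new io_uring instance.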
void io_uring_service::notify_fork(
    asio::execution_context::fork_event fork_ev)
{
  switch (fork_ev)
  {
  case asio::execution_context::fork_prepare:
    {
      // Cancel all outstanding operations. They will be restarted
      // after the fork completes.
      mutex::scoped_lock registration_lock(registration_mutex_);
      for (io_object* io_obj = registered_io_objects_.first();
          io_obj != 0; io_obj = io_obj->next_)
      {
        mutex::scoped_lock io_object_lock(io_obj->mutex_);
        for (int i = 0; i < max_ops; ++i)
        {
          if (!io_obj->queues_[i].op_queue_.empty()
              && !io_obj->queues_[i].cancel_requested_)
          {
            mutex::scoped_lock lock(mutex_);
            if (::io_uring_sqe* sqe = get_sqe())
              ::io_uring_prep_cancel(sqe, &io_obj->queues_[i], 0);
          }
        }
      }

      // Cancel the timeout operation.
      {
        mutex::scoped_lock lock(mutex_);
        if (::io_uring_sqe* sqe = get_sqe())
          ::io_uring_prep_cancel(sqe, &timeout_, IOSQE_IO_DRAIN);
        submit_sqes();
      }

      // Wait for all completions to come back, and post all completed I/O
      // queues to the scheduler. Note that some operations may have already
      // completed, or were explicitly cancelled. All others will be
      // automatically restarted.
      op_queue<operation> ops;
      for (; outstanding_work_ > 0; --outstanding_work_)
      {
        ::io_uring_cqe* cqe = 0;
        if (::io_uring_wait_cqe(&ring_, &cqe) != 0)
          break;
        if (void* ptr = ::io_uring_cqe_get_data(cqe))
        {
          if (ptr != this && ptr != &timer_queues_ && ptr != &timeout_)
          {
            io_queue* io_q = static_cast<io_queue*>(ptr);
            io_q->set_result(cqe->res);
            ops.push(io_q);
          }
        }
      }
      scheduler_.post_deferred_completions(ops);

      // Restart the eventfd operation.
      register_with_reactor();
    }
    break;

  case asio::execution_context::fork_parent:
    // Restart the timeout and eventfd operations.
    update_timeout();
    register_with_reactor();
    break;

  case asio::execution_context::fork_child:
    {
      // The child process gets a new io_uring instance.
      ::io_uring_queue_exit(&ring_);
      init_ring();
      register_with_reactor();
    }
    break;

  default:
    break;
  }
}

void io_uring_service::init_task()
{
  scheduler_.init_task();
}

void io_uring_service::register_io_object(
    io_uring_service::per_io_object_data& io_obj)
{
  io_obj = allocate_io_object();

  mutex::scoped_lock io_object_lock(io_obj->mutex_);

  io_obj->service_ = this;
  io_obj->shutdown_ = false;
  for (int i = 0; i < max_ops; ++i)
  {
    io_obj->queues_[i].io_object_ = io_obj;
    io_obj->queues_[i].cancel_requested_ = false;
  }
}

void io_uring_service::register_internal_io_object(
    io_uring_service::per_io_object_data& io_obj,
    int op_type, io_uring_operation* op)
{
  io_obj = allocate_io_object();

  mutex::scoped_lock io_object_lock(io_obj->mutex_);

  io_obj->service_ = this;
  io_obj->shutdown_ = false;
  for (int i = 0; i < max_ops; ++i)
  {
    io_obj->queues_[i].io_object_ = io_obj;
    io_obj->queues_[i].cancel_requested_ = false;
  }

  io_obj->queues_[op_type].op_queue_.push(op);
  io_object_lock.unlock();

  mutex::scoped_lock lock(mutex_);
  if (::io_uring_sqe* sqe = get_sqe())
  {
    op->prepare(sqe);
    ::io_uring_sqe_set_data(sqe, &io_obj->queues_[op_type]);
    post_submit_sqes_op(lock);
  }
  else
  {
    asio::error_code ec(ENOBUFS, asio::error::get_system_category());
    asio::detail::throw_error(ec, "io_uring_get_sqe");
  }
}

void io_uring_service::register_buffers(const ::iovec* v, unsigned n)
{
  int result = ::io_uring_register_buffers(&ring_, v, n);
  if (result < 0)
  {
    asio::error_code ec(-result, asio::error::get_system_category());
    asio::detail::throw_error(ec, "io_uring_register_buffers");
  }
}

void io_uring_service::unregister_buffers()
{
  (void)::io_uring_unregister_buffers(&ring_);
}
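// Starting an operation: when the per-object queue for the operation type is
// empty, the operation is first given a chance to complete immediately. If it
// cannot, it is queued and an SQE is prepared for it, with the SQE's user
// data pointing at the owning io_queue so that the completion can be routed
// back to it. Operations behind an outstanding one are simply queued.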
void io_uring_service::start_op(int op_type,
    io_uring_service::per_io_object_data& io_obj,
    io_uring_operation* op, bool is_continuation)
{
  if (!io_obj)
  {
    op->ec_ = asio::error::bad_descriptor;
    post_immediate_completion(op, is_continuation);
    return;
  }

  mutex::scoped_lock io_object_lock(io_obj->mutex_);

  if (io_obj->shutdown_)
  {
    io_object_lock.unlock();
    post_immediate_completion(op, is_continuation);
    return;
  }

  if (io_obj->queues_[op_type].op_queue_.empty())
  {
    if (op->perform(false))
    {
      io_object_lock.unlock();
      scheduler_.post_immediate_completion(op, is_continuation);
    }
    else
    {
      io_obj->queues_[op_type].op_queue_.push(op);
      io_object_lock.unlock();

      mutex::scoped_lock lock(mutex_);
      if (::io_uring_sqe* sqe = get_sqe())
      {
        op->prepare(sqe);
        ::io_uring_sqe_set_data(sqe, &io_obj->queues_[op_type]);
        scheduler_.work_started();
        post_submit_sqes_op(lock);
      }
      else
      {
        lock.unlock();
        io_obj->queues_[op_type].set_result(-ENOBUFS);
        post_immediate_completion(&io_obj->queues_[op_type], is_continuation);
      }
    }
  }
  else
  {
    io_obj->queues_[op_type].op_queue_.push(op);
    scheduler_.work_started();
  }
}

void io_uring_service::cancel_ops(io_uring_service::per_io_object_data& io_obj)
{
  if (!io_obj)
    return;

  mutex::scoped_lock io_object_lock(io_obj->mutex_);
  op_queue<operation> ops;
  do_cancel_ops(io_obj, ops);
  io_object_lock.unlock();
  scheduler_.post_deferred_completions(ops);
}

void io_uring_service::cancel_ops_by_key(
    io_uring_service::per_io_object_data& io_obj,
    int op_type, void* cancellation_key)
{
  if (!io_obj)
    return;

  mutex::scoped_lock io_object_lock(io_obj->mutex_);

  bool first = true;
  op_queue<operation> ops;
  op_queue<io_uring_operation> other_ops;
  while (io_uring_operation* op = io_obj->queues_[op_type].op_queue_.front())
  {
    io_obj->queues_[op_type].op_queue_.pop();
    if (op->cancellation_key_ == cancellation_key)
    {
      if (first)
      {
        other_ops.push(op);
        if (!io_obj->queues_[op_type].cancel_requested_)
        {
          io_obj->queues_[op_type].cancel_requested_ = true;
          mutex::scoped_lock lock(mutex_);
          if (::io_uring_sqe* sqe = get_sqe())
          {
            ::io_uring_prep_cancel(sqe, &io_obj->queues_[op_type], 0);
            submit_sqes();
          }
        }
      }
      else
      {
        op->ec_ = asio::error::operation_aborted;
        ops.push(op);
      }
    }
    else
      other_ops.push(op);
    first = false;
  }
  io_obj->queues_[op_type].op_queue_.push(other_ops);

  io_object_lock.unlock();

  scheduler_.post_deferred_completions(ops);
}

void io_uring_service::deregister_io_object(
    io_uring_service::per_io_object_data& io_obj)
{
  if (!io_obj)
    return;

  mutex::scoped_lock io_object_lock(io_obj->mutex_);

  if (!io_obj->shutdown_)
  {
    op_queue<operation> ops;
    bool pending_cancelled_ops = do_cancel_ops(io_obj, ops);
    io_obj->shutdown_ = true;
    io_object_lock.unlock();
    scheduler_.post_deferred_completions(ops);
    if (pending_cancelled_ops)
    {
      // There are still pending operations. Prevent cleanup_io_object from
      // freeing the I/O object and let the last operation to complete free it.
      io_obj = 0;
    }
    else
    {
      // Leave io_obj set so that it will be freed by the subsequent call to
      // cleanup_io_object.
    }
  }
  else
  {
    // We are shutting down, so prevent cleanup_io_object from freeing
    // the I/O object and let the destructor free it instead.
    io_obj = 0;
  }
}

void io_uring_service::cleanup_io_object(
    io_uring_service::per_io_object_data& io_obj)
{
  if (io_obj)
  {
    free_io_object(io_obj);
    io_obj = 0;
  }
}
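// The run() function reaps completions on behalf of the scheduler. A private
// timeout SQE bounds the wait when a finite timeout is requested, completions
// are drained in batches of up to complete_batch_size, and any ready timers
// are collected once a timer-related completion has been observed.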
void io_uring_service::run(long usec, op_queue<operation>& ops)
{
  __kernel_timespec ts;
  int local_ops = 0;

  if (usec > 0)
  {
    ts.tv_sec = usec / 1000000;
    ts.tv_nsec = (usec % 1000000) * 1000;
    mutex::scoped_lock lock(mutex_);
    if (::io_uring_sqe* sqe = get_sqe())
    {
      ++local_ops;
      ::io_uring_prep_timeout(sqe, &ts, 0, 0);
      ::io_uring_sqe_set_data(sqe, &ts);
      submit_sqes();
    }
  }

  ::io_uring_cqe* cqe = 0;
  int result = (usec == 0)
    ? ::io_uring_peek_cqe(&ring_, &cqe)
    : ::io_uring_wait_cqe(&ring_, &cqe);

  if (result == 0 && usec > 0)
  {
    if (::io_uring_cqe_get_data(cqe) != &ts)
    {
      mutex::scoped_lock lock(mutex_);
      if (::io_uring_sqe* sqe = get_sqe())
      {
        ++local_ops;
        ::io_uring_prep_timeout_remove(sqe, reinterpret_cast<__u64>(&ts), 0);
        submit_sqes();
      }
    }
  }

  bool check_timers = false;
  int count = 0;
  while (result == 0)
  {
    if (void* ptr = ::io_uring_cqe_get_data(cqe))
    {
      if (ptr == this)
      {
        // The io_uring service was interrupted.
      }
      else if (ptr == &timer_queues_)
      {
        check_timers = true;
      }
      else if (ptr == &timeout_)
      {
        check_timers = true;
        timeout_.tv_sec = 0;
        timeout_.tv_nsec = 0;
      }
      else if (ptr == &ts)
      {
        --local_ops;
      }
      else
      {
        io_queue* io_q = static_cast<io_queue*>(ptr);
        io_q->set_result(cqe->res);
        ops.push(io_q);
      }
    }
    ::io_uring_cqe_seen(&ring_, cqe);
    result = (++count < complete_batch_size || local_ops > 0)
      ? ::io_uring_peek_cqe(&ring_, &cqe) : -EAGAIN;
  }

  decrement(outstanding_work_, count);

  if (check_timers)
  {
    mutex::scoped_lock lock(mutex_);
    timer_queues_.get_ready_timers(ops);
    if (timeout_.tv_sec == 0 && timeout_.tv_nsec == 0)
    {
      timeout_ = get_timeout();
      if (::io_uring_sqe* sqe = get_sqe())
      {
        ::io_uring_prep_timeout(sqe, &timeout_, 0, 0);
        ::io_uring_sqe_set_data(sqe, &timeout_);
        push_submit_sqes_op(ops);
      }
    }
  }
}

void io_uring_service::interrupt()
{
  mutex::scoped_lock lock(mutex_);
  if (::io_uring_sqe* sqe = get_sqe())
  {
    ::io_uring_prep_nop(sqe);
    ::io_uring_sqe_set_data(sqe, this);
  }
  submit_sqes();
}

void io_uring_service::init_ring()
{
  int result = ::io_uring_queue_init(ring_size, &ring_, 0);
  if (result < 0)
  {
    ring_.ring_fd = -1;
    asio::error_code ec(-result, asio::error::get_system_category());
    asio::detail::throw_error(ec, "io_uring_queue_init");
  }

#if !defined(ASIO_HAS_IO_URING_AS_DEFAULT)
  event_fd_ = ::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
  if (event_fd_ < 0)
  {
    asio::error_code ec(errno, asio::error::get_system_category());
    ::io_uring_queue_exit(&ring_);
    asio::detail::throw_error(ec, "eventfd");
  }

  result = ::io_uring_register_eventfd(&ring_, event_fd_);
  if (result < 0)
  {
    ::close(event_fd_);
    ::io_uring_queue_exit(&ring_);
    asio::error_code ec(-result, asio::error::get_system_category());
    asio::detail::throw_error(ec, "io_uring_register_eventfd");
  }
#endif // !defined(ASIO_HAS_IO_URING_AS_DEFAULT)
}
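// When io_uring is not the default backend, completions are signalled through
// an eventfd that is registered with the ring and monitored by the reactor.
// The read operation below drains the eventfd counter and then polls the ring
// so that io_uring completions are delivered to the scheduler.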
#if !defined(ASIO_HAS_IO_URING_AS_DEFAULT)
class io_uring_service::event_fd_read_op :
  public reactor_op
{
public:
  event_fd_read_op(io_uring_service* s)
    : reactor_op(asio::error_code(),
        &event_fd_read_op::do_perform, event_fd_read_op::do_complete),
      service_(s)
  {
  }

  static status do_perform(reactor_op* base)
  {
    event_fd_read_op* o(static_cast<event_fd_read_op*>(base));

    for (;;)
    {
      // Only perform one read. The kernel maintains an atomic counter.
      uint64_t counter(0);
      errno = 0;
      int bytes_read = ::read(o->service_->event_fd_,
          &counter, sizeof(uint64_t));
      if (bytes_read < 0 && errno == EINTR)
        continue;
      break;
    }

    op_queue<operation> ops;
    o->service_->run(0, ops);
    o->service_->scheduler_.post_deferred_completions(ops);

    return not_done;
  }

  static void do_complete(void* /*owner*/, operation* base,
      const asio::error_code& /*ec*/, std::size_t /*bytes_transferred*/)
  {
    event_fd_read_op* o(static_cast<event_fd_read_op*>(base));
    delete o;
  }

private:
  io_uring_service* service_;
};
#endif // !defined(ASIO_HAS_IO_URING_AS_DEFAULT)

void io_uring_service::register_with_reactor()
{
#if !defined(ASIO_HAS_IO_URING_AS_DEFAULT)
  reactor_.register_internal_descriptor(reactor::read_op,
      event_fd_, reactor_data_, new event_fd_read_op(this));
#endif // !defined(ASIO_HAS_IO_URING_AS_DEFAULT)
}

io_uring_service::io_object* io_uring_service::allocate_io_object()
{
  mutex::scoped_lock registration_lock(registration_mutex_);
  return registered_io_objects_.alloc(
      ASIO_CONCURRENCY_HINT_IS_LOCKING(
        REACTOR_IO, scheduler_.concurrency_hint()));
}

void io_uring_service::free_io_object(io_uring_service::io_object* io_obj)
{
  mutex::scoped_lock registration_lock(registration_mutex_);
  registered_io_objects_.free(io_obj);
}
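// Cancellation helper: for each operation type, every queued operation behind
// the head is failed immediately with operation_aborted, while the head
// operation (which the kernel may already be executing) is left in place and
// an asynchronous cancel SQE is issued for its queue.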
bool io_uring_service::do_cancel_ops(
    per_io_object_data& io_obj, op_queue<operation>& ops)
{
  bool cancel_op = false;

  for (int i = 0; i < max_ops; ++i)
  {
    if (io_uring_operation* first_op = io_obj->queues_[i].op_queue_.front())
    {
      cancel_op = true;
      io_obj->queues_[i].op_queue_.pop();
      while (io_uring_operation* op = io_obj->queues_[i].op_queue_.front())
      {
        op->ec_ = asio::error::operation_aborted;
        io_obj->queues_[i].op_queue_.pop();
        ops.push(op);
      }
      io_obj->queues_[i].op_queue_.push(first_op);
    }
  }

  if (cancel_op)
  {
    mutex::scoped_lock lock(mutex_);
    for (int i = 0; i < max_ops; ++i)
    {
      if (!io_obj->queues_[i].op_queue_.empty()
          && !io_obj->queues_[i].cancel_requested_)
      {
        io_obj->queues_[i].cancel_requested_ = true;
        if (::io_uring_sqe* sqe = get_sqe())
          ::io_uring_prep_cancel(sqe, &io_obj->queues_[i], 0);
      }
    }
    submit_sqes();
  }

  return cancel_op;
}

void io_uring_service::do_add_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.insert(&queue);
}

void io_uring_service::do_remove_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.erase(&queue);
}

void io_uring_service::update_timeout()
{
  if (::io_uring_sqe* sqe = get_sqe())
  {
    ::io_uring_prep_timeout_remove(sqe,
        reinterpret_cast<__u64>(&timeout_), 0);
    ::io_uring_sqe_set_data(sqe, &timer_queues_);
  }
}

__kernel_timespec io_uring_service::get_timeout() const
{
  __kernel_timespec ts;
  long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
  ts.tv_sec = usec / 1000000;
  ts.tv_nsec = usec ? (usec % 1000000) * 1000 : 1;
  return ts;
}
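// SQE management: get_sqe() submits any pending entries to make room when the
// submission queue is full, and counts every SQE it hands out in
// pending_sqes_. Entries are flushed to the kernel either once a batch of
// submit_batch_size has accumulated or via a deferred submit_sqes_op posted
// to the scheduler.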
::io_uring_sqe* io_uring_service::get_sqe()
{
  ::io_uring_sqe* sqe = ::io_uring_get_sqe(&ring_);
  if (!sqe)
  {
    submit_sqes();
    sqe = ::io_uring_get_sqe(&ring_);
  }
  if (sqe)
  {
    ::io_uring_sqe_set_data(sqe, 0);
    ++pending_sqes_;
  }
  return sqe;
}

void io_uring_service::submit_sqes()
{
  if (pending_sqes_ != 0)
  {
    int result = ::io_uring_submit(&ring_);
    if (result > 0)
    {
      pending_sqes_ -= result;
      increment(outstanding_work_, result);
    }
  }
}

void io_uring_service::post_submit_sqes_op(mutex::scoped_lock& lock)
{
  if (pending_sqes_ >= submit_batch_size)
  {
    submit_sqes();
  }
  else if (pending_sqes_ != 0 && !pending_submit_sqes_op_)
  {
    pending_submit_sqes_op_ = true;
    lock.unlock();
    scheduler_.post_immediate_completion(&submit_sqes_op_, false);
  }
}

void io_uring_service::push_submit_sqes_op(op_queue<operation>& ops)
{
  if (pending_sqes_ != 0 && !pending_submit_sqes_op_)
  {
    pending_submit_sqes_op_ = true;
    ops.push(&submit_sqes_op_);
    scheduler_.compensating_work_started();
  }
}

io_uring_service::submit_sqes_op::submit_sqes_op(io_uring_service* s)
  : operation(&io_uring_service::submit_sqes_op::do_complete),
    service_(s)
{
}

void io_uring_service::submit_sqes_op::do_complete(void* owner,
    operation* base, const asio::error_code& /*ec*/,
    std::size_t /*bytes_transferred*/)
{
  if (owner)
  {
    submit_sqes_op* o = static_cast<submit_sqes_op*>(base);
    mutex::scoped_lock lock(o->service_->mutex_);
    o->service_->submit_sqes();
    if (o->service_->pending_sqes_ != 0)
      o->service_->scheduler_.post_immediate_completion(o, true);
    else
      o->service_->pending_submit_sqes_op_ = false;
  }
}

io_uring_service::io_queue::io_queue()
  : operation(&io_uring_service::io_queue::do_complete)
{
}

struct io_uring_service::perform_io_cleanup_on_block_exit
{
  explicit perform_io_cleanup_on_block_exit(io_uring_service* s)
    : service_(s), io_object_to_free_(0), first_op_(0)
  {
  }

  ~perform_io_cleanup_on_block_exit()
  {
    if (io_object_to_free_)
    {
      mutex::scoped_lock lock(service_->mutex_);
      service_->free_io_object(io_object_to_free_);
    }

    if (first_op_)
    {
      // Post the remaining completed operations for invocation.
      if (!ops_.empty())
        service_->scheduler_.post_deferred_completions(ops_);

      // A user-initiated operation has completed, but there's no need to
      // explicitly call work_finished() here. Instead, we'll take advantage of
      // the fact that the scheduler will call work_finished() once we return.
    }
    else
    {
      // No user-initiated operations have completed, so we need to compensate
      // for the work_finished() call that the scheduler will make once this
      // operation returns.
      service_->scheduler_.compensating_work_started();
    }
  }

  io_uring_service* service_;
  io_object* io_object_to_free_;
  op_queue<operation> ops_;
  operation* first_op_;
};

operation* io_uring_service::io_queue::perform_io(int result)
{
  perform_io_cleanup_on_block_exit io_cleanup(io_object_->service_);
  mutex::scoped_lock io_object_lock(io_object_->mutex_);

  if (result != -ECANCELED || cancel_requested_)
  {
    if (io_uring_operation* op = op_queue_.front())
    {
      if (result < 0)
      {
        op->ec_.assign(-result, asio::error::get_system_category());
        op->bytes_transferred_ = 0;
      }
      else
      {
        op->ec_.assign(0, op->ec_.category());
        op->bytes_transferred_ = static_cast<std::size_t>(result);
      }
    }

    while (io_uring_operation* op = op_queue_.front())
    {
      if (op->perform(io_cleanup.ops_.empty()))
      {
        op_queue_.pop();
        io_cleanup.ops_.push(op);
      }
      else
        break;
    }
  }

  cancel_requested_ = false;

  if (!op_queue_.empty())
  {
    io_uring_service* service = io_object_->service_;
    mutex::scoped_lock lock(service->mutex_);
    if (::io_uring_sqe* sqe = service->get_sqe())
    {
      op_queue_.front()->prepare(sqe);
      ::io_uring_sqe_set_data(sqe, this);
      service->post_submit_sqes_op(lock);
    }
    else
    {
      lock.unlock();
      while (io_uring_operation* op = op_queue_.front())
      {
        op->ec_ = asio::error::no_buffer_space;
        op_queue_.pop();
        io_cleanup.ops_.push(op);
      }
    }
  }

  // The last operation to complete on a shut down object must free it.
  if (io_object_->shutdown_)
  {
    io_cleanup.io_object_to_free_ = io_object_;
    for (int i = 0; i < max_ops; ++i)
      if (!io_object_->queues_[i].op_queue_.empty())
        io_cleanup.io_object_to_free_ = 0;
  }

  // The first operation will be returned for completion now. The others will
  // be posted for later by the io_cleanup object's destructor.
  io_cleanup.first_op_ = io_cleanup.ops_.front();
  io_cleanup.ops_.pop();
  return io_cleanup.first_op_;
}

void io_uring_service::io_queue::do_complete(void* owner, operation* base,
    const asio::error_code& ec, std::size_t bytes_transferred)
{
  if (owner)
  {
    io_queue* io_q = static_cast<io_queue*>(base);
    int result = static_cast<int>(bytes_transferred);
    if (operation* op = io_q->perform_io(result))
    {
      op->complete(owner, ec, 0);
    }
  }
}

io_uring_service::io_object::io_object(bool locking)
  : mutex_(locking)
{
}

} // namespace detail
} // namespace asio

#include "asio/detail/pop_options.hpp"

#endif // defined(ASIO_HAS_IO_URING)

#endif // ASIO_DETAIL_IMPL_IO_URING_SERVICE_IPP