Boost C++ Libraries

"...one of the most highly regarded and expertly designed C++ library projects in the world." — Herb Sutter and Andrei Alexandrescu, C++ Coding Standards

This is the documentation for an old version of Boost. Click here for the latest Boost documentation.

boost/asio/detail/win_iocp_io_service.hpp

//
// win_iocp_io_service.hpp
// ~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2010 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_ASIO_DETAIL_WIN_IOCP_IO_SERVICE_HPP
#define BOOST_ASIO_DETAIL_WIN_IOCP_IO_SERVICE_HPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include <boost/asio/detail/push_options.hpp>

#include <boost/asio/detail/win_iocp_io_service_fwd.hpp>

#if defined(BOOST_ASIO_HAS_IOCP)

#include <boost/asio/detail/push_options.hpp>
#include <boost/limits.hpp>
#include <boost/throw_exception.hpp>
#include <boost/system/system_error.hpp>
#include <boost/asio/detail/pop_options.hpp>

#include <boost/asio/io_service.hpp>
#include <boost/asio/detail/call_stack.hpp>
#include <boost/asio/detail/handler_alloc_helpers.hpp>
#include <boost/asio/detail/handler_invoke_helpers.hpp>
#include <boost/asio/detail/service_base.hpp>
#include <boost/asio/detail/socket_types.hpp>
#include <boost/asio/detail/timer_queue.hpp>
#include <boost/asio/detail/mutex.hpp>

namespace boost {
namespace asio {
namespace detail {

class win_iocp_io_service
  : public boost::asio::detail::service_base<win_iocp_io_service>
{
public:
  // Base class for all operations. A function pointer is used instead of
  // virtual functions to avoid the associated overhead.
  //
  // This class inherits from OVERLAPPED so that we can downcast to get back to
  // the operation pointer from the LPOVERLAPPED out parameter of
  // GetQueuedCompletionStatus.
  class operation;
  friend class operation;
  class operation
    : public OVERLAPPED
  {
  public:
    // Function used to run a completed operation. It receives the operation
    // itself, an error value and a byte count.
    typedef void (*invoke_func_type)(operation*, DWORD, size_t);

    // Function used to dispose of an operation without running it.
    typedef void (*destroy_func_type)(operation*);

    // Bind the operation to its owning service and to the two functions that
    // stand in for virtual dispatch. The OVERLAPPED fields are zeroed and the
    // service's count of outstanding operations is incremented.
    operation(win_iocp_io_service& iocp_service,
        invoke_func_type invoke_func, destroy_func_type destroy_func)
      : iocp_service_(iocp_service),
        ready_(0),
        last_error_(~DWORD(0)),
        bytes_transferred_(0),
        invoke_func_(invoke_func),
        destroy_func_(destroy_func)
    {
      clear_overlapped();
      ::InterlockedIncrement(&iocp_service_.outstanding_operations_);
    }

    // Return the operation to its freshly constructed state: zeroed
    // OVERLAPPED fields, not ready, and no result recorded.
    void reset()
    {
      clear_overlapped();
      ready_ = 0;
      last_error_ = ~DWORD(0);
      bytes_transferred_ = 0;
    }

    // Atomically mark the operation as ready. If it had already been marked
    // ready, re-post it to the service with the recorded result.
    // NOTE(review): presumably called by the initiator once the underlying
    // call has reported that it is pending -- confirm against callers.
    void on_pending()
    {
      const long previous = ::InterlockedCompareExchange(&ready_, 1, 0);
      if (previous != 1)
        return;

      iocp_service_.post_completion(this, last_error_, bytes_transferred_);
    }

    // Mark the operation as ready and post it to the service with the
    // supplied result.
    void on_immediate_completion(DWORD last_error, DWORD bytes_transferred)
    {
      ready_ = 1;
      iocp_service_.post_completion(this, last_error, bytes_transferred);
    }

    // Handle a completion packet for this operation. The first result seen
    // is recorded (last_error_ holds ~DWORD(0) until then). The invoke
    // function is called, and true returned, only if the operation had
    // already been marked ready; otherwise this call marks it ready and
    // returns false.
    bool on_completion(DWORD last_error, DWORD bytes_transferred)
    {
      const bool result_unrecorded = (last_error_ == ~DWORD(0));
      if (result_unrecorded)
      {
        last_error_ = last_error;
        bytes_transferred_ = bytes_transferred;
      }

      if (::InterlockedCompareExchange(&ready_, 1, 0) != 1)
        return false;

      invoke_func_(this, last_error_, bytes_transferred_);
      return true;
    }

    // Dispose of the operation through its destroy function.
    void destroy()
    {
      destroy_func_(this);
    }

  protected:
    // Prevent deletion through this type. Destruction decrements the
    // service's count of outstanding operations.
    ~operation()
    {
      ::InterlockedDecrement(&iocp_service_.outstanding_operations_);
    }

  private:
    // Zero every field inherited from OVERLAPPED.
    void clear_overlapped()
    {
      Internal = 0;
      InternalHigh = 0;
      Offset = 0;
      OffsetHigh = 0;
      hEvent = 0;
    }

    win_iocp_io_service& iocp_service_;
    long ready_;
    DWORD last_error_;
    DWORD bytes_transferred_;
    invoke_func_type invoke_func_;
    destroy_func_type destroy_func_;
  };

  // Constructor.
  // Only counters and flags are initialised here; the completion port handle
  // stays 0 until init() creates the port.
  win_iocp_io_service(boost::asio::io_service& io_service)
    : boost::asio::detail::service_base<win_iocp_io_service>(io_service),
      iocp_(),
      outstanding_work_(0),
      outstanding_operations_(0),
      stopped_(0),
      shutdown_(0),
      timer_thread_(0),
      timer_interrupt_issued_(false)
  {
  }

  void init(size_t concurrency_hint)
  {
    iocp_.handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0,
        static_cast<DWORD>((std::min<size_t>)(concurrency_hint, DWORD(~0))));
    if (!iocp_.handle)
    {
      DWORD last_error = ::GetLastError();
      boost::system::system_error e(
          boost::system::error_code(last_error,
            boost::asio::error::get_system_category()),
          "iocp");
      boost::throw_exception(e);
    }
  }

  // Destroy all user-defined handler objects owned by the service.
  void shutdown_service()
  {
    ::InterlockedExchange(&shutdown_, 1);

    while (::InterlockedExchangeAdd(&outstanding_operations_, 0) > 0)
    {
      DWORD bytes_transferred = 0;
#if defined(WINVER) && (WINVER < 0x0500)
      DWORD completion_key = 0;
#else
      DWORD_PTR completion_key = 0;
#endif
      LPOVERLAPPED overlapped = 0;
      ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
          &completion_key, &overlapped, INFINITE);
      if (overlapped)
        static_cast<operation*>(overlapped)->destroy();
    }

    for (std::size_t i = 0; i < timer_queues_.size(); ++i)
      timer_queues_[i]->destroy_timers();
    timer_queues_.clear();
  }

  // Initialise the task. Nothing to do here.
  void init_task()
  {
    // Intentionally empty.
  }

  // Register a handle with the IO completion port.
  // On success ec is cleared; on failure it receives the Win32 error code.
  // The value stored in ec is also returned.
  boost::system::error_code register_handle(
      HANDLE handle, boost::system::error_code& ec)
  {
    const bool associated
      = (::CreateIoCompletionPort(handle, iocp_.handle, 0, 0) != 0);
    if (associated)
    {
      ec = boost::system::error_code();
      return ec;
    }

    const DWORD last_error = ::GetLastError();
    ec = boost::system::error_code(last_error,
        boost::asio::error::get_system_category());
    return ec;
  }

  // Run the event loop until stopped or no more work.
  // Returns the number of operations executed, saturating at the largest
  // size_t value. Returns 0 immediately, with ec cleared, when there is no
  // outstanding work.
  size_t run(boost::system::error_code& ec)
  {
    const bool idle = (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0);
    if (idle)
    {
      ec = boost::system::error_code();
      return 0;
    }

    // Record that this thread is inside the service; dispatch() checks this.
    call_stack<win_iocp_io_service>::context ctx(this);

    const size_t ceiling = (std::numeric_limits<size_t>::max)();
    size_t executed = 0;
    while (do_one(true, ec) != 0)
    {
      if (executed != ceiling)
        ++executed;
    }
    return executed;
  }

  // Run until stopped or one operation is performed.
  // Returns the number of operations executed (0 or 1). Returns 0
  // immediately, with ec cleared, when there is no outstanding work.
  size_t run_one(boost::system::error_code& ec)
  {
    const bool idle = (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0);
    if (idle)
    {
      ec = boost::system::error_code();
      return 0;
    }

    // Record that this thread is inside the service; dispatch() checks this.
    call_stack<win_iocp_io_service>::context ctx(this);

    const size_t executed = do_one(true, ec);
    return executed;
  }

  // Poll for operations without blocking.
  // Returns the number of operations executed, saturating at the largest
  // size_t value. Returns 0 immediately, with ec cleared, when there is no
  // outstanding work.
  size_t poll(boost::system::error_code& ec)
  {
    const bool idle = (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0);
    if (idle)
    {
      ec = boost::system::error_code();
      return 0;
    }

    // Record that this thread is inside the service; dispatch() checks this.
    call_stack<win_iocp_io_service>::context ctx(this);

    const size_t ceiling = (std::numeric_limits<size_t>::max)();
    size_t executed = 0;
    while (do_one(false, ec) != 0)
    {
      if (executed != ceiling)
        ++executed;
    }
    return executed;
  }

  // Poll for one operation without blocking.
  // Returns the number of operations executed (0 or 1). Returns 0
  // immediately, with ec cleared, when there is no outstanding work.
  size_t poll_one(boost::system::error_code& ec)
  {
    const bool idle = (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0);
    if (idle)
    {
      ec = boost::system::error_code();
      return 0;
    }

    // Record that this thread is inside the service; dispatch() checks this.
    call_stack<win_iocp_io_service>::context ctx(this);

    const size_t executed = do_one(false, ec);
    return executed;
  }

  // Stop the event processing loop.
  void stop()
  {
    if (::InterlockedExchange(&stopped_, 1) == 0)
    {
      if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
      {
        DWORD last_error = ::GetLastError();
        boost::system::system_error e(
            boost::system::error_code(last_error,
              boost::asio::error::get_system_category()),
            "pqcs");
        boost::throw_exception(e);
      }
    }
  }

  // Reset in preparation for a subsequent run invocation.
  void reset()
  {
    // Clear the stopped flag. Wake-up packets left over from the previous
    // stop are not removed here; do_one() ignores them once the flag is 0.
    ::InterlockedExchange(&stopped_, 0);
  }

  // Notify that some work has started.
  void work_started()
  {
    // Atomically bump the outstanding-work count.
    ::InterlockedIncrement(&outstanding_work_);
  }

  // Notify that some work has finished.
  // Stops the service when the outstanding-work count drops to zero.
  void work_finished()
  {
    const long remaining = ::InterlockedDecrement(&outstanding_work_);
    if (remaining == 0)
      stop();
  }

  // Request invocation of the given handler.
  // The handler is invoked directly when the calling thread is already
  // running this service; otherwise it is queued through post().
  template <typename Handler>
  void dispatch(Handler handler)
  {
    if (!call_stack<win_iocp_io_service>::contains(this))
    {
      post(handler);
      return;
    }

    boost_asio_handler_invoke_helpers::invoke(handler, handler);
  }

  // Request invocation of the given handler and return immediately.
  template <typename Handler>
  void post(Handler handler)
  {
    // If the service has been shut down we silently discard the handler.
    if (::InterlockedExchangeAdd(&shutdown_, 0) != 0)
      return;

    // Allocate and construct an operation to wrap the handler.
    typedef handler_operation<Handler> value_type;
    typedef handler_alloc_traits<Handler, value_type> alloc_traits;
    raw_handler_ptr<alloc_traits> raw_ptr(handler);
    handler_ptr<alloc_traits> ptr(raw_ptr, *this, handler);

    // Enqueue the operation on the I/O completion port.
    ptr.get()->on_immediate_completion(0, 0);

    // Operation has been successfully posted.
    ptr.release();
  }

  // Request invocation of the given OVERLAPPED-derived operation.
  // The byte count travels in the packet's byte-count slot and the error
  // value in its completion key. Throws boost::system::system_error (tagged
  // "pqcs") if the packet cannot be posted.
  void post_completion(operation* op, DWORD op_last_error,
      DWORD bytes_transferred)
  {
    const bool posted = (::PostQueuedCompletionStatus(iocp_.handle,
          bytes_transferred, op_last_error, op) != 0);
    if (posted)
      return;

    const DWORD last_error = ::GetLastError();
    boost::system::system_error failure(
        boost::system::error_code(last_error,
          boost::asio::error::get_system_category()),
        "pqcs");
    boost::throw_exception(failure);
  }

  // Add a new timer queue to the service.
  // The queue is stored by address and appended under timer_mutex_. No check
  // is made for a queue that is already registered.
  template <typename Time_Traits>
  void add_timer_queue(timer_queue<Time_Traits>& timer_queue)
  {
    boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
    timer_queues_.push_back(&timer_queue);
  }

  // Remove a timer queue from the service.
  // Only the first entry matching the queue's address is removed. Nothing
  // happens if the queue is not registered.
  template <typename Time_Traits>
  void remove_timer_queue(timer_queue<Time_Traits>& timer_queue)
  {
    boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);

    typedef std::vector<timer_queue_base*>::iterator queue_iterator;
    for (queue_iterator pos = timer_queues_.begin();
        pos != timer_queues_.end(); ++pos)
    {
      if (*pos != &timer_queue)
        continue;

      timer_queues_.erase(pos);
      return;
    }
  }

  // Schedule a timer in the given timer queue to expire at the specified
  // absolute time. The handler object will be invoked when the timer expires.
  // After shutdown the timer is silently discarded.
  template <typename Time_Traits, typename Handler>
  void schedule_timer(timer_queue<Time_Traits>& timer_queue,
      const typename Time_Traits::time_type& time, Handler handler, void* token)
  {
    const bool shut_down = (::InterlockedExchangeAdd(&shutdown_, 0) != 0);
    if (shut_down)
      return;

    boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);

    // NOTE(review): enqueue_timer() presumably returns true when the wait
    // timeout needs recalculating -- confirm against timer_queue.
    if (!timer_queue.enqueue_timer(time, handler, token))
      return;

    // An interrupt is already on its way, so a second one is unnecessary.
    if (timer_interrupt_issued_)
      return;

    // Record the interrupt under the lock, then post it after unlocking.
    timer_interrupt_issued_ = true;
    lock.unlock();
    ::PostQueuedCompletionStatus(iocp_.handle,
        0, steal_timer_dispatching, 0);
  }

  // Cancel the timer associated with the given token. Returns the number of
  // handlers that have been posted or dispatched.
  // After shutdown the request is ignored and 0 is returned.
  template <typename Time_Traits>
  std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
  {
    const bool shut_down = (::InterlockedExchangeAdd(&shutdown_, 0) != 0);
    if (shut_down)
      return 0;

    boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
    const std::size_t cancelled = timer_queue.cancel_timer(token);

    // Nothing was cancelled, or an interrupt is already pending.
    if (cancelled == 0 || timer_interrupt_issued_)
      return cancelled;

    // Record the interrupt under the lock, then post it after unlocking.
    timer_interrupt_issued_ = true;
    lock.unlock();
    ::PostQueuedCompletionStatus(iocp_.handle,
        0, steal_timer_dispatching, 0);
    return cancelled;
  }

private:
  // Dequeues at most one operation from the I/O completion port, and then
  // executes it. Returns the number of operations that were dequeued (i.e.
  // either 0 or 1).
  //
  // When block is true the call waits on the completion port, waking at
  // least every max_timeout milliseconds; when false it polls with a zero
  // timeout. ec is cleared on every return path except the one where
  // re-posting the stop wake-up packet fails. Exceptions thrown while
  // dispatching timers are rethrown to the caller.
  size_t do_one(bool block, boost::system::error_code& ec)
  {
    long this_thread_id = static_cast<long>(::GetCurrentThreadId());

    for (;;)
    {
      // Try to acquire responsibility for dispatching timers.
      // timer_thread_ holds the owning thread's id, or 0 when unowned; the
      // exchange succeeds only if it was 0.
      bool dispatching_timers = (::InterlockedCompareExchange(
            &timer_thread_, this_thread_id, 0) == 0);

      // Calculate timeout for GetQueuedCompletionStatus call.
      // Threads that do not own timer dispatching use the maximum timeout.
      DWORD timeout = max_timeout;
      if (dispatching_timers)
      {
        boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
        timer_interrupt_issued_ = false;
        timeout = get_timeout();
      }

      // Get the next operation from the queue.
      DWORD bytes_transferred = 0;
#if defined(WINVER) && (WINVER < 0x0500)
      DWORD completion_key = 0;
#else
      DWORD_PTR completion_key = 0;
#endif
      LPOVERLAPPED overlapped = 0;
      // The thread's last-error value is cleared first so that the value
      // read back after the call is 0 unless the call itself set it.
      ::SetLastError(0);
      BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
          &completion_key, &overlapped, block ? timeout : 0);
      DWORD last_error = ::GetLastError();

      // Dispatch any pending timers.
      // This happens before the dequeued packet, if any, is examined.
      if (dispatching_timers)
      {
        try
        {
          boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
          if (!timer_queues_.empty())
          {
            timer_queues_copy_ = timer_queues_;
            for (std::size_t i = 0; i < timer_queues_copy_.size(); ++i)
            {
              timer_queues_copy_[i]->dispatch_timers();
              timer_queues_copy_[i]->dispatch_cancellations();
              timer_queues_copy_[i]->complete_timers();
            }
          }
        }
        catch (...)
        {
          // Transfer responsibility for dispatching timers to another thread.
          if (::InterlockedCompareExchange(&timer_thread_,
                0, this_thread_id) == this_thread_id)
          {
            ::PostQueuedCompletionStatus(iocp_.handle,
                0, transfer_timer_dispatching, 0);
          }

          throw;
        }
      }

      // No packet was dequeued: the wait timed out or the call failed.
      if (!ok && overlapped == 0)
      {
        if (block && last_error == WAIT_TIMEOUT)
        {
          // Relinquish responsibility for dispatching timers.
          if (dispatching_timers)
          {
            ::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id);
          }

          // A blocking wait that merely timed out goes round the loop again.
          continue;
        }

        // Transfer responsibility for dispatching timers to another thread.
        if (dispatching_timers && ::InterlockedCompareExchange(
              &timer_thread_, 0, this_thread_id) == this_thread_id)
        {
          ::PostQueuedCompletionStatus(iocp_.handle,
              0, transfer_timer_dispatching, 0);
        }

        ec = boost::system::error_code();
        return 0;
      }
      else if (overlapped)
      {
        // We may have been passed a last_error value in the completion_key.
        // (post_completion() places the operation's error value there.)
        if (last_error == 0)
        {
          last_error = completion_key;
        }

        // Transfer responsibility for dispatching timers to another thread.
        if (dispatching_timers && ::InterlockedCompareExchange(
              &timer_thread_, 0, this_thread_id) == this_thread_id)
        {
          ::PostQueuedCompletionStatus(iocp_.handle,
              0, transfer_timer_dispatching, 0);
        }

        // Ensure that the io_service does not exit due to running out of work
        // while we make the upcall.
        auto_work work(*this);

        // Dispatch the operation.
        // If on_completion() returns false the handler was not run, and the
        // loop continues with another dequeue.
        operation* op = static_cast<operation*>(overlapped);
        if (op->on_completion(last_error, bytes_transferred))
        {
          ec = boost::system::error_code();
          return 1;
        }
      }
      else if (completion_key == transfer_timer_dispatching)
      {
        // Woken up to try to acquire responsibility for dispatching timers.
        // The exchange below only clears timer_thread_ if this thread owns
        // it; acquisition is attempted at the top of the next iteration.
        ::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id);
      }
      else if (completion_key == steal_timer_dispatching)
      {
        // Woken up to steal responsibility for dispatching timers.
        // Ownership is cleared unconditionally, whichever thread held it.
        ::InterlockedExchange(&timer_thread_, 0);
      }
      else
      {
        // A packet with no OVERLAPPED and a completion key of 0, the shape
        // posted by stop() and re-posted below.

        // Relinquish responsibility for dispatching timers. If the io_service
        // is not being stopped then the thread will get an opportunity to
        // reacquire timer responsibility on the next loop iteration.
        if (dispatching_timers)
        {
          ::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id);
        }

        // The stopped_ flag is always checked to ensure that any leftover
        // interrupts from a previous run invocation are ignored.
        if (::InterlockedExchangeAdd(&stopped_, 0) != 0)
        {
          // Wake up next thread that is blocked on GetQueuedCompletionStatus.
          if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
          {
            last_error = ::GetLastError();
            ec = boost::system::error_code(last_error,
                boost::asio::error::get_system_category());
            return 0;
          }

          ec = boost::system::error_code();
          return 0;
        }
      }
    }
  }

  // Check if all timer queues are empty.
  bool all_timer_queues_are_empty() const
  {
    for (std::size_t i = 0; i < timer_queues_.size(); ++i)
      if (!timer_queues_[i]->empty())
        return false;
    return true;
  }

  // Get the timeout value for the GetQueuedCompletionStatus call. The timeout
  // value is returned as a number of milliseconds. We will wait no longer than
  // max_timeout milliseconds.
  //
  // Returns max_timeout when every timer queue is empty, 0 when the shortest
  // reported wait duration is not positive, and otherwise that duration in
  // milliseconds, rounded up to at least 1 and capped at max_timeout.
  // do_one() calls this with timer_mutex_ held.
  DWORD get_timeout()
  {
    if (all_timer_queues_are_empty())
      return max_timeout;

    // Start from the cap; each queue can only lower the value.
    boost::posix_time::time_duration minimum_wait_duration
      = boost::posix_time::milliseconds(max_timeout);

    for (std::size_t i = 0; i < timer_queues_.size(); ++i)
    {
      boost::posix_time::time_duration wait_duration
        = timer_queues_[i]->wait_duration();
      if (wait_duration < minimum_wait_duration)
        minimum_wait_duration = wait_duration;
    }

    if (minimum_wait_duration > boost::posix_time::time_duration())
    {
      // Keep the duration's own tick type instead of narrowing implicitly to
      // int. The value lies in [0, max_timeout] here, so the conversion to
      // DWORD below cannot lose information.
      boost::posix_time::time_duration::tick_type milliseconds
        = minimum_wait_duration.total_milliseconds();

      // A positive wait shorter than one millisecond truncates to 0; round
      // it up to 1 so that the caller still waits.
      return static_cast<DWORD>(milliseconds > 0 ? milliseconds : 1);
    }
    else
    {
      return 0;
    }
  }

  // Scoped guard that counts as one unit of outstanding work on the service
  // for as long as it is alive.
  struct auto_work
  {
    // Registers the unit of work.
    auto_work(win_iocp_io_service& io_service)
      : io_service_(io_service)
    {
      io_service_.work_started();
    }

    // Releases the unit of work. work_finished() stops the service if this
    // was the last outstanding unit.
    ~auto_work()
    {
      io_service_.work_finished();
    }

  private:
    win_iocp_io_service& io_service_;
  };

  // Operation that carries a user handler queued through post(). Each
  // instance counts as one unit of outstanding work on the service from
  // construction to destruction.
  template <typename Handler>
  struct handler_operation
    : public operation
  {
    // Store a copy of the handler, register the static functions below as
    // the operation's invoke and destroy functions, and register the work.
    handler_operation(win_iocp_io_service& io_service,
        Handler handler)
      : operation(io_service, &handler_operation<Handler>::do_completion_impl,
          &handler_operation<Handler>::destroy_impl),
        io_service_(io_service),
        handler_(handler)
    {
      io_service_.work_started();
    }

    // Release the unit of work registered by the constructor.
    ~handler_operation()
    {
      io_service_.work_finished();
    }

  private:
    // Prevent copying and assignment.
    handler_operation(const handler_operation&);
    void operator=(const handler_operation&);
    
    // Invoke function: run the stored handler. The error value and byte
    // count arguments are ignored.
    static void do_completion_impl(operation* op, DWORD, size_t)
    {
      // Take ownership of the operation object.
      typedef handler_operation<Handler> op_type;
      op_type* handler_op(static_cast<op_type*>(op));
      typedef handler_alloc_traits<Handler, op_type> alloc_traits;
      handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);

      // Make a copy of the handler so that the memory can be deallocated before
      // the upcall is made.
      Handler handler(handler_op->handler_);

      // Free the memory associated with the handler.
      ptr.reset();

      // Make the upcall.
      boost_asio_handler_invoke_helpers::invoke(handler, handler);
    }

    // Destroy function: release the operation without running the handler.
    static void destroy_impl(operation* op)
    {
      // Take ownership of the operation object.
      typedef handler_operation<Handler> op_type;
      op_type* handler_op(static_cast<op_type*>(op));
      typedef handler_alloc_traits<Handler, op_type> alloc_traits;
      handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);

      // A sub-object of the handler may be the true owner of the memory
      // associated with the handler. Consequently, a local copy of the handler
      // is required to ensure that any owning sub-object remains valid until
      // after we have deallocated the memory here.
      Handler handler(handler_op->handler_);
      (void)handler;

      // Free the memory associated with the handler.
      ptr.reset();
    }

    // The service on which the unit of work is registered.
    win_iocp_io_service& io_service_;
    // The stored copy of the user's handler.
    Handler handler_;
  };

  // The IO completion port used for queueing operations.
  struct iocp_holder
  {
    // Completion port handle; 0 until init() stores a created port.
    HANDLE handle;
    // Start without a port.
    iocp_holder() : handle(0) {}
    // Close the port if one was ever stored.
    ~iocp_holder() { if (handle) ::CloseHandle(handle); }
  } iocp_;

  // The count of unfinished work.
  long outstanding_work_;

  // The count of unfinished operations.
  long outstanding_operations_;
  friend class operation;

  // Flag to indicate whether the event loop has been stopped.
  long stopped_;

  // Flag to indicate whether the service has been shut down.
  long shutdown_;

  enum
  {
    // Maximum GetQueuedCompletionStatus timeout, in milliseconds.
    max_timeout = 500,

    // Completion key value to indicate that responsibility for dispatching
    // timers is being cooperatively transferred from one thread to another.
    transfer_timer_dispatching = 1,

    // Completion key value to indicate that responsibility for dispatching
    // timers should be stolen from another thread.
    steal_timer_dispatching = 2
  };

  // The thread that's currently in charge of dispatching timers.
  long timer_thread_;

  // Mutex for protecting access to the timer queues.
  mutex timer_mutex_;

  // Whether a thread has been interrupted to process a new timeout.
  bool timer_interrupt_issued_;

  // The timer queues.
  std::vector<timer_queue_base*> timer_queues_;

  // A copy of the timer queues, used when dispatching, cancelling and cleaning
  // up timers. The copy is stored as a class data member to avoid unnecessary
  // memory allocation.
  std::vector<timer_queue_base*> timer_queues_copy_;
};

} // namespace detail
} // namespace asio
} // namespace boost

#endif // defined(BOOST_ASIO_HAS_IOCP)

#include <boost/asio/detail/pop_options.hpp>

#endif // BOOST_ASIO_DETAIL_WIN_IOCP_IO_SERVICE_HPP