Boost C++ Libraries

...one of the most highly regarded and expertly designed C++ library projects in the world. Herb Sutter and Andrei Alexandrescu, C++ Coding Standards

This is the documentation for an old version of boost. Click here for the latest Boost documentation.

boost/asio/experimental/impl/co_spawn.hpp

//
// experimental/impl/co_spawn.hpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2018 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_ASIO_EXPERIMENTAL_IMPL_CO_SPAWN_HPP
#define BOOST_ASIO_EXPERIMENTAL_IMPL_CO_SPAWN_HPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include <boost/asio/detail/config.hpp>
#include <exception>
#include <functional>
#include <memory>
#include <new>
#include <tuple>
#include <utility>
#include <boost/asio/async_result.hpp>
#include <boost/asio/detail/thread_context.hpp>
#include <boost/asio/detail/thread_info_base.hpp>
#include <boost/asio/detail/type_traits.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/post.hpp>

#include <boost/asio/detail/push_options.hpp>

namespace boost {
namespace asio {
namespace experimental {
namespace detail {

// Promise object for coroutine at top of thread-of-execution "stack".
template <typename Executor>
class awaiter
{
public:
  struct deleter
  {
    void operator()(awaiter* a)
    {
      if (a)
        a->release();
    }
  };

  typedef std::unique_ptr<awaiter, deleter> ptr;

  typedef Executor executor_type;

  ~awaiter()
  {
    if (has_executor_)
      static_cast<Executor*>(static_cast<void*>(executor_))->~Executor();
  }

  void set_executor(const Executor& ex)
  {
    new (&executor_) Executor(ex);
    has_executor_ = true;
  }

  executor_type get_executor() const noexcept
  {
    return *static_cast<const Executor*>(static_cast<const void*>(executor_));
  }

  awaiter* get_return_object()
  {
    return this;
  }

  auto initial_suspend()
  {
    return std::experimental::suspend_always();
  }

  auto final_suspend()
  {
    return std::experimental::suspend_always();
  }

  void return_void()
  {
  }

  awaiter* add_ref()
  {
    ++ref_count_;
    return this;
  }

  void release()
  {
    if (--ref_count_ == 0)
      coroutine_handle<awaiter>::from_promise(*this).destroy();
  }

  void unhandled_exception()
  {
    pending_exception_ = std::current_exception();
  }

  void rethrow_unhandled_exception()
  {
    if (pending_exception_)
    {
      std::exception_ptr ex = std::exchange(pending_exception_, nullptr);
      std::rethrow_exception(ex);
    }
  }

private:
  std::size_t ref_count_ = 0;
  std::exception_ptr pending_exception_ = nullptr;
  alignas(Executor) unsigned char executor_[sizeof(Executor)];
  bool has_executor_ = false;
};

// Base promise for coroutines further down the thread-of-execution "stack".
template <typename Executor>
class awaitee_base
{
public:
#if !defined(BOOST_ASIO_DISABLE_AWAITEE_RECYCLING)
  void* operator new(std::size_t size)
  {
    return boost::asio::detail::thread_info_base::allocate(
        boost::asio::detail::thread_info_base::awaitee_tag(),
        boost::asio::detail::thread_context::thread_call_stack::top(),
        size);
  }

  void operator delete(void* pointer, std::size_t size)
  {
    boost::asio::detail::thread_info_base::deallocate(
        boost::asio::detail::thread_info_base::awaitee_tag(),
        boost::asio::detail::thread_context::thread_call_stack::top(),
        pointer, size);
  }
#endif // !defined(BOOST_ASIO_DISABLE_AWAITEE_RECYCLING)

  auto initial_suspend()
  {
    return std::experimental::suspend_never();
  }

  struct final_suspender
  {
    awaitee_base* this_;

    bool await_ready() const noexcept
    {
      return false;
    }

    void await_suspend(coroutine_handle<void>)
    {
      this_->wake_caller();
    }

    void await_resume() const noexcept
    {
    }
  };

  auto final_suspend()
  {
    return final_suspender{this};
  }

  void set_except(std::exception_ptr e)
  {
    pending_exception_ = e;
  }

  void unhandled_exception()
  {
    set_except(std::current_exception());
  }

  void rethrow_exception()
  {
    if (pending_exception_)
    {
      std::exception_ptr ex = std::exchange(pending_exception_, nullptr);
      std::rethrow_exception(ex);
    }
  }

  awaiter<Executor>* top()
  {
    return awaiter_;
  }

  coroutine_handle<void> caller()
  {
    return caller_;
  }

  bool ready() const
  {
    return ready_;
  }

  void wake_caller()
  {
    if (caller_)
      caller_.resume();
    else
      ready_ = true;
  }

  class awaitable_executor
  {
  public:
    explicit awaitable_executor(awaitee_base* a)
      : this_(a)
    {
    }

    bool await_ready() const noexcept
    {
      return this_->awaiter_ != nullptr;
    }

    template <typename U, typename Ex>
    void await_suspend(coroutine_handle<detail::awaitee<U, Ex>> h) noexcept
    {
      this_->resume_on_attach_ = h;
    }

    Executor await_resume()
    {
      return this_->awaiter_->get_executor();
    }

  private:
    awaitee_base* this_;
  };

  awaitable_executor await_transform(this_coro::executor_t) noexcept
  {
    return awaitable_executor(this);
  }

  class awaitable_token
  {
  public:
    explicit awaitable_token(awaitee_base* a)
      : this_(a)
    {
    }

    bool await_ready() const noexcept
    {
      return this_->awaiter_ != nullptr;
    }

    template <typename U, typename Ex>
    void await_suspend(coroutine_handle<detail::awaitee<U, Ex>> h) noexcept
    {
      this_->resume_on_attach_ = h;
    }

    await_token<Executor> await_resume()
    {
      return await_token<Executor>(this_->awaiter_);
    }

  private:
    awaitee_base* this_;
  };

  awaitable_token await_transform(this_coro::token_t) noexcept
  {
    return awaitable_token(this);
  }

  template <typename T>
  awaitable<T, Executor> await_transform(awaitable<T, Executor>& t) const
  {
    return std::move(t);
  }

  template <typename T>
  awaitable<T, Executor> await_transform(awaitable<T, Executor>&& t) const
  {
    return std::move(t);
  }

  std::experimental::suspend_always await_transform(
      std::experimental::suspend_always) const
  {
    return std::experimental::suspend_always();
  }

  void attach_caller(coroutine_handle<awaiter<Executor>> h)
  {
    this->caller_ = h;
    this->attach_callees(&h.promise());
  }

  template <typename U>
  void attach_caller(coroutine_handle<awaitee<U, Executor>> h)
  {
    this->caller_ = h;
    if (h.promise().awaiter_)
      this->attach_callees(h.promise().awaiter_);
    else
      h.promise().unattached_callee_ = this;
  }

  void attach_callees(awaiter<Executor>* a)
  {
    for (awaitee_base* curr = this; curr != nullptr;
        curr = std::exchange(curr->unattached_callee_, nullptr))
    {
      curr->awaiter_ = a;
      if (curr->resume_on_attach_)
        return std::exchange(curr->resume_on_attach_, nullptr).resume();
    }
  }

protected:
  awaiter<Executor>* awaiter_ = nullptr;
  coroutine_handle<void> caller_ = nullptr;
  awaitee_base<Executor>* unattached_callee_ = nullptr;
  std::exception_ptr pending_exception_ = nullptr;
  coroutine_handle<void> resume_on_attach_ = nullptr;
  bool ready_ = false;
};

// Promise object for coroutines further down the thread-of-execution "stack".
template <typename T, typename Executor>
class awaitee
  : public awaitee_base<Executor>
{
public:
  awaitee()
  {
  }

  awaitee(awaitee&& other) noexcept
    : awaitee_base<Executor>(std::move(other))
  {
  }

  ~awaitee()
  {
    if (has_result_)
      static_cast<T*>(static_cast<void*>(result_))->~T();
  }

  awaitable<T, Executor> get_return_object()
  {
    return awaitable<T, Executor>(this);
  };

  template <typename U>
  void return_value(U&& u)
  {
    new (&result_) T(std::forward<U>(u));
    has_result_ = true;
  }

  T get()
  {
    this->caller_ = nullptr;
    this->rethrow_exception();
    return std::move(*static_cast<T*>(static_cast<void*>(result_)));
  }

private:
  alignas(T) unsigned char result_[sizeof(T)];
  bool has_result_ = false;
};

// Promise object for coroutines further down the thread-of-execution "stack".
template <typename Executor>
class awaitee<void, Executor>
  : public awaitee_base<Executor>
{
public:
  awaitable<void, Executor> get_return_object()
  {
    return awaitable<void, Executor>(this);
  };

  void return_void()
  {
  }

  void get()
  {
    this->caller_ = nullptr;
    this->rethrow_exception();
  }
};

template <typename Executor>
class awaiter_task
{
public:
  typedef Executor executor_type;

  awaiter_task(awaiter<Executor>* a)
    : awaiter_(a->add_ref())
  {
  }

  awaiter_task(awaiter_task&& other) noexcept
    : awaiter_(std::exchange(other.awaiter_, nullptr))
  {
  }

  ~awaiter_task()
  {
    if (awaiter_)
    {
      // Coroutine "stack unwinding" must be performed through the executor.
      executor_type ex(awaiter_->get_executor());
      (post)(ex,
          [a = std::move(awaiter_)]() mutable
          {
            typename awaiter<Executor>::ptr(std::move(a));
          });
    }
  }

  executor_type get_executor() const noexcept
  {
    return awaiter_->get_executor();
  }

protected:
  typename awaiter<Executor>::ptr awaiter_;
};

template <typename Executor>
class co_spawn_handler : public awaiter_task<Executor>
{
public:
  using awaiter_task<Executor>::awaiter_task;

  void operator()()
  {
    typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_));
    coroutine_handle<awaiter<Executor>>::from_promise(*ptr.get()).resume();
  }
};

template <typename Executor, typename T>
class await_handler_base : public awaiter_task<Executor>
{
public:
  typedef awaitable<T, Executor> awaitable_type;

  await_handler_base(await_token<Executor> token)
    : awaiter_task<Executor>(token.awaiter_),
      awaitee_(nullptr)
  {
  }

  await_handler_base(await_handler_base&& other) noexcept
    : awaiter_task<Executor>(std::move(other)),
      awaitee_(std::exchange(other.awaitee_, nullptr))
  {
  }

  void attach_awaitee(const awaitable<T, Executor>& a)
  {
    awaitee_ = a.awaitee_;
  }

protected:
  awaitee<T, Executor>* awaitee_;
};

template <typename, typename...> class await_handler;

template <typename Executor>
class await_handler<Executor, void>
  : public await_handler_base<Executor, void>
{
public:
  using await_handler_base<Executor, void>::await_handler_base;

  void operator()()
  {
    typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_));
    this->awaitee_->return_void();
    this->awaitee_->wake_caller();
    ptr->rethrow_unhandled_exception();
  }
};

template <typename Executor>
class await_handler<Executor, boost::system::error_code>
  : public await_handler_base<Executor, void>
{
public:
  typedef void return_type;

  using await_handler_base<Executor, void>::await_handler_base;

  void operator()(const boost::system::error_code& ec)
  {
    typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_));
    if (ec)
    {
      this->awaitee_->set_except(
          std::make_exception_ptr(boost::system::system_error(ec)));
    }
    else
      this->awaitee_->return_void();
    this->awaitee_->wake_caller();
    ptr->rethrow_unhandled_exception();
  }
};

template <typename Executor>
class await_handler<Executor, std::exception_ptr>
  : public await_handler_base<Executor, void>
{
public:
  using await_handler_base<Executor, void>::await_handler_base;

  void operator()(std::exception_ptr ex)
  {
    typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_));
    if (ex)
      this->awaitee_->set_except(ex);
    else
      this->awaitee_->return_void();
    this->awaitee_->wake_caller();
    ptr->rethrow_unhandled_exception();
  }
};

template <typename Executor, typename T>
class await_handler<Executor, T>
  : public await_handler_base<Executor, T>
{
public:
  using await_handler_base<Executor, T>::await_handler_base;

  template <typename Arg>
  void operator()(Arg&& arg)
  {
    typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_));
    this->awaitee_->return_value(std::forward<Arg>(arg));
    this->awaitee_->wake_caller();
    ptr->rethrow_unhandled_exception();
  }
};

template <typename Executor, typename T>
class await_handler<Executor, boost::system::error_code, T>
  : public await_handler_base<Executor, T>
{
public:
  using await_handler_base<Executor, T>::await_handler_base;

  template <typename Arg>
  void operator()(const boost::system::error_code& ec, Arg&& arg)
  {
    typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_));
    if (ec)
    {
      this->awaitee_->set_except(
          std::make_exception_ptr(boost::system::system_error(ec)));
    }
    else
      this->awaitee_->return_value(std::forward<Arg>(arg));
    this->awaitee_->wake_caller();
    ptr->rethrow_unhandled_exception();
  }
};

template <typename Executor, typename T>
class await_handler<Executor, std::exception_ptr, T>
  : public await_handler_base<Executor, T>
{
public:
  using await_handler_base<Executor, T>::await_handler_base;

  template <typename Arg>
  void operator()(std::exception_ptr ex, Arg&& arg)
  {
    typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_));
    if (ex)
      this->awaitee_->set_except(ex);
    else
      this->awaitee_->return_value(std::forward<Arg>(arg));
    this->awaitee_->wake_caller();
    ptr->rethrow_unhandled_exception();
  }
};

template <typename Executor, typename... Ts>
class await_handler
  : public await_handler_base<Executor, std::tuple<Ts...>>
{
public:
  using await_handler_base<Executor, std::tuple<Ts...>>::await_handler_base;

  template <typename... Args>
  void operator()(Args&&... args)
  {
    typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_));
    this->awaitee_->return_value(
        std::forward_as_tuple(std::forward<Args>(args)...));
    this->awaitee_->wake_caller();
    ptr->rethrow_unhandled_exception();
  }
};

template <typename Executor, typename... Ts>
class await_handler<Executor, boost::system::error_code, Ts...>
  : public await_handler_base<Executor, std::tuple<Ts...>>
{
public:
  using await_handler_base<Executor, std::tuple<Ts...>>::await_handler_base;

  template <typename... Args>
  void operator()(const boost::system::error_code& ec, Args&&... args)
  {
    typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_));
    if (ec)
    {
      this->awaitee_->set_except(
          std::make_exception_ptr(boost::system::system_error(ec)));
    }
    else
    {
      this->awaitee_->return_value(
          std::forward_as_tuple(std::forward<Args>(args)...));
    }
    this->awaitee_->wake_caller();
    ptr->rethrow_unhandled_exception();
  }
};

template <typename Executor, typename... Ts>
class await_handler<Executor, std::exception_ptr, Ts...>
  : public await_handler_base<Executor, std::tuple<Ts...>>
{
public:
  using await_handler_base<Executor, std::tuple<Ts...>>::await_handler_base;

  template <typename... Args>
  void operator()(std::exception_ptr ex, Args&&... args)
  {
    typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_));
    if (ex)
      this->awaitee_->set_except(ex);
    else
    {
      this->awaitee_->return_value(
          std::forward_as_tuple(std::forward<Args>(args)...));
    }
    this->awaitee_->wake_caller();
    ptr->rethrow_unhandled_exception();
  }
};

template <typename T>
struct awaitable_signature;

template <typename T, typename Executor>
struct awaitable_signature<awaitable<T, Executor>>
{
  typedef void type(std::exception_ptr, T);
};

template <typename Executor>
struct awaitable_signature<awaitable<void, Executor>>
{
  typedef void type(std::exception_ptr);
};

template <typename T, typename Executor, typename F, typename Handler>
awaiter<Executor>* co_spawn_entry_point(awaitable<T, Executor>*,
    executor_work_guard<Executor> work_guard, F f, Handler handler)
{
  bool done = false;

  try
  {
    T t = co_await f();

    done = true;

    (dispatch)(work_guard.get_executor(),
        [handler = std::move(handler), t = std::move(t)]() mutable
        {
          handler(std::exception_ptr(), std::move(t));
        });
  }
  catch (...)
  {
    if (done)
      throw;

    (dispatch)(work_guard.get_executor(),
        [handler = std::move(handler), e = std::current_exception()]() mutable
        {
          handler(e, T());
        });
  }
}

template <typename Executor, typename F, typename Handler>
awaiter<Executor>* co_spawn_entry_point(awaitable<void, Executor>*,
    executor_work_guard<Executor> work_guard, F f, Handler handler)
{
  std::exception_ptr e = nullptr;

  try
  {
    co_await f();
  }
  catch (...)
  {
    e = std::current_exception();
  }

  (dispatch)(work_guard.get_executor(),
      [handler = std::move(handler), e]() mutable
      {
        handler(e);
      });
}

template <typename Executor, typename F, typename CompletionToken>
auto co_spawn(const Executor& ex, F&& f, CompletionToken&& token)
{
  typedef typename result_of<F()>::type awaitable_type;
  typedef typename awaitable_type::executor_type executor_type;
  typedef typename awaitable_signature<awaitable_type>::type signature_type;

  async_completion<CompletionToken, signature_type> completion(token);

  executor_type ex2(ex);
  auto work_guard = make_work_guard(completion.completion_handler, ex2);

  auto* a = (co_spawn_entry_point)(
      static_cast<awaitable_type*>(nullptr), std::move(work_guard),
      std::forward<F>(f), std::move(completion.completion_handler));

  a->set_executor(ex2);
  (post)(co_spawn_handler<executor_type>(a));

  return completion.result.get();
}

#if defined(_MSC_VER)
# pragma warning(push)
# pragma warning(disable:4033)
#endif // defined(_MSC_VER)

#if defined(_MSC_VER)
template <typename T> T dummy_return()
{
  return std::move(*static_cast<T*>(nullptr));
}

template <>
inline void dummy_return()
{
}
#endif // defined(_MSC_VER)

template <typename Awaitable>
inline Awaitable make_dummy_awaitable()
{
  for (;;) co_await std::experimental::suspend_always();
#if defined(_MSC_VER)
  co_return dummy_return<typename Awaitable::value_type>();
#endif // defined(_MSC_VER)
}

#if defined(_MSC_VER)
# pragma warning(pop)
#endif // defined(_MSC_VER)

} // namespace detail
} // namespace experimental

template <typename Executor, typename R, typename... Args>
class async_result<experimental::await_token<Executor>, R(Args...)>
{
public:
  typedef experimental::detail::await_handler<
    Executor, typename decay<Args>::type...> completion_handler_type;

  typedef typename experimental::detail::await_handler<
    Executor, Args...>::awaitable_type return_type;

  async_result(completion_handler_type& h)
    : awaitable_(experimental::detail::make_dummy_awaitable<return_type>())
  {
    h.attach_awaitee(awaitable_);
  }

  return_type get()
  {
    return std::move(awaitable_);
  }

private:
  return_type awaitable_;
};

#if !defined(BOOST_ASIO_NO_DEPRECATED)

template <typename Executor, typename R, typename... Args>
struct handler_type<experimental::await_token<Executor>, R(Args...)>
{
  typedef experimental::detail::await_handler<
    Executor, typename decay<Args>::type...> type;
};

template <typename Executor, typename... Args>
class async_result<experimental::detail::await_handler<Executor, Args...>>
{
public:
  typedef typename experimental::detail::await_handler<
    Executor, Args...>::awaitable_type type;

  async_result(experimental::detail::await_handler<Executor, Args...>& h)
    : awaitable_(experimental::detail::make_dummy_awaitable<type>())
  {
    h.attach_awaitee(awaitable_);
  }

  type get()
  {
    return std::move(awaitable_);
  }

private:
  type awaitable_;
};

#endif // !defined(BOOST_ASIO_NO_DEPRECATED)

} // namespace asio
} // namespace boost

namespace std { namespace experimental {

template <typename Executor, typename... Args>
struct coroutine_traits<
  boost::asio::experimental::detail::awaiter<Executor>*, Args...>
{
  typedef boost::asio::experimental::detail::awaiter<Executor> promise_type;
};

template <typename T, typename Executor, typename... Args>
struct coroutine_traits<
  boost::asio::experimental::awaitable<T, Executor>, Args...>
{
  typedef boost::asio::experimental::detail::awaitee<T, Executor> promise_type;
};

}} // namespace std::experimental

#include <boost/asio/detail/pop_options.hpp>

#endif // BOOST_ASIO_EXPERIMENTAL_IMPL_CO_SPAWN_HPP