Boost C++ Libraries

"...one of the most highly regarded and expertly designed C++ library projects in the world." — Herb Sutter and Andrei Alexandrescu, C++ Coding Standards

This is the documentation for an old version of Boost. Click here to view this page for the latest version.

boost/property_map/parallel/impl/distributed_property_map.ipp

// Copyright (C) 2004-2006 The Trustees of Indiana University.

// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

//  Authors: Douglas Gregor
//           Nick Edmonds
//           Andrew Lumsdaine
#include <boost/property_map/parallel/distributed_property_map.hpp>
#include <boost/graph/parallel/detail/untracked_pair.hpp>
#include <boost/type_traits/is_base_and_derived.hpp>
#include <boost/bind.hpp>
#include <boost/graph/parallel/simple_trigger.hpp>

#ifndef BOOST_GRAPH_USE_MPI
#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
#endif

namespace boost { namespace parallel {

// Construct a distributed property map on process group `pg`.
//
// `pg`, `global`, `pm` and `reduce` are forwarded to the shared data_t state
// together with the Reduce::non_default_resolver flag.  The constructor then
// creates an empty ghost-cell container, records the Reduce-specific reset
// routine, installs a handle_message<Reduce> on the process group and
// registers that handler's message triggers.
template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
template<typename Reduce>
PBGL_DISTRIB_PMAP
::distributed_property_map(const ProcessGroup& pg, const GlobalMap& global,
                           const StorageMap& pm, const Reduce& reduce)
  : data(new data_t(pg, global, pm, reduce, Reduce::non_default_resolver))
{
  typedef handle_message<Reduce> message_handler;

  data->ghost_cells.reset(new ghost_cells_type());
  data->reset = &data_t::template do_reset<Reduce>;

  // Install the handler, then ask the process group for the receiver it now
  // holds and register the triggers on that instance.
  data->process_group.replace_handler(message_handler(data, reduce));
  message_handler* installed
    = data->process_group.template get_receiver<message_handler>();
  installed->setup_triggers(data->process_group);
}

// Destructor.  Performs no explicit cleanup of its own.
template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
PBGL_DISTRIB_PMAP::~distributed_property_map()
{
}

// Replace the reduction operation used by this property map.
//
// Installs a new handle_message<Reduce> on the process group and registers
// its triggers, stores `reduce` as the default-value generator, updates the
// has_default_resolver flag and the reset routine, and finally re-applies
// the consistency model that is already in effect.
template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
template<typename Reduce>
void
PBGL_DISTRIB_PMAP::set_reduce(const Reduce& reduce)
{
  typedef handle_message<Reduce> message_handler;

  data->process_group.replace_handler(message_handler(data, reduce));
  message_handler* installed
    = data->process_group.template get_receiver<message_handler>();
  assert(installed);
  installed->setup_triggers(data->process_group);

  data->get_default_value = reduce;
  data->has_default_resolver = Reduce::non_default_resolver;
  data->reset = &data_t::template do_reset<Reduce>;

  // The model itself is unchanged; calling set_consistency_model() again
  // re-runs its setup (including the on_synchronize registration).
  set_consistency_model(data->model);
}

template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
void PBGL_DISTRIB_PMAP::prune_ghost_cells() const
{
  if (data->max_ghost_cells == 0)
    return;

  while (data->ghost_cells->size() > data->max_ghost_cells) {
    // Evict the last ghost cell

    if (data->model & cm_flush) {
      // We need to flush values when we evict them.
      boost::parallel::detail::untracked_pair<key_type, value_type> const& victim
        = data->ghost_cells->back();
      send(data->process_group, get(data->global, victim.first).first, 
           property_map_put, victim);
    }

    // Actually remove the ghost cell
    data->ghost_cells->pop_back();
  }
}

template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
typename PBGL_DISTRIB_PMAP::value_type&
PBGL_DISTRIB_PMAP::cell(const key_type& key, bool request_if_missing) const
{
  // Index by key
  ghost_cells_key_index_type const& key_index 
    = data->ghost_cells->template get<1>();

  // Search for the ghost cell by key, and project back to the sequence
  iterator ghost_cell 
    = data->ghost_cells->template project<0>(key_index.find(key));
  if (ghost_cell == data->ghost_cells->end()) {
    value_type value;
    if (data->has_default_resolver)
      // Since we have a default resolver, use it to create a default
      // value for this ghost cell.
      value = data->get_default_value(key);
    else if (request_if_missing)
      // Request the actual value of this key from its owner
      send_oob_with_reply(data->process_group, get(data->global, key).first, 
                          property_map_get, key, value);
    else
      value = value_type();

    // Create a ghost cell containing the new value
    ghost_cell 
      = data->ghost_cells->push_front(std::make_pair(key, value)).first;

    // If we need to, prune the ghost cells
    if (data->max_ghost_cells > 0)
      prune_ghost_cells();
  } else if (data->max_ghost_cells > 0)
    // Put this cell at the beginning of the MRU list
    data->ghost_cells->relocate(data->ghost_cells->begin(), ghost_cell);

  return const_cast<value_type&>(ghost_cell->second);
}

// Generic message entry point.  Reaching it trips an assertion.
// NOTE(review): presumably all traffic is expected to arrive through the
// per-tag triggers registered by setup_triggers() -- confirm.
template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
template<typename Reduce>
void
PBGL_DISTRIB_PMAP::handle_message<Reduce>::
operator()(process_id_type source, int tag)
{
  // Neither argument is inspected.
  (void)source;
  (void)tag;
  assert(false);
}

// Trigger for property_map_put: merge one incoming (key, value) pair into
// local storage.
//
// The key is translated through the global map, which must name this process
// as the owner (checked by assertion).  The stored value is then replaced,
// via detail::maybe_put, with reduce(key, current local value, incoming
// value).
template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
template<typename Reduce>
void
PBGL_DISTRIB_PMAP::handle_message<Reduce>::
handle_put(int /*source*/, int /*tag*/, 
           const boost::parallel::detail::untracked_pair<key_type, value_type>& req, trigger_receive_context)
{
  using boost::get;

  shared_ptr<data_t> pmap_data(data_ptr);

  owner_local_pair where = get(pmap_data->global, req.first);
  assert(where.first == process_id(pmap_data->process_group));

  detail::maybe_put(pmap_data->storage, where.second,
                    reduce(req.first,
                           get(pmap_data->storage, where.second),
                           req.second));
}

// Trigger for property_map_get: return the locally stored value for `key`.
// The key is mapped to its local key through the global map and the value is
// read from local storage.  `source` is not consulted.
template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
template<typename Reduce>
typename PBGL_DISTRIB_PMAP::value_type
PBGL_DISTRIB_PMAP::handle_message<Reduce>::
handle_get(int source, int /*tag*/, const key_type& key, 
           trigger_receive_context)
{
  using boost::get;

  shared_ptr<data_t> pmap_data(data_ptr);
  assert(pmap_data);

  owner_local_pair where = get(pmap_data->global, key);
  return get(pmap_data->storage, where.second);
}

// Trigger for property_map_multiget: look up every requested key in local
// storage and send the resulting (key, value) pairs back to `source` as a
// single property_map_multiget_reply message.  A reply is sent even when
// `keys` is empty.
template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
template<typename Reduce>
void
PBGL_DISTRIB_PMAP::handle_message<Reduce>::
handle_multiget(int source, int tag, const std::vector<key_type>& keys,
                trigger_receive_context)
{
  typedef boost::parallel::detail::untracked_pair<key_type, value_type> key_value;
  typedef typename std::vector<key_type>::const_iterator key_iterator;

  using boost::get;

  shared_ptr<data_t> pmap_data(data_ptr);
  assert(pmap_data);

  std::vector<key_value> reply;
  reply.reserve(keys.size());

  for (key_iterator k = keys.begin(); k != keys.end(); ++k) {
    local_key_type local_key = get(pmap_data->global, *k).second;
    reply.push_back(key_value(*k, get(pmap_data->storage, local_key)));
  }

  send(pmap_data->process_group, source, property_map_multiget_reply, reply);
}

// Trigger for property_map_multiget_reply: copy each returned value into the
// matching ghost cell.  Pairs whose key has no ghost cell are ignored.
template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
template<typename Reduce>
void
PBGL_DISTRIB_PMAP::handle_message<Reduce>::
handle_multiget_reply
  (int source, int tag, 
   const std::vector<boost::parallel::detail::untracked_pair<key_type, value_type> >& msg,
   trigger_receive_context)
{
  typedef boost::parallel::detail::untracked_pair<key_type, value_type> key_value;
  typedef typename std::vector<key_value>::const_iterator reply_iterator;

  shared_ptr<data_t> pmap_data(data_ptr);
  assert(pmap_data);

  // Ghost cells are located through the key index and then projected back
  // onto the sequenced view.
  ghost_cells_key_index_type const& by_key
    = pmap_data->ghost_cells->template get<1>();

  for (reply_iterator r = msg.begin(); r != msg.end(); ++r) {
    iterator cell_pos
      = pmap_data->ghost_cells->template project<0>(by_key.find(r->first));

    if (cell_pos == pmap_data->ghost_cells->end())
      continue;

    const_cast<value_type&>(cell_pos->second) = r->second;
  }
}

// Trigger for property_map_multiput: merge a batch of (local key, value)
// pairs into local storage.  Each stored value is replaced, via
// detail::maybe_put, with reduce(local key, current local value, incoming
// value).  The keys arrive already expressed as local keys, so the global
// map is not consulted.
template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
template<typename Reduce>
void
PBGL_DISTRIB_PMAP::handle_message<Reduce>::
handle_multiput
  (int source, int tag, 
   const std::vector<unsafe_pair<local_key_type, value_type> >& values,
   trigger_receive_context)
{
  using boost::get;

  typedef unsafe_pair<local_key_type, value_type> update_type;
  typedef typename std::vector<update_type>::const_iterator update_iterator;

  shared_ptr<data_t> pmap_data(data_ptr);
  assert(pmap_data);

  for (update_iterator u = values.begin(); u != values.end(); ++u) {
    const update_type& update = *u;

    // Read the current value before combining it with the incoming one.
    local_key_type local_key = update.first;
    value_type current = get(pmap_data->storage, local_key);

    detail::maybe_put(pmap_data->storage, update.first,
                      reduce(update.first, current, update.second));
  }
}

// Register this handler's member functions with process group `pg`, one
// trigger per message tag:
//   property_map_put            -> handle_put
//   property_map_get            -> handle_get
//   property_map_multiget       -> handle_multiget
//   property_map_multiget_reply -> handle_multiget_reply
//   property_map_multiput       -> handle_multiput
template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
template<typename Reduce>
void
PBGL_DISTRIB_PMAP::handle_message<Reduce>::
setup_triggers(process_group_type& pg)
{
  using boost::graph::parallel::simple_trigger;

  // Single-key traffic.
  simple_trigger(pg, property_map_put,
                 this, &handle_message::handle_put);
  simple_trigger(pg, property_map_get,
                 this, &handle_message::handle_get);

  // Batched traffic.
  simple_trigger(pg, property_map_multiget,
                 this, &handle_message::handle_multiget);
  simple_trigger(pg, property_map_multiget_reply,
                 this, &handle_message::handle_multiget_reply);
  simple_trigger(pg, property_map_multiput,
                 this, &handle_message::handle_multiput);
}

// Synchronization hook: applies the consistency model's actions in a fixed
// order -- flush (cm_flush), ghost-cell refresh (cm_backward, unless the
// model also clears or resets), clear (cm_clear), then reset (cm_reset).
//
// NOTE(review): `stage` is fixed at 0, while the refresh is scheduled for
// stage 1 when cm_backward and cm_flush are both set, so in that combination
// the refresh below cannot fire from this hook -- confirm this is intended.
template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
void
PBGL_DISTRIB_PMAP
::on_synchronize::operator()()
{
  const int stage = 0; // we only get called at the start now

  shared_ptr<data_t> pmap_data(data_ptr);
  assert(pmap_data);

  // Stage in which backward-consistency requests go out: never (-1) without
  // cm_backward, otherwise stage 1 when a flush comes first, else stage 0.
  int backward_stage = -1;
  if (pmap_data->model & cm_backward)
    backward_stage = (pmap_data->model & cm_flush) ? 1 : 0;

  // Flush results in first stage
  if (stage == 0 && (pmap_data->model & cm_flush))
    pmap_data->flush();

  // Backward consistency: pointless if the cells are about to be cleared or
  // reset anyway.
  if (stage == backward_stage && !(pmap_data->model & (cm_clear | cm_reset)))
    pmap_data->refresh_ghost_cells();

  // Optionally clear results
  if (pmap_data->model & cm_clear)
    pmap_data->clear();

  // Optionally reset results (only when a reset routine has been recorded)
  if ((pmap_data->model & cm_reset) && pmap_data->reset)
    ((*pmap_data).*pmap_data->reset)();
}


// Select the consistency model for this property map.
//
// `model` is a bitwise combination of the cm_* flags.  It is stored in the
// shared state.  If it includes cm_backward, the ghost-cell limit is reset to
// zero, which prune_ghost_cells() treats as "do not prune".  For every model
// other than plain cm_forward, an on_synchronize handler is installed on the
// process group.
template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
void
PBGL_DISTRIB_PMAP::set_consistency_model(int model)
{
  data->model = model;

  // For backward consistency to work, we absolutely cannot throw
  // away any ghost cells.
  if (model & cm_backward)
    data->max_ghost_cells = 0;

  // Pure forward consistency needs no on_synchronize handler; every other
  // model does.
  if (model != cm_forward)
    data->process_group.replace_on_synchronize_handler(on_synchronize(data));
}

template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
void
PBGL_DISTRIB_PMAP::set_max_ghost_cells(std::size_t max_ghost_cells)
{
  if ((data->model & cm_backward) && max_ghost_cells > 0)
      boost::throw_exception(std::runtime_error("distributed_property_map::set_max_ghost_cells: "
                                                "cannot limit ghost-cell usage with a backward "
                                                "consistency model"));

  if (max_ghost_cells == 1)
    // It is not safe to have only 1 ghost cell; the cell() method
    // will fail.
    max_ghost_cells = 2;

  data->max_ghost_cells = max_ghost_cells;
  prune_ghost_cells();
}

// Discard all ghost cells; forwards to data_t::clear() on the shared state.
template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
void
PBGL_DISTRIB_PMAP::clear()
{
  data->clear();
}

// Empty the ghost-cell container.  Nothing is sent to other processes here.
template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
void
PBGL_DISTRIB_PMAP::data_t::clear()
{
  ghost_cells->clear();
}

// Invoke the reset routine recorded in the shared state, if there is one.
// Does nothing when no routine has been recorded.
template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
void
PBGL_DISTRIB_PMAP::reset()
{
  if (!data->reset)
    return;

  ((*data).*data->reset)();
}

// Send the contents of all ghost cells to their owners; forwards to
// data_t::flush() on the shared state.
template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
void
PBGL_DISTRIB_PMAP::flush()
{
  data->flush();
}

template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
void PBGL_DISTRIB_PMAP::data_t::refresh_ghost_cells()
{
  using boost::get;

  std::vector<std::vector<key_type> > keys;
  keys.resize(num_processes(process_group));

  // Collect the set of keys for which we will request values
  for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i)
    keys[get(global, i->first).first].push_back(i->first);

  // Send multiget requests to each of the other processors
  typedef typename ProcessGroup::process_size_type process_size_type;
  process_size_type n = num_processes(process_group);
  process_id_type id = process_id(process_group);
  for (process_size_type p = (id + 1) % n ; p != id ; p = (p + 1) % n) {
    if (!keys[p].empty())
      send(process_group, p, property_map_multiget, keys[p]);
  }  
}

template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
void PBGL_DISTRIB_PMAP::data_t::flush()
{
  using boost::get;

  int n = num_processes(process_group);
  std::vector<std::vector<unsafe_pair<local_key_type, value_type> > > values;
  values.resize(n);

  // Collect all of the flushed values
  for (iterator i = ghost_cells->begin(); i != ghost_cells->end(); ++i) {
    std::pair<int, local_key_type> g = get(global, i->first);
    values[g.first].push_back(std::make_pair(g.second, i->second));
  }

  // Transmit flushed values
  for (int p = 0; p < n; ++p) {
    if (!values[p].empty())
      send(process_group, p, property_map_multiput, values[p]);
  }
}

// Synchronize the process group on behalf of this property map.
//
// With a model that includes cm_backward, a single synchronize() is
// performed.  Otherwise the ghost cells are refreshed explicitly, which takes
// two synchronization steps: one to deliver the multiget requests and one to
// deliver the replies.
template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
void PBGL_DISTRIB_PMAP::do_synchronize()
{
  if (!(data->model & cm_backward)) {
    // Request refreshes of the values of our ghost cells
    data->refresh_ghost_cells();

    // Allows all of the multigets to get to their destinations
    synchronize(data->process_group);
  }

  // Backward models: the one and only synchronization step.
  // Other models: allows all of the multiget responses to get to their
  // destinations.
  synchronize(data->process_group);
}

// Overwrite every ghost cell with the value the resolver produces for its
// key.  The resolver is retrieved from get_default_value as a `Resolver`;
// the assertion fires when the stored callable is not of that type.
template<typename ProcessGroup, typename GlobalMap, typename StorageMap>
template<typename Resolver>
void PBGL_DISTRIB_PMAP::data_t::do_reset()
{
  Resolver* default_fn = get_default_value.template target<Resolver>();
  assert(default_fn);

  iterator gc = ghost_cells->begin();
  while (gc != ghost_cells->end()) {
    const_cast<value_type&>(gc->second) = (*default_fn)(gc->first);
    ++gc;
  }
}

} } // end namespace boost::parallel