Distributed queue adaptor
template<typename ProcessGroup, typename Buffer>
class distributed_queue
{
public:
typedef ProcessGroup process_group_type;
typedef Buffer buffer_type;
typedef typename buffer_type::value_type value_type;
typedef typename buffer_type::size_type size_type;
explicit
distributed_queue(const ProcessGroup& process_group = ProcessGroup(),
const Buffer& buffer = Buffer(),
bool polling = false);
distributed_queue(const ProcessGroup& process_group, bool polling);
void push(const value_type& x);
void pop();
value_type& top();
const value_type& top() const;
bool empty() const;
size_type size() const;
};
template<typename ProcessGroup, typename Buffer>
inline distributed_queue<ProcessGroup, Buffer>
make_distributed_queue(const ProcessGroup& process_group, const Buffer& buffer,
bool polling = false);
Классный шаблонРаспределённая очередьреализует распределенную очередь в группе процессов. Распределенная очередь представляет собой адаптер поверх существующей (местной) очереди, который должен моделировать концепциюБуфер. Каждый процесс хранит отдельную копию локальной очереди, из которой он извлекает или удаляет элементы черезpopиtop.Члены.
Тип значения локальной очереди должен быть моделью концепцииGlobal Descriptor.толкаетработу распределенной очереди, передает (через сообщение) значение собственному процессору. Таким образом, элементы в определенной локальной очереди гарантированно имеют процесс владения этой локальной очередью в качестве владельца.
Синхронизация распределенных очередей происходит в функцияхпустойиразмер, которые возвращают только значения "пустая" (истинное или 0, соответственно), когда вся распределенная очередь пуста. Если локальная очередь пуста, но распределенная очередь отсутствует, операция блокируется до тех пор, пока не изменится ни одно из условий. Когдаразмерфункция непустой очереди возвращается, она возвращает размер местной очереди. Эти семантики были выбраны таким образом, чтобы последовательный код, обрабатывающий элементы в очереди посредством следующего идиома, мог быть распараллелен посредством введения распределенной очереди:
distributed_queue<...> Q;
Q.push(x);
while (!Q.empty()) {
// do something, that may push a value onto Q
}
В параллельном варианте начальный толчокоперации поместит значениеxна очередь его владельца. Все процессы будут синхронизироваться при вызове в пустоту, и только процесс, владеющийx, сможет выполнить циклQ.empty()возвращает ложный. Эта итерация может, в свою очередь, подталкивать значения к другим удаленным очередям, поэтому, когда этот процесс завершает выполнение корпуса цикла, и все процессы снова синхронизируются впустом. Как только все локальные очереди пусты,Q.empty()возвращаетложныедля всех процессов.
Распределенная очередь может принимать сообщения в два разных времени: во время синхронизации и при опросепустой. Сообщения всегда принимаются во время синхронизации, чтобы можно было определить точные размеры очередей. Однако вопрос о том, должен липустойопрос для сообщений, определяется в качестве варианта для конструктора. Опрос может быть желателен, когда порядок, в котором обрабатываются элементы в очереди, не важен, поскольку он позволяет меньше шагов синхронизации и меньше накладных расходов на связь. Однако, когда требуются более строгие гарантии заказа, опрос может быть семантически неправильным. Отключив опрос, можно гарантировать, что параллельное выполнение с использованием идиомы выше не будет обрабатывать элемент на более позднем уровне до более раннего уровня.
Распределенная очередь почти моделирует концепцию буфера. Однако рутинатолканияне обязательно увеличивает результатразмера ()на единицу (хотя размер глобальной очереди увеличивается на единицу).
Member Functions
explicit
distributed_queue(const ProcessGroup& process_group = ProcessGroup(),
const Buffer& buffer = Buffer(),
bool polling = false);
Постройте новую распределенную очередь, которая передает данныепроцесс_группа, чья локальная очередь инициализируется черезбуфери которая может или не может опрашивать сообщения.
distributed_queue(const ProcessGroup& process_group, bool polling);
Создайте новую распределенную очередь, которая общается через даннуюprocess_group, локальная очередь которой по умолчанию инициируется и которая может или не может опрашивать сообщения.
void push(const value_type& x);
Нажмите элемент на распределенную очередь.
Элемент будет отправлен в процесс его владельца для добавления в локальную очередь этого процесса. Если опрос включен для этой очереди, и процесс владельца является текущим процессом, стоимость будет немедленно перенесена на местную очередь.
Сложность: O(1) сообщения размера O(размера(value_type)) будут переданы.
void pop();
Снимите элемент с местной очереди. Очередь не должна бытьпустой.
value_type& top();
const value_type& top();
Вернет верхний элемент в локальную очередь. Очередь не должна бытьпустой.
bool empty() const;
Определяет, является ли очередь пустой.
Когда местная очередь не пуста, возвращается правда. Если локальная очередь пуста, синхронизируется со всеми другими процессами в группе процессов до тех пор, пока либо (1) локальная очередь не пуста (возвращается истинной), (2) вся распределенная очередь пуста (возвращается ложной).
size_type size() const;
Определяет размер местной очереди.
Поведение этой рутины эквивалентно поведениюпустого, за исключением того, что когдапустоговозвращается истинно, эта функция возвращает размер локальной очереди, а когдапустоговозвращается ложно, эта функция возвращается нулевой.
Free Functions
template<typename ProcessGroup, typename Buffer>
inline distributed_queue<ProcessGroup, Buffer>
make_distributed_queue(const ProcessGroup& process_group, const Buffer& buffer,
bool polling = false);
Построение распределенной очереди.
Copyright (C) 2004, 2005 Попечители Университета Индианы.
Авторы: Дуглас Грегор и Эндрю Лумсдейн