Для того, чтобы поддерживать широкий диапазон поведения управления исполнением, корутинные типысимметричный_корутин<>иасимметричный_корутин<>можно использовать дляпобега и возвращенияпетли,побег и возвращениерекурсивные вычисления и длякооперативныеМногозадачность помогает решать задачи гораздо проще и элегантнее, чем при одном потоке управления.
Модель, управляемая событиями, является парадигмой программирования, где поток программы определяется событиями. События генерируются несколькими независимыми источниками, и диспетчер событий, ожидающий всех внешних источников, запускает функции обратного вызова (операторы событий) всякий раз, когда обнаруживается одно из этих событий (петля событий). Приложение делится на выбор событий (обнаружение) и обработку событий.

Полученные приложения являются высокомасштабируемыми, гибкими, имеют высокую отзывчивость, а компоненты слабо связаны. Это делает модель, управляемую событиями, подходящей для приложений пользовательского интерфейса, систем производства на основе правил или приложений, имеющих дело с асинхронным I/O (например, сетевые серверы).
Классическая синхронная консольная программа выдает запрос ввода/вывода (например, для ввода пользователем или данных файловой системы) и блокирует, пока запрос не будет завершен.
Напротив, асинхронная функция ввода/вывода инициирует физическую операцию, но сразу же возвращается к абоненту, даже если операция еще не завершена. Программа, написанная для использования этой функции, не блокирует: она может выполнять другую работу (включая другие запросы ввода/вывода параллельно), пока исходная операция еще не завершена. После завершения операции программа уведомляется. Поскольку асинхронные приложения тратят меньше времени на ожидание операций, они могут превзойти синхронные программы.
События являются одной из парадигм асинхронного исполнения, но не все асинхронные системы используют события. Хотя асинхронное программирование может быть выполнено с использованием потоков, они имеют свои собственные затраты:
- Трудно программировать (ловушки для неосторожных)
- Требования к памяти высокие
- большие накладные расходы с созданием и поддержанием состояния нити
- Переключение контекста между потоками
Асинхронная модель на основе событий позволяет избежать таких проблем:
- Проще благодаря единому потоку инструкций
- Менее дорогие переключатели контекста
Недостатком этой парадигмы является субоптимальная программная структура. Программа, управляемая событиями, должна разделить свой код на несколько небольших функций обратного вызова, то есть код организован в последовательности небольших шагов, которые выполняются периодически. Алгоритм, который обычно выражается в виде иерархии функций и циклов, должен быть преобразован в обратный вызов. Полное состояние должно храниться в структуре данных, в то время как поток управления возвращается в контур событий. Как следствие, приложения, управляемые событиями, часто утомительны и запутанны в написании. Каждый возврат вызова вводит новый объем, возврат ошибки и т. Д. Последовательный характер алгоритма разделен на несколько calltacks, что затрудняет отладку приложения. Обработчики исключений ограничены локальными обработчиками: невозможно обернуть последовательность событий в один блок поиска. Использование локальных переменных, в то время как/для циклов, рекурсий и т.д. вместе с контуром событий невозможно. Код становится менее выразительным.
В прошлом код, использующий асинхронные операции асио, был запутанным функциями обратного вызова.
class session
{
public:
session(boost::asio::io_service& io_service) :
socket_(io_service)
{}
tcp::socket& socket(){
return socket_;
}
void start(){
socket_.async_read_some(boost::asio::buffer(data_,max_length),
boost::bind(&session::handle_read,this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
private:
void handle_read(const boost::system::error_code& error,
size_t bytes_transferred){
if (!error)
boost::asio::async_write(socket_,
boost::asio::buffer(data_,bytes_transferred),
boost::bind(&session::handle_write,this,
boost::asio::placeholders::error));
else
delete this;
}
void handle_write(const boost::system::error_code& error){
if (!error)
socket_.async_read_some(boost::asio::buffer(data_,max_length),
boost::bind(&session::handle_read,this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
else
delete this;
}
boost::asio::ip::tcp::socket socket_;
enum { max_length=1024 };
char data_[max_length];
};
В этом примере простой эхо-сервер, логика разделена на три функции-члена - локальное состояние (например, буфер данных) перемещается в переменные-члена.
Boost.Asioпредоставляет свой новыйасинхронный результатс новой структурой, объединяющей модель, управляемую событиями, и корутинами, скрывая сложность программирования, управляемого событиями, и позволяя стиль классического последовательного кода. Приложение не обязано передавать функции обратного вызова асинхронным операциям, а локальное состояние сохраняется как локальные переменные. Поэтому код намного проще читать и понимать..boost::asio::yield_contextвнутренне используетBoost.Coroutine:
void session(boost::asio::io_service& io_service){
boost::asio::ip::tcp::socket socket(io_service);
try{
for(;;){
char data[max_length];
boost::system::error_code ec;
std::size_t length=socket.async_read_some(
boost::asio::buffer(data),
boost::asio::yield[ec]);
if (ec==boost::asio::error::eof)
break;
else if(ec)
throw boost::system::system_error(ec);
boost::asio::async_write(
socket,
boost::asio::buffer(data,length),
boost::asio::yield[ec]);
if (ec==boost::asio::error::eof)
break;
else if(ec)
throw boost::system::system_error(ec);
}
} catch(std::exception const& e){
std::cerr<<"Exception: "<<e.what()<<"\n";
}
}
В отличие от предыдущего примера, этот создает впечатление последовательного кода и локальных данныхпри использовании асинхронных операцийasync_read(),async_write(). Алгоритм реализован в одной функции, а обработка ошибок выполняется одним блоком поиска.
Для тех, кто знает SAX, фраза «рекурсивный анализ SAX» может показаться бессмысленной. Вы получаете обратные вызовы от SAX; вы должны управлять стеком элементов самостоятельно. Если вы хотите рекурсивную обработку XML, вы должны сначала прочитать весь DOM в память, а затем пройтись по дереву.
Но корутина позволяет вам перевернуть поток управления, чтобы вы могли запросить события SAX. Как только вы это сделаете, вы сможете обрабатывать их рекурсивно.
struct BaseEvent{
BaseEvent(const BaseEvent&)=delete;
BaseEvent& operator=(const BaseEvent&)=delete;
};
struct CloseEvent: public BaseEvent{
CloseEvent(const xml::sax::Parser::TagType& name):
mName(name)
{}
const xml::sax::Parser::TagType& mName;
};
struct OpenEvent: public CloseEvent{
OpenEvent(const xml::sax::Parser::TagType& name,
xml::sax::AttributeIterator& attrs):
CloseEvent(name),
mAttrs(attrs)
{}
xml::sax::AttributeIterator& mAttrs;
};
struct TextEvent: public BaseEvent{
TextEvent(xml::sax::CharIterator& text):
mText(text)
{}
xml::sax::CharIterator& mText;
};
typedef boost::coroutines::asymmetric_coroutine<const BaseEvent&> coro_t;
void parser(coro_t::push_type& sink,std::istream& in){
xml::sax::Parser xparser;
xparser.startDocument([&sink](const xml::sax::Parser::TagType& name,
xml::sax::AttributeIterator& attrs)
{
sink(OpenEvent(name,attrs));
});
xparser.startTag([&sink](const xml::sax::Parser::TagType& name,
xml::sax::AttributeIterator& attrs)
{
sink(OpenEvent(name,attrs));
});
xparser.endTag([&sink](const xml::sax::Parser::TagType& name)
{
sink(CloseEvent(name));
});
xparser.endDocument([&sink](const xml::sax::Parser::TagType& name)
{
sink(CloseEvent(name));
});
xparser.characters([&sink](xml::sax::CharIterator& text)
{
sink(TextEvent(text));
});
try
{
xparser.parse(in);
}
catch (xml::Exception e)
{
throw std::runtime_error(exception_name(e));
}
}
const CloseEvent& process(coro_t::pull_type& events,const OpenEvent& context,
const std::string& indent=""){
xml::sax::Parser::TagType tagName = context.mName;
while(events()){
const BaseEvent& event=events.get();
const OpenEvent* oe;
const CloseEvent* ce;
const TextEvent* te;
if((oe=dynamic_cast<const OpenEvent*>(&event))){
process(events,*oe,indent+" ");
}
else if((ce=dynamic_cast<const CloseEvent*>(&event))){
assert(ce->mName == tagName);
return *ce;
}
else if((te=dynamic_cast<const TextEvent*>(&event))){
std::cout<<indent<<"text: '"<<te->mText.getText()<<"'\n";
}
}
}
std::istringstream in(doc);
try
{
coro_t::pull_type events(std::bind(parser,_1,std::ref(in)));
assert(events);
const OpenEvent& context=dynamic_cast<const OpenEvent&>(events.get());
process(events, context);
}
catch (std::exception& e)
{
std::cout << "Parsing error: " << e.what() << '\n';
}
Эта проблема совсем не подходит для общения между независимыми потоками. Нет смысла, чтобы обе стороны действовали независимо друг от друга. Вы хотите, чтобы они передавали контроль туда и обратно.
Решение включает в себя небольшую полиморфную иерархию событий класса, к которой мы передаем ссылки. Фактические экземпляры являются временными на стеке корутина; корутина передает каждую ссылку в свою очередь к основной логике. Копирование их как ценностей базового класса разрежет их.
Если бы мы попытались позволить парсеру SAX действовать независимо от логики потребления, можно было бы представить распределение экземпляров подкласса событий на куче, передавая их по безвредной очереди указателей. Но это также не работает, потому что эти классы событий связывают ссылки, переданные парсером SAX. В тот момент, когда парсер движется дальше, эти ссылки становятся недействительными.
Вместо того, чтобы связыватьTagType&ссылка, мы могли бы сохранить копиюТагтайпвЗакрыть Событие. Но это не решает всей проблемы. Для атрибутов мы получаем.AttributeIterator&; для текста получаемCharIterator&. Хранение копии этих итераторов бессмысленно: как только парсер движется дальше, эти итераторы недействительны. Вы должны обработать итератор атрибутов (или итератор символов) во время обратного вызова SAX для этого события.
Естественно, мы могли бы получить и сохранить копию каждого атрибута и его значения; мы могли бы сохранить копию каждого фрагмента текста. Это фактически был бы весь текст в документе - высокая цена, если причина, по которой мы используем SAX, - это озабоченность по поводу установки всего DOM в память.
Есть еще одно преимущество использования корутин. Этот SAX парсер делает исключение, когда парсинг терпит неудачу. При реализации корутина вам нужно только обернуть код вызова в try/catch.
С помощью коммуникационных потоков вам придется договориться, чтобы поймать исключение и передать указатель исключения в той же очереди, которую вы используете для доставки других событий. Затем вам придется перебросить исключение, чтобы отключить рекурсивную обработку документов.
Корутинное решение очень естественно отображает проблемное пространство.
Преимущества суспендирования на произвольной глубине вызова можно увидеть особенно четко с использованием рекурсивной функции, такой как обход деревьев. Если пересечение двух разных деревьев в одном и том же детерминистском порядке приводит к одному и тому же списку листовых узлов, то оба дерева имеют один и тот же край.

Оба дерева на картине имеют одинаковую оконечность, хотя структура деревьев отличается.
Та же самая пограничная проблема может быть решена с помощью корутин путем итерации по узлам листа и сравнения этой последовательности черезstd::equal(). Диапазон значений данных генерируется функциейtraverse(), который рекурсивно пересекает дерево и передает значение данных каждого узла егоasymmetric_coroutine<>::push_type.asymmetric_coroutine<>::push_typeприостанавливает рекурсивные вычисления и передает значение данных в основной контекст исполнения.asymmetric_coroutine<>::pull_type::iterator, созданный изasymmetric_coroutine<>::pull_type, перешагивает эти значения данных и доставляет их вstd::equal()Для сравнения. Каждое приращениеasymmetric_coroutine<>::pull_type::iteratorвозобновляетtraverse().. По возвращении изитератор::оператор++(), либо доступно новое значение данных, либо завершено прохождение дерева (итератор недействителен).
По сути, корутинный итератор представляет сглаженный вид рекурсивной структуры данных.
struct node{
typedef boost::shared_ptr<node> ptr_t;
ptr_t left,right;
std::string value;
node(const std::string& v):
left(),right(),value(v)
{}
node(ptr_t l,const std::string& v,ptr_t r):
left(l),right(r),value(v)
{}
static ptr_t create(const std::string& v){
return ptr_t(new node(v));
}
static ptr_t create(ptr_t l,const std::string& v,ptr_t r){
return ptr_t(new node(l,v,r));
}
};
node::ptr_t create_left_tree_from(const std::string& root){
return node::create(
node::create(
node::create("a"),
"b",
node::create("c")),
root,
node::create("e"));
}
node::ptr_t create_right_tree_from(const std::string& root){
return node::create(
node::create("a"),
root,
node::create(
node::create("c"),
"d",
node::create("e")));
}
void traverse(node::ptr_t n,
boost::coroutines::asymmetric_coroutine<std::string>::push_type& out){
if(n->left) traverse(n->left,out);
out(n->value);
if(n->right) traverse(n->right,out);
}
{
node::ptr_t left_d(create_left_tree_from("d"));
boost::coroutines::asymmetric_coroutine<std::string>::pull_type left_d_reader(
[&]( boost::coroutines::asymmetric_coroutine<std::string>::push_type & out){
traverse(left_d,out);
});
node::ptr_t right_b(create_right_tree_from("b"));
boost::coroutines::asymmetric_coroutine<std::string>::pull_type right_b_reader(
[&]( boost::coroutines::asymmetric_coroutine<std::string>::push_type & out){
traverse(right_b,out);
});
std::cout << "left tree from d == right tree from b? "
<< std::boolalpha
<< std::equal(boost::begin(left_d_reader),
boost::end(left_d_reader),
boost::begin(right_b_reader))
<< std::endl;
}
{
node::ptr_t left_d(create_left_tree_from("d"));
boost::coroutines::asymmetric_coroutine<std::string>::pull_type left_d_reader(
[&]( boost::coroutines::asymmetric_coroutine<std::string>::push_type & out){
traverse(left_d,out);
});
node::ptr_t right_x(create_right_tree_from("x"));
boost::coroutines::asymmetric_coroutine<std::string>::pull_type right_x_reader(
[&]( boost::coroutines::asymmetric_coroutine<std::string>::push_type & out){
traverse(right_x,out);
});
std::cout << "left tree from d == right tree from x? "
<< std::boolalpha
<< std::equal(boost::begin(left_d_reader),
boost::end(left_d_reader),
boost::begin(right_x_reader))
<< std::endl;
}
std::cout << "Done" << std::endl;
output:
left tree from d == right tree from b? true
left tree from d == right tree from x? false
Done
Этот пример показывает, как симметричные корутины объединяют два сортированных массива.
std::vector<int> merge(const std::vector<int>& a,const std::vector<int>& b){
std::vector<int> c;
std::size_t idx_a=0,idx_b=0;
boost::coroutines::symmetric_coroutine<void>::call_type *other_a=0,*other_b=0;
boost::coroutines::symmetric_coroutine<void>::call_type coro_a(
[&](boost::coroutines::symmetric_coroutine<void>::yield_type& yield){
while(idx_a<a.size()){
if(b[idx_b]<a[idx_a])
yield(*other_b);
c.push_back(a[idx_a++]);
}
while(idx_b<b.size())
c.push_back(b[idx_b++]);
});
boost::coroutines::symmetric_coroutine<void>::call_type coro_b(
[&](boost::coroutines::symmetric_coroutine<void>::yield_type& yield){
while(idx_b<b.size()){
if(a[idx_a]<b[idx_b])
yield(*other_a);
c.push_back(b[idx_b++]);
}
while(idx_a<a.size())
c.push_back(a[idx_a++]);
});
other_a=&coro_a;
other_b=&coro_b;
coro_a();
return c;
}
Этот код показывает, как могут быть прикованы корутины.
typedef boost::coroutines::asymmetric_coroutine<std::string> coro_t;
void readlines(coro_t::push_type& sink,std::istream& in){
std::string line;
while(std::getline(in,line))
sink(line);
}
void tokenize(coro_t::push_type& sink, coro_t::pull_type& source){
BOOST_FOREACH(std::string line,source){
std::string::size_type pos=0;
while(pos<line.length()){
if(line[pos]=='"'){
std::string token;
++pos;
while(pos<line.length()&&line[pos]!='"')
token+=line[pos++];
++pos;
sink(token);
} else if (std::isspace(line[pos])){
++pos;
} else if (std::isalpha(line[pos])){
std::string token;
while (pos < line.length() && std::isalpha(line[pos]))
token += line[pos++];
sink(token);
} else {
sink(std::string(1,line[pos++]));
}
}
}
}
void only_words(coro_t::push_type& sink,coro_t::pull_type& source){
BOOST_FOREACH(std::string token,source){
if (!token.empty() && std::isalpha(token[0]))
sink(token);
}
}
void trace(coro_t::push_type& sink, coro_t::pull_type& source){
BOOST_FOREACH(std::string token,source){
std::cout << "trace: '" << token << "'\n";
sink(token);
}
}
struct FinalEOL{
~FinalEOL(){
std::cout << std::endl;
}
};
void layout(coro_t::pull_type& source,int num,int width){
FinalEOL eol;
for (;;){
for (int i = 0; i < num; ++i){
if (!source) return;
std::cout << std::setw(width) << source.get();
source();
}
std::cout << std::endl;
}
}
std::string data(
"This is the first line.\n"
"This, the second.\n"
"The third has \"a phrase\"!\n"
);
{
std::cout << "\nfilter:\n";
std::istringstream infile(data);
coro_t::pull_type reader(boost::bind(readlines, _1, boost::ref(infile)));
coro_t::pull_type tokenizer(boost::bind(tokenize, _1, boost::ref(reader)));
coro_t::pull_type filter(boost::bind(only_words, _1, boost::ref(tokenizer)));
coro_t::pull_type tracer(boost::bind(trace, _1, boost::ref(filter)));
BOOST_FOREACH(std::string token,tracer){
}
}
{
std::cout << "\nlayout() as coroutine::push_type:\n";
std::istringstream infile(data);
coro_t::pull_type reader(boost::bind(readlines, _1, boost::ref(infile)));
coro_t::pull_type tokenizer(boost::bind(tokenize, _1, boost::ref(reader)));
coro_t::pull_type filter(boost::bind(only_words, _1, boost::ref(tokenizer)));
coro_t::push_type writer(boost::bind(layout, _1, 5, 15));
BOOST_FOREACH(std::string token,filter){
writer(token);
}
}
{
std::cout << "\nfiltering output:\n";
std::istringstream infile(data);
coro_t::pull_type reader(boost::bind(readlines,_1,boost::ref(infile)));
coro_t::pull_type tokenizer(boost::bind(tokenize,_1,boost::ref(reader)));
coro_t::push_type writer(boost::bind(layout,_1,5,15));
coro_t::push_type filter(boost::bind(only_words,boost::ref(writer),_1));
BOOST_FOREACH(std::string token,tokenizer){
filter(token);
}
}