К настоящему времени внимательный читатель думает: но, конечно, с Асио в частности, мы должны быть в состоянии сделать гораздо лучше, чем периодические опросы!
Это оказывается на удивление сложным. Мы предлагаем возможный подход в<examples/asio/round_robin.hpp
>.
Один из вариантов применения. Asio— это то, что вы всегда должны позволять Asio приостанавливать бегущую нить. С тех пор Asio знает о ожидающих запросах ввода/вывода, он может организовать приостановку потока таким образом, чтобы ОС разбудила его при завершении ввода/вывода. Никто другой не обладает достаточными знаниями.
Таким образом, планировщик волокон должен зависеть от Asio для подвески и возобновления. Требуется звонок обработчика Asio, чтобы разбудить его.
Одним из тревожных последствий является то, что мы не можем поддерживать несколько потоков, вызывающих<io_service::run()
>на одном и том же<io_service
>экземпляре. Причина в том, что Asio не предоставляет никакого способа ограничить вызов конкретного обработчика только на указанной нити. Пример планировщика волокон привязан к определенной нити: этот экземпляр не может управлять никакими другими нитями’s волокон. Тем не менее, если мы позволим нескольким нитям вызывать<io_service::run()
>на одном и том же<io_service
>экземпляре, планировщик волокон, который должен спать, не может гарантировать, что он своевременно проснется. Он может установить таймер Asio, как описано выше & #8212; но этот таймер & #8217 может работать на другом потоке!
Еще один вывод заключается в том, что, поскольку планировщик волокон Asio-aware (не говоря уже о<boost::fibers::asio::yield
>) зависит от вызовов обработчика от<io_service
>, приложение & #8217 несет ответственность за то, чтобы<io_service::stop()
>не вызывался до тех пор, пока каждое волокно не будет прекращено.
Легче рассуждать о поведении представленного<asio::round_robin
>планировщика, если мы требуем, чтобы после первоначальной настройки основным волокном потока & #8217 было волокно, которое вызывает<io_service::run()
>, поэтому давайте & #8217 наложим это требование.
Естественно, первое, что мы должны сделать на каждой нити, используя пользовательский планировщик волокон, это позвонить<use_scheduling_algorithm()
>. Однако, поскольку<asio::round_robin
>требует<io_service
>примера, мы должны сначала заявить об этом.
boost::asio::io_service io_svc;
boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_svc);
<use_scheduling_algorithm()
>мгновенно<asio::round_robin
>, который, естественно, называет своего конструктора:
round_robin( boost::asio::io_service & io_svc) :
io_svc_( io_svc),
suspend_timer_( io_svc_) {
boost::asio::add_service( io_svc_, new service( io_svc_));
}
<asio::round_robin
>связывает принятую<io_service
>ссылку и инициализирует<boost::asio::steady_timer
>:
boost::asio::io_service & io_svc_;
boost::asio::steady_timer suspend_timer_;
<boost::asio::add_service()
>,<service
>, [<service
>, [<service
>], [<service
>].
struct service : public boost::asio::io_service::service {
static boost::asio::io_service::id id;
std::unique_ptr< boost::asio::io_service::work > work_;
service( boost::asio::io_service & io_svc) :
boost::asio::io_service::service( io_svc),
work_{ new boost::asio::io_service::work( io_svc) } {
io_svc.post([&io_svc](){
...
});
}
virtual ~service() {}
service( service const&) = delete;
service & operator=( service const&) = delete;
void shutdown_service() override final {
work_.reset();
}
};
В<service
>строчке есть пара ролей.
Его главная роль заключается в управлении<std::unique_ptr<boost::asio::io_service::work
>>
. Мы хотим, чтобы экземпляр<io_service
>продолжал свою основную петлю, даже когда нет ожидающего ввода / вывода Asio.
io_service
может закрыться должным образом.
.
Его другое назначение —<post()
>лямбда (еще не показана). Пусть [8217] пойдет дальше по программе примеров, прежде чем вернуться, чтобы объяснить эту лямбду.
Конструктор<service
>возвращается к<asio::round_robin
>& #8217;s, который возвращается к<use_scheduling_algorithm()
>, который возвращается к коду приложения.
Как только приложение назовет<use_scheduling_algorithm()
>, оно может запустить несколько волокон:
tcp::acceptor a( io_svc, tcp::endpoint( tcp::v4(), 9999) );
boost::fibers::fiber( server, std::ref( io_svc), std::ref( a) ).detach();
const unsigned iterations = 2;
const unsigned clients = 3;
boost::fibers::barrier b( clients);
for ( unsigned i = 0; i < clients; ++i) {
boost::fibers::fiber(
client, std::ref( io_svc), std::ref( a), std::ref( b), iterations).detach();
}
Поскольку мы не указываем<launch
>, эти волокна готовы к работе, но еще не введены.
После того, как все настроено, приложение вызывает<io_service::run()
>:
io_svc.run();
Что теперь?
Поскольку этот<io_service
>экземпляр имеет<io_service::work
>экземпляр,<run()
>не сразу возвращается. Но — ни одно из волокон, которые будут выполнять реальную работу, еще даже не введено!
Без этого первоначального<post()
>вызова<service
>и #8217; конструкторничегоне произошло бы. Заявка будет висеть здесь.
Что же это такое?<post()
>.<this_fiber::yield()
>.
Это было бы многообещающим началом. Но у нас нет никакой гарантии, что любое из других волокон инициирует какие-либо операции Asio, чтобы держать мяч в движении. Все мы знаем, что любой другой волокно может достичь аналогичного<this_fiber::yield()
>вызова первым. Контроль вернется к обработчику<post()
>, который вернется к Asio, и... приложение повиснет.
<post()
>,<post()
>,<post()
>,<post()
>. Но, как обсуждалось впредыдущем разделе, как только есть фактические операции ввода/вывода в полете & #8212; как только мы достигнем состояния, в котором нет волокна готов & #8212; это приведет к вращению нити.
Мы могли бы, конечно, установить таймер Asio & #8212; опять же, какранее обсуждалось. Но в этом& #8220;глубокое погружение,& #8221;мы & #8217 пытаемся сделать немного лучше.
Ключом к улучшению является то, что, поскольку мы & #8217; находимся в оптоволокне, мы можем запустить фактическую петлю & #8212, а не просто цепочку обратных вызовов. Мы можем ждатьи #8220;что-то произойдети #8221;, позвонив<io_service::run_one()
>и #8212; или мы можем выполнить уже поставленные в очередь обработчики Asio, позвонив<io_service::poll()
>.
Здесь’s тело лямбды перешло на<post()
>вызов.
while ( ! io_svc.stopped() ) {
if ( boost::fibers::has_ready_fibers() ) {
while ( io_svc.poll() );
this_fiber::yield();
} else {
if ( ! io_svc.run_one() ) {
break;
}
}
}
Мы хотим, чтобы этот цикл вышел, как только<io_service
>экземпляр был<stopped()
>.
Пока есть готовые волокна, мы переплетаем готовые обработчики Asio с готовыми волокнами.
Если нет готовых волокон, мы ждем, позвонив<run_one()
>. Когда-то любой обработчик Asio был назван — независимо от того, какой —<run_one()
>возвращается. Этот обработчик, возможно, перевел некоторое волокно в готовое состояние, поэтому мы снова зацикливаемся, чтобы проверить.
(Мы выиграли & #8217;t описать<awakened()
>,<pick_next()
>или<has_ready_fibers()
>, так как они такие же, как<round_robin::awakened()
>,<round_robin::pick_next()
>и<round_robin::has_ready_fibers()
>.)
<suspend_until()
>и<notify()
>.
Сомневаюсь, что вы спрашивали себя: почему мы звоним<io_service::run_one()
>в лямбда-петлю? Почему бы не назвать его<suspend_until()
>, чей API был разработан именно для такой цели?
При нормальных обстоятельствах, когда оптоволоконный диспетчер не находит готовых волокон, он вызывает<algorithm::suspend_until()
>. Зачем тестировать<has_ready_fibers()
>в лямбда-петле? Почему бы не использовать нормальный механизм?
Ответ: важно, кто и #8217 спрашивает.
Рассмотрим лямбда-петлю, показанную выше. ЕдинственныеBoost.FiberAPI, которые он использует, это<has_ready_fibers()
>и<this_fiber::yield()
>.<yield()
>не блокируетвызывающее волокно: вызывающее волокно не становится неготовым. Он немедленно возвращается к<algorithm::awakened()
>, чтобы быть возобновленным в свою очередь, когда все другие готовые волокна имели возможность работать. Другими словами: во время вызова<yield()
>всегда есть по крайней мере одно готовое волокно.
Пока эта лямбда-петля еще работает, диспетчер волокон не звонит<suspend_until()
>, потому что у него всегда есть волокно, готовое к работе.
Тем не менее,<suspend_until()
>и<notify()
>на самом деле называются во время упорядоченной обработки отключения, поэтому давайте & #8217 попробуем правдоподобную реализацию.
void suspend_until( std::chrono::steady_clock::time_point const& abs_time) noexcept {
if ( suspend_timer_.expires_at() != abs_time) {
suspend_timer_.expires_at( abs_time);
suspend_timer_.async_wait([](boost::system::error_code const&){});
}
}
Как и следовало ожидать,<suspend_until()
>устанавливает<asio::steady_timer
>на<expires_at()
>пройденный<std::chrono::steady_clock::time_point
>. Обычно.
Как указано в комментариях, мы избегаем установки<suspend_timer_
>несколько раз нато же значение<time_point
>, поскольку каждый<expires_at()
>звонок отменяет любой предыдущий<async_wait()
>звонок. Есть шанс, что мы сможем покрутиться. Достижение<suspend_until()
>означает, что диспетчер волокон намерен передать процессор Asio. Отмена предыдущего вызова<async_wait()
>приведет к увольнению его обработчика, в результате чего<run_one()
>вернется, что потенциально может привести к тому, что менеджер волокна снова позвонит<suspend_until()
>с тем же значением<time_point
>.
Учитывая, что мы приостанавливаем поток, позвонив<io_service::run_one()
>, важно то, что наш<async_wait()
>вызов заставит обработчика работать, что заставит<run_one()
>вернуться. Это’s не так важно конкретно, что этот обработчик делает.
void notify() noexcept {
suspend_timer_.expires_at( std::chrono::steady_clock::now() );
}
Поскольку<expires_at()
>звонок отменяет любой предыдущий<async_wait()
>звонок, мы можем сделать<notify()
>просто звонок<steady_timer::expires_at()
>. Это приводит к тому, что<io_service
>вызывает<async_wait()
>обработчика<operation_aborted
>.
Комментарии в<notify()
>объясняют, почему мы называем<expires_at()
>, а не<cancel()
>.
Эта<boost::fibers::asio::round_robin
>реализация используется в<examples/asio/autoecho.cpp
>.
Возможно, вы могли бы собрать более элегантную интеграцию Fiber / Asio. Но, как было отмечено в начале: это & #8217;s хитрый.