Как только мы захотим собрать значения возврата из всех функций задачи, мы сразу увидим, как повторно использовать wait_first_value()
для этой цели. Все, что нам нужно сделать, это не закрывать его после первого значения!
Но на самом деле, сбор нескольких значений поднимает интересный вопрос: хотим ли мы на самом деле ждать, пока наступит самый медленный из них? Не лучше ли обработать каждый результат, как только он станет доступен?
Учитывая wait_all_values_source()
, легко реализовать wait_all_values()
:
template< typename Fn, typename ... Fns >
std::vector< typename std::result_of< Fn() >::type >
wait_all_values( Fn && function, Fns && ... functions) {
std::size_t count( 1 + sizeof ... ( functions) );
typedef typename std::result_of< Fn() >::type return_t;
typedef std::vector< return_t > vector_t;
vector_t results;
results.reserve( count);
std::shared_ptr< boost::fibers::unbounded_channel< return_t > > channel =
wait_all_values_source( std::forward< Fn >( function),
std::forward< Fns >( functions) ... );
return_t value;
while ( boost::fibers::channel_op_status::success == channel->pop(value) ) {
results.push_back( value);
}
return results;
}
Его можно было бы назвать так:
std::vector< std::string > values =
wait_all_values(
[](){ return sleeper("wav_late", 150); },
[](){ return sleeper("wav_middle", 100); },
[](){ return sleeper("wav_early", 50); });
Как вы можете видеть из цикла в wait_all_values()
, вместо того, чтобы требовать от своего абонента подсчета значений, мы определяем wait_all_values_source()
до unbounded_channel::close()
канал при выполнении. Но как нам это сделать? Каждый производитель волокон является независимым. Он не имеет понятия, является ли он последним для unbounded_channel::push()
значение.
Мы можем решить эту проблему с помощью счета façade для unbounded_channel<>
. На самом деле, наш façade нуждается только в поддержке продюсерского конца канала.
template< typename T >
class nchannel {
public:
nchannel( std::shared_ptr< boost::fibers::unbounded_channel< T > > cp,
std::size_t lm):
channel_( cp),
limit_( lm) {
assert(channel_);
if ( 0 == limit_) {
channel_->close();
}
}
boost::fibers::channel_op_status push( T && va) {
boost::fibers::channel_op_status ok =
channel_->push( std::forward< T >( va) );
if ( ok == boost::fibers::channel_op_status::success &&
--limit_ == 0) {
channel_->close();
}
return ok;
}
private:
std::shared_ptr< boost::fibers::unbounded_channel< T > > channel_;
std::size_t limit_;
};
Вооруженный nchannel<>
, мы можем реализовать wait_all_values_source()
. Начинается как wait_first_value()
. Разница заключается в том, что мы обертываем unbounded_channel<T>
с помощью nchannel<T>
для передачи к волокнам производителя.
Затем, конечно, вместо того, чтобы выскакивать первое значение, закрывая канал и возвращая его, мы просто возвращаем shared_ptr<unbounded_channel<T>>
.
template< typename Fn, typename ... Fns >
std::shared_ptr< boost::fibers::unbounded_channel< typename std::result_of< Fn() >::type > >
wait_all_values_source( Fn && function, Fns && ... functions) {
std::size_t count( 1 + sizeof ... ( functions) );
typedef typename std::result_of< Fn() >::type return_t;
typedef boost::fibers::unbounded_channel< return_t > channel_t;
auto channelp( std::make_shared< channel_t >() );
auto ncp( std::make_shared< nchannel< return_t > >( channelp, count) );
wait_all_values_impl< return_t >( ncp,
std::forward< Fn >( function),
std::forward< Fns >( functions) ... );
return channelp;
}
Например:
std::shared_ptr< boost::fibers::unbounded_channel< std::string > > channel =
wait_all_values_source(
[](){ return sleeper("wavs_third", 150); },
[](){ return sleeper("wavs_second", 100); },
[](){ return sleeper("wavs_first", 50); });
std::string value;
while ( boost::fibers::channel_op_status::success == channel->pop(value) ) {
std::cout << "wait_all_values_source() => '" << value
<< "'" << std::endl;
}
wait_all_values_impl()wait_first_value_impl()
, за исключением использования nchannel<
:
template< typename T, typename Fn >
void wait_all_values_impl( std::shared_ptr< nchannel< T > > channel,
Fn && function) {
boost::fibers::fiber( [channel, function](){
channel->push(function());
}).detach();
}