PDF版 ePub版

# 8.5 在实践中设计并发代码

## 8.5.1 并行实现：std::for_each

std::for_each的原理很简单：其对某个范围中的元素，依次调用用户提供的函数。并行和串行调用的最大区别就是函数的调用顺序。std::for_each是对范围中的第一个元素调用用户函数，接着是第二个，以此类推，而在并行实现中对于每个元素的处理顺序就不能保证了，并且它们可能(我们希望如此)被并发的处理。

template<typename Iterator,typename Func>
void parallel_for_each(Iterator first,Iterator last,Func f)
{
unsigned long const length=std::distance(first,last);

if(!length)
return;

Iterator block_start=first;
{
Iterator block_end=block_start;
[=]()
{
std::for_each(block_start,block_end,f);
});
block_start=block_end;
}
std::for_each(block_start,last,f);
{
futures[i].get();  // 4
}
}

template<typename Iterator,typename Func>
void parallel_for_each(Iterator first,Iterator last,Func f)
{
unsigned long const length=std::distance(first,last);

if(!length)
return;

{
std::for_each(first,last,f);  // 1
}
else
{
Iterator const mid_point=first+length/2;
std::future<void> first_half=  // 2
std::async(&parallel_for_each<Iterator,Func>,
first,mid_point,f);
parallel_for_each(mid_point,last,f);  // 3
first_half.get();  // 4
}
}

## 8.5.2 并行实现：std::find

template<typename Iterator,typename MatchType>
Iterator parallel_find(Iterator first,Iterator last,MatchType match)
{
struct find_element  // 1
{
void operator()(Iterator begin,Iterator end,
MatchType match,
std::promise<Iterator>* result,
std::atomic<bool>* done_flag)
{
try
{
{
if(*begin==match)
{
result->set_value(begin);  // 3
done_flag->store(true);  // 4
return;
}
}
}
catch(...)  // 5
{
try
{
result->set_exception(std::current_exception());  // 6
done_flag->store(true);
}
catch(...)  // 7
{}
}
}
};

unsigned long const length=std::distance(first,last);

if(!length)
return last;

std::promise<Iterator> result;  // 8
std::atomic<bool> done_flag(false);  // 9
{  // 10

Iterator block_start=first;
{
Iterator block_end=block_start;
block_start,block_end,match,
&result,&done_flag);
block_start=block_end;
}
find_element()(block_start,last,match,&result,&done_flag);  // 12
}
{
return last;
}
return result.get_future().get();  // 14
}

template<typename Iterator,typename MatchType>  // 1
Iterator parallel_find_impl(Iterator first,Iterator last,MatchType match,
std::atomic<bool>& done)
{
try
{
unsigned long const length=std::distance(first,last);
unsigned long const min_per_thread=25;  // 2
{
{
if(*first==match)
{
done=true;  // 5
return first;
}
}
return last;  // 6
}
else
{
Iterator const mid_point=first+(length/2);  // 7
std::future<Iterator> async_result=
std::async(&parallel_find_impl<Iterator,MatchType>,  // 8
mid_point,last,match,std::ref(done));
Iterator const direct_result=
parallel_find_impl(first,mid_point,match,done);  // 9
return (direct_result==mid_point)?
async_result.get():direct_result;  // 10
}
}
catch(...)
{
done=true;  // 11
throw;
}
}

template<typename Iterator,typename MatchType>
Iterator parallel_find(Iterator first,Iterator last,MatchType match)
{
std::atomic<bool> done(false);
return parallel_find_impl(first,last,match,done);  // 12
}

OK，现在你已经使用了并行化的std::find。如在本节开始说的那样，其他相似算法不需要对每一个数据元素进行处理，并且同样的技术可以使用到这些类似的算法上去。我们将在第9章中看到“中断线程”的问题。

## 8.5.3 并行实现：std::partial_sum

std::partial_sum会计算给定范围中的每个元素，并用计算后的结果将原始序列中的值替换掉。比如，有一个序列[1，2，3，4，5]，在执行该算法后会成为：[1，3(1+2)，6(1+2+3)，10(1+2+3+4)，15(1+2+3+4+5)]。让这样一个算法并行起来会很有趣，因为这里不能讲任务分块，对每一块进行独立的计算。比如，原始序列中的第一个元素需要加到后面的一个元素中去。

template<typename Iterator>
void parallel_partial_sum(Iterator first,Iterator last)
{
typedef typename Iterator::value_type value_type;

struct process_chunk  // 1
{
void operator()(Iterator begin,Iterator last,
std::future<value_type>* previous_end_value,
std::promise<value_type>* end_value)
{
try
{
Iterator end=last;
++end;
std::partial_sum(begin,end,begin);  // 2
if(previous_end_value)  // 3
{
if(end_value)
{
end_value->set_value(*last);  // 6
}
{
});
}
else if(end_value)
{
end_value->set_value(*last);  // 8
}
}
catch(...)  // 9
{
if(end_value)
{
end_value->set_exception(std::current_exception());  // 10
}
else
{
throw;  // 11
}
}
}
};

unsigned long const length=std::distance(first,last);

if(!length)
return last;

unsigned long const min_per_thread=25;  // 12

typedef typename Iterator::value_type value_type;

std::vector<std::promise<value_type> >
std::vector<std::future<value_type> >
previous_end_values;  // 15

Iterator block_start=first;
{
Iterator block_last=block_start;
block_start,block_last,
(i!=0)?&previous_end_values[i-1]:0,
&end_values[i]);
block_start=block_last;
++block_start;  // 19
previous_end_values.push_back(end_values[i].get_future());  // 20
}
Iterator final_element=block_start;
process_chunk()(block_start,final_element,  // 22
0);
}

OK，现在来看一下process_chunk函数对象①。对于整块的处理是始于对std::partial_sum的调用，包括对于最后一个值的处理②，不过得要知道当前块是否是第一块③。如果当前块不是第一块，就会有一个previous_end_value值从前面的块传过来，所以这里需要等待这个值的产生④。为了将算法最大程度的并行，首先需要对最后一个元素进行更新⑤，这样你就能将这个值传递给下一个数据块(如果有下一个数据块的话)⑥。当完成这个操作，就可以使用std::for_each和简单的lambda函数⑦对剩余的数据项进行更新。

class barrier
{
unsigned const count;
std::atomic<unsigned> spaces;
std::atomic<unsigned> generation;
public:
explicit barrier(unsigned count_):  // 1
count(count_),spaces(count),generation(0)
{}

void wait()
{
unsigned const my_generation=generation;  // 2
if(!--spaces)  // 3
{
spaces=count;  // 4
++generation;  // 5
}
else
{
while(generation==my_generation)  // 6
}
}
};

std::atomic<unsigned> count;

spaces=count.load();

void done_waiting()
{
--count;  // 1
if(!--spaces)  // 2
{
++generation;
}
}

struct barrier
{
std::atomic<unsigned> count;
std::atomic<unsigned> spaces;
std::atomic<unsigned> generation;

barrier(unsigned count_):
count(count_),spaces(count_),generation(0)
{}

void wait()
{
if(!--spaces)
{
++generation;
}
else
{
{
}
}
}

void done_waiting()
{
--count;
if(!--spaces)
{
++generation;
}
}
};

template<typename Iterator>
void parallel_partial_sum(Iterator first,Iterator last)
{
typedef typename Iterator::value_type value_type;

struct process_element  // 1
{
void operator()(Iterator first,Iterator last,
std::vector<value_type>& buffer,
unsigned i,barrier& b)
{
value_type& ith_element=*(first+i);
bool update_source=false;

for(unsigned step=0,stride=1;stride<=i;++step,stride*=2)
{
value_type const& source=(step%2)?  // 2
buffer[i]:ith_element;

value_type& dest=(step%2)?
ith_element:buffer[i];

buffer[i-stride]:*(first+i-stride);

update_source=!(step%2);
b.wait();  // 5
}
if(update_source)  // 6
{
ith_element=buffer[i];
}
b.done_waiting();  // 7
}
};

unsigned long const length=std::distance(first,last);

if(length<=1)
return;

std::vector<value_type> buffer(length);
barrier b(length);

Iterator block_start=first;
for(unsigned long i=0;i<(length-1);++i)
{
}