Concurrent Sort
并发排序
在《APUE》中pthread_barrier处有一个例子是开8个线程对80万个元素进行排序,这是我第一次接触并发算法,感觉很有趣于是记录一下。
思路是这样的:每个线程对“待排序元素个数/线程数”个元素进行进行排序(局部排序),然后最后通过一个合并算法来统合所有元素。
算法如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69template<
std::size_t numthread = 1, typename RI1, typename RI2, typename BinaryPred,
typename V = typename std::iterator_traits<RI1>::value_type
>
void merge_impl(RI1 first1, RI1 last1,
RI2 first2,
BinaryPred pred)
{
int num = last1 - first1;
int numperthread = num / numthread;
int idx[numthread];
//idx: record the first element index of sorted range
for(int i = 0; i < numthread; ++i){
idx[i] = i * numperthread;
}
//ridx:result index
for(int ridx = 0; ridx != num; ++ridx){
/*int min_element = INT_MAX;
int min_idx = 0;
for(int tidx = 1; tidx != numthread; ++tidx){
if((idx[tidx] < (tidx + 1) * numperthread) && (pred(*(first1 + idx[tidx]), min_element))){
min_element = *(first1 + idx[tidx]);
min_idx = tidx;
}
}
*(first2 + ridx) = *(first1 + idx[min_idx]);
++idx[min_idx];*/
int sz = 0;
V minarr[numthread];
for(int tidx = 0; tidx != numthread; ++tidx){
if((idx[tidx] < (tidx + 1) * numperthread)){
minarr[sz++] = *(first1 + idx[tidx]);
}
}
auto min_iter = std::min_element(minarr, minarr + sz, pred);
*(first2 + ridx) = *min_iter;
++idx[min_iter - minarr];
}
}
template<
std::size_t numthread = 1, typename RI1, typename RI2,
typename V = typename std::iterator_traits<RI1>::value_type,
typename BinaryPred = std::less<V>
>
void mymerge(RI1 first1, RI1 last1,
RI2 first2,
BinaryPred pred = {})
{
using V1 = typename std::iterator_traits<RI1>::value_type;
using V2 = typename std::iterator_traits<RI2>::value_type;
static_assert(std::is_convertible<V1, V2>::value || std::is_convertible<V2, V1>::value,
" value_type doesn't convertible\n");
merge_impl<numthread>(first1, last1,
first2,
pred);
}
这个算法我改良了一下,注释的部分是原文的,但是它只适用于int,为了让它能够与泛型搭配,适用于任何类型(提供了operator<或定制谓词的),用一个容器装符合条件的元素,并用std::min_element得到对应的迭代器,得到其位置和值。
merge算法思路异常简单:
- 开一个数组idx记录每个线程排序区间的head
- 对这些head进行比较(满足索引在线程排序区间中),由于不用所谓的min_element记录,所以用一个容器装,用pred谓词进行筛选,被选中的元素加入res数组,并将被选择的head所在的索引前进一位
当然,为了兼容STL,接口风格是\
除此之外,我还有两个收获:
- 如果该位置后面的元素都可以被推断出来,那么可以给该位置放置默认模板参数(非类型或类型),这种称为
depending on following arguments
的default arguments,如果该位置后面的元素提供了默认模板参数亦然可以,和已推断等效 - 一开始std::less<>不是这么写的:
BinaryPred pred = std::less<V>
,但是很明显不对,因为此时V是未决类型,编译器此时正在进行模式匹配,从而推断类型,而V并未推断出来,所以编译器不认得,自然报错
改为模板类型参数,然后默认构造就好了(无state的function object)
主函数部分:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
constexpr int gNumThread = 8;
constexpr int gNumElement = 800000;
constexpr int gNumPerThread = gNumElement / gNumThread;
pthread_barrier_t b;
double nums[gNumElement];
double res[gNumElement];
int main()
{
struct timeval beg, end;
std::default_random_engine dre{};
std::uniform_int_distribution<int> di(0, 9999);
std::uniform_real_distribution<double> dd(0, 9999.0);
for(int i = 0; i != gNumElement; ++i){
nums[i] = dd(dre);
res[i] = 0;
}
gettimeofday(&beg, NULL);
pthread_barrier_init(&b, NULL, gNumThread + 1);
for(int i = 0; i != gNumThread; ++i){
//auto g = MutexGuard{lock};
int idx = i * gNumPerThread;
//printf("idx addr: %p\n", &idx);
//gIdx = i * gNumPerThread;
Thread thr([idx](){
quicksort(nums + idx, nums + idx + gNumPerThread);
pthread_barrier_wait(&b);
},
"thread");
//Thread thr(fun, "thread");
thr.setBarriered(true);
thr.start();
/*pthread_t tid;
TCHECK(pthread_create(&tid, NULL, fun2, (void*)&idx));*/
}
pthread_barrier_wait(&b);
pthread_barrier_destroy(&b);
mymerge<gNumThread>(&*nums, nums + gNumElement,
&*res);
gettimeofday(&end, NULL);
constexpr int useconds = 1000000;
const auto startusec = beg.tv_sec * useconds + beg.tv_usec;
const auto endusec = end.tv_sec * useconds + end.tv_usec;
const auto time = static_cast<double>(endusec - startusec) / useconds;
printf("sort total took %.4f\n", time);
for(int i = 0; i <= gNumElement; ++i){
if(i % 10 == 0)
printf("%d~%d: ", i + 1, i + 10);
printf("%.3f ", res[i]);
if((i + 1) % 10 == 0)
puts("");
}
}
经个人实践,C++想要做到线程安全,比如传参,通过pthread_create是不行的,因为C++不允许int->void*(reinterpret_cast)也别想,而通过地址传值,那就意味着很容易被修改,如前面的for循环,idx如果是地址传值,那么下一个循环idx的值就被改了,于是悲剧了,你可能会说加锁,加锁的话,其他线程的创建就会是原子性的,那么多线程的优势直接没了,其实我们需要的就是原子性传参,那么拷贝一份即可,用function object和lambda以拷贝形式封装该state即可(现在的看法)