并发排序

在《APUE》中pthread_barrier处有一个例子是开8个线程对80万个元素进行排序,这是我第一次接触并发算法,感觉很有趣于是记录一下。

思路是这样的:每个线程对“待排序元素个数/线程数”个元素进行进行排序(局部排序),然后最后通过一个合并算法来统合所有元素。
算法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
template<
std::size_t numthread = 1, typename RI1, typename RI2, typename BinaryPred,
typename V = typename std::iterator_traits<RI1>::value_type
>
void merge_impl(RI1 first1, RI1 last1,
RI2 first2,
BinaryPred pred)
{
int num = last1 - first1;
int numperthread = num / numthread;

int idx[numthread];
//idx: record the first element index of sorted range
for(int i = 0; i < numthread; ++i){
idx[i] = i * numperthread;
}

//ridx:result index
for(int ridx = 0; ridx != num; ++ridx){
/*int min_element = INT_MAX;
int min_idx = 0;

for(int tidx = 1; tidx != numthread; ++tidx){
if((idx[tidx] < (tidx + 1) * numperthread) && (pred(*(first1 + idx[tidx]), min_element))){
min_element = *(first1 + idx[tidx]);
min_idx = tidx;
}
}


*(first2 + ridx) = *(first1 + idx[min_idx]);
++idx[min_idx];*/

int sz = 0;
V minarr[numthread];

for(int tidx = 0; tidx != numthread; ++tidx){
if((idx[tidx] < (tidx + 1) * numperthread)){
minarr[sz++] = *(first1 + idx[tidx]);
}
}

auto min_iter = std::min_element(minarr, minarr + sz, pred);

*(first2 + ridx) = *min_iter;
++idx[min_iter - minarr];
}
}

template<
std::size_t numthread = 1, typename RI1, typename RI2,
typename V = typename std::iterator_traits<RI1>::value_type,
typename BinaryPred = std::less<V>
>
void mymerge(RI1 first1, RI1 last1,
RI2 first2,
BinaryPred pred = {})
{
using V1 = typename std::iterator_traits<RI1>::value_type;
using V2 = typename std::iterator_traits<RI2>::value_type;

static_assert(std::is_convertible<V1, V2>::value || std::is_convertible<V2, V1>::value,
" value_type doesn't convertible\n");


merge_impl<numthread>(first1, last1,
first2,
pred);
}

这个算法我改良了一下,注释的部分是原文的,但是它只适用于int,为了让它能够与泛型搭配,适用于任何类型(提供了operator<或定制谓词的),用一个容器装符合条件的元素,并用std::min_element得到对应的迭代器,得到其位置和值。
merge算法思路异常简单:

  • 开一个数组idx记录每个线程排序区间的head
  • 对这些head进行比较(满足索引在线程排序区间中),由于不用所谓的min_element记录,所以用一个容器装,用pred谓词进行筛选,被选中的元素加入res数组,并将被选择的head所在的索引前进一位

当然,为了兼容STL,接口风格是\式的,你需要保证dst区间大小至少与src区间一样大,如果是空容器,必须用inserter迭代器适配器

除此之外,我还有两个收获:

  • 如果该位置后面的元素都可以被推断出来,那么可以给该位置放置默认模板参数(非类型或类型),这种称为depending on following arguments的default arguments,如果该位置后面的元素提供了默认模板参数亦然可以,和已推断等效
  • 一开始std::less<>不是这么写的:
    BinaryPred pred = std::less<V>,但是很明显不对,因为此时V是未决类型,编译器此时正在进行模式匹配,从而推断类型,而V并未推断出来,所以编译器不认得,自然报错
    改为模板类型参数,然后默认构造就好了(无state的function object)

主函数部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#define quicksort sort

constexpr int gNumThread = 8;
constexpr int gNumElement = 800000;
constexpr int gNumPerThread = gNumElement / gNumThread;

pthread_barrier_t b;
double nums[gNumElement];
double res[gNumElement];

int main()
{
struct timeval beg, end;


std::default_random_engine dre{};
std::uniform_int_distribution<int> di(0, 9999);
std::uniform_real_distribution<double> dd(0, 9999.0);

for(int i = 0; i != gNumElement; ++i){
nums[i] = dd(dre);
res[i] = 0;
}

gettimeofday(&beg, NULL);
pthread_barrier_init(&b, NULL, gNumThread + 1);

for(int i = 0; i != gNumThread; ++i){
//auto g = MutexGuard{lock};
int idx = i * gNumPerThread;
//printf("idx addr: %p\n", &idx);
//gIdx = i * gNumPerThread;

Thread thr([idx](){
quicksort(nums + idx, nums + idx + gNumPerThread);
pthread_barrier_wait(&b);
},
"thread");

//Thread thr(fun, "thread");
thr.setBarriered(true);
thr.start();
/*pthread_t tid;
TCHECK(pthread_create(&tid, NULL, fun2, (void*)&idx));*/
}

pthread_barrier_wait(&b);
pthread_barrier_destroy(&b);

mymerge<gNumThread>(&*nums, nums + gNumElement,
&*res);

gettimeofday(&end, NULL);

constexpr int useconds = 1000000;
const auto startusec = beg.tv_sec * useconds + beg.tv_usec;
const auto endusec = end.tv_sec * useconds + end.tv_usec;

const auto time = static_cast<double>(endusec - startusec) / useconds;

printf("sort total took %.4f\n", time);

#if PRINT_SWITCH
for(int i = 0; i <= gNumElement; ++i){
if(i % 10 == 0)
printf("%d~%d: ", i + 1, i + 10);

printf("%.3f ", res[i]);

if((i + 1) % 10 == 0)
puts("");
}
#endif
}

经个人实践,C++想要做到线程安全,比如传参,通过pthread_create是不行的,因为C++不允许int->void*(reinterpret_cast)也别想,而通过地址传值,那就意味着很容易被修改,如前面的for循环,idx如果是地址传值,那么下一个循环idx的值就被改了,于是悲剧了,你可能会说加锁,加锁的话,其他线程的创建就会是原子性的,那么多线程的优势直接没了,其实我们需要的就是原子性传参,那么拷贝一份即可,用function object和lambda以拷贝形式封装该state即可(现在的看法)