Concurrent Sort
并发排序
在《APUE》中pthread_barrier处有一个例子是开8个线程对80万个元素进行排序,这是我第一次接触并发算法,感觉很有趣于是记录一下。
思路是这样的:每个线程对“待排序元素个数/线程数”个元素进行进行排序(局部排序),然后最后通过一个合并算法来统合所有元素。
算法如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69template<
	std::size_t numthread = 1, typename RI1, typename RI2, typename BinaryPred,
	typename V = typename std::iterator_traits<RI1>::value_type
	>
void merge_impl(RI1 first1, RI1 last1,
		RI2 first2,
		BinaryPred pred)
{
	int num = last1 - first1;
	int numperthread = num / numthread;
	int idx[numthread];
	//idx: record the first element index of sorted range
	for(int i = 0; i < numthread; ++i){
		idx[i] = i * numperthread;
	}
	//ridx:result index 
	for(int ridx = 0; ridx != num; ++ridx){
		/*int min_element = INT_MAX;
		int min_idx = 0;
		for(int tidx = 1; tidx != numthread; ++tidx){
			if((idx[tidx] < (tidx + 1) * numperthread) && (pred(*(first1 + idx[tidx]), min_element))){
				min_element = *(first1 + idx[tidx]);
				min_idx = tidx;
			}
		}
		
		*(first2 + ridx) = *(first1 + idx[min_idx]);
		++idx[min_idx];*/
		int sz = 0;
		V minarr[numthread];
		for(int tidx = 0; tidx != numthread; ++tidx){
			if((idx[tidx] < (tidx + 1) * numperthread)){
				minarr[sz++] = *(first1 + idx[tidx]);
			}
		}
		auto min_iter = std::min_element(minarr, minarr + sz, pred);
		*(first2 + ridx) = *min_iter;
		++idx[min_iter - minarr];
	}
}
template<
	std::size_t numthread = 1, typename RI1, typename RI2,
        typename V = typename std::iterator_traits<RI1>::value_type,
	typename BinaryPred = std::less<V>
	>
void mymerge(RI1 first1, RI1 last1, 
	RI2 first2,
	BinaryPred pred = {})
{
	using V1 = typename std::iterator_traits<RI1>::value_type;
	using V2 = typename std::iterator_traits<RI2>::value_type;
	
	static_assert(std::is_convertible<V1, V2>::value || std::is_convertible<V2, V1>::value, 
				  " value_type doesn't convertible\n");
	merge_impl<numthread>(first1, last1, 
						  first2, 
						  pred);
}
这个算法我改良了一下,注释的部分是原文的,但是它只适用于int,为了让它能够与泛型搭配,适用于任何类型(提供了operator<或定制谓词的),用一个容器装符合条件的元素,并用std::min_element得到对应的迭代器,得到其位置和值。
merge算法思路异常简单:
- 开一个数组idx记录每个线程排序区间的head
 - 对这些head进行比较(满足索引在线程排序区间中),由于不用所谓的min_element记录,所以用一个容器装,用pred谓词进行筛选,被选中的元素加入res数组,并将被选择的head所在的索引前进一位
 
当然,为了兼容STL,接口风格是\
除此之外,我还有两个收获:
- 如果该位置后面的元素都可以被推断出来,那么可以给该位置放置默认模板参数(非类型或类型),这种称为
depending on following arguments的default arguments,如果该位置后面的元素提供了默认模板参数亦然可以,和已推断等效 - 一开始std::less<>不是这么写的:
BinaryPred pred = std::less<V>,但是很明显不对,因为此时V是未决类型,编译器此时正在进行模式匹配,从而推断类型,而V并未推断出来,所以编译器不认得,自然报错
改为模板类型参数,然后默认构造就好了(无state的function object) 
主函数部分:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
constexpr int gNumThread = 8;
constexpr int gNumElement = 800000;
constexpr int gNumPerThread = gNumElement / gNumThread;
pthread_barrier_t b;
double nums[gNumElement];
double res[gNumElement];
int main()
{
	struct timeval beg, end;
	std::default_random_engine dre{};
	std::uniform_int_distribution<int> di(0, 9999);
	std::uniform_real_distribution<double> dd(0, 9999.0);
	
	for(int i = 0; i != gNumElement; ++i){
		nums[i] = dd(dre);
		res[i] = 0;
	}
	gettimeofday(&beg, NULL);
	pthread_barrier_init(&b, NULL, gNumThread + 1);
	
	for(int i = 0; i != gNumThread; ++i){
		//auto g = MutexGuard{lock};
		int idx = i * gNumPerThread;
		//printf("idx addr: %p\n", &idx);
		//gIdx = i * gNumPerThread;
		Thread thr([idx](){
						quicksort(nums + idx, nums + idx + gNumPerThread);
						pthread_barrier_wait(&b);
					},
				   "thread");
		//Thread thr(fun, "thread");
		thr.setBarriered(true);
		thr.start();
		/*pthread_t tid;
		TCHECK(pthread_create(&tid, NULL, fun2, (void*)&idx));*/
	}
	
	pthread_barrier_wait(&b);
	pthread_barrier_destroy(&b);
	mymerge<gNumThread>(&*nums, nums + gNumElement, 
						&*res);
	
	gettimeofday(&end, NULL);
	
	constexpr int useconds = 1000000;
	const auto startusec = beg.tv_sec * useconds + beg.tv_usec;
	const auto endusec = end.tv_sec * useconds + end.tv_usec;
	const auto time = static_cast<double>(endusec - startusec) / useconds;
	printf("sort total took %.4f\n", time);
	for(int i = 0; i <= gNumElement; ++i){
		if(i % 10 == 0)
			printf("%d~%d: ", i + 1, i + 10);
		printf("%.3f ", res[i]);
		
		if((i + 1) % 10 == 0)
			puts("");
	}
}
经个人实践,C++想要做到线程安全,比如传参,通过pthread_create是不行的,因为C++不允许int->void*(reinterpret_cast)也别想,而通过地址传值,那就意味着很容易被修改,如前面的for循环,idx如果是地址传值,那么下一个循环idx的值就被改了,于是悲剧了,你可能会说加锁,加锁的话,其他线程的创建就会是原子性的,那么多线程的优势直接没了,其实我们需要的就是原子性传参,那么拷贝一份即可,用function object和lambda以拷贝形式封装该state即可(现在的看法)






