最近、qsort_r の有効利用から始まり、qsort_b の発見と Apple による C/C++ の Blocks 拡張など、様々な知見が得られたのでまとめておく。
まずは結論を箇条書きにする。
qsort_rは Solaris にはない(POSIX 仕様ではない)。mergesort(BSD由来の実装) は要素のサイズがsizeof(void *)/2より小さいとソートされない(マニュアルにもその旨が明記されている)。よって、charやshort型の配列はエラーとなる。qsort(Mac OS Xの実装) は Snow Leopard 以降は、マニュアルの記述とは異なり、実はイントロソート(クイックソートを再帰レベルに応じたヒープソートにより改良したもの)で実装されている。よって、最悪の場合の計算量は O(n^2) ではなく O(n log n) である。qsort_rも同様。よって、mergesortを使った方が速いという場面はまずないだろう。- Snow Leopard 以降は
heapsort_b,mergesort_b,qsort_b,bsearch_bが導入されており、Blocks 拡張によりこれらを利用できる。よって、qsort_rのよい代替となる。 - C++ STL による
sort(イントロソートによる実装) の方が C のqsortよりも高速である。これは、ソートアルゴリズムに比較関数がインライン展開されるsortと比べて、比較関数を関数ポインタ経由で実行するqsortは不利だからであろう。 - 同様に、C++ STL による
stable_sort(マージソートによる実装) の方がmergesort(BSD由来) よりも、C++ STL によるpartial_sort(ヒープソートによる実装) の方がheapsort(BSD由来) よりも高速である。 - さらに同様に、C++ STL による
binary_search/lower_bound/upper_bound(二分探索) の方がbsearch(ISO C90) よりも、C++ STL によるfind_if(線形検索) の方がlsearch/lfind(BSD由来) よりも高速である。 - 並列マージソートは POSIX threads により自前で
pmergesort_rのように実現可能であり、mergesortよりも並列度 4 の計算機環境で 1.5 倍程度高速となる。それは後述する。 - Blocks 拡張と密接に関連する Mac OS X の GCD(Grand Central Dispatch) が活用されている並列イントロソート
psort,psort_r,psort_bがマルチスレッドに適した計算機環境qsort系の 1.5 〜 2 倍程度高速である。但し、基本的には GCD は「スレッドプール」技術により GCD なしでも自前でpintrosort_rのように実現可能であり、それは後述する。 - 並列マージソートの C++ 版
parallel_stable_sortを自作すると、C++ STL のstable_sortよりも 2 〜 5 倍も高速であり、それも後述する。 - 並列イントロソートの C++ 版
parallel_sortを自作すると、C++ STL のsortよりも 1.5 〜 2.5 倍も高速であり、それも後述する。
まず一度、各種ソートと検索の簡単な例をあげておく。
#include <cstdlib>
#include <search.h>
#include <algorithm>
#include <functional>
using namespace std;
#define T_RCMP(T) ((*(T *)a > *(T *)b) ? -1 : (*(T *)a < *(T *)b) ? 1 : 0)
int d_rcmp(const void *a, const void *b) { return T_RCMP(double); }
int d_rcmp_r(void *thunk, const void *a, const void *b) { return T_RCMP(double); }
int main()
{
int n = 100000;
double *x = new double[n];
int i;
int (^d_rcmp_b)(const void *a, const void *b) = ^(const void *a, const void *b) {
return (*(double *)a > *(double *)b) ? -1 : (*(double *)a < *(double *)b) ? 1 : 0;
};
for (i=0; i<n; i++)
x[i] = (double)random()/((1<<31)-1);
// 以下はすべて、要素doubleの降順ソート
heapsort(x, n, sizeof(x[0]), d_rcmp);
heapsort_b(x, n, sizeof(x[0]), d_rcmp_b);
partial_sort(x, x+n, x+n, greater<double>());
mergesort(x, n, sizeof(x[0]), d_rcmp);
mergesort_b(x, n, sizeof(x[0]), d_rcmp_b);
stable_sort(x, x+n, greater<double>());
qsort(x, n, sizeof(x[0]), d_rcmp);
qsort_r(x, n, sizeof(x[0]), NULL, d_rcmp_r);
qsort_b(x, n, sizeof(x[0]), d_rcmp_b);
sort(x, x+n, greater<double>());
size_t m = n;
double t, *f;
bool b;
t = x[random()%n];
// 以下はすべて、要素doubleの検索
f = (double *)lfind(&t, x, &m, sizeof(x[0]), d_rcmp);
f = find(x, x+n, t);
// 以下はすべて、要素doubleの降順ソート済み検索
f = (double *)bsearch(&t, x, n, sizeof(x[0]), d_rcmp);
f = (double *)bsearch_b(&t, x, n, sizeof(x[0]), d_rcmp_b);
f = lower_bound(x, x+n, t, greater<double>()); // *f == t
f = upper_bound(x, x+n, t, greater<double>()); // *(f-1) == t
b = binary_search(x, x+n, t, greater<double>());
return 0;
}
ここではまだ qsort_r や qsort_b 系を使わざるを得ない例題ではないことに留意しよう。
ここで、d_rcmp_b という関数ポインタらしきもの(アスタリスクではなくハット)がブロック変数であり、それに代入している匿名関数がブロックで、ブロックが定義されたスコープのスタックの複製とともにブロックが実行される。これはまさにクロージャとラムダ式であり、似たようなものとして以下が挙げられる。
- gcc のネストされた関数
- C++0x のラムダ式
- C++ boost の lambda
- C# の匿名デリゲートとラムダ式
もしこういった実装を選択できないのであれば、C の場合は qsort_r のようにコンテキストを渡す変数の追加が必要になるし、C++ の場合は関数オブジェクトを使ってそのメンバ変数にコンテキストを保持する必要がある。
さもなくば、大域変数や局所静的変数を使用する選択肢もあるが、すると再入可能ではなくなってしまい、並列化などが不可能となってしまう。
そのような例題を具体的に考えてみる。
ここで、要素そのものをソートするのではなく、要素のインデックスをソートするものとする。
ソートの条件が複数あり得る場合は往々にしてあり、それらをスレッドで並列化して行う等の効率化も考えられる。よって、大域変数や局所静的変数の使用不可を前提として問題解決にあたる。
まずは結論を箇条書きにする。
- 要素のインデックスのソートは、要素そのもののソートよりも、要素そのもののサイズが大きくない場合には、多少遅くなるがそれは微々たる差に留まる。そもそも、ソートアルゴリズムは「比較」とその結果による「スワップ」から成り、逆に言えば、要素のサイズが大きい場合には、明らかにスワップのコストが高くなるので、要素のインデックスのソートの方がコストが低くなる。
- C には
qsort_rに対応するlsearch_r,lfind_r,bsearch_rが存在しない。よって、これらを自前で実装しないと大域変数等が必要になってしまう。 - 一方、Blocks 拡張の
heapsort_b,mergesort_b,qsort_b,bsearch_bなら要件を満たす実装が可能である。
では解法例を示す。
#include <cstdlib>
#include <search.h>
#include "sort_r.h"
#include <iostream>
#include <algorithm>
#include <functional>
using namespace std;
#define ARI_T_RCMP_R(T) ((((T *)thunk)[*(int *)a] > ((T *)thunk)[*(int *)b]) ? -1 : (((T *)thunk)[*(int *)a] < ((T *)thunk)[*(int *)b]) ? 1 : 0)
int ari_d_rcmp_r(void *thunk, const void *a, const void *b) { return ARI_T_RCMP_R(double); }
template <typename T>
struct ari_rcomp { ari_rcomp(T *p):v(p){} bool operator()(const int &a, const int &b) { return v[a] > v[b]; } private: T *v; };
#define ARI_T_RE_R(T) ((*(T *)k > ((T *)thunk)[*(int *)i]) ? -1 : (*(T *)k < ((T *)thunk)[*(int *)i]) ? 1 : 0)
int ari_d_re_r(void *thunk, const void *k, const void *i) { return ARI_T_RE_R(double); }
template <typename T>
struct ari_rpred { ari_rpred(T *p, T *q):v(p),t(q){} bool operator()(const int &a) { return v[a] == *t; } private: T *v, *t; };
template <typename T>
struct ari_rkomp { ari_rkomp(T *p):v(p){} bool operator()(const int &i, const T &k) { return v[i] > k; } private: T *v; };
template <typename T>
struct ari_rkowp { ari_rkowp(T *p):v(p){} bool operator()(const T &k, const int &i) { return k > v[i]; } private: T *v; };
int main()
{
int n = 100000;
double *x0 = new double[n], *x = new double[n];
int i, *ix = new int[n];
typedef ari_rcomp<double> ari_d_rcomp;
typedef ari_rpred<double> ari_d_rpred;
typedef ari_rkomp<double> ari_d_rkomp;
typedef ari_rkowp<double> ari_d_rkowp;
int (^ari_d_rcmp_b)(const void *a, const void *b) = ^(const void *a, const void *b) {
return (x[*(int *)a] > x[*(int *)b]) ? -1 : (x[*(int *)a] < x[*(int *)b]) ? 1 : 0;
};
int (^ari_d_re_b)(const void *k, const void *i) = ^(const void *k, const void *i) {
return (*(double *)k > x0[*(int *)i]) ? -1 : (*(double *)k < x0[*(int *)i]) ? 1 : 0;
};
for (i=0; i<n; i++)
x[i] = x0[ix[i] = i] = (double)random()/((1<<31)-1);
// 以下はすべて、要素doubleが比較対象のインデックスの降順ソート
heapsort_r(ix, n, sizeof(ix[0]), x,ari_d_rcmp_r); // 自前の実装
heapsort_b(ix, n, sizeof(ix[0]), ari_d_rcmp_b);
partialsort_r(ix, n, n, sizeof(ix[0]), x, ari_d_rcmp_r); // 自前の実装
partial_sort(ix, ix+n, ix+n, ari_d_rcomp(x));
mergesort_r(ix, n, sizeof(ix[0]), x,ari_d_rcmp_r); // 自前の実装
mergesort_b(ix, n, sizeof(ix[0]), ari_d_rcmp_b);
stable_sort(ix, ix+n, ari_d_rcomp(x));
pmergesort_r(ix, n, sizeof(ix[0]), x, ari_d_rcmp_r); // 後述、自前の実装
parallel_stable_sort(ix, ix+n, ari_d_rcomp(x)); // 後述、自前の実装
sort(ix, ix+n, ari_d_rcomp(x));
qsort_r(ix, n,sizeof(ix[0]), x,ari_d_rcmp_r);
qsort_b(ix, n,sizeof(ix[0]), ari_d_rcmp_b);
introsort_r(ix, n, sizeof(ix[0]), x, ari_d_rcmp_r); // 自前の実装
psort_b(ix, n, sizeof(ix[0]), ari_d_rcmp_b); // 後述
pintrosort_r(ix, n, sizeof(ix[0]), x, ari_d_rcmp_r); // 後述、自前の実装
parallel_sort(ix, ix+n, ari_d_rcomp(x)); // 後述、自前の実装
size_t m = n;
double t;
int *g;
bool b;
t = x0[random()%n];
sort(ix, ix+n, ari_d_rcomp(x0));
// 以下はすべて、要素doubleのインデックスの検索
g = find_if(ix, ix+n, ari_d_rpred(x0,&t));
g = (int *)lfind_r(&t, ix, &m, sizeof(ix[0]), x0, ari_d_re_r); // 自前の実装
// 以下はすべて、要素doubleの降順ソート済みインデックス検索
g = (int *)lbsearch_r(&t, ix, n, sizeof(ix[0]), x0, ari_d_re_r); // 自前の実装
g = (int *)ubsearch_r(&t, ix, n, sizeof(ix[0]), x0, ari_d_re_r); // 自前の実装
g = (int *)bsearch_r(&t, ix, n, sizeof(ix[0]), x0, ari_d_re_r); // 自前の実装
g = (int *)bsearch_b(&t, ix, n, sizeof(ix[0]), ari_d_re_b);
g = lower_bound(ix, ix+n, t, ari_d_rkomp(x0)); // x0[*g] == t
g = upper_bound(ix, ix+n, t, ari_d_rkowp(x0)); // x0[*(g-1)] == t
return 0;
}
このように、大域変数等を使わずに、要素のインデックスのソートおよび検索が可能となっている。
こうしてみると、Apple による C/C++ の Blocks 拡張は、C++ STL と関数オブジェクトに比べて効率は劣るものの、極めて有用性が高いと言える。それは、POSIX threads や qsort_r のような仕様、つまり、コンテキストを渡せる実装が既になされているのであれば必要性はそれほど感じられないが、Blocks 拡張の利用は極めて容易なので、開発効率を鑑みると Blocks 拡張に対応した手続きを積極的に利用する利点は大いにある。と言うのも、less `grep -l __BLOCKS__ /usr/include/*` で調べてみると、他にも Blocks 拡張された手続きが数多く存在するし、今後 Blocks 拡張が C/C++ にて標準化されることになれば、さらに有用性が高くなるだろう。
ちなみに、自前の実装としてあげた heapsort_r, partialsort_r, mergesort_r, introsort_r, lsearch_r/lfind_r, bsearch_r/lbsearch_r/ubsearch_r、そして、pmergesort_r, pintrosort_r, は sort_r-20100723.tar.bz2 の sort_r.h, sort_r.c から利用できる。
また、自前の実装として挙げた parallel_stable_sort, parallel_sort も sort_r-20100723.tar.bz2 の sort.hpp から利用できる。
また、ベンチマーク結果(要素数n:100000, 10回の平均の秒数)を以下に示す。
| 要素そのもののソートと検索 | 秒 |
| heapsort(x,n,sizeof(x[0]),d_rcmp) | 0.0249047 |
| heapsort_r(x,n,sizeof(x[0]),0,d_rcmp_r) | 0.01671 |
| heapsort_b(x,n,sizeof(x[0]),d_rcmp_b) | 0.0275569 |
| partialsort_r(x,n,n,sizeof(x[0]),0,d_rcmp_r) | 0.0164991 |
| partial_sort(x,x+n,x+n,greater<double>()) | 0.0114346 |
| mergesort(x,n,sizeof(x[0]),d_rcmp) | 0.0176533 |
| mergesort_r(x,n,sizeof(x[0]),0,d_rcmp_r) | 0.0130815 |
| mergesort_b(x,n,sizeof(x[0]),d_rcmp_b) | 0.0178412 |
| stable_sort(x,x+n,greater<double>()) | 0.00890727 |
| pmergesort_r(x,n,sizeof(x[0]),0,d_rcmp_r) | 0.0075165 |
| parallel_stable_sort(x,x+n,greater<double>()) | 0.00198088 |
| qsort(x,n,sizeof(x[0]),d_rcmp) | 0.0144886 |
| qsort_r(x,n,sizeof(x[0]),0,d_rcmp_r) | 0.01352 |
| qsort_b(x,n,sizeof(x[0]),d_rcmp_b) | 0.0147258 |
| sort(x,x+n,greater<double>()) | 0.00741098 |
| introsort_r(x,n,sizeof(x[0]),0,d_rcmp_r) | 0.0141188 |
| pintrosort_r(x,n,sizeof(x[0]),0,d_rcmp_r) | 0.0067838 |
| psort_b(x,n,sizeof(x[0]),d_rcmp_b) | 0.00691638 |
| parallel_sort(x,x+n,greater<double>()) | 0.00325432 |
| f=(double *)lfind(&t,x0,&m,sizeof(x0[0]),d_rcmp) | 0.000134635 |
| f=(double *)lfind_r(&t,x0,&m,sizeof(x0[0]),0,d_rcmp_r) | 0.000140667 |
| f=find(x0,x0+n,t) | 2.40564e-05 |
| f=(double *)bsearch(&t,x,n,sizeof(x[0]),d_rcmp) | 6.91414e-07 |
| f=(double *)lbsearch_r(&t,x,n,sizeof(x[0]),0,d_rcmp_r) | 8.10623e-07 |
| f=(double *)ubsearch_r(&t,x,n,sizeof(x[0]),0,d_rcmp_r) | 6.67572e-07 |
| f=(double *)bsearch_r(&t,x,n,sizeof(x[0]),0,d_rcmp_r) | 7.86781e-07 |
| f=(double *)bsearch_b(&t,x,n,sizeof(x[0]),d_rcmp_b) | 6.19888e-07 |
| f=lower_bound(x,x+n,t,greater<double>()) | 6.4373e-07 |
| f=upper_bound(x,x+n,t,greater<double>()) | 7.15256e-07 |
| b=binary_search(x,x+n,t,greater<double>()) | 7.15256e-07 |
| 要素のインデックスのソートと検索 | 秒 |
| heapsort_r(ix,n,sizeof(ix[0]),x,ari_d_rcmp_r) | 0.0226928 |
| heapsort_b(ix,n,sizeof(ix[0]),ari_d_rcmp_b) | 0.0308669 |
| partialsort_r(ix,n,n,sizeof(ix[0]),x,ari_d_rcmp_r) | 0.0227648 |
| partial_sort(ix,ix+n,ix+n,ari_d_rcomp(x)) | 0.0194682 |
| mergesort_r(ix,n,sizeof(ix[0]),x,ari_d_rcmp_r) | 0.0147787 |
| mergesort_b(ix,n,sizeof(ix[0]),ari_d_rcmp_b) | 0.0201805 |
| stable_sort(ix,ix+n,ari_d_rcomp(x)) | 0.0112257 |
| pmergesort_r(ix,n,sizeof(ix[0]),x,ari_d_rcmp_r) | 0.00885935 |
| parallel_stable_sort(ix,ix+n,ari_d_rcomp(x)) | 0.00239668 |
| sort(ix,ix+n,ari_d_rcomp(x)) | 0.0099478 |
| qsort_r(ix,n,sizeof(ix[0]),x,ari_d_rcmp_r) | 0.0160118 |
| qsort_b(ix,n,sizeof(ix[0]),ari_d_rcmp_b) | 0.0191082 |
| introsort_r(ix,n,sizeof(ix[0]),x,ari_d_rcmp_r) | 0.0167039 |
| pintrosort_r(ix,n,sizeof(ix[0]),x,ari_d_rcmp_r) | 0.00834651 |
| psort_b(ix,n,sizeof(ix[0]),ari_d_rcmp_b) | 0.00909259 |
| parallel_sort(ix,ix+n,ari_d_rcomp(x)) | 0.00449612 |
| g=find_if(ix,ix+n,ari_d_rpred(x0,&t)) | 0.000172186 |
| g=(int *)lfind_r(&t,ix,&m,sizeof(ix[0]),x0,ari_d_re_r) | 0.000410461 |
| g=(int *)lbsearch_r(&t,ix,n,sizeof(ix[0]),x0,ari_d_re_r) | 7.39098e-07 |
| g=(int *)ubsearch_r(&t,ix,n,sizeof(ix[0]),x0,ari_d_re_r) | 7.15256e-07 |
| g=(int *)bsearch_r(&t,ix,n,sizeof(ix[0]),x0,ari_d_re_r) | 7.39098e-07 |
| g=(int *)bsearch_b(&t,ix,n,sizeof(ix[0]),ari_d_re_b) | 5.72205e-07 |
| g=lower_bound(ix,ix+n,t,ari_d_rkomp(x0)) | 6.91414e-07 |
| g=upper_bound(ix,ix+n,t,ari_d_rkowp(x0)) | 6.4373e-07 |
話題は多少それるが、部分ソートについても知見が得られた。
まずは結論を箇条書きにする。
- C++ STL の
partial_sortに相当するものが C にはない。よってpartialsort_rを実装した。 - C++ STL による
partial_sort(ヒープソートによる実装) の方が C のpartialsort_r(ヒープソートによる実装)よりも高速である。これは、ソートアルゴリズムに比較関数がインライン展開されるpartial_sortと比べて、比較関数を関数ポインタ経由で実行するpartialsort_rは不利だからであろう。 partialsort_rはqsortと比べて、部分ソートの上位数が何割程度までなら高速だろうか、同様に、partial_sortはsortと比べて、部分ソートの上位数が何割程度までなら高速だろうか。結論としては、上位数が2割程度までなら部分ソートを採用する価値がある。それ以上なら、全体をソートしてしまった方が速い。
以下に、ベンチマーク結果(要素数n:100000, 上位数m:100000r, 10回の平均の秒数)を以下に示す。
| 部分ソート | 割合r | 秒数 | qsort,sortの秒数 |
| partialsort_r(x,m,n,sizeof(x[0]),0,c_rcmp_r) | 0.18553 | 0.00579057 | 0.00579412 |
| partial_sort(x,x+m,x+n,greater<char>()) | 0.16153 | 0.00358579 | 0.00358541 |
| partialsort_r(ix,m,n,sizeof(ix[0]),x,ari_c_rcmp_r) | 0.1516 | 0.00591381 | 0.00591898 |
| partial_sort(ix,ix+m,ix+n,ari_c_rcomp(x)) | 0.17991 | 0.0048316 | 0.00484018 |
| partialsort_r(x,m,n,sizeof(x[0]),0,s_rcmp_r) | 0.69863 | 0.0134328 | 0.0134347 |
| partial_sort(x,x+m,x+n,greater<short>()) | 0.30108 | 0.00620561 | 0.0062124 |
| partialsort_r(ix,m,n,sizeof(ix[0]),x,ari_s_rcmp_r) | 0.40503 | 0.0131619 | 0.0131641 |
| partial_sort(ix,ix+m,ix+n,ari_s_rcomp(x)) | 0.27234 | 0.00782237 | 0.00782158 |
| partialsort_r(x,m,n,sizeof(x[0]),0,i_rcmp_r) | 0.90588 | 0.0148749 | 0.0148832 |
| partial_sort(x,x+m,x+n,greater<int>()) | 0.27111 | 0.00592699 | 0.00592942 |
| partialsort_r(ix,m,n,sizeof(ix[0]),x,ari_i_rcmp_r) | 0.38938 | 0.014343 | 0.0143554 |
| partial_sort(ix,ix+m,ix+n,ari_i_rcomp(x)) | 0.27026 | 0.00848997 | 0.00849326 |
| partialsort_r(x,m,n,sizeof(x[0]),0,d_rcmp_r) | 0.59266 | 0.0144848 | 0.0144933 |
| partial_sort(x,x+m,x+n,greater<double>()) | 0.31718 | 0.0074177 | 0.00740693 |
| partialsort_r(ix,m,n,sizeof(ix[0]),x,ari_d_rcmp_r) | 0.37416 | 0.015897 | 0.015906 |
| partial_sort(ix,ix+m,ix+n,ari_d_rcomp(x)) | 0.26319 | 0.00990717 | 0.00991831 |
さて、後述とした、並列マージソート pmergesort_r は POSIX threads により実現可能であり、mergesort よりも並列度 4 の計算機環境で 1.5 倍程度高速となる。並列度 2 の計算機環境では速度向上は見られない。
さらに、後述とした、並列イントロソート psort, psort_r, psort_b は、GCD 環境下かつマルチスレッドに適した計算機環境では、qsort 系の 1.5 〜 2 倍程度高速である。しかし、GCD は基本的には「スレッドプール」技術を OS が支援する機構であるので、それがサポートされない場面では、自前で「スレッドプール」を用意すればよい。
この目的に即した「スレッドプール」を POSIX threads で必要十分に実現するコードは以下の通りである。
/*
thread_pool.c
Copyright (C) 2010 Taiji Yamada <taiji@aihara.co.jp>
Distributed under the Boost Software License, Version 1.0. See
http://www.boost.org/LICENSE_1_0.txt for more details.
References:
[1] http://docs.sun.com/app/docs/doc/819-0390/, 2008.
*/
#include <pthread.h>
typedef struct thread_pool {
struct pool_job {
void *(*func)(void *);
void *arg;
struct pool_job *next;
} *job_head, *job_tail;
pthread_t *thrs;
size_t n_thread, c_idle, c_job;
pthread_mutex_t mutex;
pthread_cond_t cond_work, cond_wait;
unsigned long flags;
} thread_pool_t;
static
void *thread_pool_work(void *arg)
{
struct thread_pool *pool = (struct thread_pool *)arg;
struct pool_job *job;
pthread_mutex_lock(&pool->mutex);
do {
pool->c_idle++;
while (!pool->job_head && !(pool->flags & (1UL<<1)))
pthread_cond_wait(&pool->cond_work, &pool->mutex);
pool->c_idle--;
if (pool->job_head) {
struct pool_job the_job = *(job = pool->job_head);
pool->job_head = job->next;
if (job == pool->job_tail)
pool->job_tail = NULL;
pthread_mutex_unlock(&pool->mutex);
free(job);
the_job.func(the_job.arg);
pthread_mutex_lock(&pool->mutex);
pool->c_job--;
}
if ((pool->flags & (1UL<<0)) && !pool->job_head) {
pool->flags &= ~(1UL<<0);
pthread_cond_broadcast(&pool->cond_wait);
}
} while (!(pool->flags & (1UL<<1)));
pthread_mutex_unlock(&pool->mutex);
return NULL;
}
thread_pool_t *thread_pool_create(int n_thread, pthread_attr_t *attr)
{
thread_pool_t *pool;
int i;
if (!(pool = malloc(sizeof(thread_pool_t)))) return NULL;
pool->job_head = pool->job_tail = NULL;
pthread_mutex_init(&pool->mutex, NULL);
pthread_cond_init(&pool->cond_work, NULL);
pthread_cond_init(&pool->cond_wait, NULL);
if (!(pool->thrs = malloc(sizeof(pthread_t)*(pool->n_thread=n_thread)))) { free(pool); return NULL; }
pool->c_idle = pool->c_job = 0;
pool->flags = 0;
for (i=0; i<n_thread; i++)
pthread_create(&pool->thrs[i], attr, thread_pool_work, pool);
return pool;
}
int thread_pool_queue(thread_pool_t *pool, void *(*func)(void *), void *arg)
{
struct pool_job *job;
if (!(job = malloc(sizeof(struct pool_job)))) return 1;
job->func = func;
job->arg = arg;
job->next = NULL;
pthread_mutex_lock(&pool->mutex);
if (!pool->job_head)
pool->job_head = job;
else
pool->job_tail->next = job;
pool->job_tail = job;
pool->c_job++;
if (pool->c_idle > 0)
pthread_cond_signal(&pool->cond_work);
pthread_mutex_unlock(&pool->mutex);
return 0;
}
void thread_pool_wait(thread_pool_t *pool)
{
pthread_mutex_lock(&pool->mutex);
while (pool->c_job) {
pool->flags |= (1UL<<0);
pthread_cond_wait(&pool->cond_wait, &pool->mutex);
}
pthread_mutex_unlock(&pool->mutex);
}
void thread_pool_join(thread_pool_t *pool)
{
int i;
pthread_mutex_lock(&pool->mutex);
while (pool->c_job) {
pool->flags |= (1UL<<0);
pthread_cond_wait(&pool->cond_wait, &pool->mutex);
}
pool->flags |= (1UL<<1);
pthread_cond_broadcast(&pool->cond_work);
pthread_mutex_unlock(&pool->mutex);
for (i=0; i<pool->n_thread; i++)
pthread_join(pool->thrs[i], NULL);
free(pool->thrs);
free(pool);
}
ちなみに、上記コメントで参考文献とした Sun のスレッドプール実装ではクイックソート/イントロソートの並列化は動作しない。それは、ジョブが再帰的に新たなジョブを生成するので、それを捕捉できないからである。上記コードはこのような再帰的なジョブの生成に対応してある。
よって、上記のこの自前の「スレッドプール」を利用した並列化されたイントロソート pintrosort_r は、GCD の psort 系とほぼ同等に高速となる。但し、ワーカースレッドがあらかじめ起動されている psort 系と比べて多少遅くなるときもあるがそれは微々たる差に留まる。よって、GCD が提供されていない環境では pintrosort_r は極めて有用であろう。
さて、後述とした、並列マージソートの C++ 版 parallel_stable_sort は boost thread により実現可能であり、マルチスレッドに適した計算機環境では、stable_sort よりも 2 〜 5 倍も高速となる。
さらに、後述とした、並列イントロソートの C++ 版 parallel_sort は boost threadpool もしくは自作のスレッドプールにより実現可能であり、マルチスレッドに適した計算機環境では、sort の 1.5 〜 2.5 倍も高速である。
この目的に即した「スレッドプール」を boost thread で必要十分に実現するコードは以下の通りである。
/*
threadpool.hpp
Copyright (C) 2010 Taiji Yamada <taiji@aihara.co.jp>
Distributed under the Boost Software License, Version 1.0. See
http://www.boost.org/LICENSE_1_0.txt for more details.
References:
[1] http://threadpool.sourceforge.net/, 2005-2008.
[2] http://docs.sun.com/app/docs/doc/819-0390/, 2008.
*/
#include <boost/thread.hpp>
#include <queue>
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <boost/thread/condition.hpp>
class threadpool {
typedef boost::function0<void> job_type;
std::queue<job_type> jobs;
boost::thread_group thrs;
int c_idle, c_job;
bool flag_wait, flag_quit;
boost::mutex mutex;
boost::condition cond_work, cond_wait;
void work()
{
do {
job_type job = 0;
{
boost::mutex::scoped_lock lock(mutex);
++c_idle;
while (jobs.empty() && !flag_quit)
cond_work.wait(lock);
--c_idle;
if (!jobs.empty()) {
job = jobs.front();
jobs.pop();
}
}
if (job)
job();
{
boost::mutex::scoped_lock lock(mutex);
if (job)
--c_job;
if (flag_wait && jobs.empty()) {
flag_wait = false;
cond_wait.notify_all();
}
}
} while (!flag_quit);
}
public:
threadpool(size_t size) : c_idle(0), c_job(0), flag_wait(false), flag_quit(false)
{
for (size_t i=0; i<size; i++)
thrs.create_thread(boost::bind(&threadpool::work, this));
}
void push(job_type job)
{
boost::mutex::scoped_lock lock(mutex);
jobs.push(job);
++c_job;
if (c_idle > 0)
cond_work.notify_one();
}
void wait()
{
boost::mutex::scoped_lock lock(mutex);
while (c_job) {
flag_wait = true;
cond_wait.wait(lock);
}
}
void join()
{
{
boost::mutex::scoped_lock lock(mutex);
while (c_job) {
flag_wait = true;
cond_wait.wait(lock);
}
flag_quit = true;
cond_work.notify_all();
}
thrs.join_all();
}
};
この実装よりも、boost threadpool の方が若干性能が高い。
注意事項としては、GCD によらない並列マージソートおよび並列イントロソートは、スレッド処理のオーバーヘッドにより、要素数が数万程度より少ないと多少遅くなる傾向がある。GCD による psort 系はさすがにそういったオーバーヘッドが見られない。よって、GCD の様々な処理系への移植が期待される。