再考—ソートと検索 in C/C++

ID: 12
creation date: 2010/06/15 23:06
modification date: 2010/07/23 17:58
owner: taiji
tags: qsort_r, qsort_b, C, C++, Blocks

最近、qsort_r の有効利用から始まり、qsort_b の発見と Apple による C/C++ の Blocks 拡張など、様々な知見が得られたのでまとめておく。

まずは結論を箇条書きにする。

  • qsort_r は Solaris にはない(POSIX 仕様ではない)。
  • mergesort (BSD由来の実装) は要素のサイズが sizeof(void *)/2 より小さいとソートされない(マニュアルにもその旨が明記されている)。よって、charshort 型の配列はエラーとなる。
  • qsort (Mac OS Xの実装) は Snow Leopard 以降は、マニュアルの記述とは異なり、実はイントロソート(クイックソートを再帰レベルに応じたヒープソートにより改良したもの)で実装されている。よって、最悪の場合の計算量は O(n^2) ではなく O(n log n) である。qsort_r も同様。よって、mergesort を使った方が速いという場面はまずないだろう。
  • Snow Leopard 以降は heapsort_b, mergesort_b, qsort_b, bsearch_b が導入されており、Blocks 拡張によりこれらを利用できる。よって、qsort_r のよい代替となる。
  • C++ STL による sort (イントロソートによる実装) の方が C の qsort よりも高速である。これは、ソートアルゴリズムに比較関数がインライン展開される sort と比べて、比較関数を関数ポインタ経由で実行する qsort は不利だからであろう。
  • 同様に、C++ STL による stable_sort (マージソートによる実装) の方が mergesort (BSD由来) よりも、C++ STL による partial_sort (ヒープソートによる実装) の方が heapsort (BSD由来) よりも高速である。
  • さらに同様に、C++ STL による binary_search/lower_bound/upper_bound (二分探索) の方が bsearch (ISO C90) よりも、C++ STL による find_if (線形検索) の方が lsearch/lfind (BSD由来) よりも高速である。
  • 並列マージソートは POSIX threads により自前で pmergesort_r のように実現可能であり、mergesort よりも並列度 4 の計算機環境で 1.5 倍程度高速となる。それは後述する
  • Blocks 拡張と密接に関連する Mac OS X の GCD(Grand Central Dispatch) が活用されている並列イントロソート psort, psort_r, psort_b がマルチスレッドに適した計算機環境 qsort 系の 1.5 〜 2 倍程度高速である。但し、基本的には GCD は「スレッドプール」技術により GCD なしでも自前で pintrosort_r のように実現可能であり、それは後述する
  • 並列マージソートの C++ 版 parallel_stable_sort を自作すると、C++ STL の stable_sort よりも 2 〜 5 倍も高速であり、それも後述する。
  • 並列イントロソートの C++ 版 parallel_sort を自作すると、C++ STL の sort よりも 1.5 〜 2.5 倍も高速であり、それも後述する。

まず一度、各種ソートと検索の簡単な例をあげておく。

#include <cstdlib>
#include <search.h>
#include <algorithm>
#include <functional>
using namespace std;

#define T_RCMP(T) ((*(T *)a > *(T *)b) ? -1 : (*(T *)a < *(T *)b) ? 1 : 0)
int d_rcmp(const void *a, const void *b) { return T_RCMP(double); }
int d_rcmp_r(void *thunk, const void *a, const void *b) { return T_RCMP(double); }

int main()
{
  int n = 100000;
  double *x = new double[n];
  int i;
  int (^d_rcmp_b)(const void *a, const void *b) = ^(const void *a, const void *b) {
    return (*(double *)a > *(double *)b) ? -1 : (*(double *)a < *(double *)b) ? 1 : 0;
  };

  for (i=0; i<n; i++)
    x[i] = (double)random()/((1<<31)-1);

  // 以下はすべて、要素doubleの降順ソート
  heapsort(x, n, sizeof(x[0]), d_rcmp);
  heapsort_b(x, n, sizeof(x[0]), d_rcmp_b);
  partial_sort(x, x+n, x+n, greater<double>());

  mergesort(x, n, sizeof(x[0]), d_rcmp);
  mergesort_b(x, n, sizeof(x[0]), d_rcmp_b);
  stable_sort(x, x+n, greater<double>());

  qsort(x, n, sizeof(x[0]), d_rcmp);
  qsort_r(x, n, sizeof(x[0]), NULL, d_rcmp_r);
  qsort_b(x, n, sizeof(x[0]), d_rcmp_b);
  sort(x, x+n, greater<double>());

  size_t m = n;
  double t, *f;
  bool b;

  t = x[random()%n];

  // 以下はすべて、要素doubleの検索
  f = (double *)lfind(&t, x, &m, sizeof(x[0]), d_rcmp);
  f = find(x, x+n, t);
  // 以下はすべて、要素doubleの降順ソート済み検索
  f = (double *)bsearch(&t, x, n, sizeof(x[0]), d_rcmp);
  f = (double *)bsearch_b(&t, x, n, sizeof(x[0]), d_rcmp_b);
  f = lower_bound(x, x+n, t, greater<double>());	// *f == t
  f = upper_bound(x, x+n, t, greater<double>());	// *(f-1) == t
  b = binary_search(x, x+n, t, greater<double>());

  return 0;
}

ここではまだ qsort_rqsort_b 系を使わざるを得ない例題ではないことに留意しよう。

ここで、d_rcmp_b という関数ポインタらしきもの(アスタリスクではなくハット)がブロック変数であり、それに代入している匿名関数がブロックで、ブロックが定義されたスコープのスタックの複製とともにブロックが実行される。これはまさにクロージャラムダ式であり、似たようなものとして以下が挙げられる。

もしこういった実装を選択できないのであれば、C の場合は qsort_r のようにコンテキストを渡す変数の追加が必要になるし、C++ の場合は関数オブジェクトを使ってそのメンバ変数にコンテキストを保持する必要がある。

さもなくば、大域変数や局所静的変数を使用する選択肢もあるが、すると再入可能ではなくなってしまい、並列化などが不可能となってしまう

そのような例題を具体的に考えてみる。

----

ここで、要素そのものをソートするのではなく、要素のインデックスをソートするものとする。

ソートの条件が複数あり得る場合は往々にしてあり、それらをスレッドで並列化して行う等の効率化も考えられる。よって、大域変数や局所静的変数の使用不可を前提として問題解決にあたる。

まずは結論を箇条書きにする。

  • 要素のインデックスのソートは、要素そのもののソートよりも、要素そのもののサイズが大きくない場合には、多少遅くなるがそれは微々たる差に留まる。そもそも、ソートアルゴリズムは「比較」とその結果による「スワップ」から成り、逆に言えば、要素のサイズが大きい場合には、明らかにスワップのコストが高くなるので、要素のインデックスのソートの方がコストが低くなる。
  • C には qsort_r に対応する lsearch_r, lfind_r, bsearch_r が存在しない。よって、これらを自前で実装しないと大域変数等が必要になってしまう。
  • 一方、Blocks 拡張の heapsort_b, mergesort_b, qsort_b, bsearch_b なら要件を満たす実装が可能である。

では解法例を示す。

#include <cstdlib>
#include <search.h>
#include "sort_r.h"
#include <iostream>
#include <algorithm>
#include <functional>
using namespace std;

#define ARI_T_RCMP_R(T) ((((T *)thunk)[*(int *)a] > ((T *)thunk)[*(int *)b]) ? -1 : (((T *)thunk)[*(int *)a] < ((T *)thunk)[*(int *)b]) ? 1 : 0)
int ari_d_rcmp_r(void *thunk, const void *a, const void *b) { return ARI_T_RCMP_R(double); }

template <typename T>
struct ari_rcomp { ari_rcomp(T *p):v(p){} bool operator()(const int &a, const int &b) { return v[a] > v[b]; } private: T *v; };

#define ARI_T_RE_R(T) ((*(T *)k > ((T *)thunk)[*(int *)i]) ? -1 : (*(T *)k < ((T *)thunk)[*(int *)i]) ? 1 : 0)
int ari_d_re_r(void *thunk, const void *k, const void *i) { return ARI_T_RE_R(double); }

template <typename T>
struct ari_rpred { ari_rpred(T *p, T *q):v(p),t(q){} bool operator()(const int &a) { return v[a] == *t; } private: T *v, *t; };

template <typename T>
struct ari_rkomp { ari_rkomp(T *p):v(p){} bool operator()(const int &i, const T &k) { return v[i] > k; } private: T *v; };

template <typename T>
struct ari_rkowp { ari_rkowp(T *p):v(p){} bool operator()(const T &k, const int &i) { return k > v[i]; } private: T *v; };

int main()
{
  int n = 100000;
  double *x0 = new double[n], *x = new double[n];
  int i, *ix = new int[n];
  typedef ari_rcomp<double> ari_d_rcomp;
  typedef ari_rpred<double> ari_d_rpred;
  typedef ari_rkomp<double> ari_d_rkomp;
  typedef ari_rkowp<double> ari_d_rkowp;
  int (^ari_d_rcmp_b)(const void *a, const void *b) = ^(const void *a, const void *b) {
    return (x[*(int *)a] > x[*(int *)b]) ? -1 : (x[*(int *)a] < x[*(int *)b]) ? 1 : 0;
  };
  int (^ari_d_re_b)(const void *k, const void *i) = ^(const void *k, const void *i) {
    return (*(double *)k > x0[*(int *)i]) ? -1 : (*(double *)k < x0[*(int *)i]) ? 1 : 0;
  };

  for (i=0; i<n; i++)
    x[i] = x0[ix[i] = i] = (double)random()/((1<<31)-1);

  // 以下はすべて、要素doubleが比較対象のインデックスの降順ソート
  heapsort_r(ix, n, sizeof(ix[0]), x,ari_d_rcmp_r);		// 自前の実装
  heapsort_b(ix, n, sizeof(ix[0]), ari_d_rcmp_b);
  partialsort_r(ix, n, n, sizeof(ix[0]), x, ari_d_rcmp_r);	// 自前の実装
  partial_sort(ix, ix+n, ix+n, ari_d_rcomp(x));

  mergesort_r(ix, n, sizeof(ix[0]), x,ari_d_rcmp_r);		// 自前の実装
  mergesort_b(ix, n, sizeof(ix[0]), ari_d_rcmp_b);
  stable_sort(ix, ix+n, ari_d_rcomp(x));
  pmergesort_r(ix, n, sizeof(ix[0]), x, ari_d_rcmp_r);		// 後述、自前の実装
  parallel_stable_sort(ix, ix+n, ari_d_rcomp(x));		// 後述、自前の実装

  sort(ix, ix+n, ari_d_rcomp(x));
  qsort_r(ix, n,sizeof(ix[0]), x,ari_d_rcmp_r);
  qsort_b(ix, n,sizeof(ix[0]), ari_d_rcmp_b);

  introsort_r(ix, n, sizeof(ix[0]), x, ari_d_rcmp_r);		// 自前の実装

  psort_b(ix, n, sizeof(ix[0]), ari_d_rcmp_b);			// 後述
  pintrosort_r(ix, n, sizeof(ix[0]), x, ari_d_rcmp_r);		// 後述、自前の実装
  parallel_sort(ix, ix+n, ari_d_rcomp(x));			// 後述、自前の実装

  size_t m = n;
  double t;
  int *g;
  bool b;

  t = x0[random()%n];
  sort(ix, ix+n, ari_d_rcomp(x0));

  // 以下はすべて、要素doubleのインデックスの検索
  g = find_if(ix, ix+n, ari_d_rpred(x0,&t));
  g = (int *)lfind_r(&t, ix, &m, sizeof(ix[0]), x0, ari_d_re_r);	// 自前の実装
  // 以下はすべて、要素doubleの降順ソート済みインデックス検索
  g = (int *)lbsearch_r(&t, ix, n, sizeof(ix[0]), x0, ari_d_re_r);	// 自前の実装
  g = (int *)ubsearch_r(&t, ix, n, sizeof(ix[0]), x0, ari_d_re_r);	// 自前の実装
  g = (int *)bsearch_r(&t, ix, n, sizeof(ix[0]), x0, ari_d_re_r);	// 自前の実装
  g = (int *)bsearch_b(&t, ix, n, sizeof(ix[0]), ari_d_re_b);
  g = lower_bound(ix, ix+n, t, ari_d_rkomp(x0));	// x0[*g] == t
  g = upper_bound(ix, ix+n, t, ari_d_rkowp(x0));	// x0[*(g-1)] == t

  return 0;
}

このように、大域変数等を使わずに、要素のインデックスのソートおよび検索が可能となっている。

こうしてみると、Apple による C/C++ の Blocks 拡張は、C++ STL と関数オブジェクトに比べて効率は劣るものの、極めて有用性が高いと言える。それは、POSIX threads や qsort_r のような仕様、つまり、コンテキストを渡せる実装が既になされているのであれば必要性はそれほど感じられないが、Blocks 拡張の利用は極めて容易なので、開発効率を鑑みると Blocks 拡張に対応した手続きを積極的に利用する利点は大いにある。と言うのも、less `grep -l __BLOCKS__ /usr/include/*` で調べてみると、他にも Blocks 拡張された手続きが数多く存在するし、今後 Blocks 拡張が C/C++ にて標準化されることになれば、さらに有用性が高くなるだろう。

ちなみに、自前の実装としてあげた heapsort_r, partialsort_r, mergesort_r, introsort_r, lsearch_r/lfind_r, bsearch_r/lbsearch_r/ubsearch_r、そして、pmergesort_r, pintrosort_r, は sort_r-20100723.tar.bz2sort_r.h, sort_r.c から利用できる。

また、自前の実装として挙げた parallel_stable_sort, parallel_sortsort_r-20100723.tar.bz2sort.hpp から利用できる。

また、ベンチマーク結果(要素数n:100000, 10回の平均の秒数)を以下に示す。

要素そのもののソートと検索
heapsort(x,n,sizeof(x[0]),d_rcmp) 0.0249047
heapsort_r(x,n,sizeof(x[0]),0,d_rcmp_r) 0.01671
heapsort_b(x,n,sizeof(x[0]),d_rcmp_b) 0.0275569
partialsort_r(x,n,n,sizeof(x[0]),0,d_rcmp_r) 0.0164991
partial_sort(x,x+n,x+n,greater<double>()) 0.0114346
mergesort(x,n,sizeof(x[0]),d_rcmp) 0.0176533
mergesort_r(x,n,sizeof(x[0]),0,d_rcmp_r) 0.0130815
mergesort_b(x,n,sizeof(x[0]),d_rcmp_b) 0.0178412
stable_sort(x,x+n,greater<double>()) 0.00890727
pmergesort_r(x,n,sizeof(x[0]),0,d_rcmp_r) 0.0075165
parallel_stable_sort(x,x+n,greater<double>()) 0.00198088
qsort(x,n,sizeof(x[0]),d_rcmp) 0.0144886
qsort_r(x,n,sizeof(x[0]),0,d_rcmp_r) 0.01352
qsort_b(x,n,sizeof(x[0]),d_rcmp_b) 0.0147258
sort(x,x+n,greater<double>()) 0.00741098
introsort_r(x,n,sizeof(x[0]),0,d_rcmp_r) 0.0141188
pintrosort_r(x,n,sizeof(x[0]),0,d_rcmp_r) 0.0067838
psort_b(x,n,sizeof(x[0]),d_rcmp_b) 0.00691638
parallel_sort(x,x+n,greater<double>()) 0.00325432
f=(double *)lfind(&t,x0,&m,sizeof(x0[0]),d_rcmp) 0.000134635
f=(double *)lfind_r(&t,x0,&m,sizeof(x0[0]),0,d_rcmp_r) 0.000140667
f=find(x0,x0+n,t) 2.40564e-05
f=(double *)bsearch(&t,x,n,sizeof(x[0]),d_rcmp) 6.91414e-07
f=(double *)lbsearch_r(&t,x,n,sizeof(x[0]),0,d_rcmp_r) 8.10623e-07
f=(double *)ubsearch_r(&t,x,n,sizeof(x[0]),0,d_rcmp_r) 6.67572e-07
f=(double *)bsearch_r(&t,x,n,sizeof(x[0]),0,d_rcmp_r) 7.86781e-07
f=(double *)bsearch_b(&t,x,n,sizeof(x[0]),d_rcmp_b) 6.19888e-07
f=lower_bound(x,x+n,t,greater<double>()) 6.4373e-07
f=upper_bound(x,x+n,t,greater<double>()) 7.15256e-07
b=binary_search(x,x+n,t,greater<double>()) 7.15256e-07
要素のインデックスのソートと検索
heapsort_r(ix,n,sizeof(ix[0]),x,ari_d_rcmp_r) 0.0226928
heapsort_b(ix,n,sizeof(ix[0]),ari_d_rcmp_b) 0.0308669
partialsort_r(ix,n,n,sizeof(ix[0]),x,ari_d_rcmp_r) 0.0227648
partial_sort(ix,ix+n,ix+n,ari_d_rcomp(x)) 0.0194682
mergesort_r(ix,n,sizeof(ix[0]),x,ari_d_rcmp_r) 0.0147787
mergesort_b(ix,n,sizeof(ix[0]),ari_d_rcmp_b) 0.0201805
stable_sort(ix,ix+n,ari_d_rcomp(x)) 0.0112257
pmergesort_r(ix,n,sizeof(ix[0]),x,ari_d_rcmp_r) 0.00885935
parallel_stable_sort(ix,ix+n,ari_d_rcomp(x)) 0.00239668
sort(ix,ix+n,ari_d_rcomp(x)) 0.0099478
qsort_r(ix,n,sizeof(ix[0]),x,ari_d_rcmp_r) 0.0160118
qsort_b(ix,n,sizeof(ix[0]),ari_d_rcmp_b) 0.0191082
introsort_r(ix,n,sizeof(ix[0]),x,ari_d_rcmp_r) 0.0167039
pintrosort_r(ix,n,sizeof(ix[0]),x,ari_d_rcmp_r) 0.00834651
psort_b(ix,n,sizeof(ix[0]),ari_d_rcmp_b) 0.00909259
parallel_sort(ix,ix+n,ari_d_rcomp(x)) 0.00449612
g=find_if(ix,ix+n,ari_d_rpred(x0,&t)) 0.000172186
g=(int *)lfind_r(&t,ix,&m,sizeof(ix[0]),x0,ari_d_re_r) 0.000410461
g=(int *)lbsearch_r(&t,ix,n,sizeof(ix[0]),x0,ari_d_re_r) 7.39098e-07
g=(int *)ubsearch_r(&t,ix,n,sizeof(ix[0]),x0,ari_d_re_r) 7.15256e-07
g=(int *)bsearch_r(&t,ix,n,sizeof(ix[0]),x0,ari_d_re_r) 7.39098e-07
g=(int *)bsearch_b(&t,ix,n,sizeof(ix[0]),ari_d_re_b) 5.72205e-07
g=lower_bound(ix,ix+n,t,ari_d_rkomp(x0)) 6.91414e-07
g=upper_bound(ix,ix+n,t,ari_d_rkowp(x0)) 6.4373e-07
----

話題は多少それるが、部分ソートについても知見が得られた。

まずは結論を箇条書きにする。

  • C++ STL の partial_sort に相当するものが C にはない。よって partialsort_r を実装した
  • C++ STL による partial_sort (ヒープソートによる実装) の方が C の partialsort_r (ヒープソートによる実装)よりも高速である。これは、ソートアルゴリズムに比較関数がインライン展開される partial_sort と比べて、比較関数を関数ポインタ経由で実行する partialsort_r は不利だからであろう。
  • partialsort_rqsort と比べて、部分ソートの上位数が何割程度までなら高速だろうか、同様に、partial_sortsort と比べて、部分ソートの上位数が何割程度までなら高速だろうか。結論としては、上位数が2割程度までなら部分ソートを採用する価値がある。それ以上なら、全体をソートしてしまった方が速い。

以下に、ベンチマーク結果(要素数n:100000, 上位数m:100000r, 10回の平均の秒数)を以下に示す。

部分ソート 割合r 秒数 qsort,sortの秒数
partialsort_r(x,m,n,sizeof(x[0]),0,c_rcmp_r) 0.18553 0.00579057 0.00579412
partial_sort(x,x+m,x+n,greater<char>()) 0.16153 0.00358579 0.00358541
partialsort_r(ix,m,n,sizeof(ix[0]),x,ari_c_rcmp_r) 0.1516 0.00591381 0.00591898
partial_sort(ix,ix+m,ix+n,ari_c_rcomp(x)) 0.17991 0.0048316 0.00484018
partialsort_r(x,m,n,sizeof(x[0]),0,s_rcmp_r) 0.69863 0.0134328 0.0134347
partial_sort(x,x+m,x+n,greater<short>()) 0.30108 0.00620561 0.0062124
partialsort_r(ix,m,n,sizeof(ix[0]),x,ari_s_rcmp_r) 0.40503 0.0131619 0.0131641
partial_sort(ix,ix+m,ix+n,ari_s_rcomp(x)) 0.27234 0.00782237 0.00782158
partialsort_r(x,m,n,sizeof(x[0]),0,i_rcmp_r) 0.90588 0.0148749 0.0148832
partial_sort(x,x+m,x+n,greater<int>()) 0.27111 0.00592699 0.00592942
partialsort_r(ix,m,n,sizeof(ix[0]),x,ari_i_rcmp_r) 0.38938 0.014343 0.0143554
partial_sort(ix,ix+m,ix+n,ari_i_rcomp(x)) 0.27026 0.00848997 0.00849326
partialsort_r(x,m,n,sizeof(x[0]),0,d_rcmp_r) 0.59266 0.0144848 0.0144933
partial_sort(x,x+m,x+n,greater<double>()) 0.31718 0.0074177 0.00740693
partialsort_r(ix,m,n,sizeof(ix[0]),x,ari_d_rcmp_r) 0.37416 0.015897 0.015906
partial_sort(ix,ix+m,ix+n,ari_d_rcomp(x)) 0.26319 0.00990717 0.00991831
----

さて、後述とした、並列マージソート pmergesort_r は POSIX threads により実現可能であり、mergesort よりも並列度 4 の計算機環境で 1.5 倍程度高速となる。並列度 2 の計算機環境では速度向上は見られない。

さらに、後述とした、並列イントロソート psort, psort_r, psort_b は、GCD 環境下かつマルチスレッドに適した計算機環境では、qsort 系の 1.5 〜 2 倍程度高速である。しかし、GCD は基本的には「スレッドプール」技術を OS が支援する機構であるので、それがサポートされない場面では、自前で「スレッドプール」を用意すればよい。

この目的に即した「スレッドプール」を POSIX threads で必要十分に実現するコードは以下の通りである。

/*
  thread_pool.c

  Copyright (C) 2010 Taiji Yamada <taiji@aihara.co.jp>

  Distributed under the Boost Software License, Version 1.0. See
  http://www.boost.org/LICENSE_1_0.txt for more details.

  References:
  [1] http://docs.sun.com/app/docs/doc/819-0390/, 2008.
 */
#include <pthread.h>

typedef struct thread_pool {
  struct pool_job {
    void *(*func)(void *);
    void *arg;
    struct pool_job *next;
  } *job_head, *job_tail;
  pthread_t *thrs;
  size_t n_thread, c_idle, c_job;
  pthread_mutex_t mutex;
  pthread_cond_t cond_work, cond_wait;
  unsigned long flags;
} thread_pool_t;

static
void *thread_pool_work(void *arg)
{
  struct thread_pool *pool = (struct thread_pool *)arg;
  struct pool_job *job;

  pthread_mutex_lock(&pool->mutex);
  do {
    pool->c_idle++;
    while (!pool->job_head && !(pool->flags & (1UL<<1)))
      pthread_cond_wait(&pool->cond_work, &pool->mutex);
    pool->c_idle--;
    if (pool->job_head) {
      struct pool_job the_job = *(job = pool->job_head);

      pool->job_head = job->next;
      if (job == pool->job_tail)
	pool->job_tail = NULL;
      pthread_mutex_unlock(&pool->mutex);
      free(job);
      the_job.func(the_job.arg);
      pthread_mutex_lock(&pool->mutex);
      pool->c_job--;
    }
    if ((pool->flags & (1UL<<0)) && !pool->job_head) {
      pool->flags &= ~(1UL<<0);
      pthread_cond_broadcast(&pool->cond_wait);
    }
  } while (!(pool->flags & (1UL<<1)));
  pthread_mutex_unlock(&pool->mutex);
  return NULL;
}
thread_pool_t *thread_pool_create(int n_thread, pthread_attr_t *attr)
{
  thread_pool_t *pool;
  int i;

  if (!(pool = malloc(sizeof(thread_pool_t)))) return NULL;
  pool->job_head = pool->job_tail = NULL;
  pthread_mutex_init(&pool->mutex, NULL);
  pthread_cond_init(&pool->cond_work, NULL);
  pthread_cond_init(&pool->cond_wait, NULL);
  if (!(pool->thrs = malloc(sizeof(pthread_t)*(pool->n_thread=n_thread)))) { free(pool); return NULL; }
  pool->c_idle = pool->c_job = 0;
  pool->flags = 0;
  for (i=0; i<n_thread; i++)
    pthread_create(&pool->thrs[i], attr, thread_pool_work, pool);
  return pool;
}
int thread_pool_queue(thread_pool_t *pool, void *(*func)(void *), void *arg)
{
  struct pool_job *job;

  if (!(job = malloc(sizeof(struct pool_job)))) return 1;
  job->func = func;
  job->arg = arg;
  job->next = NULL;

  pthread_mutex_lock(&pool->mutex);
  if (!pool->job_head)
    pool->job_head = job;
  else
    pool->job_tail->next = job;
  pool->job_tail = job;
  pool->c_job++;
  if (pool->c_idle > 0)
    pthread_cond_signal(&pool->cond_work);
  pthread_mutex_unlock(&pool->mutex);
  return 0;
}
void thread_pool_wait(thread_pool_t *pool)
{
  pthread_mutex_lock(&pool->mutex);
  while (pool->c_job) {
    pool->flags |= (1UL<<0);
    pthread_cond_wait(&pool->cond_wait, &pool->mutex);
  }
  pthread_mutex_unlock(&pool->mutex);
}
void thread_pool_join(thread_pool_t *pool)
{
  int i;

  pthread_mutex_lock(&pool->mutex);
  while (pool->c_job) {
    pool->flags |= (1UL<<0);
    pthread_cond_wait(&pool->cond_wait, &pool->mutex);
  }
  pool->flags |= (1UL<<1);
  pthread_cond_broadcast(&pool->cond_work);
  pthread_mutex_unlock(&pool->mutex);

  for (i=0; i<pool->n_thread; i++)
    pthread_join(pool->thrs[i], NULL);

  free(pool->thrs);
  free(pool);
}

ちなみに、上記コメントで参考文献とした Sun のスレッドプール実装ではクイックソート/イントロソートの並列化は動作しない。それは、ジョブが再帰的に新たなジョブを生成するので、それを捕捉できないからである。上記コードはこのような再帰的なジョブの生成に対応してある。

よって、上記のこの自前の「スレッドプール」を利用した並列化されたイントロソート pintrosort_r は、GCD の psort 系とほぼ同等に高速となる。但し、ワーカースレッドがあらかじめ起動されている psort 系と比べて多少遅くなるときもあるがそれは微々たる差に留まる。よって、GCD が提供されていない環境では pintrosort_r は極めて有用であろう。

----

さて、後述とした、並列マージソートの C++ 版 parallel_stable_sort は boost thread により実現可能であり、マルチスレッドに適した計算機環境では、stable_sort よりも 2 〜 5 倍も高速となる。

さらに、後述とした、並列イントロソートの C++ 版 parallel_sort は boost threadpool もしくは自作のスレッドプールにより実現可能であり、マルチスレッドに適した計算機環境では、sort の 1.5 〜 2.5 倍も高速である。

この目的に即した「スレッドプール」を boost thread で必要十分に実現するコードは以下の通りである。

/*
  threadpool.hpp

  Copyright (C) 2010 Taiji Yamada <taiji@aihara.co.jp>

  Distributed under the Boost Software License, Version 1.0. See
  http://www.boost.org/LICENSE_1_0.txt for more details.

  References:
  [1] http://threadpool.sourceforge.net/, 2005-2008.
  [2] http://docs.sun.com/app/docs/doc/819-0390/, 2008.
 */
#include <boost/thread.hpp>
#include <queue>
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <boost/thread/condition.hpp>

class threadpool {
  typedef boost::function0<void> job_type;
  std::queue<job_type> jobs;
  boost::thread_group thrs;
  int c_idle, c_job;
  bool flag_wait, flag_quit;
  boost::mutex mutex;
  boost::condition cond_work, cond_wait;
  void work()
  {
    do {
      job_type job = 0;
      {
	boost::mutex::scoped_lock lock(mutex);
	++c_idle;
	while (jobs.empty() && !flag_quit)
	  cond_work.wait(lock);
	--c_idle;
	if (!jobs.empty()) {
	  job = jobs.front();
	  jobs.pop();
	}
      }
      if (job)
	job();
      {
	boost::mutex::scoped_lock lock(mutex);
	if (job)
	  --c_job;
	if (flag_wait && jobs.empty()) {
	  flag_wait = false;
	  cond_wait.notify_all();
	}
      }
    } while (!flag_quit);
  }
public:
  threadpool(size_t size) : c_idle(0), c_job(0), flag_wait(false), flag_quit(false)
  {
    for (size_t i=0; i<size; i++)
      thrs.create_thread(boost::bind(&threadpool::work, this));
  }
  void push(job_type job)
  {
    boost::mutex::scoped_lock lock(mutex);
    jobs.push(job);
    ++c_job;
    if (c_idle > 0)
      cond_work.notify_one();
  }
  void wait()
  {
    boost::mutex::scoped_lock lock(mutex);
    while (c_job) {
      flag_wait = true;
      cond_wait.wait(lock);
    }
  }
  void join()
  {
    {
      boost::mutex::scoped_lock lock(mutex);
      while (c_job) {
	flag_wait = true;
	cond_wait.wait(lock);
      }
      flag_quit = true;
      cond_work.notify_all();
    }
    thrs.join_all();
  }
};

この実装よりも、boost threadpool の方が若干性能が高い。

注意事項としては、GCD によらない並列マージソートおよび並列イントロソートは、スレッド処理のオーバーヘッドにより、要素数が数万程度より少ないと多少遅くなる傾向がある。GCD による psort 系はさすがにそういったオーバーヘッドが見られない。よって、GCD の様々な処理系への移植が期待される。

0 コメント
ゲストコメント認証用なぞなぞ:
キーボードのLから左に全部打って下さい。それを二回やって下さい。 ...