再考—ソートと検索 in C/C++

by taiji at 2010/06/15 23:06

最近、qsort_r の有効利用から始まり、qsort_b の発見と Apple による C/C++ の Blocks 拡張など、様々な知見が得られたのでまとめておく。

まずは結論を箇条書きにする。

  • qsort_r は Solaris にはない(POSIX 仕様ではない)。
  • mergesort (BSD由来の実装) は要素のサイズが sizeof(void *)/2 より小さいとソートされない(マニュアルにもその旨が明記されている)。よって、charshort 型の配列はエラーとなる。
  • qsort (Mac OS Xの実装) は Snow Leopard 以降は、マニュアルの記述とは異なり、実はイントロソート(クイックソートを再帰レベルに応じたヒープソートにより改良したもの)で実装されている。よって、最悪の場合の計算量は O(n^2) ではなく O(n log n) である。qsort_r も同様。よって、mergesort を使った方が速いという場面はまずないだろう。
  • Snow Leopard 以降は heapsort_b, mergesort_b, qsort_b, bsearch_b が導入されており、Blocks 拡張によりこれらを利用できる。よって、qsort_r のよい代替となる。
  • C++ STL による sort (イントロソートによる実装) の方が C の qsort よりも高速である。これは、ソートアルゴリズムに比較関数がインライン展開される sort と比べて、比較関数を関数ポインタ経由で実行する qsort は不利だからであろう。
  • 同様に、C++ STL による stable_sort (マージソートによる実装) の方が mergesort (BSD由来) よりも、C++ STL による partial_sort (ヒープソートによる実装) の方が heapsort (BSD由来) よりも高速である。
  • さらに同様に、C++ STL による binary_search/lower_bound/upper_bound (二分探索) の方が bsearch (ISO C90) よりも、C++ STL による find_if (線形検索) の方が lsearch/lfind (BSD由来) よりも高速である。
  • 並列マージソートは POSIX threads により自前で pmergesort_r のように実現可能であり、mergesort よりも並列度 4 の計算機環境で 1.5 倍程度高速となる。それは後述する
  • Blocks 拡張と密接に関連する Mac OS X の GCD(Grand Central Dispatch) が活用されている並列イントロソート psort, psort_r, psort_b がマルチスレッドに適した計算機環境 qsort 系の 1.5 〜 2 倍程度高速である。但し、基本的には GCD は「スレッドプール」技術により GCD なしでも自前で pintrosort_r のように実現可能であり、それは後述する
  • 並列マージソートの C++ 版 parallel_stable_sort を自作すると、C++ STL の stable_sort よりも 2 〜 5 倍も高速であり、それも後述する。
  • 並列イントロソートの C++ 版 parallel_sort を自作すると、C++ STL の sort よりも 1.5 〜 2.5 倍も高速であり、それも後述する。

まず一度、各種ソートと検索の簡単な例をあげておく。

#include <cstdlib>
#include <search.h>
#include <algorithm>
#include <functional>
using namespace std;

#define T_RCMP(T) ((*(T *)a > *(T *)b) ? -1 : (*(T *)a < *(T *)b) ? 1 : 0)
int d_rcmp(const void *a, const void *b) { return T_RCMP(double); }
int d_rcmp_r(void *thunk, const void *a, const void *b) { return T_RCMP(double); }

int main()
{
  int n = 100000;
  double *x = new double[n];
  int i;
  int (^d_rcmp_b)(const void *a, const void *b) = ^(const void *a, const void *b) {
    return (*(double *)a > *(double *)b) ? -1 : (*(double *)a < *(double *)b) ? 1 : 0;
  };

  for (i=0; i<n; i++)
    x[i] = (double)random()/((1<<31)-1);

  // 以下はすべて、要素doubleの降順ソート
  heapsort(x, n, sizeof(x[0]), d_rcmp);
  heapsort_b(x, n, sizeof(x[0]), d_rcmp_b);
  partial_sort(x, x+n, x+n, greater<double>());

  mergesort(x, n, sizeof(x[0]), d_rcmp);
  mergesort_b(x, n, sizeof(x[0]), d_rcmp_b);
  stable_sort(x, x+n, greater<double>());

  qsort(x, n, sizeof(x[0]), d_rcmp);
  qsort_r(x, n, sizeof(x[0]), NULL, d_rcmp_r);
  qsort_b(x, n, sizeof(x[0]), d_rcmp_b);
  sort(x, x+n, greater<double>());

  size_t m = n;
  double t, *f;
  bool b;

  t = x[random()%n];

  // 以下はすべて、要素doubleの検索
  f = (double *)lfind(&t, x, &m, sizeof(x[0]), d_rcmp);
  f = find(x, x+n, t);
  // 以下はすべて、要素doubleの降順ソート済み検索
  f = (double *)bsearch(&t, x, n, sizeof(x[0]), d_rcmp);
  f = (double *)bsearch_b(&t, x, n, sizeof(x[0]), d_rcmp_b);
  f = lower_bound(x, x+n, t, greater<double>());	// *f == t
  f = upper_bound(x, x+n, t, greater<double>());	// *(f-1) == t
  b = binary_search(x, x+n, t, greater<double>());

  return 0;
}

ここではまだ qsort_rqsort_b 系を使わざるを得ない例題ではないことに留意しよう。

ここで、d_rcmp_b という関数ポインタらしきもの(アスタリスクではなくハット)がブロック変数であり、それに代入している匿名関数がブロックで、ブロックが定義されたスコープのスタックの複製とともにブロックが実行される。これはまさにクロージャラムダ式であり、似たようなものとして以下が挙げられる。

もしこういった実装を選択できないのであれば、C の場合は qsort_r のようにコンテキストを渡す変数の追加が必要になるし、C++ の場合は関数オブジェクトを使ってそのメンバ変数にコンテキストを保持する必要がある。

さもなくば、大域変数や局所静的変数を使用する選択肢もあるが、すると再入可能ではなくなってしまい、並列化などが不可能となってしまう

そのような例題を具体的に考えてみる。

----

ここで、要素そのものをソートするのではなく、要素のインデックスをソートするものとする。

ソートの条件が複数あり得る場合は往々にしてあり、それらをスレッドで並列化して行う等の効率化も考えられる。よって、大域変数や局所静的変数の使用不可を前提として問題解決にあたる。

まずは結論を箇条書きにする。

  • 要素のインデックスのソートは、要素そのもののソートよりも、要素そのもののサイズが大きくない場合には、多少遅くなるがそれは微々たる差に留まる。そもそも、ソートアルゴリズムは「比較」とその結果による「スワップ」から成り、逆に言えば、要素のサイズが大きい場合には、明らかにスワップのコストが高くなるので、要素のインデックスのソートの方がコストが低くなる。
  • C には qsort_r に対応する lsearch_r, lfind_r, bsearch_r が存在しない。よって、これらを自前で実装しないと大域変数等が必要になってしまう。
  • 一方、Blocks 拡張の heapsort_b, mergesort_b, qsort_b, bsearch_b なら要件を満たす実装が可能である。

では解法例を示す。

#include <cstdlib>
#include <search.h>
#include "sort_r.h"
#include <iostream>
#include <algorithm>
#include <functional>
using namespace std;

#define ARI_T_RCMP_R(T) ((((T *)thunk)[*(int *)a] > ((T *)thunk)[*(int *)b]) ? -1 : (((T *)thunk)[*(int *)a] < ((T *)thunk)[*(int *)b]) ? 1 : 0)
int ari_d_rcmp_r(void *thunk, const void *a, const void *b) { return ARI_T_RCMP_R(double); }

template <typename T>
struct ari_rcomp { ari_rcomp(T *p):v(p){} bool operator()(const int &a, const int &b) { return v[a] > v[b]; } private: T *v; };

#define ARI_T_RE_R(T) ((*(T *)k > ((T *)thunk)[*(int *)i]) ? -1 : (*(T *)k < ((T *)thunk)[*(int *)i]) ? 1 : 0)
int ari_d_re_r(void *thunk, const void *k, const void *i) { return ARI_T_RE_R(double); }

template <typename T>
struct ari_rpred { ari_rpred(T *p, T *q):v(p),t(q){} bool operator()(const int &a) { return v[a] == *t; } private: T *v, *t; };

template <typename T>
struct ari_rkomp { ari_rkomp(T *p):v(p){} bool operator()(const int &i, const T &k) { return v[i] > k; } private: T *v; };

template <typename T>
struct ari_rkowp { ari_rkowp(T *p):v(p){} bool operator()(const T &k, const int &i) { return k > v[i]; } private: T *v; };

int main()
{
  int n = 100000;
  double *x0 = new double[n], *x = new double[n];
  int i, *ix = new int[n];
  typedef ari_rcomp<double> ari_d_rcomp;
  typedef ari_rpred<double> ari_d_rpred;
  typedef ari_rkomp<double> ari_d_rkomp;
  typedef ari_rkowp<double> ari_d_rkowp;
  int (^ari_d_rcmp_b)(const void *a, const void *b) = ^(const void *a, const void *b) {
    return (x[*(int *)a] > x[*(int *)b]) ? -1 : (x[*(int *)a] < x[*(int *)b]) ? 1 : 0;
  };
  int (^ari_d_re_b)(const void *k, const void *i) = ^(const void *k, const void *i) {
    return (*(double *)k > x0[*(int *)i]) ? -1 : (*(double *)k < x0[*(int *)i]) ? 1 : 0;
  };

  for (i=0; i<n; i++)
    x[i] = x0[ix[i] = i] = (double)random()/((1<<31)-1);

  // 以下はすべて、要素doubleが比較対象のインデックスの降順ソート
  heapsort_r(ix, n, sizeof(ix[0]), x,ari_d_rcmp_r);		// 自前の実装
  heapsort_b(ix, n, sizeof(ix[0]), ari_d_rcmp_b);
  partialsort_r(ix, n, n, sizeof(ix[0]), x, ari_d_rcmp_r);	// 自前の実装
  partial_sort(ix, ix+n, ix+n, ari_d_rcomp(x));

  mergesort_r(ix, n, sizeof(ix[0]), x,ari_d_rcmp_r);		// 自前の実装
  mergesort_b(ix, n, sizeof(ix[0]), ari_d_rcmp_b);
  stable_sort(ix, ix+n, ari_d_rcomp(x));
  pmergesort_r(ix, n, sizeof(ix[0]), x, ari_d_rcmp_r);		// 後述、自前の実装
  parallel_stable_sort(ix, ix+n, ari_d_rcomp(x));		// 後述、自前の実装

  sort(ix, ix+n, ari_d_rcomp(x));
  qsort_r(ix, n,sizeof(ix[0]), x,ari_d_rcmp_r);
  qsort_b(ix, n,sizeof(ix[0]), ari_d_rcmp_b);

  introsort_r(ix, n, sizeof(ix[0]), x, ari_d_rcmp_r);		// 自前の実装

  psort_b(ix, n, sizeof(ix[0]), ari_d_rcmp_b);			// 後述
  pintrosort_r(ix, n, sizeof(ix[0]), x, ari_d_rcmp_r);		// 後述、自前の実装
  parallel_sort(ix, ix+n, ari_d_rcomp(x));			// 後述、自前の実装

  size_t m = n;
  double t;
  int *g;
  bool b;

  t = x0[random()%n];
  sort(ix, ix+n, ari_d_rcomp(x0));

  // 以下はすべて、要素doubleのインデックスの検索
  g = find_if(ix, ix+n, ari_d_rpred(x0,&t));
  g = (int *)lfind_r(&t, ix, &m, sizeof(ix[0]), x0, ari_d_re_r);	// 自前の実装
  // 以下はすべて、要素doubleの降順ソート済みインデックス検索
  g = (int *)lbsearch_r(&t, ix, n, sizeof(ix[0]), x0, ari_d_re_r);	// 自前の実装
  g = (int *)ubsearch_r(&t, ix, n, sizeof(ix[0]), x0, ari_d_re_r);	// 自前の実装
  g = (int *)bsearch_r(&t, ix, n, sizeof(ix[0]), x0, ari_d_re_r);	// 自前の実装
  g = (int *)bsearch_b(&t, ix, n, sizeof(ix[0]), ari_d_re_b);
  g = lower_bound(ix, ix+n, t, ari_d_rkomp(x0));	// x0[*g] == t
  g = upper_bound(ix, ix+n, t, ari_d_rkowp(x0));	// x0[*(g-1)] == t

  return 0;
}

このように、大域変数等を使わずに、要素のインデックスのソートおよび検索が可能となっている。

こうしてみると、Apple による C/C++ の Blocks 拡張は、C++ STL と関数オブジェクトに比べて効率は劣るものの、極めて有用性が高いと言える。それは、POSIX threads や qsort_r のような仕様、つまり、コンテキストを渡せる実装が既になされているのであれば必要性はそれほど感じられないが、Blocks 拡張の利用は極めて容易なので、開発効率を鑑みると Blocks 拡張に対応した手続きを積極的に利用する利点は大いにある。と言うのも、less `grep -l __BLOCKS__ /usr/include/*` で調べてみると、他にも Blocks 拡張された手続きが数多く存在するし、今後 Blocks 拡張が C/C++ にて標準化されることになれば、さらに有用性が高くなるだろう。

ちなみに、自前の実装としてあげた heapsort_r, partialsort_r, mergesort_r, introsort_r, lsearch_r/lfind_r, bsearch_r/lbsearch_r/ubsearch_r、そして、pmergesort_r, pintrosort_r, は sort_r-20100723.tar.bz2sort_r.h, sort_r.c から利用できる。

また、自前の実装として挙げた parallel_stable_sort, parallel_sortsort_r-20100723.tar.bz2sort.hpp から利用できる。

また、ベンチマーク結果(要素数n:100000, 10回の平均の秒数)を以下に示す。

要素そのもののソートと検索
heapsort(x,n,sizeof(x[0]),d_rcmp) 0.0249047
heapsort_r(x,n,sizeof(x[0]),0,d_rcmp_r) 0.01671
heapsort_b(x,n,sizeof(x[0]),d_rcmp_b) 0.0275569
partialsort_r(x,n,n,sizeof(x[0]),0,d_rcmp_r) 0.0164991
partial_sort(x,x+n,x+n,greater<double>()) 0.0114346
mergesort(x,n,sizeof(x[0]),d_rcmp) 0.0176533
mergesort_r(x,n,sizeof(x[0]),0,d_rcmp_r) 0.0130815
mergesort_b(x,n,sizeof(x[0]),d_rcmp_b) 0.0178412
stable_sort(x,x+n,greater<double>()) 0.00890727
pmergesort_r(x,n,sizeof(x[0]),0,d_rcmp_r) 0.0075165
parallel_stable_sort(x,x+n,greater<double>()) 0.00198088
qsort(x,n,sizeof(x[0]),d_rcmp) 0.0144886
qsort_r(x,n,sizeof(x[0]),0,d_rcmp_r) 0.01352
qsort_b(x,n,sizeof(x[0]),d_rcmp_b) 0.0147258
sort(x,x+n,greater<double>()) 0.00741098
introsort_r(x,n,sizeof(x[0]),0,d_rcmp_r) 0.0141188
pintrosort_r(x,n,sizeof(x[0]),0,d_rcmp_r) 0.0067838
psort_b(x,n,sizeof(x[0]),d_rcmp_b) 0.00691638
parallel_sort(x,x+n,greater<double>()) 0.00325432
f=(double *)lfind(&t,x0,&m,sizeof(x0[0]),d_rcmp) 0.000134635
f=(double *)lfind_r(&t,x0,&m,sizeof(x0[0]),0,d_rcmp_r) 0.000140667
f=find(x0,x0+n,t) 2.40564e-05
f=(double *)bsearch(&t,x,n,sizeof(x[0]),d_rcmp) 6.91414e-07
f=(double *)lbsearch_r(&t,x,n,sizeof(x[0]),0,d_rcmp_r) 8.10623e-07
f=(double *)ubsearch_r(&t,x,n,sizeof(x[0]),0,d_rcmp_r) 6.67572e-07
f=(double *)bsearch_r(&t,x,n,sizeof(x[0]),0,d_rcmp_r) 7.86781e-07
f=(double *)bsearch_b(&t,x,n,sizeof(x[0]),d_rcmp_b) 6.19888e-07
f=lower_bound(x,x+n,t,greater<double>()) 6.4373e-07
f=upper_bound(x,x+n,t,greater<double>()) 7.15256e-07
b=binary_search(x,x+n,t,greater<double>()) 7.15256e-07
要素のインデックスのソートと検索
heapsort_r(ix,n,sizeof(ix[0]),x,ari_d_rcmp_r) 0.0226928
heapsort_b(ix,n,sizeof(ix[0]),ari_d_rcmp_b) 0.0308669
partialsort_r(ix,n,n,sizeof(ix[0]),x,ari_d_rcmp_r) 0.0227648
partial_sort(ix,ix+n,ix+n,ari_d_rcomp(x)) 0.0194682
mergesort_r(ix,n,sizeof(ix[0]),x,ari_d_rcmp_r) 0.0147787
mergesort_b(ix,n,sizeof(ix[0]),ari_d_rcmp_b) 0.0201805
stable_sort(ix,ix+n,ari_d_rcomp(x)) 0.0112257
pmergesort_r(ix,n,sizeof(ix[0]),x,ari_d_rcmp_r) 0.00885935
parallel_stable_sort(ix,ix+n,ari_d_rcomp(x)) 0.00239668
sort(ix,ix+n,ari_d_rcomp(x)) 0.0099478
qsort_r(ix,n,sizeof(ix[0]),x,ari_d_rcmp_r) 0.0160118
qsort_b(ix,n,sizeof(ix[0]),ari_d_rcmp_b) 0.0191082
introsort_r(ix,n,sizeof(ix[0]),x,ari_d_rcmp_r) 0.0167039
pintrosort_r(ix,n,sizeof(ix[0]),x,ari_d_rcmp_r) 0.00834651
psort_b(ix,n,sizeof(ix[0]),ari_d_rcmp_b) 0.00909259
parallel_sort(ix,ix+n,ari_d_rcomp(x)) 0.00449612
g=find_if(ix,ix+n,ari_d_rpred(x0,&t)) 0.000172186
g=(int *)lfind_r(&t,ix,&m,sizeof(ix[0]),x0,ari_d_re_r) 0.000410461
g=(int *)lbsearch_r(&t,ix,n,sizeof(ix[0]),x0,ari_d_re_r) 7.39098e-07
g=(int *)ubsearch_r(&t,ix,n,sizeof(ix[0]),x0,ari_d_re_r) 7.15256e-07
g=(int *)bsearch_r(&t,ix,n,sizeof(ix[0]),x0,ari_d_re_r) 7.39098e-07
g=(int *)bsearch_b(&t,ix,n,sizeof(ix[0]),ari_d_re_b) 5.72205e-07
g=lower_bound(ix,ix+n,t,ari_d_rkomp(x0)) 6.91414e-07
g=upper_bound(ix,ix+n,t,ari_d_rkowp(x0)) 6.4373e-07
----

話題は多少それるが、部分ソートについても知見が得られた。

まずは結論を箇条書きにする。

  • C++ STL の partial_sort に相当するものが C にはない。よって partialsort_r を実装した
  • C++ STL による partial_sort (ヒープソートによる実装) の方が C の partialsort_r (ヒープソートによる実装)よりも高速である。これは、ソートアルゴリズムに比較関数がインライン展開される partial_sort と比べて、比較関数を関数ポインタ経由で実行する partialsort_r は不利だからであろう。
  • partialsort_rqsort と比べて、部分ソートの上位数が何割程度までなら高速だろうか、同様に、partial_sortsort と比べて、部分ソートの上位数が何割程度までなら高速だろうか。結論としては、上位数が2割程度までなら部分ソートを採用する価値がある。それ以上なら、全体をソートしてしまった方が速い。

以下に、ベンチマーク結果(要素数n:100000, 上位数m:100000r, 10回の平均の秒数)を以下に示す。

部分ソート 割合r 秒数 qsort,sortの秒数
partialsort_r(x,m,n,sizeof(x[0]),0,c_rcmp_r) 0.18553 0.00579057 0.00579412
partial_sort(x,x+m,x+n,greater<char>()) 0.16153 0.00358579 0.00358541
partialsort_r(ix,m,n,sizeof(ix[0]),x,ari_c_rcmp_r) 0.1516 0.00591381 0.00591898
partial_sort(ix,ix+m,ix+n,ari_c_rcomp(x)) 0.17991 0.0048316 0.00484018
partialsort_r(x,m,n,sizeof(x[0]),0,s_rcmp_r) 0.69863 0.0134328 0.0134347
partial_sort(x,x+m,x+n,greater<short>()) 0.30108 0.00620561 0.0062124
partialsort_r(ix,m,n,sizeof(ix[0]),x,ari_s_rcmp_r) 0.40503 0.0131619 0.0131641
partial_sort(ix,ix+m,ix+n,ari_s_rcomp(x)) 0.27234 0.00782237 0.00782158
partialsort_r(x,m,n,sizeof(x[0]),0,i_rcmp_r) 0.90588 0.0148749 0.0148832
partial_sort(x,x+m,x+n,greater<int>()) 0.27111 0.00592699 0.00592942
partialsort_r(ix,m,n,sizeof(ix[0]),x,ari_i_rcmp_r) 0.38938 0.014343 0.0143554
partial_sort(ix,ix+m,ix+n,ari_i_rcomp(x)) 0.27026 0.00848997 0.00849326
partialsort_r(x,m,n,sizeof(x[0]),0,d_rcmp_r) 0.59266 0.0144848 0.0144933
partial_sort(x,x+m,x+n,greater<double>()) 0.31718 0.0074177 0.00740693
partialsort_r(ix,m,n,sizeof(ix[0]),x,ari_d_rcmp_r) 0.37416 0.015897 0.015906
partial_sort(ix,ix+m,ix+n,ari_d_rcomp(x)) 0.26319 0.00990717 0.00991831
----

さて、後述とした、並列マージソート pmergesort_r は POSIX threads により実現可能であり、mergesort よりも並列度 4 の計算機環境で 1.5 倍程度高速となる。並列度 2 の計算機環境では速度向上は見られない。

さらに、後述とした、並列イントロソート psort, psort_r, psort_b は、GCD 環境下かつマルチスレッドに適した計算機環境では、qsort 系の 1.5 〜 2 倍程度高速である。しかし、GCD は基本的には「スレッドプール」技術を OS が支援する機構であるので、それがサポートされない場面では、自前で「スレッドプール」を用意すればよい。

この目的に即した「スレッドプール」を POSIX threads で必要十分に実現するコードは以下の通りである。

/*
  thread_pool.c

  Copyright (C) 2010 Taiji Yamada <taiji@aihara.co.jp>

  Distributed under the Boost Software License, Version 1.0. See
  http://www.boost.org/LICENSE_1_0.txt for more details.

  References:
  [1] http://docs.sun.com/app/docs/doc/819-0390/, 2008.
 */
#include <pthread.h>

typedef struct thread_pool {
  struct pool_job {
    void *(*func)(void *);
    void *arg;
    struct pool_job *next;
  } *job_head, *job_tail;
  pthread_t *thrs;
  size_t n_thread, c_idle, c_job;
  pthread_mutex_t mutex;
  pthread_cond_t cond_work, cond_wait;
  unsigned long flags;
} thread_pool_t;

static
void *thread_pool_work(void *arg)
{
  struct thread_pool *pool = (struct thread_pool *)arg;
  struct pool_job *job;

  pthread_mutex_lock(&pool->mutex);
  do {
    pool->c_idle++;
    while (!pool->job_head && !(pool->flags & (1UL<<1)))
      pthread_cond_wait(&pool->cond_work, &pool->mutex);
    pool->c_idle--;
    if (pool->job_head) {
      struct pool_job the_job = *(job = pool->job_head);

      pool->job_head = job->next;
      if (job == pool->job_tail)
	pool->job_tail = NULL;
      pthread_mutex_unlock(&pool->mutex);
      free(job);
      the_job.func(the_job.arg);
      pthread_mutex_lock(&pool->mutex);
      pool->c_job--;
    }
    if ((pool->flags & (1UL<<0)) && !pool->job_head) {
      pool->flags &= ~(1UL<<0);
      pthread_cond_broadcast(&pool->cond_wait);
    }
  } while (!(pool->flags & (1UL<<1)));
  pthread_mutex_unlock(&pool->mutex);
  return NULL;
}
thread_pool_t *thread_pool_create(int n_thread, pthread_attr_t *attr)
{
  thread_pool_t *pool;
  int i;

  if (!(pool = malloc(sizeof(thread_pool_t)))) return NULL;
  pool->job_head = pool->job_tail = NULL;
  pthread_mutex_init(&pool->mutex, NULL);
  pthread_cond_init(&pool->cond_work, NULL);
  pthread_cond_init(&pool->cond_wait, NULL);
  if (!(pool->thrs = malloc(sizeof(pthread_t)*(pool->n_thread=n_thread)))) { free(pool); return NULL; }
  pool->c_idle = pool->c_job = 0;
  pool->flags = 0;
  for (i=0; i<n_thread; i++)
    pthread_create(&pool->thrs[i], attr, thread_pool_work, pool);
  return pool;
}
int thread_pool_queue(thread_pool_t *pool, void *(*func)(void *), void *arg)
{
  struct pool_job *job;

  if (!(job = malloc(sizeof(struct pool_job)))) return 1;
  job->func = func;
  job->arg = arg;
  job->next = NULL;

  pthread_mutex_lock(&pool->mutex);
  if (!pool->job_head)
    pool->job_head = job;
  else
    pool->job_tail->next = job;
  pool->job_tail = job;
  pool->c_job++;
  if (pool->c_idle > 0)
    pthread_cond_signal(&pool->cond_work);
  pthread_mutex_unlock(&pool->mutex);
  return 0;
}
void thread_pool_wait(thread_pool_t *pool)
{
  pthread_mutex_lock(&pool->mutex);
  while (pool->c_job) {
    pool->flags |= (1UL<<0);
    pthread_cond_wait(&pool->cond_wait, &pool->mutex);
  }
  pthread_mutex_unlock(&pool->mutex);
}
void thread_pool_join(thread_pool_t *pool)
{
  int i;

  pthread_mutex_lock(&pool->mutex);
  while (pool->c_job) {
    pool->flags |= (1UL<<0);
    pthread_cond_wait(&pool->cond_wait, &pool->mutex);
  }
  pool->flags |= (1UL<<1);
  pthread_cond_broadcast(&pool->cond_work);
  pthread_mutex_unlock(&pool->mutex);

  for (i=0; i<pool->n_thread; i++)
    pthread_join(pool->thrs[i], NULL);

  free(pool->thrs);
  free(pool);
}

ちなみに、上記コメントで参考文献とした Sun のスレッドプール実装ではクイックソート/イントロソートの並列化は動作しない。それは、ジョブが再帰的に新たなジョブを生成するので、それを捕捉できないからである。上記コードはこのような再帰的なジョブの生成に対応してある。

よって、上記のこの自前の「スレッドプール」を利用した並列化されたイントロソート pintrosort_r は、GCD の psort 系とほぼ同等に高速となる。但し、ワーカースレッドがあらかじめ起動されている psort 系と比べて多少遅くなるときもあるがそれは微々たる差に留まる。よって、GCD が提供されていない環境では pintrosort_r は極めて有用であろう。

----

さて、後述とした、並列マージソートの C++ 版 parallel_stable_sort は boost thread により実現可能であり、マルチスレッドに適した計算機環境では、stable_sort よりも 2 〜 5 倍も高速となる。

さらに、後述とした、並列イントロソートの C++ 版 parallel_sort は boost threadpool もしくは自作のスレッドプールにより実現可能であり、マルチスレッドに適した計算機環境では、sort の 1.5 〜 2.5 倍も高速である。

この目的に即した「スレッドプール」を boost thread で必要十分に実現するコードは以下の通りである。

/*
  threadpool.hpp

  Copyright (C) 2010 Taiji Yamada <taiji@aihara.co.jp>

  Distributed under the Boost Software License, Version 1.0. See
  http://www.boost.org/LICENSE_1_0.txt for more details.

  References:
  [1] http://threadpool.sourceforge.net/, 2005-2008.
  [2] http://docs.sun.com/app/docs/doc/819-0390/, 2008.
 */
#include <boost/thread.hpp>
#include <queue>
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <boost/thread/condition.hpp>

class threadpool {
  typedef boost::function0<void> job_type;
  std::queue<job_type> jobs;
  boost::thread_group thrs;
  int c_idle, c_job;
  bool flag_wait, flag_quit;
  boost::mutex mutex;
  boost::condition cond_work, cond_wait;
  void work()
  {
    do {
      job_type job = 0;
      {
	boost::mutex::scoped_lock lock(mutex);
	++c_idle;
	while (jobs.empty() && !flag_quit)
	  cond_work.wait(lock);
	--c_idle;
	if (!jobs.empty()) {
	  job = jobs.front();
	  jobs.pop();
	}
      }
      if (job)
	job();
      {
	boost::mutex::scoped_lock lock(mutex);
	if (job)
	  --c_job;
	if (flag_wait && jobs.empty()) {
	  flag_wait = false;
	  cond_wait.notify_all();
	}
      }
    } while (!flag_quit);
  }
public:
  threadpool(size_t size) : c_idle(0), c_job(0), flag_wait(false), flag_quit(false)
  {
    for (size_t i=0; i<size; i++)
      thrs.create_thread(boost::bind(&threadpool::work, this));
  }
  void push(job_type job)
  {
    boost::mutex::scoped_lock lock(mutex);
    jobs.push(job);
    ++c_job;
    if (c_idle > 0)
      cond_work.notify_one();
  }
  void wait()
  {
    boost::mutex::scoped_lock lock(mutex);
    while (c_job) {
      flag_wait = true;
      cond_wait.wait(lock);
    }
  }
  void join()
  {
    {
      boost::mutex::scoped_lock lock(mutex);
      while (c_job) {
	flag_wait = true;
	cond_wait.wait(lock);
      }
      flag_quit = true;
      cond_work.notify_all();
    }
    thrs.join_all();
  }
};

この実装よりも、boost threadpool の方が若干性能が高い。

注意事項としては、GCD によらない並列マージソートおよび並列イントロソートは、スレッド処理のオーバーヘッドにより、要素数が数万程度より少ないと多少遅くなる傾向がある。GCD による psort 系はさすがにそういったオーバーヘッドが見られない。よって、GCD の様々な処理系への移植が期待される。


Cocoa プログラミング概観

by taiji at 2010/04/08 01:20
image:1:Do6HkZkVlqQAAAAASUVORK5CYII=
image:2:AAAAAElFTkSuQmCC

コードなしで動かしてみる - Outlets, Target/Action 体験

  1. Shift+Command+N で新規プロジェクト、Core Data Application、プロジェクト名を「SpeechClock」として「完了」。
  2. SpeechClock プロジェクトで「MainMenu.nib」をダブルクリックしてインターフェースビルダー(IB)で開く。
  3. Instances の「Window」を delete。かわりに、Cocoa Windows の「NSWindow and NSDrawer」を Instances へ。すると、NSDrawer1 と Parent Window と DrawContentView というインスタンスが作られる。
  4. Parent Window のタイトルを Command+1 で「SpeechClock」に変更。
  5. Cocoa Controls の NSButton を「SpeechClock」Window へ。NSButton のタイトルは「Voice」とする。Command+1 で「Visible at launch time」をオンにしておく。
  6. Voice ボタンから Control を押しながら「NSDrawer1」へ「toggle:」Action で Connect。
  7. ウィンドウが狭いと作り難いので、とりあえずサイズを 480x360 ぐらいに広くしておこう。
  8. 「NSDrawer1」の Preferred Edge を Command+1 で「Bottom」にしておく。
  9. ここで、IB 上で、Command+R で試験。Voice ボタンで NSDrawer ウィンドウが出現すれば成功。

データのためのテーブルを配置してみる - Core Data とレイアウトの体験

  1. SpeechClock プロジェクトで「SpeechClock_DataModel.xcdatamodel」を開き、エンティティ「voices」(クラスは規定の NSManagedObject)を作成。属性は「voice、文字列」とする。
  2. 次の操作に備え、MainMenu.nib を IB で開いておく。
  3. voices エンティティを Option を押しながら、「DrawContentView」Windowへ。さまざまな Cocoa Controls が生成される。
  4. 生成された NSTableView について「Use Alternating Raw Background」「Automatically Hide Scrollers」を有効にした。
  5. 生成されたさまざまな部品の配置はなるべくコンパクトにまとめ、NSDrawer1 の Command+3 のサイズで最小/最大サイズ等を調整するとよい。
  6. それぞれの部品の自動サイズを調整し、意図するレイアウトになるまで Command+R で試しながら調整するとよい。

コードを書いて動かしてみる - Outlet の作成と Connection を体験

  1. SpeechClock プロジェクトで「SpeechClock_AppDelegate.h」と「SpeechClock_AppDelegate.m」をリスト1のように編集する。
  2. Classes の「SpeechClock_AppDelegate」(インスタンスをダブルクリックするとクラスへ)の Command+1 で outlet として「sampleTextField」、action として「startSpeechSampleText」と「stopSpeechSampleText」を追加する。
  3. NSBox を「DrawContentView」Windowへ。タイトルは「Speech」とする。さらに、NSTextField と2つの NSButton を「Speech」Box へ。NSTextField のタイトルは適当なゼリフ「Well begun is half done.」、NSButton のタイトルは配置の右から「Speak」「Stop」とする。
  4. speech「Speak」Button から SpeechClock_AppDelegate の「startSpeechSampleText」へ Connection。
  5. speech「Stop」Button から SpeechClock_AppDelegate の「stopSpeechSampleText」へ Connection。
  6. SpeechClock_AppDelegate の「sampleTextField」outlet から speech NSTextField へ Connection。
  7. ここで、Xcode上で、Command+R で試験。Speak ボタンを押して声がして、Stop ボタンで途中で止められれば成功。

リスト1

--- SpeechClock_AppDelegate.h~01	2010-04-09 12:49:38.000000000 +0900
+++ SpeechClock_AppDelegate.h	2010-04-09 14:58:15.000000000 +0900
@@ -11,6 +11,8 @@
 @interface SpeechClock_AppDelegate : NSObject 
 {
     IBOutlet NSWindow *window;
+    IBOutlet NSTextField *sampleTextField;
+    NSSpeechSynthesizer *speechSynthesizer;
     
     NSPersistentStoreCoordinator *persistentStoreCoordinator;
     NSManagedObjectModel *managedObjectModel;
@@ -23,4 +25,7 @@
 
 - (IBAction)saveAction:sender;
 
+- (IBAction)startSpeechSampleText:(id)sender;
+- (IBAction)stopSpeechSampleText:(id)sender;
+
 @end
--- SpeechClock_AppDelegate.m~01	2010-04-09 12:49:38.000000000 +0900
+++ SpeechClock_AppDelegate.m	2010-04-09 14:59:18.000000000 +0900
@@ -10,6 +10,23 @@
 
 @implementation SpeechClock_AppDelegate
 
+- (id)init
+{
+  [super init];
+  speechSynthesizer = [[NSSpeechSynthesizer alloc] initWithVoice:nil];
+  return self;
+}
+- (IBAction)startSpeechSampleText:(id)sender
+{
+  NSString *string;
+  if (![(string = [sampleTextField stringValue]) length])
+    return;
+  [speechSynthesizer startSpeakingString:string];
+}
+- (IBAction)stopSpeechSampleText:(id)sender
+{
+  [speechSynthesizer stopSpeaking];
+}

部品の状態を制御する - delegate Outlet の作成、awakeFromNib の利用を体験

  1. 「SpeechClock_AppDelegate.h」と「SpeechClock_AppDelegate.m」をリスト2のように編集する。
  2. Classes の「SpeechClock_AppDelegate」の Command+1 で outlet として「startSpeechSampleTextButton」と「stopSpeechSampleTextButton」を追加する。
  3. SpeechClock_AppDelegate の「startSpeechTextField」outlet から「Speak」Button へConnection。
  4. SpeechClock_AppDelegate の「stopSpeechTextField」outlet から「Stop」Button へConnection。
  5. ここで、Xcode上で、Command+R で試験。起動時に Stop ボタンは無効化されていて、Speak ボタンを押して声がしている間は「Speak」Button が無効化され、声が終われば元に戻れば成功。

リスト2

--- SpeechClock_AppDelegate.h~02	2010-04-09 14:58:15.000000000 +0900
+++ SpeechClock_AppDelegate.h	2010-04-09 15:10:10.000000000 +0900
@@ -13,6 +13,8 @@
     IBOutlet NSWindow *window;
     IBOutlet NSTextField *sampleTextField;
     NSSpeechSynthesizer *speechSynthesizer;
+    IBOutlet NSButton *startSpeechSampleTextButton;
+    IBOutlet NSButton *stopSpeechSampleTextButton; 
     
     NSPersistentStoreCoordinator *persistentStoreCoordinator;
     NSManagedObjectModel *managedObjectModel;
--- SpeechClock_AppDelegate.m~02	2010-04-09 14:59:18.000000000 +0900
+++ SpeechClock_AppDelegate.m	2010-04-09 15:16:44.000000000 +0900
@@ -14,20 +14,35 @@
 {
   [super init];
   speechSynthesizer = [[NSSpeechSynthesizer alloc] initWithVoice:nil];
+  [speechSynthesizer setDelegate:self];
   return self;
 }
+- (void)awakeFromNib
+{
+  [startSpeechSampleTextButton setEnabled:!NO];
+  [stopSpeechSampleTextButton setEnabled:!YES];
+}
 - (IBAction)startSpeechSampleText:(id)sender
 {
   NSString *string;
   if (![(string = [sampleTextField stringValue]) length])
     return;
   [speechSynthesizer startSpeakingString:string];
+  [startSpeechSampleTextButton setEnabled:NO];
+  [stopSpeechSampleTextButton setEnabled:YES];
 }
 - (IBAction)stopSpeechSampleText:(id)sender
 {
   [speechSynthesizer stopSpeaking];
 }
 
+- (void)speechSynthesizer:(NSSpeechSynthesizer *)sender
+	didFinishSpeaking:(BOOL)success
+{
+  [startSpeechSampleTextButton setEnabled:!NO];
+  [stopSpeechSampleTextButton setEnabled:!YES];
+}
+

データの状態を取得する - NSArrayController の利用を体験

  1. 「SpeechClock_AppDelegate.h」と「SpeechClock_AppDelegate.m」をリスト3のように編集する。
  2. Classes の「SpeechClock_AppDelegate」の Command+1 で outlet として「arrayController」を追加する。
  3. SpeechClock_AppDelegate の「arrayController」outlet から「Voices Array Controller」へConnection。
  4. ここで、Xcode上で、Command+R で試験。NSTableView に例えば「com.apple.speech.synthesis.voice.Agnes」などを入力してみて、声色が変更できれば成功。
  5. さらに、正常終了させた後、改めて実行した時に、前に入力したデータが保存されていれば成功。

リスト3

--- SpeechClock_AppDelegate.h~03	2010-04-09 15:10:10.000000000 +0900
+++ SpeechClock_AppDelegate.h	2010-04-09 15:30:33.000000000 +0900
@@ -15,6 +15,7 @@
     NSSpeechSynthesizer *speechSynthesizer;
     IBOutlet NSButton *startSpeechSampleTextButton;
     IBOutlet NSButton *stopSpeechSampleTextButton; 
+    IBOutlet NSArrayController *voicesArrayController;
     
     NSPersistentStoreCoordinator *persistentStoreCoordinator;
     NSManagedObjectModel *managedObjectModel;
--- SpeechClock_AppDelegate.m~03	2010-04-09 15:16:44.000000000 +0900
+++ SpeechClock_AppDelegate.m	2010-04-09 15:32:56.000000000 +0900
@@ -27,6 +27,10 @@
   NSString *string;
   if (![(string = [sampleTextField stringValue]) length])
     return;
+  NSString *voice = ([[voicesArrayController selectionIndexes] count] == 1) ?
+    [voicesArrayController valueForKeyPath:@"selection.voice"] :
+    [NSSpeechSynthesizer defaultVoice];
+  [speechSynthesizer setVoice:voice];
   [speechSynthesizer startSpeakingString:string];
   [startSpeechSampleTextButton setEnabled:NO];
   [stopSpeechSampleTextButton setEnabled:YES];

データの状態を制御する - NSManagedObject の利用を体験

  1. 「SpeechClock_AppDelegate.h」と「SpeechClock_AppDelegate.m」をリスト4のように編集する。
  2. NSButton を「Speech」Box へ1つ追加。NSButton のタイトルは「default」とする。
  3. Classes の「SpeechClock_AppDelegate」の Command+1 で action として「setVoicesByDefault」を追加する。
  4. speech「default」Button から SpeechClock_AppDelegate の「setVoicesByDefault」へ Connection。
  5. ここで、Xcode上で、Command+R で試験。default ボタンでシステム標準の声色リストが NSTableView に揃えられ、選択した声色に変更できれば成功。
  6. さらに、正常終了させた後、改めて実行した時に、前に入力したデータが保存されていれば成功。

リスト4

--- SpeechClock_AppDelegate.h~04	2010-04-09 15:30:33.000000000 +0900
+++ SpeechClock_AppDelegate.h	2010-04-09 16:03:43.000000000 +0900
@@ -30,5 +30,6 @@
 
 - (IBAction)startSpeechSampleText:(id)sender;
 - (IBAction)stopSpeechSampleText:(id)sender;
+- (IBAction)setVoicesByDefault:(id)sender;
 
 @end
--- SpeechClock_AppDelegate.m~04	2010-04-09 15:32:56.000000000 +0900
+++ SpeechClock_AppDelegate.m	2010-04-09 16:40:41.000000000 +0900
@@ -39,6 +39,22 @@
 {
   [speechSynthesizer stopSpeaking];
 }
+- (IBAction)setVoicesByDefault:(id)sender
+{
+  NSArray *voices = [NSSpeechSynthesizer availableVoices];
+  int i;
+  //[voicesArrayController willChangeValueForKey:@"voice"];	// unnecessary
+  [voicesArrayController setFilterPredicate:nil];
+  [voicesArrayController removeObjects:[voicesArrayController arrangedObjects]];
+  for (i=0; i<[voices count]; i++) {
+    NSManagedObject *voice =
+      [NSEntityDescription insertNewObjectForEntityForName:@"voices"
+			   inManagedObjectContext:[self managedObjectContext]];
+    [voice setValue:[voices objectAtIndex:i] forKey:@"voice"];
+    [voicesArrayController addObject:voice];
+  }
+  //[voicesArrayController didChangeValueForKey:@"voice"];	// unnecessary
+}
 
 - (void)speechSynthesizer:(NSSpeechSynthesizer *)sender
 	didFinishSpeaking:(BOOL)success

時計を表示 - キーバリューコーディング(KVC)とバインド、NSTimerとセレクタを体験

  1. 「SpeechClock_AppDelegate.h」と「SpeechClock_AppDelegate.m」をリスト5のように編集する。
  2. NSBox を Parent Window である「SpeechClock」Windowへ。タイトルは「Clock」とする。さらに、NSTextField を「Clock」Box へ。NSTextField のタイトルは「clock」とする。
  3. clock TextField の value バインドを Command+4 で「SpeechClock_AppDelegate、Controller Key はなし、Model Key Path は currentDate」とする。
  4. ここで、Xcode上で、Command+R で試験。clock TextField が時計として動いていれば成功。

リスト5

--- SpeechClock_AppDelegate.h~05	2010-04-09 16:03:43.000000000 +0900
+++ SpeechClock_AppDelegate.h	2010-04-09 21:55:22.000000000 +0900
@@ -16,6 +16,8 @@
     IBOutlet NSButton *startSpeechSampleTextButton;
     IBOutlet NSButton *stopSpeechSampleTextButton;
     IBOutlet NSArrayController *voicesArrayController;
+    NSDate *currentDate;
+    NSTimer *clockTimer;
     
     NSPersistentStoreCoordinator *persistentStoreCoordinator;
     NSManagedObjectModel *managedObjectModel;
@@ -32,4 +34,12 @@
 - (IBAction)stopSpeechSampleText:(id)sender;
 - (IBAction)setVoicesByDefault:(id)sender;
 
+#if !defined(MAC_OS_X_VERSION_10_5) || MAC_OS_X_VERSION_MAX_ALLOWED < MAC_OS_X_VERSION_10_5
+- (NSDate *)currentDate;
+- (void)setCurrentDate:(NSDate *)aCurrentDate;
+#else
+@property(readwrite, retain) NSDate *currentDate;
+#endif
+- (void)updateCurrentDate;
+
 @end
--- SpeechClock_AppDelegate.m~05	2010-04-09 16:40:41.000000000 +0900
+++ SpeechClock_AppDelegate.m	2010-04-09 21:56:09.000000000 +0900
@@ -15,12 +15,18 @@
   [super init];
   speechSynthesizer = [[NSSpeechSynthesizer alloc] initWithVoice:nil];
   [speechSynthesizer setDelegate:self];
+  currentDate = [[NSDate alloc] init];
   return self;
 }
 - (void)awakeFromNib
 {
   [startSpeechSampleTextButton setEnabled:!NO];
   [stopSpeechSampleTextButton setEnabled:!YES];
+  clockTimer = [[NSTimer scheduledTimerWithTimeInterval:1
+			 target:self
+			 selector:@selector(updateCurrentDate)
+			 userInfo:nil
+			 repeats:YES] retain];
 }
 - (IBAction)startSpeechSampleText:(id)sender
 {
@@ -56,6 +62,28 @@
   //[voicesArrayController didChangeValueForKey:@"voice"];	// unnecessary
 }
 
+#if !defined(MAC_OS_X_VERSION_10_5) || MAC_OS_X_VERSION_MAX_ALLOWED < MAC_OS_X_VERSION_10_5
+- (NSDate *)currentDate
+{
+  return currentDate;
+}
+- (void)setCurrentDate:(NSDate *)aCurrentDate
+{
+  [aCurrentDate retain];
+  [currentDate release];
+  currentDate = aCurrentDate;
+}
+#else
+@synthesize currentDate;
+#endif
+
+- (void)updateCurrentDate
+{
+  //[self willChangeValueForKey:@"currentDate"];	// unnecessary
+  [self setCurrentDate:[[NSDate alloc] init]];
+  //[self didChangeValueForKey:@"currentDate"];	// unnecessary
+}
+
 - (void)speechSynthesizer:(NSSpeechSynthesizer *)sender
 	didFinishSpeaking:(BOOL)success
 {
----

OpenMPI に関する略語

by taiji at 2010/03/10 16:05

OpenMPI に関する略語をまとめてみた。

  • OMPI: The Open Message Passing Interface
  • ORTE: The Open Run-Time Environment layer
  • OPAL: The Open Portable Access Layer
  • MCA: The Modular Component Architecture
  • DSO: Dynamic Shared Objects
  • OTF: The Open Trace Format

GAP(Groups, Algorithms, Programming)

by taiji at 2010/02/27 02:38

群論とかを扱えるものを探していたら、GAP という数式処理システムに遭遇。それを、ビルド&インストールしてみたときの備忘録。

  • `./configure --prefix=/opt/local/gap4 && make` でのビルドで問題ないけれども、`make install` でインストールということではなく、どうもビルドした作業ディレクトリが実行環境らしい。
  • なので、インストール時にビルドするという変則的な手続き `sudo mkdir -p /opt/local/gap4 && tar cf - . | (cd /opt/local/gap4 && sudo tar xvf - && sudo ./configure --prefix=/opt/local/gap4 && sudo make)` でインストールした。
  • Emacs から使えたら便利かもしれないので、Emacs GAP mode を試してみたが、特に便利じゃないので止めた。Maxima for Emacs (imaxima & imath-mode) 程の便利さを期待したのだが… 使い方が悪いのかもしれないので、次の機会にはちゃんと調べよう。

OpenMPI + Boost MPI

by taiji at 2010/02/26 13:44

Boost MPI から OpenMPI を使うとき、以下のように mpic++ でコンパイルすればよい。

$ mpic++ -L/opt/local/lib -lboost_mpi -lboost_serialization -o mpi00 mpi00.cc

これは以下のように g++ でコンパイルするのと同じである。

$ g++ `mpic++ -showme:compile` -L/opt/local/lib -lboost_mpi -lboost_serialization `mpic++ -showme:link` -o mpi00 mpi00.cc

とりあえず動かしてみるには:

$ mpirun -np 4 ./mpi00

(2010/03/10補足) いくつかのマシンで動かしてみるには:

$ mpirun -host mitsuha,kurao,ubagami -np 4 ./mpi00

この場合、すべてのマシンについて、同一のパスに同名のプログラムがあり、SSHが通る必要があり、また、ファイアーウォールの設定を見直す必要(Leopardではortedの受信接続を許可、TigerではTCPポート49152-65535を許可)がある。

ところで、OpenMPI をビルド&インストールするには `configure --prefix=/opt/local && make && sudo make install` で特に問題ない。一方、Boost は MPI は標準では有効になっていないので、`emacs tools/build/v2/user-config.jam` で以下の一行を追加する。

using mpi ;

あとは bjam --prefix=/opt/local && sudo bjam --prefix=/opt/local install のように通常通りビルド&インストールすればよい(--with-mpi は不要)。

(2010/03/10追記) OpenMPI のユニバーサルバイナリを構築する目的でクロスコンパイルしようとすると、

/usr/bin/ld: Undefined symbols:
_lt_libltdlc_LTX_preloaded_symbols
collect2: ld returned 1 exit status

とエラーとなってしまってビルドできない。解決策は不本意ながら、`configure --disable-dlopen` ぐらいしか見つからなかった。これだと MCA DSO が無効となる。