ここでは,RaSC上でユーザプログラムを並列に実行する手順を説明します. 起動可能なRaSCサービスを使用しますので, MessagePack RPCでユーザプログラムを呼び出す を実施して,RaSCサービスを起動しておきます.
サービス定義XMLを開いて,下記のように serviceプロパティを jp.go.nict.wisdom.wrapper.StdIOCommandService から jp.go.nict.wisdom.wrapper.StdIOCommandParallelArrayService に変更します. また,poolSize, initPoolSizeなどの設定を追加します.
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
<beans>
<bean id="target"
class="jp.go.nict.langrid.servicecontainer.handler.TargetServiceFactory">
<property name="service">
<bean class="jp.go.nict.wisdom.wrapper.StdIOCommandParallelArrayService">
<property name="cmdLine" value="___PATH_TO_PROGRAM___" />
<property name="delimiterIn" value="\n" />
<property name="delimiterOut" value="EOS\n" />
<property name="delLastNewline" value="true" />
<property name="poolSize" value="8" />
<property name="initPoolSize" value="8" />
</bean>
</property>
</bean>
</beans>
jp.go.nict.wisdom.wrapper.StandardInputArrayParallelServiceは, TextAnalysisService.class インターフェースを実装しており, String[] analyzeArray(String[] input) を呼び出すと,poolSizeに設定された数だけユーザプログラムのプロセスを起動し,それぞれに入力の要素を割り当てます.上記設定ファイルの場合,poolSizeが8となっているので,最大8プロセスで並行して処理を行います.結果はinputに対応した順で配列に格納され,返却されます.
initPoolSizeは,RaSCサービスの起動時に,ユーザプログラムのプロセスを起動する数です.デフォルトではリクエストを受けてからプロセスを起動しますが,この設定により前もってプロセスが起動されるため,初回のリクエストを受けた際の応答を高速化できます.
サービス定義XMLを変更した場合は,サービスの再起動が必要です.サービスの起動と停止については, MessagePack RPCでユーザプログラムを呼び出す を参照してください.
パッケージに同梱されている SampleClient.java は,1件の入力を処理する analyze() を呼び出します.このプログラムを変更し,別ファイルからデータを読み込んで,1行を1件として analyzeArray() を呼び出すようにします. 以下に変更したプログラムを示します.
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import jp.go.nict.langrid.client.msgpackrpc.MsgPackClientFactory;
import jp.go.nict.wisdom.wrapper.api.TextAnalysisService;
public class SampleClient {
public static void main(String[] args) throws Exception {
MsgPackClientFactory factory = new MsgPackClientFactory();
TextAnalysisService client = factory.create(TextAnalysisService.class,
new InetSocketAddress(args[0], Integer.parseInt(args[1])));
BufferedReader br = null;
try{
br = new BufferedReader(new InputStreamReader(new FileInputStream(args[2])));
List<String> list = new ArrayList<String>();
String str;
while((str = br.readLine()) != null){
list.add(str);
}
String[] ret = client.analyzeArray(list.toArray(new String[0]));
for(String s : ret){
System.out.println(s);
}
}finally{
br.close();
}
factory.close();
}
}
初期化の部分は同様です.指定したファイルに記述された入力文をすべてコレクションに読み込み,配列にしてから, analyzeArray の引数に渡しています.入力における順序は出力でも保存されます.したがって,,実行結果は並列化しない場合と同じです.
コンパイルと実行については, MessagePack RPCでユーザプログラムを呼び出す を参照してください.
一般に,1回の処理に時間がかかる場合や処理件数が多いほど,並列化が有効に働く傾向があります.
RaSCはユーザプログラムの標準入力に書きこみを行いつつ,同時に標準出力からデータの取得を行っています.しかし,入力から出力までに時間がかかり,次データの入力は済んでいるが,処理中の為にデータの取得が行われないことがあります. このような場合,並列実行だと複数のプロセスが処理を行う事が出来るので,待ち時間が分散され,処理自体も並列で行われるのでパフォーマンスが向上します.
但し,RaSCサービス化したユーザプログラムが,別のRaSCサービスに接続し,接続先のサービスが処理をブロックするようなケースでは,並列化による速度向上は期待できません.
また,ユーザプログラムが入力1件あたりを非常に高速に処理する場合,並列処理やネットワーク通信のオーバヘッドのため,並列化してもやはり速度向上は期待できません. MessagePack RPCでユーザプログラムを呼び出す で用いた形態素解析エンジン MeCab では,1文あたりの処理が極めて短い時間で終了するため,ここで示した並列化では速度向上がみられなかったり,逆に低速になることがあります.
そこで,以下により計算負荷の高い処理を行う,構文解析システム KNP を用いた場合の実行時間を示します.並列化設定を行った場合と行わない場合で,ポートを変更して別々にサービスを起動し,500文の解析の実行時間を示しています.実行結果を見ると,大幅に実行時間が向上していることがわかります.
$ ./server.sh KNPService 19998 start # KNPを並列で実行するRaSCサービスを起動
$ time java -cp ./lib/*: SampleClient localhost 19998 ~/sentence_500.txt # 並列
real 0m29.402s
user 0m0.566s
sys 0m0.045s
$ ./server.sh KNPService 19999 start # KNPを非並列で実行するRaSCサービスを起動
$ time java -cp ./lib/*: SampleClient localhost 19999 ~/sentence_500.txt # 非並列
real 2m32.473s
user 0m0.901s
sys 0m0.167s
$ time cat INPUT_TXT | juman | knp > OUTPUT_TXT # ユーザプログラムを直接実行する場合 (非並列)
real 2m28.456s
user 2m17.557s
sys 0m1.011s
Note
上記は,Intel Xeon X5675*2 上で8並列の設定で実行しました.
なお,KNPのRaSCサービスとしての利用については, program_list や, 複数のユーザプログラムをパイプで接続してRaSC上で動作させるには をご覧ください.