Thrift RPC: Non-blockingサーバのサンプル [C++]

(2019年9月)

RPC の実装の一つ, Apache Thrift を試した。Apache Thrift は、非常にポータブルなのが売り。

複数のクライアントからの呼び出しに対応できるように, non-blocking サーバを書いてみる。

  • Fedora 30 Linux
  • thrift-0.10.0-15.fc29.x86_64 -- fc30 パッケージはまだない。

結論: 簡易な用途には足りるが、機能性は CORBA よりだいぶ低く、完全な代替にはならない。また, Thrift v0.10 の C++ binding は, Java binding に比べて機能性が不十分.

1. IDL を書く

インタフェイスはIDLで定義する。ここから各プログラミング言語用の stub やスケルトンを生成する。

Apache Thrift - Interface Description Language (IDL)

Thrift の IDL ファイルは、拡張子を .thrift とする。

my_service.thrift

// 列挙型.
enum City {
  // 値を指定してもよい
  Tokyo = 1,
  NewYork = 2,
}

// 構造体. 継承はできない.
struct Person {
  // 'field-id' で区別する。
  // field-id: 型 名前. "required" は必須フィールド.
  1: i32         id,
  2: required    string name,
  5: set hobbies,
}

// 例外型は構造体とは別
exception MyException
{
  1: i32    err_code,
  2: string message
}

// 複数のサービスを書ける
service ClockService {
  string get_time(1: City city)
}

service PeopleService {
  void add(1: Person person),
  // map, set, list が使える.
  // 例外を送出できる
  list find_by_name(1: string query) throws(1: MyException error),
  i32 size()
}

インタフェイスは、service として書く。継承はできない。構造体、例外はデータ型。

コンテナとして map, set, list が使える。

いちいち field-id を明記する。

2. Stub コードを生成する

thrift コマンドで, プログラミング言語を指定して, stub コードを生成する。メジャな言語はだいたいサポートしている。すごくマイナな言語も生成できる。

ActionScript Plain C with GLib C++ C# D Dart Delphi Erlang Go Haskell Java JavaScript -- node.js, TypeScript Lua OCaml Perl PHP Python Ruby Smalltalk Swift

さらに, Thrift v0.12 から Common Lisp が加わった. v0.11 Rust.

-gen でプログラミング言語を指定。

$ thrift -gen cpp my_service.thrift

gen-cpp/ ディレクトリにソースコード、ヘッダが出力される。

ClockService.cpp PeopleService_server.skeleton.cpp ClockService.h my_service_constants.cpp ClockService_server.skeleton.cpp my_service_constants.h PeopleService.cpp my_service_types.cpp PeopleService.h my_service_types.h

ClockService_server.skeleton.cpp ファイルは、サーバを書くための参考で、これ自体はコンパイル・リンクしない。

3. サーバ側を書く

次は、サーバを書く。

まずは、インタフェイスの実装クラスの宣言。

ファイル: clock_service_impl.h

C++
[RAW]
  1. #include "../gen-cpp/ClockService.h"
  2. #include <string>
  3. // サービス実装クラス. '...If' クラスから派生する
  4. // インスタンスは1回だけ生成され、呼び出しごとに生成されるわけではない.
  5. class ClockServiceImpl: virtual public ClockServiceIf
  6. {
  7. public:
  8. ClockServiceImpl();
  9. virtual ~ClockServiceImpl();
  10. virtual void get_time(std::string& _return, const City::type city) override;
  11. private:
  12. std::string _now;
  13. };

インタフェイスで戻り値として定義したものが、C++ binding では引数として渡される。

実装クラスを実装する。このクラスのインスタンスは、一つだけ生成され、クライアントを跨いで使いまわされる。メンバ変数の取扱いに注意しよう。

ファイル: clock_service_impl.cpp

C++
[RAW]
  1. #include <stdio.h>
  2. #include "../gen-cpp/ClockService.h"
  3. #include "clock_service_impl.h"
  4. using namespace std;
  5. ClockServiceImpl::ClockServiceImpl() {
  6. time_t t;
  7. tm t_st;
  8. char buf[100];
  9. time(&t);
  10. localtime_r(&t, &t_st);
  11. asctime_r(&t_st, buf);
  12. _now = buf;
  13. }
  14. ClockServiceImpl::~ClockServiceImpl() { }
  15. // 戻り値は引数になっている
  16. void ClockServiceImpl::get_time(string& _return, const City::type city) {
  17. printf("get_time()\n");
  18. sleep(5);
  19. _return = _now;
  20. }

引数である _return に値をセットすることで、戻り値になる。

もう一つのインタフェイスの実装も、同様に。

people_service_impl.h

C++
[RAW]
  1. #include "../gen-cpp/PeopleService.h"
  2. #include <string>
  3. #include <vector>
  4. class PeopleServiceImpl: virtual public PeopleServiceIf
  5. {
  6. public:
  7. PeopleServiceImpl();
  8. virtual ~PeopleServiceImpl();
  9. virtual void add(const Person& person) override;
  10. virtual void find_by_name(std::vector<Person>& _return,
  11. const std::string& query) override;
  12. virtual int32_t size() override;
  13. };

インタフェイスの戻り値が primitive な型の場合は、実装でも戻り値として書く。

people_service_impl.cpp

C++
[RAW]
  1. #include <stdio.h>
  2. #include "../gen-cpp/PeopleService.h"
  3. #include "people_service_impl.h"
  4. using namespace std;
  5. PeopleServiceImpl::PeopleServiceImpl() { }
  6. PeopleServiceImpl::~PeopleServiceImpl() { }
  7. void PeopleServiceImpl::add(const Person& person) {
  8. printf("add()\n");
  9. }
  10. void PeopleServiceImpl::find_by_name(vector<Person>& _return,
  11. const string& query) {
  12. printf("find_by_name()\n");
  13. if (query == "tanaka") {
  14. Person r; r.id = 2; r.name = query; r.hobbies.insert("shogi");
  15. _return.push_back(r);
  16. }
  17. else
  18. throw MyException(); // 例外のテスト
  19. }
  20. int32_t PeopleServiceImpl::size() {
  21. printf("size()\n");
  22. return 15;
  23. }

例外は、単に throw で投げればいい。

メインループのコードを書く。blocking server と non-blocking server を切り替えられるようにしてみた。

Non-blocking server は、クライアントの接続ごとにスレッドを起動する。一つ一つのクライアントに対しては、同期したレスポンスを返す。

C++ binding については、この Thrift v0.10 では、非同期呼び出しは使いものにならない。Java binding では, org.apache.thrift.async.AsyncMethodCallback<> のコールバック関数を与えることができる。

main.cpp

C++
[RAW]
  1. #include "../config.h"
  2. #include "people_service_impl.h"
  3. #include "clock_service_impl.h"
  4. #include <thrift/protocol/TBinaryProtocol.h>
  5. #include <thrift/transport/TBufferTransports.h>
  6. #include <thrift/processor/TMultiplexedProcessor.h>
  7. #ifdef NONBLOCKING
  8. // Needs /usr/include/event.h
  9. // Do not use 'libev-libevent-devel' package. Use 'libevent-devel'.
  10. #include <thrift/concurrency/ThreadManager.h>
  11. #include <thrift/concurrency/PosixThreadFactory.h>
  12. #include <thrift/server/TNonblockingServer.h>
  13. #else
  14. #include <thrift/transport/TServerSocket.h>
  15. #include <thrift/server/TSimpleServer.h>
  16. #endif
  17. using namespace apache::thrift;
  18. int main(int argc, char* argv[])
  19. {
  20. #ifdef _WIN32
  21. WSADATA wsa_data;
  22. WSAStartup(MAKEWORD(2, 2), &wsa_data);
  23. #endif
  24. const int port = 9090;
  25. // 一つ目のサービス
  26. // サービス実装クラスのインタンスは、1回だけ生成され、呼び出ごとに生成されるわけではない.
  27. boost::shared_ptr<PeopleServiceImpl> handler1(new PeopleServiceImpl());
  28. boost::shared_ptr<TProcessor> processor1(new PeopleServiceProcessor(handler1));
  29. // 2つ目
  30. boost::shared_ptr<ClockServiceImpl> handler2(new ClockServiceImpl());
  31. boost::shared_ptr<TProcessor> processor2(new ClockServiceProcessor(handler2));
  32. // メッセージ形式
  33. boost::shared_ptr<protocol::TProtocolFactory>
  34. protocolFactory(new protocol::TBinaryProtocolFactory());
  35. #ifdef NONBLOCKING
  36. // non-blocking server
  37. boost::shared_ptr<concurrency::ThreadManager> threadManager =
  38. concurrency::ThreadManager::newSimpleThreadManager(15);
  39. boost::shared_ptr<concurrency::PosixThreadFactory>
  40. threadFactory(new concurrency::PosixThreadFactory());
  41. threadManager->threadFactory(threadFactory);
  42. threadManager->start();
  43. #else
  44. // blocking server
  45. boost::shared_ptr<transport::TServerTransport>
  46. serverTransport(new transport::TServerSocket(port));
  47. // transport::TBufferedTransport は、サーバ側が non-blocking のときは動かない.
  48. // blocking のときも, transport::TFramedTransport を使うようにする.
  49. boost::shared_ptr<transport::TTransportFactory>
  50. transportFactory(new transport::TFramedTransportFactory());
  51. #endif
  52. // 名前で振り分けるためのプロセサ
  53. boost::shared_ptr<TMultiplexedProcessor>
  54. mp_processor(new TMultiplexedProcessor());
  55. mp_processor->registerProcessor("clock", processor2);
  56. mp_processor->registerProcessor("people", processor1);
  57. #ifdef NONBLOCKING
  58. // 複数のクライアントは並列に捌くが、一つ一つのクライアントからの呼び出しは直列に処理.
  59. server::TNonblockingServer server(mp_processor, protocolFactory, port,
  60. threadManager);
  61. #else
  62. // サーバ
  63. server::TSimpleServer server(mp_processor, serverTransport, transportFactory,
  64. protocolFactory);
  65. #endif
  66. server.serve();
  67. return 0;
  68. }

プロセサ、プロトコル、トランスポートを組み合わせることで、いろいろな形式で通信することができるようになっている。

一つのサーバで複数のサービスを提供するのは普通。にも関わらず, TNonblockingServer または TSimpleServer インスタンスには、processor は一つしか渡すことができない。

TMultiplexedProcessor を間にはさむことで、名前でサービスを振り分けられる。

4. クライアント側を書く

次は、クライアント。ファイルは一つだけ。

こちらも、サービスを名前で呼び出すために, TMultiplexedProtocol を通す。あまり作りがよくない。

C++
[RAW]
  1. #include "../config.h"
  2. #include "../gen-cpp/ClockService.h" // From generated code
  3. #include "../gen-cpp/PeopleService.h" // From generated code
  4. #include <iostream>
  5. #include <thrift/protocol/TBinaryProtocol.h>
  6. #include <thrift/transport/TSocket.h>
  7. #include <thrift/transport/TTransportUtils.h>
  8. #include <thrift/protocol/TMultiplexedProtocol.h>
  9. using namespace std;
  10. using namespace apache::thrift;
  11. int main(int argc, char* argv[])
  12. {
  13. const string host = "localhost";
  14. const int port = 9090; // The port on which server is listening
  15. boost::shared_ptr<transport::TTransport> socket(
  16. new transport::TSocket(host, port));
  17. // transport::TBufferedTransport は、サーバ側が non-blocking のときは動かない.
  18. // 原因は不明.
  19. boost::shared_ptr<transport::TTransport> transport(
  20. new transport::TFramedTransport(socket));
  21. boost::shared_ptr<protocol::TProtocol> protocol(
  22. new protocol::TBinaryProtocol(transport));
  23. transport->open();
  24. // 名前で振り分ける
  25. boost::shared_ptr<protocol::TMultiplexedProtocol> mp1(
  26. new protocol::TMultiplexedProtocol(protocol, "clock"));
  27. // ClockServiceClient か ClockServiceConcurrentClient のいずれか.
  28. // ClockServiceConcurrentClient はスレッドセーフ.
  29. ClockServiceClient client1(mp1);
  30. string r;
  31. client1.get_time(r, City::Tokyo); // ここで、長い処理は待つ.
  32. cout << r << "\n";
  33. boost::shared_ptr<protocol::TMultiplexedProtocol> mp2(
  34. new protocol::TMultiplexedProtocol(protocol, "people"));
  35. PeopleServiceClient client2(mp2);
  36. vector<Person> s;
  37. client2.find_by_name(s, "tanaka"); cout << *s.begin()->hobbies.begin() << "\n";
  38. cout << client2.size() << "\n";
  39. try {
  40. s.clear();
  41. client2.find_by_name(s, "sato"); // 例外発生
  42. } catch (MyException& e) {
  43. printf("catch a MyException!!\n");
  44. }
  45. transport->close();
  46. return 0;
  47. }

サーバ側の例外は、クライアントでは単純に catch で受け取ることができる。

5. 実行結果

$ ./server 
Thrift: Mon Sep  2 20:35:56 2019 TNonblockingServer: Serving on port 9090, 1 io threads.
Thrift: Mon Sep  2 20:35:56 2019 TNonblockingServer: using libevent 2.1.8-stable method epoll
Thrift: Mon Sep  2 20:35:56 2019 TNonblocking: IO thread #0 registered for listen.
Thrift: Mon Sep  2 20:35:56 2019 TNonblocking: IO thread #0 registered for notify.
Thrift: Mon Sep  2 20:35:56 2019 TNonblockingServer: IO thread #0 entering loop...

クライアントを実行。

$ ./client 
Mon Sep  2 20:35:56 2019

shogi
15
catch a MyException!!

例外の捕捉もできている。OK!!