(2019年9月)
RPC の実装の一つ, Apache Thrift を試した。Apache Thrift は、非常にポータブルなのが売り。
複数のクライアントからの呼び出しに対応できるように, non-blocking サーバを書いてみる。
- Fedora 30 Linux
- thrift-0.10.0-15.fc29.x86_64 -- fc30 パッケージはまだない。
結論: 簡易な用途には足りるが、機能性は CORBA よりだいぶ低く、完全な代替にはならない。また, Thrift v0.10 の C++ binding は, Java binding に比べて機能性が不十分.
1. IDL を書く
インタフェイスはIDLで定義する。ここから各プログラミング言語用の stub やスケルトンを生成する。
Apache Thrift - Interface Description Language (IDL)
Thrift の IDL ファイルは、拡張子を .thrift
とする。
my_service.thrift
// 列挙型.
enum City {
// 値を指定してもよい
Tokyo = 1,
NewYork = 2,
}
// 構造体. 継承はできない.
struct Person {
// 'field-id' で区別する。
// field-id: 型 名前. "required" は必須フィールド.
1: i32 id,
2: required string name,
5: set hobbies,
}
// 例外型は構造体とは別
exception MyException
{
1: i32 err_code,
2: string message
}
// 複数のサービスを書ける
service ClockService {
string get_time(1: City city)
}
service PeopleService {
void add(1: Person person),
// map, set, list が使える.
// 例外を送出できる
list find_by_name(1: string query) throws(1: MyException error),
i32 size()
}
インタフェイスは、service
として書く。継承はできない。構造体、例外はデータ型。
コンテナとして map
, set
, list
が使える。
いちいち field-id を明記する。
2. Stub コードを生成する
thrift コマンドで, プログラミング言語を指定して, stub コードを生成する。メジャな言語はだいたいサポートしている。すごくマイナな言語も生成できる。
ActionScript
Plain C with GLib
C++
C#
D
Dart
Delphi
Erlang
Go
Haskell
Java
JavaScript -- node.js, TypeScript
Lua
OCaml
Perl
PHP
Python
Ruby
Smalltalk
Swift
さらに, Thrift v0.12 から Common Lisp が加わった. v0.11 Rust.
-gen でプログラミング言語を指定。
$ thrift -gen cpp my_service.thrift
gen-cpp/
ディレクトリにソースコード、ヘッダが出力される。
ClockService.cpp PeopleService_server.skeleton.cpp
ClockService.h my_service_constants.cpp
ClockService_server.skeleton.cpp my_service_constants.h
PeopleService.cpp my_service_types.cpp
PeopleService.h my_service_types.h
ClockService_server.skeleton.cpp
ファイルは、サーバを書くための参考で、これ自体はコンパイル・リンクしない。
3. サーバ側を書く
次は、サーバを書く。
まずは、インタフェイスの実装クラスの宣言。
ファイル: clock_service_impl.h
C++
- #include "../gen-cpp/ClockService.h"
- #include <string>
-
-
-
- class ClockServiceImpl: virtual public ClockServiceIf
- {
- public:
- ClockServiceImpl();
- virtual ~ClockServiceImpl();
- virtual void get_time(std::string& _return, const City::type city) override;
-
- private:
- std::string _now;
- };
インタフェイスで戻り値として定義したものが、C++ binding では引数として渡される。
実装クラスを実装する。このクラスのインスタンスは、一つだけ生成され、クライアントを跨いで使いまわされる。メンバ変数の取扱いに注意しよう。
ファイル: clock_service_impl.cpp
C++
- #include <stdio.h>
- #include "../gen-cpp/ClockService.h"
- #include "clock_service_impl.h"
- using namespace std;
-
- ClockServiceImpl::ClockServiceImpl() {
- time_t t;
- tm t_st;
- char buf[100];
-
- time(&t);
- localtime_r(&t, &t_st);
- asctime_r(&t_st, buf);
-
- _now = buf;
- }
-
- ClockServiceImpl::~ClockServiceImpl() { }
-
-
- void ClockServiceImpl::get_time(string& _return, const City::type city) {
- printf("get_time()\n");
- sleep(5);
- _return = _now;
- }
引数である _return
に値をセットすることで、戻り値になる。
もう一つのインタフェイスの実装も、同様に。
people_service_impl.h
C++
- #include "../gen-cpp/PeopleService.h"
- #include <string>
- #include <vector>
-
- class PeopleServiceImpl: virtual public PeopleServiceIf
- {
- public:
- PeopleServiceImpl();
- virtual ~PeopleServiceImpl();
- virtual void add(const Person& person) override;
- virtual void find_by_name(std::vector<Person>& _return,
- const std::string& query) override;
- virtual int32_t size() override;
- };
インタフェイスの戻り値が primitive な型の場合は、実装でも戻り値として書く。
people_service_impl.cpp
C++
- #include <stdio.h>
- #include "../gen-cpp/PeopleService.h"
- #include "people_service_impl.h"
- using namespace std;
-
- PeopleServiceImpl::PeopleServiceImpl() { }
-
- PeopleServiceImpl::~PeopleServiceImpl() { }
-
- void PeopleServiceImpl::add(const Person& person) {
- printf("add()\n");
- }
-
- void PeopleServiceImpl::find_by_name(vector<Person>& _return,
- const string& query) {
- printf("find_by_name()\n");
- if (query == "tanaka") {
- Person r; r.id = 2; r.name = query; r.hobbies.insert("shogi");
- _return.push_back(r);
- }
- else
- throw MyException();
- }
-
- int32_t PeopleServiceImpl::size() {
- printf("size()\n");
- return 15;
- }
例外は、単に throw
で投げればいい。
メインループのコードを書く。blocking server と non-blocking server を切り替えられるようにしてみた。
Non-blocking server は、クライアントの接続ごとにスレッドを起動する。一つ一つのクライアントに対しては、同期したレスポンスを返す。
C++ binding については、この Thrift v0.10 では、非同期呼び出しは使いものにならない。Java binding では, org.apache.thrift.async.AsyncMethodCallback<>
のコールバック関数を与えることができる。
main.cpp
C++
- #include "../config.h"
-
- #include "people_service_impl.h"
- #include "clock_service_impl.h"
- #include <thrift/protocol/TBinaryProtocol.h>
- #include <thrift/transport/TBufferTransports.h>
- #include <thrift/processor/TMultiplexedProcessor.h>
- #ifdef NONBLOCKING
-
-
- #include <thrift/concurrency/ThreadManager.h>
- #include <thrift/concurrency/PosixThreadFactory.h>
- #include <thrift/server/TNonblockingServer.h>
- #else
- #include <thrift/transport/TServerSocket.h>
- #include <thrift/server/TSimpleServer.h>
- #endif
-
- using namespace apache::thrift;
-
- int main(int argc, char* argv[])
- {
- #ifdef _WIN32
- WSADATA wsa_data;
- WSAStartup(MAKEWORD(2, 2), &wsa_data);
- #endif
-
- const int port = 9090;
-
-
-
- boost::shared_ptr<PeopleServiceImpl> handler1(new PeopleServiceImpl());
- boost::shared_ptr<TProcessor> processor1(new PeopleServiceProcessor(handler1));
-
-
- boost::shared_ptr<ClockServiceImpl> handler2(new ClockServiceImpl());
- boost::shared_ptr<TProcessor> processor2(new ClockServiceProcessor(handler2));
-
-
- boost::shared_ptr<protocol::TProtocolFactory>
- protocolFactory(new protocol::TBinaryProtocolFactory());
-
- #ifdef NONBLOCKING
-
- boost::shared_ptr<concurrency::ThreadManager> threadManager =
- concurrency::ThreadManager::newSimpleThreadManager(15);
- boost::shared_ptr<concurrency::PosixThreadFactory>
- threadFactory(new concurrency::PosixThreadFactory());
- threadManager->threadFactory(threadFactory);
- threadManager->start();
- #else
-
- boost::shared_ptr<transport::TServerTransport>
- serverTransport(new transport::TServerSocket(port));
-
-
- boost::shared_ptr<transport::TTransportFactory>
- transportFactory(new transport::TFramedTransportFactory());
- #endif
-
-
- boost::shared_ptr<TMultiplexedProcessor>
- mp_processor(new TMultiplexedProcessor());
- mp_processor->registerProcessor("clock", processor2);
- mp_processor->registerProcessor("people", processor1);
-
- #ifdef NONBLOCKING
-
- server::TNonblockingServer server(mp_processor, protocolFactory, port,
- threadManager);
- #else
-
- server::TSimpleServer server(mp_processor, serverTransport, transportFactory,
- protocolFactory);
- #endif
- server.serve();
-
- return 0;
- }
プロセサ、プロトコル、トランスポートを組み合わせることで、いろいろな形式で通信することができるようになっている。
一つのサーバで複数のサービスを提供するのは普通。にも関わらず, TNonblockingServer
または TSimpleServer
インスタンスには、processor は一つしか渡すことができない。
TMultiplexedProcessor
を間にはさむことで、名前でサービスを振り分けられる。
4. クライアント側を書く
次は、クライアント。ファイルは一つだけ。
こちらも、サービスを名前で呼び出すために, TMultiplexedProtocol
を通す。あまり作りがよくない。
C++
- #include "../config.h"
-
- #include "../gen-cpp/ClockService.h"
- #include "../gen-cpp/PeopleService.h"
- #include <iostream>
- #include <thrift/protocol/TBinaryProtocol.h>
- #include <thrift/transport/TSocket.h>
- #include <thrift/transport/TTransportUtils.h>
- #include <thrift/protocol/TMultiplexedProtocol.h>
- using namespace std;
- using namespace apache::thrift;
-
- int main(int argc, char* argv[])
- {
- const string host = "localhost";
- const int port = 9090;
-
- boost::shared_ptr<transport::TTransport> socket(
- new transport::TSocket(host, port));
-
-
- boost::shared_ptr<transport::TTransport> transport(
- new transport::TFramedTransport(socket));
- boost::shared_ptr<protocol::TProtocol> protocol(
- new protocol::TBinaryProtocol(transport));
-
- transport->open();
-
-
- boost::shared_ptr<protocol::TMultiplexedProtocol> mp1(
- new protocol::TMultiplexedProtocol(protocol, "clock"));
-
-
- ClockServiceClient client1(mp1);
- string r;
- client1.get_time(r, City::Tokyo);
- cout << r << "\n";
-
- boost::shared_ptr<protocol::TMultiplexedProtocol> mp2(
- new protocol::TMultiplexedProtocol(protocol, "people"));
- PeopleServiceClient client2(mp2);
- vector<Person> s;
- client2.find_by_name(s, "tanaka"); cout << *s.begin()->hobbies.begin() << "\n";
- cout << client2.size() << "\n";
-
- try {
- s.clear();
- client2.find_by_name(s, "sato");
- } catch (MyException& e) {
- printf("catch a MyException!!\n");
- }
-
- transport->close();
- return 0;
- }
サーバ側の例外は、クライアントでは単純に catch
で受け取ることができる。
5. 実行結果
$ ./server
Thrift: Mon Sep 2 20:35:56 2019 TNonblockingServer: Serving on port 9090, 1 io threads.
Thrift: Mon Sep 2 20:35:56 2019 TNonblockingServer: using libevent 2.1.8-stable method epoll
Thrift: Mon Sep 2 20:35:56 2019 TNonblocking: IO thread #0 registered for listen.
Thrift: Mon Sep 2 20:35:56 2019 TNonblocking: IO thread #0 registered for notify.
Thrift: Mon Sep 2 20:35:56 2019 TNonblockingServer: IO thread #0 entering loop...
クライアントを実行。
$ ./client
Mon Sep 2 20:35:56 2019
shogi
15
catch a MyException!!
例外の捕捉もできている。OK!!