第4回 libuv C10K〜C100K サーバサンプル [C++]

C10K問題 (C10K problem) を乗り越えるため, とりあえず 1,000 同時接続を超えるサンプル.

同時接続数とパフォーマンス向上の技術

POSIX API

select(2) は上限が低い. 待ち合わせする file descriptor の「値」の上限が FD_SETSIZE マクロで決まる。

OS によっては、アプリケィションのコンパイル時に, FD_SETSIZE マクロ値をコンパイラオプションで与えて変更することで、上限を引き上げられる。FreeBSD はそのようになっている。

しかし、Linux は 1024 で完全に決め打ちになっている。<sys/select.h> (glibc 提供ファイル) から参照される <linux/posix_types.h> (Linux kernel のファイル) で, __FD_SETSIZE も決め打ちになっている。

File descriptor はソケットだけでなく, 標準入力 = 0, ... のようにファイルを開いても消費される。そのため、せいぜい 1,000 程度しか待ち受けできない。同時接続数が不明な場合は select() を使ってはならない。

POSIX 標準では poll(2) が 1,000 接続の上限を超えて待ち受けできる。しかし、インタフェイスの設計がよくなく、パフォーマンスは select() 程度。

各OSの取組み

Linux での非同期I/O手法とそれらのパフォーマンス: Asynchronous C++ History of Time マルチスレッドでも 10,000 接続ぐらいまではイケる。Linux epoll はよい。

それぞれの OS がバラバラな方法で、多くの接続を「効率的に」待ち受けできるように新しいAPIを導入した。ソケットはカーネルで実装されるので、通常、システムコールとして提供される.

  • UNIX
    • Linux 2.6+ epoll(7) システムコール. epoll_create(2)
    • Solaris7 11/99+ poll(7d). /dev/poll デヴァイス. HP-UX でも実装されている. dvpoll 構造体.
    • Solaris10 Event Ports イベント. port_create(). 問題あり, /dev/poll を使うべきか?
    • FreeBSD 4.1以降 kqueue(2): kqueue(), kevent() システムコール. NetBSD 1.6.1 (2003年), NetBSD 2.0 (2004年) 以降, OpenBSD 2.9以降でも実装.
    • AIX pollset API. pollset_create() など.
  • Windows I/O Completion Port (IOCP). コピー回数が少ない. スレッドを効率的に動かす.

ポータブルなライブラリ

てんでバラバラなので、アプリケィション開発には, これらをカバーするライブラリを使おう。動かしたい環境 (の幅) とオーバヘッドの少なさでどれを使うべきか決まる。

  1. libev - a full-featured and high-performance event loop. UNIX 限定であればこれ. パフォーマンスも各OSのシステムコールを直接叩くのと遜色ない.
  2. libuv - a cross-platform asynchronous I/O. Windows でも動かしたい場合はこれ。パフォーマンスは libev より劣るが libevent よりはよい.
  3. POCO C++ Libraries - 組込みOS (embedded OS) でも動く. 軽いライブラリが必要な場合はこれ。パフォーマンスは libevent = POCO = asio で, libev, libuv より劣る.
  4. ▲ libevent - 古い. 新しいプロジェクトでは libev を使え.
  5. ▲ (non-Boost) Asio and Boost.Asio.

このほか, libhv というのもあって, Windows IOCP もサポートしつつ、libev と同等のパフォーマンスと主張している: GitHub - ithewei/libhv: 🔥 比libevent、libuv更易用的网络库。A c/c++ network library for developing TCP/UDP/SSL/HTTP/WebSocket/MQTT client/server. Fedora 37, Debian 11 にはパッケージがない。試していない。

Current or soft limit

以下で作るサンプルは、そのまま実行してもエラーになる。Soft limits がログイン端末ごとに設定されている。ulimit コマンドで表示できる (open files が 1,024 になっている)。標準入出力なども file descriptor を消費するので、これで 1,000 を少し超えた辺りでエラーになる。

$ ulimit -a
real-time non-blocking time  (microseconds, -R) unlimited
core file size              (blocks, -c) unlimited
data seg size               (kbytes, -d) unlimited
scheduling priority                 (-e) 0
file size                   (blocks, -f) unlimited
pending signals                     (-i) 23404
max locked memory           (kbytes, -l) 8192
max memory size             (kbytes, -m) unlimited
open files                          (-n) 1024
pipe size                (512 bytes, -p) 8
POSIX message queues         (bytes, -q) 819200
real-time priority                  (-r) 0
stack size                  (kbytes, -s) 8192
cpu time                   (seconds, -t) unlimited
max user processes                  (-u) 23404
virtual memory              (kbytes, -v) unlimited
file locks                          (-x) unlimited

サーバ側の実行前に $ ulimit -S -n 110000 などしておく.

同時接続数 10,000 (C10K) 〜 C100K サーバ

libuv で C10K サーバを作ってみる。たかだか一つだけのクライアントから接続を受け付けるサンプルは Web 上にいくらでも見つかるが、複数接続で C++ のサンプルは、意外と見当たらない。

Echo server を作る。

main() から見ていこう.

server-common.cpp

C++
[RAW]
  1. constexpr int PORT = 7000;
  2. constexpr int BACKLOG = 100;
  3. // イベントループ. グローバル.
  4. uv_loop_t* g_loop = NULL;
  5. // サーバ. グローバル.
  6. uv_tcp_t* g_server = NULL;
  7. // これをサンプルで切り替える.
  8. extern void on_connection( uv_stream_t* server, int status );
  9. int main()
  10. {
  11. struct rlimit rlim;
  12. getrlimit(RLIMIT_NOFILE, &rlim);
  13. printf("soft = %ld, hard = %ld, FD_SETSIZE = %d\n",
  14. rlim.rlim_cur, rlim.rlim_max, FD_SETSIZE);
  15. // イベントループ
  16. g_loop = uv_default_loop();
  17. g_server = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
  18. uv_tcp_init( g_loop, g_server ); // まだソケットは作られない.
  19. printf("Waiting port = %d\n", PORT);
  20. int r = tcp_setup_server(g_server, NULL, PORT);
  21. if (r) {
  22. fprintf(stderr, "tcp_setup_server() failed.\n");
  23. return 1;
  24. }
  25. r = uv_listen( (uv_stream_t*) g_server, BACKLOG, on_connection );
  26. if (r) {
  27. // Error
  28. fprintf( stderr, "uv_listen() failed: %s\n", uv_strerror(r) );
  29. return 1;
  30. }
  31. // ループ開始
  32. r = uv_run(g_loop, UV_RUN_DEFAULT);
  33. printf("Exit uv_run()\n"); // uv_close(server) でループを抜ける.
  34. return r;
  35. }

uv_default_loop() でイベントループを得る.

ソケットを wrap する uv_tcp_t 変数をイベントループに結びつける。libuv はイベントループが中心になる。ソケットを tcp_setup_server() 内で bind し, listen して接続を受け付けたら on_connection() をコールバックさせる.

uv_run() でループ開始する。

次に, tcp_setup_server() 関数. ソケットを開いて bind する. uv_tcp_t のメソッドである uv_tcp_bind() は, BSDソケット関数の socket(), bind() を一体にしたもの。

server-common.cpp

C++
[RAW]
  1. #include <stdio.h>
  2. #include <assert.h>
  3. #include <string.h>
  4. #include <uv.h>
  5. #include <sys/resource.h> // getrlimit()
  6. #include <stdlib.h>
  7. /**
  8. * TCP で bind() までを行う. IPv6対応.
  9. * uv_stream_t 型を基底クラスとして, uv_tcp_t, uv_pipe_t および uv_tty_t が派生
  10. * クラス.
  11. * @param node bind() するホスト名. NULL の場合, INADDR_ANY, IN6ADDR_ANY_INIT.
  12. * @param port ポート番号. 0 の場合, 空いているポートを割り当てる.
  13. *
  14. * @return 成功 = 0, エラーの場合 = -1
  15. */
  16. int tcp_setup_server(uv_tcp_t* server, const char* node, int port)
  17. {
  18. assert(server);
  19. if (port < 0)
  20. return -1;
  21. // uv_ip4_addr() を使うサンプルは古い!!
  22. struct addrinfo hints;
  23. struct addrinfo* res = NULL;
  24. memset(&hints, 0, sizeof hints);
  25. if (!node)
  26. hints.ai_family = AF_INET6; // IPv4/IPv6両対応
  27. hints.ai_socktype = SOCK_STREAM;
  28. // AI_PASSIVE をセットして node = NULLのときは, INADDR_ANY, IN6ADDR_ANY_INIT.
  29. hints.ai_flags = AI_PASSIVE | AI_NUMERICSERV;
  30. // (node, service) の両方を NULL にしてはいけない.
  31. char service[11];
  32. sprintf(service, "%d", port);
  33. int err = getaddrinfo(node, service, &hints, &res);
  34. if (err != 0) {
  35. fprintf( stderr, "getaddrinfo() failed: %s\n", gai_strerror(err) );
  36. return -1;
  37. }
  38. // socket(), bind() を一体にしたもの.
  39. int r = uv_tcp_bind(server, res->ai_addr, 0);
  40. freeaddrinfo(res);
  41. if (r) {
  42. fprintf(stderr, "uv_tcp_bind() failed: %s\n", uv_strerror(r) );
  43. return -1;
  44. }
  45. return 0;
  46. }

コールバックされる on_connection() は次のようにする。

よくあるサンプルでは, uv_accept() で得たソケットをグローバル変数に格納したりする。それでは一つのクライアントからの接続にしか対応できない。ClientSession クラスを作って、配列に格納するようにする。

multi-tcp-server.cpp

C++
[RAW]
  1. map<uv_tcp_t*, class ClientSession*> ClientSession::_session_list;
  2. void on_connection( uv_stream_t* server, int status )
  3. {
  4. if ( status != 0 ) {
  5. // エラーの場合, エラー名を得る uv_err_name() or エラーメッセージを得る
  6. // uv_strerror() を使う.
  7. fprintf(stderr, "New connection error: %s\n", uv_strerror(status) );
  8. return;
  9. }
  10. uv_tcp_t* client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
  11. uv_tcp_init( g_loop, client );
  12. int r = uv_accept(server, (uv_stream_t*) client);
  13. if (r) {
  14. fprintf(stderr, "uv_accept() failed: %s\n", uv_strerror(r) );
  15. free(client);
  16. return;
  17. }
  18. ClientSession* session = ClientSession::new_instance(client); // client所有渡す
  19. r = session->read_start();
  20. assert( r == 0 );
  21. }

BSD ソケットの accept() を wrap する uv_accept() で, 新しくクライアントとの接続用のソケットができる。

その新しいソケットで, 読み取り可能になったときに呼び出されるコールバック関数を登録、そのまま待ち受け開始する。ClientSession クラスの read_start() までを作る.

C++
[RAW]
  1. class ClientSession: private noncopyable
  2. {
  3. // uv_tcp_t < uv_stream_t < uv_handle_t
  4. uv_tcp_t* m_handle; // 所有する
  5. static map<uv_tcp_t*, class ClientSession*> _session_list;
  6. protected:
  7. ClientSession(uv_tcp_t* h): m_handle(h) {
  8. assert( h != NULL );
  9. }
  10. public:
  11. static ClientSession* new_instance(uv_tcp_t* h) {
  12. assert(h);
  13. ClientSession* self = new ClientSession(h);
  14. _session_list.insert(make_pair(h, self));
  15. return self;
  16. }
  17. ~ClientSession() {
  18. assert(m_handle);
  19. _session_list.erase(m_handle);
  20. free(m_handle); m_handle = NULL;
  21. }
  22. // @return If error, non-zero
  23. int read_start() {
  24. assert( m_handle != NULL );
  25. // Read data from an incoming stream. alloc_buffer() で領域を確保する.
  26. return uv_read_start( (uv_stream_t*) m_handle, _alloc_buffer,
  27. (uv_read_cb) _on_read );
  28. }

ハンドルに対してコールバック関数を uv_read_start() 関数で登録すればOK.

次のコールバック関数を書いていく。配列のどのセッションか特定する _on_read() と, 実際のコールバック関数の on_receive() を書く.

C++
[RAW]
  1. /**
  2. * Callback.
  3. * EOF の場合, uv__stream_eof() から呼び出される. 第2引数が UV_EOF.
  4. * この場合も, 領域確保済み.
  5. * それ以外, uv__read() から呼び出される.
  6. * @param nread < 0 の場合, EOF or エラー. > 0 の場合, 読み出せる.
  7. * == 0 の場合, ソケットが non-blocking で読み出せるデータなし.
  8. */
  9. static void _on_read( uv_stream_t* client, ssize_t nread, uv_buf_t* buf ) {
  10. ClientSession* v = _session_list.at((uv_tcp_t*) client);
  11. v->on_receive(nread, buf);
  12. // エラーの場合も, 領域を解放しなければならない. ●●重い処理.
  13. if (buf->base) {
  14. free(buf->base); buf->base = NULL;
  15. }
  16. }

libuv の特徴は、読み込み可能になったらコールバックされるのではなく、Windows 流に合わせて、読み込みが完了してからコールバックされる.

C++
[RAW]
  1. void on_receive(ssize_t nread, const uv_buf_t* buf) {
  2. assert(buf);
  3. if (nread < 0) {
  4. if (nread != UV_EOF) {
  5. fprintf(stderr, "recv() EOF or error: %s\n", uv_err_name(nread) );
  6. }
  7. // If error, *user* should call uv_close().
  8. shutdown();
  9. }
  10. else if (nread > 0) {
  11. if (buf->base[0] == 'X') {
  12. printf("Get client shutdown message.\n");
  13. shutdown();
  14. }
  15. else if (buf->base[0] == 'Z') {
  16. printf("Get server shutdown message.\n");
  17. server_shutdown();
  18. }
  19. else {
  20. // 書き込みもややこしい.
  21. uv_write_t* req = (uv_write_t*) malloc(sizeof(uv_write_t));
  22. uv_buf_t wbuf = uv_buf_init(buf->base, nread); // ローカル変数でよい.
  23. uv_write(req, (uv_stream_t*) m_handle, &wbuf, 1, _after_write);
  24. }
  25. }
  26. }

_alloc_buffer() で, 毎回, メモリを確保する。uv_read_start() でコールバックを登録していた。

C++
[RAW]
  1. /**
  2. * Callback. stream->alloc_cb(stream, 64 * 1024, &buf) の形で呼び出される.
  3. * @param handle stream
  4. * @param suggested_size 64 * 1024 固定.
  5. * @param [out] buf バッファを格納. .base == NULL || .len == 0 で戻すと,
  6. * read_cb() に UV_ENOBUFS エラーを通知.
  7. */
  8. static void _alloc_buffer( uv_handle_t* handle, size_t suggested_size,
  9. uv_buf_t* buf)
  10. {
  11. //printf("alloc_buffer() called: .\n");
  12. buf->base = (char*) malloc(suggested_size); // ●●重い! 自分で管理が必要.
  13. buf->len = suggested_size;
  14. }

1回の読み込みごとにメモリを解放するようになっており、非常に重い。完全に固定長のバッファなので、別にメモリプールを作るとパフォーマンスが上がる可能性が高い。

クライアント

次に、クライアント. こちらは libuv を使っていない。少ない台数でテストできるようにするため, 子プロセスをドンドン走らせていく。

Linux で試したところ、100 プロセスにすると、接続に失敗する子プロセスが発生する。別のパラメータの上限がありそう. -> Ephemeral port (エフェメラルポート) を使い果たしたエラー.

接続エラーになる少し前の状態。この後にポートを使い果たしたか。

$ ss -tan | awk '{print $1}' | sort | uniq -c
 107693 ESTAB
      8 FIN-WAIT-2
      9 LISTEN
      1 State
    233 TIME-WAIT
C++
[RAW]
  1. constexpr int CLIENT_SIZE = 1000;
  2. // C100K => 50 は OK. 100 にすると失敗発生しだす.
  3. // connect() failed: Cannot assign requested address
  4. // Addr is...: AF_INET6 [::1]:7000
  5. constexpr int CHILD_NUMBER = 100;
  6. // @return 0 が成功.
  7. int make_children(const char* argv0, const char* hostname)
  8. {
  9. assert(hostname);
  10. #ifndef _WIN32
  11. struct rlimit rlim;
  12. getrlimit(RLIMIT_NOFILE, &rlim);
  13. printf("soft = %ld, hard = %ld, FD_SETSIZE = %d\n",
  14. rlim.rlim_cur, rlim.rlim_max, FD_SETSIZE);
  15. #endif
  16. for (int k = 0; k < CHILD_NUMBER; k++) {
  17. #ifndef _WIN32
  18. // UNIX: fork() + exec*() が伝統的な方法. posix_spawn(3) でもよい.
  19. pid_t pid = fork();
  20. if (pid == -1) {
  21. fprintf(stderr, "fork() failed.\n");
  22. return 1;
  23. }
  24. else if (pid == 0) {
  25. // child
  26. execl(argv0, argv0, "-c", hostname, NULL); // restart
  27. abort(); // ここには来ない
  28. }
  29. // 親プロセス
  30. #else
  31. // Windows: _spawn* 系かCreateProcess() を直接使う.
  32. _spawnl(_P_NOWAIT, argv0, argv0, "-c", hostname, NULL);
  33. #endif
  34. }
  35. return 0;
  36. }

子プロセスで、実際に通信する。接続を繋ぎっぱなしにするため、適宜、待ちを入れる.

C++
[RAW]
  1. SOCKET sockfd[CLIENT_SIZE];
  2. // @return 0 が成功.
  3. int test_connect_server(const char* hostname)
  4. {
  5. assert(hostname);
  6. #ifdef _WIN32
  7. // Initialize Winsock
  8. WSADATA wsaData;
  9. int iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
  10. if (iResult != 0) {
  11. printf("WSAStartup failed: %d\n", iResult);
  12. return 1;
  13. }
  14. #endif
  15. for (int i = 0; i < CLIENT_SIZE; i++) {
  16. sockfd[i] = connect_to_server(hostname, 7000);
  17. if (sockfd[i] == INVALID_SOCKET) {
  18. fprintf(stderr, "%d: connect_to_server() failed.\n", i); //=> 1020 でコケる
  19. return 1;
  20. }
  21. }
  22. const char ch = 'A';
  23. for (int i = 0; i < CLIENT_SIZE; i++) {
  24. // 送受信
  25. send(sockfd[i], &ch, 1, 0);
  26. }
  27. printf("send finished.\n");
  28. #ifndef _WIN32
  29. sleep(5);
  30. #else
  31. Sleep(5 * 1000);
  32. #endif
  33. for (int i = 0; i < CLIENT_SIZE; i++) {
  34. char rch = '\0';
  35. recv(sockfd[i], &rch, 1, 0);
  36. if (ch != rch) {
  37. fprintf(stderr, "%d: char from server = %c\n", i, rch);
  38. exit(1);
  39. }
  40. }
  41. printf("receive finished.\n");
  42. #ifndef _WIN32
  43. sleep(5);
  44. #else
  45. Sleep(5 * 1000);
  46. #endif
  47. /*
  48. #if 0
  49. // 明示的に shutdown する
  50. ch = 'X';
  51. send(sockfd, &ch, 1, 0);
  52. #endif
  53. // 明示的に server を shutdown する
  54. ch = 'Z';
  55. send(sockfd, &ch, 1, 0);
  56. */
  57. for (int i = 0; i < CLIENT_SIZE; i++) {
  58. closesocket(sockfd[i]);
  59. }
  60. return 0;
  61. }