C10K問題 (C10K problem) を乗り越えるため, とりあえず 1,000 同時接続を超えるサンプル.
同時接続数とパフォーマンス向上の技術
POSIX API
select(2)
は上限が低い. 待ち合わせする file descriptor の「値」の上限が FD_SETSIZE
マクロで決まる。
OS によっては、アプリケィションのコンパイル時に, FD_SETSIZE
マクロ値をコンパイラオプションで与えて変更することで、上限を引き上げられる。FreeBSD はそのようになっている。
しかし、Linux は 1024 で完全に決め打ちになっている。<sys/select.h>
(glibc 提供ファイル) から参照される <linux/posix_types.h>
(Linux kernel のファイル) で, __FD_SETSIZE
も決め打ちになっている。
File descriptor はソケットだけでなく, 標準入力 = 0, ... のようにファイルを開いても消費される。そのため、せいぜい 1,000 程度しか待ち受けできない。同時接続数が不明な場合は select()
を使ってはならない。
POSIX 標準では poll(2)
が 1,000 接続の上限を超えて待ち受けできる。しかし、インタフェイスの設計がよくなく、パフォーマンスは select()
程度。
各OSの取組み
Linux での非同期I/O手法とそれらのパフォーマンス: Asynchronous C++ History of Time マルチスレッドでも 10,000 接続ぐらいまではイケる。Linux epoll はよい。
それぞれの OS がバラバラな方法で、多くの接続を「効率的に」待ち受けできるように新しいAPIを導入した。ソケットはカーネルで実装されるので、通常、システムコールとして提供される.
- UNIX
- Linux 2.6+ epoll(7) システムコール.
epoll_create(2)
- Solaris7 11/99+ poll(7d).
/dev/poll
デヴァイス. HP-UX でも実装されている. dvpoll
構造体.
- Solaris10 Event Ports イベント.
port_create()
. 問題あり, /dev/poll
を使うべきか?
- FreeBSD 4.1以降 kqueue(2):
kqueue()
, kevent()
システムコール. NetBSD 1.6.1 (2003年), NetBSD 2.0 (2004年) 以降, OpenBSD 2.9以降でも実装.
- AIX pollset API.
pollset_create()
など.
- Windows I/O Completion Port (IOCP). コピー回数が少ない. スレッドを効率的に動かす.
ポータブルなライブラリ
てんでバラバラなので、アプリケィション開発には, これらをカバーするライブラリを使おう。動かしたい環境 (の幅) とオーバヘッドの少なさでどれを使うべきか決まる。
- libev - a full-featured and high-performance event loop. UNIX 限定であればこれ. パフォーマンスも各OSのシステムコールを直接叩くのと遜色ない.
- libuv - a cross-platform asynchronous I/O. Windows でも動かしたい場合はこれ。パフォーマンスは libev より劣るが libevent よりはよい.
- POCO C++ Libraries - 組込みOS (embedded OS) でも動く. 軽いライブラリが必要な場合はこれ。パフォーマンスは libevent = POCO = asio で, libev, libuv より劣る.
- ▲ libevent - 古い. 新しいプロジェクトでは libev を使え.
- ▲ (non-Boost) Asio and Boost.Asio.
このほか, libhv というのもあって, Windows IOCP もサポートしつつ、libev と同等のパフォーマンスと主張している: GitHub - ithewei/libhv: 🔥 比libevent、libuv更易用的网络库。A c/c++ network library for developing TCP/UDP/SSL/HTTP/WebSocket/MQTT client/server. Fedora 37, Debian 11 にはパッケージがない。試していない。
Current or soft limit
以下で作るサンプルは、そのまま実行してもエラーになる。Soft limits がログイン端末ごとに設定されている。ulimit コマンドで表示できる (open files が 1,024 になっている)。標準入出力なども file descriptor を消費するので、これで 1,000 を少し超えた辺りでエラーになる。
$ ulimit -a
real-time non-blocking time (microseconds, -R) unlimited
core file size (blocks, -c) unlimited
data seg size (kbytes, -d) unlimited
scheduling priority (-e) 0
file size (blocks, -f) unlimited
pending signals (-i) 23404
max locked memory (kbytes, -l) 8192
max memory size (kbytes, -m) unlimited
open files (-n) 1024
pipe size (512 bytes, -p) 8
POSIX message queues (bytes, -q) 819200
real-time priority (-r) 0
stack size (kbytes, -s) 8192
cpu time (seconds, -t) unlimited
max user processes (-u) 23404
virtual memory (kbytes, -v) unlimited
file locks (-x) unlimited
サーバ側の実行前に $ ulimit -S -n 110000 などしておく.
同時接続数 10,000 (C10K) 〜 C100K サーバ
libuv で C10K サーバを作ってみる。たかだか一つだけのクライアントから接続を受け付けるサンプルは Web 上にいくらでも見つかるが、複数接続で C++ のサンプルは、意外と見当たらない。
Echo server を作る。
main()
から見ていこう.
server-common.cpp
C++
- constexpr int PORT = 7000;
- constexpr int BACKLOG = 100;
-
-
- uv_loop_t* g_loop = NULL;
-
-
- uv_tcp_t* g_server = NULL;
-
-
- extern void on_connection( uv_stream_t* server, int status );
-
- int main()
- {
- struct rlimit rlim;
- getrlimit(RLIMIT_NOFILE, &rlim);
- printf("soft = %ld, hard = %ld, FD_SETSIZE = %d\n",
- rlim.rlim_cur, rlim.rlim_max, FD_SETSIZE);
-
-
- g_loop = uv_default_loop();
-
- g_server = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
- uv_tcp_init( g_loop, g_server );
-
- printf("Waiting port = %d\n", PORT);
- int r = tcp_setup_server(g_server, NULL, PORT);
- if (r) {
- fprintf(stderr, "tcp_setup_server() failed.\n");
- return 1;
- }
-
- r = uv_listen( (uv_stream_t*) g_server, BACKLOG, on_connection );
- if (r) {
-
- fprintf( stderr, "uv_listen() failed: %s\n", uv_strerror(r) );
- return 1;
- }
-
-
- r = uv_run(g_loop, UV_RUN_DEFAULT);
- printf("Exit uv_run()\n");
-
- return r;
- }
uv_default_loop()
でイベントループを得る.
ソケットを wrap する uv_tcp_t
変数をイベントループに結びつける。libuv はイベントループが中心になる。ソケットを tcp_setup_server()
内で bind し, listen して接続を受け付けたら on_connection()
をコールバックさせる.
uv_run()
でループ開始する。
次に, tcp_setup_server()
関数. ソケットを開いて bind する. uv_tcp_t
のメソッドである uv_tcp_bind()
は, BSDソケット関数の socket()
, bind()
を一体にしたもの。
server-common.cpp
C++
- #include <stdio.h>
- #include <assert.h>
- #include <string.h>
- #include <uv.h>
- #include <sys/resource.h>
- #include <stdlib.h>
-
-
-
-
-
-
-
-
-
-
- int tcp_setup_server(uv_tcp_t* server, const char* node, int port)
- {
- assert(server);
- if (port < 0)
- return -1;
-
-
- struct addrinfo hints;
- struct addrinfo* res = NULL;
-
- memset(&hints, 0, sizeof hints);
- if (!node)
- hints.ai_family = AF_INET6;
- hints.ai_socktype = SOCK_STREAM;
-
- hints.ai_flags = AI_PASSIVE | AI_NUMERICSERV;
-
-
- char service[11];
- sprintf(service, "%d", port);
- int err = getaddrinfo(node, service, &hints, &res);
- if (err != 0) {
- fprintf( stderr, "getaddrinfo() failed: %s\n", gai_strerror(err) );
- return -1;
- }
-
-
- int r = uv_tcp_bind(server, res->ai_addr, 0);
- freeaddrinfo(res);
- if (r) {
- fprintf(stderr, "uv_tcp_bind() failed: %s\n", uv_strerror(r) );
- return -1;
- }
-
- return 0;
- }
コールバックされる on_connection()
は次のようにする。
よくあるサンプルでは, uv_accept()
で得たソケットをグローバル変数に格納したりする。それでは一つのクライアントからの接続にしか対応できない。ClientSession
クラスを作って、配列に格納するようにする。
multi-tcp-server.cpp
C++
- map<uv_tcp_t*, class ClientSession*> ClientSession::_session_list;
-
- void on_connection( uv_stream_t* server, int status )
- {
- if ( status != 0 ) {
-
-
- fprintf(stderr, "New connection error: %s\n", uv_strerror(status) );
- return;
- }
-
- uv_tcp_t* client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
- uv_tcp_init( g_loop, client );
-
- int r = uv_accept(server, (uv_stream_t*) client);
- if (r) {
- fprintf(stderr, "uv_accept() failed: %s\n", uv_strerror(r) );
- free(client);
- return;
- }
-
- ClientSession* session = ClientSession::new_instance(client);
- r = session->read_start();
- assert( r == 0 );
- }
BSD ソケットの accept()
を wrap する uv_accept()
で, 新しくクライアントとの接続用のソケットができる。
その新しいソケットで, 読み取り可能になったときに呼び出されるコールバック関数を登録、そのまま待ち受け開始する。ClientSession
クラスの read_start()
までを作る.
C++
- class ClientSession: private noncopyable
- {
-
- uv_tcp_t* m_handle;
-
- static map<uv_tcp_t*, class ClientSession*> _session_list;
-
- protected:
- ClientSession(uv_tcp_t* h): m_handle(h) {
- assert( h != NULL );
- }
-
- public:
- static ClientSession* new_instance(uv_tcp_t* h) {
- assert(h);
- ClientSession* self = new ClientSession(h);
- _session_list.insert(make_pair(h, self));
- return self;
- }
-
- ~ClientSession() {
- assert(m_handle);
- _session_list.erase(m_handle);
- free(m_handle); m_handle = NULL;
- }
-
-
- int read_start() {
- assert( m_handle != NULL );
-
- return uv_read_start( (uv_stream_t*) m_handle, _alloc_buffer,
- (uv_read_cb) _on_read );
- }
ハンドルに対してコールバック関数を uv_read_start()
関数で登録すればOK.
次のコールバック関数を書いていく。配列のどのセッションか特定する _on_read()
と, 実際のコールバック関数の on_receive()
を書く.
C++
-
-
-
-
-
-
-
-
- static void _on_read( uv_stream_t* client, ssize_t nread, uv_buf_t* buf ) {
- ClientSession* v = _session_list.at((uv_tcp_t*) client);
- v->on_receive(nread, buf);
-
-
- if (buf->base) {
- free(buf->base); buf->base = NULL;
- }
- }
libuv の特徴は、読み込み可能になったらコールバックされるのではなく、Windows 流に合わせて、読み込みが完了してからコールバックされる.
C++
- void on_receive(ssize_t nread, const uv_buf_t* buf) {
- assert(buf);
-
- if (nread < 0) {
- if (nread != UV_EOF) {
- fprintf(stderr, "recv() EOF or error: %s\n", uv_err_name(nread) );
- }
-
- shutdown();
- }
- else if (nread > 0) {
- if (buf->base[0] == 'X') {
- printf("Get client shutdown message.\n");
- shutdown();
- }
- else if (buf->base[0] == 'Z') {
- printf("Get server shutdown message.\n");
- server_shutdown();
- }
- else {
-
- uv_write_t* req = (uv_write_t*) malloc(sizeof(uv_write_t));
- uv_buf_t wbuf = uv_buf_init(buf->base, nread);
- uv_write(req, (uv_stream_t*) m_handle, &wbuf, 1, _after_write);
- }
- }
- }
_alloc_buffer()
で, 毎回, メモリを確保する。uv_read_start()
でコールバックを登録していた。
C++
-
-
-
-
-
-
-
- static void _alloc_buffer( uv_handle_t* handle, size_t suggested_size,
- uv_buf_t* buf)
- {
-
- buf->base = (char*) malloc(suggested_size);
- buf->len = suggested_size;
- }
1回の読み込みごとにメモリを解放するようになっており、非常に重い。完全に固定長のバッファなので、別にメモリプールを作るとパフォーマンスが上がる可能性が高い。
クライアント
次に、クライアント. こちらは libuv を使っていない。少ない台数でテストできるようにするため, 子プロセスをドンドン走らせていく。
Linux で試したところ、100 プロセスにすると、接続に失敗する子プロセスが発生する。別のパラメータの上限がありそう. -> Ephemeral port (エフェメラルポート) を使い果たしたエラー.
接続エラーになる少し前の状態。この後にポートを使い果たしたか。
$ ss -tan | awk '{print $1}' | sort | uniq -c
107693 ESTAB
8 FIN-WAIT-2
9 LISTEN
1 State
233 TIME-WAIT
C++
- constexpr int CLIENT_SIZE = 1000;
-
-
-
- constexpr int CHILD_NUMBER = 100;
-
-
- int make_children(const char* argv0, const char* hostname)
- {
- assert(hostname);
-
- #ifndef _WIN32
- struct rlimit rlim;
- getrlimit(RLIMIT_NOFILE, &rlim);
- printf("soft = %ld, hard = %ld, FD_SETSIZE = %d\n",
- rlim.rlim_cur, rlim.rlim_max, FD_SETSIZE);
- #endif
-
- for (int k = 0; k < CHILD_NUMBER; k++) {
- #ifndef _WIN32
-
- pid_t pid = fork();
- if (pid == -1) {
- fprintf(stderr, "fork() failed.\n");
- return 1;
- }
- else if (pid == 0) {
-
- execl(argv0, argv0, "-c", hostname, NULL);
- abort();
- }
-
- #else
-
- _spawnl(_P_NOWAIT, argv0, argv0, "-c", hostname, NULL);
- #endif
- }
- return 0;
- }
子プロセスで、実際に通信する。接続を繋ぎっぱなしにするため、適宜、待ちを入れる.
C++
- SOCKET sockfd[CLIENT_SIZE];
-
-
- int test_connect_server(const char* hostname)
- {
- assert(hostname);
-
- #ifdef _WIN32
-
- WSADATA wsaData;
- int iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
- if (iResult != 0) {
- printf("WSAStartup failed: %d\n", iResult);
- return 1;
- }
- #endif
-
- for (int i = 0; i < CLIENT_SIZE; i++) {
- sockfd[i] = connect_to_server(hostname, 7000);
- if (sockfd[i] == INVALID_SOCKET) {
- fprintf(stderr, "%d: connect_to_server() failed.\n", i);
- return 1;
- }
- }
-
- const char ch = 'A';
- for (int i = 0; i < CLIENT_SIZE; i++) {
-
- send(sockfd[i], &ch, 1, 0);
- }
- printf("send finished.\n");
- #ifndef _WIN32
- sleep(5);
- #else
- Sleep(5 * 1000);
- #endif
- for (int i = 0; i < CLIENT_SIZE; i++) {
- char rch = '\0';
- recv(sockfd[i], &rch, 1, 0);
- if (ch != rch) {
- fprintf(stderr, "%d: char from server = %c\n", i, rch);
- exit(1);
- }
- }
- printf("receive finished.\n");
- #ifndef _WIN32
- sleep(5);
- #else
- Sleep(5 * 1000);
- #endif
-
-
-
-
-
-
-
-
-
-
- for (int i = 0; i < CLIENT_SIZE; i++) {
- closesocket(sockfd[i]);
- }
-
- return 0;
- }