第2回 Non-blocking ソケットで IPv4/IPv6両対応 (クライアント側) [C++] [GTK4]

[2022.11] ページが長くなりすぎ。割った。

Note.

(2006.08.05 追記) (2022.04 追記)

下記のコードでは connect(), read() をnon-blockingモードにしている。しかし、getaddrinfo() も, ホスト名の解決を行うために時間が掛かる (ブロックする)。この考慮が足りない。

glibc 拡張の getaddrinfo_a(), Qt の QDnsLookup クラスは非同期だが、いずれも内部でスレッドを生成している。内部で getaddrinfo() を使う限り、このような実装しかない。libresolvres_nsearch() (res_search() は非推奨.) も応答を待つ。

Ruby では、自前のリゾルバ Resolv::DNS クラスを作っている。すごい。ここまでやれば DNS lookupを非同期にできる。できるが、やりすぎ。

ということで、非同期 asynchronous I/O にするには, 自分でスレッドを生成して、getaddrinfo(), ブロッキングモードの connect() を呼び出してサーバに接続するようにすべき。Windowsでも同様の方法が推奨されている。(WSAAsyncGetHostByName の頁を参照。) もっとシンプルにできるだろう。

別ページで、Qt クライアントサンプルを解説する。ソケット通信はブロッキングモードにする。

余談。Qt はマルチスレッドでのワーカースレッド・メインスレッド間のメッセージのやり取りがシグナル/スロットを使うだけでよく、非常に簡単なので、わざわざ作るのが難しい non-blocking モードのソケットにするメリットもない。

他方、GTKは、Qtを真似したシグナルがスレッドを跨げないので、難儀する。gtkmm は Glib::Dispatcher クラスで, わざわざパイプを生成してスレッド間通信している。意外な機能で g_idle_add(), g_idle_add_full() が、名前と全然関係なく、メインスレッドでコールバックを呼び出してくれるので、これを使う。GTK4 になって非互換ばかりで, ますます無茶苦茶になっている。生産性も低く、もう実用にならない。

Windows では GetAddrInfoExW() (Unicode版のみ!) が非同期に対応している。が、サンプルが全然見当たらない。

(以下、2000.10.7, 2000.10.8の日記を加筆。)

ソケット関係の関数は、connect(), read(), サーバ側の accept() と、どれもこれもブロックする。ネットワーク越しのデータのやり取りは本質的に非同期なので、平行して入力がある場合、いちいちブロックするようにプログラムを書いてはいけない。

例えば、GUIプログラムの場合は、ソケットをブロックしていたら、マウスのクリック、画面の再描画などを受け付けられず、フリーズしてしまう。

ではどうするか。

  1. マルチスレッドにして, ソケットは blocking モードで接続, 通信する. 今どきはこれが簡単.
  2. connect() だけ non-blocking モードにして, 接続する。接続できたら blocking モードにして, select() または poll() で待ち合わせる.
  3. ずっと non-blocking モードで接続, 通信する.

ここでは、フレームワークが用意したコールバックの仕組みを使ってみる。どのようなフレームワークでも、ソケットに限らずさまざまな入力を捌く機構が用意されているので、それを使う。そうすれば、ソケットの入力を待ちながら画面を更新したりできるだろう。

Fedora Core 5 と gtk+ 2.8.20 という環境で試してみた。全体的にやっつけで作っているので、実用にしようと思うと、まだ考えるべきところが多いので注意。

[2022.11] GTK4 に更新。修正を要する非互換が多く, ヒドい.

Non-blocking ソケット

UNIXでは、ソケットに限らず、ファイルIOをブロックしないように設定できる。open() または fcntl()O_NONBLOCK フラグを設定すると、non-blocking モードになる。ソケットでは, 後者の fcntl() 関数の F_SETFL コマンドで O_NONBLOCK フラグを設定できる。

Non-blockingモードでは、read(2), write(), send(), recv() 関数は、ブロックせずに、すぐに返ってくるようになる。戻り値とエラーコードで状況を調べる必要がある。EAGAIN または EWOULDBLOCK エラーが発生する。

connect() 関数もすぐに返ってくる。EINPROGRESS エラーになるかを確認する。

accept(2) も同様。listenキューが空でも、すぐに返ってる。EAGAIN または EWOULDBLOCK エラーが発生する。

EAGAINEWOULDBLOCK エラーは, 異なる値かもしれず、かつどちらのエラーもありうる。Linuxでは同じ値になっているが、移植性のため、両方を確認すること。

何を作るか

ごく簡単なGTK4 クライアントを作る。サーバから fortune テキストを得て、表示するだけ。

GTK アプリケーションの部分は、本題ではないので、割愛する。

接続要求関数

では、接続要求を投げるところから順に作っていこう.

Non-blocking モードの設定

ソケットを non-blocking モードにするには fcntl() を使う。F_GETFL コマンドで現在のフラグを読み出し、F_SETFL で設定する。

C++
[RAW]
  1. /**
  2. * @param mode 非0 = into non-blocking mode.
  3. * @return If failed, -1.
  4. */
  5. int non_blocking( SOCKET sock, int mode )
  6. {
  7. assert( sock != INVALID_SOCKET );
  8. #ifndef _WIN32
  9. // UNIX
  10. int oldflags = fcntl(sock, F_GETFL, 0 /* dummy */);
  11. return fcntl(sock, F_SETFL,
  12. mode ? (oldflags | O_NONBLOCK) : (oldflags & ~O_NONBLOCK) );
  13. #else
  14. // Windows では「現在のモード」を得る方法はない
  15. u_long iMode = mode ? 1 : 0;
  16. int iResult = ioctlsocket( sock, FIONBIO, &iMode);
  17. if ( iResult != NO_ERROR )
  18. return -1;
  19. return 0;
  20. #endif
  21. }

接続要求

getaddrinfo() が複数のソケットアドレスを返す. 通常は IPv6 アドレスと IPv4 アドレスの二つ。両方に対して接続要求を出すので、複雑になる.

Non-blocking ソケットでは、connect() がすぐに返ってくる。サーバがIPv4対応かIPv6対応か分からない状況では、ソケットを2本使う必要がある。両方でconnectを試みて、接続できたほうを生かす。

C++
[RAW]
  1. // <var>hostname</var> からIPアドレスを引き, 全部に接続要求する. Non-blocking 版.
  2. // 実際に接続したときに on_connect() が呼び出されるようにする.
  3. // @return 少なくとも一つが接続要求が成功 = true
  4. bool start_connect_to_server_nonblock(const char* hostname, int port)
  5. {
  6. assert( hostname );
  7. if (port <= 0 || port > 65535 )
  8. return false;
  9. struct addrinfo hints;
  10. memset(&hints, 0, sizeof(hints));
  11. hints.ai_family = AF_UNSPEC; // IPv4/IPv6 両対応
  12. hints.ai_socktype = SOCK_STREAM;
  13. hints.ai_flags = AI_NUMERICSERV;
  14. char service[11];
  15. sprintf(service, "%d", port);
  16. struct addrinfo* res = NULL;
  17. // ●ここで待ちが入る。アカン!!
  18. int err = getaddrinfo(hostname, service, &hints, &res);
  19. if (err != 0) {
  20. fprintf(stderr, "getaddrinfo() failed: %s\n", gai_strerror(err));
  21. error_dialog("ホストまたはポートが不正です。");
  22. return false;
  23. }

冒頭 Note. のとおり, getaddrinfo() がブロックしてしまうため、このサンプルは傷がある。無理やりいくなら、ここだけマルチスレッドにする。

connect() はブロックされず、EINPROGRESS エラーを発生させる。それを確認して、メインループでイベントを待ち合わせる.

接続に成功すると書き込み可能状態 POLLOUT になるので,そのようにコールバック関数を登録する。start_watch() 関数でおこなっている。

C++
[RAW]
  1. SOCKET sockfd = INVALID_SOCKET;
  2. bool succeeded = false;
  3. for (addrinfo* ai = res; ai; ai = ai->ai_next) {
  4. sockaddr_print("connect...", ai->ai_addr, ai->ai_addrlen);
  5. sockfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
  6. if ( sockfd == INVALID_SOCKET ) {
  7. freeaddrinfo(res);
  8. return false;
  9. }
  10. non_blocking(sockfd, 1); // 非ブロックモードに切り替え.
  11. int r = ::connect(sockfd, ai->ai_addr, ai->ai_addrlen);
  12. if (r < 0) {
  13. if (errno == EINPROGRESS || errno == EINTR ) { // 即時確立しなかった
  14. // Non-blocking モードのときは, シグナル割り込みは EINPROGRESS
  15. // と同じでよいみたい.
  16. // 成功. 書き込み可能になるまで待つ.
  17. pair<GIOChannel*, guint> ev =
  18. start_watch(sockfd, G_IO_OUT, on_connect, NULL);
  19. wait_connect_events.insert(ev);
  20. // Non-blocking 版では, 全部を試す.
  21. succeeded = true;
  22. continue;
  23. }
  24. closesocket(sockfd);
  25. sockfd = INVALID_SOCKET;
  26. continue;
  27. }
  28. else {
  29. // 即時成功の場合, 他のを close して, g_connection を設定.
  30. on_connect(NULL, G_IO_OUT, NULL);
  31. return true;
  32. }
  33. }
  34. freeaddrinfo(res); // もういらん.
  35. return succeeded;
  36. }

GTK でコールバック関数を登録する start_watch() 関数を見ておこう。

C++
[RAW]
  1. pair<GIOChannel*, guint>
  2. start_watch(SOCKET sockfd, int condition, GIOFunc proc, gpointer user_data)
  3. {
  4. assert(sockfd != INVALID_SOCKET);
  5. // GIOChannel で wrap する
  6. GIOChannel* channel = g_io_channel_unix_new(sockfd);
  7. guint io_watch = g_io_add_watch_full(channel, G_PRIORITY_DEFAULT,
  8. (GIOCondition) condition,
  9. proc,
  10. user_data,
  11. on_io_destroy);
  12. assert(io_watch);
  13. // g_io_add_watch() 内でもrefされるので、ここでは解放する
  14. g_io_channel_unref(channel);
  15. return make_pair(channel, io_watch);
  16. }

gdk_input_add() 関数が廃止され, g_io_add_watch_full() を使う. 代替関数なしにドンドコ deprecated にしていくのは止めてほしい。ファイルディスクリプタをそのまま登録するのではなく, GIOChannel で wrap する.

余談. g_io_channel_unix_new() の引数の型が int 決め打ちで、64bit Windows では動かない. 何のために wrap したのか意味不明. GPollFD 型は 64bit Windows では64bit ハンドルになっているので、修正漏れか.

接続成功 → イベント登録

接続に成功したら,読み込み可能状態でコールバックしてもらうように登録し直す。あとはコールバックされたら read() すればよい。

C++
[RAW]
  1. // Callback. 接続した or エラー. 成功の場合, g_connection を設定する.
  2. // @param source NULL の場合がある.
  3. gboolean on_connect(GIOChannel* source, GIOCondition condition, gpointer data)
  4. {
  5. print_condition(__func__, condition); // DEBUG
  6. if ( condition & (G_IO_ERR | G_IO_HUP) ) {
  7. // 自分だけ削除する.
  8. SOCKET fd = g_io_channel_unix_get_fd(source);
  9. closesocket(fd);
  10. wait_connect_events.erase(source);
  11. if (wait_connect_events.size() == 0) {
  12. error_dialog("接続に失敗しました。");
  13. g_idle_add(on_connect_error, (void*) -1); // emit.
  14. }
  15. return FALSE; // FALSE = The event source to be removed.
  16. }
  17. assert( wait_connect_events.size() > 0 );
  18. // ほかのを全部 close する
  19. for (auto i = wait_connect_events.begin();
  20. i != wait_connect_events.end(); i++) {
  21. if (i->first == source)
  22. continue;
  23. SOCKET fd = g_io_channel_unix_get_fd(i->first);
  24. closesocket(fd);
  25. g_source_remove(i->second); // 内部で g_io_channel_unref() される.
  26. }
  27. wait_connect_events.clear();
  28. // Callback を改めて登録する.
  29. SOCKET sockfd = g_io_channel_unix_get_fd(source);
  30. g_connection = start_watch(sockfd, G_IO_IN, on_ready_to_read, NULL);
  31. return FALSE; // FALSE = The event source to be removed.
  32. }

複数の接続要求を同時に走らせているので、接続できた場合はそれ以外の全部、接続できなかった場合はそのソケットを閉じていく。不必要に複雑になっている。

読み取って表示

EOF の場合は, エラーではなく, 読み取り可能かつ recv() の戻り値が 0 になる。

C++
[RAW]
  1. // Callback. GIOChannel のほうが引数になる. -> 直接 g_source_remove() できない.
  2. gboolean on_ready_to_read(GIOChannel* source, GIOCondition condition,
  3. gpointer data)
  4. {
  5. print_condition(__func__, condition); // DEBUG
  6. if ( condition & (G_IO_ERR | G_IO_HUP) ) {
  7. error_dialog("読み込み中にエラー発生");
  8. close_and_notify(false);
  9. return FALSE; // FALSE = The event source to be removed.
  10. }
  11. if (condition & G_IO_IN) {
  12. SOCKET fd = g_io_channel_unix_get_fd(g_connection.first);
  13. char buf[1000];
  14. int bytes_read = recv(fd, buf, sizeof(buf) - 1, 0);
  15. if (bytes_read < 0) {
  16. perror("recv() failed");
  17. error_dialog("recv() でエラー発生");
  18. close_and_notify(false);
  19. return FALSE; // FALSE = The event source to be removed.
  20. }
  21. else if ( bytes_read == 0 ) {
  22. // EOF
  23. close_and_notify(true);
  24. return FALSE;
  25. }
  26. assert( bytes_read > 0 );
  27. buf[bytes_read] = '\0';
  28. GtkTextBuffer* buffer = gtk_text_view_get_buffer(top_window->textView);
  29. assert(buffer);
  30. gtk_text_buffer_insert_at_cursor(buffer, buf, bytes_read);
  31. }
  32. return TRUE; // 再利用する.
  33. }

read() は次のいずれかで返る。

  1. 引数として与えたバッファが埋まった
  2. 書き込み側のwrite()の切れ目
  3. 書き込み側でソケットが閉じられた

上記の 2. から, バッファのバイト長より読めた長さが短くても、ソケットが閉じられたとは限らない。同様にread()の戻り値がバッファのバイト長と同じでも,まだデータがあるとは限らない。

ブロッキングモードの場合, read() の戻り値がたまたまバッファのバイト長と同じで、まだデータがあると思ってさらに read() するとブロックしてしまう。

一回だけ read() したら読み込みのルーチンを終了し,メインループに戻すようにする。メインループの待ち合わせでデータがあるかどうかを検査し、また読み込みルーチンを呼び出してもらう。

開始ボタン

あとは、開始ボタンのハンドラ。マルチスレッド版のコードと共用。

callbacks.cpp

C++
[RAW]
  1. // Callback
  2. void on_get_button_clicked(GtkButton* button, gpointer user_data)
  3. {
  4. #ifdef USE_THREAD
  5. if (worker_thread)
  6. return;
  7. #endif
  8. // GTK4: gtk_entry_get_text() がいきなり廃止。
  9. GtkEntryBuffer* b = gtk_entry_get_buffer( GTK_ENTRY(top_window->hostEntry) );
  10. worker_param.hostname = gtk_entry_buffer_get_text(b);
  11. b = gtk_entry_get_buffer( GTK_ENTRY(top_window->portEntry) );
  12. const char* port_str = gtk_entry_buffer_get_text(b);
  13. worker_param.port = atoi(port_str);
  14. if ( worker_param.port <= 0 || worker_param.port > 65535 ) {
  15. GtkDialog* dialog = GTK_DIALOG( gtk_message_dialog_new(top_window->self,
  16. GTK_DIALOG_DESTROY_WITH_PARENT,
  17. GTK_MESSAGE_ERROR,
  18. GTK_BUTTONS_CLOSE,
  19. "Port error") );
  20. g_signal_connect(dialog, "response",
  21. G_CALLBACK(gtk_window_destroy), NULL);
  22. //GTK4: gtk_dialog_run() は廃止.
  23. gtk_widget_show( GTK_WIDGET(dialog) );
  24. return;
  25. }
  26. #ifdef USE_THREAD
  27. // g_thread_new() は使うな. glib 2.32以降.
  28. // g_thread_create() は廃れた.
  29. GError* error = nullptr;
  30. worker_thread = g_thread_try_new("worker", start_get_fortune, &worker_param,
  31. &error);
  32. if ( !worker_thread ) {
  33. g_printerr("Error creating thread: %s\n", error->message);
  34. g_clear_error(&error);
  35. return;
  36. }
  37. #else
  38. if (!start_connect_to_server_nonblock(
  39. worker_param.hostname.c_str(), worker_param.port)) {
  40. return;
  41. }
  42. #endif
  43. gtk_widget_set_sensitive( GTK_WIDGET(top_window->hostEntry), FALSE );
  44. gtk_widget_set_sensitive( GTK_WIDGET(top_window->portEntry), FALSE );
  45. gtk_widget_set_sensitive( GTK_WIDGET(button), FALSE );
  46. }

関数をいきなり廃止したりするの、本当に止めてほしい。