streambuf を拡張し, ソケット対応 [C++]

[[2018.6]]

C++ の IOストリームで, 新しい対象に対して入出力できるようにしたい場合, istream, ostream, iostream の各クラスに似たストリームクラスと, basic_streambuf クラスから派生させたクラスをペアで作ります。

iostream クラスは書式付き入出力を担当し, 媒体への書き込み・読み込みは streambuf クラスで行なうようになっています。

例えばファイルに対しては fstream クラスと filebuf クラスがペアになります。

ソケットの入出力のストリームクラス, streambuf クラスを作りました。

作ったソースコードはこちら; network · master · netsphere / my-cpp-lib · GitLab

TLS 版については, HTTP/2 のページに掲載しています。HTTP/2 クライアント実装サンプル (TLS版)

仕様

全体的に, なんでこんな仕様になってるの? というところが多い。

書式化は iostream のほうでするのに, ロケールを streambuf のほうに持たせるのは何でだ? streambuf がえらく複雑になっている。

ストリームクラスが streambuf を持つ、というのがチグハグと思う。

バッファリング

読み込みバッファと書き込みバッファを別に設定できる。読み込みバッファは protected setg() で, 書き込みバッファは protected setp() で設定する。(g は get のg, p は put のp. 略さないでほしい...)

これらを呼び出さなければ, バッファリングしない. バッファリングしない場合, public sungetc(), public sputbackc() が失敗する.

読み込み

public sgetn() が固定長の読み込み. 読めた分だけ返すのではなく、指定のバイト数に達するまでブロックする、となっている。::recv() と挙動が違う。recv() のつもりで呼び出すと, スタックしてしまう。

行末まで読みたい、など、どれだけ読めばいいか事前に分からないときは, sgetn() は使えない. そうすると, 1文字ずつ読むしかなく、バッファリングが必須.

実装は, protected xsgetn() に投げてる. そこから uflow() で行っている。uflow() は単に underflow() に投げてる.

basic_streambuf<>::underflow()
{ return traits_type::eof(); }

このメソッドだけを override すれば, とりあえず動くようになる。

エラーハンドリング

underflow() は, 高々1バイトを読み込む。エラーが起こったときの通知方法がない。握りつぶすよりかは, とりあえず例外でも投げるか。

書き込み側, xsputn() も, エラーが起こったときの通知ができない。こちらも, とりあえず例外にするか。

何でこんなことになっているのか.

ヘッダファイル (実装)

やや長くなるが、全文を示す。

socket_streambuf.h:

C++
[RAW]
  1. #include <ios>
  2. #include <assert.h>
  3. #include <algorithm> // min()
  4. #include <string.h> // memmove()
  5. #ifdef _WIN32
  6. #define STRICT
  7. #define WIN32_LEAN_AND_MEAN
  8. #include <winsock2.h>
  9. #include <windows.h>
  10. #define SHUT_RDWR SD_BOTH
  11. #else
  12. #include <sys/socket.h> // recv(), send(), shutdown()
  13. #include <unistd.h> // close()
  14. typedef int SOCKET;
  15. typedef unsigned char BYTE;
  16. #define INVALID_SOCKET -1
  17. #endif
  18. // TCP socket.
  19. // 非SSL (TLS). TLS 版は TlsSocketStreamBuf クラス.
  20. class SocketStreamBuf: public std::streambuf
  21. {
  22. // sgetn() は読めた分だけ返すのではなく, 指定のバイト数に達するまで,
  23. // ブロックする。
  24. // そのため, どれだけ読むべきか事前に不明な場合は sgetn() は使えず,
  25. // 1バイトずつ読まなければならない。
  26. // 実装としては, バッファが (ほぼ) 必須.
  27. char* m_buf;
  28. #define max_headroom ((ssize_t) 16)
  29. static constexpr long bufsiz = 4096 + max_headroom;
  30. protected:
  31. // 所有.
  32. SOCKET m_sock;
  33. public:
  34. SocketStreamBuf(): m_buf(nullptr), m_sock(INVALID_SOCKET) { }
  35. virtual ~SocketStreamBuf() {
  36. close();
  37. if (m_buf) {
  38. free(m_buf);
  39. m_buf = nullptr;
  40. }
  41. }
  42. // SocketStream::open() から呼び出される.
  43. virtual void connected( SOCKET sock, const char* host ) {
  44. assert( sock != INVALID_SOCKET );
  45. assert( host && *host );
  46. assert( m_sock == INVALID_SOCKET ); // TODO: 例外を投げる?
  47. m_sock = sock;
  48. if (!m_buf)
  49. m_buf = (char*) malloc(bufsiz);
  50. setg(m_buf, m_buf, m_buf);
  51. }
  52. // std::streambuf<> には close() がない. => オブジェクトを破壊する必要.
  53. // @return If failed, -1.
  54. virtual int close() {
  55. if (m_sock == INVALID_SOCKET )
  56. return 0;
  57. // なくてもいいはずだが、OSなどの不具合などによりデータ消失があるた
  58. // め (?), 入れた方がいいという意見もある.
  59. // See TCPのはなし
  60. // http://www.silex.jp/blog/wireless/2015/11/tcp.html
  61. // See close vs shutdown socket?
  62. // https://code.i-harness.com/en/q/3f7b5b
  63. ::shutdown(m_sock, SHUT_RDWR);
  64. int r;
  65. #ifdef _WIN32
  66. r = ::closesocket(m_sock);
  67. #else
  68. r = ::close(m_sock);
  69. #endif
  70. m_sock = INVALID_SOCKET;
  71. return r;
  72. }
  73. bool is_open() const throw() {
  74. return m_sock != INVALID_SOCKET;
  75. }
  76. protected:
  77. // バッファを無視して, 読み込む.
  78. // length に満たない場合がある.
  79. // @return エラーの場合, -1
  80. virtual ssize_t sysread(void* buffer, size_t length) {
  81. ssize_t ret;
  82. do {
  83. ret = ::recv(m_sock, (char*) buffer, length, 0);
  84. } while (ret == -1 && errno == EINTR);
  85. return ret;
  86. }
  87. // 読み込み用で最低限 override しなければならないのは, underflow() だけ.
  88. // @return 読み込んだ 1文字. EOF だった場合, traits_type::eof().
  89. virtual int_type underflow() {
  90. if ( !is_open() )
  91. return traits_type::eof();
  92. // protected eback() 先頭, gptr() 現在の位置, egptr() 終端. 名前が??
  93. if (!gptr() || gptr() >= egptr()) {
  94. // バッファを読み切った.
  95. // sungetc() できるようにする.
  96. long rest = std::min(egptr() - eback(), max_headroom);
  97. if (rest > 0)
  98. memmove(m_buf, gptr() - rest, rest);
  99. ssize_t r = this->sysread(m_buf + rest, bufsiz - rest);
  100. if (r == 0) // EOF
  101. return traits_type::eof();
  102. else if (r < 0)
  103. throw errno;
  104. setg(m_buf, m_buf + rest, m_buf + rest + r);
  105. }
  106. return *gptr(); // ポインタ進めない.
  107. }

上述のとおり underflow() だけを override すればとりあえず動くが、バッファリングして, sysread() でドカンと読み込むようにする。

C++
[RAW]
  1. // 同様に, 書き込み側については, 最低限 overflow() を override すればよい.
  2. // 1バイトを書き込む.
  3. // @param __c 書き込む値.
  4. // @return 成功したら EOF 以外の何か. 失敗したら EOF.
  5. // ここの解説では, __c が EOF の場合, 失敗 (EOF) を返す.
  6. // http://www.cplusplus.com/reference/streambuf/basic_streambuf/overflow/
  7. // basic_filebuf<>::overflow() の実装では, __c が EOF の場合も
  8. // 「成功」を返している.
  9. virtual int_type overflow(int_type __c = traits_type::eof() )
  10. {
  11. if ( traits_type::eq_int_type(__c, traits_type::eof()) )
  12. return __c; // error とする.
  13. char cc = __c;
  14. if ( xsputn(&cc, 1) <= 0 )
  15. return traits_type::eof();
  16. return 1; // 成功
  17. }
  18. // 効率悪いので, xsputn() も override.
  19. // @return 書き込めたバイト数.
  20. // basic_streambuf<>::xsputn() の実装を見ると, overflow() が EOF
  21. // を返すと, そこまでのバイト数を返す.
  22. // エラーで1バイトも書き込めなかったときも, 0 を返す. マジか!!
  23. virtual std::streamsize xsputn( const char_type* s,
  24. std::streamsize count )
  25. {
  26. if ( !is_open() )
  27. return 0;
  28. ssize_t r = ::send(m_sock, s, count, 0 );
  29. if (r >= 0)
  30. return r;
  31. else
  32. throw errno;
  33. }
  34. };