2015年4月9日木曜日

和人君とUnix Domain Socketでプロセス間通信をやってみた

ソケットのままでも通信できるんですね

成長しないHaskellばかりの毎日ですが、訳あってソケットまわりを調べる機会が訪れてしまい、ほぼソケットまわりを忘れていまして再度調べたのでメモ...

Unixドメインソケット


プロセス間通信だとpipeとかありますが、ドメインソケットでもそれっぽい事ができます。

プロセスを2つ作って、互いに通信するよくあるコードですが...コードが単調でグチャグチャになってしまったので先にやってることを図にしますと、こうなってます

まず、和人プロセスが実行されます

和人プロセスは、socketpair()でペアなソケットを作成します、関数の説明は「manを見てください」と言うことで、こんな感じです、socketpair()でSOCK_STREAMを指定してストリームパイプを作成してます、この関数はUnixドメインソケットで利用可能です。

それから和人プロセスは、子供を作ります、どうやって創ったか...は不思議ですが、するとこうなります


和人プロセスは、子供を生んだ直後に「いらないコネクション」閉じちゃいます、でも和人プロセスはもう一本あることを知っています、これを使って和人プロセスは子供にデータを送信します。

さらにその後、和人プロセスと子プロセスはそれぞれ自分でスレッドを1つ作成します。スレッドについては「manを見てください」とお願いしまして、こうなります。

そのスレッドのエントリ関数はparent_thread_func()とchild_thread_func()です、これは強制的に一定間隔で子プロセスから親プロセスへデータを送りつけるための関数です(和人プロセスは無理やりやられます)

大まかにはこんな感じです、では、コードはこちらです、Unixドメインソケットってパイプみたいに使えるのですね。

コードです


実行しますとターミナルへの出力が混ざっちゃいますので注意して見てください..
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <errno.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <pthread.h>

#define BUFFSIZE 4096

static void parent(void);
static void child(void);

void parent_thread_func(void);
void child_thread_func(void);

/* UNIX DOMAIN SOCKET */
int sock[2];

int main(void)
{
  int result;
  pid_t pid, cid;

  result = socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
  if(result < 0) {
    printf("ソケットぺあ失敗だぜ errno=%s(%d)", strerror(errno), errno);
    return EXIT_FAILURE;
  }

  cid = fork();
  switch(cid) {
  case 0:
    child();
    break;
  case -1:
    printf("フォーク失敗だぜ");
    exit(EXIT_FAILURE);
  default:
    parent();
    break;
  }

  return 0;
}

static void parent(void)
{
  char buff[BUFFSIZE] = "Kazuto";
  int index = 0, val, bytes = 0, result, tret;
  ssize_t n;
  socklen_t len;
  struct msghdr msg;
  struct iovec iov[2];
  struct cmsghdr cmsghdr;
  pthread_t tid;

  iov[0].iov_base = buff;
  iov[0].iov_len = sizeof(buff);
  iov[1].iov_base = &index;
  iov[1].iov_len = sizeof(index);

  msg.msg_name = NULL;
  msg.msg_namelen = 0;
  msg.msg_iov = iov;
  msg.msg_iovlen = 2;

#ifdef __gnu_linux__
  msg.msg_control = NULL;
  msg.msg_controllen = 0;
#else
  msg.msg_accrights = NULL;
  msg.msg_accrightslen = 0;
#endif

  len = sizeof(val);
  result = getsockopt(sock[1], SOL_SOCKET, SO_SNDBUF, &val, &len);
  if(result < 0) {
    printf("のっけからダメかよ... errno=%s(%d)", strerror(errno), errno);
    exit(EXIT_FAILURE);
  }
  printf("ソケットの送信バッファサイズはこんだけ = %d", val);

  close(sock[0]);

  tret = pthread_create(&tid, NULL, (void*)parent_thread_func, NULL);
  if(tret != 0) {
    printf("スレッドできねぇーし");
    exit(EXIT_FAILURE);
  }

  for(;;) {
    n = sendmsg(sock[1], &msg, 0);
    if(n >= 0) {
      printf("ソケットへ和人をブチ込んだバイト数 = %d", (int)n);
      index++;
      bytes += iov[0].iov_len + iov[1].iov_len;
      printf("ブチバイト数の計算あってればこんなもんだろ = %d", bytes);
    } else {
      printf("ああぁーやっちまった errno=%s(%d)", strerror(errno), errno);
    }
  }
}

void parent_thread_func(void)
{
  char buff[BUFFSIZE];
  int index;
  ssize_t n;
  struct msghdr msg;
  struct iovec iov[2];

  iov[0].iov_base = buff;
  iov[0].iov_len = sizeof(buff);
  iov[1].iov_base = &index;
  iov[1].iov_len = sizeof(index);

  msg.msg_name = NULL;
  msg.msg_namelen = 0;
  msg.msg_iov = iov;
  msg.msg_iovlen = 2;

#ifdef __gnu_linux__
  msg.msg_control = NULL;
  msg.msg_controllen = 0;
#else
  msg.msg_accrights = NULL;
  msg.msg_accrightslen = 0;
#endif

  for(;;) {
    n = recvmsg(sock[1], &msg, 0);
    if(n >= 0) {
      printf("(裏)和人きたバイト数 ret = %d", (int)n);
      printf("   iovec[0]=%s", buff);
      printf("   iovec[0].size=%d", (int)msg.msg_iov[0].iov_len);
      printf("   iovec[1]=%d", index);
      printf("   iovec[1].size=%d", (int)msg.msg_iov[1].iov_len);
      sleep(5);
    } else {
      printf("(裏)ここはレアだぜ errno=%s(%d)", strerror(errno), errno);
    }
  }
}

static void child(void)
{
  char buff[BUFFSIZE];
  int index = 0, val, bytes = 0, result, tret;;
  ssize_t n;
  socklen_t len;
  struct msghdr msg;
  struct iovec iov[2];
  pthread_t tid;

  iov[0].iov_base = buff;
  iov[0].iov_len = sizeof(buff);
  iov[1].iov_base = &index;
  iov[1].iov_len = sizeof(index);

  msg.msg_name = NULL;
  msg.msg_namelen = 0;
  msg.msg_iov = iov;
  msg.msg_iovlen = 2;

#ifdef __gnu_linux__
  msg.msg_control = NULL;
  msg.msg_controllen = 0;
#else
  msg.msg_accrights = NULL;
  msg.msg_accrightslen = 0;
#endif

  len = sizeof(val);
  result = getsockopt(sock[0], SOL_SOCKET, SO_RCVBUF, &val, &len);
  if(result < 0) {
    printf("socketぺあー errno=%s(%d)", strerror(errno), errno);
    exit(EXIT_FAILURE);
  }
  printf("ソケットの受信バッファサイズはこんだけ = %d", val);

  close(sock[1]);

  tret = pthread_create(&tid, NULL, (void*)child_thread_func, NULL);
  if(tret != 0) {
    printf("スレッドできねぇーし");
    exit(EXIT_FAILURE);
  }

  for(;;) {
    puts("ソケットから何か読んでみるから 何かキー押せ...なければ寝る...");
    getchar();
    n = recvmsg(sock[0], &msg, 0);
    if(n >= 0) {
      printf("和人きた...データあったw ret = %d", (int)n);
      printf("   iovec[0]=%s", buff);
      printf("   iovec[0].size=%d", (int)msg.msg_iov[0].iov_len);
      printf("   iovec[1]=%d", index);
      printf("   iovec[1].size=%d", (int)msg.msg_iov[1].iov_len);
    } else {
      printf("recvmsg() errno=%s(%d)", strerror(errno), errno);
    }
  }
}

void child_thread_func(void)
{
  char buff[BUFFSIZE] = "Konomae Oka-ga, kitattena";
  int index = 10000;
  ssize_t n;
  struct msghdr msg;
  struct iovec iov[2];
  struct cmsghdr cmsghdr;

  iov[0].iov_base = buff;
  iov[0].iov_len = sizeof(buff);
  iov[1].iov_base = &index;
  iov[1].iov_len = sizeof(index);

  msg.msg_name = NULL;
  msg.msg_namelen = 0;
  msg.msg_iov = iov;
  msg.msg_iovlen = 2;

#ifdef __gnu_linux__
  msg.msg_control = NULL;
  msg.msg_controllen = 0;
#else
  msg.msg_accrights = NULL;
  msg.msg_accrightslen = 0;
#endif
  
  for(;;) {
    n = sendmsg(sock[0], &msg, 0);
    if(n >= 0) {
      printf("(裏)和人ぶち込みバイト数 = %d", (int)n);
      index++;
    } else {
      printf("(裏)めんどくせぇー errno=%s(%d)", strerror(errno), errno);
    }
  }
}

ビルド方法はこんな感じです

Linuxですとこうで
$ gcc -g -O0 socketpair.c -lpthread


Solarisですとこうです、cc使ってます
$ cc -g -O0 socketpair.c -lsocket -lpthread

msghdr構造体とiovec構造体で送受信ができます、送信したいデータはiovec構造体にデータとデータの長さをセットする(scatter/gather配列と言うらしいです)、サンプルでは2つのデータを設定して送信しています。

1つ目はテキストデータで、2つ目はint型の整数で、和人プロセス<=>子プロセス間は0からインクリメントされていき、スレッド<=>スレッド間は10000からインクリメントされるようになっています。

sendmsg()とrecvmsg()で送受信しますが、ブロッキングソケットなので、ソケットバッファがいっぱいになった時にsendmsg()はブロックし、recvmsg()は読み込みに行ったときにバッファにデータが無いとブロックして帰ってきません。

和人プロセス<=>子プロセス間の通信はsendmsg()が先に走るように設定してありますので送信バッファが一杯になってブロックした後に、ENTERキーを押すとrecvmsg()が走るようにしています。

ちなみにmsghdr構造体ですが、Linuxではこうで
---------------------------
 Linux
---------------------------
/* Structure describing messages sent by
   `sendmsg' and received by `recvmsg'.  */
struct msghdr
  {
    void *msg_name;  /* Address to send to/receive from.  */
    socklen_t msg_namelen; /* Length of address data.  */

    struct iovec *msg_iov; /* Vector of data to send/receive into.  */
    size_t msg_iovlen;  /* Number of elements in the vector.  */

    void *msg_control;  /* Ancillary data (eg BSD filedesc passing). */
    size_t msg_controllen; /* Ancillary data buffer length.
                               !! The type should be socklen_t but the
                               definition of the kernel is incompatible
                               with this.  */

    int msg_flags;  /* Flags on received message.  */
  };

/* Structure used for storage of ancillary data object information.  */
struct cmsghdr {
    size_t cmsg_len; /* Length of data in cmsg_data plus length
                               of cmsghdr structure.
                               !! The type should be socklen_t but the
                               definition of the kernel is incompatible  with this.  */
    int cmsg_level;  /* Originating protocol.  */
    int cmsg_type;  /* Protocol specific type.  */
#if (!defined __STRICT_ANSI__ && __GNUC__ >= 2) || __STDC_VERSION__ >= 199901L
    __extension__ unsigned char __cmsg_data __flexarr; /* Ancillary data.  */
#endif
};


Solarisではこうなっていました、ちょっと違いますね
--------------------
./sys/socket.h:327:
--------------------
/*
 * Message header for recvmsg and sendmsg calls.
 */
struct msghdr {
 void  *msg_name;  /* optional address */
 socklen_t msg_namelen;  /* size of address */
 struct iovec *msg_iov;  /* scatter/gather array */
 int  msg_iovlen;  /* # elements in msg_iov */

#if defined(__lint) || defined(_XPG4_2) || defined(_KERNEL)
 void  *msg_control;  /* ancillary data */
 socklen_t msg_controllen;  /* ancillary data buffer len */
 int  msg_flags;  /* flags on received message */
#else
 caddr_t  msg_accrights; /* access rights sent/received */
 int  msg_accrightslen;
#endif /* defined(_XPG4_2) || defined(_KERNEL) */
};

--------------------
sendmsg()
--------------------
ssize_t sendmsg(int s, const struct msghdr *msg, int flags);

--------------------
struct iovec
--------------------
#if !defined(__USE_LEGACY_PROTOTYPES__)
typedef struct iovec {
    void    *iov_base;
    size_t  iov_len;
} iovec_t;
#else
typedef struct iovec {
    caddr_t iov_base;
#if defined(_LP64)
    size_t  iov_len;
#else
    long    iov_len;
#endif
} iovec_t;
#endif  /* !defined(__USE_LEGACY_PROTOTYPES__) */

--------------------
./sys/socket.h:426:
--------------------
struct cmsghdr {
 socklen_t cmsg_len; /* data byte count, including hdr */
 int  cmsg_level; /* originating protocol */
 int  cmsg_type; /* protocol-specific type */
};

「defined(__lint) || defined(_XPG4_2) || defined(_KERNEL)」のあたりがいつ定義されるかは詳しく分かってません、すいません。

ソケットバッファのサイズも64bit版Linux、(そうもちろんGentooですが...)とSolaris Zoneでは違うのですね、Solaris Zoneは32bitでコンパイルしてしまいましたが...

UNIXドメインソケットのいろいろ

ソケットってイメージてきには


こういう感じですかね、データが混ざらないんですね、当たり前ですか...。

サンプルは面倒な方法でやっていますが、もっと簡単な方法でreadv()やwritev()などの関数もあるので調べてみるのも面白いとおもいます。
例えばMySQLの/tmp/mysql.sockなんかsockaddr_un構造体へファイルのパスを指定してbind()を呼んでいるソケットです、Unixドメインソケットにもいろいろありますね。


あまりというか、おもしろくないコードですが、意外とソケットの動きを確認できます、「こういう時にBroken pipeが出ちゃうかぁ...」とか、「バッファが一杯でブロックするのね」、とか、「ソケットの送受信バッファってOSが違うとだいぶ違うのね」、とか発見がおおいです。

できる事はこれだけじゃなくて、「ディスクリプタパッシング」ってファイルディスクリプタを別のプロセスに送っちゃったり、ユーザーのクレデンシャルを送受信できたりといろいろな利用方法があるのですね、案外忘れてることが多いです。(こういうデータは補助情報、msg_controlメンバに載せるらしくUnixドメインソケットを作成する時に追加オプションが必要らしいが...)まぁまぁいいとして、

なかなか勉強になるではないですか!

是非、皆さんにもUnixドメインソケットライフで楽しんでいただきたいと思います、

和人君が最近リア充らしくSocketライフを楽しんでるっていう話にチョイムカつき気味...な私ですが

(次はNonblockingソケットにしてやるって...)




0 件のコメント:

コメントを投稿