并发程序设计6:IOCP


  本节记录Windows下与epoll类似的机制IOCP(input outpout completion port)。对于单台电脑的多TCP连接请求,IOCP和epoll是比较好的选择。因为IOCP会用到重叠IO的一些函数,因此先记录重叠IO。

1. 重叠IO

1.1 关键函数

  由于IOCP的使用会用到较多重叠IO相关的函数,先记录一下重叠IO。所谓重叠IO,就是在异步IO时,由于发送/接收函数调用后立即返回,那么单线程就能同时收发多个数据,表现出来就是IO同时向不同套接字同时发送,如下图所示:

                               

                                                                       重叠IO模型

为了实现重叠IO,必须创建适用于重叠IO的套接字。

#include 

SOCKET WSASocket(int af,int type,int protocol,LPWSAPROTOCOL_INFO lpProtocolInfo, GROUP g,DWORD DWFlags); 

//创建适用于重叠IO的套接字示例:
WSASocket(PF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);

 创建了重叠IO的套接字还要能使用重叠IO的发送和接收函数

#include 

int WSASend
(
    SOCKET s,  //发送的套接字
    LPWSABUF lpBuffers, //存储待发送数据的数组
    DWORD dwBufferCount, //数组个数,一般为1
   LPDWORD lpNumberofBytesSent,
LPWSAOVERLAPPED lpOverlapped, //非常重要 LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine )
int WSARecv
(
  SOCKET s,
  LPWSABUF lpBuffers,
  DWORD dwBufferCount,
  LPDWORD lpNumberofByteRecved,
  LPWORD lpFlags,
  LPWSAOVERLAPPED lpOverlapped,
  LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
)

下面来说明上面比较重要的参数

第二个参数lpBuffers存放发送和接收数据的数组,结构体定义如下:

typedef struct __WSABUF
{
u_long len; //数组长度
char FAR* buf;
} WSABUF,* LPWSABUF

第四个参数代表收发的具体字节数,由于是异步收发,如果调用能当场完成,存储的就是实际收发的字节数;如果不能当场完成,会返回SOCKET_ERROR,此时再调用以下函数:

WSAEVENT event=WSACreateEvent();
WSAOVERLAPPED overlapped;
overlapped.hEvent=event; //通过该事件获取返回通知
if
(WSASend(...)==SOCKET_ERROR) { if(WSAGetLastError()==WSA_IO_PENDING) { WSAWaitForMultipleEvents(1,&event,TRUE,WSA_IFINITE,FALSE); WSAGetOverlappedResult(sock,&overlapped,&sentbyte,FALSE,NULL); //sentbyte此时存储的为实际发送字节,接收与此相同,具体示例查看完整代码 } }

第六个参数lpOverlapped为LPWSAOVERLAPPED类型结构体指针,该结构体有个关键变量WSAEVENT,通过该事件可以获取收发完成的通知。如果调用函数时该变量NULL,将会以阻塞方式工作。第七个参数也用于获取收发完成的通知。

1.2 获取收发完成通知

  获取收发完成的通知有两种方式,第一种为通过WSASend和WSARecv第六个参数lpOverlapped中的WSAEVENT获取。即收发完成,lpOverlapped的事件会置为signaled状态,调用WSAWaitForMultipleEvents()即可,上面在介绍获取收发实际字节数时采用的即时此方法。

  下面主要介绍利用最后一个参数的方法。WSASend和WSARecv指定一个函数来验证收发完成情况,一旦收发完成,就调用该函数。为了不在主函数执行重要任务时被打断,规定只有在alertable_wait状态才能调用验证收发完成的函数。使程序进入alertable_wait有以下几个函数:

WaitForSingleObjectEx
WaitForMultipleObjectsEx
WSAWaitForMultipleEvents
SleepEx

   最后利用重叠IO完成一个回声服务器服务器端函数的编写,代码如下:

  1 #include 
  2 #include 
  3 #include 
  4 #include 
  5 #include 
  6 #define BUF_SIZE 50
  7 #pragma comment(lib,"ws2_32.lib")
  8 
  9 
 10 void CALLBACK ReadCmpRoutine(DWORD dwError,DWORD szRecvBytes,LPWSAOVERLAPPED lpoverlapped,DWORD flags);  //验证接收完成的函数,传入参数固定
 11 void CALLBACK WriteCmpRoutine(DWORD dwError, DWORD szRecvBytes, LPWSAOVERLAPPED lpoverlapped, DWORD flags);
 12 void error_handle(const char* msg)
 13 {
 14     printf("%s\n", msg);
 15     exit(0);
 16 }
 17 
 18 typedef struct
 19 {
 20     SOCKET sock;
 21     char buf[BUF_SIZE];
 22     WSABUF wsabuf;
 23 }IODATA,*LPIODATA;
 24 
 25 
 26 int main()
 27 {
 28     SOCKET servsock, clntsock;
 29     SOCKADDR_IN servaddr, clntaddr;
 30 
 31     WSADATA wsadata;
 32     if (WSAStartup(MAKEWORD(2, 2), &wsadata) != 0)
 33         error_handle("Startup() error");
 34 
 35     u_long mode = 1;
 36     servsock = WSASocket(PF_INET, SOCK_STREAM, 0,NULL,0,WSA_FLAG_OVERLAPPED); 
 37     ioctlsocket(servsock,FIONBIO,&mode);  //创建非阻塞套接字
 38 
 39     memset(&servaddr, 0, sizeof(servaddr));
 40     servaddr.sin_family = AF_INET;
 41     const char* host = "192.168.0.105"; //本机主机IP
 42     inet_pton(AF_INET, host, (void*)&servaddr.sin_addr);
 43     servaddr.sin_port = htons(8000);  //由于VS版本检查,一些早期的函数会报错
 44 
 45     if (bind(servsock, (SOCKADDR*)&servaddr, sizeof(sockaddr)) == SOCKET_ERROR)
 46         error_handle("bind() error");
 47     if (listen(servsock, 5) == SOCKET_ERROR)
 48         error_handle("listen() error");
 49     int clntlen = sizeof(clntaddr);
 50 
 51     LPWSAOVERLAPPED overlapped; //overlapped指针
 52     LPIODATA IOData;
 53     DWORD recvBytes, flaginfo=0;
 54 
 55     while (1)
 56     {
 57         SleepEx(100, TRUE); //进入alertable_wait状态
 58         clntsock = accept(servsock, (SOCKADDR*)&clntaddr, &clntlen);
 59         if (clntsock == INVALID_SOCKET)
 60         {
 61             if (WSAGetLastError() == WSAEWOULDBLOCK) //当前没有连接请求
 62                 continue;
 63             else
 64                 error_handle("accept() error");
 65         }
 66         overlapped = (LPWSAOVERLAPPED)malloc(sizeof(WSAOVERLAPPED));
 67         IOData = (LPIODATA)malloc(sizeof(IODATA));
 68         IOData->sock =clntsock;
 69         (IOData->wsabuf).buf = IOData->buf;
 70         (IOData->wsabuf).len = BUF_SIZE; //调用WSASend和WSARecv函数必须的结构体声明
 71         overlapped->hEvent =(HANDLE)IOData;//因为不采用事件作为接收完成的通知,此处利用hEvent传递其他数据
 72 
 73         WSARecv(clntsock, &(IOData->wsabuf), 1, &recvBytes, &flaginfo, overlapped, ReadCmpRoutine);
 74     }
 75     closesocket(clntsock);
 76     closesocket(servsock);
 77     WSACleanup();
 78     return 0;
 79 }
 80 
 81 void CALLBACK ReadCmpRoutine(DWORD dwError, DWORD szRecvBytes, LPWSAOVERLAPPED lpoverlapped, DWORD flags)
 82 {
 83     LPIODATA IOData = (LPIODATA)lpoverlapped->hEvent; //获取传递过来的数据
 84     DWORD SentBytes;
 85     if (szRecvBytes == 0) //关闭套接字请求
 86     {
 87         closesocket(IOData->sock);
 88         free(IOData);
 89         free(lpoverlapped);
 90     }
 91     else  //发送回客户端
 92     {
 93         (IOData->wsabuf).len = szRecvBytes;
 94         WSASend(IOData->sock, &(IOData->wsabuf), 1, &SentBytes, 0, lpoverlapped, WriteCmpRoutine);
 95     }
 96 }
 97 
 98 void CALLBACK WriteCmpRoutine(DWORD dwError, DWORD szRecvBytes, LPWSAOVERLAPPED lpoverlapped, DWORD flags)
 99 {
100     LPIODATA IOData = (LPIODATA)lpoverlapped->hEvent; //获取传递过来的数据
101     DWORD flaginfo = 0;
102     DWORD recvBytes;
103     WSARecv(IOData->sock, &(IOData->wsabuf), 1, &recvBytes, &flaginfo, lpoverlapped, ReadCmpRoutine);
104 }

   先简要说明一下上面代码的逻辑:创建非阻塞重复IO的套接字之后,开始不断监测连接请求,当有连接请求时,调用WSARecv(),此时注册了ReadCmpRoutine函数,因此接收完成就会跳入ReadCmpRoutine函数,然后该函数又调用WSASend()把数据发送回去。在发送回去的同时,注册WriteCmpRoutine函数,该函数又调用WSARecv开始接收数据,并注册ReadCmpRoutine。因此接收到数据之后,又会执行ReadCmpRoutine函数,如此往复。

注:

(1) 66-71行申请并初始化相关指针,通过将指针赋值给overlapped->hEvent进行指针传递,这样就将客户端套接字和接收消息的数组传给了ReadCmpRoutine函数,然后客户端套接字和接收消息的数组就在ReadCmpRoutine与WriteCmpRoutine函数之间传递;

(2) 93行接收到消息后,将wsabuf的长度设置为实际接收数据的长度再发送,那么发送的长度就和接收长度一致了,否则wsabuf接收到的数据后面的乱码也会发送;

(3) 重复调用accept和SleepEx是影响该程序性能的主要原因。

2. IOCP

IOCP全称Input output completion port(输入输出完成端口)。简单来说,就是专门设计线程来监听发送,接收完成事件。本人的理解(不一定对),在主线程创建完成端口并与套接字绑定,此时在此套接字上的所有发送接收信息的通知都进入一个完成端口管理的队列。然后完成端口有专门的线程对完成端口管理的队列进行处理,也就是通过显式的线程去监控发送,接收情况。

  使用完成端口有三个主要步骤:创建完成端口,完成端口与待监视的套接字绑定,线程监视套接字发送,接收完成情况。对应着两个函数:

#include 
HANDLE IOCP=CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,threadNum);//创建完成端口,返回完成端口句柄
threadNum:创建多少个线程来管理完成端口对收发的监控,为0时就创建CPU核数那么多的线程

SOCKET clntsock=accept() ; //调用accept函数来获得与客户端连接的套接字
CreateIoCompletionPort((HANDLE)clntsock, IOCP,ULONG_PTR CompletionKey , 0); //完成监听的套接字与IOCP的绑定,其中CompletionKey 可以用来传递数据

GetQueuedCompletionStatus(IOCP, &byteTransferred, ULONG_PTR CompletionKey, LPOVERLAPPED* lpOverlapped, DWORD dwMilliseconds);  //确认已完成收发的套接字,专门创建的线程调用此函数,以从完成端口的队列中取出收发完成的端口
IOCP:完成端口句柄;
byteTransferred:记录传输的字节数
CompletionKey:该参数就是调用CreateIoCompletionPort完成绑定时的第三个参数。比如完成绑定时,我需要把与客户端连接的套接字传到完成端口处理线程中,就可以通过该参数传递;
lpOverlapped:调用WSASend()和WSARecv()时其第六个参数,也可以用该参数传递数据;
dwMilliseconds:等待时间

   上面最难理解的参数是GetQueuedCompletionStatus()函数的第三个和第四个参数,这两个参数分别与调用CreateIoCompletionPort()函数的第3个参数和调用WSASend()与WSARecv()函数的第六个参数相同,即相当于调用CreateIoCompletionPort,WSASend,WSARecv时传递实参,而函数GetQueuedCompletionStatus第三个和第四个变量为形参,在接下来完整的IOCP代码中会实现。同样以回声服务器服务器端为例,先介绍主要流程:

可以看到实现调用GetQueuedCompletionStatus函数从完成端口队列中取出相应套接字处理,调用WSASend和WSARecv将套接字加入到完成端口队列。下面是完整代码:

  1 #include 
  2 #include 
  3 #include 
  4 #include 
  5 #include 
  6 #include 
  7 #include 
  8 #define BUF_SIZE 50
  9 #define READ 2
 10 #define WRITE 3
 11 
 12 #pragma comment(lib,"ws2_32.lib")
 13 
 14 
 15 void error_handle(const char* msg)
 16 {
 17     printf("%s\n", msg);
 18     exit(0);
 19 }
 20 
 21 unsigned   __stdcall ThreadForIOCP(void* port);
 22 
 23 typedef struct
 24 {
 25     OVERLAPPED overlapped;
 26     WSABUF wsabuf;
 27     char buf[BUF_SIZE];
 28     int rwMode;
 29 }IODATA,*LPIODATA;
 30 
 31 int main()
 32 {
 33     SOCKET servsock, clntsock;
 34     SOCKADDR_IN servaddr, clntaddr;
 35 
 36     WSADATA wsadata;
 37     if (WSAStartup(MAKEWORD(2, 2), &wsadata) != 0)
 38         error_handle("Startup() error");
 39 
 40     HANDLE IOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); //创建完成端口
 41     for (int i = 0; i < 3; i++)
 42         _beginthreadex(NULL, 0, ThreadForIOCP, (void*)IOCP, 0, NULL);  //创建线程处理完成IO监测
 43 
 44     servsock = WSASocket(PF_INET, SOCK_STREAM, 0,NULL,0,WSA_FLAG_OVERLAPPED);  //创建重叠IO
 45     memset(&servaddr, 0, sizeof(servaddr));
 46     servaddr.sin_family = AF_INET;
 47     const char* host = "192.168.0.105"; //本机主机IP
 48     inet_pton(AF_INET, host, (void*)&servaddr.sin_addr);
 49     servaddr.sin_port = htons(8000);  //由于VS版本检查,一些早期的函数会报错
 50 
 51 
 52     if (bind(servsock, (SOCKADDR*)&servaddr, sizeof(sockaddr)) == SOCKET_ERROR)
 53         error_handle("bind() error");
 54     if (listen(servsock, 5) == SOCKET_ERROR)
 55         error_handle("listen() error");
 56 
 57     int clntlen = sizeof(clntaddr);
 58     DWORD recvByte = 0,flag=0;
 59 
 60     while (1)
 61     {
 62         clntsock = accept(servsock, (SOCKADDR*)&clntaddr, &clntlen);  //accept是阻塞的
 63         if (clntsock == INVALID_SOCKET)
 64         {
 65             closesocket(clntsock);
 66             continue;
 67         }
 68         CreateIoCompletionPort((HANDLE)clntsock, IOCP, (DWORD)clntsock, 0); //将完成端口与套接字绑定,传递套接字
 69 
 70         LPIODATA IOData = (LPIODATA)malloc(sizeof(IODATA));
 71         (IOData->wsabuf).len = BUF_SIZE;
 72         (IOData->wsabuf).buf = IOData->buf;
 73         IOData->rwMode = READ;  //传递收发信息
 74         memset(&(IOData->overlapped), 0, sizeof(WSAOVERLAPPED));
 75         WSARecv(clntsock, &(IOData->wsabuf), 1, &recvByte, &flag, &(IOData->overlapped), NULL); 
 76     }
 77     return 0;
 78 }
 79 
 80 unsigned   __stdcall ThreadForIOCP(void* port)
 81 {
 82     HANDLE IOCP = (HANDLE)port;  //接收1完成端口
 83     SOCKET sock;
 84     DWORD byteTransferred;
 85     DWORD flags = 0;
 86     LPIODATA IOData;
 87     while (1)
 88     {
 89         GetQueuedCompletionStatus(IOCP, &byteTransferred, (LPDWORD)&sock, (LPOVERLAPPED*)&IOData, INFINITE);
 90         if (IOData->rwMode == READ)
 91         {
 92             if (byteTransferred == 0) //关闭请求
 93             {
 94                 closesocket(sock);
 95                 free(IOData);
 96                 continue;
 97             }
 98             (IOData->wsabuf).len = byteTransferred;
 99             IOData->rwMode = WRITE;
100             send(sock, (IOData->wsabuf).buf, byteTransferred, 0);
101             WSASend(sock, &(IOData->wsabuf), 1, NULL, 0, &(IOData->overlapped), NULL);
102 
103             IOData = (LPIODATA)malloc(sizeof(IODATA));
104             memset(&(IOData->overlapped), 0, sizeof(OVERLAPPED));
105             (IOData->wsabuf).buf = IOData->buf;
106             (IOData->wsabuf).len = BUF_SIZE;
107             IOData->rwMode = READ;
108             
109             WSARecv(sock,&(IOData->wsabuf),1,NULL,&flags,&(IOData->overlapped),NULL);
110         }
111         else
112         {
113             printf("Message sent");
114             free(IOData);
115         }
116     }
117 
118 
119 }

 在上面的代码中,需要注意以下重要的细节:

(1) 42行创建了专门用于管理完成端口的线程,通过线程传入了完成端口的句柄IOCP,这样线程就能操作完成端口;

(2)68行在第二次调用CreateIoCompletionPort()函数绑定IOCP与对应的套接字时,通过该函数第三个参数ULONG_PTR CompletionKey传入了与客户端连接的套接字信息;

(3) 75行首次调用WSARecv函数,通过该函数的第六个变量传入了结构体IOData。该结构体定义了OVERLAPPED结构体(调用WSARecv需要该结构体),WSABUF(保存收发的信息),rwMode(表明此次处理的完成端口事件是读还是写)。因为结构体的地址与该结构体第一个变量地址相同,所以传入IOData->overlapped的地址相当于传入IOData的地址。

(4) 实际在使用IOCP时,在完成IOCP的创建与IOCP和套接字绑定后,完成端口的线程就能处理IO完成事件。通常的网络库会将accept,close,send,recv(即建立连接,关闭连接,读,写)都加入IOCP中(加入IOCP队列即调用特定的函数,比如想要IO发送完数据后进入IOCP完成队列,只需要调用WSASend,调用send发送完则不会进入IOCP完成队列),同时设置一个标志位(类似本程序中的rwMode)来标志本次是何种IO完成事件