Rover12421's Blog


The Five Windows Socket I/O Models: A Complete Code Guide

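If you want to build server applications on Windows, the I/O model is something you have to think about. Windows offers five I/O models:

- select
- asynchronous select (WSAAsyncSelect)
- event select (WSAEventSelect)
- overlapped I/O
- completion ports

Each model suits a particular kind of application. Be very clear about your application's requirements, and weigh factors such as scalability and portability, before making your choice.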

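I will present the five I/O models using an echo server, the same one used in Chapter 8 of the book 《Windows网络编程》 (Windows Network Programming).

Assume the client code is as follows. For clarity, most error checking is omitted here and in the listings below: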

```c
#define _WINSOCK_DEPRECATED_NO_WARNINGS  // allow inet_addr() on newer SDKs
#include <winsock2.h>
#include <stdio.h>
#include <string.h>

#define SERVER_ADDRESS "137.117.2.148"
#define PORT           5150
#define MSGSIZE        1024

#pragma comment(lib, "ws2_32.lib")

int main()
{
    WSADATA     wsaData;
    SOCKET      sClient;
    SOCKADDR_IN server;
    char        szMessage[MSGSIZE];
    int         ret;

    // Initialize Windows socket library (version 2.2)
    WSAStartup(MAKEWORD(2, 2), &wsaData);

    // Create client socket
    sClient = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

    // Connect to server
    memset(&server, 0, sizeof(SOCKADDR_IN));
    server.sin_family = AF_INET;
    server.sin_addr.S_un.S_addr = inet_addr(SERVER_ADDRESS);
    server.sin_port = htons(PORT);

    connect(sClient, (struct sockaddr *)&server, sizeof(SOCKADDR_IN));

    while (TRUE)
    {
        printf("Send:");
        // fgets instead of the unsafe gets(); strip the trailing newline
        if (fgets(szMessage, MSGSIZE, stdin) == NULL)
            break;
        szMessage[strcspn(szMessage, "\n")] = '\0';
        if (szMessage[0] == '\0')
            continue;   // sending 0 bytes would leave recv() waiting forever

        // Send message
        send(sClient, szMessage, (int)strlen(szMessage), 0);

        // Receive message; leave room for the terminating '\0'
        ret = recv(sClient, szMessage, MSGSIZE - 1, 0);
        if (ret <= 0)
            break;   // 0: server closed the connection; SOCKET_ERROR: failure
        szMessage[ret] = '\0';

        printf("Received [%d bytes]: '%s'\n", ret, szMessage);
    }

    // Clean up
    closesocket(sClient);
    WSACleanup();
    return 0;
}
```

The client is very simple. It creates a socket, connects to the server, and then keeps sending and receiving data.

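The server design that comes to mind most easily uses a main thread that listens for connection requests. Each time it accepts one, it creates a socket and a helper thread dedicated to that client, and all further interaction with the client happens in that helper thread.

This approach is intuitive, the program is very simple, and it is portable, but it cannot exploit platform-specific features. As the number of connections grows to thousands or tens of thousands, the thread count grows with it. The operating system then spends its time switching between threads, and most threads sit idle for most of their lives, which wastes a great deal of system resources. So if you already know your code will only run on Windows, the Winsock I/O models are the better choice.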
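To put rough numbers on that resource cost, here is a back-of-the-envelope sketch. It assumes the Windows default of 1 MB of reserved stack per thread and the 2 GB user address space of a 32-bit process. The function names are hypothetical. Real limits are lower, because heaps, DLLs and code share the same address space.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical helpers for estimating the cost of one-thread-per-connection.
 * Assumption: every thread reserves stack_mb megabytes of address space
 * (1 MB is the usual Windows default). */

/* Address space (in MB) reserved just for thread stacks. */
uint64_t reserved_stack_mb(uint64_t connections, uint64_t stack_mb)
{
    return connections * stack_mb;
}

/* Upper bound on the number of threads whose stacks fit in address_space_mb,
 * ignoring everything else that lives there (heaps, DLLs, code). */
uint64_t max_threads(uint64_t address_space_mb, uint64_t stack_mb)
{
    assert(stack_mb > 0);
    return address_space_mb / stack_mb;
}
```

For example, reserved_stack_mb(10000, 1) gives 10000 MB of stack reservations, while max_threads(2048, 1) caps a 32-bit process at roughly 2048 such threads. Both figures come before any context-switching cost is counted.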

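I. The Select Model

"Select is the most widely used I/O model in Winsock. It is called the 'select model' because its central idea is to manage I/O with the select function. The model was originally designed for computers running UNIX, which used the Berkeley sockets scheme. It was integrated into Winsock 1.1, and it lets applications that want to avoid being 'blocked' inside socket calls manage several sockets at once in an orderly way. Because Winsock 1.1 is backward compatible with the Berkeley sockets implementation, a Berkeley sockets application that uses select should, in theory, run without any modification." (Excerpted from Chapter 8 of Windows Network Programming)

The following program is an echo server built on the select model, trimmed as far as it will go: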

```c
#define _WINSOCK_DEPRECATED_NO_WARNINGS  // allow inet_ntoa() on newer SDKs
#include <winsock2.h>
#include <stdio.h>

#define PORT       5150
#define MSGSIZE    1024

#pragma comment(lib, "ws2_32.lib")

int              g_iTotalConn = 0;
SOCKET           g_CliSocketArr[FD_SETSIZE];
CRITICAL_SECTION g_cs;   // guards g_iTotalConn and g_CliSocketArr, shared by both threads

DWORD WINAPI WorkerThread(LPVOID lpParameter);

int main()
{
    WSADATA     wsaData;
    SOCKET      sListen, sClient;
    SOCKADDR_IN local, client;
    int         iaddrSize = sizeof(SOCKADDR_IN);
    DWORD       dwThreadId;

    // Initialize Windows socket library
    WSAStartup(MAKEWORD(2, 2), &wsaData);
    InitializeCriticalSection(&g_cs);

    // Create listening socket
    sListen = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

    // Bind
    local.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
    local.sin_family = AF_INET;
    local.sin_port = htons(PORT);
    bind(sListen, (struct sockaddr *)&local, sizeof(SOCKADDR_IN));

    // Listen
    listen(sListen, 3);

    // Create worker thread
    CreateThread(NULL, 0, WorkerThread, NULL, 0, &dwThreadId);

    while (TRUE)
    {
        // Accept a connection
        sClient = accept(sListen, (struct sockaddr *)&client, &iaddrSize);
        printf("Accepted client:%s:%d\n", inet_ntoa(client.sin_addr), ntohs(client.sin_port));

        EnterCriticalSection(&g_cs);
        if (g_iTotalConn < FD_SETSIZE)
            g_CliSocketArr[g_iTotalConn++] = sClient;   // add socket to g_CliSocketArr
        else
            closesocket(sClient);                       // array (and fd_set) is full: refuse the client
        LeaveCriticalSection(&g_cs);
    }

    return 0;
}

DWORD WINAPI WorkerThread(LPVOID lpParam)
{
    int            i;
    fd_set         fdread;
    int            ret;
    struct timeval tv = {1, 0};
    char           szMessage[MSGSIZE];

    while (TRUE)
    {
        EnterCriticalSection(&g_cs);
        if (g_iTotalConn == 0)
        {
            // select() fails at once on an empty set; sleep instead of spinning at 100% CPU
            LeaveCriticalSection(&g_cs);
            Sleep(100);
            continue;
        }

        FD_ZERO(&fdread);
        for (i = 0; i < g_iTotalConn; i++)
        {
            FD_SET(g_CliSocketArr[i], &fdread);
        }
        LeaveCriticalSection(&g_cs);

        // We only care about read events
        ret = select(0, &fdread, NULL, NULL, &tv);
        if (ret <= 0)
        {
            // Time expired (0) or select failed (SOCKET_ERROR)
            continue;
        }

        EnterCriticalSection(&g_cs);
        for (i = 0; i < g_iTotalConn; i++)
        {
            if (FD_ISSET(g_CliSocketArr[i], &fdread))
            {
                // A read event happened on g_CliSocketArr[i]
                ret = recv(g_CliSocketArr[i], szMessage, MSGSIZE, 0);
                if (ret <= 0)
                {
                    // Client socket closed (0) or failed, e.g. WSAECONNRESET
                    printf("Client socket %llu closed.\n", (unsigned long long)g_CliSocketArr[i]);
                    closesocket(g_CliSocketArr[i]);
                    // Move the last socket into slot i, then examine slot i again
                    g_CliSocketArr[i--] = g_CliSocketArr[--g_iTotalConn];
                }
                else
                {
                    // We received ret bytes from the client: echo them back
                    send(g_CliSocketArr[i], szMessage, ret, 0);
                }
            }
        }
        LeaveCriticalSection(&g_cs);
    }

    return 0;
}
```

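The server's main steps are:

1. Create the listening socket, bind it, and listen.
2. Create the worker thread.
3. Create a socket array that holds every currently active client socket. The array is updated each time a connection is accepted.
4. Accept client connections.

On step 4, note that I did not redefine the FD_SETSIZE macro, so the server supports at most 64 concurrent connections. Moreover, connections must never be accepted unconditionally here. The server should decide whether to accept a client based on the current number of connections. A good way to do this is to call WSAAccept and have it call back a condition function you write yourself, as shown below: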

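```c
// Condition function for WSAAccept: admit the client only while the
// socket array (and therefore the fd_set) still has room
int CALLBACK ConditionFunc(LPWSABUF lpCallerId, LPWSABUF lpCallerData, LPQOS lpSQOS, LPQOS lpGQOS,
                           LPWSABUF lpCalleeId, LPWSABUF lpCalleeData, GROUP FAR *g, DWORD_PTR dwCallbackData)
{
    if (g_iTotalConn < FD_SETSIZE)   // current number of connections
        return CF_ACCEPT;
    else
        return CF_REJECT;
}

// Usage in the accept loop; a rejected client makes WSAAccept return INVALID_SOCKET:
// sClient = WSAAccept(sListen, (struct sockaddr *)&client, &iaddrSize, ConditionFunc, 0);
```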

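The worker thread runs an endless loop. Each iteration does the following:

1. Add all current client sockets to the read set fdread.
2. Call select.
3. Check whether each socket is still in the read set, and if so, receive data from it. If recv returns 0 or fails (for example with WSAECONNRESET), the client has closed its socket. The server then releases the resources tied to the corresponding socket and compacts the socket array by moving the last socket into the current position.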
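The array compaction in step 3 is easy to get wrong: you swap the last socket into the freed slot and must then examine that same slot again. Here is a portable sketch of the idiom on plain integers. remove_swap_last, remove_marked and the marker value are hypothetical stand-ins, where the marker plays the role of "a socket whose recv returned 0".

```c
#include <assert.h>
#include <stddef.h>

/* Remove arr[i] in O(1) by moving the last element into its slot.
 * Order is not preserved. Returns the new element count. */
size_t remove_swap_last(unsigned int *arr, size_t count, size_t i)
{
    assert(i < count);
    arr[i] = arr[count - 1];   /* a harmless self-copy when i is the last index */
    return count - 1;
}

/* Drop every element equal to marker. After a removal, slot i holds an
 * element that has not been examined yet, so i is not advanced; this is
 * what "g_CliSocketArr[i--] = ..." followed by the loop's i++ achieves. */
size_t remove_marked(unsigned int *arr, size_t count, unsigned int marker)
{
    size_t i = 0;
    while (i < count)
    {
        if (arr[i] == marker)
            count = remove_swap_last(arr, count, i);
        else
            i++;
    }
    return count;
}
```

With {7, 0, 0, 9, 0} and marker 0, the array shrinks to {7, 9}. The second 0, swapped into slot 1, is only caught because slot 1 is checked again.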

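Besides accepting connections conditionally, the zero-connection case needs special handling. If the read set is empty, select returns immediately (Winsock fails the call, typically with WSAEINVAL). The worker thread then becomes a loop with no pause, and CPU usage jumps to 100%.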

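II. Asynchronous Select

"Winsock provides a useful asynchronous I/O model that lets an application receive network event notifications on a socket in the form of Windows messages. You do this by calling the WSAAsyncSelect function after creating a socket. The model first appeared in Winsock 1.1, to help developers target early 16-bit Windows platforms (such as Windows for Workgroups) and adapt to their 'primitive' multitasking message environment. Applications can still benefit from it, especially when they manage window messages with a standard window procedure (often called 'WndProc'). The model was also adopted by the Microsoft Foundation Class (MFC) CSocket object." (Excerpted from Chapter 8 of Windows Network Programming)

Again I will show the code first and then explain it in detail: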

```c
#include <winsock2.h>
#include <tchar.h>

#define PORT      5150
#define MSGSIZE   1024
#define WM_SOCKET (WM_USER + 0)

#pragma comment(lib, "ws2_32.lib")

LRESULT CALLBACK WndProc(HWND, UINT, WPARAM, LPARAM);

int WINAPI WinMain(HINSTANCE hInstance, HINSTANCE hPrevInstance, PSTR szCmdLine, int iCmdShow)
{
    static TCHAR szAppName[] = _T("AsyncSelect Model");
    HWND         hwnd;
    MSG          msg;
    WNDCLASS     wndclass;

    wndclass.style         = CS_HREDRAW | CS_VREDRAW;
    wndclass.lpfnWndProc   = WndProc;
    wndclass.cbClsExtra    = 0;
    wndclass.cbWndExtra    = 0;
    wndclass.hInstance     = hInstance;
    wndclass.hIcon         = LoadIcon(NULL, IDI_APPLICATION);
    wndclass.hCursor       = LoadCursor(NULL, IDC_ARROW);
    wndclass.hbrBackground = (HBRUSH)GetStockObject(WHITE_BRUSH);
    wndclass.lpszMenuName  = NULL;
    wndclass.lpszClassName = szAppName;

    if (!RegisterClass(&wndclass))
    {
        MessageBox(NULL, TEXT("This program requires Windows NT!"), szAppName, MB_ICONERROR);
        return 0;
    }

    hwnd = CreateWindow(szAppName,                   // window class name
                        TEXT("AsyncSelect Model"),   // window caption
                        WS_OVERLAPPEDWINDOW,         // window style
                        CW_USEDEFAULT,               // initial x position
                        CW_USEDEFAULT,               // initial y position
                        CW_USEDEFAULT,               // initial x size
                        CW_USEDEFAULT,               // initial y size
                        NULL,                        // parent window handle
                        NULL,                        // window menu handle
                        hInstance,                   // program instance handle
                        NULL);                       // creation parameters

    ShowWindow(hwnd, iCmdShow);
    UpdateWindow(hwnd);

    // Socket notifications arrive through this message loop as WM_SOCKET
    while (GetMessage(&msg, NULL, 0, 0))
    {
        TranslateMessage(&msg);
        DispatchMessage(&msg);
    }

    return (int)msg.wParam;
}

LRESULT CALLBACK WndProc(HWND hwnd, UINT message, WPARAM wParam, LPARAM lParam)
{
    WSADATA       wsd;
    static SOCKET sListen;
    SOCKET        sClient;
    SOCKADDR_IN   local, client;
    int           ret, iAddrSize = sizeof(client);
    char          szMessage[MSGSIZE];

    switch (message)
    {
    case WM_CREATE:
        // Initialize Windows Socket library
        WSAStartup(MAKEWORD(2, 2), &wsd);

        // Create listening socket
        sListen = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

        // Bind
        local.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
        local.sin_family = AF_INET;
        local.sin_port = htons(PORT);
        bind(sListen, (struct sockaddr *)&local, sizeof(local));

        // Listen
        listen(sListen, 3);

        // Associate listening socket with FD_ACCEPT event
        WSAAsyncSelect(sListen, hwnd, WM_SOCKET, FD_ACCEPT);
        return 0;

    case WM_DESTROY:
        closesocket(sListen);
        WSACleanup();
        PostQuitMessage(0);
        return 0;

    case WM_SOCKET:
        // wParam is the socket; lParam holds the event (low word) and error code (high word)
        if (WSAGETSELECTERROR(lParam))
        {
            closesocket((SOCKET)wParam);
            return 0;
        }

        switch (WSAGETSELECTEVENT(lParam))
        {
        case FD_ACCEPT:
            // Accept a connection from client
            sClient = accept((SOCKET)wParam, (struct sockaddr *)&client, &iAddrSize);
            if (sClient != INVALID_SOCKET)
            {
                // Associate client socket with FD_READ and FD_CLOSE event
                WSAAsyncSelect(sClient, hwnd, WM_SOCKET, FD_READ | FD_CLOSE);
            }
            break;

        case FD_READ:
            // WSAAsyncSelect made the socket non-blocking, so recv may fail with WSAEWOULDBLOCK
            ret = recv((SOCKET)wParam, szMessage, MSGSIZE, 0);
            if (ret > 0)
            {
                // Echo the received bytes back
                send((SOCKET)wParam, szMessage, ret, 0);
            }
            else if (ret == SOCKET_ERROR && WSAGetLastError() != WSAEWOULDBLOCK)
            {
                closesocket((SOCKET)wParam);
            }
            // ret == 0 is a graceful close; the FD_CLOSE notification closes the socket
            break;

        case FD_CLOSE:
            closesocket((SOCKET)wParam);
            break;
        }
        return 0;
    }

    return DefWindowProc(hwnd, message, wParam, lParam);
}
```

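In my view, WSAAsyncSelect is the simplest Winsock I/O model, because a single main thread does all the work. Anyone who has written windowed applications with the raw Windows API should be able to follow it. All we need to do is:

1. In the WM_CREATE handler, initialize the Windows Socket library, create the listening socket, bind, and listen. Then call WSAAsyncSelect to say that we are interested in FD_ACCEPT events on the listening socket.
2. Define a custom message, WM_SOCKET. Whenever an event we care about occurs on one of our sockets (the listening socket or a client socket), the system posts WM_SOCKET to our window, so WndProc is called with message set to WM_SOCKET.
3. In the WM_SOCKET handler, handle the FD_ACCEPT, FD_READ, and FD_CLOSE events.
4. In the WM_DESTROY handler, close the listening socket and clean up the Windows Socket library.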
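Step 3 depends on how WM_SOCKET packs its lParam. In winsock2.h:

- WSAGETSELECTEVENT(lParam) is LOWORD(lParam).
- WSAGETSELECTERROR(lParam) is HIWORD(lParam).
- WSAMAKESELECTREPLY(event, error) is MAKELONG(event, error).

The sketch below re-creates these as plain functions so the bit layout can be checked anywhere. The function names are hypothetical. The FD_ constants repeat the winsock2.h values for this standalone sketch only; do not define them alongside the real header.

```c
#include <assert.h>
#include <stdint.h>

/* Same values as in winsock2.h (standalone sketch only). */
#define FD_READ   0x01
#define FD_ACCEPT 0x08
#define FD_CLOSE  0x20

/* Low word: which network event occurred (one per WM_SOCKET message). */
unsigned select_event(uint32_t lParam) { return lParam & 0xFFFFu; }

/* High word: 0 on success, otherwise a Winsock error code. */
unsigned select_error(uint32_t lParam) { return (lParam >> 16) & 0xFFFFu; }

/* Build an lParam the way WSAMAKESELECTREPLY does. */
uint32_t make_select_reply(unsigned event, unsigned error)
{
    assert(event <= 0xFFFFu && error <= 0xFFFFu);
    return (uint32_t)event | ((uint32_t)error << 16);
}
```

A handler must therefore test the high word (for example WSAECONNRESET, 10054) before acting on the event in the low word. The WndProc above does its checks in exactly that order.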

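The following table of network event types used with WSAAsyncSelect gives a clearer picture of each event:

Table 1

| Event | Meaning |
|---|---|
| FD_READ | The application wants to be notified when data is available for reading |
| FD_WRITE | The application wants to be notified when the socket is ready for writing |
| FD_OOB | The application wants to be notified when out-of-band (OOB) data arrives |
| FD_ACCEPT | The application wants to be notified of incoming connections |
| FD_CONNECT | The application wants to be notified when a connection or multipoint join operation completes |
| FD_CLOSE | The application wants to be notified when the socket is closed |
| FD_QOS | The application wants to be notified when the socket's quality of service (QoS) changes |
| FD_GROUP_QOS | The application wants to be notified when the socket group's QoS changes (of little use today; reserved for future use of socket groups) |
| FD_ROUTING_INTERFACE_CHANGE | The application wants to be notified of routing interface changes for the specified destination |
| FD_ADDRESS_LIST_CHANGE | The application wants to be notified of changes to the local address list for the socket's protocol family |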
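Each event in Table 1 is a single bit, so several of them can be combined in the mask passed to WSAAsyncSelect or WSAEventSelect, as in FD_READ | FD_CLOSE. Below is a hypothetical debugging helper that turns such a mask back into names. It assumes the bit positions defined in winsock2.h, FD_READ_BIT = 0 through FD_ADDRESS_LIST_CHANGE_BIT = 9, which follow the table's order.

```c
#include <assert.h>
#include <string.h>

/* Event names from Table 1, indexed by bit position (as in winsock2.h). */
static const char *const kFdEventNames[] = {
    "FD_READ", "FD_WRITE", "FD_OOB", "FD_ACCEPT", "FD_CONNECT",
    "FD_CLOSE", "FD_QOS", "FD_GROUP_QOS",
    "FD_ROUTING_INTERFACE_CHANGE", "FD_ADDRESS_LIST_CHANGE"
};

/* Write "FD_READ|FD_CLOSE"-style text for an event mask into buf,
 * truncating if necessary; buf is always NUL-terminated. Unknown bits are ignored. */
void fd_events_to_string(long mask, char *buf, size_t len)
{
    size_t bit;
    assert(buf != NULL && len > 0);
    buf[0] = '\0';
    for (bit = 0; bit < sizeof kFdEventNames / sizeof kFdEventNames[0]; bit++)
    {
        if (mask & (1L << bit))
        {
            if (buf[0] != '\0')
                strncat(buf, "|", len - strlen(buf) - 1);
            strncat(buf, kFdEventNames[bit], len - strlen(buf) - 1);
        }
    }
}
```

For instance, the client sockets in the listing above are registered with a mask that this helper prints as "FD_READ|FD_CLOSE".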

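III. Event Select

"Winsock provides another useful asynchronous I/O model. Like the WSAAsyncSelect model, it lets an application receive event-based notifications of network events on one or more sockets. All the network events used by WSAAsyncSelect and summarized in Table 1 carry over unchanged to the new model, and applications built on it can receive and handle every one of them. The main difference is that network events are posted to an event object handle instead of to a window procedure." (Excerpted from Chapter 8 of Windows Network Programming)

Again, let's look at the code first and then analyze it: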

```c
#define _WINSOCK_DEPRECATED_NO_WARNINGS  // allow inet_ntoa() on newer SDKs
#include <winsock2.h>
#include <stdio.h>

#define PORT    5150
#define MSGSIZE 1024

#pragma comment(lib, "ws2_32.lib")

int              g_iTotalConn = 0;
SOCKET           g_CliSocketArr[MAXIMUM_WAIT_OBJECTS];
WSAEVENT         g_CliEventArr[MAXIMUM_WAIT_OBJECTS];
CRITICAL_SECTION g_cs;   // guards the two arrays and g_iTotalConn

DWORD WINAPI WorkerThread(LPVOID);
void Cleanup(int index);

int main()
{
    WSADATA     wsaData;
    SOCKET      sListen, sClient;
    SOCKADDR_IN local, client;
    DWORD       dwThreadId;
    int         iaddrSize = sizeof(SOCKADDR_IN);

    // Initialize Windows Socket library
    WSAStartup(MAKEWORD(2, 2), &wsaData);
    InitializeCriticalSection(&g_cs);

    // Create listening socket
    sListen = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

    // Bind
    local.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
    local.sin_family = AF_INET;
    local.sin_port = htons(PORT);
    bind(sListen, (struct sockaddr *)&local, sizeof(SOCKADDR_IN));

    // Listen
    listen(sListen, 3);

    // Create worker thread
    CreateThread(NULL, 0, WorkerThread, NULL, 0, &dwThreadId);

    while (TRUE)
    {
        // Accept a connection
        sClient = accept(sListen, (struct sockaddr *)&client, &iaddrSize);
        printf("Accepted client:%s:%d\n", inet_ntoa(client.sin_addr), ntohs(client.sin_port));

        // Associate socket with network events.
        // NOTE: the count is not checked against MAXIMUM_WAIT_OBJECTS (discussed after this listing)
        EnterCriticalSection(&g_cs);
        g_CliSocketArr[g_iTotalConn] = sClient;
        g_CliEventArr[g_iTotalConn] = WSACreateEvent();
        WSAEventSelect(g_CliSocketArr[g_iTotalConn], g_CliEventArr[g_iTotalConn], FD_READ | FD_CLOSE);
        g_iTotalConn++;
        LeaveCriticalSection(&g_cs);
    }
}

DWORD WINAPI WorkerThread(LPVOID lpParam)
{
    DWORD            dwRet;
    int              ret, index;
    WSANETWORKEVENTS NetworkEvents;
    char             szMessage[MSGSIZE];

    while (TRUE)
    {
        // NOTE: with zero connections this fails immediately, so the loop spins (discussed below)
        dwRet = WSAWaitForMultipleEvents(g_iTotalConn, g_CliEventArr, FALSE, 1000, FALSE);
        if (dwRet == WSA_WAIT_FAILED || dwRet == WSA_WAIT_TIMEOUT)
        {
            continue;
        }

        EnterCriticalSection(&g_cs);
        index = dwRet - WSA_WAIT_EVENT_0;
        // Fetch the events that occurred; this also resets the event object
        WSAEnumNetworkEvents(g_CliSocketArr[index], g_CliEventArr[index], &NetworkEvents);

        if (NetworkEvents.lNetworkEvents & FD_READ)
        {
            // Receive message from client (the socket is non-blocking after WSAEventSelect)
            ret = recv(g_CliSocketArr[index], szMessage, MSGSIZE, 0);
            if (ret > 0)
            {
                send(g_CliSocketArr[index], szMessage, ret, 0);
            }
            else if (ret == 0 || WSAGetLastError() != WSAEWOULDBLOCK)
            {
                Cleanup(index);
                LeaveCriticalSection(&g_cs);
                continue;   // slot index may now hold another socket: do not run the FD_CLOSE check
            }
        }

        if (NetworkEvents.lNetworkEvents & FD_CLOSE)
        {
            Cleanup(index);
        }
        LeaveCriticalSection(&g_cs);
    }
    return 0;
}

void Cleanup(int index)
{
    closesocket(g_CliSocketArr[index]);
    WSACloseEvent(g_CliEventArr[index]);

    // Move the last socket/event pair into the freed slot
    if (index < g_iTotalConn - 1)
    {
        g_CliSocketArr[index] = g_CliSocketArr[g_iTotalConn - 1];
        g_CliEventArr[index] = g_CliEventArr[g_iTotalConn - 1];
    }

    g_iTotalConn--;
}
```

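The event select model is also fairly simple and not hard to implement. Its basic idea is to pair each socket with a WSAEVENT object and to specify, when pairing them, which network events we care about. Once an event of interest (FD_READ or FD_CLOSE) occurs on a socket, the associated WSAEVENT object becomes signaled. The program defines two global arrays, one of sockets and one of WSAEVENT objects. Both have MAXIMUM_WAIT_OBJECTS (64) elements, and the elements correspond one-to-one.

As before, the program here ignores two problems:

1. accept must not be called unconditionally, because the number of concurrent connections we can support is limited. One fix is to split the sockets into groups of MAXIMUM_WAIT_OBJECTS and give each group its own worker thread. Another is to replace accept with WSAAccept and a custom condition function.
2. There is no special handling for zero connections. WSAWaitForMultipleEvents fails immediately when there are no events to wait on, so CPU usage reaches 100% whenever there are no connections.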
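The first fix, one worker thread per group of MAXIMUM_WAIT_OBJECTS sockets, needs bookkeeping that maps each connection to a group and a slot. Below is a minimal sketch with hypothetical names. It assumes connections are numbered in arrival order and never removed; a real server would reuse freed slots within each group.

```c
#include <assert.h>
#include <stddef.h>

#define WAIT_LIMIT 64   /* the value of MAXIMUM_WAIT_OBJECTS in the Windows headers */

/* Which worker thread's arrays (group) and which index (slot) a connection uses. */
typedef struct { size_t group; size_t slot; } Placement;

/* Connection number n lives in group n / 64, slot n % 64. */
Placement place_connection(size_t n)
{
    Placement p = { n / WAIT_LIMIT, n % WAIT_LIMIT };
    return p;
}

/* Worker threads needed for `connections` sockets (ceiling division). */
size_t workers_needed(size_t connections)
{
    return (connections + WAIT_LIMIT - 1) / WAIT_LIMIT;
}
```

For example, 65 connections need 2 workers, and connection number 130 goes to group 2, slot 2. Each worker would then call WSAWaitForMultipleEvents on its own 64-entry arrays.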

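IV. The Overlapped I/O Model

Winsock 2 gave socket I/O the same interface as file I/O, so the Win32 file functions ReadFile and WriteFile can be used on sockets. As a result, the overlapped I/O and completion port models used for ordinary file I/O also apply to sockets. These models can deliver better system performance, but they are more complex to implement and rely on more C programming tricks. One example is the so-called "trailing data" that the completion port model uses heavily.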

1. Overlapped I/O with event notification

```c
#define _WINSOCK_DEPRECATED_NO_WARNINGS  // allow inet_ntoa() on newer SDKs
#include <winsock2.h>
#include <stdio.h>

#define PORT    5150
#define MSGSIZE 1024

#pragma comment(lib, "ws2_32.lib")

typedef struct
{
    WSAOVERLAPPED overlap;
    WSABUF        Buffer;
    char          szMessage[MSGSIZE];
    DWORD         NumberOfBytesRecvd;
    DWORD         Flags;
}PER_IO_OPERATION_DATA, *LPPER_IO_OPERATION_DATA;

int                     g_iTotalConn = 0;
SOCKET                  g_CliSocketArr[MAXIMUM_WAIT_OBJECTS];
WSAEVENT                g_CliEventArr[MAXIMUM_WAIT_OBJECTS];
LPPER_IO_OPERATION_DATA g_pPerIODataArr[MAXIMUM_WAIT_OBJECTS];
CRITICAL_SECTION        g_cs;   // guards the three arrays and g_iTotalConn

DWORD WINAPI WorkerThread(LPVOID);
void Cleanup(int);

int main()
{
    WSADATA     wsaData;
    SOCKET      sListen, sClient;
    SOCKADDR_IN local, client;
    DWORD       dwThreadId;
    int         iaddrSize = sizeof(SOCKADDR_IN);

    // Initialize Windows Socket library
    WSAStartup(MAKEWORD(2, 2), &wsaData);
    InitializeCriticalSection(&g_cs);

    // Create listening socket
    sListen = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

    // Bind
    local.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
    local.sin_family = AF_INET;
    local.sin_port = htons(PORT);
    bind(sListen, (struct sockaddr *)&local, sizeof(SOCKADDR_IN));

    // Listen
    listen(sListen, 3);

    // Create worker thread
    CreateThread(NULL, 0, WorkerThread, NULL, 0, &dwThreadId);

    while (TRUE)
    {
        // Accept a connection
        sClient = accept(sListen, (struct sockaddr *)&client, &iaddrSize);
        printf("Accepted client:%s:%d\n", inet_ntoa(client.sin_addr), ntohs(client.sin_port));

        EnterCriticalSection(&g_cs);
        if (g_iTotalConn >= MAXIMUM_WAIT_OBJECTS)
        {
            // One thread can wait on at most 64 events: refuse the client
            LeaveCriticalSection(&g_cs);
            closesocket(sClient);
            continue;
        }

        g_CliSocketArr[g_iTotalConn] = sClient;

        // Allocate a zeroed PER_IO_OPERATION_DATA structure (so Flags starts at 0)
        g_pPerIODataArr[g_iTotalConn] = (LPPER_IO_OPERATION_DATA)HeapAlloc(
            GetProcessHeap(),
            HEAP_ZERO_MEMORY,
            sizeof(PER_IO_OPERATION_DATA));
        g_pPerIODataArr[g_iTotalConn]->Buffer.len = MSGSIZE;
        g_pPerIODataArr[g_iTotalConn]->Buffer.buf = g_pPerIODataArr[g_iTotalConn]->szMessage;
        g_CliEventArr[g_iTotalConn] = g_pPerIODataArr[g_iTotalConn]->overlap.hEvent = WSACreateEvent();

        // Launch an asynchronous operation; overlap.hEvent is signaled when it completes
        WSARecv(
            g_CliSocketArr[g_iTotalConn],
            &g_pPerIODataArr[g_iTotalConn]->Buffer,
            1,
            &g_pPerIODataArr[g_iTotalConn]->NumberOfBytesRecvd,
            &g_pPerIODataArr[g_iTotalConn]->Flags,
            &g_pPerIODataArr[g_iTotalConn]->overlap,
            NULL);

        g_iTotalConn++;
        LeaveCriticalSection(&g_cs);
    }

    closesocket(sListen);
    WSACleanup();
    return 0;
}

DWORD WINAPI WorkerThread(LPVOID lpParam)
{
    DWORD dwRet, cbTransferred, dwFlags;
    int   index;

    while (TRUE)
    {
        if (g_iTotalConn == 0)
        {
            // Nothing to wait on: WSAWaitForMultipleEvents would fail at once and spin
            Sleep(100);
            continue;
        }

        dwRet = WSAWaitForMultipleEvents(g_iTotalConn, g_CliEventArr, FALSE, 1000, FALSE);
        if (dwRet == WSA_WAIT_FAILED || dwRet == WSA_WAIT_TIMEOUT)
        {
            continue;
        }

        EnterCriticalSection(&g_cs);
        index = dwRet - WSA_WAIT_EVENT_0;
        WSAResetEvent(g_CliEventArr[index]);

        // Same WSAOVERLAPPED as the WSARecv call; dwFlags receives the completed operation's flags
        if (!WSAGetOverlappedResult(
                g_CliSocketArr[index],
                &g_pPerIODataArr[index]->overlap,
                &cbTransferred,
                TRUE,
                &dwFlags)
            || cbTransferred == 0)
        {
            // The connection was closed (or reset) by the client
            Cleanup(index);
        }
        else
        {
            // g_pPerIODataArr[index]->szMessage holds cbTransferred bytes: echo them back
            send(g_CliSocketArr[index], g_pPerIODataArr[index]->szMessage, (int)cbTransferred, 0);

            // Launch another asynchronous operation (Flags is in/out, so reset it first)
            g_pPerIODataArr[index]->Flags = 0;
            WSARecv(
                g_CliSocketArr[index],
                &g_pPerIODataArr[index]->Buffer,
                1,
                &g_pPerIODataArr[index]->NumberOfBytesRecvd,
                &g_pPerIODataArr[index]->Flags,
                &g_pPerIODataArr[index]->overlap,
                NULL);
        }
        LeaveCriticalSection(&g_cs);
    }

    return 0;
}

void Cleanup(int index)
{
    closesocket(g_CliSocketArr[index]);
    WSACloseEvent(g_CliEventArr[index]);
    HeapFree(GetProcessHeap(), 0, g_pPerIODataArr[index]);

    // Move the last entry into the freed slot
    if (index < g_iTotalConn - 1)
    {
        g_CliSocketArr[index] = g_CliSocketArr[g_iTotalConn - 1];
        g_CliEventArr[index] = g_CliEventArr[g_iTotalConn - 1];
        g_pPerIODataArr[index] = g_pPerIODataArr[g_iTotalConn - 1];
    }

    g_pPerIODataArr[--g_iTotalConn] = NULL;
}
```

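Unlike the models above, this one uses WSARecv, the asynchronous I/O function provided by Winsock 2. WSARecv is called with a WSAOVERLAPPED structure and does not block: it returns immediately. When data arrives, the hEvent member of that WSAOVERLAPPED structure becomes signaled. The WSAEVENT stored for the socket is that very same event object, because of the following statement:

g_CliEventArr[g_iTotalConn] = g_pPerIODataArr[g_iTotalConn]->overlap.hEvent;

As a result, WSAWaitForMultipleEvents returns successfully. Next we call WSAGetOverlappedResult with the same WSAOVERLAPPED structure that was passed to WSARecv, to get the number of bytes transferred and related information. Once we have the received data, we send it back to the client unchanged and post another asynchronous WSARecv.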
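"Returns immediately" covers three outcomes, and only one of them means the receive was never started.

- WSARecv returns 0 if the data was already available.
- It returns SOCKET_ERROR with WSA_IO_PENDING if the receive is in progress.
- Any other error means nothing was posted, so no event will ever be signaled for that socket.

Below is a small sketch of that classification. The helper and constant names are hypothetical; the values are those of the Windows headers, where SOCKET_ERROR is -1 and WSA_IO_PENDING equals ERROR_IO_PENDING (997).

```c
#include <assert.h>

#define SKETCH_SOCKET_ERROR   (-1)   /* SOCKET_ERROR */
#define SKETCH_WSA_IO_PENDING 997    /* WSA_IO_PENDING == ERROR_IO_PENDING */

typedef enum { RECV_DONE_NOW, RECV_PENDING, RECV_FAILED } RecvPostResult;

/* Interpret an overlapped WSARecv call: ret is its return value and err is
 * WSAGetLastError() read right after it (only meaningful when ret is -1). */
RecvPostResult classify_wsarecv(int ret, int err)
{
    if (ret == 0)
        return RECV_DONE_NOW;   /* completed at once; the event is still signaled */
    if (ret == SKETCH_SOCKET_ERROR && err == SKETCH_WSA_IO_PENDING)
        return RECV_PENDING;    /* in progress; the event is signaled later */
    return RECV_FAILED;         /* not started: clean up the socket now */
}
```

The first two cases can share the worker thread's WSAWaitForMultipleEvents path. The third needs immediate cleanup, because otherwise the socket's slot waits forever.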

2. Overlapped I/O with completion routines

```c
#define _WINSOCK_DEPRECATED_NO_WARNINGS  // allow inet_ntoa() on newer SDKs
#include <winsock2.h>
#include <stdio.h>
#include <string.h>

#define PORT    5150
#define MSGSIZE 1024

#pragma comment(lib, "ws2_32.lib")

typedef struct
{
    WSAOVERLAPPED overlap;   // must stay the first member ("trailing data", see below)
    WSABUF        Buffer;
    char          szMessage[MSGSIZE];
    DWORD         NumberOfBytesRecvd;
    DWORD         Flags;
    SOCKET        sClient;
}PER_IO_OPERATION_DATA, *LPPER_IO_OPERATION_DATA;

DWORD WINAPI WorkerThread(LPVOID);
void CALLBACK CompletionROUTINE(DWORD, DWORD, LPWSAOVERLAPPED, DWORD);
void PostRecv(LPPER_IO_OPERATION_DATA lpPerIOData);

SOCKET        g_sNewClientConnection;
volatile LONG g_bNewConnectionArrived = FALSE;

int main()
{
    WSADATA     wsaData;
    SOCKET      sListen;
    SOCKADDR_IN local, client;
    DWORD       dwThreadId;
    int         iaddrSize = sizeof(SOCKADDR_IN);

    // Initialize Windows Socket library
    WSAStartup(MAKEWORD(2, 2), &wsaData);

    // Create listening socket
    sListen = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

    // Bind
    local.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
    local.sin_family = AF_INET;
    local.sin_port = htons(PORT);
    bind(sListen, (struct sockaddr *)&local, sizeof(SOCKADDR_IN));

    // Listen
    listen(sListen, 3);

    // Create worker thread
    CreateThread(NULL, 0, WorkerThread, NULL, 0, &dwThreadId);

    while (TRUE)
    {
        // Accept a connection and hand it to the worker thread
        g_sNewClientConnection = accept(sListen, (struct sockaddr *)&client, &iaddrSize);
        printf("Accepted client:%s:%d\n", inet_ntoa(client.sin_addr), ntohs(client.sin_port));
        InterlockedExchange(&g_bNewConnectionArrived, TRUE);

        // Wait until the worker has taken this socket, so the next accept
        // cannot overwrite g_sNewClientConnection before it is used
        while (InterlockedCompareExchange(&g_bNewConnectionArrived, FALSE, FALSE))
            Sleep(1);
    }
}

DWORD WINAPI WorkerThread(LPVOID lpParam)
{
    LPPER_IO_OPERATION_DATA lpPerIOData = NULL;

    while (TRUE)
    {
        if (g_bNewConnectionArrived)
        {
            // Launch an asynchronous operation for the newly arrived connection
            lpPerIOData = (LPPER_IO_OPERATION_DATA)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(PER_IO_OPERATION_DATA));
            lpPerIOData->sClient = g_sNewClientConnection;
            InterlockedExchange(&g_bNewConnectionArrived, FALSE);   // socket taken; main may accept again

            PostRecv(lpPerIOData);
        }

        // Alertable wait: completion routines queued to this thread run here.
        // A short timeout keeps new connections from waiting long.
        SleepEx(100, TRUE);
    }
    return 0;
}

// Post an overlapped WSARecv whose completion is reported through CompletionROUTINE
void PostRecv(LPPER_IO_OPERATION_DATA lpPerIOData)
{
    memset(&lpPerIOData->overlap, 0, sizeof(WSAOVERLAPPED));
    lpPerIOData->Buffer.len = MSGSIZE;
    lpPerIOData->Buffer.buf = lpPerIOData->szMessage;
    lpPerIOData->Flags = 0;

    if (WSARecv(lpPerIOData->sClient,
                &lpPerIOData->Buffer,
                1,
                &lpPerIOData->NumberOfBytesRecvd,
                &lpPerIOData->Flags,
                &lpPerIOData->overlap,
                CompletionROUTINE) == SOCKET_ERROR
        && WSAGetLastError() != WSA_IO_PENDING)
    {
        // The receive was not started, so the routine will never run: clean up now
        closesocket(lpPerIOData->sClient);
        HeapFree(GetProcessHeap(), 0, lpPerIOData);
    }
}

void CALLBACK CompletionROUTINE(DWORD dwError,
                                DWORD cbTransferred,
                                LPWSAOVERLAPPED lpOverlapped,
                                DWORD dwFlags)
{
    // overlap is the first member, so this cast recovers the whole structure
    LPPER_IO_OPERATION_DATA lpPerIOData = (LPPER_IO_OPERATION_DATA)lpOverlapped;

    if (dwError != 0 || cbTransferred == 0)
    {
        // Connection was closed (or reset) by client
        closesocket(lpPerIOData->sClient);
        HeapFree(GetProcessHeap(), 0, lpPerIOData);
    }
    else
    {
        // Echo exactly the bytes received
        send(lpPerIOData->sClient, lpPerIOData->szMessage, (int)cbTransferred, 0);

        // Launch another asynchronous operation
        PostRecv(lpPerIOData);
    }
}
```

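Implementing overlapped I/O with completion routines is much simpler than with event notification. The main thread only has to keep accepting connections. The helper thread checks whether a new client connection has been established. If one has, it posts an asynchronous WSARecv on that client socket. It then calls SleepEx to enter an alertable wait state, so that CompletionROUTINE can be invoked once the I/O completes.

If the helper thread never called SleepEx, the system could not run the completion routine after an I/O completes. A completion routine can only run in the same thread that issued the WSARecv, and only while that thread is in an alertable wait.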
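To make the alertable-wait requirement concrete, here is a single-threaded model. It is not Windows code, and every name in it is hypothetical. Finished I/O operations queue their completion routine to the issuing thread. The routines run only when that thread performs an alertable wait, the role SleepEx(ms, TRUE) plays above.

```c
#include <assert.h>
#include <stddef.h>

/* MODEL ONLY: stands in for the per-thread queue of completion routines. */
#define MAX_QUEUED 16

typedef void (*CompletionFn)(int cbTransferred);

typedef struct {
    CompletionFn fn[MAX_QUEUED];
    int          bytes[MAX_QUEUED];
    size_t       count;
} ApcQueue;

/* "System" side: an I/O finished, so its completion routine is queued. */
void queue_completion(ApcQueue *q, CompletionFn fn, int cbTransferred)
{
    assert(q->count < MAX_QUEUED);
    q->fn[q->count] = fn;
    q->bytes[q->count] = cbTransferred;
    q->count++;
}

/* Thread side: a non-alertable wait runs nothing; an alertable wait runs the
 * queued routines oldest first, including any that a routine queues while
 * it runs. Returns how many routines ran. */
size_t wait_model(ApcQueue *q, int alertable)
{
    size_t ran = 0;
    if (!alertable)
        return 0;
    while (q->count > 0)
    {
        CompletionFn fn = q->fn[0];
        int bytes = q->bytes[0];
        size_t i;
        for (i = 1; i < q->count; i++)   /* pop the front entry */
        {
            q->fn[i - 1] = q->fn[i];
            q->bytes[i - 1] = q->bytes[i];
        }
        q->count--;
        fn(bytes);
        ran++;
    }
    return ran;
}

/* Example completion routine: totals the bytes "received". */
int g_total_bytes = 0;
void count_bytes(int cbTransferred) { g_total_bytes += cbTransferred; }
```

Queue two completions, and a non-alertable wait leaves both pending. The next alertable wait runs both, which is why the helper thread must keep calling SleepEx with TRUE.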
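The completion routine itself is simple: it takes the received data, sends it back to the client unchanged, and posts another asynchronous WSARecv. Note the use of "trailing data" here. When we call WSARecv, the lpOverlapped argument actually points to a much larger structure, PER_IO_OPERATION_DATA. Besides the WSAOVERLAPPED, that structure carries the buffer description and other important information such as the client socket.

So the lpOverlapped pointer the completion routine receives gives access to more than the WSAOVERLAPPED structure. It also reaches the trailing fields after it, including the client socket and the receive buffer. This works because WSAOVERLAPPED is the first member, so a pointer to it is also a pointer to the whole PER_IO_OPERATION_DATA. I will use the same C technique again when introducing completion ports.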
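The plain cast works only because the WSAOVERLAPPED is the first member. If it sits elsewhere, you have to step back by the member's offset, which is what the CONTAINING_RECORD macro in the Windows headers does. Below is a portable sketch of that technique. FakeOverlapped, PerIoData, CONTAINER_OF and per_io_from_overlapped are hypothetical stand-ins, so the example compiles without Windows headers.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for WSAOVERLAPPED (hypothetical; layout roughly mirrors OVERLAPPED). */
typedef struct {
    unsigned long Internal, InternalHigh, Offset, OffsetHigh;
    void         *hEvent;
} FakeOverlapped;

/* Per-I/O data with the OVERLAPPED deliberately NOT first, to show the general case. */
typedef struct {
    unsigned int   sClient;
    char           szMessage[64];
    FakeOverlapped overlap;
} PerIoData;

/* Same idea as CONTAINING_RECORD: subtract the member's offset from the
 * member's address to get the address of the enclosing structure. */
#define CONTAINER_OF(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

/* What a completion routine would do with the lpOverlapped it receives. */
PerIoData *per_io_from_overlapped(FakeOverlapped *lpOverlapped)
{
    return CONTAINER_OF(lpOverlapped, PerIoData, overlap);
}
```

Given a PerIoData d, per_io_from_overlapped(&d.overlap) returns &d, so the routine reaches d.sClient and d.szMessage. In the listing above the offset is 0, so the subtraction reduces to the plain cast.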

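V. The Completion Port Model

"The completion port is by far the most complex I/O model. However, if an application must manage a large number of sockets at once, this model often achieves the best system performance! Unfortunately, it is available only on Windows NT and Windows 2000. Because of its complex design, consider the completion port model only when your application needs to manage hundreds or even thousands of sockets simultaneously and you want performance to scale linearly as CPUs are added. A basic rule to remember: if you are developing a high-performance server application for Windows NT or Windows 2000 and want to service a large number of socket I/O requests (a web server is the classic example), the I/O completion port model is the best choice!" (Excerpted from Chapter 8 of Windows Network Programming)

The completion port model is my favorite. Its implementation is fairly complex, though I actually find it much simpler than overlapped I/O with event notification, and its efficiency is astonishing. When I worked at company T, I once helped a colleague write a performance test program for a mail server, and it used the completion port model. The results showed that with many connections (thousands or tens of thousands), completion ports reached very high throughput with just one or two helper threads. Again, let's start with the code: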

```c
#define _WINSOCK_DEPRECATED_NO_WARNINGS  // allow inet_ntoa() on newer SDKs
#include <winsock2.h>
#include <stdio.h>
#include <string.h>

#define PORT    5150
#define MSGSIZE 1024

#pragma comment(lib, "ws2_32.lib")

typedef enum
{
    RECV_POSTED
}OPERATION_TYPE;

typedef struct
{
    WSAOVERLAPPED  overlap;   // first member: the OVERLAPPED pointer doubles as a pointer to this struct
    WSABUF         Buffer;
    char           szMessage[MSGSIZE];
    DWORD          NumberOfBytesRecvd;
    DWORD          Flags;
    OPERATION_TYPE OperationType;
}PER_IO_OPERATION_DATA, *LPPER_IO_OPERATION_DATA;

DWORD WINAPI WorkerThread(LPVOID);
void PostRecv(SOCKET sClient, LPPER_IO_OPERATION_DATA lpPerIOData);

int main()
{
    WSADATA                 wsaData;
    SOCKET                  sListen, sClient;
    SOCKADDR_IN             local, client;
    DWORD                   i;
    int                     iaddrSize = sizeof(SOCKADDR_IN);
    HANDLE                  CompletionPort = INVALID_HANDLE_VALUE;
    SYSTEM_INFO             systeminfo;
    LPPER_IO_OPERATION_DATA lpPerIOData = NULL;

    // Initialize Windows Socket library
    WSAStartup(MAKEWORD(2, 2), &wsaData);

    // Create completion port
    CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

    // Create one worker thread per CPU
    GetSystemInfo(&systeminfo);
    for (i = 0; i < systeminfo.dwNumberOfProcessors; i++)
    {
        HANDLE hThread = CreateThread(NULL, 0, WorkerThread, CompletionPort, 0, NULL);
        CloseHandle(hThread);   // the thread keeps running; we just don't need its handle
    }

    // Create listening socket
    sListen = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

    // Bind
    local.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
    local.sin_family = AF_INET;
    local.sin_port = htons(PORT);
    bind(sListen, (struct sockaddr *)&local, sizeof(SOCKADDR_IN));

    // Listen
    listen(sListen, 3);

    while (TRUE)
    {
        // Accept a connection
        sClient = accept(sListen, (struct sockaddr *)&client, &iaddrSize);
        printf("Accepted client:%s:%d\n", inet_ntoa(client.sin_addr), ntohs(client.sin_port));

        // Associate the new client socket with the completion port; the
        // completion key is pointer-sized (ULONG_PTR) and carries the socket itself
        CreateIoCompletionPort((HANDLE)sClient, CompletionPort, (ULONG_PTR)sClient, 0);

        // Launch an asynchronous operation for the new connection
        lpPerIOData = (LPPER_IO_OPERATION_DATA)HeapAlloc(
            GetProcessHeap(),
            HEAP_ZERO_MEMORY,
            sizeof(PER_IO_OPERATION_DATA));
        PostRecv(sClient, lpPerIOData);
    }

    // Not reached in this demo. To stop the workers, post one exit packet per thread
    for (i = 0; i < systeminfo.dwNumberOfProcessors; i++)
        PostQueuedCompletionStatus(CompletionPort, 0xFFFFFFFF, 0, NULL);
    CloseHandle(CompletionPort);
    closesocket(sListen);
    WSACleanup();
    return 0;
}

// Post an overlapped WSARecv; its completion is queued to the completion port
void PostRecv(SOCKET sClient, LPPER_IO_OPERATION_DATA lpPerIOData)
{
    memset(lpPerIOData, 0, sizeof(PER_IO_OPERATION_DATA));
    lpPerIOData->Buffer.len = MSGSIZE;
    lpPerIOData->Buffer.buf = lpPerIOData->szMessage;
    lpPerIOData->OperationType = RECV_POSTED;

    if (WSARecv(sClient,
                &lpPerIOData->Buffer,
                1,
                &lpPerIOData->NumberOfBytesRecvd,
                &lpPerIOData->Flags,
                &lpPerIOData->overlap,
                NULL) == SOCKET_ERROR
        && WSAGetLastError() != WSA_IO_PENDING)
    {
        // The receive was not started, so no completion will arrive: clean up now
        closesocket(sClient);
        HeapFree(GetProcessHeap(), 0, lpPerIOData);
    }
}

DWORD WINAPI WorkerThread(LPVOID CompletionPortID)
{
    HANDLE                  CompletionPort = (HANDLE)CompletionPortID;
    DWORD                   dwBytesTransferred;
    ULONG_PTR               CompletionKey;
    SOCKET                  sClient;
    LPPER_IO_OPERATION_DATA lpPerIOData;
    BOOL                    bOk;

    while (TRUE)
    {
        lpPerIOData = NULL;
        bOk = GetQueuedCompletionStatus(
            CompletionPort,
            &dwBytesTransferred,
            &CompletionKey,
            (LPOVERLAPPED *)&lpPerIOData,
            INFINITE);

        if (dwBytesTransferred == 0xFFFFFFFF)
        {
            // Exit packet posted by the main thread
            return 0;
        }

        if (lpPerIOData == NULL)
        {
            // No packet was dequeued (e.g. the port was closed): stop this worker
            return 0;
        }

        sClient = (SOCKET)CompletionKey;   // we registered the socket as the key

        if (lpPerIOData->OperationType == RECV_POSTED)
        {
            if (!bOk || dwBytesTransferred == 0)
            {
                // Connection was closed (or reset) by client
                closesocket(sClient);
                HeapFree(GetProcessHeap(), 0, lpPerIOData);
            }
            else
            {
                // Echo the received bytes back
                send(sClient, lpPerIOData->szMessage, (int)dwBytesTransferred, 0);

                // Launch another asynchronous operation for sClient
                PostRecv(sClient, lpPerIOData);
            }
        }
    }
    return 0;
}
```

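First, the main thread:

1. Create the completion port object.
2. Create the worker threads. Their number equals the number of CPUs, with the aim of getting the best performance.
3. Create the listening socket, bind, listen, and then enter a loop.
4. Inside the loop, do the following:
    (1) Accept a client connection.
    (2) Associate the client socket with the completion port. This is again done with CreateIoCompletionPort, but this time for a different purpose. Strictly speaking, the third argument passed to CreateIoCompletionPort is a completion key. Programs usually pass the address of a per-handle data structure that holds information about the client connection. Since we only care about the socket handle, we pass the socket handle itself as the completion key.
    (3) Post an asynchronous WSARecv call. "Trailing data" is used again here: the receive buffer immediately follows the WSAOVERLAPPED object, along with other important information such as the operation type.

In the worker thread's loop, we:

1. Call GetQueuedCompletionStatus to obtain information about the completed I/O, such as the socket handle, the number of bytes transferred, and the address of the per-I/O data structure.
2. Use the per-I/O data structure to locate the receive buffer, then send the data back to the client unchanged.
3. Post another asynchronous WSARecv.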
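Step 4(2) mentions the more common alternative: registering a pointer to a per-handle data structure as the completion key. The key is pointer-sized (ULONG_PTR), so a pointer survives the round trip, whereas a cast through DWORD would truncate it on 64-bit Windows. Below is a portable sketch of that round trip. PerHandleData and both helper functions are hypothetical, and uintptr_t stands in for ULONG_PTR and SOCKET.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical per-handle data registered as the completion key. */
typedef struct {
    uintptr_t sClient;   /* stands in for SOCKET */
    char      peer[22];  /* room for "255.255.255.255:65535" plus NUL */
} PerHandleData;

/* Value to pass as CreateIoCompletionPort's CompletionKey argument. */
uintptr_t key_from_handle_data(PerHandleData *p)
{
    return (uintptr_t)p;
}

/* What a worker would do with the key returned by GetQueuedCompletionStatus. */
PerHandleData *handle_data_from_key(uintptr_t key)
{
    return (PerHandleData *)key;
}
```

In the worker, handle_data_from_key(CompletionKey) would then give both the socket and any extra per-connection state, such as the peer address. The per-handle block would be freed when the connection closes.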

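VI. Comparing the Five I/O Models

I will compare them along the following dimensions.

*Whether there is a limit of 64 connections per thread

If FD_SETSIZE is not redefined, each fd_set in the select model holds at most 64 SOCKETs by default. Likewise, because of the MAXIMUM_WAIT_OBJECTS macro (64), event select and overlapped I/O with event notification are limited to 64 connections per thread. With thousands of connections, the client sockets must be split into groups, which inevitably makes the program more complex.

By contrast, asynchronous select, overlapped I/O with completion routines, and completion ports have no such limit.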
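The select-model limit comes from the layout of Winsock's fd_set: a count plus a fixed array of FD_SETSIZE handles, not a bitmap. Code can raise the limit by defining FD_SETSIZE before including winsock2.h. The portable model below mimics how Winsock's FD_SET and FD_ISSET behave on that layout. SketchFdSet and its functions are hypothetical names, and unsigned int stands in for SOCKET.

```c
#include <assert.h>

#define SKETCH_FD_SETSIZE 64   /* Winsock's default FD_SETSIZE */

/* Model of Winsock's fd_set: a count and an array of handles. */
typedef struct {
    unsigned int fd_count;
    unsigned int fd_array[SKETCH_FD_SETSIZE];
} SketchFdSet;

/* Like Winsock's FD_SET: append if absent; silently ignore it when the set is full. */
void sketch_fd_set(SketchFdSet *set, unsigned int s)
{
    unsigned int i;
    for (i = 0; i < set->fd_count; i++)
        if (set->fd_array[i] == s)
            return;
    if (set->fd_count < SKETCH_FD_SETSIZE)
        set->fd_array[set->fd_count++] = s;
}

/* Like FD_ISSET: a linear scan, so testing every socket after select is O(n^2). */
int sketch_fd_isset(const SketchFdSet *set, unsigned int s)
{
    unsigned int i;
    for (i = 0; i < set->fd_count; i++)
        if (set->fd_array[i] == s)
            return 1;
    return 0;
}
```

Adding a 65th socket to a default-sized set is silently dropped, which is why the server must refuse connections once its array is full. The linear scans are also part of why the select model is judged the slowest below.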

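*Number of threads

Apart from asynchronous select, every model needs at least two threads: a main thread and a helper thread. Likewise, with more than 64 connections, the select model, event select, and overlapped I/O with event notification need even more threads.

*Implementation complexity

In my personal view, the models rank by implementation difficulty as follows: asynchronous select < select < overlapped I/O with completion routines < event select < completion port < overlapped I/O with event notification.

*Performance

The select model must rebuild the read set on every pass and test every socket one by one after select returns, so my impression is that it is rather inefficient. Completion ports and overlapped I/O with completion routines involve almost no global data, so they should be the most efficient, with completion ports pulling ahead on multiprocessor machines. Event select and overlapped I/O with event notification both rely on WSAWaitForMultipleEvents, so I expect their efficiency to be about the same. Asynchronous select is hard to compare. My conclusion is therefore: select < overlapped I/O with event notification < event select < overlapped I/O with completion routines < completion port.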
