http://blog.csdn.net/caoshiying?viewmode=contents
一,回想重叠 IO 模型
用完毕例程来实现重叠 I/O 比用事件通知简单得多.在这个模型中,主线程仅仅用不停的接受连接就可以;辅助线程推断有没有新的 client 连接被建立,假设有.就为那个 client 套接字激活一个异步的 WSARecv 操作,然后调用 SleepEx 使线程处于一种可警告的等待状态,以使得 I/O 完毕后 CompletionROUTINE 能够被内核调用.假设辅助线程不调用 SleepEx.则内核在完毕一次 I/O 操作后,无法调用完毕例程(由于完毕例程的执行应该和当初激活 WSARecv 异步操作的代码在同一个线程之内).
完毕例程内的实现代码比較简单,它取出接收到的数据,然后将数据原封不动的发送给 client.最后又一次激活还有一个 WSARecv 异步操作.注意,在这里用到了 "跟随数据".我们在调用 WSARecv 的时候,參数 lpOverlapped 实际上指向一个比它大得多的结构 PER_IO_OPERATION_DATA,这个结构除了 WSAOVERLAPPED 以外.还被我们附加了缓冲区的结构信息,另外还包括 client 套接字等重要的信息.这样.在完毕例程中通过參数 lpOverlapped 拿到的不不过 WSAOVERLAPPED 结构,还有后边跟随的包括 client 套接字和接收数据缓冲区等重要信息.这种 C 语言技巧在我介绍完毕 port 的时候还会使用到.
二,完毕 port 模型
"完毕 port" 模型是迄今为止最为复杂的一种 I/O 模型.
然而,假若一个应用程序同一时候须要管理为数众多的套接字,那么採用这样的模型,往往能够达到最佳的系统性能!
但不幸的是.该模型仅仅适用于 Windows NT 和 Windows 2000 操作系统.
因其设计的复杂性,仅仅有在你的应用程序须要同一时候管理数百乃至上千个套接字的时候,并且希望随着系统内安装的 CPU 数量的增多,应用程序的性能也能够线性提升.才应考虑採用 "完毕 port" 模型.
要记住的一个基本准则是.假如要为 Windows NT 或 Windows 2000 开发高性能的 server 应用.同一时候希望为大量套接字 I/O 请求提供服务(webserver 便是这方面的典型样例).那么 I/O 完毕 port 模型便是最佳选择!
完毕 port 模型是我最喜爱的一种模型.尽管事实上现比較复杂(事实上我认为它的实现比用事件通知实现的重叠 I/O 简单多了).但其效率是惊人的.
我在 T 公司的时候以前帮同事写过一个邮件 server 的性能測试程序,用的就是完毕 port 模型.
结果表明.完毕 port 模型在多连接(成千上万)的情况下.只依靠一两个辅助线程.就能够达到很高的吞吐量.
三,关键函数
1,CreateIoCompletionPort
创建一个输入 / 输出(I / O)完毕 port,并将其与一个指定的文件句柄关联.或者创建一个尚未与文件句柄关联的 I / O 完毕 port,同意在稍后的时间关联.将已打开的文件句柄的实例与一个 I / O 完毕 port 关联,同意一个进程接收包括该文件句柄的异步 I / O 操作完毕的通知.注意:这里所使用的术语文件句柄是指代表一个重叠的 I / O 端点的系统抽象,而不不过磁盘上的一个文件.不论什么系统对象支持重叠 I / o-such 网络端点,TCP 套接字.命名管道,邮件槽能够作为文件句柄.
函数原型:
HANDLE WINAPI CreateIoCompletionPort(
_In_ HANDLE FileHandle,
_In_opt_ HANDLE ExistingCompletionPort,
_In_ ULONG_PTR CompletionKey,
_In_ DWORD NumberOfConcurrentThreads
);
函数參数:
FileHandle:一个打开的文件句柄或者 INVALID_HANDLE_VALUE.这个文件句柄必须是支持重叠 IO 的 object.
假设提供了句柄, 它必须是已经给重叠 I/O 模型完毕 port 打开的句柄.比如.假设您使用 CreateFile 函数获取的句柄,那么您在调用这个函数时必须在參数中指定 FILE_FLAG_OVERLAPPED 旗标.假设指定 INVALID_HANDLE_VALUE,那么函数将创建一个没有关联文件句柄的 IO 完毕 port 模型,此外 ExistingCompletionPort 參数必须设为 NULL,CompletionKey 參数将被忽略.
ExistingCompletionPort:是已经存在的完毕 port.
假设为 NULL.则为新建一个 IOCP.
CompletionKey:用户定义的句柄包括的 I/O 完毕包信息.当 FileHandle 被设为 INVALID_HANDLE_VALUE 时此參数被忽略.
NumberOfConcurrentThreads:操作系统能够同意同一时候处理 I / O 完毕端口的 I / O 完毕数据包的线程的最大数目.假设 existingcompletionport 參数不为空,则忽略此參数.假设这个參数为零,系统同意多个并发执行的线程.由于系统中有处理器.
返回值:
假设函数成功,返回值是一个 I / O 完毕 port 的句柄:假设 ExistingCompletionPort 參数为空.返回值是一个新的处理.假设 ExistingCompletionPort 參数是一个有效的 I/O 完毕 port 句柄,返回值是同样的处理.假设文件句柄參数是一个有效的处理,文件处理是如今与返回的 I/O 完毕 port.假设函数失败,返回值为空.为了获得很多其它的错误信息,调用 GetLastError 函数.
2,GetQueuedCompletionStatus
失望的是微软官方 MSDN 没有提供关于这个 API 的说明.下面參照一篇英文文档进行翻译.
文档说这个函数试图将一个 I/O 完毕包从指定的 I/O 完毕 port.假设没有完毕数据包队列,则函数等待一个挂起的 I / O 操作与完毕 port 相关联的完毕.
函数原型:
BOOL WINAPI GetQueuedCompletionStatus(
_In_ HANDLE CompletionPort,
_Out_ LPDWORD lpNumberOfBytes,
_Out_ PULONG_PTR lpCompletionKey,
_Out_ LPOVERLAPPED *lpOverlapped,
_In_ DWORD dwMilliseconds
);
函数參数:
CompletionPort:完毕 port 的句柄.创建一个完毕 port.使用 CreateIoCompletionPort 函数.
lpNumberOfBytes:指向已完毕的 I / O 操作期间传输的字节数的变量的指针.
lpCompletionKey:指向与文件句柄关联的完毕键的变量的指针,该键的 I / O 操作已完毕.一个完毕的关键是每个文件的关键,是指定一个叫 CreateIoCompletionPort.
lpOverlapped:一个指向一个变量的指针,该指针指向在已完毕的 I / O 操作開始时指定的重叠结构的地址的变量.
即使您已经通过了一个与完毕 port 相关联的文件句柄和一个有效的重叠结构,应用程序也能够防止完毕 port 通知.这是通过指定的重叠结构的 hevent 成员有效的事件处理完毕,并设置其低阶位.
一个有效的事件句柄,其低阶位设置将保持 I / O 完毕从被队列到完毕 port.
dwMilliseconds:调用方愿意等待完毕数据包出如今完毕 port 的毫秒数.
假设一个完毕包没有出如今指定的时间内,功能倍出.返回 false.并设置 * lpOverlapped 为 null.
假设该參数是无限的.函数将没有时间了.
假设该參数为零,没有 I/O 操作中出列,函数将取消等待时间,马上操作.
返回值:
返回非零(真).假设成功或零(假),否则.为了获得很多其它的错误信息,调用 GetLastError.
此功能将一个线程与指定的完毕 port 关联.
一个线程能够与至多一个完毕 port 相关联的.假设由于完毕 port 句柄与它是封闭而调用调用 GetQueuedCompletionStatus 突出失败.函数返回 false.*lpOverlapped 会是空的,GetLastError 将返回 error_abandoned_wait_0.
Windows Server 2003 和 Windows XP:关闭完毕 port 句柄,调用优秀不会导致之前的行为.该函数将继续等待直到一项是从港口或直到发生超时删除,假设指定以外的无限价值.
假设 GetQueuedCompletionStatus 函数调用成功,它出列完毕包一个成功的 I/O 操作完毕 port 和存储信息的变量所指向的下列參数:lpNumberOfBytes.lpcompletionkey,和 lpOverlapped.在失败(返回值是错误的),这些同样的參数能够包括特定的值组合例如以下:
假设 * lpOverlapped 为空.功能没有出列完毕包从完毕 port.在这样的情况下,函数不存储信息在 lpNumberOfBytes and lpCompletionKey 所指向的參数中,其值是不确定的.
假设 * lpOverlapped 不空和功能按一个失败的 I/O 操作的完毕 port 完毕包的功能.存储信息有关失败操作的变量所指向的 lpcompletionkey lpOverlapped lpNumberOfBytes.为了获得很多其它的错误信息.调用 GetLastError.
3,PostQueuedCompletionStatus
将一个 I / O 完毕数据包发送到一个 I / O 完毕 port.I/O 完毕包将满足一个优秀的调用 GetQueuedCompletionStatus 函数.
该函数返回三值传递的第二,第三,和第四个參数 postqueuedcompletionstatus 呼叫.
该系统不使用或验证这些值.特别是.lpOverlapped 參数不须要点的重叠结构.
函数原型:
BOOL WINAPI PostQueuedCompletionStatus(
_In_ HANDLE CompletionPort,
_In_ DWORD dwNumberOfBytesTransferred,
_In_ ULONG_PTR dwCompletionKey,
_In_opt_ LPOVERLAPPED lpOverlapped
);
參数:
CompletionPort:一个 I / O 完毕数据包的 I / O 完毕 port 的句柄.
dwNumberOfBytesTransferred:要通过 lpnumberofbytestransferred 參数 GetQueuedCompletionStatus 函数返回的值.0xFFFFFFFF 表示处理全部跟随数据.仅仅有准备关闭 port 的时候才这样做.
dwCompletionKey:能够通过 GetQueuedCompletionStatus 函数返回的值 lpcompletionkey 參数.
lpOverlapped:要通过 lpOverlapped 參数 GetQueuedCompletionStatus 函数返回的值.
返回值:
假设函数成功.返回值是非零的.假设函数失败,返回值为零.为了获得很多其它的错误信息,调用 GetLastError.
四,完整的演示样例程序
接着上面几篇 Socket 文章写,关于公共代码与反射式 client 请參见:《 Socket 编程模型之简单选择模型 》.以下是新建的 overlapped_serverproject,新建了一个 overlapped_server_manager 类型,继承自 iserver_manager 接口,头文件完整代码例如以下:
#pragma once
#define SOCKET_MESSAGE_SIZE 1024
#include <WinSock2.h>
#include <common_callback.h>
typedef enum
{
RECV_POSTED
}OPERATION_TYPE;
typedef struct
{
WSAOVERLAPPED overlap;
WSABUF buffer;
char message[SOCKET_MESSAGE_SIZE];
DWORD received_count;
DWORD flags;
OPERATION_TYPE operation_type;
}PEERIO_OPERATION_DATA, *LPPEERIO_OPERATION_DATA;
class completeio_server_manager:
public iserver_manager
{
private:
int iport;
int iaddr_size;
common_callback callback;
BOOL brunning;
SOCKET server;
WSADATA wsaData;
HANDLE hcomplete_port;
SYSTEM_INFO system_info;
LPPEERIO_OPERATION_DATA peer_data;
bool bdisposed;
protected:
bool accept_by_crt();
bool accept_by_winapi();
public:
void receive();
void shutdown();
void start_receive();
void start_accept();
public:
completeio_server_manager();
virtual ~completeio_server_manager();
};
实现文件完整代码例如以下:
#include "completeio_server_manager.h"
#include <stdio.h>
#include <tchar.h>
completeio_server_manager::completeio_server_manager()
{
iport = 5150;
iaddr_size = sizeof(SOCKADDR_IN);
brunning = FALSE;
GetSystemInfo(&system_info);
callback.set_manager(this);
callback.set_receive_thread_coount(system_info.dwNumberOfProcessors);
hcomplete_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
bdisposed = false;
}
completeio_server_manager::~completeio_server_manager()
{
if (bdisposed)
shutdown();
}
bool completeio_server_manager::accept_by_crt()
{
return true;
}
bool completeio_server_manager::accept_by_winapi()
{
SOCKADDR_IN server_addr;
SOCKADDR_IN client_addr;
SOCKET client;
LPPEERIO_OPERATION_DATA peer_data;
int iresult = -1;
WSAStartup(MAKEWORD(2, 2), &wsaData);
server = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
server_addr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(iport);
do
{
iresult = bind(server, (struct sockaddr*)&server_addr, iaddr_size);
if (iresult == SOCKET_ERROR)
{
iport++;
server_addr.sin_port = htons(iport);
}
} while (iresult == -1);
listen(server, 3);
printf("基于完毕端口模型的Socket服务器启动成功.监听端口是:%d\n", iport);
while (brunning)
{
printf("開始监听请求.\n");
client = accept(server, (struct sockaddr*)&client_addr, &iaddr_size);
if (client == SOCKET_ERROR)
continue;
printf("新客户端连接:%s:%d\n", inet_ntoa(client_addr.sin_addr), htons(client_addr.sin_port));
CreateIoCompletionPort((HANDLE)client, hcomplete_port, (DWORD)client, 0);
peer_data = (LPPEERIO_OPERATION_DATA)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(PEERIO_OPERATION_DATA));
peer_data->buffer.len = SOCKET_MESSAGE_SIZE;
peer_data->buffer.buf = peer_data->message;
peer_data->operation_type = RECV_POSTED;
printf("開始接收客户端传送数据.\n");
WSARecv(client, &peer_data->buffer, 1, &peer_data->received_count, &peer_data->flags, &peer_data->overlap, NULL);
printf("收到客户端数据.\n");
}
return true;
}
void completeio_server_manager::receive()
{
DWORD dwtransfered = 0;
SOCKET client;
LPPEERIO_OPERATION_DATA peer = nullptr;
while (brunning)
{
printf("线程:%d,查询端口状态信息.
\n ",GetCurrentThreadId());
GetQueuedCompletionStatus(hcomplete_port, &dwtransfered, (PULONG_PTR)&client, (LPOVERLAPPED*)&peer, INFINITE);
printf("获得端口信息.\n ");
if (dwtransfered == 0xFFFFFFFF)
return;
if (peer->operation_type == RECV_POSTED)
{
if (dwtransfered == 0)
{
closesocket(client);
printf("有客户端退出了.\n ");
HeapFree(GetProcessHeap(), 0, peer);
}
else
{
peer->message[dwtransfered] = 0;
send(client, peer->message, dwtransfered, 0);
memset(peer, 0, sizeof(PEERIO_OPERATION_DATA));
peer->buffer.len = SOCKET_MESSAGE_SIZE;
peer->buffer.buf = peer->message;
peer->operation_type = RECV_POSTED;
WSARecv(client, &peer->buffer, 1, &peer->received_count, &peer->flags, &peer->overlap, nullptr);
}
}
}
}
void completeio_server_manager::shutdown()
{
PostQueuedCompletionStatus(hcomplete_port, 0xFFFFFFFF, 0, NULL);//端口跟随数据.
brunning = FALSE;
callback.shutdown();//清扫
CloseHandle(hcomplete_port);
closesocket(server);
WSACleanup();
bdisposed = true;
}
void completeio_server_manager::start_accept()
{
brunning = TRUE;
bdisposed = false;
callback.start_accept_by_winapi();
}
void completeio_server_manager::start_receive()
{
brunning = TRUE;
bdisposed = false;
callback.start_receive();
}
int main()
{
completeio_server_manager csm;
csm.start_accept();
csm.start_receive();
printf("服务器启动成功.按随意键关闭服务器并退出程序.\n ");
getchar();
csm.shutdown();
return 0;
}"
五,效果
六,心得体会
成功创建一个完毕 port 后,便可開始将套接字句柄与对象关联到一起.但在关联套接字之前,首先必须创建一个或多个 "工作者线程",以便在 I/O 请求投递给完毕 port 对象后,为完毕 port 提供服务.在这个时候,大家也许会认为奇怪,究竟应创建多少个线程,以便为完毕 port 提供服务呢?这实际正是完毕 port 模型显得颇为 "复杂" 的一个方面,由于服务 I/O 请求所需的数量取决于应用程序的整体设计情况.
在此要记住的一个重点在于.在我们调用 CreateIoCompletionPort 时指定的并发线程数量,与打算创建的工作者线程数量相比,它们代表的并不是同一件事情.早些时候.我们曾建议大家用 CreateIoCompletionPort 函数为每一个处理器都指定一个线程(处理器的数量有多少,便指定多少线程)以避免因为频繁的线程 "场景" 交换活动,从而影响系统的总体性能.CreateIoCompletionPort 函数的 NumberOfConcurrentThreads 參数明白指示系统:在一个完毕 port 上,一次仅仅同意 n 个工作者线程执行.假如在完毕 port 上创建的工作者线程数量超出 n 个.那么在同一时刻.最多仅仅同意 n 个线程执行.
但实际上,在一段较短的时间内,系统有可能超过这个值,但非常快便会把它降低至事先在 CreateIoCompletionPort 函数中设定的值.那么.为何实际创建的工作者线程数量有时要比 CreateIoCompletionPort 函数设定的多一些呢?这样做有必要吗?如先前所述.这主要取决于应用程序的整体设计情况.
假定我们的某个工作者线程调用了一个函数,比方 Sleep 或 WaitForSingleObject,但却进入了暂停(锁定或挂起)状态.那么同意还有一个线程取代它的位置.换言之,我们希望随时都能运行尽可能多的线程.当然,最大的线程数量是事先在 CreateIoCompletionPort 调用里设定好的.
这样一来.假如事先估计到自己的线程有可能临时处于停顿状态,那么最好可以创建比 CreateIoCompletionPort 的 NumberOfConcurrentThreads 參数的值多的线程.以便到时候充分发挥系统的潜力.
一旦在完毕 port 上拥有足够多的工作者线程来为 I/O 请求提供服务,便可着手将套接字句柄同完毕 port 关联到一起.这要求我们在一个现有的完毕 port 上,调用 CreateIoCompletionPort 函数,同一时候为前三个參数——FileHandle,ExistingCompletionPort 和 CompletionKey——提供套接字的信息.当中, FileHandle 參数指定一个要同完毕 port 关联在一起的套接字句柄.
ExistingCompletionPort 參数指定的是一个现有的完毕 port.
CompletionKey(完毕键)參数则指定要与某个特定套接字句柄关联在一起的 "单句柄数据".在这个參数中.应用程序可保存与一个套接字相应的随意类型的信息.之所以把它叫作 "单句柄数据".是因为它仅仅相应着与那个套接字句柄关联在一起的数据.
可将其作为指向一个数据结构的指针,来保存套接字句柄;在那个结构中.同一时候包括了套接字的句柄.以及与那个套接字有关的其它信息.
来源: http://www.bubuko.com/infodetail-2466453.html