EventThread 负责侦听 Darwin 系统的 Socket 事件,包括以下两类, 对于两类事件是分别如何处理的,我们从头开始分析。
a) 建立新的 RTSP 连接请求事件
b) 已有 RTSP 连接上的 RTSP 请求消息事件
EventThread 的创建
RunServer.cpp 中 StartServer 函数初始化调用了 Socket::Initialize(),在该函数内即创建了 EventThread,但是这里仅为创建,并未启动,所以还没有进入到线程的 Entry() 函数
EventThred 的运行
因为 EasyDarwin 支持多种平台的运行,我们进一步分析可以发现
static void StartThread() { sEventThread->Start();}
启动了在上一步创建的 sEventThread 类所对应的线程
sEventThread 继承于 OSThread
- void OSThread: :Start() {#ifdef __Win32__ unsigned int theId = 0; // We don't care about the identifier
- fThreadID = (HANDLE) _beginthreadex(NULL, // Inherit security
- 0, // Inherit stack size
- _Entry, // Entry function
- (void * ) this, // Entry arg
- 0, // Begin executing immediately
- & theId);
- Assert(fThreadID != NULL);#elif __PTHREADS__ pthread_attr_t * theAttrP;#ifdef _POSIX_THREAD_PRIORITY_SCHEDULING
- //theAttrP = &sThreadAttr;
- theAttrP = 0;#
- else theAttrP = NULL;#endif int err = pthread_create((pthread_t * ) & fThreadID, theAttrP, _Entry, (void * ) this);
- Assert(err == 0);#
- else fThreadID = (UInt32) cthread_fork((cthread_fn_t) _Entry, (any_t) this);#endif
- }
上面对于不同平台的函数做了预定义,这种写法值得参考
我们可以看到这个监听线程的线程函数为_Entry,我们可以看到_Entry 的实现
- #ifdef __Win32__ unsigned int WINAPI OSThread: :_Entry(LPVOID inThread)#
- else void * OSThread: :_Entry(void * inThread) //static
- #endif {
- OSThread * theThread = (OSThread * ) inThread;#ifdef __Win32__ BOOL theErr = ::TlsSetValue(sThreadStorageIndex, theThread);
- Assert(theErr == TRUE);#elif __PTHREADS__ theThread - >fThreadID = (pthread_t) pthread_self();
- pthread_setspecific(OSThread: :gMainKey, theThread);#
- else theThread - >fThreadID = (UInt32) cthread_self();
- cthread_set_data(cthread_self(), (any_t) theThread);#endif theThread - >SwitchPersonality();
- //
- // Run the thread
- theThread - >Entry();
- return NULL;
- }
EventThread 线程启动后进入,EventThread::Entry() 函数中开始调用 select_waitevent 函数监听所有的 Socket 端口,直到有事件发生为止
当有事件发生时:
- void EventThread: :Entry() {
- struct eventreq theCurrentEvent;: :memset( & theCurrentEvent, '\0', sizeof(theCurrentEvent));
- while (true) {
- int theErrno = EINTR;
- while (theErrno == EINTR) {#
- if MACOSXEVENTQUEUE int theReturnValue = waitevent( & theCurrentEvent, NULL);#
- else
- //调用select_waitevent函数监听所有的Socket端口,直到有事件发生为止
- int theReturnValue = select_waitevent( & theCurrentEvent, NULL);#endif
- //Sort of a hack. In the POSIX version of the server, waitevent can return
- //an actual POSIX errorcode.
- if (theReturnValue >= 0) theErrno = theReturnValue;
- else theErrno = OSThread: :GetErrno();
- }
- AssertV(theErrno == 0, theErrno);
- //ok, there's data waiting on this socket. Send a wakeup.
- if (theCurrentEvent.er_data != NULL) {
- //The cookie in this event is an ObjectID. Resolve that objectID into
- //a pointer.
- StrPtrLen idStr((char * ) & theCurrentEvent.er_data, sizeof(theCurrentEvent.er_data));
- OSRef * ref = fRefTable.Resolve( & idStr);
- if (ref != NULL) {
- EventContext * theContext = (EventContext * ) ref - >GetObject();#
- if DEBUG theContext - >fModwatched = false;#endif theContext - >ProcessEvent(theCurrentEvent.er_eventbits);
- fRefTable.Release(ref);
- }
- }#
- if EVENT_CONTEXT_DEBUG SInt64 yieldStart = OS: :Milliseconds();#endif this - >ThreadYield();#
- if EVENT_CONTEXT_DEBUG SInt64 yieldDur = OS: :Milliseconds() - yieldStart;
- static SInt64 numZeroYields;
- if (yieldDur > 1) {
- qtss_printf("EventThread time in OSTHread::Yield %i, numZeroYields %i\n", (SInt32) yieldDur, (SInt32) numZeroYields);
- numZeroYields = 0;
- } else numZeroYields++;#endif
- }
- }
有 Socket 事件发生时处理流程:
1. 构造 StrPtrLen 类型变量 idStr,在 fRefTable 表中查找标识为 idStr 的 2.OSRef 类型引用。
3. 通过引用指针 ref 查找 EventContext 对象
4. 调用 EventContext 的 ProcessEvent 方法(重点关注)
5. 在 fRefTable 释放 ref 引用指针
这里要着重关注 ProcessEvent() 方法:
ProcessEvent 作为虚函数有两个实现,
EventContext 类中实现了 ProcessEvent 方法(对应情况 b)
EventContext 的派生类 TCPListenerSocket 中实现了 ProcessEvent 方法。(对应情况 a)
各位读者好好理解下面的 a 和 b 对应的内容。
a). 建立新的 RTSP 连接请求事件
fRefTable 表插入时间: QTSServer::StartTasks() 中的 fListeners[x]->RequestEvent(EV_RE);,
存入的对象为 RTSPListenerSocket
调用方法: TCPListenerSocket::ProcessEvent
方法描述: 此方法调用 RTSPListenerSocket 的 GetSessionTask 方法建立一个 RTSPSession,并把相应的套接口加入侦听队列,等待 RTSP 请求。
然后还需调用 this->RequestEvent(EV_RE) 把建立 RTSP 连接的请求加入到侦听队列。
b). 已有 RTSP 连接上的 RTSP 请求消息事件
fRefTable 表插入时间: 上一步的 this->RequestEvent(EV_RE)
调用方法: EventContext::ProcessEvent
方法描述: 通过 Task 的 Signal 把对应的 RTSPSession 类型的 Task 加入到 TaskThread::fTaskQueue 中等待 TaskThread 处理。
另外介绍下 TCPListenerSocket::ProcessEvent 方法,RTSPSession 就这这里的 GetSessionTask 中被创建。摘取片段进行备注说明
- //获取RTSPSession的fSocket参数,然后将当前接收数据的socket赋值给该参数;
- theTask = this - >GetSessionTask( & theSocket);
- if (theTask == NULL) { //this should be a disconnect. do an ioctl call?
- close(osSocket);
- if (theSocket) theSocket - >fState &= ~kConnected; // turn off connected state
- } else //创建成功,接着创建Socket对象;
- {
- //把刚刚建立好的RTSP连接加入到侦听队列,等待RTSP请求的到来;
- theSocket - >Set(osSocket, &addr);
- theSocket - >InitNonBlocking(osSocket);
- //将新建的RTSPSession保存到EventContext对应的fTask属性,当该socket连接后面的消息事件到来时即将该RTSPSession任务加入到task线程中;
- theSocket - >SetTask(theTask);
- theSocket - >RequestEvent(EV_RE); //新对象监听读事件;(EventContext开始监听该socket连接之上发送的消息)
- //theTask就是新建的RTSPSession;
- theTask - >SetThreadPicker(Task: :GetBlockingTaskThreadPicker()); //The RTSP Task processing threads
- 更多类容请看: 阿木技术社区
来源: http://lib.csdn.net/article/liveplay/49111