Python 中有一个 select 模块,其中提供了:select、poll、epoll 三个方法,分别调用系统的 select,poll,epoll 从而实现 IO 多路复用。
- Windows Python:
- 提供: select
- Mac Python:
- 提供: select
- Linux Python:
- 提供: select、poll、epoll
注意:网络操作、文件操作、终端操作等均属于 IO 操作,对于 windows 只支持 Socket 操作,其他系统支持其他 IO 操作,但是无法检测 普通文件操作 自动上次读取是否已经变化。
句柄列表 11, 句柄列表 22, 句柄列表 33 = select.select(句柄序列 1, 句柄序列 2, 句柄序列 3, 超时时间)
参数: 可接受四个参数(前三个必须)返回值:三个列表
select 方法用来监视文件句柄,如果句柄发生变化,则获取该句柄。1、当 参数 1 序列中的句柄发生可读时(accetp 和 read),则获取发生变化的句柄并添加到 返回值 1 序列中 2、当 参数 2 序列中含有句柄时,则将该序列中所有的句柄添加到 返回值 2 序列中 3、当 参数 3 序列中的句柄发生错误时,则将该发生错误的句柄添加到 返回值 3 序列中 4、当 超时时间 未设置,则 select 会一直阻塞,直到监听的句柄发生变化当 超时时间 = 1 时,那么如果监听的句柄均无任何变化,则 select 会阻塞 1 秒,之后返回三个空列表,如果监听的句柄有变化,则直接执行。
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- import select
- import threading
- import sys
- while True:
- readable, writeable, error = select.select([sys.stdin,],[],[],1)
- if sys.stdin in readable:
- print 'select get stdin',sys.stdin.readline()
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- import socket
- import select
- sk1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sk1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sk1.bind(('127.0.0.1',8002))
- sk1.listen(5)
- sk1.setblocking(0)
- inputs = [sk1,]
- while True:
- readable_list, writeable_list, error_list = select.select(inputs, [], inputs, 1)
- for r in readable_list:
- # 当客户端第一次连接服务端时
- if sk1 == r:
- print 'accept'
- request, address = r.accept()
- request.setblocking(0)
- inputs.append(request)
- # 当客户端连接上服务端之后,再次发送数据时
- else:
- received = r.recv(1024)
- # 当正常接收客户端发送的数据时
- if received:
- print 'received data:', received
- # 当客户端关闭程序时
- else:
- inputs.remove(r)
- sk1.close()
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- import socket
- ip_port = ('127.0.0.1',8002)
- sk = socket.socket()
- sk.connect(ip_port)
- while True:
- inp = raw_input('please input:')
- sk.sendall(inp)
- sk.close()
此处的 Socket 服务端相比与原生的 Socket,他支持当某一个请求不再发送数据时,服务器端不会等待而是可以去处理其他请求的数据。但是,如果每个请求的耗时比较长时,select 版本的服务器端也无法完成同时操作。
- #!/usr/bin/env python
- #coding:utf8
- '''
- 服务器的实现 采用select的方式
- '''
- import select
- import socket
- import sys
- import Queue
创建套接字并设置该套接字为非阻塞模式
server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)server.setblocking(0)
绑定套接字 server_address = ('localhost',10000)print >>sys.stderr,'starting up on %s port %s'% server_addressserver.bind(server_address)
- #backlog等于5,表示内核已经接到了连接请求,但服务器还没有调用accept进行处理的连接个数最大为5
- #这个值不能无限大,因为要在内核中维护连接队列
- server.listen(5)
- #初始化读取数据的监听列表,最开始时希望从server这个套接字上读取数据
- inputs = [server]
- #初始化写入数据的监听列表,最开始并没有客户端连接进来,所以列表为空
- outputs = []
- #要发往客户端的数据
- message_queues = {}
- while inputs:
- print >>sys.stderr,'waiting for the next event'
- #调用select监听所有监听列表中的套接字,并将准备好的套接字加入到对应的列表中
- readable,writable,exceptional = select.select(inputs,outputs,inputs)#列表中的socket 套接字 如果是文件呢?
- #监控文件句柄有某一处发生了变化 可写 可读 异常属于Linux中的网络编程
- #属于同步I/O操作,属于I/O复用模型的一种
- #rlist--等待到准备好读
- #wlist--等待到准备好写
- #xlist--等待到一种异常
- #处理可读取的套接字
- '''
- 如果server这个套接字可读,则说明有新链接到来
- 此时在server套接字上调用accept,生成一个与客户端通讯的套接字
- 并将与客户端通讯的套接字加入inputs列表,下一次可以通过select检查连接是否可读
- 然后在发往客户端的缓冲中加入一项,键名为:与客户端通讯的套接字,键值为空队列
- select系统调用是用来让我们的程序监视多个文件句柄(file descrīptor)的状态变化的。程序会停在select这里等待,
- 直到被监视的文件句柄有某一个或多个发生了状态改变
- '''
- '''
- 若可读的套接字不是server套接字,有两种情况:一种是有数据到来,另一种是链接断开
- 如果有数据到来,先接收数据,然后将收到的数据填入往客户端的缓存区中的对应位置,最后
- 将于客户端通讯的套接字加入到写数据的监听列表:
- 如果套接字可读.但没有接收到数据,则说明客户端已经断开。这时需要关闭与客户端连接的套接字
- 进行资源清理
- '''
- for s in readable:
- if s is server:
- connection,client_address = s.accept()
- print >>sys.stderr,'connection from',client_address
- connection.setblocking(0)#设置非阻塞
- inputs.append(connection)
- message_queues[connection] = Queue.Queue()
- else:
- data = s.recv(1024)
- if data:
- print >>sys.stderr,'received "%s" from %s'% (data,s.getpeername())
- message_queues[s].put(data)
- if s not in outputs:
- outputs.append(s)
- else:
- print >>sys.stderr,'closing',client_address
- if s in outputs:
- outputs.remove(s)
- inputs.remove(s)
- s.close()
- del message_queues[s]
- #处理可写的套接字
- '''
- 在发送缓冲区中取出响应的数据,发往客户端。
- 如果没有数据需要写,则将套接字从发送队列中移除,select中不再监视
- '''
- for s in writable:
- try:
- next_msg = message_queues[s].get_nowait()
- except Queue.Empty:
- print >>sys.stderr,' ',s,getpeername(),'queue empty'
- outputs.remove(s)
- else:
- print >>sys.stderr,'sending "%s" to %s'% (next_msg,s.getpeername())
- s.send(next_msg)
- #处理异常情况
- for s in exceptional:
- for s in exceptional:
- print >>sys.stderr,'exception condition on',s.getpeername()
- inputs.remove(s)
- if s in outputs:
- outputs.remove(s)
- s.close()
- del message_queues[s]
SocketServer 内部使用 IO 多路复用 以及 "多线程" 和 "多进程" ,从而实现并发处理多个客户端请求的 Socket 服务端。即:每个客户端请求连接到服务器时,Socket 服务端都会在服务器是创建一个 "线程" 或者 "进程" 专门负责处理当前客户端的所有请求。
ThreadingTCPServer
ThreadingTCPServer 实现的 Soket 服务器内部会为每个 client 创建一个 "线程",该线程用来和客户端进行交互。
- 使用ThreadingTCPServer:
- 创建一个继承自 SocketServer.BaseRequestHandler 的类
- 类中必须定义一个名称为 handle 的方法
- 启动ThreadingTCPServer
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- import SocketServer
- class MyServer(SocketServer.BaseRequestHandler):
- def handle(self):
- # print self.request,self.client_address,self.server
- conn = self.request
- conn.sendall('欢迎致电 10086,请输入1xxx,0转人工服务.')
- Flag = True
- while Flag:
- data = conn.recv(1024)
- if data == 'exit':
- Flag = False
- elif data == '0':
- conn.sendall('通过可能会被录音.balabala一大推')
- else:
- conn.sendall('请重新输入.')
- if __name__ == '__main__':
- server = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyServer)
- server.serve_forever()
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- import socket
- ip_port = ('127.0.0.1',8009)
- sk = socket.socket()
- sk.connect(ip_port)
- sk.settimeout(5)
- while True:
- data = sk.recv(1024)
- print 'receive:',data
- inp = raw_input('please input:')
- sk.sendall(inp)
- if inp == 'exit':
- break
- sk.close()
ThreadingTCPServer 的类图关系如下:
内部调用流程为:
- 启动服务端程序
- 执行 TCPServer.__init__ 方法,创建服务端Socket对象并绑定 IP 和 端口
- 执行 BaseServer.__init__ 方法,将自定义的继承自SocketServer.BaseRequestHandler 的类 MyRequestHandle赋值给 self.RequestHandlerClass
- 执行 BaseServer.server_forever 方法,While 循环一直监听是否有客户端请求到达 ...
- 当客户端连接到达服务器
- 执行 ThreadingMixIn.process_request 方法,创建一个 "线程" 用来处理请求
- 执行 ThreadingMixIn.process_request_thread 方法
- 执行 BaseServer.finish_request 方法,执行 self.RequestHandlerClass() 即:执行 自定义 MyRequestHandler 的构造方法(自动调用基类BaseRequestHandler的构造方法,在该构造方法中又会调用 MyRequestHandler的handle方法)
class BaseServer:
- """Base class for server classes.
- Methods for the caller:
- - __init__(server_address, RequestHandlerClass)
- - serve_forever(poll_interval=0.5)
- - shutdown()
- - handle_request() # if you do not use serve_forever()
- - fileno() -> int # for select()
- Methods that may be overridden:
- - server_bind()
- - server_activate()
- - get_request() -> request, client_address
- - handle_timeout()
- - verify_request(request, client_address)
- - server_close()
- - process_request(request, client_address)
- - shutdown_request(request)
- - close_request(request)
- - handle_error()
- Methods for derived classes:
- - finish_request(request, client_address)
- Class variables that may be overridden by derived classes or
- instances:
- - timeout
- - address_family
- - socket_type
- - allow_reuse_address
- Instance variables:
- - RequestHandlerClass
- - socket
- """
- timeout = None
- def __init__(self, server_address, RequestHandlerClass):
- """Constructor. May be extended, do not override."""
- self.server_address = server_address
- self.RequestHandlerClass = RequestHandlerClass
- self.__is_shut_down = threading.Event()
- self.__shutdown_request = False
- def server_activate(self):
- """Called by constructor to activate the server.
- May be overridden.
- """
- pass
- def serve_forever(self, poll_interval=0.5):
- """Handle one request at a time until shutdown.
- Polls for shutdown every poll_interval seconds. Ignores
- self.timeout. If you need to do periodic tasks, do them in
- another thread.
- """
- self.__is_shut_down.clear()
- try:
- while not self.__shutdown_request:
- # XXX: Consider using another file descriptor or
- # connecting to the socket to wake this up instead of
- # polling. Polling reduces our responsiveness to a
- # shutdown request and wastes cpu at all other times.
- r, w, e = _eintr_retry(select.select, [self], [], [],
- poll_interval)
- if self in r:
- self._handle_request_noblock()
- finally:
- self.__shutdown_request = False
- self.__is_shut_down.set()
- def shutdown(self):
- """Stops the serve_forever loop.
- Blocks until the loop has finished. This must be called while
- serve_forever() is running in another thread, or it will
- deadlock.
- """
- self.__shutdown_request = True
- self.__is_shut_down.wait()
- # The distinction between handling, getting, processing and
- # finishing a request is fairly arbitrary. Remember:
- #
- # - handle_request() is the top-level call. It calls
- # select, get_request(), verify_request() and process_request()
- # - get_request() is different for stream or datagram sockets
- # - process_request() is the place that may fork a new process
- # or create a new thread to finish the request
- # - finish_request() instantiates the request handler class;
- # this constructor will handle the request all by itself
- def handle_request(self):
- """Handle one request, possibly blocking.
- Respects self.timeout.
- """
- # Support people who used socket.settimeout() to escape
- # handle_request before self.timeout was available.
- timeout = self.socket.gettimeout()
- if timeout is None:
- timeout = self.timeout
- elif self.timeout is not None:
- timeout = min(timeout, self.timeout)
- fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
- if not fd_sets[0]:
- self.handle_timeout()
- return
- self._handle_request_noblock()
- def _handle_request_noblock(self):
- """Handle one request, without blocking.
- I assume that select.select has returned that the socket is
- readable before this function was called, so there should be
- no risk of blocking in get_request().
- """
- try:
- request, client_address = self.get_request()
- except socket.error:
- return
- if self.verify_request(request, client_address):
- try:
- self.process_request(request, client_address)
- except:
- self.handle_error(request, client_address)
- self.shutdown_request(request)
- def handle_timeout(self):
- """Called if no new request arrives within self.timeout.
- Overridden by ForkingMixIn.
- """
- pass
- def verify_request(self, request, client_address):
- """Verify the request. May be overridden.
- Return True if we should proceed with this request.
- """
- return True
- def process_request(self, request, client_address):
- """Call finish_request.
- Overridden by ForkingMixIn and ThreadingMixIn.
- """
- self.finish_request(request, client_address)
- self.shutdown_request(request)
- def server_close(self):
- """Called to clean-up the server.
- May be overridden.
- """
- pass
- def finish_request(self, request, client_address):
- """Finish one request by instantiating RequestHandlerClass."""
- self.RequestHandlerClass(request, client_address, self)
- def shutdown_request(self, request):
- """Called to shutdown and close an individual request."""
- self.close_request(request)
- def close_request(self, request):
- """Called to clean up an individual request."""
- pass
- def handle_error(self, request, client_address):
- """Handle an error gracefully. May be overridden.
- The default is to print a traceback and continue.
- """
- print '-'*40
- print 'Exception happened during processing of request from',
- print client_address
- import traceback
- traceback.print_exc() # XXX But this goes to stderr!
- print '-'*40
class TCPServer(BaseServer):
- """Base class for various socket-based server classes.
- Defaults to synchronous IP stream (i.e., TCP).
- Methods for the caller:
- - __init__(server_address, RequestHandlerClass, bind_and_activate=True)
- - serve_forever(poll_interval=0.5)
- - shutdown()
- - handle_request() # if you don't use serve_forever()
- - fileno() -> int # for select()
- Methods that may be overridden:
- - server_bind()
- - server_activate()
- - get_request() -> request, client_address
- - handle_timeout()
- - verify_request(request, client_address)
- - process_request(request, client_address)
- - shutdown_request(request)
- - close_request(request)
- - handle_error()
- Methods for derived classes:
- - finish_request(request, client_address)
- Class variables that may be overridden by derived classes or
- instances:
- - timeout
- - address_family
- - socket_type
- - request_queue_size (only for stream sockets)
- - allow_reuse_address
- Instance variables:
- - server_address
- - RequestHandlerClass
- - socket
- """
- address_family = socket.AF_INET
- socket_type = socket.SOCK_STREAM
- request_queue_size = 5
- allow_reuse_address = False
- def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
- """Constructor. May be extended, do not override."""
- BaseServer.__init__(self, server_address, RequestHandlerClass)
- self.socket = socket.socket(self.address_family,
- self.socket_type)
- if bind_and_activate:
- try:
- self.server_bind()
- self.server_activate()
- except:
- self.server_close()
- raise
- def server_bind(self):
- """Called by constructor to bind the socket.
- May be overridden.
- """
- if self.allow_reuse_address:
- self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.socket.bind(self.server_address)
- self.server_address = self.socket.getsockname()
- def server_activate(self):
- """Called by constructor to activate the server.
- May be overridden.
- """
- self.socket.listen(self.request_queue_size)
- def server_close(self):
- """Called to clean-up the server.
- May be overridden.
- """
- self.socket.close()
- def fileno(self):
- """Return socket file number.
- Interface required by select().
- """
- return self.socket.fileno()
- def get_request(self):
- """Get the request and client address from the socket.
- May be overridden.
- """
- return self.socket.accept()
- def shutdown_request(self, request):
- """Called to shutdown and close an individual request."""
- try:
- #explicitly shutdown. socket.close() merely releases
- #the socket and waits for GC to perform the actual close.
- request.shutdown(socket.SHUT_WR)
- except socket.error:
- pass #some platforms may raise ENOTCONN here
- self.close_request(request)
- def close_request(self, request):
- """Called to clean up an individual request."""
- request.close()
class ThreadingMixIn:"""Mix-in class to handle each request in a new thread."""
- # Decides how threads will act upon termination of the
- # main process
- daemon_threads = False
- def process_request_thread(self, request, client_address):
- """Same as in BaseServer but as a thread.
- In addition, exception handling is done here.
- """
- try:
- self.finish_request(request, client_address)
- self.shutdown_request(request)
- except:
- self.handle_error(request, client_address)
- self.shutdown_request(request)
- def process_request(self, request, client_address):
- """Start a new thread to process the request."""
- t = threading.Thread(target = self.process_request_thread,
- args = (request, client_address))
- t.daemon = self.daemon_threads
- t.start()
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
class BaseRequestHandler:
- """Base class for request handler classes.
- This class is instantiated for each request to be handled. The
- constructor sets the instance variables request, client_address
- and server, and then calls the handle() method. To implement a
- specific service, all you need to do is to derive a class which
- defines a handle() method.
- The handle() method can find the request as self.request, the
- client address as self.client_address, and the server (in case it
- needs access to per-server information) as self.server. Since a
- separate instance is created for each request, the handle() method
- can define arbitrary other instance variariables.
- """
- def __init__(self, request, client_address, server):
- self.request = request
- self.client_address = client_address
- self.server = server
- self.setup()
- try:
- self.handle()
- finally:
- self.finish()
- def setup(self):
- pass
- def handle(self):
- pass
- def finish(self):
- pass
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- import SocketServer
- class MyServer(SocketServer.BaseRequestHandler):
- def handle(self):
- # print self.request,self.client_address,self.server
- conn = self.request
- conn.sendall('欢迎致电 10086,请输入1xxx,0转人工服务.')
- Flag = True
- while Flag:
- data = conn.recv(1024)
- if data == 'exit':
- Flag = False
- elif data == '0':
- conn.sendall('通过可能会被录音.balabala一大推')
- else:
- conn.sendall('请重新输入.')
- if __name__ == '__main__':
- server = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyServer)
- server.serve_forever()
源码精简:
- import socket
- import threading
- import select
- def process(request, client_address):
- print request,client_address
- conn = request
- conn.sendall('欢迎致电 10086,请输入1xxx,0转人工服务.')
- flag = True
- while flag:
- data = conn.recv(1024)
- if data == 'exit':
- flag = False
- elif data == '0':
- conn.sendall('通过可能会被录音.balabala一大推')
- else:
- conn.sendall('请重新输入.')
- sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sk.bind(('127.0.0.1',8002))
- sk.listen(5)
- while True:
- r, w, e = select.select([sk,],[],[],1)
- print 'looping'
- if sk in r:
- print 'get request'
- request, client_address = sk.accept()
- t = threading.Thread(target=process, args=(request, client_address))
- t.daemon = False
- t.start()
- sk.close()
如精简代码可以看出,SocketServer 的 ThreadingTCPServer 之所以可以同时处理请求得益于 select 和 Threading 两个东西,其实本质上就是在服务器端为每一个客户端创建一个线程,当前线程用来处理对应客户端的请求,所以,可以支持同时 n 个客户端链接(长连接)。
ForkingTCPServer
ForkingTCPServer 和 ThreadingTCPServer 的使用和执行流程基本一致,只不过在内部分别为请求者建立 "线程" 和 "进程"。
基本使用:
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- import SocketServer
- class MyServer(SocketServer.BaseRequestHandler):
- def handle(self):
- # print self.request,self.client_address,self.server
- conn = self.request
- conn.sendall('欢迎致电 10086,请输入1xxx,0转人工服务.')
- Flag = True
- while Flag:
- data = conn.recv(1024)
- if data == 'exit':
- Flag = False
- elif data == '0':
- conn.sendall('通过可能会被录音.balabala一大推')
- else:
- conn.sendall('请重新输入.')
- if __name__ == '__main__':
- server = SocketServer.ForkingTCPServer(('127.0.0.1',8009),MyServer)
- server.serve_forever()
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- import socket
- ip_port = ('127.0.0.1',8009)
- sk = socket.socket()
- sk.connect(ip_port)
- sk.settimeout(5)
- while True:
- data = sk.recv(1024)
- print 'receive:',data
- inp = raw_input('please input:')
- sk.sendall(inp)
- if inp == 'exit':
- break
- sk.close()
以上 ForkingTCPServer 只是将 ThreadingTCPServer 实例中的代码:
- server = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyRequestHandler)
- 变更为:
- server = SocketServer.ForkingTCPServer(('127.0.0.1',8009),MyRequestHandler)
SocketServer 的 ThreadingTCPServer 之所以可以同时处理请求得益于 select 和 os.fork 两个东西,其实本质上就是在服务器端为每一个客户端创建一个进程,当前新创建的进程用来处理对应客户端的请求,所以,可以支持同时 n 个客户端链接(长连接)。
源码剖析参考 ThreadingTCPServerTwisted
Twisted 是一个事件驱动的网络框架,其中包含了诸多功能,例如:网络协议、线程、数据库管理、网络操作、电子邮件等。
简而言之,事件驱动分为二个部分:第一,注册事件;第二,触发事件。
自定义事件驱动框架,命名为:"弑君者":
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- # event_drive.py
- event_list = []
- def run():
- for event in event_list:
- obj = event()
- obj.execute()
- class BaseHandler(object):
- """
- 用户必须继承该类,从而规范所有类的方法(类似于接口的功能)
- """
- def execute(self):
- raise Exception('you must overwrite execute')
- 程序员使用"弑君者框架":
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- from source import event_drive
- class MyHandler(event_drive.BaseHandler):
- def execute(self):
- print 'event-drive execute MyHandler'
- event_drive.event_list.append(MyHandler)
- event_drive.run()
如上述代码,事件驱动只不过是框架规定了执行顺序,程序员在使用框架时,可以向原执行顺序中注册 "事件",从而在框架执行时可以出发已注册的 "事件"。
基于事件驱动 Socket
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- from twisted.internet import protocol
- from twisted.internet import reactor
- class Echo(protocol.Protocol):
- def dataReceived(self, data):
- self.transport.write(data)
- def main():
- factory = protocol.ServerFactory()
- factory.protocol = Echo
- reactor.listenTCP(8000,factory)
- reactor.run()
- if __name__ == '__main__':
- main()
程序执行流程:
- 运行服务端程序
- 创建Protocol的派生类Echo
- 创建ServerFactory对象,并将Echo类封装到其protocol字段中
- 执行reactor的 listenTCP 方法,内部使用 tcp.Port 创建socket server对象,并将该对象添加到了 reactor的set类型的字段 _read 中
- 执行reactor的 run 方法,内部执行 while 循环,并通过 select 来监视 _read 中文件描述符是否有变化,循环中...
- 客户端请求到达
- 执行reactor的 _doReadOrWrite 方法,其内部通过反射调用 tcp.Port 类的 doRead 方法,内部 accept 客户端连接并创建Server对象实例(用于封装客户端socket信息)和 创建 Echo 对象实例(用于处理请求) ,然后调用 Echo 对象实例的 makeConnection 方法,创建连接。
- 执行 tcp.Server 类的 doRead 方法,读取数据,
- 执行 tcp.Server 类的 _dataReceived 方法,如果读取数据内容为空(关闭链接),否则,出发 Echo 的 dataReceived 方法
- 执行 Echo 的 dataReceived 方法
从源码可以看出,上述实例本质上使用了事件驱动的方法 和 IO 多路复用的机制来进行 Socket 的处理。
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- from twisted.internet import reactor, protocol
- from twisted.web.client import getPage
- from twisted.internet import reactor
- import time
- class Echo(protocol.Protocol):
- def dataReceived(self, data):
- deferred1 = getPage('http://cnblogs.com')
- deferred1.addCallback(self.printContents)
- deferred2 = getPage('http://baidu.com')
- deferred2.addCallback(self.printContents)
- for i in range(2):
- time.sleep(1)
- print 'execute ',i
- def execute(self,data):
- self.transport.write(data)
- def printContents(self,content):
- print len(content),content[0:100],time.time()
- def main():
- factory = protocol.ServerFactory()
- factory.protocol = Echo
- reactor.listenTCP(8000,factory)
- reactor.run()
- if __name__ == '__main__':
- main()
来源: http://www.bubuko.com/infodetail-2446280.html