- 回顾:
- 进程
- 一个程序需要运行所需的资源的集合
- 每个进程数据是独立的
- 每个进程里至少有一个线程
- 进程里可以有多个线程
- 线程数据是共享的
- 一个进程的多个线 6程可以充分利用多核cpu
- multiprocessing
- pipe
- queue
- 实现的是进程间的数据传递,通信
- manager 实现了多进程间的数据共享
- 进程间共享数据的代价是高昂的,所以要尽量避免进程间的数据共享
- 线程间的数据本来就是共享的
- 线程要修改同一份数据,必须加锁,互斥锁mutex
- event
- 线程间交互
- 生产者消费者模型
- 解耦 (降低进程间的依赖性)
- 提高程序运行效率
- queue
- FIFO
- LIFO
- 优先级queue
- 适用场景:
- 线程:
- I/O密集型(I/O不占用cpu),socket 爬虫 web
- 进程:cpu运算密集型,金融分析
- 1 # -*- coding:utf-8 -*-
- 2
- 3
- 4 from greenlet import greenlet
- 5
- 6
- 7 def test1():
- 8 print(12)
- 9 gr2.switch()
- 10 print(34)
- 11 gr2.switch()
- 12
- 13
- 14 def test2():
- 15 print(56)
- 16 gr1.switch()
- 17 print(78)
- 18
- 19
- 20 gr1 = greenlet(test1)
- 21 gr2 = greenlet(test2)
- 22 gr1.switch()
- 1 import gevent
- 2
- 3 def func1():
- 4 print('\033[31;1m李闯在跟海涛搞...\033[0m')
- 5 gevent.sleep(2)
- 6 print('\033[31;1m李闯又回去跟继续跟海涛搞...\033[0m')
- 7
- 8 def func2():
- 9 print('\033[32;1m李闯切换到了跟海龙搞...\033[0m')
- 10 gevent.sleep(1)
- 11 print('\033[32;1m李闯搞完了海涛,回来继续跟海龙搞...\033[0m')
- 12
- 13
- 14 gevent.joinall([
- 15 gevent.spawn(func1),
- 16 gevent.spawn(func2),
- 17 #gevent.spawn(func3),
- 18 ])
- import gevent
- def task(pid):
- """
- Some non-deterministic task
- """
- gevent.sleep(0.5)
- print('Task %s done' % pid)
- def synchronous():
- for i in range(1,10):
- task(i)
- def asynchronous():
- threads = [gevent.spawn(task, i) for i in range(10)]
- gevent.joinall(threads)
- print('Synchronous:')
- synchronous()
- print('Asynchronous:')
- asynchronous()
select 多并发 socket 例子
server
- 1 #_*_coding:utf-8_*_
- 2 __author__ = 'Alex Li'
- 3
- 4 import select
- 5 import socket
- 6 import sys
- 7 import queue
- 8
- 9
- 10 server = socket.socket()
- 11 server.setblocking(0)
- 12
- 13 server_addr = ('localhost',10000)
- 14
- 15 print('starting up on %s port %s' % server_addr)
- 16 server.bind(server_addr)
- 17
- 18 server.listen(5)
- 19
- 20
- 21 inputs = [server, ] #自己也要监测呀,因为server本身也是个fd
- 22 outputs = []
- 23
- 24 message_queues = {}
- 25
- 26 while True:
- 27 print("waiting for next event...")
- 28
- 29 readable, writeable, exeptional = select.select(inputs,outputs,inputs) #如果没有任何fd就绪,那程序就会一直阻塞在这里
- 30
- 31 for s in readable: #每个s就是一个socket
- 32
- 33 if s is server: #别忘记,上面我们server自己也当做一个fd放在了inputs列表里,传给了select,如果这个s是server,代表server这个fd就绪了,
- 34 #就是有活动了, 什么情况下它才有活动? 当然 是有新连接进来的时候 呀
- 35 #新连接进来了,接受这个连接
- 36 conn, client_addr = s.accept()
- 37 print("new connection from",client_addr)
- 38 conn.setblocking(0)
- 39 inputs.append(conn) #为了不阻塞整个程序,我们不会立刻在这里开始接收客户端发来的数据, 把它放到inputs里, 下一次loop时,这个新连接
- 40 #就会被交给select去监听,如果这个连接的客户端发来了数据 ,那这个连接的fd在server端就会变成就续的,select就会把这个连接返回,返回到
- 41 #readable 列表里,然后你就可以loop readable列表,取出这个连接,开始接收数据了, 下面就是这么干 的
- 42
- 43 message_queues[conn] = queue.Queue() #接收到客户端的数据后,不立刻返回 ,暂存在队列里,以后发送
- 44
- 45 else: #s不是server的话,那就只能是一个 与客户端建立的连接的fd了
- 46 #客户端的数据过来了,在这接收
- 47 data = s.recv(1024)
- 48 if data:
- 49 print("收到来自[%s]的数据:" % s.getpeername()[0], data)
- 50 message_queues[s].put(data) #收到的数据先放到queue里,一会返回给客户端
- 51 if s not in outputs:
- 52 outputs.append(s) #为了不影响处理与其它客户端的连接 , 这里不立刻返回数据给客户端
- 53
- 54
- 55 else:#如果收不到data代表什么呢? 代表客户端断开了呀
- 56 print("客户端断开了",s)
- 57
- 58 if s in outputs:
- 59 outputs.remove(s) #清理已断开的连接
- 60
- 61 inputs.remove(s) #清理已断开的连接
- 62
- 63 del message_queues[s] ##清理已断开的连接
- 64
- 65
- 66 for s in writeable:
- 67 try :
- 68 next_msg = message_queues[s].get_nowait()
- 69
- 70 except queue.Empty:
- 71 print("client [%s]" %s.getpeername()[0], "queue is empty..")
- 72 outputs.remove(s)
- 73
- 74 else:
- 75 print("sending msg to [%s]"%s.getpeername()[0], next_msg)
- 76 s.send(next_msg.upper())
- 77
- 78
- 79 for s in exeptional:
- 80 print("handling exception for ",s.getpeername())
- 81 inputs.remove(s)
- 82 if s in outputs:
- 83 outputs.remove(s)
- 84 s.close()
- 85
- 86 del message_queues[s]
client
- 1 #_*_coding:utf-8_*_
- 2 __author__ = 'Alex Li'
- 3
- 4
- 5 import socket
- 6 import sys
- 7
- 8 messages = [ b'This is the message. ',
- 9 b'It will be sent ',
- 10 b'in parts.',
- 11 ]
- 12 server_address = ('localhost', 10000)
- 13
- 14 # Create a TCP/IP socket
- 15 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
- 16 socket.socket(socket.AF_INET, socket.SOCK_STREAM),
- 17 ]
- 18
- 19 # Connect the socket to the port where the server is listening
- 20 print('connecting to %s port %s' % server_address)
- 21 for s in socks:
- 22 s.connect(server_address)
- 23
- 24 for message in messages:
- 25
- 26 # Send messages on both sockets
- 27 for s in socks:
- 28 print('%s: sending "%s"' % (s.getsockname(), message) )
- 29 s.send(message)
- 30
- 31 # Read responses on both sockets
- 32 for s in socks:
- 33 data = s.recv(1024)
- 34 print( '%s: received "%s"' % (s.getsockname(), data) )
- 35 if not data:
- 36 print(sys.stderr, 'closing socket', s.getsockname() )
- 37 复制代码
selectors 模块
- 1 import selectors
- 2 import socket
- 3
- 4 sel = selectors.DefaultSelector()
- 5
- 6 def accept(sock, mask):
- 7 conn, addr = sock.accept() # Should be ready
- 8 print('accepted', conn, 'from', addr)
- 9 conn.setblocking(False)
- 10 sel.register(conn, selectors.EVENT_READ, read)
- 11
- 12 def read(conn, mask):
- 13 data = conn.recv(1000) # Should be ready
- 14 if data:
- 15 print('echoing', repr(data), 'to', conn)
- 16 conn.send(data) # Hope it won't block
- 17 else:
- 18 print('closing', conn)
- 19 sel.unregister(conn)
- 20 conn.close()
- 21
- 22 sock = socket.socket()
- 23 sock.bind(('localhost', 10000))
- 24 sock.listen(100)
- 25 sock.setblocking(False)
- 26 sel.register(sock, selectors.EVENT_READ, accept)
- 27
- 28 while True:
- 29 events = sel.select()
- 30 for key, mask in events:
- 31 callback = key.data
- 32 callback(key.fileobj, mask)
来源: