一, TCP-socket
服务端:
- import socket
- tcp_sk = socket.socket()
- tcp_sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
- tcp_sk.bind(('127.0.0.1',8000))
- tcp_sk.listen()
- conn,addr = tcp_sk.accept()
- conn.send('你好'.encode('utf-8'))
- print(conn.recv(1024).decode('utf-8'))
- conn.close()
- tcp_sk.close()
客户端:
- import socket
- sk = socket.socket()
- sk.connect(('127.0.0.1',8000))
- print(sk.recv(1024).decode('utf-8'))
- sk.send('嘿嘿嘿'.encode('utf-8'))
- sk.close()
二, UDP-socket
服务端:
- import socket
- udp_sk = socket.socket(type=socket.SOCK_DGRAM)
- udp_sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
- udp_sk.bind(('127.0.0.1',8001))
- msg,addr = udp_sk.recvfrom(1024)
- print(msg.decode('utf-8'))
- udp_sk.sendto('你好'.encode('utf-8'),addr)
- udp_sk.close()
客户端:
- import socket
- sk = socket.socket(type=socket.SOCK_DGRAM)
- sk.sendto('哈哈'.encode('utf-8'),('127.0.0.1',8001))
- msg,addr = sk.recvfrom(1024)
- print(msg.decode('utf-8'))
- sk.close()
三, socketserver
服务端:
- import socketserver
- class Myserver(socketserver.BaseRequestHandler):
- def handle(self):
- conn = self.request
- while True:
- conn.send(b'hello')
- print(conn.recv(1024).decode('utf-8'))
- socketserver.TCPServer.allow_reuse_address = True
- server = socketserver.ThreadingTCPServer(('127.0.0.1',8080),Myserver)
- server.serve_forever()
客户端:
- import socket
- sk = socket.socket()
- sk.connect(('127.0.0.1',8080))
- while True:
- ret = sk.recv(1024)
- print(ret.decode('utf-8'))
- sk.send(b'hiworld')
- sk.close()
四, 进程
方式一,
- from multiprocessing import Process
- def func(arg):
- print(arg)
- if __name__ == '__main__':
- p = Process(target=func,args=('子进程',))
- p.start()
- p.join()
- print('主进程')
方式二,
- from multiprocessing import Process
- class MyProcess(Process):
- def __init__(self,name):
- super().__init__()
- self.name = name
- def run(self):
- print(self.name)
- if __name__ == '__main__':
- p = MyProcess('小明')
- p.start()
五, 线程
方式一,
- from threading import Thread
- import time
- def sleep_boy(name):
- time.sleep(1)
- print('%s is sleeping' %name)
- t = Thread(target=sleep_boy,args=('xiaoming',)) # 这里可以不需要 main, 因为现在只是在一个进程内操作, 不需要导入进程就不会 import 主进程了
- t.start()
- print('主线程')
方式二,
- from threading import Thread
- import time
- class Sleep_boy(Thread):
- def __init__(self,name):
- super().__init__()
- self.name = name
- def run(self):
- time.sleep(1)
- print('%s is sleeping' % self.name)
- t = Sleep_boy('xiaoming')
- t.start()
- print('主线程')
六, 协程
1,greenlet 例子:
- import time
- from greenlet import greenlet
- def cooking():
- print('cooking 1')
- g2.switch() # 切换到 g2, 让 g2 的函数工作
- time.sleep(1)
- print('cooking 2')
- def watch():
- print('watch TV 1')
- time.sleep(1)
- print('watch TV 2')
- g1.switch() # 切换到 g1, 让 g1 的函数工作
- g1 = greenlet(cooking)
- g2 = greenlet(watch)
- g1.switch() # 切换到 g1, 让 g1 的函数工作
greenlet 的缺陷: 很显然 greenlet 实现了协程的切换功能, 可以自己设置什么时候切, 在哪切, 但是它遇到阻塞并没有自动切换,
因此并不能提高效率. 所以一般我们都使用 gevent 模块实现协程
2,gevent 例子:
- from gevent import monkey
- monkey.patch_all()
- import time
- import gevent
- def cooking():
- print('cooking 1')
- time.sleep(1)
- print('cooking 2')
- def watch():
- print('watch TV 1')
- time.sleep(1)
- print('watch TV 2')
- g1 = gevent.spawn(cooking) # 自动检测阻塞事件, 遇见阻塞了就会进行切换
- g2 = gevent.spawn(watch)
- g1.join() # 阻塞直到 g1 结束
- g2.join() # 阻塞直到 g2 结束
七, 进程池
1, 同步提交 apply:
- import os
- import time
- from multiprocessing import Pool
- def test(num):
- time.sleep(1)
- print('%s:%s' %(num,os.getpid()))
- return num*2
- if __name__ == '__main__':
- p = Pool()
- for i in range(20):
- res = p.apply(test,args=(i,)) # 提交任务的方法 同步提交
- print('-->',res) # res 就是 test 的 return 的值, 同步提交的返回值可以直接使用
2, 异步提交 apply_async:
2-1 无返回值:
- import time
- from multiprocessing import Pool
- def func(num):
- time.sleep(1)
- print('做了 %s 件衣服'%num)
- if __name__ == '__main__':
- p = Pool(4) # 进程池中创建 4 个进程, 不写的话, 默认值为你电脑的 CUP 数量
- for i in range(50):
- p.apply_async(func,args=(i,)) # 异步提交 func 到一个子进程中执行, 没有返回值的情况
- p.close() # 关闭进程池, 用户不能再向这个池中提交任务了
- p.join() # 阻塞, 直到进程池中所有的任务都被执行完
2-2 有返回值:
- import time
- import os
- from multiprocessing import Pool
- def test(num):
- time.sleep(1)
- print('%s:%s' %(num,os.getpid()))
- return num*2
- if __name__ == '__main__':
- p = Pool()
- res_lst = []
- for i in range(20):
- res = p.apply_async(test,args=(i,)) # 提交任务的方法 异步提交
- res_lst.append(res)
- for res in res_lst:
- print(res.get()) # 异步提交的返回值需要 get,get 有阻塞效果, 此时就不需要 close 和 join
- 2-3map:
map 接收一个函数和一个可迭代对象, 是异步提交的简化版本, 自带 close 和 join 方法
可迭代对象的每一个值就是函数接收的实参, 可迭代对象的长度就是创建的任务数量
map 可以直接拿到返回值的可迭代对象 (列表), 循环就可以获取返回值
- import time
- from multiprocessing import Pool
- def func(num):
- print('子进程:',num)
- # time.sleep(1)
- return num
- if __name__ == '__main__':
- p = Pool()
- ret = p.map(func,range(10)) # ret 是列表
- for i in ret:
- print('返回值:',i)
2-4 回调函数:
- import os
- from multiprocessing import Pool
- def func(i):
- print('子进程:',os.getpid())
- return i
- def call_back(res):
- print('回调函数:',os.getpid())
- print('res--->',res)
- if __name__ == '__main__':
- p = Pool()
- print('主进程:',os.getpid())
- p.apply_async(func,args=(1,),callback=call_back) # callback 关键字传参, 参数是回调函数
- p.close()
- p.join()
八, 进程池, 线程池
线程池:
- 1,
- import time
- from concurrent.futures import ThreadPoolExecutor
- def func(i):
- print('thread',i)
- time.sleep(1)
- print('thread %s end'%i)
- tp = ThreadPoolExecutor(5) # 相当于 tp = Pool(5)
- tp.submit(func,1) # 相当于 tp.apply_async(func,args=(1,))
- tp.shutdown() # 相当于 tp.close() + tp.join()
- print('主线程')
- 2,
- import time
- from concurrent.futures import ThreadPoolExecutor
- from threading import currentThread
- def func(i):
- print('thread',i,currentThread().ident)
- time.sleep(1)
- print('thread %s end'%i)
- tp = ThreadPoolExecutor(5)
- for i in range(20):
- tp.submit(func,i)
- tp.shutdown() # shutdown 一次就够了, 会自动把所有的线程都 join()
- print('主线程')
3, 返回值
- import time
- from concurrent.futures import ThreadPoolExecutor
- from threading import currentThread
- def func(i):
- print('thread',i,currentThread().ident)
- time.sleep(1)
- print('thread %s end' %i)
- return i * '*'
- tp = ThreadPoolExecutor(5)
- ret_lst = []
- for i in range(20):
- ret = tp.submit(func,i)
- ret_lst.append(ret)
- for ret in ret_lst:
- print(ret.result()) # 相当于 ret.get()
- print('主线程')
- 4,map
map 接收一个函数和一个可迭代对象
可迭代对象的每一个值就是函数接收的实参, 可迭代对象的长度就是创建的线程数量
map 可以直接拿到返回值的可迭代对象 (列表), 循环就可以获取返回值
- import time
- from concurrent.futures import ThreadPoolExecutor
- def func(i):
- print('thread',i)
- time.sleep(1)
- print('thread %s end'%i)
- return i * '*'
- tp = ThreadPoolExecutor(5)
- ret = tp.map(func,range(20))
- for i in ret:
- print(i)
5, 回调函数
回调函数在进程池是由主进程实现的
回调函数在线程池是由子线程实现的
- import time
- from concurrent.futures import ThreadPoolExecutor
- from threading import currentThread
- def func(i):
- print('thread',i,currentThread().ident)
- time.sleep(1)
- print('thread %s end'%i)
- return i * '*'
- def call_back(arg):
- print('call back :',currentThread().ident)
- print('ret :',arg.result()) # multiprocessing 的 Pool 回调函数中的参数不需要 get(), 这里需要 result()
- tp = ThreadPoolExecutor(5)
- ret_lst = []
- for i in range(20):
- tp.submit(func,i).add_done_callback(call_back) # 使用 add_done_callback() 方法实现回调函数
- print('主线程',currentThread().ident)
来源: http://www.bubuko.com/infodetail-2783804.html