python中的多进程、多线程、协程
2018-12-17 17:32:16 0 举报
AI智能生成
多进程、多线程、协程
作者其他创作
大纲/内容
协程
概念及简介
协程从代码级实现并发,操作系统感知不到,粒度越细,开发难度越大
greenlet
模块导入:from greenlet import greenlet
创建对象:g1=greenlet(eat)
切换对象:g1.switch('egon')
缺点:该模块不能自动实现来回切换
示例代码
from greenlet import greenlet
def eat(name):
print('%s eat 1' %name)
g2.switch('egon')
print('%s eat 2' %name)
g2.switch()
def play(name):
print('%s play 1' %name)
g1.switch()
print('%s play 2' %name)
g1=greenlet(eat)
g2=greenlet(play)
g1.switch('egon')#可以在第一次switch时传入参数,以后都不需要
def eat(name):
print('%s eat 1' %name)
g2.switch('egon')
print('%s eat 2' %name)
g2.switch()
def play(name):
print('%s play 1' %name)
g1.switch()
print('%s play 2' %name)
g1=greenlet(eat)
g2=greenlet(play)
g1.switch('egon')#可以在第一次switch时传入参数,以后都不需要
Gevent
gevent模块:import gevent
创建对象并执行:g1=gevent.spawn(eat)
控制
对象.join方式:g1.join()
模块.joinall方式:gevent.joinall([g1,g2])
gevent阻塞:gevent.sleep(1)
其他阻塞识别
from gevent import monkey;monkey.patch_all()
示例代码1
import gevent
def eat(name):
print('%s eat 1' %name)
gevent.sleep(2)
print('%s eat 2' %name)
def play(name):
print('%s play 1' %name)
gevent.sleep(1)
print('%s play 2' %name)
g1=gevent.spawn(eat,'egon')
g2=gevent.spawn(play,name='egon')
g1.join()
g2.join()
#或者gevent.joinall([g1,g2])
print('主')
def eat(name):
print('%s eat 1' %name)
gevent.sleep(2)
print('%s eat 2' %name)
def play(name):
print('%s play 1' %name)
gevent.sleep(1)
print('%s play 2' %name)
g1=gevent.spawn(eat,'egon')
g2=gevent.spawn(play,name='egon')
g1.join()
g2.join()
#或者gevent.joinall([g1,g2])
print('主')
示例代码2
from gevent import spawn,joinall,monkey;monkey.patch_all()
import time
def task(pid):
"""
Some non-deterministic task
"""
time.sleep(0.5)
print('Task %s done' % pid)
def synchronous(): # 同步
for i in range(10):
task(i)
def asynchronous(): # 异步
g_l=[spawn(task,i) for i in range(10)]
joinall(g_l)
print('DONE')
if __name__ == '__main__':
print('Synchronous:')
synchronous()
print('Asynchronous:')
asynchronous()
# 上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。
# 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,
# 后者阻塞当前流程,并执行所有给定的greenlet任务。
执行流程只会在 所有greenlet执行完后才会继续向下走。
import time
def task(pid):
"""
Some non-deterministic task
"""
time.sleep(0.5)
print('Task %s done' % pid)
def synchronous(): # 同步
for i in range(10):
task(i)
def asynchronous(): # 异步
g_l=[spawn(task,i) for i in range(10)]
joinall(g_l)
print('DONE')
if __name__ == '__main__':
print('Synchronous:')
synchronous()
print('Asynchronous:')
asynchronous()
# 上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。
# 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,
# 后者阻塞当前流程,并执行所有给定的greenlet任务。
执行流程只会在 所有greenlet执行完后才会继续向下走。
多进程
multiprocess模块
创建进程
Process(group=None, target=None, name=None, args=(), kwargs={})
target:目标函数,不加括号
args=():通过元组将子进程需要参数传入
kwargs={‘k’:'v'}关键字传参
进程开启、控制和结束
P.start()开启子进程,调用Process类中的run()方法
P.join()控制主进程等待子线程结束后再往后执行,timeout可设置等待的最长时间
P.is_alive()判断子进程P是否还在运行
P.terminate()强制终止,可能导致P的子进程成为僵尸进程
实例代码
import time
from multiprocessing import Process
def f(name):
print('hello', name)
print('我是子进程')
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
time.sleep(1)
print('执行主进程的内容了')
from multiprocessing import Process
def f(name):
print('hello', name)
print('我是子进程')
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
time.sleep(1)
print('执行主进程的内容了')
进程依赖
守护进程
P.daemon=True放在p.start()之前
信号量
Semaphore模块:from multiprocessing importSemaphore
创建信号量对象:sem=Semaphore(4)
信号的获取,sem.acquire()
信号的释放,sem.release()
事件
Event模块:from multiprocessing import Event
e = Event()
e.set()
e.clear()
e.is_set()
进程间通信
进程间是数据隔离的,通过队列、通道实现进程间的通信
pipe
queue
Queue模块:from multiprocessing import Queue
q=Queue([maxsize])创建允许最大顶数为maxsize的共享队列
q.put(content)将数据放入栈中,栈满时发生阻塞;q.put_nowait(content)栈满时不会阻塞,但会报错
q.get()从栈中取出数据,栈空时阻塞;q.get_nowait()栈空时不会阻塞,但会报错
q.empty(),q.full()判断队列是否为空或满,满足条件是返回True
数据隔离和数据安全
进程间是数据隔离的,但这种隔离主要是指内存级别的,当多进程同时操作文件(硬盘)中的数据时还是存在数据安全问题,为了保证数据安全就要用到锁
锁
Lock模块:from multiprocessing import Process,Lock
创建锁对象:lock=Lock()
锁的获取,lock.acquire()
锁的释放,lock.release()
死锁
递归锁
Rlock
资源占用的问题
进程池
Pool模块
Pool模块:from multiprocessing import Pool
创建进程池对象:p=Pool(n),n为允许同时执行的最大进程数
进程开启:p.apply(work,args=(i,)),work为调用的函数,args传递参数;非阻塞执行为p.apply_async(func [, args [, kwargs]])
p.join()控制主进程在子进程结束后继续进行
示例代码
import os,time
from multiprocessing import Pool
def work(n):
print('%s run' %os.getpid())
time.sleep(3)
return n**2
if __name__ == '__main__':
p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
res_l=[]
for i in range(10):
res=p.apply(work,args=(i,)) # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞
# 但不管该任务是否存在阻塞,同步调用都会在原地等着
print(res_l)
from multiprocessing import Pool
def work(n):
print('%s run' %os.getpid())
time.sleep(3)
return n**2
if __name__ == '__main__':
p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
res_l=[]
for i in range(10):
res=p.apply(work,args=(i,)) # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞
# 但不管该任务是否存在阻塞,同步调用都会在原地等着
print(res_l)
ProcessPoolExecutor
ProcessPoolExecutor模块:from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
创建对象:executor=ProcessPoolExecutor(max_workers=3)
开启进程(异步):future=executor.submit(task,i)
关闭进程:executor.shutdown(True)
获取进程结果:future.result()
回调函数:add_done_callback(fn)
map简化:executor.map(task,range(1,12))
对等于:
# for i in range(11):
# future=executor.submit(task,i)
# for i in range(11):
# future=executor.submit(task,i)
多线程
threading模块
创建线程
Thread模块:from threading import Thread
Thread(group=None, target=None, name=None, args=(), kwargs={})
target:目标函数,不加括号
args=():通过元组将线程程需要参数传入
kwargs={‘k’:'v'}关键字传参
线程开启、控制和结束
t.start()开启子线程,调用Thread类中的run()方法
t.join()控制主线程等待子线程结束后再往后执行,timeout可设置等待的最长时间
t.is_alive()判断子进程t是否还在运行
t.enumerate()返回正在运行的子线程列表
示例代码
from threading import Thread
import time
def sayhi(name):
time.sleep(2)
print('%s say hello' %name)
if __name__ == '__main__':
t=Thread(target=sayhi,args=('egon',))
t.start()
print('主线程')
import time
def sayhi(name):
time.sleep(2)
print('%s say hello' %name)
if __name__ == '__main__':
t=Thread(target=sayhi,args=('egon',))
t.start()
print('主线程')
线程依赖
守护线程
t.setDaemon(True)放在t.start()之前
信号量
Semaphore模块:from threading import Thread,Semaphore
创建信号量对象:sem=Semaphore(4)
信号的获取,sem.acquire()
信号的释放,sem.release()
示例代码:
from threading import Thread,Semaphore
import threading
import time
# def func():
# if sm.acquire():
# print (threading.currentThread().getName() + ' get semaphore')
# time.sleep(2)
# sm.release()
def func():
sm.acquire()
print('%s get sm' %threading.current_thread().getName())
time.sleep(3)
sm.release()
if __name__ == '__main__':
sm=Semaphore(5)
for i in range(23):
t=Thread(target=func)
t.start()
import threading
import time
# def func():
# if sm.acquire():
# print (threading.currentThread().getName() + ' get semaphore')
# time.sleep(2)
# sm.release()
def func():
sm.acquire()
print('%s get sm' %threading.current_thread().getName())
time.sleep(3)
sm.release()
if __name__ == '__main__':
sm=Semaphore(5)
for i in range(23):
t=Thread(target=func)
t.start()
事件
Event模块:from threading import Thread, Event
e = Event()
e.set()
e.clear()
e.is_set()
条件condition
Condition模块:from threading import Thread,Condition
con=Condition()
con.acquire()
con.notify(int(inp))
con.release()
con.notify(int(inp))
con.release()
con.acquire()
con.wait()
con.release()
con.wait()
con.release()
数据共享和数据安全
进程间是数据共享的,当多线程同时操作数据时存在数据安全问题,为了保证数据安全就要用到锁
锁
Lock模块:from threading import Thread,Lock
创建锁对象:lock=Lock()
锁的获取,lock.acquire()
锁的释放,lock.release()
死锁
多线程或多进程争抢共同的资源,原因是设置的多把互斥锁不合理
递归锁
Rlock
科学家吃面:fork_lock = noodle_lock = RLock()
queue
线程中的Queue和进程的Queue不同,线程中更强调数据安全(通道+锁),而不是数据通信,它的导入是从独立的queue模块
Queue模块:from queue import Queue
q=Queue([maxsize])创建允许最大顶数为maxsize的共享队列
q.put(content)将数据放入栈中
q.get()从栈中取出数据
q.empty(),q.full()判断队列是否为空或满,满足条件是返回True
线程池
ThreadPoolExecutor
ThreadPoolExecutor模块:from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
创建对象:executor=ThreadPoolExecutor(max_workers=3)
开启线程(异步):future=executor.submit(task,i)
关闭线程:executor.shutdown(True)
获取线程结果:future.result()
回调函数:add_done_callback(fn)
map简化:executor.map(task,range(1,12))
对等于:
# for i in range(11):
# future=executor.submit(task,i)
# for i in range(11):
# future=executor.submit(task,i)
0 条评论
下一页
为你推荐
查看更多