进程、线程、协程
2021-02-23 10:13:17 27 举报
AI智能生成
python相关
作者其他创作
大纲/内容
进程
定义及相关概念
在操作系统中运行的程序就是一个进程
是操作系统调度的
操作系统资源分配的最小单位
一个进程一个pid(今晨ID)
并发
多个程序同时在一个cpu上轮流执行
并行
多个程序同时在多个cup上执行
同步
在做事情A的时候,又来了一个事情B,必须等到事情B执行完成,才能继续做事情A
异步
在做事情A的时候,来了一个事情B,不需要等待事情B的结果,可以继续做事情A
阻塞
cpu不工作
非阻塞
cpu工作
进程的三状态图
就绪
运行
阻塞
程序开始运行进入就绪队列——》等待被执行——》运行——》运行时间片到后——》就绪
当程序在正在运行——》遇到IO操作——》进入阻塞状态——》阻塞结束后——》进入就绪队列——》等待被运行
进程的调度算法
短作业优先
先来先服务
多级反馈算法
多个任务队列,优先级从高到低
新来的任务,优先级最高
每一个新来的任务都会立即获得一个时间片
执行完一个时间片后,会下降到下一级队列中
总是优先级高的任务执行完成后,在执行优先级低的任务
优先级越高时间片越短
进程模块multiprocessing
开启一个进程
from multiprocessing import Process
import time
def func(i):
print('start:',i)
time.sleep(1)
print('end:',i)
if __name__ == '__main__':
for i in range(10):
p = Process(target = func,args=(i,))
p.start()
import time
def func(i):
print('start:',i)
time.sleep(1)
print('end:',i)
if __name__ == '__main__':
for i in range(10):
p = Process(target = func,args=(i,))
p.start()
join的用法
from multiprocessing import Process
import time
def func(i):
print('start:',i)
time.sleep(2)
print('end:',i)
if __name__ == '__main__':
p1 = Process(target=func,args=(1,))
p2 = Process(target=func, args=(2,))
p1.start()
p2.start()
p1.join() # 等待进程执行完成后执行下面的代码
print('123')
import time
def func(i):
print('start:',i)
time.sleep(2)
print('end:',i)
if __name__ == '__main__':
p1 = Process(target=func,args=(1,))
p2 = Process(target=func, args=(2,))
p1.start()
p2.start()
p1.join() # 等待进程执行完成后执行下面的代码
print('123')
等待子进程执行完后,在执行主进程内的内容
基于进程实现socket的并发
server端
import socket
from multiprocessing import Process
def talk(conn):
while True:
msg = conn.recv(1024).decode('utf-8')
MSG = msg.upper()
conn.send(MSG.encode('utf-8'))
if __name__ == '__main__':
sk = socket.socket()
sk.bind(('127.0.0.1',9000))
sk.listen()
while True:
conn,_ = sk.accept()
p = Process(target=talk,args=(conn,))
p.start()
from multiprocessing import Process
def talk(conn):
while True:
msg = conn.recv(1024).decode('utf-8')
MSG = msg.upper()
conn.send(MSG.encode('utf-8'))
if __name__ == '__main__':
sk = socket.socket()
sk.bind(('127.0.0.1',9000))
sk.listen()
while True:
conn,_ = sk.accept()
p = Process(target=talk,args=(conn,))
p.start()
开启进程的另一种方式
from multiprocessing import Process
class MyProcess(Process):
def __init__(self,name):
self.name = name
super().__init__()
def run(self):
print(f'执行了{self.name}')
if __name__ == '__main__':
mp = MyProcess('alex')
mp.start()
class MyProcess(Process):
def __init__(self,name):
self.name = name
super().__init__()
def run(self):
print(f'执行了{self.name}')
if __name__ == '__main__':
mp = MyProcess('alex')
mp.start()
继承Process类,并实现Process类的同名方法:run()
进程的其他方法
daemon = True 守护进程
定义
守护进程会等待主进程的代码执行结束之后再结束,而不是等待整个主进程结束.
daemon = True 表示设置子进程是一个守护进程
主进程的代码什么时候结束,守护进程就什么时候结束,和其他子进程的执行进度无关
主进程会等待所有的子进程结束,是为了回收子进程的资源
from multiprocessing import Process
import time
import os
def func(i):
print('start:',i)
time.sleep(2)
print('end:',i)
if __name__ == '__main__':
p = Process(target=func,args=(1,))
p1 = Process(target=func, args=(2,))
p.daemon=True
print(11111)
p.start()
p1.start()
import time
import os
def func(i):
print('start:',i)
time.sleep(2)
print('end:',i)
if __name__ == '__main__':
p = Process(target=func,args=(1,))
p1 = Process(target=func, args=(2,))
p.daemon=True
print(11111)
p.start()
p1.start()
结束进程 terminate
from multiprocessing import Process
import time
import os
def func(i):
print('start:',i)
time.sleep(2)
print('end:',i)
print(os.getpid()) #子进程ID
print(os.getppid()) #父进程ID
if __name__ == '__main__':
p = Process(target=func,args=(1,))
p.start()
print(123)
p.terminate() # 结束一个进程
print(456)
import time
import os
def func(i):
print('start:',i)
time.sleep(2)
print('end:',i)
print(os.getpid()) #子进程ID
print(os.getppid()) #父进程ID
if __name__ == '__main__':
p = Process(target=func,args=(1,))
p.start()
print(123)
p.terminate() # 结束一个进程
print(456)
is_alive 查看进程是否存活,返回True或False
Manage 数据共享
from multiprocessing import Process,Manager,Lock
def change_dic(dic,lock):
with lock:
dic['count'] -= 1
if __name__ == '__main__':
# m = Manager()
with Manager() as m:
lock = Lock()
dic = m.dict({'count': 100})
# dic = {'count': 100}
p_l = []
for i in range(100):
p = Process(target=change_dic,args=(dic,lock))
p.start()
p_l.append(p)
for p in p_l : p.join()
print(dic)
def change_dic(dic,lock):
with lock:
dic['count'] -= 1
if __name__ == '__main__':
# m = Manager()
with Manager() as m:
lock = Lock()
dic = m.dict({'count': 100})
# dic = {'count': 100}
p_l = []
for i in range(100):
p = Process(target=change_dic,args=(dic,lock))
p.start()
p_l.append(p)
for p in p_l : p.join()
print(dic)
线程
定义
线程是进程中的一个单位,不能脱离进程单独存在
线程是操作系统(CPU)调度的最小单位
可以利用多核
线程之间数据共享
数据不安全
线程不能被强制关闭,没有terminate方法
全局解释器锁 GIL锁
同一个进程中的多个线程,同一时间只有一个线程被CPU调度
节省了IO操作的时间,而非CPU计算的时间
线程模块 Thread
线程的开启
#############线程模块##############
from threading import Thread
import time
def func(i):
print('start:',i)
time.sleep(1)
print('end:',i)
if __name__ == '__main__':
t_lst = []
for i in range(10):
t = Thread(target=func,args=(i,))
t.start()
t_lst.append(t)
for t in t_lst:
t.join()
from threading import Thread
import time
def func(i):
print('start:',i)
time.sleep(1)
print('end:',i)
if __name__ == '__main__':
t_lst = []
for i in range(10):
t = Thread(target=func,args=(i,))
t.start()
t_lst.append(t)
for t in t_lst:
t.join()
面向对象开启一个线程
from threading import Thread
class MyThread(Thread):
def __init__(self,a,b):
self.a = a
self.b = b
super().__init__()
def run(self):
print(self.ident)
t = MyThread(1,2)
t.start()
class MyThread(Thread):
def __init__(self,a,b):
self.a = a
self.b = b
super().__init__()
def run(self):
print(self.ident)
t = MyThread(1,2)
t.start()
线程的其他方法
current_thread
print(current_thread().ident) #获取线程ID
enumerate
print('线程的对象列表:%s'%enumerate())
active_count
print('线程的对象列表数量:%s'%active_count())
守护线程
主线程会等待子线程结果而结束,是为了回收资源
守护线程会等待主线程(包括其他子线程)结束之后才结束
守护线程的结束原理,主进程结束之后,守护线程和其他所有线程资源一起被回收掉
线程的单例模式
线程锁
class A:
from threading import Lock
__instance = None
lock = Lock()
def __new__(cls):
with cls.lock:
if not cls.__instance:
cls.__instance = super().__new__(cls)
return cls.__instance
from threading import Lock
__instance = None
lock = Lock()
def __new__(cls):
with cls.lock:
if not cls.__instance:
cls.__instance = super().__new__(cls)
return cls.__instance
协程
定义
协程就是一个线程
多个任务在一条线程上面来回切换
规避IO的两个模块
gevent
利用了 greenlet 底层模块完成的切换—自动规避io的功能
from gevent import monkey
monkey.patch_all()
import gevent
import time
import random
def func(i):
print('start:', i)
time.sleep(random.randint(1, 4))
print('end:', i)
g = gevent.spawn(func,1)
g2 = gevent.spawn(func,3)
g3 = gevent.spawn(func,4)
gevent.joinall((g,g2,g3))
monkey.patch_all()
import gevent
import time
import random
def func(i):
print('start:', i)
time.sleep(random.randint(1, 4))
print('end:', i)
g = gevent.spawn(func,1)
g2 = gevent.spawn(func,3)
g3 = gevent.spawn(func,4)
gevent.joinall((g,g2,g3))
asyncio
利用了 yield 底层语法完成的切换—自动规避io的功能
import asyncio
async def func(i):
print('start :', i)
await asyncio.sleep(1)
print('end :', i*10)
return i*10
loop = asyncio.get_event_loop()
loop.run_until_complete(func(1))
async def func(i):
print('start :', i)
await asyncio.sleep(1)
print('end :', i*10)
return i*10
loop = asyncio.get_event_loop()
loop.run_until_complete(func(1))
区别
进程
计算机资源分配最小单位,数据不安全,可以利用多核,操作系统级别,开销大
线程
计算机(CPU)调度的最小单位,数据不安全,不能利用多核,操作系统级别,开销小
协程
计算机(CPU)调度的最小单位,数据安全(GIL锁:全局解释器锁),数据安全,不能利用多核,用户级别,开销更小
操作系统相关
多道操作系统
优缺点
用户体验差
cpu使用率高
定义:
一个程序遇到IO操作就把cpu让给其他程序
分时操作系统
定义:
- 把时间分成很小的一段,每一段时间都是一个时间,每一个程序在一个时间片上运行片,当时间片到后,就轮到下一个程序执行自己的时间片
优缺点:
先来先服务FCFS
短作业优先
分布式操作系统
celery python分布式框架
同步、异步、阻塞相关概念
同步阻塞
调用一个函数,需要等待函数的执行结果,并且CPU不工作
同步非阻塞
调用一个函数,需要等待函数的执行结果,CPU工作
异步阻塞
调用一个函数,不需要等待函数的执行结果,CPU不工作
异步非阻塞
调用一个函数,不需要等待函数的执行结果,CPU工作
锁
进程锁
互斥锁
with lock: # 代替acquire和release 并且在此基础上做一些异常处理,保证即便一个进程的代码出错退出了,也会归还钥匙
from multiprocessing import Lock
递归锁
from multiprocessing import Rlock
可以多次acquire
列子
抢票
from multiprocessing import Process,Lock
PATH = r'H:\Python 学习文件\网络编程复习\08、网络编程复习\ticket.txt'
import json
class BuyTicket:
@classmethod
def search(self,path,i):
with open(path) as f:
ret = json.load(f)
print(f"{i}查询剩余票数为:{ret['count']}")
@classmethod
def buy_ticket(self,path,i):
with open(path) as f:
ret = json.load(f)
print(ret)
if ret['count'] > 0:
ret['count'] -= 1
# print(ret)
with open(path,mode='w') as f:
json.dump(ret,f)
print(f'{i} :购票成功')
def get_ticket(self,lock,path,i):
BuyTicket.search(path,i)
lock.acquire()
BuyTicket.buy_ticket(path,i)
lock.release()
if __name__ == '__main__':
lock = Lock()
b = BuyTicket()
for i in range(10):
p = Process(target=b.get_ticket,args=(lock,PATH,i))
p.start()
from multiprocessing import Process,Lock
PATH = r'H:\Python 学习文件\网络编程复习\08、网络编程复习\ticket.txt'
import json
class BuyTicket:
@classmethod
def search(self,path,i):
with open(path) as f:
ret = json.load(f)
print(f"{i}查询剩余票数为:{ret['count']}")
@classmethod
def buy_ticket(self,path,i):
with open(path) as f:
ret = json.load(f)
print(ret)
if ret['count'] > 0:
ret['count'] -= 1
# print(ret)
with open(path,mode='w') as f:
json.dump(ret,f)
print(f'{i} :购票成功')
def get_ticket(self,lock,path,i):
BuyTicket.search(path,i)
lock.acquire()
BuyTicket.buy_ticket(path,i)
lock.release()
if __name__ == '__main__':
lock = Lock()
b = BuyTicket()
for i in range(10):
p = Process(target=b.get_ticket,args=(lock,PATH,i))
p.start()
线程锁
class A:
from threading import Lock
__instance = None
lock = Lock()
def __new__(cls):
with cls.lock:
if not cls.__instance:
cls.__instance = super().__new__(cls)
return cls.__instance
from threading import Lock
__instance = None
lock = Lock()
def __new__(cls):
with cls.lock:
if not cls.__instance:
cls.__instance = super().__new__(cls)
return cls.__instance
递归锁
优点
解决死锁有奇效
缺点
效率低
在同一线程中,可以被acquire多次
互斥锁
优点
效率高
缺点
容易出现死锁
死锁的定义
-多把锁,在多线程中间交替使用
队列
进程
进程队列
进程之间数据隔离
进程之间通信 IPC
是基于socket与pickle来实现的文件通信
定义
先进先出
from queue import Queue
import time
q = Queue() #可加参数,设置队列的长度
for i in range(10,1,-1):
q.put(i)
print(f"放入{i}")
while True:
ret = q.get()
if ret:
print(ret)
time.sleep(1)
else:
break
import time
q = Queue() #可加参数,设置队列的长度
for i in range(10,1,-1):
q.put(i)
print(f"放入{i}")
while True:
ret = q.get()
if ret:
print(ret)
time.sleep(1)
else:
break
先进后出
from queue import LifoQueue
import time
q = LifoQueue()
for i in range(10,1,-1):
q.put(i)
print(f"放入{i}")
while True:
ret = q.get()
if ret:
print(ret)
time.sleep(1)
else:
break
import time
q = LifoQueue()
for i in range(10,1,-1):
q.put(i)
print(f"放入{i}")
while True:
ret = q.get()
if ret:
print(ret)
time.sleep(1)
else:
break
优先级队列
from queue import PriorityQueue # 优先级队列
priq = PriorityQueue()
priq.put((2,'alex'))
priq.put((1,'wusir'))
priq.put((0,'太白'))
print(priq.get())
print(priq.get())
print(priq.get())
priq = PriorityQueue()
priq.put((2,'alex'))
priq.put((1,'wusir'))
priq.put((0,'太白'))
print(priq.get())
print(priq.get())
print(priq.get())
生产者消费者模型
def consumer(q,name): # 消费者:通常取到数据之后还要进行某些操作
while True:
food = q.get()
if food:
print('%s吃了%s'%(name,food))
else:break
def producer(q,name,food): # 生产者:通常在放数据之前需要先通过某些代码来获取数据
for i in range(10):
foodi = '%s%s'%(food,i)
print('%s生产了%s'%(name,foodi))
time.sleep(random.random())
q.put(foodi)
if __name__ == '__main__':
q = Queue()
c1 = Process(target=consumer,args=(q,'alex'))
c2 = Process(target=consumer,args=(q,'alex'))
p1 = Process(target=producer,args=(q,'大壮','泔水'))
p2 = Process(target=producer,args=(q,'b哥','香蕉'))
c1.start()
c2.start()
p1.start()
p2.start()
p1.join()
p2.join()
q.put(None)
q.put(None)
while True:
food = q.get()
if food:
print('%s吃了%s'%(name,food))
else:break
def producer(q,name,food): # 生产者:通常在放数据之前需要先通过某些代码来获取数据
for i in range(10):
foodi = '%s%s'%(food,i)
print('%s生产了%s'%(name,foodi))
time.sleep(random.random())
q.put(foodi)
if __name__ == '__main__':
q = Queue()
c1 = Process(target=consumer,args=(q,'alex'))
c2 = Process(target=consumer,args=(q,'alex'))
p1 = Process(target=producer,args=(q,'大壮','泔水'))
p2 = Process(target=producer,args=(q,'b哥','香蕉'))
c1.start()
c2.start()
p1.start()
p2.start()
p1.join()
p2.join()
q.put(None)
q.put(None)
0 条评论
下一页