前情回顾
1. 进程相关函数
os.getpid() 获取当前进程的PID os.getppid() 获取父进程PID os._exit() 退出进程 sys.exit() 退出进程2. 孤儿进程和僵尸进程
如何避免僵尸进程 【1】 使用wait,waitpid处理僵尸进程 【2】 创建二级子进程处理僵尸进程3. 聊天室程序
确定需求--> 基本的技术分析--》整体设计--》功能分析--》代码实现--》代码测试完善 ********************************************************************************************** 一.群聊聊天室 1.退出功能 【1】服务端 * 接收消息确定消息类型 * 将用户从字典移除 * 将退出消息发送给其他人 * 给该用户发送特殊标志 【2】客户端 * 输入quit退出 * 将退出请求发送给服务器然后结束进程 * recv接收服务器信息后退出 2.管理员消息二.multiprocessing 模块创建进程
1.流程特点 【1】将需要子进程执行的事件封装为函数 【2】通过模块的Process类创建进程对象,关联函数 【3】通过进程对象设置进程信息及属性 【4】通过进程对象调用Start启动进程 【5】通过进程对象调用join回收进程 2.基本接口使用 【1】Process() 功能:创建进程对象 参数:target(绑定要执行的目标函数)(必选) args 元组,用于给target函数位置传参 kwargs 字典,给target函数键值传参1 import multiprocessing as mp 2 from time import sleep 3 4 #编写进程函数 5 def fun(): 6 sleep(3) 7 print("子进程") 8 9 #创建进程对象 10 p= mp.Process(target=fun) 11 #启动进程 12 p.start() 13 14 sleep(2) 15 print("父进程") 16 #回收进程 17 p.join()
1 import multiprocessing as mp 2 from time import sleep 3 4 a = 1 5 6 #编写进程函数 7 def fun(): 8 sleep(3) 9 global a 10 print("a=",a) 11 a = 10000 12 print("子进程") 13 14 #创建进程对象 15 p= mp.Process(target=fun) 16 #启动进程 17 p.start() 18 19 sleep(2) 20 print("父进程") 21 #回收进程(处理僵尸进程) 22 p.join() 23 24 print("---------------") 25 print("parent a=",a)
1 from multiprocessing import Process 2 from time import sleep 3 import os 4 5 def th1(): 6 sleep(3) 7 print("茶饭") 8 print(os.getppid(),'----',os.getpid()) 9 def th2(): 10 sleep(2) 11 print("睡觉") 12 print(os.getppid(),'----',os.getpid()) 13 def th3(): 14 sleep(4) 15 print("xxx") 16 print(os.getppid(),'----',os.getpid()) 17 18 things = [th1,th2,th3] 19 processes = [] 20 for th in things: 21 p = Process(target = th) 22 processes.append(p)#用列表保存进程对象 23 p.start() 24 25 for i in processes: 26 i.join()
1 from multiprocessing import Process 2 from time import sleep 3 4 #带参数的进程函数 5 def Worker(sec,name): 6 for i in range(3): 7 sleep(sec) 8 print("I'm %s"%name) 9 print("I'm working...") 10 #3# p = Process(target = Worker,args = (2,"Levt")) 11 #4#p = Process(target = Worker,kwargs={'name':'Tom','sec':2})#两种方式 12 p = Process(target = Worker,args=(2,),kwargs={'name':'Tom'})#3种方式 13 p.start() 14 p.join()
1 from multiprocessing import Process 2 from time import sleep 3 4 #带参数的进程函数 5 def Worker(sec,name): 6 for i in range(3): 7 sleep(sec) 8 print("I'm %s"%name) 9 print("I'm working...") 10 #3# p = Process(target = Worker,args = (2,"Levt")) 11 #4#p = Process(target = Worker,kwargs={'name':'Tom','sec':2})#两种方式 12 p = Process(target = Worker,args=(2,),kwargs={'name':'Tom'})#3种方式 13 p.start() 14 p.join()
1 from multiprocessing import Process 2 from time import sleep,ctime 3 4 def tm(): 5 for i in range(3): 6 sleep(2) 7 print(ctime()) 8 9 # p = Process(target = tm) 10 p = Process(target = tm,name = "tedu") 11 12 p.daemon = True#主进程退出之后子进程接着退出,p.join()二选一 13 14 p.start() 15 print("Process name",p.name)#进程名称 16 print("Process pid",p.pid)#对应子进程的PID 17 print("alive:",p.is_alive())#查看子进程是否在声明周期 18 19 20 p.join(2)#如果daemon设置成True通常就不使用join 21 print("------------------------") 22
1 from multiprocessing import Process 2 import time 3 4 #自定义进程类 5 class ClockProcess(Process): 6 def __init__(self,value): 7 self.value = value 8 super().__init__() 9 10 #重写run 方法 11 def run(self): 12 for i in range(5): 13 print("The time is %s"%time.ctime()) 14 time.sleep(self.value) 15 16 #创建进程对象 17 p = ClockProcess(2) 18 #启动新的进程 19 p.start() 20 p.join()
Pool(process)
功能:创建进程池对象 参数:指定进程数量,默认根据系统自动判断 【2】事件加入进程池队列执行 pool.apply_async(func,args,kwds) 功能:使用进程池执行func事件 参数:func 事件函数 args 元组 给func按位置传参 kwds 字典 给func按键值传参 返回值:返回函数事件对象 【3】关闭进程池 pool.close() 【4】回收进程池中的进程 pool.join() 【5】通过map添加进程池事件 pool.map(func,iter) map执行自动调用func,参数为:执行iter的可迭代 功能:将要做的事件加入进程池 参数:func 事件函数 iter 迭代对象,将迭代值传给func 返回值:得到函数返回值列表1 from multiprocessing import Pool 2 from time import sleep,ctime 3 4 #进程事件 5 def worker(msg): 6 sleep(2) 7 print(msg)#参数不同 8 return ctime() 9 #创建进程池 10 pool = Pool() 11 # pool = Pool(4)#同事启动四个进程 12 13 result = [] 14 #向进程池添加事件 15 for i in range(10): 16 msg = "hello %d"%i 17 r =pool.apply_async(func=worker,args=(msg,))#传给形参-->(msg) 18 result.append(r)#存储事件对象 19 20 #关闭进程池 21 pool.close() 22 #回收进程池 23 pool.join() 24 for i in result: 25 print(i.get())#通过对象get()可以获取事件函数返回值
1 from multiprocessing import Pool 2 import time 3 4 def fun(n): 5 time.sleep(1) 6 return n * n 7 pool = Pool() 8 #使用map将事件放入进程池 9 r = pool.map(fun,[1,2,3,4,5]) 10 pool.close() 11 pool.join() 12 13 print("结果",r) 14 #3s
四.进程间通信(IPC) 1.必要性:进程间空间独立,资源不共享, 此时在需要进程间数据传输时就需要特定的手段进行数据通信 2.进程间通信方法: 管道 消息队列 共享内存 信号 信号量 套接字 3.管道通信(Pipe) [1]通信原理:在内存中开辟管道空间,生成管道操作对象, 多个进程使用同一个管道对象进行读写即可实现通信 [2]实现方法: from multiprocessing import Pipe fd1,fd2 = Pipe(duplex = True) 功能:创建管道 参数:默认True表示双向管道(都可读可写) 如果为False 表示单向管道 返回值:表示管道两端从的读写对象 如果是双向管道均可读写 如果是单向管道fd1只读 fd2只写 fd.recv() 功能:从管道获取内容 返回值:获取到的数据 fd.send(data) 功能:向管道写入内容 参数:要写入的数据
1 from multiprocessing import Process,Pipe 2 import os,time 3 4 #创建管道对象 5 fd1,fd2 = Pipe() 6 7 def fun(name): 8 time.sleep(3) 9 #向管道写入内容 10 fd1.send(name) 11 12 13 jobs =[] 14 for i in range(5): 15 p = Process(target = fun,args = (i,)) 16 jobs.append(p) 17 p.start() 18 19 20 for i in range(5): 21 #读取管道内容 22 data = fd2.recv() 23 print(data) 24 25 26 for i in jobs: 27 i.join()
1 from multiprocessing import Process,Pipe 2 import os,time 3 4 5 fd1,fd2 = Pipe() 6 7 def fun(name): 8 time.sleep(3) 9 #向管道写入内容 10 fd1.send({ name:os.getpid()}) 11 12 jobs =[] 13 for i in range(5): 14 p = Process(target = fun,args = (i,)) 15 jobs.append(p) 16 p.start() 17 18 19 for i in range(5): 20 #读取管道内容 21 data = fd2.recv() 22 print(data) 23 24 25 for i in jobs: 26 i.join()
1 from multiprocessing import Process,Pipe 2 import os,time 3 4 5 fd1,fd2 = Pipe(False) 6 7 def fun(name): 8 time.sleep(3) 9 #向管道写入内容 10 fd1.send({ name:os.getpid()}) 11 12 jobs =[] 13 for i in range(5): 14 p = Process(target = fun,args = (i,)) 15 jobs.append(p) 16 p.start() 17 18 19 for i in range(5): 20 #读取管道内容 21 data = fd2.recv() 22 print(data) 23 24 25 for i in jobs: 26 i.join()
1 from multiprocessing import Queue,Process 2 from time import sleep 3 4 #创建消息对列 5 q = Queue(3) 6 #写 7 def fun1(): 8 for i in range(3): 9 sleep(1) 10 q.put((1,3)) 11 #读 12 def fun2(): 13 for i in range(4): 14 a,b = q.get(timeout=3) 15 print("sum=",a+b) 16 17 p1 = Process(target=fun1) 18 p2 = Process(target=fun2) 19 p1.start() 20 p2.start() 21 p1.join() 22 p2.join()
1 import os 2 3 filename = 'xl.txt' 4 5 #获取文件大小 6 size = os.path.getsize(filename) 7 8 # 父子进程共用一个文件对象偏移量会相互影响 9 # f = open(filename,'rb') 10 11 pid = os.fork() 12 if pid < 0: 13 print("Error") 14 elif pid == 0: 15 #复制上半部分 16 f = open(filename,'rb') 17 fw = open("1",'wb') 18 n = size // 2 19 while True: 20 if n < 1024: 21 data = f.read(n) 22 fw.write(data) 23 break 24 data = f.read(1024) 25 fw.write(data) 26 n -= 1024 27 f.close() 28 fw.close() 29 else: 30 #复制下半部分 31 f = open(filename,'rb') 32 fw = open("2",'wb') 33 f.seek(size//2,0) 34 while True: 35 data = f.read(1024) 36 if not data: 37 break 38 fw.write(data) 39 f.close() 40 fw.close() 41 42