当前位置: 首页 > news >正文

多进程

import time
from concurrent.futures import ProcessPoolExecutor, TimeoutError
from pydantic import BaseModel
import multiprocessing as mp
import os
import signalclass SharedData(BaseModel):value: int = 0def worker(shared_value, data_dict):# 真实场景中的任务,没有循环检查stop_eventfor i in range(30):data_dict['value'] = ishared_value.value = i * 10print(f'子进程 data_dict:{data_dict["value"]} shared_value:{shared_value.value}')time.sleep(1)return data_dict['value'] + shared_value.valuedef init_worker():"""设置子进程的信号处理"""signal.signal(signal.SIGTERM, lambda sig, frame: os._exit(0))if __name__ == '__main__':manager = mp.Manager()shared_value = manager.Value('i', 0)data_dict = manager.dict({'value': 0})with ProcessPoolExecutor(initializer=init_worker) as executor:future = executor.submit(worker, shared_value, data_dict)try:# 主进程监控5秒for count in range(5):print(f'\n第 {count + 1} 次检查 - 状态: {"运行中" if future.running() else "完成" if future.done() else "等待"}')print(f'主进程 shared_value:{shared_value.value} data_dict:{data_dict["value"]}')time.sleep(1)# 检查是否完成if not future.done():print("\n任务超时,强制终止子进程...")# 获取子进程PID并发送终止信号for pid, process in executor._processes.items():if process.is_alive():os.kill(pid, signal.SIGTERM)raise TimeoutError("任务超时")result = future.result()print(f"Result: {result}")except TimeoutError:print("子进程已被强制终止")future.cancel()finally:executor.shutdown(wait=True)print(f"最终值 - shared: {shared_value.value}, dict: {data_dict['value']}")

  

 

 

 

import time
from concurrent.futures import ProcessPoolExecutor, TimeoutError
from pydantic import BaseModel
import multiprocessing as mp
import signal
import osclass SharedData(BaseModel):value: int = 0def worker(shared_value, data_dict, stop_event):print('子进程PID', os.getpid())i = 0while i < 30 and not stop_event.is_set():# data_dict['value'] = i# shared_value.value = i * 10print(f'子进程 data_dict:{data_dict["value"]} shared_value:{shared_value.value}')i += 1time.sleep(1)return data_dict['value'] + shared_value.valueif __name__ == '__main__':print('主进程PID', os.getpid())manager = mp.Manager()shared_value = manager.Value('i', 0)data_dict = manager.dict({'value': 0})stop_event = manager.Event()with ProcessPoolExecutor() as executor:future = executor.submit(worker, shared_value, data_dict, stop_event)try:# 主进程监控5秒for _ in range(5):print("future.running() =>", future.running())print(f'主进程 shared_value:{shared_value.value} data_dict:{data_dict["value"]}')time.sleep(1)# 检查子进程是否完成result = future.result(timeout=0.1)print(f"Result: {result}")except TimeoutError:print("任务超时,通知子进程停止...")stop_event.set()  # 通知子进程优雅停止try:# 给子进程一些时间进行清理result = future.result(timeout=0.1)print(f"子进程已优雅停止,Result: {result}")except TimeoutError:print("子进程未及时响应,强制取消...")future.cancel()# 正常关闭执行器,等待所有进程完成executor.shutdown(wait=True)print("future.running() =>", future.running())print(f"最终值 - shared: {shared_value.value}, dict: {data_dict['value']}")

  

 

 

import time
from concurrent.futures import ProcessPoolExecutor, TimeoutError
from pydantic import BaseModel
import multiprocessing as mp
import os
import signalclass SharedData(BaseModel):value: int = 0# 全局变量存储executor
executor = Nonedef worker(shared_value, data_dict):# 真实场景中的任务,没有循环检查stop_eventfor i in range(30):# data_dict['value'] = i# shared_value.value = i * 10print(f'子进程 data_dict:{data_dict["value"]} shared_value:{shared_value.value}')time.sleep(1)return data_dict['value'] + shared_value.valuedef init_worker():"""设置子进程的信号处理"""signal.signal(signal.SIGTERM, lambda sig, frame: os._exit(0))def create_executor():"""创建并返回executor实例"""global executorif executor is None:executor = ProcessPoolExecutor(initializer=init_worker)return executordef shutdown_executor(wait=True):"""关闭executor"""global executorif executor is not None:executor.shutdown(wait=wait)executor = Nonedef submit_task(shared_value, data_dict):"""提交任务到executor"""global executorif executor is None:create_executor()return executor.submit(worker, shared_value, data_dict)def check_process_status(future):"""检查子进程运行状态"""try:# 检查future状态if future.done():if future.cancelled():print("子进程任务已被取消")return "cancelled"else:try:result = future.result(timeout=0)print(f"子进程已完成,结果: {result}")return "completed"except Exception as e:print(f"子进程执行出错: {e}")return "error"else:print("子进程仍在运行中...")return "running"except TimeoutError:print("检查状态超时")return "timeout"if __name__ == '__main__':manager = mp.Manager()shared_value = manager.Value('i', 0)data_dict = manager.dict({'value': 0})try:# 在程序不同地方使用executorcreate_executor()future = submit_task(shared_value, data_dict)# 主进程监控5秒for count in range(5):data_dict['value'] = countshared_value.value = count * 10time.sleep(1)# 在另一个地方检查状态status = check_process_status(future)# 如果需要终止if status == "running":print("\n任务超时,强制终止子进程...")# 获取子进程PID并发送终止信号for pid, process in executor._processes.items():if process.is_alive():os.kill(pid, signal.SIGTERM)raise TimeoutError("任务超时")result = future.result()print(f"Result: {result}")except TimeoutError:print("子进程已被强制终止")if 'future' in locals():future.cancel()finally:# 在程序结束前检查子进程状态if 'future' in locals():final_status = check_process_status(future)print(f"最终任务状态: {final_status}")# 关闭executorshutdown_executor(wait=True)print(f"最终值 - shared: {shared_value.value}, dict: {data_dict['value']}")

  

http://www.wxhsa.cn/company.asp?id=6935

相关文章:

  • 93. 递归实现组合型枚举
  • Sort方法学习(伪代码记录)
  • 深入解析:【每日一问】运算放大器与比较器有什么区别?
  • 9.17支配对问题专题总结
  • 记录知识
  • AT_agc058_b [AGC058B] Adjacent Chmax
  • Jenkins CVE-2018-1000600漏洞利用与SSRF攻击分析
  • NOIP 集训日记(学术)
  • linux中mysql如何远程连接
  • 详细介绍:Python:OpenCV 教程——从传统视觉到深度学习:YOLOv8 与 OpenCV DNN 模块协同实现工业缺陷检测
  • 深入解析:PYcharm——pyqt音乐播放器
  • Day02
  • 专题:Python实现贝叶斯线性回归与MCMC采样数据可视化分析2实例|附代码数据
  • 威联通NAS如何导入本地docker镜像
  • 【学习笔记】拉格朗日插值
  • 一种将离散化状态方程映射为并行多处理器计算机的方法
  • 基本数据类型题目
  • 一种基于动作指令交互的动态活体检测技术,提升人脸识别安全性
  • [系统] Windows 已有office版本和visio不兼容的解决方案
  • CF 2127F Hamed and AghaBalaSar
  • AT_agc055_b [AGC055B] ABC Supremacy
  • “Sequential Thinking MCP Server 和codex等AI工具本身任务拆解功能对比
  • 基于错误xsleak 悬空标记 运用css利用帧计数 -- Pure leak ASIS CTF 2025
  • 网易伏羲:当算法遇见社交,解码游戏世界的连接密码
  • 在 CentOS 7 上安装Nginx和配置http代理
  • 题解:P2624 [HNOI2008] 明明的烦恼
  • 在AI技术快速实现创想的时代,挖掘新需求成为核心竞争力——某知名DevOps学习平台需求洞察
  • Windows Powershell 获取版本version
  • XXL-JOB (1)
  • 记录---Vue3对接UE,通过MQTT完成通讯