import time from concurrent.futures import ProcessPoolExecutor, TimeoutError from pydantic import BaseModel import multiprocessing as mp import os import signalclass SharedData(BaseModel):value: int = 0def worker(shared_value, data_dict):# 真实场景中的任务,没有循环检查stop_eventfor i in range(30):data_dict['value'] = ishared_value.value = i * 10print(f'子进程 data_dict:{data_dict["value"]} shared_value:{shared_value.value}')time.sleep(1)return data_dict['value'] + shared_value.valuedef init_worker():"""设置子进程的信号处理"""signal.signal(signal.SIGTERM, lambda sig, frame: os._exit(0))if __name__ == '__main__':manager = mp.Manager()shared_value = manager.Value('i', 0)data_dict = manager.dict({'value': 0})with ProcessPoolExecutor(initializer=init_worker) as executor:future = executor.submit(worker, shared_value, data_dict)try:# 主进程监控5秒for count in range(5):print(f'\n第 {count + 1} 次检查 - 状态: {"运行中" if future.running() else "完成" if future.done() else "等待"}')print(f'主进程 shared_value:{shared_value.value} data_dict:{data_dict["value"]}')time.sleep(1)# 检查是否完成if not future.done():print("\n任务超时,强制终止子进程...")# 获取子进程PID并发送终止信号for pid, process in executor._processes.items():if process.is_alive():os.kill(pid, signal.SIGTERM)raise TimeoutError("任务超时")result = future.result()print(f"Result: {result}")except TimeoutError:print("子进程已被强制终止")future.cancel()finally:executor.shutdown(wait=True)print(f"最终值 - shared: {shared_value.value}, dict: {data_dict['value']}")
import time from concurrent.futures import ProcessPoolExecutor, TimeoutError from pydantic import BaseModel import multiprocessing as mp import signal import osclass SharedData(BaseModel):value: int = 0def worker(shared_value, data_dict, stop_event):print('子进程PID', os.getpid())i = 0while i < 30 and not stop_event.is_set():# data_dict['value'] = i# shared_value.value = i * 10print(f'子进程 data_dict:{data_dict["value"]} shared_value:{shared_value.value}')i += 1time.sleep(1)return data_dict['value'] + shared_value.valueif __name__ == '__main__':print('主进程PID', os.getpid())manager = mp.Manager()shared_value = manager.Value('i', 0)data_dict = manager.dict({'value': 0})stop_event = manager.Event()with ProcessPoolExecutor() as executor:future = executor.submit(worker, shared_value, data_dict, stop_event)try:# 主进程监控5秒for _ in range(5):print("future.running() =>", future.running())print(f'主进程 shared_value:{shared_value.value} data_dict:{data_dict["value"]}')time.sleep(1)# 检查子进程是否完成result = future.result(timeout=0.1)print(f"Result: {result}")except TimeoutError:print("任务超时,通知子进程停止...")stop_event.set() # 通知子进程优雅停止try:# 给子进程一些时间进行清理result = future.result(timeout=0.1)print(f"子进程已优雅停止,Result: {result}")except TimeoutError:print("子进程未及时响应,强制取消...")future.cancel()# 正常关闭执行器,等待所有进程完成executor.shutdown(wait=True)print("future.running() =>", future.running())print(f"最终值 - shared: {shared_value.value}, dict: {data_dict['value']}")
import time from concurrent.futures import ProcessPoolExecutor, TimeoutError from pydantic import BaseModel import multiprocessing as mp import os import signalclass SharedData(BaseModel):value: int = 0# 全局变量存储executor executor = Nonedef worker(shared_value, data_dict):# 真实场景中的任务,没有循环检查stop_eventfor i in range(30):# data_dict['value'] = i# shared_value.value = i * 10print(f'子进程 data_dict:{data_dict["value"]} shared_value:{shared_value.value}')time.sleep(1)return data_dict['value'] + shared_value.valuedef init_worker():"""设置子进程的信号处理"""signal.signal(signal.SIGTERM, lambda sig, frame: os._exit(0))def create_executor():"""创建并返回executor实例"""global executorif executor is None:executor = ProcessPoolExecutor(initializer=init_worker)return executordef shutdown_executor(wait=True):"""关闭executor"""global executorif executor is not None:executor.shutdown(wait=wait)executor = Nonedef submit_task(shared_value, data_dict):"""提交任务到executor"""global executorif executor is None:create_executor()return executor.submit(worker, shared_value, data_dict)def check_process_status(future):"""检查子进程运行状态"""try:# 检查future状态if future.done():if future.cancelled():print("子进程任务已被取消")return "cancelled"else:try:result = future.result(timeout=0)print(f"子进程已完成,结果: {result}")return "completed"except Exception as e:print(f"子进程执行出错: {e}")return "error"else:print("子进程仍在运行中...")return "running"except TimeoutError:print("检查状态超时")return "timeout"if __name__ == '__main__':manager = mp.Manager()shared_value = manager.Value('i', 0)data_dict = manager.dict({'value': 0})try:# 在程序不同地方使用executorcreate_executor()future = submit_task(shared_value, data_dict)# 主进程监控5秒for count in range(5):data_dict['value'] = countshared_value.value = count * 10time.sleep(1)# 在另一个地方检查状态status = check_process_status(future)# 如果需要终止if status == "running":print("\n任务超时,强制终止子进程...")# 获取子进程PID并发送终止信号for pid, process in executor._processes.items():if process.is_alive():os.kill(pid, signal.SIGTERM)raise TimeoutError("任务超时")result = future.result()print(f"Result: {result}")except TimeoutError:print("子进程已被强制终止")if 'future' in locals():future.cancel()finally:# 在程序结束前检查子进程状态if 'future' in locals():final_status = check_process_status(future)print(f"最终任务状态: {final_status}")# 关闭executorshutdown_executor(wait=True)print(f"最终值 - shared: {shared_value.value}, dict: {data_dict['value']}")