当前位置: 首页 > news >正文

sql事务执行

使用上下文管理器

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager
import logging# 创建数据库连接
engine = create_engine('mysql+pymysql://username:password@localhost/dbname')
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)@contextmanager
def get_db():"""数据库会话上下文管理器"""db = SessionLocal()try:yield dbdb.commit()except Exception as e:db.rollback()raise efinally:db.close()def update_data():"""更新数据的函数"""try:with get_db() as db:# 1. 删除操作delete_stmt = text("DELETE FROM users WHERE department = :dept AND status = :status")db.execute(delete_stmt, {"dept": "IT", "status": "inactive"})# 2. 插入操作insert_stmt = text("""INSERT INTO users (name, department, status, created_at) VALUES (:name, :dept, :status, NOW())""")# 插入多条数据users_to_insert = [{"name": "John Doe", "dept": "IT", "status": "active"},{"name": "Jane Smith", "dept": "IT", "status": "active"}]for user in users_to_insert:db.execute(insert_stmt, user)print("操作成功完成")except Exception as e:print(f"操作失败: {e}")raise# 调用函数
update_data()

  

显式使用事务

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmakerengine = create_engine('mysql+pymysql://username:password@localhost/dbname')
Session = sessionmaker(bind=engine)def update_data_explicit():"""显式事务控制"""session = Session()try:# 开始事务(SQLAlchemy 默认在事务中运行)# 删除操作delete_result = session.execute(text("DELETE FROM users WHERE department = :dept AND status = :status"),{"dept": "IT", "status": "inactive"})print(f"删除了 {delete_result.rowcount} 条记录")# 插入操作insert_stmt = text("""INSERT INTO users (name, department, status) VALUES (:name, :dept, :status)""")session.execute(insert_stmt, {"name": "John Doe", "dept": "IT", "status": "active"})# 提交事务session.commit()print("操作成功完成")except Exception as e:# 回滚事务session.rollback()print(f"操作失败,已回滚: {e}")raisefinally:session.close()update_data_explicit()

  

使用 SQLAlchemy Core 和连接事务

from sqlalchemy import create_engine, textengine = create_engine('mysql+pymysql://username:password@localhost/dbname')def update_data_with_connection():"""使用连接级别的事务"""with engine.begin() as connection:# 删除操作connection.execute(text("DELETE FROM users WHERE department = :dept AND status = :status"),{"dept": "IT", "status": "inactive"})# 插入操作connection.execute(text("INSERT INTO users (name, department, status) VALUES (:name, :dept, :status)"),{"name": "John Doe", "dept": "IT", "status": "active"})print("操作成功完成")update_data_with_connection()

  

使用 ORM 模型(如果使用 SQLAlchemy ORM)

from sqlalchemy import create_engine, Column, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetimeBase = declarative_base()class User(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(100))department = Column(String(50))status = Column(String(20))created_at = Column(DateTime, default=datetime.now)engine = create_engine('mysql+pymysql://username:password@localhost/dbname')
Session = sessionmaker(bind=engine)def update_data_orm():"""使用 ORM 方式"""session = Session()try:# 删除操作deleted_count = session.query(User).filter(User.department == "IT",User.status == "inactive").delete()print(f"删除了 {deleted_count} 条记录")# 插入操作new_users = [User(name="John Doe", department="IT", status="active"),User(name="Jane Smith", department="IT", status="active")]session.add_all(new_users)# 提交事务session.commit()print("操作成功完成")except Exception as e:session.rollback()print(f"操作失败,已回滚: {e}")raisefinally:session.close()update_data_orm()

  

 

使用连接的事务上下文(最简洁)

from sqlalchemy import create_engine, textdef update_data_simple():"""最简单的事务实现"""engine = create_engine('mysql+pymysql://user:password@localhost/dbname')try:with engine.begin() as conn:# 删除操作conn.execute(text("DELETE FROM users WHERE department = :dept AND status = :status"),{"dept": "IT", "status": "inactive"})# 插入操作conn.execute(text("""INSERT INTO users (name, department, status, created_at) VALUES (:name, :dept, :status, NOW())"""),{"name": "John Doe", "dept": "IT", "status": "active"})print("操作成功完成")except Exception as e:print(f"操作失败: {e}")raiseupdate_data_simple()

  

 

显式事务控制

from sqlalchemy import create_engine, textdef update_data_explicit():"""显式事务控制"""engine = create_engine('mysql+pymysql://user:password@localhost/dbname')conn = engine.connect()trans = conn.begin()  # 开始事务try:# 删除操作result = conn.execute(text("DELETE FROM users WHERE department = :dept AND status = :status"),{"dept": "IT", "status": "inactive"})print(f"删除了 {result.rowcount} 条记录")# 插入操作 - 多条数据users_data = [{"name": "John Doe", "dept": "IT", "status": "active"},{"name": "Jane Smith", "dept": "IT", "status": "active"},{"name": "Bob Wilson", "dept": "IT", "status": "active"}]for user in users_data:conn.execute(text("""INSERT INTO users (name, department, status, created_at) VALUES (:name, :dept, :status, NOW())"""),user)# 提交事务trans.commit()print("操作成功完成")except Exception as e:# 回滚事务trans.rollback()print(f"操作失败,已回滚: {e}")raisefinally:# 关闭连接conn.close()update_data_explicit()

  

 

批量插入的优化版本

from sqlalchemy import create_engine, textdef update_data_batch():"""批量插入的版本"""engine = create_engine('mysql+pymysql://user:password@localhost/dbname')try:with engine.begin() as conn:# 删除操作delete_result = conn.execute(text("DELETE FROM products WHERE category = :category AND stock = 0"),{"category": "electronics"})print(f"删除了 {delete_result.rowcount} 个产品")# 批量插入操作products_data = [{"name": "Laptop", "category": "electronics", "price": 999.99, "stock": 10},{"name": "Mouse", "category": "electronics", "price": 25.50, "stock": 50},{"name": "Keyboard", "category": "electronics", "price": 75.00, "stock": 30}]# 使用 executemany 批量插入conn.execute(text("""INSERT INTO products (name, category, price, stock, created_at) VALUES (:name, :category, :price, :stock, NOW())"""),products_data)print("批量操作成功完成")except Exception as e:print(f"操作失败: {e}")raiseupdate_data_batch()

  

 

带参数化的通用函数

 

from sqlalchemy import create_engine, text
from typing import List, Dict, Anydef transactional_operation(db_url: str,delete_query: str,delete_params: Dict[str, Any],insert_query: str,insert_data: List[Dict[str, Any]]
) -> bool:"""通用的事务操作函数Args:db_url: 数据库连接字符串delete_query: 删除操作的SQL语句delete_params: 删除操作的参数insert_query: 插入操作的SQL语句  insert_data: 插入操作的数据列表Returns:bool: 操作是否成功"""engine = create_engine(db_url)try:with engine.begin() as conn:# 执行删除操作delete_result = conn.execute(text(delete_query), delete_params)print(f"删除操作影响行数: {delete_result.rowcount}")# 执行插入操作if insert_data:conn.execute(text(insert_query), insert_data)print(f"插入操作影响行数: {len(insert_data)}")print("事务操作成功完成")return Trueexcept Exception as e:print(f"事务操作失败: {e}")return False# 使用示例
if __name__ == "__main__":db_url = "mysql+pymysql://user:password@localhost/test_db"delete_sql = "DELETE FROM orders WHERE status = :status AND created_at < :date"delete_params = {"status": "cancelled", "date": "2023-01-01"}insert_sql = """INSERT INTO orders (customer_id, product_id, quantity, status) VALUES (:customer_id, :product_id, :quantity, :status)"""new_orders = [{"customer_id": 1, "product_id": 101, "quantity": 2, "status": "pending"},{"customer_id": 2, "product_id": 102, "quantity": 1, "status": "pending"}]success = transactional_operation(db_url, delete_sql, delete_params, insert_sql, new_orders)print(f"操作结果: {'成功' if success else '失败'}")

  

 

 

带返回值的事务操作

from sqlalchemy import create_engine, textdef update_data_with_return():"""带返回值的事务操作"""engine = create_engine('mysql+pymysql://user:password@localhost/dbname')try:with engine.begin() as conn:# 删除操作并获取影响行数delete_result = conn.execute(text("DELETE FROM logs WHERE log_date < :date"),{"date": "2023-06-01"})deleted_count = delete_result.rowcount# 插入操作conn.execute(text("INSERT INTO logs (message, level) VALUES (:message, :level)"),{"message": "System started", "level": "INFO"})# 返回操作结果return {"success": True,"deleted_count": deleted_count,"inserted_count": 1}except Exception as e:return {"success": False,"error": str(e),"deleted_count": 0,"inserted_count": 0}# 使用
result = update_data_with_return()
print(f"操作结果: {result}")

  

 

http://www.wxhsa.cn/company.asp?id=4560

相关文章:

  • GAS_Aura-Spawn FireBolt from Event
  • 在CentOS 7系统上创建SSL/TLS证书以启用HTTPS
  • 从Craigslist广告到BHIS安全顾问:非科班生的渗透测试求职之路
  • Java 微服务架构中的实践与挑战
  • Java 与大数据处理:从 Hadoop 到实时计算
  • 国产IT运维卡壳?乐维智能运维体让运维团队告别“适配难、监控乱”
  • ubuntu18安装mysql5.7
  • 【IEEE出版 |已连续5届EI稳定检索】第六届计算机工程与智能控制学术会议(ICCEIC 2025)
  • 在选择2025年代码托管平台时,Gitee和GitHub作为国内外两大主流平台各有优势。本文将从多个维度进行对比分析,帮助开发者做出更适合自身需求的选择。
  • android使用socks5的教程
  • vue3 自定义指令并实现页面元素平滑上升
  • abp记录
  • 强化学习(二十):模仿学习
  • 重生之从零开始的神经网络算法学习之路 —— 第七篇 重拾 PyTorch(超分辨率重建和脚本的使用)
  • 从基础到实践(四十五):车载显示屏LCD、OLED、Mini-LED、MicroLED的工作原理、设计差异等说明 - 教程
  • 国产项目管理工具崛起:Gitee如何以本土化优势重构开发协作生态
  • GAS_Aura-Sending Gameplay Events
  • 【IEEE-智造领空天,寰宇链未来】第五届机电一体化技术与航空航天工程国际学术会议(ICMTAE 2025)
  • 进程间通信(消息队列)
  • 有点长所以单发的闲话(对acgn的看法(存疑))
  • 【光照】Unity中的[光照模型]概念辨析
  • 深入解析:Shell脚本监控系统资源详解
  • 计算几何全家桶
  • 完整教程:从无声视频中“听见”声音:用视觉语言模型推理音频描述
  • Win10如何安装语音包
  • C#通过TCP/IP控制康奈视读码枪实现方案
  • 链表
  • 利用三方APP[IP切换助手]使用socks5
  • 智能卫浴雷达模块感应方案WT4101寿命长不怕干扰
  • 修改Windows 资源器中文件的创建时间或更新时间