系列文章目录
《FASTAPI零基础入门与进阶实战》https://blog.csdn.net/sen_shan/category_12950843.html
第17篇:Token验证改善--与项目与用户关联https://blog.csdn.net/sen_shan/article/details/151707238
文章目录
目录
系列文章目录
文章目录
前言
login_manager
orm_curd
CRUD
router
测试
前言
上一章聚焦 Token 验证及项目-用户绑定,本章则围绕应用 API ID 与 User ID 的增、查、改、删展开。
login_manager
1.把login_manager.py从Models移转到schemas中,并且修改相关程序,此处不做说明;
2.修改\src\schemas\login_manager.py,新增class AuthManager
from pydantic import BaseModel, Field
from typing import Optional
from datetime import datetime
class AuthManager(BaseModel):
api_id: str
user_id: str
username: str
role: Optional[str] = None
email: str
exp: datetime
api_key: str
orm_curd
修改src\core\orm_curd.py,本次修改内容比较多,附上全部代码:
# src\core\orm_curd.py
from typing import Set, Any, Dict, List, Optional, Type, TypeVar, Tuple, Union
from sqlalchemy.orm import Session, Query
from pydantic import BaseModel
from src.core import retMes, str_utils as strU
from sqlalchemy import and_, or_
from sqlalchemy.inspection import inspect
from src.schemas.request_model import UpdateModel, UpdateItem, ensure_dict, DeleteModel, DeleteItem
from src.schemas.login_manager import AuthManager
T = TypeVar("T") # SQLAlchemy Model
S = TypeVar("S", bound=BaseModel) # Pydantic Schema
def _multi_filter(Model: Type[T], unique_fields: List[str], row: Dict[str, Any]):
"""根据多个字段生成 and_ 条件"""
return and_(*(getattr(Model, k) == row[k] for k in unique_fields))
def Insert(
db: Session,
auth: AuthManager,
Model: Type[T],
Schema: Type[S],
Model_Name: str,
data: List[Dict[str, Any]],
unique_fields: List[str],
skip_duplicates: bool = True,
) -> Dict[str, Any]: # Tuple[List[T], List[T]]:
"""
通用批量写入/跳过重复
:param db: SQLAlchemy Session
:param auth: 登录信息
:param Model: 数据库模型类
:param Schema: Pydantic 校验模型(仅用于序列化返回)
:param Model_Name:数据库模型类名称
:param data: 待写入的字典列表
:param unique_fields: 用于判断是否重复的唯一字段名列表
:param skip_duplicates: True=跳过重复;False=抛异常
:return: 按 retMes.Success 包装好的 dict
"""
api_id = auth.api_id
user_id = auth.user_id
created, duplicates, faulty = [], [], []
for row in data:
if not isinstance(row, dict):
row = row.model_dump() #V1.dict(),V2 model_dump() # 不是 dict 转换dict结构
# --------- 新增:创建人、更新人等默认值 ---------
if hasattr(Model, 'api_id'):
row["api_id"]=api_id
if hasattr(Model, 'api_id'):
row["creator_by"]=user_id
if hasattr(Model, 'last_updated_date'):
row["last_updated_date"]=None
if hasattr(Model, 'last_updated_by'):
row["last_updated_by"]=None
if hasattr(Model, 'deletion_date'):
row["deletion_date"] = None
if hasattr(Model, 'deletion_by'):
row["deletion_by"] = None
# --------- 新增:逻辑删除 ---------
if hasattr(Model, 'deletion_mark'):
if row["deletion_mark"]:
if hasattr(Model, 'deletion_date') and row["deletion_date"] is None:
row["deletion_date"]=strU.now()
if hasattr(Model, 'deletion_by'):
row["deletion_by"]=user_id
# --------- 新增:完整性校验 ---------
missing = [k for k in unique_fields if k not in row or row[k] is None]
if missing:
faultyData = {"message": f"行数据缺少唯一键字段为:{missing}",
"details": row}
faulty.append(faultyData)
continue
# 1. 构造唯一性过滤条件
exists = db.query(Model).filter(_multi_filter(Model, unique_fields, row)).first()
# filter_cond = [getattr(Model, k) == row[k] for k in unique_fields]
# exists = db.query(Model).filter(*filter_cond).first()
if exists:
duplicates.append(exists)
if not skip_duplicates:
retMes.Raise(message=f"Duplicate on {unique_fields}: {row}").mes()
# raise ValueError(f"Duplicate on {unique_fields}: {row}")
continue
new_obj = Model(**row)
db.add(new_obj)
created.append(new_obj)
if created:
db.commit()
for obj in created:
db.refresh(obj)
# 2. 统一序列化
# created_dicts = [strU.model_to_dict(obj) for obj in created]
# duplicates_dicts = [Schema.from_orm(obj).dict() for obj in duplicates]
created_dicts = [Schema.model_validate(obj).model_dump() for obj in created]
duplicates_dicts = [Schema.model_validate(obj).model_dump() for obj in duplicates]
# 3. 构造响应
message = f"成功创建 {len(created_dicts)} 个{Model_Name}"
if duplicates_dicts:
message += f",跳过 {len(duplicates_dicts)} 个重复{Model_Name}"
if faulty:
message += f",有 {len(faulty)} 个数据格式错误"
response_data = {
"created": {"count": len(created_dicts), "details": created_dicts},
"duplicates": {"count": len(duplicates_dicts), "details": duplicates_dicts},
"faulty": {"count": len(faulty), "details": faulty},
}
return retMes.Success(response_data, message).mes()
def query_with_page(
db: Session,
auth: AuthManager,
model_cls: Type, # 对应 models.SysUser
resp_model_cls: Optional[Type] = None, # 对应 schemas.SysUser,仅做字段校验,可省略
*,
id: Optional[str] = None,
filter_model: Optional[BaseModel] = None, # 对应 request_model.QueryModel
page: int = 0,
page_size: int = 100
) -> Dict[str, Any]:
"""
通用分页查询
:param db: 数据库会话
:param auth: 登录信息
:param model_cls: SQLAlchemy 实体类
:param resp_model_cls: Pydantic 响应模型(可选)
:param id: 按主键 id 精确查询
:param filter_model: 前端传来的过滤 + 分页参数
:param page: 默认页码
:param page_size: 默认每页条数
:return: 按 retMes.Success 包装好的 dict
"""
api_id = auth.api_id
# query: Query = db.query(model_cls).filter(model_cls.api_id == api_id)
if hasattr(model_cls, 'api_id'):
query: Query = db.query(model_cls).filter(model_cls.api_id == api_id)
else:
query: Query = db.query(model_cls) # 不带 api_id 的表直接查全量
# 1. 主键精确查询
if strU.is_not_empty(id):
query = query.filter(model_cls.id == id)
# 2. 动态过滤条件
elif strU.is_not_empty(filter_model):
if filter_model.page is not None:
page = filter_model.page
if filter_model.page_size is not None:
page_size = filter_model.page_size
filter_data: Dict[str, Any] = filter_model.data or {}
for key, value in filter_data.items():
if hasattr(model_cls, key) and value is not None:
query = query.filter(getattr(model_cls, key) == value)
# 3. 分页
def _calc_skip(pg: int, ps: int) -> int:
return (pg - 1) * ps + 1 if pg > 1 else 0
if strU.is_empty(page) or strU.is_empty(page_size):
total = query.count()
rows = query.all()
else:
skip = _calc_skip(page, page_size)
limit = page_size
total = query.count()
rows = query.offset(skip).limit(limit).all()
# 4. 转 dict
# details: List[Dict[str, Any]] = [strU.model_to_dict(r) for r in rows]
# 4. 转 dict(带 resp_model_cls 校验)
details: List[Dict[str, Any]] = []
for row in rows:
if resp_model_cls: # ← 新增
validated = resp_model_cls.from_orm(row) # Pydantic 校验/脱敏
details.append(validated.dict())
else: # ← 原有
details.append(strU.model_to_dict(row))
# 5. 包装
payload = {
"count": len(details),
"page": page,
"page_size": page_size,
"total_count": total,
"details": details
}
return retMes.Success(payload,
f'共计:{len(details)} 条数据',
record_count=len(details)).mes()
class PkValuesEmptyError(ValueError):
"""主键值列表为空异常"""
pass
def _only_schema_fields(obj: Any, schema: Union[BaseModel, Type[BaseModel], dict]) -> Dict[str, Any]:
"""
只保留 schema 中声明的字段(支持 Pydantic 模型或 dict)
"""
if isinstance(schema, dict):
schema_fields = schema.keys()
elif isinstance(schema, BaseModel):
schema_fields = schema.__fields_set__ | schema.__fields__.keys()
else: # 类类型
schema_fields = schema.__fields__.keys()
# 2. 统一把 obj 变 dict
if isinstance(obj, dict):
data = obj
else:
data = strU.model_to_dict(obj) # 仅 ORM 对象才需要
# 3. 过滤
return {k: v for k, v in data.items() if k in schema_fields}
def updateItem(
db: Session,
auth: AuthManager,
model_cls,
pk_names: List[str],
pk_values: Optional[List[Any]] = None,
schema_obj: Union[BaseModel, Dict[str, Any]] = None,
update_type: str = "update",
) -> Optional[Any]:
"""
通用实体部分更新(支持联合主键)。
如果调用方未传入 pk_values,则尝试从 schema_obj 解析;
解析后仍为空则抛出 PkValuesEmptyError。
:param db: 数据库会话
:param auth: 登录信息
:param model_cls: ORM 模型类
:param pk_names: 主键字段名列表,如 ['tenant_id', 'id']
:param pk_values: 主键值列表,如 [1, 'abc'];可为 None
:param schema_obj: Pydantic 实例(含待更新字段)
:param update_type: 修改类型
:return: 更新后的 ORM 对象或 None(未找到)
:raises: PkValuesEmptyError
"""
# print(schema_obj)
api_id = auth.api_id
user_id = auth.user_id
# 1. 若未显式传入 pk_values,则从 schema_obj 提取
if strU.is_empty(pk_values):
if schema_obj is None:
retMes.Raise(message="schema_obj 为空,无法提取主键值").mes()
# raise PkValuesEmptyError("schema_obj 为空,无法提取主键值")
# data = schema_obj.dict()
data = ensure_dict(schema_obj)
pk_values = [data.get(name) for name in pk_names]
# 2. 再次检查 pk_values 是否全部非空
if not pk_values or any(v is None for v in pk_values):
retMes.Raise(message="主键值列表为空或包含 None").mes()
# raise PkValuesEmptyError("主键值列表为空或包含 None")
# 3. 构造查询并更新
# query = db.query(model_cls)
if hasattr(model_cls, 'api_id'):
query = db.query(model_cls).filter(model_cls.api_id == api_id)
else:
query = db.query(model_cls) # 不带 api_id 的表直接查全量
for name, value in zip(pk_names, pk_values):
query = query.filter(getattr(model_cls, name) == value)
# 如果update_type=“mark”,增加deletion_mark为0或者false判断
if update_type == "mark":
query = query.filter(model_cls.deletion_mark == 0)
# 4. 先捞出所有主键(仅主键,节省内存)----------
# 支持联合主键:每条主键是一个 tuple
mapper = inspect(model_cls)
pk_cols = mapper.primary_key
if pk_cols is None:
pk_cols = [getattr(model_cls, pk) for pk in pk_names]
pk_list = query.with_entities(*pk_cols).all() # [(val1, val2), ...]
if not pk_list:
return []
# 5. 组装更新字段
update_data = ensure_dict(schema_obj)
update_data = {
k: v for k, v in update_data.items()
if k not in pk_names and k not in {"create_date", "creator_by"}
}
if hasattr(model_cls, "last_updated_date"):
update_data["last_updated_date"] = strU.now()
if hasattr(model_cls, "last_updated_by"):
update_data["last_updated_by"] = user_id
if not update_data: # 无字段可更新
return []
# 6. 批量更新
query.update(update_data, synchronize_session=False)
db.commit()
# 7. 根据主键再查一次,拿到更新后实体 ----------
# 动态构造 OR (pk1=val1 AND pk2=val2) ...
or_conditions = [
and_(*[col == val for col, val in zip(pk_cols, vals)])
for vals in pk_list
]
updated_rows = db.query(model_cls).filter(or_(*or_conditions)).all()
# 8. 按 schema_obj 字段过滤
if schema_obj is None:
return [strU.model_to_dict(r) for r in updated_rows]
return [_only_schema_fields(r, schema_obj) for r in updated_rows]
def update(
db: Session,
auth: AuthManager,
model_cls,
update_obj: Optional[UpdateModel] = None,
) -> Dict[str, Any]:
"""
通用实体部分更新(支持联合主键)。
如果调用方未传入 pk_values,则尝试从 schema_obj 解析;
解析后仍为空则抛出 PkValuesEmptyError。
:param db: 数据库会话
:param auth: 登录信息
:param model_cls: ORM 模型类
:param update_obj: Pydantic 实例(含待更新字段)
:return: 更新后的 ORM 对象或 None(未找到)
:raises: PkValuesEmptyError
"""
# 1.
if update_obj is None:
return retMes.Error("修改内容Json 为空").mes()
updated_entities: List[Any] = []
# 遍历裸数组
for item in update_obj.root:
# 如果 item 里给了 pk_names / pk_values,就用它;否则用外层默认值
_pk_names = item.pk_names if item.pk_names is not None else None
_pk_values = item.pk_values if item.pk_values is not None else None
_data = item.data if item.data is not None else None
entity = updateItem(
db=db,
auth=auth,
model_cls=model_cls,
pk_names=_pk_names,
pk_values=_pk_values,
schema_obj=_data, # 把 data 字典作为 schema_obj
)
if entity is not None:
updated_entities.extend(entity) # entity_dict
# entity_dict = strU.model_to_dict(entity)
# append(obj) 把 obj 原封不动当成一个元素追加到列表末尾。 结果长度 +1
# extend(iterable) # 把 iterable 拆开成单个元素后再依次追加。 # 结果长度 +len(iterable)
# updated_entities.append(entity_dict)
return retMes.Success(updated_entities,
f'共计:{len(updated_entities)} 条数据',
record_count=len(updated_entities)).mes()
def deleteItem(
db: Session,
auth: AuthManager,
model_cls,
pk_names: List[str],
pk_values: Optional[List[Any]] = None,
schema_obj: Union[BaseModel, Dict[str, Any]] = None,
exclude_fields: Optional[Set[str]] = {"api_id", "creator_by"},
) -> List[Dict[str, Any]]:
"""
删除所有符合主键条件的记录(支持联合主键)
返回实际删除的行数
"""
api_id=auth.api_id
# 1. 解析主键
if strU.is_empty(pk_values):
data = ensure_dict(schema_obj)
pk_values = [data.get(name) for name in pk_names]
if not pk_values or any(v is None for v in pk_values):
retMes.Raise(message="主键值列表为空或包含 None").mes()
# 2. 构造条件
# query = db.query(model_cls)
if hasattr(model_cls, 'api_id'):
query: Query = db.query(model_cls).filter(model_cls.api_id == api_id)
else:
query: Query = db.query(model_cls) # 不带 api_id 的表直接查全量
for name, value in zip(pk_names, pk_values):
query = query.filter(getattr(model_cls, name) == value)
# 3. 先取出字段(避免 detached 后访问失败)
rows = query.all()
if not rows:
return []
data_list = strU.model_to_dict(rows)
# 3.2 统一剔除
if exclude_fields:
for item in data_list:
for f in exclude_fields:
item.pop(f, None)
# 3. 批量删除(0 查询)
row_count = query.delete(synchronize_session=False) # 直接发 DELETE ... WHERE ...
db.commit()
return data_list
def delete(
db: Session,
auth: AuthManager,
model_cls,
delete_obj: Optional[DeleteModel] = None,
) -> Dict[str, Any]:
"""
通用实体部分更新(支持联合主键)。
如果调用方未传入 pk_values,则尝试从 schema_obj 解析;
解析后仍为空则抛出 PkValuesEmptyError。
:param db: 数据库会话
:param auth: 登录信息
:param model_cls: ORM 模型类
:param delete_obj: Pydantic 实例(含待更新字段)
:return: 更新后的 ORM 对象或 None(未找到)
:raises: PkValuesEmptyError
"""
# 1.
if delete_obj is None:
return retMes.Error("删除内容为空").mes()
deleted_entities: List[Any] = []
marked_entities: List[Any] = []
# 遍历裸数组
for item in delete_obj.root:
# 如果 item 里给了 pk_names / pk_values,就用它;否则用外层默认值
_deletion_model = item.deletion_model if item.deletion_model is not None else None
_pk_names = item.pk_names if item.pk_names is not None else None
_pk_values = item.pk_values if item.pk_values is not None else None
_data = item.data if item.data is not None else None
_deletion_reason = item.deletion_reason if item.deletion_reason is not None else None
if _deletion_model == "mark":
#增加Data内容,若没有,则增加一个
if _data is None:
if _pk_names is None:
retMes.Raise(message="主键字段列表为空").mes()
_data = {"deletion_reason": _deletion_reason
}
else:
# 检查_data中是否有deletion_reason字段
if "deletion_reason" not in _data:
_data["deletion_reason"] = _deletion_reason
# 增加deletion_date字段与强制增加删除日期
_data["deletion_date"] = strU.now()
_data["deletion_mark"] = 1
# 标记删除
marked = updateItem(
db=db,
auth=auth,
model_cls=model_cls,
pk_names=_pk_names,
pk_values=_pk_values,
schema_obj=_data, # 把 data 字典作为 schema_obj
update_type="mark"
)
if marked is not None and len(marked) > 0:
marked_entities.extend(marked)
# print(marked)
# marked_dict = strU.model_to_dict(marked)
# marked_entities.extend(marked_dict)
else:
deleted = deleteItem(
db=db,
auth=auth,
model_cls=model_cls,
pk_names=_pk_names,
pk_values=_pk_values,
schema_obj=_data, # 把 data 字典作为 schema_obj
)
if len(deleted) > 0:
deleted_entities.extend(deleted)
"""
if deleted is not None and len(deleted)>0:
deleted_dict = strU.model_to_dict(deleted)
deleted_entities.extend(deleted_dict)
"""
# 3. 构造响应
message_parts = []
if deleted_entities:
message_parts.append(f"成功删除 {len(deleted_entities)} 笔资料")
if marked_entities:
message_parts.append(f"成功标注失效记录 {len(marked_entities)}")
message = ",".join(message_parts) or "无任何变动"
response_data = {
"deleted": {"count": len(deleted_entities), "details": deleted_entities},
"marked": {"count": len(marked_entities), "details": marked_entities},
}
record_count = len(deleted_entities) + len(marked_entities)
return retMes.Success(response_data,
f'{message} ',
record_count).mes()
所有代码参数中,增加auth: AuthManager,后期获取api_id与User_id.
新增
在写入数据库前,统一为实体补全审计字段与逻辑删除标记,保证数据可追溯、可复用。
❶. api_id与User_id获取
根据登录信息获取 api_id与User_id。
❷.项目隔离
若模型带 api_id 列,自动把当前登录用户的项目编号写入,防止跨项目数据串扰。
❸. 创建人 / 更新人
新增时: creator_by = 当前用户 ID;
首次写入时: last_updated_by 、 last_updated_date 置空,由数据库触发器或后续更新逻辑回填。
❹. 逻辑删除
默认 deletion_mark = 0 , deletion_date 与 deletion_by 置空;
当调用方显式把 deletion_mark 设为 1 时,自动写入删除时间(当前时间)及删除人,实现“谁删的、何时删”一目了然。
❺. 防御式编程
全程用 hasattr 判断字段是否存在,同一段代码可安全应用于不同表,无字段则跳过,避免 AttributeError 。
修改
更新前自动补全「项目隔离」与「审计字段」,同一段代码可安全复用到任意表。
❶. 项目隔离
若模型存在 api_id 列,则自动带上 api_id = 当前登录项目 ID 的条件,防止跨项目误改数据;全局表无此字段则跳过,直接查全量。
❷. 审计字段
只要表里有 last_updated_date ,就写入当前时间;
有 last_updated_by ,就写入当前用户 ID。
不存在对应字段的表自动忽略,避免抛错。
❸. 防御式写法
全程用 hasattr 判断,模型增减字段无需改动公共逻辑,一次编写,全库通用。
删除
删除除了参加参数,程序逻辑变化不大。
查询
查询前自动按「api_id 」隔离数据,同一段代码兼容所有模型。
❶. 若模型存在 api_id 字段,则自动追加过滤条件 api_id = 当前登录项目 ID ,确保仅返回本项目数据。
❷. 若模型无 api_id 字段(如系统级字典表),则跳过过滤,直接查全量,避免报错。
❸. 全程使用 hasattr 判断,模型增减字段无需调整公共逻辑,实现“一次编写,全库通用」。
CRUD
修改crud/sys_user.py,本改版比较多,附上全部代码:
# crud/sys_user.py
from pydoc import pager
from sqlalchemy.orm import Session
from src.models import sys_user as models
from src.schemas import sys_user as schemas, request_model
from src.core import retMes, str_utils as strU, orm_curd
from src.schemas.login_manager import AuthManager
def create_user(db: Session,
auth: AuthManager,
user: schemas.SysUserCreate):
"""
新增单个用户。
参数说明
----------
db : Session
数据库会话对象。
auth: 登录信息
user : schemas.SysUserCreate
待创建用户的 Pydantic 模型实例。
返回值
-------
dict
按 retMes.Success 包装好的 dict
备注
----
内部调用 create_users 实现单条批量插入,行为与批量接口保持一致。
"""
# 检查用户是否已存在(基于login_id)
return orm_curd.Insert(db,
auth,
models.SysUser,
schemas.SysUser,
"用户",
[user.dict()], # [u.dict() for u in users], # users,
["api_id", "login_id"],
skip_duplicates=True)
def create_users(db: Session,
auth: AuthManager,
users: list[schemas.SysUserCreate]):
"""
批量新增用户。
参数说明
----------
db : Session
数据库会话对象。
auth: 登录信息
users : list[schemas.SysUserCreate]
待创建用户的 Pydantic 模型实例列表。
返回值
-------
dict
按 retMes.Success 包装好的 dict
"""
return orm_curd.Insert(db,
auth,
models.SysUser,
schemas.SysUser,
"用户",
users, #[u.dict() for u in users], # users,
["api_id", "login_id"],
skip_duplicates=True)
def get_users(db: Session,
auth: AuthManager,
page: int = 0,
page_size: int = 100,
FilterModel: request_model.QueryModel = None,
id: str = None):
"""
分页查询系统用户列表。
参数说明
----------
db : Session
数据库会话对象,用于执行 ORM 操作。
auth: 登录信息
authManager:authManager
登录信息
page : int, 可选
页码,从 0 开始计数;默认值为 0。
page_size : int, 可选
每页返回的记录数;默认值为 100。
FilterModel : request_model.QueryModel, 可选
查询条件模型,用于构造过滤条件;默认值为 None,表示不过滤。
id : str, 可选
用户 ID 精准匹配;默认值为 None,表示不根据 ID 过滤。
返回值
-------
dict
按 retMes.Success 包装好的 dict
备注说明
--------
1. 本函数通过 `orm_curd.query_with_page` 统一封装,自动完成分页、过滤、排序等操作。
2. 当 `id` 参数非空时,会优先使用精准匹配;其余过滤条件以 `FilterModel` 为准。
3. 若 `schemas.SysUser` 为 None,则返回原始 ORM 对象,跳过 Pydantic 校验,性能略高。
4. 调用方需确保传入的 `db` 处于活跃事务中,避免 Lazy Load 异常。
"""
return orm_curd.query_with_page(db,
auth,
models.SysUser,
schemas.SysUser, # 如不需要字段校验可传 None
id=id,
filter_model=FilterModel,
page=page,
page_size=page_size)
def update_user(db: Session,
auth: AuthManager,
user_id: str,
user: schemas.SysUser,
) -> schemas.SysUser:
"""
更新单个用户
参数
----
db : SQLAlchemy Session
auth: 登录信息
user_id : 待更新用户的主键 id
user : Pydantic 模型,仅包含需要修改的字段(exclude_unset=True)
返回
----
更新后的用户 ORM 对象(已 flush,含最新值)
"""
update_model = request_model.UpdateModel([
request_model.UpdateItem(
pk_names=["id"],
pk_values=[user_id],
data=user.dict(exclude_unset=True)
)
])
return orm_curd.update(db,
auth,
models.SysUser,
update_model)
def update_users(db: Session,
auth: AuthManager,
updateModel: request_model.UpdateModel
):
"""
批量更新用户
参数
----
db : SQLAlchemy Session
auth: 登录信息
updateModel : request_model.UpdateModel
待更新的用户列表,包含主键 id 和数据字段
返回
----
按 retMes.Success 封装的 dict
"""
return orm_curd.update(db,
auth,
models.SysUser,
updateModel)
def delete_user(db: Session,
auth: AuthManager,
deleteModel: request_model.DeleteModel):
"""
调用通用 delete 接口删除指定用户(硬删除)
:param db: SQLAlchemy 会话
:param auth: 登录信息
:param deleteModel: 删除内容
:return: 统一响应格式
"""
# 直接调用通用删除函数
return orm_curd.delete(db=db,
auth=auth,
model_cls=models.SysUser,
delete_obj=deleteModel)
在所有程序增加参数auth: AuthManager登录信息
router
修改src/router/sys_user.py,,本改版比较多,附上全部代码:
# src/router/sys_user.py
# from main import startup_event, shutdown_event
from src.crud import (sys_user as sysUserCrud,
sys_api_project as SysApiProjectCrud,
sys_api_teamwork as SysApiTeamworkCrud,
sys_field_mapping as SysFieldMappingCrud,
sys_organization as SysOrganizationCrud,
sys_table_mapping as SysTableMappingCrud)
from src.schemas import (sys_user as sysUserSchema,
sys_api_project as SysApiProjectSchema,
sys_api_teamwork as SysApiTeamworkSchema,
sys_field_mapping as SysFieldMappingSchema,
sys_organization as SysOrganizationSchema,
sys_table_mapping as SysTableMappingSchema)
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from src.core.ormdb import get_db
from src.schemas import request_model
from src.schemas.login_manager import AuthManager
from src.core import dependencies, retMes
router = APIRouter( # prefix="/user",
# tags=["sys_user"] # ,
# dependencies=[Depends(get_token_header)],
# responses={404: {"description": "Not found"}},
# description="Operations related to system users",
# on_startup=[startup_event],
# on_shutdown=[shutdown_event]
)
# 创建用户
#@router.post("/users/", response_model=list[sysUserSchema.SysUser])
@router.post("/users/", response_model=request_model.ResponseModel)
def create_users(users: list[sysUserSchema.SysUserCreate],
db: Session = Depends(get_db),
auth: AuthManager = Depends(dependencies.auth_token)
):
return sysUserCrud.create_users(db,auth, users)
# @router.post("/user/", response_model=sysUserSchema.SysUser)
@router.post("/user/", response_model=request_model.ResponseModel)
def create_user(user: sysUserSchema.SysUserCreate,
db: Session = Depends(get_db),
auth: AuthManager = Depends(dependencies.auth_token)
):
return sysUserCrud.create_user(db,auth, user)
# 获取用户
@router.get("/users/{id}", response_model=request_model.ResponseModel)
def read_user(
id: str,
db: Session = Depends(get_db),
auth: AuthManager = Depends(dependencies.auth_token)
):
# 按 id 查单条
db_user = sysUserCrud.get_users(db, auth, None, None, None, id)
if not db_user: # 查不到
retMes.Raise(message="User not found").mes() # 统一 401
return db_user
# 获取所有用户
# @router.get("/users/", response_model=list[sysUserSchema.SysUser])
@router.get("/users/", response_model=request_model.ResponseModel)
def read_users(page: int = 1,
page_size: int = 100,
db: Session = Depends(get_db),
auth: AuthManager = Depends(dependencies.auth_token)
):
# 将页码转换为skip值
# skip = (page - 1) * page_size
return sysUserCrud.get_users(db, auth, page, page_size)
# 获取所有用户
# @router.post("/users/", response_model=list[sysUserSchema.SysUser])
@router.post("/users/query/", response_model=request_model.ResponseModel)
def read_users(queryModel: request_model.QueryModel = None,
db: Session = Depends(get_db),
auth: AuthManager = Depends(dependencies.auth_token)
):
# 将页码转换为skip值
return sysUserCrud.get_users(db, auth, None, None, queryModel)
# 更新用户
@router.put("/users/{user_id}", response_model=request_model.ResponseModel)
def update_user(user_id: str,
user: sysUserSchema.SysUser,
db: Session = Depends(get_db),
auth: AuthManager = Depends(dependencies.auth_token)):
db_user = sysUserCrud.update_user(db, auth, user_id, user)
if db_user is None:
retMes.Raise(message="User not found").mes() # 统一 401
return db_user
@router.put("/users/", response_model=request_model.ResponseModel)
def update_user(updateModel: request_model.UpdateModel,
db: Session = Depends(get_db),
auth: AuthManager = Depends(dependencies.auth_token)):
db_user = sysUserCrud.update_users(db, auth, updateModel)
if db_user is None:
retMes.Raise(message="User not found").mes() # 统一 401
print(db_user)
return db_user
# 删除用户
@router.delete("/users/", response_model=request_model.ResponseModel)
def delete_user(deleteModel: request_model.DeleteModel,
db: Session = Depends(get_db),
auth: AuthManager = Depends(dependencies.auth_token)):
db_user = sysUserCrud.delete_user(db,auth, deleteModel)
if db_user is None:
retMes.Raise(message="User not found").mes() # 统一 401
return db_user
在所有程序增加参数auth: AuthManager = Depends(dependencies.auth_token)获取登录信息
测试
在Header 增加x-api-key与token,并属于正确值,具体参考第七章节中验证Token。