Files
cdtestplant_v1/apps/user/controllers.py
2026-01-28 16:50:40 +08:00

175 lines
8.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from django.contrib.auth import get_user_model
from datetime import datetime, timedelta, timezone
from ninja_extra import api_controller, ControllerBase, route
from ninja.pagination import paginate
from utils.chen_pagination import MyPagination
from ninja_extra.permissions import IsAuthenticated, IsAdminUser
from ninja import Query
from django.db import transaction
from django.contrib.auth import authenticate
from django.shortcuts import get_object_or_404
from ninja_jwt.authentication import JWTAuth
from ninja_jwt.controller import TokenObtainPairController
from ninja_jwt import schema
from typing import List
from utils.chen_response import ChenResponse
from apps.user.schema import UserInfoOutSchema, CreateUserSchema, CreateUserOutSchema, \
UserRetrieveInputSchema, \
UserRetrieveOutSchema, UpdateDeleteUserSchema, UpdateDeleteUserOutSchema, DeleteUserSchema, LogOutSchema, \
LogInputSchema, LogDeleteInSchema, AdminModifyPasswordSchema, MyTokenObtainPairOutSchema, \
MyTokenObtainPairInputSchema
from apps.user.models import TableOperationLog, Users as UserClass
from apps.project.models import Project
# 工具函数
from utils.chen_crud import update, multi_delete
from apps.user.tools.ldap_tools import load_ldap_users
Users = get_user_model()
# 定义用户登录接口包含token刷新和生成
@api_controller("/system", tags=['用户token控制和登录接口'])
class UserTokenController(TokenObtainPairController):
auto_import = True
@route.post("/login", response=MyTokenObtainPairOutSchema, url_name='login')
def obtain_token(self, user_token: MyTokenObtainPairInputSchema):
user_token.check_user_authentication_rule()
return user_token.to_response_schema()
@route.get("/getInfo", response=UserInfoOutSchema, url_name="get_info", auth=JWTAuth())
def get_user_info(self):
# 直接按照Schema返回
return self.context.request.auth
@route.post("/logout", url_name="logout", auth=JWTAuth())
def logout(self):
return ChenResponse(code=200, message='退出登录成功')
# 定义system/user用户管理接口
@api_controller("/system/user", tags=['用户管理'], auth=JWTAuth())
class UserManageController(ControllerBase):
# 用户创建接口
@route.post("/save", response=CreateUserOutSchema, url_name="user_create", auth=JWTAuth(),
permissions=[IsAuthenticated, IsAdminUser])
def create_user(self, user_schema: CreateUserSchema):
user = user_schema.create()
return user
# 给前端传所有用户当做字典
@route.get('/list', response=List[UserRetrieveOutSchema], url_name="user_list", auth=None)
@transaction.atomic
def list_user(self, project_id: int = None):
"""如果传了project_id则返回项目中的成员而非全部用户"""
qs = Users.objects.all()
if project_id is not None:
project_obj = get_object_or_404(Project, id=project_id)
all_member: list = project_obj.member
# 将member和duty_person联合
if project_obj.duty_person not in project_obj.member:
all_member.append(project_obj.duty_person)
qs = qs.filter(name__in=all_member)
return qs
# 用户检索接口
@route.get("/index", response=List[UserRetrieveOutSchema])
@paginate(MyPagination)
def index_user(self, filters: UserRetrieveInputSchema = Query(...)):
# 重要处理前端不传值为None的情况
for attr, value in filters.__dict__.items():
if getattr(filters, attr) is None:
setattr(filters, attr, '')
start_time = self.context.request.GET.get('create_datetime[0]')
if start_time is None:
start_time = "2000-01-01"
end_time = self.context.request.GET.get('create_datetime[1]')
if end_time is None:
end_time = '8000-01-01'
date_list = [start_time, end_time]
qs = Users.objects.filter(name__icontains=filters.name, username__icontains=filters.username,
phone__icontains=filters.phone, status__contains=filters.status,
create_datetime__range=date_list).order_by('-create_datetime')
return qs
@route.put("/update/{user_id}", response=UpdateDeleteUserOutSchema,
permissions=[IsAuthenticated, IsAdminUser],
url_name="user-update")
def update_user(self, user_id: int, payload: UpdateDeleteUserSchema):
if payload.username == "superAdmin":
return ChenResponse(code=400, status=400, message="无法编辑,唯一管理员账号")
payload.validate_unique_username(user_id)
update_user = update(self.context.request, user_id, payload, Users)
return {"message": "用户更新成功"}
@route.delete("/delete", permissions=[IsAuthenticated, IsAdminUser], url_name="user-delete")
def delete_user(self, data: DeleteUserSchema):
ids = data.ids
# 去掉删除创始人
for item in ids:
if item == 1:
ids.pop(item)
multi_delete(ids, Users)
return ChenResponse(code=200, status=200, message="删除成功")
# 管理员改变用户状态是否停用/启用
@route.get('/change_status', auth=JWTAuth(), permissions=[IsAuthenticated, IsAdminUser],
url_name='user-change')
def change_user_status(self, user_status: str, userId: int):
user = Users.objects.filter(id=userId).first()
if not user:
return ChenResponse(status=400, code=400, message='用户未找到')
if user.id == 1:
return ChenResponse(status=400, code=400, message='管理员不能被禁用,此操作无效')
user.status = user_status
user.save()
return user.status
@route.post("/modifyPassword", auth=JWTAuth(), permissions=[IsAuthenticated, IsAdminUser])
def modify_password(self, payload: AdminModifyPasswordSchema):
user: UserClass = self.context.request.user # type:ignore
if user:
# 判断就密码是否正确
user_old = authenticate(username=user.username, password=payload.oldPassword)
if not user_old:
return ChenResponse(status=500, code=500, message='旧密码错误,请检查')
user.set_password(payload.newPassword)
user.save()
return ChenResponse(status=200, code=200, message='管理员修改密码成功')
return None
# 用户登录后动态读取LDAP用户录入数据
@route.get("/ldap", url_name='user-ldap')
def load_ldap(self):
try:
load_ldap_users()
return ChenResponse(status=200, code=200, message='连接LDAP服务器成功同步用户数据')
except Exception as exc:
print(exc)
return ChenResponse(status=200, code=200, message='欢迎您,正在外网访问')
# 操作日志接口
@api_controller("/system/log", tags=['日志记录'], auth=JWTAuth())
class LogController(ControllerBase):
@route.get("/operation_list", url_name="log_list", response=List[LogOutSchema], auth=None)
@paginate(MyPagination)
def log_list(self, data: Query[LogInputSchema]):
for attr, value in data.model_dump().items():
if getattr(data, attr) is None:
setattr(data, attr, '')
logs = TableOperationLog.objects.values('id', 'user__username', 'operate_obj', 'create_datetime',
'operate_des').order_by(
'-create_datetime')
# 根据条件搜索
logs = logs.filter(user__username__icontains=data.user, create_datetime__range=data.create_datetime)
return logs
@route.get('/operation_delete', url_name='log_delete', permissions=[IsAuthenticated, IsAdminUser],
auth=JWTAuth())
def log_delete(self, data: LogDeleteInSchema = Query(...)):
time = datetime.now() - timedelta(days=data.day)
log_qs = TableOperationLog.objects.filter(create_datetime__lt=time)
log_qs.delete()
if data.day > 0:
return ChenResponse(message=f'删除{data.day}天前数据成功')
else:
return ChenResponse(message='全部日志删除成功')