Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 23 additions & 29 deletions apps/users/serializers/login.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,21 @@
import base64
import json

from captcha.image import ImageCaptcha
from django.core import signing
from django.core.cache import cache
from django.utils.translation import gettext_lazy as _
from rest_framework import serializers

from application.models import ApplicationAccessToken
from captcha.image import ImageCaptcha
from common.constants.authentication_type import AuthenticationType
from common.constants.cache_version import Cache_Version
from common.database_model_manage.database_model_manage import DatabaseModelManage
from common.exception.app_exception import AppApiException
from common.utils.common import password_encrypt, password_verify, needs_password_upgrade, get_random_chars
from common.utils.common import get_random_chars, needs_password_upgrade, password_encrypt, password_verify
from common.utils.logger import maxkb_logger
from common.utils.rsa_util import decrypt
from django.core import signing
from django.core.cache import cache
from django.utils.translation import gettext_lazy as _
from maxkb.const import CONFIG
from rest_framework import serializers
from users.models import User
from common.utils.logger import maxkb_logger


class LoginRequest(serializers.Serializer):
Expand Down Expand Up @@ -112,14 +111,17 @@ def login(instance):
raise AppApiException(500, _("Invalid encrypted data"))

try:
LoginRequest(data=instance).is_valid(raise_exception=True)
request_serializer = LoginRequest(data=instance)
request_serializer.is_valid(raise_exception=True)
except serializers.ValidationError:
raise
except Exception as e:
raise AppApiException(500, str(e))

password = instance.get("password")
captcha = instance.get("captcha", "")
validated_data = request_serializer.validated_data
username = validated_data.get("username", "")
password = validated_data.get("password", "")
captcha = validated_data.get("captcha", "")

# 获取认证配置
auth_setting = LoginSerializer.get_auth_setting()
Expand All @@ -128,8 +130,8 @@ def login(instance):
lock_time = auth_setting.get("lock_time", 10)

# 检查许可证有效性
license_validator = DatabaseModelManage.get_model("license_is_valid") or (lambda: False)
is_license_valid = license_validator() if license_validator() is not None else False
license_validator = DatabaseModelManage.get_model("license_is_valid")
is_license_valid = bool(license_validator()) if license_validator else False

if is_license_valid:
# 检查账户是否被锁定
Expand Down Expand Up @@ -185,11 +187,15 @@ def _is_account_locked(username: str, failed_attempts: int) -> bool:

@staticmethod
def _need_captcha(username: str, max_attempts: int) -> bool:
return LoginSerializer._need_captcha_by_key(system_get_key(f"system_{username}"), max_attempts)

@staticmethod
def _need_captcha_by_key(cache_key: str, max_attempts: int) -> bool:
"""判断是否需要验证码"""
if max_attempts == -1:
return False
elif max_attempts > 0:
fail_count = cache.get(system_get_key(f"system_{username}"), version=system_version) or 0
if max_attempts > 0:
fail_count = cache.get(cache_key, version=system_version) or 0
return fail_count >= max_attempts
return True

Expand Down Expand Up @@ -273,13 +279,7 @@ class CaptchaSerializer(serializers.Serializer):
def generate(username: str, type: str = "system"):
auth_setting = LoginSerializer.get_auth_setting()
max_attempts = auth_setting.get("max_attempts", 1)

need_captcha = True
if max_attempts == -1:
need_captcha = False
elif max_attempts > 0:
fail_count = cache.get(system_get_key(f"system_{username}"), version=system_version) or 0
need_captcha = fail_count >= max_attempts
need_captcha = LoginSerializer._need_captcha_by_key(system_get_key(f"system_{username}"), max_attempts)

return CaptchaSerializer._generate_captcha_if_needed(username, type, need_captcha)

Expand All @@ -292,13 +292,7 @@ def chat_generate(username: str, type: str = "chat", access_token: str = ""):

auth_setting = application_access_token.authentication_value
max_attempts = auth_setting.get("max_attempts", 1)

need_captcha = True
if max_attempts == -1:
need_captcha = False
elif max_attempts > 0:
fail_count = cache.get(system_get_key(f"{type}_{username}"), version=system_version) or 0
need_captcha = fail_count >= max_attempts
need_captcha = LoginSerializer._need_captcha_by_key(system_get_key(f"{type}_{username}"), max_attempts)

return CaptchaSerializer._generate_captcha_if_needed(username, type, need_captcha)

Expand Down
Loading
Loading