backend/fastapi/app/router/manage_user_api.py

569 lines
33 KiB
Python
Raw Permalink Normal View History

2024-12-06 05:12:28 +00:00
from fastapi import APIRouter, Depends, HTTPException, Header, Body, status, Request, File, UploadFile, Form
from sqlalchemy.orm import Session
from fastapi.security import APIKeyHeader
from typing import Union, Optional, List
from typing_extensions import Annotated
from db import models, schemas, crud
import json
import logging
import datetime
from kafka import KafkaProducer
from fastapi.responses import FileResponse, StreamingResponse
import os
import openpyxl
import time
import random
import string
from db.schemas import RawData, RtuGenerator
from db.base import get_db
from process.logger import logger
from process.certification import cert_process
from process.response import response
from process.user import manage_user
from process.user import manage_user_pattern
from process.email import email_user
KST = datetime.timezone(datetime.timedelta(hours=9))
router = APIRouter(
prefix="/user",
tags=["user"],
responses={404: {"description": "Not found"}},
)
# -----------------------------------------------------------------------------------------------
async def get_body(request: Request):
return await request.body()
#==================================================================================================
# 로그인
#==================================================================================================
@router.post("/login")
async def login(request: Request, body: bytes = Depends(get_body), db: Session = Depends(get_db)):
try:
# 인증서 갱신
auth_token = cert_process.renew_cert(request=request)
auth_token = auth_token['data']
# body에서 ID, PW 추출
try:
body = json.loads(body)
except json.JSONDecodeError as e:
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='데이터 에러', msg_content='데이터 처리 장애가 발생했습니다. 요청정보를 정확히 입력했는지 확인해주세요.', data={})
user_id = body['user_id']
user_pw = body['user_pw']
2025-01-07 06:59:18 +00:00
logger.debug(f"test11")
2024-12-06 05:12:28 +00:00
# ID, PW로 로그인 시도
login_result = await manage_user.do_login(user_id= user_id, user_pw= user_pw, db=db)
2025-01-07 06:59:18 +00:00
logger.debug(f"test12")
2024-12-06 05:12:28 +00:00
# 로그인 결과에 따른 응답
if login_result['result'] == 'OK':
2025-01-07 06:59:18 +00:00
logger.debug(f"test13")
2024-12-06 05:12:28 +00:00
user_seq = (login_result['user_seq'])[0]
# 마지막 로그인 시간 업데이트
last_login_dt_result = await manage_user.update_last_login_dt(user_seq=user_seq, db=db)
2025-01-07 06:59:18 +00:00
logger.debug(f"test14")
2024-12-06 05:12:28 +00:00
if last_login_dt_result['result'] == 'OK':
2025-01-07 06:59:18 +00:00
logger.debug(f"test15")
2024-12-06 05:12:28 +00:00
auth_token = {
"renew_yn": 'Y',
2025-01-07 06:59:18 +00:00
"token": cert_process.create_jwt(user_seq=user_seq, period=60*60*24*30),
"user_seq": user_seq,
2024-12-06 05:12:28 +00:00
}
return await response.ok_res(auth_token=auth_token, data={}, db=db)
else:
2025-01-07 06:59:18 +00:00
logger.debug(f"test16")
2024-12-06 05:12:28 +00:00
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='로그인 실패', msg_content='로그인 정보 업데이트에 실패했습니다.', data={})
else:
2025-01-07 06:59:18 +00:00
logger.debug(f"test17")
2024-12-06 05:12:28 +00:00
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='로그인 실패', msg_content='회원정보를 다시 확인해주세요.', data={})
except Exception as e:
logger.error(f"request error. URL: /user/login\nerror message: {e}")
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='로그인 에러', msg_content='로그인 처리중 에러가 발생했습니다.', data={})
#==================================================================================================
# 아이디 찾기
#==================================================================================================
@router.post("/find/id")
async def find_id(request: Request, body: bytes = Depends(get_body), db: Session = Depends(get_db)):
try:
# 인증서 갱신
auth_token = cert_process.renew_cert(request=request)
auth_token = auth_token['data']
# body에서 ID, PW 추출
try:
body = json.loads(body)
except json.JSONDecodeError as e:
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='데이터 에러', msg_content='데이터 처리 장애가 발생했습니다. 요청정보를 정확히 입력했는지 확인해주세요.', data={})
nickname = body['nickname']
user_email = body['user_email']
# 닉네임 패턴 검증
if not manage_user_pattern.validate_nickname(nickname=nickname):
2025-01-07 06:59:18 +00:00
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='닉네임 확인', msg_content='닉네임을 정확히 입력해주세요.', data={})
2024-12-06 05:12:28 +00:00
# 이메일 패턴 검증
if not manage_user_pattern.validate_user_email(user_email=user_email):
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='이메일 확인', msg_content='이메일 주소를 정확히 입력해주세요.', data={})
# 닉네임, EMAIL로 아이디 찾기 시도
find_id_result = await manage_user.find_id_by_name_email(nickname=nickname, user_email=user_email, db=db)
# 아이디 찾기 시도 결과에 따른 응답
if find_id_result['result'] == 'OK':
# user_seq, user_id 확인
user_seq = find_id_result['user_seq']
user_id = find_id_result['user_id']
# ID 마스킹 처리
user_id = user_id[:int(len(user_id)/2)] + ''.join(['*' for _ in range(int(len(user_id)/2))])
data = {
"auth": cert_process.create_jwt(user_seq=user_seq, period=600),
"user_id": user_id,
}
return await response.ok_res(auth_token=auth_token, data=data, db=db)
else:
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='아이디 찾기 실패', msg_content='회원정보를 다시 확인해주세요.', data={})
except Exception as e:
logger.error(f"request error. URL: /user/find/id\nerror message: {e}")
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='로그인 에러', msg_content='로그인 처리중 에러가 발생했습니다.', data={})
#==================================================================================================
# 아이디 이메일 전송
#==================================================================================================
@router.post("/find/id/full")
async def find_receive_by_eamil(request: Request, body: bytes = Depends(get_body), db: Session = Depends(get_db)):
try:
# 인증서 갱신
auth_token = cert_process.renew_cert(request=request)
auth_token = auth_token['data']
# body에서 ID, PW 추출
try:
body = json.loads(body)
except json.JSONDecodeError as e:
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='데이터 에러', msg_content='데이터 처리 장애가 발생했습니다. 요청정보를 정확히 입력했는지 확인해주세요.', data={})
auth = body['auth']
auth_data = cert_process.decode_jwt(auth)
if auth_data['result'] == 'OK':
auth_data = auth_data['data']
user_seq = auth_data['user_seq']
exp = auth_data['exp']
iat = auth_data['iat']
current_time = int(time.time())
if not(iat <= current_time and current_time <= exp):
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='이메일 발송 실패', msg_content='토큰 유효기간이 만료되었습니다.', data={})
else:
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='토큰 에러', msg_content='토큰 정보가 정확하지 않습니다.', data={})
# user_seq로 아이디 찾기 시도
find_id_result = await manage_user.find_id_by_user_seq(user_seq=user_seq, db=db)
# 아이디 찾기 시도 결과에 따른 응답
if find_id_result['result'] == 'OK':
# user_id, user_email 확인
user_id = find_id_result['user_id']
user_email = find_id_result['user_email']
# 이메일 발송 3회 이상 됐는지 확인
send_email_cnt_result = await manage_user.select_send_email_cnt(user_email=user_email, db=db)
if send_email_cnt_result['result'] == 'OK':
send_email_cnt = send_email_cnt_result['send_email_cnt']
if send_email_cnt >= 3:
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='이메일 발송 실패', msg_content='이메일 발송은 하루 3번까지 가능합니다.', data={})
else:
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='이메일 발송 실패', msg_content='이메일 발송 실패했습니다.', data={})
# 이메일 발송 인증 내역 입력
insert_email_cert_result = await manage_user.insert_send_email_info(user_seq=user_seq, cert_type='email', user_info=user_email, cert_code=user_id, db=db)
if insert_email_cert_result['result'] == 'FAIL':
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='이메일 발송 실패', msg_content='이메일 발송 실패했습니다.', data={})
# 이메일로 유저 아이디 발송해주기
send_email_result = email_user.email_find_id(user_email=user_email, info=user_id)
if send_email_result['result'] == 'OK':
return await response.ok_res(auth_token=auth_token, data={"send_email_result": True}, db=db)
else:
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='이메일 발송 실패', msg_content=send_email_result['msg'], data={})
else:
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='이메일 발송 실패', msg_content='이메일 발송 실패했습니다.', data={})
except Exception as e:
logger.error(f"request error. URL: /user/find/id/full\nerror message: {e}")
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='아이디 찾기 에러', msg_content='아이디 찾기 이메일 발송 처리중 에러가 발생했습니다.', data={})
#==================================================================================================
# 비밀번호 찾기
#==================================================================================================
@router.post("/find/password")
async def find_pw(request: Request, body: bytes = Depends(get_body), db: Session = Depends(get_db)):
try:
# 인증서 갱신
auth_token = cert_process.renew_cert(request=request)
auth_token = auth_token['data']
# body에서 ID, PW 추출
try:
body = json.loads(body)
except json.JSONDecodeError as e:
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='데이터 에러', msg_content='데이터 처리 장애가 발생했습니다. 요청정보를 정확히 입력했는지 확인해주세요.', data={})
user_id = body['user_id']
user_email = body['user_email']
# 아이디 패턴 검증
if not manage_user_pattern.validate_user_id(user_id=user_id):
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='아이디 확인', msg_content='아이디를 정확히 입력해주세요.', data={})
# 이메일 패턴 검증
if not manage_user_pattern.validate_user_email(user_email=user_email):
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='이메일 확인', msg_content='이메일 주소를 정확히 입력해주세요.', data={})
# 이메일 발송 3회 이상 됐는지 확인
send_email_cnt_result = await manage_user.select_send_email_cnt(user_email=user_email, db=db)
if send_email_cnt_result['result'] == 'OK':
send_email_cnt = send_email_cnt_result['send_email_cnt']
if send_email_cnt >= 3:
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='이메일 발송 실패', msg_content='이메일 발송은 하루 3번까지 가능합니다.', data={})
# ID, EMAIL로 비밀번호 찾기 시도
find_password_result = await manage_user.find_password_by_id_email(user_id= user_id, user_email= user_email, db=db)
# 비밀번호 찾기 시도 결과에 따른 응답
if find_password_result['result'] == 'OK':
# user_seq 확인, 신규 PW&SOLT 생성하기
user_seq = (find_password_result['user_seq'])[0]
new_pw = generate_random_string(20)
new_solt = generate_random_string(10)
# DB에 신규로 생성된 비밀번호 업데이트 하기
update_pw_result = await manage_user.update_new_password(user_seq=user_seq, new_pw=new_pw, new_solt=new_solt, db=db)
if update_pw_result['result'] == 'OK':
# DB 업데이트가 성공하면 이메일로 유저 임시 비밀번호 발송해주기
send_email_result = email_user.email_find_password(user_email=user_email, info=new_pw)
if send_email_result['result'] == 'OK':
return await response.ok_res(auth_token=auth_token, data={"send_email_result": send_email_result}, db=db)
else:
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='이메일 발송 실패', msg_content=send_email_result['msg'], data={})
else:
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='비밀번호 업데이트 실패', msg_content='임시 비밀번호 업데이트에 실패했습니다.', data={})
else:
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='비밀번호 찾기 실패', msg_content='회원정보를 다시 확인해주세요.', data={})
except Exception as e:
logger.error(f"request error. URL: /user/find/password\nerror message: {e}")
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='로그인 에러', msg_content='로그인 처리중 에러가 발생했습니다.', data={})
#==================================================================================================
# 회원가입
#==================================================================================================
@router.post("/signup")
async def signup(request: Request, body: bytes = Depends(get_body), db: Session = Depends(get_db)):
try:
# 인증서 갱신
auth_token = cert_process.renew_cert(request=request)
auth_token = auth_token['data']
# body에서 ID 추출
try:
body = json.loads(body)
except json.JSONDecodeError as e:
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='데이터 에러', msg_content='데이터 처리 장애가 발생했습니다. 요청정보를 정확히 입력했는지 확인해주세요.', data={})
2025-01-07 06:59:18 +00:00
logger.debug(f"body: {body}")
2024-12-06 05:12:28 +00:00
# user_id 유효성 검사
is_valid_user_id = await manage_user_pattern.is_valid_user_id_by_user_id(user_id=body['user_id'], db=db)
fail_msg = None if is_valid_user_id else '아이디가 이미 존재합니다.'
if fail_msg is not None:
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='회원가입 실패', msg_content=fail_msg, data={})
# 필수 입력항목 패턴 확인
mandatory_list_verify_msg = manage_user_pattern.check_mandatory_insert_list(user_info=body)
if mandatory_list_verify_msg is not None:
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='회원가입 실패', msg_content=mandatory_list_verify_msg, data={})
# 회원가입 DB 입력
body['user_pw_solt'] = generate_random_string(10) # 비밀번호 SOLT 생성
insert_new_user_result = await manage_user.insert_new_user(user_info=body, db=db)
if insert_new_user_result['result'] == 'OK':
return await response.ok_res(auth_token=auth_token, data={}, db=db)
else:
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='회원가입 실패', msg_content='회원가입에 실패했습니다. 입력하신 정보를 다시 확인해주세요.', data={})
except Exception as e:
logger.error(f"request error. URL: /user/signup\nerror message: {e}")
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='로그인 에러', msg_content='로그인 처리중 에러가 발생했습니다.', data={})
#==================================================================================================
# 프로필 이미지 등록
#==================================================================================================
@router.post("/update/profile/img")
async def update_profile_img(request: Request, body: str = Form(...), file: UploadFile = File(...), db: Session = Depends(get_db)):
try:
# 인증서 갱신
auth_token = cert_process.renew_cert(request=request)
if auth_token['result'] == 'OK':
auth_token = auth_token['data']
elif auth_token['result'] == 'TOKEN_EXPIRED':
raise token_expired_return_process(auth_token['msg'])
else:
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='토큰 에러', msg_content='토큰 정보가 정확하지 않습니다.', data={})
# body에서 ID 추출
try:
body = json.loads(body)
except json.JSONDecodeError as e:
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='데이터 에러', msg_content='데이터 처리 장애가 발생했습니다. 요청정보를 정확히 입력했는지 확인해주세요.', data={})
# 파일 읽기
content = await file.read()
# 파일 크기 제한
MAX_FILE_SIZE = 5 * 1024 * 1024 # 5MB
file_size = len(content) # 파일 크기 확인
if file_size > MAX_FILE_SIZE:
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='이미지 업로드 실패', msg_content='이미지 파일 크기는 5MB까지 가능합니다.', data={})
# body에 들어있는 이미지명 처리
user_seq_result = cert_process.get_user_seq_by_token(auth_token['token'])
if user_seq_result['result'] == 'OK':
# user_seq 추출
user_seq = user_seq_result['data']['user_seq']
# 현재 시간 가져오기 YYYYMMDDHHMMSSsss
now_time = get_now_time_str()
# 확장자 확인하고 텍스트 만들기
extension = os.path.splitext(file.filename)[1].lower()
if extension not in ['.jpg', 'jpeg', '.png', 'bmp']:
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='이미지 업로드 실패', msg_content='지원하지 않는 확장자입니다.', data={})
# IMG_(user_seq)_현재시간 으로 파일명 만들기
file_name = f"IMG_{user_seq}_{now_time}{extension}"
# 파일 업로드 코드 시작
# 경로 존재하지 않으면 생성하기
UPLOAD_DIRECTORY = f"images/user/temp_dir/profile_img/"
if not os.path.exists(UPLOAD_DIRECTORY):
os.makedirs(UPLOAD_DIRECTORY)
# 파일 저장 경로 생성
file_path = os.path.join(UPLOAD_DIRECTORY, file_name)
# 파일을 서버에 저장
with open(file_path, "wb") as buffer:
buffer.write(content)
# 실제로 잘 저장됐는지 확인
if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
2025-01-07 06:59:18 +00:00
# # DB 업데이트해주기
# update_profile_img_result = await manage_user.update_profile_img(user_seq=user_seq, profile_img=file_path.replace('images', ''), db=db)
# if update_profile_img_result['result'] == 'OK':
# return await response.ok_res(auth_token=auth_token, data={"img_src": '/'+file_path}, db=db)
# else:
# return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='이미지 업로드 실패', msg_content='이미지 업로드에 실패했습니다.', data={})
2024-12-06 05:12:28 +00:00
return await response.ok_res(auth_token=auth_token, data={"img_src": '/'+file_path}, db=db)
else:
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='이미지 업로드 실패', msg_content='이미지 업로드에 실패했습니다.', data={})
else:
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='토큰 에러', msg_content='토큰 정보가 정확하지 않습니다.', data={})
except Exception as e:
logger.error(f"request error. URL: /user/update/profile/img\nerror message: {e}")
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='로그인 에러', msg_content='로그인 처리중 에러가 발생했습니다.', data={})
#==================================================================================================
# 내 정보 가져오기
#==================================================================================================
@router.post("/myinfo")
async def update_user_info(request: Request, body: bytes = Depends(get_body), db: Session = Depends(get_db)):
try:
# 인증서 갱신
auth_token = cert_process.renew_cert(request=request)
if auth_token['result'] == 'OK':
auth_token = auth_token['data']
elif auth_token['result'] == 'TOKEN_EXPIRED':
raise token_expired_return_process(auth_token['msg'])
else:
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='토큰 에러', msg_content='토큰 정보가 정확하지 않습니다.', data={})
# body에서 ID 추출
try:
body = json.loads(body)
except json.JSONDecodeError as e:
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='데이터 에러', msg_content='데이터 처리 장애가 발생했습니다. 요청정보를 정확히 입력했는지 확인해주세요.', data={})
user_seq_result = cert_process.get_user_seq_by_token(token=auth_token['token'])
if user_seq_result["result"] == 'OK':
user_seq = user_seq_result['data']['user_seq']
# 유저 정보 가져오기
user_data = await manage_user.get_my_info_by_user_seq(user_seq=user_seq, db=db)
if user_data['result'] == 'OK':
return await response.ok_res(auth_token=auth_token, data=user_data['data'], db=db)
logger.error(f"request error. URL: /user/myinfo\nerror message: {e}")
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='마이페이지 내정보 가져오기 에러', msg_content='마이페이지 데이터 조회중 에러가 발생했습니다.', data={})
except Exception as e:
logger.error(f"request error. URL: /user/myinfo\nerror message: {e}")
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='로그인 에러', msg_content='로그인 처리중 에러가 발생했습니다.', data={})
#==================================================================================================
# 유저 정보 수정하기
#==================================================================================================
@router.post("/update/user/info")
async def update_user_info(request: Request, body: bytes = Depends(get_body), db: Session = Depends(get_db)):
try:
# 인증서 갱신
auth_token = cert_process.renew_cert(request=request)
if auth_token['result'] == 'OK':
auth_token = auth_token['data']
elif auth_token['result'] == 'TOKEN_EXPIRED':
raise token_expired_return_process(auth_token['msg'])
else:
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='토큰 에러', msg_content='토큰 정보가 정확하지 않습니다.', data={})
# body에서 ID 추출
try:
body = json.loads(body)
except json.JSONDecodeError as e:
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='데이터 에러', msg_content='데이터 처리 장애가 발생했습니다. 요청정보를 정확히 입력했는지 확인해주세요.', data={})
user_seq_result = cert_process.get_user_seq_by_token(token=auth_token['token'])
if user_seq_result["result"] == 'OK':
user_seq = user_seq_result['data']['user_seq']
body['user_seq'] = user_seq
# 현재 비밀번호 일치하는지 확인
current_pw_correct_result = await manage_user.check_current_user_pw(user_seq=user_seq, user_pw=body['user_pw'], db=db)
if current_pw_correct_result['result'] == 'FAIL':
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='현재 비밀번호 인증 실패', msg_content='현재 비밀번호가 일치하지 않습니다.', data={})
# 입력된 값 패턴 검증
new_data_pattern_result_msg = manage_user_pattern.check_new_data(user_info=body)
if new_data_pattern_result_msg is not None:
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='회원정보 수정 실패', msg_content=new_data_pattern_result_msg, data={})
# 유저 정보 가져오기
user_data = await manage_user.get_my_info_by_user_seq(user_seq=user_seq, db=db)
if user_data['result'] == 'OK':
user_data = user_data['data']
else:
return response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='정보 가져오기 에러', msg_content='유저 정보 가져오기에 실패했습니다.', data={})
# 변경된 항목들 확인하기
# user_pw 변경 됐는지 확인
if body['user_pw_change_yn'] == 'Y':
body['user_pw_solt'] = generate_random_string(10)
# nickname 변경 됐는지 확인
body['nickname'] = body['nickname'] if body['nickname'] != user_data['nickname'] else user_data['nickname']
# user_email 변경 됐는지 확인
body['user_email'] = body['user_email'] if body['user_email'] != user_data['user_email'] else user_data['user_email']
# department 변경 됐는지 확인
body['department'] = body['department'] if body['department'] != user_data['department'] else user_data['department']
# profile_img 변경 됐는지 확인
body['profile_img'] = body['profile_img'] if body['profile_img'] != user_data['profile_img'] else user_data['profile_img']
# profile_img 변경 됐는지 확인
body['introduce_myself'] = body['introduce_myself'] if body['introduce_myself'] != user_data['introduce_myself'] else user_data['introduce_myself']
# 변경된 정보에 대해서만 업데이트하기
update_result = await manage_user.update_user_info(user_info=body, db=db)
if update_result['result'] == 'OK':
return await response.ok_res(auth_token=auth_token, data={}, db=db)
else:
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='회원정보 변경 실패', msg_content='회원정보 변경 실패했습니다. 관리자에게 문의해주세요.', data={})
else:
logger.error(f"request error. URL: /user/update/user/info\nerror message: {e}")
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='토큰 에러', msg_content='토큰 정보가 정확하지 않습니다.', data={})
except Exception as e:
logger.error(f"request error. URL: /user/update/user/info\nerror message: {e}")
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='로그인 에러', msg_content='로그인 처리중 에러가 발생했습니다.', data={})
#==================================================================================================
# 회원탈퇴
#==================================================================================================
@router.post("/withdraw/user")
async def withdraw_user(request: Request, body: bytes = Depends(get_body), db: Session = Depends(get_db)):
try:
# 인증서 갱신
auth_token = cert_process.renew_cert(request=request)
if auth_token['result'] == 'OK':
auth_token = auth_token['data']
elif auth_token['result'] == 'TOKEN_EXPIRED':
raise token_expired_return_process(auth_token['msg'])
else:
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='토큰 에러', msg_content='토큰 정보가 정확하지 않습니다.', data={})
# body에서 ID 추출
try:
body = json.loads(body)
except json.JSONDecodeError as e:
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='데이터 에러', msg_content='데이터 처리 장애가 발생했습니다. 요청정보를 정확히 입력했는지 확인해주세요.', data={})
user_pw = body['user_pw']
user_seq_result = cert_process.get_user_seq_by_token(token=auth_token['token'])
if user_seq_result["result"] == 'OK':
user_seq = user_seq_result['data']['user_seq']
# 현재 비밀번호 일치하는지 확인
current_pw_correct_result = await manage_user.check_current_user_pw(user_seq=user_seq, user_pw=user_pw, db=db)
if current_pw_correct_result['result'] == 'OK':
# 회원 탈퇴 처리
withdraw_result = await manage_user.user_withdraw(user_seq=user_seq, db=db)
if withdraw_result['result'] == 'OK':
return await response.ok_res(auth_token=auth_token, data={}, db=db)
else:
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='회원탈퇴 에러', msg_content='회원탈퇴 처리중 에러가 발생헀습니다. 관리자에게 문의해주세요.', data={})
else:
return await response.fail_res(auth_token=auth_token, auth_type='NOMAL', msg_title='현재 비밀번호 인증 실패', msg_content='현재 비밀번호가 일치하지 않습니다.', data={})
else:
logger.error(f"request error. URL: /user/withdraw/user\nerror message: {e}")
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='토큰 에러', msg_content='토큰 정보가 정확하지 않습니다.', data={})
except Exception as e:
logger.error(f"request error. URL: /user/withdraw/user\nerror message: {e}")
return await response.error_res(auth_token=auth_token, auth_type='NOMAL', msg_title='로그인 에러', msg_content='로그인 처리중 에러가 발생했습니다.', data={})
#==================================================================================================
# 필요한 함수
#==================================================================================================
# N자 대문자, 소문자, 숫자 조합 랜덤 발생기
def generate_random_string(length=10):
# 영문 대문자, 소문자, 숫자를 모두 포함한 문자 집합
characters = string.ascii_letters + string.digits
# 지정된 길이만큼 랜덤하게 선택하여 문자열 생성
random_string = ''.join(random.choice(characters) for _ in range(length))
return random_string
# N자 숫자 조합 랜덤 발생기
def generate_random_number(length=6):
characters = string.digits
# 지정된 길이만큼 랜덤하게 선택하여 문자열 생성
random_string = ''.join(random.choice(characters) for _ in range(length))
return random_string
def token_expired_return_process(fail_msg):
logger.error(f"request fail: {fail_msg}")
return HTTPException(
status_code=401,
detail=f"{fail_msg}"
)
# 현재 시간 STR로 가져오기
def get_now_time_str():
# 현재 시간 가져오기
now = datetime.datetime.now()
# YYYYMMDDHHMMSSsss 형식으로 포맷
formatted_time = now.strftime('%Y%m%d%H%M%S%f')[:17]
return formatted_time