backend/fastapi/app/process/certification/cert_process.py
2024-12-06 14:12:28 +09:00

162 lines
4.4 KiB
Python

from fastapi import Request
import json
from process.logger import logger
import jwt
import datetime
import time
from common.config import SECRET_KEY
async def cert_process_by_token(request: Request):
# 헤더에서 토큰 찾기
auth_token = request.headers.get('auth-token')
# 헤더에 토큰이 없으면 인증 거절 처리
if auth_token == None:
return {
"result": "FAIL",
"token": ""
}
else:
# 헤더에 토큰이 있으므로 DB에서 유효한 토큰 값인지 검증하기
1
def create_jwt(user_seq: int, period: int):
# 현재 시간
now = int(time.time())
# JWT의 페이로드 (클레임)
payload = {
"user_seq": user_seq,
"exp": now + period, # 만료 시간
"iat": now, # 토큰 발행 시간
}
# JWT 생성 (HS256 알고리즘 사용)
token = jwt.encode(payload, SECRET_KEY, algorithm="HS256")
return token
def decode_jwt(token: str):
try:
# 토큰 디코드
decoded_payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
return {
"result": "OK",
"data": decoded_payload
}
except jwt.ExpiredSignatureError:
return {
"result": "TOKEN_EXPIRED",
"error": "Token has expired"
}
except jwt.InvalidTokenError:
return {
"result": "FAIL",
"error": "Invalid token"
}
# 인증정보 JWT토큰화
def create_user_cert_seq_jwt(user_cert_seq: int, period: int):
# 현재 시간
now = int(time.time())
# JWT의 페이로드 (클레임)
payload = {
"user_cert_seq": user_cert_seq,
"exp": now + period, # 만료 시간
"iat": now, # 토큰 발행 시간
}
# JWT 생성 (HS256 알고리즘 사용)
token = jwt.encode(payload, SECRET_KEY, algorithm="HS256")
return token
# 인증서 갱신
def renew_cert(request: Request):
try:
# 헤더에서 토큰 찾기
auth_token = request.headers.get('auth-token')
decoded_payload = decode_jwt(auth_token)
if decoded_payload['result'] == 'OK':
decoded_payload = decoded_payload['data']
user_seq = decoded_payload['user_seq']
exp = decoded_payload['exp']
iat = decoded_payload['iat']
now = int(time.time())
# 시간비교 유효한 토큰인지 확인
if iat <= now and now <= exp:
return {
"result": "OK",
"data": make_auth_data(create_jwt(user_seq=user_seq, period=3600), 'Y')
}
else:
return {
"result": "TOKEN_EXPIRED",
"data": make_auth_data('', 'N'),
"msg": '인증서 토큰이 만료 되었습니다.'
}
elif decoded_payload['result'] == 'TOKEN_EXPIRED':
return {
"result": "TOKEN_EXPIRED",
"data": make_auth_data('', 'N'),
"msg": '인증서 토큰이 만료 되었습니다.'
}
else:
logger.error(f"token renew error. message: {e}")
return {
"result": "FAIL",
"data": make_auth_data('', 'N'),
"msg": decoded_payload['error']
}
except Exception as e:
logger.error(f"token renew error. message: {e}")
return {
"result": "FAIL",
"data": make_auth_data('', 'N'),
"msg": '인증서 토큰 갱신에 실패했습니다.'
}
# 응답 auth 함수화
def make_auth_data(token, tf):
return {
"renew_yn": tf,
"token": token
}
# 토큰으로 user_seq 가져오기
def get_user_seq_by_token(token: str):
try:
payload = decode_jwt(token)
if payload['result'] == 'OK':
user_seq = payload['data']['user_seq']
return {
"result": "OK",
"data":{
"user_seq": user_seq
}
}
else:
return {
"result": "FAIL",
"msg": payload['error']
}
except Exception as e:
logger.error(f"token error. message: {e}")
return {
"result": "FAIL",
"msg": 'get_user_seq_by_token error'
}