backend/fastapi/app/router/router_api.py
2024-12-06 14:12:28 +09:00

141 lines
4.5 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Header, Body, status, Request
from fastapi.security import APIKeyHeader
from typing import Union, Optional, List
from typing_extensions import Annotated
import json
import logging
import datetime
from kafka import KafkaProducer
KST = datetime.timezone(datetime.timedelta(hours=9))
router = APIRouter(
prefix="/api",
tags=["api", "collect"],
responses={404: {"description": "Not found"}},
)
# -----------------------------------------------------------------------------------------------
# 비정상 요청 로그 기록
logger = logging.getLogger()
#2 logger의 level을 가장 낮은 수준인 DEBUG로 설정해둔다.
logger.setLevel(logging.ERROR)
#3 formatter 지정
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
#4 handler instance 생성
console = logging.StreamHandler()
file_handler = logging.FileHandler(filename="error_request.log")
#5 handler 별로 다른 level 설정
console.setLevel(logging.ERROR)
file_handler.setLevel(logging.ERROR)
#6 handler 출력 format 지정
console.setFormatter(formatter)
file_handler.setFormatter(formatter)
#7 logger에 handler 추가
logger.addHandler(console)
logger.addHandler(file_handler)
KAFKA_IP_DEV = "183.107.250.208:9092"
KAFKA_IP = "10.0.1.10:9092"
KAFKA_TOPIC = "boryeong-herit"
api_key_header = APIKeyHeader(name="X-HIT-TransactionId")
async def get_body(request: Request):
return await request.body()
async def get_header(header: str = Depends(api_key_header)):
return await header
# 헤더의 토큰 확인하는
async def api_token(token: str = Depends(api_key_header)):
if len(token) == 0 or token.count('.') != 3:
logger.error("The value of 'header[X-HIT-TransactionId]' is not determined by a specific rule.")
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
token = token.split('.')
if len(token) != 4 or token[0] !='T' or len(token[3]) != 20:
logger.error("The value of 'header[X-HIT-TransactionId]' is not determined by a specific rule.")
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
@router.post("/herit", dependencies=[Depends(api_token)])
async def col_raw_data_from_herit(
content_Type: Optional[str] = Header(None),
X_HIT_TransactionId: Optional[str] = Header(None),
content_Length: Optional[str] = Header(None),
body: bytes = Depends(get_body)
):
data = {
'content_Type': str(content_Type),
'X_HIT_TransactionId': str(X_HIT_TransactionId),
'content_Length': str(content_Length),
'body': str(body.decode("UTF-8"))
}
message_producer = MessageProducer(KAFKA_IP, KAFKA_TOPIC)
# message_producer_dev = MessageProducer(KAFKA_IP_DEV, KAFKA_TOPIC)
msg = {
"data_type": 'data_herit',
"data_herit": {
"content_type": data['content_Type'],
"x_hit_transactionid": data['X_HIT_TransactionId'],
"content_length": data['content_Length'],
"body": data['body']
}
}
res = message_producer.send_message(msg)
# res_dev = message_producer_dev.send_message(msg)
if res['status_code'] != status.HTTP_200_OK:
return {"reason": "SERVER ERROR"}
else :
return {"reason": "OK"}
# 테스트 URL
@router.post("/test")
async def col_raw_data_from_herit(
body: bytes = Depends(get_body)
):
return {
"reason": "OK2",
"body": body
}
# -----------------------------------------------------------------------------------------------
# Kafka로 전달하기위한 클래스
class MessageProducer:
broker = ""
topic = ""
producer = None
def __init__(self, broker, topic):
self.broker = broker
self.topic = topic
self.producer = KafkaProducer(bootstrap_servers=self.broker,
value_serializer=lambda x: json.dumps(x).encode('utf-8'),
acks=0,
api_version=(2,5,0),
retries=3
)
def send_message(self, msg):
try:
future = self.producer.send(self.topic, msg)
self.producer.flush() # 비우는 작업
future.get(timeout=60)
return {'status_code': 200, 'error': None}
except Exception as e:
logging.error(f"kafka process error: {e}")
return e