141 lines
4.5 KiB
Python
141 lines
4.5 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, Header, Body, status, Request
|
|
from fastapi.security import APIKeyHeader
|
|
from typing import Union, Optional, List
|
|
from typing_extensions import Annotated
|
|
import json
|
|
import logging
|
|
import datetime
|
|
from kafka import KafkaProducer
|
|
|
|
# Fixed UTC+09:00 timezone (presumably Korea Standard Time — confirm); no code visible in this file references it.
KST = datetime.timezone(datetime.timedelta(hours=9))
|
|
|
|
# Router for the collection endpoints; every route registered on it is served
# under the "/api" prefix and tagged "api" / "collect".
router = APIRouter(
    responses={404: {"description": "Not found"}},
    tags=["api", "collect"],
    prefix="/api",
)
|
|
|
|
# -----------------------------------------------------------------------------------------------
# Logging of abnormal requests
#
# The root logger is configured here, so records emitted through the
# module-level ``logging.error(...)`` helpers reach the same handlers.
logger = logging.getLogger()

# Only ERROR and above are processed by the logger itself.
logger.setLevel(logging.ERROR)

# Shared output format for every handler.
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")

# Console handler: ERROR and above, shared format.
console = logging.StreamHandler()
console.setLevel(logging.ERROR)
console.setFormatter(formatter)

# File handler: ERROR and above, written to "error_request.log" in the working directory.
file_handler = logging.FileHandler(filename="error_request.log")
file_handler.setLevel(logging.ERROR)
file_handler.setFormatter(formatter)

# Attach the handlers (console first, then file).
logger.addHandler(console)
logger.addHandler(file_handler)
|
|
|
|
|
|
# Kafka bootstrap server ("host:port"), presumably for the development environment (per the _DEV suffix);
# no active code in this file uses it.
KAFKA_IP_DEV = "183.107.250.208:9092"
|
|
# Kafka bootstrap server ("host:port") that the /herit handler passes to MessageProducer.
KAFKA_IP = "10.0.1.10:9092"
|
|
# Kafka topic that the /herit handler publishes its messages to.
KAFKA_TOPIC = "boryeong-herit"
|
|
|
|
# Security dependency that reads the "X-HIT-TransactionId" request header;
# get_header and api_token receive its value through Depends().
api_key_header = APIKeyHeader(name="X-HIT-TransactionId")
|
|
|
|
async def get_body(request: Request):
    """Dependency that returns the raw, undecoded bytes of the request body."""
    raw_body = await request.body()
    return raw_body
|
|
|
|
|
|
async def get_header(header: str = Depends(api_key_header)):
    """Return the header value supplied by the ``api_key_header`` dependency.

    Args:
        header: Value injected by ``Depends(api_key_header)``.

    Returns:
        The header value, unchanged.
    """
    # The injected value is a plain string, not an awaitable: awaiting it
    # (as this function used to do) raises TypeError on every call.
    return header
|
|
|
|
|
|
# Validates the token carried in the request header.
async def api_token(token: str = Depends(api_key_header)):
    """Reject requests whose ``X-HIT-TransactionId`` value is malformed.

    An accepted value consists of exactly four dot-separated fields, the
    first being the literal ``T`` and the last being 20 characters long.

    Args:
        token: Header value injected by ``Depends(api_key_header)``.

    Raises:
        HTTPException: 403 Forbidden when the value does not follow that layout.
    """
    fields = token.split('.')
    well_formed = (
        len(fields) == 4
        and fields[0] == 'T'
        and len(fields[3]) == 20
    )
    if well_formed:
        return

    logger.error("The value of 'header[X-HIT-TransactionId]' is not determined by a specific rule.")
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
|
|
|
|
|
|
# Receives a POST on /herit (guarded by the api_token dependency), wraps three
# request headers and the UTF-8 decoded body in a message, and publishes it to
# KAFKA_TOPIC through the broker at KAFKA_IP.
#
# Responds {"reason": "OK"} when the producer reports status 200 and
# {"reason": "SERVER ERROR"} for any other outcome.
#
# NOTE: documented with comments instead of a docstring because FastAPI
# publishes a handler's docstring as the operation description.
@router.post("/herit", dependencies=[Depends(api_token)])
async def col_raw_data_from_herit(
    content_Type: Optional[str] = Header(None),
    X_HIT_TransactionId: Optional[str] = Header(None),
    content_Length: Optional[str] = Header(None),
    body: bytes = Depends(get_body)
):
    msg = {
        "data_type": 'data_herit',
        "data_herit": {
            # str() is kept on purpose: a missing header is forwarded as the text "None".
            "content_type": str(content_Type),
            "x_hit_transactionid": str(X_HIT_TransactionId),
            "content_length": str(content_Length),
            "body": body.decode("UTF-8"),
        },
    }

    message_producer = MessageProducer(KAFKA_IP, KAFKA_TOPIC)
    try:
        res = message_producer.send_message(msg)
    finally:
        # A producer is created for every request; close it so its
        # connections and background thread do not pile up.
        message_producer.producer.close()

    # send_message may hand back the caught exception object rather than a
    # status dict; subscripting that object would raise TypeError and turn a
    # delivery failure into an unhandled error.
    delivered = isinstance(res, dict) and res.get('status_code') == status.HTTP_200_OK
    if not delivered:
        return {"reason": "SERVER ERROR"}
    return {"reason": "OK"}
|
|
|
|
|
|
# Test URL: echoes the received request body back to the caller.
# NOTE(review): this handler has the same function name as the /herit handler
# earlier in this file; the name is kept so generated API metadata stays the
# same — confirm whether a distinct name is wanted.
@router.post("/test")
async def col_raw_data_from_herit(
    body: bytes = Depends(get_body)
):
    echo = {"reason": "OK2"}
    echo["body"] = body
    return echo
|
|
|
|
# -----------------------------------------------------------------------------------------------
# Helper class that forwards messages to Kafka
class MessageProducer:
    """Thin wrapper around a ``KafkaProducer`` bound to one broker and one topic."""

    # Class-level defaults; every instance overwrites them in __init__.
    broker = ""
    topic = ""
    producer = None

    def __init__(self, broker, topic):
        """Create the underlying producer.

        Args:
            broker: Bootstrap server address handed to ``KafkaProducer``.
            topic: Topic that every ``send_message`` call publishes to.
        """
        self.broker = broker
        self.topic = topic
        # Values are serialized as UTF-8 encoded JSON.
        # NOTE(review): with acks=0 the producer does not wait for a broker
        # acknowledgement, so ``retries`` likely has no effect and delivery
        # failures may go unnoticed — confirm this is intended.
        self.producer = KafkaProducer(
            bootstrap_servers=self.broker,
            value_serializer=lambda x: json.dumps(x).encode('utf-8'),
            acks=0,
            api_version=(2, 5, 0),
            retries=3,
        )

    def send_message(self, msg):
        """Publish ``msg`` to the configured topic and report the outcome.

        Args:
            msg: JSON-serializable payload.

        Returns:
            ``{'status_code': 200, 'error': None}`` on success, or
            ``{'status_code': 500, 'error': <message>}`` when sending fails.
            A dict is returned in both cases so callers can always read
            ``status_code``; previously the bare exception object was
            returned on failure.
        """
        try:
            future = self.producer.send(self.topic, msg)
            self.producer.flush()  # push out anything still buffered
            future.get(timeout=60)
        except Exception as e:
            # Broad on purpose: any failure is reported to the caller as a
            # status dict instead of propagating.
            logging.error("kafka process error: %s", e)
            return {'status_code': 500, 'error': str(e)}
        return {'status_code': 200, 'error': None}
|