"""Collection API: receives raw HERIT payloads over HTTP and forwards them to Kafka."""

import datetime
import json
import logging
from typing import Union, Optional, List

from fastapi import APIRouter, Depends, HTTPException, Header, Body, status, Request
from fastapi.security import APIKeyHeader
from kafka import KafkaProducer
from typing_extensions import Annotated

# Korea Standard Time (UTC+9).
KST = datetime.timezone(datetime.timedelta(hours=9))

router = APIRouter(
    prefix="/api",
    tags=["api", "collect"],
    responses={404: {"description": "Not found"}},
)

# -----------------------------------------------------------------------------------------------
# Logging of abnormal requests
# NOTE: this configures the ROOT logger, so the handlers below also receive
# ERROR records emitted by every other module in the process.
logger = logging.getLogger()

# 2. Record only ERROR and above.
logger.setLevel(logging.ERROR)

# 3. Formatter shared by all handlers.
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")

# 4. Handler instances: console and a log file in the working directory.
console = logging.StreamHandler()
file_handler = logging.FileHandler(filename="error_request.log")

# 5. Per-handler levels.
console.setLevel(logging.ERROR)
file_handler.setLevel(logging.ERROR)

# 6. Per-handler output format.
console.setFormatter(formatter)
file_handler.setFormatter(formatter)

# 7. Attach the handlers to the logger.
logger.addHandler(console)
logger.addHandler(file_handler)

KAFKA_IP_DEV = "183.107.250.208:9092"
KAFKA_IP = "10.0.1.10:9092"
KAFKA_TOPIC = "boryeong-herit"

api_key_header = APIKeyHeader(name="X-HIT-TransactionId")

# Logged whenever the transaction-id header fails validation.
_INVALID_TOKEN_MESSAGE = (
    "The value of 'header[X-HIT-TransactionId]' is not determined by a specific rule."
)


async def get_body(request: Request):
    """Return the raw request body as bytes."""
    return await request.body()


async def get_header(header: str = Depends(api_key_header)):
    """Return the value of the ``X-HIT-TransactionId`` header."""
    # The dependency already resolves to a plain string; awaiting it would
    # raise ``TypeError``.
    return header


# Validate the token carried in the request header.
async def api_token(token: str = Depends(api_key_header)):
    """Reject requests whose ``X-HIT-TransactionId`` header is malformed.

    The value must consist of exactly four dot-separated fields, the first
    field must be ``T`` and the last field must be 20 characters long.

    Raises:
        HTTPException: 403 when the header value does not follow that rule.
    """
    parts = token.split('.')
    # An empty token splits into a single empty field and is rejected here too.
    if len(parts) != 4 or parts[0] != 'T' or len(parts[3]) != 20:
        logger.error(_INVALID_TOKEN_MESSAGE)
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)


@router.post("/herit", dependencies=[Depends(api_token)])
async def col_raw_data_from_herit(
    content_Type: Optional[str] = Header(None),
    X_HIT_TransactionId: Optional[str] = Header(None),
    content_Length: Optional[str] = Header(None),
    body: bytes = Depends(get_body)
):
    """Forward a raw HERIT request (selected headers + body) to Kafka.

    Returns:
        ``{"reason": "OK"}`` when the message was handed to Kafka, otherwise
        ``{"reason": "SERVER ERROR"}``.
    """
    # Header values are stringified as-is, so a missing header is forwarded
    # as the literal string 'None'.
    msg = {
        "data_type": 'data_herit',
        "data_herit": {
            "content_type": str(content_Type),
            "x_hit_transactionid": str(X_HIT_TransactionId),
            "content_length": str(content_Length),
            "body": str(body.decode("UTF-8"))
        }
    }

    # Use KAFKA_IP_DEV instead of KAFKA_IP to target the development broker.
    # NOTE(review): the Kafka client calls below are blocking and run inside
    # an async handler - consider offloading them to a worker thread.
    message_producer = MessageProducer(KAFKA_IP, KAFKA_TOPIC)
    try:
        res = message_producer.send_message(msg)
    finally:
        # A producer is created per request; release its connections so they
        # do not accumulate over the lifetime of the process.
        message_producer.close()

    if res['status_code'] != status.HTTP_200_OK:
        return {"reason": "SERVER ERROR"}
    return {"reason": "OK"}


# Test URL
# NOTE: this handler shares its function name with the /herit handler above.
# Both routes are registered by their decorators at definition time; the name
# is kept unchanged so the module's existing interface is preserved.
@router.post("/test")
async def col_raw_data_from_herit(
    body: bytes = Depends(get_body)
):
    """Echo the raw request body back to the caller."""
    return {
        "reason": "OK2",
        "body": body
    }


# -----------------------------------------------------------------------------------------------
# Class used to deliver messages to Kafka
class MessageProducer:
    """Thin wrapper around ``KafkaProducer`` that publishes JSON messages to one topic."""

    broker = ""
    topic = ""
    producer = None

    def __init__(self, broker, topic):
        """Create a producer for ``broker`` that publishes to ``topic``."""
        self.broker = broker
        self.topic = topic
        self.producer = KafkaProducer(
            bootstrap_servers=self.broker,
            value_serializer=lambda x: json.dumps(x).encode('utf-8'),
            acks=0,
            api_version=(2, 5, 0),
            retries=3,
        )

    def send_message(self, msg):
        """Publish ``msg`` and report the outcome.

        Returns:
            ``{'status_code': 200, 'error': None}`` on success, or
            ``{'status_code': 500, 'error': <exception>}`` on failure. The
            result is always a dict so callers can read ``status_code``
            without first checking the type.
        """
        try:
            future = self.producer.send(self.topic, msg)
            self.producer.flush()  # push out any buffered records
            future.get(timeout=60)
        except Exception as e:
            logger.error(f"kafka process error: {e}")
            return {'status_code': status.HTTP_500_INTERNAL_SERVER_ERROR, 'error': e}
        return {'status_code': status.HTTP_200_OK, 'error': None}

    def close(self):
        """Close the underlying Kafka producer; safe to call more than once."""
        if self.producer is None:
            return
        try:
            self.producer.close()
        except Exception as e:
            # Closing is best-effort cleanup and must not mask the send result.
            logger.error(f"kafka close error: {e}")
        finally:
            self.producer = None