ai_invest/backend/airflow/dags/test_group/tasks/test.py

21 lines
952 B
Python
Raw Normal View History

import logging

from airflow.exceptions import AirflowException, AirflowSkipException, AirflowFailException
from airflow.models import Variable
from airflow.providers.mysql.hooks.mysql import MySqlHook
class Test():
# =======================================================================================================================================
# 임시 처리용 테스트
def test(self, exec_dt_str: str):
logging.info('============================== test start ==============================')
try:
# 기본 정보
exec_dt = CommonDatetime.make_datetime_kst_from_ts(exec_dt_str)
logging.info(f"exec_dt(): {exec_dt}")
except Exception as e:
logging.error(f"test() Exception: {str(e)}")
raise Exception(str(e))
finally :
logging.info('============================== test finish ==============================')