import logging

from airflow.exceptions import AirflowException, AirflowSkipException, AirflowFailException
from airflow.models import Variable
from airflow.providers.mysql.hooks.mysql import MySqlHook


class Test:
    """Container for ad-hoc test routines."""

    # =======================================================================================================================================
    # Test used for temporary / ad-hoc processing
    def test(self, exec_dt_str: str) -> None:
        """Convert ``exec_dt_str`` with ``CommonDatetime`` and log the result.

        A start banner is logged first and a finish banner is always logged
        last (via ``finally``), whether or not the conversion succeeds.

        Args:
            exec_dt_str: Value handed unchanged to
                ``CommonDatetime.make_datetime_kst_from_ts``.
                NOTE(review): presumably a timestamp string that is turned
                into a KST datetime -- confirm against that helper.

        Raises:
            Exception: If anything inside the ``try`` body fails. The new
                exception carries the original message and is chained to the
                original exception as its ``__cause__``.
        """
        logging.info('============================== test start ==============================')

        try:
            # Basic information
            # NOTE(review): CommonDatetime is not imported in the visible part
            # of this file -- confirm it is imported/defined before running.
            exec_dt = CommonDatetime.make_datetime_kst_from_ts(exec_dt_str)
            logging.info(f"exec_dt(): {exec_dt}")

        except Exception as e:
            logging.error(f"test() Exception: {str(e)}")
            # Keep raising a plain Exception with the same message (what
            # callers already see), but chain explicitly so the root cause
            # stays attached to the new exception.
            raise Exception(str(e)) from e

        finally:
            logging.info('============================== test finish ==============================')