from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.service import Service import logging import os import time from airflow.exceptions import AirflowException from airflow.models import Variable from airflow.providers.mysql.hooks.mysql import MySqlHook from plugins.sql.hooks.mysql_hook import CommonHookMySQL class SetupUtils: # DB 연결 ID DB_CONN_ID = "COLLECT_SERVER_DB" # 나라장터 메인 URL G2B_MAIN_URL = Variable.get("G2B_MAIN_URL", '') # 크롬 드라이버 경로 CHROMEDRIVER_EXECUTABLE_PATH = Variable.get("CHROMEDRIVER_EXECUTABLE_PATH", '/usr/local/bin/chromedriver') def __init__(self): self.driver = None self._all_cookies = {} self.CommonHookMySQL = CommonHookMySQL() logging.info(f"=============== MySQL 연결 시작 ================") self.db_hook = MySqlHook(mysql_conn_id=self.DB_CONN_ID) self.conn = self.db_hook.get_conn() logging.info(f"===============================================") # 소멸자 def __del__(self): logging.info(f"=============== MySQL 연결 종료 ================") self.conn.close() logging.info(f"===============================================") # Selenium 드라이버 초기화 def _setup_selenium(self): logging.info("Selenium Chrome 드라이버 초기화 중...") chrome_options = Options() chrome_options.add_argument('--headless') chrome_options.add_argument('--no-sandbox') chrome_options.add_argument('--disable-dev-shm-usage') chrome_options.add_argument('--disable-gpu') chrome_options.add_argument('--window-size=1920,1080') chrome_options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36') logging.info(f"사용할 ChromeDriver 경로: {self.CHROMEDRIVER_EXECUTABLE_PATH}") if not os.path.exists(self.CHROMEDRIVER_EXECUTABLE_PATH): error_msg = f"ChromeDriver 실행 파일이 지정된 경로에 없습니다: {self.CHROMEDRIVER_EXECUTABLE_PATH}. " \ "Airflow Variable 'CHROMEDRIVER_EXECUTABLE_PATH' 설정을 확인하거나, " \ "Airflow 워커 이미지에 해당 경로로 ChromeDriver가 설치되었는지 확인해주세요." logging.error(error_msg) raise FileNotFoundError(error_msg) if not os.access(self.CHROMEDRIVER_EXECUTABLE_PATH, os.X_OK): error_msg = f"ChromeDriver 실행 파일에 실행 권한이 없습니다: {self.CHROMEDRIVER_EXECUTABLE_PATH}. " \ "파일에 실행 권한(chmod +x)을 부여해주세요." logging.error(error_msg) raise PermissionError(error_msg) try: service = Service(executable_path=self.CHROMEDRIVER_EXECUTABLE_PATH) self.driver = webdriver.Chrome( service=service, options=chrome_options ) logging.info("Selenium Chrome 드라이버가 성공적으로 초기화되었습니다.") except Exception as e: logging.error(f"Selenium Chrome 드라이버 초기화 중 심각한 오류 발생: {e}") logging.error(f"ChromeDriver 경로: {self.CHROMEDRIVER_EXECUTABLE_PATH}, 사용된 옵션: {chrome_options.arguments}") raise AirflowException(f"Selenium 드라이버 초기화 실패: {e}") # ======================================================================================================================================= # 크롬 드라이버 닫기 def _close_driver(self): try: if self.driver: self.driver.quit() logging.info("Selenium 드라이버가 안전하게 종료되었습니다.") self.driver = None else: logging.warning("Selenium 드라이버가 이미 종료되었거나 초기화되지 않았습니다.") except Exception as e: logging.error(f"Selenium 드라이버 종료 중 오류 발생: {e}") raise # ======================================================================================================================================= # 쿠키 가져오기 def _get_cookies(self): main_page_url = self.G2B_MAIN_URL try: self._setup_selenium() logging.info(f"{main_page_url} 에 접속하여 쿠키를 가져오고 있습니다...") self.driver.get(main_page_url) time.sleep(5) # 페이지 로딩 및 JS 실행 시간 고려 cookies = self.driver.get_cookies() # 모든 쿠키를 딕셔너리에 저장 self._all_cookies = {cookie['name']: cookie['value'] for cookie in cookies} logging.info(f"Selenium을 사용하여 모든 쿠키를 가져왔습니다: {self._all_cookies}") # 최소한 필요한 쿠키(JSESSIONID, XTVID 등)가 있는지 확인 (선택 사항이지만 권장) if not self._all_cookies.get('JSESSIONID') or not self._all_cookies.get('XTVID'): logging.warning("필요한 쿠키(JSESSIONID or XTVID)가 없을 수 있습니다.") self._close_driver() except Exception as e: logging.error(f"Selenium을 사용하여 쿠키를 가져오는 중 오류가 발생했습니다: {e}") raise # ======================================================================================================================================= # 코드 데이터 DB UPSERT def common_code_db_upsert(self, code_data): try: query = f""" INSERT INTO common_code (group_id, code_key, code_value, update_dt) VALUES ('{code_data['group_id']}', '{code_data['code_key']}', '{code_data['code_value']}', NOW()) ON DUPLICATE KEY UPDATE group_id = '{code_data['group_id']}', code_key = '{code_data['code_key']}', code_value = '{code_data['code_value']}', update_dt = NOW(); """ self.CommonHookMySQL.process_sql(self.conn, query) except Exception as e: logging.error(f"쿼리: {query}") logging.error(f"코드 데이터 DB UPSERT 중 오류가 발생했습니다: {e}") raise # ======================================================================================================================================= # 코드 데이터 select def common_code_db_select(self, group_id, code_key): try: query = f""" SELECT code_value FROM common_code WHERE group_id = '{group_id}' AND code_key = '{code_key}' """ result = self.CommonHookMySQL.select_sql(self.conn, query) return result[0][0] except Exception as e: logging.error(f"쿼리: {query}") logging.error(f"코드 데이터 DB select 중 오류가 발생했습니다: {e}") raise