ai_invest/backend/airflow/dags/plugins/utils/setup_utils.py

155 lines
7.0 KiB
Python
Raw Permalink Normal View History

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
import logging
import os
import time
from airflow.exceptions import AirflowException
from airflow.models import Variable
from airflow.providers.mysql.hooks.mysql import MySqlHook
from plugins.sql.hooks.mysql_hook import CommonHookMySQL
class SetupUtils:
# DB 연결 ID
DB_CONN_ID = "COLLECT_SERVER_DB"
# 나라장터 메인 URL
G2B_MAIN_URL = Variable.get("G2B_MAIN_URL", '')
# 크롬 드라이버 경로
CHROMEDRIVER_EXECUTABLE_PATH = Variable.get("CHROMEDRIVER_EXECUTABLE_PATH", '/usr/local/bin/chromedriver')
def __init__(self):
self.driver = None
self._all_cookies = {}
self.CommonHookMySQL = CommonHookMySQL()
logging.info(f"=============== MySQL 연결 시작 ================")
self.db_hook = MySqlHook(mysql_conn_id=self.DB_CONN_ID)
self.conn = self.db_hook.get_conn()
logging.info(f"===============================================")
# 소멸자
def __del__(self):
logging.info(f"=============== MySQL 연결 종료 ================")
self.conn.close()
logging.info(f"===============================================")
# Selenium 드라이버 초기화
def _setup_selenium(self):
logging.info("Selenium Chrome 드라이버 초기화 중...")
chrome_options = Options()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--window-size=1920,1080')
chrome_options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36')
logging.info(f"사용할 ChromeDriver 경로: {self.CHROMEDRIVER_EXECUTABLE_PATH}")
if not os.path.exists(self.CHROMEDRIVER_EXECUTABLE_PATH):
error_msg = f"ChromeDriver 실행 파일이 지정된 경로에 없습니다: {self.CHROMEDRIVER_EXECUTABLE_PATH}. " \
"Airflow Variable 'CHROMEDRIVER_EXECUTABLE_PATH' 설정을 확인하거나, " \
"Airflow 워커 이미지에 해당 경로로 ChromeDriver가 설치되었는지 확인해주세요."
logging.error(error_msg)
raise FileNotFoundError(error_msg)
if not os.access(self.CHROMEDRIVER_EXECUTABLE_PATH, os.X_OK):
error_msg = f"ChromeDriver 실행 파일에 실행 권한이 없습니다: {self.CHROMEDRIVER_EXECUTABLE_PATH}. " \
"파일에 실행 권한(chmod +x)을 부여해주세요."
logging.error(error_msg)
raise PermissionError(error_msg)
try:
service = Service(executable_path=self.CHROMEDRIVER_EXECUTABLE_PATH)
self.driver = webdriver.Chrome(
service=service,
options=chrome_options
)
logging.info("Selenium Chrome 드라이버가 성공적으로 초기화되었습니다.")
except Exception as e:
logging.error(f"Selenium Chrome 드라이버 초기화 중 심각한 오류 발생: {e}")
logging.error(f"ChromeDriver 경로: {self.CHROMEDRIVER_EXECUTABLE_PATH}, 사용된 옵션: {chrome_options.arguments}")
raise AirflowException(f"Selenium 드라이버 초기화 실패: {e}")
# =======================================================================================================================================
# 크롬 드라이버 닫기
def _close_driver(self):
try:
if self.driver:
self.driver.quit()
logging.info("Selenium 드라이버가 안전하게 종료되었습니다.")
self.driver = None
else:
logging.warning("Selenium 드라이버가 이미 종료되었거나 초기화되지 않았습니다.")
except Exception as e:
logging.error(f"Selenium 드라이버 종료 중 오류 발생: {e}")
raise
# =======================================================================================================================================
# 쿠키 가져오기
def _get_cookies(self):
main_page_url = self.G2B_MAIN_URL
try:
self._setup_selenium()
logging.info(f"{main_page_url} 에 접속하여 쿠키를 가져오고 있습니다...")
self.driver.get(main_page_url)
time.sleep(5) # 페이지 로딩 및 JS 실행 시간 고려
cookies = self.driver.get_cookies()
# 모든 쿠키를 딕셔너리에 저장
self._all_cookies = {cookie['name']: cookie['value'] for cookie in cookies}
logging.info(f"Selenium을 사용하여 모든 쿠키를 가져왔습니다: {self._all_cookies}")
# 최소한 필요한 쿠키(JSESSIONID, XTVID 등)가 있는지 확인 (선택 사항이지만 권장)
if not self._all_cookies.get('JSESSIONID') or not self._all_cookies.get('XTVID'):
logging.warning("필요한 쿠키(JSESSIONID or XTVID)가 없을 수 있습니다.")
self._close_driver()
except Exception as e:
logging.error(f"Selenium을 사용하여 쿠키를 가져오는 중 오류가 발생했습니다: {e}")
raise
# =======================================================================================================================================
# 코드 데이터 DB UPSERT
def common_code_db_upsert(self, code_data):
try:
query = f"""
INSERT INTO common_code (group_id, code_key, code_value, update_dt)
VALUES ('{code_data['group_id']}', '{code_data['code_key']}', '{code_data['code_value']}', NOW())
ON DUPLICATE KEY UPDATE
group_id = '{code_data['group_id']}', code_key = '{code_data['code_key']}', code_value = '{code_data['code_value']}', update_dt = NOW();
"""
self.CommonHookMySQL.process_sql(self.conn, query)
except Exception as e:
logging.error(f"쿼리: {query}")
logging.error(f"코드 데이터 DB UPSERT 중 오류가 발생했습니다: {e}")
raise
# =======================================================================================================================================
# 코드 데이터 select
def common_code_db_select(self, group_id, code_key):
try:
query = f"""
SELECT code_value FROM common_code WHERE group_id = '{group_id}' AND code_key = '{code_key}'
"""
result = self.CommonHookMySQL.select_sql(self.conn, query)
return result[0][0]
except Exception as e:
logging.error(f"쿼리: {query}")
logging.error(f"코드 데이터 DB select 중 오류가 발생했습니다: {e}")
raise