Skip to content

ClickHouse 연결 지연(Lazy Connect) 방식 적용 및 환경 변수 검증 로직 추가 #113

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 42 additions & 22 deletions llm_utils/connect_db.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"""
이 모듈은 ClickHouse 데이터베이스에 연결하고 SQL 쿼리를 실행하여 결과를 pandas DataFrame으로 반환하는 기능을 제공합니다.
ClickHouse 데이터베이스에 연결하고 SQL 쿼리를 실행하여 결과를 pandas DataFrame으로 반환하는 모듈입니다.

구성 요소:
- 환경 변수에서 접속 정보를 불러와 ClickHouse 서버에 연결합니다.
- 환경 변수에서 접속 정보를 불러옵니다.
- ClickHouse에 지연 연결(lazy connection)을 수행합니다.
- SQL 쿼리를 실행하고 결과를 pandas DataFrame으로 반환합니다.
- 연결 실패 및 쿼리 오류에 대해 로깅을 통해 디버깅을 지원합니다.
"""
Expand All @@ -29,32 +30,51 @@ class ConnectDB:
"""
ClickHouse 데이터베이스에 연결하고 SQL 쿼리를 실행하는 클래스입니다.

환경 변수에서 접속 정보를 로드하여 ClickHouse 서버에 연결하며,
SQL 쿼리 실행 결과를 pandas DataFrame으로 반환합니다.
환경 변수에서 접속 정보를 읽어들이며, 실제 연결은 SQL 실행 시점에 수행됩니다.
연결된 클라이언트를 통해 SQL 쿼리를 실행하고, 결과를 pandas DataFrame으로 반환합니다.
"""

def __init__(self):
"""
ConnectDB 클래스의 인스턴스를 초기화합니다.

환경 변수에서 ClickHouse 접속 정보를 읽고, 즉시 서버에 연결을 시도합니다.
환경 변수에서 ClickHouse 접속 정보를 불러오며, 연결은 즉시 수행하지 않습니다.
"""

self.client: Optional[Client] = None
self.host = os.getenv("CLICKHOUSE_HOST")
self.dbname = os.getenv("CLICKHOUSE_DATABASE")
self.user = os.getenv("CLICKHOUSE_USER")
self.password = os.getenv("CLICKHOUSE_PASSWORD")
self.port = os.getenv("CLICKHOUSE_PORT")
self.is_connected: bool = False

self.connect_to_clickhouse()
self.host = self._get_env_or_raise("CLICKHOUSE_HOST")
self.dbname = self._get_env_or_raise("CLICKHOUSE_DATABASE")
self.user = self._get_env_or_raise("CLICKHOUSE_USER")
self.password = os.getenv("CLICKHOUSE_PASSWORD", "")
self.port = int(self._get_env_or_raise("CLICKHOUSE_PORT"))

def _get_env_or_raise(self, var_name: str) -> str:
"""
주어진 환경변수를 읽고, 값이 없으면 예외를 발생시킵니다.

Parameters:
var_name (str): 환경 변수 이름

Returns:
str: 환경 변수의 값

Raises:
ValueError: 값이 없을 경우
"""

value = os.getenv(var_name)
if not value:
logger.error("Environment variable '%s' is not set.", var_name)
raise ValueError(f"Environment variable '{var_name}' is not set.")
return value

def connect_to_clickhouse(self) -> None:
"""
ClickHouse 서버에 연결을 시도합니다.

연결에 성공하면 client 객체가 설정되며, 실패 시 예외를 발생시킵니다.
연결 상태는 로깅을 통해 출력됩니다.
연결 성공 시 client가 초기화되며, 실패 시 로그를 남기고 예외를 발생시킵니다.
"""

try:
Expand All @@ -65,30 +85,31 @@ def connect_to_clickhouse(self) -> None:
password=self.password,
database=self.dbname,
)
self.is_connected = True
logger.info("Successfully connected to ClickHouse.")
except Exception as e:
self.is_connected = False
logger.error("Failed to connect to ClickHouse: %s", e)
raise

def run_sql(self, sql: str) -> pd.DataFrame:
"""
SQL 쿼리를 실행하고 결과를 pandas DataFrame으로 반환합니다.
내부적으로 ClickHouse 클라이언트가 없으면 자동으로 재연결을 시도합니다.
주어진 SQL 쿼리를 실행하고 결과를 pandas DataFrame으로 반환합니다.

연결이 설정되지 않은 경우 자동으로 연결을 시도합니다.

Parameters:
sql (str): 실행할 SQL 쿼리 문자열

Returns:
pd.DataFrame: 쿼리 실행 결과를 담은 DataFrame 객체
pd.DataFrame: 쿼리 결과를 포함한 DataFrame

Raises:
Exception: SQL 실행 중 오류 발생 시 예외를 발생시킵니다.
Exception: SQL 실행 중 오류가 발생한 경우 예외를 발생시킵니다.
"""

if not self.client:
logger.warning(
"ClickHouse client is not initialized. Attempting to reconnect..."
)
if not self.is_connected or not self.client:
logger.warning("ClickHouse client not connected. Attempting to connect...")
self.connect_to_clickhouse()

try:
Expand All @@ -97,7 +118,6 @@ def run_sql(self, sql: str) -> pd.DataFrame:
column_names = [col[0] for col in columns]
df = pd.DataFrame(rows, columns=column_names)
return df

except Exception as e:
logger.exception("An error occurred while executing SQL: %s", e)
raise