Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions airflow/providers/hbase/example_dags/example_hbase_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
delete_table_cleanup = HBaseDeleteTableOperator(
task_id="delete_table_cleanup",
table_name="test_table_backup",
hbase_conn_id="hbase_kerberos",
hbase_conn_id="hbase_thrift2",
dag=dag,
)

Expand All @@ -79,7 +79,7 @@
task_id="create_table",
table_name="test_table_backup",
families={"cf1": {}, "cf2": {}},
hbase_conn_id="hbase_kerberos",
hbase_conn_id="hbase_thrift2",
dag=dag,
)

Expand All @@ -89,7 +89,7 @@
table_name="test_table_backup",
row_key="test_row",
data={"cf1:col1": "test_value"},
hbase_conn_id="hbase_kerberos",
hbase_conn_id="hbase_thrift2",
dag=dag,
)

Expand All @@ -99,15 +99,15 @@
action=BackupSetAction.ADD,
backup_set_name="test_backup_set",
tables=["test_table_backup"],
hbase_conn_id="hbase_kerberos",
hbase_conn_id="hbase_thrift2",
dag=dag,
)

# List backup sets
list_backup_sets = HBaseBackupSetOperator(
task_id="list_backup_sets",
action=BackupSetAction.LIST,
hbase_conn_id="hbase_kerberos",
hbase_conn_id="hbase_thrift2",
dag=dag,
)

Expand All @@ -118,15 +118,15 @@
backup_path="/hbase/backup",
backup_set_name="test_backup_set",
workers=1,
hbase_conn_id="hbase_kerberos",
hbase_conn_id="hbase_thrift2",
dag=dag,
)

# Get backup history
get_backup_history = HBaseBackupHistoryOperator(
task_id="get_backup_history",
backup_set_name="test_backup_set",
hbase_conn_id="hbase_kerberos",
hbase_conn_id="hbase_thrift2",
dag=dag,
)

Expand Down
6 changes: 3 additions & 3 deletions airflow/providers/hbase/example_dags/example_hbase_restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def verify_restored_data(**context):
"""Verify that data was restored correctly."""
from airflow.providers.hbase.hooks.hbase import HBaseHook

hook = HBaseHook(hbase_conn_id="hbase_kerberos")
hook = HBaseHook(hbase_conn_id="hbase_thrift2")
table_name = "test_table_backup"

# Get restore output from previous task
Expand Down Expand Up @@ -128,7 +128,7 @@ def verify_restored_data(**context):
task_id="delete_table",
table_name="test_table_backup",
if_not_exists="ignore", # Don't fail if table doesn't exist
hbase_conn_id="hbase_kerberos",
hbase_conn_id="hbase_thrift2",
dag=dag,
)

Expand All @@ -139,7 +139,7 @@ def verify_restored_data(**context):
backup_id="backup_1769686282917", # Substitute with a real backup id
tables=["test_table_backup"],
overwrite=True,
hbase_conn_id="hbase_kerberos",
hbase_conn_id="hbase_thrift2",
do_xcom_push=True, # Push result to XCom for debugging
dag=dag,
)
Expand Down
125 changes: 103 additions & 22 deletions airflow/providers/hbase/hooks/hbase_administration.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@

from __future__ import annotations

import logging
import re
import subprocess
from typing import TYPE_CHECKING

from airflow.hooks.base import BaseHook

if TYPE_CHECKING:
from airflow.models import Connection

logger = logging.getLogger(__name__)


class HBaseAdministrationHook(BaseHook):
"""
Expand All @@ -42,9 +47,10 @@ class HBaseAdministrationHook(BaseHook):
conn_type = "hbase"
hook_name = "HBase Administration"

def __init__(self, hbase_conn_id: str = default_conn_name) -> None:
def __init__(self, hbase_conn_id: str = default_conn_name, hbase_cmd: str = "hbase") -> None:
super().__init__()
self.hbase_conn_id = hbase_conn_id
self.hbase_cmd = hbase_cmd
self._connection: Connection | None = None

def get_conn(self) -> Connection:
Expand All @@ -53,6 +59,53 @@ def get_conn(self) -> Connection:
self._connection = self.get_connection(self.hbase_conn_id)
return self._connection

def _execute_hbase_command(self, command: str) -> str:
    """Execute an HBase CLI command and return its standard output.

    Two optional keys are read from the connection's ``extra`` JSON:
    ``hbase_home`` (the executable becomes ``<hbase_home>/bin/hbase`` instead of
    ``self.hbase_cmd``) and ``java_home`` (exported to the child process as
    ``JAVA_HOME``).

    The command is tokenized with :func:`shlex.split` and executed without a
    shell, so table names, backup set names and paths are passed to HBase as
    plain arguments and are never interpreted as shell syntax.

    :param command: HBase command to execute (e.g., "backup create full /backup -t table1")
    :return: Command output
    :raises RuntimeError: If the command cannot be tokenized, the executable
        cannot be started, or the process exits with a non-zero code.
    """
    # Function-scope imports: nothing else in this module needs them.
    import os
    import shlex

    extras = self.get_conn().extra_dejson or {}
    java_home = extras.get("java_home")
    hbase_home = extras.get("hbase_home")

    try:
        # Resolve the executable per call rather than overwriting self.hbase_cmd,
        # so running a command does not silently change the hook's configuration.
        if hbase_home:
            executable = [f"{hbase_home}/bin/hbase"]
        else:
            executable = shlex.split(self.hbase_cmd)
        argv = executable + shlex.split(command)
    except ValueError as e:
        # Unbalanced quotes etc.; a shell would have rejected these as well.
        raise RuntimeError(f"HBase command failed: {e}") from e

    env = None
    if java_home:
        # Inherit the current environment and override JAVA_HOME only.
        env = os.environ.copy()
        env["JAVA_HOME"] = java_home

    logger.info("Executing HBase command: %s", self._mask_sensitive(shlex.join(argv)))

    try:
        result = subprocess.run(
            argv,
            capture_output=True,
            text=True,
            check=True,
            env=env,
        )
    except subprocess.CalledProcessError as e:
        logger.error("Command failed with exit code %s: %s", e.returncode, e.stderr)
        raise RuntimeError(f"HBase command failed: {e.stderr}") from e
    except OSError as e:
        # Without a shell, a missing or non-executable binary surfaces as OSError
        # instead of a non-zero exit status; keep RuntimeError as the only
        # failure type callers have to handle.
        logger.error("Could not start HBase command: %s", e)
        raise RuntimeError(f"HBase command failed: {e}") from e

    logger.info("Command completed successfully")
    return result.stdout

def _mask_sensitive(self, text: str) -> str:
"""Mask sensitive information in logs."""
# Mask potential paths that might contain sensitive info
text = re.sub(r'(/[\w/.-]*\.keytab)', '***KEYTAB***', text)
return text

def create_backup_set(self, backup_set_name: str, tables: list[str]) -> str:
"""
Create backup set.
Expand All @@ -61,19 +114,18 @@ def create_backup_set(self, backup_set_name: str, tables: list[str]) -> str:
:param tables: List of tables to include in backup set.
:return: Result message.
"""
raise NotImplementedError(
"Backup operations will be implemented using HBase Admin client API in future release"
)
tables_str = ",".join(tables)
command = f"backup set add {backup_set_name} {tables_str}"
return self._execute_hbase_command(command)

def list_backup_sets(self) -> str:
"""
List all backup sets.

:return: List of backup sets.
"""
raise NotImplementedError(
"Backup operations will be implemented using HBase Admin client API in future release"
)
command = "backup set list"
return self._execute_hbase_command(command)

def create_full_backup(
self,
Expand All @@ -91,9 +143,20 @@ def create_full_backup(
:param workers: Number of workers for backup operation.
:return: Backup ID.
"""
raise NotImplementedError(
"Backup operations will be implemented using HBase Admin client API in future release"
)
command = f"backup create full {backup_root}"

if backup_set_name:
command += f" -s {backup_set_name}"
elif tables:
tables_str = ",".join(tables)
command += f" -t {tables_str}"
else:
raise ValueError("Either backup_set_name or tables must be provided")

if workers:
command += f" -w {workers}"

return self._execute_hbase_command(command)

def create_incremental_backup(
self,
Expand All @@ -111,9 +174,20 @@ def create_incremental_backup(
:param workers: Number of workers for backup operation.
:return: Backup ID.
"""
raise NotImplementedError(
"Backup operations will be implemented using HBase Admin client API in future release"
)
command = f"backup create incremental {backup_root}"

if backup_set_name:
command += f" -s {backup_set_name}"
elif tables:
tables_str = ",".join(tables)
command += f" -t {tables_str}"
else:
raise ValueError("Either backup_set_name or tables must be provided")

if workers:
command += f" -w {workers}"

return self._execute_hbase_command(command)

def get_backup_history(self, backup_set_name: str | None = None) -> str:
"""
Expand All @@ -122,9 +196,10 @@ def get_backup_history(self, backup_set_name: str | None = None) -> str:
:param backup_set_name: Name of backup set (optional).
:return: Backup history.
"""
raise NotImplementedError(
"Backup operations will be implemented using HBase Admin client API in future release"
)
command = "backup history"
if backup_set_name:
command += f" -s {backup_set_name}"
return self._execute_hbase_command(command)

def describe_backup(self, backup_id: str) -> str:
"""
Expand All @@ -133,9 +208,8 @@ def describe_backup(self, backup_id: str) -> str:
:param backup_id: Backup ID.
:return: Backup description.
"""
raise NotImplementedError(
"Backup operations will be implemented using HBase Admin client API in future release"
)
command = f"backup describe {backup_id}"
return self._execute_hbase_command(command)

def restore_backup(
self,
Expand All @@ -153,6 +227,13 @@ def restore_backup(
:param overwrite: Whether to overwrite existing tables.
:return: Result message.
"""
raise NotImplementedError(
"Backup operations will be implemented using HBase Admin client API in future release"
)
command = f"restore {backup_root} {backup_id}"

if tables:
tables_str = ",".join(tables)
command += f" -t {tables_str}"

if overwrite:
command += " -o"

return self._execute_hbase_command(command)
Loading