1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
def connect_to_openlookeng(
    self,
    host: Union[str, None] = None,
    catalog: Union[str, None] = None,
    schema: Union[str, None] = None,
    user: Union[str, None] = None,
    password: Union[str, None] = None,
    port: int = 8080,
):
    """Connect to an openLooKeng server and install SQL/DDL helpers on ``self``.

    The connection is made through ``presto-python-client`` (``prestodb``).
    On success the following attributes are set on ``self``:

    * ``run_sql``        -- run a SQL string and return a ``pandas.DataFrame``
    * ``get_full_ddl``   -- CREATE TABLE text for every table in ``schema``
    * ``get_ddl``        -- CREATE TABLE text for the named table(s)
    * ``run_sql_is_set`` -- set to ``True``

    Args:
        host: Server host; falls back to the ``HOST`` environment variable.
        catalog: Catalog to use (required).
        schema: Schema to use (required).
        user: User name; falls back to the ``USER`` environment variable.
        password: Optional password. When given, basic authentication is
            used and the connection is made over HTTPS.
        port: Server port; a falsy value falls back to the ``PORT``
            environment variable.

    Raises:
        DependencyError: ``presto-python-client`` is not installed.
        ImproperlyConfigured: A required setting is missing or the port is
            not an integer.
        ValidationError: The connection object could not be created.
    """
    try:
        from prestodb.dbapi import Connection
        from prestodb import exceptions as presto_exceptions
        from prestodb.auth import BasicAuthentication
    except ImportError:
        raise DependencyError(
            "You need to install required dependencies to execute this method,"
            " run command: \npip install presto-python-client"
        )

    if not host:
        host = os.getenv("HOST")
    if not host:
        raise ImproperlyConfigured("Please set your openLooKeng host")

    if not catalog:
        raise ImproperlyConfigured("Please set your catalog")

    if not schema:
        raise ImproperlyConfigured("Please set your schema")

    if not user:
        user = os.getenv("USER")
    if not user:
        raise ImproperlyConfigured("Please set your openLooKeng user")

    if not port:
        port = os.getenv("PORT")
    if not port:
        raise ImproperlyConfigured("Please set your openLooKeng port")
    # The environment fallback yields a string; normalise so the client
    # always receives an integer port.
    try:
        port = int(port)
    except (TypeError, ValueError):
        raise ImproperlyConfigured(
            "openLooKeng port must be an integer"
        ) from None

    conn = None
    try:
        auth = BasicAuthentication(user, password) if password else None
        conn = Connection(
            host=host,
            user=user,
            catalog=catalog,
            schema=schema,
            port=port,
            auth=auth,
            # presto-python-client refuses to send credentials over plain
            # HTTP, so authenticated connections must use HTTPS.
            # NOTE(review): confirm the coordinator serves HTTPS on `port`.
            http_scheme="https" if auth else "http",
        )
    except Exception as e:
        raise ValidationError(f"Connection failed: {str(e)}") from e

    def run_sql_openlookeng(sql: str) -> pd.DataFrame:
        """Execute a SQL query and return the result as a DataFrame.

        Returns an empty DataFrame when the cursor reports no column
        description. Raises ValidationError when the connection is missing
        or the driver raises ``DatabaseError``.
        """
        if not conn:
            raise ValidationError("Connection is not established")

        cur = None
        try:
            cur = conn.cursor()
            cur.execute(sql)
            results = cur.fetchall()

            if not cur.description:
                return pd.DataFrame()

            columns = [desc[0] for desc in cur.description]
            return pd.DataFrame(results, columns=columns)
        except presto_exceptions.DatabaseError as e:
            raise ValidationError(f"Query failed: {str(e)}") from e
        finally:
            if cur:
                cur.close()

    def _create_table_statement(table: str) -> str:
        """Build a CREATE TABLE statement for one table from DESCRIBE output."""
        # Double any embedded double quote so the quoted identifier stays valid.
        quoted_table = str(table).replace('"', '""')
        desc_df = self.run_sql(
            f'DESCRIBE {catalog}.{schema}."{quoted_table}"'
        )

        cols = []
        for _, row in desc_df.iterrows():
            # Column headers are looked up in both capitalised and
            # lower-case form.
            col_name = row.get('Column', row.get('column', ''))
            col_type = row.get('Type', row.get('type', ''))
            extra = row.get('Extra', row.get('extra', ''))

            col_def = f"{col_name} {col_type}"
            if extra:
                col_def += f" {extra}"
            cols.append(col_def)

        return f"CREATE TABLE {table} (\n " + ",\n ".join(cols) + "\n);"

    def _collect_ddl(tables) -> str:
        """Join the CREATE TABLE statements of *tables*, skipping failures."""
        ddl_scripts = []
        for table in tables:
            try:
                ddl_scripts.append(_create_table_statement(table))
            except Exception as e:
                # Deliberately best-effort: one unreadable table must not
                # prevent DDL for the remaining tables from being returned.
                print(f"Skipping table {table} due to error: {str(e)}")
                continue
        return "\n\n".join(ddl_scripts)

    def get_full_ddl() -> str:
        """Return CREATE TABLE statements for every table in catalog.schema.

        Returns an empty string when the schema lists no tables. Raises
        ValidationError when the table list cannot be fetched.
        """
        # Escape single quotes so the schema name is a valid SQL literal.
        schema_literal = str(schema).replace("'", "''")
        try:
            tables_df = self.run_sql(
                "SELECT table_name FROM information_schema.tables "
                f"WHERE table_schema = '{schema_literal}'"
            )
        except Exception as e:
            raise ValidationError(f"Failed to get tables: {str(e)}") from e

        if tables_df.empty:
            return ""

        return _collect_ddl(tables_df['table_name'])

    def get_table_ddl(table_names: Union[str, List[str]]) -> str:
        """Return CREATE TABLE statements for the given table name(s).

        Accepts a single table name or a list of names; tables that cannot
        be described are skipped.
        """
        if isinstance(table_names, str):
            tables = [table_names]
        else:
            tables = table_names

        return _collect_ddl(tables)

    self.run_sql_is_set = True
    self.run_sql = run_sql_openlookeng
    self.get_full_ddl = get_full_ddl
    self.get_ddl = get_table_ddl
|