Vanna相关(一)

First Post:

Last Update:

Vanna

简介

工作流程

工作流程图

Vanna 主要围绕数据库与大模型进行工作

核心工作思路如下:

一、获得提问语句 并 根据提问语句 在向量数据库中寻找有较高相似度的 DDL,DOC,SQL
语句。

二、将相似文本、提问内容进行一定前置文本加工,作为大模型 prompt 进行提问。鉴于该
Vanna 项目的前端内容修改难度很大,本身不支持处理后端流式数据返回,所以
项目内的大模型接口正常为完整文本返回

三、将返回的内容提取 SQL 语句 返回并以该语句查询数据库,返回表格结果

四、图表生成流程会将前言的指令信息与表格结果等作为 prompt,向大模型提问,获得图
表表示的代码,大模型返回内容同样需要相应方法处理后得到表示代码

五、当用户反馈正确的时候,会将当前会话的信息,对应的 SQL 存入先前的向量数据库

当前输出结果展示

工作图
工作图
工作图
工作图

功能相关

目前更新内容

新增 openLooKeng 数据库连接

/src/vanna/base/base.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
 def connect_to_openlookeng(
self,
host: str = None,
catalog:str = None,
schema:str = None,
user: str = None,
password: str = None,
port: int = 8080,
):

try:
from prestodb.dbapi import Connection
from prestodb import exceptions as presto_exceptions
from prestodb.auth import BasicAuthentication # 如果需要认证
except ImportError:
raise DependencyError(
"You need to install required dependencies to execute this method,"
" run command: \npip install presto-python-client"
)

if not host:
host = os.getenv("HOST")

if not host:
raise ImproperlyConfigured("Please set your openLooKeng host")

if not catalog:
raise ImproperlyConfigured("Please set your catalog")

if not schema:
raise ImproperlyConfigured("Please set your schema")

if not user:
user = os.getenv("USER")

if not user:
raise ImproperlyConfigured("Please set your openLooKeng user")

# if not password:
# password = os.getenv("PASSWORD")
#
# if not password:
# raise ImproperlyConfigured("Please set your openLooKeng password")

if not port:
port = os.getenv("PORT")

if not port:
raise ImproperlyConfigured("Please set your openLooKeng port")

conn = None

try:
auth = BasicAuthentication(user, password) if password else None
conn = Connection(host=host,
user=user,
catalog=catalog,
schema=schema,
port=port,
auth=auth # 仅在需要认证时传递
)
except Exception as e:
raise ValidationError(f"Connection failed: {str(e)}")

def run_sql_openlookeng(sql: str) -> pd.DataFrame:
"""执行 SQL 查询并返回 DataFrame"""
if not conn:
raise ValidationError("Connection is not established")

cur = None
try:
cur = conn.cursor()

cur.execute(sql)
results = cur.fetchall()

# 处理空结果集
if not cur.description:
return pd.DataFrame()

# 获取列名(注意列名大小写)
columns = [desc[0] for desc in cur.description]
return pd.DataFrame(results, columns=columns)
except presto_exceptions.DatabaseError as e:
raise ValidationError(f"Query failed: {str(e)}")
finally:
if cur:
cur.close() # 显式关闭游标

def get_full_ddl() -> str:
"""获取 catalog.schema 的元数据描述"""
ddl_scripts = []

# 获取表列表(使用参数化 schema)
try:
tables_df = self.run_sql(
f"SELECT table_name FROM information_schema.tables "+
f"WHERE table_schema = '{schema}'"
)
print(tables_df)
except Exception as e:
raise ValidationError(f"Failed to get tables: {str(e)}")

# 处理可能的空结果
if tables_df.empty:
return ""

# 生成表结构
for table in tables_df['table_name']:
try:
desc_df = self.run_sql(
f"DESCRIBE {catalog}.{schema}.\"{table}\"" # 处理特殊表名
)
print(desc_df)
cols = []
for _, row in desc_df.iterrows():
# 注意列名可能大小写敏感(调整为小写)
col_name = row.get('Column', row.get('column'))
col_type = row.get('Type', row.get('type'))
extra = row.get('Extra', row.get('extra', ''))

col_def = f"{col_name} {col_type}"
if extra:
col_def += f" {extra}"
cols.append(col_def)

ddl_scripts.append(
f"CREATE TABLE {table} (\n " + ",\n ".join(cols) + "\n);"
)
except Exception as e:
print(f"Skipping table {table} due to error: {str(e)}")
continue

return "\n\n".join(ddl_scripts)

def get_table_ddl(table_names: Union[str, List[str]]) -> str:
"""获取指定表的DDL语句"""
ddl_scripts = []

# 统一处理表名为列表形式
if isinstance(table_names, str):
tables = [table_names]
else:
tables = table_names

for table in tables:
try:
# 使用类实例中的catalog和schema(假设已定义)
desc_df = self.run_sql(
f'DESCRIBE {catalog}.{schema}."{table}"' # 处理特殊表名
)
cols = []
for _, row in desc_df.iterrows():
# 处理可能的大小写敏感列名
col_name = row.get('Column', row.get('column', ''))
col_type = row.get('Type', row.get('type', ''))
extra = row.get('Extra', row.get('extra', ''))

# 构建列定义
col_def = f"{col_name} {col_type}"
if extra:
col_def += f" {extra}"
cols.append(col_def)

# 生成CREATE TABLE语句
create_stmt = f"CREATE TABLE {table} (\n " + ",\n ".join(
cols) + "\n);"
ddl_scripts.append(create_stmt)
except Exception as e:
print(f"Skipping table {table} due to error: {str(e)}")
continue

return "\n\n".join(ddl_scripts)

self.run_sql_is_set = True
self.run_sql = run_sql_openlookeng
self.get_full_ddl = get_full_ddl
self.get_ddl = get_table_ddl

新增 大模型流式传输 接口(以智谱 AI 的 GLM-4-PLUS 接口为例)

/src/vanna/ZhipuAI/ZhipuAI_Chat.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def submit_prompt(
    self, prompt, max_tokens=500, temperature=0.7, top_p=0.7, stop=None, stream_set=False, **kwargs
):
    """
    Submit a chat prompt to the ZhipuAI GLM-4 endpoint.

    Args:
        prompt: list of chat messages (system/user dicts); must be non-empty.
        max_tokens / temperature / top_p / stop: forwarded to the API.
        stream_set: when True, return a generator of text chunks; when
            False, return the complete reply text as a str.

    Raises:
        Exception: if prompt is None or empty.

    BUG FIX: the original put `yield` directly in this function body, which
    made the whole function a generator — the non-streaming branch's
    `return` value was silently discarded and callers always received a
    generator. The streaming path now lives in an inner generator so the
    non-streaming path can actually return a string.
    """
    if prompt is None:
        raise Exception("Prompt is None")

    if len(prompt) == 0:
        raise Exception("Prompt is empty")

    client = ZhipuAI(api_key=self.api_key)  # uses the caller's API key
    response = client.chat.completions.create(
        model="glm-4",  # model name to invoke
        max_tokens=max_tokens,
        temperature=temperature,
        top_p=top_p,
        stop=stop,
        messages=prompt,
        stream=stream_set
    )

    if stream_set:
        def _iter_chunks():
            # Relay non-empty delta chunks as they arrive.
            for chunk in response:
                content = chunk.choices[0].delta.content
                if content:
                    yield f"{content}"
        return _iter_chunks()
    else:
        # Non-streaming: the reply is available in full.
        return response.choices[0].message.content

新增 chroma 映射集合、映射上传接口,映射文本预处理,获取相似映射文本

/src/vanna/chromadb/chromadb_vector.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def __init__(self, config=None):
    # NOTE(review): excerpt — only the line added to the original __init__
    # is shown here; the sql/ddl/documentation collections are created in
    # the omitted part of the method.
    # Create (or reopen) the Chroma collection that stores table/field
    # mapping ("projection") text.
    self.projection_collection = self.chroma_client.get_or_create_collection(
        name="projection", embedding_function=self.embedding_function
    )

def add_projection(self, projection: str, **kwargs) -> str:
    """
    Store one projection (mapping) document in the projection collection.

    Returns the generated document id, suffixed with "-pro" so it can be
    recognised later (e.g. by remove_training_data).
    """
    doc_id = f"{uuid.uuid4()}-pro"
    embedding = self.generate_embedding(projection)
    self.projection_collection.add(
        documents=projection,
        embeddings=embedding,
        ids=doc_id,
    )
    return doc_id

def parse_json_mappings(self, json_data):
    """
    Render a table/field mapping JSON document as human-readable text.

    For every mapping: one line for the table, one line per field, and —
    when a field carries value mappings — one line listing the value
    meanings (the "_comment" key is skipped).
    """
    lines = []
    for mapping in json_data.get("mappings", []):
        table = mapping.get("table", {})
        sys_name = table.get("systemName", "")
        disp_name = table.get("displayName", "")
        lines.append(f"表名:'{sys_name}' 代表 '{disp_name}'\n")

        for field in mapping.get("fields", []):
            f_sys = field.get("systemName", "")
            f_disp = field.get("displayName", "")
            lines.append(f" - 当前表中字段 '{f_sys}' 代表 '{f_disp}'\n")

            value_mappings = field.get("valueMappings", {})
            if value_mappings:
                pairs = ", ".join(
                    f"'{k}' 代表 '{v}'" for k, v in value_mappings.items() if k != "_comment"
                )
                lines.append(f" -- 当前字段中 {pairs}\n")

    return "".join(lines)

def get_training_data(self, **kwargs) -> pd.DataFrame:
    # NOTE(review): excerpt — only the projection-handling lines added to
    # the original get_training_data are shown. In the full method, `df` is
    # built earlier from the sql/ddl/documentation collections and is
    # returned at the end; standalone, this excerpt would fail on the
    # undefined `df` below.
    projection_data = self.projection_collection.get()

    if projection_data is not None:
        # Extract the documents and ids
        projection = [pro for pro in projection_data["documents"]]
        ids = projection_data["ids"]

        # Create a DataFrame (projection rows have no associated question)
        df_doc = pd.DataFrame(
            {
                "id": ids,
                "question": [None for doc in projection],
                "content": [pro for pro in projection],
            }
        )

        # Tag these rows so callers can distinguish them from sql/ddl/doc rows.
        df_doc["training_data_type"] = "projection"

        # Append to the DataFrame accumulated by the omitted part of the method.
        df = pd.concat([df, df_doc])

def remove_training_data(self, id: str, **kwargs) -> bool:
    # NOTE(review): excerpt — the leading if/elif branches of the full
    # method (for "-sql" / "-ddl" / "-doc" id suffixes) are omitted; only
    # the added branch is shown, so the bare `elif` is expected.
    elif id.endswith("-pro"):
        # Ids created by add_projection carry the "-pro" suffix.
        self.projection_collection.delete(ids=id)
        return True

def remove_collection(self, collection_name: str) -> bool:
    # NOTE(review): excerpt — the if-branches of the full method (for the
    # sql/ddl/documentation collections) are omitted; only the added
    # "projection" branch is shown, so the bare `elif` is expected.
    elif collection_name == "projection":
        self.chroma_client.delete_collection(name="projection")
        # Recreate an empty collection immediately so later adds still work.
        self.projection_collection = self.chroma_client.get_or_create_collection(
            name="projection", embedding_function=self.embedding_function
        )
        return True

def get_related_projection(self, question: str, **kwargs) -> list:
    """Return projection documents most similar to *question*."""
    query_result = self.projection_collection.query(
        query_texts=[question],
    )
    return ChromaDB_VectorStore._extract_documents(query_result)

新增 意图控制、意图分析、SQL 生成与分析、数据库数据返回 流式传输 API 接口

/src/vanna/ZhipuAI/ZhipuAI_Chat.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# TODO: intent control
def generate_intent_control_inspection(self, question: str = None, ddl: str = None, relative_projections=None, **kwargs):
    """
    Run the intent-control check for *question* against the given DDL and
    projection glossary; returns a streaming LLM response.
    """
    if not question:
        raise ValueError("Question cannot be empty")

    # Assemble the system prompt: role/instructions, schema, projection
    # glossary, then the required reply template.
    projection_words = relative_projections
    prompt_parts = [
        """你是一个专业的数据库查询意图控制系统。你需要对用户的问题进行以下判断:
1. 合法性判断:判断用户的问题是否合法,是否涉及敏感操作(如删除、修改等)。
2. 统计范围判断:判断用户的问题是否在业务表的统计范围内。
3. 详细分析:对用户的问题进行详细分析,解释你的判断依据。
\n""",
        f"""提供数据库结构如下:{ddl}\n""",
        f""" 并提供对应生成词的原意 :{projection_words} """,
        """
\n请严格按照以下格式回复:
开始意图控制检查...
- 合法性判断结果:[合法/非法]
- 统计范围判断结果:[在范围内/不在统计范围内]
- 指令:[生成的对应数据指令]
- 详细分析:[你的详细分析]
结束意图控制检查...\n""",
    ]
    system_message = "".join(prompt_parts)

    # System prompt plus the raw user question.
    message_log = [
        ZhipuAI_Chat.system_message(system_message),
        ZhipuAI_Chat.user_message(question),
    ]

    # stream_set=True: the caller consumes the reply chunk by chunk.
    return self.submit_prompt(message_log, stream_set=True, **kwargs)

# TODO: intent understanding
def generate_intention_understanding_analysis(self, question: str = None, ddl: str = None, relative_projections=None,im_context = None,**kwargs):
    """
    Generate the intent-understanding analysis for *question* as a streamed
    LLM response. The model is instructed to reply in this fixed format:

    开始意图理解分析...
    - 业务属性分类:[分类]
    - 查询范围确定:[范围]
    - 近似问题参考:[参考问题列表]
    - 详细分析:[详细分析内容]
    结束意图理解分析...
    """
    if not question:
        raise ValueError("Question cannot be empty")

    # Build the system prompt: role and analysis instructions first.
    system_message = """你是一个专业的数据库查询意图理解系统。理解用户想要查询什么、需要哪些数据、满足什么条件等.
你需要对用户的问题进行以下分析:
1. 业务属性分类:将问题归类到具体的业务领域,例如“产品管理”、“库存管理”等。
2. 查询范围确定:明确问题涉及的表、字段以及查询条件。
3. 近似问题参考:提供与当前问题相似的其他问题示例。
4. 详细分析:对问题进行详细分析,解释你的判断依据,并提供可能的SQL查询语句。
\n"""
    # ...then the database schema...
    system_message += f"""提供数据库结构如下:{ddl}\n"""

    # ...then the projection glossary. NOTE(review): the Step_text2sql
    # route calls this without relative_projections, so this may
    # interpolate None — confirm that is intentional.
    projection_words = relative_projections
    system_message += f""" 并提供对应生成词的原意 :{projection_words} """

    # ...the upstream intent-control result, when available...
    if im_context:
        system_message += f"""\n 并提供意图控制检查的相应结果:\n{im_context}"""

    # ...and finally the required reply template.
    system_message +="""\n
请严格按照以下格式回复:
开始意图理解分析...
- 业务属性分类:[分类]
- 查询范围确定:[范围]
- 近似问题参考:[参考问题列表]
- 详细分析:[详细分析内容]
结束意图理解分析...\n"""



    # print(system_message)
    # Message log: system prompt plus the raw user question.
    message_log = [
        ZhipuAI_Chat.system_message(system_message),
        ZhipuAI_Chat.user_message(question)
    ]

    # Submit the prompt; stream_set=True yields a streaming response.
    response = self.submit_prompt(message_log,stream_set=True, **kwargs)

    return response

def generate_sql_with_im_and_iua(self, question: str = None, ddl: str = None, relative_projections=None, im_context=None, iua_context=None,
                                 **kwargs):
    """
    Generate a SQL statement (plus reasoning) for *question*, conditioned on
    the database DDL, the projection glossary, and the upstream
    intent-control (*im_context*) / intent-understanding (*iua_context*)
    results. Returns a streaming LLM response.

    Raises:
        ValueError: when question is missing or empty.
    """
    if not question:
        raise ValueError("Question cannot be empty")

    # Build the system prompt: role and response requirements first.
    system_message = """你是一个专业的数据库查询系统。理解用户想要查询什么、需要哪些数据、满足什么条件等.
你需要根据用户的问题,结合上下文的意图控制与意图理解内容进行以下反馈:
1. 给出对应的**SQL**语句[不需要';'结尾]
2. 结合意图反馈分析该SQL语句的实现逻辑
\n"""
    # ...then the database schema...
    system_message += f"""提供数据库结构如下:{ddl}\n"""

    # ...then the projection glossary...
    projection_words = relative_projections
    system_message += f""" 并提供对应生成词的原意 :{projection_words} """

    # ...then the upstream pipeline results, when available.
    if im_context:
        system_message += f"""\n 并提供意图控制检查的相应结果:\n{im_context}"""
    if iua_context:
        # BUG FIX: this branch previously interpolated im_context again, so
        # the intent-understanding result never reached the prompt.
        system_message += f"""\n 并提供意图理解检查的相应结果:\n{iua_context}"""

    # Required reply template.
    # BUG FIX: the code fence was written as \`\`\` (an invalid escape
    # sequence leaving literal backslashes in the prompt); use a plain
    # ```sql fence so extract_sql can find the statement.
    system_message += """\n
请严格按照以下格式回复:
开始生成SQL语句...
```sql
SQL语句[不需要';'结尾]
```
- 详细分析:[详细分析内容]
结束生成SQL语句...\n"""

    # Message log: system prompt plus the raw user question.
    message_log = [
        ZhipuAI_Chat.system_message(system_message),
        ZhipuAI_Chat.user_message(question)
    ]

    # Submit the prompt; stream_set=True yields a streaming response.
    response = self.submit_prompt(message_log, stream_set=True, **kwargs)

    return response

/src/flask/init.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# TODO: pipeline steps — intent control, intent understanding, SQL generation, SQL validation, visualization advice
@self.flask_app.route('/api/v0/Step_text2sql')
def Step_text2sql():
    """
    SSE endpoint for the staged text-to-SQL pipeline (?question=...).

    Streams, in order: intent_control, intent_analysis and sql_generate
    events (raw LLM chunks), then one table event with the query result as
    JSON records, and finally a Finish event.
    """
    import re  # used by the table-name extraction and the generator below

    question = flask.request.args.get("question")
    tables = flask.request.args.get("tables")  # NOTE(review): currently unused

    if question is None:
        return jsonify({"type": "error", "error": "No question provided"})

    # Find mapping texts similar to the question, then pull the system
    # table names out of them (format produced by parse_json_mappings:
    # 表名:'<system name>' ...).
    relative_projections = vn.get_related_projection(question)
    relative_tables = []
    for projection in relative_projections:
        match = re.search(r"表名:'([^']+)'", projection)
        if match:
            relative_tables.append(match.group(1))

    # Collect the DDL of every referenced table for the prompts below.
    ddl = ""
    for table in relative_tables:
        ddl += vn.get_ddl(table) + '\n'
    relative_projections = '\n'.join(relative_projections)

    print(f'当前的DDL:\n{ddl}\n')
    print(f'当前的relative_projections:\n{relative_projections}\n')

    def generate_streams(question, ddl, relative_projections):
        # Stage 1: intent-control stream.
        im_stream = vn.generate_intent_control_inspection(question, ddl, relative_projections)
        im_chunks = []
        for chunk in im_stream:
            yield f"event: intent_control\ndata:{chunk}\n\n"
            im_chunks.append(chunk)
        im_complete = "".join(im_chunks)
        print(im_complete)
        print('_______________________________________')

        # Stage 2: intent-understanding stream.
        # NOTE(review): relative_projections is not forwarded here, unlike
        # stages 1 and 3 — confirm whether that is intentional.
        iua_stream = vn.generate_intention_understanding_analysis(question,
                                                                  ddl, im_context=im_complete)
        iua_chunks = []
        for chunk in iua_stream:
            yield f"event: intent_analysis\ndata:{chunk}\n\n"
            iua_chunks.append(chunk)
        iua_complete = "".join(iua_chunks)
        print(iua_complete)
        print('_______________________________________')

        # Stage 3: SQL-generation stream, then execute the extracted SQL.
        sql_stream = vn.generate_sql_with_im_and_iua(question, ddl, relative_projections, im_context=im_complete, iua_context=iua_complete)
        sql_chunks = []
        for chunk in sql_stream:
            yield f"event: sql_generate\ndata:{chunk}\n\n"
            sql_chunks.append(chunk)
        sql_complete = "".join(sql_chunks)
        print(sql_complete)
        _sql = vn.extract_sql(sql_complete)
        print('_______________________________________')
        _sql = re.sub(';', '', _sql)  # openLooKeng rejects a trailing ';'
        df = vn.run_sql(_sql)
        print(df)
        json_data = df.to_json(orient='records', date_format='iso')
        yield f"event: table\ndata:{json_data}\n\n"
        # BUG FIX: this was `return f"event: Finish\n\n"` — a generator's
        # return value is discarded, so the Finish event never reached the
        # client. Yield it instead.
        yield f"event: Finish\n\n"

    return Response(
        stream_with_context(generate_streams(question=question, ddl=ddl, relative_projections=relative_projections)),
        mimetype='text/event-stream',  # or 'application/json'
        headers={'Content-Type': 'text/event-stream; charset=utf-8'}
    )


接口信息与反馈

generate_sql

请求 URL:http://127.0.0.1:8085/api/v0/generate_sql
请求方法:GET

Method Target
Get http://127.0.0.1:8085/api/v0/generate_sql
Key Value
question “question”

return

1
2
3
4
5
{
"id": "your id",
"text": "SELECT * FROM ys_vehicle_transfer;",
"type": "sql"
}

run_sql

请求 URL:http://127.0.0.1:8085/api/v0/run_sql
请求方法:GET

Method Target
Get http://127.0.0.1:8085/api/v0/run_sql
Key Value
id “id from generate_sql”

return

1
2
3
4
5
{
"df": "df content",
"id": "your id",
"type": "df"
}

download_csv

请求 URL:http://127.0.0.1:8085/api/v0/download_csv
请求方法:GET

Method Target
Get http://127.0.0.1:8085/api/v0/download_csv
Key Value
id “id from generate_sql”

return

1
2
3
{
csv表格信息
}

generate_plotly_code

请求 URL:http://127.0.0.1:8085/api/v0/generate_plotly_figure
请求方法:GET

Method Target
Get http://127.0.0.1:8085/api/v0/generate_plotly_figure
Key Value
id “id from generate_sql”

return

1
2
3
4
5
{
"fig": "{.........}",
"id": "your id",
"type": "plotly_figure"
}

以上接口均需要对应 id 才能继续运行,以下接口不需要 id

Step_text2sql

请求 URL:http://127.0.0.1:8085/api/v0/Step_text2sql
请求方法:GET

Method Target
Get http://127.0.0.1:8085/api/v0/Step_text2sql
Key Value
question “在产业收入分类表中分类字段的数据有哪些”

return

1
大量流式传输返回结果

add_projection

请求 URL:http://127.0.0.1:8085/api/v0/add_projection
请求方法:POST

Method Target
POST http://127.0.0.1:8085/api/v0/add_projection

发送的映射文件参考

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
{
"mappings": [
{
"table": {
"displayName": "产业收入分类",
"systemName": "mo6hi6315MsO",
"description": "记录产业收入的分类维度数据"
},
"fields": [
{
"displayName": "类型",
"systemName": "vcy3oy9au74axvuyyzu9",
"description": "业务类型标识字段",
"valueMappings": {
"_comment": "可选的特殊值映射",
"0": "启用",
"1": "禁用"
}
},
{
"displayName": "分类",
"systemName": "vcnwp8vhb070xh9o8ekr",
"description": "收入分类层级标签"
}
]
}
]
}

return

1
None