使用 LlamaIndex 查询管道轻松实现 RAG 工作流(带有文本到 SQL 的示例)
1. 简介
LlamaIndex
中的查询管道可帮助我们轻松地将 RAG
组件拼凑起来并在常见工作流中重复使用,并将自定义工作流定义为 DAG
。
1.1. RAG 组件
通常,RAG 管道包含查询重写、检索、重新排序、工具、响应合成等元素。其中许多工作流程都很常见。有时,您实际上可以将它们描绘为 DAG。
2. 使用 QueryPipeline 顺序链接组件
下面描述了一个带有查询模板和 LLM 的简单工作流程。
具有查询模板和 LLM 的简单工作流程
一个简单的查询管道链将如下所示。
一个简单的查询管道链
让我们实现上述管道。首先我们需要安装 Llamaindex
。
pip install llama-index== 0.10.37
简单线性查询管道的代码如下:
from llama_index.core.query_pipeline import QueryPipeline
from llama_index.llms.openai import OpenAI
from llama_index.core import PromptTemplate
import openai
openai.api_key = ""
# Chaining basic prompts
prompt_str = "Please generate related movies to {movie_name}"
prompt_tmpl = PromptTemplate(prompt_str)
llm = OpenAI(model="gpt-3.5-turbo")
# Define the query pipeline
query_pipeline = QueryPipeline(chain=[prompt_tmpl, llm], verbose=True)
output = query_pipeline.run(movie_name="The transformers")
print(str(output))
下面是回答:
您可以将多个提示和LLM进行链式连接,步骤如下:
2.1. 具有查询重写功能的高级查询流水线。
让我们用以下组件来研究一个高级RAG示例。
-
查询重写
Query rewriting
-
重新排名器
Reranker
有了所有这些组件,顺序查询流水线将如下所示。
现在让我们来开发我们的示例。首先,您需要按照以下步骤下载数据。数据将存储在“Data”文件夹中。
mkdir Data
wget -O Data/paul_graham_essay.txt https://gist.github.com/wey-gu/75d49362d011a0f0354d39e396404ba2/raw/0844351171751ebb1ce54ea62232bf5e59445bb7/paul_graham_essay.txt
然后我们导入所需的模块。
from llama_index.core import (
VectorStoreIndex,
ServiceContext,
SimpleDirectoryReader,
load_index_from_storage,
)
使用SimpleDirectoryReader读取Data文件夹的内容。
reader = SimpleDirectoryReader("./Data")
docs = reader.load_data()
print(docs[0].get_content())
然后我们需要重建或者创建索引:
import os
from llama_index.core.storage import StorageContext
if not os.path.exists("storage"):
index = VectorStoreIndex.from_documents(docs)
# 保存索引
index.set_index_id("vector_index")
index.storage_context.persist("./storage")
else:
# 重建存储上下文
storage_context = StorageContext.from_defaults(persist_dir="storage")
# 加载索引
index = load_index_from_storage(storage_context, index_id="vector_index")
2.2 构建查询流水线
# 生成关于主题的问题
prompt_str1 = "Please generate a concise question about Paul Graham's life regarding the following topic {topic}"
prompt_tmpl1 = PromptTemplate(prompt_str1)
# 幻觉的回答。
prompt_str2 = (
"Please write a passage to answer the question\n"
"Try to include as many key details as possible.\n"
"\n"
"\n"
"{query_str}\n"
"\n"
"\n"
'Passage:"""\n'
)
prompt_tmpl2 = PromptTemplate(prompt_str2)
llm = OpenAI(model="gpt-3.5-turbo")
retriever = index.as_retriever(similarity_top_k=5)
p = QueryPipeline(
chain=[prompt_tmpl1, llm, prompt_tmpl2, llm, retriever], verbose=True
)
现在你可以按照以下方式从管道中提出问题。
nodes = p.run(topic="university")
len(nodes)
for node in nodes:
print(node, end="")
3. 将RAG管线构建为DAG
顺序检索链看起来不错, 我们可以像火车一样构建一个漂亮的RAG流水线!但假设以下情景。
这里有一些模块,比如retriever
、summarizer
和re-ranker
,拥有多个输入。正如您所看到的,在现实世界中,一些模块将具有多个输入。然后您将不得不创建一个以DAG
形式构建的RAG管道。 不同RAG组件的排列方式将类似于上面的图像。现在让我们学习如何构建这样一个DAG管道。
之前我们已经定义了除re-ranker
和summarizer
之外的所有其他模块。那么让我们也定义这两个模块吧。 这里我们正在使用Cohere re-ranker
模块。
from llama_index.postprocessor.cohere_rerank import CohereRerank
from llama_index.core.response_synthesizers import TreeSummarize
# define modules
prompt_str = "Please generate a question about Paul Graham's life regarding the following topic {topic}"
prompt_tmpl = PromptTemplate(prompt_str)
llm = OpenAI(model="gpt-3.5-turbo")
retriever = index.as_retriever(similarity_top_k=2)
summarizer = TreeSummarize(
llm=OpenAI(model="gpt-3.5-turbo")
)
reranker = CohereRerank(api_key="")
现在我们已经定义了所需的模块。让我们将该模块添加到查询管道中。
p = QueryPipeline(verbose=True)
p.add_modules(
{
"llm": llm,
"prompt_tmpl": prompt_tmpl,
"retriever": retriever,
"summarizer": summarizer,
"reranker": reranker,
}
)
# 完成后,我们需要在它们之间添加链接。
p.add_link("prompt_tmpl", "llm")
p.add_link("llm", "retriever")
p.add_link("retriever", "reranker", dest_key="nodes")
p.add_link("llm", "reranker", dest_key="query_str")
p.add_link("reranker", "summarizer", dest_key="nodes")
p.add_link("llm", "summarizer", dest_key="query_str")
Reranker
需要两个输入。
- 从
Retrieve
r侧获取节点。 - 从LLM方面获取用户查询。
由于Reranker有两个输入,您必须明确将它们指定为dest_keys。
就像这样,摘要生成器有两个输入:
- 从重新排名器的角度来看,它会对节点进行重新排名。
- 从语言模型生成器的角度来看,它会获取用户查询。
我们可以将其可视化如下。
现在你可以按照以下方式提问。
nodes = p.run(topic="university")
len(nodes)
这是上述查询的答案。
4. 文本到SQL流水线与有向无环图
现在我们讨论使用有向无环图的文本到 SQL RAG
管道。
4.1. 创建数据库
首先要创建数据库。你需要一些表格数据来填充数据库。我们从维基表问题下载一些数据。它包含可用于填充数据库的CSV
文件。
首先让我们下载数据。
!wget "https://github.com/ppasupat/WikiTableQuestions/releases/download/v1.0.2/WikiTableQuestions-1.0.2-compact.zip" -O data.zip
!unzip -o data.zip
然后让我们将给定路径下的所有 csv 文件加载到数据框中。
import pandas as pd
from pathlib import Path
data_dir = Path("./WikiTableQuestions/csv/200-csv")
csv_files = sorted([f for f in data_dir.glob("*.csv")])
dfs = []
for csv_file in csv_files:
print(f"processing file: {csv_file}")
try:
df = pd.read_csv(csv_file)
dfs.append(df)
except Exception as e:
print(f"Error parsing {csv_file}: {str(e)}")
然后,对于每个数据框架,我们使用LLM生成表名和摘要,并保存它们。为此,请使用下面的Pydantic类。
from llama_index.core.program import LLMTextCompletionProgram
from pydantic import BaseModel, Field
from llama_index.llms.openai import OpenAI
class TableInfo(BaseModel):
"""关于结构化表格的信息."""
table_name: str = Field(
…, description="table name (必须使用下划线,不能有空格。)"
)
table_summary: str = Field(
…, description="表格的简洁摘要/标题"
)
prompt_str = """\
Give me a summary of the table with the following JSON format.
- The table name must be unique to the table and describe it while being concise.
- Do NOT output a generic table name (e.g. table, my_table).
Do NOT make the table name one of the following: {exclude_table_name_list}
Table:
{table_str}
Summary: """
program = LLMTextCompletionProgram.from_defaults(
output_cls=TableInfo,
llm=OpenAI(model="gpt-3.5-turbo"),
prompt_template_str=prompt_str,
)
然后,对于每个数据框,我们使用以下代码生成表名和表的摘要。
import json
def _get_tableinfo_with_index(idx: int) -> str:
results_gen = Path(tableinfo_dir).glob(f"{idx}_*")
results_list = list(results_gen)
if len(results_list) == 0:
return None
elif len(results_list) == 1:
path = results_list[0]
return TableInfo.parse_file(path)
else:
raise ValueError(
f"More than one file matching index: {list(results_gen)}"
)
table_names = set()
table_infos = []
for idx, df in enumerate(dfs):
table_info = _get_tableinfo_with_index(idx)
if table_info:
table_infos.append(table_info)
else:
while True:
df_str = df.head(10).to_csv()
table_info = program(
table_str=df_str,
exclude_table_name_list=str(list(table_names)),
)
table_name = table_info.table_name
print(f"Processed table: {table_name}")
if table_name not in table_names:
table_names.add(table_name)
break
else:
print(f"Table name {table_name} already exists, trying again.")
pass
out_file = f"{tableinfo_dir}/{idx}_{table_name}.json"
json.dump(table_info.dict(), open(out_file, "w"))
table_infos.append(table_info)
现在你已经有了数据框架、表格名称和摘要。接着我们创建实际的数据库表,并填充数据。
# 填充数据库
from sqlalchemy import (
create_engine,
MetaData,
Table,
Column,
String,
Integer,
)
import re
# 创建列名的函数
def sanitize_column_name(col_name):
return re.sub(r"\W+", "_", col_name)
# 从数据框创建数据库表
def create_table_from_dataframe(
df: pd.DataFrame, table_name: str, engine, metadata_obj
):
sanitized_columns = {col: sanitize_column_name(col) for col in df.columns}
df = df.rename(columns=sanitized_columns)
# 根据DataFrame列和数据类型创建列
columns = [
Column(col, String if dtype == "object" else Integer)
for col, dtype in zip(df.columns, df.dtypes)
]
# 创建一个包含定义列的表格。
table = Table(table_name, metadata_obj, *columns)
# 在数据库中添加表格
metadata_obj.create_all(engine)
# 将DataFrame中的数据插入表格
with engine.connect() as conn:
for _, row in df.iterrows():
insert_stmt = table.insert().values(**row.to_dict())
conn.execute(insert_stmt)
conn.commit()
engine = create_engine("sqlite:///:memory:")
metadata_obj = MetaData()
for idx, df in enumerate(dfs):
tableinfo = _get_tableinfo_with_index(idx)
print(f"Creating table: {tableinfo.table_name}")
create_table_from_dataframe(df, tableinfo.table_name, engine, metadata_obj)
现在我们的数据库已经准备就绪,是时候使用查询管道来创建DAG了。我们需要以下的DAG模块。
- 用于存储表模式的对象索引。
- 用于检索表架构的检索器。
SQLDatabase
对象用于连接到表和检索器。- 使用
Text-to-SQL Prompt
可以从自然语言生成 SQL。 - 响应合成提示。
- LLM.
4.2 对象索引和检索器
在检索增强生成(RAG
)模型中,矢量索引的目的是通过将文本数据(字符串)转换为可以有效搜索和比较的数字表示(矢量)来增强检索过程。但在这里,我们不是在处理字符串。我们需要存储的表模式实际上是对象,而不是字符串。LlamaIndex
提供了一种简单的方法来处理这种情况。这就是对象索引。
所以我们需要一个对象索引,这样对象就可以被转换成字符串表示,这样它们就可以被矢量化。在检索中,检索到的字符串表示也需要转换为对象表示。对象索引也有助于实现这一点。因此,Object索引存储将返回一个对象,而不是返回字符串。以下是代码:
from llama_index.core.objects import (
SQLTableNodeMapping,
ObjectIndex,
SQLTableSchema,
)
from llama_index.core import SQLDatabase
sql_database = SQLDatabase(engine)
table_node_mapping = SQLTableNodeMapping(sql_database)
table_schema_objs = [
SQLTableSchema(table_name=t.table_name, context_str=t.table_summary)
for t in table_infos
] # 为每个表添加一个 SQLTableSchema。
obj_index = ObjectIndex.from_objects(
table_schema_objs,
table_node_mapping,
VectorStoreIndex,
)
obj_retriever = obj_index.as_retriever(similarity_top_k=3)
在向量存储中我们可以保存的是一个节点。这与表方案有些不同,后者是一个对象。因此,我们需要一种将表模式映射到节点的方法。LlamaIndex
提供了 SQLTableNodeMapping
来轻松实现此操作。在这里,SQLTableNodeMapping
对象接收 SQLDatabase
,并为每个数据库生成一个 Node
对象。
但是这些信息还不足以做一个良好的检索。目前我们有关于表结构的信息是以节点格式表示的。假设有两个表,一个叫Student
,另一个叫StudentInfo
。我们需要区分这两个表,以便在构建查询时知道要针对哪个表进行操作。一种非常好的方法是保存每个表的摘要,并附上与该表相关联的引用。如果我们在节点中保存表名和表摘要,就可以轻松实现这一点。这正是我们接下来需要做的事情。之前我们提取了各个数据行并将其存储在名为table_infos
的列表中。使用 SQLTableSchema
,我们可以为每张表创建一个相应的节点(利用table_infos
),并将其与结构一起保存在矢量存储器中。
为了进一步明确,打印出 table_schema_obj
和 table_node_mapping._sql_database
。
当我们检索时,现在可以获取表模式。这是在下面的代码中完成的。同时,在最后一行创建了一个FnComponent(函数组件),因此该组件可以添加到Llamaindex查询管道中。
from llama_index.core.retrievers import SQLRetriever
from typing import List
from llama_index.core.query_pipeline import FnComponent
sql_retriever = SQLRetriever(sql_database)
def get_table_context_str(table_schema_objs: List[SQLTableSchema]):
"""获取表上下文字符串."""
context_strs = []
for table_schema_obj in table_schema_objs:
table_info = sql_database.get_single_table_info(
table_schema_obj.table_name
)
if table_schema_obj.context_str:
table_opt_context = " The table description is: "
table_opt_context += table_schema_obj.context_str
table_info += table_opt_context
context_strs.append(table_info)
return "\n\n".join(context_strs)
table_parser_component = FnComponent(fn=get_table_context_str)
4.3. 文本到 SQL prompt
一旦我们检索到模式,我们需要做的是基于这些检索构建一个提示。为此,我们需要将模式转换为字符串表示。然后,我们将其定义为一个函数组件,在查询流程中使用。
将返回的模式转换为字符串后,我们需要创建一个提示。我们可以使用 DEFAULT_TEXT_TO_SQL_PROMPT 作为基础。FnComponent已经被创建用于将其添加到查询流程中。
from llama_index.core.prompts.default_prompts import DEFAULT_TEXT_TO_SQL_PROMPT
from llama_index.core.prompts import PromptTemplate
from llama_index.core.query_pipeline import FnComponent
from llama_index.core.llms import ChatResponse
def parse_response_to_sql(response: ChatResponse) -> str:
"""Parse response to SQL."""
response = response.message.content
sql_query_start = response.find("SQLQuery:")
if sql_query_start != -1:
response = response[sql_query_start:]
if response.startswith("SQLQuery:"):
response = response[len("SQLQuery:") :]
sql_result_start = response.find("SQLResult:")
if sql_result_start != -1:
response = response[:sql_result_start]
return response.strip().strip("```").strip()
sql_parser_component = FnComponent(fn=parse_response_to_sql)
text2sql_prompt = DEFAULT_TEXT_TO_SQL_PROMPT.partial_format(
dialect=engine.dialect.name
)
print(text2sql_prompt.template)
以下是Prompt:
Given an input question, first create a syntactically correct {dialect} query to run, then look at the results of the query and return the answer. You can order the results by a relevant column to return the most interesting examples in the database.
Never query for all the columns from a specific table, only ask for a few relevant columns given the question.
Pay attention to use only the column names that you can see in the schema description. Be careful to not query for columns that do not exist. Pay attention to which column is in which table. Also, qualify column names with the table name when needed. You are required to use the following format, each taking one line:
Question: Question here
SQLQuery: SQL Query to run
SQLResult: Result of the SQLQuery
Answer: Final answer here
Only use tables listed below.
{schema}
Question: {query_str}
SQLQuery:
这里{scheme}是转换成文本的架构,而{query_str}是自然语言问题。以下是该问题的示例提示。
4.4. 响应综合提示
然后我们需要根据结果生成响应。为此目的创建了一个提示并添加了。代码如下:
response_synthesis_prompt_str = (
"Given an input question, synthesize a response from the query results.\n"
"Query: {query_str}\n"
"SQL: {sql_query}\n"
"SQL Response: {context_str}\n"
"Response: "
)
response_synthesis_prompt = PromptTemplate(
response_synthesis_prompt_str,
)
现在所有组件都准备就绪。现在我们需要按照以下方式将定义的模块添加到管道中。
from llama_index.core.query_pipeline import (
QueryPipeline as QP,
Link,
InputComponent,
CustomQueryComponent,
)
qp = QP(
modules={
"input": InputComponent(),
"table_retriever": obj_retriever,
"table_output_parser": table_parser_component,
"text2sql_prompt": text2sql_prompt,
"text2sql_llm": llm,
"sql_output_parser": sql_parser_component,
"sql_retriever": sql_retriever,
"response_synthesis_prompt": response_synthesis_prompt,
"response_synthesis_llm": llm,
},
verbose=True,
)
如上面所述,我们需要初步的InputComponent、对象检索器来检索数据表结构,然后将检索到的结构转换成字符串并为此创建一个提示。接着由llm回答这个提示。接下来解析llm的结果并从数据库中检索出结果。然后合成响应提示,并为该提示指定响应。
接着我们需要添加以下链接代码:
qp.add_chain(["input", "table_retriever", "table_output_parser"])
qp.add_link("input", "text2sql_prompt", dest_key="query_str")
qp.add_link("table_output_parser", "text2sql_prompt", dest_key="schema")
qp.add_chain(
["text2sql_prompt", "text2sql_llm", "sql_output_parser", "sql_retriever"]
)
qp.add_link(
"sql_output_parser", "response_synthesis_prompt", dest_key="sql_query"
)
qp.add_link(
"sql_retriever", "response_synthesis_prompt", dest_key="context_str"
)
qp.add_link("input", "response_synthesis_prompt", dest_key="query_str")
qp.add_link("response_synthesis_prompt", "response_synthesis_llm")
生成的查询管道如下所示:
现在您可以按照以下方式提问。
response = qp.run(
query="how many schools in Ohio?"
)
下面是回答: