跳到主要内容
feedback
feedback

Modular RAG

在完成 Naive RAG 的基础构建与 Advanced RAG 的链路优化后,我们正式进入Modular RAG (模块化 RAG) 章节。

不同于以往线性的、固定的 Pipeline,Modular RAG 引入智能调度的概念。系统根据用户意图,通过 Qwen3-0.6B 微型路由器动态编排处理路径。在 DeepSeek 论文集的实测中,该系统将原本 5 分钟的复杂检索压缩至 30-50 秒,同时维持了极高的学术严谨性与对抗幻觉的能力。

一、架构概览

不同于传统 RAG 的“线性流水线”,Modular RAG 更像一个拥有“大脑”的调度中心。它根据用户意图,动态选择最快最准的处理路径

  • 数据层
    • 内置示例数据集:平台在内置路径中预置了 “DeepSeek” 论文数据集。用户无需繁琐的数据准备,即可通过调用内置库实现 “一键上手”,快速验证 RAG 流程的闭环性。
    • 私有数据接入:系统具备高度的灵活性,支持用户通过数据通道自行上传 PDFMarkdown 等私有文档。通过简单的路径配置,即可实现从公共知识到行业私有知识的无缝切换。
    • 深度清洗加固:针对非结构化 PDF 执行“断路器”清洗逻辑,强制隔离元数据并过滤非法字符,彻底规避 Embedding 阶段由于数据脏点导致的类型崩溃风险。
  • 推理层
    • 核心引擎:采用 vLLM 作为推理后端,利用其 PagedAttention 技术提升并发处理能力。
    • 启动策略:执行显存割让策略,为后续的向量化和精排预留充足的计算余裕,确保单卡环境下多模型的稳定运行。
    • 多模型协同架构:在单卡环境下通过显存分片同时调度 Qwen3-8B(生成/反思)、Qwen3-Embedding-8B(向量化) 与 Qwen3-0.6B(路由) 三大模型。
  • 数据存储与检索层
    • 异构双路索引:集成 Milvus Lite(语义向量库)与 PropertyGraph(本地逻辑图谱),实现“事实细节”与“逻辑链条”的双重覆盖
    • 秒级加载复用:实现了向量库与图谱索引的本地持久化机制(Save/Load)。首次构建需等待 5 分钟,二次加载即可实现秒级响应。
  • 逻辑编排层
    • 工作流管理:基于 LlamaIndex 构建。
    • Modular RAG 范式
      • 意图路由 (Router):利用微型模型精准分流,实现寒暄、统计、逻辑演进与技术细节的路径差异化。
      • 语义对齐增强:在检索前引入轻量化 HyDE。通过预生成 3 个技术关键词补充用户提问的背景,解决学术论文中“术语缩写”与“语义漂移”导致的召回失败。
      • 异构双路混合检索
        • 实现:并行驱动 Milvus 向量库PropertyGraph 逻辑图谱
        • 逻辑:向量路负责捕捉实验数据等“事实点”,图谱路负责还原跨章节的“逻辑线”(如 A 技术对 B 架构的改进路径),实现点线结合的 360° 知识全覆盖。
      • 知识精炼:将多个 1000 Token 的原始片段合并,通过 1 次 LLM 请求提取核心事实清单,彻底消除长文本噪声。

二、优化步骤

为了实现超越 Advanced RAG 的表现,我们在代码中实施了以下三大模块化进化策略:

1:智能路由与元数据感知

系统不再盲目检索,而是先通过“路由”模型判定用户到底想要什么,从而实现响应效率与准确性的双重提升。

  • 代码实现逻辑
    • 0.6B 毫秒级路由:调用 Qwen3-0.6B 配合 Few-Shot 指令,将输入分流至 GREETING(对话)、SUMMARY(总结)、GRAPH(逻辑)或 TECH(技术)。
    • 元数据直接映射:针对统计类问题,系统绕过向量检索,直接扫描文档元数据提取 file_name,确保统计结果 100% 准确,消除了大模型对数值的幻觉。

2:Graph RAG 模块

作为 Modular RAG 的核心进阶模块,Graph RAG 填补了传统检索在“因果关系”和“跨章节推理”上的空白。

  • 模块设计逻辑
    • 核心逻辑链检索:当路由器识别到“关系”、“演进”或“对比”类意图时,系统沿着图谱路径还原技术脉络,而非寻找相似片段。
    • 论文提取策略:系统不再对数千个碎片节点建图,而是按论文聚合,仅针对每篇论文的前 500 字(标题与摘要)提取关键三元组(如:Engram -> 优化 -> 推理延迟)。

3:HyDE 语义对齐

针对学术论文中密集的硬核术语(如 mHC),系统采用“以点带面”的策略。

  • 代码实现逻辑
    • HyDE 预处理:在检索前先生成 3 个核心技术术语作为检索增强,解决了用户提问太简短导致的语义偏移。
    • 批量事实提取:为了性能最大化,系统将向量库与图谱库检索到的 Top-6 证据进行打包,通过 1 次 LLM 调用提取所有核心事实。这比传统逐条精炼的方式速度提升了 500% 以上。
    • 严格边界指令:在最终生成阶段强制执行 【严格指令】,确保回答完全锁死在当前 PDF 资料内,杜绝预训练记忆干扰。

4:重排序与批量

系统从“搬运工”向“学术审稿人”进化的关键。通过对海量检索结果的二次筛选与无损压缩,确保生成模型只接收最纯净的知识干货。

  • 代码实现逻辑
    • LLM 重排序 (Rerank):系统首先从向量库中初筛出 Top 15 个片段;引入 LLM ,通过 LLMRerank 模块对这 15 个片段进行语义评分,将 choice_batch_size 设为 15。这意味着原本需要多次往返的精排打分,现在通过 1 次 并发请求即可全量完成,速度提升了 80%。
    • 批量事实提取:代码摒弃了“逐条精简”的低效模式,采用 batch_extract_facts 函数。将精排后的 Top 6 核心证据原文打包成一个任务包。

三、沐曦 (MetaX) 部署指南

本章节适用于 曦云 C500 等沐曦系列算力卡。

1. 硬件与基础环境

  • 算力型号:曦云 C500 (64GB)
  • 算力主机:
    • jiajia-mxcvLLM / vllm:0.11.0 / Python 3.10 / maca 3.3.0.11

      镜像选择

2. 基础步骤

  • 进入算力容器,启动实例后,点击 JupyterLab 进入工作台。

    进入容器

3. 实现步骤

3.1 下载 LlamaIndex 与 Milvus Lite 框架

  • 创建终端窗口(Terminal)

    进入终端

  • 输入代码:

    pip install --target /data/llama_libs --no-deps -i https://mirrors.aliyun.com/pypi/simple/ -U \
    "pymilvus==2.6.6" milvus-lite orjson minio pathspec python-dateutil pytz six \
    llama-index-core llama-index-readers-file llama-index-llms-openai llama-index-llms-openai-like \
    llama-index-embeddings-huggingface llama-index-vector-stores-milvus llama-index-postprocessor-sbert-rerank \
    llama-index-instrumentation llama-index-workflows llama-index-utils-workflow \
    llama-index-retrievers-bm25 rank-bm25 bm25s PyStemmer \
    sentence-transformers pypdf docx2txt nest-asyncio ujson grpcio google-api-core protobuf banks griffe sqlalchemy dataclasses-json marshmallow typing-inspect fsspec filetype deprecated wrapt dirtyjson tenacity jinja2 pyyaml \
    pandas numpy nltk tiktoken requests charset-normalizer urllib3 certifi idna sniffio anyio h11 httpcore httpx mypy_extensions typing_extensions scikit-learn scipy joblib threadpoolctl tqdm pyarrow \
    ragas langchain-core langchain-openai langsmith requests_toolbelt "numpy<2.0" uuid_utils tenacity regex appdirs instructor docstring_parser langchain_community llama-index-llms-huggingface
    pip install accelerate

  • 完成下载后,新建一个新的终端: 新建终端

3.2 启动 vLLM 推理

  • 在新的终端内输入代码:

    python -m vllm.entrypoints.openai.api_server \
    --model /mnt/moark-models/Qwen3-8B \
    --gpu-memory-utilization 0.4 \
    --port 8000
  • 当终端提示INFO: Application startup compete,则完成vLLM启动步骤。 启动vLLM

3.3 创建并运行 Python 脚本

  • 点击 Python File:

    创建脚本

  • 输入代码:

    import sys, os, asyncio, nest_asyncio, torch, shutil
    from transformers import AutoModelForCausalLM, AutoTokenizer

    # 1. 环境初始化
    PRIVATE_LIB = "/data/llama_libs"
    if PRIVATE_LIB not in sys.path:
    sys.path.insert(0, PRIVATE_LIB)

    from llama_index.llms.huggingface import HuggingFaceLLM
    nest_asyncio.apply()

    from llama_index.core import SimpleDirectoryReader, VectorStoreIndex, StorageContext, Settings, PromptTemplate, load_index_from_storage
    from llama_index.embeddings.huggingface import HuggingFaceEmbedding
    from llama_index.llms.openai_like import OpenAILike
    from llama_index.vector_stores.milvus import MilvusVectorStore
    from llama_index.core.node_parser import HierarchicalNodeParser, get_leaf_nodes
    from llama_index.core.retrievers import RecursiveRetriever, QueryFusionRetriever
    from llama_index.core.postprocessor import LLMRerank
    from llama_index.core.query_engine import RetrieverQueryEngine
    from llama_index.retrievers.bm25 import BM25Retriever
    from llama_index.core import PropertyGraphIndex
    from llama_index.core.indices.property_graph import ImplicitPathExtractor, SimpleLLMPathExtractor
    from llama_index.core.schema import TextNode

    # --- 全局配置参数 ---
    DATA_DIR = "/mnt/moark-models/deepseek_papers"
    EMBED_PATH = "/mnt/moark-models/Qwen3-Embedding-8B"
    LLM_MODEL = "/mnt/moark-models/Qwen3-8B"
    ROUTER_MODEL_PATH = "/mnt/moark-models/Qwen3-0.6B"
    GRAPH_STORAGE_DIR = "./graph_storage_finalV1"
    DB_PATH = "./modular_rag_finalV1.db"

    def clean_think_tag(text):
    text = str(text).strip()
    if "</think>" in text:
    return text.split("</think>")[-1].strip()
    return text

    # --- A: 批量事实精炼 (1次请求解决所有片段) ---
    def batch_extract_facts(query, nodes, llm):
    """
    【加速核心】:将原本 N 次的 LLM 调用合并为 1 次
    """
    if not nodes: return "无参考资料"

    print(f" (正在批量精炼 {len(nodes)} 个参考片段中的核心证据...)")

    combined_context = ""
    for i, node in enumerate(nodes[:6]): # 限制前 6 个最相关的片段
    combined_context += f"--- 片段 {i+1} ---\n{node.node.get_content()[:1000]}\n"

    batch_prompt = (
    "你是一个高效的技术文档精炼专家。请从下述多个论文片段中提取与问题直接相关的‘技术事实、数据和逻辑链’。\n"
    "要求:1. 以列表形式展现。 2. 必须保留原始数值和术语。 3. 如果资料无关,请直接忽略该片段。\n"
    f"用户问题:{query}\n"
    f"参考资料库:\n{combined_context}\n"
    "精炼事实清单:"
    )

    res = llm.complete(batch_prompt)
    return clean_think_tag(res.text)

    # --- 极速优化 B: 简效 HyDE ---
    def generate_search_context(query, llm):
    """缩短 HyDE 输出,仅生成关键词背景"""
    prompt = f"请为问题‘{query}’提供 3 个 DeepSeek 论文相关的核心技术术语作为检索增强:\n核心术语:"
    res = llm.complete(prompt)
    return clean_think_tag(res.text)

    def get_intent_via_router(user_input, router_llm):
    val = user_input.lower()
    meta_keys = ["多少篇", "几篇", "清单", "列表", "文件名", "哪些论文"]
    if any(k in val for k in meta_keys): return "META_QUERY"
    if any(k in val for k in ["关系", "演进", "对比", "联系", "改进", "区别"]): return "GRAPH_QUERY"
    if any(g in val for g in ["你好", "您好", "你是谁", "在吗","hello","hi"]) or len(val) < 5: return "GREETING"
    return "TECH_QUERY"

    async def build_global_core_graph_fast(documents, llm):
    print(">>> 正在快速构建 24 篇论文核心逻辑链...")
    kg_extractor = SimpleLLMPathExtractor(llm=llm, max_paths_per_chunk=3)
    unique_papers = {}
    for doc in documents:
    if doc.metadata.get('file_name') not in unique_papers:
    unique_papers[doc.metadata.get('file_name')] = doc

    core_nodes = [TextNode(text=doc.text[:500], metadata=doc.metadata) for doc in unique_papers.values()]
    index = PropertyGraphIndex(core_nodes, path_extractors=[kg_extractor], llm=llm, show_progress=True)
    return index

    # --- 主程序 ---

    async def main():
    # --- Step 1: 性能优化配置 ---
    Settings.embed_batch_size = 1 # 显存充足时提高并行度

    print(">>> 正在启动 Modular GraphRAG...")
    Settings.embed_model = HuggingFaceEmbedding(model_name=EMBED_PATH, device="cuda", trust_remote_code=True, model_kwargs={"torch_dtype": torch.float16})
    Settings.llm = OpenAILike(model=LLM_MODEL, api_base="http://localhost:8000/v1", api_key="fake", is_chat_model=True, timeout=120.0)
    router_llm = HuggingFaceLLM(model_name=ROUTER_MODEL_PATH, tokenizer_name=ROUTER_MODEL_PATH, device_map="cuda:0")

    # --- Step 2: 向量库加载 (持久化) ---
    vector_store = MilvusVectorStore(uri=DB_PATH, dim=4096, overwrite=False)
    vec_storage_context = StorageContext.from_defaults(vector_store=vector_store)

    if not os.path.exists(DB_PATH) or os.path.getsize(DB_PATH) < 1000:
    print(">>> 构建全量向量索引...")
    documents = SimpleDirectoryReader(input_dir=DATA_DIR, recursive=True).load_data()
    all_nodes = HierarchicalNodeParser.from_defaults(chunk_sizes=[1536, 512, 256]).get_nodes_from_documents(documents)
    leaf_nodes = []
    for n in get_leaf_nodes(all_nodes):
    clean_text = "".join(ch for ch in str(n.text) if ch.isprintable()).strip()
    if clean_text and clean_text.lower() != "nan":
    n.text = clean_text
    n.excluded_embed_metadata_keys = list(n.metadata.keys())
    leaf_nodes.append(n)
    vec_storage_context.docstore.add_documents(all_nodes)
    vector_index = VectorStoreIndex(leaf_nodes, storage_context=vec_storage_context, show_progress=True)
    else:
    print(">>> 向量库加载完成。")
    vector_index = VectorStoreIndex.from_vector_store(vector_store, storage_context=vec_storage_context)
    # 即使加载索引,我们也需要加载文档结构用于回溯
    documents = SimpleDirectoryReader(input_dir=DATA_DIR, recursive=True).load_data()
    all_nodes = HierarchicalNodeParser.from_defaults(chunk_sizes=[1536, 512, 256]).get_nodes_from_documents(documents)

    # --- Step 3: 图谱加载 (持久化) ---
    if not os.path.exists(GRAPH_STORAGE_DIR):
    graph_index = await build_global_core_graph_fast(documents, Settings.llm)
    graph_index.storage_context.persist(persist_dir=GRAPH_STORAGE_DIR)
    else:
    print(">>> 图谱库加载完成。")
    g_ctx = StorageContext.from_defaults(persist_dir=GRAPH_STORAGE_DIR)
    graph_index = PropertyGraphIndex(nodes=[], storage_context=g_ctx, llm=Settings.llm, embed_model=Settings.embed_model)

    # --- Step 4: 检索引擎 ---
    query_engine = RetrieverQueryEngine.from_args(
    retriever=RecursiveRetriever("vector",
    retriever_dict={"vector": vector_index.as_retriever(similarity_top_k=15)},
    node_dict={node.node_id: node for node in all_nodes}
    ),
    node_postprocessors=[LLMRerank(llm=Settings.llm, choice_batch_size=15, top_n=8)]
    )

    # --- Step 5: 交互循环 ---
    print("\n" + "="*50 + "\n粒术 Modular GraphRAG 已就绪!")
    chat_history = []

    while True:
    try:
    torch.cuda.empty_cache()
    raw_input = input("\n用户 >> ").strip()
    if raw_input.lower() in ['exit', 'quit', '退出']: break
    if not raw_input: continue
    user_input = raw_input.encode('utf-8', 'ignore').decode('utf-8')

    intent = get_intent_via_router(user_input, router_llm)
    print(f" (意图识别: {intent})")

    if intent == "GREETING":
    res = Settings.llm.complete(f"你是专家粒术。快速回应:{user_input}")
    print(f"\n粒术 >> {clean_think_tag(res.text)}")

    elif intent == "META_QUERY":
    all_docs = list(set([n.metadata.get('file_name') for n in all_nodes]))
    print(f"\n粒术 >> 库内共有 {len(all_docs)} 篇论文。")

    elif intent == "SUMMARY":
    # 总结任务:一次性检索 30 个片段进行大汇总
    summary_res = query_engine.query(f"请深度总结关于‘{user_input}’的核心创新点和实验结论。")
    print(f"\n粒术 >> {clean_think_tag(summary_res.response)}")

    else:
    # 【极速路径】:1次HyDE关键词 + 1次双路检索 + 1次批量精炼 + 1次生成
    print(" (极速检索中...)")

    # 1. 快速生成查询关键词 (1次请求)
    tech_keywords = generate_search_context(user_input, Settings.llm)

    # 2. 并行检索 (向量 + 图谱)
    v_res = query_engine.query(f"{user_input}\n技术语境:{tech_keywords}")
    g_nodes = []
    if graph_index:
    g_nodes = graph_index.as_retriever(include_text=True, similarity_top_k=3).retrieve(user_input)

    # 3. 批量无损精炼 (1次请求)
    all_nodes_list = v_res.source_nodes + g_nodes
    evidence = batch_extract_facts(user_input, all_nodes_list, Settings.llm)

    # 4. 最终回答 (1次请求)
    final_res = Settings.llm.complete(
    f"【严格指令】仅用资料回答。问题:{user_input}\n依据事实:\n{evidence}\n专业回答:"
    )
    print(f"\n粒术 >> {clean_think_tag(final_res.text)}")

    except Exception as e:
    print(f"\n 自动重连中... ({e})")

    if __name__ == "__main__":
    asyncio.run(main())
  • Ctrl + S保存文件,并完成文件命名test。新建一个终端,输入python test.py,即可进入 Modular RAG 系统。

    运行结果