跳到主要内容
feedback
feedback

Modular RAG

在完成 Naive RAG 的基础构建与 Advanced RAG 的链路优化后,我们正式进入Modular RAG (模块化 RAG) 章节。

不同于以往线性的、固定的 Pipeline,Modular RAG 引入智能调度持久化存储智能增量加载机制的概念。系统根据用户意图,通过 Qwen3-0.6B 微型路由器动态编排处理路径。在 DeepSeek 论文集的实测中,系统首次构建需数分钟,但二次启动仅需秒级,且具备极强的抗噪能力与逻辑推理能力。

一、架构概览

Modular RAG 不再是线性的流水线,而是一个具备“记忆”与“自检”能力的智能系统。它通过 Qwen3-0.6B 路由器动态编排路径,并实现了宏观图谱微观向量的完美融合。

  • 数据层
    • 内置示例数据集:平台在内置路径中预置了 “DeepSeek” 论文数据集。用户无需繁琐的数据准备,即可通过调用内置库实现 “一键上手”,快速验证 RAG 流程的闭环性。
    • 私有数据接入:系统具备高度的灵活性,支持用户通过数据通道(SCP)自行上传 PDFMarkdown 等私有文档。通过简单的路径配置,即可实现从公共知识到行业私有知识的无缝切换。
    • 深度清洗加固:针对学术 PDF 中常见的数学符号与乱码(如 \ud835),在 Embedding 前执行强制编码清洗(utf-8 ignore + decode),并在对象重建层面修复了 node_parser 的崩溃风险。
  • 推理层
    • 核心引擎:采用 vLLM 作为推理后端,利用其 PagedAttention 技术提升并发处理能力。
    • 启动策略:执行显存割让策略,为后续的向量化和精排预留充足的计算余裕,确保单卡环境下多模型的稳定运行。
    • 多模型协同架构:在双卡环境下通过显存分片同时调度 Qwen3-8B 负责核心推理、Qwen3-Embedding-8B 负责高并发向量化(Batch Size=30+),Qwen3-0.6B 负责毫秒级路由。
  • 数据存储与检索层
    • 智能增量加载:系统默认关闭 FORCE_REBUILD。启动时优先检测 Milvus 本地库(.db)与图谱存储(graph_storage),若存在则直接加载索引,跳过耗时的 Embedding 过程。
    • 异构双路索引
      • Milvus Lite:存储 740+ 页的全量切片向量,负责“细节查准”。
      • PropertyGraph:存储基于论文核心摘要生成的逻辑三元组,负责“关系查全”。
  • 逻辑编排层
    • 意图路由 (Router):精准识别 META_QUERY(统计)、GRAPH_QUERY(逻辑)、SUMMARY(总结)与 TECH_QUERY(细节)。
    • 宏微观混合检索
      • 微观:通过 HyDE(假设文档)增强向量检索,定位具体参数。
      • 宏观:通过摘要级图谱,定位论文间的演进与对比关系。
  • 流程图
    • 文档

      文档流程图

    • 代码

      代码流程图

二、优化步骤

为了实现超越 Advanced RAG 的表现,我们在代码中实施了以下三大模块化进化策略:

1:摘要级图谱构建

系统不再盲目检索,而是先通过“路由”模型判定用户到底想要什么,从而实现响应效率与准确性的双重提升。

  • 痛点:传统 Graph RAG 尝试对 740 个页面切片逐一提取三元组,导致构建耗时极长且图谱充斥着无关琐碎细节。
  • 优化逻辑
    • 按文归并:代码自动将 740 个切片按 file_name 归并为 24 组。
    • 摘要生成:先调用 LLM 对每篇论文的前 5-7 页生成“高浓缩逻辑摘要”。
    • 图谱构建:仅针对这 24 个摘要节点 提取三元组。
  • 效果:构建速度提升 10 倍以上,且图谱关系(如 DeepSeek-V3 -> 改进 -> V2)更加清晰、宏观,极大地提升了回答“演进关系”类问题的质量。

2:智能路由与元数据直通

让系统“听懂”用户在问什么,而不仅仅是做关键词匹配。

  • 元数据直通(Meta Query):当用户问“有多少篇论文”时,系统不经过 LLM,直接扫描 Document Metadata 进行 Python 级统计,实现了 0 幻觉 的精准统计回答。
  • 意图分流
    • Graph Query:触发图谱检索,回答“架构区别”、“演进路线”。
    • Tech Query:触发 HyDE + 向量检索,回答“参数细节”、“实验数据”。

3:双流混合检索与重排序

  • HyDE 预处理:在向量检索前,让 LLM 先“脑补”一段假设性答案,用生成的答案去检索,解决了学术提问太短(如“MTP 参数?”)导致的召回失败。
  • LLM 重排序:从向量库和图谱库中召回 Top-15 结果后,使用 LLMRerank 进行语义打分,只保留相关度最高的 Top-5 进入最终生成环节,有效减少了上下文窗口的噪声。

4:持久化

  • Milvus 文件锁处理:解决了 Milvus Lite 独占文件锁导致的冲突问题,通过在主循环中集成 debug 指令,实现了在不退出服务的情况下动态查看底层数据库状态。

三、沐曦 (MetaX) 部署指南

本章节适用于 曦云 C500 等沐曦系列算力卡。

1. 硬件与基础环境

  • 算力型号:曦云 C500 (64GB) * 2
  • 算力主机:
    • jiajia-mxcvLLM / vllm:0.11.0 / Python 3.10 / maca 3.3.0.11

      镜像选择

    • suanfeng-mxcvLLM / vllm:0.13.0 / Python 3.10 / maca 3.3.0.303

      镜像选择

2. 基础步骤

  • 进入算力容器,启动实例后,点击 JupyterLab 进入工作台。

    进入容器

3. 实现步骤

3.1 下载 LlamaIndex 与 Milvus Lite 框架

  • 创建终端窗口(Terminal)

    进入终端

  • 输入代码:

    pip install --target /data/llama_libs --no-deps -i https://mirrors.aliyun.com/pypi/simple/ -U \
    "pymilvus==2.6.6" milvus-lite orjson minio pathspec python-dateutil pytz six \
    llama-index-core llama-index-readers-file llama-index-llms-openai llama-index-llms-openai-like \
    llama-index-embeddings-huggingface llama-index-vector-stores-milvus llama-index-postprocessor-sbert-rerank \
    llama-index-instrumentation llama-index-workflows llama-index-utils-workflow \
    llama-index-retrievers-bm25 rank-bm25 bm25s PyStemmer \
    sentence-transformers pypdf docx2txt nest-asyncio ujson grpcio google-api-core protobuf banks griffe sqlalchemy dataclasses-json marshmallow typing-inspect fsspec filetype deprecated wrapt dirtyjson tenacity jinja2 pyyaml \
    pandas numpy nltk tiktoken requests charset-normalizer urllib3 certifi idna sniffio anyio h11 httpcore httpx mypy_extensions typing_extensions scikit-learn scipy joblib threadpoolctl tqdm pyarrow \
    ragas langchain-core langchain-openai langsmith requests_toolbelt "numpy<2.0" uuid_utils tenacity regex appdirs instructor docstring_parser langchain_community llama-index-llms-huggingface jsonpatch
    pip install griffe -t /data/llama_libs
    pip install tinytag -t /data/llama_libs
    pip install accelerate
  • 完成下载后,新建一个新的终端: 新建终端

3.2 启动 vLLM 推理

  • 在新的终端内输入代码:

    CUDA_VISIBLE_DEVICES=0 vllm serve /mnt/moark-models/Qwen3-8B --gpu-memory-utilization 0.7 --port 8000
  • 当终端提示INFO: Application startup compete,则完成vLLM启动步骤。 启动vLLM

3.3 创建并运行 Python 脚本

  • 点击 Python File:

    创建脚本

  • 输入代码:

    import sys, os, asyncio, nest_asyncio, torch, shutil, logging
    from transformers import AutoModelForCausalLM, AutoTokenizer
    from tqdm.asyncio import tqdm # 引入进度条库

    # --- 0. 日志降噪 (屏蔽 HTTP 刷屏) ---
    logging.getLogger("httpx").setLevel(logging.WARNING)
    logging.getLogger("httpcore").setLevel(logging.WARNING)
    logging.getLogger("openai").setLevel(logging.WARNING)

    # 1. 环境初始化与路径保护
    PRIVATE_LIB = "/data/llama_libs"
    if PRIVATE_LIB not in sys.path:
    sys.path.insert(0, PRIVATE_LIB)

    from llama_index.llms.huggingface import HuggingFaceLLM
    nest_asyncio.apply()

    # 核心组件导入
    from llama_index.core import SimpleDirectoryReader, VectorStoreIndex, StorageContext, Settings, PromptTemplate, load_index_from_storage, Document
    from llama_index.core.schema import TextNode
    from llama_index.embeddings.huggingface import HuggingFaceEmbedding
    from llama_index.llms.openai_like import OpenAILike
    from llama_index.vector_stores.milvus import MilvusVectorStore
    from llama_index.core.node_parser import HierarchicalNodeParser, get_leaf_nodes
    from llama_index.core.retrievers import RecursiveRetriever
    from llama_index.core.postprocessor import LLMRerank
    from llama_index.core.query_engine import RetrieverQueryEngine
    from llama_index.core import PropertyGraphIndex
    from llama_index.core.indices.property_graph import ImplicitPathExtractor, SimpleLLMPathExtractor

    # --- 全局配置参数 ---
    DATA_DIR = "/mnt/moark-models/deepseek_papers"
    EMBED_PATH = "/mnt/moark-models/Qwen3-Embedding-8B"
    LLM_MODEL = "/mnt/moark-models/Qwen3-8B"
    ROUTER_MODEL_PATH = "/mnt/moark-models/Qwen3-0.6B"
    GRAPH_STORAGE_DIR = "./graph_storage_final"
    MILVUS_FILE = "./modular_rag_final.db"

    # --- 控制开关 ---
    FORCE_REBUILD_VECTOR = False
    FORCE_REBUILD_GRAPH = False

    # --- 工具函数 ---

    def clean_think_tag(text):
    text = str(text).strip()
    if not text: return "(正在组织语言...)"
    if "</think>" in text:
    return text.split("</think>")[-1].strip()
    return text

    def generate_hypothetical_answer(query, llm):
    hyde_prompt = f"针对以下问题写一段技术性模拟回答(仅限技术语境):\n问题:{query}\n回答:"
    response = llm.complete(hyde_prompt)
    return clean_think_tag(response.text)

    def get_intent_via_router(user_input, router_llm):
    val = user_input.lower()
    if any(k in val for k in ["多少篇", "几篇", "清单", "列表", "文件名", "哪些论文", "库里有什么"]): return "META_QUERY"
    if any(k in val for k in ["关系", "演进", "对比", "联系", "改进", "区别", "架构"]): return "GRAPH_QUERY"
    if any(g in val for g in ["你好", "您好", "你是谁", "在吗"]) or len(val) < 4: return "GREETING"
    if any(s in val for s in ["总结", "核心", "大意", "概括", "讲了什么"]): return "SUMMARY"
    return "TECH_QUERY"

    def verify_relevance(query, response_nodes, llm):
    if not response_nodes: return "LOW"
    context_preview = "\n".join([n.node.get_content()[:200] for n in response_nodes[:3]])
    grade_prompt = f"问题:{query}\n资料:{context_preview}\n判断资料是否包含答案。只回答:[YES] 或 [NO]。"
    res = llm.complete(grade_prompt)
    return "HIGH" if "[YES]" in res.text.upper() else "LOW"

    def rewrite_query_logic(query, llm):
    res = llm.complete(f"将问题 '{query}' 改写为2个学术搜索词,分号隔开。")
    return res.text.strip()

    def compress_context_nodes(query, source_nodes, llm):
    refined_facts = []
    for node in source_nodes[:3]:
    refine_prompt = f"提取与‘{query}’相关的核心技术事实:\n内容:{node.node.get_content()[:800]}"
    res = llm.complete(refine_prompt)
    fact = clean_think_tag(res.text)
    if len(fact) > 15: refined_facts.append(f"• {fact}")
    return "\n".join(refined_facts) if refined_facts else "未发现显著事实。"

    # --- 核心:数据完整性校验 ---
    def validate_knowledge_base(source_docs, vector_index, graph_index):
    print("\n" + "="*15 + " [知识库完整性校验报告] " + "="*15)

    # 统计独立文件数
    unique_files = set([d.metadata.get('file_name', 'unknown') for d in source_docs])
    print(f"1. 本地源文件: {len(unique_files)} 篇 PDF (共 {len(source_docs)} 页切片)")

    print(f"2. 向量检索库: {'[已加载]' if vector_index else '[未初始化]'}")

    graph_status = "未构建"
    if graph_index:
    try:
    triplet_count = len(graph_index.property_graph_store.get_triplets(limit=10000))
    graph_status = f"正常 (包含约 {triplet_count}+ 关系边)"
    except:
    graph_status = "已加载"
    print(f"3. 知识图谱: {graph_status}")

    if len(source_docs) > 0 and not vector_index:
    print("\n[警告] 向量库缺失!")
    else:
    print("\n[结果] 系统就绪,数据加载完整。")
    print("="*50 + "\n")

    # --- 核心优化:生成摘要节点 (按文件分组 + 进度条) ---
    async def generate_summary_nodes(documents, llm):
    # 1. 将页面按文件名分组
    doc_map = {}
    for doc in documents:
    fname = doc.metadata.get('file_name', 'unknown_file')
    if fname not in doc_map:
    doc_map[fname] = []
    doc_map[fname].append(doc)

    unique_files = list(doc_map.keys())
    print(f">>> 正在处理 {len(unique_files)} 篇独立文档 (已自动合并 {len(documents)} 个页面切片)...")

    semaphore = asyncio.Semaphore(5) # 允许5个并发请求

    async def process_file_summary(fname):
    async with semaphore:
    pages = doc_map[fname]
    context_text = "\n".join([p.get_content() for p in pages[:5]])[:7000] # 截取前7000字符

    prompt = (
    f"请阅读以下论文《{fname}》的片段,生成一份包含核心概念、技术架构和关键结论的详细摘要。"
    "摘要不需要寒暄,直接输出技术干货,重点描述实体之间的关系。"
    f"\n\n原文片段:\n{context_text}..."
    )

    try:
    response = await llm.acomplete(prompt)
    summary_text = clean_think_tag(response.text)
    base_meta = pages[0].metadata
    node = TextNode(text=f"论文《{fname}》的核心摘要:\n{summary_text}", metadata=base_meta)
    return node
    except Exception as e:
    # print(f" × 生成失败 {fname}: {e}") # 进度条模式下减少print
    return None

    tasks = [process_file_summary(fname) for fname in unique_files]
    results = []

    for f in tqdm.as_completed(tasks, desc="生成图谱摘要", unit="篇"):
    res = await f
    if res: results.append(res)

    return results

    # GraphRAG 构建
    async def build_graph_index(summary_nodes, llm):
    print(f"\n>>> 正在基于 {len(summary_nodes)} 个摘要节点构建语义图谱...")
    # 增加提取数量,因为现在是针对全篇摘要
    kg_extractor = SimpleLLMPathExtractor(llm=llm, max_paths_per_chunk=15)

    index = PropertyGraphIndex(
    summary_nodes,
    path_extractors=[kg_extractor, ImplicitPathExtractor()],
    llm=llm,
    show_progress=True,
    embed_model=Settings.embed_model
    )
    return index

    # --- 主程序 ---

    async def main():
    # --- Step 1: 初始化 ---
    print(">>> 正在启动 64GB 显存...")
    Settings.embed_batch_size = 30
    Settings.embed_model = HuggingFaceEmbedding(model_name=EMBED_PATH, device="cuda:1", trust_remote_code=True, model_kwargs={"torch_dtype": torch.float16})
    Settings.llm = OpenAILike(model=LLM_MODEL, api_base="http://localhost:8000/v1", api_key="fake", is_chat_model=True, timeout=120.0)
    router_llm = HuggingFaceLLM(model_name=ROUTER_MODEL_PATH, tokenizer_name=ROUTER_MODEL_PATH, device_map="cuda:1", generate_kwargs={"temperature": 0.0, "max_new_tokens": 15})

    reader = SimpleDirectoryReader(input_dir=DATA_DIR, recursive=True)
    raw_documents = reader.load_data()

    print(f" (正在预检 {len(raw_documents)} 页原始切片...)")
    documents = []
    for doc in raw_documents:
    original_text = doc.get_content() or ""
    if original_text:
    cleaned_text = original_text.encode('utf-8', 'ignore').decode('utf-8')
    new_doc = Document(
    text=cleaned_text, metadata=doc.metadata or {},
    excluded_embed_metadata_keys=doc.excluded_embed_metadata_keys or [],
    excluded_llm_metadata_keys=doc.excluded_llm_metadata_keys or [],
    id_=doc.id_
    )
    documents.append(new_doc)

    node_parser = HierarchicalNodeParser.from_defaults(chunk_sizes=[1536, 512, 256])
    all_nodes = node_parser.get_nodes_from_documents(documents)

    # --- Step 2: 向量库逻辑 ---
    vector_index = None
    milvus_exists = os.path.exists(MILVUS_FILE) and not FORCE_REBUILD_VECTOR

    if milvus_exists:
    print(">>> 检测到现有 Milvus 数据库,正在直接加载...")
    try:
    vector_store = MilvusVectorStore(uri=MILVUS_FILE, dim=4096, overwrite=False)
    vector_index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
    print(" √ 向量库加载成功!")
    except Exception as e:
    print(f" × 加载失败 ({e}),准备重建...")
    milvus_exists = False

    if not milvus_exists:
    print(">>> 正在初始化 Milvus 向量库...")
    leaf_nodes = []
    for n in get_leaf_nodes(all_nodes):
    clean_text = "".join(ch for ch in str(n.text) if ch.isprintable()).strip()
    if clean_text and clean_text.lower() != "nan":
    n.text = clean_text
    leaf_nodes.append(n)

    vector_store = MilvusVectorStore(uri=MILVUS_FILE, dim=4096, overwrite=True)
    vec_storage_context = StorageContext.from_defaults(vector_store=vector_store)
    vec_storage_context.docstore.add_documents(all_nodes)
    vector_index = VectorStoreIndex(leaf_nodes, storage_context=vec_storage_context, show_progress=True)
    print(" √ 向量库构建完成。")

    # --- Step 3: 图谱索引加载与重构 ---
    graph_index = None
    graph_exists = os.path.exists(GRAPH_STORAGE_DIR) and not FORCE_REBUILD_GRAPH

    if graph_exists:
    print(">>> 检测到本地图谱,正在尝试加载...")
    try:
    graph_storage_context = StorageContext.from_defaults(persist_dir=GRAPH_STORAGE_DIR)
    graph_index = PropertyGraphIndex(nodes=[], storage_context=graph_storage_context, llm=Settings.llm, embed_model=Settings.embed_model)
    print(" √ 本地图谱加载成功。")
    except Exception as e:
    print(f" × 图谱加载失败 ({e}),准备自动重构...")
    graph_exists = False

    if not graph_exists:
    print(">>> 开始构建新的语义图谱...")
    if os.path.exists(GRAPH_STORAGE_DIR): shutil.rmtree(GRAPH_STORAGE_DIR)

    # 1. 生成摘要 (带进度条)
    summary_nodes = await generate_summary_nodes(documents, Settings.llm)
    # 2. 构建图谱
    graph_index = await build_graph_index(summary_nodes, Settings.llm)
    # 3. 持久化
    graph_index.storage_context.persist(persist_dir=GRAPH_STORAGE_DIR)
    print(f" √ 图谱构建成功并保存至 {GRAPH_STORAGE_DIR}")

    # --- Step 4: 校验与配置 ---
    validate_knowledge_base(documents, vector_index, graph_index)

    reranker = LLMRerank(llm=Settings.llm, choice_batch_size=10, top_n=5)
    query_engine = RetrieverQueryEngine.from_args(
    retriever=RecursiveRetriever("vector", retriever_dict={"vector": vector_index.as_retriever(similarity_top_k=10)}, node_dict={node.node_id: node for node in all_nodes}),
    node_postprocessors=[reranker]
    )

    # --- Step 5: 交互循环 ---
    print("\n" + "="*50 + "\n粒术 Modular RAG 已就绪!")
    chat_history = []

    while True:
    try:
    torch.cuda.empty_cache()
    raw_input = input("\n用户 >> ").strip()
    if raw_input.lower() in ['exit', 'quit', '退出']: break
    if not raw_input: continue
    user_input = raw_input.encode('utf-8', 'ignore').decode('utf-8')

    intent = get_intent_via_router(user_input, router_llm)
    print(f" (意图识别: {intent})")

    if intent == "GREETING":
    res = Settings.llm.complete(f"你是论文专家粒术。回应问候:{user_input}。")
    print(f"\n粒术 >> {clean_think_tag(res.text)}")

    elif intent == "META_QUERY":

    unique_files = list(set([n.metadata.get('file_name', '未知文档') for n in documents]))
    res_text = f"粒术守护着 **{len(unique_files)}** 篇 DeepSeek 论文。\n清单前5项:\n" + "\n".join([f"- {d}" for d in unique_files[:5]])
    if len(unique_files) > 5: res_text += f"\n...等共 {len(unique_files)} 篇"
    print(f"\n粒术 >> {res_text}")

    elif intent == "SUMMARY":
    response = query_engine.query(f"请基于全篇论文深度总结:{user_input}")
    print(f"\n粒术 >> {clean_think_tag(response.response)}")

    elif intent == "GRAPH_QUERY" and graph_index:
    print(" (检索语义图谱路径 [基于宏观摘要]...)")
    graph_retriever = graph_index.as_retriever(include_text=True, similarity_top_k=3)
    nodes = graph_retriever.retrieve(user_input)
    graph_context = "\n".join([n.node.get_content() for n in nodes])
    res = Settings.llm.complete(f"结合图谱逻辑(基于论文核心摘要)回答:{user_input}\n宏观背景:\n{graph_context}")
    print(f"\n粒术(图谱增强) >> {clean_think_tag(res.text)}")

    else:
    max_retries = 2
    attempt, current_query, final_answer = 0, user_input, ""
    full_context_query = f"历史:{chat_history[-1]['user']}\n问题:{user_input}" if chat_history else user_input

    while attempt < max_retries:
    attempt += 1
    print(f" (深度融合检索尝试 {attempt}...)")
    hyde_doc = generate_hypothetical_answer(full_context_query, Settings.llm)

    v_res = query_engine.query(f"问题:{full_context_query}\n背景:{hyde_doc}")
    g_nodes = []
    if graph_index:
    g_nodes = graph_index.as_retriever(include_text=True, similarity_top_k=2).retrieve(user_input)

    combined_nodes = v_res.source_nodes + g_nodes
    score = verify_relevance(user_input, combined_nodes, Settings.llm)

    if score == "HIGH" or attempt == max_retries:
    comp_context = compress_context_nodes(user_input, combined_nodes, Settings.llm)
    final_prompt = (
    "【严格指令】你只能使用资料回答。严禁提及外部信息!\n"
    f"参考事实:\n{comp_context}\n问题:{user_input}\n回答:"
    )
    final_res = Settings.llm.complete(final_prompt)
    final_answer = clean_think_tag(final_res.text)
    break
    current_query = rewrite_query_logic(current_query, Settings.llm)
    print(f"\n粒术 >> {final_answer}")
    chat_history.append({"user": user_input, "assistant": final_answer})
    if len(chat_history) > 5: chat_history.pop(0)

    except Exception as e:
    print(f"\n 自动修复中: {e}")

    if __name__ == "__main__":
    asyncio.run(main())
  • Ctrl + S保存文件,并完成文件命名test。新建一个终端,输入python test.py,即可进入 Modular RAG 系统。

    运行结果 运行结果

四、燧原 (Enflame) 部署指南

本章节适用于 燧原 S60 等燧原系列算力卡。

1. 硬件与基础环境

  • 算力型号:燧原 S60(48GB) * 2
  • 算力主机:bd-suiyuan-nodevLLM / 0.11.0 / Python 3.12 / ef 1.7.0.14 镜像选择

2. 基础步骤

  • 进入算力容器,启动实例后,点击 JupyterLab 进入工作台。

    进入容器

3. 实现步骤

3.1 下载 LlamaIndex 与 Milvus Lite 框架

  • 创建终端窗口(Terminal)

    进入终端

  • 输入代码:

    pip install --break-system-packages --target /data/llama_libs --no-deps -i https://mirrors.aliyun.com/pypi/simple/ -U \
    "pymilvus==2.6.6" milvus-lite orjson minio pathspec python-dateutil pytz six \
    llama-index-core llama-index-readers-file llama-index-llms-openai llama-index-llms-openai-like \
    llama-index-embeddings-huggingface llama-index-vector-stores-milvus llama-index-postprocessor-sbert-rerank \
    llama-index-instrumentation llama-index-workflows llama-index-utils-workflow \
    llama-index-retrievers-bm25 rank-bm25 bm25s PyStemmer \
    sentence-transformers pypdf docx2txt nest-asyncio ujson grpcio google-api-core protobuf banks griffe sqlalchemy dataclasses-json marshmallow typing-inspect fsspec filetype deprecated wrapt dirtyjson tenacity jinja2 pyyaml \
    pandas numpy nltk tiktoken requests charset-normalizer urllib3 certifi idna sniffio anyio h11 httpcore httpx mypy_extensions typing_extensions scikit-learn scipy joblib threadpoolctl tqdm pyarrow \
    ragas langchain-core langchain-openai langsmith requests_toolbelt "numpy<2.0" uuid_utils tenacity regex appdirs instructor docstring_parser langchain_community llama-index-llms-huggingface jsonpatch

    pip install --break-system-packages griffe -t /data/llama_libs

    pip install --break-system-packages tinytag -t /data/llama_libs

    pip install --break-system-packages accelerate
  • 完成下载后,新建一个新的终端: 新建终端

3.2 启动 vLLM 推理

  • 在新的终端内输入代码:

    CUDA_VISIBLE_DEVICES=0 vllm serve /mnt/moark-models/Qwen3-8B --gpu-memory-utilization 0.7 --port 8000
  • 当终端提示INFO: Application startup compete,则完成vLLM启动步骤。 启动vLLM

3.3 创建并运行 Python 脚本

  • 点击 Python File:

    创建脚本

  • 输入代码:

    import sys, os, asyncio, nest_asyncio, torch
    import torch_gcu
    from torch_gcu import transfer_to_gcu

    PRIVATE_LIB = "/data/llama_libs"
    if PRIVATE_LIB not in sys.path:
    sys.path.insert(0, PRIVATE_LIB)

    nest_asyncio.apply()

    from datasets import Dataset
    from ragas import evaluate, RunConfig
    from ragas.metrics import Faithfulness, AnswerRelevancy, ContextRecall
    from ragas.llms import LlamaIndexLLMWrapper
    from ragas.embeddings import LlamaIndexEmbeddingsWrapper

    from llama_index.core import SimpleDirectoryReader, VectorStoreIndex, StorageContext, Settings, PromptTemplate, Document
    from llama_index.embeddings.huggingface import HuggingFaceEmbedding
    from llama_index.llms.openai_like import OpenAILike
    from llama_index.vector_stores.milvus import MilvusVectorStore
    from llama_index.core.node_parser import HierarchicalNodeParser, get_leaf_nodes
    from llama_index.core.retrievers import RecursiveRetriever, QueryFusionRetriever
    from llama_index.core.postprocessor import LLMRerank
    from llama_index.core.query_engine import RetrieverQueryEngine
    from llama_index.retrievers.bm25 import BM25Retriever


    def clean_document_content(documents):
    print(f">>> 正在清洗 {len(documents)} 篇文档的 Unicode 编码...")
    cleaned_docs = []

    for doc in documents:
    original_text = doc.get_content() or ""

    if original_text:
    cleaned_text = original_text.encode('utf-8', 'ignore').decode('utf-8')

    new_doc = Document(
    text=cleaned_text,
    metadata=doc.metadata or {},
    excluded_embed_metadata_keys=doc.excluded_embed_metadata_keys or [],
    excluded_llm_metadata_keys=doc.excluded_llm_metadata_keys or [],
    id_=doc.id_
    )
    cleaned_docs.append(new_doc)

    print(f">>> 清洗完成,有效文档数: {len(cleaned_docs)}")
    return cleaned_docs


    def clean_think_tag(text):
    text = str(text).strip()
    if not text:
    return "(粒术正在组织语言,请再试一次)"
    if "</think>" in text:
    res = text.split("</think>")[-1].strip()
    return res if res else "(粒术思考了很久,但没能生成有效回答)"
    return text


    def get_retrieval_strategy(user_input):
    """根据问题复杂度动态决定查询改写次数"""
    vague_keywords = ["总结", "概括", "讲了什么", "主要内容", "核心思想", "这本书", "介绍", "你好", "是谁"]
    if len(user_input) < 10 or any(k in user_input for k in vague_keywords):
    return 1
    return 3


    DATA_DIR = "/mnt/moark-models/deepseek_papers"
    EMBED_PATH = "/mnt/moark-models/Qwen3-Embedding-8B"
    LLM_MODEL = "/mnt/moark-models/Qwen3-8B"


    async def main():
    print(">>> 正在初始化 8B 规模双模型...")
    Settings.embed_model = HuggingFaceEmbedding(
    model_name=EMBED_PATH, device="cuda", trust_remote_code=True,
    model_kwargs={"torch_dtype": torch.float16}
    )
    Settings.llm = OpenAILike(
    model=LLM_MODEL, api_base="http://localhost:8000/v1",
    api_key="fake", is_chat_model=True, timeout=120.0
    )

    print(">>> 正在解析文档并进行深度清理...")
    reader = SimpleDirectoryReader(input_dir=DATA_DIR, recursive=True)
    raw_documents = reader.load_data()

    documents = clean_document_content(raw_documents)

    node_parser = HierarchicalNodeParser.from_defaults(chunk_sizes=[1536, 512, 256])
    all_nodes = node_parser.get_nodes_from_documents(documents)
    raw_leaf_nodes = get_leaf_nodes(all_nodes)

    leaf_nodes = []
    for n in raw_leaf_nodes:
    text_content = n.get_content()
    if text_content is not None and isinstance(text_content, str):
    clean_text = text_content.encode('utf-8', 'ignore').decode('utf-8')
    if clean_text.strip():
    new_node = Document(
    text=clean_text.strip(),
    metadata=n.metadata if hasattr(n, 'metadata') else {},
    id_=n.node_id
    )
    leaf_nodes.append(new_node)
    else:
    preview = str(text_content)[:20] if text_content else "None"
    print(f" 跳过非法节点: 类型={type(text_content)}, 预览={preview}")

    print(f">>> 深度过滤完成,有效叶子节点: {len(leaf_nodes)} / 原始: {len(raw_leaf_nodes)}")

    vector_store = MilvusVectorStore(uri="./advanced_rag_final.db", dim=4096, overwrite=True)
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    storage_context.docstore.add_documents(all_nodes)

    index = VectorStoreIndex(leaf_nodes, storage_context=storage_context, show_progress=True)

    vector_retriever = index.as_retriever(similarity_top_k=12)
    bm25_retriever = BM25Retriever.from_defaults(nodes=leaf_nodes, similarity_top_k=12)

    fusion_retriever = QueryFusionRetriever(
    [vector_retriever, bm25_retriever],
    similarity_top_k=12,
    num_queries=1,
    mode="reciprocal_rerank",
    use_async=False
    )

    recursive_retriever = RecursiveRetriever(
    "vector",
    retriever_dict={"vector": fusion_retriever},
    node_dict={node.node_id: node for node in all_nodes}
    )

    reranker = LLMRerank(llm=Settings.llm, choice_batch_size=15, top_n=7)

    qa_prompt_tmpl = PromptTemplate(
    "【身份设定】你是DeepSeek RAG 知识库 AI 助理,名字叫做'粒术'。请优先判断用户的意图\n\n"
    "【场景 A:社交寒暄】\n"
    "如果用户在打招呼(如:你好、早上好)或询问你是谁,请忽略下方的参考资料,用温暖、幽默、有礼貌的口吻直接回答。\n\n"
    "【场景 B:技术咨询】\n"
    "如果涉及 DeepSeek 论文或具体技术问题,请严格查阅资料:\n"
    "参考资料:\n---------------------\n{context_str}\n---------------------\n"
    "准则:1. 严禁发挥,必须引用资料数据。2. 资料未提及请幽默回复'大脑里没存这段信息'。3. 保持专业严谨,带一点点幽默。\n\n"
    "用户提问:{query_str}\n"
    )

    query_engine = RetrieverQueryEngine.from_args(
    retriever=recursive_retriever,
    node_postprocessors=[reranker],
    text_qa_template=qa_prompt_tmpl
    )

    print("\n" + "=" * 50 + "\nDeepSeek RAG 论文知识库已就绪!")

    while True:
    try:
    raw_input = input("\n用户 >> ").strip()
    if raw_input.lower() in ['exit', 'quit', '退出']:
    print("再见!祝你今天有个好心情。")
    break
    if not raw_input:
    continue

    user_input = raw_input.encode('utf-8', 'ignore').decode('utf-8')

    greetings = ["你好", "嗨", "hello", "你是谁", "早安", "午安", "在吗", "粒术"]
    is_greeting = any(g in user_input.lower() for g in greetings) or len(user_input) < 5

    if is_greeting:
    greeting_prompt = (
    "你是 DeepSeek 论文研究专家'粒术'。请用温暖、专业且富有逻辑的口吻回应用户的问候。"
    "在回复中明确告知用户:你已经深度研读了 DeepSeek 论文,专门负责解答论文中的技术架构、"
    "实验结论及算法细节。不要提及任何生活技巧或电脑维修等无关内容。"
    )
    fallback_res = Settings.llm.complete(f"{greeting_prompt}\n用户输入:{user_input}")
    print(f"\n粒术 >> {clean_think_tag(fallback_res.text)}")
    else:
    n_queries = get_retrieval_strategy(user_input)
    fusion_retriever.num_queries = n_queries

    response = query_engine.query(user_input)
    final_answer = clean_think_tag(response.response)

    if not final_answer or "Empty Response" in final_answer or len(final_answer) < 5:
    print("\n粒术 >> 抱歉,由于知识库中没有直接相关的细节,我暂时无法给出精准的技术回答。您可以尝试换个问法。")
    else:
    print(f"\n粒术 >> {final_answer}")

    except UnicodeEncodeError:
    print("输入包含非法字符,请尝试手动输入。")
    continue
    except Exception as e:
    print(f"\n粒术 >> 抱歉,处理该请求时遇到了一些技术困难,请尝试换个问法或稍后再试。")
    print(f"DEBUG ERROR: {type(e).__name__}: {e}")
    continue


    if __name__ == "__main__":
    try:
    asyncio.run(main())
    except KeyboardInterrupt:
    pass

  • Ctrl + S保存文件,并完成文件命名test。新建一个终端,输入python test.py,即可进入Modular RAG 系统。

    运行结果