2026/4/17 7:23:24
网站建设
项目流程
网站建设企业有哪些,网络推广龙岗比较好的,四川省第十一建筑公司官网,菏泽建设网站过去一周#xff0c;每天公众号和小红薯后台咨询RAG企业应用落地的私信又多了起来#xff0c;目前已经从10稳定在了 20。承接了其中几个需求比较明确的项目#xff0c;#xff08;目前3月底前排期已满#xff09;#xff0c;剩下的小部分接受了付费咨询。
在这个过程中很…过去一周每天公众号和小红薯后台咨询RAG企业应用落地的私信又多了起来目前已经从10稳定在了 20。承接了其中几个需求比较明确的项目目前3月底前排期已满剩下的小部分接受了付费咨询。在这个过程中很高兴能了解到很多之前没涉猎过的行业和场景其中经常被集中问到的问题大概有四个中小企业做RAG知识库落地选择框架哪个比较好如果选择 RAGFlow 如何进行定制化开发如何对文档中的图片进行识别检索如何对复杂文档进行动态分块等。关于选择什么框架的问题仁者见仁需要综合对比的盆友可以参考我之前发的那篇文章企业RAG落地避坑指南自主开发 vs 三大框架核心配置与选型全解析。这篇以 RAGFlow 框架为例针对上述后三个问题结合目前团队实践经验给各位做个分享大家辨证参考。毕竟RAG并没有“一招鲜”的神奇魔法传说那几个大厂手里掌握的RAG”核心技术“私以为也是经过了必要且复杂的“策略优化-管道设计-训练-调优-发布”等专业开发流程不过成熟的开源应用框架无疑是更有想象空间的社会化大创新。注本篇会比较偏开发导向非技术向盆友选择性翻翻就好。以下enjoy:1、优化实施路线图以上篇介绍的机械加工行业的维保场景为例参考文章RAGFlowDeepSeek-R1:14b落地案例分享足够详细机加工行业设备维保场景推荐先采用官方的 Python API 进行合理的优化配置后再修改项目源码最后根据业务目标做必要的高级优化拓展。此处附上一张个人项目实施过程中积累的些优化要点供各位参考1.1、阶段一API 优化配置配置不同文档类型的解析策略调整检索参数优化语义搜索质量定制大模型提示词以适应机械行业特点1.2阶段二基础源码修改实现专业术语处理模块开发查询路由机制增加上下文增强功能1.3、阶段三高级优化扩展实现多级索引结构开发高性能缓存机制添加查询日志分析系统2、6 个官方 Python API 优化全解析根据 RAGFlow 官方 Python API 文档为大家完整梳理了所有可进行 API 优化的模块和参数。这些 API 调优可以不修改源码直接通过参数配置实现性能提升。建议在看完之后还是去官网看下原文亲自动手试过一遍会有不一样的体感。原文出处https://ragflow.io/docs/dev/python_api_reference2.1、数据集管理创建数据集 create_datasetRAGFlow.create_dataset( name: str, avatar: str , description: str , embedding_model: str BAAI/bge-large-zh-v1.5, language: str English, permission: str me, chunk_method: str naive, parser_config: DataSet.ParserConfig None )优化参数embedding_model: 选择合适的嵌入模型影响检索质量中文场景推荐: BAAI/bge-large-zh-v1.5英文场景可选: BAAI/bge-large-en-v1.5language: 选择与文档匹配的语言chunk_method: 关键参数根据文档类型选择最佳分块策略naive: 通用文档paper: 论文/设备手册book: 结构化书籍table: 表格数据qa: 问答格式文档picture: 图片文档one: 整个文档作为一个块knowledge_graph: 知识图谱parser_config: 精细调整解析配置chunk_token_num: 控制分块大小delimiter: 自定义分隔符layout_recognize: 是否启用布局识别raptor: 高级解析选项更新数据集 DataSet.updateDataSet.update(update_message : dict)优化参数:embedding_model: 更换更适合的嵌入模型chunk_method: 调整分块策略meta_fields: 更新元数据字段2.2、文件管理 (FILE MANAGEMENT)上传文档 DataSet.upload_documentsDataSet.upload_documents(document_list : list[dict])优化参数:display_name: 文件显示名方便检索与管理blob: 文件内容更新文档 Document.updateDocument.update(update_message : dict)优化参数:chunk_method: 文档分块方法parser_config: 文档解析配置对于图文文档可设置: layout_recognize: True对于表格文档可设置: html4excel: True解析文档DataSet.async_parse_documents(document_ids : list[str])用于触发文档解析流程支持批量处理。2.3、分块管理 (CHUNK MANAGEMENT)添加分块 Document.add_chunkDocument.add_chunk(content: str, important_keywords: list[str] [])优化参数:important_keywords: 关键词标注增强检索相关性更新分块 Chunk.updateChunk.update(update_message : dict)优化参数:content: 更新分块内容important_keywords: 更新关键词available: 控制分块可用性检索 RAGFlow.retrieve(关键API)RAGFlow.retrieve( question: str , dataset_ids: list[str] None, document_ids: list[str] None, page: int 1, page_size: int 30, similarity_threshold: float 0.2, vector_similarity_weight: float 0.3, top_k: int 1024, rerank_id: str None, keyword: bool False, highlight: bool False )优化参数 (最重要的检索相关参数):similarity_threshold: 相似度阈值影响召回范围vector_similarity_weight: 向量相似度权重与关键词匹配权重的比例设置为 0-1 之间值越大向量权重越高工业领域建议参考0.3-0.5平衡专业术语与语义理解top_k: 参与向量检索的 chunk 数量影响检索范围rerank_id: 重排序模型 ID提升检索精度keyword:开启关键词匹配对专业领域极其有用highlight: 高亮匹配内容帮助理解匹配原因2.4、聊天助手管理创建聊天助手 RAGFlow.create_chatRAGFlow.create_chat( name: str, avatar: str , dataset_ids: list[str] [], llm: Chat.LLM None, prompt: Chat.Prompt None )优化参数:llm: LLM 模型配置model_name: 模型名称temperature: 温度影响创造性top_p: 词汇采样范围presence_penalty: 重复惩罚frequency_penalty: 频率惩罚max_token: 最大输出 token 数prompt: 提示词配置similarity_threshold: 相似度阈值keywords_similarity_weight: 关键词相似度权重top_n: 提供给 LLM 的 chunk 数量rerank_model: 重排序模型top_k: 重排序参与的候选数量 empty_response: 无匹配时的回复show_quote: 是否显示引用来源prompt: 系统提示词内容更新聊天助手 Chat.updateChat.update(update_message : dict)优化参数: 同 create_chat 中的参数2.5、会话管理 (SESSION MANAGEMENT)创建会话 Chat.create_sessionChat.create_session(name: str New session)提问 Session.askSession.ask(question: str , stream: bool False, **kwargs)优化参数:stream: 流式输出提升用户体验**kwargs: 可传递给 prompt 中定义的变量2.6、代理管理 (AGENT MANAGEMENT)创建代理会话 Agent.create_sessionAgent.create_session(id, rag, **kwargs)代理提问 Session.askSession.ask(question: str , stream: bool False)与普通会话的ask方法类似。3、调整项目源码思路参考3.1、专业术语处理需要在检索引擎层面添加工业领域同义词和术语映射# 需要修改源码的示例逻辑 class CustomTerminologyProcessor: def __init__(self, terminology_mapping): self.terminology_mapping terminology_mapping # 同义词映射表 def process_query(self, query): # 专业术语标准化 # 车间俚语转换为标准术语 processed_query query for slang, standard_term in self.terminology_mapping.items(): processed_query processed_query.replace(slang, standard_term) return processed_query修改点在查询预处理阶段添加定制的术语处理模块需要在 RAGFlow 的查询管道中修改源码添加此功能3.2、多级索引结构实现需要定制 Milvus 索引策略实现基础索引层和语义索引层的混合索引# 这部分需要修改源码以下是概念性代码 class CustomHybridIndexBuilder: def __init__(self, vector_db_client): self.client vector_db_client def create_scalar_indices(self, collection_name, fields): # 创建设备编号、故障代码等标量索引 for field in fields: self.client.create_index(collection_name, field, scalar) def create_vector_indices(self, collection_name, fields): # 创建向量索引 for field in fields: self.client.create_index(collection_name, field, {index_type: HNSW, params: {M: 16, efConstruction: 200}})修改点修改 RAGFlow 的索引构建模块扩展Milvus 客户端接口以支持多索引策略3.3、查询路由设计需要实现定制化的查询路由逻辑识别不同类型的查询并路由到最合适的检索通道# 查询路由器 - 需要源码修改实现 class QueryRouter: def route_query(self, query_text): if self._is_equipment_code(query_text): return exact_match, {field: equipment_code} elif self._is_fault_code(query_text): return exact_match, {field: fault_code} elif self._is_parameter_query(query_text): return parameter_lookup, {field: parameter_name} else: return semantic_search, {model: embedding_model}修改点在 RAGFlow 的查询处理流程中添加查询分类和路由机制实现针对不同查询类型的专用处理通道3.4、上下文增强机制增加查询上下文增强融入设备信息、历史记录等# 上下文增强器 - 需要修改源码实现 class ContextEnhancer: def enhance_query(self, query, session_history, equipment_metadataNone): # 添加设备上下文信息 if equipment_metadata: query_context f设备型号: {equipment_metadata[model]}, 生产年份: {equipment_metadata[year]}\n query_context query # 添加历史查询信息 if session_history: relevant_history self._extract_relevant_history(session_history, query) query_context f参考历史信息: {relevant_history}\n query_context return query_context修改点修改会话管理模块实现会话状态跟踪增加设备元数据关联机制在查询处理流程中加入上下文增强步骤4、图文结合文档处理方案依然是两种方案直接使用RAGFlow API方案优势是更简单使用现有功能无需额外的模型调用也能够直接显示原始图片视觉效果更好处理速度更快不依赖外部 API当然成本也无疑更低。但使用一个多模态模型进行预处理的方案优势也很明显图片内容被转换为文本便于向量化和语义搜索也可以依托多模态模型的能力提供更丰富的图片内容解释。目前实际测试下来采用两种方案的组合效果更加稳定。4.1、使用多模态预处理生成图片描述# 使用多模态模型生成图片描述 processor MultimodalDocumentProcessor(api_keyYOUR_API_KEY) enhanced_docs processor.process_pdf(设备手册.pdf)4.2、使用 RAGFlow 处理和存储原始图片# 配置保留图片的数据集 dataset rag_object.create_dataset( name图文设备手册, chunk_methodpaper, parser_config{layout_recognize: True} ) # 上传原始PDF文档 with open(设备手册.pdf, rb) as f: dataset.upload_documents([{display_name: 设备手册.pdf, blob: f.read()}])4.3、创建能够提供文本描述和图片引用的助手assistant rag_object.create_chat( name图文设备专家, dataset_ids[dataset.id], promptChat.Prompt( prompt你是设备维修专家。回答时请同时提供: 1. 文字描述解释故障和解决方案 2. 引用相关图片包括图片描述 3. 告诉用户可以参考哪些图片获取更多信息 {knowledge} ) )4.4、源码修改的一些建议增强图片提取和处理修改 PDF 解析器更准确地绑定文本和相关图片增加图片内容分析功能自动标注图片类型如故障图、结构图等实现图文混合索引为图片创建特殊索引支持通过图片内容或相关文本检索图片在检索结果中包含图片 URL 或直接嵌入图片改进响应生成修改聊天助手的响应生成逻辑自动识别图片引用在生成的回答中包含相关图片或图片链接5、动态分块策略参考RAGFlow的chunk_method参数是在数据集级别或文档级别设置的不支持在单个文档内部动态切换不同的分块策略。但实际情况是同一文档中的不同部分可能需要不同的处理方式比如针对段落、图片、表格、图表等使用单一的分块策略很难同时兼顾所有这些内容类型的特点。5.1、4种分块策略对比源码修改方案修改 RAGFlow 的文档解析器使其能够识别文档中的不同部分并应用不同的分块策略但这需要深入修改 RAGFlow 的核心处理逻辑如果没有深入理解RAGFlow的全局代码建议不要这么做。文档预处理方案在上传到 RAGFlow 前预处理文档将其拆分成不同类型的子文档。例如将设备手册拆分为纯文本部分、表格部分、图文部分等然后分别上传到不同的数据集每个数据集使用适合的分块策略。这个方案优点是可以充分利用RAGFlow针对不同内容类型的专门分块策略但问题也很明显就是文档上下文被拆分可能影响整体理解。自定义分块方案不使用RAGFlow 的自动分块而是手动控制分块使用chunk_methodone将整个文档作为一个块导入然后使用自定义逻辑创建更细粒度的分块。这种做法无疑可以实现最精细的控制、保留文档完整性当然缺点就是实现上会相对麻烦。混合模型方案创建多个使用不同分块策略的数据集将同一文档上传到所有这些数据集在检索时查询所有数据集并合并结果。这种方法保持文档完整性但创建多个使用不同分块策略的副本会造成存储冗余检索时需要合并多个结果集。5.2、推荐自定义方案这种方法使用RAGFlow的API但完全控制分块过程最灵活且无需修改源码。完整保留原始文档:使用 chunk_methodone将文档整体上传保留文档的完整性和上下文关系自定义内容识别:使用 PyMuPDF 识别文档中的不同内容类型:文本、段落、表格内容图片及其相关描述、章节标题和结构动态创建精细分块:文本块: 基于段落和语义分界表格块: 保留表格结构和行列关系图文块: 关联图片和周围的描述文本添加分块类型标记:每个分块添加类型标识符[text], [table], [image]等方便检索时区分不同类型的内容丰富关键词提取:为每个分块提取相关关键词保留章节上下文信息回顾23年和24年国内大部分中小企业对大模型应用落地的态度大抵是23年在观望24年上半年利润和数字化底座都不错企业开始内部小范围试错发现效果不达预期后要么放弃要么就试图寻找一些外部解决方案。但对于大部分公司而言大模型在企业应用落地受限于比较贵的部署成本和复杂的技术门槛迟迟没有提上日程DeepSeek的1/20开源之后正在改变这一局面此刻我们或许也正处于一个关键的转折点之上。RAG的落地优化需要群策群力先前试着建了微信群和小红书的群活跃度比较差本想搜个知识星球加入交流下一时也没搜到于是我就建了个希望能吸引到一线的从业人员或者其他的积极行动者们充分交流。年费499限时3天免费体验有点小贵新手慎入。明天起正式发帖卖个期货内容方向如下已落地项目目前有3个中的技术方案及部分源码多个细分场景知识库应用的调研情况RAG相关系列论文和框架解读其他from ragflow_sdk import RAGFlow import fitz # PyMuPDF import re import pandas as pd import io import logging from typing import List, Dict, Any, Tuple logging.basicConfig(levellogging.INFO, format%(asctime)s - %(name)s - %(levelname)s - %(message)s) logger logging.getLogger(__name__) class DynamicChunker: 针对混合格式文档的自定义分块器 def __init__(self, api_key: str, base_url: str): self.rag_object RAGFlow(api_keyapi_key, base_urlbase_url) def process_document(self, pdf_path: str, dataset_name: str None) - str: 处理PDF文档并实现自定义分块 # 1. 创建或获取数据集 if dataset_name: datasets self.rag_object.list_datasets(namedataset_name) if datasets: dataset datasets[0] logger.info(f使用现有数据集: {dataset.name}, ID: {dataset.id}) else: dataset self.rag_object.create_dataset( namedataset_name, embedding_modelBAAI/bge-large-zh-v1.5, languageChinese, chunk_methodone # 使用one方式保持文档完整性 ) logger.info(f创建新数据集: {dataset.name}, ID: {dataset.id}) else: dataset self.rag_object.create_dataset( name混合文档库, embedding_modelBAAI/bge-large-zh-v1.5, languageChinese, chunk_methodone ) logger.info(f创建新数据集: {dataset.name}, ID: {dataset.id}) # 2. 上传文档 with open(pdf_path, rb) as f: document_blob f.read() docs dataset.upload_documents([{ display_name: pdf_path.split(/)[-1], blob: document_blob }]) if not docs: logger.error(文档上传失败) return None doc docs[0] logger.info(f文档上传成功, ID: {doc.id}) # 3. 分析文档结构并创建自定义分块 chunks self._create_custom_chunks(pdf_path) logger.info(f创建了 {len(chunks)} 个自定义分块) # 4. 手动添加分块到文档 chunk_ids [] for chunk in chunks: content chunk[content] keywords chunk[keywords] chunk_type chunk[type] # 添加分块类型标记以便未来检索 content_with_marker f[{chunk_type}]\n{content} # 添加分块 new_chunk doc.add_chunk( contentcontent_with_marker, important_keywordskeywords ) chunk_ids.append(new_chunk.id) logger.info(f添加分块: 类型{chunk_type}, ID{new_chunk.id}, 关键词数量{len(keywords)}) return doc.id def _create_custom_chunks(self, pdf_path: str) - List[Dict[str, Any]]: 分析PDF并创建自定义分块 pdf_doc fitz.open(pdf_path) chunks [] # 用于存储当前上下文 current_section current_text_chunk current_table_data [] current_keywords [] for page_idx, page in enumerate(pdf_doc): # 提取页面文本 text page.get_text() # 检测章节标题 section_headers self._detect_section_headers(text) if section_headers: # 如果有积累的文本块先保存 if current_text_chunk: chunks.append({ content: current_text_chunk, keywords: list(set(current_keywords)), type: text, section: current_section }) current_text_chunk current_keywords [] # 更新当前章节 current_section section_headers[0] current_keywords.append(current_section) # 检测表格 tables self._detect_tables(page) if tables: # 如果有积累的文本块先保存 if current_text_chunk: chunks.append({ content: current_text_chunk, keywords: list(set(current_keywords)), type: text, section: current_section }) current_text_chunk current_keywords [] # 处理表格 for table in tables: table_content self._format_table(table) table_keywords self._extract_keywords_from_table(table) chunks.append({ content: table_content, keywords: list(set(table_keywords current_keywords)), type: table, section: current_section }) # 检测图片 images self._detect_images(page) for img_idx, img in enumerate(images): # 提取图片周围的文本作为上下文 img_context self._extract_image_context(text, img[bbox]) # 如果找到图片相关文本 if img_context: # 如果有积累的文本块先保存 if current_text_chunk: chunks.append({ content: current_text_chunk, keywords: list(set(current_keywords)), type: text, section: current_section }) current_text_chunk current_keywords [] # 创建图文块 image_text_content f图片描述: {img_context}\n图片位置: 第{page_idx1}页 image_keywords self._extract_keywords(img_context) chunks.append({ content: image_text_content, keywords: list(set(image_keywords current_keywords)), type: image, section: current_section }) # 防止重复处理同一段文本 text text.replace(img_context, , 1) # 处理剩余文本 if text.strip(): # 按段落拆分 paragraphs self._split_into_paragraphs(text) for para in paragraphs: if len(para.strip()) 10: # 忽略太短的段落 continue # 累积文本直到达到合适的大小 current_text_chunk para \n\n current_keywords.extend(self._extract_keywords(para)) # 检查是否应该创建新的文本块 if len(current_text_chunk) 1500: # 大约300-500个词 chunks.append({ content: current_text_chunk, keywords: list(set(current_keywords)), type: text, section: current_section }) current_text_chunk current_keywords [] # 处理最后一个文本块 if current_text_chunk: chunks.append({ content: current_text_chunk, keywords: list(set(current_keywords)), type: text, section: current_section }) return chunks def _detect_section_headers(self, text: str) - List[str]: 检测章节标题 # 这是一个简化的实现可以根据实际文档格式调整 header_patterns [ r^第[一二三四五六七八九十\d]章\s*(.)$, r^\d\.\d*\s(.)$, r^[一二三四五六七八九十][、.]\s*(.)$ ] headers [] for line in text.split(\n): line line.strip() for pattern in header_patterns: match re.match(pattern, line) if match: headers.append(line) break return headers def _detect_tables(self, page) - List[Any]: 检测页面中的表格 # 这里使用简化的检测逻辑实际应用中可能需要更复杂的表格检测算法 # 例如可以寻找包含多个垂直和水平线的区域 tables [] # 简单表格检测查找水平线和垂直线的集中区域 # 这里仅作为占位示例实际实现会更复杂 horizontal_lines [] vertical_lines [] for drawing in page.get_drawings(): for item in drawing[items]: if item[type] l: # 线段 x0, y0, x1, y1 item[rect] if abs(y1 - y0) 2: # 水平线 horizontal_lines.append((x0, y0, x1, y1)) elif abs(x1 - x0) 2: # 垂直线 vertical_lines.append((x0, y0, x1, y1)) # 简单的表格判定有足够多的水平线和垂直线 if len(horizontal_lines) 3 and len(vertical_lines) 2: # 找出所有线的边界作为表格边界 min_x min([min(l[0], l[2]) for l in horizontal_lines vertical_lines]) max_x max([max(l[0], l[2]) for l in horizontal_lines vertical_lines]) min_y min([min(l[1], l[3]) for l in horizontal_lines vertical_lines]) max_y max([max(l[1], l[3]) for l in horizontal_lines vertical_lines]) # 提取表格区域的文本 table_rect fitz.Rect(min_x, min_y, max_x, max_y) table_text page.get_text(text, cliptable_rect) # 简化的表格结构化实际应用中需要更复杂的逻辑 table_rows table_text.split(\n) table [] for row in table_rows: if row.strip(): cells row.split() if len(cells) 2: # 至少有2个单元格才视为有效行 table.append(cells) if table: tables.append(table) return tables def _detect_images(self, page) - List[Dict[str, Any]]: 检测页面中的图片 images [] # 获取页面上的图片对象 img_list page.get_images(fullTrue) for img_idx, img_info in enumerate(img_list): xref img_info[0] base_image page.parent.extract_image(xref) if base_image: # 寻找图片在页面上的位置 img_rects [] for rect in page.get_image_rects(xref): img_rects.append(rect) if img_rects: # 使用第一个找到的位置 bbox img_rects[0] images.append({ bbox: [bbox.x0, bbox.y0, bbox.x1, bbox.y1] }) return images def _extract_image_context(self, text: str, bbox: List[float], context_size: int 200) - str: 提取图片周围的文本作为上下文 # 在文本中查找可能的图片标题如图1Figure 1等 caption_patterns [ r图\s*\d[\.:]?\s*(.?)(?:\n|$), rFigure\s*\d[\.:]?\s*(.?)(?:\n|$), r图表\s*\d[\.:]?\s*(.?)(?:\n|$) ] for pattern in caption_patterns: matches re.finditer(pattern, text, re.IGNORECASE) for match in matches: return match.group(0) # 如果找不到明确的图片标题则尝试提取图片周围的文本 # 这是一个简化实现实际应用中可能需要更复杂的逻辑 lines text.split(\n) total_length 0 start_line 0 # 估计图片在文本中的位置 # 这是一个非常粗略的估计实际应用中需要更精确的方法 relative_position bbox[1] / 1000 # 假设页面高度为1000 target_position int(len(text) * relative_position) # 找到大致对应的行 for i, line in enumerate(lines): total_length len(line) 1 # 1 for newline if total_length target_position: start_line max(0, i - 2) # 从前两行开始 break # 提取上下文当前行及其前后几行 context_lines lines[max(0, start_line):min(len(lines), start_line 5)] return \n.join(line for line in context_lines if len(line.strip()) 5) def _format_table(self, table: List[List[str]]) - str: 格式化表格为文本格式 if not table: return # 创建pandas DataFrame df pd.DataFrame(table[1:], columnstable[0] if len(table) 1 else None) # 转换为字符串形式 result io.StringIO() df.to_csv(result, sep|, indexFalse) return result.getvalue() def _extract_keywords_from_table(self, table: List[List[str]]) - List[str]: 从表格中提取关键词 keywords [] # 表头作为关键词 if table and len(table) 0: keywords.extend(table[0]) # 第一列可能是行标题也加入关键词 for row in table[1:] if len(table) 1 else []: if row and len(row) 0: keywords.append(row[0]) return keywords def _split_into_paragraphs(self, text: str) - List[str]: 将文本拆分为段落 # 按照多个换行符拆分 paragraphs re.split(r\n\s*\n, text) return [p.strip() for p in paragraphs if p.strip()] def _extract_keywords(self, text: str) - List[str]: 从文本中提取关键词 # 这是一个简化的关键词提取方法 # 实际应用中可以使用更复杂的NLP技术如TF-IDF、TextRank等 # 1. 移除停用词和标点 stop_words {的, 了, 和, 与, 或, 在, 是, 有, 被, 将, 把} cleaned_text re.sub(r[^\w\s], , text) words cleaned_text.split() filtered_words [w for w in words if w not in stop_words and len(w) 1] # 2. 简单词频统计 word_count {} for word in filtered_words: if word in word_count: word_count[word] 1 else: word_count[word] 1 # 3. 选择频率最高的几个词作为关键词 sorted_words sorted(word_count.items(), keylambda x: x[1], reverseTrue) return [word for word, count in sorted_words[:10] if count 1] # 使用示例 if __name__ __main__: chunker DynamicChunker( api_keyYOUR_API_KEY, base_urlhttp://YOUR_BASE_URL:9380 ) doc_id chunker.process_document(设备维修手册.pdf, 动态分块测试) print(f处理完成文档ID: {doc_id})完