2026/4/18 12:59:55
网站建设
项目流程
接送车服务网站怎么做,wordpress3.0手机版,个人开发的软件能卖吗,琼山网站制作ai自己制作mod2 ocr vlm识别 模型页面点击打开模型页面_哔哩哔哩_bilibili
ocr识别不了点赞图标#xff0c;不然点赞收藏一气喝成就能下载模型了 # e:\code\my_python_server\llm_server\memory_llm.py
import tkinter as tk
from tkinter import scrolledtext, messagebox, …ai自己制作mod2 ocr vlm识别 模型页面点击打开模型页面_哔哩哔哩_bilibiliocr识别不了点赞图标不然点赞收藏一气喝成就能下载模型了# e:\code\my_python_server\llm_server\memory_llm.py import tkinter as tk from tkinter import scrolledtext, messagebox, ttk import os import subprocess import re import json import sys from pathlib import Path from llm_class import VLMService # 假设VLMService基于LLMService import pyautogui from PIL import Image import base64 from io import BytesIO def execute_python_script(script_path, *args): 执行指定路径的Python脚本 # 获取项目根目录从当前脚本位置向上一级 current_dir Path(__file__).parent.parent # 回到项目根目录 script_full_path current_dir / script_path if not script_full_path.exists(): return f错误: 脚本 {script_path} 不存在 if script_full_path.suffix ! .py: return f错误: 文件必须是Python脚本 (.py文件) try: # 构建命令工具名称作为脚本的第一个参数 cmd [sys.executable, str(script_full_path)] list(args) # 执行Python脚本指定编码为UTF-8 result subprocess.run( cmd, capture_outputTrue, textTrue, timeout30, cwdstr(current_dir), encodingutf-8, # 明确指定UTF-8编码 errorsreplace # 遇到编码错误时替换字符 ) print( ,result.stdout.strip()) if result.returncode 0: return result.stdout.strip() else: return f脚本执行失败: {result.stderr.strip()} except subprocess.TimeoutExpired: return f脚本执行超时: {script_path} except Exception as e: return f执行脚本时出错: {str(e)} def list_available_tools(): 从配置文件中列出所有可用工具 current_dir Path(__file__).parent config_path current_dir / tools_config.json if not config_path.exists(): return [] try: with open(config_path, r, encodingutf-8) as f: config json.load(f) return config.get(tools, []) except Exception as e: return [] def get_tool_by_name(tool_name): 根据工具名称获取工具信息 tools list_available_tools() for tool in tools: if tool[name] tool_name: return tool return None def execute_tool(tool_name, *args): 执行指定名称的工具 tool_info get_tool_by_name(tool_name) if not tool_info: return f错误: 未找到工具 {tool_name} script_path tool_info[path] result execute_python_script(script_path, tool_name, *args) return result def get_available_tools_info(): 获取所有可用工具的信息 current_dir Path(__file__).parent config_path current_dir / tools_config.json if not config_path.exists(): return 错误: 工具配置文件不存在 try: with open(config_path, r, encodingutf-8) as f: config json.load(f) return config.get(tools, []) except Exception as e: return [] def get_tools_description(): 获取工具描述用于提供给VLM tools get_available_tools_info() if not tools or isinstance(tools, str): # 检查是否返回错误 return 当前没有可用工具 tools_desc 可用工具列表:\n for tool in tools: name tool.get(name, 未知工具) desc tool.get(description, 无描述) params tool.get(parameters, []) if params: param_desc , .join([f{p[name]}({p[type]}) for p in params]) tools_desc f- {name}: {desc} (参数: {param_desc})\n else: tools_desc f- {name}: {desc} (无参数)\n tools_desc \n使用格式: [TOOL:工具名称,参数1,参数2,...]\n return tools_desc def image_to_base64(image_path): 将图像文件转换为base64编码 with open(image_path, rb) as image_file: return base64.b64encode(image_file.read()).decode(utf-8) def is_task_completed(ai_response, tool_result): 判断任务是否完成 completion_indicators [ 任务完成, 完成任务, 任务已结束, 已完成, 任务完成, task completed, finished, done, success, 成功 ] combined_text f{ai_response} {tool_result}.lower() return any(indicator in combined_text for indicator in completion_indicators) def vision_task_loop(task_description, knowledge_fileNone, memory_fileNone, reset_first_iterationTrue): 基于视觉的循环任务执行器 current_dir os.path.dirname(os.path.abspath(__file__)) if knowledge_file is None: knowledge_file os.path.join(current_dir, knowledge.txt) if memory_file is None: memory_file os.path.join(current_dir, memory.txt) # 创建LLM服务实例模拟VLM vlm_service VLMService() # 读取固定知识 system_prompt_parts [] # 添加可用工具信息到系统提示 tools_description get_tools_description() system_prompt_parts.append(f可用工具信息:\n{tools_description}) # 添加固定知识 if os.path.exists(knowledge_file): with open(knowledge_file, r, encodingutf-8) as f: knowledge_content f.read() if knowledge_content.strip(): system_prompt_parts.append(f重要知识:\n{knowledge_content}) # 组合系统提示 system_prompt \n.join(system_prompt_parts) iteration_count 0 max_iterations 50 # 设置最大迭代次数防止无限循环 first_iteration reset_first_iteration # 使用参数来决定是否重置首次迭代标志 # 记录之前的AI响应用于检测重复行为 previous_ai_response previous_tool_result while iteration_count max_iterations: iteration_count 1 # 截取当前屏幕 screenshot pyautogui.screenshot() screenshot_path os.path.join(current_dir, current_screen.png) screenshot.save(screenshot_path) # 准备消息列表 messages [] # 添加系统提示包含工具信息 if system_prompt: messages.append({ role: system, content: system_prompt }) # 添加任务描述和当前截图信息 - 使用更清晰的格式 if first_iteration: # 首次迭代时AI只需要开始分析任务 user_message f当前任务: {task_description}\n请分析当前屏幕截图并开始执行任务。 else: # 非首次迭代时询问任务完成情况避免重复之前的操作 user_message ( f当前任务: {task_description}\n f上一轮AI分析: {previous_ai_response}\n f上一轮工具执行结果: {previous_tool_result}\n f请分析当前屏幕截图判断任务完成情况避免重复执行相同操作并按需执行相应操作。 f如果任务已经完成请明确说明任务已完成。 ) # 构建包含图像的消息内容 image_content { type: image_url, image_url: { url: fdata:image/jpeg;base64,{image_to_base64(screenshot_path)} } } text_content { type: text, text: user_message } # 构建用户消息包含文本和图像 messages.append({ role: user, content: [text_content, image_content] }) print(f用户消息: {user_message}, 截图保存在: {screenshot_path}) try: # 调用LLM服务模拟VLM功能 result vlm_service.create_with_image(messages) # 不传递图像路径因为已经在消息中包含 ai_response result[choices][0][message][content] print(fVLM响应: {ai_response}) # 执行AI返回的工具指令 tool_execution_result process_tool_calls(ai_response, memory_file) # 更新历史记录 previous_ai_response ai_response previous_tool_result tool_execution_result or # 显示AI响应 yield fAI分析: {ai_response} # 只在非首次迭代时检查任务完成状态 if not first_iteration: # 检查任务是否完成 if is_task_completed(ai_response, tool_execution_result or ): yield 任务已完成退出循环 break # 更新标志表示不再是第一次迭代 first_iteration False # 如果没有工具执行结果检查AI响应是否表明任务已完成 if any(indicator in ai_response.lower() for indicator in [任务完成, 完成任务, 已完成, task completed, finished, done]): yield 任务已完成退出循环 # 取消自动删除短期记忆改为手动删除 break except Exception as e: error_msg f执行任务时出错: {str(e)} yield error_msg break if iteration_count max_iterations: yield 达到最大迭代次数停止任务执行 def process_tool_calls(response_text, memory_file_pathNone): 解析AI响应中的工具调用指令 支持格式: [TOOL:工具名称,arg1,arg2,arg3...] # 修复正则表达式以正确捕获工具名称和所有参数 tool_pattern r\[TOOL:([^\],\]])(?:,([^\]]*))?\] matches re.findall(tool_pattern, response_text) all_results [] if not matches: print (未找到工具调用指令) for match in matches: tool_name match[0] tool_args_str match[1] # 包含所有参数的字符串可能为空 # 验证工具是否存在 tools get_available_tools_info() if not tools or isinstance(tools, str): # 检查是否返回错误 all_results.append(f工具 {tool_name} 执行失败: 无法获取工具列表) continue tool_exists any(tool[name] tool_name for tool in tools) if not tool_exists: all_results.append(f工具 {tool_name} 执行失败: 工具不存在) continue # 解析参数处理带引号的参数值如果存在参数 tool_args [] if tool_args_str: # 如果有参数 current_arg inside_quotes False quote_char None i 0 while i len(tool_args_str): char tool_args_str[i] if char in [, ] and not inside_quotes: # 开始引号 inside_quotes True quote_char char elif char quote_char and inside_quotes: # 结束引号 inside_quotes False quote_char None elif char , and not inside_quotes: # 参数分隔符不在引号内 tool_args.append(current_arg.strip()) current_arg else: current_arg char i 1 # 添加最后一个参数 if current_arg: tool_args.append(current_arg.strip()) print(f正在执行工具{tool_name}) print(f参数列表{tool_args}) result execute_tool(tool_name, *tool_args) if tool_args else execute_tool(tool_name) print(结果,result) # 处理执行结果为None的情况 if result is None: result 工具执行结果为空 # 将所有工具的结果写入记忆文件这样AI可以看到 if memory_file_path: try: with open(memory_file_path, a, encodingutf-8) as f: f.write(f工具 {tool_name} 执行结果:\n{result}\n\n) except Exception as e: print(f写入记忆文件失败: {e}) all_results.append(f工具 {tool_name} 执行结果: {result}) return \n.join(all_results) if all_results else None def parse_history_content(content): 解析历史对话内容转换为messages格式 messages [] lines content.strip().split(\n) current_role None current_content [] for line in lines: line line.strip() if line.startswith(用户:): if current_role and current_content: messages.append({ role: current_role, content: \n.join(current_content).strip() }) current_role user current_content [line[3:].strip()] elif line.startswith(AI:): if current_role and current_content: messages.append({ role: current_role, content: \n.join(current_content).strip() }) current_role assistant current_content [line[3:].strip()] elif line and current_content: if current_role and current_content: messages.append({ role: current_role, content: \n.join(current_content).strip() }) current_role None current_content [] elif current_role: current_content.append(line) if current_role and current_content: messages.append({ role: current_role, content: \n.join(current_content).strip() }) return messages class VLMTaskApp: def __init__(self, root): self.root root self.root.title(VLM任务执行器) # 修改窗口大小为较小尺寸并设置为置顶 self.root.geometry(400x600) # 调整为较小的尺寸 self.root.attributes(-topmost, True) # 设置窗口置顶 # 任务执行标志 self.is_executing False # 创建界面 self.setup_ui() # 文件路径 current_dir os.path.dirname(os.path.abspath(__file__)) self.knowledge_file os.path.join(current_dir, knowledge.txt) self.memory_file os.path.join(current_dir, memory.txt) # 启动时加载记忆文件内容到显示区域 self.load_memory_content() def setup_ui(self): # 任务描述输入区域 task_frame tk.Frame(self.root) task_frame.pack(filltk.X, padx10, pady5) tk.Label(task_frame, text任务描述:).pack(anchortk.W) self.task_input tk.Text(task_frame, height3) self.task_input.pack(filltk.X, pady5) # 控制按钮区域 control_frame tk.Frame(self.root) control_frame.pack(filltk.X, padx10, pady5) self.start_button tk.Button( control_frame, text开始执行任务, commandself.start_task ) self.start_button.pack(sidetk.LEFT, padx(0, 10)) self.stop_button tk.Button( control_frame, text停止任务, commandself.stop_task, statetk.DISABLED ) self.stop_button.pack(sidetk.LEFT, padx(0, 10)) # 添加清除短期记忆按钮 self.clear_memory_button tk.Button( control_frame, text清除短期记忆, commandself.clear_short_term_memory ) self.clear_memory_button.pack(sidetk.LEFT) # 聊天历史显示区域 self.chat_history scrolledtext.ScrolledText( self.root, wraptk.WORD, statedisabled, height30 ) self.chat_history.pack(padx10, pady5, filltk.BOTH, expandTrue) # 任务状态标签 self.status_label tk.Label(self.root, text状态: 等待任务开始, bd1, relieftk.SUNKEN, anchortk.W) self.status_label.pack(sidetk.BOTTOM, filltk.X) def load_memory_content(self): 启动时加载记忆文件内容到显示区域 if os.path.exists(self.memory_file): try: with open(self.memory_file, r, encodingutf-8) as f: content f.read() if content.strip(): # 启用文本框编辑 self.chat_history.config(statenormal) # 清空当前内容 self.chat_history.delete(1.0, tk.END) # 插入记忆文件内容 self.chat_history.insert(tk.END, content) # 禁用编辑并滚动到底部 self.chat_history.config(statedisabled) self.chat_history.see(tk.END) except Exception as e: print(f加载记忆文件失败: {str(e)}) else: # 如果记忆文件不存在清空显示区域 self.chat_history.config(statenormal) self.chat_history.delete(1.0, tk.END) self.chat_history.config(statedisabled) def start_task(self): 开始执行任务 task_description self.task_input.get(1.0, tk.END).strip() if not task_description: messagebox.showwarning(警告, 请输入任务描述) return self.is_executing True self.start_button.config(statetk.DISABLED) self.stop_button.config(statetk.NORMAL) self.update_status(状态: 任务执行中...) # 在新线程中执行任务以避免界面冻结 import threading task_thread threading.Thread( targetself.run_task, args(task_description,) ) task_thread.daemon True task_thread.start() def stop_task(self): 停止任务执行 self.is_executing False self.start_button.config(statetk.NORMAL) self.stop_button.config(statetk.DISABLED) self.update_status(状态: 任务已停止) def run_task(self, task_description): 执行任务的主循环 try: # 显示用户输入的任务 self.display_message(f用户: {task_description}) # 追加到记忆文件而不是覆盖 with open(self.memory_file, a, encodingutf-8) as f: f.write(f用户: {task_description}\n\n) # 执行任务循环确保重置首次迭代标志 for output in vision_task_loop(task_description, self.knowledge_file, self.memory_file, reset_first_iterationTrue): if not self.is_executing: self.display_message(系统: 任务已手动停止) break self.display_message(fAI: {output}) # 将输出追加到记忆文件 with open(self.memory_file, a, encodingutf-8) as f: f.write(fAI: {output}\n\n) except Exception as e: self.display_message(f系统: 执行任务时出错: {str(e)}) finally: self.is_executing False self.root.after(0, lambda: self.start_button.config(statetk.NORMAL)) self.root.after(0, lambda: self.stop_button.config(statetk.DISABLED)) self.root.after(0, lambda: self.update_status(状态: 任务执行完成)) def display_message(self, message): 显示消息 self.root.after(0, self._display_message, message) def _display_message(self, message): 在主线程中更新UI self.chat_history.config(statenormal) self.chat_history.insert(tk.END, f{message}\n\n) self.chat_history.config(statedisabled) self.chat_history.see(tk.END) def update_status(self, status_text): 更新状态栏 self.status_label.config(textstatus_text) def clear_short_term_memory(self): 手动清除短期记忆 if os.path.exists(self.memory_file): try: os.remove(self.memory_file) # 同时清空显示区域 self.chat_history.config(statenormal) self.chat_history.delete(1.0, tk.END) self.chat_history.config(statedisabled) self.display_message( 短期记忆已手动清除) except Exception as e: messagebox.showerror(错误, f清除短期记忆失败: {str(e)}) else: self.display_message(短期记忆文件不存在) def main(): root tk.Tk() app VLMTaskApp(root) root.mainloop() if __name__ __main__: main()import tkinter as tk from tkinter import filedialog import pytesseract from PIL import ImageGrab import cv2 import numpy as np from typing import List, Tuple, Dict import json import io from PIL import Image import pyautogui import easyocr import sys def ocr_screen_tool(): 弹窗选择图片并进行OCR识别返回文字和坐标信息的JSON格式结果 try: # 创建tkinter根窗口 root tk.Tk() root.withdraw() # 隐藏主窗口 # 弹窗选择图片文件 file_path filedialog.askopenfilename( title选择要识别的图片, filetypes[ (图片文件, *.png *.jpg *.jpeg *.bmp *.tiff *.tif), (PNG文件, *.png), (JPG文件, *.jpg), (JPEG文件, *.jpeg), (BMP文件, *.bmp), (TIFF文件, *.tiff), (所有文件, *.*) ] ) # 销毁根窗口 root.destroy() # 如果用户取消选择返回错误信息 if not file_path: error_result {error: 用户取消了图片选择} return json.dumps(error_result, ensure_asciiFalse, indent2) # 加载选择的图片 image Image.open(file_path) # 初始化OCR读取器 reader easyocr.Reader([ch_sim, en]) # 可根据需要添加其他语言 # 执行OCR识别 results reader.readtext(np.array(image)) # 格式化结果 formatted_results [] for (bbox, text, confidence) in results: # 将NumPy类型转换为Python原生类型 formatted_bbox [ [int(point[0]), int(point[1])] for point in bbox ] formatted_results.append({ text: text, confidence: float(confidence), bbox: formatted_bbox }) print(ocr:, formatted_results) return json.dumps(formatted_results, ensure_asciiFalse, indent2) except Exception as e: error_result {error: fOCR识别失败: {str(e)}} return json.dumps(error_result, ensure_asciiFalse, indent2) def find_text_coordinates(text_to_find: str, image_path: str None): 从指定图片或全屏截图中查找指定文字的中心点坐标 try: # 如果没有提供图片路径使用全屏截图 if not image_path: screenshot pyautogui.screenshot() image screenshot else: # 加载指定的图片 image Image.open(image_path) # 初始化OCR读取器 reader easyocr.Reader([ch_sim, en]) # 可根据需要添加其他语言 # 执行OCR识别 results reader.readtext(np.array(image)) # 遍历OCR结果查找指定文字 for (bbox, text, confidence) in results: if text_to_find.lower() in text.lower(): # 不区分大小写匹配 # 计算边界框的中心点坐标 x_coords [point[0] for point in bbox] y_coords [point[1] for point in bbox] center_x int((min(x_coords) max(x_coords)) / 2) center_y int((min(y_coords) max(y_coords)) / 2) result { text: text, target_text: text_to_find, center_x: center_x, center_y: center_y, bbox: [[int(point[0]), int(point[1])] for point in bbox], confidence: float(confidence) } return json.dumps(result, ensure_asciiFalse, indent2) # 如果没有找到指定文字 result { error: f未找到文字: {text_to_find}, found_texts: [] # 可以返回所有找到的文字作为参考 } # 添加所有找到的文字可选用于调试 for (bbox, text, confidence) in results: result[found_texts].append({ text: text, confidence: float(confidence) }) return json.dumps(result, ensure_asciiFalse, indent2) except Exception as e: error_result {error: f查找文字坐标失败: {str(e)}} return json.dumps(error_result, ensure_asciiFalse, indent2) def get_all_text(image_path: str None): 从指定图片或全屏截图中获取所有识别到的文本以字符串形式返回 try: # 如果没有提供图片路径使用全屏截图 if not image_path: screenshot pyautogui.screenshot() image screenshot else: # 加载指定的图片 image Image.open(image_path) # 初始化OCR读取器 reader easyocr.Reader([ch_sim, en]) # 可根据需要添加其他语言 # 执行OCR识别 results reader.readtext(np.array(image)) # 提取所有文本按置信度排序 all_text [] for (bbox, text, confidence) in results: all_text.append(text) # 将所有文本连接成一个字符串每行一个文本 full_text \n.join(all_text) return full_text except Exception as e: return f获取所有文本失败: {str(e)} def main(): 主函数处理命令行参数以适配executor.py. if len(sys.argv) 2: # 当没有命令行参数时直接运行ocr_screen_tool result ocr_screen_tool() print(result) return tool_name sys.argv[1] try: if tool_name ocr_screen: result ocr_screen_tool() elif tool_name find_text_coordinates: if len(sys.argv) 3: result 错误: find_text_coordinates 需要指定要查找的文字 else: text_to_find sys.argv[2] # 如果提供了图片路径参数 image_path sys.argv[3] if len(sys.argv) 3 else None result find_text_coordinates(text_to_find, image_path) elif tool_name get_all_text: # 如果提供了图片路径参数 image_path sys.argv[2] if len(sys.argv) 2 else None result get_all_text(image_path) else: result f错误: 未找到工具 {tool_name} except Exception as e: result f执行工具时出错: {str(e)} print(result) if __name__ __main__: main()import os from dotenv import load_dotenv import json import http.client from urllib.parse import urlparse import base64 import requests from io import BytesIO from PIL import Image dotenv_path rE:\code\my_python_server_private\.env load_dotenv(dotenv_path) import ssl import urllib3 # 在程序开始时禁用SSL警告仅在开发环境中使用 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) class LLMService: def __init__(self): # 从环境变量中获取 DeepSeek 参数 # 加载 .env 文件中的环境变量 self.api_url os.getenv(LLM_OPENAI_API_URL) self.model_name os.getenv(LLM_MODEL_NAME) self.api_key os.getenv(LLM_OPENAI_API_KEY) # 检查必需的环境变量是否存在 if not self.api_url: raise ValueError(环境变量 deepseek_OPENAI_API_URL 未设置或为空) if not self.model_name: raise ValueError(环境变量 deepseek_MODEL_NAME 未设置或为空) if not self.api_key: raise ValueError(环境变量 deepseek_OPENAI_API_KEY 未设置或为空) print(fLLM服务初始化完成模型: {self.model_name}) def create(self, messages, toolsNone): print(开始调用LLM服务) # 解析 URL去掉协议部分 parsed urlparse(f{self.api_url}/chat/completions) host, path parsed.hostname, parsed.path if not host: raise ValueError(API URL 无效无法解析主机名) # 创建 HTTP 连接 conn http.client.HTTPSConnection(host) # 构造请求体 request_body { model: self.model_name, messages: messages, tools: tools, temperature: 0.9 # 添加温度参数 } # 发送 POST 请求 headers { Content-Type: application/json, Authorization: fBearer {self.api_key} } conn.request( POST, path, bodyjson.dumps(request_body), headersheaders ) # 获取响应 response conn.getresponse() print(fLLM服务响应状态码: {response.status}) if response.status ! 200: error_msg response.read().decode(utf-8) raise Exception(fLLM服务器错误: {response.status} - {error_msg}) # 读取响应内容 response_data response.read().decode(utf-8) data json.loads(response_data) # 确保output目录存在 os.makedirs(output, exist_okTrue) # 将响应保存到文件 (修复路径分隔符问题) output_file_path os.path.join(output, formatted_data.json) with open(output_file_path, w, encodingutf-8) as f: json.dump(data, f, indent4, ensure_asciiFalse) # 关闭连接 conn.close() print(LLM服务调用完成) return data class VLMService: def __init__(self): # 从环境变量中获取 VLM 参数 self.api_url os.getenv(VLM_OPENAI_API_URL) self.model_name os.getenv(VLM_MODEL_NAME) self.api_key os.getenv(VLM_OPENAI_API_KEY) # 检查必需的环境变量是否存在 if not self.api_url: raise ValueError(环境变量 VLM_OPENAI_API_URL 未设置或为空) if not self.model_name: raise ValueError(环境变量 VLM_MODEL_NAME 未设置或为空) if not self.api_key: raise ValueError(环境变量 VLM_OPENAI_API_KEY 未设置或为空) def encode_image(self, image_source): 编码图像为base64字符串 支持URL和本地文件路径 try: if image_source.startswith((http://, https://)): # 从URL获取图像 response requests.get(image_source) image_data response.content else: # 从本地文件路径获取图像 with open(image_source, rb) as image_file: image_data image_file.read() return base64.b64encode(image_data).decode(utf-8) except Exception as e: raise Exception(f图像编码失败: {str(e)}) def create_with_image(self, messages, image_sourceNone, toolsNone): 使用图像创建VLM请求 :param messages: 消息列表 :param image_source: 图像源可选如果在消息中已经包含图像则可不传 :param tools: 工具定义可选 # 如果提供了图像源且第一条消息是用户消息则添加图像到该消息 if image_source and messages and messages[0][role] user: # 编码图像 base64_image self.encode_image(image_source) # 获取当前用户消息的内容 current_content messages[0][content] # 构建图像内容 image_content { type: image_url, image_url: { url: fdata:image/jpeg;base64,{base64_image} } } # 如果当前内容是字符串转换为列表并添加图像 if isinstance(current_content, str): text_content { type: text, text: current_content } messages[0][content] [text_content, image_content] # 如果已经是列表直接添加图像内容 elif isinstance(current_content, list): messages[0][content].append(image_content) # 解析 URL - 需要确保URL格式正确 full_url f{self.api_url}/chat/completions parsed urlparse(full_url) host parsed.netloc or parsed.hostname path parsed.path if parsed.path else (parsed.netloc.split(/, 1)[1] if / in parsed.netloc else /v1/chat/completions) if not host: print(f解析URL失败: {full_url}) raise ValueError(API URL 无效无法解析主机名) # 创建 HTTP 连接 conn http.client.HTTPSConnection(host) # 构造请求体 request_body { model: self.model_name, messages: messages, tools: tools, temperature: 0.7 } # 发送 POST 请求 headers { Content-Type: application/json, Authorization: fBearer {self.api_key} } try: conn.request( POST, path, bodyjson.dumps(request_body), headersheaders ) except Exception as e: conn.close() raise e # 获取响应 response conn.getresponse() if response.status ! 200: error_msg response.read().decode(utf-8) print(fVLM服务器错误响应: {error_msg}) conn.close() raise Exception(fVLM服务器错误: {response.status} - {error_msg}) # 读取响应内容 response_data response.read().decode(utf-8) data json.loads(response_data) # 确保output目录存在 os.makedirs(output, exist_okTrue) # 将响应保存到文件 output_file_path os.path.join(output, vlm_formatted_data.json) with open(output_file_path, w, encodingutf-8) as f: json.dump(data, f, indent4, ensure_asciiFalse) # 关闭连接 conn.close() return data #