2026/4/18 4:25:37
网站建设
项目流程
做网站需要哪些硬件,宁夏找人做网站多少钱,无锡网站制作系统,中山网站只设计好的#xff0c;遵照您的要求。以下是一篇关于 Ray 分布式计算 API 的深度技术文章#xff0c;旨在为开发者提供新颖的视角和实用的洞察。超越Spark与Celery#xff1a;深入Ray分布式计算框架的架构与高级模式
引言#xff1a;分布式计算的“新常态”与Ray的诞生
在当今以 …好的遵照您的要求。以下是一篇关于 Ray 分布式计算 API 的深度技术文章旨在为开发者提供新颖的视角和实用的洞察。超越Spark与Celery深入Ray分布式计算框架的架构与高级模式引言分布式计算的“新常态”与Ray的诞生在当今以 AI 和数据为中心的计算时代我们面临的挑战已从简单的“大数据”批处理演变为对复杂、异构、动态且对延迟敏感的计算工作流的需求。传统的“一招鲜”解决方案开始显得捉襟见肘Apache Spark 擅长于静态数据流的批处理但在迭代式机器学习或实时服务交互上不够灵活Celery 或 Airflow 擅长任务编排却缺乏对复杂状态共享和低延迟通信的原生支持。Ray正是在这种背景下应运而生。它并非另一个任务队列或批处理引擎而是一个旨在将任何 Python、Java 或 C 应用自然地转变为分布式应用的通用分布式计算框架。其核心设计哲学是提供一个简单、通用的 API让开发者像编写单机程序一样编写分布式程序同时由系统底层处理令人头疼的容错、调度和序列化问题。本文旨在超越“Hello World”式的并行ray.get(ray.remote(f).remote())示例深入 Ray 的架构核心并探讨其在构建动态依赖工作流和有状态分布式服务等高级场景下的独特威力。一、 Ray 架构深度解构万物皆对象万物皆可分布要理解 Ray 的强大必须首先理解其简洁而强大的两层架构。1.1 系统层Ray Core的四大支柱Ray 运行时由四个关键组件构成它们共同协作实现了高性能和透明分布式编程的承诺。驱动节点 (Driver Node):用户脚本执行的地方。它不执行实际的计算任务而是负责任务的提交和协调。工作节点 (Worker Node):执行具体任务ray.remote装饰的函数或类方法的进程。它们是计算能力的提供者。对象存储 (Object Store / Plasma):一个跨进程、跨节点的共享内存存储。这是 Ray 性能的基石。当一个任务产生一个中间结果一个ObjectRef时这个结果会尽可能存储在本地节点内存中其他需要该结果的任务即使在同一节点的不同进程可以通过零拷贝方式直接读取避免了昂贵的序列化/反序列化和网络传输开销。全局控制存储 (Global Control Store, GCS):这是 Ray 2.0 之后架构演进的里程碑。GCS 是一个高可用的中心化元数据存储负责跟踪系统中的所有实体节点、Actor、对象、任务等。它的存在极大简化了系统设计和提高了可扩展性使得 Ray 集群可以轻松扩展到成千上万个节点。┌─────────────────────────────────────────────────────────────┐ │ Application Layer │ │ (Tasks, Actors, ray.get, ray.put, wait, etc.) │ └─────────────────────────────────────────────────────────────┘ ┌─────────────────────────────────────────────────────────────┐ │ System Layer (Ray Core) │ ├─────────────┬──────────────┬──────────────┬─────────────────┤ │ Scheduler │ GCS │ Object Store│ Raylet │ │ (分布式调度) │ (全局元数据) │ (共享内存) │(本地代理/资源管理)│ └─────────────┴──────────────┴──────────────┴─────────────────┘ ┌─────────────────────────────────────────────────────────────┐ │ Infrastructure Layer │ │ (K8s, VMs, Bare Metal, Cloud Provider) │ └─────────────────────────────────────────────────────────────┘1.2 核心抽象Task 与 Actor 的再认识通常我们将ray.remote函数称为Taskray.remote类称为Actor。但这只是表象。更深层次的理解是Task: 无状态的函数执行。它是一次性、无副作用的计算。Ray 会调度它到一个可用的 Worker 上执行返回结果一个ObjectRef。Task 是分布式函数即服务 (FaaS)的雏形但远比传统 FaaS 灵活因为它支持复杂的依赖图和数据本地性感知调度。Actor: 有状态的服务实例。它是一个长期运行、拥有内部状态的“服务对象”。Actor 的方法调用也是远程的 Task但关键区别在于这些 Task 会被顺序调度到该 Actor 实例所在的唯一 Worker 上执行从而安全地修改和访问其内部状态。Actor 模型是构建分布式微服务、模拟器、参数服务器等的理想范式。二、 高级模式一动态任务依赖图与条件执行传统工作流引擎如 Airflow的 DAG 通常是静态声明式的。而 Ray 允许你在运行时动态创建任务及其依赖关系这为构建自适应、数据驱动的流水线提供了可能。2.1 场景一个智能数据处理与模型训练流水线假设我们有一个流水线数据加载 - 数据验证 - (若验证通过) - 特征工程 - 模型训练 - (若指标达标) - 模型部署。其中特征工程有多个可选的策略需要根据数据验证的结果动态选择。在 Ray 中我们可以优雅地实现这种动态性import ray import random from typing import Dict, Any ray.init() ray.remote def load_data(source: str) - Dict[str, Any]: # 模拟数据加载 data {rows: 1000, quality: random.uniform(0.7, 1.0)} print(fData loaded from {source}, quality: {data[quality]:.2f}) return data ray.remote def validate_data(data: Dict[str, Any]) - Dict[str, Any]: # 数据质量检查并返回下一步的建议 quality data[quality] if quality 0.9: recommendation {valid: True, feature_strategy: advanced} elif quality 0.7: recommendation {valid: True, feature_strategy: basic} else: recommendation {valid: False, reason: Low data quality} print(fValidation result: {recommendation}) return recommendation ray.remote def feature_engineering_basic(data: Dict[str, Any]) - Dict[str, Any]: print(Using BASIC feature engineering strategy.) return {features: basic_features, data: data} ray.remote def feature_engineering_advanced(data: Dict[str, Any]) - Dict[str, Any]: print(Using ADVANCED feature engineering strategy.) return {features: advanced_features, data: data} ray.remote def train_model(feature_data: Dict[str, Any]) - Dict[str, Any]: print(fTraining model with {feature_data[features]}...) # 模拟训练 accuracy random.uniform(0.8, 0.95) return {model_id: model_001, accuracy: accuracy} ray.remote def deploy_model(model_result: Dict[str, Any]) - str: if model_result[accuracy] 0.9: print(fModel {model_result[model_id]} deployed with accuracy {model_result[accuracy]:.2f}) return DEPLOY_SUCCESS else: print(fModel {model_result[model_id]} accuracy {model_result[accuracy]:.2f} too low, skip deployment.) return DEPLOY_SKIPPED # 1. 启动初始任务 data_ref load_data.remote(s3://my-bucket/data.csv) validation_ref validate_data.remote(data_ref) # 2. 动态决定下一步这里无法预先知道该调用哪个特征工程函数 # 我们使用 ray.get 来获取决策结果然后动态提交新任务。 validation_result ray.get(validation_ref) if validation_result[valid]: strategy validation_result[feature_strategy] if strategy basic: features_ref feature_engineering_basic.remote(ray.get(data_ref)) # 传递原始数据 else: # advanced features_ref feature_engineering_advanced.remote(ray.get(data_ref)) # 3. 继续后续流水线 model_ref train_model.remote(features_ref) deploy_result_ref deploy_model.remote(model_ref) final_result ray.get(deploy_result_ref) print(fPipeline finished with: {final_result}) else: print(fPipeline failed at validation: {validation_result[reason]})关键洞察 这里的依赖图并非在脚本开头就完全定义好的。feature_engineering_basic或feature_engineering_advanced的调用是在获取了validate_data的结果后动态决定的。Ray 的ObjectRef和ray.get的阻塞语义使得这种“先计算后决定”的模式变得非常直观。更进一步我们可以利用ray.wait来处理多个可能的并行分支或使用Ray WorkflowRay 的一个官方库来将这种动态 DAG 持久化和容错化。三、 高级模式二基于Actor的弹性有状态服务Actor 不仅仅是“一个有状态的类”。它是构建弹性、可组合分布式系统的乐高积木。3.1 场景一个实时推荐系统仿真假设我们需要模拟一个推荐系统它包含用户画像服务 (UserProfileActor): 维护每个用户的实时兴趣向量。召回服务 (RetrievalActor): 根据用户画像从海量商品中快速筛选出候选集。排序服务 (RankingActor): 对候选集进行精排。 这些服务需要频繁交互且各自维护着重要状态如用户画像、商品索引、排序模型。import ray import numpy as np import time from collections import defaultdict from typing import List, Dict ray.init() ray.remote class UserProfileActor: 维护用户状态的服务。 def __init__(self): # 模拟用户兴趣向量 self.user_profiles defaultdict(lambda: np.random.randn(100)) def update_profile(self, user_id: str, item_vector: np.ndarray): # 简单模拟兴趣更新 self.user_profiles[user_id] 0.9 * self.user_profiles[user_id] 0.1 * item_vector def get_profile(self, user_id: str) - np.ndarray: return self.user_profiles[user_id].copy() ray.remote class RetrievalActor: 召回服务维护商品库索引。 def __init__(self, item_count: int 10000): # 模拟商品向量数据库 self.item_vectors np.random.randn(item_count, 100) self.item_ids [fitem_{i} for i in range(item_count)] def retrieve(self, user_vector: np.ndarray, top_k: int 100) - List[Dict]: # 简单的内积召回 scores np.dot(self.item_vectors, user_vector) top_indices np.argsort(scores)[-top_k:][::-1] return [{item_id: self.item_ids[i], score: float(scores[i])} for i in top_indices] ray.remote class RankingActor: 排序服务可能加载一个重模型。 def __init__(self, model_path: str): # 模拟加载一个复杂的排序模型 self.model_loaded True print(fRanking model loaded from {model_path}) def rank(self, user_vector: np.ndarray, candidate_items: List[Dict]) - List[Dict]: # 模拟一个更精细的排序打分 for item in candidate_items: # 假设排序分数 召回分 * (1 模拟的CTR预估) simulated_ctr 0.5 0.3 * np.random.rand() item[rank_score] item[score] * simulated_ctr ranked_items sorted(candidate_items, keylambda x: x[rank_score], reverseTrue) return ranked_items[:10] # 返回Top-10 # 初始化有状态服务 user_profile_service UserProfileActor.options(nameuser_profile).remote() retrieval_service RetrievalActor.options(nameretrieval).remote() ranking_service RankingActor.options(nameranking, lifetimedetached).remote(./model.pb) # lifetimedetached 使得该Actor在驱动脚本退出后依然存活 # 模拟一个推荐请求的处理流程 def handle_recommendation_request(user_id: str): # 1. 异步并行获取用户画像 (从UserProfileActor) profile_future user_profile_service.get_profile.remote(user_id) # 2. 同时可以并行做一些其他不依赖画像的事情... # 3. 等待画像然后触发召回 user_vector ray.get(profile_future) candidates_future retrieval_service.retrieve.remote(user_vector, top_k200) # 4. 召回完成后进行排序 candidates ray.get(candidates_future) ranked_future ranking_service.rank.remote(user_vector, candidates) # 5. 获取最终排序结果并更新用户画像假设用户点击了Top-1 final_recommendations ray.get(ranked_future) top_item_id final_recommendations[0][item_id] # 假设我们根据点击的item更新画像这里简化用随机向量模拟item向量 clicked_item_vector np.random.randn(100) user_profile_service.update_profile.remote(user_id, clicked_item_vector) return [item[item_id] for item in final_recommendations] # 并发处理多个请求 request_users [user_a, user_b, user_c, user_d] result_refs [ray.remote(handle_recommendation_request).remote(uid) for uid in request_users] results ray.get(result_refs) print(fRecommendation results: {results}) # 即使主程序退出ranking_service Actor 依然在集群中运行 print(Main driver exiting, but RankingActor remains alive (detached).)关键洞察服务发现与通信: 通过.options(nameservice_name)为 Actor 命名其他服务可以通过ray.get_actor(“service_name”)在不持有原始句柄的情况下找到并与之通信实现了松耦合。弹性与容错: 每个 Actor 都可以独立配置重启策略max_restarts。如果一个 RetrievalActor 因内存溢出崩溃Ray 可以自动重启它而依赖它的 RankingActor 只需处理临时的调用失败可通过重试机制解决。资源隔离: 我们可以为不同的 Actor 类指定不同的资源需求如num_gpus1Ray 调度器会确保它们被调度到合适的节点上。组合性: 整个推荐流水线由多个独立的、可复用的 Actor 组合而成。我们可以轻松地替换新的召回算法创建一个新的 RetrievalActor 版本或者对 UserProfileActor 进行水平扩容创建多个处理不同用户分片的实例。四、 Ray 生态与未来展望Ray 早已超越其核心框架成长为一个繁荣的生态系统Ray AI RuntimeRay Train: 分布式深度学习训练库与 PyTorch、TensorFlow 无缝集成。Ray Tune: 超参数调优库支持最先进的算法可轻松扩展至数百个试验。Ray Serve: 高性能、可编程的模型部署和服务库是生产级 AI 服务的理想选择。Ray Datasets: 提供与分布式数据处理管道可插拔执行引擎包括 Ray 自身的简单接口。这些库并非孤立的它们都建立在 Ray Core 强大的 Task 和 Actor 抽象之上意味着你可以在一个应用程序中混用这些组件例如用 Ray Datasets 进行数据预处理用 Ray Train 进行训练用 Ray Tune 调整超参数最后用 Ray Serve 部署模型所有环节都在同一个集群上高效、透明地完成。结论Ray 通过其“万物皆可远程”的简单 APIray.remote和强大的底层架构GCS, Object Store成功地统一了任务并行与 Actor 模型两种范式。它使得开发者能够专注于应用逻辑本身而非分布式系统的复杂性。