产品网站设计论文网页网站建设的ppt
2026/6/20 11:11:09 网站建设 项目流程
产品网站设计论文,网页网站建设的ppt,南昌地宝网首页,八年级信息做网站所用软件1. Operators 是什么#xff1a;DataStream 的“积木” DataStream 的算子#xff08;Operators / Transformations#xff09;本质上就是#xff1a; 输入一个或多个 DataStream#xff0c;输出一个新的 DataStream。 你把这些算子串起来#xff0c;就形成了 Flink 的数…1. Operators 是什么DataStream 的“积木”DataStream 的算子Operators / Transformations本质上就是输入一个或多个DataStream输出一个新的DataStream。你把这些算子串起来就形成了 Flink 的数据流拓扑DAG。常见链路长这样Source - map - flat_map - filter - key_by - reduce/aggregate - sink2. Functions算子里三种常见写法在 PyFlink 里算子需要“函数”来定义处理逻辑。官方文档强调了三种写法2.1 实现 Function 接口推荐可维护、可复用、可做 open 初始化例如MapFunctionfrompyflink.datastream.functionsimportMapFunctionclassMyMapFunction(MapFunction):defmap(self,value):returnvalue1使用frompyflink.common.typeinfoimportTypes data_streamenv.from_collection([1,2,3,4,5],type_infoTypes.INT())mapped_streamdata_stream.map(MyMapFunction(),output_typeTypes.INT())适合场景需要open()里加载资源/初始化状态逻辑复杂想结构化代码需要在类里保存变量、复用对象2.2 Lambda快速但有边界mapped_streamdata_stream.map(lambdax:x1,output_typeTypes.INT())注意官方的坑ConnectedStream.map()和ConnectedStream.flat_map()不支持 lambda它们必须分别接收CoMapFunction/CoFlatMapFunction结论单流简单逻辑可以 lambda涉及双流/连接流别用。2.3 普通 Python function兼顾可读性与轻量defmy_map_func(value):returnvalue1mapped_streamdata_stream.map(my_map_func,output_typeTypes.INT())3. Output Type为什么你经常“必须显式写 output_type”PyFlink DataStream 的一个关键机制是如果你不写output_type默认就是Types.PICKLED_BYTE_ARRAY()用 pickle 序列化。这会带来两个问题1很多下游算子/转换尤其 DataStream - Table要求类型“可解释”而不是一坨 pickle2性能上 pickle 通常更慢、也更难跨语言/跨生态联动官方给了两个典型场景转 Table与写 Sink。3.1 DataStream 转 Table 时必须是“复合类型composite type”t_env.from_data_stream(ds)需要 ds 的输出类型是 Row/Tuple 这类 composite type。所以像你这个例子里flat_map(split, Types.TUPLE([...]))必须明确类型因为后面reduce会“隐式继承这个输出类型”最终from_data_stream(ds)才能知道 schema示例你给的例子我保持同风格整理一下frompyflink.common.typeinfoimportTypesfrompyflink.datastreamimportStreamExecutionEnvironmentfrompyflink.tableimportStreamTableEnvironmentdefdata_stream_api_demo():envStreamExecutionEnvironment.get_execution_environment()t_envStreamTableEnvironment.create(stream_execution_environmentenv)t_env.execute_sql( CREATE TABLE my_source ( a INT, b VARCHAR ) WITH ( connector datagen, number-of-rows 10 ) )dst_env.to_append_stream(t_env.from_path(my_source),Types.ROW([Types.INT(),Types.STRING()]))defsplit(s):splitss[1].split(|)forspinsplits:yields[0],sp dsds.map(lambdai:(i[0]1,i[1]))\.flat_map(split,Types.TUPLE([Types.INT(),Types.STRING()]))\.key_by(lambdai:i[1])\.reduce(lambdai,j:(i[0]j[0],i[1]))t_env.execute_sql( CREATE TABLE my_sink ( a INT, b VARCHAR ) WITH ( connector print ) )tablet_env.from_data_stream(ds)table_resulttable.execute_insert(my_sink)# 本地/mini-cluster 执行建议 wait防止脚本提前退出table_result.wait()if__name____main__:data_stream_api_demo()一句话你只要把 DataStream 结果要转 Table当场就把 output_type 写死。3.2 写 Sink 时也建议显式 output_type某些 sink 只接受特定结构例如 Row/Tuplemap 后不写类型可能导致 sink 端拿到 pickle 字节数组或者 schema 不匹配。ds.map(lambdai:(i[0]1,i[1]),Types.TUPLE([Types.INT(),Types.STRING()]))\.sink_to(...)4. Operator Chaining为什么 Flink 默认会“把你的算子粘在一起”官方描述的核心是默认会把多个非 shuffle 的 Python 算子链在一起减少序列化/反序列化与调用开销提高吞吐。这能显著提升性能但也会在某些场景“适得其反”比如 flat_map 一个输入吐出成千上万个输出链在一起可能导致下游处理被单并行度拖死或你希望在某个节点切开单独调整并行度/slot 资源或希望隔离 backpressure 传播范围4.1 禁用 chaining 的几种方式官方列举你可以理解为三大类A. 用“会引入 shuffle/重分区”的算子切断禁用后续 chaining在某个算子后面加以下操作之一通常会打断链路key_byshuffleshufflerescalerebalancepartition_customB. 在当前算子上显式控制链路边界start_new_chain()只断开“前面到我”的链disable_chaining()断开“前后两边”的链C. 通过资源配置把链路切断给上下游设置不同parallelism或不同slot sharing group或全局配置python.operator-chaining.enabled false实战建议默认别动 chaining先跑通发现某段链“CPU 拉满且 backpressure 一路传”时再考虑拆链flat_map 爆炸式输出、或需要单独调并行度的节点是最常见拆链点5. 工程化必看Bundling Python Functions否则远程必踩 ModuleNotFoundError官方给了一个非常真实的生产坑如果 Python functions 不在 main 文件里而你提交到非本地模式YARN/Standalone/K8s不打包 python-files 很容易报ModuleNotFoundError: No module named my_function解决思路按官方用python-files把你的函数定义文件一起带上。经验补充写博客时可强调本地 IDE/mini cluster 可能“看不出问题”一到远程集群就炸所以从第一天就按“可提交”方式组织代码和依赖6. 在 Python Function 里加载资源用 open() 做一次性初始化典型场景模型推理/大字典/大配置只想加载一次。官方示例思路是继承 Function例如 MapFunction在open()里加载资源然后 map 里重复使用。frompyflink.datastream.functionsimportMapFunction,RuntimeContextimportpickleclassPredict(MapFunction):defopen(self,runtime_context:RuntimeContext):withopen(resources.zip/resources/model.pkl,rb)asf:self.modelpickle.load(f)defmap(self,x):returnself.model.predict(x)要点open()每个并行子任务会执行一次相当于每个 subtask 初始化一次模型要能在 TaskManager 侧访问到通常配合文件分发/依赖打包7. 最后给你一套“写作业时的快速检查清单”1你用了 lambda 吗如果是 ConnectedStream换 CoMapFunction/CoFlatMapFunction2你写 output_type 了吗尤其是flat_map / map 后要转 Tablesink 需要 Row/Tuple/schema3你远程跑吗函数分文件了吗如果是配置 python-files4flat_map 输出爆炸吗考虑拆链、调并行度5需要加载模型/资源吗放 open()别每条数据都加载

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询