建设个人网银网站小红书推广怎么做
2026/6/20 6:55:03 网站建设 项目流程
建设个人网银网站,小红书推广怎么做,临沂百度网站建设,网站做多语言1. PyFlink 为什么要手动指定 Connector/Format JAR#xff1f; 因为#xff1a; Flink 核心运行时在 JVM 上connector#xff08;如 kafka#xff09;和 format#xff08;如 json#xff09;都是 JVM 侧实现Python 代码只是驱动 Table/SQL 的规划与提交 所以你需要通过…1. PyFlink 为什么要手动指定 Connector/Format JAR因为Flink 核心运行时在 JVM 上connector如 kafka和 format如 json都是 JVM 侧实现Python 代码只是驱动 Table/SQL 的规划与提交所以你需要通过pipeline.jars指定依赖多个 jar 用;分隔table_env.get_config().set(pipeline.jars,file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar)实战建议connector jar 和 format jar 都要带上例如 Kafka JSON路径用file:///这种绝对 URI避免分布式环境找不到文件生产上更推荐把 jar 放到统一位置Flink lib 或制品仓并在提交时声明依赖pipeline.jars适合快速验证与 demo2. 在 PyFlink Table API 中推荐用 DDL 定义 Source/SinkPyFlink 的 Table API 使用 connector 最推荐的方式是DDL execute_sql()理由很简单DDL 更直观、更可复制、也最接近线上 SQL Gateway/SQL Client 的使用方式。2.1 Kafka Source/Sink JSON Format最小可用示例source_ddl CREATE TABLE source_table( a VARCHAR, b INT ) WITH ( connector kafka, topic source_topic, properties.bootstrap.servers kafka:9092, properties.group.id test_3, scan.startup.mode latest-offset, format json ) sink_ddl CREATE TABLE sink_table( a VARCHAR ) WITH ( connector kafka, topic sink_topic, properties.bootstrap.servers kafka:9092, format json ) t_env.execute_sql(source_ddl)t_env.execute_sql(sink_ddl)t_env.sql_query(SELECT a FROM source_table)\.execute_insert(sink_table).wait()关键点拆解execute_sql()注册表source/sinksql_query()产出一个 Tableexecute_insert()触发写入并提交作业.wait()在本地/mini cluster 场景常用用于等待作业执行远程集群通常不建议一直 wait3. 完整可运行的 Python 结构把 jar、DDL、DML 串起来你给的完整示例结构非常标准我建议你在博客里也用这种方式组织代码frompyflink.tableimportTableEnvironment,EnvironmentSettingsdeflog_processing():env_settingsEnvironmentSettings.in_streaming_mode()t_envTableEnvironment.create(env_settings)# 1) 指定 connector format jarst_env.get_config().set(pipeline.jars,file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar)# 2) DDL: source/sinksource_ddl CREATE TABLE source_table( a VARCHAR, b INT ) WITH ( connector kafka, topic source_topic, properties.bootstrap.servers kafka:9092, properties.group.id test_3, scan.startup.mode latest-offset, format json ) sink_ddl CREATE TABLE sink_table( a VARCHAR ) WITH ( connector kafka, topic sink_topic, properties.bootstrap.servers kafka:9092, format json ) t_env.execute_sql(source_ddl)t_env.execute_sql(sink_ddl)# 3) DML: query insertt_env.sql_query(SELECT a FROM source_table)\.execute_insert(sink_table)\.wait()if__name____main__:log_processing()4. PyFlink 里“内置”的 Sources/Sinks不用额外 jar 也能跑除了 Kafka 这类外部 connectorFlink 也提供了一些“开箱即用”的数据源/数据汇特别适合本地调试与单测。4.1 from/to Pandas非常适合快速验证frompyflink.table.expressionsimportcolimportpandasaspdimportnumpyasnp pdfpd.DataFrame(np.random.rand(1000,2))tablet_env.from_pandas(pdf,[a,b]).filter(col(a)0.5)pdf2table.to_pandas()注意to_pandas()会把结果收集到客户端内存生产慎用建议先limit()。4.2 from_elements()用 Python 集合直接造表frompyflink.tableimportDataTypes# 自动推断table_env.from_elements([(1,Hi),(2,Hello)])# 指定字段名table_env.from_elements([(1,Hi),(2,Hello)],[a,b])# 指定 schema更稳table_env.from_elements([(1,Hi),(2,Hello)],DataTypes.ROW([DataTypes.FIELD(a,DataTypes.INT()),DataTypes.FIELD(b,DataTypes.STRING())]))这类内置 source 对写教程、做 POC、复现 bug 特别省事。5. 自定义 Sources SinksPython 不能直接写需 Java/Scala 实现文档明确说明了现阶段的边界自定义 source/sink 需要 Java/Scala 实现Python 侧可以通过实现 TableFactory也是 Java/Scala让它能被 DDL 发现并使用也就是说你可以用 PyFlink 写作业逻辑但 connector 生态仍然是 JVM 的。如果你后面要写“自定义 connector”系列博客可以按这个路线写先用 Java 写 DynamicTableSourceFactory / DynamicTableSinkFactorySPI 注册再在 PyFlink 里通过 DDLconnectorxxx直接使用6. 常见踩坑清单PyFlink Connector 场景高频问题只加了 connector jar没加 format jarDDL 里用了formatjson但没带 json format 的 jar会在运行期报找不到 format factorypipeline.jars 路径不可达本地 file 路径对集群 TaskManager 不可见必须用集群可访问路径或随 job 提交用 DDL 建表但没触发执行Table/SQL 是惰性执行必须execute_insert()或execute_sql(INSERT ...)才会提交作业wait() 用错场景本地调试很方便远程集群提交通常希望异步返回避免客户端阻塞

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询