2026/6/20 3:01:57
网站建设
项目流程
网站建设多语种自动翻译插件,淘宝优惠卷网站怎么做,前几年做啥网站致富,河南核酸检测vip你好#xff0c;我是程序员贵哥。
今天我要与你分享的主题是“Apache Beam实战冲刺#xff1a;Beam如何run everywhere”。
你可能已经注意到#xff0c;自第26讲到第29讲#xff0c;从Pipeline的输入输出#xff0c;到Pipeline的设计#xff0c;再到Pipeline的测试…你好我是程序员贵哥。今天我要与你分享的主题是“Apache Beam实战冲刺Beam如何run everywhere”。你可能已经注意到自第26讲到第29讲从Pipeline的输入输出到Pipeline的设计再到Pipeline的测试Beam Pipeline的概念一直贯穿着文章脉络。那么这一讲我们一起来看看一个完整的Beam Pipeline究竟是如何编写的。Beam Pipeline一个Pipeline或者说是一个数据处理任务基本上都会包含以下三个步骤读取输入数据到PCollection。对读进来的PCollection做某些操作也就是Transform得到另一个PCollection。输出你的结果PCollection。这么说看起来很简单但你可能会有些迷惑这些步骤具体该怎么做呢其实这些步骤具体到Pipeline的实际编程中就会包含以下这些代码模块Java// Start by defining the options for the pipeline. PipelineOptions options PipelineOptionsFactory.create(); // Then create the pipeline. Pipeline pipeline Pipeline.create(options); PCollectionString lines pipeline.apply( ReadLines, TextIO.read().from(gs://some/inputData.txt)); PCollectionString filteredLines lines.apply(new FilterLines()); filteredLines.apply(WriteMyFile, TextIO.write().to(gs://some/outputData.txt)); pipeline.run().waitUntilFinish();从上面的代码例子中你可以看到第一行和第二行代码是创建Pipeline实例。任何一个Beam程序都需要先创建一个Pipeline的实例。Pipeline实例就是用来表达Pipeline类型的对象。这里你需要注意一个二进制程序可以动态包含多个Pipeline实例。还是以之前的美团外卖电动车处理的例子来做说明吧。比如我们的程序可以动态判断是否存在第三方的电动车图片只有当有需要处理图片时我们才去创建一个Pipeline实例处理。我们也可以动态判断是否存在需要转换图片格式有需要时我们再去创建第二个Pipeline实例。这时候你的二进制程序可能包含0个、1个或者是2个Pipeline实例。每一个实例都是独立的它封装了你要进行操作的数据和你要进行的操作Transform。Pipeline实例的创建是使用Pipeline.create(options)这个方法。其中options是传递进去的参数options是一个PipelineOptions这个类的实例。我们会在后半部分展开PipelineOptions的丰富变化。第三行代码我们用TextIO.read()这个Transform读取了来自外部文本文件的内容把所有的行表示为一个PCollection。第四行代码用 lines.apply(new FilterLines()) 对读进来的PCollection进行了过滤操作。第五行代码 filteredLines.apply(“WriteMyFile”, TextIO.write().to(“gs://some/outputData.txt”))表示把最终的PCollection结果输出到另一个文本文件。程序运行到第五行的时候是不是我们的数据处理任务就完成了呢并不是。记得我们在第24讲、第25讲中提过Beam是延迟运行的。程序跑到第五行的时候只是构建了Beam所需要的数据处理DAG用来优化和分配计算资源真正的运算完全没有发生。所以我们需要最后一行pipeline.run().waitUntilFinish()这才是数据真正开始被处理的语句。这时候运行我们的代码是不是就大功告成呢别急我们还没有处理好程序在哪里运行的问题。你一定会好奇我们的程序究竟在哪里运行不是说好了分布式数据处理吗在上一讲《如何测试Beam Pipeline》中我们学会了在单元测试环境中运行Beam Pipeline。就如同下面的代码。和上文的代码类似我们把Pipeline.create(options)替换成了TestPipeline.create()。JavaPipeline p TestPipeline.create(); PCollectionString input p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of()); PCollectionString output input.apply(new CountWords()); PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY); p.run();TestPipeline是Beam Pipeline中特殊的一种让你能够在单机上运行小规模的数据集。之前我们在分析Beam的设计理念时提到过Beam想要把应用层的数据处理业务逻辑和底层的运算引擎分离开来。现如今Beam可以做到让你的Pipeline代码无需修改就可以在本地、Spark、Flink或者在Google Cloud DataFlow上运行。这些都是通过Pipeline.create(options) 这行代码中传递的PipelineOptions实现的。在实战中我们应用到的所有option其实都是实现了PipelineOptions这个接口。举个例子如果我们希望将数据流水线放在Spark这个底层数据引擎运行的时候我们便可以使用SparkPipelineOptions。如果我们想把数据流水线放在Flink上运行就可以使用FlinkPipelineOptions。而这些都是extends了PipelineOptions的接口示例如下Javaoptions PipelineOptionsFactory.as(SparkPipelineOptions.class); Pipeline pipeline Pipeline.create(options);通常一个PipelineOption是用PipelineOptionsFactory这个工厂类来创建的它提供了两个静态工厂方法给我们去创建分别是PipelineOptionsFactory.as(Class)和PipelineOptionsFactory.create()。像上面的示例代码就是用PipelineOptionsFactory.as(Class)这个静态工厂方法来创建的。当然了更加常见的创建方法是从命令行中读取参数来创建PipelineOption使用的是PipelineOptionsFactory#fromArgs(String[])这个方法例如Javapublic static void main(String[] args) { PipelineOptions options PipelineOptionsFactory.fromArgs(args).create(); Pipeline p Pipeline.create(options); }下面我们来看看不同的运行模式的具体使用方法。直接运行模式我们先从直接运行模式开始讲。这是我们在本地进行测试或者调试时倾向使用的模式。在直接运行模式的时候Beam会在单机上用多线程来模拟分布式的并行处理。使用Java Beam SDK时我们要给程序添加Direct Runner的依赖关系。在下面这个maven依赖关系定义文件中我们指定了beam-runners-direct-java这样一个依赖关系。pom.xml dependency groupIdorg.apache.beam/groupId artifactIdbeam-runners-direct-java/artifactId version2.13.0/version scoperuntime/scope /dependency一般我们会把runner通过命令行指令传递进程序。就需要使用PipelineOptionsFactory.fromArgs(args)来创建PipelineOptions。PipelineOptionsFactory.fromArgs()是一个工厂方法能够根据命令行参数选择生成不同的PipelineOptions子类。PipelineOptions options PipelineOptionsFactory.fromArgs(args).create();在实验程序中也可以强行使用Direct Runner。比如PipelineOptions options PipelineOptionsFactory.create(); options.setRunner(DirectRunner.class); // 或者这样 options PipelineOptionsFactory.as(DirectRunner.class); Pipeline pipeline Pipeline.create(options);如果是在命令行中指定Runner的话那么在调用这个程序时候需要指定这样一个参数–runnerDirectRunner。比如mvn compile exec:java -Dexec.mainClassYourMainClass \ -Dexec.args--runnerDirectRunner -Pdirect-runnerSpark运行模式如果我们希望将数据流水线放在Spark这个底层数据引擎运行的时候我们便可以使用Spark Runner。Spark Runner执行Beam程序时能够像原生的Spark程序一样。比如在Spark本地模式部署应用跑在Spark的RM上或者用YARN。Spark Runner为在Apache Spark上运行Beam Pipeline提供了以下功能Batch 和streaming的数据流水线和原生RDD和DStream一样的容错保证和原生Spark同样的安全性能可以用Spark的数据回报系统使用Spark Broadcast实现的Beam side-input。目前使用Spark Runner必须使用Spark 2.2版本以上。这里我们先添加beam-runners-spark的依赖关系。dependency groupIdorg.apache.beam/groupId artifactIdbeam-runners-spark/artifactId version2.13.0/version /dependency dependency groupIdorg.apache.spark/groupId artifactIdspark-core_2.10/artifactId version${spark.version}/version /dependency dependency groupIdorg.apache.spark/groupId artifactIdspark-streaming_2.10/artifactId version${spark.version}/version /dependency然后要使用SparkPipelineOptions传递进Pipeline.create()方法。常见的创建方法是从命令行中读取参数来创建PipelineOption使用的是PipelineOptionsFactory.fromArgs(String[])这个方法。在命令行中你需要指定runnerSparkRunnermvn exec:java -Dexec.mainClassYourMainClass \ -Pspark-runner \ -Dexec.args--runnerSparkRunner \ --sparkMasterspark master url也可以在Spark的独立集群上运行这时候spark的提交命令spark-submit。spark-submit --class YourMainClass --master spark://HOST:PORT target/...jar --runnerSparkRunner当Beam程序在Spark上运行时你也可以同样用Spark的网页监控数据流水线进度。Flink运行模式Flink Runner是Beam提供的用来在Flink上运行Beam Pipeline的模式。你可以选择在计算集群上比如 Yarn/Kubernetes/Mesos 或者本地Flink上运行。Flink Runner适合大规模连续的数据处理任务包含了以下功能以Streaming为中心支持streaming处理和batch处理和flink一样的容错性和exactly-once的处理语义可以自定义内存管理模型和其他例如YARN的Apache Hadoop生态整合比较好。其实看到这里你可能已经掌握了这里面的诀窍。就是通过PipelineOptions来指定runner而你的数据处理代码不需要修改。PipelineOptions可以通过命令行参数指定。那么类似Spark Runner你也可以使用Flink来运行Beam程序。同样的首先你需要在pom.xml中添加Flink Runner的依赖。dependency groupIdorg.apache.beam/groupId artifactIdbeam-runners-flink-1.6/artifactId version2.13.0/version /dependency然后在命令行中指定flink runnermvn exec:java -Dexec.mainClassYourMainClass \ -Pflink-runner \ -Dexec.args--runnerFlinkRunner \ --flinkMasterflink master urlGoogle Dataflow 运行模式Beam Pipeline也能直接在云端运行。Google Cloud Dataflow就是完全托管的Beam Runner。当你使用Google Cloud Dataflow服务来运行Beam Pipeline时它会先上传你的二进制程序到Google Cloud随后自动分配计算资源创建Cloud Dataflow任务。同前面讲到的Direct Runner和Spark Runner类似你还是需要为Cloud Dataflow添加beam-runners-google-cloud-dataflow-java依赖关系dependency groupIdorg.apache.beam/groupId artifactIdbeam-runners-google-cloud-dataflow-java/artifactId version2.13.0/version scoperuntime/scope /dependency我们假设你已经在Google Cloud上创建了project那么就可以用类似的命令行提交任务mvn -Pdataflow-runner compile exec:java \ -Dexec.mainClassYourMainClass \ -Dexec.args--projectPROJECT_ID \ --stagingLocationgs://STORAGE_BUCKET/staging/ \ --outputgs://STORAGE_BUCKET/output \ --runnerDataflowRunner