Apache Storm - Trident

Trident 是 Storm 的扩展。 与 Storm 一样,Trident 也是由 Twitter 开发的。 开发 Trident 的主要原因是在 Storm 之上提供高级抽象以及状态流处理和低延迟分布式查询。

Trident 使用 spout 和 bolt,但这些低级组件在执行前由 Trident 自动生成。 Trident 具有函数、过滤器、连接、分组和聚合。

Trident 将流作为一系列批次处理,这些批次称为事务。 通常,这些小批量的大小将在数千或数百万个元组的数量级上,具体取决于输入流。 这样一来,Trident 就与 Storm 不同,Storm 执行的是逐个元组的处理。

批处理的概念与数据库事务非常相似。 每个事务都分配有一个事务 ID。 一旦所有处理完成,该事务就被认为是成功的。 但是,处理事务元组之一的失败将导致整个事务被重新传输。 对于每个批次,Trident 将在事务开始时调用 beginCommit,并在事务结束时提交。


TridentTopology

Trident API 提供了一个使用"TridentTopology"类创建 TridentTopology的简单选项。 基本上,TridentTopology从 spout 接收输入流,并对流进行有序的操作(过滤、聚合、分组等)。 Storm Tuple 被 Trident Tuple 替换,Bolts 被操作替换。 可以创建一个简单的 TridentTopology,如下所示 −

TridentTopology topology = new TridentTopology();

TridentTuple 元组

TridentTuple 元组是一个命名的值列表。 TridentTuple 接口是 TridentTopology 的数据模型。 TridentTuple 接口是 TridentTopology 可以处理的基本数据单元。


Trident Spout

Trident spout 与 Storm spout 类似,但提供了使用 Trident 功能的附加选项。 实际上,我们仍然可以使用我们在 TridentTopology 中使用的 IRichSpout,但它本质上是非事务性的,我们将无法使用 Trident 提供的优势。

具有使用 Trident 特性的所有功能的基本 spout 是"ITridentSpout"。 它支持事务和不透明的事务语义。 其他 spout 是 IBatchSpout、IPartitionedTridentSpout 和 IOpaquePartitionedTridentSpout。

除了这些通用的 spout 之外,Trident 还有许多 trident spout 的示例实现。 其中之一是 FeederBatchSpout spout,我们可以使用它轻松发送TridentTuple 的命名列表,而无需担心批处理、并行性等。

FeederBatchSpout 创建和数据馈送可以如下图所示完成 −

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Trident Operations

Trident 依靠"Trident Operation"来处理 trident 元组的输入流。 Trident API 有许多内置操作来处理从简单到复杂的流处理。 这些操作的范围从简单的验证到 Trident 元组的复杂分组和聚合。 让我们来看看最重要和最常用的操作。.

Filter

Filter 是一个用于执行输入验证任务的对象。 Trident 过滤器获取 trident 元组字段的子集作为输入,并根据是否满足某些条件返回真或假。如果返回 true,则元组保存在输出流中; 否则,从流中删除元组。 Filter 基本上会继承 BaseFilter 类并实现 isKeep 方法。 这是过滤器操作的示例实现 −

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

可以使用"each"方法在 topology 中调用过滤器函数。 "字段"类可用于指定输入(三叉戟元组的子集)。 示例代码如下 −

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

函数

Function 是用于对单个 trident 元组执行简单操作的对象。 它采用 trident 元组字段的子集并发出零个或多个新的 trident 元组字段。

Function 基本上继承自 BaseFunction 类并实现 execute 方法。 下面给出了一个示例实现 −

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

就像过滤器操作一样,函数操作可以使用 each 方法在 topology 中调用。 示例代码如下 −

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

聚合

聚合是一个对象,用于对输入批处理或分区或流执行聚合操作。 Trident 具有三种类型的聚合。 它们如下 −

  • aggregate − 单独聚合每批 trident 元组。 在聚合过程中,元组最初使用全局分组重新分区,以将同一批次的所有分区组合成一个分区。

  • partitionAggregate − 聚合每个分区而不是整个批次的 trident 元组。 分区聚合的输出完全替代了输入元组。 分区聚合的输出包含单个字段元组。

  • persistentaggregate − 聚合所有批次的所有 trident 元组,并将结果存储在内存或数据库中。

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))
	
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))
	
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

可以使用CombinerAggregator、ReducerAggregator 或通用Aggregator 接口创建聚合操作。 上例中使用的"count"聚合器是内置聚合器之一,使用"CombinerAggregator"实现。实现如下 −

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}

分组

分组操作是一种内置操作,可以通过 groupBy 方法调用。 groupBy 方法通过对指定字段执行 partitionBy 对流进行重新分区,然后在每个分区中,它将组字段相等的元组组合在一起。 通常,我们使用"groupBy"和"persistentAggregate"来获得分组聚合。 示例代码如下 −

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

合并与加入

合并和加入可以分别使用"merge"和"join"方法来完成。 合并合并一个或多个流。 加入类似于合并,只是加入使用双方的 trident 元组字段来检查和加入两个流。 此外,加入将仅在批处理级别下工作。 示例代码如下 −

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

状态维护

Trident 提供了一种状态维护机制。 状态信息可以存储在 topology 本身中,否则您也可以将其存储在单独的数据库中。 原因是保持一个状态,即如果任何元组在处理过程中失败,则重试失败的元组。 这会在更新状态时产生问题,因为您不确定此元组的状态是否先前已更新。如果元组在更新状态之前已经失败,那么重试元组将使状态稳定。 但是,如果更新状态后元组失败了,那么重试相同的元组将再次增加数据库中的计数并使状态不稳定。 需要执行以下步骤以确保消息仅被处理一次 −

  • 小批量处理元组。

  • 为每个批次分配一个唯一的 ID。 如果该批次被重试,它会被赋予相同的唯一 ID。

  • 状态更新按批次排序。 例如,在第一批的状态更新完成之前,第二批的状态更新是不可能的。


分布式 RPC

分布式 RPC 用于从 Trident topology 中查询和检索结果。 Storm 有一个内置的分布式 RPC 服务器。 分布式 RPC 服务器接收来自客户端的 RPC 请求并将其传递给 topology 。topology 处理请求并将结果发送到分布式 RPC 服务器,分布式 RPC 服务器将其重定向到客户端。 Trident 的分布式 RPC 查询执行起来就像一个普通的 RPC 查询,除了这些查询是并行运行的。


何时使用 Trident?

在许多用例中,如果要求只处理一次查询,我们可以通过在 Trident 中编写拓扑来实现。 另一方面,在 Storm 的情况下,很难实现完全一次处理。 因此,Trident 对于那些只需要一次处理的用例很有用。 Trident 并不适用于所有用例,尤其是高性能用例,因为它增加了 Storm 的复杂性并管理状态。


Trident 的工作示例

我们将把上一节中开发的呼叫日志分析器应用程序转换为 Trident 框架。 与普通的storm相比,Trident应用程序相对容易,这要归功于它的高级API。 Storm 基本上需要在 Trident 中执行 Function、Filter、Aggregate、GroupBy、Join 和 Merge 操作中的任何一项。 最后,我们将使用 LocalDRPC 类启动 DRPC Server,并使用 LocalDRPC 类的 execute 方法搜索一些关键字。

格式化通话信息

FormatCall 类的目的是格式化包括"Caller number"和"Receiver number"的呼叫信息。 完整的程序代码如下 −

Coding: FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

CSVSplit 类的目的是根据"逗号 (,)"拆分输入字符串,并发出字符串中的每个单词。 该函数用于解析分布式查询的输入参数。 完整代码如下 −

Coding: CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

日志分析器

这是主要的应用程序。 最初,应用程序将使用 FeederBatchSpout 初始化 TridentTopology 并提供调用者信息。可以使用 TridentTopology 类的 newStream 方法创建 Trident 拓扑流。 类似地,可以使用 TridentTopology 类的 newDRCPStream 方法创建 Trident 拓扑 DRPC 流。 可以使用 LocalDRPC 类创建一个简单的 DRCP 服务器。 LocalDRPC 有执行方法来搜索一些关键字。 完整的代码如下。

Coding: LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
		
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
		
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

构建和运行应用程序

完整的应用程序包含三个 Java 代码。 它们如下 −

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

可以使用以下命令构建应用程序 −

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

该应用程序可以使用以下命令运行 −

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

输出

一旦应用启动,应用将输出集群启动过程、操作处理、DRPC Server和客户端信息,最后是集群关闭过程的完整细节。 此输出将显示在控制台上,如下所示。

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends