APache Camel SEDA组件

SEDA组件

seda:组件提供异步  SEDA行为,以便在BlockingQueue上交换消息,并在与生产者的单独线程中调用消费者。

请注意,队列仅在单个 CamelContext中可见。如果要跨CamelContext实例进行通信(例如,在Web应用程序之间进行通信),请参阅VM组件。

如果VM在消息尚未处理时终止,则此组件不会实现任何类型的持久性或恢复。如果您需要持久性,可靠性或分布式SEDA,请尝试使用JMSActiveMQ

同步

当生产者发送消息交换时,Direct组件提供任何消费者的同步调用。

URI格式

SEDA:[?选项] someName

哪里  someName可以是唯一标识当前CamelContext中端点的字符串

您可以使用以下格式将查询选项附加到URI: ?option=value&option=value&...

列如:URL = “seda:persistStage?waitForTaskToComplete=Never&concurrentConsumers={{seda.threadCount}}&blockWhenFull=true&size={{seda.queueSize}}&timeout={{seda.timeout}}&failIfNoConsumers=true”/>

选项

名称 以来 默认 描述
size seda队列的最大容量  ,即它可以容纳的消息数。

Camel 2.2或更早版本中的默认值为1000

来自Camel 2.3:默认情况下,大小无限制。

 

注意:使用此选项时应小心。大小由创建第一个端点时指定的值确定。因此,每个端点必须指定相同的大小。

 Camel 2.11:进行验证以确保对于相同的队列名称使用混合队列大小,Camel将检测到这一点并且无法创建端点。

concurrentConsumers 1 处理交换的并发线程数。
waitForTaskToComplete IfReplyExpected 用于指定调用方是否应在继续之前等待异步任务完成的选项。

支持以下选项:

  • Always
  • Never
  • IfReplyExpected

前两个值是不言自明的。

最后一个值IfReplyExpected只会在消息为Request Reply based时等待。

有关详细信息,请参阅异步消息。

timeout 30000 seda生成器停止等待异步任务完成之前的超时(以毫秒为单位)  。

有关详细信息,请参阅waitForTaskToComplete异步

Camel 2.2:您现在可以通过使用0或否定值来禁用超时  。

multipleConsumers 2.2 false 指定是否允许多个使用者。如果启用,您可以使用SEDA进行发布 – 订阅消息传递。也就是说,您可以向seda队列发送消息,  并让每个消费者都收到该消息的副本。启用后,应在每个使用者端点上指定此选项。
limitConcurrentConsumers 2.3 true 是否限制concurrentConsumers到最大数量500

默认情况下,如果seda端点配置了更大的数字,则会引发异常  。您可以通过关闭此选项来禁用该检查。

blockWhenFull 2.9 false 将消息发送到完整seda队列的线程是否  会阻塞,直到队列的容量不再耗尽为止。默认情况下,将抛出一个异常,说明队列已满。通过启用此选项,调用线程将阻塞并等待,直到可以接受消息。
queueSize 2.9 仅限组件:seda队列的最大大小(它可以容纳的消息数量的容量)  。

size未指定时使用此选项。

pollTimeout 2.9.3 1000 仅限使用者:轮询时使用的超时。发生超时时,消费者可以检查是否允许继续运行。设置较低的值允许消费者在关机时更快地做出反应。
purgeWhenStopping 2.11.1 false 是否在停止使用者/路由时清除任务队列。这允许更快地停止,因为队列上的任何未决消息都被丢弃。
queue 2.12.0 null 定义将由seda端点使用的队列实例
queueFactory 2.12.0 null 定义  QueueFactory哪个可以为seda端点创建队列
failIfNoConsumers 2.12.0 false 生成器是否应该在发送到没有活动使用者的seda队列时抛出异常而失败  。

只有一个选项  discardIfNoConsumers,并failIfNoConsumers可以在同一时间启用。

discardIfNoConsumers 2.16 false 生成器是否应该在发送到没有活动使用者的seda队列时丢弃该消息(不要将消息添加到队列中)  。

只有一个选项  discardIfNoConsumers,并  failIfNoConsumers可以在同一时间启用。

选择BlockingQueue实现

自Camel 2.12起可用

XML<bean id =“arrayQueue”class =“java.util.ArrayBlockingQueue”> <constructor-arg index =“0”value =“10”> <! – size – > <constructor-arg index =“1”value =“true”> <! – fairness – > </ bean> <! – … – > <from uri =“seda:array?queue =#arrayQueue”/>

默认情况下,  seda组件实例化a LinkedBlockingQueue。但是,可以通过指定自定义BlockingQueue实现来选择不同的  实现。配置自定义实现时,将size忽略该  选项。

可用BlockingQueueFactory实施列表  包括:

  • LinkedBlockingQueueFactory
  • ArrayBlockingQueueFactory
  • PriorityBlockingQueueFactory

XML<bean id =“priorityQueueFactory”class =“org.apache.camel.component.seda.PriorityBlockingQueueFactory”> <property name =“comparator”> <bean class =“org.apache.camel.demo.MyExchangeComparator”/> </ property> </ bean> <! – …以及 – > <from uri =“seda:priority?queueFactory =#priorityQueueFactory&size = 100”/> <! – … – > 

使用请求回复

SEDA组件支持使用请求回复,其中主叫方将等待异步途径来完成。例如:

java的from(“mina:tcp://0.0.0.0:9876?textline = true&sync = true”)。to(“seda:input”); from(“seda:input”).to(“bean:processInput”)。to(“bean:createResponse”);

在上面的路由中,我们在端口上有一个TCP侦听器,  9876它接受传入的请求。请求被路由到seda:input队列。由于它是请求回复消息,我们等待响应。当seda:input队列中的使用者完成时,它将响应复制到原始消息响应。

直到2.2:仅适用于2个端点

SEDAVM上使用请求回复仅适用于2个端点。您不能通过发送到等等链接端点  。原因是实现逻辑相当简单。要支持3+端点,逻辑会更加复杂,以便正确处理等待线程之间的排序和通知。A -> B -> CA -> B

这在Camel 2.3中得到了改进,它允许您根据需要链接尽可能多的端点。

并发消费者

默认情况下,SEDA端点使用单个使用者线程,但您可以将其配置为使用并发使用者线程。因此,您可以使用以下代码而不是线程池:

java的from(“seda:stageName?concurrentConsumers = 5”)。process(…)

至于两者之间的区别,请注意线程池可以在运行时根据负载动态增加/收缩,而并发消费者的数量总是固定的。

线程池

请注意,通过执行以下操作,将线程池添加到  seda端点:

java的from(“seda:stageName”)。thread(5).process(…)

可以结束两个BlockQueues一个来自  seda端点,一个来自线程池的工作队列,这可能不是你想要的。相反,您可能希望使用线程池配置Direct端点,该线程池可以同步和异步处理消息。例如:

java的from(“direct:stageName”)。thread(5).process(…)

您还可以使用该选项直接配置在seda端点上处理消息的线程数  concurrentConsumers

样品

在下面的路由中,我们使用SEDA队列将请求发送到此异步队列,以便能够发送一个fire-and-forget消息以便在另一个线程中进一步处理,并在此线程中向原始调用者返回一个常量回复。排队{片段:ID = E1 | LANG = java的| URL =骆驼/中继/骆驼核/ SRC /测试/ JAVA /组织/阿帕奇/骆驼/组件/ SEDA / SedaAsyncRouteTest.java}在这里,我们发送一个Hello World消息,并期望回复没问题。排队{片段:ID = E2 | =郎的java | URL =骆驼/中继/骆驼核/ SRC /测试/ JAVA /组织/阿帕奇/骆驼/组件/ SEDA / SedaAsyncRouteTest.java}该  Hello World消息将来自被消耗  SEDA从另一个线程用于进一步的处理队列中。由于这是来自单元测试,因此它将被发送到mock端点,我们可以在单元测试中进行断言。

运用 multipleConsumers

自Camel 2.2起可用

在这个例子中,我们定义了两个使用者并将它们注册为spring bean。排队{片段:ID = E1 | LANG = XML |网址=骆驼/中继/组件/骆驼弹簧/ SRC /测试/资源/组织/阿帕奇/骆驼/弹簧/示例/ fooEventRoute.xml}由于我们已multipleConsumers=trueseda foo端点上指定,因此我们可以让这两个消费者收到他们自己的消息副本,作为一种pub-sub样式的消息传递。

由于bean是单元测试的一部分,因此它们只是将消息发送到  mock端点。注意使用@Consumeseda队列中消耗。排队{片段:ID = E1 | LANG = java的| URL =骆驼/中继/组件/骆驼弹簧/ SRC /测试/ JAVA /组织/阿帕奇/骆驼/弹簧/示例/ FooEventConsumer.java}

发表评论

电子邮件地址不会被公开。 必填项已用*标注