
camel JMS replyTo

December 3, 2014

 

package test.requestreply;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;

public class Test {

    public static void main(String[] args) throws Exception {
        fromJava();
    }

    public static void fromJava() throws Exception {
        CamelContext context = new DefaultCamelContext();
        ProducerTemplate template = context.createProducerTemplate();

        // Point the activemq component at an embedded, non-persistent broker.
        ActiveMQComponent component = context.getComponent("activemq", ActiveMQComponent.class);
        ConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false&broker.useJmx=false");
        component.setConnectionFactory(cf);

        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // Consumes from queue2 (named as replyTo by the queue1 route below)
                // and does a request/reply call to queue3.
                from("activemq:queue:queue2")
                    .inOut("activemq:queue:queue3")
                    .process(new TestProcessor())
                    .to("mock:result")
                    .to("log:com.citi.etrading.retail.tps.batch.common.processor.ETPSBatchMessageConverter?level=INFO");

                // Consumes the requests sent to queue3 by the route above.
                from("activemq:queue:queue3")
                    .process(new TestProcessor1())
                    .to("mock:result");

                // Replaces the body; replyTo names queue2 as the reply destination.
                from("activemq:queue:queue1?replyTo=queue:queue2&replyToType=Exclusive")
                    .transform(constant("Bye Camel"));
                    //.to("activemq:queue:queue3");
            }
        });

        context.start();

        template.sendBody("activemq:queue:queue1", "Hello World");

        // Give the routes time to process the message.
        Thread.sleep(5000);

        // Note: this only registers an expectation; assertIsSatisfied() is never
        // called, so nothing is actually verified here.
        MockEndpoint endpoint = (MockEndpoint) context.getEndpoint("mock:result");
        endpoint.expectedBodiesReceived("Hello World");

        context.stop();
    }

    // Prints each exchange that passes through the queue2 route.
    public static class TestProcessor implements Processor {
        public void process(Exchange exchange) throws Exception {
            // TODO Auto-generated method stub
            System.out.println(exchange);
            System.out.println("queue2");
        }
    }

    // Prints each exchange that passes through the queue3 route.
    public static class TestProcessor1 implements Processor {
        public void process(Exchange exchange) throws Exception {
            // TODO Auto-generated method stub
            System.out.println(exchange);
            System.out.println("queue3");
        }
    }
}

 

 

 

===============

http://stackoverflow.com/questions/10085340/camel-is-it-possible-to-implement-request-reply-competing-consumers-with-the

If you use Camel 2.9 or later, then I suggest using replyToType=Exclusive on the activemq endpoint where you do request/reply. This tells Camel that the queue is exclusive, which speeds things up, as no JMS message selectors are needed to pick up the expected correlated messages.

See the section "Request-reply over JMS" onwards in the Camel JMS docs: http://camel.apache.org/jms

If you use temporary queues, that is fast as well, as no JMS message selectors are needed.

Also your route starts with a direct endpoint. That is a synchronous call, so the caller will wait/block until the Exchange is completely done.

Also the Splitter EIP can be configured to run in parallel mode, which will use concurrent processing. And if you have a big message to split, then consider using streaming, which will split the message on demand instead of loading the entire message content into memory.

Anyway there is a lot going on in the route. Can you pin-point more precisely where you have an issue? It makes it easier to help out.

 

 

=================

http://grokbase.com/t/camel/users/128n88xeva/how-to-use-request-reply-in-jms

Hi

Just configure the name of the reply queue on the from uri. You may
want to configure the reply queue as being exclusive; then it runs
faster. Then when the route completes, the message will be sent back
to the reply queue. E.g. in the example below, after the processor has
run, the message containing "Hello World" will be sent back to
OUTPUT.Q.

from("activemq://queue:INPUT.Q?jmsMessageType=Object&replyTo=OUTPUT.Q&replyToType=Exclusive")
    .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            exchange.getOut().setBody("Hello World");
        }
    });

I suggest reading the JMS page. It has sections about request/reply
etc., and some pointers on what to look out for and what to
configure.
http://camel.apache.org/jms

And mind that you can use three kinds of queues when doing request/reply:
- temporary queue
- shared queue
- exclusive queue

So mind the difference.

Make sure someone is listening on OUTPUT.Q and sending back a message
to REPLY.Q.
You get this exception because Camel timed out after 20 seconds. You
can adjust this value if you need a longer timeout.

===================

http://camel.465427.n5.nabble.com/After-publish-OutOnly-Reply-received-for-unknown-correlationID-td3368679.html

After publish (OutOnly): Reply received for unknown correlationID

Use the disableReplyTo option to not expect any replies.

disableReplyTo (default: false): If true, a producer will behave like an InOnly exchange, with the exception that the JMSReplyTo header is sent out and not suppressed as in the case of InOnly. Like InOnly, the producer will not wait for a reply. A consumer with this flag will behave like InOnly. This feature can be used to bridge InOut requests to another queue so that a route on the other queue will send its response directly back to the original JMSReplyTo.

 

================

http://camel.apache.org/jms.html

About using Camel to send and receive messages and JMSReplyTo

The JMS component is complex and you have to pay close attention to how it works in some cases. So this is a short summary of some of the areas/pitfalls to look for.

When Camel sends a message using its JMSProducer, it checks the following conditions:

  • The message exchange pattern,
  • Whether a JMSReplyTo was set in the endpoint or in the message headers,
  • Whether any of the following options have been set on the JMS endpoint: disableReplyTo, preserveMessageQos, explicitQosEnabled.

All this can be a tad complex to understand and configure to support your use case.

JmsProducer

The JmsProducer behaves as follows, depending on configuration:

Exchange Pattern | Other options | Description
InOut | - | Camel will expect a reply, set a temporary JMSReplyTo, and after sending the message, it will start to listen for the reply message on the temporary queue.
InOut | JMSReplyTo is set | Camel will expect a reply and, after sending the message, it will start to listen for the reply message on the specified JMSReplyTo queue.
InOnly | - | Camel will send the message and not expect a reply.
InOnly | JMSReplyTo is set | By default, Camel discards the JMSReplyTo destination and clears the JMSReplyTo header before sending the message. Camel then sends the message and does not expect a reply. Camel logs this at WARN level (changed to DEBUG level from Camel 2.6 onwards). You can use preserveMessageQos=true to instruct Camel to keep the JMSReplyTo. In all situations the JmsProducer does not expect any reply and thus continues after sending the message.
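
The producer table above can be read as a small decision function. The following is only a sketch of that table in plain Java, not Camel's actual JmsProducer code; the class name, enum, and the returned description strings are made up for illustration.

```java
// A plain-Java model of the JmsProducer table; hypothetical names, not Camel API.
final class JmsProducerBehaviorSketch {

    enum Pattern { IN_OUT, IN_ONLY }

    // Returns a short description of what the table says the producer does.
    static String describe(Pattern pattern, boolean replyToSet, boolean preserveMessageQos) {
        if (pattern == Pattern.IN_OUT) {
            // InOut always waits; only the reply destination differs.
            return replyToSet
                    ? "send, then wait for the reply on the given JMSReplyTo queue"
                    : "send, then wait for the reply on a temporary queue";
        }
        // InOnly never waits for a reply.
        if (replyToSet && preserveMessageQos) {
            return "keep JMSReplyTo, send, do not wait";
        }
        if (replyToSet) {
            return "clear JMSReplyTo, send, do not wait";
        }
        return "send, do not wait";
    }

    public static void main(String[] args) {
        for (Pattern p : Pattern.values()) {
            for (boolean replyTo : new boolean[] {false, true}) {
                System.out.println(p + ", JMSReplyTo set=" + replyTo + ": "
                        + describe(p, replyTo, false));
            }
        }
    }
}
```

Running main prints the four rows of the table with preserveMessageQos left at false.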

JmsConsumer

The JmsConsumer behaves as follows, depending on configuration:

Exchange Pattern | Other options | Description
InOut | - | Camel will send the reply back to the JMSReplyTo queue.
InOnly | - | Camel will not send a reply back, as the pattern is InOnly.
- | disableReplyTo=true | This option suppresses replies.

So pay attention to the message exchange pattern set on your exchanges.

If you send a message to a JMS destination in the middle of your route, you can specify the exchange pattern to use; see more at Request Reply.

This is useful if you want to send an InOnly message to a JMS topic:

from("activemq:queue:in")
   .to("bean:validateOrder")
   .to(ExchangePattern.InOnly, "activemq:topic:order")
   .to("bean:handleOrder");

Request-reply over JMS

Camel supports Request Reply over JMS. In essence the MEP of the Exchange should be InOut when you send a message to a JMS queue.

Camel offers a number of options to configure request/reply over JMS that influence performance and behavior in clustered environments. The table below summarizes the options.

Option | Performance | Cluster | Description
Temporary | Fast | Yes | A temporary queue is used as the reply queue and is automatically created by Camel. To use this, do not specify a replyTo queue name. You can optionally configure replyToType=Temporary to make it stand out that temporary queues are in use.
Shared | Slow | Yes | A shared persistent queue is used as the reply queue. The queue must be created beforehand, although some brokers, such as Apache ActiveMQ, can create them on the fly. To use this you must specify the replyTo queue name. You can optionally configure replyToType=Shared to make it stand out that shared queues are in use. A shared queue can be used in a clustered environment with multiple nodes running this Camel application at the same time, all using the same shared reply queue. This is possible because JMS message selectors are used to correlate expected reply messages; this impacts performance, though. JMS message selectors are slower, so shared queues are not as fast as Temporary or Exclusive queues. See further below how to tweak this for better performance.
Exclusive | Fast | No (*Yes) | An exclusive persistent queue is used as the reply queue. The queue must be created beforehand, although some brokers, such as Apache ActiveMQ, can create them on the fly. To use this you must specify the replyTo queue name, and you must configure replyToType=Exclusive to instruct Camel to use exclusive queues, as Shared is used by default if a replyTo queue name was configured. When using exclusive reply queues, JMS message selectors are not in use, and therefore other applications must not use this queue as well. An exclusive queue cannot be used in a clustered environment with multiple nodes running this Camel application at the same time, as we have no control over whether the reply comes back to the same node that sent the request message; that is why shared queues use JMS message selectors to make sure of this. However, if you configure each Exclusive reply queue with a unique name per node, then you can run this in a clustered environment, as the reply message will then be sent back to the queue of the node that awaits it.
concurrentConsumers | Fast | Yes | Camel 2.10.3: Allows reply messages to be processed concurrently using concurrent message listeners. You can specify a range using the concurrentConsumers and maxConcurrentConsumers options. Note: Shared reply queues may not work as well with concurrent listeners, so use this option with care.
maxConcurrentConsumers | Fast | Yes | Camel 2.10.3: Allows reply messages to be processed concurrently using concurrent message listeners. You can specify a range using the concurrentConsumers and maxConcurrentConsumers options. Note: Shared reply queues may not work as well with concurrent listeners, so use this option with care.
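
The Exclusive row notes that a cluster can still use exclusive reply queues if each node gets a uniquely named one. A minimal sketch of building such an endpoint URI is below; the helper class, the "base.nodeId" naming scheme, and the node identifier are assumptions for illustration, and only the replyTo and replyToType options come from the text above.

```java
// Hypothetical helper: builds an activemq endpoint URI with a per-node exclusive reply queue.
final class ExclusiveReplyUris {

    // nodeId is assumed to be unique per cluster node (e.g. taken from configuration).
    static String exclusiveReplyUri(String requestQueue, String replyQueueBase, String nodeId) {
        String replyQueue = replyQueueBase + "." + nodeId; // assumed naming scheme
        return "activemq:queue:" + requestQueue
                + "?replyTo=" + replyQueue
                + "&replyToType=Exclusive";
    }

    public static void main(String[] args) {
        // prints "activemq:queue:foo?replyTo=bar.node1&replyToType=Exclusive"
        System.out.println(exclusiveReplyUri("foo", "bar", "node1"));
    }
}
```

The resulting string would be passed to the route, for example as the argument of .inOut().to(...).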

The JmsProducer detects the InOut pattern and provides a JMSReplyTo header with the reply destination to be used. By default Camel uses a temporary queue, but you can use the replyTo option on the endpoint to specify a fixed reply queue (see more below about fixed reply queues).

Camel will automatically set up a consumer that listens on the reply queue, so you should not need to do anything.

This consumer is a Spring DefaultMessageListenerContainer which listens for replies. However, it's fixed to 1 concurrent consumer.

That means replies will be processed in sequence, as there is only one thread to process them. If you want to process replies faster, you need concurrency, but not via the concurrentConsumers option. Use threads from the Camel DSL instead, as shown in the route below:

Note: Instead of using threads, use the concurrentConsumers option if using Camel 2.10.3 or later. See further below.

from(xxx)
.inOut().to("activemq:queue:foo")
.threads(5)
.to(yyy)
.to(zzz);

In this route we instruct Camel to route replies asynchronously using a thread pool with 5 threads.

From Camel 2.10.3 onwards you can configure the listener to use concurrent threads using the concurrentConsumers and maxConcurrentConsumers options. This makes it easier to configure in Camel, as shown below:

from(xxx)
.inOut().to("activemq:queue:foo?concurrentConsumers=5")
.to(yyy)
.to(zzz);

Request-reply over JMS and using a shared fixed reply queue

If you use a fixed reply queue when doing Request Reply over JMS as shown in the example below, then pay attention.

from(xxx)
.inOut().to("activemq:queue:foo?replyTo=bar")
.to(yyy)

In this example the fixed reply queue named "bar" is used. By default Camel assumes the queue is shared when using fixed reply queues, and therefore it uses a JMSSelector to pick up only the expected reply messages (e.g. based on the JMSCorrelationID). See the next section for exclusive fixed reply queues. That means it's not as fast as temporary queues. You can speed up how often Camel will pull for reply messages using the receiveTimeout option. By default it's 1000 millis. So to make it faster you can set it to 250 millis to pull 4 times per second, as shown:

from(xxx)
.inOut().to("activemq:queue:foo?replyTo=bar&receiveTimeout=250")
.to(yyy)

Notice this will cause Camel to send pull requests to the message broker more frequently, and thus cause more network traffic.
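
The trade-off is simple arithmetic: the pull rate is roughly 1000 divided by receiveTimeout in milliseconds. The sketch below only illustrates that calculation and how the option appears in the endpoint URI; the class and method names are made up, and the real rate also depends on the broker and the listener container.

```java
// Hypothetical helper illustrating the receiveTimeout arithmetic from the text.
final class ReceiveTimeoutSketch {

    // Approximate number of pulls per second for a given receiveTimeout.
    static double pullsPerSecond(long receiveTimeoutMillis) {
        if (receiveTimeoutMillis <= 0) {
            throw new IllegalArgumentException("receiveTimeout must be positive");
        }
        return 1000.0 / receiveTimeoutMillis;
    }

    // Builds an endpoint URI with a shared fixed reply queue and a custom receiveTimeout.
    static String sharedReplyUri(String requestQueue, String replyQueue, long receiveTimeoutMillis) {
        return "activemq:queue:" + requestQueue
                + "?replyTo=" + replyQueue
                + "&receiveTimeout=" + receiveTimeoutMillis;
    }

    public static void main(String[] args) {
        System.out.println(pullsPerSecond(1000)); // default: 1.0
        System.out.println(pullsPerSecond(250));  // 4.0
        System.out.println(sharedReplyUri("foo", "bar", 250));
    }
}
```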

It is generally recommended to use temporary queues if possible.

Request-reply over JMS and using an exclusive fixed reply queue

Available as of Camel 2.9

In the previous example, Camel would assume the fixed reply queue named "bar" was shared, and thus it uses a JMSSelector to consume only the reply messages it expects. However, there is a drawback to doing this, as JMS selectors are slower. Also, the consumer on the reply queue is slower to update with new JMS selector ids. In fact it only updates when the receiveTimeout option times out, which by default is 1 second. So in theory the reply messages could take up to about 1 second to be detected. On the other hand, if the fixed reply queue is exclusive to the Camel reply consumer, then we can avoid using JMS selectors and thus be more performant; in fact as fast as using temporary queues. So from Camel 2.9 onwards we introduced the replyToType option, which you can set to Exclusive to tell Camel that the reply queue is exclusive, as shown in the example below:

from(xxx)
.inOut().to("activemq:queue:foo?replyTo=bar&replyToType=Exclusive")
.to(yyy)
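
To see why an exclusive queue needs no selector, it can help to picture the reply consumer taking every message from the queue and matching it in memory by JMSCorrelationID. The following plain-Java sketch models that idea only; it is not Camel's implementation, and the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// A simplified model of in-memory reply correlation; not Camel's actual code.
final class ReplyCorrelatorSketch {

    // Requests that are still waiting for a reply, keyed by correlation ID.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Called when a request is sent: remember that a reply is expected.
    CompletableFuture<String> expectReply(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // Called for every message taken from the reply queue.
    // Returns false when nobody is waiting for this correlation ID.
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false; // a reply for an unknown correlation ID
        }
        future.complete(body);
        return true;
    }

    public static void main(String[] args) {
        ReplyCorrelatorSketch correlator = new ReplyCorrelatorSketch();
        CompletableFuture<String> reply = correlator.expectReply("id-1");
        correlator.onReply("id-1", "Bye Camel");
        System.out.println(reply.join());                         // prints "Bye Camel"
        System.out.println(correlator.onReply("id-2", "stray"));  // prints "false"
    }
}
```

In this model, a message that another application or node was waiting for would be consumed and dropped as unknown, which is why the queue has to be exclusive to one reply consumer.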

Mind that the queue must be exclusive to each and every endpoint. So if you have two routes, then each needs a unique reply queue, as shown in the next example:

from(xxx)
.inOut().to("activemq:queue:foo?replyTo=bar&replyToType=Exclusive")
.to(yyy)
 
from(aaa)
.inOut().to("activemq:queue:order?replyTo=order.reply&replyToType=Exclusive")
.to(bbb)

The same applies if you run in a clustered environment: each node in the cluster must use a unique reply queue name. Otherwise each node in the cluster may pick up messages that were intended as replies for another node. For clustered environments it's recommended to use shared reply queues instead.

 
