package test.requestreply;
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
public class Test {
    public static void main(String[] args) throws Exception {
        fromJava();
    }

    public static void fromJava() throws Exception {
        CamelContext context = new DefaultCamelContext();
        ProducerTemplate template = context.createProducerTemplate();
        // Embedded in-VM broker, non-persistent, JMX disabled.
        ActiveMQComponent component = context.getComponent("activemq", ActiveMQComponent.class);
        ConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false&broker.useJmx=false");
        component.setConnectionFactory(cf);
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // queue2 -> request/reply against queue3 -> processor -> mock and log
                from("activemq:queue:queue2").inOut("activemq:queue:queue3").process(new TestProcessor())
                    .to("mock:result").to("log:com.citi.etrading.retail.tps.batch.common.processor.ETPSBatchMessageConverter?level=INFO");
                from("activemq:queue:queue3").process(new TestProcessor1())
                    .to("mock:result");
                from("activemq:queue:queue1?replyTo=queue:queue2&replyToType=Exclusive")
                    .transform(constant("Bye Camel"));
                    //.to("activemq:queue:queue3");
            }
        });
        context.start();
        template.sendBody("activemq:queue:queue1", "Hello World");
        // Give the routes time to process the message.
        Thread.sleep(5000);
        MockEndpoint endpoint = (MockEndpoint) context.getEndpoint("mock:result");
        // Note: this expectation is set after sending and is never asserted.
        endpoint.expectedBodiesReceived("Hello World");
        context.stop();
    }

    public static class TestProcessor implements Processor {
        public void process(Exchange exchange) throws Exception {
            // TODO Auto-generated method stub
            System.out.println(exchange);
            System.out.println("queue2");
        }
    }

    public static class TestProcessor1 implements Processor {
        public void process(Exchange exchange) throws Exception {
            // TODO Auto-generated method stub
            System.out.println(exchange);
            System.out.println("queue3");
        }
    }
}
===============
If you use Camel 2.9 or later, I suggest using replyToType=Exclusive on the activemq endpoint where you do request/reply. This tells Camel that the reply queue is exclusive, which speeds things up, as no JMS message selectors are needed to pick up the expected correlated messages.
See the section "Request-reply over JMS" onwards in the Camel JMS docs: http://camel.apache.org/jms
If you use temporary queues, that is fast as well, as no JMS message selectors are needed.
Also your route starts with a direct endpoint. That is a synchronous call, so the caller will wait/block until the Exchange is completely done.
Also, the Splitter EIP can be configured to run in parallel mode, which uses concurrent processing. And if you have a big message to split, consider using streaming, which splits the message on demand instead of loading the entire message content into memory.
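To make the "on demand" point concrete, here is a minimal plain-Java sketch of a streaming split. It is only an analogy, not the Camel Splitter itself: the class name StreamingSplit and the choice of splitting by line are assumptions for illustration.

```java
import java.io.BufferedReader;
import java.io.Reader;
import java.util.Iterator;

/** Toy analogy for a streaming split; hypothetical helper, not Camel code. */
final class StreamingSplit {
    /**
     * Returns an iterator that pulls one line at a time from the source,
     * so the whole content never has to be held in memory at once.
     */
    static Iterator<String> lines(Reader source) {
        // BufferedReader.lines() is lazy: a line is read only when requested.
        return new BufferedReader(source).lines().iterator();
    }
}
```

Each call to next() reads just enough input to produce the next part, which is the same idea as splitting a big message in streaming mode.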
Anyway, there is a lot going on in the route. Can you pinpoint more precisely where you have an issue? That makes it easier to help out.
=================
http://grokbase.com/t/camel/users/128n88xeva/how-to-use-request-reply-in-jms
Hi
Just configure the name of the reply queue on the from URI. You may
want to configure the reply queue as exclusive, since it then runs
faster. When the route completes, the message is sent back
to the reply queue. E.g. in the example below, after the processor has
run, the message containing "Hello World" is sent back to
OUTPUT.Q.
from("activemq://queue:INPUT.Q?jmsMessageType=Object&replyTo=OUTPUT.Q&replyToType=Exclusive")
    .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            exchange.getOut().setBody("Hello World");
        }
    });
I suggest reading the JMS page. It has sections about request/reply,
with some pointers on what to look out for and what to configure.
http://camel.apache.org/jms
And mind that you can use 3 kinds of queues when doing request/reply
- temporary queue
- shared queue
- exclusive queue
So mind the difference.
Make sure someone is listening on OUTPUT.Q and sending back a message
to REPLY.Q.
You get this exception because Camel timed out after 20 seconds. You
can adjust this value (the requestTimeout option) if you need a longer timeout.
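The timeout, and the "unknown correlationID" situation, can be pictured with a small plain-Java toy model. This is a sketch under my own assumptions, not Camel's implementation: the class ReplyCorrelator and its methods are hypothetical names, and the timeout value is passed in by the caller.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Toy request/reply correlator; hypothetical, not Camel's implementation. */
final class ReplyCorrelator {
    // Requests still waiting for a reply, keyed by correlation ID.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a request; the returned future completes when its reply arrives. */
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    /** Delivers a reply; returns false if nobody is waiting for this correlation ID. */
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false; // comparable to a reply for an unknown correlation ID
        }
        future.complete(body);
        return true;
    }

    /** Waits for the reply; returns null if none arrives within timeoutMillis. */
    String await(String correlationId, CompletableFuture<String> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            pending.remove(correlationId); // a late reply is now "unknown"
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pending.remove(correlationId);
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In this model a request that nobody answers simply runs into the timeout, and a reply that arrives after the timeout (or for a message sent without expecting a reply) finds no waiting entry.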
===================
After publish (OutOnly): Reply received for unknown correlationID
Use the disableReplyTo option to not expect any replies.
disableReplyTo (default: false): If true, a producer will behave like an InOnly exchange, with the exception that the JMSReplyTo header is sent out and not suppressed as it is for InOnly. Like InOnly, the producer will not wait for a reply. A consumer with this flag will behave like InOnly. This feature can be used to bridge InOut requests to another queue so that a route on the other queue will send its response directly back to the original JMSReplyTo.
================
http://camel.apache.org/jms.html
About using Camel to send and receive messages and JMSReplyTo
The JMS component is complex and you have to pay close attention to how it works in some cases. So this is a short summary of some of the areas/pitfalls to look for.
When Camel sends a message using its JMSProducer, it checks the following conditions:
- The message exchange pattern,
- Whether a JMSReplyTo was set in the endpoint or in the message headers,
- Whether any of the following options have been set on the JMS endpoint: disableReplyTo, preserveMessageQos, explicitQosEnabled.
All this can be a tad complex to understand and configure to support your use case.
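JmsProducer
The JmsProducer behaves as follows, depending on configuration:
Exchange Pattern | Other options | Description |
---|---|---|
InOut | - | Camel will expect a reply, set a temporary JMSReplyTo, and after sending the message start to listen for the reply message on the temporary queue. |
InOut | JMSReplyTo is set | Camel will expect a reply and, after sending the message, it will start to listen for the reply message on the specified JMSReplyTo queue. |
InOnly | - | Camel will send the message and not expect a reply. |
InOnly | JMSReplyTo is set | By default, Camel discards the JMSReplyTo destination and clears the JMSReplyTo header before sending the message. Camel then sends the message and does not expect a reply. |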
JmsConsumer
The JmsConsumer behaves as follows, depending on configuration:
Exchange Pattern | Other options | Description |
---|---|---|
InOut | - | Camel will send the reply back to the JMSReplyTo queue. |
InOnly | - | Camel will not send a reply back, as the pattern is InOnly. |
- | disableReplyTo=true | This option suppresses replies. |
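The two tables can be read as a small decision function. The following plain-Java sketch is only my paraphrase of the rows; the class ReplyRules, the enum, and the returned strings are illustrative names, not part of the Camel API.

```java
/** Toy model of the JmsProducer/JmsConsumer tables; names are illustrative only. */
final class ReplyRules {
    enum Pattern { IN_ONLY, IN_OUT }

    /** What a producer does about replies, following the JmsProducer table. */
    static String producerBehavior(Pattern pattern, boolean replyToSet) {
        if (pattern == Pattern.IN_OUT) {
            return replyToSet
                    ? "wait for reply on the specified JMSReplyTo"
                    : "wait for reply on a temporary queue";
        }
        return replyToSet
                ? "discard JMSReplyTo, send, no reply expected"
                : "send, no reply expected";
    }

    /** Whether a consumer sends a reply back, following the JmsConsumer table. */
    static boolean consumerSendsReply(Pattern pattern, boolean disableReplyTo) {
        // Only InOut consumers reply, and disableReplyTo suppresses even that.
        return pattern == Pattern.IN_OUT && !disableReplyTo;
    }
}
```

The sketch leaves out preserveMessageQos and explicitQosEnabled, which the tables here do not describe.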
So pay attention to the message exchange pattern set on your exchanges.
If you send a message to a JMS destination in the middle of your route you can specify the exchange pattern to use, see more at Request Reply. This is useful if you want to send an InOnly message to a JMS topic.
Request-reply over JMS
Camel supports Request Reply over JMS. In essence the MEP of the Exchange should be InOut when you send a message to a JMS queue.
Camel offers a number of options to configure request/reply over JMS that influence performance and clustered environments. The table below summarizes the options.
Option | Performance | Cluster | Description |
---|---|---|---|
Temporary | Fast | Yes | A temporary queue is used as reply queue, and automatically created by Camel. To use this do not specify a replyTo queue name. |
Shared | Slow | Yes | A shared persistent queue is used as reply queue. The queue must be created beforehand, although some brokers can create them on the fly such as Apache ActiveMQ. To use this you must specify the replyTo queue name. And you can optionally configure replyToType=Shared. |
Exclusive | Fast | No (*Yes) | An exclusive persistent queue is used as reply queue. The queue must be created beforehand, although some brokers can create them on the fly such as Apache ActiveMQ. To use this you must specify the replyTo queue name. And you must configure replyToType=Exclusive. |
concurrentConsumers | Fast | Yes | Camel 2.10.3: Allows reply messages to be processed concurrently using concurrent message listeners. You can specify a range using the concurrentConsumers and maxConcurrentConsumers options. |
maxConcurrentConsumers | Fast | Yes | Camel 2.10.3: Allows reply messages to be processed concurrently using concurrent message listeners. You can specify a range using the concurrentConsumers and maxConcurrentConsumers options. |
The JmsProducer detects the InOut and provides a JMSReplyTo header with the reply destination to be used. By default Camel uses a temporary queue, but you can use the replyTo option on the endpoint to specify a fixed reply queue (see more below about fixed reply queue).
Camel will automatically set up a consumer which listens on the reply queue, so you should not do anything.
This consumer is a Spring DefaultMessageListenerContainer which listens for replies. However it's fixed to 1 concurrent consumer.
That means replies will be processed in sequence, as there is only 1 thread to process the replies. If you want to process replies faster, then we need to use concurrency. But not using the concurrentConsumer option. We should use the threads from the Camel DSL instead, as shown in the route below:
from(xxx)
    .inOut().to("activemq:queue:foo")
    .threads(5)
    .to(yyy)
    .to(zzz);
In this route we instruct Camel to route replies asynchronously using a thread pool with 5 threads.
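The effect of handing replies to a thread pool can be pictured in plain Java. This is a toy model, not what Camel does internally: the class ReplyHandOff, the artificial slowStep, and the use of a fixed ExecutorService as a stand-in for threads(5) are all assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Toy model: one listener thread hands replies to a pool; hypothetical names. */
final class ReplyHandOff {
    /** Processes every reply on a pool of poolSize threads; results keep input order. */
    static List<String> processAll(List<String> replies, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String reply : replies) {
                // The calling (listener) thread only submits; the work runs on the pool.
                Callable<String> task = () -> slowStep(reply);
                futures.add(pool.submit(task));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    /** Stands in for the slow downstream steps of the route. */
    private static String slowStep(String reply) {
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return reply.toUpperCase();
    }
}
```

With a pool of 5, up to five slow steps overlap, whereas a single listener doing the work itself would handle the replies strictly one after another.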
From Camel 2.10.3 onwards you can configure the listener to use concurrent threads using the concurrentConsumers and maxConcurrentConsumers options. This makes it easier to configure in Camel, as shown below:
from(xxx)
    .inOut().to("activemq:queue:foo?concurrentConsumers=5")
    .to(yyy)
    .to(zzz);
Request-reply over JMS and using a shared fixed reply queue
If you use a fixed reply queue when doing Request Reply over JMS as shown in the example below, then pay attention.
from(xxx)
    .inOut().to("activemq:queue:foo?replyTo=bar")
    .to(yyy)
In this example the fixed reply queue named "bar" is used. By default Camel assumes the queue is shared when using fixed reply queues, and therefore it uses a JMSSelector to only pick up the expected reply messages (e.g. based on the JMSCorrelationID). See the next section for exclusive fixed reply queues. That means it's not as fast as temporary queues. You can speed up how often Camel polls for reply messages using the receiveTimeout option. By default it's 1000 millis. So to make it faster you can set it to 250 millis to poll 4 times per second, as shown:
from(xxx)
    .inOut().to("activemq:queue:foo?replyTo=bar&receiveTimeout=250")
    .to(yyy)
Notice this will cause Camel to send poll requests to the message broker more frequently, and thus require more network traffic.
It is generally recommended to use temporary queues if possible.
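Why a shared reply queue needs a selector can be shown with a plain-Java toy model. It is a sketch under my own assumptions, not the JMS or Camel API: SharedReplyQueue, Reply, and pollMatching are hypothetical names, and the "selector" is just a set of correlation IDs the caller is waiting for.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/** Toy model of a shared reply queue read through a selector; hypothetical names. */
final class SharedReplyQueue {
    /** A reply message: correlation ID plus body. */
    static final class Reply {
        final String correlationId;
        final String body;

        Reply(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    private final List<Reply> messages = new ArrayList<>();

    void put(String correlationId, String body) {
        messages.add(new Reply(correlationId, body));
    }

    /** Takes only the replies whose correlation ID the caller expects; the rest stay queued. */
    List<String> pollMatching(Set<String> expectedIds) {
        List<String> taken = new ArrayList<>();
        for (Iterator<Reply> it = messages.iterator(); it.hasNext();) {
            Reply reply = it.next();
            if (expectedIds.contains(reply.correlationId)) {
                taken.add(reply.body);
                it.remove();
            }
        }
        return taken;
    }

    int size() {
        return messages.size();
    }
}
```

Two consumers can share the queue safely because each takes only its own replies, at the cost of filtering on every poll. Without that filter, whichever consumer polled first would take everything.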
Request-reply over JMS and using an exclusive fixed reply queue
Available as of Camel 2.9
In the previous example, Camel would assume the fixed reply queue named "bar" was shared, and thus it uses a JMSSelector to only consume reply messages which it expects. However there is a drawback to doing this, as JMS selectors are slower. Also the consumer on the reply queue is slower to update with new JMS selector ids. In fact it only updates when the receiveTimeout option times out, which by default is 1 second. So in theory the reply messages could take up to about 1 sec to be detected. On the other hand, if the fixed reply queue is exclusive to the Camel reply consumer, then we can avoid using the JMS selectors, and thus be more performant. In fact as fast as using temporary queues. So from Camel 2.9 onwards we introduced the replyToType option, which you can configure to Exclusive to tell Camel that the reply queue is exclusive, as shown in the example below:
from(xxx)
    .inOut().to("activemq:queue:foo?replyTo=bar&replyToType=Exclusive")
    .to(yyy)
Mind that the queue must be exclusive to each and every endpoint. So if you have two routes, then they each need a unique reply queue as shown in the next example:
from(xxx)
    .inOut().to("activemq:queue:foo?replyTo=bar&replyToType=Exclusive")
    .to(yyy)

from(aaa)
    .inOut().to("activemq:queue:order?replyTo=order.reply&replyToType=Exclusive")
    .to(bbb)
The same applies if you run in a clustered environment. Then each node in the cluster must use a unique reply queue name, as otherwise each node in the cluster may pick up messages which were intended as a reply on another node. For clustered environments it's recommended to use shared reply queues instead.
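If exclusive reply queues are used on several nodes anyway, one way to get a unique name per node is to derive it from a node identifier. The helper below is a hypothetical sketch: the class ExclusiveReplyUri, the nodeId parameter, and the "prefix.nodeId" naming scheme are my assumptions. Only the replyTo and replyToType options come from the text above.

```java
/** Hypothetical helper that builds an endpoint URI with a per-node reply queue. */
final class ExclusiveReplyUri {
    /**
     * Builds an activemq endpoint URI whose exclusive reply queue is named
     * replyQueuePrefix + "." + nodeId, so no two nodes share a reply queue.
     */
    static String forNode(String requestQueue, String replyQueuePrefix, String nodeId) {
        return "activemq:queue:" + requestQueue
                + "?replyTo=" + replyQueuePrefix + "." + nodeId
                + "&replyToType=Exclusive";
    }
}
```

For example, forNode("foo", "bar", "node1") gives activemq:queue:foo?replyTo=bar.node1&replyToType=Exclusive. How each node learns its nodeId (hostname, system property, and so on) is left open here.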