How to enqueue a JMS message into Oracle AQ using Java
Original question: http://stackoverflow.com/questions/20497391/
Note: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): Stack Overflow
Asked by Humungus
I have an Oracle AQ queue with the payload type SYS.AQ$_JMS_TEXT_MESSAGE. What I'm trying to do is insert a text message into that queue from a Java application.
The equivalent PL/SQL is:
declare
  r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
  r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  v_message_handle     RAW(16);
  o_payload            SYS.AQ$_JMS_TEXT_MESSAGE;
begin
  o_payload := sys.aq$_jms_text_message.construct;
  o_payload.set_text(xmltype('<user>text</user>').getClobVal());
  sys.dbms_aq.enqueue (
    queue_name         => 'QUEUE_NAME',
    enqueue_options    => r_enqueue_options,
    message_properties => r_message_properties,
    payload            => o_payload,
    msgid              => v_message_handle
  );
  commit;
end;
/
I got most of it right using this guide, but I'm stuck at:
o_payload := sys.aq$_jms_text_message.construct;
o_payload.set_text(xmltype('<user>text</user>').getClobVal());
The guide shows how to enqueue a RAW message, but I need it to be a JMS message; otherwise the data type doesn't match the queue type.
Any help would be appreciated, because even with the almighty Google I have not been able to find a solution to this problem. Is there a way to do it using the oracle.jdbc.aq classes, or do I just have to suck it up and use the SQL query?
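For reference, the "just use the SQL query" fallback could be driven from Java over plain JDBC. The following is a minimal sketch, not a tested solution: the class name AqPlsqlEnqueue and the method enqueue are made up for illustration, and it assumes an already-open java.sql.Connection to the schema that owns the queue, with execute rights on DBMS_AQ. The message text and queue name are passed as bind variables instead of being hard-coded, and committing is left to the connection (auto-commit or an explicit conn.commit()).

```java
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.SQLException;

// Sketch only: runs the PL/SQL block from the question through JDBC.
class AqPlsqlEnqueue {

    // Same block as in the question; the two '?' are JDBC bind placeholders
    // (first the message text, then the queue name).
    static final String ENQUEUE_BLOCK = String.join("\n",
        "declare",
        "  r_enqueue_options    dbms_aq.enqueue_options_t;",
        "  r_message_properties dbms_aq.message_properties_t;",
        "  v_message_handle     raw(16);",
        "  o_payload            sys.aq$_jms_text_message;",
        "begin",
        "  o_payload := sys.aq$_jms_text_message.construct;",
        "  o_payload.set_text(?);",
        "  sys.dbms_aq.enqueue(",
        "    queue_name         => ?,",
        "    enqueue_options    => r_enqueue_options,",
        "    message_properties => r_message_properties,",
        "    payload            => o_payload,",
        "    msgid              => v_message_handle);",
        "end;");

    // Hypothetical helper: binds the text and queue name and executes the block.
    // The caller owns the connection and decides when to commit.
    static void enqueue(Connection conn, String queueName, String text) throws SQLException {
        try (CallableStatement cs = conn.prepareCall(ENQUEUE_BLOCK)) {
            cs.setString(1, text);
            cs.setString(2, queueName);
            cs.execute();
        }
    }

    public static void main(String[] args) {
        // Without a database at hand, just show the block that would be executed.
        System.out.println(ENQUEUE_BLOCK);
    }
}
```

With a live connection this would be called as AqPlsqlEnqueue.enqueue(conn, "QUEUE_NAME", "<user>text</user>"). A string bind is assumed to be enough here; very large payloads would probably need a CLOB bind instead.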
Accepted answer by Chathura Kulasinghe
Copy and paste this code and try it. If it works for you, go through the code carefully to understand it.
When running it:
- First, uncomment the 'createQueue()' line in the main method and run the program.
- After that, comment it out again, uncomment the 'sendMessage()' line, and try sending your message.
Then comment/uncomment each of the remaining lines in turn and give them a try.
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.jms.TextMessage;
import oracle.AQ.AQQueueTable;
import oracle.AQ.AQQueueTableProperty;
import oracle.jms.AQjmsDestination;
import oracle.jms.AQjmsDestinationProperty;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;

public class OracleAQClient {

    public static QueueConnection getConnection() {
        String hostname = "localhost";
        String oracle_sid = "xe";
        int portno = 1521;
        String userName = "jmsuser";
        String password = "jmsuser";
        String driver = "thin";
        QueueConnectionFactory QFac = null;
        QueueConnection QCon = null;
        try {
            // Get the connection factory directly, not going through JNDI here
            QFac = AQjmsFactory.getQueueConnectionFactory(hostname, oracle_sid, portno, driver);
            // Create the connection
            QCon = QFac.createQueueConnection(userName, password);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return QCon;
    }

    public static void createQueue(String user, String qTable, String queueName) {
        try {
            /* Create the queue table */
            System.out.println("Creating Queue Table...");
            QueueConnection QCon = getConnection();
            Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
            AQQueueTableProperty qt_prop = new AQQueueTableProperty("SYS.AQ$_JMS_TEXT_MESSAGE");
            AQQueueTable q_table = ((AQjmsSession) session).createQueueTable(user, qTable, qt_prop);
            System.out.println("Qtable created");
            AQjmsDestinationProperty dest_prop = new AQjmsDestinationProperty();
            /* Create a queue in that table */
            Queue queue = ((AQjmsSession) session).createQueue(q_table, queueName, dest_prop);
            System.out.println("Queue created");
            /* Start the queue (enable both enqueue and dequeue) */
            ((AQjmsDestination) queue).start(session, true, true);
            session.close();
            QCon.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void sendMessage(String user, String queueName, String message) {
        try {
            QueueConnection QCon = getConnection();
            Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
            QCon.start();
            Queue queue = ((AQjmsSession) session).getQueue(user, queueName);
            MessageProducer producer = session.createProducer(queue);
            TextMessage tMsg = session.createTextMessage(message);
            // Set a property on the message; Axis2 needs this parameter to find the operation
            tMsg.setStringProperty("SOAPAction", "getQuote");
            producer.send(tMsg);
            System.out.println("Sent message = " + tMsg.getText());
            // Close the producer before the session that created it
            producer.close();
            session.close();
            QCon.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public static void browseMessage(String user, String queueName) {
        try {
            QueueConnection QCon = getConnection();
            Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
            QCon.start();
            Queue queue = ((AQjmsSession) session).getQueue(user, queueName);
            // A browser reads messages without removing them from the queue
            QueueBrowser browser = session.createBrowser(queue);
            Enumeration<?> enu = browser.getEnumeration();
            List<String> list = new ArrayList<String>();
            while (enu.hasMoreElements()) {
                TextMessage message = (TextMessage) enu.nextElement();
                list.add(message.getText());
            }
            for (int i = 0; i < list.size(); i++) {
                System.out.println("Browsed msg " + list.get(i));
            }
            browser.close();
            session.close();
            QCon.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public static void consumeMessage(String user, String queueName) {
        try {
            QueueConnection QCon = getConnection();
            Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
            QCon.start();
            Queue queue = ((AQjmsSession) session).getQueue(user, queueName);
            MessageConsumer consumer = session.createConsumer(queue);
            // Blocks until a message is available
            TextMessage msg = (TextMessage) consumer.receive();
            System.out.println("MESSAGE RECEIVED " + msg.getText());
            consumer.close();
            session.close();
            QCon.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        String userName = "jmsuser";
        String queue = "sample_aq";
        String qTable = "sample_aqtbl";
        //createQueue(userName, qTable, queue);
        //sendMessage(userName, queue, "<user>text</user>");
        //browseMessage(userName, queue);
        //consumeMessage(userName, queue);
    }
}
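As an alternative to commenting and uncommenting lines in main, the step to run could be picked from the command line. The sketch below only illustrates the idea: the class name AqClientCli and the command names are invented, and each action is a stub that prints what it would do. In a real program each entry would call the matching OracleAQClient method with the user name, queue table and queue name from main.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: selects one step by name instead of editing main each time.
class AqClientCli {

    // Hypothetical command table; the stubs stand in for calls such as
    // OracleAQClient.createQueue(userName, qTable, queue).
    static Map<String, Runnable> commands() {
        Map<String, Runnable> commands = new LinkedHashMap<>();
        commands.put("create", () -> System.out.println("would call createQueue"));
        commands.put("send", () -> System.out.println("would call sendMessage"));
        commands.put("browse", () -> System.out.println("would call browseMessage"));
        commands.put("consume", () -> System.out.println("would call consumeMessage"));
        return commands;
    }

    // Runs the named step; returns false if the name is unknown.
    static boolean run(String command) {
        Runnable action = commands().get(command);
        if (action == null) {
            return false;
        }
        action.run();
        return true;
    }

    public static void main(String[] args) {
        if (args.length != 1 || !run(args[0])) {
            System.out.println("usage: AqClientCli " + String.join("|", commands().keySet()));
        }
    }
}
```

Running it with "create" first and then "send" follows the same order as the steps described above.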
You will need to copy these JARs from your Oracle DB installation directory into your Java project:
- ojdbc6.jar
- jta.jar
- jmscommon.jar
- aqapi.jar
Credit goes to Ratha for this article [1]. A few things needed to be amended; I just fixed those and provided the code.
[1] http://wso2.com/library/tutorials/2011/11/configuring-wso2-esb-with-oracle-as-messaging-media/
Thanks
Answer by Steve S.
I will add some tidbits to the answer of @Chathura Kulasinghe.
First, in the consumeMessage method, using the Session.CLIENT_ACKNOWLEDGE parameter when creating the session object has the effect of leaving the message you consume in the queue. If you run this program many times, you will see the number of messages going up in the queue's database table. To remove a message, you need to "acknowledge" it by calling this method on the message object:
msg.acknowledge();
Second, if you want the session to do this for you, simply change the acknowledge mode to:
Session.AUTO_ACKNOWLEDGE
With this parameter, every time consumer.receive() is called, the message is acknowledged automatically and so removed from the queue.
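The difference between the two modes can be illustrated with a toy in-memory model. This is not the real javax.jms API: ToyQueue, ToySession and depthAfterConsume are hypothetical names used only to mimic the behaviour described above, where a message received under CLIENT_ACKNOWLEDGE stays in the queue until it is acknowledged.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the two acknowledge modes; it does not talk to Oracle AQ or JMS.
class AckModeDemo {

    enum AckMode { AUTO_ACKNOWLEDGE, CLIENT_ACKNOWLEDGE }

    // Hypothetical stand-in for the queue's database table.
    static class ToyQueue {
        private final List<String> rows = new ArrayList<>();
        void enqueue(String text) { rows.add(text); }
        int depth() { return rows.size(); }
    }

    // Hypothetical stand-in for a session with a single consumer.
    static class ToySession {
        private final ToyQueue queue;
        private final AckMode mode;
        private int delivered = 0; // handed out but not yet acknowledged

        ToySession(ToyQueue queue, AckMode mode) {
            this.queue = queue;
            this.mode = mode;
        }

        String receive() {
            if (delivered >= queue.rows.size()) {
                return null; // nothing left to deliver
            }
            if (mode == AckMode.AUTO_ACKNOWLEDGE) {
                return queue.rows.remove(0); // removed as soon as it is received
            }
            return queue.rows.get(delivered++); // stays stored until acknowledged
        }

        // Removes every message delivered so far, like msg.acknowledge().
        void acknowledge() {
            queue.rows.subList(0, delivered).clear();
            delivered = 0;
        }

        // Closing without acknowledging leaves the delivered messages in the queue.
        void close() {
            delivered = 0;
        }
    }

    // Enqueues one message, consumes it once, and reports what is left.
    static int depthAfterConsume(AckMode mode, boolean acknowledge) {
        ToyQueue queue = new ToyQueue();
        queue.enqueue("<user>text</user>");
        ToySession session = new ToySession(queue, mode);
        session.receive();
        if (acknowledge) {
            session.acknowledge();
        }
        session.close();
        return queue.depth();
    }

    public static void main(String[] args) {
        System.out.println(depthAfterConsume(AckMode.CLIENT_ACKNOWLEDGE, false)); // prints 1
        System.out.println(depthAfterConsume(AckMode.CLIENT_ACKNOWLEDGE, true));  // prints 0
        System.out.println(depthAfterConsume(AckMode.AUTO_ACKNOWLEDGE, false));   // prints 0
    }
}
```

In the real client, the equivalent change would be either adding msg.acknowledge() after consumer.receive() or creating the session with Session.AUTO_ACKNOWLEDGE.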