Overview
Dynamo's messaging model uses components called MessageSources to send
messages to pre-registered topics. To set this up we need to do four more things:
- Create a JMS topic in OpenJMS
- Write a MessageSource component
- Configure the new MessageSource to send messages to the topic via the patchbay
- Check that the message made it to the topic
Creating a Topic in OpenJMS
I'm going to use the Admin Tool to create the topic. We'll call the topic
TestDynamoTopic and add a durable subscription, TestDynamoSub.
Writing a MessageSource Component
To keep things simple our MessageSource component will have a public method
called sendOneMessage() which can be invoked from the ACC or the Dynamo Admin
UI.
package imagescript.atg.openjms;

import atg.nucleus.GenericService; // For logging methods
import atg.dms.patchbay.*; // For DMS Interfaces
import javax.jms.*; // Message and javax.jms.JMSException

public class MessageSource
  extends GenericService
  implements atg.dms.patchbay.MessageSource
{
  //////////////////////////////////////////////////////////////////////////
  //
  // This piece of code is boilerplate implementation for MessageSource
  //
  MessageSourceContext mContext = null;
  public void setMessageSourceContext(MessageSourceContext p0)
  {
    logDebug("Setting MessageSourceContext");
    mContext = p0;
  }

  boolean mbStarted = false;
  public void startMessageSource()
  {
    logDebug("startMessageSource");
    mbStarted = true;
  }
  public void stopMessageSource()
  {
    logDebug("stopMessageSource");
    mbStarted = false;
  }
  //
  //////////////////////////////////////////////////////////////////////////

  // You can have multiple ports, the XML file supplied with this demo
  // only defines one, though!
  //
  // If a message source uses only one output port,
  // that port should be called DEFAULT
  //
  static final String PORTNAME = "DEFAULT"; // Output port name

  // Expose this so you can play with the port name at run time
  //
  String mPortName = PORTNAME;
  public void setPortName(String strPortName)
  {
    mPortName = strPortName;
  }
  public String getPortName()
  {
    return mPortName;
  }

  static final String JMSTYPE = "DemoMessage.Message"; // JMSType of outgoing messages

  // Expose this so you can play with the JMS type at run time
  //
  String mJMSType = JMSTYPE;
  public void setJMSType(String strJMSType)
  {
    mJMSType = strJMSType;
  }
  public String getJMSType()
  {
    return mJMSType;
  }

  // Sends a single text message on the configured port and records the
  // outcome so it can be read back through getLastMessageSent()
  public void sendOneMessage()
  {
    String strStatus = "nothing";
    logDebug("Start : sendOneMessage()");
    if(mbStarted && (mContext != null))
    {
      try
      {
        String strMessage = "Test Message " + Math.random();
        TextMessage msg = mContext.createTextMessage(mPortName);
        msg.setJMSType(mJMSType);
        msg.setText(strMessage);
        logDebug("Sending Message");
        mContext.sendMessage(mPortName,msg);
        logDebug("Sent (" + strMessage + ") as " + mJMSType + " on " + mPortName);
        strStatus = strMessage;
      }
      catch( javax.jms.JMSException e)
      {
        strStatus = e.getMessage();
      }
      catch(Exception e2)
      {
        // Anything else is logged; strStatus stays "nothing"
        logError(e2);
      }
    }
    else
    {
      strStatus = "Not started or no context context=(" + mContext + ")";
    }
    logDebug("Done : sendOneMessage() : " + strStatus);
    mLastMessageSent = strStatus;
  }

  String mLastMessageSent;
  public String getLastMessageSent()
  {
    return mLastMessageSent;
  }

  public MessageSource() { }
}
Compiling this adds the MessageSource.class file to the right place
under the module's classes folder.
Now we need to create the component /OpenJMS/TestSource based on our new
class. Again I'll use the ACC. This creates the following TestSource.properties
file in the config/OpenJMS folder.
# /OpenJMS/TestSource
#Fri May 31 16:31:15 PDT 2002
$class=imagescript.atg.openjms.MessageSource
$scope=global
loggingDebug=true
Now we need to edit the patchbay to configure this component to send its
messages to the OpenJMS topic TestDynamoTopic. We do this by editing the
dynamoMessagingSystem.xml again. This time we are adding a message-source
section inside the <patchbay> tags but after the </provider> tag.
<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE dynamo-message-system
  SYSTEM
  "http://www.atg.com/dtds/patchbay/patchbay_1.0.dtd">
<dynamo-message-system>
  <patchbay>
    <!-- local JMS provider -->
    <provider>
      <provider-name>
        openjms
      </provider-name>
      <topic-connection-factory-name>
        JmsTopicConnectionFactory
      </topic-connection-factory-name>
      <queue-connection-factory-name>
        JmsQueueConnectionFactory
      </queue-connection-factory-name>
      <supports-transactions>
        true
      </supports-transactions>
      <supports-xa-transactions>
        false
      </supports-xa-transactions>
      <initial-context-factory>
        /OpenJMS/InitialContextFactory
      </initial-context-factory>
    </provider>
    <message-source>
      <nucleus-name>/OpenJMS/TestSource</nucleus-name>
      <output-port>
        <port-name>DEFAULT</port-name>
        <output-destination>
          <provider-name>openjms</provider-name>
          <destination-name>TestDynamoTopic</destination-name>
          <destination-type>Topic</destination-type>
        </output-destination>
      </output-port>
    </message-source>
  </patchbay>
</dynamo-message-system>
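As a mental model only, the output-port section above can be read as a lookup table from port name to destination. The sketch below is plain Java with hypothetical names (PatchbaySketch, Destination); it is not how ATG's patchbay is implemented. It just illustrates why the port name the component sends on has to match a <port-name> in the XML.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the patchbay's routing table; not ATG code.
class PatchbaySketch {
    // One configured destination: provider, name and type, as in the XML.
    static final class Destination {
        final String provider;
        final String name;
        final String type;

        Destination(String provider, String name, String type) {
            this.provider = provider;
            this.name = name;
            this.type = type;
        }
    }

    private final Map<String, Destination> ports = new HashMap<>();

    void addOutputPort(String portName, Destination destination) {
        ports.put(portName, destination);
    }

    // Returns the destination for a port, or fails loudly for an unknown port.
    Destination resolve(String portName) {
        Destination d = ports.get(portName);
        if (d == null) {
            throw new IllegalArgumentException("No output port named " + portName);
        }
        return d;
    }

    public static void main(String[] args) {
        PatchbaySketch patchbay = new PatchbaySketch();
        patchbay.addOutputPort("DEFAULT",
                new Destination("openjms", "TestDynamoTopic", "Topic"));
        Destination d = patchbay.resolve("DEFAULT");
        System.out.println(d.provider + ":" + d.name + " (" + d.type + ")");
    }
}
```

If you change the component's portName property at run time, a send on a port that has no entry in the XML would have nowhere to go, which is the case the exception in resolve() stands in for.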
Testing the connection
Restart Dynamo, checking:
- That we don't see any errors on startup
- That any debug messages we are expecting show up
- That the /OpenJMS/TestSource component gets started automatically; check
that the red dot is next to the component in the ACC. You shouldn't have to
start the component yourself to get it.
Right now I'm a very happy camper because I got to see my debug messages:
**** debug ... /OpenJMS/InitialContextFactory createInitialContext()
**** debug ... /OpenJMS/InitialContextFactory pProviderName = openjms
**** debug ... /OpenJMS/InitialContextFactory createInitialContext()
**** debug ... /OpenJMS/InitialContextFactory pProviderName = openjms
**** debug ... /OpenJMS/TestSource Setting MessageSourceContext
**** debug ... /OpenJMS/TestSource startMessageSource
Of course there were no errors at all to get to that point ;) The main ones
were:
- Checking the JNDI names and getting the port numbers right
- Working out that the OpenJMS JNDI lookup just needed a string with no
slashes or prefixes
- Making sure all the client libraries were on the classpath
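For reference, here is a minimal sketch of the JNDI side of those checks using only the standard javax.naming constants. The helper name buildEnv is hypothetical, and the initial context factory class is left as a parameter on purpose: the right class name depends on your OpenJMS version, so treat the value used in main as a placeholder. The provider URL has the same shape as the one the OpenJMS example consumer prints later in this article.

```java
import java.util.Hashtable;
import javax.naming.Context;

// Sketch only: builds the JNDI environment a client would hand to InitialContext.
class JndiEnvSketch {
    // factoryClassName is deliberately a parameter; look up the correct class
    // for your OpenJMS version in the OpenJMS documentation.
    static Hashtable<String, String> buildEnv(String factoryClassName, String host, int port) {
        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, factoryClassName);
        // Host and port must match the running OpenJMS JNDI server.
        env.put(Context.PROVIDER_URL, "rmi://" + host + ":" + port + "/JndiServer");
        return env;
    }

    public static void main(String[] args) {
        // "your.openjms.ContextFactory" is a placeholder, not a real class name.
        Hashtable<String, String> env =
                buildEnv("your.openjms.ContextFactory", "localhost", 1099);
        System.out.println(env.get(Context.PROVIDER_URL));
        // With the OpenJMS client jars on the classpath you would then do roughly:
        //   Context ctx = new javax.naming.InitialContext(env);
        //   Object factory = ctx.lookup("JmsTopicConnectionFactory"); // bare name, no slashes
    }
}
```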
Sending the Message
I can test my code by bringing up the TestSource component in the ACC
Component Editor and clicking sendOneMessage() on the methods tab...
Uh-oh, NullPointerException! It seems that
atg.dms.patchbay.PooledSession.getTopicPublisher passes in a null topic to
create the pooled session objects and OpenJMS doesn't like that ;)
Start : sendOneMessage()
Sending Message
--- java.lang.NullPointerException
at org.exolab.jms.client.JmsTopicSession.createPublisher(Unknown Source)
at atg.dms.patchbay.PooledSession.getTopicPublisher(PooledSession.java:197)
at atg.dms.patchbay.OutputDestination.sendMessage(OutputDestination.java:174)
at atg.dms.patchbay.OutputPort.sendMessage(OutputPort.java:138)
at atg.dms.patchbay.ElementManager.sendMessage(ElementManager.java:289)
at imagescript.atg.openjms.MessageSource.sendOneMessage(MessageSource.java:85)
Done : sendOneMessage() : nothing
Perhaps we can patch the OpenJMS implementation? ....
So here's the fix: we need to check that topic is not null in
org.exolab.jms.client.JmsTopicSession.createPublisher().
public synchronized TopicPublisher createPublisher(Topic topic)
  throws JMSException
{
  if(isClosed())
    throw new javax.jms.IllegalStateException("Calling method on closed session");
  // The fix: only run the wildcard check when a topic was actually supplied
  if((topic != null) && ((JmsTopic)topic).isWildCard())
  {
    throw new JMSException("Cannot create a publisher using a wildcard topic");
  } else {
    JmsTopicPublisher jmstopicpublisher
      = new JmsTopicPublisher(this, (JmsTopic)topic);
    addPublisher(jmstopicpublisher);
    return jmstopicpublisher;
  }
}
To get this fix to work, recompile the code with the fix and place it under
the module's classes directory. There's no need to repackage the jar since the
classes directory gets picked up first on the CLASSPATH.
So we try again, starting the consumer after restarting ATG:
**** debug /OpenJMS/TestSource Sending Message
**** debug /OpenJMS/TestSource Sent (Test Message 0.7272302981248917) as
DemoMessage.Message on DEFAULT
**** debug /OpenJMS/TestSource Done : sendOneMessage() :
Test Message 0.7272302981248917
This is what the consumer sees:
C:\java\openjms-0.7.2>set JAVA_HOME=c:\jdk1.3.1
C:\java\openjms-0.7.2>set OPENJMS_HOME=C:\java\openjms-0.7.2
Starting Consumer
examples client.console.SimpleConsumer -topic TestDynamoTopic
Using provider url rmi://localhost:1099/JndiServer
Test Message 0.7272302981248917, JMSDeliveryMode=NON_PERSISTENT, Priority=0,
JMSMessageID=ID:6211358353979164532
All's well! Well, mostly. There is a problem because the MessagingManager is
pooling the connections: when OpenJMS crashes, the sessions in the pool are
lost, but the pool does not account for such a loss and starts to error out on
every message sent to OpenJMS, with a java.net.ConnectionException veiled in a
JMSException.
Looking at the code, this code path is only executed when the session is
transacted, so I rebooted after marking the openjms provider as NOT supporting
transactions in the patchbay. That cleaned up the error a bit but still did not
cause anyone to recreate the sessions. :(
OK, at this point we have shown how to integrate a third-party JMS provider
into ATG Dynamo's patchbay.
In the future we'll look at making this more robust, though I think this may
always be a bit of a hack. The patchbay could check that the session is alive
before sending messages and could try to repopulate the pool if the sessions
get disconnected. I'm not sure what the JMS spec says about this; it may simply
be that pooling JMS sessions is not valid with external JMS providers. I think
perhaps the JMS session pool in the MessagingManager would need to be updated
to handle dropped sessions.
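To make the "check before use and repopulate" idea concrete, here is a generic sketch of a pool that validates entries on checkout and replaces dead ones. Every name in it (ValidatingPool, FakeSession, the open flag) is hypothetical, and it says nothing about how the MessagingManager's pool actually works. The hard part in practice is the liveness test itself, which this sketch reduces to a boolean flag.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Generic sketch of a pool that re-validates entries on checkout.
// Not ATG's MessagingManager pool; all names here are hypothetical.
class ValidatingPool<T> {
    private final Deque<T> idle = new ArrayDeque<>();
    private final Supplier<T> factory;
    private final Predicate<T> isAlive;

    ValidatingPool(Supplier<T> factory, Predicate<T> isAlive) {
        this.factory = factory;
        this.isAlive = isAlive;
    }

    // Hands out a live entry, discarding any dead ones found along the way.
    synchronized T checkOut() {
        while (!idle.isEmpty()) {
            T candidate = idle.pop();
            if (isAlive.test(candidate)) {
                return candidate;
            }
            // Dead entry (e.g. the provider restarted): drop it and keep looking.
        }
        // Nothing usable in the pool, so create a fresh entry.
        return factory.get();
    }

    synchronized void checkIn(T item) {
        idle.push(item);
    }
}

class ValidatingPoolDemo {
    // Fake session with a flag we can flip to simulate a provider crash.
    static final class FakeSession {
        boolean open = true;
    }

    public static void main(String[] args) {
        ValidatingPool<FakeSession> pool =
                new ValidatingPool<>(FakeSession::new, s -> s.open);
        FakeSession first = pool.checkOut();
        pool.checkIn(first);
        first.open = false; // simulate the JMS provider going away
        FakeSession second = pool.checkOut();
        System.out.println(second != first);
    }
}
```

With a real provider, one candidate signal for marking sessions dead is the standard javax.jms.Connection.setExceptionListener callback, though whether OpenJMS fires it reliably on a crash is something I have not verified.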