Listen to heartbeats using JMS
Extend heartbeats using JMS publish and subscribe
Heartbeats, a common software construct, verify the continual operation of a specific component or service. With heartbeats, a targeted service continually broadcasts a signal across its environment. You can assume your system works normally when your client services can detect a targeted service’s heartbeat signals. Meanwhile, if the critical heartbeat signal ceases, each component knows how to react.
As a heartbeat example, consider a backend market-price publisher that publishes realtime prices to other services on the network, as Figure 1 shows. Because a pause in price publishing could indicate either a legitimate market pause or a system problem resulting in stale price information, it’s critical that other services receive proper system status information.
To avoid stale price information, the publisher periodically issues a heartbeat signal to the client services, keeping them informed of its status. If the heartbeat signal ends, the system’s customer inquiry module might warn the customer that the price information is stale but that updates will resume shortly. Meanwhile, the application’s risk management system would suspend other transactions until the publisher resumes publishing current prices.
Background
I recently used a similar setup with a series of interdependent components such that, if one failed, the others received notification. Because the system employed JMS (Java Message Service) as a core component, I used JMS messaging as a way to transmit heartbeats between components.
However, at the project’s start, I didn’t realize the critical nature of the interdependencies, so a proprietary heartbeat, implemented differently each time, supported each relationship. Figure 2 shows the result.
All right, I exaggerate a bit. However, that would have been the result had I not implemented a different system.
In many systems, particularly high-availability operating systems, developers implement heartbeats using multicast — suitable for broadcasting an “I’m still here!” signal, but not necessarily for targeting a specific process or consumer.
However, a JMS-based heartbeat system, as Figure 3 shows, proves ideal for periodically determining whether the overall system — the network, the JMS server, the JVM, and so on — remains in operation. Moreover, with a JMS-based solution, you can test the availability of other resources (see the “Dependent Publishers” section below).
With my JMS-based solution, each critical component publishes a heartbeat to an exclusive known topic, and any interested client components receive the heartbeats by subscribing to that topic. Further, the publisher need not know the subscribers’ nature or number. By choosing a JMS-based approach, my solution became simple, portable, reusable, and scalable.
The heartbeat class library’s functional components
To keep things simple and to promote reuse, I’ve supplied several base components.
Heartbeat publishers
A heartbeat publisher transmits a heartbeat message to a predetermined topic every NP seconds. NP, known as the publisher period, is the interval between published heartbeats. The publisher has the following components:
- A connected TopicConnection, TopicSession, Topic, and TopicPublisher.
- A thread that sleeps for NP seconds, awakens, publishes a message, then sleeps again.
- An exception handler that generates an appropriate alert if the message publication fails.
The architecture features two heartbeat-publisher categories:
- Independent publishers: Publish messages every NP seconds as long as the assigned thread can run and a good connection exists to the JMS supplier. The heartbeat’s requirement is basic, but the subscribers are limited by what they can infer from a successful heartbeat. The inference is limited to knowing that the JVM is reasonably healthy and still runs.
- Dependent publishers: Execute a system test, and, if successful, publish a heartbeat. Test examples include connecting to a database, making an IIOP (Internet Inter-ORB Protocol) connection, testing for a file’s existence, or receiving a heartbeat receipt from another message system, such as TIBCO or MQSeries. Subscribers can make specific inferences from heartbeats they receive from dependent publishers.
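The difference between the two categories can be sketched in a few lines of plain Java. This is only an illustration, not the library's code: the GatedHeartbeat class, its independent() factory, and the Consumer that stands in for a JMS TopicPublisher are all assumptions made for the example.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

// Hypothetical stand-in for a heartbeat publisher; the Consumer replaces a JMS TopicPublisher.
class GatedHeartbeat {
    private final String topicName;
    private final BooleanSupplier dependency; // the system test run before each beat
    private final Consumer<String> transport; // where the heartbeat text is sent

    GatedHeartbeat(String topicName, BooleanSupplier dependency, Consumer<String> transport) {
        this.topicName = topicName;
        this.dependency = dependency;
        this.transport = transport;
    }

    // An independent publisher is modeled as a dependent one whose test always passes.
    static GatedHeartbeat independent(String topicName, Consumer<String> transport) {
        return new GatedHeartbeat(topicName, () -> true, transport);
    }

    // Does one period's work: run the test first, publish only if it succeeds.
    boolean beat() {
        boolean ok;
        try {
            ok = dependency.getAsBoolean();
        } catch (RuntimeException e) {
            ok = false; // a test that blows up counts as a failed dependency
        }
        if (ok) {
            transport.accept(topicName + " HEARTBEAT");
        }
        return ok;
    }
}
```

A real publisher would call beat() from its sleeping thread once per period; here the caller decides when a period has elapsed.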
Heartbeat subscribers
A heartbeat subscriber listens for heartbeats on a predetermined topic. If a heartbeat message is not received every n seconds, an event defined by a subscriber listener fires.
Heartbeat event listeners
The heartbeat publisher and subscriber both possess event listener interfaces. You can register the objects that implement the listener interfaces with each component. The objects receive notification when a specific event occurs. You can design the event listeners to execute any action upon receiving notification of a heartbeat event. Actions could range from “do nothing” to “turn on a display light” to “send an email.”
The listeners are alerted to the following events:
Publisher listeners
- Start: Raised when the publisher first starts sending heartbeats. Start events represent a good place to initialize any resources whose lifetimes link to the heartbeat session.
- Stop: Raised when the publisher stops sending heartbeats. Accordingly, stop events serve as good clean-up places following a heartbeat session.
- Sent: Raised every time a heartbeat publishes. I usually do not implement any code to respond to the sent event, except perhaps a high-verbosity log statement. You, however, may find it useful.
- Exception: Raised whenever the publisher throws an exception. The exception itself also passes so the event handler can properly handle the event.
Subscriber listeners
- Start: Raised when the subscriber starts and expects heartbeats from the publisher. Once this event occurs, the subscriber expects heartbeats and raises failure events if they do not arrive on schedule.
- Stop: Raised when the subscriber stops and no longer expects heartbeats from the publisher. The subscriber issues no further events.
- Received: Raised each time the subscriber receives a heartbeat from the publisher. In this scenario, your heartbeat-subscriber process constantly verges on shutting down, but the heartbeat’s receipt pulls your process back from the edge. However, the event handler typically does nothing, since the heartbeat subscriber implementation detects the failure for you.
- Failed: Raised if the subscriber’s period expires and the subscriber has not received a heartbeat. For example, if the period is 5 seconds, and 5 seconds elapses without a heartbeat, the failed event handler might display a warning message. The failed event handler also receives the number of failures since the last successfully received heartbeat, allowing the handler to optionally execute the failure event handling code the first time (or, for example, every 10th time) a failure occurs.
- Resumed: After one or more failures, a heartbeat’s receipt raises both a generic received event and a more specialized resumed event. The resumed event indicates that the subscriber was previously in a failed state, but has now resumed its heartbeat session. The event handler may need to resume certain activity once the session has been re-established, because it may have stopped that activity on the first failure. However, it stands to reason that you may not want to execute this activity each time a heartbeat is received.
- Exception: Raised whenever a subscriber throws an exception. The exception itself also passes, so the event handler can properly handle the event. I typically implement a javax.jms.ExceptionListener in the subscriber, and when the JMS connection throws an exception, the subscriber simply passes it to the subscriber listener.
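The rules for the received, failed, and resumed events can be modeled without JMS or threads. The sketch below is an assumption-laden illustration: the SubscriberEventModel class and its method names are hypothetical, and it records event names in a list instead of calling real listeners.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the subscriber's event rules; no JMS or threads involved.
class SubscriberEventModel {
    final List<String> events = new ArrayList<>(); // what the listeners would be told
    private boolean tick;      // true once a heartbeat arrives in the current period
    private boolean failed;    // true while the session is in a failed state
    private int failureCount;  // failures since the last received heartbeat

    // Called when a heartbeat message arrives.
    void heartbeatArrived() {
        tick = true;
        if (failed) {
            // Recovering: raise the specialized resumed event before the generic one.
            failed = false;
            events.add("resumed after " + failureCount + " failure(s)");
            failureCount = 0;
        }
        events.add("received");
    }

    // Called each time the subscriber period expires.
    void periodExpired() {
        if (tick) {
            tick = false; // re-arm the check for the next period
        } else {
            failed = true;
            failureCount++;
            events.add("failed " + failureCount);
        }
    }
}
```

Passing the failure count along with each failed event is what lets a handler react only to the first failure, or to every tenth one.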
Figure 4 shows how our heartbeat architecture now looks.
Implementation review
In our implementation, the base IHeartbeatPublisher and IHeartbeatSubscriber interfaces define simple behavior. In this JMS-specific implementation, because the subscriber and publisher had so much common functionality, I built most of the utility functionality into the abstract HeartbeatBase class. Consequently, the concrete HeartbeatPublisher and HeartbeatSubscriber implementations both extend HeartbeatBase; they also implement their respective interfaces, as Figure 5 shows.
Let’s review the highlights from these classes.
HeartbeatBase
The HeartbeatBase class provides the following constructs:
Constructors
I’ve implemented the constructors for both the subscriber and the publisher here because both need:
- A topic connection.
- A topic name: The publisher publishes heartbeats to this topic and the subscriber subscribes to it.
- A period: A publisher publishes a heartbeat at an interval defined as the period, expressed in milliseconds (ms). A subscriber expects to receive a heartbeat every period ms, or else the heartbeat fails.
The two constructors accept a TopicConnectionFactory or a TopicConnection, respectively. Both constructors initialize all JMS resources, then invoke the abstract init() method, which each concrete child class implements to initialize its own class-specific JMS resources:
public HeartbeatBase(TopicConnectionFactory topicConnectionFactory,
String topicName, long period, String message ) throws
HeartbeatInitializationException
public HeartbeatBase( TopicConnection topicConnection, String topicName,
long period, String message ) throws HeartbeatInitializationException
Listener registration
The HeartbeatBase class registers and unregisters subscriber or publisher listeners. The concrete classes must then fire the events in the listening objects:
public void registerListener(IHeartbeatEventListener listener)
throws InvalidEventListener
public void unregisterListener(IHeartbeatEventListener listener)
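One way to back such a registry, offered as a sketch and not as the library's actual code, is a copy-on-write list, so that events can fire while another thread registers or unregisters a listener. The BeatListener interface and ListenerRegistry class are hypothetical, and IllegalArgumentException stands in for the library's InvalidEventListener.

```java
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical listener type; the library's IHeartbeatEventListener is not reproduced here.
interface BeatListener {
    void onEvent(String eventName);
}

class ListenerRegistry {
    // Iteration works on a snapshot, so firing never collides with (un)registration.
    private final CopyOnWriteArrayList<BeatListener> listeners = new CopyOnWriteArrayList<>();

    void register(BeatListener listener) {
        if (listener == null) {
            // Stand-in for the library's InvalidEventListener exception.
            throw new IllegalArgumentException("listener must not be null");
        }
        listeners.addIfAbsent(listener); // registering twice has no extra effect
    }

    void unregister(BeatListener listener) {
        listeners.remove(listener);
    }

    // Notifies every currently registered listener on the calling thread.
    void fire(String eventName) {
        for (BeatListener listener : listeners) {
            listener.onEvent(eventName);
        }
    }
}
```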
Base destruction
All JMS resources defined in the base class are closed.
Thread pooling
Thread pooling resources handle asynchronous heartbeat-event invocation.
Heartbeat process control
The HeartbeatBase class determines when to stop, start, pause, or resume heartbeat events.
public void startHeartbeatProcess()
public void stopHeartbeatProcess()
public void setMessagesPaused(boolean working)
Register shutdown hook
The base class defines the regShutdownHook() abstract method. Its concrete implementation does not perform any operation, but you can optionally define a shutdown hook as supported by JDK 1.3’s java.lang.Runtime class. That lets you create a runnable that runs when the JVM receives a shutdown signal. Some JMS implementations fuss about resources being closed, and closed in the right order, before the JVM shuts down; by defining a shutdown hook, you can ensure that all resources are cleaned properly. I have had mixed results using the shutdown hook, with behavior differing according to JVM version and platform. You can see the shutdown implementation in the code, where it’s disabled.
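For reference, registering a hook takes only a couple of calls on java.lang.Runtime. The helper below is a hypothetical sketch, not the library's disabled implementation; the cleanup Runnable is assumed to close the JMS resources in whatever order your provider requires.

```java
// Hypothetical helper; the Runnable would close JMS resources in the order the provider needs.
class ShutdownHookSketch {
    // Registers the cleanup and returns the hook thread so the caller can remove it later.
    static Thread install(Runnable cleanup) {
        Thread hook = new Thread(cleanup, "heartbeat-shutdown");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    // Returns true if the hook was registered and has now been removed.
    static boolean uninstall(Thread hook) {
        return Runtime.getRuntime().removeShutdownHook(hook);
    }
}
```

Keeping the returned thread lets a component that shuts down cleanly on its own remove the hook, so the cleanup does not run a second time when the JVM exits.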
HeartbeatPublisher
The HeartbeatPublisher implementation publishes a heartbeat message every period. The publication may depend on a predefined test executed before every publication. The implementation must define actions for each event a publisher can fire:
public void fireHeartbeatPublisherException(Exception erx)
public void fireHeartbeatPublisherStarted()
public void fireHeartbeatPublisherStopped()
public void fireHeartbeatSent()
Because HeartbeatBase extends java.lang.Thread, I implemented the publisher’s main event loop in a public void run() method:
1.  public void run() {
2.    try {
3.      TextMessage tmessage = topicSession.createTextMessage();
4.      tmessage.setText(topicName + " HEARTBEAT");
5.      fireHeartbeatPublisherStarted();
6.      while(run) {
7.        Thread.sleep(period);
8.        topicPublisher.setTimeToLive(timeToLive);
9.        if(process){
10.         if(fireDependency()) {
11.           topicPublisher.publish(tmessage);
12.           fireHeartbeatSent();
13.         }
14.       }
15.     }
16.     fireHeartbeatPublisherStopped();
17.     shutdown();
18.   } catch (Exception erx) {
19.     if(run) fireHeartbeatPublisherException(erx);
20.     else fireHeartbeatPublisherStopped();
21.     shutdown();
22.   }
23. }
Line 4’s code creates a recognizable message. When the publisher starts, it informs the listeners in line 5, then starts a loop. The loop sleeps for the publisher period, and if line 9’s code determines that messages are not paused, the dependency check fires in line 10. If this returns true, the heartbeat publishes (line 11) and line 12’s code informs the publisher listeners of the publication. When the loop ends, the publisher notifies the listeners (line 16) and shuts down resources (line 17). If an exception is thrown in the loop while the publisher is still running, the listeners receive notification (line 19); if the publisher was already stopping, line 20 reports a normal stop instead. Either way, line 21 shuts down resources.
The publisher interface also defines the following methods:
public void setDependentParameters(Object[] parameters) throws InvalidDependentParameters
public boolean fireDependency()
If the publisher is independent, these methods do nothing. However, if the publisher is dependent, you must override them to implement the dependency. The publisher’s setup code calls the setDependentParameters() method to pass the parameters the publisher needs to execute its dependency. Subsequently, as a precondition to each heartbeat being published, the publisher executes the dependency defined in the fireDependency() method. This method doesn’t throw any exceptions, but it does return a Boolean indicating the dependency’s success or failure. The heartbeat publishes only if the dependency succeeds. You’ll see a dependent publisher example later in this article.
In contrast, for an independent publisher, setDependentParameters() does nothing, while fireDependency() always returns true:
public void setDependentParameters(Object[] parameters)
throws InvalidDependentParameters{
}
public boolean fireDependency() {
return true;
}
HeartbeatSubscriber
The HeartbeatSubscriber implementation listens on the predefined topic for heartbeat messages. If it does not receive a heartbeat within the defined period, the heartbeat session should fail. The implementation must define actions for each event a subscriber can fire:
public void fireHeartbeatSubscriberStarted()
public void fireHeartbeatSubscriberStopped()
public void fireHeartbeatFailed()
public void fireHeartbeatReceived()
public void fireHeartbeatResumed()
public void fireHeartbeatSubscriberException(Exception erx)
The subscriber implementation serves two main functions. First, the main event loop resembles the publisher in that you simply implement the run() method. In the subscriber’s case, the loop checks that a heartbeat has been received every period or fails the session. In our implementation, the thread sleeps for the period, wakes up, determines whether or not it has received a heartbeat, and acts accordingly. The subscriber has a tick, a Boolean that indicates whether a heartbeat has been received. So, during one period, the thread sets the tick to false, then sleeps. When it awakes, if the tick is true, it knows a heartbeat has been received in the interim:
1.  public void run() {
2.    try {
3.      setTick(false);
4.      topicSubscriber.setMessageListener(this);
5.      topicConnection.start();
6.      fireHeartbeatSubscriberStarted();
7.      while(run) {
8.        Thread.sleep(period);
9.        if(run && process) {
10.         if(!isTick()) { // Heartbeat failed
11.           failTime = System.currentTimeMillis();
12.           state = true;
13.           fTickCount++;
14.           fireHeartbeatFailed();
15.         } else { // Heartbeat received
16.           setTick(false);
17.           state = false;
18.         }
19.       }
20.     }
21.     fireHeartbeatSubscriberStopped();
22.     shutdown();
23.   } catch (Exception erx) {
24.     if(run) fireHeartbeatSubscriberException(erx);
25.     else fireHeartbeatSubscriberStopped();
26.     shutdown();
27.   }
28. }
When the thread starts, the code sets the tick to false in line 3. A message listener registers in line 4; then, in line 5, starting the topic connection begins the message flow. In line 6, the subscriber listeners receive notification of the subscriber startup. The thread sleeps for the defined period in line 8; when it awakes, if the subscriber has not paused or stopped, line 10’s code checks the tick. If a heartbeat has not been received, line 11’s code notes the failure time, line 12’s code sets the failure state, line 13’s code increments the failure count, and, lastly, line 14’s code notifies the subscriber’s listeners of the failure. If the tick is true, line 16’s code sets it to false, and line 17’s code sets the failure state to false.
At some point, the thread exits the run loop ending at line 20. The subscriber then notifies the listeners in line 21 and cleans up resources in line 22. If an exception is thrown in the loop, line 24’s code notifies listeners about the exception. Line 26’s code then cleans up resources. Although the run loop checks the tick, the onMessage() method, which is invoked automatically when a JMS message arrives on the topic, sets it:
1.  public void onMessage(Message message) {
2.    try {
3.      if(message instanceof TextMessage) {
4.        if(((TextMessage)message).getText().equals(topicName + " HEARTBEAT")) {
5.          setTick(true);
6.          if(state) { // Recovering from heartbeat failure
7.            state = false;
8.            resumeTime = System.currentTimeMillis() - failTime;
9.            fireHeartbeatResumed();
10.           fTickCount=0;
11.         }
12.         fireHeartbeatReceived();
13.       }
14.     }
15.   } catch (Exception erx) {
16.     setTick(false);
17.     fireHeartbeatSubscriberException(erx);
18.   }
19. }
The onMessage() method is invoked automatically, in a separate thread, when the subscriber receives a message from the JMS server on the designated topic. Lines 3 and 4 determine whether the message is a valid heartbeat. In this case, the message type and the text’s content must match what the publisher sent (see lines 3 and 4 of the run() method in the HeartbeatPublisher).
If the message is a valid heartbeat, line 5 sets the tick to true. If the subscriber was in a failed state (line 6), the heartbeat has resumed. A resumed heartbeat is an important event, because it can trigger special actions that reverse what a failure set in motion. In that case, line 7’s code clears the failed state, line 8’s code records the resume time (computed as the current time minus the failure time), and line 9’s code notifies the subscriber listeners of the resumption. Line 10’s code sets the failure count back to 0. Line 12’s code then notifies the listeners of a received heartbeat.
If the onMessage() method throws an exception, the code in lines 16 and 17 sets the tick to false and notifies the listeners of the exception.
Note: The heartbeat’s payload can contain any runtime data or diagnostic information useful to the subscriber.
Heartbeat event listeners
Classes interested in the individual publishers’ and subscribers’ activities implement the heartbeat event listener interfaces IHeartbeatSubscriberListener and IHeartbeatPublisherListener. Figure 6’s UML diagram describes the interfaces and two corresponding basic implementations.
IHeartbeatSubscriberListener
The subscriber listener defines the following methods, which the subscriber fires at the appropriate event:
- public void notifyHeartbeatFailedEvent(int ticks): Fired when the heartbeat fails; passes the number of consecutive failures as an argument into the implementing method. The failure count resets to 0 when the heartbeat resumes.
- public void notifyHeartbeatReceived(): Fired when a heartbeat is received.
- public void notifyHeartbeatSubscriberStopped(): Fired when the subscriber stops.
- public void notifyHeartbeatSubscriberStarted(): Fired when the subscriber starts.
- public void notifyHeartbeatSubscriberException(Exception erx, HeartbeatBase heartbeat): Fired when the subscriber throws an exception. Both the exception and the heartbeat base that threw the exception pass as arguments to the implementing method. The base lets the listener take action on the subscriber after an exception.
- public void notifyHeartbeatResumed(long time, int ticks): Fired when a heartbeat session resumes. Both the time of the session resumption and the number of failures that occurred before the resumption pass as arguments to the implementing method. For example, if the period is 5 seconds, and the session failed at 1:32:20, then resumed at 1:32:39, the resume time is 1:32:39 and the number of elapsed failures is three.
IHeartbeatPublisherListener
IHeartbeatPublisherListener defines the following methods, which the publisher fires at the appropriate events:
- public void notifyHeartbeatPublisherStopped(): Fired when the publisher stops.
- public void notifyHeartbeatPublisherStarted(): Fired when the publisher starts.
- public void notifyHeartbeatPublisherHeartbeatSent(): Fired when the publisher publishes a heartbeat.
- public void notifyHeartbeatPublisherException(Exception erx, HeartbeatBase heartbeat): Fired when the publisher throws an exception. Both the exception and the heartbeat base that threw the exception pass as arguments to the implementing method. The base allows the listener to take action on the publisher after an exception.
The sample listener implementations merely print the event and any passed values to System.out. Here’s an example from the subscriber:
public void notifyHeartbeatFailedEvent(int ticks)
{
print("Heartbeat Session Failed "+ ticks + " times.");
}
As a quick and dirty test, both the HeartbeatPublisher and the HeartbeatSubscriber have main(String args[]) methods. The publisher’s listener terminates the JVM after 10 published heartbeats. I implemented the sample listeners described above in this test to display the applicable heartbeat events.
The main() method’s parameters include:
- JNDI (Java Naming and Directory Interface) URL (e.g., ldap://myserver:389/o=nickman.com)
- JNDI principal (e.g., cn=Directory Manager)
- JNDI credentials (e.g., secret)
- JNDI ContextFactory name (e.g., com.sun.jndi.ldap.LdapCtxFactory)
- JNDI authentication type (e.g., simple)
- JNDI namespace for JMS connection factory (e.g., cn=TCF,ou=jms)
- Heartbeat topic name
- Period, in ms
If you run both a publisher and a subscriber, you’ll produce the following output:
Publisher output
java com.nickman.jms.heartbeats.HeartbeatPublisher
smqp://localhost:4001 admin secret com.swiftmq.jndi.InitialContextFactoryImpl
simple TopicConnectionFactory MY.WORLD 1000
[Mon Jan 21 21:45:22 EST 2002] Publisher Started
[Mon Jan 21 21:45:23 EST 2002] Publisher Sent Heartbeat
[Mon Jan 21 21:45:24 EST 2002] Publisher Sent Heartbeat
[Mon Jan 21 21:45:25 EST 2002] Publisher Sent Heartbeat
[Mon Jan 21 21:45:26 EST 2002] Publisher Sent Heartbeat
[Mon Jan 21 21:45:27 EST 2002] Publisher Sent Heartbeat
[Mon Jan 21 21:45:28 EST 2002] Publisher Sent Heartbeat
[Mon Jan 21 21:45:29 EST 2002] Publisher Sent Heartbeat
[Mon Jan 21 21:45:30 EST 2002] Publisher Sent Heartbeat
[Mon Jan 21 21:45:31 EST 2002] Publisher Sent Heartbeat
[Mon Jan 21 21:45:32 EST 2002] Publisher Sent Heartbeat
[Mon Jan 21 21:45:33 EST 2002] Publisher Sent Heartbeat
[Mon Jan 21 21:45:33 EST 2002] Publisher Stopped
Subscriber output
java com.nickman.jms.heartbeats.HeartbeatSubscriber
smqp://localhost:4001 admin secret com.swiftmq.jndi.InitialContextFactoryImpl
simple TopicConnectionFactory MY.WORLD 2000
[Mon Jan 21 21:45:25 EST 2002] Heartbeat Subscriber Started
[Mon Jan 21 21:45:25 EST 2002] Heartbeat Session Resumed at Mon Jan 21 21:45:25 EST 2002 and 0 ticks.
[Mon Jan 21 21:45:25 EST 2002] Heartbeat Received
[Mon Jan 21 21:45:26 EST 2002] Heartbeat Received
[Mon Jan 21 21:45:27 EST 2002] Heartbeat Received
[Mon Jan 21 21:45:28 EST 2002] Heartbeat Received
[Mon Jan 21 21:45:29 EST 2002] Heartbeat Received
[Mon Jan 21 21:45:30 EST 2002] Heartbeat Received
[Mon Jan 21 21:45:31 EST 2002] Heartbeat Received
[Mon Jan 21 21:45:32 EST 2002] Heartbeat Received
[Mon Jan 21 21:45:33 EST 2002] Heartbeat Received
[Mon Jan 21 21:45:37 EST 2002] Heartbeat Session Failed 1 times.
.
.
[Mon Jan 21 21:46:55 EST 2002] Heartbeat Session Failed 40 times.
Dependent publishers
The DependentHeartbeatPublisher class implements a dependency in which the heartbeat does not publish unless the dependency is satisfied. In this case, the dependency is a connection to a JDBC (Java Database Connectivity) database.
To create a dependent publisher, simply extend HeartbeatPublisher and override the setDependentParameters() and fireDependency() methods. You must also implement the constructors, but they simply pass through. Figure 7’s UML diagram summarizes the structure of DependentHeartbeatPublisher.
The setDependentParameters() method must supply the object with the data necessary for it to accomplish the dependency test. In this example implementation, you simply pass in the JDBC parameters to connect to a database:
public void setDependentParameters(Object[] parameters)
    throws InvalidDependentParameters {
  // Expect exactly four values: driver class, URL, user, password.
  if(parameters.length != 4) throw new InvalidDependentParameters(
      "Invalid Number Of Parameters:" + parameters.length + ". Should be 4.");
  try {
    jdbcDriverClass = (String)parameters[0];
    jdbcURL = (String)parameters[1];
    jdbcUser = (String)parameters[2];
    jdbcPassword = (String)parameters[3];
  } catch (Exception erx) {
    throw new InvalidDependentParameters(
        "Exception Occurred Processing Parameters:" + erx);
  }
}
Now the run loop fires the following dependency test:
public boolean fireDependency() {
  java.sql.Connection conn = null;
  try {
    Class.forName(jdbcDriverClass);
    if(jdbcUser==null) conn = DriverManager.getConnection(jdbcURL);
    else conn = DriverManager.getConnection(jdbcURL, jdbcUser, jdbcPassword);
    return !conn.isClosed();
  } catch (Exception erx) {
    // Report the failure to the listeners instead of throwing.
    this.fireHeartbeatPublisherException(
        new PublisherDependencyException(erx));
    return false;
  } finally {
    // conn is still null if the connection attempt failed.
    if(conn != null) {
      try { conn.close(); } catch (Exception erx) {}
    }
  }
}
It’s neither elegant nor efficient, but I hope it makes the dependent implementation clear. Note that the method does not throw an exception but fires an exception condition to the publisher listener.
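On a newer JDK, the same kind of check can be written more compactly. The following is a sketch under stated assumptions, not the article’s implementation: it requires Java 7 or later for try-with-resources and a JDBC 4 driver for Connection.isValid(), the JdbcDependencyCheck class and its method name are hypothetical, and it simply returns false where the library would also notify the publisher listeners.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

// Hypothetical helper, not part of the article's library.
class JdbcDependencyCheck {
    // Returns true only if a connection opens and reports itself valid within timeoutSeconds.
    static boolean databaseReachable(String url, String user, String password, int timeoutSeconds) {
        try (Connection conn = (user == null)
                ? DriverManager.getConnection(url)
                : DriverManager.getConnection(url, user, password)) {
            return conn.isValid(timeoutSeconds);
        } catch (SQLException e) {
            return false; // unreachable, rejected, or no driver registered for this URL
        }
        // try-with-resources closes the connection on every path
    }
}
```

Opening a fresh connection for every heartbeat is still expensive; if the period is short, a pooled connection with a lightweight validity check may be a better fit.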
To implement another quick and dirty test, the main() method in DependentHeartbeatPublisher looks as follows:
public static void main(String args[]) {
  try {
    TopicConnectionFactory tcf = (TopicConnectionFactory)
        getConnectionFactory(args[0], args[1], args[2], args[3],
            args[4], args[5]);
    DependentHeartbeatPublisher publisher = new
        DependentHeartbeatPublisher(tcf, args[6],
            Long.parseLong(args[7]), "TEST", DeliveryMode.NON_PERSISTENT);
    publisher.setDependentParameters(new Object[]
        {"com.pointbase.jdbc.jdbcUniversalDriver",
         "jdbc:pointbase://localhost/sample", "PUBLIC", "PUBLIC"});
    TestPublisherListener tbl = new TestPublisherListener(publisher);
    publisher.registerListener(tbl);
    publisher.startHeartbeatProcess();
  } catch (Exception erx) {
    System.err.println("Error:" + erx);
    System.exit(1);
  }
}
If you start the publisher with the database running, shut the database down shortly after, and then restart it, the test’s output might look like this:
Dependent publisher output
java com.nickman.jms.heartbeats.DependentHeartbeatPublisher
smqp://localhost:4001 admin secret com.swiftmq.jndi.InitialContextFactoryImpl
simple TopicConnectionFactory MY.WORLD 5000
[Tue Jan 22 08:38:01 EST 2002] Publisher Started
[Tue Jan 22 08:38:05 EST 2002] Publisher Sent Heartbeat
[Tue Jan 22 08:38:08 EST 2002] Publisher Sent Heartbeat
[Tue Jan 22 08:38:11 EST 2002] Publisher Sent Heartbeat
[Tue Jan 22 08:38:14 EST 2002] Publisher Sent Heartbeat
[Tue Jan 22 08:38:18 EST 2002] Publisher Sent Heartbeat
[Tue Jan 22 08:38:21 EST 2002] Publisher Sent Heartbeat
[Tue Jan 22 08:38:24 EST 2002] Publisher Sent Heartbeat
[Tue Jan 22 08:38:27 EST 2002] Publisher Sent Heartbeat
[Tue Jan 22 08:38:32 EST 2002] Publisher Exception:com.nickman.jms.heartbeats.exception.
PublisherDependencyException: PointBase server rejected
SQL connection.
[Tue Jan 22 08:38:37 EST 2002] Publisher Exception:com.nickman.jms.heartbeats.exception.
PublisherDependencyException: PointBase server rejected
SQL connection.
[Tue Jan 22 08:38:42 EST 2002] Publisher Exception:com.nickman.jms.heartbeats.exception.
PublisherDependencyException: PointBase server rejected
SQL connection.
[Tue Jan 22 08:38:46 EST 2002] Publisher Sent Heartbeat
[Tue Jan 22 08:38:49 EST 2002] Publisher Sent Heartbeat
[Tue Jan 22 08:38:52 EST 2002] Publisher Sent Heartbeat
[Tue Jan 22 08:38:52 EST 2002] Publisher Stopped
Subscriber output
java com.nickman.jms.heartbeats.HeartbeatSubscriber
smqp://localhost:4001 admin secret com.swiftmq.jndi.InitialContextFactoryImpl
simple TopicConnectionFactory MY.WORLD 3000
[Tue Jan 22 08:38:18 EST 2002] Heartbeat Subscriber Started
[Tue Jan 22 08:38:21 EST 2002] Heartbeat Session Resumed at Tue Jan 22 08:38:21 EST 2002 and 0 ticks.
[Tue Jan 22 08:38:21 EST 2002] Heartbeat Received
[Tue Jan 22 08:38:24 EST 2002] Heartbeat Received
[Tue Jan 22 08:38:27 EST 2002] Heartbeat Received
[Tue Jan 22 08:38:33 EST 2002] Heartbeat Session Failed 1 times.
[Tue Jan 22 08:38:38 EST 2002] Heartbeat Session Failed 2 times.
[Tue Jan 22 08:38:43 EST 2002] Heartbeat Session Failed 3 times.
[Tue Jan 22 08:38:46 EST 2002] Heartbeat Session Resumed at Wed Dec 31 19:00:02 EST 1969 and 3 ticks.
[Tue Jan 22 08:38:46 EST 2002] Heartbeat Received
[Tue Jan 22 08:38:49 EST 2002] Heartbeat Received
[Tue Jan 22 08:38:52 EST 2002] Heartbeat Received
I put a heartbeat publisher in a new Swing application, as Figure 8 illustrates, which I use to ensure that our testers are not slacking off!
Implementation and runtime caveats
Differences among operating environments, implementations, and throughputs might (probably will) mean that you’ll need to tweak things to get optimal results from JMS heartbeats. The next sections list issues difficult to address in the framework’s code.
Flow control
Be careful with flow control. Some JMS implementations employ flow control to slow down a publisher to a message rate approximating that of the slowest subscriber. Such flow control can be an asset, but one slow consumer (known as a poison subscriber) on a heartbeat topic might cause false heartbeat failures for some or all other heartbeat subscribers.
Unfortunately, I do not have a conclusive solution to this problem. After much struggle with flow control, I have arrived at the opinion that a JMS specification update should address the issue, which it currently dodges.
When a publisher’s rate exceeds that of even one subscriber, there are only three options:
- Drop the messages to that subscriber by either:
  - Dropping the most recent message
  - Dropping the oldest message
- Persist the messages until the slow subscriber catches up
- Slow down the publisher to match the slowest subscriber’s rate
Your JMS provider’s abilities will dictate your options.
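To make the two drop policies concrete, here is a small in-memory sketch. It is purely illustrative: real providers apply such policies inside the broker, and the BoundedHeartbeatBuffer class and DropPolicy enum are names invented for this example.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical per-subscriber buffer; real providers implement this inside the broker.
class BoundedHeartbeatBuffer {
    enum DropPolicy { DROP_NEWEST, DROP_OLDEST }

    private final Deque<String> queue = new ArrayDeque<>();
    private final int capacity;
    private final DropPolicy policy;

    BoundedHeartbeatBuffer(int capacity, DropPolicy policy) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        this.capacity = capacity;
        this.policy = policy;
    }

    // Queues the message; returns the message that was dropped, or null if none was.
    String offer(String message) {
        if (queue.size() < capacity) {
            queue.addLast(message);
            return null;
        }
        if (policy == DropPolicy.DROP_NEWEST) {
            return message; // buffer full: discard the arriving message
        }
        String oldest = queue.removeFirst(); // buffer full: evict the oldest entry
        queue.addLast(message);
        return oldest;
    }

    // Hands the next message to the slow subscriber, or null if the buffer is empty.
    String poll() {
        return queue.pollFirst();
    }
}
```

For heartbeats, dropping the oldest message is usually the more natural choice, since a subscriber cares only that some recent heartbeat arrived, not that it sees every one.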
Thread pools
The publisher and subscriber implementations use a basic thread pool to asynchronously invoke the listener events, except for stops and exceptions. The default implementation uses a pool size of 2. You may need to tune this behavior, either by modifying the fireEvent() method, which accepts a Boolean that determines whether to fire the events asynchronously, or by adjusting the thread pool size. Your choice will depend on the number and nature of the methods in the registered listeners.
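The trade-off between synchronous and pooled invocation can be sketched with the standard java.util.concurrent executors, which arrived after the JDK this library was written against. The EventDispatcher class below is a hypothetical illustration and does not reproduce the library’s fireEvent() signature.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical dispatcher; the library's fireEvent() signature is not reproduced here.
class EventDispatcher {
    private final ExecutorService pool;

    EventDispatcher(int poolSize) {
        this.pool = Executors.newFixedThreadPool(poolSize);
    }

    // Runs the listener call on a pool thread when async is true, otherwise on the caller's thread.
    void fire(Runnable listenerCall, boolean async) {
        if (async) {
            pool.execute(listenerCall);
        } else {
            listenerCall.run();
        }
    }

    // Stops accepting events; returns true if queued events finished within the timeout.
    boolean shutdown(long timeoutMillis) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

With a pool of 2, a slow listener method delays at most one other event at a time; with synchronous firing, it delays the heartbeat loop itself.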
JMS dependency
The framework depends heavily on JMS. Ultimately, the framework should support a more generic implementation in which you can choose the heartbeat’s underlying transport protocol.
JMS throughput
I have not implemented any benchmarks to test throughput. The results would vary based on implementation details as well as JMS software and hardware. I participated in a few threads on the JMS Interest forum on the issue of using JMS for alarms, and while most of the content was geared towards broadcasting alerts (sort of the opposite of heartbeats), when I pitched my idea, two people quickly asked if the architecture would support heartbeats with a frequency of 500 ms or less. The answer is yes, but I cannot be sure what impact this will have on your JMS infrastructure and whether or not this will affect your core traffic. Rerouting heartbeats to a secondary JMS server may be a solution; however, if you do this, you’ll lose the ability to check the health of your core JMS channels.
Wrap up
Most developers should find JMS publish and subscribe constructs an effective way to implement heartbeats. JMS vendors’ increased flexibility and protocol support add to JMS’s usefulness in this capacity. For example, HTTP now serves as a common JMS message protocol, so with minimal setup you could place heartbeat publishers in applets. This article’s framework will give you an implementation head start.
I would like to thank Jonathan “Xylophone” Simon and many others for their invaluable assistance.