当前位置: 首页 > 图文教程 > 网络编程 > JSP > 在BEA WebLogic中使用Java消息服务
(暂缺图)
目的地(Destination)
在创建任何消息的生产者或消费者之前,必须要有特定的Destination,通过它您可以将任何生产者与消费者联系起来。记住,Destination是托管对象,类似ConnectionFactories。这意味着Destination是由WebLogic维护的,必须通过JNDI查找来检索它。这还意味着,在这种情况下,Destination必须是事先定义好的。但这并不是说您总是要提前创建Destination。JMS API提供了创建临时Destination的能力,但这个Destination只能在创建它的Session的生存周期中使用,JMS API也能在运行时创建永久性Destination。然而,在JMS当前的WebLogic实现中,应该注意的是,如果通过Session创建Destination,那么只要WebLogic服务器在运行,这些Destination就会存在。如果服务器出故障或者死机了,那么Destination也会不存在。创建真正的永久性Destination的方法是在weblogic.properties文件中创建它(请参阅“install.txt”文件,以获得关于如何做到这一点的详细信息)。
鉴于本文的目的,我们的Destination是通过weblogic.properties文件提前创建的。该文件中定义的Destination是WebLogic应用服务器在开机时创建的,并且可以通过JNDI用于客户代码。清单1显示了Send.java文件中的代码,展示了如何创建QueueConnection和QueueSession,以及如何像检索Sender应用程序的包层次结构那样检索Destination,我们在weblogic.properties文件中定义的队列的名称为“jdj.article.queue.sender”。
消息的消费者和消息的生产者
JMS初始化过程的最后一个阶段是创建MessageConsumers和MessageProducers。像 ConnectionFactory一样,它们也有Connection 和Session两个基本接口,为了使用Topic或者Queue,还有一些扩展这两个基本接口的特定于Destination的基本接口。(我在使用MessageProducer时使用Producer这个术语,在使用MessageConsumer时使用Consumer这个术语)。MessageConsumers用于检索发送到Destination的消息,MessageProducers用于将消息发送到Destination。二者均由Session实例创建。MessageProducer由特定于Destination的接口QueueSender和TopicPublisher扩展。MessageConsumer则由QueueReceiver和TopicSubscriber接口扩展。
一旦已经创建了自己的MessageConsumer和/或MessageProducer,也就做好了接收和/或发送消息的所有准备。因为生产者和消费者的创建是特定于Queue或Topic的,所以我将在下面的相关小节中讨论特定于Destination类型的这两种类型的处理。
消息
在进入下一步并开始深入探讨Topic和Queue中的消息发送和接收之前,我们需要讨论另一个接口,即Message,它表示JMS消息本身。这个对象包含将发送到Destination的信息,以及正在Destination上进行监听的消费者接收的信息。Message是由会话实例创建的,它由三个部分组成:消息头、属性和消息体。消息头是用来识别和路由消息的,客户端开发人员通常不会看到或者处理消息头信息。属性支持通过消息传递的特定于应用程序的值。这些属性字段是预先定义的,对它们的完整描述可以在JMS文档中找到。我将在示例代码中使用一些属性。消息的主体部分是消息的实际“有效负载”,它有5种类型,可以通过5个特定的接口来表示它们:StreamMessage、MapMessage、ObjectMessage、TextMessage 和BytesMessage(参见图1)。
持久性
JMS还有一个重要方面值得讨论:它支持消息的持久传递。很简单,这意味着当消息发送到Destination时,如果Consumer没有运行或不可用,那么这个消息将被保存起来,直至下次Consumer连接到Destination为止。例如,如果您有5个应用程序在某一个Topic上进行监听,其中一个崩溃了,那么下一次该应用程序启动并连接到这个特定的Topic时,尽管该应用程序崩溃了,但发送到这个Topic的所有消息都将在应用程序开始再次进行监听时发送到这个应用程序。如果这看起来难以理解,那么在您运行示例代码时就会更有意义。
队列
Queue是为“点对点”或“一对一”的消息传递而设计的。这意味着应该只有一台客户机发送消息给Queue,并且只有一个应用程序可以处理这个Queue中的消息。
事实上,您可以有多台向某一个Queue发送消息的客户机,但只能有一个应用程序处理该Queue中的消息。如果在Queue上有多个Consumer,那么哪个Consumer接收消息将无法保证,但是只能有一个Consumer接收消息。如果Destination上需要多个Consumer,并且想让它们都收到相同的消息,那么应该使用Topic(参见本文后面内容)。
Sender.java和Receiver.java文件中包含展示如何使用Queue的代码。这些代码展示了JMS的初始化过程,并展示了如何检索预定义的Queue,以及为了在Queue上发送和接收消息,应该如何创建MessageProducer和MessageConsumer。
为了在Queue中消费和产生消息,系统中有两个特定接口。其中一个接口是QueueSender,它是通过调用QueueSession的一个createQueueSender()方法,从QueueSession返回的,用于向Queue发送消息。另一个接口是QueueReceiver,它是通过调用QueueSession的一个createQueueReceiver()方法,从QueueSession返回的,用于接收来自Queue的消息。
清单2是来自Sender.java的sendMsg方法的代码,它展示了如何从Session中如何创建MessageProducer,然后构造一个TextMessage并发送它。在这段代码中,我们将创建一个QueueSender,然后创建一个TextMessage,它包含从应用程序用户界面的TextField中检索到的文本。随后,我们使用QueueSender方法“发送”消息。
在MessageConsumer上,有两种处理传入消息接收的方法:同步的和异步的。第一步是创建MessageConsumer;下一步是判断您想在Consumer上同步发送信息还是异步发送消息。
在从Session中创建了MessageConsumer之后,如果想同步接收为Destination生成的下一条消息,那么可以调用MessageConsumer上的检索方法。该方法没有采用任何参数,在接收下一个Message之前,它将一直受阻,并且会将这个Message返回给调用者。为了接收异步消息,可以向MessageConsumer注册一个MessageListener接口的实现。这种方法对Topic 和 Queue都适用。MessageListener只有一个您必须实现的方法 —— onMessage,它只接收一个参数,即Message。在为每个发送到Destination的消息实现onMessage时,将调用该方法。Sender.java和Receiver.java中的onMessage实现中示范了这个过程。在Receiver.java中,我们将下面的代码放入initializeJMS方法中,这段代码将创建MessageConsumer(一个QueueReceiver)并设置MessageListener的实现:
// Create a Receiver for the Queue...
receiver = session.createReceiver(queue);
// Set the listener (this class)
receiver.setMessageListener(this);
一旦调用了Connection的start方法,消息就会在到达Destination之时开始进入Consumer。
ReplyTo——使用临时队列
您还会注意到,Sender.java和Receiver.java都实现了MessageConsumers和MessageProducers。二者都实现了MessageListener。这说明了JMS的一个有趣特征,也就是说,它使用了临时Destination。希望接收它所产生信息的响应的应用程序可以创建一个临时的Queue或者Topic,并在它发送的Message中传递这个Destination。
Message的属性之一是JMSReplyTo属性。这个属性就是用于这个目的的。您可以创建一个临时的Queue或者Topic,并把它放入Message的JMSReplyTo属性中。收到该消息的消费者有一个私有的临时Destination,可以用它来响应发送者。可以通过两种方法对这一点进行说明,这两种方法分别在两个文件中。Sender.java中包含如下所示代码段,它将创建临时的Queue,并将它放置在TextMessage的JMSReplyTo属性中:
// Create a temporary queue for replies...
tempQueue = (Queue) session.createTemporaryQueue();
以上这行代码可以在Sender.java的initializeJMS方法中找到。这段代码将在启动应用程序时创建一个临时Queue,而这个Queue将存在于应用程序的整个生命周期中。下面这行代码可以在Sender.java的sendMsg方法中找到,它展示了如何设置JMSReplyTo属性,以包含临时的Queue。
// Set ReplyTo to temporary queue...
msg.setJMSReplyTo(tempQueue);
在Receiver.java的QueueReceiver接收这条消息时,会从JMSReplyTo字段中提取临时Queue,并且会通过应用程序构造一个QueueSender,以便将响应消息发送回Sender.java。这展示了如何使用JMS Message的属性,并显示了私有的临时Destination的有用之处。它还展示了客户机可以如何既是消息的Producer,有是消息的Consumer。下面的代码来自Receiver.java,它展示了如何从JMS Message中提取临时Queue;可以在onMessage方法中找到这些代码:
// Get the temporary queue from the JMSReplyTo
// property of the message...
tempQueue = (Queue) msg.getJMSReplyTo();
以下代码块来自sendReplyToMsg方法,它展示了如何创建QueueSender和如何发送应答:
// create a Sender for the temporary queue
if (sender == null)
sender = session.createSender(tempQueue);
TextMessage msg = session.createTextMessage();
msg.setText(REPLYTO_TEXT);
...
// Send the message to the temporary queue...
sender.send(msg);
主题
Topic是为了实现“发布/订阅”消息传递机制而设计的。而Queue是为了拥有一个Producer和一个Consumer而设计的,Topic的设计目标是允许拥有多个能发送消息给它的Producer,同时拥有多个能接收来自Topic的同一消息的Consumer。
Topic的MessageProducers和MessageConsumers的创建过程与Queue的类似。您可以使用Session来创建TopicPublishers和TopicSubscribers。之所以存在QueueSender和QueueReceiver镜像,是因为它们都提供了特定于Topic的功能,并且都实现了基本的MessageProducer和MessageConsumer接口。
TopicPublisher的创建过程几乎与QueueSender的创建过程完全相同。以下代码来自Publisher.java的sendMsg方法,它展示了TopicPublisher的创建过程,以及如何向Topic发布消息:
// create a Publisher if there isn't one...
if (publisher == null)
publisher = session.createPublisher(topic);
TextMessage msg = session.createTextMessage();
msg.setText(text);
...
// Publish it to the topic...
publisher.publish(msg);
持久订阅者
TopicSubscribers一个有趣的方面是它的持久订阅者,即用户提供的惟一名称,可以在Session中识别这个名称。Session拥有一个与之相关的Client ID,这个ID要么是通过Connection上的一个方法调用在运行时定义的,要么是作为托管ConnectionFactory的一部分(在我们的例子中,它是在weblogic.properties文件中用这种方法定义的)。因此,一个Connection可以提供一个带有Client ID的Session,并且持久订阅者的名称是Session中的惟一标识符,它与特定的Client ID相关联。持久订阅者的目的是为给定的Topic创建一个惟一的、持久的Consumer。
通过调用TopicSession的createDurableSubscriber方法创建TopicSubscriber的应用程序必须传递一个持久订阅者的名称(一个字符串)作为其参数之一;例如,您可以将持久订阅者的名称设置为目前已登录的用户的名称,等等。这个名称惟一地标识出了某一Topic的特殊订阅者(连同Connection/Session的惟一Client ID)。一旦已经向Topic注册这个持久订阅者,该Topic就会持久性地确保消息被发送到这个订阅者那里。这意味着如果某一特殊的持久订阅者是不可用的,那么将会一直保存消息,直到下次这个持久订阅者(有相同的、惟一的持久订阅者名称和Client ID)注册为一个Consumer为止。Subscriber.java 文件展示了持久订阅者的创建过程,并允许您使用默认的订阅者名称,或者从命令行设置这个标识符(要得到关于运行这个应用程序和展示通过拥有持久订阅者获得消息永久性的更多细节,请参阅“readme.txt”)。下列代码片段来自Subscriber.java的initializeJMS方法,它展示了如何从TopicSession中创建一个持久订阅者:
subscriber = session.createDurableSubscriber(
topic,
subscriberID,
SELECTOR,
false);
// Set the listener (this class)
subscriber.setMessageListener(this);
TopicSubscribers并没有被创建为持久订阅者,因此它不会持久性地接收消息,而是只在运行期间接收消息。
要想获得关于持久订阅者和Topic的更进一步的信息,请参阅Sun的JMS文档。关于上述代码的另外一点是:注意传递给该方法的第三个参数,这个参数是SELECTOR。这是消息选择器与Consumer有关联的地方(请参阅下面内容,以获得关于消息选择器的更多信息)。
过滤——使用消息选择器
我们将讨论的有关JMS的最后一个方面是消息选择器,消息选择器是用于MessageConsumers的过滤器,可以用来过滤传入消息的属性和头文件部分(但不过滤消息体),并确定是否将实际消费该消息。按照JMS文档的说法,消息选择器是一些字符串,它们基于某种语法,而这种语法是SQL-92的子集。您可以将消息选择器作为MessageConsumer创建的一部分。根据MessageConsumer是QueueReceiver还是TopicSubscriber,这种将消息选择器应用于传入信息的方法也会稍有不同。我建议您通过检查Subscriber.java文件以及运行Publisher和Subscriber应用程序,检查消息收集器的语法,并查看如何应用它们。以下代码段在Subscriber.java中定义了消息选择器,而应用程序本身允许您从文本域中改变这个选择器:
public final String SELECTOR = "JMSType = 'TOPIC_PUBLISHER'";
该选择器检查了来自Topic的传入消息的JMSType属性,并确定了这个属性的值是否等于TOPIC_PUBLISHER。如果相等,则将消息传递到MessageListener实现;如果不相等,那么消息会被忽略。请参阅“readme.txt”文件,以获得关于运行这些应用程序和展示其行为的更多细节。此外还建议您参阅Sun的JMS文档。
结束语
JMS是一项在应用程序中创建可移植消息传递代码的很有吸引力的、功能强大的技术。它允许进行“点对点”和“发布/订阅”消息传递,而且还支持事务和持久性。BEA的WebLogic应用服务器提供了一种健壮并完整的JMS实现,它能与应用服务器提供的其他技术一同工作,比如EJB和servlet。这为通过事务支持在不同企业对象和服务间进行永久的,异步的消息传递创造了极大的可能。我希望本文可以鼓励您开发在线示例代码,并且可以鼓励您检查WebLogic(尤其是JMS)提供的可能性。
原文出处
http://www.sys-con.com/story/?storyid=42639&de=1
评论 (0) All