这是使用ActiveMQ 中间件的jar包 activemq-all-5.4.2.jar 来实现的消息发送与接受的Demo(完全原创的哈..):
请大家多多指教:
/**
* 启动ActiveMQ内置服务
*
* @author WLei 2011-4-12
*/
public class ActiveMQConfigBean {
/**
* 启动ActiveMQ服务
*
* @return
* @author WLei 2011-4-12
*/
public BrokerService getBrokerService() {
try {
BrokerService brokerService = new BrokerService();
brokerService.addConnector("tcp://localhost:0");
brokerService.start();
return brokerService;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 创建ActiveMQ连接工厂
*
* @return
* @author WLei 2011-4-12
*/
public ActiveMQConnectionFactory createActiveMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost");
return activeMQConnectionFactory;
}
}
/**
* 监听生成类(监听可以写成多线程)
*
* @author WLei Apr 20, 2011
*/
public class GenerateListener {
/**
* 生成主题类型的监听
*
* @param activeMQConnectionFactory
* @param topicName
* @author WLei Apr 20, 2011
*/
public void getTopicListener(
ActiveMQConnectionFactory activeMQConnectionFactory,
String topicName) {
try {
//生成一个连接
TopicConnection topicConnection = activeMQConnectionFactory
.createTopicConnection();
//创建一个会话
TopicSession topicSession = topicConnection.createTopicSession(
false, TopicSession.AUTO_ACKNOWLEDGE);
//创建一个主题
Topic topic = topicSession.createTopic(topicName);
//创建主题订阅者
TopicSubscriber topicSubscriber = topicSession
.createSubscriber(topic);
//新建一个订阅者的监听
topicSubscriber.setMessageListener(new MessageListener() {
//实现MessageListener的onMessage 方法
@Override
public void onMessage(Message msg) {
//判定获取到的消息类型
if (msg instanceof TextMessage) {
TextMessage textMessage = (TextMessage) msg;
try {
//打印收到的消息
System.out.println("主题模式监听到:"
+ textMessage.getText());
} catch (JMSException e1) {
e1.printStackTrace();
}
try {
msg.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 启动监听..(这其实是个线程)
topicConnection.start();
} catch (JMSException e) {
e.printStackTrace();
}
}
/**
* 生成队列形式的监听
*
* @param activeMQConnectionFactory
* @param queueName
* @author WLei Apr 20, 2011
*/
public void getQueueListener(
ActiveMQConnectionFactory activeMQConnectionFactory,
String queueName) {
try {
// 创建一个连接
QueueConnection connection = activeMQConnectionFactory
.createQueueConnection();
//创建一个队列消息会话
QueueSession queueSession = connection.createQueueSession(false,
QueueSession.AUTO_ACKNOWLEDGE);
//创建一个队列
Queue queue = queueSession.createQueue(queueName);
//创建一个接受者
QueueReceiver receiver = queueSession.createReceiver(queue);
//消息监听
receiver.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message msg) {
try {
if (msg instanceof TextMessage) {
TextMessage textMessage = (TextMessage) msg;
System.out.println("队列模式监听到:"
+ textMessage.getText());
}
msg.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
connection.start();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
/**
* 发送生成类
*
* @author WLei Apr 20, 2011
*/
public class GenerateSender {
/**
* 创建一个发送主题消息的方法(是个简单的例子没有考虑性能而进行拆分)
*
* @param activeMQConnectionFactory
* @param msg
* @param topicName
* @author WLei Apr 20, 2011
*/
public void topicSender(
ActiveMQConnectionFactory activeMQConnectionFactory, String msg,
String topicName) {
try {
// 创建一个主题消息的连接
TopicConnection topicConnection = activeMQConnectionFactory
.createTopicConnection();
// 创建一个主题会话
TopicSession topicSession = topicConnection.createTopicSession(
false, TopicSession.AUTO_ACKNOWLEDGE);
// 创建一个主题
Topic topic = topicSession.createTopic(topicName);
// 创建一个该主题发布者
TopicPublisher topicPublisher = topicSession.createPublisher(topic);
// publisher.setDeliveryMode(DeliveryMode.PERSISTENT);//设置消息模式,有持久与非持久的
// publisher.setTimeToLive(3*24*60*60*1000);//生存时间
// 创建一个文本消息
TextMessage textMessage = topicSession.createTextMessage();
// 设置文本消息内容
textMessage.setText(msg);
// 发布消息
topicPublisher.publish(textMessage);
// 关闭连接
topicConnection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
/**
* 创建一个发送队列消息的方法
*
* @param activeMQConnectionFactory
* @param msg
* @param queueName
* @author WLei Apr 20, 2011
*/
public void queueSender(
ActiveMQConnectionFactory activeMQConnectionFactory, String msg,
String queueName) {
try {
// 创建一个连接
QueueConnection queueConnection = activeMQConnectionFactory
.createQueueConnection();
// 创建一个会话
QueueSession queueSession = queueConnection.createQueueSession(
false, QueueSession.AUTO_ACKNOWLEDGE);
// 创建一个队列
Queue queue = queueSession.createQueue(queueName);
// 创建一个该队列的发送者
QueueSender queueSender = queueSession.createSender(queue);
// 使用queueSession创建一个文本消息
TextMessage textMessage = queueSession.createTextMessage();
// 设置该文本消息的文本内容
textMessage.setText(msg);
// 发送该文本消息
queueSender.send(textMessage);
// 关闭连接
queueConnection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
/**
* 发送信息
*
* @author WLei 2011-4-13
*/
public class Test {
public static void main(String[] args) {
String topicMsg = "this is a message of topic..";
String queueMsg = "this is a message of queue..";
// 实例化一个配置bean
ActiveMQConfigBean activeMQConfig = new ActiveMQConfigBean();
// 启动中间件服务
activeMQConfig.getBrokerService();
// 获取一个连接工厂
ActiveMQConnectionFactory ac = activeMQConfig
.createActiveMQConnectionFactory();
// 生成一个监听
GenerateListener generateListener = new GenerateListener();
String topicChat = "topic";
String queueChat = "queue";
// 启动主题监听
generateListener.getTopicListener(ac, topicChat);
generateListener.getQueueListener(ac, queueChat);
// 创建一个发送类
GenerateSender generateSender = new GenerateSender();
// 发送主题消息
generateSender.topicSender(ac, topicMsg, topicChat);
// 发送队列消息
generateSender.queueSender(ac, queueMsg, queueChat);
}
}
|