`
i_coding
  • 浏览: 15464 次
  • 性别: Icon_minigender_1
  • 来自: 合肥
社区版块
存档分类
最新评论
收藏列表
标题 标签 来源
使用ActiveMQ的jar包来实现聊天 activemq
这是使用ActiveMQ 中间件的jar包   activemq-all-5.4.2.jar  来实现的消息发送与接受的Demo(完全原创的哈..):
请大家多多指教:

/**
 * 启动ActiveMQ内置服务
 * 
 * @author WLei 2011-4-12
 */
public class ActiveMQConfigBean {

	/**
	 * 启动ActiveMQ服务
	 * 
	 * @return
	 * @author WLei 2011-4-12
	 */
	public BrokerService getBrokerService() {
		try {
			BrokerService brokerService = new BrokerService();
			brokerService.addConnector("tcp://localhost:0");
			brokerService.start();
			return brokerService;
		} catch (Exception e) {
			e.printStackTrace();
		}
		return null;
	}

	/**
	 * 创建ActiveMQ连接工厂
	 * 
	 * @return
	 * @author WLei 2011-4-12
	 */
	public ActiveMQConnectionFactory createActiveMQConnectionFactory() {
		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost");
		return activeMQConnectionFactory;
	}

}




/**
 * 监听生成类(监听可以写成多线程)
 * 
 * @author WLei Apr 20, 2011
 */
public class GenerateListener {

	/**
	 * 生成主题类型的监听
	 * 
	 * @param activeMQConnectionFactory
	 * @param topicName
	 * @author WLei Apr 20, 2011
	 */
	public void getTopicListener(
			ActiveMQConnectionFactory activeMQConnectionFactory,
			String topicName) {
		
		
		try {
			
			//生成一个连接
			TopicConnection topicConnection = activeMQConnectionFactory
					.createTopicConnection();
			//创建一个会话
			TopicSession topicSession = topicConnection.createTopicSession(
					false, TopicSession.AUTO_ACKNOWLEDGE);
			//创建一个主题
			Topic topic = topicSession.createTopic(topicName);
			
			//创建主题订阅者
			TopicSubscriber topicSubscriber = topicSession
					.createSubscriber(topic);
			
			//新建一个订阅者的监听
			topicSubscriber.setMessageListener(new MessageListener() {
				
				//实现MessageListener的onMessage 方法
				@Override
				public void onMessage(Message msg) {
					
					//判定获取到的消息类型
					if (msg instanceof TextMessage) {
						TextMessage textMessage = (TextMessage) msg;
						try {
							//打印收到的消息
							System.out.println("主题模式监听到:"
									+ textMessage.getText());
						} catch (JMSException e1) {
							e1.printStackTrace();
						}
						try {
							msg.acknowledge();
						} catch (JMSException e) {
							e.printStackTrace();
						}
					}
				}
			});

			// 启动监听..(这其实是个线程)
			topicConnection.start();
		} catch (JMSException e) {

			e.printStackTrace();
		}
	}

	/**
	 * 生成队列形式的监听
	 * 
	 * @param activeMQConnectionFactory
	 * @param queueName
	 * @author WLei Apr 20, 2011
	 */
	public void getQueueListener(
			ActiveMQConnectionFactory activeMQConnectionFactory,
			String queueName) {
		try {
			// 创建一个连接
			QueueConnection connection = activeMQConnectionFactory
					.createQueueConnection();
			
			//创建一个队列消息会话
			QueueSession queueSession = connection.createQueueSession(false,
					QueueSession.AUTO_ACKNOWLEDGE);
			
			//创建一个队列
			Queue queue = queueSession.createQueue(queueName);
			
			//创建一个接受者
			QueueReceiver receiver = queueSession.createReceiver(queue);
			
			//消息监听
			receiver.setMessageListener(new MessageListener() {
				@Override
				public void onMessage(Message msg) {
					try {
						if (msg instanceof TextMessage) {
							TextMessage textMessage = (TextMessage) msg;
							System.out.println("队列模式监听到:"
									+ textMessage.getText());
						}
						msg.acknowledge();
					} catch (JMSException e) {
						e.printStackTrace();
					}
				}
			});
			connection.start();
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}






/**
 * 发送生成类
 * 
 * @author WLei Apr 20, 2011
 */
public class GenerateSender {

	/**
	 * 创建一个发送主题消息的方法(是个简单的例子没有考虑性能而进行拆分)
	 * 
	 * @param activeMQConnectionFactory
	 * @param msg
	 * @param topicName
	 * @author WLei Apr 20, 2011
	 */
	public void topicSender(
			ActiveMQConnectionFactory activeMQConnectionFactory, String msg,
			String topicName) {
		try {
			// 创建一个主题消息的连接
			TopicConnection topicConnection = activeMQConnectionFactory
					.createTopicConnection();
			// 创建一个主题会话
			TopicSession topicSession = topicConnection.createTopicSession(
					false, TopicSession.AUTO_ACKNOWLEDGE);
			// 创建一个主题
			Topic topic = topicSession.createTopic(topicName);
			// 创建一个该主题发布者
			TopicPublisher topicPublisher = topicSession.createPublisher(topic);

			// publisher.setDeliveryMode(DeliveryMode.PERSISTENT);//设置消息模式,有持久与非持久的
			// publisher.setTimeToLive(3*24*60*60*1000);//生存时间

			// 创建一个文本消息
			TextMessage textMessage = topicSession.createTextMessage();
			// 设置文本消息内容
			textMessage.setText(msg);
			// 发布消息
			topicPublisher.publish(textMessage);
			// 关闭连接
			topicConnection.close();
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 创建一个发送队列消息的方法
	 * 
	 * @param activeMQConnectionFactory
	 * @param msg
	 * @param queueName
	 * @author WLei Apr 20, 2011
	 */
	public void queueSender(
			ActiveMQConnectionFactory activeMQConnectionFactory, String msg,
			String queueName) {
		try {
			// 创建一个连接
			QueueConnection queueConnection = activeMQConnectionFactory
					.createQueueConnection();
			// 创建一个会话
			QueueSession queueSession = queueConnection.createQueueSession(
					false, QueueSession.AUTO_ACKNOWLEDGE);
			// 创建一个队列
			Queue queue = queueSession.createQueue(queueName);
			// 创建一个该队列的发送者
			QueueSender queueSender = queueSession.createSender(queue);
			// 使用queueSession创建一个文本消息
			TextMessage textMessage = queueSession.createTextMessage();
			// 设置该文本消息的文本内容
			textMessage.setText(msg);
			// 发送该文本消息
			queueSender.send(textMessage);
			// 关闭连接
			queueConnection.close();
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}



/**
 * 发送信息
 * 
 * @author WLei 2011-4-13
 */
public class Test {
	public static void main(String[] args) {
		
		String topicMsg = "this is a message of topic..";
		String queueMsg = "this is a message of queue..";

		// 实例化一个配置bean
		ActiveMQConfigBean activeMQConfig = new ActiveMQConfigBean();

		// 启动中间件服务
		activeMQConfig.getBrokerService();

		// 获取一个连接工厂
		ActiveMQConnectionFactory ac = activeMQConfig
				.createActiveMQConnectionFactory();

		// 生成一个监听
		GenerateListener generateListener = new GenerateListener();
		
		String topicChat = "topic";
		String queueChat = "queue";

		// 启动主题监听
		generateListener.getTopicListener(ac, topicChat);
		generateListener.getQueueListener(ac, queueChat);

		// 创建一个发送类
		GenerateSender generateSender = new GenerateSender();
		// 发送主题消息
		generateSender.topicSender(ac, topicMsg, topicChat);
		// 发送队列消息
		generateSender.queueSender(ac, queueMsg, queueChat);

	}
}


Global site tag (gtag.js) - Google Analytics