`
i_coding
  • 浏览: 15416 次
  • 性别: Icon_minigender_1
  • 来自: 合肥
社区版块
存档分类
最新评论

使用 ActiveMQ 中间件 实现消息传递可以实现QQ聊天

阅读更多
1.这是使用ActiveMQ 中间件的jar包   activemq-all-5.4.2.jar  来实现的消息发送与接受的Demo(完全原创的哈..): 
2.请大家多多指教: 
3. 
4./**
5. * 启动ActiveMQ内置服务
6. * 
7. * @author WLei 2011-4-12
8. */ 
9.public class ActiveMQConfigBean { 
10. 
11.    /**
12.     * 启动ActiveMQ服务
13.     * 
14.     * @return
15.     * @author WLei 2011-4-12
16.     */ 
17.    public BrokerService getBrokerService() { 
18.        try { 
19.            BrokerService brokerService = new BrokerService(); 
20.            brokerService.addConnector("tcp://localhost:0"); 
21.            brokerService.start(); 
22.            return brokerService; 
23.        } catch (Exception e) { 
24.            e.printStackTrace(); 
25.        } 
26.        return null; 
27.    } 
28. 
29.    /**
30.     * 创建ActiveMQ连接工厂
31.     * 
32.     * @return
33.     * @author WLei 2011-4-12
34.     */ 
35.    public ActiveMQConnectionFactory createActiveMQConnectionFactory() { 
36.        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost"); 
37.        return activeMQConnectionFactory; 
38.    } 
39. 
40.} 
41. 
42. 
43. 
44. 
45./**
46. * 监听生成类(监听可以写成多线程)
47. * 
48. * @author WLei Apr 20, 2011
49. */ 
50.public class GenerateListener { 
51. 
52.    /**
53.     * 生成主题类型的监听
54.     * 
55.     * @param activeMQConnectionFactory
56.     * @param topicName
57.     * @author WLei Apr 20, 2011
58.     */ 
59.    public void getTopicListener( 
60.            ActiveMQConnectionFactory activeMQConnectionFactory, 
61.            String topicName) { 
62.         
63.         
64.        try { 
65.             
66.            //生成一个连接 
67.            TopicConnection topicConnection = activeMQConnectionFactory 
68.                    .createTopicConnection(); 
69.            //创建一个会话 
70.            TopicSession topicSession = topicConnection.createTopicSession( 
71.                    false, TopicSession.AUTO_ACKNOWLEDGE); 
72.            //创建一个主题 
73.            Topic topic = topicSession.createTopic(topicName); 
74.             
75.            //创建主题订阅者 
76.            TopicSubscriber topicSubscriber = topicSession 
77.                    .createSubscriber(topic); 
78.             
79.            //新建一个订阅者的监听 
80.            topicSubscriber.setMessageListener(new MessageListener() { 
81.                 
82.                //实现MessageListener的onMessage 方法 
83.                @Override 
84.                public void onMessage(Message msg) { 
85.                     
86.                    //判定获取到的消息类型 
87.                    if (msg instanceof TextMessage) { 
88.                        TextMessage textMessage = (TextMessage) msg; 
89.                        try { 
90.                            //打印收到的消息 
91.                            System.out.println("主题模式监听到:" 
92.                                    + textMessage.getText()); 
93.                        } catch (JMSException e1) { 
94.                            e1.printStackTrace(); 
95.                        } 
96.                        try { 
97.                            msg.acknowledge(); 
98.                        } catch (JMSException e) { 
99.                            e.printStackTrace(); 
100.                        } 
101.                    } 
102.                } 
103.            }); 
104. 
105.            // 启动监听..(这其实是个线程) 
106.            topicConnection.start(); 
107.        } catch (JMSException e) { 
108. 
109.            e.printStackTrace(); 
110.        } 
111.    } 
112. 
113.    /**
114.     * 生成队列形式的监听
115.     * 
116.     * @param activeMQConnectionFactory
117.     * @param queueName
118.     * @author WLei Apr 20, 2011
119.     */ 
120.    public void getQueueListener( 
121.            ActiveMQConnectionFactory activeMQConnectionFactory, 
122.            String queueName) { 
123.        try { 
124.            // 创建一个连接 
125.            QueueConnection connection = activeMQConnectionFactory 
126.                    .createQueueConnection(); 
127.             
128.            //创建一个队列消息会话 
129.            QueueSession queueSession = connection.createQueueSession(false, 
130.                    QueueSession.AUTO_ACKNOWLEDGE); 
131.             
132.            //创建一个队列 
133.            Queue queue = queueSession.createQueue(queueName); 
134.             
135.            //创建一个接受者 
136.            QueueReceiver receiver = queueSession.createReceiver(queue); 
137.             
138.            //消息监听 
139.            receiver.setMessageListener(new MessageListener() { 
140.                @Override 
141.                public void onMessage(Message msg) { 
142.                    try { 
143.                        if (msg instanceof TextMessage) { 
144.                            TextMessage textMessage = (TextMessage) msg; 
145.                            System.out.println("队列模式监听到:" 
146.                                    + textMessage.getText()); 
147.                        } 
148.                        msg.acknowledge(); 
149.                    } catch (JMSException e) { 
150.                        e.printStackTrace(); 
151.                    } 
152.                } 
153.            }); 
154.            connection.start(); 
155.        } catch (JMSException e) { 
156.            e.printStackTrace(); 
157.        } 
158.    } 
159. 
160.} 
161. 
162. 
163. 
164. 
165. 
166. 
167./**
168. * 发送生成类
169. * 
170. * @author WLei Apr 20, 2011
171. */ 
172.public class GenerateSender { 
173. 
174.    /**
175.     * 创建一个发送主题消息的方法(是个简单的例子没有考虑性能而进行拆分)
176.     * 
177.     * @param activeMQConnectionFactory
178.     * @param msg
179.     * @param topicName
180.     * @author WLei Apr 20, 2011
181.     */ 
182.    public void topicSender( 
183.            ActiveMQConnectionFactory activeMQConnectionFactory, String msg, 
184.            String topicName) { 
185.        try { 
186.            // 创建一个主题消息的连接 
187.            TopicConnection topicConnection = activeMQConnectionFactory 
188.                    .createTopicConnection(); 
189.            // 创建一个主题会话 
190.            TopicSession topicSession = topicConnection.createTopicSession( 
191.                    false, TopicSession.AUTO_ACKNOWLEDGE); 
192.            // 创建一个主题 
193.            Topic topic = topicSession.createTopic(topicName); 
194.            // 创建一个该主题发布者 
195.            TopicPublisher topicPublisher = topicSession.createPublisher(topic); 
196. 
197.            // publisher.setDeliveryMode(DeliveryMode.PERSISTENT);//设置消息模式,有持久与非持久的 
198.            // publisher.setTimeToLive(3*24*60*60*1000);//生存时间 
199. 
200.            // 创建一个文本消息 
201.            TextMessage textMessage = topicSession.createTextMessage(); 
202.            // 设置文本消息内容 
203.            textMessage.setText(msg); 
204.            // 发布消息 
205.            topicPublisher.publish(textMessage); 
206.            // 关闭连接 
207.            topicConnection.close(); 
208.        } catch (JMSException e) { 
209.            e.printStackTrace(); 
210.        } 
211.    } 
212. 
213.    /**
214.     * 创建一个发送队列消息的方法
215.     * 
216.     * @param activeMQConnectionFactory
217.     * @param msg
218.     * @param queueName
219.     * @author WLei Apr 20, 2011
220.     */ 
221.    public void queueSender( 
222.            ActiveMQConnectionFactory activeMQConnectionFactory, String msg, 
223.            String queueName) { 
224.        try { 
225.            // 创建一个连接 
226.            QueueConnection queueConnection = activeMQConnectionFactory 
227.                    .createQueueConnection(); 
228.            // 创建一个会话 
229.            QueueSession queueSession = queueConnection.createQueueSession( 
230.                    false, QueueSession.AUTO_ACKNOWLEDGE); 
231.            // 创建一个队列 
232.            Queue queue = queueSession.createQueue(queueName); 
233.            // 创建一个该队列的发送者 
234.            QueueSender queueSender = queueSession.createSender(queue); 
235.            // 使用queueSession创建一个文本消息 
236.            TextMessage textMessage = queueSession.createTextMessage(); 
237.            // 设置该文本消息的文本内容 
238.            textMessage.setText(msg); 
239.            // 发送该文本消息 
240.            queueSender.send(textMessage); 
241.            // 关闭连接 
242.            queueConnection.close(); 
243.        } catch (JMSException e) { 
244.            e.printStackTrace(); 
245.        } 
246.    } 
247. 
248.} 
249. 
250. 
251. 
252./**
253. * 发送信息
254. * 
255. * @author WLei 2011-4-13
256. */ 
257.public class Test { 
258.    public static void main(String[] args) { 
259.         
260.        String topicMsg = "this is a message of topic.."; 
261.        String queueMsg = "this is a message of queue.."; 
262. 
263.        // 实例化一个配置bean 
264.        ActiveMQConfigBean activeMQConfig = new ActiveMQConfigBean(); 
265. 
266.        // 启动中间件服务 
267.        activeMQConfig.getBrokerService(); 
268. 
269.        // 获取一个连接工厂 
270.        ActiveMQConnectionFactory ac = activeMQConfig 
271.                .createActiveMQConnectionFactory(); 
272. 
273.        // 生成一个监听 
274.        GenerateListener generateListener = new GenerateListener(); 
275.         
276.        String topicChat = "topic"; 
277.        String queueChat = "queue"; 
278. 
279.        // 启动主题监听 
280.        generateListener.getTopicListener(ac, topicChat); 
281.        generateListener.getQueueListener(ac, queueChat); 
282. 
283.        // 创建一个发送类 
284.        GenerateSender generateSender = new GenerateSender(); 
285.        // 发送主题消息 
286.        generateSender.topicSender(ac, topicMsg, topicChat); 
287.        // 发送队列消息 
288.        generateSender.queueSender(ac, queueMsg, queueChat); 
289. 
290.    } 
291.} 
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics