RabbitMQ学习笔记4-使用fanout交换器

2022-10-19,,,

fanout交换器会把发送给它的所有消息发送给绑定在它上面的队列,起到广播一样的效果。

本里使用实际业务中常见的例子,

订单系统:创建订单,然后发送一个事件消息

积分系统:发送订单的积分奖励

短信平台:发送订单的短信

消息生产者SenderWithFanoutExchange

 package com.yzl.test3;

 import java.util.Date;

 import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; /**
* 使用fanout交换器产生事件,消费者订阅事件做相应的处理
* @author: yzl
* @date: 2016-10-22
*/
public class SenderWithFanoutExchange {
//交换器名称
private static final String EXCHANGE_NAME = "myFanoutExchange"; public static void main(String[] args) throws Exception {
//连接到rabbitmq服务器
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
//创建一个信道
final Channel channel = connection.createChannel();
//定义一个名字为topicExchange的fanout类型的exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //创建一个时间的Event对象
EventObj createOrderEvent = null;
for(int i=1; i<10; i++){
createOrderEvent = new EventObj();
createOrderEvent.setUserId(Long.valueOf(i));
createOrderEvent.setCreateTime(new Date());
createOrderEvent.setEventType("create_order");
//转成JSON
String msg = new Gson().toJson(createOrderEvent); System.out.println("send msg:" + msg); //使用order_event路由键来发送该事件消息
channel.basicPublish(EXCHANGE_NAME, "order_event", null, msg.getBytes()); Thread.sleep(1000);
} channel.close();
connection.close();
}
}

消费消费者ReceiverWithFanoutExchange

 package com.yzl.test3;

 import java.io.IOException;

 import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties; /**
* 使用fanout交换器接收订单事件消息
*
* @author: yzl
* @date: 2016-10-22
*/
public class ReceiverWithFanoutExchange {
// 交换器名称
private static final String EXCHANGE_NAME = "myFanoutExchange";
//接收订单事件并发放积分的队列
private static final String QUEUE_ORDER_REWARD_POINTS = "rewardOrderPoints";
//发放订单积分的路由键
private static final String ROUTING_KEY_ORDER_POINTS = "reward_order_points";
//接收订单事件并发短信的队列
private static final String QUEUE_ORDER_SEND_SMS = "sendOrderSms";
//发送订单短信的路由键
private static final String ROUTING_KEY_ORDER_SMS = "send_order_sms"; private static Channel channel = null; static{
try{
// 连接到rabbitmq服务器
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
// 创建一个信道
channel = connection.createChannel();
// 定义一个名字为myFanoutExchange的fanout类型的exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
}catch (Exception e) {
// TODO: handle exception
}
} /**
* 发放订单的积分奖励
*/
public static void rewardPoints() throws Exception {
channel.queueDeclare(QUEUE_ORDER_REWARD_POINTS, false, false, false, null);
channel.queueBind(QUEUE_ORDER_REWARD_POINTS, EXCHANGE_NAME, ROUTING_KEY_ORDER_POINTS); channel.basicConsume(QUEUE_ORDER_REWARD_POINTS, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body);
System.out.println("积分系统接收到订单创建的事件消息 :" + msg);
System.out.println("准备发放积分.....");
}
});
} /**
* 发送订单成功的短信
*/
public static void sendSms() throws Exception {
channel.queueDeclare(QUEUE_ORDER_SEND_SMS, false, false, false, null);
channel.queueBind(QUEUE_ORDER_SEND_SMS, EXCHANGE_NAME, ROUTING_KEY_ORDER_SMS); channel.basicConsume(QUEUE_ORDER_REWARD_POINTS, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body);
System.out.println("短信平台接收到订单创建的事件消息 :" + msg);
System.out.println("准备发送短信.....");
}
});
} public static void main(String[] args) throws Exception {
rewardPoints();
sendSms();
}
}

运行结果输出:

 send msg:{"userId":1,"createTime":"Oct 22, 2016 10:54:04 PM","eventType":"create_order"}
send msg:{"userId":2,"createTime":"Oct 22, 2016 10:54:05 PM","eventType":"create_order"}
send msg:{"userId":3,"createTime":"Oct 22, 2016 10:54:06 PM","eventType":"create_order"}
send msg:{"userId":4,"createTime":"Oct 22, 2016 10:54:07 PM","eventType":"create_order"}
send msg:{"userId":5,"createTime":"Oct 22, 2016 10:54:08 PM","eventType":"create_order"}
send msg:{"userId":6,"createTime":"Oct 22, 2016 10:54:09 PM","eventType":"create_order"}
send msg:{"userId":7,"createTime":"Oct 22, 2016 10:54:10 PM","eventType":"create_order"}
send msg:{"userId":8,"createTime":"Oct 22, 2016 10:54:11 PM","eventType":"create_order"}
send msg:{"userId":9,"createTime":"Oct 22, 2016 10:54:12 PM","eventType":"create_order"}
 积分系统接收到订单创建的事件消息 :{"userId":1,"createTime":"Oct 22, 2016 10:54:04 PM","eventType":"create_order"}
准备发放积分.....
短信平台接收到订单创建的事件消息 :{"userId":2,"createTime":"Oct 22, 2016 10:54:05 PM","eventType":"create_order"}
准备发送短信.....
积分系统接收到订单创建的事件消息 :{"userId":3,"createTime":"Oct 22, 2016 10:54:06 PM","eventType":"create_order"}
准备发放积分.....
短信平台接收到订单创建的事件消息 :{"userId":4,"createTime":"Oct 22, 2016 10:54:07 PM","eventType":"create_order"}
准备发送短信.....
积分系统接收到订单创建的事件消息 :{"userId":5,"createTime":"Oct 22, 2016 10:54:08 PM","eventType":"create_order"}
准备发放积分.....
短信平台接收到订单创建的事件消息 :{"userId":6,"createTime":"Oct 22, 2016 10:54:09 PM","eventType":"create_order"}
准备发送短信.....
积分系统接收到订单创建的事件消息 :{"userId":7,"createTime":"Oct 22, 2016 10:54:10 PM","eventType":"create_order"}
准备发放积分.....
短信平台接收到订单创建的事件消息 :{"userId":8,"createTime":"Oct 22, 2016 10:54:11 PM","eventType":"create_order"}
准备发送短信.....
积分系统接收到订单创建的事件消息 :{"userId":9,"createTime":"Oct 22, 2016 10:54:12 PM","eventType":"create_order"}
准备发放积分.....

RabbitMQ学习笔记4-使用fanout交换器的相关教程结束。

《RabbitMQ学习笔记4-使用fanout交换器.doc》

下载本文的Word格式文档,以方便收藏与打印。