怎么解析ActiveMQ消息队列技术融合Spring过程

2024-03-14,

这篇文章将为大家详细讲解有关怎么解析ActiveMQ消息队列技术融合Spring过程,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

一、业务逻辑

我想在修改一个物品的状态时,同时发送广播,给对应的监听器去实现,此商品存储到solr中,同时通过网页静态模板生成一个当前物品的详情页面,此时用到了广播机制

当我删除一个商品时,发送一个广播,给对应的监听器,同时删除solr中对应的物品。

广播机制:必须要同时在线,才能接收我的消息

使用消息中间件需要导入配置文件

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"  xmlns:jms="http://www.springframework.org/schema/jms"  xsi:schemaLocation="http://www.springframework.org/schema/beans      http://www.springframework.org/schema/beans/spring-beans.xsd    http://www.springframework.org/schema/context      http://www.springframework.org/schema/context/spring-context.xsd">  <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->   <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">     <property name="brokerURL" value="tcp://192.168.200.128:61616"/>   </bean>  <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->   <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">   <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->     <property name="targetConnectionFactory" ref="targetConnectionFactory"/>   </bean>  <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->   <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">     <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->     <property name="connectionFactory" ref="connectionFactory"/>   </bean>      <!-- 发布订阅模式, 商品导入索引库和生成静态页面 -->  <bean id="topicPageAndSolrDestination" class="org.apache.activemq.command.ActiveMQTopic">     <!--将商品上架所有的商品的id发送到这个队列中-->     <constructor-arg value="youlexuan_topic_page_solr"/>  </bean>  <!-- 点对点模式-->  <bean id="queueSolrDeleteDestination" class="org.apache.activemq.command.ActiveMQQueue">    <!--将商品上架所有的商品的id发送到这个队列中-->    <constructor-arg value="youlexuan_queue_solr_delete"/>  </bean>   </beans>

发布广播:

if ("1".equals(status)){  jmsTemplate.send(topicPageAndSolrDestination, new MessageCreator() {    @Override    public Message createMessage(Session session) throws JMSException {      TextMessage textMessage = session.createTextMessage(String.valueOf(id));      return textMessage;    }  });}

监听器1,将当前商品存入solr中:操作solr的服务器配置文件

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"  xmlns:jms="http://www.springframework.org/schema/jms"  xsi:schemaLocation="http://www.springframework.org/schema/beans      http://www.springframework.org/schema/beans/spring-beans.xsd    http://www.springframework.org/schema/context      http://www.springframework.org/schema/context/spring-context.xsd">   <!--产生Connection-->  <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">     <property name="brokerURL" value="tcp://192.168.200.128:61616"/>   </bean>  <!--spring 管理connectionFactory-->  <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">      <property name="targetConnectionFactory" ref="targetConnectionFactory"/>   </bean>  <!--发布订阅模式  将数据导入solr 索引库-->  <bean id="topicPageAndSolrDestination" class="org.apache.activemq.command.ActiveMQTopic">     <constructor-arg value="youlexuan_topic_page_solr"/>  </bean>  <!--发布订阅模式  消息监听容器 将数据导入solr 索引库-->  <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">    <property name="connectionFactory" ref="connectionFactory" />    <property name="destination" ref="topicPageAndSolrDestination" />    <property name="messageListener" ref="pageAndSolrListener" />  </bean>#对应的用来监听执行往solr中保存库存的消息  <bean id="pageAndSolrListener" class="com.ghh.sellergoods.service.listener.ItemSearchListener"></bean>  <!--点对点的模式 删除索引库-->  <bean id="queueSolrDeleteDestination" class="org.apache.activemq.command.ActiveMQQueue">    <!--指定从这个队列中 接收下架商品的-->    <constructor-arg value="youlexuan_queue_solr_delete"/>  </bean>  <!--点对点的模式 消息监听器 删除索引库-->  <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">    <property name="connectionFactory" ref="connectionFactory" />    <property name="destination" ref="queueSolrDeleteDestination" />    <property name="messageListener" ref="itemDeleteListener" />  </bean>  <bean id="itemDeleteListener" class="com.ghh.sellergoods.service.listener.ItemDeleteListener"></bean></beans>

监听器类

public class ItemSearchListener implements MessageListener {  @Autowired  private SearchService searchService;  @Autowired  private ItemDao itemDao;  @Override  public void onMessage(Message message) {    //获取生产者发布的广播,往solr中添加库存列表    ActiveMQTextMessage atm = (ActiveMQTextMessage) message;    try {      //获取广播中的数据。      Long goodsId = Long.valueOf(atm.getText());      //通过传过来的商品id去查询库存表      ItemQuery query = new ItemQuery();      ItemQuery.Criteria criteria = query.createCriteria();      criteria.andGoodsIdEqualTo(goodsId);      //查询对应商品id的库存表      List<Item> items = itemDao.selectByExample(query);        //调用对应的方法,往solr中添加当前商品对应库存信息      searchService.importList(items);    } catch (JMSException e) {      e.printStackTrace();    }  }}

监听器类2:配置文件

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"  xmlns:jms="http://www.springframework.org/schema/jms"  xsi:schemaLocation="http://www.springframework.org/schema/beans      http://www.springframework.org/schema/beans/spring-beans.xsd    http://www.springframework.org/schema/context      http://www.springframework.org/schema/context/spring-context.xsd">   <!--产生Connection工厂类-->  <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">    <property name="brokerURL" value="tcp://192.168.200.128:61616"/>   </bean>  <!--spring管理工厂类-->  <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">      <property name="targetConnectionFactory" ref="targetConnectionFactory"/>   </bean>  <!--发布订阅模式 生成页面-->  <bean id="topicPageAndSolrDestination" class="org.apache.activemq.command.ActiveMQTopic">      <!--指定从这个队列上获取上架的商品id-->     <constructor-arg value="youlexuan_topic_page_solr"/>  </bean>  <!--发布订阅模式 消息监听器 生成页面-->  <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">    <property name="connectionFactory" ref="connectionFactory" />    <property name="destination" ref="topicPageAndSolrDestination" />    <property name="messageListener" ref="pageListener" />  </bean>  <bean id="pageListener" class="com.ghh.core.service.listener.PageListener"></bean>  </beans>

监听器类2:生成静态网页模板

public class PageListener implements MessageListener {  @Autowired  private CmsService cmsService;  @Override  public void onMessage(Message message) {    ActiveMQTextMessage atm = (ActiveMQTextMessage) message;    try {      Long goodsId = Long.valueOf(atm.getText());      Map<String, Object> goodsData = cmsService.findGoodsData(goodsId);      cmsService.createStaticPage(goodsId,goodsData);    } catch (Exception e) {      e.printStackTrace();    }  }}

点对点

当我删除商品时,我需要对应的服务进行删除solr中库存信息,添加和删除使用的是同一个服务中,使用的是上面的配置文件

//发布广播,  @Autowired  private ActiveMQTopic topicPageAndSolrDestination;//在修改的代码方法中来广播发布当前商品的idif (ids.length>0) {      jmsTemplate.send(queueSolrDeleteDestination, new MessageCreator() {        @Override        public Message createMessage(Session session) throws JMSException {          TextMessage textMessage = session.createTextMessage(String.valueOf(ids));          return textMessage;        }      });    }

#执行删除solr中库存信息public class ItemDeleteListener implements MessageListener {  @Autowired  private SearchService searchService;  @Override  public void onMessage(Message message) {    ActiveMQTextMessage atm = (ActiveMQTextMessage) message;    try {      Long goodsId = Long.valueOf(atm.getText());      searchService.deleteById(goodsId);    } catch (JMSException e) {      e.printStackTrace();    }  }}

关于怎么解析ActiveMQ消息队列技术融合Spring过程就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

《怎么解析ActiveMQ消息队列技术融合Spring过程.doc》

下载本文的Word格式文档,以方便收藏与打印。