activemq消息持久化方式(activemq和kafka区别)

2022-07-18,,,,

队列模式(点对点模式,p2p)特点:

1、客户端包括生产者和消费者;

2、队列中的消息只能被一个消费者消费;

3、消费者可以随时消费队列中的消息;

队列模式和主题模式的区别

1、提前订阅,队列模式:消费者不需要提前订阅也可以消费消息;主题模式:只有提前进行订阅的消费者才能成功消费消息;

2、多个消费者分配消息:队列模式:只能平均消费消息,被别的消费者消费的消息不能重复被其他的消费者消费;主题模式:每个订阅者都可以消费主题模式中的每一条消息;

案例代码:

生产者:

import org.apache.activemq.activemqconnectionfactory;

import javax.jms.*;

public class activemqproducer {

    public static final string active_url = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
    public static final string queue_name = "queue01";

    public static void main(string[] args) throws jmsexception {
        //创建连接工厂 ,,按照定的url地址给定默认的用户名和密码
        activemqconnectionfactory activemqconnectionfactory = new activemqconnectionfactory(active_url);
        //通过连接工厂获取connection连接 并启动访问
        connection connection = activemqconnectionfactory.createconnection();
        connection.start();
        //创建会话session  需要两个参数,第一个事务,第二个签收
        session session = connection.createsession(false, session.auto_acknowledge);
        //创建目的地(选择是队列还是主题)
        queue queue = session.createqueue(queue_name);
        //创建消息的生产者
        messageproducer messageproducer = session.createproducer(queue);
        //通过使用消息生产者messageproducer生产3条消息发送到队列中
        for (int i = 1; i <= 7; i++) {
            //创建消息   一个字符串消息
            textmessage textmessage = session.createtextmessage("msg---->" + i);
            //通过messageproducer 发布消息
            messageproducer.send(textmessage);
        }
        //关闭资源
        messageproducer.close();
        session.close();
        connection.close();
        system.out.println("消息发送到mq成功");
    }

}

消费者1:

import org.apache.activemq.activemqconnectionfactory;

import javax.jms.*;

public class activemqconsumer {

    public static final string active_url = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
    public static final string queue_name="queue01";
    public static void main(string[] args) throws jmsexception {
        activemqconnectionfactory activemqconnectionfactory = new activemqconnectionfactory(active_url);
        //通过连接工厂获取connection连接 并启动访问
        connection connection = activemqconnectionfactory.createconnection();
        connection.start();
        //创建会话session  需要两个参数,第一个事务,第二个签收
        session session = connection.createsession(false, session.auto_acknowledge);
        //创建目的地(选择是队列还是主题)
        queue queue = session.createqueue(queue_name);
        //创建消息的消费者
        messageconsumer messageconsumer = session.createconsumer(queue);
        while (true){
            //从队列中获取消息  receive未设置最大时间 是阻塞的,
            textmessage textmessage = (textmessage) messageconsumer.receive();
            if (textmessage !=null){
                system.out.println("消费者接受到消息---->"+textmessage.gettext());
            }else {
                break;
            }
        }
        messageconsumer.close();
        session.close();
        connection.close();
    }

}

输出:

 info | successfully connected to tcp://192.168.1.17:61616
消费者接受到消息---->msg---->2
消费者接受到消息---->msg---->4
消费者接受到消息---->msg---->6

消费者2:

import org.apache.activemq.activemqconnectionfactory;

import javax.jms.*;
import java.io.ioexception;

public class activemqconsumerlistener {

    public static final string active_url = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
    public static final string queue_name = "queue01";

    public static void main(string[] args) throws jmsexception, ioexception {
        activemqconnectionfactory activemqconnectionfactory = new activemqconnectionfactory(active_url);
        //通过连接工厂获取connection连接 并启动访问
        connection connection = activemqconnectionfactory.createconnection();
        connection.start();
        //创建会话session  需要两个参数,第一个事务,第二个签收
        session session = connection.createsession(false, session.auto_acknowledge);
        //创建目的地(选择是队列还是主题)
        queue queue = session.createqueue(queue_name);
        //创建消息的消费者
        messageconsumer messageconsumer = session.createconsumer(queue);
        //通过监听的机制消费消息
        messageconsumer.setmessagelistener((message) -> {
            if (message != null && message instanceof textmessage) {
                textmessage textmessage = (textmessage) message;
                try {
                    system.out.println("消费者接受到消息---->" + textmessage.gettext());
                } catch (jmsexception e) {
                    e.printstacktrace();
                }
            }
        });
        //不关闭控制台  如果不加这句话,在下面可能在连接的时候直接关闭了,造成无法消费的问题
        system.in.read();
        messageconsumer.close();
        session.close();
        connection.close();
    }

}

输出:

 info | successfully connected to tcp://192.168.1.17:61616
消费者接受到消息---->msg---->1
消费者接受到消息---->msg---->3
消费者接受到消息---->msg---->5
消费者接受到消息---->msg---->7

number of consumers:表示消费者数量;

number of pending messages:等待消费的消息,这个是当前未出队列的数量;

messages enqueued:进入队列的消息;( 这个数量只增不减,重启后会清零);

messages dequeued:出了队列的消息 可以理解为是消费者消费掉的数量 (重启后会清零);

持久化案例代码:

activemq持久化,生产者产生的数据,在没有被消费者消费时,先保存到数据库中,当数据被消费者消费后,再从数据库中删除。

生产者:

import org.apache.activemq.activemqconnectionfactory;

import javax.jms.*;

public class activemqproducer {

    public static final string active_url = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
    public static final string queue_name = "queue02";

    public static void main(string[] args) throws jmsexception {
        //创建连接工厂 ,,按照定的url地址给定默认的用户名和密码
        activemqconnectionfactory activemqconnectionfactory = new activemqconnectionfactory(active_url);
        //通过连接工厂获取connection连接 并启动访问
        connection connection = activemqconnectionfactory.createconnection();
        connection.start();
        //创建会话session  需要两个参数,第一个事务,第二个签收
        session session = connection.createsession(false, session.auto_acknowledge);
        //创建目的地(选择是队列还是主题)
        queue queue = session.createqueue(queue_name);
        //创建消息的生产者
        messageproducer messageproducer = session.createproducer(queue);
        // 消息持久化
        messageproducer.setdeliverymode(deliverymode.persistent);
        //通过使用消息生产者messageproducer生产3条消息发送到队列中
        for (int i = 1; i <= 7; i++) {
            //创建消息   一个字符串消息
            textmessage textmessage = session.createtextmessage("msg---->" + i);
            //通过messageproducer 发布消息
            messageproducer.send(textmessage);
        }
        //关闭资源
        messageproducer.close();
        session.close();
        connection.close();
        system.out.println("消息发送到mq成功");
    }

}

代码:
messageproducer.setdeliverymode(deliverymode.persistent);

消费者:

import org.apache.activemq.activemqconnectionfactory;

import javax.jms.*;

public class activemqconsumer {

    public static final string active_url = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
    public static final string queue_name="queue02";
    public static void main(string[] args) throws jmsexception {
        activemqconnectionfactory activemqconnectionfactory = new activemqconnectionfactory(active_url);
        //通过连接工厂获取connection连接 并启动访问
        connection connection = activemqconnectionfactory.createconnection();
        connection.setclientid("client-queue02-01");
        connection.start();
        //创建会话session  需要两个参数,第一个事务,第二个签收
        session session = connection.createsession(false, session.auto_acknowledge);
        //创建目的地(选择是队列还是主题)
        queue queue = session.createqueue(queue_name);
        //创建消息的消费者
        messageconsumer messageconsumer = session.createconsumer(queue);
        while (true){
            //从队列中获取消息  receive未设置最大时间 是阻塞的,
            textmessage textmessage = (textmessage) messageconsumer.receive();
            if (textmessage !=null){
                system.out.println("消费者接受到消息---->"+textmessage.gettext());
            }else {
                break;
            }
        }
        messageconsumer.close();
        session.close();
        connection.close();
    }

}

测试:

1、先运行生产者,activemqproducer

2、查看数据库:

3、在运行消费者,activemqconsumer,输出:

 info | successfully connected to tcp://192.168.1.17:61616
消费者接受到消息---->msg---->1
消费者接受到消息---->msg---->2
消费者接受到消息---->msg---->3
消费者接受到消息---->msg---->4
消费者接受到消息---->msg---->5
消费者接受到消息---->msg---->6
消费者接受到消息---->msg---->7

4、再次查看数据库,消息已删除。

《activemq消息持久化方式(activemq和kafka区别).doc》

下载本文的Word格式文档,以方便收藏与打印。