RabbitMq的优势

  1. AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 下面将重点介绍RabbitMQ中的一些基础概念,了解了这些概念,是使用好RabbitMQ的基础–转发!!!
  2. 应该说为什么要使用Mq这种技术,即消息。因为可处理大多高并发,或者一些我们可以不用关心的业务,这句话很矛盾,举个例子,用户注册完后,我们给个友好的提示,发个邮件或者短信,等等,我们不需要知道他是否发送成功,我们只需要发个消息就行。

搭建Rabbitmq

  1. 这里建议使用docker,docker牛皮

  2. 直接上代码

    1
    docker search rabbitmq:management
    1
    docker pull rabbitmq:management
    1
    docker run -d --name=my_rabbitmq --hostname=myrabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
  3. 执行完上面的命令,docker ps就能看到mq的进程,具体为什么,自己百度,唯一提的点就是,rabbitmq的版本要下载management这个标签的,这个是带后台页面的。

  4. ip:15672就可以到后台,默认账号密码guest

Java的应用

废话不多说,根据生产者和消费者的概念,我们结合rabbitMq+spring boot来理解

  1. 队列的配置,队列连接着交换机,发消息到交换机,交换机通过某种匹配的规则给队列。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    @Configuration
    public class QueueConfig {

    /**
    * @Description: 实例化队列,并且进行配置
    * @Param: []
    * @return: org.springframework.amqp.core.Queue
    * @Author: zhouyulin
    * @Date: 2019/11/12
    */
    @Bean
    public Queue firstQueue() {

    //第一参数名字,第二个是否持久化,第三个是否只生效于本次链接,第四个是否自动删除消息
    Map<String, Object> map = new HashMap<>();
    map.put("x-message-ttl", 5000);
    return new Queue("first-queue", true,
    false, false, map);
    }

    /**
    * @Description: 同上
    * @Param: []
    * @return: org.springframework.amqp.core.Queue
    * @Author: zhouyulin
    * @Date: 2019/11/12
    */
    @Bean
    public Queue secondQueue() {
    Map<String, Object> map = new HashMap<>();
    map.put("x-message-ttl", 5000);
    return new Queue("second-queue",
    true, false, false, map);
    }
    }
  2. 交换机的配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    /**
    * @Description: 交换机Exchange配置
    * @Author: zhouyulin
    * @Date: 2019/11/12
    */
    @Configuration
    public class ExchangeConfig {

    /**
    * @Description: 交换机的配置
    * @Param: []
    * @return: org.springframework.amqp.core.DirectExchange
    * @Author: zhouyulin
    * @Date: 2019/11/12
    */
    @Bean
    public DirectExchange directExchange(){
    /**
    * public DirectExchange(String name, boolean durable, boolean autoDelete)
    * 顾名思义,第一个参数是Exchange名字,第二个是否持久化,第三个是否自动删除
    */
    DirectExchange directExchange = new DirectExchange(RabbitMqConfig.EXCHANGE, true, false);
    return directExchange;
    }
    }
  3. RabbitMq连接或者RabbitMq自身的 主要配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    @Configuration
    @Slf4j
    public class RabbitMqConfig {

    public static final String EXCHANGE = "exchangeTest";
    //routingkey设置,作用是设置队列的路由,交换机匹配拦截
    public static final String ROUTINGKEY1 = "queue_one_key1";
    public static final String ROUTINGKEY2 = "queue_one_key2";

    // 配置类
    @Autowired
    private ExchangeConfig exchangeConfig;
    @Autowired
    private QueueConfig queueConfig;

    // 连接工厂
    @Autowired
    private ConnectionFactory connectionFactory;

    //交换机要绑定队列
    @Bean
    public Binding builder_one() {
    //binding编辑,绑定一个队列,绑定到到哪个交换机,交换机的名字
    return BindingBuilder
    .bind(queueConfig.firstQueue())
    .to(exchangeConfig.directExchange())
    .with(RabbitMqConfig.ROUTINGKEY1);
    }

    //交换机要绑定队列
    @Bean
    public Binding builder_tow() {
    //binding编辑,绑定一个队列,绑定到到哪个交换机,绑定哪个路由
    return BindingBuilder
    .bind(queueConfig.secondQueue())
    .to(exchangeConfig.directExchange())
    .with(RabbitMqConfig.ROUTINGKEY2);
    }

    /**
    * @Description: 监听者,暂时还不清楚什么用
    * @Param: []
    * @return: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
    * @Author: zhouyulin
    * @Date: 2019/11/12
    */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer_one() {
    SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
    //监听哪个队列?
    simpleMessageListenerContainer.addQueues(queueConfig.firstQueue(),queueConfig.secondQueue());
    //暴露channel???
    simpleMessageListenerContainer.setExposeListenerChannel(true);
    //设置并发消费者数量
    simpleMessageListenerContainer.setConcurrentConsumers(10);
    //最大创建消费者数量
    simpleMessageListenerContainer.setMaxConcurrentConsumers(15);
    //设置确认模式为手工确认
    simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
    //设置确认信息
    // simpleMessageListenerContainer.setMessageListener(new ChannelAwareMessageListener() {
    // @Override
    // public void onMessage(Message message, Channel channel) throws Exception {
    // /**通过basic.qos方法设置prefetch_count=1,这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message,
    // 换句话说,在接收到该Consumer的ack前,它不会将新的Message分发给它 */
    // channel.basicQos(1);
    // byte[] body = message.getBody();
    // log.info("接收处理队列A当中的消息:" +new String(body));
    // /**为了保证永远不会丢失消息,RabbitMQ支持消息应答机制。
    // 当消费者接收到消息并完成任务后会往RabbitMQ服务器发送一条确认的命令,然后RabbitMQ才会将消息删除。*/
    // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    // }
    // });

    return simpleMessageListenerContainer;
    }

    /**
    * @Description: 设置rabbitmq模板属性
    * @Param: []
    * @return: org.springframework.amqp.rabbit.core.RabbitTemplate
    * @Author: zhouyulin
    * @Date: 2019/11/12
    */
    @Bean
    public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    //设置成功返回
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
    }
    });
    //设置失败消息返回
    rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
    }
    });

    return rabbitTemplate;
    }
    }
  4. 发送者,就是发送消息的

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    /**
    * @Description: 发送者
    * @Author: zhouyulin
    * @Date: 2019/11/12
    */
    @Component
    public class FirstSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
    * @Description: 发送消息
    * @Param: [uuid, msg]
    * @return: void
    * @Author: zhouyulin
    * @Date: 2019/11/12
    */
    public void sendFirst(String uuid,Object msg){
    //发布服务的一个状态吧。。。
    CorrelationData correlationData = new CorrelationData(uuid);

    rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ROUTINGKEY1, msg, correlationData);
    }

    public void sendSecond(String uuid,Object msg){
    //发布服务的一个状态吧。。。
    CorrelationData correlationData = new CorrelationData(uuid);

    rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ROUTINGKEY2, msg, correlationData);
    }
    }
  5. 消费者,接收消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    @Component
    @Slf4j
    public class FirstConsumer {

    @Autowired
    private UserMapper userMapper;



    @RabbitListener(queues = {"first-queue"})
    public void handleMessage(String message) throws Exception {
    // 处理消息
    User user = new User();
    user.setName("张三"+message);
    user.setAge(message);
    user.setIsGay("1");
    userMapper.insert(user);
    log.info("FirstConsumer {} handleMessage",user);
    }

    @RabbitListener(queues = {"first-queue"})
    public void handleMessage2(String message) throws Exception {
    // 处理消息
    User user = new User();
    user.setName("张三"+message);
    user.setAge(message);
    user.setIsGay("1");
    userMapper.insert(user);
    log.info("FirstConsumer2 {} handleMessage",user);
    }

    @RabbitListener(queues = {"first-queue"})
    public void handleMessage3(String message) throws Exception {
    // 处理消息
    User user = new User();
    user.setName("张三"+message);
    user.setAge(message);
    user.setIsGay("1");
    userMapper.insert(user);
    log.info("FirstConsumer3 {} handleMessage",user);
    }

    @RabbitListener(queues = {"first-queue"})
    public void handleMessage4(String message) throws Exception {
    // 处理消息
    User user = new User();
    user.setName("张三"+message);
    user.setAge(message);
    user.setIsGay("1");
    userMapper.insert(user);
    log.info("FirstConsumer4 {} handleMessage",user);
    }

    @RabbitListener(queues = {"first-queue"})
    public void handleMessage5(String message) throws Exception {
    // 处理消息
    User user = new User();
    user.setName("张三"+message);
    user.setAge(message);
    user.setIsGay("1");
    userMapper.insert(user);
    log.info("FirstConsumer5 {} handleMessage",user);
    }

    @RabbitListener(queues = {"first-queue"})
    public void handleMessage6(String message) throws Exception {
    // 处理消息
    User user = new User();
    user.setName("张三"+message);
    user.setAge(message);
    user.setIsGay("1");
    userMapper.insert(user);
    log.info("FirstConsumer6 {} handleMessage",user);
    }

    @RabbitListener(queues = {"first-queue"})
    public void handleMessage7(String message) throws Exception {
    // 处理消息
    User user = new User();
    user.setName("张三"+message);
    user.setAge(message);
    user.setIsGay("1");
    userMapper.insert(user);
    log.info("FirstConsumer7 {} handleMessage",user);
    }

    @RabbitListener(queues = {"first-queue"})
    public void handleMessage8(String message) throws Exception {
    // 处理消息
    User user = new User();
    user.setName("张三"+message);
    user.setAge(message);
    user.setIsGay("1");
    userMapper.insert(user);
    log.info("FirstConsumer8 {} handleMessage",user);
    }

    @RabbitListener(queues = {"first-queue"})
    public void handleMessage9(String message) throws Exception {
    // 处理消息
    User user = new User();
    user.setName("张三"+message);
    user.setAge(message);
    user.setIsGay("1");
    userMapper.insert(user);
    log.info("FirstConsumer9 {} handleMessage",user);
    }

    @RabbitListener(queues = {"first-queue"})
    public void handleMessage10(String message) throws Exception {
    // 处理消息
    User user = new User();
    user.setName("张三"+message);
    user.setAge(message);
    user.setIsGay("1");
    userMapper.insert(user);
    log.info("FirstConsumer10 {} handleMessage",user);
    }

    @RabbitListener(queues = {"second-queue"})
    public void handleMessageSecond(String message) throws Exception {
    // 处理消息
    User user = new User();
    user.setName("张三"+message);
    user.setAge(message);
    user.setIsGay("1");
    userMapper.insert(user);
    log.info("SecondConsumer {} handleMessage",user);
    }

    }
  6. 代码看着基本就差不多可以理解,就是先配置,队列,交换机,消费者,生产者大消息就ok了

  7. 注意,生产者发的消息阔以是实体,但是需要序列化,就是把对象转化为二进制,这个自己百度,我就8多说了。