前言
本文说明
入门使用
在使用之前先看一下rabbitMQ-client的使用
先引入依赖:
1 2 3 4 5
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.0</version> </dependency>
|
在看代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| public void product() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("P [x] Sent '" + message + "'"); channel.close(); connection.close(); }
public void consumer() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("C [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("C [x] Received '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer); }
|
代码的注释很详细
SpringBoot中的使用
引入依赖
1 2 3 4 5 6
| <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
|
配置文件
1 2 3 4 5 6 7 8 9
| spring: rabbitmq: host: localhost port: 5672 username: guest password: guest output: ansi: enabled: always
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Component public class Product { @Autowired private AmqpTemplate rabbitTemplate;
public void send() { String context = "hello " + new Date(); System.out.println("生产者发送信息 : " + context);
new Queue("hello"); this.rabbitTemplate.convertAndSend("hello", context); } }
|
创建消息生产者Product。通过注入AmqpTemplate接口的实例来实现消息的发送,AmqpTemplate接口定义了一套针对AMQP协议的基础操作。在Spring Boot中会根据配置来注入其具体实现。在该生产者,我们会产生一个字符串,并发送到名为hello的队列中
消费者
1 2 3 4 5 6 7 8
| @Component @RabbitListener(queues = "hello") public class Consumer { @RabbitHandler public void process(String hello) { System.out.println("消费者接受信息 : " + hello); } }
|
创建消息消费者Consumer。通过@RabbitListener注解定义该类对hello队列的监听,并用@RabbitHandler注解来指定对消息的处理方法。所以,该消费者实现了对hello队列的消费,消费操作为输出消息的字符串内容。
测试类
1 2 3 4 5 6 7 8 9 10 11 12 13
| @RunWith(SpringRunner.class) @SpringBootTest public class JoylauSpringBootRabbitmqApplicationTests {
@Autowired private Product product;
@Test public void test() throws Exception { product.send(); }
}
|
再来一张图
exchange 多个消费者
当Exchange和RoutingKey相同、queue不同时,所有消费者都能消费同样的信息
Exchange和RoutingKey、queue都相同时,消费者中只有一个能消费信息,其他消费者都不能消费该信息。
下面示例的队列名称可以随意写个,启动时 @RabbitListener 的 bindings 会自动使用 key 绑定队列到exchange
1 2 3 4 5 6 7 8 9 10
| @RabbitHandler @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${spring.application.name}"), exchange = @Exchange(value = "${spring.rabbitmq.template.exchange}"), key = "${spring.rabbitmq.template.routing-key}") ) public void listenerTrafficMessage(Message message){ System.out.println(message.getClass().getName()); }
|
消息返回队列
需要处理完消息后在将消息返回队列的话需要配置 spring.rabbitmq.listener.simple.acknowledge-mode: manual
之后注解@RabbitListener 到方法上
Channel channel 进行返回
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @RabbitHandler @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${spring.application.name}"), exchange = @Exchange(value = "${spring.rabbitmq.template.exchange}"), key = "${spring.rabbitmq.template.routing-key}") ) public void listenerTrafficMessage(Message message, Channel channel){
System.out.println(message.getClass().getName());
try { channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); } catch (IOException e) { e.printStackTrace(); }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| spring: rabbitmq: host: 192.168.10.224 port: 35672 username: guest password: guest virtual-host: / listener: simple: acknowledge-mode: manual concurrency: 1 max-concurrency: 1 prefetch: 1 template: exchange: SURVEY_CENTER routing-key: trafficCongestionSituationBD
|
在属性配置文件里面开启了ACK确认 所以如果代码没有执行ACK确认 你在RabbitMQ的后台会看到消息会一直留在队列里面未消费掉 只要程序一启动开始接受该队列消息的时候 又会收到
1 2 3
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
|
丢弃消息
1 2 3 4 5 6 7 8 9 10
| channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
|