rabbitmq的结构
- 客户端:
- 生产者
- 消费者
- 核心组件
- 连接通道
长连接 connection
短连接 channel - 交换机(exchange)
- 队列(queue) 存储消息的容器组件,可以实现消息的持久化和队列的持久化
rabbitmq的安装
查看one note笔记(图文)
队列持久化
channel.queueDeclare(queueName, false, false,false,null);
由第二个参数决定,如果是false,在rabbitmq重启宕机后,队列就消失了,否则放在硬盘上,消息是否还存在同样需要设置持久化
rabbitmq的五种工作模式
确认机制
rabbitmq中对消费端逻辑保持一个确认机制,维护 消息的正常消费. 如果消费端需要在消费逻辑执行成功后才在rabbitmq删除消息,可以通过确认机制实现,这样不成功消费的消息依然可以二次获取
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
简单模式
常用场景:聊天工具(网页聊天窗口实现技术(websocket)
生产端:生成发送的消息数据,将消息发送给交换机
交换机:在每一个创建的用户绑定的virtualHost都对应了一批默认交换机,其中有一个特殊的路由交换机 (AMQP defaul) 名称是”“,可以根据后端队列的名称 进行绑定(自动绑定),可以匹配消息想要发送的路由key,来决定当前消息发送到哪个队列
消费者:监听获取队列queue中的消息,执行消费逻辑, 执行确认机制
获取连接
//获取连接
private Channel channel;
@Before
public void initChannel() throws Exception{
//提供ip port(5672) user password virtualHost名称
//构造一个连接工程
ConnectionFactory factory =new ConnectionFactory();
//从工厂获取连接
factory.setHost("10.42.141.101");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
//获取长连接
Connection conn=factory.newConnection();
//每次运行都单独获取一个短连接
channel=conn.createChannel();
}
创建队列,发送消息
@Test
public void sender() throws IOException{
//准备一个发送的消息
String msg="hello world rabbitmq";
//创建一个队列,根据自定义逻辑定义队列的各种属性
//queue:string 队列名称
//durable:boolean true队列有持久化,false没有持久化
//excusive:boolean 是否专属 true表示只有当前创建队列的连接Connection可以操作队列,false表示所有都可以操作
//autodelete:boolean 是否自动删除,当最后一个连接队列Channel对象关闭后队列消失
//arguments: map对象,配置队列各种属性
channel.queueDeclare("simpleq01", false , false, false, null);
//执行消费端的发送逻辑
//exchange:string 交换机名称
//routingKey:string 消息的目的地路由key,因为使用的是默认路由交换机,routingkey可以使用指定名称
//props:BasicProperties类型 表示一个消息的所有属性
//body:Byte数组 表示消息体内容
channel.basicPublish("", "simpleq01", null, msg.getBytes());
}
消费者
public void consum() throws Exception{
//创建rabbitmq消费对象
QueueingConsumer consu=new QueueingConsumer(channel);
//绑定到队列
//绑定队列名称
//是否自动确认 true消费端不考虑消费是否成功,考虑的是接受的速度,一旦获取消息,就返回确认
// false需要手动返回
//callback 消费者
channel.basicConsume("simpleq01",false,consu );
/*//绑定监听队列的消息(确认机制)
Delivery delivery=consu.nextDelivery();//每一次调用这个方法,都会从队列调消息
*/
//使用while true死循环获取消息,手动返回确认
//消费正确后,返回确认
while(true){
Delivery delivery=consu.nextDelivery();
System.out.println(new String(delivery.getBody()));
//消费逻辑完毕后的手动确认
//tag是用来表示不同消息封装对象的属性
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
工作模式(资源争抢)
常用场景:抢红包
通俗讲就是多个消费者连接同一个队列,获取队列里的消息
发送消息,创建队列
public void sender() throws IOException{
//准备一个发送的消息
String msg="hello work mode";
//创建队列
channel.queueDeclare("workq01", false, false, false, null);
//发送
channel.basicPublish("", "workq01", null, msg.getBytes());
}
消费者
两个消费者,都连接work01队列,两者都有可能获得消息,看谁资源更闲置
public void consum01() throws Exception{
QueueingConsumer consu=new QueueingConsumer(channel);
channel.basicConsume("workq01",false,consu );
while(true){
Delivery delivery=consu.nextDelivery();
System.out.println(new String(delivery.getBody()));
//消费逻辑完毕后的手动确认
//tag是用来表示不同消息封装对象的属性
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
public void consum02() throws Exception{
QueueingConsumer consu=new QueueingConsumer(channel);
channel.basicConsume("workq01",false,consu );
while(true){
Delivery delivery=consu.nextDelivery();
System.out.println(new String(delivery.getBody()));
//消费逻辑完毕后的手动确认
//tag是用来表示不同消息封装对象的属性
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
发布订阅模式(fanout)
常用场景:群发(广告,消息)
不能再使用默认的交换机,自定义交换机,交换机类型为fanout,接收到消息后,会把消息同步复制给一批绑定相同路由key的队列
多个队列,交换机将消息发布到多个队列
发送消息,声明交换机
发送消息后,将消息发送到了所有绑定ex交换机的队列里,队列所属客户端一上线就能打印出消息
private static final String type="fanout";
private static final String q01=type+"q01";
private static final String q02=type+"q02";
private static final String ex=type+"ex";
@Test
public void sender() throws IOException{
//准备一个发送的消息
String msg="hello fanout";
//声明交换机 交换机名称,类型
//队列的声明,交换机的声明,有则直接使用忽略声明,无则创建
channel.exchangeDeclare(ex, type);
//发送消息,最常用的发布订阅路由key
channel.basicPublish(ex, "", null, msg.getBytes());
}
消费者
声明队列,将队列绑定到ex交换机,”“为路由key
public void consum01() throws Exception{
QueueingConsumer consu=new QueueingConsumer(channel);
//声明队列
channel.queueDeclare(q01,false,false,false,null);
//绑定队列q01到fanoutex,""为路由key
channel.queueBind(q01, ex, "");
channel.basicConsume(q01,false,consu );
while(true){
Delivery delivery=consu.nextDelivery();
System.out.println(new String(delivery.getBody()));
//消费逻辑完毕后的手动确认
//tag是用来表示不同消息封装对象的属性
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
public void consum02() throws Exception{
QueueingConsumer consu=new QueueingConsumer(channel);
//声明队列
channel.queueDeclare(q02,false,false,false,null);
//绑定队列q01到fanoutex,""为路由key
channel.queueBind(q02, ex, "");
channel.basicConsume(q02,false,consu );
while(true){
Delivery delivery=consu.nextDelivery();
System.out.println(new String(delivery.getBody()));
//消费逻辑完毕后的手动确认
//tag是用来表示不同消息封装对象的属性
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
路由模式
将消息携带具体的路由key,后端的多个队列绑定不同的路由key完成在交换机中判断的逻辑.
生产端发送消息,携带具体路由key交换机接收到消息,根据后端队列绑定当前交换机使用的路由key判断消息到底发送给哪些队列进行消费执行.
声明交换机,发送消息
private static final String type="direct";
private static final String q01=type+"q01";
private static final String q02=type+"q02";
private static final String ex=type+"ex";
public void sender() throws IOException{
//准备一个发送的消息
String msg="hello direct";
//声明交换机 交换机名称,类型
//队列的声明,交换机的声明,有则直接使用忽略声明,无则创建
channel.exchangeDeclare(ex, type);
//发送消息,最常用的发布订阅路由key
channel.basicPublish(ex, "北京", null, msg.getBytes());
}
消费者
绑定队列时,路由key设置为北京,可以接收到发送时路由key为北京的消息
public void consum01() throws Exception{
QueueingConsumer consu=new QueueingConsumer(channel);
//声明队列
channel.queueDeclare(q01,false,false,false,null);
//绑定队列q01到fanoutex,""为路由key
channel.queueBind(q01, ex, "北京");
channel.basicConsume(q01,false,consu );
while(true){
Delivery delivery=consu.nextDelivery();
System.out.println(new String(delivery.getBody()));
//消费逻辑完毕后的手动确认
//tag是用来表示不同消息封装对象的属性
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
主题模式
使用场景:多级的中继消费逻辑使用topic主题实现不同的消费服务器做同一种类同一批次的匹配发送
和路由极其类似,实现多级消息传递的方式之一.可以通过范围的匹配,实现消息在多级传递时发送到中继的消费逻辑.
#:表示匹配任意长度的任意内容的字符串 *:表示匹配没有特殊符号的一个任意长度的字符串
声明交换机发送消息
public void sender() throws IOException{
//准备一个发送的消息
String msg="hello topic";
//声明交换机 交换机名称,类型
//队列的声明,交换机的声明,有则直接使用忽略声明,无则创建
channel.exchangeDeclare(ex, type);
//发送消息,最常用的发布订阅路由key
channel.basicPublish(ex, "中国.广东.深圳", null, msg.getBytes());
}
消费者
绑定交换机路由key是中国.,可以匹配到上面的信息,队列可以接收到信息
public void consum01() throws Exception{
QueueingConsumer consu=new QueueingConsumer(channel);
//声明队列
channel.queueDeclare(q01,false,false,false,null);
//绑定队列q01到fanoutex,""为路由key
channel.queueBind(q01, ex, "中国.#");
channel.basicConsume(q01,false,consu );
while(true){
Delivery delivery=consu.nextDelivery();
System.out.println(new String(delivery.getBody()));
//消费逻辑完毕后的手动确认
//tag是用来表示不同消息封装对象的属性
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}