miracle just wanna be better

rabbitmq

2019-10-12
miracle

rabbitmq的结构

  • 客户端:
  • 生产者
  • 消费者
  • 核心组件
  • 连接通道 长连接 connection
    短连接 channel
  • 交换机(exchange)
  • 队列(queue) 存储消息的容器组件,可以实现消息的持久化和队列的持久化

rabbitmq的安装

查看one note笔记(图文)

队列持久化

channel.queueDeclare(queueName, false, false,false,null);
由第二个参数决定,如果是false,在rabbitmq重启宕机后,队列就消失了,否则放在硬盘上,消息是否还存在同样需要设置持久化

rabbitmq的五种工作模式

确认机制

rabbitmq中对消费端逻辑保持一个确认机制,维护 消息的正常消费. 如果消费端需要在消费逻辑执行成功后才在rabbitmq删除消息,可以通过确认机制实现,这样不成功消费的消息依然可以二次获取

channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

简单模式

常用场景:聊天工具(网页聊天窗口实现技术(websocket)

生产端:生成发送的消息数据,将消息发送给交换机
交换机:在每一个创建的用户绑定的virtualHost都对应了一批默认交换机,其中有一个特殊的路由交换机 (AMQP defaul) 名称是”“,可以根据后端队列的名称 进行绑定(自动绑定),可以匹配消息想要发送的路由key,来决定当前消息发送到哪个队列
消费者:监听获取队列queue中的消息,执行消费逻辑, 执行确认机制

获取连接

//获取连接
	private Channel channel;
	@Before
	public void initChannel() throws Exception{
		//提供ip port(5672) user password virtualHost名称
		//构造一个连接工程
		ConnectionFactory factory =new ConnectionFactory();
		//从工厂获取连接
		factory.setHost("10.42.141.101");
		factory.setPort(5672);
		factory.setUsername("guest");
		factory.setPassword("guest");
		factory.setVirtualHost("/");
		//获取长连接
		Connection conn=factory.newConnection();
		//每次运行都单独获取一个短连接
		channel=conn.createChannel();
	}

创建队列,发送消息

@Test
	public void sender() throws IOException{
	//准备一个发送的消息
	String msg="hello world rabbitmq";
	//创建一个队列,根据自定义逻辑定义队列的各种属性
	//queue:string 队列名称
	//durable:boolean true队列有持久化,false没有持久化
	//excusive:boolean 是否专属 true表示只有当前创建队列的连接Connection可以操作队列,false表示所有都可以操作
	//autodelete:boolean 是否自动删除,当最后一个连接队列Channel对象关闭后队列消失
	//arguments: map对象,配置队列各种属性
	channel.queueDeclare("simpleq01", false	, false, false, null);
	//执行消费端的发送逻辑
	//exchange:string 交换机名称
	//routingKey:string 消息的目的地路由key,因为使用的是默认路由交换机,routingkey可以使用指定名称
	//props:BasicProperties类型 表示一个消息的所有属性
	//body:Byte数组 表示消息体内容
	channel.basicPublish("", "simpleq01", null, msg.getBytes());
}

消费者

public void consum() throws Exception{
	//创建rabbitmq消费对象
	QueueingConsumer consu=new QueueingConsumer(channel);
	//绑定到队列
	//绑定队列名称
	//是否自动确认 true消费端不考虑消费是否成功,考虑的是接受的速度,一旦获取消息,就返回确认
	//		false需要手动返回
	//callback 消费者
	channel.basicConsume("simpleq01",false,consu );
	/*//绑定监听队列的消息(确认机制)
	Delivery delivery=consu.nextDelivery();//每一次调用这个方法,都会从队列调消息
	*/
	//使用while true死循环获取消息,手动返回确认
	//消费正确后,返回确认
	while(true){
		Delivery delivery=consu.nextDelivery();
		System.out.println(new String(delivery.getBody()));
		//消费逻辑完毕后的手动确认
		//tag是用来表示不同消息封装对象的属性
		channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
	}
}

工作模式(资源争抢)

常用场景:抢红包

通俗讲就是多个消费者连接同一个队列,获取队列里的消息

发送消息,创建队列

public void sender() throws IOException{
		//准备一个发送的消息
		String msg="hello work mode";
		//创建队列
		channel.queueDeclare("workq01", false, false, false, null);
		//发送
		channel.basicPublish("", "workq01", null, msg.getBytes());
	}

消费者

两个消费者,都连接work01队列,两者都有可能获得消息,看谁资源更闲置

public void consum01() throws Exception{
		
		QueueingConsumer consu=new QueueingConsumer(channel);
		
		channel.basicConsume("workq01",false,consu );
		
		while(true){
			Delivery delivery=consu.nextDelivery();
			System.out.println(new String(delivery.getBody()));
			//消费逻辑完毕后的手动确认
			//tag是用来表示不同消息封装对象的属性
			channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
		}
	}
public void consum02() throws Exception{
		
		QueueingConsumer consu=new QueueingConsumer(channel);
		
		channel.basicConsume("workq01",false,consu );
		
		while(true){
			Delivery delivery=consu.nextDelivery();
			System.out.println(new String(delivery.getBody()));
			//消费逻辑完毕后的手动确认
			//tag是用来表示不同消息封装对象的属性
			channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
		}
	}
}
	

发布订阅模式(fanout)

常用场景:群发(广告,消息) 不能再使用默认的交换机,自定义交换机,交换机类型为fanout,接收到消息后,会把消息同步复制给一批绑定相同路由key的队列
多个队列,交换机将消息发布到多个队列

发送消息,声明交换机

发送消息后,将消息发送到了所有绑定ex交换机的队列里,队列所属客户端一上线就能打印出消息

private static final String type="fanout";
	private static final String q01=type+"q01";
	private static final String q02=type+"q02";
	private static final String ex=type+"ex";
	@Test
	public void sender() throws IOException{
		//准备一个发送的消息
		String msg="hello fanout";
		//声明交换机 交换机名称,类型
		//队列的声明,交换机的声明,有则直接使用忽略声明,无则创建
		channel.exchangeDeclare(ex, type);
		//发送消息,最常用的发布订阅路由key
		channel.basicPublish(ex, "", null, msg.getBytes());		
	}

消费者

声明队列,将队列绑定到ex交换机,”“为路由key

public void consum01() throws Exception{
		
		QueueingConsumer consu=new QueueingConsumer(channel);
		//声明队列
		channel.queueDeclare(q01,false,false,false,null);
		//绑定队列q01到fanoutex,""为路由key
		channel.queueBind(q01, ex, "");
		channel.basicConsume(q01,false,consu );
		
		while(true){
			Delivery delivery=consu.nextDelivery();
			System.out.println(new String(delivery.getBody()));
			//消费逻辑完毕后的手动确认
			//tag是用来表示不同消息封装对象的属性
			channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
		}
	}
public void consum02() throws Exception{
		QueueingConsumer consu=new QueueingConsumer(channel);
		//声明队列
		channel.queueDeclare(q02,false,false,false,null);
		//绑定队列q01到fanoutex,""为路由key
		channel.queueBind(q02, ex, "");
		channel.basicConsume(q02,false,consu );
		
		while(true){
			Delivery delivery=consu.nextDelivery();
			System.out.println(new String(delivery.getBody()));
			//消费逻辑完毕后的手动确认
			//tag是用来表示不同消息封装对象的属性
			channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
		}
	}

路由模式

将消息携带具体的路由key,后端的多个队列绑定不同的路由key完成在交换机中判断的逻辑.

生产端发送消息,携带具体路由key交换机接收到消息,根据后端队列绑定当前交换机使用的路由key判断消息到底发送给哪些队列进行消费执行.

声明交换机,发送消息

	private static final String type="direct";
	private static final String q01=type+"q01";
	private static final String q02=type+"q02";
	private static final String ex=type+"ex";
	
	public void sender() throws IOException{
		//准备一个发送的消息
		String msg="hello direct";
		//声明交换机 交换机名称,类型
		//队列的声明,交换机的声明,有则直接使用忽略声明,无则创建
		channel.exchangeDeclare(ex, type);
		//发送消息,最常用的发布订阅路由key
		channel.basicPublish(ex, "北京", null, msg.getBytes());
	}

消费者

绑定队列时,路由key设置为北京,可以接收到发送时路由key为北京的消息

public void consum01() throws Exception{
		
		QueueingConsumer consu=new QueueingConsumer(channel);
		//声明队列
		channel.queueDeclare(q01,false,false,false,null);
		//绑定队列q01到fanoutex,""为路由key
		channel.queueBind(q01, ex, "北京");
		channel.basicConsume(q01,false,consu );
		
		while(true){
			Delivery delivery=consu.nextDelivery();
			System.out.println(new String(delivery.getBody()));
			//消费逻辑完毕后的手动确认
			//tag是用来表示不同消息封装对象的属性
			channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
		}
	}

主题模式

使用场景:多级的中继消费逻辑使用topic主题实现不同的消费服务器做同一种类同一批次的匹配发送

和路由极其类似,实现多级消息传递的方式之一.可以通过范围的匹配,实现消息在多级传递时发送到中继的消费逻辑.

#:表示匹配任意长度的任意内容的字符串 *:表示匹配没有特殊符号的一个任意长度的字符串

声明交换机发送消息

	public void sender() throws IOException{
		//准备一个发送的消息
		String msg="hello topic";
		//声明交换机 交换机名称,类型
		//队列的声明,交换机的声明,有则直接使用忽略声明,无则创建
		channel.exchangeDeclare(ex, type);
		//发送消息,最常用的发布订阅路由key
		channel.basicPublish(ex, "中国.广东.深圳", null, msg.getBytes());	
	}

消费者

绑定交换机路由key是中国.,可以匹配到上面的信息,队列可以接收到信息

public void consum01() throws Exception{
		
		QueueingConsumer consu=new QueueingConsumer(channel);
		//声明队列
		channel.queueDeclare(q01,false,false,false,null);
		//绑定队列q01到fanoutex,""为路由key
		channel.queueBind(q01, ex, "中国.#");
		channel.basicConsume(q01,false,consu );
		
		while(true){
			Delivery delivery=consu.nextDelivery();
			System.out.println(new String(delivery.getBody()));
			//消费逻辑完毕后的手动确认
			//tag是用来表示不同消息封装对象的属性
			channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
		}
	}

上一篇 elasticsearch

下一篇 NIO&Concurrent

Comments

Content