消息队列 RabbitMQ -- 交换器类型与消息持久化

Author Avatar
anorz 1月 04, 2018
  • 在其它设备中阅读本文章

四种交换器类型 direct、fanout、topic、headers

四种交换器类型

上章中说到,AMQP消息路由必须有三个部分: 交换器、队列、绑定。
生产者把消息发布到交换器上;消息最终到达队列,并被消费者解释;绑定决定了消息如何从路由器 路由到特定队列。
rabbitmq 服务器会根据路由键将消息从交换器路由到列队,但是它是如何处理投递到多个队列的情况呢?
协议中定义的不同类型交换器发挥了作用。
一共四种类型: direct、fanout、topic、headers.

direct

DirectExchange是RabbitMQ Broker的默认Exchange.
果然路由键匹配的话,消息就被投递到对应的队列。
当声明一个队列时,它会自动绑定到默认交换器,并以 队列名 作为路由键。
fanout

FanoutExchange 
当你发送一条消息到fanout交换器时,它会把消息投递给所有附加在此交换器上的所有队列。
topic

TopicExchange 
它使得来自不同源头的消息能够到达同一个队列。

 

生产者创建消息(消息包含两部分: 有效载荷[payload] 和 标签[label]),然后发布(Publish) 到代理服务器(RabbitMQ)

消费者连接到代理服务器,并订阅到队列上。当消费者接受到消息时,它只得到了消息的一部分:有效载荷。在消息路由过程中,消息的标签并没有随有效载荷一同传递。
如果需要明确知道谁生产的AMQP消息的话,就要看生产者是否把消息方信息放入有效载荷中。

AMQP栈


AMQP消息路由必须有三个部分: 交换器、队列、绑定。
生产者把消息发布到交换器上;消息最终到达队列,并被消费者解释;绑定决定了消息如何从路由器 路由到特定队列。

消费者和生产者到底谁去创建队列?

如果你不能承担得起消息进入“黑洞”而丢失的话,你的生产者和消费者就都应该尝试去创建队列。

生产者创建消息
import "github.com/streadway/amqp"
// step 1. 创建 connection
conn, err := amqp.Dial("amqp://guest:guest@127.0.0.1:5672/")
// step 2. 获取 (信道)channel
channel, err := conn.Channel()
// step 3. 在信道上声明交换器 exchange
channel.ExchangeDeclare(
"exchange_name", // exchange name
amqp.ExchangeDirect, // exchange type
false, // durable
false, // autoDelete
false, // internal
false, // noWait
nil, // args amqp.Table
)
// step 4. 声明队列
queue, err := ch.QueueDeclare(
"queue_name", // queue name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments amqp.Table
)
// step 5. 将队列绑定根据路由键绑定到交换器上
channel.QueueBind(
queue.Name, // queue name
"route_key", // route key
"exchange_name", // exchange name
false, // no-wait
nil, // arguments amqp.Table
)
// step 6. 将消息发送到交换器上,交换器会根据路由键将消息发送到对应的队列queue
channel.Publish(
"exchange_name", // exchange
"route_key", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{ // message
ContentType: "text/plain",
Body: []byte("hello world"),
})
消费者订阅消息
   import "github.com/streadway/amqp"
// step 1. 创建 connection
conn, err := amqp.Dial("amqp://guest:guest@127.0.0.1:5672/")
// step 2. 获取 (信道)channel
channel, err := conn.Channel()
// step 3. 在信道上声明交换器 exchange
channel.ExchangeDeclare(
"exchange_name", // exchange name
amqp.ExchangeDirect, // exchange type
false, // durable
false, // autoDelete
false, // internal
false, // noWait
nil, // args amqp.Table
)
// step 4. 声明队列
queue, err := ch.QueueDeclare(
"queue_name", // queue name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments amqp.Table
)
// step 5. 将队列绑定根据路由键绑定到交换器上
channel.QueueBind(
queue.Name, // queue name
"route_key", // route key
"exchange_name", // exchange name
false, // no-wait
nil, // arguments amqp.Table
)
forever := make(chan bool)
// step 6. 在信道上订阅队列
messages, err := channel.Consume(
queue.Name, // queue
"", // consumer
false, // autoAck
false, // exclusive
false, // noLocal :The noLocal flag is not supported by RabbitMQ.
false, // no-wait
nil, // arguments amqp.Table
)
// 开启一个 goruntine 获取消息内容
go func() {
for message := range messages {
log.Printf("Received a message: %s", message.Body)
message.Ack(true)
}
}()
fmt.Println("Please ctrl+c to stop")
<-forever