@TOC
一.认识AMQP
1.1AMQP简介
AMQP,即Advanced Message Queuing Protocol,⼀个提供统⼀消息服务的应⽤层标准⾼级消息队列协议,是应⽤层协议的⼀个开放标准,为⾯向消息的中间件设计。基于此协议的客⼾端与消息中间件可传递消息,并不受客⼾端中间件不同产品,不同的开发语⾔等条件的限制。Erlang中的实现RabbitMQ等。 它规定了消息系统中三⼤组件⸺消息服务器/代理节点(server/broker)、⽣产者/发布者(producer/publisher)、消费者/订阅者(consumer/subscriber)之间的通信规范,以及代理节点的设计规范等。
1.2MQ核心要素
- Broker:消息代理服务器(⽐如RabbitMQ)
- 虚拟机(Virtual Host):虚拟主机,隔离环境
- 交换机(Exchange):
交换机是消息的分发中⼼,它接收⽣产者发送的消息并根据⼀定的规则将消息路由到⼀个或多个队列中。⽐较典型的交换机类型,包括:
直连交换机(Direct Exchange):根据消息的路由键将消息发送到特定队列。主题交换机(Topic Exchange):根据消息的路由键和通配符匹配将消息发送到多个队列。⼴播交换机(Fanout Exchange):将消息⼴播到与交换机绑定的所有队列。头交换机(Headers Exchange):根据消息的⾃定义头部属性进⾏匹配路由。 - 队列(Queue): 队列是消息的容器,它存储消息直到消费者准备好接收和处理它们。消息通过交换机路由到队列,消费者可以从队列中读取消息。每个队列都有⼀个名称,它们可以绑定到⼀个或多个交换机,并指定了消息的路由规则。
- Binding-Key: Binding-Key是交换机和队列之间的绑定键,⽤于定义消息的路由规则。在直连交换机中,Binding-Key通常与队列的路由键⼀致;在主题交换机中Binding-Key可以使⽤通配符进⾏匹配。
- ⽣产者(Producer): ⽣产者是消息的发送⽅。它们创建消息并将其发布到RabbitMQ的交换机上。生产者通常将消息发送到⼀个或多个队列,以便消费者可以订阅并处理这些消息。
- 消费者(Consumer): 消费者是消息的接收⽅,它们订阅队列并从中获取消息。⼀旦消费者接收到消息,它们可以对消息进⾏处理,例如执⾏某些任务或将数据存储到数据库中。
1.3从MQ的核心要素来了解下MQ的工作流程
直接交换工作流程
广播交换工作流程
主题交换与头部交换有兴趣的读者可以自行进行了解,因为我们本项目使用不到,所以这里不再介绍。
二.AMQP-CPP
2.1简介
AMQP-CPP是⼀个强⼤的开源库,实现了Advanced Message Queuing Protocol(AMQP)的客⼾端,允许使⽤C++轻松地与⽀持AMQP的消息中间件如RabbitMQ、Qpid等进⾏交互。该项⽬由Copernica Marketing Software开发并维护,旨在提供⾼效、稳定且易于使⽤的AMQP接⼝。 AMQP-CPP的项目地址为AMQP-CPP
2.2安装
git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
cd AMQP-CPP/
make && sudo make install
2.3接口
AMQP-CPP的使⽤有两种模式:
- 使⽤默认的TCP模块进⾏⽹络通信
- 使⽤扩展的libevent、libev、libuv、asio异步⽹络通信组件进⾏通信 我们这里使用libev进行网络通信。
2.3.1默认TCP模式
- 实现⼀个类继承⾃AMQP::TcpHandler类, 它负责⽹络层的TCP连接
- 重写相关函数, 其中必须重写monitor函数
- 在monitor函数中需要实现的是将fd放⼊eventloop(select、epoll)中监控, 当fd可写可读就绪之后,调⽤AMQP-CPP的connection->process(fd, flags)⽅法
#include <amqpcpp.h>
#include <amqpcpp/linux_tcp.h>
class MyTcpHandler : public AMQP::TcpHandler
{
/**
*AMQP库在创建新连接时调⽤的⽅法
*与处理程序相关联。这是对处理程序的第⼀次调⽤
*@param connection附加到处理程序的连接
*/
virtual void onAttached(AMQP::TcpConnection *connection) override
{
//@todo
// 添加您⾃⼰的实现,例如初始化事物
// 以处理连接。
} /**
*当TCP连接时由AMQP库调⽤的⽅法
*已经建⽴。调⽤此⽅法后,库
*仍然需要设置可选的TLS层和
*在TCP层的顶部建⽴AMQP连接。,这种⽅法
*总是与稍后对onLost()的调⽤配对。
*@param connection现在可以使⽤的连接
*/
virtual void onConnected(AMQP::TcpConnection *connection) override
{
//@todo
// 添加您⾃⼰的实现(可能不需要)
} /**
*在建⽴安全TLS连接时调⽤的⽅法。
*这只对amqps://连接调⽤。它允许您检查连接是否⾜够安全,以满⾜您的喜好
*(例如,您可以检查服务器证书)。AMQP协议仍然需要启动。
*@param connection已被保护的连接
*@param ssl来⾃openssl库的ssl结构
*@return bool如果可以使⽤连接,则为True
*/
virtual bool onSecured(AMQP::TcpConnection *connection, const SSL *ssl)
override
{
//@todo
// 添加您⾃⼰的实现,例如读取证书并检查它是否确实是您的
return true;
} /**
*当登录尝试成功时由AMQP库调⽤的⽅法。在此之后,连接就可以使⽤了。
*@param connection现在可以使⽤的连接
*/
virtual void onReady(AMQP::TcpConnection *connection) override
{
//@todo
// 添加您⾃⼰的实现,例如通过创建⼀个通道实例,然后开始发布或使⽤
}
/**
*该⽅法在服务器尝试协商检测信号间隔时调⽤,
*并被覆盖以摆脱默认实现(否决建议的检测信号间隔),转⽽接受该间隔。
*@param connection发⽣错误的连接
*@param interval建议的间隔(秒)
*/
virtual uint16_t onNegotiate(AMQP::TcpConnection *connection, uint16_t
interval)
{
// 我们接受服务器的建议,但如果间隔⼩于⼀分钟,我们将使⽤⼀分钟的间隔
if (interval < 60)
interval = 60;
//@todo
// 在事件循环中设置⼀个计时器,
// 如果在这段时间内没有发送其他指令,
// 请确保每隔interval秒调⽤connection->heartbeat()。
// 返回我们要使⽤的间隔
return interval;
}
/**
*发⽣致命错误时由AMQP库调⽤的⽅法
例如,因为⽆法识别从RabbitMQ接收的数据,或者基础连接丢失。
此调⽤之后通常会调⽤onLost()(如果错误发⽣在TCP连接建⽴之后)和onDetached
()。
*@param connection发⽣错误的连接
*@param message⼀条⼈类可读的错误消息
*/
virtual void onError(AMQP::TcpConnection *connection, const char *message)
override
{
//@todo
// 添加您⾃⼰的实现,例如,通过向程序的⽤⼾报告错误并记录错误
}
/**
*该⽅法在AMQP协议结束时调⽤的⽅法。这是调⽤connection.close()以正常关闭连接
的计数器部分。请注意,TCP连接此时仍处于活动状态,您还将收到对onLost()和onDetached
()的调⽤
@param connection AMQP协议结束的连接
*/
virtual void onClosed(AMQP::TcpConnection *connection) override
{
//@todo
// 添加您⾃⼰的实现, 可能没有必要,
// 但如果您想在amqp连接结束后⽴即执⾏某些操作,
// ⼜不想等待tcp连接关闭,则这可能会很有⽤
}
/**
*当TCP连接关闭或丢失时调⽤的⽅法。
*如果同时调⽤了onConnected(),则始终调⽤此⽅法
*@param connection已关闭但现在⽆法使⽤的连接
*/
virtual void onLost(AMQP::TcpConnection *connection) override
{
//@todo
// 添加您⾃⼰的实现(可能没有必要)
}
/**
*调⽤的最终⽅法。这表⽰将不再对处理程序进⾏有关连接的进⼀步调⽤。
*@param connection可以被破坏的连接
*/
virtual void onDetached(AMQP::TcpConnection *connection) override
{
//@todo
// 添加您⾃⼰的实现,如清理资源或退出应⽤程序
}
/**
*当AMQP-CPP库想要与主事件循环交互时,它会调⽤该⽅法。
*AMQP-CPP库是完全不阻塞的,
*并且只有在事先知道这些调⽤不会阻塞时才进⾏“write()”或“read()”系统调⽤。
*要在事件循环中注册⽂件描述符,它会调⽤这个“monitor()”⽅法,
*该⽅法带有⼀个⽂件描述符和指⽰是否该检查⽂件描述符的可读性或可写性的标志。
* *
@param connection想要与事件循环交互的连接
*@param fd应该检查的⽂件描述符
*@param标记位或AMQP::可读和/或AMQP::可写
*/
virtual void monitor(AMQP::TcpConnection *connection, int fd, int flags)
override
{
//@todo
// 添加您⾃⼰的实现,
// 例如将⽂件描述符添加到主应⽤程序事件循环(如select()或poll()循环)。
// 当事件循环报告描述符变为可读和或可写时,
// 由您通过调⽤connection->process(fd,flags)⽅法
// 通知AMQP-CPP库⽂件描述符处于活动状态。
}
};
2.3.2扩展模式
以libev为例, 我们不必要⾃⼰实现monitor函数, 可以直接使⽤ AMQP::LibEvHandler 使用时需要包含头文件:
#include <amqpcpp.h>
2.3.2.1connection
⽤于设定rabbitmq服务器地址后,创建⼀个通信连接对象。
class LibEvHandler : public TcpHandler;
class ConnectionHandler;
class Address
{
// amqp://user:pass@host:port/
Address(const std::string &address) gg
};
class TcpConnection : private ConnectionHandler
{
TcpConnection(TcpHandler *handler, const Address &address)
};
class Login
{
Login(std::string user, std::string password)
};
class Connection
{
Connection(
ConnectionHandler *handler,
const Login &login,
const std::string &vhost) bool close() // 关闭连接
};
2.3.2.2channel
channel是⼀个虚拟连接,⼀个连接上可以建⽴多个通道。并且所有的RabbitMq指令都是通过channel传输,所以连接建⽴后的第⼀步,就是建⽴channel。因为所有操作是异步的,所以在channel上执⾏指令的返回值并不能作为操作执⾏结果,实际上它返回的是 Deferred 类,可以使⽤它安装处理函数。
namespace AMQP
{
/**
* 通⽤回调函数类型
*/
using SuccessCallback = std::function<void()>;
using ErrorCallback = std::function<void(const char *message)>;
using FinalizeCallback = std::function<void()>;
// 队列的声明的成功回调函数
using QueueCallback = std::function<void(
const std::string &name,
uint32_t messagecount,
uint32_t consumercount)>;
// 队列删除回调函数
using DeleteCallback = std::function<void(uint32_t deletedmessages)>;
// 消息处理回调函数
using MessageCallback = std::function<void(
const Message &message, // 收到的消息
uint64_t deliveryTag, // 消息的唯⼀标识
bool redelivered)>; // 指⽰消息是否是被重新投递的
// 当启⽤发布者确认时,当服务器确认消息已被接收和处理时,将调⽤AckCallback
using AckCallback = std::function<void(
uint64_t deliveryTag, // 消息的唯⼀标识符
bool multiple)>; // 指⽰确认范围:false确认单个消息,true确认所有消费的该消息
// 消息被RabbitMQ成功接收和处理后调⽤的回调函数
using PublishAckCallback = std::function<void()>;
// 消息被RabbitMQ明确拒绝(nacked)时调⽤的回调函数
using PublishNackCallback = std::function<void()>;
// 消息丢失时调⽤的回调函数
using PublishLostCallback = std::function<void()>;
}
这些回调函数使得AMQP-CPP库能够在消息发布后提供反馈,帮助应⽤程序实现更可靠的消息传递机制。通过实现这些回调,开发者可以对消息的发布结果进⾏适当的处理,确保消息传递的可靠性或进⾏错误恢复
extern const int durable;
extern const int autodelete;
extern const int active;
extern const int passive;
extern const int ifunused;
extern const int ifempty;
extern const int global;
extern const int nolocal;
extern const int noack;
extern const int exclusive;
extern const int nowait;
extern const int mandatory;
extern const int immediate;
extern const int redelivered;
extern const int multiple;
extern const int requeue;
extern const int readable;
extern const int writable;
extern const int internal;
namespace AMQP {
enum ExchangeType
{
fanout, //⼴播交换,绑定的队列都能拿到消息
direct, //直接交换,只将消息交给routingkey⼀致的队列
topic, //主题交换,将消息交给符合bindingkey规则的队列
headers,
consistent_hash,
message_deduplication
}
class Channel {
Channel(Connection *connection);
bool connected();//是否连接可⽤
Deferred &close();//关闭当前channel
DeferredConfirm &confirmSelect();
Deferred &startTransaction();
Deferred &commitTransaction();
Deferred &rollbackTransaction();
/**
*声明交换机
*以下flags可⽤于交换机:
*-durable 持久化,重启后交换机依然有效
*-autodelete 删除所有连接的队列后,⾃动删除交换
*-passive 仅被动检查交换机是否存在
*-internal 创建内部交换*此函数返回⼀个延迟处理程序。可以安装回调
using onSuccess(), onError() and onFinalize() methods.
*/
Deferred &declareExchange(
const std::string_view &name, //交换机名称
ExchangeType type, //交换机类型
int flags) //标志位
/**
*声明队列
*flags可以是以下值的组合:
*-durable 持久队列在代理重新启动后仍然有效
*-autodelete 当所有连接的使⽤者都离开时,⾃动删除队列
*-passive 仅被动检查队列是否存在
*-exclusive 队列仅存在于此连接,并且在连接断开时⾃动删除
*可安装回调:onSuccess()、onError()和onFinalize()⽅法。
Deferred &onError(const char *message)
Deferred &onSuccess(QueueCallback&)
void myCallback(const std::string &name,
uint32_t messageCount,
uint32_t consumerCount);
例如:channel.declareQueue("myqueue").onSuccess(myCallback);
*/
DeferredQueue &declareQueue(
const std::string_view &name, //队列名称
int flags) //队列标志位
/**
*将队列绑定到交换机
*可以安装回调:onSuccess()、onError()和onFinalize()⽅法。
*/
Deferred &bindQueue(
const std::string_view &exchange, //交换机名称
const std::string_view &queue, //队列名称
const std::string_view &bindingkey) //绑定Key)
Deferred &unbindQueue(
const std::string_view &exchange,
const std::string_view &queue,
const std::string_view &bindingkey)
/*
*删除队列
*flag
- ifunused 仅在队列没有连接者时删除
- ifempty 仅在队列为空时删除
*/
DeferredDelete &removeQueue(
const std::string_view &name,
int flags = 0)
/*发布消息
flags参数指定如果消息⽆法路由到队列时应该发⽣的情况。
默认情况下,不可更改的消息将被静默地丢弃。
发布之前,请确保您已经调⽤了recall()⽅法,
并设置了所有适当的处理程序来处理这些返回的消息。
*flags:
*-mandatory 如果设置,服务器将返回未发送到队列的消息
*-immediate 如果设置,服务器将返回⽆法⽴即转发给使⽤者的消息。
*/
bool publish(
const std::string_view &exchange, //交换机名称
const std::string_view &routingKey,//路由Key
const std::string &message, //消息内容
int flags = 0) //标志位
/*
告诉消息您已准备好召回/收回⽆法转发的消息
当你将publish()⽅法与“immediate”或“mandatory”标志结合使⽤时,
rabbitmq会发回不可路由的消息。
使⽤此recall()⽅法,您可以安装⼀种伪消费者,定义如何处理此类返回的消息
*/
DeferredRecall &recall();
/**
*订阅队列消息
消费者标识⽤于通过channel::cancel()调⽤取消订阅
*⽀持以下flags:
*-nolocal 如果设置了,则不会同时消耗在此通道上发布的消息
*-noack 如果设置了,则不必对已消费的消息进⾏确认
*-exclusive 请求独占访问,只有此使⽤者可以访问队列
可以安装回调:onSuccess()、onError()和onFinalize()⽅法。
可以安装的onSuccess()回调应该具有以下格式:
void myCallback(const std::string_view&tag);
tag:即订阅时传⼊的消费者标识
*/
DeferredConsumer &consume(
const std::string_view &queue, //队列名称
const std::string_view &tag, //消费者标识
int flags = 0) //标志位
/*
取消订阅
flags:
- noack 消费消息不需要确认,也就是⾃动确认
*/
DeferredCancel &cancel(const std::string_view &tag);
/*从MQ检索单条消息
*/
DeferredGet &get(const std::string_view &queue, int flags = 0);
/**
*消费者确认接收到的消息
*当在DeferredConsumer::onReceived()⽅法中接收到消息时,必须确认该消息,
以便RabbitMQ将其从队列中删除(除⾮使⽤noack选项消费)。
flag:
multiple 确认多条消息:之前传递的所有未确认的消息也会被确认
*/
bool ack(uint64_t deliveryTag, int flags=0)
/*
消费者拒绝消息
flags:
multiple:拒绝多条消息:之前传递的所有未确认的消息也都是未确认的
requeue:如果已设置,则消息将放回队列中,否则将被删除
*/
bool reject(uint64_t deliveryTag, int flags=0)
//回复所有还没有被确认的消息
//flags:requeue:如果设置了,服务器将重新对消息进⾏排队,因此也可能最终到达不同的消费者
Deferred &recover(int flags = 0);
}
}
2.3.2.3Reliable
⽤于可靠的消息发布及错误处理。
template <typename BASE = Tagger>
class Reliable : public BASE
{
template <typename... Args>
Reliable(Args &&...args)
DeferredPublish &publish(
const std::string_view &exchange,
const std::string_view &routingKey,
const std::string_view &message,
int flags = 0)
};
// 样例:
AMQP::TcpChannel mychannel(connection);
AMQP::Reliable reliable(mychannel);
reliable.publish("my-exchange", "my-key", "my first message")
.onAck()
.onNack()
.onLost()
.onError();
2.3.2.4Message
⽤于订阅者收到消息后的处理
class Envelope : public MetaData{
const char *body();
uint64_t bodySize();
};
class Message : public Envelope{
const std::string &exchange();
const std::string &routingkey();
};
2.3.2.5Deferred
Deferred 类(包括其派⽣类,如 DeferredQueue , DeferredConsumer 等)是处理异步操作的核⼼机制。它⽤于表⽰⼀个尚未完成的操作,⽤于设置异步调⽤的回调函数,进⾏接⼝的连续调⽤。
class Deferred
{
Deferred &onSuccess(const SuccessCallback& callback);
Deferred &onError(const ErrorCallback& callback);
Deferred &onFinalize(const FinalizeCallback& callback);
};
示例:声明交换机
//channel.declareExchange() 接⼝返回的是Deferred对象
//因此,可以连续调⽤onSuccess设置成功处理回调函数以及onError失败处理回调函数
channel.declareExchange(exchange, AMQP::ExchangeType::direct)
.onSuccess([&](){
std::cout << "声明交换机成功:" << exchange << std::endl;
}
.onError([&](const char *message) {
std::cout << "声明交换机失败:" << message << std::endl;
});
2.3.2.6DeferredQueue
Channel中队列相关操作接⼝的返回对象,⽤于设置队列操作的相关回调函数。
class DeferredQueue : public Deferred
{
DeferredQueue &onSuccess(const QueueCallback& callback);
DeferredQueue &onSuccess(const SuccessCallback& callback);
};
using QueueCallback = std::function<void(const std::string &name,
uint32_t messageCount,
uint32_t consumerCount)>;
示例:声明队列
//channel.declareQueue() 接⼝返回的是DeferredQueue对象
channel.declareQueue(queue)
.onSuccess([&](const std::string &name,
uint32_t messagecount,
uint32_t consumercount){
std::cout << "声明队列成功:" << queue << std::endl;
std::cout << "队列中有" << messagecount << "个消息" << std::endl;
std::cout << "队列中有" << consumercount << "个消费者" << std::endl;
})
.onError([&](const char *message) {
std::cout << "声明队列失败:" << message << std::endl;
abort(); // 终⽌程序
});
2.3.2.7DeferredConsumer
Channel中订阅相关操作接⼝的返回对象,⽤于设置订阅的相关回调函数。
class DeferredConsumer : public Deferred {
DeferredConsumer &onSuccess(const ConsumeCallback& callback);
//收到消息的回调函数
DeferredConsumer &onReceived(const MessageCallback& callback);
/* Alias for onReceived() */
DeferredConsumer &onMessage(const MessageCallback& callback);
//取消订阅的回调
DeferredConsumer &onCancelled(const CancelCallback& callback);
};
using ConsumeCallback = std::function<void(const std::string_view &tag)>;
using MessageCallback = std::function<void (const AMQP::Message &message,
uint64_t deliveryTag,bool redelivered)>;
示例:订阅队列消息
//channel.consume() 接⼝返回的是DeferredConsumer对象
channel.consume(queue)
.onMessage([&](const AMQP::Message &message, //收到的消息
uint64_t deliveryTag, //消息的唯⼀标识
bool redelivered){
std::string body(message.body(), message.bodySize());
std::cout << "收到消息:" << body << std::endl;
// 7.5 收到消息进⾏处理后,不要忘了对消息进⾏确认
channel.ack(deliveryTag);
})
.onError([&](const char *message) {
std::cout << "订阅队列消息失败:" << message << std::endl;
abort(); // 终⽌程序
})
.onSuccess([&](const std::string_view &tag){
std::cout << "订阅队列消息成功:" << std::endl;
});
2.3.3扩展libev网络库
2.3.3.1 LibEvHandler
amqpcpp库与libev⽹络库的对接类
#include <amqpcpp/libev.h>
#include <ev.h>
class LibEvHandler : public TcpHandler{
LibEvHandler(struct ev_loop *loop, int priority = 0);
};
2.3.3.2ev_loop
libev网络库的核心使用接口
typedef struct ev_async
{
EV_WATCHER (ev_async)
EV_ATOMIC_T sent; /* private */
} ev_async;
//break type
enum {
EVBREAK_CANCEL = 0, /* undo unloop */
EVBREAK_ONE = 1, /* unloop once */
EVBREAK_ALL = 2 /* unloop all loops */
};
//创建libev操作句柄
struct ev_loop *ev_default_loop (unsigned int flags EV_CPP (= 0))
# define EV_DEFAULT ev_default_loop (0)
void ev_loop_destroy (struct ev_loop *loop)
//开始事件循环 --- 阻塞接⼝
int ev_run (struct ev_loop *loop);
//退出事件循环
void ev_break (struct ev_loop *loop, int32_t break_type) ;
//异步事件回调函数
//也就是每个外部针对ev的操作都必须在evloop所在线程内执⾏
//这⾥相当打包了⼀个任务,加⼊到evloop的任务队列中进⾏执⾏
void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents)
//初始化⼀个异步任务
void ev_async_init(ev_async *w, callback cb);
//启动loop的异步任务监测,这样就可以接收任务进⾏处理了
void ev_async_start(struct ev_loop *loop, ev_async *w) ;
//发送异步任务到loop中进⾏执⾏
void ev_async_send(struct ev_loop *loop, ev_async *w) ;
//⽰例:异步退出
void mycb(struct ev_loop *loop, ev_async *watcher, int32_t revents) {
ev_break(loop, EVBREAK_ALL);
}
ev_async ea;
ev_async_init(&ea, mycb);
ev_async_start(loop, &ea);
ev_async_send(loop, &ea);
2.4访问网页端RabbitMQ
首先在命令行输入如下指令:
docker ps -a
查看rabbitMQ所在的虚拟环境编号:
我的rabbitMQ对应的虚拟环境编号为:b32f26a6303c,接下来依次执行如下指令:
sudo docker exec -it b32f26a6303c /bin/bash
rabbitmq-plugins enable rabbitmq_management
然后在浏览器中输入访问你的虚拟机或云服务器Ip地址,按这样的格式输入:
IP地址:15672
admin 123456
2.5使用样例
2.5.1简单消息传输
//simple_publish.cc
#include <iostream>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <ev.h>
/*
1.创建libev句柄
2.创建libev与rabbitmq连接的libevHandler
3.创建amqp的tcpconnection
4.创建amqp的channel
5.声明exchange和queue
6.发送订阅消息
7.开启事件循环
*/
int main()
{
//0.定义键值名,交换机名,队列名,绑定键及url
const std::string url = "amqp://admin:123456@192.168.30.128:5672/";
const std::string exchange = "my-exchange";
const std::string queue = "my-queue";
const std::string binding_key = "my-binding-key";
//1.创建libev句柄
auto* ev_loop = EV_DEFAULT;
//2.创建libev与rabbitmq连接的libevHandler
AMQP::LibEvHandler handler(ev_loop);
//3.创建amqp的tcpconnection
AMQP::TcpConnection connection(&handler,AMQP::Address(url));
//4.创建amqp的channel
AMQP::TcpChannel channel(&connection);
//5.声明exchange(直接交换)和queue
channel.declareExchange(exchange,AMQP::ExchangeType::direct)
.onSuccess([&](){
//交换机声明成功,声明队列
channel.declareQueue(queue)
.onSuccess([&](const std::string &name,uint32_t messagecount,uint32_t consumercount){
std::cout<<"队列声明成功: "<< name<<", 消息数: "<< messagecount<<", 消费者数: "<< consumercount<<std::endl;
//队列声明成功,绑定交换机和队列
channel.bindQueue(exchange,queue,binding_key)
.onSuccess([&](){
std::cout<<"队列绑定成功"<<std::endl;
//队列绑定成功,发送订阅消息
bool ret = channel.publish(exchange,binding_key,"Hello, world!");
if(ret == false) {
std::cout<<"消息发送失败"<<std::endl;
}
})
.onError([&](const char* message){
std::cerr<<"队列绑定失败: "<< message <<std::endl;
exit(-1);//绑定失败直接退出
});
})
.onError([&](const char* message){
std::cerr<<"队列声明失败: "<< message <<std::endl;
exit(-1);//声明失败直接退出
});
})
.onError([&](const char* message){
std::cerr<<"交换机声明失败: "<< message <<std::endl;
exit(-1);//声明失败直接退出
});
//7.开启事件循环
ev_run(ev_loop);
return 0;
}
//simple_subscribe.cc
#include <iostream>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <ev.h>
int main()
{
//0.定义键值名,交换机名,队列名,绑定键及url
const std::string url = "amqp://admin:123456@192.168.30.128:5672/";
const std::string exchange = "my-exchange";
const std::string queue = "my-queue";
const std::string binding_key = "my-binding-key";
//1.创建libev句柄
auto* ev_loop = EV_DEFAULT;
//2.创建libev与rabbitmq连接的libevHandler
AMQP::LibEvHandler handler(ev_loop);
//3.创建amqp的tcpconnection
AMQP::TcpConnection connection(&handler,AMQP::Address(url));
//4.创建amqp的channel
AMQP::TcpChannel channel(&connection);
//5.声明exchange(直接交换)和queue
channel.declareExchange(exchange,AMQP::ExchangeType::direct)
.onSuccess([&](){
//交换机声明成功,声明队列
channel.declareQueue(queue)
.onSuccess([&](const std::string &name,uint32_t messagecount,uint32_t consumercount){
std::cout<<"队列声明成功: "<< name<<", 消息数: "<< messagecount<<", 消费者数: "<< consumercount<<std::endl;
//队列声明成功,绑定交换机和队列
channel.bindQueue(exchange,queue,binding_key)
.onSuccess([&](){
std::cout<<"队列绑定成功"<<std::endl;
//队列绑定成功,开始订阅
channel.consume(queue)
.onReceived([&](const AMQP::Message &message,uint64_t deliveryTag,bool redelivered){
std::string msg_body(message.body(),message.bodySize());
std::cout<<"收到消息: "<< msg_body <<std::endl;
//确认消息
channel.ack(deliveryTag);
})
.onError([&](const char* message){
std::cerr<<"订阅消息失败: "<< message <<std::endl;
exit(-1);//订阅失败直接退出
})
.onSuccess([&](){
std::cout<<"订阅成功"<<std::endl;
});
})
.onError([&](const char* message){
std::cerr<<"队列绑定失败: "<< message <<std::endl;
exit(-1);//绑定失败直接退出
});
})
.onError([&](const char* message){
std::cerr<<"队列声明失败: "<< message <<std::endl;
exit(-1);//声明失败直接退出
});
})
.onError([&](const char* message){
std::cerr<<"交换机声明失败: "<< message <<std::endl;
exit(-1);//声明失败直接退出
});
//7.开启事件循环
ev_run(ev_loop);
return 0;
}
makefile
all: simple_publish simple_subscribe
simple_publish:simple_publish.cc
g++ $^ -o $@ -lpthread -lamqpcpp -lev
simple_subscribe:simple_subscribe.cc
g++ $^ -o $@ -lpthread -lamqpcpp -lev
clean:
rm -f simple_publish simple_subscribe
然后我们分别执行发送消息端与订阅消息端即可在rabbitMQ的网页界面看到响应的接收消费情况。
2.5.2死信队列
我们的消息是可以设置过期时间的。如果一个消息在队列中过时了,它会被丢弃。但这样静默丢弃可能不合理,因此可以通过设置死信交换机来处理这些“死信”消息。 死信交换机本身与普通交换机没有区别。当队列中的消息因 过期、被消费者拒绝且不重新入队、或队列达到最大长度 而变成死信时,它会被重新发布到配置的死信交换机上。 路由时,默认会使用消息原始的路由键。但也可以在声明队列时指定一个 x-dead-letter-routing-key 参数来覆盖,使用一个新的固定路由键。然后,死信交换机会根据这个最终的路由键和其绑定关系,将消息路由到对应的死信队列中。进入死信队列的消息会作为一个新消息存在,等待消费者处理。
相关接口与类
class Field {
//....
};
class Table : public Field {
public:
Table &set(const std::string &name, bool value) ;
// ... ⼀系列常⻅基础类型的重载...
AssociativeFieldProxy operator[](const std::string& name);
};
//死信队列参数
AMQP::Table args;
args["x-dead-letter-exchange"] = "xxx";
args["x-dead-letter-routing-key"] = "xxx";
args["x-message-ttl"] = 10000; //队列消息⽣命周期设置-以毫秒为单位
2.5.3基于死信队列实现延时订阅
//delayed_publish.cc
#include <iostream>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <ev.h>
//tags不为空表示声明死信队列与交换机
void declare(AMQP::TcpChannel& channel,const std::string& exchange, const std::string& queue, const std::string& binding_key,AMQP::Table args)
{
//5.声明exchange(直接交换)和queue
channel.declareExchange(exchange,AMQP::ExchangeType::direct)
.onSuccess([=,&channel](){
//交换机声明成功,声明队列
channel.declareQueue(queue,args)
.onSuccess([=,&channel](const std::string &name,uint32_t messagecount,uint32_t consumercount){
std::cout<<"队列声明成功: "<< name<<", 消息数: "<< messagecount<<", 消费者数: "<< consumercount<<std::endl;
//队列声明成功,绑定交换机和队列
channel.bindQueue(exchange,queue,binding_key)
.onSuccess([=,&channel](){
std::cout<<"队列绑定成功"<<std::endl;
//队列绑定成功,发送订阅消息
bool ret = channel.publish(exchange,binding_key,"Hello, world!");
if(ret == false) {
std::cout<<"消息发送失败"<<std::endl;
}
})
.onError([=](const char* message){
std::cerr<<"队列绑定失败: "<< message <<std::endl;
exit(-1);//绑定失败直接退出
});
})
.onError([=](const char* message){
std::cerr<<"队列声明失败: "<< message <<std::endl;
exit(-1);//声明失败直接退出
});
})
.onError([=](const char* message){
std::cerr<<"交换机声明失败: "<< message <<std::endl;
exit(-1);//声明失败直接退出
});
}
int main()
{
const std::string url = "amqp://admin:123456@192.168.30.128:5672/";
const std::string exchange = "my-exchange";
const std::string queue = "my-queue";
const std::string binding_key = "my-binding-key";
//1.创建libev句柄
auto* ev_loop = EV_DEFAULT;
//2.创建libev与rabbitmq连接的libevHandler
AMQP::LibEvHandler handler(ev_loop);
//3.创建amqp的tcpconnection
AMQP::TcpConnection connection(&handler,AMQP::Address(url));
//4.创建amqp的channel
AMQP::TcpChannel channel(&connection);
//声明延时队列
AMQP::Table args;
args["x-dead-letter-exchange"] = "dlx-" + exchange;
args["x-dead-letter-routing-key"] = "dlx-" + binding_key;
args["x-message-ttl"] = 5000; //队列消息⽣命周期设置-以毫秒为单位
declare(channel,exchange,queue,binding_key,args);
//声明死信队列
declare(channel,"dlx-" + exchange,"dlx-" + queue,"dlx-" + binding_key,AMQP::Table());
//5.开启事件循环
ev_run(ev_loop);
return 0;
}
//delayed_subscribe.cc
#include <iostream>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <ev.h>
//tags不为空表示声明死信队列与交换机
void declare(AMQP::TcpChannel& channel,const std::string& exchange, const std::string& queue, const std::string& binding_key)
{
channel.declareExchange(exchange,AMQP::ExchangeType::direct)
.onSuccess([&](){
//交换机声明成功,声明队列
channel.declareQueue(queue)
.onSuccess([&](const std::string &name,uint32_t messagecount,uint32_t consumercount){
std::cout<<"队列声明成功: "<< name<<", 消息数: "<< messagecount<<", 消费者数: "<< consumercount<<std::endl;
//队列声明成功,绑定交换机和队列
channel.bindQueue(exchange,queue,binding_key)
.onSuccess([&](){
std::cout<<"队列绑定成功"<<std::endl;
//队列绑定成功,开始订阅
channel.consume(queue)
.onReceived([&](const AMQP::Message &message,uint64_t deliveryTag,bool redelivered){
std::string msg_body(message.body(),message.bodySize());
std::cout<<"收到消息: "<< msg_body <<std::endl;
//确认消息
channel.ack(deliveryTag);
})
.onError([&](const char* message){
std::cerr<<"订阅消息失败: "<< message <<std::endl;
exit(-1);//订阅失败直接退出
})
.onSuccess([&](){
std::cout<<"订阅成功"<<std::endl;
});
})
.onError([&](const char* message){
std::cerr<<"队列绑定失败: "<< message <<std::endl;
exit(-1);//绑定失败直接退出
});
})
.onError([&](const char* message){
std::cerr<<"队列声明失败: "<< message <<std::endl;
exit(-1);//声明失败直接退出
});
})
.onError([&](const char* message){
std::cerr<<"交换机声明失败: "<< message <<std::endl;
exit(-1);//声明失败直接退出
});
}
int main()
{
const std::string url = "amqp://admin:123456@192.168.30.128:5672/";
const std::string exchange = "dlx-my-exchange";
const std::string queue = "dlx-my-queue";
const std::string binding_key = "dlx-my-binding-key";
//1.创建libev句柄
auto* ev_loop = EV_DEFAULT;
//2.创建libev与rabbitmq连接的libevHandler
AMQP::LibEvHandler handler(ev_loop);
//3.创建amqp的tcpconnection
AMQP::TcpConnection connection(&handler,AMQP::Address(url));
//4.创建amqp的channel
AMQP::TcpChannel channel(&connection);
//声明死信队列与死信交换机
declare(channel,exchange,queue,binding_key);
//5.开启事件循环
ev_run(ev_loop);
return 0;
}
makefile
all:delayed_publish delayed_subscribe
delayed_publish:delayed_publish.cc
g++ $^ -o $@ -lpthread -lamqpcpp -lev
delayed_subscribe:delayed_subscribe.cc
g++ $^ -o $@ -lpthread -lamqpcpp -lev
clean:
rm -f delayed_publish delayed_subscribe
三.二次封装
我们暂时只使用直接交换,所以以下的流程是针对直接交换进行叙述的
3.1封装思路
消息队列的操作基本就五个操作:声明交换机,声明队列,绑定交换机-队列,发布消息,订阅队列消息。 ⽽基于这五个操作,封装⼀些类出来进⾏简化:
- 套件配置声明结构:便于交换机与队列的声明与绑定,以及队列的订阅和消息的发布
- 消息队列客⼾端类:将客⼾端的各个请求操作封装起来,简化外部操作
- 发布者类:基于消息队列客⼾端类进⾏上层封装,⽤于发布端进⾏消息发布
- 订阅者类:基于消息队列客⼾端类进⾏上层封装,⽤于订阅端进⾏消息订阅
那么具体可以定义以下四个类:
- 定义交换机及类型,队列及bind-key。如果是延时队列还需要定义延时时间ttl
- MQ客户端类,即发布消息与订阅消息客户端的共同部分
- 发布消息客户端类
- 订阅消息客户端类
//limemq.h
#pragma once
#include <amqpcpp.h>
#include <ev.h>
#include <amqpcpp/libev.h>
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
namespace limemq {
const std::string DIRECT = "direct";
const std::string FANOUT = "fanout";
const std::string TOPIC = "topic";
const std::string HEADERS = "headers";
const std::string DELAYED = "delayed";
const std::string DLX_PREFIX = "dlx-";
const std::string DEAD_LETTER_EXCHANGE = "x-dead-letter-exchange";
const std::string DEAD_LETTER_BINDING_KEY = "x-dead-letter-routing-key";
const std::string MESSAGE_TTL = "x-message-ttl";
struct declare_settings {
std::string exchange;
std::string exchange_type;// 交换机类型: direct、fanout、topic、headers、delayed
std::string queue;
std::string binding_key;
size_t delayed_ttl = 0;
std::string dlx_exchange() const;
std::string dlx_queue() const;
std::string dlx_binding_key() const;
};
extern AMQP::ExchangeType exchange_type(const std::string &type);
using MessageCallback = std::function<void(const char*, size_t)>;
class MQClient {
public:
using ptr = std::shared_ptr<MQClient>;
MQClient(const std::string &url); // 构造成员,启动事件循环
~MQClient(); // 发送异步请求,结束事件循环,等待异步线程结束
//声明交换机,声明队列,并绑定交换机和队列,如果是延时队列,则还要创建配套的死信交换机和队列
//声明交换机和队列以及绑定成功后,需要等待,等待实际交换机和队列声明成功,再返回
void declare(const declare_settings &settings);
bool publish(const std::string &exchange, const std::string &routing_key, const std::string &body);
void consume(const std::string &queue, const MessageCallback &callback);
void wait();
private:
static void callback(struct ev_loop *loop, ev_async *watcher, int32_t revents);
// 声明常规交换机和队列,并进行绑定
void _declared(const declare_settings &settings, AMQP::Table &args, bool is_dlx = false);
private:
std::mutex _mtx;
std::condition_variable _cv;
struct ev_loop *_ev_loop;
struct ev_async _ev_async;
AMQP::LibEvHandler _handler;
AMQP::TcpConnection _connection;
AMQP::TcpChannel _channel;
std::thread _async_thread;
};
class Publisher {
public:
using ptr = std::shared_ptr<Publisher>;
//对成员进行初始化,并声明套件内的交换机和队列
Publisher(const MQClient::ptr &mq_client, const declare_settings &settings);
bool publish(const std::string &body);
private:
MQClient::ptr _mq_client;
declare_settings _settings;
};
class Subscriber {
public:
using ptr = std::shared_ptr<Subscriber>;
//对成员进行初始化,并声明套件内的交换机和队列
Subscriber(const MQClient::ptr &mq_client, const declare_settings &settings);
//如果是延时队列,则实际订阅的是配套的死信队列
void consume(MessageCallback &&callback);
private:
MQClient::ptr _mq_client;
declare_settings _settings;
MessageCallback _callback;
};
}
3.2具体封装实现
#include "limemq.h"
#include "limelog.h"
namespace limemq {
std::string declare_settings::dlx_exchange() const {
return DLX_PREFIX + exchange;
}
std::string declare_settings::dlx_queue() const {
return DLX_PREFIX + queue;
}
std::string declare_settings::dlx_binding_key() const {
return DLX_PREFIX + binding_key;
}
AMQP::ExchangeType exchange_type(const std::string &type) {
if(type == DIRECT)
{
//直接匹配
return AMQP::direct;
}
else if(type == FANOUT)
{
//广播类型
return AMQP::fanout;
}
else if(type == TOPIC)
{
//主题类型
return AMQP::topic;
}
else if(type == HEADERS)
{
//头部匹配
return AMQP::headers;
}
else if(type == DELAYED)
{
return AMQP::direct;
}
else
{
WRN("未知交换机类型:{}, 使用默认类型direct", type);
return AMQP::direct;
}
}
MQClient::MQClient(const std::string &url)
:_ev_loop(EV_DEFAULT),
_handler(_ev_loop),
_connection(&_handler, AMQP::Address(url)),
_channel(&_connection),
_async_thread(std::thread([this](){ ev_run(_ev_loop); }))
{}
void MQClient::declare(const declare_settings &settings)
{
AMQP::Table args;
if(settings.exchange_type == DELAYED)
{
//声明死信交换机和死信队列
_declared(settings, args, true);
args["x-dead-letter-exchange"] = settings.dlx_exchange();
args["x-dead-letter-routing-key"] = settings.dlx_binding_key();
args["x-message-ttl"] = settings.delayed_ttl;
}
//声明延时队列与交换机
_declared(settings, args, false);
}
void MQClient::_declared(const declare_settings &settings, AMQP::Table &args, bool is_dlx)
{
//用于临界控制
std::unique_lock<std::mutex> lock(_mtx);
//定义交换机名,队列名,绑定键
std::string exchange = is_dlx ? settings.dlx_exchange() : settings.exchange;
std::string queue = is_dlx ? settings.dlx_queue() : settings.queue;
std::string binding_key = is_dlx ? settings.dlx_binding_key() : settings.binding_key;
//5.声明exchange(直接交换)和queue
_channel.declareExchange(exchange,exchange_type(settings.exchange_type))
.onSuccess([=](){
//交换机声明成功,声明队列
_channel.declareQueue(queue,args)
.onSuccess([=](const std::string &name,uint32_t messagecount,uint32_t consumercount){
std::cout<<"队列声明成功: "<< name<<", 消息数: "<< messagecount<<", 消费者数: "<< consumercount<<std::endl;
//队列声明成功,绑定交换机和队列
_channel.bindQueue(exchange,queue,binding_key)
.onSuccess([=](){
//成功绑定后通知等待线程
_cv.notify_all();
})
.onError([=](const char* message){
std::cerr<<"队列绑定失败: "<< message <<std::endl;
exit(-1);//绑定失败直接退出
});
})
.onError([=](const char* message){
std::cerr<<"队列声明失败: "<< message <<std::endl;
exit(-1);//声明失败直接退出
});
})
.onError([=](const char* message){
std::cerr<<"交换机声明失败: "<< message <<std::endl;
exit(-1);//声明失败直接退出
});
_cv.wait(lock);
}
bool MQClient::publish(const std::string &exchange, const std::string &routing_key, const std::string &body)
{
return _channel.publish(exchange, routing_key, body);
}
void MQClient::consume(const std::string &queue, const MessageCallback &callback)
{
_channel.consume(queue)
.onReceived([=](const AMQP::Message &message,uint64_t deliveryTag,bool redelivered){
callback(message.body(), message.bodySize());
//确认消息
_channel.ack(deliveryTag);
})
.onError([=](const char* message){
std::cerr<<"订阅消息失败: "<< message <<std::endl;
exit(-1);//订阅失败直接退出
})
.onSuccess([=](){
std::cout<<"订阅成功"<<std::endl;
});
}
void MQClient::callback(struct ev_loop *loop, ev_async *watcher, int32_t revents)
{
//此接口官方规定不能跨线程调用
ev_break(loop, EVBREAK_ALL);
}
void MQClient::wait()
{
//等待异步线程结束,主线程无事可干时等待异步线程结束
_async_thread.join();
}
MQClient::~MQClient(){
//当mqclient对象被销毁时,关闭异步线程
ev_async_init(&_ev_async, MQClient::callback);
ev_async_start(_ev_loop, &_ev_async);
ev_async_send(_ev_loop, &_ev_async);
_async_thread.join();// 等待异步线程结束
}
Publisher::Publisher(const MQClient::ptr &mq_client, const declare_settings &settings)
:_mq_client(mq_client),
_settings(settings)
{
_mq_client->declare(_settings);
}
bool Publisher::publish(const std::string &body)
{
return _mq_client->publish(_settings.exchange, _settings.binding_key, body);
}
Subscriber::Subscriber(const MQClient::ptr &mq_client, const declare_settings &settings)
:_mq_client(mq_client),
_settings(settings)
{
_mq_client->declare(_settings);
}
void Subscriber::consume(MessageCallback &&callback)
{
_callback = callback;
//如果是延时队列,顶订阅的是死信队列消息,否则订阅的是常规队列消息
if (_settings.exchange_type == DELAYED) {
_mq_client->consume(_settings.dlx_queue(), _callback);
}else {
_mq_client->consume(_settings.queue, _callback);
}
}
} // namespace limemq
3.3基于二次封装实现延时消息订阅的样例
//delayed_publish.cc
#include "../../source/limemq.h"
#include "../../source/limelog.h"
int main() {
limelog::limelog_init();
//进行相关初始设定
std::string url = "amqp://admin:123456@192.168.30.128:5672/";
limemq::declare_settings settings
{
.exchange = "my-exchange",
.exchange_type = "delayed",
.queue = "my-queue",
.binding_key = "my-key",
.delayed_ttl = 5000
};
//创建客户端
limemq::MQClient::ptr client = std::make_shared<limemq::MQClient>(url);
//发送订阅消息publish
limemq::Publisher::ptr publisher = std::make_shared<limemq::Publisher>(client, settings);
publisher->publish("Hello, World! This is a delayed message.");
return 0;
}
//delayed_subscribe.cc
#include "../../source/limemq.h"
#include "../../source/limelog.h"
int main() {
limelog::limelog_init();
//进行相关初始设定
std::string url = "amqp://admin:123456@192.168.30.128:5672/";
limemq::declare_settings settings
{
.exchange = "my-exchange",
.exchange_type = "delayed",
.queue = "my-queue",
.binding_key = "my-key",
.delayed_ttl = 5000
};
//创建客户端
limemq::MQClient::ptr client = std::make_shared<limemq::MQClient>(url);
//发送订阅消息subscribe
limemq::Subscriber::ptr subscriber = std::make_shared<limemq::Subscriber>(client,settings);
subscriber->consume([](const char* body, size_t size){
std::string msg(body, size);
std::cout<<"收到消息:"<<msg<<std::endl;
});
client->wait();
return 0;
}
makefile
all:delayed_publish delayed_subscribe
delayed_publish:delayed_publish.cc ../../source/limemq.cc ../../source/limelog.cc
g++ $^ -o $@ -lpthread -lamqpcpp -lev -lspdlog -lfmt -std=c++17
delayed_subscribe:delayed_subscribe.cc ../../source/limemq.cc ../../source/limelog.cc
g++ $^ -o $@ -lpthread -lamqpcpp -lev -lspdlog -lfmt -std=c++17
clean:
rm -f delayed_publish delayed_subscribe




评论(已关闭)
评论已关闭