C++
Main Function
int main(int argc, const char *argv[]) { // need an ip if (argc != 2) { // report error std::cerr << "usage: " << argv[0] << " <ip>" << std::endl; // done return -1; } else { // create connection MyConnection connection(argv[1]); // start the main event loop Event::MainLoop::instance()->run(); // done return 0; } }
RabbitMQ Connection and Error Handling
MyConnection::MyConnection(const std::string &ip) : _socket(Event::MainLoop::instance(), this), _connection(nullptr), _channel(nullptr) { // start connecting if (_socket.connect(Network::Ipv4Address(ip), 5672)) return; // failure onFailure(&_socket); } /** * Destructor */ MyConnection::~MyConnection() { // do we still have a channel? if (_channel) delete _channel; // do we still have a connection? if (_connection) delete _connection; } /** * Method that is called when the connection failed * @param socket Pointer to the socket */ void MyConnection::onFailure(Network::TcpSocket *socket) { // report error std::cout << "connect failure" << std::endl; } /** * Method that is called when the connection timed out (which also is a failure * @param socket Pointer to the socket */ void MyConnection::onTimeout(Network::TcpSocket *socket) { // report error std::cout << "connect timeout" << std::endl; } /** * Method that is called when the connection succeeded * @param socket Pointer to the socket */ void MyConnection::onConnected(Network::TcpSocket *socket) { // report connection std::cout << "connected" << std::endl; // we are connected, leap out if there already is a amqp connection if (_connection) return; // create amqp connection, and a new channel _connection = new AMQP::Connection(this, AMQP::Login("guest", "guest"), "/"); _channel = new AMQP::Channel(_connection, this); // we declare a queue, an exchange and we publish a message _channel->declareQueue("my_queue"); _channel->declareExchange("my_exchange", AMQP::direct); _channel->bindQueue("my_exchange", "my_queue", "key"); } /** * Method that is called when the socket is closed (as a result of a TcpSocket::close() call) * @param socket Pointer to the socket */ void MyConnection::onClosed(Network::TcpSocket *socket) { // show std::cout << "myconnection closed" << std::endl; // close the channel and connection if (_channel) delete _channel; if (_connection) delete _connection; // set to null _channel = nullptr; _connection = nullptr; } /** * Method that is called when the peer closed the connection * @param socket Pointer to the socket */ void MyConnection::onLost(Network::TcpSocket *socket) { // report error std::cout << "connection lost" << std::endl; // close the channel and connection if (_channel) delete _channel; if (_connection) delete _connection; // set to null _channel = nullptr; _connection = nullptr; } /** * Method that is called when data is received on the socket * @param socket Pointer to the socket * @param buffer Pointer to the fill input buffer */ void MyConnection::onData(Network::TcpSocket *socket, Network::Buffer *buffer) { // send what came in std::cout << "received: " << buffer->size() << " bytes" << std::endl; // leap out if there is no connection if (!_connection) return; // let the data be handled by the connection size_t bytes = _connection->parse(buffer->data(), buffer->size()); // shrink the buffer buffer->shrink(bytes); } /** * Method that is called when data needs to be sent over the network * * Note that the AMQP library does no buffering by itself. This means * that this method should always send out all data or do the buffering * itself. * * @param connection The connection that created this output * @param buffer Data to send * @param size Size of the buffer */ void MyConnection::onData(AMQP::Connection *connection, const char *buffer, size_t size) { // send to the socket _socket.write(buffer, size); } /** * When the connection ends up in an error state this method is called. * This happens when data comes in that does not match the AMQP protocol * * After this method is called, the connection no longer is in a valid * state and can be used. In normal circumstances this method is not called. * * @param connection The connection that entered the error state * @param message Error message */ void MyConnection::onError(AMQP::Connection *connection, const std::string &message) { // report error std::cout << "AMQP Connection error: " << message << std::endl; } /** * Method that is called when the login attempt succeeded. After this method * was called, the connection is ready to use * * @param connection The connection that can now be used */ void MyConnection::onConnected(AMQP::Connection *connection) { // show std::cout << "AMQP login success" << std::endl; // create channel if it does not yet exist if (!_channel) _channel = new AMQP::Channel(connection, this); } /** * Method that is called when the channel was succesfully created. * Only after the channel was created, you can use it for subsequent messages over it * @param channel */ void MyConnection::onReady(AMQP::Channel *channel) { // show std::cout << "AMQP channel ready, id: " << (int) channel->id() << std::endl; } /** * An error has occured on the channel * @param channel * @param message */ void MyConnection::onError(AMQP::Channel *channel, const std::string &message) { // show std::cout << "AMQP channel error, id: " << (int) channel->id() << " - message: " << message << std::endl; // main channel cause an error, get rid of if delete _channel; // reset pointer _channel = nullptr; } /** * Method that is called when the channel was paused * @param channel */ void MyConnection::onPaused(AMQP::Channel *channel) { // show std::cout << "AMQP channel paused" << std::endl; } /** * Method that is called when the channel was resumed * @param channel */ void MyConnection::onResumed(AMQP::Channel *channel) { // show std::cout << "AMQP channel resumed" << std::endl; } /** * Method that is called when a channel is closed * @param channel */ void MyConnection::onClosed(AMQP::Channel *channel) { // show std::cout << "AMQP channel closed" << std::endl; } /** * Method that is called when a transaction was started * @param channel */ void MyConnection::onTransactionStarted(AMQP::Channel *channel) { // show std::cout << "AMQP transaction started" << std::endl; } /** * Method that is called when a transaction was committed * @param channel */ void MyConnection::onTransactionCommitted(AMQP::Channel *channel) { // show std::cout << "AMQP transaction committed" << std::endl; } /** * Method that is called when a transaction was rolled back * @param channel */ void MyConnection::onTransactionRolledBack(AMQP::Channel *channel) { // show std::cout << "AMQP transaction rolled back" << std::endl; } /** * Mehod that is called when an exchange is declared * @param channel */ void MyConnection::onExchangeDeclared(AMQP::Channel *channel) { // show std::cout << "AMQP exchange declared" << std::endl; } /** * Method that is called when an exchange is bound * @param channel */ void MyConnection::onExchangeBound(AMQP::Channel *channel) { // show std::cout << "AMQP Exchange bound" << std::endl; } /** * Method that is called when an exchange is unbound * @param channel */ void MyConnection::onExchangeUnbound(AMQP::Channel *channel) { // show std::cout << "AMQP Exchange unbound" << std::endl; } /** * Method that is called when an exchange is deleted * @param channel */ void MyConnection::onExchangeDeleted(AMQP::Channel *channel) { // show std::cout << "AMQP Exchange deleted" << std::endl; } /** * Method that is called when a queue is declared * @param channel * @param name name of the queue * @param messageCount number of messages in queue * @param consumerCount number of active consumers */ void MyConnection::onQueueDeclared(AMQP::Channel *channel, const std::string &name, uint32_t messageCount, uint32_t consumerCount) { // show std::cout << "AMQP Queue declared" << std::endl; } /** * Method that is called when a queue is bound * @param channel * @param */ void MyConnection::onQueueBound(AMQP::Channel *channel) { // show std::cout << "AMQP Queue bound" << std::endl; _channel->publish("my_exchange", "invalid-key", AMQP::mandatory, "this is the message"); } /** * Method that is called when a queue is deleted * @param channel * @param messageCount number of messages deleted along with the queue */ void MyConnection::onQueueDeleted(AMQP::Channel *channel, uint32_t messageCount) { // show std::cout << "AMQP Queue deleted" << std::endl; } /** * Method that is called when a queue is unbound * @param channel */ void MyConnection::onQueueUnbound(AMQP::Channel *channel) { // show std::cout << "AMQP Queue unbound" << std::endl; } /** * Method that is called when a queue is purged * @param messageCount number of message purged */ void MyConnection::onQueuePurged(AMQP::Channel *channel, uint32_t messageCount) { // show std::cout << "AMQP Queue purged" << std::endl; } /** * Method that is called when the quality-of-service was changed * This is the result of a call to Channel::setQos() */ void MyConnection::onQosSet(AMQP::Channel *channel) { // show std::cout << "AMQP Qos set" << std::endl; } /** * Method that is called when a consumer was started * This is the result of a call to Channel::consume() * @param channel the channel on which the consumer was started * @param tag the consumer tag */ void MyConnection::onConsumerStarted(AMQP::Channel *channel, const std::string &tag) { // show std::cout << "AMQP consumer started" << std::endl; } /** * Method that is called when a message has been received on a channel * @param channel the channel on which the consumer was started * @param message the consumed message * @param deliveryTag the delivery tag, you need this to acknowledge the message * @param consumerTag the consumer identifier that was used to retrieve this message * @param redelivered is this a redelivered message? */ void MyConnection::onReceived(AMQP::Channel *channel, const AMQP::Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered) { // show std::cout << "AMQP consumed: " << message.message() << std::endl; // ack the message channel->ack(deliveryTag); } /** * Method that is called when a message you tried to publish was returned * by the server. This only happens when the 'mandatory' or 'immediate' flag * was set with the Channel::publish() call. * @param channel the channel on which the message was returned * @param message the returned message * @param code the reply code * @param text human readable reply reason */ void MyConnection::onReturned(AMQP::Channel *channel, const AMQP::Message &message, int16_t code, const std::string &text) { // show std::cout << "AMQP message returned: " << text << std::endl; } /** * Method that is called when a consumer was stopped * This is the result of a call to Channel::cancel() * @param channel the channel on which the consumer was stopped * @param tag the consumer tag */ void MyConnection::onConsumerStopped(AMQP::Channel *channel, const std::string &tag) { // show std::cout << "AMQP consumer stopped" << std::endl; }