This commit is contained in:
bing
2026-04-02 23:18:28 +08:00
commit 6198e1b53c
112 changed files with 20893 additions and 0 deletions

51
examples/CMakeLists.txt Normal file
View File

@@ -0,0 +1,51 @@
# Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
# SPDX-License-Identifier: mit
add_library(examples-common OBJECT)
target_sources(examples-common PRIVATE
utils.h
utils.c)
if(WIN32)
target_sources(examples-common PRIVATE win32/platform_utils.c)
else()
target_sources(examples-common PRIVATE unix/platform_utils.c)
endif()
target_link_libraries(examples-common PRIVATE rabbitmq::rabbitmq)
add_executable(amqp_sendstring amqp_sendstring.c)
target_link_libraries(amqp_sendstring PRIVATE examples-common rabbitmq::rabbitmq)
add_executable(amqp_rpc_sendstring_client amqp_rpc_sendstring_client.c)
target_link_libraries(amqp_rpc_sendstring_client PRIVATE examples-common rabbitmq::rabbitmq)
add_executable(amqp_exchange_declare amqp_exchange_declare.c)
target_link_libraries(amqp_exchange_declare PRIVATE examples-common rabbitmq::rabbitmq)
add_executable(amqp_listen amqp_listen.c)
target_link_libraries(amqp_listen PRIVATE examples-common rabbitmq::rabbitmq)
add_executable(amqp_producer amqp_producer.c)
target_link_libraries(amqp_producer PRIVATE examples-common rabbitmq::rabbitmq)
add_executable(amqp_confirm_select amqp_confirm_select.c)
target_link_libraries(amqp_confirm_select PRIVATE examples-common rabbitmq::rabbitmq)
add_executable(amqp_connect_timeout amqp_connect_timeout.c)
target_link_libraries(amqp_connect_timeout PRIVATE examples-common rabbitmq::rabbitmq)
add_executable(amqp_consumer amqp_consumer.c)
target_link_libraries(amqp_consumer PRIVATE examples-common rabbitmq::rabbitmq)
add_executable(amqp_unbind amqp_unbind.c)
target_link_libraries(amqp_unbind PRIVATE examples-common rabbitmq::rabbitmq)
add_executable(amqp_bind amqp_bind.c)
target_link_libraries(amqp_bind PRIVATE examples-common rabbitmq::rabbitmq)
add_executable(amqp_listenq amqp_listenq.c)
target_link_libraries(amqp_listenq PRIVATE examples-common rabbitmq::rabbitmq)
if (ENABLE_SSL_SUPPORT)
add_executable(amqp_ssl_connect amqp_ssl_connect.c)
target_link_libraries(amqp_ssl_connect PRIVATE examples-common rabbitmq::rabbitmq)
endif (ENABLE_SSL_SUPPORT)

63
examples/amqp_bind.c Normal file
View File

@@ -0,0 +1,63 @@
// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
// SPDX-License-Identifier: mit
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/tcp_socket.h>
#include "utils.h"
int main(int argc, char const *const *argv) {
char const *hostname;
int port, status;
char const *exchange;
char const *bindingkey;
char const *queue;
amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
if (argc < 6) {
fprintf(stderr, "Usage: amqp_bind host port exchange bindingkey queue\n");
return 1;
}
hostname = argv[1];
port = atoi(argv[2]);
exchange = argv[3];
bindingkey = argv[4];
queue = argv[5];
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
status = amqp_socket_open(socket, hostname, port);
if (status) {
die("opening TCP socket");
}
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
"guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
amqp_queue_bind(conn, 1, amqp_cstring_bytes(queue),
amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),
amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Unbinding");
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
"Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
"Closing connection");
die_on_error(amqp_destroy_connection(conn), "Ending connection");
return 0;
}

View File

@@ -0,0 +1,188 @@
// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
// SPDX-License-Identifier: mit
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/tcp_socket.h>
#include "utils.h"
#if ((defined(_WIN32)) || (defined(__MINGW32__)) || (defined(__MINGW64__)))
#ifndef WINVER
#define WINVER 0x0502
#endif
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#include <winsock2.h>
#else
#include <sys/time.h>
#endif
#define SUMMARY_EVERY_US 5000
static void send_batch(amqp_connection_state_t conn, amqp_bytes_t queue_name,
int rate_limit, int message_count) {
uint64_t start_time = now_microseconds();
int i;
int sent = 0;
int previous_sent = 0;
uint64_t previous_report_time = start_time;
uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
char message[256];
amqp_bytes_t message_bytes;
for (i = 0; i < (int)sizeof(message); i++) {
message[i] = i & 0xff;
}
message_bytes.len = sizeof(message);
message_bytes.bytes = message;
for (i = 0; i < message_count; i++) {
uint64_t now = now_microseconds();
die_on_error(amqp_basic_publish(conn, 1, amqp_literal_bytes("amq.direct"),
queue_name, 0, 0, NULL, message_bytes),
"Publishing");
sent++;
if (now > next_summary_time) {
int countOverInterval = sent - previous_sent;
double intervalRate =
countOverInterval / ((now - previous_report_time) / 1000000.0);
printf("%d ms: Sent %d - %d since last report (%d Hz)\n",
(int)(now - start_time) / 1000, sent, countOverInterval,
(int)intervalRate);
previous_sent = sent;
previous_report_time = now;
next_summary_time += SUMMARY_EVERY_US;
}
while (((i * 1000000.0) / (now - start_time)) > rate_limit) {
microsleep(2000);
now = now_microseconds();
}
}
{
uint64_t stop_time = now_microseconds();
int total_delta = (int)(stop_time - start_time);
printf("PRODUCER - Message count: %d\n", message_count);
printf("Total time, milliseconds: %d\n", total_delta / 1000);
printf("Overall messages-per-second: %g\n",
(message_count / (total_delta / 1000000.0)));
}
}
#define CONSUME_TIMEOUT_USEC 100
#define WAITING_TIMEOUT_USEC (30 * 1000)
void wait_for_acks(amqp_connection_state_t conn) {
uint64_t start_time = now_microseconds();
struct timeval timeout = {0, CONSUME_TIMEOUT_USEC};
uint64_t now = 0;
amqp_publisher_confirm_t result = {};
for (;;) {
amqp_rpc_reply_t ret;
now = now_microseconds();
if (now > start_time + WAITING_TIMEOUT_USEC) {
return;
}
amqp_maybe_release_buffers(conn);
ret = amqp_publisher_confirm_wait(conn, &timeout, &result);
if (AMQP_RESPONSE_LIBRARY_EXCEPTION == ret.reply_type) {
if (AMQP_STATUS_UNEXPECTED_STATE == ret.library_error) {
fprintf(stderr, "An unexpected method was received\n");
return;
} else if (AMQP_STATUS_TIMEOUT == ret.library_error) {
// Timeout means you're done; no publisher confirms were waiting!
return;
} else {
die_on_amqp_error(ret, "Waiting for publisher confirmation");
}
}
switch (result.method) {
case AMQP_BASIC_ACK_METHOD:
fprintf(stderr, "Got an ACK!\n");
fprintf(stderr, "Here's the ACK:\n");
fprintf(stderr, "\tdelivery_tag: «%" PRIu64 "»\n",
result.payload.ack.delivery_tag);
fprintf(stderr, "\tmultiple: «%d»\n", result.payload.ack.multiple);
break;
case AMQP_BASIC_NACK_METHOD:
fprintf(stderr, "NACK\n");
break;
case AMQP_BASIC_REJECT_METHOD:
fprintf(stderr, "REJECT\n");
break;
default:
fprintf(stderr, "Unexpected method «%s» is.\n",
amqp_method_name(result.method));
};
}
}
int main(int argc, char const *const *argv) {
char const *hostname;
int port, status;
int rate_limit;
int message_count;
amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
if (argc < 5) {
fprintf(stderr,
"Usage: amqp_producer host port rate_limit message_count\n");
return 1;
}
hostname = argv[1];
port = atoi(argv[2]);
rate_limit = atoi(argv[3]);
message_count = atoi(argv[4]);
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
status = amqp_socket_open(socket, hostname, port);
if (status) {
die("opening TCP socket");
}
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
"guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
// Enable confirm_select
amqp_confirm_select(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Enable confirm-select");
send_batch(conn, amqp_literal_bytes("test queue"), rate_limit, message_count);
wait_for_acks(conn);
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
"Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
"Closing connection");
die_on_error(amqp_destroy_connection(conn), "Ending connection");
return 0;
}

View File

@@ -0,0 +1,79 @@
// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
// SPDX-License-Identifier: mit
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/tcp_socket.h>
#include <assert.h>
#ifdef _WIN32
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#include <Winsock2.h>
#else
#include <sys/time.h>
#endif
#include "utils.h"
int main(int argc, char const *const *argv) {
char const *hostname;
int port;
amqp_socket_t *socket;
amqp_connection_state_t conn;
struct timeval tval;
struct timeval *tv;
if (argc < 3) {
fprintf(stderr,
"Usage: amqp_connect_timeout host port [timeout_sec "
"[timeout_usec=0]]\n");
return 1;
}
if (argc > 3) {
tv = &tval;
tv->tv_sec = atoi(argv[3]);
if (argc > 4) {
tv->tv_usec = atoi(argv[4]);
} else {
tv->tv_usec = 0;
}
} else {
tv = NULL;
}
hostname = argv[1];
port = atoi(argv[2]);
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
die_on_error(amqp_socket_open_noblock(socket, hostname, port, tv),
"opening TCP socket");
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
"guest", "guest"),
"Logging in");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
"Closing connection");
die_on_error(amqp_destroy_connection(conn), "Ending connection");
printf("Done\n");
return 0;
}

185
examples/amqp_consumer.c Normal file
View File

@@ -0,0 +1,185 @@
// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
// SPDX-License-Identifier: mit
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/tcp_socket.h>
#include <assert.h>
#include "utils.h"
#define SUMMARY_EVERY_US 1000000
static void run(amqp_connection_state_t conn) {
uint64_t start_time = now_microseconds();
int received = 0;
int previous_received = 0;
uint64_t previous_report_time = start_time;
uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
amqp_frame_t frame;
uint64_t now;
for (;;) {
amqp_rpc_reply_t ret;
amqp_envelope_t envelope;
now = now_microseconds();
if (now > next_summary_time) {
int countOverInterval = received - previous_received;
double intervalRate =
countOverInterval / ((now - previous_report_time) / 1000000.0);
printf("%d ms: Received %d - %d since last report (%d Hz)\n",
(int)(now - start_time) / 1000, received, countOverInterval,
(int)intervalRate);
previous_received = received;
previous_report_time = now;
next_summary_time += SUMMARY_EVERY_US;
}
amqp_maybe_release_buffers(conn);
ret = amqp_consume_message(conn, &envelope, NULL, 0);
if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
if (AMQP_RESPONSE_LIBRARY_EXCEPTION == ret.reply_type &&
AMQP_STATUS_UNEXPECTED_STATE == ret.library_error) {
if (AMQP_STATUS_OK != amqp_simple_wait_frame(conn, &frame)) {
return;
}
if (AMQP_FRAME_METHOD == frame.frame_type) {
switch (frame.payload.method.id) {
case AMQP_BASIC_ACK_METHOD:
/* if we've turned publisher confirms on, and we've published a
* message here is a message being confirmed.
*/
break;
case AMQP_BASIC_RETURN_METHOD:
/* if a published message couldn't be routed and the mandatory
* flag was set this is what would be returned. The message then
* needs to be read.
*/
{
amqp_message_t message;
ret = amqp_read_message(conn, frame.channel, &message, 0);
if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
return;
}
amqp_destroy_message(&message);
}
break;
case AMQP_CHANNEL_CLOSE_METHOD:
/* a channel.close method happens when a channel exception occurs,
* this can happen by publishing to an exchange that doesn't exist
* for example.
*
* In this case you would need to open another channel redeclare
* any queues that were declared auto-delete, and restart any
* consumers that were attached to the previous channel.
*/
return;
case AMQP_CONNECTION_CLOSE_METHOD:
/* a connection.close method happens when a connection exception
* occurs, this can happen by trying to use a channel that isn't
* open for example.
*
* In this case the whole connection must be restarted.
*/
return;
default:
fprintf(stderr, "An unexpected method was received %u\n",
frame.payload.method.id);
return;
}
}
}
} else {
amqp_destroy_envelope(&envelope);
}
received++;
}
}
int main(int argc, char const *const *argv) {
char const *hostname;
int port, status;
char const *exchange;
char const *bindingkey;
amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
amqp_bytes_t queuename;
if (argc < 3) {
fprintf(stderr, "Usage: amqp_consumer host port\n");
return 1;
}
hostname = argv[1];
port = atoi(argv[2]);
exchange = "amq.direct"; /* argv[3]; */
bindingkey = "test queue"; /* argv[4]; */
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
status = amqp_socket_open(socket, hostname, port);
if (status) {
die("opening TCP socket");
}
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
"guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
{
amqp_queue_declare_ok_t *r = amqp_queue_declare(
conn, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
queuename = amqp_bytes_malloc_dup(r->queue);
if (queuename.bytes == NULL) {
fprintf(stderr, "Out of memory while copying queue name");
return 1;
}
}
amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange),
amqp_cstring_bytes(bindingkey), amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");
amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0,
amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
run(conn);
amqp_bytes_free(queuename);
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
"Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
"Closing connection");
die_on_error(amqp_destroy_connection(conn), "Ending connection");
return 0;
}

View File

@@ -0,0 +1,62 @@
// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
// SPDX-License-Identifier: mit
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/tcp_socket.h>
#include "utils.h"
int main(int argc, char const *const *argv) {
char const *hostname;
int port, status;
char const *exchange;
char const *exchangetype;
amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
if (argc < 5) {
fprintf(stderr,
"Usage: amqp_exchange_declare host port exchange exchangetype\n");
return 1;
}
hostname = argv[1];
port = atoi(argv[2]);
exchange = argv[3];
exchangetype = argv[4];
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
status = amqp_socket_open(socket, hostname, port);
if (status) {
die("opening TCP socket");
}
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
"guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange),
amqp_cstring_bytes(exchangetype), 0, 0, 0, 0,
amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring exchange");
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
"Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
"Closing connection");
die_on_error(amqp_destroy_connection(conn), "Ending connection");
return 0;
}

113
examples/amqp_listen.c Normal file
View File

@@ -0,0 +1,113 @@
// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
// SPDX-License-Identifier: mit
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/tcp_socket.h>
#include <assert.h>
#include "utils.h"
int main(int argc, char const *const *argv) {
char const *hostname;
int port, status;
char const *exchange;
char const *bindingkey;
amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
amqp_bytes_t queuename;
if (argc < 5) {
fprintf(stderr, "Usage: amqp_listen host port exchange bindingkey\n");
return 1;
}
hostname = argv[1];
port = atoi(argv[2]);
exchange = argv[3];
bindingkey = argv[4];
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
status = amqp_socket_open(socket, hostname, port);
if (status) {
die("opening TCP socket");
}
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
"guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
{
amqp_queue_declare_ok_t *r = amqp_queue_declare(
conn, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
queuename = amqp_bytes_malloc_dup(r->queue);
if (queuename.bytes == NULL) {
fprintf(stderr, "Out of memory while copying queue name");
return 1;
}
}
amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange),
amqp_cstring_bytes(bindingkey), amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");
amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0,
amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
{
for (;;) {
amqp_rpc_reply_t res;
amqp_envelope_t envelope;
amqp_maybe_release_buffers(conn);
res = amqp_consume_message(conn, &envelope, NULL, 0);
if (AMQP_RESPONSE_NORMAL != res.reply_type) {
break;
}
printf("Delivery %u, exchange %.*s routingkey %.*s\n",
(unsigned)envelope.delivery_tag, (int)envelope.exchange.len,
(char *)envelope.exchange.bytes, (int)envelope.routing_key.len,
(char *)envelope.routing_key.bytes);
if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
printf("Content-type: %.*s\n",
(int)envelope.message.properties.content_type.len,
(char *)envelope.message.properties.content_type.bytes);
}
printf("----\n");
amqp_dump(envelope.message.body.bytes, envelope.message.body.len);
amqp_destroy_envelope(&envelope);
}
}
amqp_bytes_free(queuename);
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
"Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
"Closing connection");
die_on_error(amqp_destroy_connection(conn), "Ending connection");
return 0;
}

90
examples/amqp_listenq.c Normal file
View File

@@ -0,0 +1,90 @@
// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
// SPDX-License-Identifier: mit
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/tcp_socket.h>
#include <assert.h>
#include "utils.h"
int main(int argc, char const *const *argv) {
char const *hostname;
int port, status;
char const *queuename;
amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
if (argc < 4) {
fprintf(stderr, "Usage: amqp_listenq host port queuename\n");
return 1;
}
hostname = argv[1];
port = atoi(argv[2]);
queuename = argv[3];
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
status = amqp_socket_open(socket, hostname, port);
if (status) {
die("opening TCP socket");
}
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
"guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
amqp_basic_consume(conn, 1, amqp_cstring_bytes(queuename), amqp_empty_bytes,
0, 0, 0, amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
for (;;) {
amqp_rpc_reply_t res;
amqp_envelope_t envelope;
amqp_maybe_release_buffers(conn);
res = amqp_consume_message(conn, &envelope, NULL, 0);
if (AMQP_RESPONSE_NORMAL != res.reply_type) {
break;
}
printf("Delivery %u, exchange %.*s routingkey %.*s\n",
(unsigned)envelope.delivery_tag, (int)envelope.exchange.len,
(char *)envelope.exchange.bytes, (int)envelope.routing_key.len,
(char *)envelope.routing_key.bytes);
if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
printf("Content-type: %.*s\n",
(int)envelope.message.properties.content_type.len,
(char *)envelope.message.properties.content_type.bytes);
}
printf("----\n");
amqp_dump(envelope.message.body.bytes, envelope.message.body.len);
amqp_destroy_envelope(&envelope);
}
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
"Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
"Closing connection");
die_on_error(amqp_destroy_connection(conn), "Ending connection");
return 0;
}

117
examples/amqp_producer.c Normal file
View File

@@ -0,0 +1,117 @@
// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
// SPDX-License-Identifier: mit
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/tcp_socket.h>
#include "utils.h"
#define SUMMARY_EVERY_US 1000000
static void send_batch(amqp_connection_state_t conn, amqp_bytes_t queue_name,
int rate_limit, int message_count) {
uint64_t start_time = now_microseconds();
int i;
int sent = 0;
int previous_sent = 0;
uint64_t previous_report_time = start_time;
uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
char message[256];
amqp_bytes_t message_bytes;
for (i = 0; i < (int)sizeof(message); i++) {
message[i] = i & 0xff;
}
message_bytes.len = sizeof(message);
message_bytes.bytes = message;
for (i = 0; i < message_count; i++) {
uint64_t now = now_microseconds();
die_on_error(amqp_basic_publish(conn, 1, amqp_literal_bytes("amq.direct"),
queue_name, 0, 0, NULL, message_bytes),
"Publishing");
sent++;
if (now > next_summary_time) {
int countOverInterval = sent - previous_sent;
double intervalRate =
countOverInterval / ((now - previous_report_time) / 1000000.0);
printf("%d ms: Sent %d - %d since last report (%d Hz)\n",
(int)(now - start_time) / 1000, sent, countOverInterval,
(int)intervalRate);
previous_sent = sent;
previous_report_time = now;
next_summary_time += SUMMARY_EVERY_US;
}
while (((i * 1000000.0) / (now - start_time)) > rate_limit) {
microsleep(2000);
now = now_microseconds();
}
}
{
uint64_t stop_time = now_microseconds();
int total_delta = (int)(stop_time - start_time);
printf("PRODUCER - Message count: %d\n", message_count);
printf("Total time, milliseconds: %d\n", total_delta / 1000);
printf("Overall messages-per-second: %g\n",
(message_count / (total_delta / 1000000.0)));
}
}
int main(int argc, char const *const *argv) {
char const *hostname;
int port, status;
int rate_limit;
int message_count;
amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
if (argc < 5) {
fprintf(stderr,
"Usage: amqp_producer host port rate_limit message_count\n");
return 1;
}
hostname = argv[1];
port = atoi(argv[2]);
rate_limit = atoi(argv[3]);
message_count = atoi(argv[4]);
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
status = amqp_socket_open(socket, hostname, port);
if (status) {
die("opening TCP socket");
}
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
"guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
send_batch(conn, amqp_literal_bytes("test queue"), rate_limit, message_count);
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
"Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
"Closing connection");
die_on_error(amqp_destroy_connection(conn), "Ending connection");
return 0;
}

View File

@@ -0,0 +1,211 @@
// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
// SPDX-License-Identifier: mit
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/tcp_socket.h>
#include <assert.h>
#include "utils.h"
int main(int argc, char *argv[]) {
char const *hostname;
int port, status;
char const *exchange;
char const *routingkey;
char const *messagebody;
amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
amqp_bytes_t reply_to_queue;
if (argc < 6) { /* minimum number of mandatory arguments */
fprintf(stderr,
"usage:\namqp_rpc_sendstring_client host port exchange routingkey "
"messagebody\n");
return 1;
}
hostname = argv[1];
port = atoi(argv[2]);
exchange = argv[3];
routingkey = argv[4];
messagebody = argv[5];
/*
establish a channel that is used to connect RabbitMQ server
*/
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
status = amqp_socket_open(socket, hostname, port);
if (status) {
die("opening TCP socket");
}
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
"guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
/*
create private reply_to queue
*/
{
amqp_queue_declare_ok_t *r = amqp_queue_declare(
conn, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
reply_to_queue = amqp_bytes_malloc_dup(r->queue);
if (reply_to_queue.bytes == NULL) {
fprintf(stderr, "Out of memory while copying queue name");
return 1;
}
}
/*
send the message
*/
{
/*
set properties
*/
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_REPLY_TO_FLAG |
AMQP_BASIC_CORRELATION_ID_FLAG;
props.content_type = amqp_literal_bytes("text/plain");
props.delivery_mode = 2; /* persistent delivery mode */
props.reply_to = amqp_bytes_malloc_dup(reply_to_queue);
if (props.reply_to.bytes == NULL) {
fprintf(stderr, "Out of memory while copying queue name");
return 1;
}
props.correlation_id = amqp_literal_bytes("1");
/*
publish
*/
die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes(exchange),
amqp_cstring_bytes(routingkey), 0, 0,
&props, amqp_cstring_bytes(messagebody)),
"Publishing");
amqp_bytes_free(props.reply_to);
}
/*
wait an answer
*/
{
amqp_basic_consume(conn, 1, reply_to_queue, amqp_empty_bytes, 0, 1, 0,
amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
amqp_bytes_free(reply_to_queue);
{
amqp_frame_t frame;
int result;
amqp_basic_deliver_t *d;
amqp_basic_properties_t *p;
size_t body_target;
size_t body_received;
for (;;) {
amqp_maybe_release_buffers(conn);
result = amqp_simple_wait_frame(conn, &frame);
printf("Result: %d\n", result);
if (result < 0) {
break;
}
printf("Frame type: %u channel: %u\n", frame.frame_type, frame.channel);
if (frame.frame_type != AMQP_FRAME_METHOD) {
continue;
}
printf("Method: %s\n", amqp_method_name(frame.payload.method.id));
if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
continue;
}
d = (amqp_basic_deliver_t *)frame.payload.method.decoded;
printf("Delivery: %u exchange: %.*s routingkey: %.*s\n",
(unsigned)d->delivery_tag, (int)d->exchange.len,
(char *)d->exchange.bytes, (int)d->routing_key.len,
(char *)d->routing_key.bytes);
result = amqp_simple_wait_frame(conn, &frame);
if (result < 0) {
break;
}
if (frame.frame_type != AMQP_FRAME_HEADER) {
fprintf(stderr, "Expected header!");
abort();
}
p = (amqp_basic_properties_t *)frame.payload.properties.decoded;
if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
printf("Content-type: %.*s\n", (int)p->content_type.len,
(char *)p->content_type.bytes);
}
printf("----\n");
body_target = (size_t)frame.payload.properties.body_size;
body_received = 0;
while (body_received < body_target) {
result = amqp_simple_wait_frame(conn, &frame);
if (result < 0) {
break;
}
if (frame.frame_type != AMQP_FRAME_BODY) {
fprintf(stderr, "Expected body!");
abort();
}
body_received += frame.payload.body_fragment.len;
assert(body_received <= body_target);
amqp_dump(frame.payload.body_fragment.bytes,
frame.payload.body_fragment.len);
}
if (body_received != body_target) {
/* Can only happen when amqp_simple_wait_frame returns <= 0 */
/* We break here to close the connection */
break;
}
/* everything was fine, we can quit now because we received the reply */
break;
}
}
}
/*
closing
*/
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
"Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
"Closing connection");
die_on_error(amqp_destroy_connection(conn), "Ending connection");
return 0;
}

View File

@@ -0,0 +1,71 @@
// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
// SPDX-License-Identifier: mit
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/tcp_socket.h>
#include "utils.h"
int main(int argc, char const *const *argv) {
char const *hostname;
int port, status;
char const *exchange;
char const *routingkey;
char const *messagebody;
amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
if (argc < 6) {
fprintf(
stderr,
"Usage: amqp_sendstring host port exchange routingkey messagebody\n");
return 1;
}
hostname = argv[1];
port = atoi(argv[2]);
exchange = argv[3];
routingkey = argv[4];
messagebody = argv[5];
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
status = amqp_socket_open(socket, hostname, port);
if (status) {
die("opening TCP socket");
}
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
"guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
{
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
props.content_type = amqp_literal_bytes("text/plain");
props.delivery_mode = 2; /* persistent delivery mode */
die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes(exchange),
amqp_cstring_bytes(routingkey), 0, 0,
&props, amqp_cstring_bytes(messagebody)),
"Publishing");
}
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
"Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
"Closing connection");
die_on_error(amqp_destroy_connection(conn), "Ending connection");
return 0;
}

102
examples/amqp_ssl_connect.c Normal file
View File

@@ -0,0 +1,102 @@
// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
// SPDX-License-Identifier: mit
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/ssl_socket.h>
#include <assert.h>
#ifdef _WIN32
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#include <Winsock2.h>
#else
#include <sys/time.h>
#endif
#include "utils.h"
int main(int argc, char const *const *argv) {
char const *hostname;
int port;
int timeout;
amqp_socket_t *socket;
amqp_connection_state_t conn;
struct timeval tval;
struct timeval *tv;
if (argc < 3) {
fprintf(stderr,
"Usage: amqp_ssl_connect host port timeout_sec "
"[cacert.pem [engine engine_ID] [verifypeer] [verifyhostname] "
"[key.pem cert.pem]]\n");
return 1;
}
hostname = argv[1];
port = atoi(argv[2]);
timeout = atoi(argv[3]);
if (timeout > 0) {
tv = &tval;
tv->tv_sec = timeout;
tv->tv_usec = 0;
} else {
tv = NULL;
}
conn = amqp_new_connection();
socket = amqp_ssl_socket_new(conn);
if (!socket) {
die("creating SSL/TLS socket");
}
amqp_ssl_socket_set_verify_peer(socket, 0);
amqp_ssl_socket_set_verify_hostname(socket, 0);
if (argc > 5) {
int nextarg = 5;
die_on_error(amqp_ssl_socket_set_cacert(socket, argv[4]),
"setting CA certificate");
if (argc > nextarg && !strcmp("engine", argv[nextarg])) {
amqp_set_ssl_engine(argv[++nextarg]);
nextarg++;
}
if (argc > nextarg && !strcmp("verifypeer", argv[nextarg])) {
amqp_ssl_socket_set_verify_peer(socket, 1);
nextarg++;
}
if (argc > nextarg && !strcmp("verifyhostname", argv[nextarg])) {
amqp_ssl_socket_set_verify_hostname(socket, 1);
nextarg++;
}
if (argc > nextarg + 1) {
die_on_error(
amqp_ssl_socket_set_key(socket, argv[nextarg + 1], argv[nextarg]),
"setting client key");
}
}
die_on_error(amqp_socket_open_noblock(socket, hostname, port, tv),
"opening SSL/TLS connection");
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
"guest", "guest"),
"Logging in");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
"Closing connection");
die_on_error(amqp_destroy_connection(conn), "Ending connection");
die_on_error(amqp_uninitialize_ssl_library(), "Uninitializing SSL library");
printf("Done\n");
return 0;
}

63
examples/amqp_unbind.c Normal file
View File

@@ -0,0 +1,63 @@
// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
// SPDX-License-Identifier: mit
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/tcp_socket.h>
#include "utils.h"
int main(int argc, char const *const *argv) {
char const *hostname;
int port, status;
char const *exchange;
char const *bindingkey;
char const *queue;
amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
if (argc < 6) {
fprintf(stderr, "Usage: amqp_unbind host port exchange bindingkey queue\n");
return 1;
}
hostname = argv[1];
port = atoi(argv[2]);
exchange = argv[3];
bindingkey = argv[4];
queue = argv[5];
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
status = amqp_socket_open(socket, hostname, port);
if (status) {
die("opening TCP socket");
}
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
"guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
amqp_queue_unbind(conn, 1, amqp_cstring_bytes(queue),
amqp_cstring_bytes(exchange),
amqp_cstring_bytes(bindingkey), amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Unbinding");
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
"Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
"Closing connection");
die_on_error(amqp_destroy_connection(conn), "Ending connection");
return 0;
}

View File

@@ -0,0 +1,20 @@
// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
// SPDX-License-Identifier: mit
#include <stdint.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
uint64_t now_microseconds(void) {
struct timeval tv;
gettimeofday(&tv, NULL);
return (uint64_t)tv.tv_sec * 1000000 + (uint64_t)tv.tv_usec;
}
void microsleep(int usec) {
struct timespec req;
req.tv_sec = 0;
req.tv_nsec = 1000 * usec;
nanosleep(&req, NULL);
}

156
examples/utils.c Normal file
View File

@@ -0,0 +1,156 @@
// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
// SPDX-License-Identifier: mit
#include <ctype.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/framing.h>
#include <stdint.h>
#include "utils.h"
void die(const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
va_end(ap);
fprintf(stderr, "\n");
exit(1);
}
void die_on_error(int x, char const *context) {
if (x < 0) {
fprintf(stderr, "%s: %s\n", context, amqp_error_string2(x));
exit(1);
}
}
void die_on_amqp_error(amqp_rpc_reply_t x, char const *context) {
switch (x.reply_type) {
case AMQP_RESPONSE_NORMAL:
return;
case AMQP_RESPONSE_NONE:
fprintf(stderr, "%s: missing RPC reply type!\n", context);
break;
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
fprintf(stderr, "%s: %s\n", context, amqp_error_string2(x.library_error));
break;
case AMQP_RESPONSE_SERVER_EXCEPTION:
switch (x.reply.id) {
case AMQP_CONNECTION_CLOSE_METHOD: {
amqp_connection_close_t *m =
(amqp_connection_close_t *)x.reply.decoded;
fprintf(stderr, "%s: server connection error %uh, message: %.*s\n",
context, m->reply_code, (int)m->reply_text.len,
(char *)m->reply_text.bytes);
break;
}
case AMQP_CHANNEL_CLOSE_METHOD: {
amqp_channel_close_t *m = (amqp_channel_close_t *)x.reply.decoded;
fprintf(stderr, "%s: server channel error %uh, message: %.*s\n",
context, m->reply_code, (int)m->reply_text.len,
(char *)m->reply_text.bytes);
break;
}
default:
fprintf(stderr, "%s: unknown server error, method id 0x%08X\n",
context, x.reply.id);
break;
}
break;
}
exit(1);
}
static void dump_row(long count, int numinrow, int *chs) {
int i;
printf("%08lX:", count - numinrow);
if (numinrow > 0) {
for (i = 0; i < numinrow; i++) {
if (i == 8) {
printf(" :");
}
printf(" %02X", chs[i]);
}
for (i = numinrow; i < 16; i++) {
if (i == 8) {
printf(" :");
}
printf(" ");
}
printf(" ");
for (i = 0; i < numinrow; i++) {
if (isprint(chs[i])) {
printf("%c", chs[i]);
} else {
printf(".");
}
}
}
printf("\n");
}
static int rows_eq(int *a, int *b) {
int i;
for (i = 0; i < 16; i++)
if (a[i] != b[i]) {
return 0;
}
return 1;
}
void amqp_dump(void const *buffer, size_t len) {
unsigned char *buf = (unsigned char *)buffer;
long count = 0;
int numinrow = 0;
int chs[16];
int oldchs[16] = {0};
int showed_dots = 0;
size_t i;
for (i = 0; i < len; i++) {
int ch = buf[i];
if (numinrow == 16) {
int j;
if (rows_eq(oldchs, chs)) {
if (!showed_dots) {
showed_dots = 1;
printf(
" .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n");
}
} else {
showed_dots = 0;
dump_row(count, numinrow, chs);
}
for (j = 0; j < 16; j++) {
oldchs[j] = chs[j];
}
numinrow = 0;
}
count++;
chs[numinrow++] = ch;
}
dump_row(count, numinrow, chs);
if (numinrow != 0) {
printf("%08lX:\n", count);
}
}

16
examples/utils.h Normal file
View File

@@ -0,0 +1,16 @@
// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
// SPDX-License-Identifier: mit
#ifndef librabbitmq_examples_utils_h
#define librabbitmq_examples_utils_h
void die(const char *fmt, ...);
extern void die_on_error(int x, char const *context);
extern void die_on_amqp_error(amqp_rpc_reply_t x, char const *context);
extern void amqp_dump(void const *buffer, size_t len);
extern uint64_t now_microseconds(void);
extern void microsleep(int usec);
#endif

View File

@@ -0,0 +1,15 @@
// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
// SPDX-License-Identifier: mit
#include <stdint.h>
#include <windows.h>
uint64_t now_microseconds(void) {
FILETIME ft;
GetSystemTimeAsFileTime(&ft);
return (((uint64_t)ft.dwHighDateTime << 32) | (uint64_t)ft.dwLowDateTime) /
10;
}
void microsleep(int usec) { Sleep(usec / 1000); }