From 3ed2e30e3ada54667a0243acf84601cb98e78bc2 Mon Sep 17 00:00:00 2001 From: Lois Soto Lopez Date: Thu, 23 May 2024 10:28:26 +0200 Subject: [PATCH 1/6] Provide per-exchange/queue metrics w/out channelID --- .../include/rabbit_core_metrics.hrl | 8 ++ .../rabbit_common/src/rabbit_core_metrics.erl | 44 ++++++--- ...etheus_rabbitmq_core_metrics_collector.erl | 68 +++++++++++++- .../test/rabbit_prometheus_http_SUITE.erl | 94 ++++++++++++++++++- 4 files changed, 194 insertions(+), 20 deletions(-) diff --git a/deps/rabbit_common/include/rabbit_core_metrics.hrl b/deps/rabbit_common/include/rabbit_core_metrics.hrl index 59743b4ec7da..e64c7c4b8246 100644 --- a/deps/rabbit_common/include/rabbit_core_metrics.hrl +++ b/deps/rabbit_common/include/rabbit_core_metrics.hrl @@ -28,6 +28,14 @@ {auth_attempt_metrics, set}, {auth_attempt_detailed_metrics, set}]). +% `CORE_NON_CHANNEL_TABLES` are tables that store counters representing the +% same info as some of the channel_queue_metrics, channel_exchange_metrics and +% channel_queue_exchange_metrics but without including the channel ID in the +% key. +-define(CORE_NON_CHANNEL_TABLES, [{queue_counter_metrics, set}, + {exchange_metrics, set}, + {queue_exchange_metrics, set}]). + -define(CONNECTION_CHURN_METRICS, {node(), 0, 0, 0, 0, 0, 0, 0}). %% connection_created :: {connection_id, proplist} diff --git a/deps/rabbit_common/src/rabbit_core_metrics.erl b/deps/rabbit_common/src/rabbit_core_metrics.erl index 0c46b41db456..f872a6bc278d 100644 --- a/deps/rabbit_common/src/rabbit_core_metrics.erl +++ b/deps/rabbit_common/src/rabbit_core_metrics.erl @@ -111,13 +111,15 @@ create_table({Table, Type}) -> {read_concurrency, true}]). init() -> - _ = [create_table({Table, Type}) - || {Table, Type} <- ?CORE_TABLES ++ ?CORE_EXTRA_TABLES], + Tables = ?CORE_TABLES ++ ?CORE_EXTRA_TABLES ++ ?CORE_NON_CHANNEL_TABLES, + _ = [create_table({Table, Type}) + || {Table, Type} <- Tables], ok. terminate() -> + Tables = ?CORE_TABLES ++ ?CORE_EXTRA_TABLES ++ ?CORE_NON_CHANNEL_TABLES, [ets:delete(Table) - || {Table, _Type} <- ?CORE_TABLES ++ ?CORE_EXTRA_TABLES], + || {Table, _Type} <- Tables], ok. connection_created(Pid, Infos) -> @@ -166,53 +168,65 @@ channel_stats(reductions, Id, Value) -> ets:insert(channel_process_metrics, {Id, Value}), ok. -channel_stats(exchange_stats, publish, Id, Value) -> +channel_stats(exchange_stats, publish, {_ChannelPid, XName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_exchange_metrics, Id, {2, Value}, {Id, 0, 0, 0, 0, 0}), + _ = ets:update_counter(exchange_metrics, XName, {2, Value}, {XName, 0, 0, 0, 0, 0}), ok; -channel_stats(exchange_stats, confirm, Id, Value) -> +channel_stats(exchange_stats, confirm, {_ChannelPid, XName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_exchange_metrics, Id, {3, Value}, {Id, 0, 0, 0, 0, 0}), + _ = ets:update_counter(exchange_metrics, XName, {3, Value}, {XName, 0, 0, 0, 0, 0}), ok; -channel_stats(exchange_stats, return_unroutable, Id, Value) -> +channel_stats(exchange_stats, return_unroutable, {_ChannelPid, XName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_exchange_metrics, Id, {4, Value}, {Id, 0, 0, 0, 0, 0}), + _ = ets:update_counter(exchange_metrics, XName, {4, Value}, {XName, 0, 0, 0, 0, 0}), ok; -channel_stats(exchange_stats, drop_unroutable, Id, Value) -> +channel_stats(exchange_stats, drop_unroutable, {_ChannelPid, XName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_exchange_metrics, Id, {5, Value}, {Id, 0, 0, 0, 0, 0}), + _ = ets:update_counter(exchange_metrics, XName, {5, Value}, {XName, 0, 0, 0, 0, 0}), ok; -channel_stats(queue_exchange_stats, publish, Id, Value) -> +channel_stats(queue_exchange_stats, publish, {_ChannelPid, QueueExchange} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_exchange_metrics, Id, Value, {Id, 0, 0}), + _ = ets:update_counter(queue_exchange_metrics, QueueExchange, Value, {QueueExchange, 0, 0}), ok; -channel_stats(queue_stats, get, Id, Value) -> +channel_stats(queue_stats, get, {_ChannelPid, QName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {2, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), + _ = ets:update_counter(queue_counter_metrics, QName, {2, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}), ok; -channel_stats(queue_stats, get_no_ack, Id, Value) -> +channel_stats(queue_stats, get_no_ack, {_ChannelPid, QName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {3, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), + _ = ets:update_counter(queue_counter_metrics, QName, {3, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}), ok; -channel_stats(queue_stats, deliver, Id, Value) -> +channel_stats(queue_stats, deliver, {_ChannelPid, QName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {4, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), + _ = ets:update_counter(queue_counter_metrics, QName, {4, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}), ok; -channel_stats(queue_stats, deliver_no_ack, Id, Value) -> +channel_stats(queue_stats, deliver_no_ack, {_ChannelPid, QName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {5, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), + _ = ets:update_counter(queue_counter_metrics, QName, {5, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}), ok; -channel_stats(queue_stats, redeliver, Id, Value) -> +channel_stats(queue_stats, redeliver, {_ChannelPid, QName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {6, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), + _ = ets:update_counter(queue_counter_metrics, QName, {6, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}), ok; -channel_stats(queue_stats, ack, Id, Value) -> +channel_stats(queue_stats, ack, {_ChannelPid, QName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {7, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), + _ = ets:update_counter(queue_counter_metrics, QName, {7, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}), ok; -channel_stats(queue_stats, get_empty, Id, Value) -> +channel_stats(queue_stats, get_empty, {_ChannelPid, QName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {8, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), + _ = ets:update_counter(queue_counter_metrics, QName, {8, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}), ok. delete(Table, Key) -> diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl index d2198ece681e..43ee8a2a2bc0 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl @@ -162,7 +162,15 @@ {2, undefined, queue_disk_writes_total, counter, "Total number of times queue wrote messages to disk", disk_writes}, {2, undefined, stream_segments, counter, "Total number of stream segment files", segments} ]}, - + {queue_counter_metrics, [ + {2, undefined, queue_get_ack_total, counter, "Total number of messages fetched with basic.get in manual acknowledgement mode"}, + {3, undefined, queue_get_total, counter, "Total number of messages fetched with basic.get in automatic acknowledgement mode"}, + {4, undefined, queue_messages_delivered_ack_total, counter, "Total number of messages delivered to consumers in manual acknowledgement mode"}, + {5, undefined, queue_messages_delivered_total, counter, "Total number of messages delivered to consumers in automatic acknowledgement mode"}, + {6, undefined, queue_messages_redelivered_total, counter, "Total number of messages redelivered to consumers"}, + {7, undefined, queue_messages_acked_total, counter, "Total number of messages acknowledged by consumers"}, + {8, undefined, queue_get_empty_total, counter, "Total number of times basic.get operations fetched no message"} + ]}, %%% Metrics that contain reference to a channel. Some of them also have %%% a queue name, but in this case filtering on it doesn't make any %%% sense, as the queue is not an object of interest here. @@ -176,6 +184,13 @@ {2, undefined, channel_prefetch, gauge, "Total limit of unacknowledged messages for all consumers on a channel", global_prefetch_count} ]}, + {exchange_metrics, [ + {2, undefined, exchange_messages_published_total, counter, "Total number of messages published into an exchange on a channel"}, + {3, undefined, exchange_messages_confirmed_total, counter, "Total number of messages published into an exchange and confirmed on the channel"}, + {4, undefined, exchange_messages_unroutable_returned_total, counter, "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"}, + {5, undefined, exchange_messages_unroutable_dropped_total, counter, "Total number of messages published as non-mandatory into an exchange and dropped as unroutable"} + ]}, + {channel_exchange_metrics, [ {2, undefined, channel_messages_published_total, counter, "Total number of messages published into an exchange on a channel"}, {3, undefined, channel_messages_confirmed_total, counter, "Total number of messages published into an exchange and confirmed on the channel"}, @@ -210,6 +225,10 @@ {2, undefined, connection_channels, gauge, "Channels on a connection", channels} ]}, + {queue_exchange_metrics, [ + {2, undefined, queue_exchange_messages_published_total, counter, "Total number of messages published to queues"} + ]}, + {channel_queue_exchange_metrics, [ {2, undefined, queue_messages_published_total, counter, "Total number of messages published to queues"} ]} @@ -544,8 +563,11 @@ get_data(queue_metrics = Table, false, VHostsFilter) -> {disk_reads, A15}, {disk_writes, A16}, {segments, A17}]}]; get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics; Table == queue_coarse_metrics; + Table == queue_counter_metrics; Table == channel_queue_metrics; Table == connection_coarse_metrics; + Table == exchange_metrics; + Table == queue_exchange_metrics; Table == channel_queue_exchange_metrics; Table == ra_metrics; Table == channel_process_metrics -> @@ -553,6 +575,8 @@ get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics; %% For queue_coarse_metrics ({#resource{kind = queue, virtual_host = VHost}, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false -> Acc; + ({#resource{kind = queue, virtual_host = VHost}, _, _, _, _, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false -> + Acc; ({_, V1}, {T, A1}) -> {T, V1 + A1}; ({_, V1, _}, {T, A1}) -> @@ -579,6 +603,42 @@ get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics; _ -> [Result] end; +get_data(exchange_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter)-> + ets:foldl(fun + ({#resource{kind = exchange, virtual_host = VHost}, _, _, _, _, _} = Row, Acc) when + map_get(VHost, VHostsFilter) + -> + [Row | Acc]; + (_Row, Acc) -> + Acc + end, [], Table); +get_data(exchange_metrics, true, _VhostsFilter) -> + []; +get_data(queue_counter_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter)-> + ets:foldl(fun + ({#resource{kind = queue, virtual_host = VHost}, _, _, _, _, _, _, _, _} = Row, Acc) when + map_get(VHost, VHostsFilter) + -> + [Row | Acc]; + (_Row, Acc) -> + Acc + end, [], Table); +get_data(queue_counter_metrics, true, _VHostsFilter) -> + []; +get_data(queue_exchange_metrics = Table, true, VHostsFilter) -> + ets:foldl(fun + ({{ + #resource{kind = queue, virtual_host = VHost}, + #resource{kind = exchange, virtual_host = VHost} + }, _, _} = Row, Acc) when + map_get(VHost, VHostsFilter) + -> + [Row | Acc]; + (_Row, Acc) -> + Acc + end, [], Table); +get_data(queue_exchange_metrics, true, _VHostsFilter) -> + []; get_data(queue_coarse_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) -> ets:foldl(fun ({#resource{kind = queue, virtual_host = VHost}, _, _, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) -> @@ -671,15 +731,15 @@ division(A, B) -> accumulate_count_and_sum(Value, {Count, Sum}) -> {Count + 1, Sum + Value}. -empty(T) when T == channel_queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count -> +empty(T) when T == channel_queue_exchange_metrics; T == queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count -> {T, 0}; empty(T) when T == connection_coarse_metrics; T == auth_attempt_metrics; T == auth_attempt_detailed_metrics -> {T, 0, 0, 0}; -empty(T) when T == channel_exchange_metrics; T == queue_coarse_metrics; T == connection_metrics -> +empty(T) when T == channel_exchange_metrics; T == exchange_metrics; T == queue_coarse_metrics; T == connection_metrics -> {T, 0, 0, 0, 0}; empty(T) when T == ra_metrics -> {T, 0, 0, 0, 0, 0, {0, 0}}; -empty(T) when T == channel_queue_metrics; T == channel_metrics -> +empty(T) when T == channel_queue_metrics; T == queue_counter_metrics; T == channel_metrics -> {T, 0, 0, 0, 0, 0, 0, 0}; empty(queue_metrics = T) -> {T, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}. diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl index 033723507a8f..50bf0b1ad62a 100644 --- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl +++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl @@ -34,7 +34,7 @@ groups() -> {config_path, [], generic_tests()}, {global_labels, [], generic_tests()}, {aggregated_metrics, [], [ - aggregated_metrics_test, + aggregated_metrics_test, specific_erlang_metrics_present_test, global_metrics_present_test, global_metrics_single_metric_family_test @@ -57,6 +57,8 @@ groups() -> queue_consumer_count_single_vhost_per_object_test, queue_consumer_count_all_vhosts_per_object_test, queue_coarse_metrics_per_object_test, + queue_counter_metrics_per_object_test, + queue_exchange_metrics_per_object_test, queue_metrics_per_object_test, queue_consumer_count_and_queue_metrics_mutually_exclusive_test, vhost_status_metric, @@ -523,6 +525,96 @@ queue_coarse_metrics_per_object_test(Config) -> map_get(rabbitmq_detailed_queue_messages, parse_response(Body3))), ok. +queue_counter_metrics_per_object_test(Config) -> + Expected1 = #{#{queue => "vhost-1-queue-with-consumer", vhost => "vhost-1"} => [7]}, + + {_, Body1} = http_get_with_pal(Config, + "/metrics/detailed?vhost=vhost-1&family=queue_counter_metrics", + [], 200), + ?assertEqual( + Expected1, + map_get( + rabbitmq_detailed_queue_messages_delivered_ack_total, + parse_response(Body1))), + + {_, Body2} = http_get_with_pal(Config, + "/metrics/detailed?vhost=vhost-2&family=queue_counter_metrics", + [], 200), + Expected2 = #{#{queue => "vhost-2-queue-with-consumer", vhost => "vhost-2"} => [11]}, + + ?assertEqual( + Expected2, + map_get( + rabbitmq_detailed_queue_messages_delivered_ack_total, + parse_response(Body2))), + + %% Maybe missing, tests for the queue_exchange_metrics + ok. + + +queue_exchange_metrics_per_object_test(Config) -> + Expected1 = #{ + #{ + queue => "vhost-1-queue-with-messages", + vhost => "vhost-1", + exchange => "" + } => [7], + #{ + exchange => "", + queue => "vhost-1-queue-with-consumer", + vhost => "vhost-1" + } => [7] + }, + + {_, Body1} = http_get_with_pal(Config, + "/metrics/detailed?vhost=vhost-1&family=queue_exchange_metrics", + [], 200), + ?assertEqual( + Expected1, + map_get( + rabbitmq_detailed_queue_exchange_messages_published_total, + parse_response(Body1))), + + + {_, Body2} = http_get_with_pal(Config, + "/metrics/detailed?vhost=vhost-2&family=queue_exchange_metrics", + [], 200), + + + Expected2 = #{ + #{ + queue => "vhost-2-queue-with-messages", + vhost => "vhost-2", + exchange => "" + } => [11], + #{ + exchange => "", + queue => "vhost-2-queue-with-consumer", + vhost => "vhost-2" + } => [11] + }, + + ?assertEqual( + Expected2, + map_get( + rabbitmq_detailed_queue_exchange_messages_published_total, + parse_response(Body2))), + + ok. + +exchange_metrics_per_object_test(Config) -> + Expected1 = #{#{queue => "vhost-1-queue-with-consumer", vhost => "vhost-1"} => [7]}, + + {_, Body} = http_get_with_pal(Config, + "/metrics/detailed?vhost=vhost-1&family=exchange_metrics", + [], 200), + ?assertEqual( + Expected1, + map_get( + rabbitmq_detailed_queue_messages_delivered_ack_total, + parse_response(Body))), + ok. + queue_metrics_per_object_test(Config) -> Expected1 = #{#{queue => "vhost-1-queue-with-consumer", vhost => "vhost-1"} => [7], #{queue => "vhost-1-queue-with-messages", vhost => "vhost-1"} => [1]}, From 64e0812ced2bfbf3f21c6bc3b95045463abc5499 Mon Sep 17 00:00:00 2001 From: LoisSotoLopez Date: Thu, 27 Jun 2024 09:42:40 +0200 Subject: [PATCH 2/6] Update deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Péter Gömöri --- .../prometheus_rabbitmq_core_metrics_collector.erl | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl index 43ee8a2a2bc0..c36b828eb658 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl @@ -577,6 +577,8 @@ get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics; Acc; ({#resource{kind = queue, virtual_host = VHost}, _, _, _, _, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false -> Acc; + ({{#resource{kind = queue, virtual_host = VHost}, #resource{kind = exchange}}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false -> + Acc; ({_, V1}, {T, A1}) -> {T, V1 + A1}; ({_, V1, _}, {T, A1}) -> @@ -612,9 +614,7 @@ get_data(exchange_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) (_Row, Acc) -> Acc end, [], Table); -get_data(exchange_metrics, true, _VhostsFilter) -> - []; -get_data(queue_counter_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter)-> +get_data(queue_counter_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) -> ets:foldl(fun ({#resource{kind = queue, virtual_host = VHost}, _, _, _, _, _, _, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) @@ -623,9 +623,7 @@ get_data(queue_counter_metrics = Table, true, VHostsFilter) when is_map(VHostsFi (_Row, Acc) -> Acc end, [], Table); -get_data(queue_counter_metrics, true, _VHostsFilter) -> - []; -get_data(queue_exchange_metrics = Table, true, VHostsFilter) -> +get_data(queue_exchange_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) -> ets:foldl(fun ({{ #resource{kind = queue, virtual_host = VHost}, @@ -637,8 +635,6 @@ get_data(queue_exchange_metrics = Table, true, VHostsFilter) -> (_Row, Acc) -> Acc end, [], Table); -get_data(queue_exchange_metrics, true, _VHostsFilter) -> - []; get_data(queue_coarse_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) -> ets:foldl(fun ({#resource{kind = queue, virtual_host = VHost}, _, _, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) -> From 1aec73b21c42b2260d7acbb09ec20c08ff829386 Mon Sep 17 00:00:00 2001 From: Lois Soto Lopez Date: Mon, 1 Jul 2024 13:49:48 +0200 Subject: [PATCH 3/6] New metrics return on detailed only Make new metrics return on detailed only and adjust some of the help messages. --- deps/rabbit/src/rabbit_core_metrics_gc.erl | 20 +++++++-- ...etheus_rabbitmq_core_metrics_collector.erl | 43 +++++++++---------- 2 files changed, 37 insertions(+), 26 deletions(-) diff --git a/deps/rabbit/src/rabbit_core_metrics_gc.erl b/deps/rabbit/src/rabbit_core_metrics_gc.erl index 0849bd503512..54b3a7686800 100644 --- a/deps/rabbit/src/rabbit_core_metrics_gc.erl +++ b/deps/rabbit/src/rabbit_core_metrics_gc.erl @@ -92,14 +92,17 @@ gc_leader_data(Id, Table, GbSet) -> gc_global_queues() -> GbSet = gb_sets:from_list(rabbit_amqqueue:list_names()), gc_process_and_entity(channel_queue_metrics, GbSet), + gc_process_and_entity(queue_counter_metrics, GbSet), gc_process_and_entity(consumer_created, GbSet), ExchangeGbSet = gb_sets:from_list(rabbit_exchange:list_names()), - gc_process_and_entities(channel_queue_exchange_metrics, GbSet, ExchangeGbSet). + gc_process_and_entities(channel_queue_exchange_metrics, GbSet, ExchangeGbSet), + gc_process_and_entities(queue_exchange_metrics, GbSet, ExchangeGbSet). gc_exchanges() -> Exchanges = rabbit_exchange:list_names(), GbSet = gb_sets:from_list(Exchanges), - gc_process_and_entity(channel_exchange_metrics, GbSet). + gc_process_and_entity(channel_exchange_metrics, GbSet), + gc_process_and_entity(exchange_metrics, GbSet). gc_nodes() -> Nodes = rabbit_nodes:list_members(), @@ -172,6 +175,12 @@ gc_process_and_entity(Table, GbSet) -> ({{Pid, Id} = Key, _, _, _, _, _}, none) when Table == channel_exchange_metrics -> gc_process_and_entity(Id, Pid, Table, Key, GbSet); + ({Id = Key, _, _, _, _, _}, none) + when Table == exchange_metrics -> + gc_entity(Id, Table, Key, GbSet); + ({Id = Key, _, _, _, _, _, _, _, _}, none) + when Table == queue_counter_metrics -> + gc_entity(Id, Table, Key, GbSet); ({{Id, Pid, _} = Key, _, _, _, _, _, _}, none) when Table == consumer_created -> gc_process_and_entity(Id, Pid, Table, Key, GbSet); @@ -189,7 +198,12 @@ gc_process_and_entity(Id, Pid, Table, Key, GbSet) -> end. gc_process_and_entities(Table, QueueGbSet, ExchangeGbSet) -> - ets:foldl(fun({{Pid, {Q, X}} = Key, _, _}, none) -> + ets:foldl(fun + ({{QueueId, ExchangeId} = Key, _, _}, none) + when Table == queue_exchange_metrics -> + gc_entity(QueueId, Table, Key, QueueGbSet), + gc_entity(ExchangeId, Table, Key, ExchangeGbSet); + ({{Pid, {Q, X}} = Key, _, _}, none) -> gc_process(Pid, Table, Key), gc_entity(Q, Table, Key, QueueGbSet), gc_entity(X, Table, Key, ExchangeGbSet) diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl index c36b828eb658..c5836ef122c2 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl @@ -162,15 +162,6 @@ {2, undefined, queue_disk_writes_total, counter, "Total number of times queue wrote messages to disk", disk_writes}, {2, undefined, stream_segments, counter, "Total number of stream segment files", segments} ]}, - {queue_counter_metrics, [ - {2, undefined, queue_get_ack_total, counter, "Total number of messages fetched with basic.get in manual acknowledgement mode"}, - {3, undefined, queue_get_total, counter, "Total number of messages fetched with basic.get in automatic acknowledgement mode"}, - {4, undefined, queue_messages_delivered_ack_total, counter, "Total number of messages delivered to consumers in manual acknowledgement mode"}, - {5, undefined, queue_messages_delivered_total, counter, "Total number of messages delivered to consumers in automatic acknowledgement mode"}, - {6, undefined, queue_messages_redelivered_total, counter, "Total number of messages redelivered to consumers"}, - {7, undefined, queue_messages_acked_total, counter, "Total number of messages acknowledged by consumers"}, - {8, undefined, queue_get_empty_total, counter, "Total number of times basic.get operations fetched no message"} - ]}, %%% Metrics that contain reference to a channel. Some of them also have %%% a queue name, but in this case filtering on it doesn't make any %%% sense, as the queue is not an object of interest here. @@ -184,13 +175,6 @@ {2, undefined, channel_prefetch, gauge, "Total limit of unacknowledged messages for all consumers on a channel", global_prefetch_count} ]}, - {exchange_metrics, [ - {2, undefined, exchange_messages_published_total, counter, "Total number of messages published into an exchange on a channel"}, - {3, undefined, exchange_messages_confirmed_total, counter, "Total number of messages published into an exchange and confirmed on the channel"}, - {4, undefined, exchange_messages_unroutable_returned_total, counter, "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"}, - {5, undefined, exchange_messages_unroutable_dropped_total, counter, "Total number of messages published as non-mandatory into an exchange and dropped as unroutable"} - ]}, - {channel_exchange_metrics, [ {2, undefined, channel_messages_published_total, counter, "Total number of messages published into an exchange on a channel"}, {3, undefined, channel_messages_confirmed_total, counter, "Total number of messages published into an exchange and confirmed on the channel"}, @@ -225,12 +209,8 @@ {2, undefined, connection_channels, gauge, "Channels on a connection", channels} ]}, - {queue_exchange_metrics, [ - {2, undefined, queue_exchange_messages_published_total, counter, "Total number of messages published to queues"} - ]}, - {channel_queue_exchange_metrics, [ - {2, undefined, queue_messages_published_total, counter, "Total number of messages published to queues"} + {2, undefined, queue_messages_published_total, counter, "Total number of messages published into a queue through a exchange on a channel"} ]} ]). @@ -244,8 +224,25 @@ ]}, {exchange_names, [ {2, undefined, exchange_name, gauge, "Enumerates exchanges without any additional info. This value is cluster-wide. A cheaper alternative to `exchange_bindings`"} - ]} -]). + ]}, + {queue_exchange_metrics, [ + {2, undefined, queue_exchange_messages_published_total, counter, "Total number of messages published into a queue through an exchange"} + ]}, + {exchange_metrics, [ + {2, undefined, exchange_messages_published_total, counter, "Total number of messages published into an exchange"}, + {3, undefined, exchange_messages_confirmed_total, counter, "Total number of messages published into an exchange and confirmed"}, + {4, undefined, exchange_messages_unroutable_returned_total, counter, "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"}, + {5, undefined, exchange_messages_unroutable_dropped_total, counter, "Total number of messages published as non-mandatory into an exchange and dropped as unroutable"} + ]}, + {queue_counter_metrics, [ + {2, undefined, queue_get_ack_total, counter, "Total number of messages fetched from a queue with basic.get in manual acknowledgement mode"}, + {3, undefined, queue_get_total, counter, "Total number of messages fetched from a queue with basic.get in automatic acknowledgement mode"}, + {4, undefined, queue_messages_delivered_ack_total, counter, "Total number of messages delivered from a queue to consumers in manual acknowledgement mode"}, + {5, undefined, queue_messages_delivered_total, counter, "Total number of messages delivered from a queue to consumers in automatic acknowledgement mode"}, + {6, undefined, queue_messages_redelivered_total, counter, "Total number of messages redelivered from a queue to consumers"}, + {7, undefined, queue_messages_acked_total, counter, "Total number of messages acknowledged by consumers on a queue"}, + {8, undefined, queue_get_empty_total, counter, "Total number of times basic.get operations fetched no message on a queue"} + ]}]). -define(TOTALS, [ %% ordering differs from metrics above, refer to list comprehension From 4d592da5ef8683cb3db1fcd41c2bb06e164d2557 Mon Sep 17 00:00:00 2001 From: Lois Soto Lopez Date: Tue, 2 Jul 2024 12:16:08 +0200 Subject: [PATCH 4/6] Use functions w/out _process as its more approp. --- deps/rabbit/src/rabbit_core_metrics_gc.erl | 28 ++++++++++++---------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/deps/rabbit/src/rabbit_core_metrics_gc.erl b/deps/rabbit/src/rabbit_core_metrics_gc.erl index 54b3a7686800..04b95980ff0d 100644 --- a/deps/rabbit/src/rabbit_core_metrics_gc.erl +++ b/deps/rabbit/src/rabbit_core_metrics_gc.erl @@ -96,13 +96,13 @@ gc_global_queues() -> gc_process_and_entity(consumer_created, GbSet), ExchangeGbSet = gb_sets:from_list(rabbit_exchange:list_names()), gc_process_and_entities(channel_queue_exchange_metrics, GbSet, ExchangeGbSet), - gc_process_and_entities(queue_exchange_metrics, GbSet, ExchangeGbSet). + gc_entities(queue_exchange_metrics, GbSet, ExchangeGbSet). gc_exchanges() -> Exchanges = rabbit_exchange:list_names(), GbSet = gb_sets:from_list(Exchanges), gc_process_and_entity(channel_exchange_metrics, GbSet), - gc_process_and_entity(exchange_metrics, GbSet). + gc_entity(exchange_metrics, GbSet). gc_nodes() -> Nodes = rabbit_nodes:list_members(), @@ -156,6 +156,12 @@ gc_entity(Table, GbSet) -> ({Id = Key, _, _}, none) -> gc_entity(Id, Table, Key, GbSet); ({Id = Key, _, _, _, _}, none) -> + gc_entity(Id, Table, Key, GbSet); + ({Id = Key, _, _, _, _, _}, none) + when Table == exchange_metrics -> + gc_entity(Id, Table, Key, GbSet); + ({Id = Key, _, _, _, _, _, _, _, _}, none) + when Table == queue_counter_metrics -> gc_entity(Id, Table, Key, GbSet) end, none, Table). @@ -175,12 +181,6 @@ gc_process_and_entity(Table, GbSet) -> ({{Pid, Id} = Key, _, _, _, _, _}, none) when Table == channel_exchange_metrics -> gc_process_and_entity(Id, Pid, Table, Key, GbSet); - ({Id = Key, _, _, _, _, _}, none) - when Table == exchange_metrics -> - gc_entity(Id, Table, Key, GbSet); - ({Id = Key, _, _, _, _, _, _, _, _}, none) - when Table == queue_counter_metrics -> - gc_entity(Id, Table, Key, GbSet); ({{Id, Pid, _} = Key, _, _, _, _, _, _}, none) when Table == consumer_created -> gc_process_and_entity(Id, Pid, Table, Key, GbSet); @@ -197,13 +197,15 @@ gc_process_and_entity(Id, Pid, Table, Key, GbSet) -> none end. -gc_process_and_entities(Table, QueueGbSet, ExchangeGbSet) -> - ets:foldl(fun - ({{QueueId, ExchangeId} = Key, _, _}, none) +gc_entities(Table, QueueGbSet, ExchangeGbSet) -> + ets:foldl(fun({{QueueId, ExchangeId} = Key, _, _}, none) when Table == queue_exchange_metrics -> gc_entity(QueueId, Table, Key, QueueGbSet), - gc_entity(ExchangeId, Table, Key, ExchangeGbSet); - ({{Pid, {Q, X}} = Key, _, _}, none) -> + gc_entity(ExchangeId, Table, Key, ExchangeGbSet) + end, none, Table). + +gc_process_and_entities(Table, QueueGbSet, ExchangeGbSet) -> + ets:foldl(fun({{Pid, {Q, X}} = Key, _, _}, none) -> gc_process(Pid, Table, Key), gc_entity(Q, Table, Key, QueueGbSet), gc_entity(X, Table, Key, ExchangeGbSet) From b5fb5c4f2c4cd56867e0fcce1c2a4fd7befcfe69 Mon Sep 17 00:00:00 2001 From: LoisSotoLopez Date: Tue, 2 Jul 2024 13:36:20 +0200 Subject: [PATCH 5/6] Update deps/rabbit/src/rabbit_core_metrics_gc.erl MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Péter Gömöri --- deps/rabbit/src/rabbit_core_metrics_gc.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbit/src/rabbit_core_metrics_gc.erl b/deps/rabbit/src/rabbit_core_metrics_gc.erl index 04b95980ff0d..cb18d33884f6 100644 --- a/deps/rabbit/src/rabbit_core_metrics_gc.erl +++ b/deps/rabbit/src/rabbit_core_metrics_gc.erl @@ -92,7 +92,7 @@ gc_leader_data(Id, Table, GbSet) -> gc_global_queues() -> GbSet = gb_sets:from_list(rabbit_amqqueue:list_names()), gc_process_and_entity(channel_queue_metrics, GbSet), - gc_process_and_entity(queue_counter_metrics, GbSet), + gc_entity(queue_counter_metrics, GbSet), gc_process_and_entity(consumer_created, GbSet), ExchangeGbSet = gb_sets:from_list(rabbit_exchange:list_names()), gc_process_and_entities(channel_queue_exchange_metrics, GbSet, ExchangeGbSet), From e5961ad49d5e00de2f00b564084ec8bd5251bb20 Mon Sep 17 00:00:00 2001 From: Lois Soto Lopez Date: Fri, 12 Jul 2024 11:20:19 +0200 Subject: [PATCH 6/6] wip: add missing exchange_metrics to tests --- ...etheus_rabbitmq_core_metrics_collector.erl | 26 +++++++++---------- .../test/rabbit_prometheus_http_SUITE.erl | 17 +++++++++--- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl index c5836ef122c2..5d22da808bdd 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl @@ -211,19 +211,6 @@ {channel_queue_exchange_metrics, [ {2, undefined, queue_messages_published_total, counter, "Total number of messages published into a queue through a exchange on a channel"} - ]} -]). - -%% Metrics that can be only requested through `/metrics/detailed` --define(METRICS_CLUSTER,[ - {vhost_status, [ - {2, undefined, vhost_status, gauge, "Whether a given vhost is running"} - ]}, - {exchange_bindings, [ - {2, undefined, exchange_bindings, gauge, "Number of bindings for an exchange. This value is cluster-wide."} - ]}, - {exchange_names, [ - {2, undefined, exchange_name, gauge, "Enumerates exchanges without any additional info. This value is cluster-wide. A cheaper alternative to `exchange_bindings`"} ]}, {queue_exchange_metrics, [ {2, undefined, queue_exchange_messages_published_total, counter, "Total number of messages published into a queue through an exchange"} @@ -244,6 +231,19 @@ {8, undefined, queue_get_empty_total, counter, "Total number of times basic.get operations fetched no message on a queue"} ]}]). +%% Metrics that can be only requested through `/metrics/detailed` +-define(METRICS_CLUSTER,[ + {vhost_status, [ + {2, undefined, vhost_status, gauge, "Whether a given vhost is running"} + ]}, + {exchange_bindings, [ + {2, undefined, exchange_bindings, gauge, "Number of bindings for an exchange. This value is cluster-wide."} + ]}, + {exchange_names, [ + {2, undefined, exchange_name, gauge, "Enumerates exchanges without any additional info. This value is cluster-wide. A cheaper alternative to `exchange_bindings`"} + ]} +]). + -define(TOTALS, [ %% ordering differs from metrics above, refer to list comprehension {connection_created, connections, gauge, "Connections currently open"}, diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl index 50bf0b1ad62a..124a195a526d 100644 --- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl +++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl @@ -61,6 +61,7 @@ groups() -> queue_exchange_metrics_per_object_test, queue_metrics_per_object_test, queue_consumer_count_and_queue_metrics_mutually_exclusive_test, + exchange_metrics_per_object_test, vhost_status_metric, exchange_bindings_metric, exchange_names_metric @@ -304,7 +305,17 @@ end_per_group_(Config) -> inets:stop(), rabbit_ct_helpers:run_teardown_steps(Config, rabbit_ct_client_helpers:teardown_steps() ++ rabbit_ct_broker_helpers:teardown_steps()). - +init_per_testcase(Testcase, Config) + when Testcase =:= queue_counter_metrics_per_object_test; + Testcase =:= queue_exchange_metrics_per_object_test; + Testcase =:= exchange_metrics_per_object_test -> + case rabbit_ct_helpers:is_mixed_versions() of + false -> + rabbit_ct_helpers:testcase_started(Config, Testcase); + true -> + %% skip the test in mixed version mode + {skip, "Should not run in mixed version environments"} + end; init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase). @@ -603,8 +614,8 @@ queue_exchange_metrics_per_object_test(Config) -> ok. exchange_metrics_per_object_test(Config) -> - Expected1 = #{#{queue => "vhost-1-queue-with-consumer", vhost => "vhost-1"} => [7]}, - + Expected1 = #{#{exchange => "vhost-1-queue-with-consumer-direct-exchange", vhost => "vhost-1"} => [7]}, + {_, Body} = http_get_with_pal(Config, "/metrics/detailed?vhost=vhost-1&family=exchange_metrics", [], 200),