Skip to content

Commit 0c851b4

Browse files
AAP-52935 - Milan jobs summaries events ee collectors (#214)
* Add jobs collector
* Add vanilla job host summary for service
* Modify job host summary to be filtered by job.finished
* Collect all the events from analytics and metrics combined
* Filter by jobs not job host summaries
* Improve events collections
* Add execution environment collector
* Minor tweaks - renaming collectors, adding disable if
* Add new collectors to validators
* Correct jobs fetching
* Correct events and execution environments query
* ruff
* Add base test file
* Add finished into job data
* Prepare basic test for jobs
* First try
* Jobs test
* Jobs test name fix
* Job host summary
* Ruff
* Events basics
* Insert events query
* Create partition for events
* Events test passed
* Events and EE passed
* Add public. into the table name
* Ruff
1 parent af17d7f commit 0c851b4

File tree

4 files changed

+780
-1
lines changed

4 files changed

+780
-1
lines changed

metrics_utility/automation_controller_billing/collectors.py

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,3 +657,267 @@ def get_cpu_timeline(prom: PrometheusClient, previous_hour_start, previous_hour_
657657

658658
except Exception as e:
659659
raise MetricsException(f'Error querying CPU timeline: {e}')
660+
661+
662+
@register('unified_jobs', '1.4', format='csv', description=_('Data on jobs run'), fnc_slicing=daily_slicing)
def unified_jobs_table(since, full_path, until, **kwargs):
    """Dump unified jobs overlapping the [since, until) window to CSV.

    A job is included when it was either created or finished inside the
    window; 'sync' launches are excluded. Returns None when this collector
    is not enabled in the optional-collectors configuration.
    """
    if 'unified_jobs' not in get_optional_collectors():
        return None

    # since/until are datetime objects supplied by the collection framework
    # (not user input), so interpolating their ISO form into the query is
    # safe here.
    window_start, window_end = since.isoformat(), until.isoformat()

    unified_job_query = f"""COPY (SELECT main_unifiedjob.id,
        main_unifiedjob.polymorphic_ctype_id,
        django_content_type.model,
        main_unifiedjob.organization_id,
        main_organization.name as organization_name,
        main_executionenvironment.image as execution_environment_image,
        main_job.inventory_id,
        main_inventory.name as inventory_name,
        main_unifiedjob.created,
        main_unifiedjob.name,
        main_unifiedjob.unified_job_template_id,
        main_unifiedjob.launch_type,
        main_unifiedjob.schedule_id,
        main_unifiedjob.execution_node,
        main_unifiedjob.controller_node,
        main_unifiedjob.cancel_flag,
        main_unifiedjob.status,
        main_unifiedjob.failed,
        main_unifiedjob.started,
        main_unifiedjob.finished,
        main_unifiedjob.elapsed,
        main_unifiedjob.job_explanation,
        main_unifiedjob.instance_group_id,
        main_unifiedjob.installed_collections,
        main_unifiedjob.ansible_version,
        main_job.forks
        FROM main_unifiedjob
        LEFT JOIN django_content_type ON main_unifiedjob.polymorphic_ctype_id = django_content_type.id
        LEFT JOIN main_job ON main_unifiedjob.id = main_job.unifiedjob_ptr_id
        LEFT JOIN main_inventory ON main_job.inventory_id = main_inventory.id
        LEFT JOIN main_organization ON main_organization.id = main_unifiedjob.organization_id
        LEFT JOIN main_executionenvironment ON main_executionenvironment.id = main_unifiedjob.execution_environment_id
        WHERE ((main_unifiedjob.created >= '{window_start}' AND main_unifiedjob.created < '{window_end}')
               OR (main_unifiedjob.finished >= '{window_start}' AND main_unifiedjob.finished < '{window_end}'))
              AND main_unifiedjob.launch_type != 'sync'
        ORDER BY main_unifiedjob.id ASC) TO STDOUT WITH CSV HEADER
    """

    return _copy_table(table='unified_jobs', query=unified_job_query, path=full_path)
@register('job_host_summary_service', '1.4', format='csv', description=_('Data for billing'), fnc_slicing=daily_slicing)
def job_host_summary_service_table(since, full_path, until, **kwargs):
    """Dump per-host job summaries for jobs that finished in [since, until) to CSV.

    Host 'variables' may be stored either as JSON or as YAML text; two SQL
    helper functions (installed by the prepend query below) extract the
    'ansible_host' and 'ansible_connection' fields from either encoding.
    Returns None when this collector is not enabled.
    """
    if 'job_host_summary_service' not in get_optional_collectors():
        return None

    # Helper functions must exist before the main COPY runs; _copy_table
    # executes prepend_query first.
    prepend_query = """
    -- Define function for parsing field out of yaml encoded as text
    CREATE OR REPLACE FUNCTION metrics_utility_parse_yaml_field(
        str text,
        field text
    )
    RETURNS text AS
    $$
    DECLARE
        line_re text;
        field_re text;
    BEGIN
        field_re := ' *[:=] *(.+?) *$';
        line_re := '(?n)^' || field || field_re;
        RETURN trim(both '"' from substring(str from line_re) );
    END;
    $$
    LANGUAGE plpgsql;

    -- Define function to check if field is a valid json
    CREATE OR REPLACE FUNCTION metrics_utility_is_valid_json(p_json text)
    returns boolean
    AS
    $$
    BEGIN
        RETURN (p_json::json is not null);
    EXCEPTION
        WHEN others THEN
            RETURN false;
    END;
    $$
    LANGUAGE plpgsql;
    """

    # Framework-supplied datetimes; safe to interpolate as ISO strings.
    window_start, window_end = since.isoformat(), until.isoformat()

    summary_query = f"""
    WITH
    -- First: restrict to jobs that FINISHED in the window (uses index on main_unifiedjob.finished if present)
    filtered_jobs AS (
        SELECT mu.id
        FROM main_unifiedjob mu
        WHERE mu.finished >= '{window_start}'
          AND mu.finished < '{window_end}'
          AND mu.finished IS NOT NULL
    ),
    --
    -- Then: only host summaries that belong to those jobs (uses index on main_jobhostsummary.job_id)
    filtered_hosts AS (
        SELECT DISTINCT mjs.host_id
        FROM main_jobhostsummary mjs
        JOIN filtered_jobs fj ON fj.id = mjs.job_id
    ),
    --
    hosts_variables AS (
        SELECT
            fh.host_id,
            CASE
                WHEN metrics_utility_is_valid_json(h.variables)
                    THEN h.variables::jsonb->>'ansible_host'
                ELSE metrics_utility_parse_yaml_field(h.variables, 'ansible_host')
            END AS ansible_host_variable,
            CASE
                WHEN metrics_utility_is_valid_json(h.variables)
                    THEN h.variables::jsonb->>'ansible_connection'
                ELSE metrics_utility_parse_yaml_field(h.variables, 'ansible_connection')
            END AS ansible_connection_variable
        FROM filtered_hosts fh
        LEFT JOIN main_host h ON h.id = fh.host_id
    )

    SELECT
        mjs.id,
        mjs.created,
        mjs.modified,
        mjs.host_name,
        mjs.host_id AS host_remote_id,
        hv.ansible_host_variable,
        hv.ansible_connection_variable,
        mjs.changed,
        mjs.dark,
        mjs.failures,
        mjs.ok,
        mjs.processed,
        mjs.skipped,
        mjs.failed,
        mjs.ignored,
        mjs.rescued,
        mu.created AS job_created,
        mjs.job_id AS job_remote_id,
        mu.unified_job_template_id AS job_template_remote_id,
        mu.name AS job_template_name,
        mi.id AS inventory_remote_id,
        mi.name AS inventory_name,
        mo.id AS organization_remote_id,
        mo.name AS organization_name,
        mup.id AS project_remote_id,
        mup.name AS project_name
    FROM filtered_jobs fj
    JOIN main_jobhostsummary mjs ON mjs.job_id = fj.id
    LEFT JOIN main_job mj ON mjs.job_id = mj.unifiedjob_ptr_id
    LEFT JOIN main_unifiedjob mu ON mu.id = mjs.job_id
    LEFT JOIN main_unifiedjobtemplate AS mup ON mup.id = mj.project_id
    LEFT JOIN main_inventory mi ON mi.id = mj.inventory_id
    LEFT JOIN main_organization mo ON mo.id = mu.organization_id
    LEFT JOIN hosts_variables hv ON hv.host_id = mjs.host_id
    ORDER BY mu.finished ASC
    """

    return _copy_table(table='main_jobhostsummary', query=f'COPY ({summary_query}) TO STDOUT WITH CSV HEADER', path=full_path, prepend_query=prepend_query)
@register('main_jobevent_service', '1.4', format='csv', description=_('Content usage'), fnc_slicing=daily_slicing)
def main_jobevent_service_table(since, full_path, until, **kwargs):
    """Dump job events belonging to jobs that finished in [since, until) to CSV.

    Events are matched on the (job_id, job_created) pair so lookups can hit
    the partitioned main_jobevent table correctly. Returns None when this
    collector is not enabled.
    """
    if 'main_jobevent_service' not in get_optional_collectors():
        return None

    # 'e' is the main_jobevent alias used in the final query's FROM clause.
    # Raw '\u' sequences in event_data would break the jsonb cast, so they
    # are escaped first.
    event_data = r"replace(e.event_data, '\u', '\u005cu')::jsonb"

    # 1) Load finished jobs in the window via a parameterized query on the
    #    Django DB connection.
    jobs_query = """
    SELECT uj.id AS job_id,
           uj.created AS job_created
    FROM main_unifiedjob uj
    WHERE uj.finished >= %(since)s
      AND uj.finished < %(until)s
    """
    with connection.cursor() as cursor:
        cursor.execute(jobs_query, {'since': since, 'until': until})
        finished_jobs = cursor.fetchall()

    # 2) Build a literal WHERE clause that preserves the (job_id,
    #    job_created) pairing. Values originate from the database itself
    #    (ints and datetimes), not user input.
    if finished_jobs:
        # (e.job_id, e.job_created) IN (VALUES (id1, 'ts1'::timestamptz), ...)
        value_rows = ',\n'.join(f"({job_id}, '{job_created.isoformat()}'::timestamptz)" for job_id, job_created in finished_jobs)
        where_clause = f'(e.job_id, e.job_created) IN (VALUES {value_rows})'
    else:
        # No jobs finished in the window -> no events to export.
        where_clause = 'FALSE'

    # 3) Final event query
    query = f"""
    SELECT
        e.id,
        e.created,
        e.modified,
        e.job_created,
        uj.finished,
        e.uuid,
        e.parent_uuid,
        e.event,

        -- JSON extracted fields
        ({event_data}->>'task_action') AS task_action,
        ({event_data}->>'resolved_action') AS resolved_action,
        ({event_data}->>'resolved_role') AS resolved_role,
        ({event_data}->>'duration') AS duration,
        ({event_data}->>'start')::timestamptz AS start,
        ({event_data}->>'end')::timestamptz AS end,

        e.failed,
        e.changed,
        e.playbook,
        e.play,
        e.task,
        e.role,
        e.job_id AS job_remote_id,
        e.host_id AS host_remote_id,
        e.host_name,

        -- Warnings and deprecations (json arrays)
        {event_data}->'res'->'warnings' AS warnings,
        {event_data}->'res'->'deprecations' AS deprecations,

        CASE WHEN e.event = 'playbook_on_stats'
             THEN {event_data} - 'artifact_data'
        END AS playbook_on_stats

    FROM main_jobevent e
    LEFT JOIN main_unifiedjob uj ON uj.id = e.job_id
    WHERE {where_clause}
    """

    return _copy_table(table='main_jobevent', query=f'COPY ({query}) TO STDOUT WITH CSV HEADER', path=full_path)
@register('execution_environments', '1.4', format='csv', description=_('Execution environments'), fnc_slicing=limit_slicing)
def execution_environments_table(since, full_path, until, **kwargs):
    """Dump the full main_executionenvironment table to CSV.

    The since/until bounds are accepted for interface consistency but not
    used: execution environments are a small reference table collected in
    full (limit_slicing). Returns None when this collector is not enabled.
    """
    if 'execution_environments' not in get_optional_collectors():
        return None

    ee_query = """
    SELECT
        id,
        created,
        modified,
        description,
        image,
        managed,
        created_by_id,
        credential_id,
        modified_by_id,
        organization_id,
        name,
        pull
    FROM public.main_executionenvironment
    """

    return _copy_table(table='main_executionenvironment', query=f'COPY ({ee_query}) TO STDOUT WITH CSV HEADER', path=full_path)
metrics_utility/management/validation.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,18 @@
4848
'managed_nodes_by_organizations',
4949
},
5050
}
# Optional collectors that may be enabled via configuration. The empty
# string is accepted so an unset/blank setting does not fail validation.
VALID_COLLECTORS = {
    'main_host',
    'main_jobevent',
    'main_indirectmanagednodeaudit',
    'total_workers_vcpu',
    'unified_jobs',
    'job_host_summary_service',
    'main_jobevent_service',
    'execution_environments',
    '',
}
62+
# Destinations supported when building vs. gathering report bundles.
VALID_SHIP_TARGET_BUILD = {'directory', 's3', 'controller_db'}
VALID_SHIP_TARGET_GATHER = {'directory', 's3', 'crc'}

0 commit comments

Comments
 (0)