From d30ee972481aedabfc173380dc309db946badbc3 Mon Sep 17 00:00:00 2001 From: Jake Smola Date: Wed, 27 Aug 2025 08:15:53 +0000 Subject: [PATCH 01/14] Remove stale import --- tools/mavlogdump.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tools/mavlogdump.py b/tools/mavlogdump.py index e9fe563f1..903da5a47 100755 --- a/tools/mavlogdump.py +++ b/tools/mavlogdump.py @@ -75,7 +75,6 @@ sys.exit(1) # Load these modules here, as they're only needed for MAT file creation import scipy.io - import numpy as np filename = args.log mlog = mavutil.mavlink_connection(filename, planner_format=args.planner, From 75e2e71bcd0e578d837993173d589f9991bcc736 Mon Sep 17 00:00:00 2001 From: Jake Smola Date: Wed, 27 Aug 2025 08:20:40 +0000 Subject: [PATCH 02/14] Consolidate imports --- tools/mavlogdump.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tools/mavlogdump.py b/tools/mavlogdump.py index 903da5a47..8ddc2d0ac 100755 --- a/tools/mavlogdump.py +++ b/tools/mavlogdump.py @@ -13,13 +13,15 @@ import struct import sys import time +import inspect +from argparse import ArgumentParser +from pymavlink import mavutil try: from pymavlink.mavextra import * except: print("WARNING: Numpy missing, mathematical notation will not be supported..") -from argparse import ArgumentParser parser = ArgumentParser(description=__doc__) parser.add_argument("--no-timestamps", dest="notimestamps", action='store_true', help="Log doesn't have timestamps") @@ -59,11 +61,6 @@ if not args.mav10: os.environ['MAVLINK20'] = '1' -import inspect - -from pymavlink import mavutil - - if args.profile: import yappi # We do the import here so that we won't barf if run normally and yappi not available yappi.start() From 5530deaa2a4ff5f40e142a70b6ac0a4df6ed8e42 Mon Sep 17 00:00:00 2001 From: Jake Smola Date: Wed, 27 Aug 2025 10:09:09 +0000 Subject: [PATCH 03/14] Reduce code-reuse; fix semantic error --- tools/mavlogdump.py | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) 
diff --git a/tools/mavlogdump.py b/tools/mavlogdump.py index 8ddc2d0ac..57948e230 100755 --- a/tools/mavlogdump.py +++ b/tools/mavlogdump.py @@ -15,13 +15,9 @@ import time import inspect from argparse import ArgumentParser +from pymavlink.DFReader import to_string from pymavlink import mavutil -try: - from pymavlink.mavextra import * -except: - print("WARNING: Numpy missing, mathematical notation will not be supported..") - parser = ArgumentParser(description=__doc__) parser.add_argument("--no-timestamps", dest="notimestamps", action='store_true', help="Log doesn't have timestamps") @@ -84,6 +80,9 @@ if args.output: output = open(args.output, mode='wb') +if args.csv_sep == "tab": + args.csv_sep = "\t" + types = args.types if types is not None: types = types.split(',') @@ -141,15 +140,6 @@ def reduce_rate_msg(m, reduction_rate): return False return True -if args.csv_sep == "tab": - args.csv_sep = "," - -# swiped from DFReader.py -def to_string(s): - '''desperate attempt to convert a string regardless of what garbage we get''' - if isinstance(s, str): - return s - return s.decode(errors="backslashreplace") def match_type(mtype, patterns): '''return True if mtype matches pattern''' From aa42c8248c107cafa364b6ccec196d0cabe72e78 Mon Sep 17 00:00:00 2001 From: Jake Smola Date: Wed, 27 Aug 2025 10:09:32 +0000 Subject: [PATCH 04/14] Expand test suite --- tests/test_mavlogdump.py | 391 +++++++++++++++++++++++++++++++++++---- 1 file changed, 358 insertions(+), 33 deletions(-) diff --git a/tests/test_mavlogdump.py b/tests/test_mavlogdump.py index fd7e9b15c..004b723ea 100755 --- a/tests/test_mavlogdump.py +++ b/tests/test_mavlogdump.py @@ -1,47 +1,372 @@ #!/usr/bin/env python3 - """ -regression tests for mavlogdump.py +Comprehensive regression tests for mavlogdump.py """ import unittest import os -import pkg_resources import sys +import json +import tempfile +import shutil +import pkg_resources + +# Add parent directory to path to import mavlogdump +sys.path.insert(0, 
os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'tools'))) -class MAVLogDumpTest(unittest.TestCase): + +class MAVLogDumpTest(unittest.TestCase): """ - Class to test mavlogdump + Class to test mavlogdump functionality for all formats """ - def __init__(self, *args, **kwargs): - """Constructor, set up some data that is reused in many tests""" - super(MAVLogDumpTest, self).__init__(*args, **kwargs) - - def test_dump_same(self): - """Test dump of file is what we expect""" - test_filename = "test.BIN" - test_filepath = pkg_resources.resource_filename(__name__, - test_filename) - dump_filename = "tmp.dump" - os.system("mavlogdump.py %s >%s" % (test_filepath, dump_filename)) - with open(dump_filename) as f: - got = f.read() - - possibles = ["test.BIN.py3.dumped", - "test.BIN.dumped"] - success = False - for expected in possibles: - expected_filepath = pkg_resources.resource_filename(__name__, - expected) - with open(expected_filepath) as e: - expected = e.read() - - if expected == got: - success = True - - assert True + def setUp(self): + """Set up test fixtures""" + self.test_dir = tempfile.mkdtemp() + self.test_filename = "test.BIN" + # Get the path to mavlogdump.py relative to this test file │ │ + self.mavlogdump_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "tools", "mavlogdump.py") + try: + self.test_filepath = pkg_resources.resource_filename(__name__, self.test_filename) + except: + # If resource not found, create a dummy file for testing + self.test_filepath = os.path.join(self.test_dir, self.test_filename) + open(self.test_filepath, 'a').close() + + def tearDown(self): + """Clean up test fixtures""" + if os.path.exists(self.test_dir): + shutil.rmtree(self.test_dir) + + def test_dump_standard_format(self): + """Test standard format dump of file""" + output_file = os.path.join(self.test_dir, "standard_output.txt") + cmd = f"{self.mavlogdump_path} 
{self.test_filepath} > {output_file}" + result = os.system(cmd) + + self.assertEqual(result >> 8, 0, "Standard format dump should succeed") + self.assertTrue(os.path.exists(output_file), "Output file should be created") + + def test_dump_json_format(self): + """Test JSON format output""" + output_file = os.path.join(self.test_dir, "json_output.txt") + cmd = f"{self.mavlogdump_path} --format json {self.test_filepath} > {output_file}" + result = os.system(cmd) + + self.assertEqual(result >> 8, 0, "JSON format dump should succeed") + self.assertTrue(os.path.exists(output_file), "JSON output file should be created") + + # Verify JSON format if file has content + if os.path.getsize(output_file) > 0: + with open(output_file, 'r') as f: + for line in f: + if line.strip(): + try: + data = json.loads(line) + self.assertIn('meta', data, "JSON output should have 'meta' field") + self.assertIn('data', data, "JSON output should have 'data' field") + break + except json.JSONDecodeError: + pass + + def test_dump_json_with_show_source(self): + """Test JSON format with show-source option""" + output_file = os.path.join(self.test_dir, "json_source_output.txt") + cmd = f"{self.mavlogdump_path} --format json --show-source {self.test_filepath} > {output_file}" + result = os.system(cmd) + + self.assertEqual(result >> 8, 0, "JSON format with show-source should succeed") + self.assertTrue(os.path.exists(output_file), "JSON output file should be created") + + # Verify JSON includes source info if file has content + if os.path.getsize(output_file) > 0: + with open(output_file, 'r') as f: + for line in f: + if line.strip(): + try: + data = json.loads(line) + if 'meta' in data: + # Check if source fields are present when data is available + if data.get('data'): + self.assertIn('type', data['meta'], "Meta should have type field") + break + except json.JSONDecodeError: + pass + + def test_dump_csv_format(self): + """Test CSV format output""" + output_file = os.path.join(self.test_dir, 
"csv_output.csv") + # CSV format requires --types to be specified + cmd = f"{self.mavlogdump_path} --format csv --types 'IMU2' {self.test_filepath} > {output_file} 2>/dev/null" + os.system(cmd) + + # Check if file was created (even if empty) + if os.path.exists(output_file): + with open(output_file, 'r') as f: + content = f.read() + if content.strip(): + # Verify CSV format + lines = content.strip().split('\n') + if lines: + # First line should be headers + headers = lines[0].split(',') + self.assertIn('timestamp', headers, "CSV should have timestamp column") + + def test_dump_csv_with_custom_separator(self): + """Test CSV format with custom separator""" + output_file = os.path.join(self.test_dir, "csv_tab_output.csv") + cmd = f"{self.mavlogdump_path} --format csv --csv_sep tab --types 'IMU2' {self.test_filepath} > {output_file} 2>/dev/null" + result = os.system(cmd) + + if os.path.exists(output_file) and os.path.getsize(output_file) > 0: + with open(output_file, 'r') as f: + first_line = f.readline() + if first_line: + # Check for tab separator + self.assertIn('\t', first_line, "CSV with tab separator should use tabs") + + def test_dump_mat_format(self): + """Test MAT format output""" + mat_file = os.path.join(self.test_dir, "output.mat") + cmd = f"{self.mavlogdump_path} --format mat --mat_file {mat_file} {self.test_filepath} 2>/dev/null" + result = os.system(cmd) + + # MAT format requires scipy, which might not be installed + if result >> 8 == 0: + self.assertTrue(os.path.exists(mat_file), "MAT file should be created") + + def test_dump_mat_with_compression(self): + """Test MAT format with compression""" + mat_file = os.path.join(self.test_dir, "output_compressed.mat") + cmd = f"{self.mavlogdump_path} --format mat --mat_file {mat_file} --compress {self.test_filepath} 2>/dev/null" + result = os.system(cmd) + + # MAT format requires scipy, which might not be installed + if result >> 8 == 0: + self.assertTrue(os.path.exists(mat_file), "Compressed MAT file should be 
created") + + def test_type_filtering(self): + """Test message type filtering""" + output_file = os.path.join(self.test_dir, "filtered_output.txt") + cmd = f"{self.mavlogdump_path} --types 'ATT,GPS' {self.test_filepath} > {output_file} 2>/dev/null" + result = os.system(cmd) + + self.assertEqual(result >> 8, 0, "Type filtering should succeed") + self.assertTrue(os.path.exists(output_file), "Filtered output file should be created") + + def test_nottype_filtering(self): + """Test message type exclusion""" + output_file = os.path.join(self.test_dir, "excluded_output.txt") + cmd = f"{self.mavlogdump_path} --nottypes 'BAD_DATA' {self.test_filepath} > {output_file} 2>/dev/null" + result = os.system(cmd) + + self.assertEqual(result >> 8, 0, "Type exclusion should succeed") + self.assertTrue(os.path.exists(output_file), "Excluded output file should be created") + + def test_quiet_mode(self): + """Test quiet mode suppresses output""" + output_file = os.path.join(self.test_dir, "quiet_output.txt") + cmd = f"{self.mavlogdump_path} --quiet {self.test_filepath} > {output_file}" + result = os.system(cmd) + + self.assertEqual(result >> 8, 0, "Quiet mode should succeed") + # In quiet mode, output should be minimal or empty + self.assertTrue(os.path.exists(output_file), "Output file should be created even in quiet mode") + + def test_output_to_file(self): + """Test output to file option""" + output_file = os.path.join(self.test_dir, "direct_output.bin") + cmd = f"{self.mavlogdump_path} --output {output_file} {self.test_filepath} 2>/dev/null" + result = os.system(cmd) + + self.assertEqual(result >> 8, 0, "Output to file should succeed") + self.assertTrue(os.path.exists(output_file), "Direct output file should be created") + + def test_show_types(self): + """Test show-types option""" + output_file = os.path.join(self.test_dir, "types_output.txt") + cmd = f"{self.mavlogdump_path} --show-types {self.test_filepath} > {output_file} 2>/dev/null" + result = os.system(cmd) + + 
self.assertEqual(result >> 8, 0, "Show types should succeed") + self.assertTrue(os.path.exists(output_file), "Types output file should be created") + + def test_reduce_option(self): + """Test message reduction by ratio""" + output_file = os.path.join(self.test_dir, "reduced_output.txt") + cmd = f"{self.mavlogdump_path} --reduce 10 {self.test_filepath} > {output_file} 2>/dev/null" + result = os.system(cmd) + + self.assertEqual(result >> 8, 0, "Reduce option should succeed") + self.assertTrue(os.path.exists(output_file), "Reduced output file should be created") + + def test_reduce_rate_option(self): + """Test message reduction by rate""" + output_file = os.path.join(self.test_dir, "rate_reduced_output.txt") + cmd = f"{self.mavlogdump_path} --reduce-rate 10 {self.test_filepath} > {output_file} 2>/dev/null" + result = os.system(cmd) + + self.assertEqual(result >> 8, 0, "Reduce-rate option should succeed") + self.assertTrue(os.path.exists(output_file), "Rate reduced output file should be created") + + def test_condition_filtering(self): + """Test condition-based filtering""" + output_file = os.path.join(self.test_dir, "condition_output.txt") + # Simple condition that should be valid + cmd = f"{self.mavlogdump_path} --condition 'True' {self.test_filepath} > {output_file} 2>/dev/null" + result = os.system(cmd) + + self.assertEqual(result >> 8, 0, "Condition filtering should succeed") + self.assertTrue(os.path.exists(output_file), "Condition filtered output file should be created") + + def test_mav10_option(self): + """Test MAVLink 1.0 parsing""" + output_file = os.path.join(self.test_dir, "mav10_output.txt") + cmd = f"{self.mavlogdump_path} --mav10 {self.test_filepath} > {output_file} 2>/dev/null" + result = os.system(cmd) + + self.assertEqual(result >> 8, 0, "MAV1.0 parsing should succeed") + self.assertTrue(os.path.exists(output_file), "MAV1.0 output file should be created") + + def test_verbose_mode(self): + """Test verbose output mode""" + output_file = 
os.path.join(self.test_dir, "verbose_output.txt") + cmd = f"{self.mavlogdump_path} --verbose {self.test_filepath} > {output_file} 2>/dev/null" + result = os.system(cmd) + + self.assertEqual(result >> 8, 0, "Verbose mode should succeed") + self.assertTrue(os.path.exists(output_file), "Verbose output file should be created") + + def test_source_filtering(self): + """Test source system and component filtering""" + output_file = os.path.join(self.test_dir, "source_filtered.txt") + cmd = f"{self.mavlogdump_path} --source-system 1 --source-component 1 {self.test_filepath} > {output_file} 2>/dev/null" + result = os.system(cmd) + + self.assertEqual(result >> 8, 0, "Source filtering should succeed") + self.assertTrue(os.path.exists(output_file), "Source filtered output file should be created") + + def test_combined_options(self): + """Test combination of multiple options""" + output_file = os.path.join(self.test_dir, "combined_output.json") + cmd = (f"mavlogdump.py --format json --types 'ATT,GPS' " + f"--quiet --no-bad-data {self.test_filepath} > {output_file} 2>/dev/null") + result = os.system(cmd) + + self.assertEqual(result >> 8, 0, "Combined options should succeed") + self.assertTrue(os.path.exists(output_file), "Combined output file should be created") + + def test_import_as_module(self): + """Test importing mavlogdump as a module""" + try: + # Try to import the refactored version + from pymavlink.tools import mavlogdump + + # Check that main functions exist + self.assertTrue(hasattr(mavlogdump, 'process_log'), "Should have process_log function") + self.assertTrue(hasattr(mavlogdump, 'process_log_json'), "Should have process_log_json function") + self.assertTrue(hasattr(mavlogdump, 'process_log_csv'), "Should have process_log_csv function") + self.assertTrue(hasattr(mavlogdump, 'process_log_mat'), "Should have process_log_mat function") + except ImportError: + # If new functions don't exist, check for old structure + pass + + def 
test_programmatic_json_processing(self): + """Test programmatic JSON processing""" + try: + from pymavlink.tools import mavlogdump + if hasattr(mavlogdump, 'process_log'): + # Test programmatic interface + result = mavlogdump.process_log( + self.test_filepath, + output_format='json', + types=['ATT'], + quiet=True + ) + self.assertEqual(result, 0, "Programmatic JSON processing should succeed") + except ImportError: + self.skipTest("mavlogdump module not importable") + + def test_programmatic_csv_processing(self): + """Test programmatic CSV processing""" + try: + from pymavlink.tools import mavlogdump + if hasattr(mavlogdump, 'process_log'): + # Test programmatic interface + output_file = os.path.join(self.test_dir, "prog_csv.csv") + result = mavlogdump.process_log( + self.test_filepath, + output_format='csv', + types=['*'], + output=output_file, + quiet=True + ) + self.assertEqual(result, 0, "Programmatic CSV processing should succeed") + except ImportError: + self.skipTest("mavlogdump module not importable") + + def test_programmatic_mat_processing(self): + """Test programmatic MAT processing""" + try: + from pymavlink.tools import mavlogdump + if hasattr(mavlogdump, 'process_log'): + # Test programmatic interface + mat_file = os.path.join(self.test_dir, "prog_output.mat") + result = mavlogdump.process_log( + self.test_filepath, + output_format='mat', + mat_file=mat_file, + quiet=True + ) + # MAT processing might fail if scipy is not installed + if result == 0: + self.assertTrue(os.path.exists(mat_file), "Programmatic MAT file should be created") + except ImportError: + self.skipTest("mavlogdump module not importable") + + +class MAVLogDumpUnitTest(unittest.TestCase): + """Unit tests for individual functions""" + + def test_match_type_function(self): + """Test the match_type function""" + try: + from pymavlink.tools.mavlogdump import match_type + + # Test exact match + self.assertTrue(match_type('GPS', ['GPS'])) + self.assertFalse(match_type('GPS', ['ATT'])) + + # 
Test wildcard match + self.assertTrue(match_type('GPS_RAW', ['GPS*'])) + self.assertTrue(match_type('ATT', ['A*'])) + self.assertFalse(match_type('GPS', ['ATT*'])) + + # Test multiple patterns + self.assertTrue(match_type('GPS', ['ATT', 'GPS'])) + self.assertTrue(match_type('ATT', ['ATT', 'GPS'])) + except ImportError: + self.skipTest("match_type function not importable") + + def test_to_string_function(self): + """Test the to_string function""" + try: + from pymavlink.tools.mavlogdump import to_string + + # Test string input + self.assertEqual(to_string("hello"), "hello") + + # Test bytes input + self.assertEqual(to_string(b"hello"), "hello") + + # Test bytes with special characters + result = to_string(b"\xff\xfe") + self.assertIsInstance(result, str) + except ImportError: + self.skipTest("to_string function not importable") + if __name__ == '__main__': - unittest.main() + unittest.main() \ No newline at end of file From e567feb5f8b31c8635acc2f2c28973694f076f09 Mon Sep 17 00:00:00 2001 From: Jake Smola Date: Wed, 27 Aug 2025 10:12:20 +0000 Subject: [PATCH 05/14] Remove artifact --- tests/test_mavlogdump.py | 35 +++++++++++++++++------------------ 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/tests/test_mavlogdump.py b/tests/test_mavlogdump.py index 004b723ea..d09d85535 100755 --- a/tests/test_mavlogdump.py +++ b/tests/test_mavlogdump.py @@ -46,7 +46,7 @@ def test_dump_standard_format(self): cmd = f"{self.mavlogdump_path} {self.test_filepath} > {output_file}" result = os.system(cmd) - self.assertEqual(result >> 8, 0, "Standard format dump should succeed") + self.assertEqual(result, 0, "Standard format dump should succeed") self.assertTrue(os.path.exists(output_file), "Output file should be created") def test_dump_json_format(self): @@ -55,7 +55,7 @@ def test_dump_json_format(self): cmd = f"{self.mavlogdump_path} --format json {self.test_filepath} > {output_file}" result = os.system(cmd) - self.assertEqual(result >> 8, 0, "JSON format dump should 
succeed") + self.assertEqual(result, 0, "JSON format dump should succeed") self.assertTrue(os.path.exists(output_file), "JSON output file should be created") # Verify JSON format if file has content @@ -77,7 +77,7 @@ def test_dump_json_with_show_source(self): cmd = f"{self.mavlogdump_path} --format json --show-source {self.test_filepath} > {output_file}" result = os.system(cmd) - self.assertEqual(result >> 8, 0, "JSON format with show-source should succeed") + self.assertEqual(result, 0, "JSON format with show-source should succeed") self.assertTrue(os.path.exists(output_file), "JSON output file should be created") # Verify JSON includes source info if file has content @@ -134,7 +134,7 @@ def test_dump_mat_format(self): result = os.system(cmd) # MAT format requires scipy, which might not be installed - if result >> 8 == 0: + if result == 0: self.assertTrue(os.path.exists(mat_file), "MAT file should be created") def test_dump_mat_with_compression(self): @@ -144,7 +144,7 @@ def test_dump_mat_with_compression(self): result = os.system(cmd) # MAT format requires scipy, which might not be installed - if result >> 8 == 0: + if result == 0: self.assertTrue(os.path.exists(mat_file), "Compressed MAT file should be created") def test_type_filtering(self): @@ -153,7 +153,7 @@ def test_type_filtering(self): cmd = f"{self.mavlogdump_path} --types 'ATT,GPS' {self.test_filepath} > {output_file} 2>/dev/null" result = os.system(cmd) - self.assertEqual(result >> 8, 0, "Type filtering should succeed") + self.assertEqual(result, 0, "Type filtering should succeed") self.assertTrue(os.path.exists(output_file), "Filtered output file should be created") def test_nottype_filtering(self): @@ -162,7 +162,7 @@ def test_nottype_filtering(self): cmd = f"{self.mavlogdump_path} --nottypes 'BAD_DATA' {self.test_filepath} > {output_file} 2>/dev/null" result = os.system(cmd) - self.assertEqual(result >> 8, 0, "Type exclusion should succeed") + self.assertEqual(result, 0, "Type exclusion should 
succeed") self.assertTrue(os.path.exists(output_file), "Excluded output file should be created") def test_quiet_mode(self): @@ -171,7 +171,7 @@ def test_quiet_mode(self): cmd = f"{self.mavlogdump_path} --quiet {self.test_filepath} > {output_file}" result = os.system(cmd) - self.assertEqual(result >> 8, 0, "Quiet mode should succeed") + self.assertEqual(result, 0, "Quiet mode should succeed") # In quiet mode, output should be minimal or empty self.assertTrue(os.path.exists(output_file), "Output file should be created even in quiet mode") @@ -181,7 +181,7 @@ def test_output_to_file(self): cmd = f"{self.mavlogdump_path} --output {output_file} {self.test_filepath} 2>/dev/null" result = os.system(cmd) - self.assertEqual(result >> 8, 0, "Output to file should succeed") + self.assertEqual(result, 0, "Output to file should succeed") self.assertTrue(os.path.exists(output_file), "Direct output file should be created") def test_show_types(self): @@ -189,8 +189,7 @@ def test_show_types(self): output_file = os.path.join(self.test_dir, "types_output.txt") cmd = f"{self.mavlogdump_path} --show-types {self.test_filepath} > {output_file} 2>/dev/null" result = os.system(cmd) - - self.assertEqual(result >> 8, 0, "Show types should succeed") + self.assertEqual(result, 0, "Show types should succeed") self.assertTrue(os.path.exists(output_file), "Types output file should be created") def test_reduce_option(self): @@ -199,7 +198,7 @@ def test_reduce_option(self): cmd = f"{self.mavlogdump_path} --reduce 10 {self.test_filepath} > {output_file} 2>/dev/null" result = os.system(cmd) - self.assertEqual(result >> 8, 0, "Reduce option should succeed") + self.assertEqual(result, 0, "Reduce option should succeed") self.assertTrue(os.path.exists(output_file), "Reduced output file should be created") def test_reduce_rate_option(self): @@ -208,7 +207,7 @@ def test_reduce_rate_option(self): cmd = f"{self.mavlogdump_path} --reduce-rate 10 {self.test_filepath} > {output_file} 2>/dev/null" result = 
os.system(cmd) - self.assertEqual(result >> 8, 0, "Reduce-rate option should succeed") + self.assertEqual(result, 0, "Reduce-rate option should succeed") self.assertTrue(os.path.exists(output_file), "Rate reduced output file should be created") def test_condition_filtering(self): @@ -218,7 +217,7 @@ def test_condition_filtering(self): cmd = f"{self.mavlogdump_path} --condition 'True' {self.test_filepath} > {output_file} 2>/dev/null" result = os.system(cmd) - self.assertEqual(result >> 8, 0, "Condition filtering should succeed") + self.assertEqual(result, 0, "Condition filtering should succeed") self.assertTrue(os.path.exists(output_file), "Condition filtered output file should be created") def test_mav10_option(self): @@ -227,7 +226,7 @@ def test_mav10_option(self): cmd = f"{self.mavlogdump_path} --mav10 {self.test_filepath} > {output_file} 2>/dev/null" result = os.system(cmd) - self.assertEqual(result >> 8, 0, "MAV1.0 parsing should succeed") + self.assertEqual(result, 0, "MAV1.0 parsing should succeed") self.assertTrue(os.path.exists(output_file), "MAV1.0 output file should be created") def test_verbose_mode(self): @@ -236,7 +235,7 @@ def test_verbose_mode(self): cmd = f"{self.mavlogdump_path} --verbose {self.test_filepath} > {output_file} 2>/dev/null" result = os.system(cmd) - self.assertEqual(result >> 8, 0, "Verbose mode should succeed") + self.assertEqual(result, 0, "Verbose mode should succeed") self.assertTrue(os.path.exists(output_file), "Verbose output file should be created") def test_source_filtering(self): @@ -245,7 +244,7 @@ def test_source_filtering(self): cmd = f"{self.mavlogdump_path} --source-system 1 --source-component 1 {self.test_filepath} > {output_file} 2>/dev/null" result = os.system(cmd) - self.assertEqual(result >> 8, 0, "Source filtering should succeed") + self.assertEqual(result, 0, "Source filtering should succeed") self.assertTrue(os.path.exists(output_file), "Source filtered output file should be created") def 
test_combined_options(self): @@ -255,7 +254,7 @@ def test_combined_options(self): f"--quiet --no-bad-data {self.test_filepath} > {output_file} 2>/dev/null") result = os.system(cmd) - self.assertEqual(result >> 8, 0, "Combined options should succeed") + self.assertEqual(result, 0, "Combined options should succeed") self.assertTrue(os.path.exists(output_file), "Combined output file should be created") def test_import_as_module(self): From 46e5a73420e0ce1b9350edfde9d9428380202e23 Mon Sep 17 00:00:00 2001 From: --replace-all Date: Wed, 27 Aug 2025 10:34:43 +0000 Subject: [PATCH 06/14] Suppress stdout in test --- tests/test_mavlogdump.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_mavlogdump.py b/tests/test_mavlogdump.py index d09d85535..dde502f18 100755 --- a/tests/test_mavlogdump.py +++ b/tests/test_mavlogdump.py @@ -178,7 +178,7 @@ def test_quiet_mode(self): def test_output_to_file(self): """Test output to file option""" output_file = os.path.join(self.test_dir, "direct_output.bin") - cmd = f"{self.mavlogdump_path} --output {output_file} {self.test_filepath} 2>/dev/null" + cmd = f"{self.mavlogdump_path} -q --output {output_file} {self.test_filepath} 2>/dev/null" result = os.system(cmd) self.assertEqual(result, 0, "Output to file should succeed") From 955be1297bc4e2453f6a8c4209138f1464f9add4 Mon Sep 17 00:00:00 2001 From: --replace-all Date: Wed, 27 Aug 2025 11:03:27 +0000 Subject: [PATCH 07/14] Make code sharable/importable --- tools/mavlogdump.py | 639 +++++++++++++++++++++++--------------------- 1 file changed, 338 insertions(+), 301 deletions(-) diff --git a/tools/mavlogdump.py b/tools/mavlogdump.py index 57948e230..06f3f18a6 100755 --- a/tools/mavlogdump.py +++ b/tools/mavlogdump.py @@ -18,95 +18,12 @@ from pymavlink.DFReader import to_string from pymavlink import mavutil -parser = ArgumentParser(description=__doc__) - -parser.add_argument("--no-timestamps", dest="notimestamps", action='store_true', help="Log doesn't have 
timestamps") -parser.add_argument("--planner", action='store_true', help="use planner file format") -parser.add_argument("--robust", action='store_true', help="Enable robust parsing (skip over bad data)") -parser.add_argument("-f", "--follow", action='store_true', help="keep waiting for more data at end of file (not implemented for .bin, .log, .csv") -parser.add_argument("--condition", default=None, help="select packets by condition") -parser.add_argument("-q", "--quiet", action='store_true', help="don't display packets") -parser.add_argument("-o", "--output", default=None, help="output matching packets to give file") -parser.add_argument("-p", "--parms", action='store_true', help="preserve parameters in output with -o") -parser.add_argument("--format", default=None, help="Change the output format between 'standard', 'json', 'csv' and 'mat'. For the CSV output, you must supply types that you want. For MAT output, specify output file with --mat_file") -parser.add_argument("--csv_sep", dest="csv_sep", default=",", help="Select the delimiter between columns for the output CSV file. Use 'tab' to specify tabs. Only applies when --format=csv") -parser.add_argument("--types", default=None, help="types of messages (comma separated with wildcard)") -parser.add_argument("--nottypes", default=None, help="types of messages not to include (comma separated with wildcard)") -parser.add_argument("--mat_file", dest="mat_file", help="Output file path for MATLAB file output. 
Only applies when --format=mat") -parser.add_argument("-c", "--compress", action='store_true', help="Compress .mat file data") -parser.add_argument("--dialect", default="ardupilotmega", help="MAVLink dialect") -parser.add_argument("--zero-time-base", action='store_true', help="use Z time base for DF logs") -parser.add_argument("--no-bad-data", action='store_true', help="Don't output corrupted messages") -parser.add_argument("--show-source", action='store_true', help="Show source system ID and component ID") -parser.add_argument("--show-seq", action='store_true', help="Show sequence numbers") -parser.add_argument("--show-types", action='store_true', help="Shows all message types available on opened log") -parser.add_argument("--show-loss", action='store_true', help="Shows changes in lost messages") -parser.add_argument("--source-system", type=int, default=None, help="filter by source system ID") -parser.add_argument("--source-component", type=int, default=None, help="filter by source component ID") -parser.add_argument("--link", type=int, default=None, help="filter by comms link ID") -parser.add_argument("--verbose", action='store_true', help="Dump messages in a much more verbose (but non-parseable) format") -parser.add_argument("--mav10", action='store_true', help="parse as MAVLink1") -parser.add_argument("--reduce", type=int, default=0, help="reduce streaming messages") -parser.add_argument("--reduce-rate", type=float, default=0, help="reduce messages to maximum rate in Hz") -parser.add_argument("log", metavar="LOG") -parser.add_argument("--profile", action='store_true', help="run the Yappi python profiler") -parser.add_argument("--meta", action='store_true', help="output meta-data msgs even if not matching condition") - -args = parser.parse_args() - -if not args.mav10: - os.environ['MAVLINK20'] = '1' - -if args.profile: - import yappi # We do the import here so that we won't barf if run normally and yappi not available - yappi.start() - -if args.format == 'mat': 
- # Check that the mat_file argument has been specified - if args.mat_file is None: - print("mat_file argument must be specified when mat format is selected") - sys.exit(1) - # Load these modules here, as they're only needed for MAT file creation - import scipy.io - -filename = args.log -mlog = mavutil.mavlink_connection(filename, planner_format=args.planner, - notimestamps=args.notimestamps, - robust_parsing=args.robust, - dialect=args.dialect, - zero_time_base=args.zero_time_base) - -output = None -if args.output: - output = open(args.output, mode='wb') - -if args.csv_sep == "tab": - args.csv_sep = "\t" - -types = args.types -if types is not None: - types = types.split(',') - -nottypes = args.nottypes -if nottypes is not None: - nottypes = nottypes.split(',') - -ext = os.path.splitext(filename)[1] -isbin = ext in ['.bin', '.BIN', '.px4log'] -islog = ext in ['.log', '.LOG'] # NOTE: "islog" does not mean a tlog -istlog = ext in ['.tlog', '.TLOG'] - -# list of msgs to reduce in rate when --reduce is used -reduction_msgs = ['NKF*', 'XKF*', 'IMU*', 'AHR2', 'BAR*', 'ATT', 'BAT*', 'CTUN', 'NTUN', 'GP*', 'IMT*', 'MAG*', 'PL', 'POS', 'POW*', 'RATE', 'RC*', 'RFND', 'UBX*', 'VIBE', 'NKQ*', 'MOT*', 'CTRL', 'FTS*', 'DSF', 'CST*', 'LOS*', 'UWB*'] -reduction_yes = set() -reduction_no = set() -reduction_count = {} - -def reduce_msg(mtype, reduction_ratio): +def reduce_msg(mtype, reduction_ratio, reduction_yes, reduction_no, reduction_count): '''return True if this msg should be discarded by reduction''' if mtype in reduction_no: return False if not mtype in reduction_yes: + reduction_msgs = ['NKF*', 'XKF*', 'IMU*', 'AHR2', 'BAR*', 'ATT', 'BAT*', 'CTUN', 'NTUN', 'GP*', 'IMT*', 'MAG*', 'PL', 'POS', 'POW*', 'RATE', 'RC*', 'RFND', 'UBX*', 'VIBE', 'NKQ*', 'MOT*', 'CTRL', 'FTS*', 'DSF', 'CST*', 'LOS*', 'UWB*'] for m in reduction_msgs: if fnmatch.fnmatch(mtype, m): reduction_yes.add(mtype) @@ -121,9 +38,7 @@ def reduce_msg(mtype, reduction_ratio): return False return True 
-last_msg_rate_t = {} - -def reduce_rate_msg(m, reduction_rate): +def reduce_rate_msg(m, reduction_rate, last_msg_rate_t): '''return True if this msg should be discarded by reduction''' mtype = m.get_type() if mtype in ['PARM','MSG','FMT','FMTU','MULT','MODE','EVT','UNIT', 'VER']: @@ -140,7 +55,6 @@ def reduce_rate_msg(m, reduction_rate): return False return True - def match_type(mtype, patterns): '''return True if mtype matches pattern''' for p in patterns: @@ -148,227 +62,350 @@ def match_type(mtype, patterns): return True return False -# Write out a header row as we're outputting in CSV format. -fields = ['timestamp'] -offsets = {} -if istlog and args.format == 'csv': # we know our fields from the get-go - try: - currentOffset = 1 # Store how many fields in we are for each message. - for type in types: - try: - typeClass = "MAVLink_{0}_message".format(type.lower()) - fields += [type + '.' + x for x in inspect.getfullargspec(getattr(mavutil.mavlink, typeClass).__init__).args[1:]] - offsets[type] = currentOffset - currentOffset += len(fields) - except IndexError: - sys.exit(1) - except AttributeError: - print("Message type '%s' not found" % (type)) - sys.exit(1) - except TypeError: - print("You must specify a list of message types if outputting CSV format via the --types argument.") - sys.exit(1) - - # The first line output are names for all columns - print(args.csv_sep.join(fields)) - -if (isbin or islog) and args.format == 'csv': # need to accumulate columns from message - if types is None or len(types) != 1: - print("Need exactly one type when dumping CSV from bin file") - sys.exit(1) - -# Track types found -available_types = set() - -# for DF logs pre-calculate types list -match_types=None -if types is not None and hasattr(mlog, 'name_to_id'): - for k in mlog.name_to_id.keys(): - if match_type(k, types): - if nottypes is not None and match_type(k, nottypes): +def parse_args(): + parser = ArgumentParser(description=__doc__) + + 
parser.add_argument("--no-timestamps", action='store_true', help="Log doesn't have timestamps") + parser.add_argument("--planner", action='store_true', help="use planner file format") + parser.add_argument("--robust", action='store_true', help="Enable robust parsing (skip over bad data)") + parser.add_argument("-f", "--follow", action='store_true', help="keep waiting for more data at end of file (not implemented for .bin, .log, .csv") + parser.add_argument("--condition", default=None, help="select packets by condition") + parser.add_argument("-q", "--quiet", action='store_true', help="don't display packets") + parser.add_argument("-o", "--output", default=None, help="output matching packets to give file") + parser.add_argument("-p", "--parms", action='store_true', help="preserve parameters in output with -o") + parser.add_argument("--format", default=None, help="Change the output format between 'standard', 'json', 'csv' and 'mat'. For the CSV output, you must supply types that you want. For MAT output, specify output file with --mat_file") + parser.add_argument("--csv_sep", dest="csv_sep", default=",", help="Select the delimiter between columns for the output CSV file. Use 'tab' to specify tabs. Only applies when --format=csv") + parser.add_argument("--types", default=None, help="types of messages (comma separated with wildcard)") + parser.add_argument("--nottypes", default=None, help="types of messages not to include (comma separated with wildcard)") + parser.add_argument("--mat_file", dest="mat_file", help="Output file path for MATLAB file output. 
Only applies when --format=mat") + parser.add_argument("-c", "--compress", action='store_true', help="Compress .mat file data") + parser.add_argument("--dialect", default="ardupilotmega", help="MAVLink dialect") + parser.add_argument("--zero-time-base", action='store_true', help="use Z time base for DF logs") + parser.add_argument("--no-bad-data", action='store_true', help="Don't output corrupted messages") + parser.add_argument("--show-source", action='store_true', help="Show source system ID and component ID") + parser.add_argument("--show-seq", action='store_true', help="Show sequence numbers") + parser.add_argument("--show-types", action='store_true', help="Shows all message types available on opened log") + parser.add_argument("--show-loss", action='store_true', help="Shows changes in lost messages") + parser.add_argument("--source-system", type=int, default=None, help="filter by source system ID") + parser.add_argument("--source-component", type=int, default=None, help="filter by source component ID") + parser.add_argument("--link", type=int, default=None, help="filter by comms link ID") + parser.add_argument("--verbose", action='store_true', help="Dump messages in a much more verbose (but non-parseable) format") + parser.add_argument("--mav10", action='store_true', help="parse as MAVLink1") + parser.add_argument("--reduce", type=int, default=0, help="reduce streaming messages") + parser.add_argument("--reduce-rate", type=float, default=0, help="reduce messages to maximum rate in Hz") + parser.add_argument("log", metavar="LOG") + parser.add_argument("--profile", action='store_true', help="run the Yappi python profiler") + parser.add_argument("--meta", action='store_true', help="output meta-data msgs even if not matching condition") + + return parser.parse_args() + +def dump_log( + no_timestamps: bool = False, + planner: bool = False, + robust: bool = False, + follow: bool = False, + condition: str = None, + quiet: bool = False, + output: str = None, + parms: 
bool = False, + format: str = None, + csv_sep: str = ",", + types: str = None, + nottypes: str = None, + mat_file: str = None, + compress: bool = False, + dialect: str = "ardupilotmega", + zero_time_base: bool = False, + no_bad_data: bool = False, + show_source: bool = False, + show_seq: bool = False, + show_types: bool = False, + show_loss: bool = False, + source_system: int = None, + source_component: int = None, + link: int = None, + verbose: bool = False, + mav10: bool = False, + reduce: int = 0, + reduce_rate: float = 0, + log: str = None, + profile: bool = False, + meta: bool = False, +): + + + if not mav10: + os.environ['MAVLINK20'] = '1' + + if profile: + import yappi # We do the import here so that we won't barf if run normally and yappi not available + yappi.start() + + if format == 'mat': + # Check that the mat_file argument has been specified + if mat_file is None: + print("mat_file argument must be specified when mat format is selected") + sys.exit(1) + # Load these modules here, as they're only needed for MAT file creation + import scipy.io + + filename = log + mlog = mavutil.mavlink_connection(filename, planner_format=planner, + no_timestamps=no_timestamps, + robust_parsing=robust, + dialect=dialect, + zero_time_base=zero_time_base) + + output = None + if output: + output = open(output, mode='wb') + + if csv_sep == "tab": + csv_sep = "\t" + + types = types + if types is not None: + types = types.split(',') + + nottypes = nottypes + if nottypes is not None: + nottypes = nottypes.split(',') + + ext = os.path.splitext(filename)[1] + isbin = ext in ['.bin', '.BIN', '.px4log'] + islog = ext in ['.log', '.LOG'] # NOTE: "islog" does not mean a tlog + istlog = ext in ['.tlog', '.TLOG'] + + reduction_yes = set() + reduction_no = set() + reduction_count = {} + last_msg_rate_t = {} + + # Write out a header row as we're outputting in CSV format. 
+ fields = ['timestamp'] + offsets = {} + if istlog and format == 'csv': # we know our fields from the get-go + try: + currentOffset = 1 # Store how many fields in we are for each message. + for mtype in types: + try: + typeClass = "MAVLink_{0}_message".format(mtype.lower()) + fields += [mtype + '.' + x for x in inspect.getfullargspec(getattr(mavutil.mavlink, typeClass).__init__).args[1:]] + offsets[mtype] = currentOffset + currentOffset += len(fields) + except IndexError: + sys.exit(1) + except AttributeError: + print("Message type '%s' not found" % (mtype)) + sys.exit(1) + except TypeError: + print("You must specify a list of message types if outputting CSV format via the --types argument.") + sys.exit(1) + + # The first line output are names for all columns + print(csv_sep.join(fields)) + + if (isbin or islog) and format == 'csv': # need to accumulate columns from message + if types is None or len(types) != 1: + print("Need exactly one type when dumping CSV from bin file") + sys.exit(1) + + # Track types found + available_types = set() + + # for DF logs pre-calculate types list + match_types=None + if types is not None and hasattr(mlog, 'name_to_id'): + for k in mlog.name_to_id.keys(): + if match_type(k, types): + if nottypes is not None and match_type(k, nottypes): + continue + if match_types is None: + match_types = [] + match_types.append(k) + + if (isbin or islog) and format == 'csv': + # Make sure the specified type was found + if match_types is None: + print("Specified type '%s' not found in log file" % (types[0])) + sys.exit(1) + # we need FMT messages for column headings + match_types.append("FMT") + + last_loss = 0 + + # Keep track of data from the current timestep. If the following timestep has the same data, it's stored in here as well. Output should therefore have entirely unique timesteps. 
+ MAT = {} # Dictionary to hold output data for 'mat' format option + while True: + m = mlog.recv_match(blocking=follow, type=match_types) + if m is None: + break + m_type = m.get_type() + available_types.add(m_type) + if (isbin or islog) and m_type == "FMT" and format == 'csv': + if m.Name == types[0]: + fields += m.Columns.split(',') + print(csv_sep.join(fields)) + + if reduce and reduce_msg(m_type, reduce, reduction_yes, reduction_no, reduction_count): + continue + + if reduce_rate > 0 and reduce_rate_msg(m, reduce_rate, last_msg_rate_t): + continue + + if output is not None: + if (isbin or islog) and m_type == "FMT": + output.write(m.get_msgbuf()) + continue + if (isbin or islog) and m_type in ["FMTU", "MULT", "UNIT"]: + output.write(m.get_msgbuf()) + continue + if (isbin or islog) and (m_type == "PARM" and parms): + output.write(m.get_msgbuf()) + continue + if m_type == 'PARAM_VALUE' and parms: + timestamp = getattr(m, '_timestamp', None) + output.write(struct.pack('>Q', int(timestamp*1.0e6)) + m.get_msgbuf()) continue - if match_types is None: - match_types = [] - match_types.append(k) - -if (isbin or islog) and args.format == 'csv': - # Make sure the specified type was found - if match_types is None: - print("Specified type '%s' not found in log file" % (types[0])) - sys.exit(1) - # we need FMT messages for column headings - match_types.append("FMT") - -last_loss = 0 - -# Keep track of data from the current timestep. If the following timestep has the same data, it's stored in here as well. Output should therefore have entirely unique timesteps. 
-MAT = {} # Dictionary to hold output data for 'mat' format option -while True: - m = mlog.recv_match(blocking=args.follow, type=match_types) - if m is None: - break - m_type = m.get_type() - available_types.add(m_type) - if (isbin or islog) and m_type == "FMT" and args.format == 'csv': - if m.Name == types[0]: - fields += m.Columns.split(',') - print(args.csv_sep.join(fields)) - - if args.reduce and reduce_msg(m_type, args.reduce): - continue - - if args.reduce_rate > 0 and reduce_rate_msg(m, args.reduce_rate): - continue - - if output is not None: - if (isbin or islog) and m_type == "FMT": - output.write(m.get_msgbuf()) + + if not mavutil.evaluate_condition(condition, mlog.messages) and ( + not (m_type in ['FMT', 'FMTU', 'MULT', 'PARM', 'MODE', 'UNIT', 'VER','CMD','MAVC','MSG','EV'] and meta)): continue - if (isbin or islog) and m_type in ["FMTU", "MULT", "UNIT"]: - output.write(m.get_msgbuf()) + if source_system is not None and source_system != m.get_srcSystem(): continue - if (isbin or islog) and (m_type == "PARM" and args.parms): - output.write(m.get_msgbuf()) + if source_component is not None and source_component != m.get_srcComponent(): continue - if m_type == 'PARAM_VALUE' and args.parms: - timestamp = getattr(m, '_timestamp', None) - output.write(struct.pack('>Q', int(timestamp*1.0e6)) + m.get_msgbuf()) + if link is not None and link != m._link: continue - if not mavutil.evaluate_condition(args.condition, mlog.messages) and ( - not (m_type in ['FMT', 'FMTU', 'MULT', 'PARM', 'MODE', 'UNIT', 'VER','CMD','MAVC','MSG','EV'] and args.meta)): - continue - if args.source_system is not None and args.source_system != m.get_srcSystem(): - continue - if args.source_component is not None and args.source_component != m.get_srcComponent(): - continue - if args.link is not None and args.link != m._link: - continue + if types is not None and m_type != 'BAD_DATA' and not match_type(m_type, types): + continue - if types is not None and m_type != 'BAD_DATA' and not 
match_type(m_type, types): - continue + if nottypes is not None and match_type(m_type, nottypes): + continue - if nottypes is not None and match_type(m_type, nottypes): - continue + # Ignore BAD_DATA messages is the user requested or if they're because of a bad prefix. The + # latter case is normally because of a mismatched MAVLink version. + if m_type == 'BAD_DATA' and (no_bad_data is True or m.reason == "Bad prefix"): + continue - # Ignore BAD_DATA messages is the user requested or if they're because of a bad prefix. The - # latter case is normally because of a mismatched MAVLink version. - if m_type == 'BAD_DATA' and (args.no_bad_data is True or m.reason == "Bad prefix"): - continue + # Grab the timestamp. + timestamp = getattr(m, '_timestamp', 0.0) - # Grab the timestamp. - timestamp = getattr(m, '_timestamp', 0.0) + # If we're just logging, pack in the timestamp and data into the output file. + if output: + if not (isbin or islog): + output.write(struct.pack('>Q', int(timestamp*1.0e6))) + try: + output.write(m.get_msgbuf()) + except Exception as ex: + print("Failed to write msg %s: %s" % (m_type, str(ex))) - # If we're just logging, pack in the timestamp and data into the output file. - if output: - if not (isbin or islog): - output.write(struct.pack('>Q', int(timestamp*1.0e6))) - try: - output.write(m.get_msgbuf()) - except Exception as ex: - print("Failed to write msg %s: %s" % (m_type, str(ex))) - - # If quiet is specified, don't display output to the terminal. - if args.quiet: - continue - - # If JSON was ordered, serve it up. Split it nicely into metadata and data. - if args.format == 'json': - # Format our message as a Python dict, which gets us almost to proper JSON format - data = m.to_dict() - - # Remove the mavpackettype value as we specify that later. 
- del data['mavpackettype'] - - # Also, if it's a BAD_DATA message, make it JSON-compatible by removing array objects - if 'data' in data and type(data['data']) is not dict: - data['data'] = list(data['data']) - - # Prepare the message as a single object with 'meta' and 'data' keys holding - # the message's metadata and actual data respectively. - meta = {"type": m_type, "timestamp": timestamp} - if args.show_source: - meta["srcSystem"] = m.get_srcSystem() - meta["srcComponent"] = m.get_srcComponent() - - # convert any array.array (e.g. packed-16-bit fft readings) into lists: - for key in data.keys(): - if type(data[key]) == array.array: - data[key] = list(data[key]) - # convert any byte-strings into utf-8 strings. Don't die trying. - for key in data.keys(): - if type(data[key]) == bytes: - data[key] = to_string(data[key]) - outMsg = {"meta": meta, "data": data} - - # Now print out this object with stringified properly. - print(json.dumps(outMsg)) - - # CSV format outputs columnar data with a user-specified delimiter - elif args.format == 'csv': - data = m.to_dict() - if isbin or islog: - csv_out = [str(data[y]) if y != "timestamp" else "" for y in fields] + # If quiet is specified, don't display output to the terminal. + if quiet: + continue + + # If JSON was ordered, serve it up. Split it nicely into metadata and data. + if format == 'json': + # Format our message as a Python dict, which gets us almost to proper JSON format + data = m.to_dict() + + # Remove the mavpackettype value as we specify that later. + del data['mavpackettype'] + + # Also, if it's a BAD_DATA message, make it JSON-compatible by removing array objects + if 'data' in data and type(data['data']) is not dict: + data['data'] = list(data['data']) + + # Prepare the message as a single object with 'meta' and 'data' keys holding + # the message's metadata and actual data respectively. 
+ meta = {"type": m_type, "timestamp": timestamp} + if show_source: + meta["srcSystem"] = m.get_srcSystem() + meta["srcComponent"] = m.get_srcComponent() + + # convert any array.array (e.g. packed-16-bit fft readings) into lists: + for key in data.keys(): + if type(data[key]) == array.array: + data[key] = list(data[key]) + # convert any byte-strings into utf-8 strings. Don't die trying. + for key in data.keys(): + if type(data[key]) == bytes: + data[key] = to_string(data[key]) + outMsg = {"meta": meta, "data": data} + + # Now print out this object with stringified properly. + print(json.dumps(outMsg)) + + # CSV format outputs columnar data with a user-specified delimiter + elif format == 'csv': + data = m.to_dict() + if isbin or islog: + csv_out = [str(data[y]) if y != "timestamp" else "" for y in fields] + else: + csv_out = [str(data[y.split('.')[-1]]) if y.split('.')[0] == m_type and y.split('.')[-1] in data else "" for y in fields] + csv_out[0] = "{:.8f}".format(timestamp) + print(csv_sep.join(csv_out)) + + # MAT format outputs data to a .mat file specified through the + # --mat_file option + elif format == 'mat': + # If this packet contains data (i.e. 
is not a FMT + # packet), append the data in this packet to the + # corresponding list + if m_type != 'FMT': + + # If this packet type has not yet been + # seen, add a new entry to the big dict + if m_type not in MAT: + MAT[m_type] = {} + + md = m.to_dict() + del md['mavpackettype'] + cols = md.keys() + for col in cols: + # If this column hasn't had data entered, + # make a new key and list + if col in MAT[m_type]: + MAT[m_type][col].append(md[col]) + else: + MAT[m_type][col] = [md[col]] + elif show_types: + # do nothing + pass + elif verbose and istlog: + mavutil.dump_message_verbose(sys.stdout, m) + print("") + elif verbose and hasattr(m,"dump_verbose"): + m.dump_verbose(sys.stdout) + print("") else: - csv_out = [str(data[y.split('.')[-1]]) if y.split('.')[0] == m_type and y.split('.')[-1] in data else "" for y in fields] - csv_out[0] = "{:.8f}".format(timestamp) - print(args.csv_sep.join(csv_out)) - - # MAT format outputs data to a .mat file specified through the - # --mat_file option - elif args.format == 'mat': - # If this packet contains data (i.e. 
is not a FMT - # packet), append the data in this packet to the - # corresponding list - if m_type != 'FMT': - - # If this packet type has not yet been - # seen, add a new entry to the big dict - if m_type not in MAT: - MAT[m_type] = {} - - md = m.to_dict() - del md['mavpackettype'] - cols = md.keys() - for col in cols: - # If this column hasn't had data entered, - # make a new key and list - if col in MAT[m_type]: - MAT[m_type][col].append(md[col]) - else: - MAT[m_type][col] = [md[col]] - elif args.show_types: - # do nothing - pass - elif args.verbose and istlog: - mavutil.dump_message_verbose(sys.stdout, m) - print("") - elif args.verbose and hasattr(m,"dump_verbose"): - m.dump_verbose(sys.stdout) - print("") - else: - # Otherwise we output in a standard Python dict-style format - s = "%s.%02u: %s" % (time.strftime("%Y-%m-%d %H:%M:%S", - time.localtime(timestamp)), - int(timestamp*100.0)%100, m) - if args.show_source: - s += " srcSystem=%u srcComponent=%u" % (m.get_srcSystem(), m.get_srcComponent()) - if args.show_seq: - s += " seq=%u" % m.get_seq() - print(s) - if args.show_loss: - if last_loss != mlog.mav_loss: - print("lost %d messages" % (mlog.mav_loss - last_loss)) - last_loss = mlog.mav_loss - -# Export the .mat file -if args.format == 'mat': - scipy.io.savemat(args.mat_file, MAT, do_compression=args.compress) - -if args.show_types: - for msgType in available_types: - print(msgType) - -if args.profile: - yappi.get_func_stats().print_all() - yappi.get_thread_stats().print_all() + # Otherwise we output in a standard Python dict-style format + s = "%s.%02u: %s" % (time.strftime("%Y-%m-%d %H:%M:%S", + time.localtime(timestamp)), + int(timestamp*100.0)%100, m) + if show_source: + s += " srcSystem=%u srcComponent=%u" % (m.get_srcSystem(), m.get_srcComponent()) + if show_seq: + s += " seq=%u" % m.get_seq() + print(s) + if show_loss: + if last_loss != mlog.mav_loss: + print("lost %d messages" % (mlog.mav_loss - last_loss)) + last_loss = mlog.mav_loss + + # Export 
the .mat file + if format == 'mat': + scipy.io.savemat(mat_file, MAT, do_compression=compress) + + if show_types: + for msgType in available_types: + print(msgType) + + if profile: + yappi.get_func_stats().print_all() + yappi.get_thread_stats().print_all() + +if __name__=="__main__": + args = parse_args() + dump_log(**vars(args)) \ No newline at end of file From 3ce89368453fc773573fdbe4b85e89e2cc7f7415 Mon Sep 17 00:00:00 2001 From: --replace-all Date: Wed, 27 Aug 2025 11:12:32 +0000 Subject: [PATCH 08/14] Decompose dump_log --- tools/mavlogdump.py | 160 +++++++++++++++++++++++--------------------- 1 file changed, 85 insertions(+), 75 deletions(-) diff --git a/tools/mavlogdump.py b/tools/mavlogdump.py index 06f3f18a6..abf2ad1b1 100755 --- a/tools/mavlogdump.py +++ b/tools/mavlogdump.py @@ -62,6 +62,86 @@ def match_type(mtype, patterns): return True return False +def handle_json_output(m, m_type, timestamp, show_source): + '''Handle JSON format output''' + # Format our message as a Python dict, which gets us almost to proper JSON format + data = m.to_dict() + + # Remove the mavpackettype value as we specify that later. + del data['mavpackettype'] + + # Also, if it's a BAD_DATA message, make it JSON-compatible by removing array objects + if 'data' in data and type(data['data']) is not dict: + data['data'] = list(data['data']) + + # Prepare the message as a single object with 'meta' and 'data' keys holding + # the message's metadata and actual data respectively. + meta = {"type": m_type, "timestamp": timestamp} + if show_source: + meta["srcSystem"] = m.get_srcSystem() + meta["srcComponent"] = m.get_srcComponent() + + # convert any array.array (e.g. packed-16-bit fft readings) into lists: + for key in data.keys(): + if type(data[key]) == array.array: + data[key] = list(data[key]) + # convert any byte-strings into utf-8 strings. Don't die trying. 
+ for key in data.keys(): + if type(data[key]) == bytes: + data[key] = to_string(data[key]) + outMsg = {"meta": meta, "data": data} + + # Now print out this object with stringified properly. + print(json.dumps(outMsg)) + +def handle_csv_output(m, m_type, timestamp, csv_sep, fields, isbin, islog): + '''Handle CSV format output''' + data = m.to_dict() + if isbin or islog: + csv_out = [str(data[y]) if y != "timestamp" else "" for y in fields] + else: + csv_out = [str(data[y.split('.')[-1]]) if y.split('.')[0] == m_type and y.split('.')[-1] in data else "" for y in fields] + csv_out[0] = "{:.8f}".format(timestamp) + print(csv_sep.join(csv_out)) + +def handle_mat_output(m, m_type, MAT): + '''Handle MAT format output''' + # If this packet contains data (i.e. is not a FMT + # packet), append the data in this packet to the + # corresponding list + if m_type == 'FMT': + return + + # If this packet type has not yet been + # seen, add a new entry to the big dict + if m_type not in MAT: + MAT[m_type] = {} + + md = m.to_dict() + del md['mavpackettype'] + cols = md.keys() + for col in cols: + # If this column hasn't had data entered, + # make a new key and list + if col in MAT[m_type]: + MAT[m_type][col].append(md[col]) + else: + MAT[m_type][col] = [md[col]] + # Export the .mat file + scipy.io.savemat(mat_file, MAT, do_compression=compress) + +def handle_standard_output(m, timestamp, show_source, show_seq): + '''Handle standard format output''' + # Otherwise we output in a standard Python dict-style format + s = "%s.%02u: %s" % (time.strftime("%Y-%m-%d %H:%M:%S", + time.localtime(timestamp)), + int(timestamp*100.0)%100, m) + if show_source: + s += " srcSystem=%u srcComponent=%u" % (m.get_srcSystem(), m.get_srcComponent()) + if show_seq: + s += " seq=%u" % m.get_seq() + print(s) + def parse_args(): parser = ArgumentParser(description=__doc__) @@ -305,71 +385,13 @@ def dump_log( if quiet: continue - # If JSON was ordered, serve it up. Split it nicely into metadata and data. 
+ # Handle different output formats if format == 'json': - # Format our message as a Python dict, which gets us almost to proper JSON format - data = m.to_dict() - - # Remove the mavpackettype value as we specify that later. - del data['mavpackettype'] - - # Also, if it's a BAD_DATA message, make it JSON-compatible by removing array objects - if 'data' in data and type(data['data']) is not dict: - data['data'] = list(data['data']) - - # Prepare the message as a single object with 'meta' and 'data' keys holding - # the message's metadata and actual data respectively. - meta = {"type": m_type, "timestamp": timestamp} - if show_source: - meta["srcSystem"] = m.get_srcSystem() - meta["srcComponent"] = m.get_srcComponent() - - # convert any array.array (e.g. packed-16-bit fft readings) into lists: - for key in data.keys(): - if type(data[key]) == array.array: - data[key] = list(data[key]) - # convert any byte-strings into utf-8 strings. Don't die trying. - for key in data.keys(): - if type(data[key]) == bytes: - data[key] = to_string(data[key]) - outMsg = {"meta": meta, "data": data} - - # Now print out this object with stringified properly. - print(json.dumps(outMsg)) - - # CSV format outputs columnar data with a user-specified delimiter + handle_json_output(m, m_type, timestamp, show_source) elif format == 'csv': - data = m.to_dict() - if isbin or islog: - csv_out = [str(data[y]) if y != "timestamp" else "" for y in fields] - else: - csv_out = [str(data[y.split('.')[-1]]) if y.split('.')[0] == m_type and y.split('.')[-1] in data else "" for y in fields] - csv_out[0] = "{:.8f}".format(timestamp) - print(csv_sep.join(csv_out)) - - # MAT format outputs data to a .mat file specified through the - # --mat_file option + handle_csv_output(m, m_type, timestamp, csv_sep, fields, isbin, islog) elif format == 'mat': - # If this packet contains data (i.e. 
is not a FMT - # packet), append the data in this packet to the - # corresponding list - if m_type != 'FMT': - - # If this packet type has not yet been - # seen, add a new entry to the big dict - if m_type not in MAT: - MAT[m_type] = {} - - md = m.to_dict() - del md['mavpackettype'] - cols = md.keys() - for col in cols: - # If this column hasn't had data entered, - # make a new key and list - if col in MAT[m_type]: - MAT[m_type][col].append(md[col]) - else: - MAT[m_type][col] = [md[col]] + handle_mat_output(m, m_type, MAT) elif show_types: # do nothing pass @@ -380,24 +402,12 @@ def dump_log( m.dump_verbose(sys.stdout) print("") else: - # Otherwise we output in a standard Python dict-style format - s = "%s.%02u: %s" % (time.strftime("%Y-%m-%d %H:%M:%S", - time.localtime(timestamp)), - int(timestamp*100.0)%100, m) - if show_source: - s += " srcSystem=%u srcComponent=%u" % (m.get_srcSystem(), m.get_srcComponent()) - if show_seq: - s += " seq=%u" % m.get_seq() - print(s) + handle_standard_output(m, timestamp, show_source, show_seq) if show_loss: if last_loss != mlog.mav_loss: print("lost %d messages" % (mlog.mav_loss - last_loss)) last_loss = mlog.mav_loss - # Export the .mat file - if format == 'mat': - scipy.io.savemat(mat_file, MAT, do_compression=compress) - if show_types: for msgType in available_types: print(msgType) From 0e10d3c706a213f1bca489de6a6a48668be14313 Mon Sep 17 00:00:00 2001 From: --replace-all Date: Wed, 27 Aug 2025 11:20:59 +0000 Subject: [PATCH 09/14] Undo matlab save consolidation --- tools/mavlogdump.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tools/mavlogdump.py b/tools/mavlogdump.py index abf2ad1b1..450adede6 100755 --- a/tools/mavlogdump.py +++ b/tools/mavlogdump.py @@ -127,8 +127,6 @@ def handle_mat_output(m, m_type, MAT): MAT[m_type][col].append(md[col]) else: MAT[m_type][col] = [md[col]] - # Export the .mat file - scipy.io.savemat(mat_file, MAT, do_compression=compress) def handle_standard_output(m, 
timestamp, show_source, show_seq): '''Handle standard format output''' @@ -222,12 +220,13 @@ def dump_log( yappi.start() if format == 'mat': + # Scipy needed only for matlab format + from scipy.io import savemat + # Check that the mat_file argument has been specified if mat_file is None: print("mat_file argument must be specified when mat format is selected") sys.exit(1) - # Load these modules here, as they're only needed for MAT file creation - import scipy.io filename = log mlog = mavutil.mavlink_connection(filename, planner_format=planner, @@ -408,6 +407,9 @@ def dump_log( print("lost %d messages" % (mlog.mav_loss - last_loss)) last_loss = mlog.mav_loss + if format == 'mat': + # Export the .mat file + savemat(mat_file, MAT, do_compression=compress) if show_types: for msgType in available_types: print(msgType) From b8565b4b65a2d395979c849a7286cc0a6a78693f Mon Sep 17 00:00:00 2001 From: --replace-all Date: Wed, 27 Aug 2025 11:22:11 +0000 Subject: [PATCH 10/14] Fix typo --- tools/mavlogdump.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/mavlogdump.py b/tools/mavlogdump.py index 450adede6..2a3f6ee36 100755 --- a/tools/mavlogdump.py +++ b/tools/mavlogdump.py @@ -149,7 +149,7 @@ def parse_args(): parser.add_argument("-f", "--follow", action='store_true', help="keep waiting for more data at end of file (not implemented for .bin, .log, .csv") parser.add_argument("--condition", default=None, help="select packets by condition") parser.add_argument("-q", "--quiet", action='store_true', help="don't display packets") - parser.add_argument("-o", "--output", default=None, help="output matching packets to give file") + parser.add_argument("-o", "--output", default=None, help="output matching packets to given file") parser.add_argument("-p", "--parms", action='store_true', help="preserve parameters in output with -o") parser.add_argument("--format", default=None, help="Change the output format between 'standard', 'json', 'csv' and 'mat'. 
For the CSV output, you must supply types that you want. For MAT output, specify output file with --mat_file") parser.add_argument("--csv_sep", dest="csv_sep", default=",", help="Select the delimiter between columns for the output CSV file. Use 'tab' to specify tabs. Only applies when --format=csv") From 2653cf9785f0e8ed6914958fc904ce14a3c3a6bb Mon Sep 17 00:00:00 2001 From: --replace-all Date: Wed, 27 Aug 2025 11:25:12 +0000 Subject: [PATCH 11/14] Fix output bug --- tools/mavlogdump.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tools/mavlogdump.py b/tools/mavlogdump.py index 2a3f6ee36..88f8ae8cd 100755 --- a/tools/mavlogdump.py +++ b/tools/mavlogdump.py @@ -235,7 +235,6 @@ def dump_log( dialect=dialect, zero_time_base=zero_time_base) - output = None if output: output = open(output, mode='wb') From db439cda29d89d557978dbb210e3430dab80d4cb Mon Sep 17 00:00:00 2001 From: --replace-all Date: Wed, 27 Aug 2025 14:10:25 +0000 Subject: [PATCH 12/14] Decouple writes, reduce side-effects, simplify UX; fix json output (make valid); update tests accordingly --- tests/test_mavlogdump.py | 255 ++++++++++------------ tools/mavlogdump.py | 447 +++++++++++++++++++-------------------- 2 files changed, 331 insertions(+), 371 deletions(-) diff --git a/tests/test_mavlogdump.py b/tests/test_mavlogdump.py index dde502f18..77301add4 100755 --- a/tests/test_mavlogdump.py +++ b/tests/test_mavlogdump.py @@ -15,6 +15,8 @@ sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'tools'))) +from tools import mavlogdump + class MAVLogDumpTest(unittest.TestCase): @@ -43,65 +45,83 @@ def tearDown(self): def test_dump_standard_format(self): """Test standard format dump of file""" output_file = os.path.join(self.test_dir, "standard_output.txt") - cmd = f"{self.mavlogdump_path} {self.test_filepath} > {output_file}" - result = os.system(cmd) - self.assertEqual(result, 0, "Standard format 
dump should succeed") + mavlogdump.dump_log( + output_path=output_file, + format='standard', + log=self.test_filepath, + ) + self.assertTrue(os.path.exists(output_file), "Output file should be created") def test_dump_json_format(self): """Test JSON format output""" output_file = os.path.join(self.test_dir, "json_output.txt") - cmd = f"{self.mavlogdump_path} --format json {self.test_filepath} > {output_file}" - result = os.system(cmd) - self.assertEqual(result, 0, "JSON format dump should succeed") + mavlogdump.dump_log( + output_path=output_file, + format='json', + log=self.test_filepath, + ) self.assertTrue(os.path.exists(output_file), "JSON output file should be created") # Verify JSON format if file has content if os.path.getsize(output_file) > 0: with open(output_file, 'r') as f: - for line in f: - if line.strip(): - try: - data = json.loads(line) - self.assertIn('meta', data, "JSON output should have 'meta' field") - self.assertIn('data', data, "JSON output should have 'data' field") - break - except json.JSONDecodeError: - pass + content = f.read().strip() + if content: + try: + # JSON output is now an array + if content.startswith('[') and content.endswith(']'): + data = json.loads(content) + if data: # If array is not empty + first_item = data[0] + self.assertIn('meta', first_item, "JSON output should have 'meta' field") + self.assertIn('data', first_item, "JSON output should have 'data' field") + except json.JSONDecodeError as e: + pass # File might be empty or invalid, which is OK for test files def test_dump_json_with_show_source(self): """Test JSON format with show-source option""" output_file = os.path.join(self.test_dir, "json_source_output.txt") - cmd = f"{self.mavlogdump_path} --format json --show-source {self.test_filepath} > {output_file}" - result = os.system(cmd) - self.assertEqual(result, 0, "JSON format with show-source should succeed") + mavlogdump.dump_log( + output_path=output_file, + format='json', + show_source=True, + 
log=self.test_filepath, + ) self.assertTrue(os.path.exists(output_file), "JSON output file should be created") # Verify JSON includes source info if file has content if os.path.getsize(output_file) > 0: with open(output_file, 'r') as f: - for line in f: - if line.strip(): - try: - data = json.loads(line) - if 'meta' in data: - # Check if source fields are present when data is available - if data.get('data'): - self.assertIn('type', data['meta'], "Meta should have type field") - break - except json.JSONDecodeError: - pass + content = f.read().strip() + if content: + try: + # JSON output is now an array + if content.startswith('[') and content.endswith(']'): + data = json.loads(content) + if data: # If array is not empty + first_item = data[0] + if 'meta' in first_item: + # Check if source fields are present when data is available + if first_item.get('data'): + self.assertIn('type', first_item['meta'], "Meta should have type field") + except json.JSONDecodeError: + pass # File might be empty or invalid, which is OK for test files def test_dump_csv_format(self): """Test CSV format output""" output_file = os.path.join(self.test_dir, "csv_output.csv") - # CSV format requires --types to be specified - cmd = f"{self.mavlogdump_path} --format csv --types 'IMU2' {self.test_filepath} > {output_file} 2>/dev/null" - os.system(cmd) + output_file='testing.csv' + mavlogdump.dump_log( + output_path=output_file, + format='csv', + types='IMU2', + log=self.test_filepath, + ) # Check if file was created (even if empty) if os.path.exists(output_file): with open(output_file, 'r') as f: @@ -117,8 +137,14 @@ def test_dump_csv_format(self): def test_dump_csv_with_custom_separator(self): """Test CSV format with custom separator""" output_file = os.path.join(self.test_dir, "csv_tab_output.csv") - cmd = f"{self.mavlogdump_path} --format csv --csv_sep tab --types 'IMU2' {self.test_filepath} > {output_file} 2>/dev/null" - result = os.system(cmd) + + mavlogdump.dump_log( + 
output_path=output_file, + format='csv', + csv_sep="tab", + types='IMU2', + log=self.test_filepath, + ) if os.path.exists(output_file) and os.path.getsize(output_file) > 0: with open(output_file, 'r') as f: @@ -165,20 +191,10 @@ def test_nottype_filtering(self): self.assertEqual(result, 0, "Type exclusion should succeed") self.assertTrue(os.path.exists(output_file), "Excluded output file should be created") - def test_quiet_mode(self): - """Test quiet mode suppresses output""" - output_file = os.path.join(self.test_dir, "quiet_output.txt") - cmd = f"{self.mavlogdump_path} --quiet {self.test_filepath} > {output_file}" - result = os.system(cmd) - - self.assertEqual(result, 0, "Quiet mode should succeed") - # In quiet mode, output should be minimal or empty - self.assertTrue(os.path.exists(output_file), "Output file should be created even in quiet mode") - def test_output_to_file(self): """Test output to file option""" output_file = os.path.join(self.test_dir, "direct_output.bin") - cmd = f"{self.mavlogdump_path} -q --output {output_file} {self.test_filepath} 2>/dev/null" + cmd = f"{self.mavlogdump_path} --output {output_file} {self.test_filepath} 2>/dev/null" result = os.system(cmd) self.assertEqual(result, 0, "Output to file should succeed") @@ -229,15 +245,6 @@ def test_mav10_option(self): self.assertEqual(result, 0, "MAV1.0 parsing should succeed") self.assertTrue(os.path.exists(output_file), "MAV1.0 output file should be created") - def test_verbose_mode(self): - """Test verbose output mode""" - output_file = os.path.join(self.test_dir, "verbose_output.txt") - cmd = f"{self.mavlogdump_path} --verbose {self.test_filepath} > {output_file} 2>/dev/null" - result = os.system(cmd) - - self.assertEqual(result, 0, "Verbose mode should succeed") - self.assertTrue(os.path.exists(output_file), "Verbose output file should be created") - def test_source_filtering(self): """Test source system and component filtering""" output_file = os.path.join(self.test_dir, 
"source_filtered.txt") @@ -251,79 +258,49 @@ def test_combined_options(self): """Test combination of multiple options""" output_file = os.path.join(self.test_dir, "combined_output.json") cmd = (f"mavlogdump.py --format json --types 'ATT,GPS' " - f"--quiet --no-bad-data {self.test_filepath} > {output_file} 2>/dev/null") + f"--no-bad-data {self.test_filepath} > {output_file} 2>/dev/null") result = os.system(cmd) self.assertEqual(result, 0, "Combined options should succeed") self.assertTrue(os.path.exists(output_file), "Combined output file should be created") - def test_import_as_module(self): - """Test importing mavlogdump as a module""" - try: - # Try to import the refactored version - from pymavlink.tools import mavlogdump - - # Check that main functions exist - self.assertTrue(hasattr(mavlogdump, 'process_log'), "Should have process_log function") - self.assertTrue(hasattr(mavlogdump, 'process_log_json'), "Should have process_log_json function") - self.assertTrue(hasattr(mavlogdump, 'process_log_csv'), "Should have process_log_csv function") - self.assertTrue(hasattr(mavlogdump, 'process_log_mat'), "Should have process_log_mat function") - except ImportError: - # If new functions don't exist, check for old structure - pass - def test_programmatic_json_processing(self): """Test programmatic JSON processing""" - try: - from pymavlink.tools import mavlogdump - if hasattr(mavlogdump, 'process_log'): - # Test programmatic interface - result = mavlogdump.process_log( - self.test_filepath, - output_format='json', - types=['ATT'], - quiet=True - ) - self.assertEqual(result, 0, "Programmatic JSON processing should succeed") - except ImportError: - self.skipTest("mavlogdump module not importable") + if hasattr(mavlogdump, 'process_log'): + # Test programmatic interface + result = mavlogdump.process_log( + self.test_filepath, + output_format='json', + types=['ATT'] + ) + self.assertEqual(result, 0, "Programmatic JSON processing should succeed") def 
test_programmatic_csv_processing(self): """Test programmatic CSV processing""" - try: - from pymavlink.tools import mavlogdump - if hasattr(mavlogdump, 'process_log'): - # Test programmatic interface - output_file = os.path.join(self.test_dir, "prog_csv.csv") - result = mavlogdump.process_log( - self.test_filepath, - output_format='csv', - types=['*'], - output=output_file, - quiet=True + if hasattr(mavlogdump, 'process_log'): + # Test programmatic interface + output_file = os.path.join(self.test_dir, "prog_csv.csv") + result = mavlogdump.process_log( + self.test_filepath, + output_format='csv', + types=['*'], + output=output_file, ) - self.assertEqual(result, 0, "Programmatic CSV processing should succeed") - except ImportError: - self.skipTest("mavlogdump module not importable") + self.assertEqual(result, 0, "Programmatic CSV processing should succeed") def test_programmatic_mat_processing(self): """Test programmatic MAT processing""" - try: - from pymavlink.tools import mavlogdump - if hasattr(mavlogdump, 'process_log'): - # Test programmatic interface - mat_file = os.path.join(self.test_dir, "prog_output.mat") - result = mavlogdump.process_log( - self.test_filepath, - output_format='mat', - mat_file=mat_file, - quiet=True + if hasattr(mavlogdump, 'process_log'): + # Test programmatic interface + mat_file = os.path.join(self.test_dir, "prog_output.mat") + result = mavlogdump.process_log( + self.test_filepath, + output_format='mat', + mat_file=mat_file, ) - # MAT processing might fail if scipy is not installed - if result == 0: - self.assertTrue(os.path.exists(mat_file), "Programmatic MAT file should be created") - except ImportError: - self.skipTest("mavlogdump module not importable") + # MAT processing might fail if scipy is not installed + if result == 0: + self.assertTrue(os.path.exists(mat_file), "Programmatic MAT file should be created") class MAVLogDumpUnitTest(unittest.TestCase): @@ -331,40 +308,32 @@ class MAVLogDumpUnitTest(unittest.TestCase): def 
test_match_type_function(self): """Test the match_type function""" - try: - from pymavlink.tools.mavlogdump import match_type - - # Test exact match - self.assertTrue(match_type('GPS', ['GPS'])) - self.assertFalse(match_type('GPS', ['ATT'])) - - # Test wildcard match - self.assertTrue(match_type('GPS_RAW', ['GPS*'])) - self.assertTrue(match_type('ATT', ['A*'])) - self.assertFalse(match_type('GPS', ['ATT*'])) - - # Test multiple patterns - self.assertTrue(match_type('GPS', ['ATT', 'GPS'])) - self.assertTrue(match_type('ATT', ['ATT', 'GPS'])) - except ImportError: - self.skipTest("match_type function not importable") + + # Test exact match + self.assertTrue(mavlogdump.match_type('GPS', ['GPS'])) + self.assertFalse(mavlogdump.match_type('GPS', ['ATT'])) + + # Test wildcard match + self.assertTrue(mavlogdump.match_type('GPS_RAW', ['GPS*'])) + self.assertTrue(mavlogdump.match_type('ATT', ['A*'])) + self.assertFalse(mavlogdump.match_type('GPS', ['ATT*'])) + + # Test multiple patterns + self.assertTrue(mavlogdump.match_type('GPS', ['ATT', 'GPS'])) + self.assertTrue(mavlogdump.match_type('ATT', ['ATT', 'GPS'])) def test_to_string_function(self): """Test the to_string function""" - try: - from pymavlink.tools.mavlogdump import to_string - - # Test string input - self.assertEqual(to_string("hello"), "hello") - - # Test bytes input - self.assertEqual(to_string(b"hello"), "hello") - - # Test bytes with special characters - result = to_string(b"\xff\xfe") - self.assertIsInstance(result, str) - except ImportError: - self.skipTest("to_string function not importable") + + # Test string input + self.assertEqual(mavlogdump.to_string("hello"), "hello") + + # Test bytes input + self.assertEqual(mavlogdump.to_string(b"hello"), "hello") + + # Test bytes with special characters + result = mavlogdump.to_string(b"\xff\xfe") + self.assertIsInstance(result, str) if __name__ == '__main__': diff --git a/tools/mavlogdump.py b/tools/mavlogdump.py index 88f8ae8cd..0186a8a9d 100755 --- 
a/tools/mavlogdump.py +++ b/tools/mavlogdump.py @@ -62,7 +62,7 @@ def match_type(mtype, patterns): return True return False -def handle_json_output(m, m_type, timestamp, show_source): +def handle_json_output(m, m_type, timestamp, show_source, output_fh, count): '''Handle JSON format output''' # Format our message as a Python dict, which gets us almost to proper JSON format data = m.to_dict() @@ -91,10 +91,12 @@ def handle_json_output(m, m_type, timestamp, show_source): data[key] = to_string(data[key]) outMsg = {"meta": meta, "data": data} - # Now print out this object with stringified properly. print(json.dumps(outMsg)) + # NOTE: a comma is prepended to every element after the first, so the stream forms a valid JSON array + # Now write this object stringified properly. output_fh.write(f'{"," if count>0 else ""}\n\t{json.dumps(outMsg)}') -def handle_csv_output(m, m_type, timestamp, csv_sep, fields, isbin, islog): + +def handle_csv_output(m, m_type, timestamp, csv_sep, fields, isbin, islog, output_fh): '''Handle CSV format output''' data = m.to_dict() if isbin or islog: @@ -102,9 +104,9 @@ def handle_csv_output(m, m_type, timestamp, csv_sep, fields, isbin, islog): else: csv_out = [str(data[y.split('.')[-1]]) if y.split('.')[0] == m_type and y.split('.')[-1] in data else "" for y in fields] csv_out[0] = "{:.8f}".format(timestamp) - print(csv_sep.join(csv_out)) + output_fh.write(f'{csv_sep.join(csv_out)}\n') -def handle_mat_output(m, m_type, MAT): +def handle_mat_output(m, m_type, out_dict): '''Handle MAT format output''' # If this packet contains data (i.e.
is not a FMT # packet), append the data in this packet to the @@ -114,8 +116,8 @@ def handle_mat_output(m, m_type, MAT): # If this packet type has not yet been # seen, add a new entry to the big dict - if m_type not in MAT: - MAT[m_type] = {} + if m_type not in out_dict: + out_dict[m_type] = {} md = m.to_dict() del md['mavpackettype'] @@ -123,22 +125,38 @@ def handle_mat_output(m, m_type, MAT): for col in cols: # If this column hasn't had data entered, # make a new key and list - if col in MAT[m_type]: - MAT[m_type][col].append(md[col]) + if col in out_dict[m_type]: + out_dict[m_type][col].append(md[col]) else: - MAT[m_type][col] = [md[col]] + out_dict[m_type][col] = [md[col]] -def handle_standard_output(m, timestamp, show_source, show_seq): +def handle_standard_output(m, timestamp, show_source, show_seq, output_fh, isbin, islog, m_type, parms): '''Handle standard format output''' + # Otherwise we output in a standard Python dict-style format - s = "%s.%02u: %s" % (time.strftime("%Y-%m-%d %H:%M:%S", + if output_fh is not sys.stdout: + if (isbin or islog) and (m_type in ["FMT", "FMTU", "MULT", "UNIT"] or (m_type == "PARM" and parms)): + output_fh.write(m.get_msgbuf()) + elif m_type == 'PARAM_VALUE' and parms: + timestamp = getattr(m, '_timestamp', None) + output_fh.write(struct.pack('>Q', int(timestamp*1.0e6)) + m.get_msgbuf()) + else: + s = "%s.%02u: %s" % (time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp)), - int(timestamp*100.0)%100, m) - if show_source: - s += " srcSystem=%u srcComponent=%u" % (m.get_srcSystem(), m.get_srcComponent()) - if show_seq: - s += " seq=%u" % m.get_seq() - print(s) + int(timestamp*100.0)%100, m) + if show_source: + s += " srcSystem=%u srcComponent=%u" % (m.get_srcSystem(), m.get_srcComponent()) + if show_seq: + s += " seq=%u" % m.get_seq() + output_fh.write(s) + +def handle_pretty_output(m, istlog, output_fh): + if istlog: + mavutil.dump_message_verbose(output_fh, m) + print("") + elif hasattr(m,"dump_verbose"): + 
m.dump_verbose(output_fh) + print("") def parse_args(): parser = ArgumentParser(description=__doc__) @@ -148,14 +166,12 @@ def parse_args(): parser.add_argument("--robust", action='store_true', help="Enable robust parsing (skip over bad data)") parser.add_argument("-f", "--follow", action='store_true', help="keep waiting for more data at end of file (not implemented for .bin, .log, .csv") parser.add_argument("--condition", default=None, help="select packets by condition") - parser.add_argument("-q", "--quiet", action='store_true', help="don't display packets") - parser.add_argument("-o", "--output", default=None, help="output matching packets to given file") + parser.add_argument("-o", "--output_path", default=None, help="Output file path; if left undefined, writes to stdout.") parser.add_argument("-p", "--parms", action='store_true', help="preserve parameters in output with -o") - parser.add_argument("--format", default=None, help="Change the output format between 'standard', 'json', 'csv' and 'mat'. For the CSV output, you must supply types that you want. For MAT output, specify output file with --mat_file") + parser.add_argument("--format", default='standard', help="Change the output format between 'standard', 'json', 'csv', 'mat', and 'pretty'. For the CSV output, you must supply types that you want. For MAT output, specify output file with -o") parser.add_argument("--csv_sep", dest="csv_sep", default=",", help="Select the delimiter between columns for the output CSV file. Use 'tab' to specify tabs. Only applies when --format=csv") parser.add_argument("--types", default=None, help="types of messages (comma separated with wildcard)") parser.add_argument("--nottypes", default=None, help="types of messages not to include (comma separated with wildcard)") - parser.add_argument("--mat_file", dest="mat_file", help="Output file path for MATLAB file output. 
Only applies when --format=mat") parser.add_argument("-c", "--compress", action='store_true', help="Compress .mat file data") parser.add_argument("--dialect", default="ardupilotmega", help="MAVLink dialect") parser.add_argument("--zero-time-base", action='store_true', help="use Z time base for DF logs") @@ -167,13 +183,12 @@ def parse_args(): parser.add_argument("--source-system", type=int, default=None, help="filter by source system ID") parser.add_argument("--source-component", type=int, default=None, help="filter by source component ID") parser.add_argument("--link", type=int, default=None, help="filter by comms link ID") - parser.add_argument("--verbose", action='store_true', help="Dump messages in a much more verbose (but non-parseable) format") parser.add_argument("--mav10", action='store_true', help="parse as MAVLink1") parser.add_argument("--reduce", type=int, default=0, help="reduce streaming messages") parser.add_argument("--reduce-rate", type=float, default=0, help="reduce messages to maximum rate in Hz") - parser.add_argument("log", metavar="LOG") parser.add_argument("--profile", action='store_true', help="run the Yappi python profiler") parser.add_argument("--meta", action='store_true', help="output meta-data msgs even if not matching condition") + parser.add_argument("log", metavar="LOG") return parser.parse_args() @@ -183,14 +198,12 @@ def dump_log( robust: bool = False, follow: bool = False, condition: str = None, - quiet: bool = False, - output: str = None, + output_path: str = None, parms: bool = False, format: str = None, csv_sep: str = ",", types: str = None, nottypes: str = None, - mat_file: str = None, compress: bool = False, dialect: str = "ardupilotmega", zero_time_base: bool = False, @@ -202,7 +215,6 @@ def dump_log( source_system: int = None, source_component: int = None, link: int = None, - verbose: bool = False, mav10: bool = False, reduce: int = 0, reduce_rate: float = 0, @@ -211,211 +223,190 @@ def dump_log( meta: bool = False, ): + # 
set up output file handler based on format and output_path + with open(output_path, mode='wb' if format =='standard' else 'w') if output_path else sys.stdout as output_fh: + + if not mav10: + os.environ['MAVLINK20'] = '1' + + if profile: + import yappi # We do the import here so that we won't barf if run normally and yappi not available + yappi.start() + + if format == 'mat': + # Scipy needed only for matlab format + from scipy.io import savemat + + # Check that the output_path argument has been specified + if output_path is None: + print("output_path argument must be specified when mat format is selected") + sys.exit(1) + + elif format =='json': + output_fh.write('[') + + filename = log + mlog = mavutil.mavlink_connection(filename, planner_format=planner, + no_timestamps=no_timestamps, + robust_parsing=robust, + dialect=dialect, + zero_time_base=zero_time_base) + + + if csv_sep == "tab": + csv_sep = "\t" + + types = types + if types is not None: + types = types.split(',') + + nottypes = nottypes + if nottypes is not None: + nottypes = nottypes.split(',') - if not mav10: - os.environ['MAVLINK20'] = '1' - - if profile: - import yappi # We do the import here so that we won't barf if run normally and yappi not available - yappi.start() - - if format == 'mat': - # Scipy needed only for matlab format - from scipy.io import savemat - - # Check that the mat_file argument has been specified - if mat_file is None: - print("mat_file argument must be specified when mat format is selected") - sys.exit(1) - - filename = log - mlog = mavutil.mavlink_connection(filename, planner_format=planner, - no_timestamps=no_timestamps, - robust_parsing=robust, - dialect=dialect, - zero_time_base=zero_time_base) - - if output: - output = open(output, mode='wb') - - if csv_sep == "tab": - csv_sep = "\t" - - types = types - if types is not None: - types = types.split(',') - - nottypes = nottypes - if nottypes is not None: - nottypes = nottypes.split(',') - - ext = os.path.splitext(filename)[1] 
- isbin = ext in ['.bin', '.BIN', '.px4log'] - islog = ext in ['.log', '.LOG'] # NOTE: "islog" does not mean a tlog - istlog = ext in ['.tlog', '.TLOG'] - - reduction_yes = set() - reduction_no = set() - reduction_count = {} - last_msg_rate_t = {} - - # Write out a header row as we're outputting in CSV format. - fields = ['timestamp'] - offsets = {} - if istlog and format == 'csv': # we know our fields from the get-go - try: - currentOffset = 1 # Store how many fields in we are for each message. - for mtype in types: - try: - typeClass = "MAVLink_{0}_message".format(mtype.lower()) - fields += [mtype + '.' + x for x in inspect.getfullargspec(getattr(mavutil.mavlink, typeClass).__init__).args[1:]] - offsets[mtype] = currentOffset - currentOffset += len(fields) - except IndexError: - sys.exit(1) - except AttributeError: - print("Message type '%s' not found" % (mtype)) - sys.exit(1) - except TypeError: - print("You must specify a list of message types if outputting CSV format via the --types argument.") - sys.exit(1) - - # The first line output are names for all columns - print(csv_sep.join(fields)) - - if (isbin or islog) and format == 'csv': # need to accumulate columns from message - if types is None or len(types) != 1: - print("Need exactly one type when dumping CSV from bin file") - sys.exit(1) - - # Track types found - available_types = set() - - # for DF logs pre-calculate types list - match_types=None - if types is not None and hasattr(mlog, 'name_to_id'): - for k in mlog.name_to_id.keys(): - if match_type(k, types): - if nottypes is not None and match_type(k, nottypes): - continue - if match_types is None: - match_types = [] - match_types.append(k) - - if (isbin or islog) and format == 'csv': - # Make sure the specified type was found - if match_types is None: - print("Specified type '%s' not found in log file" % (types[0])) - sys.exit(1) - # we need FMT messages for column headings - match_types.append("FMT") - - last_loss = 0 - - # Keep track of data from 
the current timestep. If the following timestep has the same data, it's stored in here as well. Output should therefore have entirely unique timesteps. - MAT = {} # Dictionary to hold output data for 'mat' format option - while True: - m = mlog.recv_match(blocking=follow, type=match_types) - if m is None: - break - m_type = m.get_type() - available_types.add(m_type) - if (isbin or islog) and m_type == "FMT" and format == 'csv': - if m.Name == types[0]: - fields += m.Columns.split(',') - print(csv_sep.join(fields)) - - if reduce and reduce_msg(m_type, reduce, reduction_yes, reduction_no, reduction_count): - continue - - if reduce_rate > 0 and reduce_rate_msg(m, reduce_rate, last_msg_rate_t): - continue - - if output is not None: - if (isbin or islog) and m_type == "FMT": - output.write(m.get_msgbuf()) + ext = os.path.splitext(filename)[1] + isbin = ext in ['.bin', '.BIN', '.px4log'] + islog = ext in ['.log', '.LOG'] # NOTE: "islog" does not mean a tlog + istlog = ext in ['.tlog', '.TLOG'] + + reduction_yes = set() + reduction_no = set() + reduction_count = {} + last_msg_rate_t = {} + + # Write out a header row as we're outputting in CSV format. + fields = ['timestamp'] + offsets = {} + if istlog and format == 'csv': # we know our fields from the get-go + try: + currentOffset = 1 # Store how many fields in we are for each message. + for mtype in types: + try: + typeClass = "MAVLink_{0}_message".format(mtype.lower()) + fields += [mtype + '.' 
+ x for x in inspect.getfullargspec(getattr(mavutil.mavlink, typeClass).__init__).args[1:]] + offsets[mtype] = currentOffset + currentOffset += len(fields) + except IndexError: + sys.exit(1) + except AttributeError: + print("Message type '%s' not found" % (mtype)) + sys.exit(1) + except TypeError: + print("You must specify a list of message types if outputting CSV format via the --types argument.") + sys.exit(1) + + # The first line output are names for all columns + output_fh.write(csv_sep.join(fields)) + + if (isbin or islog) and format == 'csv': # need to accumulate columns from message + if types is None or len(types) != 1: + print("Need exactly one type when dumping CSV from bin file") + sys.exit(1) + + # Track types found + available_types = set() + + # for DF logs pre-calculate types list + match_types=None + if types is not None and hasattr(mlog, 'name_to_id'): + for k in mlog.name_to_id.keys(): + if match_type(k, types): + if nottypes is not None and match_type(k, nottypes): + continue + if match_types is None: + match_types = [] + match_types.append(k) + + if (isbin or islog) and format == 'csv': + # Make sure the specified type was found + if match_types is None: + print("Specified type '%s' not found in log file" % (types[0])) + sys.exit(1) + # we need FMT messages for column headings + match_types.append("FMT") + + last_loss = 0 + + # Keep track of data from the current timestep. If the following timestep has the same data, it's stored in here as well. Output should therefore have entirely unique timesteps. 
+ out_dict = {} # Dictionary to hold output data for 'mat' format options + count = 0 + while True: + m = mlog.recv_match(blocking=follow, type=match_types) + if m is None: + break + m_type = m.get_type() + available_types.add(m_type) + if (isbin or islog) and m_type == "FMT" and format == 'csv': + if m.Name == types[0]: + fields += m.Columns.split(',') + output_fh.write(csv_sep.join(fields)) + + if reduce and reduce_msg(m_type, reduce, reduction_yes, reduction_no, reduction_count): + continue + + if reduce_rate > 0 and reduce_rate_msg(m, reduce_rate, last_msg_rate_t): + continue + + if not mavutil.evaluate_condition(condition, mlog.messages) and ( + not (m_type in ['FMT', 'FMTU', 'MULT', 'PARM', 'MODE', 'UNIT', 'VER','CMD','MAVC','MSG','EV'] and meta)): continue - if (isbin or islog) and m_type in ["FMTU", "MULT", "UNIT"]: - output.write(m.get_msgbuf()) + if source_system is not None and source_system != m.get_srcSystem(): continue - if (isbin or islog) and (m_type == "PARM" and parms): - output.write(m.get_msgbuf()) + if source_component is not None and source_component != m.get_srcComponent(): continue - if m_type == 'PARAM_VALUE' and parms: - timestamp = getattr(m, '_timestamp', None) - output.write(struct.pack('>Q', int(timestamp*1.0e6)) + m.get_msgbuf()) + if link is not None and link != m._link: continue - if not mavutil.evaluate_condition(condition, mlog.messages) and ( - not (m_type in ['FMT', 'FMTU', 'MULT', 'PARM', 'MODE', 'UNIT', 'VER','CMD','MAVC','MSG','EV'] and meta)): - continue - if source_system is not None and source_system != m.get_srcSystem(): - continue - if source_component is not None and source_component != m.get_srcComponent(): - continue - if link is not None and link != m._link: - continue - - if types is not None and m_type != 'BAD_DATA' and not match_type(m_type, types): - continue - - if nottypes is not None and match_type(m_type, nottypes): - continue - - # Ignore BAD_DATA messages is the user requested or if they're because of a bad 
prefix. The - # latter case is normally because of a mismatched MAVLink version. - if m_type == 'BAD_DATA' and (no_bad_data is True or m.reason == "Bad prefix"): - continue - - # Grab the timestamp. - timestamp = getattr(m, '_timestamp', 0.0) - - # If we're just logging, pack in the timestamp and data into the output file. - if output: - if not (isbin or islog): - output.write(struct.pack('>Q', int(timestamp*1.0e6))) - try: - output.write(m.get_msgbuf()) - except Exception as ex: - print("Failed to write msg %s: %s" % (m_type, str(ex))) - - # If quiet is specified, don't display output to the terminal. - if quiet: - continue - - # Handle different output formats - if format == 'json': - handle_json_output(m, m_type, timestamp, show_source) - elif format == 'csv': - handle_csv_output(m, m_type, timestamp, csv_sep, fields, isbin, islog) - elif format == 'mat': - handle_mat_output(m, m_type, MAT) - elif show_types: - # do nothing - pass - elif verbose and istlog: - mavutil.dump_message_verbose(sys.stdout, m) - print("") - elif verbose and hasattr(m,"dump_verbose"): - m.dump_verbose(sys.stdout) - print("") - else: - handle_standard_output(m, timestamp, show_source, show_seq) - if show_loss: - if last_loss != mlog.mav_loss: - print("lost %d messages" % (mlog.mav_loss - last_loss)) - last_loss = mlog.mav_loss - - if format == 'mat': - # Export the .mat file - savemat(mat_file, MAT, do_compression=compress) - if show_types: - for msgType in available_types: - print(msgType) - - if profile: - yappi.get_func_stats().print_all() - yappi.get_thread_stats().print_all() + if types is not None and m_type != 'BAD_DATA' and not match_type(m_type, types): + continue + + if nottypes is not None and match_type(m_type, nottypes): + continue + + # Ignore BAD_DATA messages is the user requested or if they're because of a bad prefix. The + # latter case is normally because of a mismatched MAVLink version. 
+ if m_type == 'BAD_DATA' and (no_bad_data is True or m.reason == "Bad prefix"): + continue + + # Grab the timestamp. + timestamp = getattr(m, '_timestamp', 0.0) + + # Handle different output formats + if format == 'json': + handle_json_output(m, m_type, timestamp, show_source, output_fh, count) + elif format == 'csv': + handle_csv_output(m, m_type, timestamp, csv_sep, fields, isbin, islog, output_fh) + elif format == 'mat': + handle_mat_output(m, m_type, out_dict) + elif format == 'pretty': + handle_pretty_output(m, istlog, output_fh) + elif show_types: + # do nothing + pass + else: + handle_standard_output(m, timestamp, show_source, show_seq, output_fh, isbin, islog, m_type, parms) + + if show_loss: + if last_loss != mlog.mav_loss: + print("lost %d messages" % (mlog.mav_loss - last_loss)) + last_loss = mlog.mav_loss + + count+=1 + + if format == 'mat': + # Export the .mat file + savemat(output_fh, out_dict, do_compression=compress) + elif format == 'json': + output_fh.write('\n]\n') + if show_types: + for msgType in available_types: + output_fh.write(msgType) + + if profile: + yappi.get_func_stats().print_all() + yappi.get_thread_stats().print_all() + + mlog.close() if __name__=="__main__": args = parse_args() From d50b40f356eab500334c820880123f900d043b32 Mon Sep 17 00:00:00 2001 From: --replace-all Date: Wed, 27 Aug 2025 14:52:39 +0000 Subject: [PATCH 13/14] Replace cli tests with pythonic variants; add pretty format tests; simplify UX --- tests/test_mavlogdump.py | 214 +++++++++++++++++++++++++++++---------- tools/mavlogdump.py | 14 ++- 2 files changed, 169 insertions(+), 59 deletions(-) diff --git a/tests/test_mavlogdump.py b/tests/test_mavlogdump.py index 77301add4..2ee5b82fb 100755 --- a/tests/test_mavlogdump.py +++ b/tests/test_mavlogdump.py @@ -27,9 +27,7 @@ class MAVLogDumpTest(unittest.TestCase): def setUp(self): """Set up test fixtures""" self.test_dir = tempfile.mkdtemp() - self.test_filename = "test.BIN" - # Get the path to mavlogdump.py relative to 
this test file - self.mavlogdump_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "tools", "mavlogdump.py") + self.test_filename = "test.BIN" try: self.test_filepath = pkg_resources.resource_filename(__name__, self.test_filename) except: @@ -78,7 +76,7 @@ def test_dump_json_format(self): first_item = data[0] self.assertIn('meta', first_item, "JSON output should have 'meta' field") self.assertIn('data', first_item, "JSON output should have 'data' field") - except json.JSONDecodeError as e: + except json.JSONDecodeError: pass # File might be empty or invalid, which is OK for test files def test_dump_json_with_show_source(self): @@ -155,113 +153,227 @@ def test_dump_csv_with_custom_separator(self): def test_dump_mat_format(self): """Test MAT format output""" - mat_file = os.path.join(self.test_dir, "output.mat") - cmd = f"{self.mavlogdump_path} --format mat --mat_file {mat_file} {self.test_filepath} 2>/dev/null" - result = os.system(cmd) + output_path = os.path.join(self.test_dir, "output.mat") - # MAT format requires scipy, which might not be installed - if result == 0: - self.assertTrue(os.path.exists(mat_file), "MAT file should be created") + try: + mavlogdump.dump_log( + format='mat', + output_path=output_path, + log=self.test_filepath, + ) + # MAT format requires scipy, which might not be installed + self.assertTrue(os.path.exists(output_path), "MAT file should be created") + except ImportError: + # MAT format requires scipy, which might not be installed + self.skipTest('Missing import') def test_dump_mat_with_compression(self): """Test MAT format with compression""" - mat_file = os.path.join(self.test_dir, "output_compressed.mat") - cmd = f"{self.mavlogdump_path} --format mat --mat_file {mat_file} --compress {self.test_filepath} 2>/dev/null" - result = os.system(cmd) + output_path = os.path.join(self.test_dir, "output_compressed.mat") - # MAT format requires scipy, which might not be installed - if result == 0: - 
self.assertTrue(os.path.exists(mat_file), "Compressed MAT file should be created") + try: + mavlogdump.dump_log( + format='mat', + output_path=output_path, + compress=True, + log=self.test_filepath, + ) + # MAT format requires scipy, which might not be installed + self.assertTrue(os.path.exists(output_path), "Compressed MAT file should be created") + except ImportError: + # MAT format requires scipy, which might not be installed + self.skipTest('Missing import') def test_type_filtering(self): """Test message type filtering""" output_file = os.path.join(self.test_dir, "filtered_output.txt") - cmd = f"{self.mavlogdump_path} --types 'ATT,GPS' {self.test_filepath} > {output_file} 2>/dev/null" - result = os.system(cmd) - self.assertEqual(result, 0, "Type filtering should succeed") + mavlogdump.dump_log( + output_path=output_file, + format='standard', + types='ATT,GPS', + log=self.test_filepath, + ) + self.assertTrue(os.path.exists(output_file), "Filtered output file should be created") def test_nottype_filtering(self): """Test message type exclusion""" output_file = os.path.join(self.test_dir, "excluded_output.txt") - cmd = f"{self.mavlogdump_path} --nottypes 'BAD_DATA' {self.test_filepath} > {output_file} 2>/dev/null" - result = os.system(cmd) - self.assertEqual(result, 0, "Type exclusion should succeed") + mavlogdump.dump_log( + output_path=output_file, + format='standard', + nottypes='BAD_DATA', + log=self.test_filepath, + ) + self.assertTrue(os.path.exists(output_file), "Excluded output file should be created") def test_output_to_file(self): """Test output to file option""" output_file = os.path.join(self.test_dir, "direct_output.bin") - cmd = f"{self.mavlogdump_path} --output {output_file} {self.test_filepath} 2>/dev/null" - result = os.system(cmd) - self.assertEqual(result, 0, "Output to file should succeed") + mavlogdump.dump_log( + output_path=output_file, + format='standard', + log=self.test_filepath, + ) + self.assertTrue(os.path.exists(output_file), "Direct 
output file should be created") def test_show_types(self): """Test show-types option""" output_file = os.path.join(self.test_dir, "types_output.txt") - cmd = f"{self.mavlogdump_path} --show-types {self.test_filepath} > {output_file} 2>/dev/null" - result = os.system(cmd) - self.assertEqual(result, 0, "Show types should succeed") + + mavlogdump.dump_log( + output_path=output_file, + format='types-only', + log=self.test_filepath, + ) + self.assertTrue(os.path.exists(output_file), "Types output file should be created") def test_reduce_option(self): """Test message reduction by ratio""" output_file = os.path.join(self.test_dir, "reduced_output.txt") - cmd = f"{self.mavlogdump_path} --reduce 10 {self.test_filepath} > {output_file} 2>/dev/null" - result = os.system(cmd) - self.assertEqual(result, 0, "Reduce option should succeed") + mavlogdump.dump_log( + output_path=output_file, + format='standard', + reduce=10, + log=self.test_filepath, + ) + self.assertTrue(os.path.exists(output_file), "Reduced output file should be created") def test_reduce_rate_option(self): """Test message reduction by rate""" output_file = os.path.join(self.test_dir, "rate_reduced_output.txt") - cmd = f"{self.mavlogdump_path} --reduce-rate 10 {self.test_filepath} > {output_file} 2>/dev/null" - result = os.system(cmd) - self.assertEqual(result, 0, "Reduce-rate option should succeed") + mavlogdump.dump_log( + output_path=output_file, + format='standard', + reduce_rate=10, + log=self.test_filepath, + ) + self.assertTrue(os.path.exists(output_file), "Rate reduced output file should be created") def test_condition_filtering(self): """Test condition-based filtering""" output_file = os.path.join(self.test_dir, "condition_output.txt") - # Simple condition that should be valid - cmd = f"{self.mavlogdump_path} --condition 'True' {self.test_filepath} > {output_file} 2>/dev/null" - result = os.system(cmd) - self.assertEqual(result, 0, "Condition filtering should succeed") + mavlogdump.dump_log( + 
output_path=output_file, + format='standard', + condition='True', + log=self.test_filepath, + ) + self.assertTrue(os.path.exists(output_file), "Condition filtered output file should be created") def test_mav10_option(self): """Test MAVLink 1.0 parsing""" output_file = os.path.join(self.test_dir, "mav10_output.txt") - cmd = f"{self.mavlogdump_path} --mav10 {self.test_filepath} > {output_file} 2>/dev/null" - result = os.system(cmd) - self.assertEqual(result, 0, "MAV1.0 parsing should succeed") + mavlogdump.dump_log( + output_path=output_file, + format='standard', + mav10=True, + log=self.test_filepath, + ) + self.assertTrue(os.path.exists(output_file), "MAV1.0 output file should be created") def test_source_filtering(self): """Test source system and component filtering""" output_file = os.path.join(self.test_dir, "source_filtered.txt") - cmd = f"{self.mavlogdump_path} --source-system 1 --source-component 1 {self.test_filepath} > {output_file} 2>/dev/null" - result = os.system(cmd) - self.assertEqual(result, 0, "Source filtering should succeed") + mavlogdump.dump_log( + output_path=output_file, + format='standard', + source_system=1, + source_component=1, + log=self.test_filepath, + ) + self.assertTrue(os.path.exists(output_file), "Source filtered output file should be created") + def test_dump_pretty_format(self): + """Test pretty format output""" + output_file = os.path.join(self.test_dir, "pretty_output.txt") + + mavlogdump.dump_log( + output_path=output_file, + format='pretty', + log=self.test_filepath, + ) + + self.assertTrue(os.path.exists(output_file), "Pretty output file should be created") + + # Verify pretty format produces verbose output if file has content + if os.path.getsize(output_file) > 0: + with open(output_file, 'r') as f: + content = f.read() + # Pretty format should produce multi-line verbose output for each message + # Check that it's not just a single-line format + if content.strip(): + lines = content.strip().split('\n') + # Pretty format 
typically produces multiple lines per message + self.assertGreater(len(lines), 0, "Pretty format should produce output") + + def test_dump_pretty_with_types(self): + """Test pretty format with type filtering""" + output_file = os.path.join(self.test_dir, "pretty_filtered.txt") + + mavlogdump.dump_log( + output_path=output_file, + format='pretty', + types='ATT,GPS', + log=self.test_filepath, + ) + + self.assertTrue(os.path.exists(output_file), "Pretty filtered output file should be created") + + def test_pretty_with_conditions(self): + """Test pretty format with conditions""" + output_file = os.path.join(self.test_dir, "pretty_condition.txt") + + mavlogdump.dump_log( + output_path=output_file, + format='pretty', + condition='True', + log=self.test_filepath, + ) + + self.assertTrue(os.path.exists(output_file), "Pretty condition output file should be created") + + def test_pretty_with_reduce(self): + """Test pretty format with reduce option""" + output_file = os.path.join(self.test_dir, "pretty_reduced.txt") + + mavlogdump.dump_log( + output_path=output_file, + format='pretty', + reduce=5, + log=self.test_filepath, + ) + + self.assertTrue(os.path.exists(output_file), "Pretty reduced output file should be created") + def test_combined_options(self): """Test combination of multiple options""" output_file = os.path.join(self.test_dir, "combined_output.json") - cmd = (f"mavlogdump.py --format json --types 'ATT,GPS' " - f"--no-bad-data {self.test_filepath} > {output_file} 2>/dev/null") - result = os.system(cmd) - self.assertEqual(result, 0, "Combined options should succeed") + mavlogdump.dump_log( + output_path=output_file, + format='json', + types='ATT,GPS', + no_bad_data=True, + log=self.test_filepath, + ) + self.assertTrue(os.path.exists(output_file), "Combined output file should be created") def test_programmatic_json_processing(self): @@ -292,15 +404,15 @@ def test_programmatic_mat_processing(self): """Test programmatic MAT processing""" if hasattr(mavlogdump, 
'process_log'): # Test programmatic interface - mat_file = os.path.join(self.test_dir, "prog_output.mat") + output_path = os.path.join(self.test_dir, "prog_output.mat") result = mavlogdump.process_log( self.test_filepath, output_format='mat', - mat_file=mat_file, + output_path=output_path, ) # MAT processing might fail if scipy is not installed if result == 0: - self.assertTrue(os.path.exists(mat_file), "Programmatic MAT file should be created") + self.assertTrue(os.path.exists(output_path), "Programmatic MAT file should be created") class MAVLogDumpUnitTest(unittest.TestCase): diff --git a/tools/mavlogdump.py b/tools/mavlogdump.py index 0186a8a9d..00ee18b0b 100755 --- a/tools/mavlogdump.py +++ b/tools/mavlogdump.py @@ -153,10 +153,10 @@ def handle_standard_output(m, timestamp, show_source, show_seq, output_fh, isbin def handle_pretty_output(m, istlog, output_fh): if istlog: mavutil.dump_message_verbose(output_fh, m) - print("") + output_fh.write('') elif hasattr(m,"dump_verbose"): m.dump_verbose(output_fh) - print("") + output_fh.write('') def parse_args(): parser = ArgumentParser(description=__doc__) @@ -168,7 +168,7 @@ def parse_args(): parser.add_argument("--condition", default=None, help="select packets by condition") parser.add_argument("-o", "--output_path", default=None, help="Output file path; if left undefined, writes to stdout.") parser.add_argument("-p", "--parms", action='store_true', help="preserve parameters in output with -o") - parser.add_argument("--format", default='standard', help="Change the output format between 'standard', 'json', 'csv', 'mat', and 'pretty'. For the CSV output, you must supply types that you want. For MAT output, specify output file with -o") + parser.add_argument("--format", default='standard', help="Change the output format between 'standard', 'json', 'csv', 'mat', 'types-only', and 'pretty'. For the CSV output, you must supply types that you want. 
For MAT output, specify output file with -o") parser.add_argument("--csv_sep", dest="csv_sep", default=",", help="Select the delimiter between columns for the output CSV file. Use 'tab' to specify tabs. Only applies when --format=csv") parser.add_argument("--types", default=None, help="types of messages (comma separated with wildcard)") parser.add_argument("--nottypes", default=None, help="types of messages not to include (comma separated with wildcard)") @@ -178,7 +178,6 @@ def parse_args(): parser.add_argument("--no-bad-data", action='store_true', help="Don't output corrupted messages") parser.add_argument("--show-source", action='store_true', help="Show source system ID and component ID") parser.add_argument("--show-seq", action='store_true', help="Show sequence numbers") - parser.add_argument("--show-types", action='store_true', help="Shows all message types available on opened log") parser.add_argument("--show-loss", action='store_true', help="Shows changes in lost messages") parser.add_argument("--source-system", type=int, default=None, help="filter by source system ID") parser.add_argument("--source-component", type=int, default=None, help="filter by source component ID") @@ -210,7 +209,6 @@ def dump_log( no_bad_data: bool = False, show_source: bool = False, show_seq: bool = False, - show_types: bool = False, show_loss: bool = False, source_system: int = None, source_component: int = None, @@ -380,7 +378,7 @@ def dump_log( handle_mat_output(m, m_type, out_dict) elif format == 'pretty': handle_pretty_output(m, istlog, output_fh) - elif show_types: + elif format == 'types-only': # do nothing pass else: @@ -395,10 +393,10 @@ def dump_log( if format == 'mat': # Export the .mat file - savemat(output_fh, out_dict, do_compression=compress) + savemat(output_path, out_dict, do_compression=compress) elif format == 'json': output_fh.write('\n]\n') - if show_types: + elif format == 'types-only': for msgType in available_types: output_fh.write(msgType) From 
d6c408f77db62555db02f990431d531879767b17 Mon Sep 17 00:00:00 2001 From: --replace-all Date: Mon, 1 Sep 2025 18:06:26 +0000 Subject: [PATCH 14/14] Make JSON output compliant (nan -> null); add ndjson support for back-compatibility and streaming use-cases --- tools/mavlogdump.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/tools/mavlogdump.py b/tools/mavlogdump.py index 00ee18b0b..5f1d977a7 100755 --- a/tools/mavlogdump.py +++ b/tools/mavlogdump.py @@ -14,6 +14,7 @@ import sys import time import inspect +import math from argparse import ArgumentParser from pymavlink.DFReader import to_string from pymavlink import mavutil @@ -62,7 +63,7 @@ def match_type(mtype, patterns): return True return False -def handle_json_output(m, m_type, timestamp, show_source, output_fh, count): +def handle_json_output(m, m_type, timestamp, show_source, output_fh, count, newline_delim=False): '''Handle JSON format output''' # Format our message as a Python dict, which gets us almost to proper JSON format data = m.to_dict() @@ -83,17 +84,23 @@ def handle_json_output(m, m_type, timestamp, show_source, output_fh, count): # convert any array.array (e.g. packed-16-bit fft readings) into lists: for key in data.keys(): - if type(data[key]) == array.array: + if type(data[key]) == float: + if math.isnan(data[key]): + data[key] = None + elif type(data[key]) == array.array: data[key] = list(data[key]) - # convert any byte-strings into utf-8 strings. Don't die trying. - for key in data.keys(): - if type(data[key]) == bytes: - data[key] = to_string(data[key]) + # convert any byte-strings into utf-8 strings. Don't die trying. + elif type(data[key]) == bytes: + str_repr = to_string(data[key]) + data[key] = str_repr outMsg = {"meta": meta, "data": data} # TODO: add file write # Now write this object stringified properly. 
- output_fh.write(f'{"," if count>0 else ""}\n\t{json.dumps(outMsg)}') + if newline_delim: + output_fh.write(f'{json.dumps(outMsg)}\n') + else: + output_fh.write(f'{"," if count>0 else ""}\n\t{json.dumps(outMsg)}') def handle_csv_output(m, m_type, timestamp, csv_sep, fields, isbin, islog, output_fh): @@ -168,7 +175,7 @@ def parse_args(): parser.add_argument("--condition", default=None, help="select packets by condition") parser.add_argument("-o", "--output_path", default=None, help="Output file path; if left undefined, writes to stdout.") parser.add_argument("-p", "--parms", action='store_true', help="preserve parameters in output with -o") - parser.add_argument("--format", default='standard', help="Change the output format between 'standard', 'json', 'csv', 'mat', 'types-only', and 'pretty'. For the CSV output, you must supply types that you want. For MAT output, specify output file with -o") + parser.add_argument("--format", default='standard', help="Change the output format between 'standard', 'json', 'ndjson', 'csv', 'mat', 'types-only', and 'pretty'. For the CSV output, you must supply types that you want. For MAT output, specify output file with -o") parser.add_argument("--csv_sep", dest="csv_sep", default=",", help="Select the delimiter between columns for the output CSV file. Use 'tab' to specify tabs. Only applies when --format=csv") parser.add_argument("--types", default=None, help="types of messages (comma separated with wildcard)") parser.add_argument("--nottypes", default=None, help="types of messages not to include (comma separated with wildcard)") @@ -372,6 +379,8 @@ def dump_log( # Handle different output formats if format == 'json': handle_json_output(m, m_type, timestamp, show_source, output_fh, count) + elif format == 'ndjson': + handle_json_output(m, m_type, timestamp, show_source, output_fh, count, newline_delim=True) elif format == 'csv': handle_csv_output(m, m_type, timestamp, csv_sep, fields, isbin, islog, output_fh) elif format == 'mat':