From df25bbabccd283cefa5ef0e4aea805aec4e9ad29 Mon Sep 17 00:00:00 2001 From: Bob Long Date: Tue, 20 May 2025 18:15:49 +1000 Subject: [PATCH 1/7] Add dfreader_pandas --- .gitignore | 1 + dfreader_pandas.py | 211 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 212 insertions(+) create mode 100644 dfreader_pandas.py diff --git a/.gitignore b/.gitignore index 880635693..edd01ce04 100644 --- a/.gitignore +++ b/.gitignore @@ -28,3 +28,4 @@ include/mavlink/v1.0 dfindexer/dfindexer_cy.c dfindexer/*.so dfindexer/*.pyd +.dfreader_pandas_cache diff --git a/dfreader_pandas.py b/dfreader_pandas.py new file mode 100644 index 000000000..969964a1c --- /dev/null +++ b/dfreader_pandas.py @@ -0,0 +1,211 @@ +""" +Module to parse ArduPilot logs into pandas DataFrames with optional caching. + +- Accepts a log file, list of messages/fields, and resample frequency. +- Aligns output rows to clean time intervals relative to the start of the log. +- Uses a lightweight caching mechanism keyed by file content and module hash. +- Automatically invalidates cache on module updates or parameter changes. + +Intended for efficient, repeatable log analysis workflows. +""" + +import os +import json +import hashlib +import pandas as pd +from pymavlink.DFReader import DFReader_binary + +HASH_SIZE_BYTES = 1024 * 1024 # 1MB + + +def parse_log_to_df(path, specs, frequency, cache_dir=None): + manager = LogCacheManager(cache_dir) + + if manager: + df = manager.try_load_dataframe(path, specs, frequency) + if df is not None: + return df + + reader = DFReader_binary(path) + fields = expand_field_specs(specs, reader) + # Dump the messages dict, so we get NaNs until the first valid message of each type + reader.rewind() + + PERIOD = 1 / frequency + first_timestamp = None + next_emit_time = None + + rows = [] + while True: + msg = reader.recv_match(type=fields.keys()) + if msg is None: + break + if not first_timestamp: + first_timestamp = reader.clock.timestamp + next_emit_time = first_timestamp + if reader.clock.timestamp >= next_emit_time: + n_period = (reader.clock.timestamp - first_timestamp) // PERIOD + n_period += 1 + next_emit_time = first_timestamp + n_period * PERIOD + rows.append(new_row(reader, fields)) + + df = pd.DataFrame(rows) + df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s") + df.set_index("timestamp", inplace=True) + df = df[[f"{m}.{f}" for m in fields.keys() for f in fields[m]]] + + if manager: + manager.save_dataframe(path, df, specs, frequency) + + return df + + +def new_row(reader: DFReader_binary, fields): + row = {"timestamp": reader.clock.timestamp} + for msg in fields.keys(): + if msg not in reader.messages: + continue + m = reader.messages[msg] + for field in fields[msg]: + row[f"{msg}.{field}"] = getattr(m, field, None) + return row + + +def expand_field_specs(specs, reader: DFReader_binary): + out = {} + for spec in specs: + if "." 
in spec: + msg, field = spec.split(".") + if msg not in reader.name_to_id: + raise ValueError(f"Message {msg} not found in log file") + fmt = reader.formats[reader.name_to_id[msg]] + if field not in fmt.columns: + raise ValueError(f"Field {field} not found in message {msg}") + out.setdefault(msg, []).append(field) + else: + msg = spec + if msg not in reader.name_to_id: + raise ValueError(f"Message {msg} not found in log file") + fmt = reader.formats[reader.name_to_id[msg]] + out.setdefault(msg, []).extend(fmt.columns) + return out + + +class LogCacheManager: + def __init__(self, cache_dir): + self.cache_dir = cache_dir + if cache_dir is not None: + os.makedirs(cache_dir, exist_ok=True) + + def __bool__(self): + return self.cache_dir is not None + + def _compute_key(self, path): + stat = os.stat(path) + size = stat.st_size + with open(path, "rb") as f: + data = f.read(HASH_SIZE_BYTES) + h = hashlib.sha256(data).hexdigest() + h = h[:16] # 16 characters is plenty to prevent collisions + return f"{h}_{size}" + + def _module_hash(self): + with open(__file__, "rb") as f: + return hashlib.sha256(f.read()).hexdigest()[:8] + + @staticmethod + def _specs_equal(a, b): + return set(a) == set(b) + + def try_load_dataframe(self, path, specs, frequency): + key = self._compute_key(path) + cache_path = os.path.join(self.cache_dir, f"{key}.feather") + meta_path = os.path.join(self.cache_dir, f"{key}.meta.json") + + if os.path.exists(cache_path) and os.path.exists(meta_path): + with open(meta_path, "r") as f: + meta = json.load(f) + if ( + self._specs_equal(meta.get("specs", []), specs) + and meta.get("frequency") == frequency + and meta.get("module_hash") == self._module_hash() + ): + return pd.read_feather(cache_path) + return None + + def save_dataframe(self, path, df, specs, frequency): + key = self._compute_key(path) + cache_path = os.path.join(self.cache_dir, f"{key}.feather") + meta_path = os.path.join(self.cache_dir, f"{key}.meta.json") + + df.reset_index(drop=True).to_feather(cache_path) + meta = { + "specs": specs, + "frequency": frequency, + "module_hash": self._module_hash(), + } + with open(meta_path, "w") as f: + json.dump(meta, f) + + +def main(): + import argparse + + parser = argparse.ArgumentParser( + description="Parse a log file to a DataFrame.", + usage="python -m pymavlink.dfreader_pandas [options]", + epilog="Example usage: python -m pymavlink.dfreader_pandas log.bin --fields TECS EFI CTUN.E2T --frequency 10.0", + ) + parser.add_argument("path", type=str, help="Path to the log file") + parser.add_argument( + "--fields", + type=str, + nargs="+", + default=[], + help="Space-separated list of message types and fields to include in the DataFrame", + ) + parser.add_argument( + "--frequency", + type=float, + default=10.0, + help="Frequency in Hz for sampling the log file", + ) + parser.add_argument( + "--cache_dir", + type=str, + default=None, + help="Directory to cache the DataFrame", + ) + parser.add_argument( + "--profile", + action="store_true", + help="Enable profiling of the parsing process", + ) + args = parser.parse_args() + + if args.profile: + from line_profiler import LineProfiler + + profiler = LineProfiler() + profiler.add_function(parse_log_to_df) + profiler.add_function(new_row) + profiler.enable_by_count() + + if args.fields is None or len(args.fields) == 0: + raise ValueError( + "No fields provided. Use --fields to specify message types and/or fields." 
+ ) + + df = parse_log_to_df( + args.path, args.fields, args.frequency, args.cache_dir + ) + print(df.head()) + print("...") + print(df.tail()) + + if args.profile: + profiler.print_stats() + + +if __name__ == "__main__": + main() From 82a9e07760581c581528cbfd3c46bb71340f944f Mon Sep 17 00:00:00 2001 From: Bob Long Date: Tue, 20 May 2025 18:29:18 +1000 Subject: [PATCH 2/7] dfreader_pandas: add instance filtering --- dfreader_pandas.py | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/dfreader_pandas.py b/dfreader_pandas.py index 969964a1c..a26566e44 100644 --- a/dfreader_pandas.py +++ b/dfreader_pandas.py @@ -10,6 +10,7 @@ """ import os +import re import json import hashlib import pandas as pd @@ -36,8 +37,10 @@ def parse_log_to_df(path, specs, frequency, cache_dir=None): next_emit_time = None rows = [] + # Make a list of base (non-instanced) message types + filter_types = [re.sub(r"\[\d+\]", "", m) for m in fields.keys()] while True: - msg = reader.recv_match(type=fields.keys()) + msg = reader.recv_match(type=filter_types) if msg is None: break if not first_timestamp: @@ -74,19 +77,20 @@ def new_row(reader: DFReader_binary, fields): def expand_field_specs(specs, reader: DFReader_binary): out = {} for spec in specs: - if "." in spec: - msg, field = spec.split(".") - if msg not in reader.name_to_id: - raise ValueError(f"Message {msg} not found in log file") - fmt = reader.formats[reader.name_to_id[msg]] + msg, field = spec.split(".") if "." in spec else (spec, None) + msg_base = re.sub(r"\[\d+\]", "", msg) + if msg_base not in reader.name_to_id: + raise ValueError(f"Message {msg_base} not found in log file") + fmt = reader.formats[reader.name_to_id[msg_base]] + if msg_base != msg and fmt.instance_field is None: + raise ValueError( + f"Message {msg_base} does not support instances, but {msg} was requested" + ) + if field is not None: if field not in fmt.columns: - raise ValueError(f"Field {field} not found in message {msg}") + raise ValueError(f"Field {field} not found in message {msg_base}") out.setdefault(msg, []).append(field) else: - msg = spec - if msg not in reader.name_to_id: - raise ValueError(f"Message {msg} not found in log file") - fmt = reader.formats[reader.name_to_id[msg]] out.setdefault(msg, []).extend(fmt.columns) return out @@ -154,7 +158,7 @@ def main(): parser = argparse.ArgumentParser( description="Parse a log file to a DataFrame.", usage="python -m pymavlink.dfreader_pandas [options]", - epilog="Example usage: python -m pymavlink.dfreader_pandas log.bin --fields TECS EFI CTUN.E2T --frequency 10.0", + epilog="Example usage: python -m pymavlink.dfreader_pandas log.bin --fields TECS EFI CTUN.E2T BAT[0].Volt --frequency 10.0 --cache_dir ./.tmp", ) parser.add_argument("path", type=str, help="Path to the log file") parser.add_argument( From ec74d82a01c585880639ce596e98c8d1abc680f3 Mon Sep 17 00:00:00 2001 From: Bob Long Date: Wed, 21 May 2025 09:51:28 +1000 Subject: [PATCH 3/7] dfreader_pandas: clean up LogCacheManager --- dfreader_pandas.py | 123 +++++++++++++++++++++++++++++++-------------- 1 file changed, 86 insertions(+), 37 deletions(-) diff --git a/dfreader_pandas.py b/dfreader_pandas.py index a26566e44..b4fcfea06 100644 --- a/dfreader_pandas.py +++ b/dfreader_pandas.py @@ -21,11 +21,9 @@ def parse_log_to_df(path, specs, frequency, cache_dir=None): manager = LogCacheManager(cache_dir) - - if manager: - df = manager.try_load_dataframe(path, specs, frequency) - if df is not None: - return df + df = 
manager.try_load_dataframe(path, specs, frequency) + if df is not None: + return df reader = DFReader_binary(path) fields = expand_field_specs(specs, reader) @@ -57,8 +55,7 @@ def parse_log_to_df(path, specs, frequency, cache_dir=None): df.set_index("timestamp", inplace=True) df = df[[f"{m}.{f}" for m in fields.keys() for f in fields[m]]] - if manager: - manager.save_dataframe(path, df, specs, frequency) + manager.save_dataframe(df) return df @@ -96,15 +93,24 @@ def expand_field_specs(specs, reader: DFReader_binary): class LogCacheManager: + """Cache manager for log files.""" + + _module_hash = None + def __init__(self, cache_dir): self.cache_dir = cache_dir - if cache_dir is not None: - os.makedirs(cache_dir, exist_ok=True) + if cache_dir is None: + return + os.makedirs(cache_dir, exist_ok=True) + if self._module_hash is None: + with open(__file__, "rb") as f: + self._module_hash = hashlib.sha256(f.read()).hexdigest()[:8] - def __bool__(self): - return self.cache_dir is not None - - def _compute_key(self, path): + @staticmethod + def _compute_key(path): + """Compute a unique key for a log file based on content and size.""" + if not os.path.exists(path): + raise FileNotFoundError(f"File {path} does not exist") stat = os.stat(path) size = stat.st_size with open(path, "rb") as f: @@ -113,43 +119,86 @@ def _compute_key(self, path): h = h[:16] # 16 characters is plenty to prevent collisions return f"{h}_{size}" - def _module_hash(self): - with open(__file__, "rb") as f: - return hashlib.sha256(f.read()).hexdigest()[:8] - @staticmethod def _specs_equal(a, b): + """Compare two lists of messages/fields for equality.""" return set(a) == set(b) + def _cache_path(self): + """Compute the cache path for a given key.""" + if self.cache_dir is None: + return None + return os.path.join(self.cache_dir, f"{self.key}.feather") + + def _meta_path(self): + """Compute the metadata path for a given key.""" + if self.cache_dir is None: + return None + return os.path.join(self.cache_dir, f"{self.key}.meta.json") + + @staticmethod + def _metadata(specs, frequency): + """Generate metadata for the cache file.""" + return { + "specs": specs, + "frequency": frequency, + "module_hash": LogCacheManager._module_hash, + } + + @staticmethod + def _compare_metadata(meta1, meta2): + """Compare metadata for equality.""" + if meta1 is None or meta2 is None: + return False + if meta1.keys() != meta2.keys(): + return False + for k in meta1.keys(): + if meta1[k] != meta2[k]: + return False + return True + def try_load_dataframe(self, path, specs, frequency): - key = self._compute_key(path) - cache_path = os.path.join(self.cache_dir, f"{key}.feather") - meta_path = os.path.join(self.cache_dir, f"{key}.meta.json") + """Try to load a cached DataFrame for this log file. + + If the cache file does not exist yet, we store the information needed + to save the cache file later. 
+ """ + if self.cache_dir is None or not os.path.exists(self.cache_dir): + return None + self.path = path + self.key = self._compute_key(path) + self.meta = self._metadata(specs, frequency) + cache_path = self._cache_path() + meta_path = self._meta_path() if os.path.exists(cache_path) and os.path.exists(meta_path): with open(meta_path, "r") as f: meta = json.load(f) - if ( - self._specs_equal(meta.get("specs", []), specs) - and meta.get("frequency") == frequency - and meta.get("module_hash") == self._module_hash() - ): + if self._compare_metadata(meta, self.meta): return pd.read_feather(cache_path) return None - def save_dataframe(self, path, df, specs, frequency): - key = self._compute_key(path) - cache_path = os.path.join(self.cache_dir, f"{key}.feather") - meta_path = os.path.join(self.cache_dir, f"{key}.meta.json") + def save_dataframe(self, df): + """Save the DataFrame to a cache file.""" + if self.cache_dir is None or not os.path.exists(self.cache_dir): + return + key = self._compute_key(self.path) + if self.key != key: + print( + f"Warning: cache key {self.key} does not match computed key {key}. " + "This suggests the log file has changed since it was opened. " + "This dataframe will not be saved to the cache." + ) + return - df.reset_index(drop=True).to_feather(cache_path) - meta = { - "specs": specs, - "frequency": frequency, - "module_hash": self._module_hash(), - } - with open(meta_path, "w") as f: - json.dump(meta, f) + df.to_feather(self._cache_path()) + with open(self._meta_path(), "w") as f: + json.dump(self.meta, f) + + # Force the user to call try_load_dataframe() again before saving again + self.key = None + self.path = None + self.meta = None def main(): From f52ac06ec5f28b5931507d7b0cef70eabeb102f0 Mon Sep 17 00:00:00 2001 From: Bob Long Date: Wed, 21 May 2025 09:52:23 +1000 Subject: [PATCH 4/7] dfreader_pandas: move to tools --- dfreader_pandas.py => tools/dfreader_pandas.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename dfreader_pandas.py => tools/dfreader_pandas.py (100%) diff --git a/dfreader_pandas.py b/tools/dfreader_pandas.py similarity index 100% rename from dfreader_pandas.py rename to tools/dfreader_pandas.py From 9ce12803dfea8e90d142fb00f610ae40d55bd241 Mon Sep 17 00:00:00 2001 From: Bob Long Date: Wed, 21 May 2025 19:18:17 +1000 Subject: [PATCH 5/7] dfreader_pandas: add param loading --- tools/dfreader_pandas.py | 256 ++++++++++++++++++++++++++++++--------- 1 file changed, 198 insertions(+), 58 deletions(-) diff --git a/tools/dfreader_pandas.py b/tools/dfreader_pandas.py index b4fcfea06..921186e38 100644 --- a/tools/dfreader_pandas.py +++ b/tools/dfreader_pandas.py @@ -2,6 +2,7 @@ Module to parse ArduPilot logs into pandas DataFrames with optional caching. - Accepts a log file, list of messages/fields, and resample frequency. +- Extracts both flight data and parameter changes as separate DataFrames. - Aligns output rows to clean time intervals relative to the start of the log. - Uses a lightweight caching mechanism keyed by file content and module hash. - Automatically invalidates cache on module updates or parameter changes. 
@@ -19,15 +20,51 @@ HASH_SIZE_BYTES = 1024 * 1024 # 1MB -def parse_log_to_df(path, specs, frequency, cache_dir=None): +def load_log(path, fields, frequency, cache_dir=None): + """Load data and parameters from a log file.""" manager = LogCacheManager(cache_dir) - df = manager.try_load_dataframe(path, specs, frequency) - if df is not None: - return df + manager.try_load(path, fields, frequency) + + if manager.data is not None and manager.params is not None: + return manager.data, manager.params + + reader = DFReader_binary(path) + if manager.data is None: + manager.data = parse_log_data(reader, fields, frequency) + if manager.params is None: + manager.params = parse_log_params(reader) + manager.save() + return manager.data, manager.params + +def load_log_params(path, cache_dir=None): + """Load only parameters from a log file.""" + manager = LogCacheManager(cache_dir) + manager.try_load(path) + + if manager.params is not None: + return manager.params reader = DFReader_binary(path) - fields = expand_field_specs(specs, reader) - # Dump the messages dict, so we get NaNs until the first valid message of each type + manager.params = parse_log_params(reader) + manager.save() + return manager.params + +def load_log_data(path, fields, frequency, cache_dir=None): + """Load only data from a log file.""" + manager = LogCacheManager(cache_dir) + manager.try_load(path, fields, frequency) + + if manager.data is not None: + return manager.data + + reader = DFReader_binary(path) + manager.data = parse_log_data(reader, fields, frequency) + manager.save() + return manager.data + +def parse_log_data(reader: DFReader_binary, fields, frequency): + fields = expand_field_specs(fields, reader) + # Ensures missing data is NaN until first message of each type is received reader.rewind() PERIOD = 1 / frequency @@ -55,7 +92,51 @@ def parse_log_to_df(path, specs, frequency, cache_dir=None): df.set_index("timestamp", inplace=True) df = df[[f"{m}.{f}" for m in fields.keys() for f in fields[m]]] - manager.save_dataframe(df) + return df + + +def parse_log_params(reader): + """Parse parameters from the log file.""" + param_dict = {} + reader.rewind() + + while True: + msg = reader.recv_match(type="PARM") + if msg is None: + break + name = msg.Name + ts = msg._timestamp + val = msg.Value + default = msg.Default + + entry = param_dict.setdefault(name, []) + + # A NaN default means "no change" + if pd.isna(default) and len(entry) > 0: + default = entry[-1][2] + + # We force the first timestamp to be time-of-boot. This allows users + # to easily use asof() on the DataFrame later (since they don't have to + # worry about using too early of a timestamp). 
+ if not entry: + ts = reader.clock.timebase + + # Log every change in value/default + if not entry or entry[-1][0] != val or entry[-1][1] != default: + entry.append((ts, val, default)) + + + # Flatten the dictionary list of index/value tuples + index = [] + values = [] + for name, entries in param_dict.items(): + for entry in entries: + index.append((name, pd.to_datetime(entry[0], unit="s"))) + values.append((entry[1], entry[2])) + # Create a DataFrame with a multi-index + df = pd.DataFrame(values, index=pd.MultiIndex.from_tuples(index)) + df.columns = ["Value", "Default"] + df.index.names = ["Name", "Timestamp"] return df @@ -98,13 +179,19 @@ class LogCacheManager: _module_hash = None def __init__(self, cache_dir): + self.init_module_hash() self.cache_dir = cache_dir - if cache_dir is None: - return - os.makedirs(cache_dir, exist_ok=True) - if self._module_hash is None: + self.data = None + self.params = None + if cache_dir is not None: + os.makedirs(cache_dir, exist_ok=True) + + @classmethod + def init_module_hash(cls): + """Initialize the module hash for the current module.""" + if cls._module_hash is None: with open(__file__, "rb") as f: - self._module_hash = hashlib.sha256(f.read()).hexdigest()[:8] + cls._module_hash = hashlib.sha256(f.read()).hexdigest()[:8] @staticmethod def _compute_key(path): @@ -119,30 +206,44 @@ def _compute_key(path): h = h[:16] # 16 characters is plenty to prevent collisions return f"{h}_{size}" - @staticmethod - def _specs_equal(a, b): - """Compare two lists of messages/fields for equality.""" - return set(a) == set(b) - - def _cache_path(self): + def _data_cache_path(self): """Compute the cache path for a given key.""" if self.cache_dir is None: return None - return os.path.join(self.cache_dir, f"{self.key}.feather") + return os.path.join(self.cache_dir, f"{self._key}.feather") + + def _param_cache_path(self): + """Compute the parameter cache path for a given key.""" + if self.cache_dir is None: + return None + return os.path.join(self.cache_dir, f"{self._key}.params.feather") - def _meta_path(self): + def _data_meta_path(self): """Compute the metadata path for a given key.""" if self.cache_dir is None: return None - return os.path.join(self.cache_dir, f"{self.key}.meta.json") + return os.path.join(self.cache_dir, f"{self._key}.meta.json") - @staticmethod - def _metadata(specs, frequency): + def _param_meta_path(self): + """Compute the parameter metadata path for a given key.""" + if self.cache_dir is None: + return None + return os.path.join(self.cache_dir, f"{self._key}.params.meta.json") + + @classmethod + def _data_meta(cls, fields, frequency): """Generate metadata for the cache file.""" return { - "specs": specs, + "fields": fields, "frequency": frequency, - "module_hash": LogCacheManager._module_hash, + "module_hash": cls._module_hash, + } + + @classmethod + def _param_meta(cls): + """Generate metadata for the parameter cache file.""" + return { + "module_hash": cls._module_hash, } @staticmethod @@ -157,48 +258,72 @@ def _compare_metadata(meta1, meta2): return False return True - def try_load_dataframe(self, path, specs, frequency): - """Try to load a cached DataFrame for this log file. + def try_load(self, path, fields=None, frequency=None): + """Try to load a cached data and params for this log file. - If the cache file does not exist yet, we store the information needed - to save the cache file later. + If we store the information needed to save the cache file later in + case a cache file does not exist yet. 
""" if self.cache_dir is None or not os.path.exists(self.cache_dir): - return None - self.path = path - self.key = self._compute_key(path) - self.meta = self._metadata(specs, frequency) - cache_path = self._cache_path() - meta_path = self._meta_path() - - if os.path.exists(cache_path) and os.path.exists(meta_path): - with open(meta_path, "r") as f: + return None, None + self._log_path = path + self._key = self._compute_key(path) + self._data_meta = self._data_meta(fields, frequency) + self._param_meta = self._param_meta() + data_cache_path = self._data_cache_path() + data_meta_path = self._data_meta_path() + + if os.path.exists(data_cache_path) and os.path.exists(data_meta_path): + with open(data_meta_path, "r") as f: + meta = json.load(f) + if self._compare_metadata(meta, self._data_meta): + self.data = pd.read_feather(data_cache_path) + + param_cache_path = self._param_cache_path() + param_meta_path = self._param_meta_path() + if os.path.exists(param_cache_path) and os.path.exists( + param_meta_path + ): + with open(param_meta_path, "r") as f: meta = json.load(f) - if self._compare_metadata(meta, self.meta): - return pd.read_feather(cache_path) - return None + if self._compare_metadata(meta, self._param_meta): + self.params = pd.read_feather(param_cache_path) + + return self.data, self.params - def save_dataframe(self, df): + def save(self): """Save the DataFrame to a cache file.""" if self.cache_dir is None or not os.path.exists(self.cache_dir): return - key = self._compute_key(self.path) - if self.key != key: + if self.data is None and self.params is None: + raise ValueError( + "Either data or params must be provided to save the cache" + ) + key = self._compute_key(self._log_path) + if self._key != key: print( - f"Warning: cache key {self.key} does not match computed key {key}. " - "This suggests the log file has changed since it was opened. " - "This dataframe will not be saved to the cache." + f"Warning: cache key {self._key} does not match computed key " + f"{key}. This suggests the log file has changed since it was " + "opened. Your data is likely truncated; it will not be saved " + "to the cache." ) return - df.to_feather(self._cache_path()) - with open(self._meta_path(), "w") as f: - json.dump(self.meta, f) + if self.data is not None: + self.data.to_feather(self._data_cache_path()) + with open(self._data_meta_path(), "w") as f: + json.dump(self._data_meta, f) - # Force the user to call try_load_dataframe() again before saving again - self.key = None - self.path = None - self.meta = None + if self.params is not None: + self.params.to_feather(self._param_cache_path()) + with open(self._param_meta_path(), "w") as f: + json.dump(self._param_meta, f) + + # Every call to save() should be preceded by a call to try_load() + self._key = None + self._log_path = None + self._data_meta = None + self._param_meta = None def main(): @@ -240,7 +365,9 @@ def main(): from line_profiler import LineProfiler profiler = LineProfiler() - profiler.add_function(parse_log_to_df) + profiler.add_function(load_log) + profiler.add_function(parse_log_data) + profiler.add_function(parse_log_params) profiler.add_function(new_row) profiler.enable_by_count() @@ -249,12 +376,25 @@ def main(): "No fields provided. Use --fields to specify message types and/or fields." 
) - df = parse_log_to_df( + data, params = load_log( args.path, args.fields, args.frequency, args.cache_dir ) - print(df.head()) + print(data.head()) + print("...") + print(data.tail()) + print(params.head()) + print("...") + print(params.tail()) + + # Drop all params that don't change + # Params is a multi-index with Name, Timestamp. Drop everything with only + # one timestamp + params = params.groupby("Name").filter(lambda x: len(x) > 1) + + # Print all the names remaining, and their counts + print("Remaining params:") + print(params.groupby("Name").size()) print("...") - print(df.tail()) if args.profile: profiler.print_stats() From 78ad6b386dc4a338f8639e3809b171c11de3bf91 Mon Sep 17 00:00:00 2001 From: Bob Long Date: Wed, 21 May 2025 20:02:34 +1000 Subject: [PATCH 6/7] setup: add tools to packages --- setup.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/setup.py b/setup.py index 728cb7d46..e56b4bd51 100644 --- a/setup.py +++ b/setup.py @@ -168,7 +168,9 @@ def run(self): 'pymavlink.dialects.v10', 'pymavlink.dialects.v20', 'pymavlink.dfindexer', + 'pymavlink.tools', ], + # TODO: calls to the individual tools should be deprecated in favor of running python -m scripts = [ 'tools/magfit_delta.py', 'tools/mavextract.py', 'tools/mavgraph.py', 'tools/mavparmdiff.py', 'tools/mavtogpx.py', 'tools/magfit_gps.py', From 6d7166fcb5a6c0f3160f2e4d598a609295ef517e Mon Sep 17 00:00:00 2001 From: Bob Long Date: Thu, 22 May 2025 21:46:00 +1000 Subject: [PATCH 7/7] dfreader_pandas: add UTC zone to timestamps --- tools/dfreader_pandas.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/dfreader_pandas.py b/tools/dfreader_pandas.py index 921186e38..7778659c1 100644 --- a/tools/dfreader_pandas.py +++ b/tools/dfreader_pandas.py @@ -88,7 +88,7 @@ def parse_log_data(reader: DFReader_binary, fields, frequency): rows.append(new_row(reader, fields)) df = pd.DataFrame(rows) - df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s") + df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s", utc=True) df.set_index("timestamp", inplace=True) df = df[[f"{m}.{f}" for m in fields.keys() for f in fields[m]]] @@ -131,7 +131,7 @@ def parse_log_params(reader): values = [] for name, entries in param_dict.items(): for entry in entries: - index.append((name, pd.to_datetime(entry[0], unit="s"))) + index.append((name, pd.to_datetime(entry[0], unit="s", utc=True))) values.append((entry[1], entry[2])) # Create a DataFrame with a multi-index df = pd.DataFrame(values, index=pd.MultiIndex.from_tuples(index))
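Example usage: a minimal sketch of driving the new loader from Python rather than via the command line shown in the argparse epilog. The field specs ("TECS", "CTUN.E2T"), the log name "log.bin", and the cache directory ".dfreader_pandas_cache" come from the patches above; the import path assumes pymavlink is installed with the pymavlink.tools package added in PATCH 6/7 and may differ in practice.

from pymavlink.tools.dfreader_pandas import load_log

# Sample whole TECS messages plus CTUN.E2T at 10 Hz, caching results on disk.
data, params = load_log(
    "log.bin",
    ["TECS", "CTUN.E2T"],
    frequency=10.0,
    cache_dir=".dfreader_pandas_cache",
)

# data: roughly one row per 0.1 s of log time, indexed by UTC timestamp,
# with columns named "<MSG>.<field>" (e.g. "CTUN.E2T").
print(data.columns.tolist())
print(data.head())

# params: multi-indexed by (Name, Timestamp); keep only the latest value
# recorded for each parameter.
latest = params.groupby(level="Name").tail(1)
print(latest.head())

A second call with the same fields and frequency should hit the feather cache keyed on the log's leading 1 MB hash, its size, and the module hash, so repeated analysis runs skip the full log parse.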