Issue 243 - Streamlit UI #318

Draft
wants to merge 9 commits into base: master
18 changes: 18 additions & 0 deletions app.py
@@ -0,0 +1,18 @@
import streamlit as st
from tabs.dataset_viewer import dataset_viewer_tab
from tabs.inference import inference_tab
from tabs.evaluator import evaluator_tab

st.set_page_config(page_title="DetectionMetrics", layout="wide")

# st.title("DetectionMetrics")

PAGES = {
"Dataset Viewer": dataset_viewer_tab,
"Inference": inference_tab,
"Evaluator": evaluator_tab
}

page = st.sidebar.radio("DetectionMetrics", list(PAGES.keys()))

PAGES[page]()
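
For context, each entry in PAGES is a zero-argument callable that renders its page when selected in the sidebar. A minimal sketch of what such a tab module could look like (illustrative only; the actual tabs/dataset_viewer.py in this PR may differ):

# tabs/dataset_viewer.py -- hypothetical sketch, not the PR's actual implementation
import streamlit as st

def dataset_viewer_tab():
    st.header("Dataset Viewer")
    annotation_file = st.text_input("Path to COCO annotation file")  # illustrative input
    image_dir = st.text_input("Image directory")
    if annotation_file and image_dir:
        st.write(f"Would load dataset from {annotation_file} and {image_dir}")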
24 changes: 12 additions & 12 deletions detectionmetrics/datasets/coco.py
@@ -77,7 +77,7 @@ class CocoDataset(ImageDetectionDataset):
"""

def __init__(self, annotation_file: str, image_dir: str, split: str = "train"):
# Load COCO object once
# Load COCO object once - this loads all annotations into memory with efficient indexing
self.coco = COCO(annotation_file)
self.image_dir = image_dir
self.split = split
@@ -94,29 +94,29 @@ def read_annotation(
) -> Tuple[List[List[float]], List[int], List[int]]:
"""Return bounding boxes, labels, and category_ids for a given image ID.

This method uses COCO's efficient indexing to load annotations on-demand.
The COCO object maintains an internal index that allows for very fast
annotation retrieval without needing a separate cache.

:param fname: str (image_id in string form)
:return: Tuple of (boxes, labels, category_ids)
"""
# Extract image ID (fname might be a path or ID string)
try:
image_id = int(
os.path.basename(fname)
) # handles both '123' and '/path/to/123'
image_id = int(os.path.basename(fname))
except ValueError:
raise ValueError(f"Invalid annotation ID: {fname}")


# Use COCO's efficient indexing to get annotations for this image
# getAnnIds() and loadAnns() are very fast due to COCO's internal indexing
ann_ids = self.coco.getAnnIds(imgIds=image_id)
anns = self.coco.loadAnns(ann_ids)

boxes = []
labels = []
category_ids = []


boxes, labels, category_ids = [], [], []
for ann in anns:
# Convert [x, y, width, height] to [x1, y1, x2, y2]
x, y, w, h = ann["bbox"]
boxes.append([x, y, x + w, y + h])
labels.append(ann["category_id"])
category_ids.append(ann["category_id"])

return boxes, labels, category_ids
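
A short usage sketch for read_annotation; the file paths and image ID below are placeholders, not taken from this PR:

# Hypothetical usage with placeholder paths and ID
dataset = CocoDataset("annotations/instances_val2017.json", "val2017", split="val")
boxes, labels, category_ids = dataset.read_annotation("139")  # image ID passed as a string
# Each box is [x1, y1, x2, y2], converted from COCO's [x, y, width, height]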
48 changes: 39 additions & 9 deletions detectionmetrics/models/torch_detection.py
@@ -192,6 +192,7 @@ def __init__(
model: Union[str, torch.nn.Module],
model_cfg: str,
ontology_fname: str,
device: Optional[torch.device] = None,
):
"""Image detection model for PyTorch framework

@@ -201,13 +202,17 @@ def __init__(
:type model_cfg: str
:param ontology_fname: JSON file containing model output ontology
:type ontology_fname: str
:param device: Torch device to use. If not provided, cuda, mps, or cpu is selected automatically.
:type device: Optional[torch.device]
"""
# Get device (GPU, MPS, or CPU)
self.device = torch.device(
"cuda"
if torch.cuda.is_available()
else "mps" if torch.backends.mps.is_available() else "cpu"
)
# Get device (GPU, MPS, or CPU) if not provided
if device is None:
self.device = torch.device(
"cuda"
if torch.cuda.is_available()
else "mps" if torch.backends.mps.is_available() else "cpu"
)
else:
self.device = device

# Load model from file or use passed instance
if isinstance(model, str):
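
With the new optional device argument, a caller (for example the Streamlit UI) can pin the model to a specific device instead of relying on auto-selection. A hedged sketch; the class name and file paths are assumptions, not taken from this diff:

# Hypothetical usage of the new `device` parameter
import torch

model = TorchImageDetectionModel(  # assumed class name for this module
    model="detector.pt",           # placeholder checkpoint path
    model_cfg="model_cfg.json",    # placeholder config path
    ontology_fname="ontology.json",
    device=torch.device("cpu"),    # force CPU instead of auto-selecting cuda/mps
)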
@@ -309,6 +314,7 @@ def eval(
ontology_translation: Optional[str] = None,
predictions_outdir: Optional[str] = None,
results_per_sample: bool = False,
progress_callback=None,
) -> dict:
"""Evaluate model over a detection dataset and compute metrics

@@ -322,6 +328,8 @@ def eval(
:type predictions_outdir: Optional[str]
:param results_per_sample: Store per-sample metrics
:type results_per_sample: bool
:param progress_callback: Optional callback invoked after each processed sample with (processed_samples, total_samples), e.g. to drive a progress bar in the Streamlit UI
:type progress_callback: Optional[Callable[[int, int], None]]
:return: Dictionary with 'metrics_df' (evaluation results as a DataFrame) and 'metrics_factory' (for access to precision-recall curves and COCO mAP)
:rtype: dict
"""
@@ -360,9 +368,19 @@ def eval(
iou_threshold=iou_threshold, num_classes=self.n_classes
)

# Calculate total samples for progress tracking
total_samples = len(dataloader.dataset)
processed_samples = 0

with torch.no_grad():
pbar = tqdm(dataloader, leave=True)
for image_ids, images, targets in pbar:
# Use tqdm if no progress callback provided, otherwise use regular iteration
if progress_callback is None:
pbar = tqdm(dataloader, leave=True)
iterator = pbar
else:
iterator = dataloader

for image_ids, images, targets in iterator:
# Defensive check for empty images
if not images or any(img.numel() == 0 for img in images):
print("Skipping batch: empty image tensor detected.")
@@ -448,8 +466,20 @@ def eval(
predictions_outdir, f"{sample_id}_metrics.csv"
)
)

processed_samples += 1

# Call progress callback if provided
if progress_callback is not None:
progress_callback(processed_samples, total_samples)

# Return both the DataFrame and the metrics factory for access to precision-recall curves
return {
"metrics_df": metrics_factory.get_metrics_dataframe(self.ontology),
"metrics_factory": metrics_factory
}


return metrics_factory.get_metrics_dataframe(self.ontology)
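
On the caller side, the Streamlit evaluator tab could wire a progress bar through progress_callback and unpack the new dict return value. A sketch that assumes the dataset is the first positional argument of eval() and that model and dataset are already constructed:

# Hypothetical caller-side sketch for the Streamlit evaluator tab
import streamlit as st

progress_bar = st.progress(0)

def report_progress(done: int, total: int) -> None:
    progress_bar.progress(done / total if total else 0.0)

results = model.eval(dataset, progress_callback=report_progress)
st.dataframe(results["metrics_df"])   # per-class metrics table
factory = results["metrics_factory"]  # access to PR curves, COCO mAP, AUC-PR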

def get_computational_cost(
self, image_size: Tuple[int], runs: int = 30, warm_up_runs: int = 5
145 changes: 144 additions & 1 deletion detectionmetrics/utils/detection_metrics.py
@@ -9,6 +9,8 @@ def __init__(self, iou_threshold: float = 0.5, num_classes: Optional[int] = None
self.iou_threshold = iou_threshold
self.num_classes = num_classes
self.results = defaultdict(list) # stores detection results per class
# Store raw data for multi-threshold evaluation
self.raw_data = [] # List of (gt_boxes, gt_labels, pred_boxes, pred_labels, pred_scores)

def update(self, gt_boxes, gt_labels, pred_boxes, pred_labels, pred_scores):
"""
@@ -33,6 +35,9 @@ def update(self, gt_boxes, gt_labels, pred_boxes, pred_labels, pred_scores):
if hasattr(pred_scores, "detach"):
pred_scores = pred_scores.detach().cpu().numpy()

# Store raw data for multi-threshold evaluation
self.raw_data.append((gt_boxes, gt_labels, pred_boxes, pred_labels, pred_scores))

# Handle empty inputs
if len(gt_boxes) == 0 and len(pred_boxes) == 0:
return # Nothing to process
@@ -63,13 +68,19 @@ def _match_predictions(
pred_boxes: np.ndarray,
pred_labels: List[int],
pred_scores: List[float],
iou_threshold: Optional[float] = None,
) -> Dict[int, List[Tuple[float, int]]]:
"""
Match predictions to ground truth and return per-class TP/FP flags with scores.

Args:
iou_threshold: If provided, overrides self.iou_threshold

Returns:
Dict[label_id, List[(score, tp_or_fp)]]
"""
if iou_threshold is None:
iou_threshold = self.iou_threshold

results = defaultdict(list)
used = set()
Expand All @@ -90,7 +101,7 @@ def _match_predictions(
max_iou = iou
max_j = j

if max_iou >= self.iou_threshold:
if max_iou >= iou_threshold:
results[p_label].append((score, 1)) # True positive
used.add(max_j)
else:
@@ -148,6 +159,124 @@ def compute_metrics(self) -> Dict[int, Dict[str, float]]:

return metrics

def compute_coco_map(self) -> float:
"""
Compute COCO-style mAP (mean AP over IoU thresholds 0.5:0.05:0.95).

Returns:
float: mAP@[0.5:0.95]
"""
iou_thresholds = np.arange(0.5, 1.0, 0.05)
aps = []

for iou_thresh in iou_thresholds:
# Reset results for this threshold
threshold_results = defaultdict(list)

# Process all raw data with current threshold
for gt_boxes, gt_labels, pred_boxes, pred_labels, pred_scores in self.raw_data:
# Handle empty inputs
if len(gt_boxes) == 0 and len(pred_boxes) == 0:
continue

# Handle case where there are predictions but no ground truth
if len(gt_boxes) == 0:
for p_label, score in zip(pred_labels, pred_scores):
threshold_results[p_label].append((score, 0)) # All are false positives
continue

# Handle case where there is ground truth but no predictions
if len(pred_boxes) == 0:
for g_label in gt_labels:
threshold_results[g_label].append((None, -1)) # All are false negatives
continue

matches = self._match_predictions(
gt_boxes, gt_labels, pred_boxes, pred_labels, pred_scores, iou_thresh
)

for label in matches:
threshold_results[label].extend(matches[label])

# Compute AP for this threshold
threshold_ap_values = []
for label, detections in threshold_results.items():
detections = sorted(
[d for d in detections if d[0] is not None], key=lambda x: -x[0]
)
tps = [d[1] == 1 for d in detections]
fps = [d[1] == 0 for d in detections]
fn_count = sum(1 for d in threshold_results[label] if d[1] == -1)

ap, _, _ = compute_ap(tps, fps, fn_count)
threshold_ap_values.append(ap)

# Mean AP for this threshold
if threshold_ap_values:
aps.append(np.mean(threshold_ap_values))
else:
aps.append(0.0)

# Return mean over all thresholds
return np.mean(aps) if aps else 0.0
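
For reference, the np.arange call above produces the ten standard COCO IoU thresholds; a quick check:

import numpy as np

thresholds = np.arange(0.5, 1.0, 0.05)
print(len(thresholds))  # 10 thresholds: 0.50, 0.55, ..., 0.95

np.linspace(0.5, 0.95, 10) would express the same grid without relying on floating-point stepping.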

def get_overall_precision_recall_curve(self) -> Dict[str, List[float]]:
"""
Get overall precision-recall curve data (aggregated across all classes).

Returns:
Dict[str, List[float]] with keys 'precision' and 'recall'
"""
all_detections = []

# Collect all detections from all classes
for label, detections in self.results.items():
all_detections.extend(detections)

if len(all_detections) == 0:
return {"precision": [0.0], "recall": [0.0]}

# Sort by score
all_detections = sorted(
[d for d in all_detections if d[0] is not None], key=lambda x: -x[0]
)

tps = [d[1] == 1 for d in all_detections]
fps = [d[1] == 0 for d in all_detections]
fn_count = sum(1 for d in all_detections if d[1] == -1)

_, precision, recall = compute_ap(tps, fps, fn_count)

return {
"precision": precision.tolist() if hasattr(precision, 'tolist') else list(precision),
"recall": recall.tolist() if hasattr(recall, 'tolist') else list(recall)
}
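
A plotting sketch for this curve on the Streamlit side; it assumes a metrics factory instance named factory and a recent Streamlit version that supports the x/y arguments of st.line_chart:

# Hypothetical plotting sketch for the overall precision-recall curve
import pandas as pd
import streamlit as st

curve = factory.get_overall_precision_recall_curve()
curve_df = pd.DataFrame({"recall": curve["recall"], "precision": curve["precision"]})
st.line_chart(curve_df, x="recall", y="precision")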

def compute_auc_pr(self) -> float:
"""
Compute the Area Under the Precision-Recall Curve (AUC-PR).

Returns:
float: Area under the precision-recall curve
"""
curve_data = self.get_overall_precision_recall_curve()
precision = np.array(curve_data['precision'])
recall = np.array(curve_data['recall'])

# Handle edge cases
if len(precision) == 0 or len(recall) == 0:
return 0.0

# Sort by recall to ensure proper integration
sorted_indices = np.argsort(recall)
recall_sorted = recall[sorted_indices]
precision_sorted = precision[sorted_indices]

# Compute AUC using trapezoidal rule
auc = np.trapz(precision_sorted, recall_sorted)

return float(auc)
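
A tiny worked example of the trapezoidal integration used above, with illustrative values:

import numpy as np

recall = np.array([0.0, 0.5, 1.0])
precision = np.array([1.0, 0.8, 0.6])
# area = 0.5 * (1.0 + 0.8) / 2 + 0.5 * (0.8 + 0.6) / 2 = 0.45 + 0.35 = 0.8
print(np.trapz(precision, recall))  # ~0.8 (up to floating-point rounding)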

def get_metrics_dataframe(self, ontology: dict) -> pd.DataFrame:
"""
Get results as a pandas DataFrame.
@@ -169,6 +298,20 @@ def get_metrics_dataframe(self, ontology: dict) -> pd.DataFrame:
values = [v for v in metrics_dict[metric].values() if not pd.isna(v)]
metrics_dict[metric]["mean"] = np.mean(values) if values else np.nan

# Add COCO-style mAP
coco_map = self.compute_coco_map()
metrics_dict["mAP@[0.5:0.95]"] = {}
for class_name in class_names:
metrics_dict["mAP@[0.5:0.95]"][class_name] = np.nan # Per-class not applicable
metrics_dict["mAP@[0.5:0.95]"]["mean"] = coco_map

# Add AUC-PR
auc_pr = self.compute_auc_pr()
metrics_dict["AUC-PR"] = {}
for class_name in class_names:
metrics_dict["AUC-PR"][class_name] = np.nan # Per-class not applicable
metrics_dict["AUC-PR"]["mean"] = auc_pr

df = pd.DataFrame(metrics_dict)
return df.T # metrics as rows, classes as columns (with mean)
