720 changes: 720 additions & 0 deletions docetl/checkpoint_manager.py

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions docetl/operations/equijoin.py
@@ -118,6 +118,9 @@ def compare_pair(
output = self.runner.api.parse_llm_response(
response.response, {"is_match": "bool"}
)[0]
# Convert to bool if it's a string
if isinstance(output["is_match"], str):
output["is_match"] = output["is_match"].lower() == "true"
except Exception as e:
self.console.log(f"[red]Error parsing LLM response: {e}[/red]")
return False, cost
4 changes: 4 additions & 0 deletions docetl/operations/resolve.py
@@ -92,6 +92,10 @@ def compare_pair(
{"is_match": "bool"},
)[0]

# Convert to bool if it's a string
if isinstance(output["is_match"], str):
output["is_match"] = output["is_match"].lower() == "true"

return output["is_match"], response.total_cost, prompt

def syntax_check(self) -> None:
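
Both operation hunks (equijoin.py and resolve.py) add the same guard: when the LLM returns `is_match` as a string rather than a boolean, it is lower-cased and compared against `"true"`. A standalone sketch of that normalization; the helper name and sample values are illustrative, not part of the PR:

```python
def normalize_is_match(output: dict) -> dict:
    # LLMs sometimes return "true"/"false" strings even when a bool was requested.
    if isinstance(output["is_match"], str):
        output["is_match"] = output["is_match"].lower() == "true"
    return output

assert normalize_is_match({"is_match": "True"})["is_match"] is True
assert normalize_is_match({"is_match": "false"})["is_match"] is False
assert normalize_is_match({"is_match": True})["is_match"] is True  # already a bool, untouched
```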
103 changes: 27 additions & 76 deletions docetl/runner.py
@@ -27,7 +27,6 @@
import hashlib
import json
import os
import shutil
import time
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple, Union
@@ -37,6 +36,7 @@
from rich.markup import escape
from rich.panel import Panel

from docetl.checkpoint_manager import CheckpointManager
from docetl.config_wrapper import ConfigWrapper
from docetl.containers import OpContainer, StepBoundary
from docetl.dataset import Dataset, create_parsing_tool_map
@@ -128,8 +128,17 @@ def __init__(self, config: Dict, max_threads: int = None, **kwargs):
def _initialize_state(self) -> None:
"""Initialize basic runner state and datasets"""
self.datasets = {}
self.intermediate_dir = (
self.config.get("pipeline", {}).get("output", {}).get("intermediate_dir")
output_config = self.config.get("pipeline", {}).get("output", {})
self.intermediate_dir = output_config.get("intermediate_dir")
storage_type = output_config.get("storage_type", "json") # default to json

# Initialize checkpoint manager
self.checkpoint_manager = (
CheckpointManager(
self.intermediate_dir, console=self.console, storage_type=storage_type
)
if self.intermediate_dir
else None
)

def _setup_parsing_tools(self) -> None:
@@ -544,14 +553,7 @@ def save(self, data: List[Dict]) -> None:
def _load_from_checkpoint_if_exists(
self, step_name: str, operation_name: str
) -> Optional[List[Dict]]:
if self.intermediate_dir is None:
return None

intermediate_config_path = os.path.join(
self.intermediate_dir, ".docetl_intermediate_config.json"
)

if not os.path.exists(intermediate_config_path):
if not self.checkpoint_manager:
return None

# Make sure the step and op name is in the checkpoint config path
@@ -561,40 +563,18 @@
):
return None

# See if the checkpoint config is the same as the current step op hash
with open(intermediate_config_path, "r") as f:
intermediate_config = json.load(f)

if (
intermediate_config.get(step_name, {}).get(operation_name, "")
!= self.step_op_hashes[step_name][operation_name]
):
return None

checkpoint_path = os.path.join(
self.intermediate_dir, step_name, f"{operation_name}.json"
# Use the checkpoint manager to load the checkpoint
operation_hash = self.step_op_hashes[step_name][operation_name]
return self.checkpoint_manager.load_checkpoint(
step_name, operation_name, operation_hash
)
# check if checkpoint exists
if os.path.exists(checkpoint_path):
if f"{step_name}_{operation_name}" not in self.datasets:
self.datasets[f"{step_name}_{operation_name}"] = Dataset(
self, "file", checkpoint_path, "local"
)

self.console.log(
f"[green]✓[/green] [italic]Loaded checkpoint for operation '{operation_name}' in step '{step_name}' from {checkpoint_path}[/italic]"
)

return self.datasets[f"{step_name}_{operation_name}"].load()
return None

def clear_intermediate(self) -> None:
"""
Clear the intermediate directory.
"""
# Remove the intermediate directory
if self.intermediate_dir:
shutil.rmtree(self.intermediate_dir)
if self.checkpoint_manager:
self.checkpoint_manager.clear_all_checkpoints()
return

raise ValueError("Intermediate directory not set. Cannot clear intermediate.")
@@ -605,7 +585,7 @@ def _save_checkpoint(
"""
Save a checkpoint of the current data after an operation.

This method creates a JSON file containing the current state of the data
This method saves the current state of the data via the CheckpointManager (JSON or PyArrow, depending on the configured storage_type)
after an operation has been executed. The checkpoint is saved in a directory
structure that reflects the step and operation names.

@@ -618,44 +598,15 @@
The checkpoint is saved only if a checkpoint directory has been specified
when initializing the DSLRunner.
"""
checkpoint_path = os.path.join(
self.intermediate_dir, step_name, f"{operation_name}.json"
)
if os.path.dirname(checkpoint_path):
os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)
with open(checkpoint_path, "w") as f:
json.dump(data, f)

# Update the intermediate config file with the hash for this step/operation
# so that future runs can validate and reuse this checkpoint.
if self.intermediate_dir:
intermediate_config_path = os.path.join(
self.intermediate_dir, ".docetl_intermediate_config.json"
)

# Initialize or load existing intermediate configuration
if os.path.exists(intermediate_config_path):
try:
with open(intermediate_config_path, "r") as cfg_file:
intermediate_config: Dict[str, Dict[str, str]] = json.load(cfg_file)
except json.JSONDecodeError:
# If the file is corrupted, start fresh to avoid crashes
intermediate_config = {}
else:
intermediate_config = {}

# Ensure nested dict structure exists
step_dict = intermediate_config.setdefault(step_name, {})

# Write (or overwrite) the hash for the current operation
step_dict[operation_name] = self.step_op_hashes[step_name][operation_name]
if not self.checkpoint_manager:
return

# Persist the updated configuration
with open(intermediate_config_path, "w") as cfg_file:
json.dump(intermediate_config, cfg_file, indent=2)
# Get the operation hash for validation
operation_hash = self.step_op_hashes[step_name][operation_name]

self.console.log(
f"[green]✓ [italic]Intermediate saved for operation '{operation_name}' in step '{step_name}' at {checkpoint_path}[/italic][/green]"
# Use the checkpoint manager to save the checkpoint
self.checkpoint_manager.save_checkpoint(
step_name, operation_name, data, operation_hash
)

def should_optimize(
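
With this refactor, the runner's checkpoint handling reduces to three delegating calls: save, load, and clear. A minimal sketch of that flow using only the constructor and method signatures visible in the diff; the directory, step/operation names, hash, and data are placeholders, and the `console` argument is assumed to be optional:

```python
from docetl.checkpoint_manager import CheckpointManager

# Placeholder values for illustration only.
step_name, operation_name = "extract", "summarize"
operation_hash = "abc123"  # in the runner this comes from self.step_op_hashes
data = [{"id": 1, "summary": "..."}]

cm = CheckpointManager("intermediate_results", storage_type="json")

# Save a checkpoint keyed by step/operation and tagged with the hash.
cm.save_checkpoint(step_name, operation_name, data, operation_hash)

# Reload it later; presumably returns None when no checkpoint matches the hash,
# mirroring the old _load_from_checkpoint_if_exists behavior.
restored = cm.load_checkpoint(step_name, operation_name, operation_hash)

# Drop every checkpoint, as DSLRunner.clear_intermediate now does.
cm.clear_all_checkpoints()
```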
57 changes: 56 additions & 1 deletion docs/execution/running-pipelines.md
@@ -33,4 +33,59 @@ Here are some additional notes to help you get the most out of your pipeline:
type: file
path: ...
intermediate_dir: intermediate_results
```
storage_type: json # Optional: "json" (default) or "arrow"
```

- **Storage Format**: You can choose the storage format for intermediate checkpoints using the `storage_type` parameter in your pipeline's output configuration:

- **JSON Format** (`storage_type: json`): Human-readable format that's easy to inspect and debug. This is the default format for backward compatibility.
- **PyArrow Format** (`storage_type: arrow`): Compressed binary format using Parquet files. Offers better performance and smaller file sizes for large datasets. Complex nested data structures are automatically sanitized for PyArrow compatibility and restored to their original shape when loaded.
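
The sanitization itself is implemented in `docetl/checkpoint_manager.py` (the large new file above, which is not rendered). The sketch below is only an assumed illustration of the general idea, flattening nested values to JSON strings before writing Parquet and decoding them again on load:

```python
import json

import pyarrow as pa
import pyarrow.parquet as pq

records = [{"doc": "a", "meta": {"tags": ["x", "y"], "score": 0.9}}]

# Encode nested fields as JSON strings so the table gets a flat, Arrow-friendly schema.
sanitized = [{**r, "meta": json.dumps(r["meta"])} for r in records]
pq.write_table(pa.Table.from_pylist(sanitized), "checkpoint.parquet")

# On load, decode the strings to restore the original nested structure.
loaded = pq.read_table("checkpoint.parquet").to_pylist()
restored = [{**r, "meta": json.loads(r["meta"])} for r in loaded]
assert restored == records
```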

Example configurations:

```yaml
# Use JSON format (default)
pipeline:
output:
type: file
path: results.json
intermediate_dir: checkpoints
storage_type: json
```

```yaml
# Use PyArrow format for better performance
pipeline:
output:
type: file
path: results.json
intermediate_dir: checkpoints
storage_type: arrow
```

The checkpoint system is fully backward compatible - you can read existing JSON checkpoints even when using `storage_type: arrow`, and vice versa. This allows for seamless migration between formats.
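
For example, here is a minimal sketch of reading checkpoints written as JSON by an earlier run from a manager now configured for Arrow; the directory and step/operation names are placeholders:

```python
from docetl.checkpoint_manager import CheckpointManager

# checkpoints/ was populated by a previous run that used storage_type: json.
cm = CheckpointManager("checkpoints", storage_type="arrow")
legacy_data = cm.load_output_by_step_and_op("extract", "summarize")
```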

- **Standalone CheckpointManager Usage**: You can use the CheckpointManager independently from DocETL pipelines to load and analyze checkpoint data programmatically:

```python
from docetl.checkpoint_manager import CheckpointManager

# Create from existing intermediate directory (auto-detects storage format)
cm = CheckpointManager.from_intermediate_dir("/path/to/intermediate")

# List all available checkpoints
outputs = cm.list_outputs()
print(f"Available checkpoints: {outputs}")

# Load specific checkpoint data
data = cm.load_output_by_step_and_op("step_name", "operation_name")

# Load as pandas DataFrame for analysis
df = cm.load_output_as_dataframe("step_name", "operation_name")

# Check checkpoint file sizes
size = cm.get_checkpoint_size("step_name", "operation_name")
total_size = cm.get_total_checkpoint_size()
```

This is useful for post-pipeline analysis, debugging, or building custom tools that work with DocETL checkpoint data.
59 changes: 57 additions & 2 deletions poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -43,6 +43,7 @@ boto3 = "^1.37.27"
pandas = "^2.3.0"
python-multipart = "^0.0.20"
fastapi = { version = "^0.115.4", optional = true }
pyarrow = "^18.0.0"

[tool.poetry.extras]
parsing = ["python-docx", "openpyxl", "pydub", "python-pptx", "azure-ai-documentintelligence", "paddlepaddle", "pymupdf"]