diff --git a/jsonpatch.py b/jsonpatch.py index d3fc26d..3386440 100644 --- a/jsonpatch.py +++ b/jsonpatch.py @@ -501,6 +501,26 @@ def apply(self, obj): return obj +class AppendOperation(PatchOperation): + """ Appends text to a string value at the specified location """ + + def apply(self, obj): + subobj, part = self.pointer.to_last(obj) + + if part is None: + raise JsonPatchConflict("Cannot append to root document") + + try: + if isinstance(subobj[part], basestring): + subobj[part] += self.operation['value'] + else: + raise JsonPatchConflict("Cannot append to non-string value") + except (KeyError, IndexError) as ex: + raise JsonPatchConflict(str(ex)) + + return obj + + class JsonPatch(object): json_dumper = staticmethod(json.dumps) json_loader = staticmethod(_jsonloads) @@ -512,6 +532,7 @@ class JsonPatch(object): 'move': MoveOperation, 'test': TestOperation, 'copy': CopyOperation, + 'append': AppendOperation, }) """A JSON Patch is a list of Patch Operations. @@ -567,7 +588,7 @@ def __init__(self, patch, pointer_cls=JsonPointer): # is correct by retrieving each patch element. # Much of the validation is done in the initializer # though some is delayed until the patch is applied. - for op in self.patch: + for i, op in enumerate(self.patch): # We're only checking for basestring in the following check # for two reasons: # @@ -581,7 +602,21 @@ def __init__(self, patch, pointer_cls=JsonPointer): raise InvalidJsonPatch("Document is expected to be sequence of " "operations, got a sequence of strings.") - self._get_operation(op) + # Skip validation for optimized append operations (only 'value' or 'v' field) + if isinstance(op, dict) and len(op) == 1 and ('value' in op or 'v' in op): + continue + + # Handle shortened notation during validation + if isinstance(op, dict) and 'v' in op: + op_copy = dict(op) + op_copy['value'] = op_copy.pop('v') + if 'p' in op_copy: + op_copy['path'] = op_copy.pop('p') + if 'o' in op_copy: + op_copy['op'] = op_copy.pop('o') + self._get_operation(op_copy) + else: + self._get_operation(op) def __str__(self): """str(self) -> self.to_string()""" @@ -688,8 +723,39 @@ def apply(self, obj, in_place=False): if not in_place: obj = copy.deepcopy(obj) - for operation in self._ops: - obj = operation.apply(obj) + last_append_path = None + + for i, operation in enumerate(self.patch): + # Make a copy to avoid modifying the original + if isinstance(operation, dict): + operation = dict(operation) + + # Handle shortened notation where 'v' is used instead of 'value' + if isinstance(operation, dict) and 'v' in operation: + operation['value'] = operation.pop('v') + if 'p' in operation: + operation['path'] = operation.pop('p') + if 'o' in operation: + operation['op'] = operation.pop('o') + + # Handle optimized append operations (only 'value' field present) + if isinstance(operation, dict) and len(operation) == 1 and 'value' in operation: + # This is a continuation of the previous append operation + if last_append_path is not None: + operation = { + 'op': 'append', + 'path': last_append_path, + 'value': operation['value'] + } + else: + raise InvalidJsonPatch("Standalone 'value' field without preceding append operation") + elif isinstance(operation, dict) and operation.get('op') == 'append': + last_append_path = operation.get('path') + else: + last_append_path = None + + op = self._get_operation(operation) + obj = op.apply(obj) return obj @@ -921,7 +987,19 @@ def _compare_values(self, path, key, src, dst): return else: - self._item_replaced(path, key, dst) + # Check if this is a string append operation + if isinstance(src, basestring) and isinstance(dst, basestring) and dst.startswith(src): + appended_text = dst[len(src):] + if appended_text: # Only create append op if there's actual text to append + self.insert(AppendOperation({ + 'op': 'append', + 'path': _path_join(path, key), + 'value': appended_text, + }, pointer_cls=self.pointer_cls)) + else: + self._item_replaced(path, key, dst) + else: + self._item_replaced(path, key, dst) def _path_join(path, key): diff --git a/tests.py b/tests.py index d9eea92..e18c9be 100755 --- a/tests.py +++ b/tests.py @@ -236,6 +236,69 @@ def test_append(self): {'op': 'add', 'path': '/foo/-', 'value': 4}, ]) self.assertEqual(res['foo'], [1, 2, 3, 4]) + + def test_append_string(self): + obj = {'message': {'content': {'parts': ['H']}}} + res = jsonpatch.apply_patch(obj, [ + {'op': 'append', 'path': '/message/content/parts/0', 'value': 'E'}, + {'op': 'append', 'path': '/message/content/parts/0', 'value': 'O'}, + {'op': 'append', 'path': '/message/content/parts/0', 'value': 'O'}, + {'op': 'append', 'path': '/message/content/parts/0', 'value': 'L'}, + ]) + self.assertEqual(res['message']['content']['parts'][0], 'HELLO') + + def test_append_string_with_short_notation(self): + obj = {'message': {'content': {'parts': ['H']}}} + res = jsonpatch.apply_patch(obj, [ + {'p': '/message/content/parts/0', 'o': 'append', 'v': 'E'}, + {'p': '/message/content/parts/0', 'o': 'append', 'v': 'L'}, + {'p': '/message/content/parts/0', 'o': 'append', 'v': 'L'}, + {'p': '/message/content/parts/0', 'o': 'append', 'v': 'O'}, + ]) + self.assertEqual(res['message']['content']['parts'][0], 'HELLO') + + def test_append_string_optimized(self): + obj = {'message': {'content': {'parts': ['H']}}} + res = jsonpatch.apply_patch(obj, [ + {'p': '/message/content/parts/0', 'o': 'append', 'v': 'E'}, + {'v': 'L'}, # p, o omitted + {'v': 'L'}, # p, o omitted + {'v': 'L'}, # p, o omitted + ]) + self.assertEqual(res['message']['content']['parts'][0], 'HELLO') + + def test_append_to_non_string_fails(self): + obj = {'foo': 123} + with self.assertRaises(jsonpatch.JsonPatchConflict): + jsonpatch.apply_patch(obj, [{'op': 'append', 'path': '/foo', 'value': 'bar'}]) + + def test_append_to_root_fails(self): + obj = 'hello' + with self.assertRaises(jsonpatch.JsonPatchConflict): + jsonpatch.apply_patch(obj, [{'op': 'append', 'path': '', 'value': 'world'}]) + + def test_append_optimized_without_preceding_fails(self): + obj = {'message': 'hello'} + with self.assertRaises(jsonpatch.InvalidJsonPatch): + jsonpatch.apply_patch(obj, [{'v': 'world'}]) + + def test_make_patch_generates_append(self): + src = {'message': 'Hello'} + dst = {'message': 'Hello World'} + patch = jsonpatch.make_patch(src, dst) + self.assertEqual(len(patch.patch), 1) + self.assertEqual(patch.patch[0]['op'], 'append') + self.assertEqual(patch.patch[0]['path'], '/message') + self.assertEqual(patch.patch[0]['value'], ' World') + + def test_make_patch_not_append_for_replacement(self): + src = {'message': 'Hello'} + dst = {'message': 'Goodbye'} + patch = jsonpatch.make_patch(src, dst) + self.assertEqual(len(patch.patch), 1) + self.assertEqual(patch.patch[0]['op'], 'replace') + self.assertEqual(patch.patch[0]['path'], '/message') + self.assertEqual(patch.patch[0]['value'], 'Goodbye') def test_add_missing_path(self): obj = {'bar': 'qux'}