From 9152ddbe5cec47a81f047eb963d9dbd53f8cd84a Mon Sep 17 00:00:00 2001 From: Paul Walsh Date: Fri, 17 Sep 2021 13:55:04 +0300 Subject: [PATCH 1/2] Adds unwind processor. --- dataflows/processors/__init__.py | 1 + dataflows/processors/unwind.py | 39 +++++++++++++ tests/test_lib.py | 97 +++++++++++++++++++++++++++++++- 3 files changed, 136 insertions(+), 1 deletion(-) create mode 100644 dataflows/processors/unwind.py diff --git a/dataflows/processors/__init__.py b/dataflows/processors/__init__.py index 0cd73d4..d7104e7 100644 --- a/dataflows/processors/__init__.py +++ b/dataflows/processors/__init__.py @@ -25,6 +25,7 @@ from .stream import stream from .unpivot import unpivot from .unstream import unstream +from .unwind import unwind from .update_package import update_package, add_metadata from .update_resource import update_resource from .update_schema import update_schema diff --git a/dataflows/processors/unwind.py b/dataflows/processors/unwind.py new file mode 100644 index 0000000..b7dfbfc --- /dev/null +++ b/dataflows/processors/unwind.py @@ -0,0 +1,39 @@ +from dataflows.helpers.resource_matcher import ResourceMatcher + + +def unwind(from_key, to_key, transformer=None, resources=None, source_delete=True): + + """From a row of data, generate a row per value from from_key, where the value is set onto to_key.""" + + def _unwinder(rows): + for row in rows: + try: + iter(row[from_key]) + for value in row[from_key]: + ret = {} + ret.update(row) + ret[to_key] = value if transformer is None else transformer(value) + if source_delete is True: + del ret[from_key] + yield ret + except TypeError: + # no iterable to unwind. Take the value we have and set it on the to_key. + ret = {} + ret.update(row) + ret[to_key] = ( + ret[from_key] if transformer is None else transformer(ret[from_key]) + ) + if source_delete is True: + del ret[from_key] + yield ret + + def func(package): + matcher = ResourceMatcher(resources, package.pkg) + yield package.pkg + for r in package: + if matcher.match(r.res.name): + yield _unwinder(r) + else: + yield r + + return func diff --git a/tests/test_lib.py b/tests/test_lib.py index cde9044..28da5a0 100644 --- a/tests/test_lib.py +++ b/tests/test_lib.py @@ -2309,4 +2309,99 @@ def mult(row): add_field('c', 'integer'), parallelize(mult), ).results()[0][0][:100] - print(res) \ No newline at end of file + print(res) + + +def test_unwind_basic(): + from dataflows import Flow, unwind + + data = [ + {'id': 1, 'title': 'Blog Post', 'tags': ['hello', 'world'], 'comments': ['Nice post.', 'Well written']} + ] + results, dp, _ = Flow( + data, + unwind('tags', 'tag'), + ).results() + + assert len(results[0]) == 2 + assert results[0][0]['tag'] == 'hello' + + +def test_unwind_twice_in_flow(): + from dataflows import Flow, unwind + + data = [ + {'id': 1, 'title': 'Blog Post', 'tags': ['hello', 'world'], 'comments': ['Nice post.', 'Well written']} + ] + results, dp, _ = Flow( + data, + unwind('tags', 'tag'), + unwind('comments', 'comment'), + ).results() + + assert len(results[0]) == 4 + assert results[0][0]['tag'] == 'hello' + assert results[0][0]['comment'] == 'Nice post.' + + +def test_unwind_from_key_not_iterable(): + from dataflows import Flow, unwind + + data = [ + {'id': 1, 'title': 'Blog Post', 'tags': None, 'comments': ['Nice post.', 'Well written']} + ] + results, dp, _ = Flow( + data, + unwind('tags', 'tag'), + ).results() + + assert len(results[0]) == 1 + assert results[0][0]['tag'] == None + + +def test_unwind_source_delete(): + from dataflows import Flow, unwind + + data = [ + {'id': 1, 'title': 'Blog Post', 'tags': ['hello', 'world']} + ] + results, dp, _ = Flow( + data, + unwind('tags', 'tag'), + ).results() + + assert len(results[0]) == 2 + assert results[0][0]['tag'] + assert not results[0][0]['tags'] + + +def test_unwind_source_keep(): + from dataflows import Flow, unwind + + data = [ + {'id': 1, 'title': 'Blog Post', 'tags': ['hello', 'world']} + ] + results, dp, _ = Flow( + data, + unwind('tags', 'tag', source_delete=False), + ).results() + + assert len(results[0]) == 2 + assert results[0][0]['tag'] + assert results[0][0]['tags'] == data[0]['tags'] + + +def test_unwind_with_transformer(): + from dataflows import Flow, unwind + + data = [ + {'id': 1, 'title': 'Blog Post', 'tags': ['hello', 'world']} + ] + results, dp, _ = Flow( + data, + unwind('tags', 'tag', lambda v: v.title()), + ).results() + + assert len(results[0]) == 2 + assert results[0][0]['tag'] == 'Hello' + assert results[0][1]['tag'] == 'World' From a09fdc7fc47b6a5b586262fe4b3a5c6d3c0bc8ed Mon Sep 17 00:00:00 2001 From: Paul Walsh Date: Sun, 26 Sep 2021 20:43:35 +0300 Subject: [PATCH 2/2] step 1 of managing fields on schema. --- dataflows/processors/unwind.py | 29 ++++++++++++++++------ tests/test_lib.py | 45 ++++++++++++++++++++-------------- 2 files changed, 48 insertions(+), 26 deletions(-) diff --git a/dataflows/processors/unwind.py b/dataflows/processors/unwind.py index b7dfbfc..fca5212 100644 --- a/dataflows/processors/unwind.py +++ b/dataflows/processors/unwind.py @@ -1,7 +1,8 @@ from dataflows.helpers.resource_matcher import ResourceMatcher +from dataflows.processors.add_computed_field import get_new_fields -def unwind(from_key, to_key, transformer=None, resources=None, source_delete=True): +def unwind(from_key: str, to_key: dict, transformer=None, resources=None, source_delete=True): """From a row of data, generate a row per value from from_key, where the value is set onto to_key.""" @@ -12,7 +13,7 @@ def _unwinder(rows): for value in row[from_key]: ret = {} ret.update(row) - ret[to_key] = value if transformer is None else transformer(value) + ret[to_key['name']] = value if transformer is None else transformer(value) if source_delete is True: del ret[from_key] yield ret @@ -20,7 +21,7 @@ def _unwinder(rows): # no iterable to unwind. Take the value we have and set it on the to_key. ret = {} ret.update(row) - ret[to_key] = ( + ret[to_key['name']] = ( ret[from_key] if transformer is None else transformer(ret[from_key]) ) if source_delete is True: @@ -29,11 +30,25 @@ def _unwinder(rows): def func(package): matcher = ResourceMatcher(resources, package.pkg) + for resource in package.pkg.descriptor['resources']: + if matcher.match(resource['name']): + new_fields = get_new_fields( + resource, [{'target': {'name': to_key['name'], 'type': to_key['type']}}] + ) + if source_delete is True: + resource['schema']['fields'] = [ + field + for field in resource['schema']['fields'] + if not field['name'] == from_key + ] + resource['schema']['fields'].extend(new_fields) + yield package.pkg - for r in package: - if matcher.match(r.res.name): - yield _unwinder(r) + + for resource in package: + if matcher.match(resource.res.name): + yield _unwinder(resource) else: - yield r + yield resource return func diff --git a/tests/test_lib.py b/tests/test_lib.py index 28da5a0..fb0e97a 100644 --- a/tests/test_lib.py +++ b/tests/test_lib.py @@ -2316,27 +2316,38 @@ def test_unwind_basic(): from dataflows import Flow, unwind data = [ - {'id': 1, 'title': 'Blog Post', 'tags': ['hello', 'world'], 'comments': ['Nice post.', 'Well written']} + { + 'id': 1, + 'title': 'Blog Post', + 'tags': ['hello', 'world'], + 'comments': ['Nice post.', 'Well written'], + } ] results, dp, _ = Flow( data, - unwind('tags', 'tag'), + unwind('tags', {'name': 'tag', 'type': 'string'}), ).results() assert len(results[0]) == 2 assert results[0][0]['tag'] == 'hello' + assert 'tag' in [f.name for f in dp.resources[0].schema.fields] def test_unwind_twice_in_flow(): from dataflows import Flow, unwind data = [ - {'id': 1, 'title': 'Blog Post', 'tags': ['hello', 'world'], 'comments': ['Nice post.', 'Well written']} + { + 'id': 1, + 'title': 'Blog Post', + 'tags': ['hello', 'world'], + 'comments': ['Nice post.', 'Well written'], + } ] results, dp, _ = Flow( data, - unwind('tags', 'tag'), - unwind('comments', 'comment'), + unwind('tags', {'name': 'tag', 'type': 'string'}), + unwind('comments', {'name': 'comment', 'type': 'string'}), ).results() assert len(results[0]) == 4 @@ -2352,7 +2363,7 @@ def test_unwind_from_key_not_iterable(): ] results, dp, _ = Flow( data, - unwind('tags', 'tag'), + unwind('tags', {'name': 'tag', 'type': 'string'}), ).results() assert len(results[0]) == 1 @@ -2362,44 +2373,40 @@ def test_unwind_from_key_not_iterable(): def test_unwind_source_delete(): from dataflows import Flow, unwind - data = [ - {'id': 1, 'title': 'Blog Post', 'tags': ['hello', 'world']} - ] + data = [{'id': 1, 'title': 'Blog Post', 'tags': ['hello', 'world']}] results, dp, _ = Flow( data, - unwind('tags', 'tag'), + unwind('tags', {'name': 'tag', 'type': 'string'}), ).results() assert len(results[0]) == 2 assert results[0][0]['tag'] - assert not results[0][0]['tags'] + assert not results[0][0].get('tags') + assert 'tags' not in [f.name for f in dp.resources[0].schema.fields] def test_unwind_source_keep(): from dataflows import Flow, unwind - data = [ - {'id': 1, 'title': 'Blog Post', 'tags': ['hello', 'world']} - ] + data = [{'id': 1, 'title': 'Blog Post', 'tags': ['hello', 'world']}] results, dp, _ = Flow( data, - unwind('tags', 'tag', source_delete=False), + unwind('tags', {'name': 'tag', 'type': 'string'}, source_delete=False), ).results() assert len(results[0]) == 2 assert results[0][0]['tag'] assert results[0][0]['tags'] == data[0]['tags'] + assert 'tags' in [f.name for f in dp.resources[0].schema.fields] def test_unwind_with_transformer(): from dataflows import Flow, unwind - data = [ - {'id': 1, 'title': 'Blog Post', 'tags': ['hello', 'world']} - ] + data = [{'id': 1, 'title': 'Blog Post', 'tags': ['hello', 'world']}] results, dp, _ = Flow( data, - unwind('tags', 'tag', lambda v: v.title()), + unwind('tags', {'name': 'tag', 'type': 'string'}, lambda v: v.title()), ).results() assert len(results[0]) == 2