Describe the bug
When chaining a component which produces FORMAT.record to stetl.outputs.dboutput.PostgresInsertOutput, which consumes both FORMAT.record_array and FORMAT.record, a ValueError exception is raised because only the first consumed format is checked.
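The mismatch can be illustrated with a minimal, self-contained sketch. It does not reproduce Stetl's actual code in component.py; the function names is_compatible_first_only and is_compatible_any, the helper _as_list, and the plain-string format names are assumptions made for illustration only. It contrasts a check that looks only at the first consumed format with one that accepts any overlap between produced and consumed formats.

```python
def _as_list(formats):
    """Normalize a single format or a list of formats to a list (hypothetical helper)."""
    if formats is None:
        return []
    if isinstance(formats, (list, tuple)):
        return list(formats)
    return [formats]


def is_compatible_first_only(produces, consumes):
    """Hypothetical faulty check: only the first format on each side is compared."""
    out, inp = _as_list(produces), _as_list(consumes)
    return bool(out) and bool(inp) and out[0] == inp[0]


def is_compatible_any(produces, consumes):
    """Hypothetical expected check: any shared format makes the link valid."""
    return bool(set(_as_list(produces)) & set(_as_list(consumes)))


# Stand-ins for FORMAT.record and [FORMAT.record_array, FORMAT.record]
produces = "record"
consumes = ["record_array", "record"]

print(is_compatible_first_only(produces, consumes))  # False
print(is_compatible_any(produces, consumes))  # True
```

With the first-only check, the order in which the output declares its consumed formats decides whether the chain assembles, which matches the "in=record_array" shown in the error message below.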
Component in question:
# BAG related filters
from stetl.component import Config
from stetl.util import Util
from stetl.filter import Filter
from stetl.packet import FORMAT

log = Util.get_log("bagfilter")


class LeveringFilter(Filter):
    """
    Convert Leveringsdocument-BAG-Extract.xml content to record for
    insertion into nlx_bag_info table.
    """

    @Config(ptype=str, default='sleutel', required=False)
    def key_column(self):
        """
        Column name for key
        """
        pass

    @Config(ptype=str, default='levering_xml', required=False)
    def key_value(self):
        """
        Column value for key
        """
        pass

    @Config(ptype=str, default='waarde', required=False)
    def value_column(self):
        """
        Column name for value
        """
        pass

    # Constructor
    def __init__(self, configdict, section, consumes=FORMAT.string, produces=FORMAT.record):
        Filter.__init__(self, configdict, section, consumes, produces)

    def invoke(self, packet):
        if packet.data is None or packet.is_end_of_stream():
            return packet

        with open(packet.data, 'rt') as f:
            data = f.read()

        record = {
            self.key_column: self.key_value,
            self.value_column: data,
        }

        packet.data = record

        return packet
To Reproduce
Configuration file:
[etl]
chains = input_bag_zipfile|extract_bag_zipfile|convert_string_to_record|output_postgres_levering
[input_bag_zipfile]
class = stetl.inputs.fileinput.ZipFileInput
file_path = {bag_input_file}
name_filter = Leveringsdocument-BAG-Extract.xml
[extract_bag_zipfile]
class = stetl.filters.fileextractor.ZipFileExtractor
file_path = temp/Leveringsdocument-BAG-Extract.xml
[convert_string_to_record]
class = stetl.filters.bagfilter.LeveringFilter
[output_postgres_levering]
class = stetl.outputs.dboutput.PostgresInsertOutput
database = {pg_db}
host = {pg_host}
port = {pg_port}
user = {pg_user}
password = {password}
schema = {schema}
table = nlx_bag_info
key = sleutel
replace = true
$ PYTHONPATH=../../externals/stetl ./stetl.sh -c conf/levering.cfg
~/git/nlextract/nlextract/bagv2/etl ~/git/nlextract/nlextract/bagv2/etl
INFO: 21-11-29 18:00:43 - Using options_file=options/osiris.args and user_args=-c conf/levering.cfg
2021-11-29 18:00:44,030 util INFO Found lxml.etree, native XML parsing, fabulous!
2021-11-29 18:00:44,094 util INFO Found GDAL/OGR Python bindings, super!!
2021-11-29 18:00:44,097 main INFO Stetl version = 2.1.dev0
2021-11-29 18:00:44,098 main INFO Found args file at: /home/bas/git/nlextract/nlextract/bagv2/etl/options/common.args
2021-11-29 18:00:44,098 main INFO Found args file at: options/osiris.args
2021-11-29 18:00:44,099 ETL INFO INIT - Stetl version is 2.1.dev0
2021-11-29 18:00:44,099 ETL INFO Config/working dir = /home/bas/git/nlextract/nlextract/bagv2/etl/conf
2021-11-29 18:00:44,099 ETL INFO Reading config_file = conf/levering.cfg
2021-11-29 18:00:44,099 ETL INFO Substituting 17 args in config file from args_dict: ['bag_file_wildcard', 'pg_host', 'pg_user', 'spatial_extent', 'srs_opts', 'config_opts', 'multival_opts', 'pg_db', 'feat_per_tx', 'bag_file_ext', 'schema', 'pg_port', 'bag_input_file', 'layer_creation_opts', 'open_opts', 'bag_obj_file_pat', 'password']
2021-11-29 18:00:44,099 ETL INFO Substituting args OK
2021-11-29 18:00:44,100 ETL INFO START
2021-11-29 18:00:44,100 util INFO Timer start: total ETL
2021-11-29 18:00:44,100 chain INFO Assembling Chain: input_bag_zipfile|extract_bag_zipfile|convert_string_to_record|output_postgres_levering...
2021-11-29 18:00:44,103 input INFO cfg = {'class': 'stetl.inputs.fileinput.ZipFileInput', 'file_path': 'test/data/lv/BAGNLDL-15092020-small.zip', 'name_filter': 'Leveringsdocument-BAG-Extract.xml'}
2021-11-29 18:00:44,103 fileinput INFO file_list=['test/data/lv/BAGNLDL-15092020-small.zip']
2021-11-29 18:00:44,115 output INFO cfg = {'class': 'stetl.outputs.dboutput.PostgresInsertOutput', 'database': 'bagcurrent', 'host': 'isis', 'port': '5432', 'user': '<hidden>', 'password': '<hidden>', 'schema': 'bag', 'table': 'nlx_bag_info', 'key': 'sleutel', 'replace': 'true'}
Traceback (most recent call last):
File "/home/bas/git/nlextract/nlextract/externals/stetl/bin/stetl", line 43, in <module>
main()
File "/home/bas/git/nlextract/nlextract/externals/stetl/bin/stetl", line 36, in main
etl.run()
File "/home/bas/git/nlextract/nlextract/externals/stetl/stetl/etl.py", line 154, in run
chain.assemble()
File "/home/bas/git/nlextract/nlextract/externals/stetl/stetl/chain.py", line 90, in assemble
self.add(etl_comp)
File "/home/bas/git/nlextract/nlextract/externals/stetl/stetl/chain.py", line 102, in add
self.cur_comp.add_next(etl_comp)
File "/home/bas/git/nlextract/nlextract/externals/stetl/stetl/component.py", line 155, in add_next
raise ValueError(
ValueError: Incompatible components are linked: <class 'stetl.filters.bagfilter.LeveringFilter'>: in=string out=record and <class 'stetl.outputs.dboutput.PostgresInsertOutput'>: in=record_array out=None
~/git/nlextract/nlextract/bagv2/etl
Expected Behavior
No exception is raised, because PostgresInsertOutput also consumes FORMAT.record.
Context:
- OS: Debian unstable
- Python Version: 3.9.9
- Stetl Version: 2.1.dev0
Additional context
A more generic format converter would be nicer, but that doesn't work: #124
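As a possible stopgap until the format check is fixed, the record could be wrapped in a one-element list so that it matches the first consumed format. The sketch below is plain Python and assumes that a record is a dict and a record_array is a list of dicts; the function name record_to_record_array is hypothetical and not part of Stetl. In the filter above, this would presumably go together with declaring produces=FORMAT.record_array, which has not been tested here.

```python
def record_to_record_array(data):
    """Wrap a single record (dict) in a list; pass sequences of records through.

    Hypothetical helper, assuming record == dict and record_array == list of dicts.
    """
    if data is None:
        return None
    if isinstance(data, dict):
        return [data]
    return list(data)


record = {"sleutel": "levering_xml", "waarde": "<xml/>"}
print(record_to_record_array(record))
```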