@@ -303,7 +303,7 @@ def __init__(
303303 type_schema = make_avsc_object (atype , names )
304304 except Exception as e :
305305 raise SchemaParseException (
306- f'Type property "{ atype } " not a valid Avro schema. '
306+ f'Type property "{ atype } " not a valid Avro schema: { e } '
307307 ) from e
308308 self .set_prop ("type" , type_schema )
309309 self .set_prop ("name" , name )
@@ -409,8 +409,8 @@ def __init__(
409409 items_schema = make_avsc_object (items , names )
410410 except Exception as err :
411411 raise SchemaParseException (
412- f"Items schema ({ items } ) not a valid Avro schema: (known "
413- f"names: { list (names .names .keys ())} )."
412+ f"Items schema ({ items } ) not a valid Avro schema: { err } . "
413+ f"Known names: { list (names .names .keys ())} )."
414414 ) from err
415415
416416 self .set_prop ("items" , items_schema )
@@ -451,7 +451,7 @@ def __init__(
451451 new_schema = make_avsc_object (schema , names )
452452 except Exception as err :
453453 raise SchemaParseException (
454- f"Union item must be a valid Avro schema: { schema } "
454+ f"Union item must be a valid Avro schema: { err } ; { schema } , "
455455 ) from err
456456 # check the new schema
457457 if (
@@ -477,7 +477,7 @@ class RecordSchema(NamedSchema):
477477 def make_field_objects (field_data : List [PropsType ], names : Names ) -> List [Field ]:
478478 """We're going to need to make message parameters too."""
479479 field_objects = [] # type: List[Field]
480- field_names = [] # type: List [str]
480+ parsed_fields : Dict [str , PropsType ] = {}
481481 for field in field_data :
482482 if hasattr (field , "get" ) and callable (field .get ):
483483 atype = field .get ("type" )
@@ -504,10 +504,15 @@ def make_field_objects(field_data: List[PropsType], names: Names) -> List[Field]
504504 atype , name , has_default , default , order , names , doc , other_props
505505 )
506506 # make sure field name has not been used yet
507- if new_field .name in field_names :
508- fail_msg = f"Field name { new_field .name } already in use."
509- raise SchemaParseException (fail_msg )
510- field_names .append (new_field .name )
507+ if new_field .name in parsed_fields :
508+ old_field = parsed_fields [new_field .name ]
509+ if not is_subtype (old_field ["type" ], field ["type" ]):
510+ raise SchemaParseException (
511+ f"Field name { new_field .name } already in use with "
512+ "incompatible type. "
513+ f"{ field ['type' ]} vs { old_field ['type' ]} ."
514+ )
515+ parsed_fields [new_field .name ] = field
511516 else :
512517 raise SchemaParseException (f"Not a valid field: { field } " )
513518 field_objects .append (new_field )
@@ -655,3 +660,62 @@ def make_avsc_object(json_data: JsonDataType, names: Optional[Names] = None) ->
655660 # not for us!
656661 fail_msg = f"Could not make an Avro Schema object from { json_data } ."
657662 raise SchemaParseException (fail_msg )
663+
664+
665+ def is_subtype (existing : PropType , new : PropType ) -> bool :
666+ """Checks if a new type specification is compatible with an existing type spec."""
667+ if existing == new :
668+ return True
669+ if isinstance (existing , list ) and (new in existing ):
670+ return True
671+ if existing == "Any" :
672+ if new is None or new == [] or new == ["null" ] or new == "null" :
673+ return False
674+ if isinstance (new , list ) and "null" in new :
675+ return False
676+ return True
677+ if (
678+ isinstance (existing , dict )
679+ and "type" in existing
680+ and existing ["type" ] == "array"
681+ and isinstance (new , dict )
682+ and "type" in new
683+ and new ["type" ] == "array"
684+ ):
685+ return is_subtype (existing ["items" ], new ["items" ])
686+ if (
687+ isinstance (existing , dict )
688+ and "type" in existing
689+ and existing ["type" ] == "enum"
690+ and isinstance (new , dict )
691+ and "type" in new
692+ and new ["type" ] == "enum"
693+ ):
694+ return is_subtype (existing ["symbols" ], new ["symbols" ])
695+ if (
696+ isinstance (existing , dict )
697+ and "type" in existing
698+ and existing ["type" ] == "record"
699+ and isinstance (new , dict )
700+ and "type" in new
701+ and new ["type" ] == "record"
702+ ):
703+ for new_field in cast (List [Dict [str , Any ]], new ["fields" ]):
704+ new_field_missing = True
705+ for existing_field in cast (List [Dict [str , Any ]], existing ["fields" ]):
706+ if new_field ["name" ] == existing_field ["name" ]:
707+ if not is_subtype (existing_field ["type" ], new_field ["type" ]):
708+ return False
709+ new_field_missing = False
710+ if new_field_missing :
711+ return False
712+ return True
713+ if isinstance (existing , list ) and isinstance (new , list ):
714+ missing = False
715+ for _type in new :
716+ if _type not in existing and (
717+ not is_subtype (existing , cast (PropType , _type ))
718+ ):
719+ missing = True
720+ return not missing
721+ return False
0 commit comments