Skip to content

Source connector fails to write data to topic saying schema is not matching when using avro converters. #538

@sujit-warrier-maersk

Description

@sujit-warrier-maersk

Description

  • When trying to create a new instance of source connector with value converter set to io.confluent.connect.avro.AvroConverter. The lease container is created and a new schema is created and sometimes a few documents are written to the topic and then it fails.

  • There is no schema defined before connector instance is created. but still we get the below error.

Kafka connect env settings:

  CONNECT_BOOTSTRAP_SERVERS: server-1:39094
      CONNECT_GROUP_ID: "kc101-connect"
      CONNECT_CONFIG_STORAGE_TOPIC: "_kc101-connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "_kc101-connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "_kc101-connect-status"
      CONNECT_REPLICATION_FACTOR: 1
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: schema-registry:8081
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: schema-registry:8081
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect-1"
      CONNECT_LISTENERS: http://connect-1:8083
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'

Connect instance settings:

{
 "name": "cosmosdb-source-connector-tp-legs-raw",
 "config": {
   "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
   "tasks.max": "1",
   "key.converter": "org.apache.kafka.connect.storage.StringConverter",
   "value.converter": "io.confluent.connect.avro.AvroConverter",
   "connect.cosmos.task.poll.interval": "100",
   "connect.cosmos.connection.endpoint": "{{CosmosDbUrl}}",
   "connect.cosmos.master.key": "{{CosmosDnApiKey}}",
   "connect.cosmos.databasename": "Preplanning",
   "connect.cosmos.containers.topicmap": "transport-plan-raw1#transport-plan-raw-events",
   "connect.cosmos.offset.useLatest": false,
   "value.converter.schemas.enable": "true",
   "key.converter.schemas.enable": "false",
   "value.converter.schema.registry.url": "http://schema-registry:8081",
   "key.converter.schema.registry.url": "http://schema-registry:8081"
 }
}

Error Message: org.apache.kafka.common.config.ConfigException: Failed to access Avro data from topic transport-plan-raw1 : Schema being registered is incompatible with an earlier schema for subject "transport-plan-raw1-value", details:

[
  {
    "errorType": "NAME_MISMATCH",
    "description": "The name of the schema has changed (path '/name')",
    "additionalInfo": "expected: inferred_name_2092778255"
  },
  {
    "errorType": "NAME_MISMATCH",
    "description": "The name of the schema has changed (path '/fields/11/type/name')",
    "additionalInfo": "expected: inferred_name__1187958375"
  },
  {
    "errorType": "NAME_MISMATCH",
    "description": "The name of the schema has changed (path '/fields/11/type/fields/6/type/name')",
    "additionalInfo": "expected: inferred_name_762902212"
  },
  {
    "errorType": "NAME_MISMATCH",
    "description": "The name of the schema has changed (path '/fields/11/type/fields/6/type/name')",
    "additionalInfo": "expected: inferred_name_762902212"
  },
  {
    "errorType": "NAME_MISMATCH",
    "description": "The name of the schema has changed (path '/fields/11/type/fields/6/type/name')",
    "additionalInfo": "expected: inferred_name_762902212"
  },
  {
    "errorType": "NAME_MISMATCH",
    "description": "The name of the schema has changed (path '/fields/11/type/fields/9/type/name')",
    "additionalInfo": "expected: inferred_name_762902212"
  },
  {
    "errorType": "MISSING_UNION_BRANCH",
    "description": "The new schema is missing a type inside a union field at path '/fields/11/type/fields/10/type/1' in the old schema",
    "additionalInfo": "reader union lacking writer type: ARRAY"
  },
  { "oldSchemaVersion": 1 },
  {
    "oldSchema": "{\"type\":\"record\",\"name\":\"inferred_name_2092778255\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"messageKey\",\"type\":\"string\"},{\"name\":\"shipmentNumber\",\"type\":\"string\"},{\"name\":\"shipmentIdentifier\",\"type\":\"string\"},{\"name\":\"shipmentVersionIdentifier\",\"type\":\"string\"},{\"name\":\"shipmentVersionCreatedDatetime\",\"type\":\"string\"},{\"name\":\"shipmentVersionUpdatedDatetime\",\"type\":\"string\"},{\"name\":\"shipmentStructureUpdateDatetime\",\"type\":\"string\"},{\"name\":\"containerServiceTypeLoadService\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name_586916356\",\"fields\":[{\"name\":\"containerServiceTypeLoadService\",\"type\":\"string\"}],\"connect.name\":\"inferred_name_586916356\"}},{\"name\":\"containerServiceTypeDischargeService\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name_1295212612\",\"fields\":[{\"name\":\"containerServiceTypeDischargeService\",\"type\":\"string\"}],\"connect.name\":\"inferred_name_1295212612\"}},{\"name\":\"cargoServiceType\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name_1290169462\",\"fields\":[{\"name\":\"cargoServiceType\",\"type\":\"string\"}],\"connect.name\":\"inferred_name_1290169462\"}},{\"name\":\"shipmentRoute\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name__1187958375\",\"fields\":[{\"name\":\"shipmentRouteIdentifier\",\"type\":\"string\"},{\"name\":\"shipmentRouteTypeCode\",\"type\":\"string\"},{\"name\":\"shipmentRouteTypeName\",\"type\":\"string\"},{\"name\":\"tradeLaneName\",\"type\":\"string\"},{\"name\":\"shipmentRouteCreatedDatetime\",\"type\":\"string\"},{\"name\":\"shipmentRouteUpdatedDatetime\",\"type\":\"string\"},{\"name\":\"shipmentRoutePointPlaceOfReceipt\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name_762902212\",\"fields\":[{\"name\":\"routePointIdentifier\",\"type\":\"string\"},{\"name\":\"site\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name_1587501840\",\"fields\":[{\"name\":\"siteIdentifier\",\"type\":\"string\"},{\"name\":\"definedAreaCode\",\"type\":\"string\"},{\"name\":\"definedAreaName\",\"type\":\"string\"},{\"name\":\"definedAreaTypeCode\",\"type\":\"string\"},{\"name\":\"definedAreaRelationshipTypeCodeParentArea\",\"type\":\"string\"},{\"name\":\"operationalFacility\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name__1582750289\",\"fields\":[{\"name\":\"operationalFacilityIdentifier\",\"type\":\"string\"},{\"name\":\"facilityCode\",\"type\":\"string\"},{\"name\":\"facilityName\",\"type\":\"string\"},{\"name\":\"facilityType\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name_1888608854\",\"fields\":[{\"name\":\"facilityTypeCode\",\"type\":\"string\"},{\"name\":\"isChildOf\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name__1616517423\",\"fields\":[{\"name\":\"facilityTypeCode\",\"type\":\"string\"},{\"name\":\"isChildOfL1\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name__2136641715\",\"fields\":[{\"name\":\"facilityTypeCode\",\"type\":\"string\"}],\"connect.name\":\"inferred_name__2136641715\"}}],\"connect.name\":\"inferred_name__1616517423\"}}],\"connect.name\":\"inferred_name_1888608854\"}}],\"connect.name\":\"inferred_name__1582750289\"}}],\"connect.name\":\"inferred_name_1587501840\"}},{\"name\":\"city\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name__666696926\",\"fields\":[{\"name\":\"cityIdentifier\",\"type\":\"string\"},{\"name\":\"definedAreaCode\",\"type\":\"string\"},{\"name\":\"definedAreaName\",\"type\":\"string\"},{\"name\":\"definedAreaTypeCode\",\"type\":\"string\"},{\"name\":\"definedAreaRelationshipTypeCodeParentArea\",\"type\":\"string\"}],\"connect.name\":\"inferred_name__666696926\"}},{\"name\":\"territorialUnit\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name__275936916\",\"fields\":[{\"name\":\"territorialUnitIdentifier\",\"type\":\"string\"},{\"name\":\"definedAreaCode\",\"type\":\"string\"},{\"name\":\"definedAreaName\",\"type\":\"string\"},{\"name\":\"definedAreaTypeCode\",\"type\":\"string\"},{\"name\":\"isoSubdivisionCategory\",\"type\":\"string\"},{\"name\":\"definedAreaRelationshipTypeCodeParentArea\",\"type\":\"string\"}],\"connect.name\":\"inferred_name__275936916\"}},{\"name\":\"country\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name_206982570\",\"fields\":[{\"name\":\"countryIdentifier\",\"type\":\"string\"},{\"name\":\"definedAreaCode\",\"type\":\"string\"},{\"name\":\"definedAreaName\",\"type\":\"string\"},{\"name\":\"definedAreaTypeCode\",\"type\":\"string\"}],\"connect.name\":\"inferred_name_206982570\"}},{\"name\":\"locationFunctionCode\",\"type\":\"string\"},{\"name\":\"shipmentRoutePointCreatedDatetime\",\"type\":\"string\"},{\"name\":\"shipmentRoutePointUpdatedDatetime\",\"type\":\"string\"}],\"connect.name\":\"inferred_name_762902212\"}},{\"name\":\"shipmentRoutePointFirstVesselLoad\",\"type\":\"inferred_name_762902212\"},{\"name\":\"shipmentRoutePointFinalVesselDischarge\",\"type\":\"inferred_name_762902212\"},{\"name\":\"shipmentRoutePointPlaceOfDelivery\",\"type\":\"inferred_name_762902212\"},{\"name\":\"shipmentRouteLinks\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"inferred_name__1469213405\",\"fields\":[{\"name\":\"shipmentRouteLinkIdentifier\",\"type\":\"string\"},{\"name\":\"shipmentRoutePointStartingPoint\",\"type\":\"inferred_name_762902212\"},{\"name\":\"shipmentRoutePointEndingPoint\",\"type\":\"inferred_name_762902212\"},{\"name\":\"shipmentRouteLinkSequenceNumber\",\"type\":\"string\"},{\"name\":\"routeLinkProductIdentifier\",\"type\":\"long\"},{\"name\":\"shipmentRoutingType\",\"type\":\"string\"},{\"name\":\"transportModeCode\",\"type\":\"string\"},{\"name\":\"transportModeName\",\"type\":\"string\"},{\"name\":\"waterAirLand\",\"type\":\"string\"},{\"name\":\"estimatedTimeOfDepartureLocal\",\"type\":\"string\"},{\"name\":\"estimatedTimeOfDeparture\",\"type\":\"string\"},{\"name\":\"estimatedTimeOfArrivalLocal\",\"type\":\"string\"},{\"name\":\"estimatedTimeOfArrival\",\"type\":\"string\"},{\"name\":\"voyageNumberStartsAt\",\"type\":\"string\"},{\"name\":\"voyageNumberEndsAt\",\"type\":\"string\"},{\"name\":\"vesselSchedule\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name_1720401109\",\"fields\":[{\"name\":\"vesselCode\",\"type\":\"string\"},{\"name\":\"vesselName\",\"type\":\"string\"},{\"name\":\"alternativeCodes\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name__1505900296\",\"fields\":[{\"name\":\"rksT_CARRIER_ID\",\"type\":\"string\"},{\"name\":\"rksT_CARRIER_NAME\",\"type\":\"string\"},{\"name\":\"rksT_SERVICE_CODE\",\"type\":\"string\"},{\"name\":\"rksT_SERVICE_NAME\",\"type\":\"string\"}],\"connect.name\":\"inferred_name__1505900296\"}}],\"connect.name\":\"inferred_name_1720401109\"}},{\"name\":\"shipmentRouteLinkCreatedDatetime\",\"type\":\"string\"},{\"name\":\"shipmentRouteLinkUpdatedDatetime\",\"type\":\"string\"}],\"connect.name\":\"inferred_name__1469213405\"},\"connect.name\":\"inferred_name_726885652\"}],\"default\":null}],\"connect.name\":\"inferred_name__1187958375\"}},{\"name\":\"_rid\",\"type\":\"string\"},{\"name\":\"_self\",\"type\":\"string\"},{\"name\":\"_etag\",\"type\":\"string\"},{\"name\":\"_attachments\",\"type\":\"string\"},{\"name\":\"_ts\",\"type\":\"long\"},{\"name\":\"_lsn\",\"type\":\"long\"}],\"connect.name\":\"inferred_name_2092778255\"}\"}, {compatibility: \"BACKWARD"
  }
]

Error code: 409

Expected Behavior

  • Connector instance should start pushing data into topioc

Reproduce

  1. Go to '...'
  2. Click on '....'
  3. Scroll down to '....'
  4. See error

Additional Context

  • (If applicable: Add any other context about the problem here; for example: proposed solution, doc changes, screenshots, logs, links, etc)

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions