
Avro Serialisation is not working #98

@ibalachandar86

Description


Hi,

I am using the FsSourceConnector Kafka connector to ingest CSV files into a Kafka topic.
I am using confluentinc/cp-helm-charts with a custom-built Docker image for Kafka Connect (the FsSourceConnector jar added).
The prerequisites, Kafka Connect configuration, and Kafka connector details are given below.

Problem Statement:
The Kafka connector below is working and I am able to ingest the CSV into a Kafka topic as a string. My goal is to Avro-serialise the CSV data and store it in the topic. I am not sure which serialisation configuration is missing from my Connect/connector properties.

Prerequisites:
I have placed the CSV file in the Kafka Connect pod directory and created a schema in the Confluent Schema Registry for the CSV.

Below are the Kafka Connect details:
cp-control-center:
enabled: false

cp-kafka:
enabled: true

cp-kafka-rest:
enabled: false

cp-ksql-server:
enabled: false

cp-schema-registry:
enabled: true

cp-zookeeper:
enabled: true

cp-kafka-connect:
  replicaCount: 1

  image: localhost:5000/kc
  imageTag: v1
  imagePullPolicy: Always

  servicePort: 8083

  configurationOverrides:
    "key.converter": "io.confluent.connect.avro.AvroConverter"
    "key.converter.schema.registry.url": "test-cp-schema-registry:8081"
    "value.converter": "io.confluent.connect.avro.AvroConverter"
    "value.converter.schema.registry.url": "test-cp-schema-registry:8081"
    "key.converter.schemas.enable": "false"
    "value.converter.schemas.enable": "false"
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter"
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter"
    "use.latest.version": "true"
    "auto.register.schemas": "false"
    "auto.create.topics": "false"
    "config.storage.replication.factor": "1"
    "offset.storage.replication.factor": "1"
    "status.storage.replication.factor": "1"
    "plugin.path": "/usr/share/java,/usr/share/confluent-hub-components,/etc/kafka-connect/jars"

  heapOptions: "-Xms5g -Xmx10g"

  customEnv:
    KAFKA_JMX_HOSTNAME: "127.0.0.1"

  kafka:
    bootstrapServers: "test-cp-kafka-headless:9092"

  cp-schema-registry:
    url: "test-cp-schema-registry:8081"

fullnameOverride: test
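One possible gap in the overrides above: in Kafka Connect, serializer-level options such as auto.register.schemas and use.latest.version are normally applied per converter via a key.converter./value.converter. prefix, and schemas.enable is a JsonConverter option that AvroConverter ignores. A sketch of what the value-side overrides might look like instead (same host names as above; this is an assumption, not a verified fix):

```yaml
configurationOverrides:
  "value.converter": "io.confluent.connect.avro.AvroConverter"
  "value.converter.schema.registry.url": "http://test-cp-schema-registry:8081"
  # Serializer options usually need the converter prefix; bare
  # "auto.register.schemas" / "use.latest.version" at the worker level
  # are not passed through to the converter.
  "value.converter.auto.register.schemas": "false"
  "value.converter.use.latest.version": "true"
```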

Below are the Kafka connector details:
curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{
  "name": "sample",
  "config": {
    "connector.class": "com.github.mmolimar.kafka.connect.fs.FsSourceConnector",
    "tasks.max": "1",
    "fs.uris": "/home/appuser/csv",
    "topic": "sampledata",
    "use.latest.version": "true",
    "auto.register.schemas": "false",
    "poll.interval.ms": "10000",
    "auto.create.topics": "false",
    "policy.class": "com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy",
    "policy.batch_size": "0",
    "policy.recursive": "true",
    "policy.regexp": "^.*\\.csv$",
    "policy.resume.on.error": "false",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://test-cp-schema-registry:8081",
    "key.enhanced.avro.schema.support": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://test-cp-schema-registry:8081",
    "value.enhanced.avro.schema.support": "true",
    "file_reader.class": "com.github.mmolimar.kafka.connect.fs.file.reader.CsvFileReader",
    "file_reader.delimited.compression.type": "none",
    "file_reader.delimited.compression.concatenated": "true",
    "file_reader.delimited.settings.schema.avro": "{\"type\":\"record\",\"name\":\"sampledata\",\"namespace\":\"default\",\"fields\":[{\"name\":\"c1\",\"type\":\"string\"},{\"name\":\"c2\",\"type\":\"string\"},{\"name\":\"c3\",\"type\":\"string\"}]}",
    "file_reader.delimited.settings.delimiter_detection": "false",
    "file_reader.delimited.settings.line_separator_detection": "false",
    "file_reader.delimited.settings.format.delimiter": ",",
    "file_reader.delimited.settings.format.line_separator": "\n",
    "file_reader.delimited.settings.format.quote": "\"",
    "file_reader.delimited.settings.format.quote_escape": "\"",
    "file_reader.delimited.settings.format.comment": "#",
    "file_reader.delimited.settings.escape_unquoted": "false",
    "file_reader.delimited.settings.header": "true",
    "file_reader.delimited.settings.ignore_leading_whitespaces": "true",
    "file_reader.delimited.settings.rows_to_skip": "0",
    "file_reader.delimited.settings.max_chars_per_column": "4096",
    "file_reader.delimited.settings.max_columns": "512",
    "file_reader.batch_size": "0",
    "file_reader.encryption.type": "NONE",
    "file_reader.encryption.passphrase": "",
    "file_reader.encryption.secret": ""
  }
}'
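One easy trip-up in the config above: file_reader.delimited.settings.schema.avro is a JSON document embedded inside another JSON document, so its inner quotes must be escaped. A small stdlib-only Python sketch that generates a correctly escaped fragment (the schema itself is copied from the connector config):

```python
import json

# Avro schema for the CSV rows, copied from the connector config above.
schema = {
    "type": "record",
    "name": "sampledata",
    "namespace": "default",
    "fields": [
        {"name": "c1", "type": "string"},
        {"name": "c2", "type": "string"},
        {"name": "c3", "type": "string"},
    ],
}

# Serialise the schema once to get the property's string value, then serialise
# the surrounding config; the second json.dumps escapes the inner quotes.
fragment = json.dumps({"file_reader.delimited.settings.schema.avro": json.dumps(schema)})
print(fragment)
```

Generating the payload this way avoids hand-escaping mistakes when pasting the schema into the curl body.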

CSV file:
c1,c2,c3
abc,def,ghi
jkl,mno,pqr
stu,wvy,xyz
x1,x2,x3

Schema in Schema Registry:
{"subject":"sampledata-value","version":1,"id":1,"schema":"{"type":"record","name":"sampledata","namespace":"default","fields":[{"name":"c1","type":"string"},{"name":"c2","type":"string"},{"name":"c3","type":"string"}]}"}

Data in Topic:
/bin/kafka-console-consumer --topic sampledata --from-beginning --bootstrap-server cef-cp-kafka-headless:9092
abcdefghi
jklmnopqr
stuwvyxyz
x1x2x3
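For what it's worth, the output above is consistent with the values already being Avro-serialised: in the Confluent wire format a message is a 0x00 magic byte, a 4-byte big-endian schema ID, and the Avro body, where each string field is a zigzag-varint length followed by UTF-8 bytes. None of those framing bytes are printable, so a plain kafka-console-consumer renders only the concatenated field values. A stdlib-only sketch of that encoding (schema ID 1 is an assumption taken from the registry entry above):

```python
import struct

def avro_string(s: str) -> bytes:
    """Avro binary encoding of a string: zigzag-varint length, then UTF-8 bytes."""
    data = s.encode("utf-8")
    n = len(data) << 1  # zigzag encoding of a non-negative long
    out = bytearray()
    while n > 0x7F:
        out.append((n & 0x7F) | 0x80)
        n >>= 7
    out.append(n)
    return bytes(out) + data

def confluent_avro_message(schema_id: int, fields) -> bytes:
    """Confluent wire format: 0x00 magic byte + 4-byte big-endian schema ID + Avro body."""
    body = b"".join(avro_string(f) for f in fields)
    return b"\x00" + struct.pack(">I", schema_id) + body

msg = confluent_avro_message(1, ["abc", "def", "ghi"])
# The framing bytes (magic byte, schema ID, per-field lengths) are control
# characters; keeping only printable ASCII reproduces the console output.
printable = "".join(chr(b) for b in msg if 32 <= b < 127)
print(printable)  # abcdefghi
```

If that is what is happening, the records may already be correct, and reading the topic with kafka-avro-console-consumer (with --property schema.registry.url pointing at the registry) rather than the plain console consumer should show properly decoded records.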
