Disable Kafka Automatic Topic Creation #1738

@kl-tie

Background

We want tighter control over our Kafka cluster, including the topic creation policy, and we use SASL to authenticate against the cluster. CAP creates topics automatically by default. AFAIK, it should be possible to disable this behavior by setting the librdkafka property allow.auto.create.topics to false.

Issue

So we set allow.auto.create.topics to false in our app like this:

builder.Services.AddCap(x =>
{
    x.UseInMemoryController();
    x.UseKafka(kafkaOptions =>
    {
        kafkaOptions.MainConfig.Add("allow.auto.create.topics", "false");
        kafkaOptions.MainConfig.Add("security.protocol", "sasl_plaintext");
        kafkaOptions.MainConfig.Add("sasl.mechanism", "SCRAM-SHA-256");
        kafkaOptions.MainConfig.Add("sasl.username", "dotnet-app-user");
        kafkaOptions.MainConfig.Add("sasl.password", "helloworld");
        kafkaOptions.Servers = "localhost:9092";
    });   
});

And we have this consumer:

[ApiController]
[Route("[controller]")]
public class ConsumerController : ControllerBase
{
    private readonly ILogger<ConsumerController> _logger;

    public ConsumerController(ILogger<ConsumerController> logger)
    {
        _logger = logger;
    }

    [NonAction]
    [CapSubscribe("dotnet-test")]
    public void ReceiveMessage(string message)
    {
        _logger.LogInformation("Received message: {Message}", message);
    }
}

But CAP still tries to create the topic automatically, and fails with these exceptions:

fail: DotNetCore.CAP.Internal.ConsumerRegister[0]
      Kafka client consume error. --> An error was encountered when automatically creating topic! -->An error occurred creating topics: [dotnet-test]: [Unauthorized].
info: DotNetCore.CAP.Processor.CapProcessingServer[0]
      Starting the processing server.
fail: DotNetCore.CAP.Internal.ConsumerRegister[0]
      FindCoordinator response error: Broker: Group authorization failed
      Confluent.Kafka.ConsumeException: FindCoordinator response error: Broker: Group authorization failed
         at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
         at Confluent.Kafka.Consumer`2.Consume(TimeSpan timeout)
         at DotNetCore.CAP.Kafka.KafkaConsumerClient.Listening(TimeSpan timeout, CancellationToken cancellationToken)
         at DotNetCore.CAP.Internal.ConsumerRegister.<>c__DisplayClass21_1.<Execute>b__2()

Analysis

Based on the exception, the primary suspect is this line: https://github.com/dotnetcore/CAP/blob/master/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs#L58
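
To illustrate the suspicion: as far as I understand, allow.auto.create.topics only governs whether a topic gets created implicitly when a consumer subscribes to a topic that does not exist yet. An explicit create-topics request sent through the admin client is a separate request and should not be affected by that setting. The sketch below is not CAP's code. It is a minimal standalone reproduction under my own assumptions, using the Confluent.Kafka admin API with the same connection settings as above; the class name and the partition and replication values are made up for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

// Hypothetical standalone reproduction, not taken from the CAP source.
public static class ExplicitTopicCreationSketch
{
    public static async Task Main()
    {
        // Same broker and SASL settings as in the CAP configuration above.
        var config = new AdminClientConfig
        {
            BootstrapServers = "localhost:9092",
            SecurityProtocol = SecurityProtocol.SaslPlaintext,
            SaslMechanism = SaslMechanism.ScramSha256,
            SaslUsername = "dotnet-app-user",
            SaslPassword = "helloworld"
        };

        using var adminClient = new AdminClientBuilder(config).Build();

        try
        {
            // Explicit create-topics request. This goes through the admin API,
            // so allow.auto.create.topics should have no influence on it.
            // NumPartitions and ReplicationFactor are assumed values.
            await adminClient.CreateTopicsAsync(new[]
            {
                new TopicSpecification
                {
                    Name = "dotnet-test",
                    NumPartitions = 1,
                    ReplicationFactor = 1
                }
            });
            Console.WriteLine("Topic created.");
        }
        catch (CreateTopicsException ex)
        {
            // Report the per-topic error returned by the broker.
            foreach (var result in ex.Results)
            {
                Console.WriteLine($"Failed to create {result.Topic}: {result.Error.Reason}");
            }
        }
    }
}
```

If the user has no permission to create topics, I would expect this to fail with an authorization error similar to the one in the log above, regardless of the allow.auto.create.topics value. If CAP issues a request like this at startup, that would explain why the setting alone does not help.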

Question

Can we simply set allow.auto.create.topics to false, or do we need another workaround to disable automatic topic creation?
