-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Description
Background
Currently we want to make a comprehensive control for our Kafka cluster, like topic creation policy by using SASL authentication to authenticate with our Kafka cluster. CAP does create the topic automatically by default. AFAIK, you should be able to disable this behavior by setting the librdkafka allow.auto.create.topics
to false
.
Issue
So, we have changed the allow.auto.create.topics
like this in our app:
builder.Services.AddCap(x =>
{
x.UseInMemoryController();
x.UseKafka(kafkaOptions =>
{
kafkaOptions.MainConfig.Add("allow.auto.create.topics", "false");
kafkaOptions.MainConfig.Add("security.protocol", "sasl_plaintext");
kafkaOptions.MainConfig.Add("sasl.mechanism", "SCRAM-SHA-256");
kafkaOptions.MainConfig.Add("sasl.username", "dotnet-app-user");
kafkaOptions.MainConfig.Add("sasl.password", "helloworld");
kafkaOptions.Servers = "localhost:9092";
});
});
And with this consumer:
[ApiController]
[Route("[controller]")]
public class ConsumerController : ControllerBase
{
private readonly ILogger<ConsumerController> _logger;
public ConsumerController(ILogger<ConsumerController> logger)
{
_logger = logger;
}
[NonAction]
[CapSubscribe("dotnet-test")]
public void ReceiveMessage(string message)
{
_logger.LogInformation("Received message: {Message}", message);
}
}
But it still trying to create the topics automatically with these exceptions:
fail: DotNetCore.CAP.Internal.ConsumerRegister[0]
Kafka client consume error. --> An error was encountered when automatically creating topic! -->An error occurred creating topics: [dotnet-test]: [Unauthorized].
info: DotNetCore.CAP.Processor.CapProcessingServer[0]
Starting the processing server.
fail: DotNetCore.CAP.Internal.ConsumerRegister[0]
FindCoordinator response error: Broker: Group authorization failed
Confluent.Kafka.ConsumeException: FindCoordinator response error: Broker: Group authorization failed
at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
at Confluent.Kafka.Consumer`2.Consume(TimeSpan timeout)
at DotNetCore.CAP.Kafka.KafkaConsumerClient.Listening(TimeSpan timeout, CancellationToken cancellationToken)
at DotNetCore.CAP.Internal.ConsumerRegister.<>c__DisplayClass21_1.<Execute>b__2()
Analysis
The primary suspect based on the exception is on this line: https://github.com/dotnetcore/CAP/blob/master/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs#L58
Question
Can we actually just set allow.auto.create.topics
to false
or we have to use another workaround to disable the automatic topic creation?