diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 57a2d9a658b1d..c8b7e7ee7bf9f 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2382,11 +2382,9 @@ class ReplicaManager(val config: KafkaConfig, partitions: Set[Partition], correlationId: Int, topicIds: String => Option[Uuid]): Unit = { - val traceLoggingEnabled = stateChangeLogger.isTraceEnabled - try { if (isShuttingDown.get()) { - if (traceLoggingEnabled) { + if (stateChangeLogger.isTraceEnabled) { partitions.foreach { partition => stateChangeLogger.trace(s"Skipped the update topic ID step of the become-follower state " + s"change with correlation id $correlationId from controller $controllerId epoch $controllerEpoch for " + diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala index 40892bca38c92..8465a80207087 100644 --- a/core/src/main/scala/kafka/tools/StorageTool.scala +++ b/core/src/main/scala/kafka/tools/StorageTool.scala @@ -128,9 +128,21 @@ object StorageTool extends Logging { setIgnoreFormatted(namespace.getBoolean("ignore_formatted")). setControllerListenerName(config.controllerListenerNames.head). setMetadataLogDirectory(config.metadataLogDir) - Option(namespace.getString("release_version")).foreach( - releaseVersion => formatter. - setReleaseVersion(MetadataVersion.fromVersionString(releaseVersion))) + def metadataVersionsToString(first: MetadataVersion, last: MetadataVersion): String = { + val versions = MetadataVersion.VERSIONS.slice(first.ordinal, last.ordinal + 1) + versions.map(_.toString).mkString(", ") + } + Option(namespace.getString("release_version")).foreach(releaseVersion => { + try { + formatter.setReleaseVersion(MetadataVersion.fromVersionString(releaseVersion)) + } catch { + case _: Throwable => + throw new TerseFailure(s"Unknown metadata.version $releaseVersion. Supported metadata.version are " + + s"${metadataVersionsToString(MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestProduction())}") + } + }) + + Option(namespace.getList[String]("feature")).foreach( featureNamesAndLevels(_).foreachEntry { (k, v) => formatter.setFeatureLevel(k, v) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index a09e1f6ebd282..cd09c3069db2b 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -186,6 +186,32 @@ class ReplicaManagerTest { } } + @Test + def testHighWaterMarkDirectoryMapping2(): Unit = { + val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_))) + val rm = new ReplicaManager( + metrics = metrics, + config = config, + time = time, + scheduler = new MockScheduler(time), + logManager = mockLogMgr, + quotaManagers = quotaManager, + metadataCache = MetadataCache.kRaftMetadataCache(config.brokerId, () => KRaftVersion.KRAFT_VERSION_0), + logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size), + alterPartitionManager = alterPartitionManager) + try { + val partition = rm.createPartition(new TopicPartition(topic, 1)) + partition.createLogIfNotExists(isNew = false, isFutureReplica = false, + new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None) + rm.checkpointHighWatermarks() + config.logDirs.map(s => Paths.get(s, ReplicaManager.HighWatermarkFilename)) + .foreach(checkpointFile => assertTrue(Files.exists(checkpointFile), + s"checkpoint file does not exist at $checkpointFile")) + } finally { + rm.shutdown(checkpointHW = false) + } + } + @Test def testHighwaterMarkRelativeDirectoryMapping(): Unit = { val props = TestUtils.createBrokerConfig(1) diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index 9fde243ec1997..18c9c88a7fd81 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -58,6 +58,20 @@ class StorageToolTest { val testingFeatures = Feature.FEATURES.toList.asJava + @Test + def testFormatWithUnsupportedReleaseVersion(): Unit = { + val availableDirs = Seq(TestUtils.tempDir()) + val properties = new Properties() + properties.putAll(defaultStaticQuorumProperties) + properties.setProperty("log.dirs", availableDirs.mkString(",")) + val stream = new ByteArrayOutputStream() + val failure = assertThrows(classOf[TerseFailure], () => + runFormatCommand(stream, properties, Seq("--release-version", "3.3-IV1"))).getMessage + assertTrue(failure.contains("Unknown metadata.version 3.3-IV1")) + assertTrue(failure.contains(MetadataVersion.MINIMUM_VERSION.version)) + assertTrue(failure.contains(MetadataVersion.latestProduction().version)) + } + @Test def testConfigToLogDirectories(): Unit = { val config = new KafkaConfig(newSelfManagedProperties())