@@ -18,6 +18,7 @@ package v1beta1
1818
1919import (
2020 apimeta "k8s.io/apimachinery/pkg/api/meta"
21+ "k8s.io/apimachinery/pkg/api/resource"
2122 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2223)
2324
@@ -58,13 +59,13 @@ type KafkaTopicConfig struct {
5859 // The default policy ("delete") will discard old segments when their retention time or size limit has been reached.
5960 // The "compact" setting will enable log compaction on the topic.
6061 // +optional
61- CleanupPolicy * CleanupPolicy `json:"cleanupPolicy,omitempty"`
62+ CleanupPolicy * string `json:"cleanupPolicy,omitempty"`
6263
6364 // Final compression type for a given topic.
6465 // Supported are standard compression codecs: 'gzip', 'snappy', 'lz4', 'zstd').
6566 // It additionally accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the original compression codec set by the producer.
6667 // +optional
67- CompressionType * CompressionType `json:"compressionType,omitempty"`
68+ CompressionType * string `json:"compressionType,omitempty"`
6869
6970 // The amount of time to retain delete tombstone markers for log compacted topics. Specified in milliseconds.
7071 // This setting also gives a bound on the time in which a consumer must complete a read if they begin from offset 0
@@ -134,7 +135,7 @@ type KafkaTopicConfig struct {
134135 // Define whether the timestamp in the message is message create time or log append time.
135136 // The value should be either `CreateTime` or `LogAppendTime`
136137 // +optional
137- MessageTimestampType * MessageTimestampType `json:"messageTimestampType,omitempty"`
138+ MessageTimestampType * string `json:"messageTimestampType,omitempty"`
138139
139140 // This configuration controls how frequently the log compactor will attempt to clean the log (assuming LogCompaction is enabled).
140141 // By default we will avoid cleaning a log where more than 50% of the log has been compacted.
@@ -144,12 +145,16 @@ type KafkaTopicConfig struct {
144145 // (i) the dirty ratio threshold has been met and the log has had dirty (uncompacted) records for at least the MinCompactionLagMs duration,
145146 // or (ii) if the log has had dirty (uncompacted) records for at most the MaxCompactionLagMs period.
146147 // +optional
147- MinCleanableDirtyRatio * int64 `json:"minCleanableDirtyRatio,omitempty"`
148+ MinCleanableDirtyRatio * resource. Quantity `json:"minCleanableDirtyRatio,omitempty"`
148149
149150 // The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted.
150151 // +optional
151152 MinCompactionLagMs * int64 `json:"minCompactionLagMs,omitempty"`
152153
154+ // The maximum time a message will remain ineligible for compaction in the log. Only applicable for logs that are being compacted.
155+ // +optional
156+ MaxCompactionLagMs * int64 `json:"maxCompactionLagMs,omitempty"`
157+
153158 // When a producer sets acks to "all" (or "-1"), this configuration specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful.
154159 // If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend).
155160 // When used together, MinInsyncReplicas and acks allow you to enforce greater durability guarantees.
@@ -194,16 +199,12 @@ type KafkaTopicConfig struct {
194199 UncleanLeaderElectionEnable * bool `json:"uncleanLeaderElectionEnable,omitempty"`
195200}
196201
197- type CleanupPolicy string
198-
199202const (
200203 CleanupPolicyDelete = "delete"
201204 CleanupPolicyCompact = "compact"
202205 CleanupPolicyDeleteCompact = "delete,compact"
203206)
204207
205- type CompressionType string
206-
207208const (
208209 CompressionTypeGZIP = "gzip"
209210 CompressionTypeSnappy = "snappy"
@@ -213,8 +214,6 @@ const (
213214 CompressionTypeProducer = "producer"
214215)
215216
216- type MessageTimestampType string
217-
218217const (
219218 MessageTimestampTypeCreateTime = "CreateTime"
220219 MessageTimestampTypeLogAppendTime = "LogAppendTime"
@@ -319,14 +318,14 @@ func (in *KafkaTopic) GetReplicationFactor() int64 {
319318 return * in .Spec .ReplicationFactor
320319}
321320
322- func (in * KafkaTopic ) GetCleanupPolicy () * CleanupPolicy {
321+ func (in * KafkaTopic ) GetCleanupPolicy () * string {
323322 if in .Spec .KafkaTopicConfig == nil {
324323 return nil
325324 }
326325 return in .Spec .KafkaTopicConfig .CleanupPolicy
327326}
328327
329- func (in * KafkaTopic ) GetCompressionType () * CompressionType {
328+ func (in * KafkaTopic ) GetCompressionType () * string {
330329 if in .Spec .KafkaTopicConfig == nil {
331330 return nil
332331 }
@@ -410,14 +409,14 @@ func (in *KafkaTopic) GetMessageTimestampDifferenceMaxMs() *int64 {
410409 return in .Spec .KafkaTopicConfig .MessageTimestampDifferenceMaxMs
411410}
412411
413- func (in * KafkaTopic ) GetMessageTimestampType () * MessageTimestampType {
412+ func (in * KafkaTopic ) GetMessageTimestampType () * string {
414413 if in .Spec .KafkaTopicConfig == nil {
415414 return nil
416415 }
417416 return in .Spec .KafkaTopicConfig .MessageTimestampType
418417}
419418
420- func (in * KafkaTopic ) GetMinCleanableDirtyRatio () * int64 {
419+ func (in * KafkaTopic ) GetMinCleanableDirtyRatio () * resource. Quantity {
421420 if in .Spec .KafkaTopicConfig == nil {
422421 return nil
423422 }
@@ -431,6 +430,13 @@ func (in *KafkaTopic) GetMinCompactionLagMs() *int64 {
431430 return in .Spec .KafkaTopicConfig .MinCompactionLagMs
432431}
433432
433+ func (in * KafkaTopic ) GetMaxCompactionLagMs () * int64 {
434+ if in .Spec .KafkaTopicConfig == nil {
435+ return nil
436+ }
437+ return in .Spec .KafkaTopicConfig .MaxCompactionLagMs
438+ }
439+
434440func (in * KafkaTopic ) GetMinInsyncReplicas () * int64 {
435441 if in .Spec .KafkaTopicConfig == nil {
436442 return nil
0 commit comments