@@ -2,11 +2,14 @@
// Licensed under the MIT License.
package com.azure.cosmos.spark

import com.azure.cosmos.{CosmosAsyncClient, ReadConsistencyStrategy, SparkBridgeInternal}
import com.azure.cosmos.spark.diagnostics.LoggerHelper
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
import org.apache.spark.sql.connector.expressions.{Expression, Expressions, NullOrdering, SortDirection, SortOrder}
import org.apache.spark.sql.connector.metric.CustomMetric
import org.apache.spark.sql.connector.write.streaming.StreamingWrite
-import org.apache.spark.sql.connector.write.{BatchWrite, Write, WriteBuilder}
+import org.apache.spark.sql.connector.write.{BatchWrite, RequiresDistributionAndOrdering, Write, WriteBuilder}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -46,7 +49,7 @@ private class ItemsWriterBuilder
      diagnosticsConfig,
      sparkEnvironmentInfo)

-  private class CosmosWrite extends Write {
+  private class CosmosWrite extends Write with RequiresDistributionAndOrdering {

    private[this] val supportedCosmosMetrics: Array[CustomMetric] = {
      Array(
@@ -56,22 +59,127 @@ private class ItemsWriterBuilder
      )
    }

    // Extract userConfig conversion to avoid repeated calls
    private[this] val userConfigMap = userConfig.asCaseSensitiveMap().asScala.toMap

    private[this] val writeConfig = CosmosWriteConfig.parseWriteConfig(
      userConfigMap,
      inputSchema
    )

    private[this] val containerConfig = CosmosContainerConfig.parseCosmosContainerConfig(
      userConfigMap
    )

    override def toBatch(): BatchWrite =
      new ItemsBatchWriter(
-        userConfig.asCaseSensitiveMap().asScala.toMap,
+        userConfigMap,
        inputSchema,
        cosmosClientStateHandles,
        diagnosticsConfig,
        sparkEnvironmentInfo)

    override def toStreaming: StreamingWrite =
      new ItemsBatchWriter(
-        userConfig.asCaseSensitiveMap().asScala.toMap,
+        userConfigMap,
        inputSchema,
        cosmosClientStateHandles,
        diagnosticsConfig,
        sparkEnvironmentInfo)

    override def supportedCustomMetrics(): Array[CustomMetric] = supportedCosmosMetrics

    override def requiredDistribution(): Distribution = {
      if (writeConfig.bulkEnabled && writeConfig.bulkTransactional) {
        log.logInfo("Transactional batch mode enabled - configuring data distribution by partition key columns")
        // For transactional writes, partition by all partition key columns

> Member: NIT: info log
>
> Member Author: Changed to info log level (in constructor).

        val partitionKeyPaths = getPartitionKeyColumnNames()
        if (partitionKeyPaths.nonEmpty) {
          // Use public Expressions.column() factory - returns NamedReference
          val clustering = partitionKeyPaths.map(path => Expressions.column(path): Expression).toArray
          Distributions.clustered(clustering)
        } else {
          Distributions.unspecified()

> Member: warning/error log - the PK definition should never have 0 columns. At least a log would be good in case this happens in the wild (in theory it could for extremely old containers, but containers have used an artificial PK for years).
>
> Member Author: Added error logging for edge cases, when:
>
> • partition key definition has 0 columns
> • partition key definition is null

        }
      } else {
        Distributions.unspecified()
      }
    }

    override def requiredOrdering(): Array[SortOrder] = {
      if (writeConfig.bulkEnabled && writeConfig.bulkTransactional) {
        // For transactional writes, order by all partition key columns (ascending)
        val partitionKeyPaths = getPartitionKeyColumnNames()
        if (partitionKeyPaths.nonEmpty) {
          partitionKeyPaths.map { path =>
            // Use public Expressions.sort() factory for creating SortOrder
            Expressions.sort(
              Expressions.column(path),
              SortDirection.ASCENDING,
              NullOrdering.NULLS_FIRST
            )
          }.toArray
        } else {
          Array.empty[SortOrder]
        }
      } else {
        Array.empty[SortOrder]
      }
    }

    private def getPartitionKeyColumnNames(): Seq[String] = {
      try {
        Loan(
          List[Option[CosmosClientCacheItem]](
            Some(createClientForPartitionKeyLookup())
          ))
          .to(clientCacheItems => {
            val container = ThroughputControlHelper.getContainer(
              userConfigMap,
              containerConfig,
              clientCacheItems(0).get,
              None
            )

            // Simplified retrieval using SparkBridgeInternal directly
            val containerProperties = SparkBridgeInternal.getContainerPropertiesFromCollectionCache(container)
            val partitionKeyDefinition = containerProperties.getPartitionKeyDefinition

            extractPartitionKeyPaths(partitionKeyDefinition)
          })
      } catch {
        case ex: Exception =>
          log.logWarning(s"Failed to get partition key definition for transactional writes: ${ex.getMessage}")
          Seq.empty[String]
      }
    }

    private def createClientForPartitionKeyLookup(): CosmosClientCacheItem = {
      CosmosClientCache(
        CosmosClientConfiguration(
          userConfigMap,
          ReadConsistencyStrategy.EVENTUAL,
          sparkEnvironmentInfo
        ),
        Some(cosmosClientStateHandles.value.cosmosClientMetadataCaches),
        "ItemsWriterBuilder-PKLookup"
      )
    }

    private def extractPartitionKeyPaths(partitionKeyDefinition: com.azure.cosmos.models.PartitionKeyDefinition): Seq[String] = {
      if (partitionKeyDefinition != null && partitionKeyDefinition.getPaths != null) {
        val paths = partitionKeyDefinition.getPaths.asScala
        if (paths.isEmpty) {
          log.logError("Partition key definition has 0 columns - this should not happen for modern containers")
        }
        paths.map(path => {
          // Remove leading '/' from partition key path (e.g., "/pk" -> "pk")
          if (path.startsWith("/")) path.substring(1) else path
        }).toSeq
      } else {
        log.logError("Partition key definition is null - this should not happen for modern containers")
        Seq.empty[String]
      }
    }
  }
}
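
For context on what the `RequiresDistributionAndOrdering` contract buys: Spark's planner satisfies `requiredDistribution()` and `requiredOrdering()` by injecting a shuffle and sort before invoking the writer. The sketch below is not part of this PR; it assumes a DataFrame `df` with a single partition key column `pk` and a placeholder `cosmosConfig` options map, and shows the roughly equivalent preparation a caller would otherwise do by hand:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Roughly what the clustered distribution + ascending NULLS FIRST ordering
// requested by CosmosWrite asks Spark to do before rows reach the writer.
def prepareForTransactionalWrite(df: DataFrame, cosmosConfig: Map[String, String]): Unit = {
  df
    .repartition(col("pk"))           // co-locate rows sharing a partition key value (clustered distribution)
    .sortWithinPartitions(col("pk"))  // order rows by partition key within each Spark partition
    .write
    .format("cosmos.oltp")
    .options(cosmosConfig)            // placeholder: endpoint, key, database, container, bulk flags
    .mode("append")
    .save()
}
```

With the contract implemented, none of this is needed on the caller side; Spark applies the shuffle and sort automatically whenever `bulkEnabled` and `bulkTransactional` are both set.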
@@ -67,7 +67,7 @@
| `spark.cosmos.write.point.maxConcurrency` | None | Cosmos DB Item Write Max concurrency. If not specified it will be determined based on the Spark executor VM Size |
| `spark.cosmos.write.bulk.maxPendingOperations` | None | Cosmos DB Item Write bulk mode maximum pending operations. Defines a limit of bulk operations being processed concurrently. If not specified it will be determined based on the Spark executor VM Size. If the volume of data is large for the provisioned throughput on the destination container, this setting can be adjusted by following the estimation of `1000 x Cores` |
| `spark.cosmos.write.bulk.enabled` | `true` | Cosmos DB Item Write bulk enabled |
-| `spark.cosmos.write.bulk.targetedPayloadSizeInBytes` | `220201` | When the targeted payload size is reached for buffered documents, the request is sent to the backend. The default value is optimized for small documents <= 10 KB - when documents often exceed 110 KB, it can help to increase this value to up to about `1500000` (should still be smaller than 2 MB). |
+| `spark.cosmos.write.bulk.transactional` | `false` | Enable transactional batch mode for bulk writes. When enabled, all operations for the same partition key are executed atomically (all succeed or all fail). Requires ordering and clustering by partition key columns. Only supports upsert operations. Cannot exceed 100 operations or 2 MB per partition key. **Note**: For containers using hierarchical partition keys (HPK), the transactional scope applies only to **logical partitions** (complete partition key paths), not partial top-level keys. See [Transactional Batch documentation](https://learn.microsoft.com/azure/cosmos-db/nosql/transactional-batch) for details. |
+| `spark.cosmos.write.bulk.targetedPayloadSizeInBytes` | `220201` | When the targeted payload size is reached for buffered documents, the request is sent to the backend. The default value is optimized for small documents <= 10 KB - when documents often exceed 110 KB, it can help to increase this value to up to about `1500000` (should still be smaller than 2 MB). |
| `spark.cosmos.write.bulk.initialBatchSize` | `100` | Cosmos DB initial bulk micro batch size - a micro batch will be flushed to the backend when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch size is getting automatically tuned based on the throttling rate. By default the initial micro batch size is 100. Reduce this when you want to avoid that the first few requests consume too many RUs. |
| `spark.cosmos.write.bulk.maxBatchSize` | `100` | Cosmos DB max. bulk micro batch size - a micro batch will be flushed to the backend when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch size is getting automatically tuned based on the throttling rate. By default the max. micro batch size is 100. Use this setting only when migrating Spark 2.4 workloads - for other scenarios relying on the auto-tuning combined with throughput control will result in better experience. |
| `spark.cosmos.write.flush.noProgress.maxIntervalInSeconds` | `180` | The time interval in seconds that write operations will wait when no progress can be made for bulk writes before forcing a retry. The retry will reinitialize the bulk write process - so, any delays on the retry can be sure to be actual service issues. The default value of 3 min should be sufficient to prevent false negatives when there is a short service-side write unavailability - like for partition splits or merges. Increase it only if you regularly see these transient errors to exceed a time period of 180 seconds. |
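
A minimal usage sketch for the new flag (endpoint, key, database, and container values are placeholders; `spark.cosmos.write.strategy` is set to `ItemOverwrite` because transactional batches only support upserts, per the description above):

```scala
val cosmosWriteConfig = Map(
  "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/",
  "spark.cosmos.accountKey" -> "<account-key>",
  "spark.cosmos.database" -> "<database>",
  "spark.cosmos.container" -> "<container>",
  "spark.cosmos.write.strategy" -> "ItemOverwrite",  // upsert
  "spark.cosmos.write.bulk.enabled" -> "true",       // default, shown for clarity
  "spark.cosmos.write.bulk.transactional" -> "true"  // new flag introduced by this PR
)

df.write.format("cosmos.oltp").options(cosmosWriteConfig).mode("append").save()
```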