Add transactional batch support for Cosmos DB Spark connector #47478
Conversation
Pull request overview
This PR adds transactional batch operation support to the Azure Cosmos DB Spark 3 connector, enabling atomic multi-document operations within a single partition key. The implementation introduces a new TransactionalBulkExecutor and TransactionalBulkWriter that leverage Cosmos DB's native transactional batch API.
Key changes:
- New `TransactionalBulkExecutor.java` implements atomic batch execution with partition key buffering
- New `TransactionalBulkWriter.scala` integrates transactional batches with the Spark bulk write pipeline
- Spark 3.5+ automatic distribution/ordering via the `RequiresDistributionAndOrdering` interface
- Configuration options for enabling transactional batch mode
- Comprehensive test coverage for atomic operations, rollback scenarios, and hierarchical partition keys
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| TransactionalBulkExecutor.java | Core batch execution logic with partition key grouping and 100-operation limit enforcement |
| TransactionalBulkWriter.scala | Spark integration layer supporting per-row operation types and retry logic |
| CosmosWriterBase.scala | Writer selection logic to instantiate TransactionalBulkWriter when enabled |
| CosmosConfig.scala | New configuration option WriteBulkEnableTransactions and ItemTransactionalBatch strategy |
| ItemsWriterBuilder.scala | Spark 3.5 integration with automatic data distribution and ordering |
| AsyncItemWriter.scala | Interface extension for per-row operation type support |
| BulkWriter.scala | Updated to implement new interface method (ignores per-row operations) |
| PointWriter.scala | Updated to implement new interface method |
| TransactionalBatchITest.scala | Comprehensive integration tests for various transactional batch scenarios |
| BulkWriterITest.scala | Updated test to pass new required parameter |
```java
this.maxMicroBatchPayloadSizeInBytes = cosmosBulkOptions.getMaxMicroBatchPayloadSizeInBytes();
this.cosmosBulkExecutionOptions = cosmosBulkOptions;
this.container = container;
this.bulkSpanName = "nonTransactionalBatch." + this.container.getId();
```
Copilot AI · Dec 6, 2025
The span name is misleading - it says "nonTransactionalBatch" but this is the TransactionalBulkExecutor. This should be "transactionalBatch" to accurately reflect the operation type for diagnostics and tracing.
```diff
- this.bulkSpanName = "nonTransactionalBatch." + this.container.getId();
+ this.bulkSpanName = "transactionalBatch." + this.container.getId();
```
| "Transactional batch operation failed: partition key '%s' has %d operations, " + | ||
| "which exceeds the maximum allowed limit of %d operations per transactional batch. " + | ||
| "Transactional batches require all-or-nothing execution and cannot be split across multiple requests.", | ||
| cosmosBatch.getPartitionKeyValue(), |
Copilot AI · Dec 6, 2025
The validation error message at lines 694-699 correctly identifies the issue, but the partition key value might be null, which would cause a NullPointerException when calling toString(). Consider using String.valueOf(cosmosBatch.getPartitionKeyValue()) or adding a null check to prevent an NPE during error message formatting.
```diff
- cosmosBatch.getPartitionKeyValue(),
+ String.valueOf(cosmosBatch.getPartitionKeyValue()),
```
```java
String batchTrackingId = UUIDs.nonBlockingRandomUUID().toString();
logTraceOrWarning(
    "Executing transactional batch - batch TrackingId: %s",
```
Copilot AI · Dec 6, 2025
The logging format strings use %s placeholders (lines 688, 754), which won't work with SLF4J. SLF4J requires {} placeholders. Change the format strings to use {} instead of %s for proper log message formatting.
| "Executing transactional batch - batch TrackingId: %s", | |
| "Executing transactional batch - batch TrackingId: {}", |
```java
.flatMapMany(response -> {

    logTraceOrWarning(
        "Response for transactional batch - status code %s, ActivityId: %s, batch TrackingId %s",
```
Copilot AI · Dec 6, 2025
Same SLF4J formatting issue - use {} instead of %s for placeholders.
| "Response for transactional batch - status code %s, ActivityId: %s, batch TrackingId %s", | |
| "Response for transactional batch - status code {}, ActivityId: {}, batch TrackingId {}", |
|
|
||
```java
ItemBulkOperation<?, ?> itemBulkOperation = (ItemBulkOperation<?, ?>) itemOperation;
```
Copilot AI · Dec 6, 2025
The error message at line 811 says "Unsupported operationType" but should be more specific. Consider: "Unsupported operation type '{}' for transactional batch. Supported types are: CREATE, UPSERT, REPLACE, DELETE, READ, PATCH."
```scala
val bulkItemOperation = effectiveOperationType match {
  case "create" =>
    CosmosBulkOperations.getCreateItemOperation(objectNode, partitionKeyValue, operationContext)

  case "upsert" =>
    CosmosBulkOperations.getUpsertItemOperation(objectNode, partitionKeyValue, operationContext)

  case "replace" =>
    operationContext.eTag match {
      case Some(eTag) =>
        CosmosBulkOperations.getReplaceItemOperation(
          operationContext.itemId,
          objectNode,
          partitionKeyValue,
          new CosmosBulkItemRequestOptions().setIfMatchETag(eTag),
          operationContext)
      case _ =>
        CosmosBulkOperations.getReplaceItemOperation(
          operationContext.itemId,
          objectNode,
          partitionKeyValue,
          operationContext)
    }

  case "delete" =>
    operationContext.eTag match {
      case Some(eTag) =>
        CosmosBulkOperations.getDeleteItemOperation(
          operationContext.itemId,
          partitionKeyValue,
          new CosmosBulkItemRequestOptions().setIfMatchETag(eTag),
          operationContext)
      case _ =>
        CosmosBulkOperations.getDeleteItemOperation(
          operationContext.itemId,
          partitionKeyValue,
          operationContext)
    }

  case "patch" =>
    getPatchItemOperation(
      operationContext.itemId,
      partitionKeyValue,
      partitionKeyDefinition,
      objectNode,
      operationContext)

  case _ =>
    throw new IllegalArgumentException(
      s"Unsupported operationType '$effectiveOperationType'. " +
        s"Supported types for transactional batch: create, upsert, replace, delete")
}
```
Copilot AI · Dec 6, 2025
Missing validation: the operationType parameter could contain invalid values like "patch" when used with transactional batch. The code at lines 810-813 only throws an error for operation types that don't match the switch statement, but doesn't validate against the supported transactional batch operation types upfront.
| case "patch" => | ||
| getPatchItemOperation( | ||
| operationContext.itemId, | ||
| partitionKeyValue, | ||
| partitionKeyDefinition, | ||
| objectNode, | ||
| operationContext) | ||
|
|
||
| case _ => | ||
| throw new IllegalArgumentException( | ||
| s"Unsupported operationType '$effectiveOperationType'. " + | ||
| s"Supported types for transactional batch: create, upsert, replace, delete") | ||
| } |
Copilot AI · Dec 6, 2025
Comment states "patch" is supported (line 812), but the PR description and usage examples don't include patch operations in transactional batches. The switch case handles PATCH but doesn't validate whether patch is actually supported in transactional batches. Clarify if patch is supported or remove it from the comment/error messages.
| case "patch" => | |
| getPatchItemOperation( | |
| operationContext.itemId, | |
| partitionKeyValue, | |
| partitionKeyDefinition, | |
| objectNode, | |
| operationContext) | |
| case _ => | |
| throw new IllegalArgumentException( | |
| s"Unsupported operationType '$effectiveOperationType'. " + | |
| s"Supported types for transactional batch: create, upsert, replace, delete") | |
| } | |
| // PATCH is not supported in transactional batches | |
| case "patch" => | |
| throw new IllegalArgumentException( | |
| s"Unsupported operationType 'patch' for transactional batch. " + | |
| s"Supported types for transactional batch: create, upsert, replace, delete") | |
| case _ => | |
| throw new IllegalArgumentException( | |
| s"Unsupported operationType '$effectiveOperationType'. " + | |
| s"Supported types for transactional batch: create, upsert, replace, delete") |
```scala
defaultValue = Option.apply(false),
mandatory = false,
parseFromStringFunction = enableTransactionsAsString => enableTransactionsAsString.toBoolean,
helpMessage = "Cosmos DB Item Write enable transactional batch - requires bulk write to be enabled and Spark 3.5+")
```
Copilot AI · Dec 6, 2025
The help message says "requires... Spark 3.5+" but the PR description states the feature works on Spark 3.3/3.4 (with manual sorting). The help message is misleading. Update to clarify that Spark 3.5+ provides automatic distribution/ordering, while 3.3/3.4 requires manual sorting for optimal performance.
| helpMessage = "Cosmos DB Item Write enable transactional batch - requires bulk write to be enabled and Spark 3.5+") | |
| helpMessage = "Cosmos DB Item Write enable transactional batch - requires bulk write to be enabled. " + | |
| "Spark 3.5+ provides automatic distribution/ordering for transactional batch. " + | |
| "On Spark 3.3/3.4, transactional batch is supported but requires manual sorting for optimal performance.") |
```java
private final Long maxMicroBatchIntervalInMs;

private final TContext batchContext;
private final ConcurrentMap<String, PartitionScopeThresholds> partitionScopeThresholds;
```
Not needed here (relevant for dynamic tuning of bulk batch size).
removed
```java
private final Flux<com.azure.cosmos.models.CosmosItemOperation> inputOperations;

// Options for bulk execution.
private final Long maxMicroBatchIntervalInMs;
```
not needed
updated
```java
private final String operationContextText;
private final OperationContextAndListenerTuple operationListener;
private final ThrottlingRetryOptions throttlingRetryOptions;
private final Flux<com.azure.cosmos.models.CosmosItemOperation> inputOperations;
```
should be of type CosmosBatch?
updated now.
```java
private Flux<CosmosBulkOperationResponse<TContext>> executeTransactionalBatch(
    List<CosmosItemOperation> operations,
    PartitionKey partitionKey,
```
As described on Friday - you have to construct the batches at the edge (in TransactionalBulkWriter.scheduleWrite) - this class should only operate on CosmosBatch, not CosmosItemOperation - otherwise there is zero chance of getting the retries right.
Also, the operationType per row is super useful - but it should be a write strategy (otherwise, as you mentioned, it won't be idempotent). Which also means that triggering transactional batch should not be a new write strategy - but probably some other config.
Sorry, I completely missed this; implemented it now but removed operationType per row - deferred to another PR.
```java
batchTrackingId);

// Validate that transactional batch does not exceed Cosmos DB limit
if (operations.size() > BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST) {
```
This validation should not happen client-side (the service has an account-level config override).
ok, will change
removed this and updated test accordingly
```scala
 * @param objectNode the json object node
 * @param operationType optional operation type (create, upsert, replace, delete) for this specific row
 */
def scheduleWrite(partitionKeyValue: PartitionKey, objectNode: ObjectNode, operationType: Option[String]): Unit
```
Instead of operationType this should be a write strategy.
And I would model this as a required parameter in the existing overload - we have the write strategy from config today and can just populate it.
An alternative is to leave out this change and stick with one global write strategy for now - then the change to allow a write strategy per row can be done in a separate PR.
To confirm, you are suggesting either:
- Leaving this as a global config for transactional batch (where everything is executed as upsert) and addressing per-row operation types in a separate PR
- Renaming operationType to writeStrategy and making activation of transactional batch a global config instead of a new write strategy like ItemTransactionalBatch

If so, I would choose the second.
Or are you saying that to have per row operation type, we have to re-think how this is done, and not merely re-name operationType?
For now I am assuming the former because for bitemporal use cases, not having operationType per row does not seem to make sense (although we do seem to need something that caters for idempotency properly for failed jobs). At least the semantics/terminology will make sense.
As discussed, removed this entirely, defer to another PR.
Makes sense - but then this change should be reverted?
```scala
private val pendingReadManyRetries = java.util.concurrent.ConcurrentHashMap.newKeySet[ReadManyOperation]().asScala
private val activeTasks = new AtomicInteger(0)
private val errorCaptureFirstException = new AtomicReference[Throwable]()
private val bulkInputEmitter: Sinks.Many[CosmosItemOperation] = Sinks.many().unicast().onBackpressureBuffer()
```
Sinks.Many[CosmosBatch]?
Yep, this is done now.
```scala
private val bulkInputEmitter: Sinks.Many[CosmosItemOperation] = Sinks.many().unicast().onBackpressureBuffer()

private val activeBulkWriteOperations = java.util.concurrent.ConcurrentHashMap.newKeySet[CosmosItemOperation]().asScala
private val activeReadManyOperations = java.util.concurrent.ConcurrentHashMap.newKeySet[ReadManyOperation]().asScala
```
I think the readMany handling would be different in transactional mode - with BulkUpdate (when readMany is used), currently you get all the docs first - in transactional it is probably wiser to do this logical partition by logical partition - readMany would always only go to a single logical partition, so the orchestration can be done as part of the write. Also, when allowing a per-row write strategy, we would only schedule readMany for rows with the bulk update write strategy?
ItemBulkUpdate (patch) is not allowed for now, so I think the readMany issues you mention here do apply?
```scala
}

override def requiredOrdering(): Array[SortOrder] = {
  if (writeConfig.bulkEnabled && writeConfig.itemWriteStrategy == ItemWriteStrategy.ItemTransactionalBatch) {
```
Changes in this file look great - I would just think that enabling transactional bulk should be based on another config - not the write strategy?
Refactored now
…ed docs to ingestion.md
```scala
override def requiredDistribution(): Distribution = {
  if (writeConfig.bulkEnabled && writeConfig.bulkTransactional) {
    // For transactional writes, partition by all partition key columns
```
NIT: info log
Changed to info log level (in constructor).
```scala
val clustering = partitionKeyPaths.map(path => Expressions.column(path): Expression).toArray
Distributions.clustered(clustering)
} else {
  Distributions.unspecified()
```
Warning/error log - a pk definition should never have 0 columns. At least a log would be good in case this happens in the wild (in theory it could for extremely old containers, but containers have used an artificial PK for years).
Added error logging for edge cases, when:
- partition key definition has 0 columns
- partition key definition is null
```scala
private def getPartitionKeyColumnNames(): Seq[String] = {
  try {
    // Need to create a temporary container client to get partition key definition
    val clientCacheItem = CosmosClientCache(
```
The loan pattern (kind of the equivalent of `using` in .NET, for example) should be used here - otherwise you are leaking these clients.
Not sure whether you actually wanted it to look like .NET, but I created a using[A, B] helper method that ensures resources are properly closed via try-finally.
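For illustration, such a loan-pattern helper might look like the sketch below; the `using` name and `AutoCloseable` bound are assumptions based on this thread, not the PR's verbatim code:

```scala
// Hedged sketch of a loan-pattern helper - illustrative, not the actual PR code.
def using[A <: AutoCloseable, B](resource: A)(work: A => B): B = {
  try {
    work(resource)
  } finally {
    // Always release the resource, even if `work` throws
    resource.close()
  }
}
```

With this shape, the temporary client cache item is borrowed only for the duration of the partition key lookup and closed deterministically afterwards.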
```scala
// Need to create a temporary container client to get partition key definition
val clientCacheItem = CosmosClientCache(
  CosmosClientConfiguration(
    userConfig.asCaseSensitiveMap().asScala.toMap,
```
userConfig.asCaseSensitiveMap().asScala.toMap is repeated multiple times - worth converting once and storing it as a variable?
Created userConfigMap at class level and updated toBatch(), toStreaming(), and config parsing calls to use it.
```scala
  None
)

val containerProperties = SparkBridgeInternal.getContainerPropertiesFromCollectionCache(container)
```
You can simplify this by just using
```scala
private val containerDefinition = SparkBridgeInternal
  .getContainerPropertiesFromCollectionCache(container)
private val partitionKeyDefinition = containerDefinition.getPartitionKeyDefinition
```
Updated as suggested
- **Atomic semantics**: All operations for the same partition key succeed or all fail (rollback)
- **Operation type**: Only upsert operations are supported (equivalent to `ItemOverwrite` write strategy)
- **Partition grouping**: Spark automatically partitions and orders data by partition key columns
- **Size limits**: Maximum 100 operations per transaction; maximum 2MB total payload per transaction
NIT: call out that this is a service-side limitation (and we need to ensure we rely on the service to enforce it, because they actually made this configurable to allow larger batches).
Updated docs to call out that the 100-operation and 2MB limits are enforced service-side.
#### Use cases

Transactional batch writes are ideal for:
- Financial transactions requiring consistency across multiple documents
I would call out the temporal use case explicitly?
updated, is this ok?
#### Error handling

If any operation in a transaction fails (e.g., insufficient RUs, document too large, transaction exceeds 100 operations), the entire transaction is rolled back and no documents are modified. The Spark task will fail and retry according to Spark's retry policy.
Important to distinguish between transient errors (entire transaction retried) vs. non-transient (Spark job fails).
expanded to distinguish.
| `spark.cosmos.write.point.maxConcurrency` | None | Cosmos DB Item Write Max concurrency. If not specified it will be determined based on the Spark executor VM Size |
| `spark.cosmos.write.bulk.maxPendingOperations` | None | Cosmos DB Item Write bulk mode maximum pending operations. Defines a limit of bulk operations being processed concurrently. If not specified it will be determined based on the Spark executor VM Size. If the volume of data is large for the provisioned throughput on the destination container, this setting can be adjusted by following the estimation of `1000 x Cores` |
| `spark.cosmos.write.bulk.enabled` | `true` | Cosmos DB Item Write bulk enabled |
| `spark.cosmos.write.bulk.transactional` | `false` | Enable transactional batch mode for bulk writes. When enabled, all operations for the same partition key are executed atomically (all succeed or all fail). Requires ordering and clustering by partition key columns. Only supports upsert operations. Cannot exceed 100 operations or 2MB per partition key. See [Transactional Batch documentation](https://learn.microsoft.com/azure/cosmos-db/nosql/transactional-batch) for details. |
| `spark.cosmos.write.bulk.targetedPayloadSizeInBytes` | `220201` | When the targeted payload size is reached for buffered documents, the request is sent to the backend. The default value is optimized for small documents <= 10 KB - when documents often exceed 110 KB, it can help to increase this value to up to about `1500000` (should still be smaller than 2 MB). |
Call out explicitly that for HPK only logical partitions are transactional (not a partial top-level key).
Added explicit clarification for HPK
```scala
  scheduleWrite(partitionKeyValue, objectNode, None)
}

override def scheduleWrite(partitionKeyValue: PartitionKey, objectNode: ObjectNode, operationType: Option[String]): Unit = {
```
revert?
```scala
  new com.azure.cosmos.models.PartitionKeyDefinition()
)
val paths = new java.util.ArrayList[String]()
paths.add("/PermId")
```
Just to be safe I would use different column names
```java
ImplementationBridgeHelpers.CosmosBatchResponseHelper.getCosmosBatchResponseAccessor();

private final CosmosAsyncContainer container;
private final int maxMicroBatchPayloadSizeInBytes;
```
remove - all limits need to be enforced service-side
```java
private final Flux<CosmosBatch> inputBatches;

private final TContext batchContext;
private final CosmosBulkExecutionOptionsImpl cosmosBulkExecutionOptions;
```
remove - irrelevant
```java
checkNotNull(inputOperations, "expected non-null inputOperations");
checkNotNull(cosmosBulkOptions, "expected non-null bulkOptions");

this.maxMicroBatchPayloadSizeInBytes = cosmosBulkOptions.getMaxMicroBatchPayloadSizeInBytes();
```
remove
```java
? schedulerSnapshotFromOptions
: CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC;

logger.debug("Instantiated BulkExecutor, Context: {}",
```
| logger.debug("Instantiated BulkExecutor, Context: {}", | |
| logger.debug("Instantiated TransactionalBulkExecutor, Context: {}", |
```java
.executeCore()
.doFinally((SignalType signal) -> {
    if (signal == SignalType.ON_COMPLETE) {
        logDebugOrWarning("BulkExecutor.execute flux completed - # left items {}, Context: {}, {}",
```
| logDebugOrWarning("BulkExecutor.execute flux completed - # left items {}, Context: {}, {}", | |
| logDebugOrWarning("TransactionalBulkExecutor.execute flux completed - # left items {}, Context: {}, {}", |
```java
} else {
    int itemsLeftSnapshot = this.totalCount.get();
    if (itemsLeftSnapshot > 0) {
        logInfoOrWarning("BulkExecutor.execute flux terminated - Signal: {} - # left items {}, Context: {}, {}",
```
| logInfoOrWarning("BulkExecutor.execute flux terminated - Signal: {} - # left items {}, Context: {}, {}", | |
| logInfoOrWarning("TransactionalBulkExecutor.execute flux terminated - Signal: {} - # left items {}, Context: {}, {}", |
```java
this.operationContextText,
getThreadInfo());
} else {
    logDebugOrWarning("BulkExecutor.execute flux terminated - Signal: {} - # left items {}, Context: {}, {}",
```
| logDebugOrWarning("BulkExecutor.execute flux terminated - Signal: {} - # left items {}, Context: {}, {}", | |
| logDebugOrWarning("TransactionalBulkExecutor.execute flux terminated - Signal: {} - # left items {}, Context: {}, {}", |
```java
    throwable);
    return throwable;
})
.flatMap(
```
I think this scheduling is a bit too simple - the flatMap will allow a certain number of batches to be processed concurrently, independently of whether they span multiple physical partitions or not. To optimize bulk jobs, ideally each Spark executor can saturate each physical partition - so, like in BulkExecutor, this probably needs grouping by physical partition, and then enforcing a certain limit of concurrent transactional batches per physical partition. This can also be done in a separate PR - but I think it will take some more thinking to get this right. Probably, like the dynamic batch sizing in bulk, we need a way to dynamically tune the allowed number of concurrent transactional batches per physical partition.
But this has to be addressed before letting any customer start using this feature.
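A rough sketch of the kind of per-physical-partition concurrency limiting suggested here, under heavy assumptions - resolvePhysicalPartitionId, executeBatch, BatchResult, and the concurrency knob are all hypothetical placeholders, not PR code:

```scala
import com.azure.cosmos.models.CosmosBatch
import reactor.core.publisher.{Flux, GroupedFlux}

final case class BatchResult(statusCode: Int) // placeholder result type

// Hypothetical helpers - not part of the PR:
def resolvePhysicalPartitionId(batch: CosmosBatch): String = ???
def executeBatch(batch: CosmosBatch): Flux[BatchResult] = ???

val maxConcurrentBatchesPerPhysicalPartition = 5 // assumed tuning knob

def executeWithPartitionLimits(batches: Flux[CosmosBatch]): Flux[BatchResult] =
  batches
    // Group batches by the physical partition they target ...
    .groupBy((batch: CosmosBatch) => resolvePhysicalPartitionId(batch))
    // ... then cap in-flight transactional batches per physical partition,
    // so one hot partition cannot be overloaded while others sit idle
    .flatMap((group: GroupedFlux[String, CosmosBatch]) =>
      group.flatMap(
        (batch: CosmosBatch) => executeBatch(batch),
        maxConcurrentBatchesPerPhysicalPartition))
```

As the comment notes, the per-partition limit would presumably need dynamic tuning based on throttling feedback, analogous to the dynamic micro-batch sizing in BulkExecutor.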
```java
operations,
this.effectiveItemSerializer);
request.setAtomicBatch(true);
request.setShouldContinueOnError(false);
```
Worth checking with the backend folks what this means when setAtomicBatch == true - maybe this is something we can optimize to implement retry policies for some of the write strategies, like Delete, where 404 Not Found should be silently ignored.
```java
});
}

private int calculateTotalSerializedLength(AtomicInteger currentTotalSerializedLength, CosmosItemOperation item) {
```
I think this is not used - probably needs an alternative?
```scala
}

log.logInfo(
  s"BulkWriter instantiated (Host CPU count: $cpuCount, maxPendingOperations: $maxPendingOperations, " +
```
| s"BulkWriter instantiated (Host CPU count: $cpuCount, maxPendingOperations: $maxPendingOperations, " + | |
| s"TransactionalBulkWriter instantiated (Host CPU count: $cpuCount, maxPendingOperations: $maxPendingOperations, " + |
```scala
// there is one bulk writer per spark task/partition
// and default config will create one executor per core on the executor host
// so multiplying by cpuCount in the default config is too aggressive
private val maxPendingOperations = writeConfig.bulkMaxPendingOperations
```
should probably calculate maxPendingBatches?
```scala
// the buffer should be flushed. A timer-based scheduler will publish this
// dummy operation for every batchIntervalInMs ms. This operation
// is filtered out and will never be flushed to the backend
private val readManyFlushOperationSingleton = ReadManyOperation(
```
remove readMany for now?
```scala
    new IllegalStateException("Can't accept any new work - BulkWriter has been disposed already"))
}

throwIfProgressStaled(
```
I think all the semaphore dance should only start once you have formed a batch - no need to do this in the middle of a batch. Semantically, TransactionalBulkWriter should limit the number of outstanding batches - not operations.
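A sketch of what limiting outstanding batches (rather than individual operations) could look like; the class shape, the maxPendingBatches knob, and the emit/complete hooks are assumptions, not the PR's code:

```scala
import java.util.concurrent.Semaphore
import com.azure.cosmos.models.CosmosBatch
import reactor.core.publisher.Sinks

// Illustrative gate around the batch emitter - assumed shape, not PR code.
class TransactionalBatchGate(maxPendingBatches: Int) {
  private val pendingBatchSemaphore = new Semaphore(maxPendingBatches)
  private val bulkInputEmitter: Sinks.Many[CosmosBatch] =
    Sinks.many().unicast().onBackpressureBuffer()

  def emitBatch(batch: CosmosBatch): Unit = {
    // Acquire only once a complete batch has been formed - no need to
    // gate individual rows while a batch is still being assembled
    pendingBatchSemaphore.acquire()
    bulkInputEmitter.tryEmitNext(batch)
  }

  // Called from the response-handling path once the batch completes
  def onBatchCompleted(): Unit = pendingBatchSemaphore.release()
}
```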
```scala
  scheduleWriteInternal(partitionKeyValue, objectNode, operationContext)
}

/**
```
remove
```scala
    operationContext: OperationContext): Unit = {
  activeTasks.incrementAndGet()
  if (operationContext.attemptNumber > 1) {
    logInfoOrWarning(s"bulk scheduleWrite attemptCnt: ${operationContext.attemptNumber}, " +
```
| logInfoOrWarning(s"bulk scheduleWrite attemptCnt: ${operationContext.attemptNumber}, " + | |
| logInfoOrWarning(s"TransactionalBulkWriter scheduleWrite attemptCnt: ${operationContext.attemptNumber}, " + |
| s"Context: ${operationContext.toString} $getThreadInfo") | ||
| } | ||
|
|
||
| // The handling will make sure that during retry: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BulkUpdate not supported here? Remove this
Removed readMany parts
```scala
  }
}

private def scheduleReadManyInternal(partitionKeyValue: PartitionKey,
```
Remove - readMany only needed for BulkUpdate
Removed
```scala
  }
}

private[this] def getPatchOperationsForBatch(itemId: String, objectNode: ObjectNode): CosmosPatchOperations = {
```
remove?
```scala
  }
}

private[this] def getPatchItemOperation(itemId: String,
```
Remove?
```scala
//scalastyle:off method.length
//scalastyle:off cyclomatic.complexity
private[this] def handleNonSuccessfulStatusCode
```
Overly complicated for different write strategies (and those are incorrect - so, this needs clean-up)
```scala
private[this] def getActiveOperationsLog(
    activeOperationsSnapshot: mutable.Set[CosmosItemOperation],
    activeReadManyOperationsSnapshot: mutable.Set[ReadManyOperation]): String = {
```
remove all readMany stuff from this file?
Removed
```scala
  }
}

private[this] def sameReadManyOperations
```
Remove?
```scala
  returnValue
}

private def shouldRetryForItemPatchBulkUpdate(statusCode: Int, subStatusCode: Int): Boolean = {
```
remove - wrong write strategy
```scala
private val minDelayOn408RequestTimeoutInMs = 500
private val maxItemOperationsToShowInErrorMessage = 10
private val BULK_WRITER_REQUESTS_BOUNDED_ELASTIC_THREAD_NAME = "bulk-writer-requests-bounded-elastic"
private val BULK_WRITER_INPUT_BOUNDED_ELASTIC_THREAD_NAME = "bulk-writer-input-bounded-elastic"
```
transactional-* prefixes
```scala
private val BULK_WRITER_REQUESTS_BOUNDED_ELASTIC_THREAD_NAME = "bulk-writer-requests-bounded-elastic"
private val BULK_WRITER_INPUT_BOUNDED_ELASTIC_THREAD_NAME = "bulk-writer-input-bounded-elastic"
private val BULK_WRITER_RESPONSES_BOUNDED_ELASTIC_THREAD_NAME = "bulk-writer-responses-bounded-elastic"
private val READ_MANY_BOUNDED_ELASTIC_THREAD_NAME = "read-many-bounded-elastic"
```
remove (read-many only needed for bulk update)
removed
```scala
private val maxPendingOperationsPerJVM: Int = DefaultMaxPendingOperationPerCore * SparkUtils.getNumberOfHostCPUCores

// Custom bounded elastic scheduler to consume input flux
```
I believe not all of these are still used?
Description

This PR adds transactional batch operation support to the Azure Cosmos DB Spark 3 connector, enabling atomic multi-document upsert operations within a single partition. The implementation leverages the standard bulk ingestion pipeline with a dedicated `TransactionalBulkWriter`/`TransactionalBulkExecutor` and configuration-based activation that ensures all-or-nothing execution semantics.

Key Features
- All operations use `ItemWriteStrategy.ItemOverwrite` (upsert operations)
- Activated via the `spark.cosmos.write.bulk.transactional=true` configuration property
- Integration with the `RequiresDistributionAndOrdering` interface for optimal batching performance on Spark 3.5+

Usage Examples
Configuration
Transactional batch mode is enabled using the `spark.cosmos.write.bulk.transactional` configuration property; an illustrative option map follows.
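A minimal options sketch - the endpoint, key, database, and container values are placeholders, not from the PR:

```scala
// Transactional batch rides on top of bulk writes, so bulk stays enabled.
val cosmosWriteConfig = Map(
  "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/",
  "spark.cosmos.accountKey" -> "<account-key>",
  "spark.cosmos.database" -> "myDatabase",
  "spark.cosmos.container" -> "myContainer",
  "spark.cosmos.write.strategy" -> "ItemOverwrite",   // upsert - the only supported strategy
  "spark.cosmos.write.bulk.enabled" -> "true",        // default; must stay enabled
  "spark.cosmos.write.bulk.transactional" -> "true"   // new option introduced by this PR
)
```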
Example 1: Basic Upsert Operations

All operations in transactional batch mode use the `ItemOverwrite` write strategy (upsert); see the sketch below. Note: For Spark 3.3/3.4 users, manually sort data by partition key for optimal performance (shown as an optional step in the sketch).
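A minimal write sketch, assuming a SparkSession named `spark` and reusing `cosmosWriteConfig` from above; the DataFrame columns and the `pk` partition key column are illustrative:

```scala
import org.apache.spark.sql.functions.col
import spark.implicits._

// Rows sharing the same partition key value are committed atomically
// in a single transactional batch ("pk" is an illustrative PK column).
val orders = Seq(
  ("order-1", "customer-42", 19.99),
  ("order-2", "customer-42", 5.49)
).toDF("id", "pk", "amount")

// Spark 3.5+: the connector repartitions/sorts automatically via
// RequiresDistributionAndOrdering. On Spark 3.3/3.4, sort manually first:
val sortedOrders = orders.repartition(col("pk")).sortWithinPartitions(col("pk"))

sortedOrders.write
  .format("cosmos.oltp")
  .options(cosmosWriteConfig)
  .mode("Append")
  .save()
```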
Example 2: Financial Instrument Temporal Versioning (Hierarchical Partition Keys)

This example demonstrates hierarchical partition keys for temporal versioning of financial instruments. The partition key consists of two levels: `PermId` (instrument identifier) and `SourceId` (data source); a sketch follows.

Note: In this example, `PermId` and `SourceId` together form the hierarchical partition key (2 levels). All operations in the same batch must share the same partition key values to maintain atomicity. All operations use the `ItemOverwrite` write strategy.
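A sketch of the hierarchical partition key scenario; the instrument and source values are illustrative:

```scala
import spark.implicits._

// Two temporal versions of the same instrument; both rows carry the full
// hierarchical partition key (PermId + SourceId), so they commit atomically.
val versions = Seq(
  ("instr-1001", "bloomberg", "instr-1001-v1", "2025-01-01", 101.5),
  ("instr-1001", "bloomberg", "instr-1001-v2", "2025-01-02", 102.0)
).toDF("PermId", "SourceId", "id", "effectiveDate", "price")

versions.write
  .format("cosmos.oltp")
  .options(cosmosWriteConfig) // same options as above, transactional mode on
  .mode("Append")
  .save()
```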
Input DataFrame Schema

Your DataFrame should have flat columns representing document properties; an illustrative schema follows this note.

Note: For hierarchical partition keys, include all partition key path columns (e.g., `PermId`, `SourceId`). All operations use the `ItemOverwrite` write strategy.
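An illustrative flat schema for the Example 2 DataFrame - only `id` and the partition key columns are required; the remaining columns are assumed example properties:

```scala
versions.printSchema()
// root
//  |-- PermId: string         (partition key, level 1)
//  |-- SourceId: string       (partition key, level 2)
//  |-- id: string             (required Cosmos DB document id)
//  |-- effectiveDate: string  (example property)
//  |-- price: double          (example property)
```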
Implementation Architecture

Standard Bulk Pipeline Integration

The transactional batch feature integrates with the existing bulk write infrastructure:

1. Configuration Detection (`ItemsWriterBuilder`): detects the `spark.cosmos.write.bulk.transactional=true` configuration and instantiates `TransactionalBulkWriter` instead of `BulkWriter`
2. Partition Key Buffering (`TransactionalBulkWriter`): buffers incoming rows and feeds them into the `TransactionalBulkExecutor` reactor pipeline
3. Batch Grouping (`TransactionalBulkExecutor`): groups consecutive operations for the same partition key via the `bufferUntil()` operator and builds a `CosmosBatch` with `upsertItemOperation()` calls (ItemOverwrite strategy); see the grouping sketch after this list
4. Atomic Execution: executes each batch through the `executeBatchRequest()` API with atomic semantics
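A conceptual sketch of the grouping step; the row type, its field names, and the use of `bufferUntilChanged` (a close relative of the `bufferUntil()` the description mentions) are illustrative, not the PR's actual code:

```scala
import reactor.core.publisher.Flux

final case class OperationRow(pkValue: String, jsonBody: String) // assumed shape

def groupIntoBatches(rows: Flux[OperationRow]): Flux[java.util.List[OperationRow]] =
  // Emit a buffer whenever the partition key changes between consecutive rows.
  // This relies on the input already being sorted/clustered by partition key,
  // which is what RequiresDistributionAndOrdering guarantees on Spark 3.5+.
  rows.bufferUntilChanged((row: OperationRow) => row.pkValue)
```

Each emitted buffer then becomes one `CosmosBatch` of `upsertItemOperation()` calls.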
Spark Version Compatibility
The connector implements `RequiresDistributionAndOrdering`. Why it works on all Spark versions:
- `TransactionalBulkExecutor` performs partition key buffering at the writer level, independent of Spark's distribution/ordering
- On Spark 3.5+, the `RequiresDistributionAndOrdering` interface automatically instructs Spark to repartition and sort data by partition key columns, ensuring consecutive operations share the same partition key for maximum batch efficiency
Constraints
- Only `ItemWriteStrategy.ItemOverwrite` (upsert) is supported in transactional batch mode
- Requires `spark.cosmos.write.bulk.enabled=true` (enabled by default)

Error Handling
Transactional batch operations follow all-or-nothing semantics:
Common Error Scenarios:
Validation Errors (thrown before execution):
- Missing `id` column
General Guidelines and Best Practices
Testing Guidelines