Conversation

TheovanKraay (Member) commented Dec 6, 2025

Description

This PR adds transactional batch support to the Azure Cosmos DB Spark 3 connector, enabling atomic multi-document upsert operations within a single logical partition. The implementation reuses the standard bulk ingestion pipeline with a dedicated TransactionalBulkWriter/TransactionalBulkExecutor and configuration-based activation that ensures all-or-nothing execution semantics.

Key Features

  • Atomic Operations: All operations in a batch succeed or fail together (ACID guarantees via Cosmos DB transactional batch API)
  • Standard Bulk Pipeline: Reuses existing bulk write infrastructure with transactional semantics (no separate API endpoints)
  • ItemOverwrite Strategy: Uses ItemWriteStrategy.ItemOverwrite (upsert operations)
  • Configuration-Based: Enable via spark.cosmos.write.bulk.transactional=true configuration property
  • Hierarchical Partition Keys: Full support for 1-3 level hierarchical partition keys
  • 100-Operation Limit Enforcement: Automatically validates and enforces Cosmos DB's limit of 100 operations per transactional batch
  • Automatic Partition Key Buffering: Groups consecutive operations by partition key and flushes batches on partition key changes
  • Spark 3.5 Optimization: Automatic data distribution and ordering via RequiresDistributionAndOrdering interface for optimal batching performance
  • Backward Compatibility: Works on Spark 3.3/3.4 (manual sorting recommended for best performance) and Spark 3.5+ (automatic sorting)

Usage Examples

Configuration

Transactional batch mode is enabled using the spark.cosmos.write.bulk.transactional configuration property:

# Enable transactional batch via configuration
writeConfig = {
    "spark.cosmos.write.bulk.transactional": "true",
    "spark.cosmos.write.bulk.enabled": "true",  # Bulk mode must be enabled
    # ... other cosmos config
}
df.write.format("cosmos.oltp").options(**writeConfig).save()

Example 1: Basic Upsert Operations

All operations in transactional batch mode use the ItemOverwrite write strategy (upsert):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define schema for transactional batch writes
schema = StructType([
    StructField("id", StringType(), False),
    StructField("pk", StringType(), False),
    StructField("name", StringType(), False),
    StructField("age", IntegerType(), False)
])

# Create DataFrame - all operations will be upserts
items = [
    ("item1", "partition1", "Alice", 30),
    ("item2", "partition1", "Bob", 25),
    ("item3", "partition1", "Charlie", 35)
]
df = spark.createDataFrame(items, schema)

# Execute transactional batch - all operations succeed or fail together
df.write \
    .format("cosmos.oltp") \
    .option("spark.cosmos.write.bulk.transactional", "true") \
    .option("spark.cosmos.write.bulk.enabled", "true") \
    .options(**cosmosConfig) \
    .save()

Note: For Spark 3.3/3.4 users, manually sort data by partition key for optimal performance:

# Recommended for Spark 3.3/3.4 (automatic in Spark 3.5+)
df.repartition("pk").sortWithinPartitions("pk").write \
    .format("cosmos.oltp") \
    .option("spark.cosmos.write.bulk.transactional", "true") \
    .option("spark.cosmos.write.bulk.enabled", "true") \
    .options(**cosmosConfig) \
    .save()

Example 2: Financial Instrument Temporal Versioning (Hierarchical Partition Keys)

This example demonstrates hierarchical partition keys for temporal versioning of financial instruments. The partition key consists of two levels: PermId (instrument identifier) and SourceId (data source):

from datetime import datetime
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Define schema with hierarchical partition key: PermId (level 1) + SourceId (level 2)
schema = StructType([
    StructField("id", StringType(), False),
    StructField("PermId", StringType(), False),      # Partition key level 1
    StructField("SourceId", StringType(), False),    # Partition key level 2
    StructField("ValidFrom", TimestampType(), False),
    StructField("ValidTo", TimestampType(), True),
    StructField("Price", DoubleType(), False),
    StructField("Currency", StringType(), False)
])

# Temporal update: close old record and create new version atomically
# Both operations use the same hierarchical partition key [PermId="MSFT", SourceId="Bloomberg"]
operations = [
    # Close the current active record by setting ValidTo
    ("inst-msft-v1", "MSFT", "Bloomberg", datetime(2024, 1, 1), datetime(2024, 12, 1), 100.50, "USD"),
    # Create new version with updated price
    ("inst-msft-v2", "MSFT", "Bloomberg", datetime(2024, 12, 1), None, 105.75, "USD")
]
df = spark.createDataFrame(operations, schema)

# Execute atomic temporal update - both succeed or both fail
df.write \
    .format("cosmos.oltp") \
    .option("spark.cosmos.write.bulk.transactional", "true") \
    .option("spark.cosmos.write.bulk.enabled", "true") \
    .options(**cosmosConfig) \
    .save()

Note: In this example, PermId and SourceId together form the hierarchical partition key (2 levels). All operations in the same batch must share the same partition key values to maintain atomicity. All operations use ItemOverwrite write strategy.
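When a DataFrame contains rows for several partition key values, atomicity applies per partition key group rather than across the whole DataFrame: the connector buffers consecutive rows with the same key and writes each group as its own atomic batch. Below is a minimal sketch of that scenario, reusing the schema above; the second instrument and its values are illustrative only:

# Two partition key groups => two independent atomic batches
multi_pk_operations = [
    # Batch 1: partition key [PermId="MSFT", SourceId="Bloomberg"]
    ("inst-msft-v1", "MSFT", "Bloomberg", datetime(2024, 1, 1), datetime(2024, 12, 1), 100.50, "USD"),
    ("inst-msft-v2", "MSFT", "Bloomberg", datetime(2024, 12, 1), None, 105.75, "USD"),
    # Batch 2: partition key [PermId="AAPL", SourceId="Bloomberg"] (illustrative data)
    ("inst-aapl-v1", "AAPL", "Bloomberg", datetime(2024, 1, 1), datetime(2024, 12, 1), 180.00, "USD"),
    ("inst-aapl-v2", "AAPL", "Bloomberg", datetime(2024, 12, 1), None, 185.25, "USD")
]
multi_pk_df = spark.createDataFrame(multi_pk_operations, schema)

# Each partition key group succeeds or fails atomically, independently of the other group
multi_pk_df.write \
    .format("cosmos.oltp") \
    .option("spark.cosmos.write.bulk.transactional", "true") \
    .option("spark.cosmos.write.bulk.enabled", "true") \
    .options(**cosmosConfig) \
    .save()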

Input DataFrame Schema

Your DataFrame should have flat columns representing document properties:

| Column | Type | Required | Description |
| --- | --- | --- | --- |
| id | String | Yes | Document identifier |
| pk (or partition key path) | String/Multiple | Yes | Partition key value(s) - supports hierarchical keys |
| ...additional columns... | Any | No | Document properties (converted to JSON) |

Note: For hierarchical partition keys, include all partition key path columns (e.g., PermId, SourceId). All operations use ItemOverwrite write strategy.

Implementation Architecture

Standard Bulk Pipeline Integration

The transactional batch feature integrates seamlessly with the existing bulk write infrastructure:

  1. Configuration Detection (ItemsWriterBuilder):
     • Detects the spark.cosmos.write.bulk.transactional=true configuration
     • Automatically instantiates TransactionalBulkWriter instead of BulkWriter
  2. Partition Key Buffering (TransactionalBulkWriter):
     • Receives operations from Spark partitions
     • Emits operations to the TransactionalBulkExecutor reactor pipeline
  3. Batch Grouping (TransactionalBulkExecutor) - illustrated in the sketch after this list:
     • Groups consecutive operations by partition key using the bufferUntil() operator
     • Flushes the batch when the partition key changes or the 100-operation limit is reached
     • Creates a CosmosBatch with upsertItemOperation() calls (ItemOverwrite strategy)
  4. Atomic Execution:
     • Sends the batch to Cosmos DB via the executeBatchRequest() API with atomic semantics
     • All operations succeed or fail together (no partial success)
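
The grouping step can be pictured with the simplified Python sketch below. This is a conceptual illustration of the buffering behaviour only, assuming a pre-sorted stream of operations that each expose a "pk" field - the actual implementation is the reactive bufferUntil() pipeline in TransactionalBulkExecutor, not this code:

MAX_OPERATIONS_PER_BATCH = 100  # Cosmos DB transactional batch limit

def group_into_batches(operations):
    """Group consecutive operations that share a partition key into atomic batches."""
    batches = []
    current_batch = []
    current_pk = None

    for op in operations:  # each op is assumed to be a dict with a "pk" entry
        pk_changed = current_pk is not None and op["pk"] != current_pk
        batch_full = len(current_batch) >= MAX_OPERATIONS_PER_BATCH

        # Flush the buffered batch when the partition key changes or the limit is reached
        if current_batch and (pk_changed or batch_full):
            batches.append((current_pk, current_batch))
            current_batch = []

        current_pk = op["pk"]
        current_batch.append(op)

    if current_batch:
        batches.append((current_pk, current_batch))
    return batches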

Spark Version Compatibility

| Spark Version | Status | Distribution/Ordering | Performance |
| --- | --- | --- | --- |
| Spark 3.3 | ✅ Supported | Manual (user must sort) | Good (with manual sorting) |
| Spark 3.4 | ✅ Supported | Manual (user must sort) | Good (with manual sorting) |
| Spark 3.5+ | ✅ Supported | Automatic via RequiresDistributionAndOrdering | Optimal (automatic sorting) |

Why it works on all Spark versions:

  • The TransactionalBulkExecutor performs partition key buffering at the writer level, independent of Spark's distribution/ordering
  • Spark 3.3/3.4: Feature works correctly, but users should manually sort data by partition key for optimal batching
  • Spark 3.5+: RequiresDistributionAndOrdering interface automatically instructs Spark to repartition and sort data by partition key columns, ensuring consecutive operations share the same partition key for maximum batch efficiency

Performance Impact:

  • Without sorting (Spark 3.3/3.4 without manual sort): Many small batches (1-2 operations each)
  • With sorting (Spark 3.3/3.4 with manual sort OR Spark 3.5+ automatic): Optimal batches (up to 100 operations each)
  • Recommendation: Always sort by partition key on Spark 3.3/3.4 for best performance

Constraints

  • ItemOverwrite Only: Only ItemWriteStrategy.ItemOverwrite (upsert) is supported in transactional batch mode
  • Same Partition Key: All operations in a batch must target the same partition key value(s)
  • 100 Operation Limit: Batches cannot exceed 100 operations per partition key (enforced with clear error messages)
  • Atomicity: All operations succeed or fail together within each batch (no partial success)
  • 2MB Size Limit: Total batch payload cannot exceed 2MB
  • Hierarchical Keys: Supports up to 3-level hierarchical partition keys
  • Bulk Write Required: Must have spark.cosmos.write.bulk.enabled=true (enabled by default)

Error Handling

Transactional batch operations follow all-or-nothing semantics:

# Any failure in the batch causes all operations to roll back
try:
    df.write \
        .format("cosmos.oltp") \
        .option("spark.cosmos.write.bulk.transactional", "true") \
        .option("spark.cosmos.write.bulk.enabled", "true") \
        .options(**cosmosConfig) \
        .save()
except Exception as e:
    # Batch failed - no operations were committed
    print(f"Transaction failed: {e}")

Common Error Scenarios:

  • 400 Bad Request: Invalid document structure or exceeds size limits
  • Exceeds 100 operations: Clear validation error before execution
  • Mixed partition keys: Validation error preventing batch creation

Validation Errors (thrown before execution):

  • Missing required id column
  • More than 100 operations for a single partition key
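
If you prefer to fail fast before submitting a job, a client-side pre-check against the 100-operation limit can be expressed in PySpark. This helper is a hypothetical sketch and not part of the connector API:

from pyspark.sql import functions as F

def assert_batch_limits(df, pk_columns, max_ops=100):
    """Fail before writing if any logical partition would exceed the per-batch operation limit."""
    oversized = (
        df.groupBy(*pk_columns)
          .count()
          .filter(F.col("count") > max_ops)
    )
    violations = oversized.collect()
    if violations:
        raise ValueError(
            f"Partition keys exceeding {max_ops} operations per batch: {violations}")

# Example: hierarchical partition key (PermId, SourceId)
assert_batch_limits(df, ["PermId", "SourceId"])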

All SDK Contribution checklist:

  • The pull request does not introduce [breaking changes]
  • CHANGELOG is updated for new features, bug fixes or other significant changes.
  • I have read the contribution guidelines.

General Guidelines and Best Practices

  • Title of the pull request is clear and informative.
  • There are a small number of commits, each of which have an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, see this page.

Testing Guidelines

  • Pull request includes test coverage for the included changes.

Copilot AI review requested due to automatic review settings December 6, 2025 14:19
TheovanKraay requested review from a team and kirankumarkolli as code owners December 6, 2025 14:19
github-actions bot added the Cosmos label Dec 6, 2025
Copilot AI (Contributor) left a comment

Pull request overview

This PR adds transactional batch operation support to the Azure Cosmos DB Spark 3 connector, enabling atomic multi-document operations within a single partition key. The implementation introduces a new TransactionalBulkExecutor and TransactionalBulkWriter that leverage Cosmos DB's native transactional batch API.

Key changes:

  • New TransactionalBulkExecutor.java implements atomic batch execution with partition key buffering
  • New TransactionalBulkWriter.scala integrates transactional batches with Spark bulk write pipeline
  • Spark 3.5+ automatic distribution/ordering via RequiresDistributionAndOrdering interface
  • Configuration options for enabling transactional batch mode
  • Comprehensive test coverage for atomic operations, rollback scenarios, and hierarchical partition keys

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 8 comments.

Summary per file:

| File | Description |
| --- | --- |
| TransactionalBulkExecutor.java | Core batch execution logic with partition key grouping and 100-operation limit enforcement |
| TransactionalBulkWriter.scala | Spark integration layer supporting per-row operation types and retry logic |
| CosmosWriterBase.scala | Writer selection logic to instantiate TransactionalBulkWriter when enabled |
| CosmosConfig.scala | New configuration option WriteBulkEnableTransactions and ItemTransactionalBatch strategy |
| ItemsWriterBuilder.scala | Spark 3.5 integration with automatic data distribution and ordering |
| AsyncItemWriter.scala | Interface extension for per-row operation type support |
| BulkWriter.scala | Updated to implement new interface method (ignores per-row operations) |
| PointWriter.scala | Updated to implement new interface method |
| TransactionalBatchITest.scala | Comprehensive integration tests for various transactional batch scenarios |
| BulkWriterITest.scala | Updated test to pass new required parameter |

this.maxMicroBatchPayloadSizeInBytes = cosmosBulkOptions.getMaxMicroBatchPayloadSizeInBytes();
this.cosmosBulkExecutionOptions = cosmosBulkOptions;
this.container = container;
this.bulkSpanName = "nonTransactionalBatch." + this.container.getId();
Copilot AI commented Dec 6, 2025:

The span name is misleading - it says "nonTransactionalBatch" but this is the TransactionalBulkExecutor. This should be "transactionalBatch" to accurately reflect the operation type for diagnostics and tracing.

Suggested change
this.bulkSpanName = "nonTransactionalBatch." + this.container.getId();
this.bulkSpanName = "transactionalBatch." + this.container.getId();

"Transactional batch operation failed: partition key '%s' has %d operations, " +
"which exceeds the maximum allowed limit of %d operations per transactional batch. " +
"Transactional batches require all-or-nothing execution and cannot be split across multiple requests.",
cosmosBatch.getPartitionKeyValue(),
Copilot AI commented Dec 6, 2025:

The validation error message at line 694-699 correctly identifies the issue but the partition key value might be null which would cause NullPointerException when calling toString(). Consider using String.valueOf(cosmosBatch.getPartitionKeyValue()) or add a null check to prevent NPE in error message formatting.

Suggested change
cosmosBatch.getPartitionKeyValue(),
String.valueOf(cosmosBatch.getPartitionKeyValue()),


String batchTrackingId = UUIDs.nonBlockingRandomUUID().toString();
logTraceOrWarning(
"Executing transactional batch - batch TrackingId: %s",
Copilot AI commented Dec 6, 2025:

Logging format string uses %s placeholders (line 688, 754) which won't work with SLF4J. SLF4J requires {} placeholders. Change the format strings to use {} instead of %s for proper log message formatting.

Suggested change
"Executing transactional batch - batch TrackingId: %s",
"Executing transactional batch - batch TrackingId: {}",

.flatMapMany(response -> {

logTraceOrWarning(
"Response for transactional batch - status code %s, ActivityId: %s, batch TrackingId %s",
Copilot AI commented Dec 6, 2025:

Same SLF4J formatting issue - use {} instead of %s for placeholders.

Suggested change
"Response for transactional batch - status code %s, ActivityId: %s, batch TrackingId %s",
"Response for transactional batch - status code {}, ActivityId: {}, batch TrackingId {}",

Comment on lines 810 to 811

ItemBulkOperation<?, ?> itemBulkOperation = (ItemBulkOperation<?, ?>) itemOperation;
Copilot AI commented Dec 6, 2025:

The error message at line 811 says "Unsupported operationType" but should be more specific. Consider: "Unsupported operation type '{}' for transactional batch. Supported types are: CREATE, UPSERT, REPLACE, DELETE, READ, PATCH."

Comment on lines 762 to 813
val bulkItemOperation = effectiveOperationType match {
case "create" =>
CosmosBulkOperations.getCreateItemOperation(objectNode, partitionKeyValue, operationContext)

case "upsert" =>
CosmosBulkOperations.getUpsertItemOperation(objectNode, partitionKeyValue, operationContext)

case "replace" =>
operationContext.eTag match {
case Some(eTag) =>
CosmosBulkOperations.getReplaceItemOperation(
operationContext.itemId,
objectNode,
partitionKeyValue,
new CosmosBulkItemRequestOptions().setIfMatchETag(eTag),
operationContext)
case _ =>
CosmosBulkOperations.getReplaceItemOperation(
operationContext.itemId,
objectNode,
partitionKeyValue,
operationContext)
}

case "delete" =>
operationContext.eTag match {
case Some(eTag) =>
CosmosBulkOperations.getDeleteItemOperation(
operationContext.itemId,
partitionKeyValue,
new CosmosBulkItemRequestOptions().setIfMatchETag(eTag),
operationContext)
case _ =>
CosmosBulkOperations.getDeleteItemOperation(
operationContext.itemId,
partitionKeyValue,
operationContext)
}

case "patch" =>
getPatchItemOperation(
operationContext.itemId,
partitionKeyValue,
partitionKeyDefinition,
objectNode,
operationContext)

case _ =>
throw new IllegalArgumentException(
s"Unsupported operationType '$effectiveOperationType'. " +
s"Supported types for transactional batch: create, upsert, replace, delete")
}
Copilot AI commented Dec 6, 2025:

Missing validation: The operationType parameter could contain invalid values like "patch" when used with transactional batch. The code at line 810-813 only throws an error for operation types that don't match the switch statement, but doesn't validate against the supported transactional batch operation types upfront.

Comment on lines 801 to 813
case "patch" =>
getPatchItemOperation(
operationContext.itemId,
partitionKeyValue,
partitionKeyDefinition,
objectNode,
operationContext)

case _ =>
throw new IllegalArgumentException(
s"Unsupported operationType '$effectiveOperationType'. " +
s"Supported types for transactional batch: create, upsert, replace, delete")
}
Copilot AI commented Dec 6, 2025:

Comment states "patch" is supported (line 812), but the PR description and usage examples don't include patch operations in transactional batches. The switch case handles PATCH but doesn't validate whether patch is actually supported in transactional batches. Clarify if patch is supported or remove it from the comment/error messages.

Suggested change
case "patch" =>
getPatchItemOperation(
operationContext.itemId,
partitionKeyValue,
partitionKeyDefinition,
objectNode,
operationContext)
case _ =>
throw new IllegalArgumentException(
s"Unsupported operationType '$effectiveOperationType'. " +
s"Supported types for transactional batch: create, upsert, replace, delete")
}
// PATCH is not supported in transactional batches
case "patch" =>
throw new IllegalArgumentException(
s"Unsupported operationType 'patch' for transactional batch. " +
s"Supported types for transactional batch: create, upsert, replace, delete")
case _ =>
throw new IllegalArgumentException(
s"Unsupported operationType '$effectiveOperationType'. " +
s"Supported types for transactional batch: create, upsert, replace, delete")

defaultValue = Option.apply(false),
mandatory = false,
parseFromStringFunction = enableTransactionsAsString => enableTransactionsAsString.toBoolean,
helpMessage = "Cosmos DB Item Write enable transactional batch - requires bulk write to be enabled and Spark 3.5+")
Copilot AI commented Dec 6, 2025:

The help message says "requires... Spark 3.5+" but the PR description states the feature works on Spark 3.3/3.4 (with manual sorting). The help message is misleading. Update to clarify that Spark 3.5+ provides automatic distribution/ordering, while 3.3/3.4 requires manual sorting for optimal performance.

Suggested change
helpMessage = "Cosmos DB Item Write enable transactional batch - requires bulk write to be enabled and Spark 3.5+")
helpMessage = "Cosmos DB Item Write enable transactional batch - requires bulk write to be enabled. " +
"Spark 3.5+ provides automatic distribution/ordering for transactional batch. " +
"On Spark 3.3/3.4, transactional batch is supported but requires manual sorting for optimal performance.")

private final Long maxMicroBatchIntervalInMs;

private final TContext batchContext;
private final ConcurrentMap<String, PartitionScopeThresholds> partitionScopeThresholds;
Member:

Not needed here (relevant for dynamic tuning of bulk batch size)

Member Author:

removed

private final Flux<com.azure.cosmos.models.CosmosItemOperation> inputOperations;

// Options for bulk execution.
private final Long maxMicroBatchIntervalInMs;
Member:

not needed

Member Author:

updated

private final String operationContextText;
private final OperationContextAndListenerTuple operationListener;
private final ThrottlingRetryOptions throttlingRetryOptions;
private final Flux<com.azure.cosmos.models.CosmosItemOperation> inputOperations;
Member:

should be of type CosmosBatch?

Member Author:

updated now.


private Flux<CosmosBulkOperationResponse<TContext>> executeTransactionalBatch(
List<CosmosItemOperation> operations,
PartitionKey partitionKey,
Member:

Like described on Friday - you have to construct the batches at the edge (in TransactionalBulkWriter.scheduleWrite) - this class should only operate on CosmosBatch, not CosmosItemOperation - otherwise there is zero chance to get the retries correct.

Also, the operationType per row is super useful - but it should be a write strategy (otherwise, like you mentioned, it won't be idempotent). Which also means that triggering transactional batch should not be a new write strategy - but probably some other config.

Member Author:

Sorry, I completely missed this, implemented this now but removed operationType per row - defer to another PR.

batchTrackingId);

// Validate that transactional batch does not exceed Cosmos DB limit
if (operations.size() > BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST) {
Member:

This validation should not happen client-side (the service has an account-level config override)

Member Author:

ok, will change

Member Author:

removed this and updated test accordingly

* @param objectNode the json object node
* @param operationType optional operation type (create, upsert, replace, delete) for this specific row
*/
def scheduleWrite(partitionKeyValue: PartitionKey, objectNode: ObjectNode, operationType: Option[String]): Unit
Member:

Instead of operationType this should be a write strategy

Member:

And I would model this as required parameter in the existing overload - we have the write strategy from config today and can just populate it.

Member:

An alternative is to leave out this change and stick with one global write strategy for now - then the change to allow a write strategy per row can be done in a separate PR

Member Author:

To confirm, you are suggesting either:

  1. Leaving as a global config for transactional batch (where everything is executed as upsert) and addressing per row in a separate PR
  2. Re-naming operationType to writeStrategy and making activation of transactionBatch as a global config instead of a new write strategy like ItemTransactionalBatch

If so I would choose the 2nd.

Member Author:

Or are you saying that to have per row operation type, we have to re-think how this is done, and not merely re-name operationType?

Member Author:

For now I am assuming the former because for bitemporal use cases, not having operationType per row does not seem to make sense (although we do seem to need something that caters for idempotency properly for failed jobs). At least the semantics/terminology will make sense.

Member Author:

As discussed, removed this entirely, defer to another PR.

Member:

Makes sense - but then this change should be reverted?

private val pendingReadManyRetries = java.util.concurrent.ConcurrentHashMap.newKeySet[ReadManyOperation]().asScala
private val activeTasks = new AtomicInteger(0)
private val errorCaptureFirstException = new AtomicReference[Throwable]()
private val bulkInputEmitter: Sinks.Many[CosmosItemOperation] = Sinks.many().unicast().onBackpressureBuffer()
Member:

Sinks.Many[CosmosBatch]

Member Author:

Yep, this is done now.

private val bulkInputEmitter: Sinks.Many[CosmosItemOperation] = Sinks.many().unicast().onBackpressureBuffer()

private val activeBulkWriteOperations =java.util.concurrent.ConcurrentHashMap.newKeySet[CosmosItemOperation]().asScala
private val activeReadManyOperations = java.util.concurrent.ConcurrentHashMap.newKeySet[ReadManyOperation]().asScala
Member:

I think the readMany handling would be different in transactional mode - with BulkUpdate (when readMany is used - currently you get all the docs first) - in transactional mode it is probably wiser to do this logical partition by logical partition - readMany would always only go to a single logical partition, so the orchestration can be done as part of the write. Also, when allowing a per-row write strategy, we would only schedule readMany for rows with write strategy bulk update?

Member Author:

ItemBulkUpdate (patch) is not allowed for now, so I think the readMany issues you mention here do apply?

}

override def requiredOrdering(): Array[SortOrder] = {
if (writeConfig.bulkEnabled && writeConfig.itemWriteStrategy == ItemWriteStrategy.ItemTransactionalBatch) {
Member:

Changes in this file look great - I would just think that enabling transactional bulk should be based on other config - not write strategy?

Member Author:

Refactored now


override def requiredDistribution(): Distribution = {
if (writeConfig.bulkEnabled && writeConfig.bulkTransactional) {
// For transactional writes, partition by all partition key columns
Member:

NIT: info log

Member Author:

Changed to info log level (in constructor).

val clustering = partitionKeyPaths.map(path => Expressions.column(path): Expression).toArray
Distributions.clustered(clustering)
} else {
Distributions.unspecified()
Member:

Warning/error log - the pk definition should never have 0 columns. At least a log would be good in case this happens in the wild (could in theory for extremely old containers, but containers have used an artificial PK for years)

Member Author:

Added error logging for edge cases, when:

  • partition key definition has 0 columns
  • partition key definition is null

private def getPartitionKeyColumnNames(): Seq[String] = {
try {
// Need to create a temporary container client to get partition key definition
val clientCacheItem = CosmosClientCache(
Member:

Loan pattern (kind of the counterpart to using in .NET, for example) should be used here - otherwise you are leaking these clients.

Member Author:

Not sure whether you actually wanted it to look like .NET, but created using[A, B] helper method that ensures resources properly closed via try-finally.

// Need to create a temporary container client to get partition key definition
val clientCacheItem = CosmosClientCache(
CosmosClientConfiguration(
userConfig.asCaseSensitiveMap().asScala.toMap,
Member:

userConfig.asCaseSensitiveMap().asScala.toMap is repeated multiple times - worth converting once and storing as a variable?

Member Author:

Created userConfigMap at class level and updated toBatch(), toStreaming(), and config parsing calls to use it.

None
)

val containerProperties = SparkBridgeInternal.getContainerPropertiesFromCollectionCache(container)
Member:

You can simplify this by just using

private val containerDefinition = SparkBridgeInternal
.getContainerPropertiesFromCollectionCache(container)
private val partitionKeyDefinition = containerDefinition.getPartitionKeyDefinition

Member Author:

Updated as suggested

- **Atomic semantics**: All operations for the same partition key succeed or all fail (rollback)
- **Operation type**: Only upsert operations are supported (equivalent to `ItemOverwrite` write strategy)
- **Partition grouping**: Spark automatically partitions and orders data by partition key columns
- **Size limits**: Maximum 100 operations per transaction; maximum 2MB total payload per transaction
Member:

NIT: call out that this is a service-side limitation (and we need to ensure we rely on the service to enforce it, because they actually made this configurable to allow larger batches)

Member Author:

updated docs to call out that 100 operation and 2MB limits are enforced service side.

#### Use cases

Transactional batch writes are ideal for:
- Financial transactions requiring consistency across multiple documents
Member:

I would call out temporal use case explicitly?

Member Author:

updated, is this ok?


#### Error handling

If any operation in a transaction fails (e.g., insufficient RUs, document too large, transaction exceeds 100 operations), the entire transaction is rolled back and no documents are modified. The Spark task will fail and retry according to Spark's retry policy.
Member:

Important to distinguish between transient errors (entire transaction retried) vs. non-transient (Spark job fails)

Member Author:

expanded to distinguish.

@@ -67,7 +67,7 @@
| `spark.cosmos.write.point.maxConcurrency` | None | Cosmos DB Item Write Max concurrency. If not specified it will be determined based on the Spark executor VM Size |
| `spark.cosmos.write.bulk.maxPendingOperations` | None | Cosmos DB Item Write bulk mode maximum pending operations. Defines a limit of bulk operations being processed concurrently. If not specified it will be determined based on the Spark executor VM Size. If the volume of data is large for the provisioned throughput on the destination container, this setting can be adjusted by following the estimation of `1000 x Cores` |
| `spark.cosmos.write.bulk.enabled` | `true` | Cosmos DB Item Write bulk enabled |
| `spark.cosmos.write.bulk.targetedPayloadSizeInBytes` | `220201` | When the targeted payload size is reached for buffered documents, the request is sent to the backend. The default value is optimized for small documents <= 10 KB - when documents often exceed 110 KB, it can help to increase this value to up to about `1500000` (should still be smaller than 2 MB). |
| `spark.cosmos.write.bulk.transactional` | `false` | Enable transactional batch mode for bulk writes. When enabled, all operations for the same partition key are executed atomically (all succeed or all fail). Requires ordering and clustering by partition key columns. Only supports upsert operations. Cannot exceed 100 operations or 2MB per partition key. See [Transactional Batch documentation](https://learn.microsoft.com/azure/cosmos-db/nosql/transactional-batch) for details. |
| `spark.cosmos.write.bulk.targetedPayloadSizeInBytes` | `220201` | When the targeted payload size is reached for buffered documents, the request is sent to the backend. The default value is optimized for small documents <= 10 KB - when documents often exceed 110 KB, it can help to increase this value to up to about `1500000` (should still be smaller than 2 MB). |
Member:

Call out explicitly that for HPK only logical partitions are transactional (not a partial top-level key)

Member Author:

Added explicit clarification for HPK

scheduleWrite(partitionKeyValue, objectNode, None)
}

override def scheduleWrite(partitionKeyValue: PartitionKey, objectNode: ObjectNode, operationType: Option[String]): Unit = {
Member:

revert?

new com.azure.cosmos.models.PartitionKeyDefinition()
)
val paths = new java.util.ArrayList[String]()
paths.add("/PermId")
Member:

Just to be safe I would use different column names

ImplementationBridgeHelpers.CosmosBatchResponseHelper.getCosmosBatchResponseAccessor();

private final CosmosAsyncContainer container;
private final int maxMicroBatchPayloadSizeInBytes;
Member:

remove - all limits need to be enforced service-side

private final Flux<CosmosBatch> inputBatches;

private final TContext batchContext;
private final CosmosBulkExecutionOptionsImpl cosmosBulkExecutionOptions;
Member:

remove - irrelevant

checkNotNull(inputOperations, "expected non-null inputOperations");
checkNotNull(cosmosBulkOptions, "expected non-null bulkOptions");

this.maxMicroBatchPayloadSizeInBytes = cosmosBulkOptions.getMaxMicroBatchPayloadSizeInBytes();
Member:

remove

? schedulerSnapshotFromOptions
: CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC;

logger.debug("Instantiated BulkExecutor, Context: {}",
Member:

Suggested change
logger.debug("Instantiated BulkExecutor, Context: {}",
logger.debug("Instantiated TransactionalBulkExecutor, Context: {}",

.executeCore()
.doFinally((SignalType signal) -> {
if (signal == SignalType.ON_COMPLETE) {
logDebugOrWarning("BulkExecutor.execute flux completed - # left items {}, Context: {}, {}",
Member:

Suggested change
logDebugOrWarning("BulkExecutor.execute flux completed - # left items {}, Context: {}, {}",
logDebugOrWarning("TransactionalBulkExecutor.execute flux completed - # left items {}, Context: {}, {}",

} else {
int itemsLeftSnapshot = this.totalCount.get();
if (itemsLeftSnapshot > 0) {
logInfoOrWarning("BulkExecutor.execute flux terminated - Signal: {} - # left items {}, Context: {}, {}",
Member:

Suggested change
logInfoOrWarning("BulkExecutor.execute flux terminated - Signal: {} - # left items {}, Context: {}, {}",
logInfoOrWarning("TransactionalBulkExecutor.execute flux terminated - Signal: {} - # left items {}, Context: {}, {}",

this.operationContextText,
getThreadInfo());
} else {
logDebugOrWarning("BulkExecutor.execute flux terminated - Signal: {} - # left items {}, Context: {}, {}",
Member:

Suggested change
logDebugOrWarning("BulkExecutor.execute flux terminated - Signal: {} - # left items {}, Context: {}, {}",
logDebugOrWarning("TransactionalBulkExecutor.execute flux terminated - Signal: {} - # left items {}, Context: {}, {}",

throwable);
return throwable;
})
.flatMap(
Member:

I think this scheduling is a bit too simple - the flatMap will allow a certain number of batches to be processed concurrently - independently of whether they span multiple physical partitions or not. To optimize bulk jobs, ideally each Spark executor can saturate each physical partition - so like in BulkExecutor this probably needs grouping by physical partition, and then enforcing a certain limit of concurrent transactional batches per physical partition. This can also be done in a separate PR - but I think it will take some more thinking to get this right. Probably, like the dynamic batch size in bulk, we need a way to dynamically tune the allowed number of concurrent transactional batches per physical partition

Member:

But this has to be addressed before letting any customer start using this feature.

operations,
this.effectiveItemSerializer);
request.setAtomicBatch(true);
request.setShouldContinueOnError(false);
Member:

Worth checking with backend folks what this means when setAtomicBatch == true - maybe this is something we can optimize to implement retry policies for some of the write strategies, like Delete, where 404 Not Found should be silently ignored.

});
}

private int calculateTotalSerializedLength(AtomicInteger currentTotalSerializedLength, CosmosItemOperation item) {
Member:

I think this is not used - probably needs an alternative?

}

log.logInfo(
s"BulkWriter instantiated (Host CPU count: $cpuCount, maxPendingOperations: $maxPendingOperations, " +
Member:

Suggested change
s"BulkWriter instantiated (Host CPU count: $cpuCount, maxPendingOperations: $maxPendingOperations, " +
s"TransactionalBulkWriter instantiated (Host CPU count: $cpuCount, maxPendingOperations: $maxPendingOperations, " +

// there is one bulk writer per spark task/partition
// and default config will create one executor per core on the executor host
// so multiplying by cpuCount in the default config is too aggressive
private val maxPendingOperations = writeConfig.bulkMaxPendingOperations
Member:

should probably calculate maxPendingBatches?

// the buffer should be flushed. A timer-based scheduler will publish this
// dummy operation for every batchIntervalInMs ms. This operation
// is filtered out and will never be flushed to the backend
private val readManyFlushOperationSingleton = ReadManyOperation(
Member:

remove readMany for now?

new IllegalStateException("Can't accept any new work - BulkWriter has been disposed already"))
}

throwIfProgressStaled(
Member:

I think all the Semaphore dance should only start when you formed a batch - no need to do this in the middle of a batch. Semantically TransactionalBulkWriter should limit the number of outstanding batches - not operations.

scheduleWriteInternal(partitionKeyValue, objectNode, operationContext)
}

/**
Member:

remove

operationContext: OperationContext): Unit = {
activeTasks.incrementAndGet()
if (operationContext.attemptNumber > 1) {
logInfoOrWarning(s"bulk scheduleWrite attemptCnt: ${operationContext.attemptNumber}, " +
Member:

Suggested change
logInfoOrWarning(s"bulk scheduleWrite attemptCnt: ${operationContext.attemptNumber}, " +
logInfoOrWarning(s"TransactionalBulkWriter scheduleWrite attemptCnt: ${operationContext.attemptNumber}, " +

s"Context: ${operationContext.toString} $getThreadInfo")
}

// The handling will make sure that during retry:
Member:

BulkUpdate not supported here? Remove this

Member Author:

Removed readMany parts

}
}

private def scheduleReadManyInternal(partitionKeyValue: PartitionKey,
Member:

Remove - readMany only needed for BulkUpdate

Member Author:

Removed

}
}

private[this] def getPatchOperationsForBatch(itemId: String, objectNode: ObjectNode): CosmosPatchOperations = {
Member:

remove?

}
}

private[this] def getPatchItemOperation(itemId: String,
Member:

Remove?


//scalastyle:off method.length
//scalastyle:off cyclomatic.complexity
private[this] def handleNonSuccessfulStatusCode
Member:

Overly complicated for different write strategies (and those are incorrect - so, this needs clean-up)


private[this] def getActiveOperationsLog(
activeOperationsSnapshot: mutable.Set[CosmosItemOperation],
activeReadManyOperationsSnapshot: mutable.Set[ReadManyOperation]): String = {
Member:

remove all readMany stuff from this file?

Member Author:

Removed

}
}

private[this] def sameReadManyOperations
Member:

Remove?

returnValue
}

private def shouldRetryForItemPatchBulkUpdate(statusCode: Int, subStatusCode: Int): Boolean = {
Member:

remove - wrong write strategy

private val minDelayOn408RequestTimeoutInMs = 500
private val maxItemOperationsToShowInErrorMessage = 10
private val BULK_WRITER_REQUESTS_BOUNDED_ELASTIC_THREAD_NAME = "bulk-writer-requests-bounded-elastic"
private val BULK_WRITER_INPUT_BOUNDED_ELASTIC_THREAD_NAME = "bulk-writer-input-bounded-elastic"
Member:

transactional-* prefixes

private val BULK_WRITER_REQUESTS_BOUNDED_ELASTIC_THREAD_NAME = "bulk-writer-requests-bounded-elastic"
private val BULK_WRITER_INPUT_BOUNDED_ELASTIC_THREAD_NAME = "bulk-writer-input-bounded-elastic"
private val BULK_WRITER_RESPONSES_BOUNDED_ELASTIC_THREAD_NAME = "bulk-writer-responses-bounded-elastic"
private val READ_MANY_BOUNDED_ELASTIC_THREAD_NAME = "read-many-bounded-elastic"
Member:

remove (read-many only needed for bulk update)

Member Author:

removed


private val maxPendingOperationsPerJVM: Int = DefaultMaxPendingOperationPerCore * SparkUtils.getNumberOfHostCPUCores

// Custom bounded elastic scheduler to consume input flux
Member:

I believe not all of these are still used?
