2525//
2626
2727import AWSDynamoDB
28- import CollectionConcurrencyKit
2928import Foundation
3029import Logging
3130
@@ -38,7 +37,7 @@ public enum AWSDynamoDBLimits {
3837 public static let maxStatementLength = 8192
3938}
4039
41- private struct AWSDynamoDBPolymorphicWriteEntryTransform < Client: DynamoDBClientProtocol > : PolymorphicWriteEntryTransform {
40+ private struct AWSDynamoDBPolymorphicWriteEntryTransform < Client: DynamoDBClientProtocol & Sendable > : PolymorphicWriteEntryTransform {
4241 typealias TableType = GenericAWSDynamoDBCompositePrimaryKeyTable < Client >
4342
4443 let statement : String
@@ -48,12 +47,12 @@ private struct AWSDynamoDBPolymorphicWriteEntryTransform<Client: DynamoDBClientP
4847 }
4948}
5049
51- private struct AWSDynamoDBPolymorphicTransactionConstraintTransform < Client: DynamoDBClientProtocol > : PolymorphicTransactionConstraintTransform {
50+ private struct AWSDynamoDBPolymorphicTransactionConstraintTransform < Client: DynamoDBClientProtocol & Sendable > : PolymorphicTransactionConstraintTransform {
5251 typealias TableType = GenericAWSDynamoDBCompositePrimaryKeyTable < Client >
5352
5453 let statement : String
5554
56- init ( _ entry: TransactionConstraintEntry < some PrimaryKeyAttributes , some Codable , some TimeToLiveAttributes > ,
55+ init ( _ entry: TransactionConstraintEntry < some PrimaryKeyAttributes , some Codable & Sendable , some TimeToLiveAttributes > ,
5756 table: TableType ) throws
5857 {
5958 self . statement = try table. entryToStatement ( entry)
@@ -75,34 +74,32 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable {
7574 internal func entryToStatement(
7675 _ entry: WriteEntry < some Any , some Any , some Any > ) throws -> String
7776 {
78- let statement : String
79- switch entry {
77+ let statement : String = switch entry {
8078 case let . update( new: new, existing: existing) :
81- statement = try getUpdateExpression ( tableName: self . targetTableName,
82- newItem: new,
83- existingItem: existing)
79+ try getUpdateExpression ( tableName: self . targetTableName,
80+ newItem: new,
81+ existingItem: existing)
8482 case let . insert( new: new) :
85- statement = try getInsertExpression ( tableName: self . targetTableName,
86- newItem: new)
83+ try getInsertExpression ( tableName: self . targetTableName,
84+ newItem: new)
8785 case let . deleteAtKey( key: key) :
88- statement = try getDeleteExpression ( tableName: self . targetTableName,
89- existingKey: key)
86+ try getDeleteExpression ( tableName: self . targetTableName,
87+ existingKey: key)
9088 case let . deleteItem( existing: existing) :
91- statement = try getDeleteExpression ( tableName: self . targetTableName,
92- existingItem: existing)
89+ try getDeleteExpression ( tableName: self . targetTableName,
90+ existingItem: existing)
9391 }
9492
9593 return statement
9694 }
9795
9896 internal func entryToStatement(
99- _ entry: TransactionConstraintEntry < some Any , some Any , some Any > ) throws -> String
97+ _ entry: TransactionConstraintEntry < some Any , some Sendable , some Any > ) throws -> String
10098 {
101- let statement : String
102- switch entry {
99+ let statement : String = switch entry {
103100 case let . required( existing: existing) :
104- statement = getExistsExpression ( tableName: self . targetTableName,
105- existingItem: existing)
101+ getExistsExpression ( tableName: self . targetTableName,
102+ existingItem: existing)
106103 }
107104
108105 return statement
@@ -236,11 +233,10 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable {
236233
237234 var isTransactionConflict = false
238235 let reasons = try zip ( cancellationReasons, keys) . compactMap { cancellationReason, entryKey -> DynamoDBTableError ? in
239- let key : CompositePrimaryKey < AttributesType > ?
240- if let item = cancellationReason. item {
241- key = try DynamoDBDecoder ( ) . decode ( . m( item) )
236+ let key : CompositePrimaryKey < AttributesType > ? = if let item = cancellationReason. item {
237+ try DynamoDBDecoder ( ) . decode ( . m( item) )
242238 } else {
243- key = nil
239+ nil
244240 }
245241
246242 let partitionKey = key? . partitionKey ?? entryKey. partitionKey
@@ -339,11 +335,10 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable {
339335
340336 var isTransactionConflict = false
341337 let reasons = try zip ( cancellationReasons, inputKeys) . compactMap { cancellationReason, entryKey -> DynamoDBTableError ? in
342- let key : CompositePrimaryKey < AttributesType > ?
343- if let item = cancellationReason. item {
344- key = try DynamoDBDecoder ( ) . decode ( . m( item) )
338+ let key : CompositePrimaryKey < AttributesType > ? = if let item = cancellationReason. item {
339+ try DynamoDBDecoder ( ) . decode ( . m( item) )
345340 } else {
346- key = nil
341+ nil
347342 }
348343
349344 let partitionKey = key? . partitionKey ?? entryKey. partitionKey
@@ -469,21 +464,20 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable {
469464 }
470465
471466 let statements = try entries. map { entry -> DynamoDBClientTypes . BatchStatementRequest in
472- let statement : String
473- switch entry {
467+ let statement : String = switch entry {
474468 case let . update( new: new, existing: existing) :
475- statement = try getUpdateExpression ( tableName: self . targetTableName,
476- newItem: new,
477- existingItem: existing)
469+ try getUpdateExpression ( tableName: self . targetTableName,
470+ newItem: new,
471+ existingItem: existing)
478472 case let . insert( new: new) :
479- statement = try getInsertExpression ( tableName: self . targetTableName,
480- newItem: new)
473+ try getInsertExpression ( tableName: self . targetTableName,
474+ newItem: new)
481475 case let . deleteAtKey( key: key) :
482- statement = try getDeleteExpression ( tableName: self . targetTableName,
483- existingKey: key)
476+ try getDeleteExpression ( tableName: self . targetTableName,
477+ existingKey: key)
484478 case let . deleteItem( existing: existing) :
485- statement = try getDeleteExpression ( tableName: self . targetTableName,
486- existingItem: existing)
479+ try getDeleteExpression ( tableName: self . targetTableName,
480+ existingItem: existing)
487481 }
488482
489483 return DynamoDBClientTypes . BatchStatementRequest ( consistentRead: self . tableConfiguration. consistentRead, statement: statement)
@@ -515,15 +509,13 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable {
515509 }
516510 }
517511
518- func bulkWriteWithFallback< AttributesType, ItemType, TimeToLiveAttributesType> (
512+ func bulkWriteWithFallback< AttributesType, ItemType: Sendable , TimeToLiveAttributesType> (
519513 _ entries: [ WriteEntry < AttributesType , ItemType , TimeToLiveAttributesType > ] ) async throws
520514 {
521- // fall back to singel operation if the write entry exceeds the statement length limitation
522- var bulkWriteEntries : [ WriteEntry < AttributesType , ItemType , TimeToLiveAttributesType > ] = [ ]
523- let errors : [ DynamoDBTableError ] = try await entries. concurrentCompactMap { entry -> DynamoDBTableError ? in
515+ // fall back to single operation if the write entry exceeds the statement length limitation
516+ let results : [ Result < WriteEntry < AttributesType , ItemType , TimeToLiveAttributesType > , DynamoDBTableError > ] = try await entries. concurrentMap { entry in
524517 do {
525518 try self . validateEntry ( entry: entry)
526- bulkWriteEntries. append ( entry)
527519 } catch DynamoDBTableError . statementLengthExceeded {
528520 do {
529521 switch entry {
@@ -537,11 +529,22 @@ public extension GenericAWSDynamoDBCompositePrimaryKeyTable {
537529 try await self . deleteItem ( existingItem: existing)
538530 }
539531 } catch let error as DynamoDBTableError {
540- return error
532+ return . failure ( error)
541533 }
542534 }
543535
544- return nil
536+ return . success( entry)
537+ }
538+
539+ var bulkWriteEntries : [ WriteEntry < AttributesType , ItemType , TimeToLiveAttributesType > ] = [ ]
540+ var errors : [ DynamoDBTableError ] = [ ]
541+ for result in results {
542+ switch result {
543+ case let . success( entry) :
544+ bulkWriteEntries. append ( entry)
545+ case let . failure( error) :
546+ errors. append ( error)
547+ }
545548 }
546549
547550 do {
0 commit comments