From 20c3b946bcebad750b6d015aa4f2409d4dd1fc7d Mon Sep 17 00:00:00 2001 From: Istvan Soos Date: Wed, 10 Dec 2025 11:47:55 +0100 Subject: [PATCH] Migrating task model and backend to use typed_sql. --- app/lib/database/database.dart | 22 + app/lib/search/backend.dart | 3 +- app/lib/service/services.dart | 1 + app/lib/task/backend.dart | 527 +++++++++--------- .../task/cloudcompute/fakecloudcompute.dart | 1 - app/lib/task/models.dart | 11 +- app/lib/task/scheduler.dart | 59 +- app/test/task/task_test.dart | 21 +- 8 files changed, 346 insertions(+), 299 deletions(-) diff --git a/app/lib/database/database.dart b/app/lib/database/database.dart index 9994f78f14..e1ba6c357a 100644 --- a/app/lib/database/database.dart +++ b/app/lib/database/database.dart @@ -14,7 +14,9 @@ import 'package:pub_dev/database/schema.dart'; import 'package:pub_dev/service/secret/backend.dart'; import 'package:pub_dev/shared/configuration.dart'; import 'package:pub_dev/shared/env_config.dart'; +import 'package:pub_dev/shared/exceptions.dart'; import 'package:pub_dev/task/clock_control.dart'; +import 'package:retry/retry.dart'; import 'package:typed_sql/typed_sql.dart'; final _random = Random.secure(); @@ -180,3 +182,23 @@ Future _dropCustomDatabase(String url, String dbName) async { await conn.execute('DROP DATABASE "$dbName";'); await conn.close(force: true); } + +extension DatabaseExt on Database { + Future transactWithRetry(Future Function() fn) async { + return await retry( + () async { + try { + return await transact(fn); + } on TransactionAbortedException catch (e) { + if (e.reason is ResponseException) { + // TODO: we should keep and use the original stacktrace in typed_sql's exception + throw e.reason; + } + rethrow; + } + }, + maxAttempts: 3, + retryIf: (e) => e is DatabaseException, + ); + } +} diff --git a/app/lib/search/backend.dart b/app/lib/search/backend.dart index 92037ae5e8..9930b51a0b 100644 --- a/app/lib/search/backend.dart +++ b/app/lib/search/backend.dart @@ -85,6 +85,7 @@ void registerSearchIndex(SearchIndex index) => /// Datastore-related access methods for the search service class SearchBackend { final DatastoreDB _db; + final VersionedJsonStorage _snapshotStorage; SearchBackend(this._db, Bucket snapshotBucket) @@ -258,7 +259,7 @@ class SearchBackend { addResult(e.name, e.updated); } - await for (final e in _db.tasks.listFinishedSince(updatedThreshold)) { + await for (final e in taskBackend.listFinishedSince(updatedThreshold)) { addResult(e.package, e.finished); } diff --git a/app/lib/service/services.dart b/app/lib/service/services.dart index b91627c918..431c9d2d06 100644 --- a/app/lib/service/services.dart +++ b/app/lib/service/services.dart @@ -338,6 +338,7 @@ Future _withPubServices(FutureOr Function() fn) async { registerTaskBackend( TaskBackend( dbService, + primaryDatabase!.db, storageService.bucket(activeConfiguration.taskResultBucketName!), ), ); diff --git a/app/lib/task/backend.dart b/app/lib/task/backend.dart index 0aaa2cbf68..5ce1a554ce 100644 --- a/app/lib/task/backend.dart +++ b/app/lib/task/backend.dart @@ -7,6 +7,7 @@ import 'package:_pub_shared/data/package_api.dart' as package_api; import 'package:_pub_shared/data/task_api.dart' as api; import 'package:_pub_shared/data/task_payload.dart'; import 'package:_pub_shared/worker/limits.dart'; +import 'package:basics/basics.dart' show MapBasics; import 'package:chunked_stream/chunked_stream.dart' show MaximumSizeExceeded; import 'package:clock/clock.dart'; import 'package:collection/collection.dart'; @@ -19,6 +20,8 @@ import 'package:indexed_blob/indexed_blob.dart' show BlobIndex, FileRange; import 'package:logging/logging.dart' show Logger; import 'package:pana/models.dart' show Summary; import 'package:pool/pool.dart' show Pool; +import 'package:pub_dev/database/database.dart'; +import 'package:pub_dev/database/schema.dart'; import 'package:pub_dev/package/api_export/api_exporter.dart'; import 'package:pub_dev/package/backend.dart'; import 'package:pub_dev/package/models.dart'; @@ -53,12 +56,14 @@ import 'package:pub_dev/task/models.dart' PackageStateInfo, PackageVersionStateInfo, PackageVersionStatus, + TaskStateExt, derivePendingAt, initialTimestamp, maxTaskExecutionTime; import 'package:pub_dev/task/scheduler.dart'; import 'package:pub_semver/pub_semver.dart' show Version; import 'package:shelf/shelf.dart' as shelf; +import 'package:typed_sql/typed_sql.dart' hide AuthenticationException; final _log = Logger('pub.task.backend'); @@ -88,6 +93,7 @@ TaskBackend get taskBackend => ss.lookup(#_taskBackend) as TaskBackend; class TaskBackend { final DatastoreDB _db; + final Database _schema; final Bucket _bucket; /// If [stop] has been called to stop background processes. @@ -106,7 +112,7 @@ class TaskBackend { DeleteInstancesState _deleteInstancesState = DeleteInstancesState.init(); CreateInstancesState _createInstanesState = CreateInstancesState.init(); - TaskBackend(this._db, this._bucket); + TaskBackend(this._db, this._schema, this._bucket); /// Start continuous background processes for scheduling of tasks. /// @@ -257,7 +263,7 @@ class TaskBackend { } // Check that all [PackageState] entities have a matching [Package] entity. - await for (final state in _db.tasks.listAllForCurrentRuntime()) { + await for (final state in _schema.tasksAccess.listAllForCurrentRuntime()) { if (!packageNames.contains(state.package)) { final r = await pool.request(); @@ -269,7 +275,7 @@ class TaskBackend { // no need to delete [PackageState] } else { // no package entry, deleting is needed - await _db.tasks.delete(state.package); + await _schema.tasksAccess.delete(state.package); } } catch (e, st) { _log.severe('failed to untrack "${state.package}"', e, st); @@ -330,7 +336,7 @@ class TaskBackend { Future _runOneInstanceCreation(bool Function() isAbortedFn) async { final result = await runOneCreateInstancesCycle( taskWorkerCloudCompute, - _db, + _schema, state: _createInstanesState, ); _createInstanesState = result.$1; @@ -352,14 +358,14 @@ class TaskBackend { ); } on NotFoundException catch (_) { // If package is not visible, we should remove it! - await _db.tasks.deleteAllStates(packageName); + await _schema.tasksAccess.deleteAllStates(packageName); return; } final versions = _versionsToTrack( data, ).map((v) => v.canonicalizedVersion).toList(); final changed = await withRetryTransaction(_db, (tx) async { - final state = await tx.tasks.lookupOrNull(packageName); + final task = await _schema.tasksAccess.lookupOrNull(packageName); latestVersion = data.latest.version; // Update the timestamp for when the last version was published. @@ -367,7 +373,7 @@ class TaskBackend { lastVersionCreated = data.versions.map((pv) => pv.published!).max; // Ensure we have PackageState entity - if (state == null) { + if (task == null) { // Create [PackageState] entity to track the package _log.info('Started state tracking for $packageName'); final versionsMap = { @@ -377,31 +383,31 @@ class TaskBackend { attempts: 0, ), }; - await tx.tasks.insert( - PackageState() - ..setId(runtimeVersion, packageName) - ..runtimeVersion = runtimeVersion - ..versions = versionsMap - ..dependencies = [] - ..lastDependencyChanged = initialTimestamp - ..finished = initialTimestamp - ..pendingAt = derivePendingAt( - versions: versionsMap, - lastDependencyChanged: initialTimestamp, - ), - ); + await _schema.tasks + .insert( + runtimeVersion: runtimeVersion.asExpr, + package: packageName.asExpr, + state: TaskState(versions: versionsMap, abortedTokens: []).asExpr, + lastDependencyChanged: initialTimestamp.asExpr, + finished: initialTimestamp.asExpr, + pendingAt: derivePendingAt( + versions: versionsMap, + lastDependencyChanged: initialTimestamp, + ).asExpr, + ) + .execute(); return true; // no more work for this package, state is synced } // List versions that not tracked, but should be final untrackedVersions = [ - ...versions.whereNot(state.versions!.containsKey), + ...versions.whereNot(task.state.versions.containsKey), ]; // List of versions that are tracked, but don't exist. These have // probably been deselected by _versionsToTrack. final deselectedVersions = [ - ...state.versions!.keys.whereNot(versions.contains), + ...task.state.versions.keys.whereNot(versions.contains), ]; // There should never be an overlap between versions untracked and @@ -418,39 +424,48 @@ class TaskBackend { return false; } - state.abortedTokens = [ - ...state.versions!.entries - .where((e) => deselectedVersions.contains(e.key)) - .map((e) => e.value) - .where((vs) => vs.secretToken != null) - .map( - (vs) => AbortedTokenInfo( - token: vs.secretToken!, - expires: vs.scheduled.add(maxTaskExecutionTime), + final oldState = task.state; + final newState = TaskState( + abortedTokens: [ + ...oldState.versions.entries + .where((e) => deselectedVersions.contains(e.key)) + .map((e) => e.value) + .where((vs) => vs.secretToken != null) + .map( + (vs) => AbortedTokenInfo( + token: vs.secretToken!, + expires: vs.scheduled.add(maxTaskExecutionTime), + ), ), - ), - ...?state.abortedTokens, - ].where((t) => t.isNotExpired).take(50).toList(); - - // Make changes! - state.versions! - // Remove versions that have been deselected - ..removeWhere((v, _) => deselectedVersions.contains(v)) - // Add versions we should be tracking - ..addAll({ + ...task.state.abortedTokens, + ].where((t) => t.isNotExpired).take(50).toList(), + versions: { + // Remove versions that have been deselected + ...oldState.versions.whereKey( + (key) => !deselectedVersions.contains(key), + ), + // Add versions we should be tracking for (final v in untrackedVersions) v: PackageVersionStateInfo( scheduled: initialTimestamp, attempts: 0, ), - }); - state.pendingAt = derivePendingAt( - versions: state.versions!, - lastDependencyChanged: state.lastDependencyChanged!, + }, ); _log.info('Update state tracking for $packageName'); - await tx.tasks.update(state); + await _schema.tasks + .byKey(runtimeVersion, packageName) + .update( + (_, set) => set( + state: newState.asExpr, + pendingAt: derivePendingAt( + versions: newState.versions, + lastDependencyChanged: task.lastDependencyChanged, + ).asExpr, + ), + ) + .execute(); return true; }); @@ -469,7 +484,7 @@ class TaskBackend { /// Garbage collect [PackageState] and results from old runtimeVersions. Future garbageCollect() async { - await _db.tasks.deleteBeforeGcRuntime(); + await _schema.tasksAccess.deleteBeforeGcRuntime(); // Limit to 50 concurrent deletion requests final pool = Pool(50); @@ -533,7 +548,10 @@ class TaskBackend { // // We only update [PackageState] to have [lastDependencyChanged], this // ensures that there is no risk of indefinite propagation. - final stream = _db.tasks.listDependenciesOfPackage(package, publishedAt); + final stream = _schema.tasksAccess.listDependenciesOfPackage( + package, + publishedAt, + ); await for (final state in stream) { final r = await pool.request(); @@ -541,7 +559,7 @@ class TaskBackend { // and logs any failures before always releasing the [r]. scheduleMicrotask(() async { try { - final changed = await _db.tasks.updateDependencyChanged( + final changed = await _schema.tasksAccess.updateDependencyChanged( state.package, publishedAt, ); @@ -579,8 +597,8 @@ class TaskBackend { throw AuthenticationException.authenticationRequired(); } - final state = await _db.tasks.lookupOrNull(package); - if (state == null) { + final task = await _schema.tasksAccess.lookupOrNull(package); + if (task == null) { throw NotFoundException.resource( 'PackageState($runtimeVersion/$package)', ); @@ -588,7 +606,7 @@ class TaskBackend { final versionState = _authorizeWorkerCallback( package, version, - state, + task.state, token, ); @@ -656,9 +674,20 @@ class TaskBackend { await _gzippedTaskResult(index, 'summary.json'), ); final hasDocIndexHtml = index.lookup('doc/index.html') != null; - await withRetryTransaction(_db, (tx) async { - final state = await tx.tasks.lookupOrNull(package); - if (state == null) { + + final existingDependencyList = await _schema.taskDependencies + .where( + (dep) => + dep.runtimeVersion.equalsValue(runtimeVersion) & + dep.package.equalsValue(package), + ) + .select((dep) => (dep.dependency,)) + .fetch(); + final existingDependencySet = existingDependencyList.toSet(); + + await _schema.transactWithRetry(() async { + final task = await _schema.tasksAccess.lookupOrNull(package); + if (task == null) { throw NotFoundException.resource( 'PackageState($runtimeVersion/$package)', ); @@ -666,56 +695,80 @@ class TaskBackend { final versionState = _authorizeWorkerCallback( package, version, - state, + task.state, token, ); // Update dependencies, if pana summary has dependencies - if (summary != null && summary.allDependencies != null) { - final updatedDependencies = _updatedDependencies( - state.dependencies, - summary.allDependencies, - // for logging only - package: package, - version: version, - ); - // Only update if new dependencies have been discovered. - // This avoids unnecessary churn on datastore when there is no changes. - if (state.dependencies != updatedDependencies && - !{...?state.dependencies}.containsAll(updatedDependencies)) { - state.dependencies = updatedDependencies; + for (final dependency in summary?.allDependencies ?? const []) { + if (existingDependencySet.contains(dependency)) continue; + + bool isValid = false; + try { + // TODO: These sanity checks should probably split out, into a general + // extension method on [Summary]. The idea here is to protect + // against invalid data from the sandbox. We should consider all + // the output we get from the sandbox as suspect :D + InvalidInputException.checkPackageName(dependency); + isValid = true; + } on ResponseException { + _log.shout( + 'pub_worker responses with summary.allDependencies containing "$dependency"' + ' in package "$package" version "$version"', + ); } + + if (!isValid) continue; + await _schema.taskDependencies + .insert( + runtimeVersion: runtimeVersion.asExpr, + package: package.asExpr, + dependency: dependency.asExpr, + ) + .execute(); } zone = versionState.zone!; instance = versionState.instance!; // Remove instanceName, zone, secretToken, and set attempts = 0 - state.versions![version] = PackageVersionStateInfo( - scheduled: versionState.scheduled, - docs: hasDocIndexHtml, - pana: summary != null, - finished: true, - attempts: 0, - instance: null, // version is no-longer running on this instance - secretToken: null, // TODO: Consider retaining this for idempotency - zone: null, - ); + final newVersions = { + ...task.state.versions, + version: PackageVersionStateInfo( + scheduled: versionState.scheduled, + docs: hasDocIndexHtml, + pana: summary != null, + finished: true, + attempts: 0, + instance: null, // version is no-longer running on this instance + secretToken: null, // TODO: Consider retaining this for idempotency + zone: null, + ), + }; // Determine if something else was running on the instance - isInstanceDone = state.versions!.values.none( - (v) => v.instance == instance, - ); + isInstanceDone = newVersions.values.none((v) => v.instance == instance); // Ensure that we update [state.pendingAt], otherwise it might be // re-scheduled way too soon. - state.pendingAt = derivePendingAt( - versions: state.versions!, - lastDependencyChanged: state.lastDependencyChanged!, + final pendingAt = derivePendingAt( + versions: newVersions, + lastDependencyChanged: task.lastDependencyChanged, ); - state.finished = clock.now().toUtc(); - await tx.tasks.update(state); + await _schema.tasks + .byKey(runtimeVersion, package) + .update( + (_, set) => set( + state: TaskState( + versions: newVersions, + abortedTokens: task.state.abortedTokens, + ).asExpr, + pendingAt: pendingAt.asExpr, + finished: clock.now().toUtc().asExpr, + ), + ) + .execute(); }); // Clearing the state cache after the update. @@ -966,15 +1019,18 @@ class TaskBackend { Future packageStatus(String package) async { final status = await cache.taskPackageStatus(package).get(() async { for (final rt in acceptedRuntimeVersions) { - final state = await _db.tasks.lookupOrNull(package, runtimeVersion: rt); + final task = await _schema.tasksAccess.lookupOrNull( + package, + runtimeVersion: rt, + ); // skip states where the entry was created, but no analysis has not finished yet - if (state == null || state.hasNeverFinished) { + if (task == null || task.hasNeverFinished) { continue; } return PackageStateInfo( - runtimeVersion: state.runtimeVersion!, + runtimeVersion: task.runtimeVersion, package: package, - versions: state.versions ?? {}, + versions: task.state.versions, ); } return PackageStateInfo.empty(package: package); @@ -999,11 +1055,11 @@ class TaskBackend { Future Function(Payload payload) processPayload, ) async { await backfillTrackingState(); - await for (final state in _db.tasks.listAllForCurrentRuntime()) { + await for (final state in _schema.tasksAccess.listAllForCurrentRuntime()) { final zone = taskWorkerCloudCompute.zones.first; // ignore: invalid_use_of_visible_for_testing_member final updated = await updatePackageStateWithPendingVersions( - _db, + _schema, state.package, zone, taskWorkerCloudCompute.generateInstanceName(), @@ -1020,7 +1076,7 @@ class TaskBackend { Future adminBumpPriority(String packageName) async { // Ensure we're up-to-date. await trackPackage(packageName); - await _db.tasks.bumpPriority(packageName); + await _schema.tasksAccess.bumpPriority(packageName); } /// Returns the latest version of the [package] which has a finished analysis. @@ -1029,12 +1085,15 @@ class TaskBackend { Future latestFinishedVersion(String package) async { final cachedValue = await cache.latestFinishedVersion(package).get(() async { for (final rt in acceptedRuntimeVersions) { - final state = await _db.tasks.lookupOrNull(package, runtimeVersion: rt); + final task = await _schema.tasksAccess.lookupOrNull( + package, + runtimeVersion: rt, + ); // skip states where the entry was created, but no analysis has not finished yet - if (state == null || state.hasNeverFinished) { + if (task == null || task.hasNeverFinished) { continue; } - final bestVersion = state.versions?.entries + final bestVersion = task.state.versions.entries .where((e) => e.value.finished) .map((e) => Version.parse(e.key)) .latestVersion; @@ -1072,31 +1131,30 @@ class TaskBackend { () async { final semanticVersion = Version.parse(version); for (final rt in acceptedRuntimeVersions) { - final state = await _db.tasks.lookupOrNull( + final task = await _schema.tasksAccess.lookupOrNull( package, runtimeVersion: rt, ); // Skip states where the entry was created, but the analysis has not finished yet. - if (state == null || state.hasNeverFinished) { + if (task == null || task.hasNeverFinished) { continue; } List? candidates; if (preferDocsCompleted) { - final finishedDocCandidates = state.versions?.entries + final finishedDocCandidates = task.state.versions.entries .where((e) => e.value.docs) .map((e) => Version.parse(e.key)) .toList(); - if (finishedDocCandidates != null && - finishedDocCandidates.isNotEmpty) { + if (finishedDocCandidates.isNotEmpty) { candidates = finishedDocCandidates; } } - candidates ??= state.versions?.entries + candidates ??= task.state.versions.entries .where((e) => e.value.finished) .map((e) => Version.parse(e.key)) .toList(); - if (candidates == null || candidates.isEmpty) { + if (candidates.isEmpty) { continue; } if (candidates.contains(semanticVersion)) { @@ -1118,6 +1176,10 @@ class TaskBackend { ); return (cachedValue == null || cachedValue.isEmpty) ? null : cachedValue; } + + Stream<({String package, DateTime finished})> listFinishedSince( + DateTime since, + ) => _schema.tasksAccess.listFinishedSince(since); } final _blobIdPattern = RegExp(r'^[^/]+/[^/]+/[^/]+/[0-9a-fA-F]+\.blob$'); @@ -1143,18 +1205,18 @@ String? _extractBearerToken(shelf.Request request) { PackageVersionStateInfo _authorizeWorkerCallback( String package, String version, - PackageState state, + TaskState state, String token, ) { // fixed-time verification of aborted tokens final isKnownAbortedToken = state.abortedTokens - ?.map((t) => t.isAuthorized(token)) + .map((t) => t.isAuthorized(token)) .fold(false, (a, b) => a || b); - if (isKnownAbortedToken ?? false) { + if (isKnownAbortedToken) { throw TaskAbortedException('$package/$version has been aborted.'); } - final versionState = state.versions![version]; + final versionState = state.versions[version]; if (versionState == null) { throw TaskAbortedException('The provided token is invalid or expired.'); } @@ -1235,108 +1297,52 @@ List _versionsToTrack(package_api.PackageData data) { }.nonNulls.where(visibleVersions.contains).toList(); } -List _updatedDependencies( - List? dependencies, - List? discoveredDependencies, { - required String package, - required String version, -}) { - dependencies ??= []; - discoveredDependencies ??= []; - - // If discoveredDependencies is in dependencies, then we're done. - if (dependencies.toSet().containsAll(discoveredDependencies)) { - return dependencies; - } - - // Check if any of the dependencies returned have invalid names, if this is - // the case, then we should ignore the entire result! - final hasBadDependencies = discoveredDependencies.any((dep) { - try { - // TODO: These sanity checks should probably split out, into a general - // extension method on [Summary]. The idea here is to protect - // against invalid data from the sandbox. We should consider all - // the output we get from the sandbox as suspect :D - InvalidInputException.checkPackageName(dep); - return false; - } on ResponseException { - _log.shout( - 'pub_worker responses with summary.allDependencies containing "$dep"' - ' in package "$package" version "$version"', - ); - return true; - } - }); - if (hasBadDependencies) { - return dependencies; // no changes! - } - - // An indexed property cannot be larger than 1500 bytes, strings counts as - // length + 1, so we prefer newly [discoveredDependencies] and then choose - // [dependencies], after which we just pick the dependencies we can get while - // staying below 1500 bytes. - var size = 0; - return discoveredDependencies - .followedBy(dependencies.whereNot(discoveredDependencies.contains)) - .takeWhile((p) => (size += p.length + 1) < 1500) - .sorted(); -} - -/// Low-level, narrowly typed data access methods for [PackageState] entity. -extension TaskDatastoreDBExt on DatastoreDB { - _TaskDataAccess get tasks => _TaskDataAccess(this); -} - -extension TaskTransactionWrapperExt on TransactionWrapper { - _TaskTransactionDataAcccess get tasks => _TaskTransactionDataAcccess(this); +/// Low-level, narrowly typed data access methods for [Task] entity. +extension TaskDatabaseExt on Database { + _TaskDataAccess get tasksAccess => _TaskDataAccess(primaryDatabase!.db); } final class _TaskDataAccess { - final DatastoreDB _db; + late final Database _schema; - _TaskDataAccess(this._db); + _TaskDataAccess(this._schema); - Future lookupOrNull( - String package, { - String? runtimeVersion, - }) async { - final key = PackageState.createKey( - _db.emptyKey, - runtimeVersion ?? shared_versions.runtimeVersion, - package, - ); - return await _db.lookupOrNull(key); + Future lookupOrNull(String package, {String? runtimeVersion}) async { + return await _schema.tasks + .byKey(runtimeVersion ?? shared_versions.runtimeVersion, package) + .fetch(); } Future delete(String package) async { - final key = PackageState.createKey(_db.emptyKey, runtimeVersion, package); - await _db.commit(deletes: [key]); + await _schema.tasks.byKey(runtimeVersion, package).delete().execute(); } - // GC the old [PackageState] entities + // GC the old [Task] entities Future deleteBeforeGcRuntime() async { - await _db.deleteWithQuery( - _db.query() - ..filter('runtimeVersion <', gcBeforeRuntimeVersion), - ); + await _schema.tasks + .where((task) => task.runtimeVersion < gcBeforeRuntimeVersion.asExpr) + .delete() + .execute(); } Stream<({String package})> listAllForCurrentRuntime() async* { - final query = _db.query() - ..filter('runtimeVersion =', runtimeVersion); - await for (final ps in query.run()) { - yield (package: ps.package); + final query = _schema.tasks + .where((task) => task.runtimeVersion.equalsValue(runtimeVersion)) + .select((task) => (task.package,)); + await for (final row in query.stream()) { + yield (package: row); } } Stream<({String package, DateTime finished})> listFinishedSince( DateTime since, ) async* { - final query = _db.query() - ..filter('finished >=', since) - ..order('-finished'); - await for (final s in query.run()) { - yield (package: s.package, finished: s.finished); + final query = _schema.tasks + .where((task) => task.finished >= since.asExpr) + .orderBy((task) => [(task.finished, Order.descending)]) + .select((task) => (task.package, task.finished)); + await for (final row in query.stream()) { + yield (package: row.$1, finished: row.$2); } } @@ -1344,33 +1350,40 @@ final class _TaskDataAccess { String package, DateTime publishedAt, ) async* { - final query = _db.query() - ..filter('dependencies =', package) - ..filter('lastDependencyChanged <', publishedAt); - await for (final ps in query.run()) { - yield (package: ps.package); + final query = _schema.taskDependencies + .join(_schema.tasks) + .usingTask() + .where( + (dep, task) => + dep.runtimeVersion.equalsValue(runtimeVersion) & + dep.dependency.equalsValue(package) & + task.lastDependencyChanged.isBeforeValue(publishedAt), + ) + .select((dep, _) => (dep.package,)); + await for (final row in query.stream()) { + yield (package: row); } } Stream<({String package})> selectSomePending(int limit) async* { - final query = _db.query() - ..filter('runtimeVersion =', runtimeVersion) - ..filter('pendingAt <=', clock.now()) - ..order('pendingAt') - ..limit(limit); - await for (final ps in query.run()) { - yield (package: ps.package); + final query = _schema.tasks + .where( + (task) => + task.runtimeVersion.equalsValue(runtimeVersion) & + task.pendingAt.isBeforeValue(clock.now()), + ) + .orderBy((task) => [(task.pendingAt, Order.ascending)]) + .select((task) => (task.package,)) + .limit(limit); + await for (final row in query.stream()) { + yield (package: row); } } Future deleteAllStates(String name) async { - await withRetryTransaction(_db, (tx) async { - // also delete earlier runtime versions + await _schema.transactWithRetry(() async { for (final rv in acceptedRuntimeVersions) { - final s = await lookupOrNull(name, runtimeVersion: rv); - if (s != null) { - tx.delete(s.key); - } + await _schema.tasks.delete(rv, name).execute(); } }); } @@ -1380,24 +1393,29 @@ final class _TaskDataAccess { String package, DateTime publishedAt, ) async { - return await withRetryTransaction(_db, (tx) async { + return await _schema.transactWithRetry(() async { // Reload [state] within a transaction to avoid overwriting changes // made by others trying to update state for another package. - final s = await tx.tasks.lookupOrNull(package); + + final s = await _schema.tasks.byKey(runtimeVersion, package).fetch(); if (s == null) { // No entry has been created yet, probably because of a new deployment rolling out. // We can ignore it for now. return false; } - if (s.lastDependencyChanged!.isBefore(publishedAt)) { - tx.insert( - s - ..lastDependencyChanged = publishedAt - ..pendingAt = derivePendingAt( - versions: s.versions!, - lastDependencyChanged: publishedAt, - ), - ); + if (s.lastDependencyChanged.isBefore(publishedAt)) { + await _schema.tasks + .byKey(runtimeVersion, package) + .update( + (_, set) => set( + lastDependencyChanged: publishedAt.asExpr, + pendingAt: derivePendingAt( + versions: s.state.versions, + lastDependencyChanged: publishedAt, + ).asExpr, + ), + ) + .execute(); return true; } return false; @@ -1405,13 +1423,14 @@ final class _TaskDataAccess { } Future bumpPriority(String packageName) async { - await withRetryTransaction(_db, (tx) async { - final state = await tx.tasks.lookupOrNull(packageName); - if (state != null) { - state.pendingAt = initialTimestamp; - tx.insert(state); - } - }); + await _schema.tasks + .where( + (task) => + task.runtimeVersion.endsWithValue(runtimeVersion) & + task.package.equalsValue(packageName), + ) + .update((_, set) => set(pendingAt: initialTimestamp.asExpr)) + .execute(); } /// Restores the previous versions map state when starting the tasks on [instanceName] failed. @@ -1420,48 +1439,34 @@ final class _TaskDataAccess { String instanceName, Map previousVersionsMap, ) async { - await withRetryTransaction(_db, (tx) async { - final s = await tx.tasks.lookupOrNull(packageName); + await _schema.transactWithRetry(() async { + final s = await _schema.tasks.byKey(runtimeVersion, packageName).fetch(); if (s == null) { return; // Presumably, the package was deleted. } - s.versions!.addEntries( - s.versions!.entries + final versions = s.state.versions; + versions.addEntries( + versions.entries .where((e) => e.value.instance == instanceName) .map((e) => MapEntry(e.key, previousVersionsMap[e.key]!)), ); - s.pendingAt = derivePendingAt( - versions: s.versions!, - lastDependencyChanged: s.lastDependencyChanged!, - ); - await tx.tasks.update(s); - }); - } -} - -class _TaskTransactionDataAcccess { - final TransactionWrapper _tx; - - _TaskTransactionDataAcccess(this._tx); - Future lookupOrNull( - String name, { - String? runtimeVersion, - }) async { - final key = PackageState.createKey( - _tx.emptyKey, - runtimeVersion ?? shared_versions.runtimeVersion, - name, - ); - return await _tx.lookupOrNull(key); - } - - Future insert(PackageState state) async { - _tx.insert(state); - } - - Future update(PackageState state) async { - _tx.insert(state); + await _schema.tasks + .byKey(runtimeVersion, packageName) + .update( + (_, set) => set( + state: TaskState( + versions: versions, + abortedTokens: s.state.abortedTokens, + ).asExpr, + pendingAt: derivePendingAt( + versions: versions, + lastDependencyChanged: s.lastDependencyChanged, + ).asExpr, + ), + ) + .execute(); + }); } } diff --git a/app/lib/task/cloudcompute/fakecloudcompute.dart b/app/lib/task/cloudcompute/fakecloudcompute.dart index a215204b13..3f6bbdf1a5 100644 --- a/app/lib/task/cloudcompute/fakecloudcompute.dart +++ b/app/lib/task/cloudcompute/fakecloudcompute.dart @@ -136,7 +136,6 @@ final class FakeCloudCompute extends CloudCompute { if (!_instances.any((i) => i.instanceName == instanceName)) { throw StateError('instance "$instanceName" does not exist'); } - final instance = _instances.firstWhere( (i) => i.instanceName == instanceName, ); diff --git a/app/lib/task/models.dart b/app/lib/task/models.dart index 1ccd3bba4c..88417a5375 100644 --- a/app/lib/task/models.dart +++ b/app/lib/task/models.dart @@ -7,6 +7,7 @@ import 'dart:convert' show json; import 'package:clock/clock.dart'; import 'package:json_annotation/json_annotation.dart'; import 'package:pub_dev/admin/actions/actions.dart'; +import 'package:pub_dev/database/schema.dart'; import 'package:pub_dev/shared/utils.dart'; import 'package:pub_semver/pub_semver.dart'; @@ -133,10 +134,6 @@ class PackageState extends db.ExpandoModel { @db.DateTimeProperty(required: true, indexed: true) DateTime finished = initialTimestamp; - /// Returns true if the current [PackageState] instance is new, no version analysis - /// has not completed yet (with neither success nor failure). - bool get hasNeverFinished => finished == initialTimestamp; - @override String toString() => 'PackageState(' + @@ -248,6 +245,12 @@ List derivePendingVersions({ return list.map((s) => s.toString()).toList(); } +extension TaskStateExt on Task { + /// Returns true if the current [PackageState] instance is new, no version analysis + /// has not completed yet (with neither success nor failure). + bool get hasNeverFinished => finished == initialTimestamp; +} + /// State of a given `version` within a [PackageState]. @JsonSerializable() class PackageVersionStateInfo { diff --git a/app/lib/task/scheduler.dart b/app/lib/task/scheduler.dart index 4ec22c2e40..a78ab8d405 100644 --- a/app/lib/task/scheduler.dart +++ b/app/lib/task/scheduler.dart @@ -7,13 +7,16 @@ import 'package:basics/basics.dart'; import 'package:clock/clock.dart'; import 'package:logging/logging.dart' show Logger; import 'package:meta/meta.dart'; +import 'package:pub_dev/database/database.dart'; +import 'package:pub_dev/database/schema.dart'; import 'package:pub_dev/package/backend.dart'; import 'package:pub_dev/shared/configuration.dart'; -import 'package:pub_dev/shared/datastore.dart'; import 'package:pub_dev/shared/utils.dart'; +import 'package:pub_dev/shared/versions.dart'; import 'package:pub_dev/task/backend.dart'; import 'package:pub_dev/task/cloudcompute/cloudcompute.dart'; import 'package:pub_dev/task/models.dart'; +import 'package:typed_sql/typed_sql.dart'; final _log = Logger('pub.task.schedule'); @@ -32,7 +35,7 @@ final class CreateInstancesState { /// Schedule tasks from [PackageState], creating cloud compute worker instances. Future<(CreateInstancesState, Duration)> runOneCreateInstancesCycle( CloudCompute compute, - DatastoreDB db, { + Database db, { required CreateInstancesState state, }) async { // Map from zone to DateTime when zone is allowed again @@ -179,7 +182,7 @@ Future<(CreateInstancesState, Duration)> runOneCreateInstancesCycle( // suppose to run on the instance we just failed to create. // If this doesn't work, we'll eventually retry. Hence, correctness // does not hinge on this transaction being successful. - await db.tasks.restorePreviousVersionsState( + await db.tasksAccess.restorePreviousVersionsState( selected.package, instanceName, oldVersionsMap, @@ -189,7 +192,7 @@ Future<(CreateInstancesState, Duration)> runOneCreateInstancesCycle( // Creating an instance can be slow, we want to schedule them concurrently. await Future.wait( - (await db.tasks.selectSomePending(selectLimit).toList()).map( + (await db.tasksAccess.selectSomePending(selectLimit).toList()).map( scheduleInstance, ), ); @@ -226,23 +229,22 @@ Future<(CreateInstancesState, Duration)> runOneCreateInstancesCycle( @visibleForTesting Future<(Payload, Map)?> updatePackageStateWithPendingVersions( - DatastoreDB db, + Database schema, String package, String zone, String instanceName, ) async { - return await withRetryTransaction(db, (tx) async { - final s = await tx.tasks.lookupOrNull(package); - if (s == null) { + return schema.transactWithRetry(() async { + final task = await schema.tasksAccess.lookupOrNull(package); + if (task == null) { // presumably the package was deleted. return null; } - final oldVersionsMap = {...?s.versions}; final now = clock.now(); final pendingVersions = derivePendingVersions( - versions: s.versions!, - lastDependencyChanged: s.lastDependencyChanged!, + versions: task.state.versions, + lastDependencyChanged: task.lastDependencyChanged, at: now, ).toList(); if (pendingVersions.isEmpty) { @@ -250,32 +252,45 @@ updatePackageStateWithPendingVersions( return null; } + final oldVersionsMap = {...task.state.versions}; + // Update PackageState - s.versions!.addAll({ + final newVersions = { + ...task.state.versions, for (final v in pendingVersions.map((v) => v.toString())) v: PackageVersionStateInfo( scheduled: now, - attempts: s.versions![v]!.attempts + 1, + attempts: task.state.versions[v]!.attempts + 1, zone: zone, instance: instanceName, secretToken: createUuid(), - finished: s.versions![v]!.finished, + finished: task.state.versions[v]!.finished, ), - }); - s.pendingAt = derivePendingAt( - versions: s.versions!, - lastDependencyChanged: s.lastDependencyChanged!, - ); - await tx.tasks.update(s); + }; + await schema.tasks + .byKey(runtimeVersion, package) + .update( + (_, set) => set( + state: TaskState( + versions: newVersions, + abortedTokens: task.state.abortedTokens, + ).asExpr, + pendingAt: derivePendingAt( + versions: newVersions, + lastDependencyChanged: task.lastDependencyChanged, + ).asExpr, + ), + ) + .execute(); // Create payload final payload = Payload( - package: s.package, + package: task.package, pubHostedUrl: activeConfiguration.defaultServiceBaseUrl, versions: pendingVersions.map( (v) => VersionTokenPair( version: v.toString(), - token: s.versions![v.toString()]!.secretToken!, + token: newVersions[v.toString()]!.secretToken!, ), ), ); diff --git a/app/test/task/task_test.dart b/app/test/task/task_test.dart index 90e3469ee6..9d978552ec 100644 --- a/app/test/task/task_test.dart +++ b/app/test/task/task_test.dart @@ -9,12 +9,11 @@ import 'dart:io'; import 'package:_pub_shared/data/package_api.dart' show UploadInfo; import 'package:_pub_shared/data/task_payload.dart'; import 'package:clock/clock.dart'; -import 'package:gcloud/db.dart'; import 'package:http/http.dart' as http; import 'package:http_parser/http_parser.dart' show MediaType; import 'package:indexed_blob/indexed_blob.dart'; import 'package:pana/pana.dart'; -import 'package:pub_dev/shared/versions.dart'; +import 'package:pub_dev/database/database.dart'; import 'package:pub_dev/task/backend.dart'; import 'package:pub_dev/task/cloudcompute/fakecloudcompute.dart'; import 'package:pub_dev/task/models.dart'; @@ -736,12 +735,13 @@ void main() { await clockControl.elapse(minutes: 15); // verify token is now aborted - final ps = await dbService.lookupValue( - PackageState.createKey(dbService.emptyKey, runtimeVersion, 'neon'), + final ps = await primaryDatabase!.db.tasksAccess.lookupOrNull('neon'); + expect(ps!.state.versions[v.version]?.secretToken, isNull); + expect(ps.state.abortedTokens, isNotEmpty); + expect( + ps.state.abortedTokens.where((x) => x.token == v.token), + isNotEmpty, ); - expect(ps.versions?[v.version]?.secretToken, isNull); - expect(ps.abortedTokens, isNotEmpty); - expect(ps.abortedTokens?.where((x) => x.token == v.token), isNotEmpty); // Use token to get the upload information final api = createPubApiClient(authToken: v.token); @@ -777,10 +777,11 @@ void main() { ], ), ); - final ps = await dbService.lookupValue( - PackageState.createKey(dbService.emptyKey, runtimeVersion, 'neon'), + final ps = await primaryDatabase!.db.tasksAccess.lookupOrNull('neon'); + expect( + ps!.state.abortedTokens.where((x) => x.token == v.token), + isEmpty, ); - expect(ps.abortedTokens?.where((x) => x.token == v.token), isEmpty); // Report the task as finished final api = createPubApiClient(authToken: v.token);