7 changes: 7 additions & 0 deletions apps/workers/src/workers/schedule.ts
@@ -51,4 +51,11 @@ export async function setupSchedules() {
{ pattern: '0 0 1 * * *' },
{ opts: { attempts: 1 } },
)

// Every day at 1 AM - Backfills document/commit metadata on prompt spans from the last 30 days for each workspace
await maintenanceQueue.upsertJobScheduler(
'scheduleMigrateSpansJobs',
{ pattern: '0 0 1 * * *' },
{ opts: { attempts: 1 } },
)
}
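
For reference, a minimal sketch of how the six-field cron pattern and the scheduler call fit together, assuming BullMQ's upsertJobScheduler(schedulerId, repeatOpts, template) signature; the queue construction, connection options, and the explicit name field are illustrative and not part of this diff:

import { Queue } from 'bullmq'

async function scheduleSpanMigration() {
  const maintenanceQueue = new Queue('maintenance', {
    connection: { host: 'localhost', port: 6379 },
  })

  // '0 0 1 * * *' is a six-field cron expression: second 0, minute 0, hour 1,
  // every day of month, every month, every weekday - i.e. 01:00 daily.
  await maintenanceQueue.upsertJobScheduler(
    'scheduleMigrateSpansJobs',
    { pattern: '0 0 1 * * *' },
    // Naming the produced job explicitly keeps it aligned with a jobMappings key in the worker.
    { name: 'migrateSpansJobs', data: {}, opts: { attempts: 1 } },
  )
}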
@@ -16,6 +16,7 @@ const jobMappings = {
requestDocumentSuggestionsJob: jobs.requestDocumentSuggestionsJob,
scaleDownMcpServerJob: jobs.scaleDownMcpServerJob,
updateMcpServerLastUsedJob: jobs.updateMcpServerLastUsedJob,
migrateSpansJobs: jobs.migrateSpansJobs,
}

export function startMaintenanceWorker() {
@@ -4,3 +4,4 @@ export * from './refreshProjectsStatsCacheJob'
export * from './refreshProjectStatsCacheJob'
export * from './scheduleWorkspaceCleanupJobs'
export * from './cleanupWorkspaceOldLogsJob'
export * from './migrateSpansJobs'
154 changes: 154 additions & 0 deletions packages/core/src/jobs/job-definitions/maintenance/migrateSpansJob.ts
@@ -0,0 +1,154 @@
import { Job } from 'bullmq'
import { and, asc, eq, gt, inArray, isNotNull, isNull, or } from 'drizzle-orm'
import { commits, documentLogs, spans } from '../../../schema'
import Transaction from '../../../lib/Transaction'
import { Result } from '../../../lib/Result'
import { SpanType } from '@latitude-data/constants'

type MigrateSpansJobData = {
workspaceId: number
}

export const migrateSpansJob = async (job: Job<MigrateSpansJobData>) => {
const { workspaceId } = job.data

// Calculate cutoff date (30 days ago)
const cutoffDate = new Date()
cutoffDate.setDate(cutoffDate.getDate() - 30)

// Cache for commits to avoid redundant queries
const commitCache = new Map<number, string>()

return await new Transaction().call(async (tx) => {
const batchSize = 1000
let cursor: { startedAt: Date; id: string } | null = null
let processedSpans = 0

while (true) {
// Query spans that need migration: have documentLogUuid but missing documentUuid/commitUuid/experimentId
// Only process spans of type 'prompt'
// Use cursor-based pagination for efficiency
let whereClause = and(
eq(spans.workspaceId, workspaceId),
eq(spans.type, SpanType.Prompt),
gt(spans.startedAt, cutoffDate),
isNotNull(spans.documentLogUuid),
isNull(spans.documentUuid),
isNull(spans.commitUuid),
isNull(spans.experimentId),
)

if (cursor) {
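// Keyset pagination: keep only rows strictly after the (startedAt, id) cursor tuple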
whereClause = and(
whereClause,
or(
gt(spans.startedAt, cursor.startedAt),
and(eq(spans.startedAt, cursor.startedAt), gt(spans.id, cursor.id)),
),
)
}

const spansToMigrate = await tx
.select({
id: spans.id,
traceId: spans.traceId,
documentLogUuid: spans.documentLogUuid,
startedAt: spans.startedAt,
})
.from(spans)
.where(whereClause)
.orderBy(asc(spans.startedAt), asc(spans.id))
.limit(batchSize)

if (spansToMigrate.length === 0) break

// Update cursor for next batch
const lastSpan = spansToMigrate[spansToMigrate.length - 1]
cursor = { startedAt: lastSpan.startedAt, id: lastSpan.id }

// Collect unique document log UUIDs
const documentLogUuids = spansToMigrate.map((s) => s.documentLogUuid!)
const uniqueDocumentLogUuids = [...new Set(documentLogUuids)]

// Fetch document logs data
const documentLogsData = await tx
.select({
uuid: documentLogs.uuid,
documentUuid: documentLogs.documentUuid,
commitId: documentLogs.commitId,
experimentId: documentLogs.experimentId,
})
.from(documentLogs)
.where(inArray(documentLogs.uuid, uniqueDocumentLogUuids))

// Create maps for quick lookup
const documentLogMap = new Map(
documentLogsData.map((dl) => [dl.uuid, dl]),
)

// Collect unique commit IDs that need UUID lookup
const commitIds = documentLogsData
.map((dl) => dl.commitId)
.filter((id): id is number => id !== null && !commitCache.has(id))

// Fetch commit UUIDs for uncached commits
if (commitIds.length > 0) {
const commitsData = await tx
.select({
id: commits.id,
uuid: commits.uuid,
})
.from(commits)
.where(inArray(commits.id, commitIds))

commitsData.forEach((c) => commitCache.set(c.id, c.uuid))
}

// Prepare bulk update data
const updateData = spansToMigrate
.map((span) => {
const docLog = documentLogMap.get(span.documentLogUuid!)
if (!docLog) return null

const commitUuid = docLog.commitId
? commitCache.get(docLog.commitId) || null
: null

return {
id: span.id,
traceId: span.traceId,
documentUuid: docLog.documentUuid,
commitUuid,
experimentId: docLog.experimentId,
}
})
.filter(
(update): update is NonNullable<typeof update> => update !== null,
)

if (updateData.length > 0) {
// Update spans individually since we need to set specific values
for (const update of updateData) {
await tx
.update(spans)
.set({
documentUuid: update.documentUuid,
commitUuid: update.commitUuid,
experimentId: update.experimentId,
})
.where(
and(eq(spans.traceId, update.traceId), eq(spans.id, update.id)),
)
.execute()
}
}

processedSpans += spansToMigrate.length

// If we got less than batch size, we've processed all spans
if (spansToMigrate.length < batchSize) break
}

return Result.ok({ processedSpans })
})
}
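
For illustration, a minimal sketch of exercising the handler directly for one workspace (for a manual backfill or a test); the stubbed Job object and the workspace id are assumptions, not code from this diff:

import { Job } from 'bullmq'
import { migrateSpansJob } from './migrateSpansJob'

async function backfillOneWorkspace(workspaceId: number) {
  // Illustrative only: the handler reads nothing but job.data, so a minimal stub is enough here.
  const fakeJob = { data: { workspaceId } } as Job<{ workspaceId: number }>

  const result = await migrateSpansJob(fakeJob)
  // On success the job resolves to Result.ok({ processedSpans }).
  return result
}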
@@ -0,0 +1,23 @@
import { database } from '../../../client'
import { workspaces } from '../../../schema'
import { queues } from '../../queues'

export const migrateSpansJobs = async () => {
const freeWorkspaces = await database
.select({
id: workspaces.id,
})
.from(workspaces)
Contributor
Is this ok?

Collaborator Author
yeah the comment is wrong (copy pasted)

let _enqueuedJobs = 0

for (const workspace of freeWorkspaces) {
const { maintenanceQueue } = await queues()
await maintenanceQueue.add(
'migrateSpansJob',
{ workspaceId: workspace.id },
{ attempts: 1 },
)
_enqueuedJobs++
}
}
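
For completeness, a sketch of how a BullMQ worker could dispatch these job names to the handlers above via a jobMappings-style lookup; this mirrors the pattern suggested by the earlier jobMappings diff, but the Worker setup, connection options, and handler bodies here are assumptions rather than this repo's actual startMaintenanceWorker implementation:

import { Worker, Job } from 'bullmq'
import { migrateSpansJob } from './migrateSpansJob'
import { migrateSpansJobs } from './migrateSpansJobs'

// Assumed shape: job names map to handler functions, as in the jobMappings object shown earlier.
const jobMappings: Record<string, (job: Job) => Promise<unknown>> = {
  migrateSpansJobs: () => migrateSpansJobs(), // fan-out: enqueues one migrateSpansJob per workspace
  migrateSpansJob: (job) => migrateSpansJob(job), // per-workspace span backfill
}

// Sketch only; the real startMaintenanceWorker lives in the workers app.
export function startMaintenanceWorker() {
  return new Worker(
    'maintenance',
    async (job) => {
      const handler = jobMappings[job.name]
      if (!handler) throw new Error(`No handler registered for job '${job.name}'`)
      return handler(job)
    },
    { connection: { host: 'localhost', port: 6379 } },
  )
}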