Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions apps/sim/app/api/cron/cleanup-stale-executions/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,15 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
})
}

// Mark stale table jobs (import or delete) as failed. Jobs run detached on the web container
// Mark stale table jobs (import, export, or delete) as failed. Jobs run detached on the web container
// and are lost if the pod is killed mid-run. `updated_at` is bumped by progress updates, so a
// `running` job with no recent update has stalled (not merely slow). Committed work is left in
// place (no rollback); the user retries. Also prune long-settled terminal jobs so the table
// doesn't grow unbounded (the latest job per table is what list/detail reads surface).
let staleImportsMarkedFailed = 0
let staleTableJobsMarkedFailed = 0
try {
const now = new Date()
const staleImports = await db
const staleJobs = await db
.update(tableJobs)
.set({
status: 'failed',
Expand All @@ -132,9 +132,9 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
.where(and(eq(tableJobs.status, 'running'), lt(tableJobs.updatedAt, staleThreshold)))
.returning({ id: tableJobs.id })

staleImportsMarkedFailed = staleImports.length
if (staleImportsMarkedFailed > 0) {
logger.info(`Marked ${staleImportsMarkedFailed} stale table jobs as failed`)
staleTableJobsMarkedFailed = staleJobs.length
if (staleTableJobsMarkedFailed > 0) {
logger.info(`Marked ${staleTableJobsMarkedFailed} stale table jobs as failed`)
}

const terminalRetention = new Date(Date.now() - TABLE_JOB_RETENTION_HOURS * 60 * 60 * 1000)
Expand Down Expand Up @@ -236,8 +236,8 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
staleThresholdMinutes: STALE_THRESHOLD_MINUTES,
retentionHours: JOB_RETENTION_HOURS,
},
tableImports: {
staleMarkedFailed: staleImportsMarkedFailed,
tableJobs: {
staleMarkedFailed: staleTableJobsMarkedFailed,
},
})
} catch (error) {
Expand Down
11 changes: 11 additions & 0 deletions apps/sim/lib/table/export-runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,17 @@ describe('runTableExport', () => {
expect(mockMarkJobFailed).not.toHaveBeenCalled()
})

it('stops before the upload when ownership is lost at the finalize gate', async () => {
mockUpdateJobProgress.mockResolvedValueOnce(true).mockResolvedValue(false)

await runTableExport(payload)

expect(mockSelectExportRowPage).toHaveBeenCalledTimes(1)
expect(mockUploadFile).not.toHaveBeenCalled()
expect(mockMarkJobReady).not.toHaveBeenCalled()
expect(mockMarkJobFailed).not.toHaveBeenCalled()
})

it('cleans up an orphaned upload when the job was canceled at the wire', async () => {
mockMarkJobReady.mockResolvedValue(false)

Expand Down
3 changes: 3 additions & 0 deletions apps/sim/lib/table/export-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ export async function runTableExport(payload: TableExportPayload): Promise<void>
}
if (format === 'json') chunks.push(']')

const ownsFinalize = await updateJobProgress(tableId, exported, jobId)
if (!ownsFinalize) throw new JobSupersededError()

const fileName = `${sanitizeExportFilename(table.name)}.${format}`
const key = `workspace/${workspaceId}/exports/${tableId}/${jobId}/${fileName}`
// `preserveKey` keeps the custom key verbatim (without it the provider rewrites the key to a
Expand Down
Loading