From 1d90aada7ee2d8467537b7c929a5796c5fce6575 Mon Sep 17 00:00:00 2001 From: Joana Maia Date: Fri, 26 Jun 2026 13:03:13 +0100 Subject: [PATCH 1/4] fix: classify merged gitlab MRs as MERGE_REQUEST_MERGED (CM-1298) API ingestion was emitting MERGE_REQUEST_CLOSED for merge requests with a merged_at timestamp, so MERGE_REQUEST_MERGED activities never landed. Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Joana Maia --- .../libs/integrations/src/integrations/gitlab/processStream.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/libs/integrations/src/integrations/gitlab/processStream.ts b/services/libs/integrations/src/integrations/gitlab/processStream.ts index 5af8196efd..8f41a49724 100644 --- a/services/libs/integrations/src/integrations/gitlab/processStream.ts +++ b/services/libs/integrations/src/integrations/gitlab/processStream.ts @@ -192,7 +192,7 @@ const handleMergeRequestsStream: GitlabStreamHandler = async (ctx, api, data) => data: item.data, user, }, - type: GitlabActivityType.MERGE_REQUEST_CLOSED, + type: GitlabActivityType.MERGE_REQUEST_MERGED, projectId: data.projectId, pathWithNamespace: data.pathWithNamespace, }) From f408ec73dc6a91fc1a544402457e85343ad348ad Mon Sep 17 00:00:00 2001 From: Joana Maia Date: Fri, 26 Jun 2026 13:19:18 +0100 Subject: [PATCH 2/4] chore: parameterize activities cleanup script by platform and type (CM-1298) Generalize the gerrit one-off cleanup so it can target any platform and any set of activity types via --platform / --types / --before CLI args. Used to purge mislabeled gitlab merge_request-closed rows so re-ingestion can recreate them with the correct type. Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Joana Maia --- .../apps/script_executor_worker/package.json | 1 + ...leanup-activities-by-platform-and-type.ts} | 283 ++++++++++-------- 2 files changed, 160 insertions(+), 124 deletions(-) rename services/apps/script_executor_worker/src/bin/{cleanup-gerrit-activities.ts => cleanup-activities-by-platform-and-type.ts} (65%) diff --git a/services/apps/script_executor_worker/package.json b/services/apps/script_executor_worker/package.json index d528cdd285..82a87bc110 100644 --- a/services/apps/script_executor_worker/package.json +++ b/services/apps/script_executor_worker/package.json @@ -9,6 +9,7 @@ "dev": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug", "cleanup-fork-activities": "npx tsx src/bin/cleanup-fork-activities-and-maintainers.ts", "cleanup-member-aggregates": "npx tsx src/bin/cleanup-member-aggregates.ts", + "cleanup-activities-by-platform-and-type": "npx tsx src/bin/cleanup-activities-by-platform-and-type.ts", "recalculate-enrichment-affiliations": "npx tsx src/bin/recalculate-enrichment-affiliations.ts", "recalculate-all-affiliations": "npx tsx src/bin/recalculate-all-affiliations.ts", "add-lf-projects-to-collection": "npx tsx src/bin/add-lf-projects-to-collection.ts", diff --git a/services/apps/script_executor_worker/src/bin/cleanup-gerrit-activities.ts b/services/apps/script_executor_worker/src/bin/cleanup-activities-by-platform-and-type.ts similarity index 65% rename from services/apps/script_executor_worker/src/bin/cleanup-gerrit-activities.ts rename to services/apps/script_executor_worker/src/bin/cleanup-activities-by-platform-and-type.ts index 93b056f566..7fc6f974e9 100644 --- a/services/apps/script_executor_worker/src/bin/cleanup-gerrit-activities.ts +++ b/services/apps/script_executor_worker/src/bin/cleanup-activities-by-platform-and-type.ts @@ -1,39 +1,30 @@ /** - * Gerrit Activities Cleanup Script + * Activities Cleanup Script (by platform + type) * - * PROBLEM: - * Gerrit activities need to be cleaned up from both PostgreSQL and Tinybird - * based on specific platform and type filters with a date cutoff. - * - * SOLUTION: - * This script deletes activities from Gerrit platform across: - * - PostgreSQL (activityRelations table only) - * - Tinybird (activities and activityRelations tables) - * - * Filters: - * - platform = 'gerrit' - * - type in ('changeset-merged', 'changeset-abandoned', 'patchset_approval-created') - * - updatedAt < '2025-12-15' + * Deletes activities from PostgreSQL (activityRelations) and Tinybird + * (activities and activityRelations datasources) for a given platform and + * one or more activity types, optionally bounded by a date cutoff. * * Usage: - * # Via package.json script (recommended): - * pnpm run cleanup-gerrit-activities -- [--dry-run] [--tb-token ] + * pnpm run cleanup-activities-by-platform-and-type -- \ + * --platform \ + * --types \ + * [--before ] \ + * [--dry-run] \ + * [--tb-token ] * - * # Or directly with tsx: - * npx tsx src/bin/cleanup-gerrit-activities.ts [--dry-run] [--tb-token ] + * Required: + * --platform Platform name (e.g. 'gitlab', 'gerrit') + * --types Comma-separated activity types (e.g. 'merge_request-closed') * - * Options: - * --dry-run Display what would be deleted without actually deleting anything - * --tb-token Tinybird API token to use (overrides CROWD_TINYBIRD_ACTIVITIES_TOKEN environment variable) + * Optional: + * --before Only delete rows with updatedAt < this date (YYYY-MM-DD) + * --dry-run Report what would be deleted without deleting + * --tb-token Override CROWD_TINYBIRD_ACTIVITIES_TOKEN * * Environment Variables Required: - * CROWD_DB_WRITE_HOST - Postgres write host - * CROWD_DB_PORT - Postgres port - * CROWD_DB_USERNAME - Postgres username - * CROWD_DB_PASSWORD - Postgres password - * CROWD_DB_DATABASE - Postgres database name - * CROWD_TINYBIRD_BASE_URL - Tinybird API base URL - * CROWD_TINYBIRD_ACTIVITIES_TOKEN - Tinybird API token + * CROWD_DB_WRITE_HOST, CROWD_DB_PORT, CROWD_DB_USERNAME, CROWD_DB_PASSWORD, + * CROWD_DB_DATABASE, CROWD_TINYBIRD_BASE_URL, CROWD_TINYBIRD_ACTIVITIES_TOKEN */ import * as fs from 'fs' import * as path from 'path' @@ -46,7 +37,12 @@ import { import { QueryExecutor, pgpQx } from '@crowd/data-access-layer/src/queryExecutor' import { getServiceChildLogger } from '@crowd/logging' -const log = getServiceChildLogger('cleanup-gerrit-activities-script') +const log = getServiceChildLogger('cleanup-activities-by-platform-and-type-script') + +// Identifiers passed to Tinybird filters are interpolated, not parameterized, +// so guard against injection by requiring conservative character sets. +const IDENTIFIER_PATTERN = /^[A-Za-z0-9_-]+$/ +const DATE_PATTERN = /^\d{4}-\d{2}-\d{2}$/ interface DeletionStatus { success: boolean @@ -54,10 +50,17 @@ interface DeletionStatus { error?: string } +interface CleanupFilters { + platform: string + types: string[] + before?: string +} + interface CleanupResult { status: 'success' | 'failure' startTime: string endTime: string + filters: CleanupFilters totalBatches: number failedBatches: number deletions: { @@ -69,9 +72,21 @@ interface CleanupResult { } } -/** - * Initialize Postgres connection using QueryExecutor - */ +function quote(value: string): string { + return `'${value.replace(/'/g, "''")}'` +} + +function buildWhereClause(filters: CleanupFilters): string { + const clauses = [ + `platform = ${quote(filters.platform)}`, + `type IN (${filters.types.map(quote).join(', ')})`, + ] + if (filters.before) { + clauses.push(`updatedAt < ${quote(filters.before)}`) + } + return clauses.join(' AND ') +} + async function initPostgresClient(): Promise { log.info('Initializing Postgres connection...') @@ -89,11 +104,13 @@ async function initPostgresClient(): Promise { async function queryAndProcessActivityIdsInBatches( tinybird: TinybirdClient, postgres: QueryExecutor, + filters: CleanupFilters, dryRun: boolean, onBatchProcessed: () => void, ): Promise { - log.info('Querying activity IDs from Tinybird for Gerrit cleanup...') + log.info('Querying activity IDs from Tinybird...') + const where = buildWhereClause(filters) const BATCH_SIZE = 10000 let offset = 0 let hasMore = true @@ -102,7 +119,7 @@ async function queryAndProcessActivityIdsInBatches( try { while (hasMore) { - const query = `SELECT DISTINCT activityId FROM activityRelations WHERE platform = 'gerrit' AND type IN ('changeset-merged', 'changeset-abandoned', 'patchset_approval-created') AND updatedAt < '2025-12-15' ORDER BY activityId LIMIT ${BATCH_SIZE} OFFSET ${offset} FORMAT JSON` + const query = `SELECT DISTINCT activityId FROM activityRelations WHERE ${where} ORDER BY activityId LIMIT ${BATCH_SIZE} OFFSET ${offset} FORMAT JSON` log.info(`Querying batch: offset=${offset}, limit=${BATCH_SIZE}`) const result = await tinybird.executeSql<{ data: Array<{ activityId: string }> }>(query) @@ -152,9 +169,6 @@ async function queryAndProcessActivityIdsInBatches( } } -/** - * Delete activity relations from Postgres using activity IDs - */ async function deleteActivityRelationsFromPostgres( postgres: QueryExecutor, activityIds: string[], @@ -193,11 +207,9 @@ async function deleteActivityRelationsFromPostgres( } } -/** - * Delete activities from Tinybird using the delete API - */ async function deleteActivitiesFromTinybird( tinybird: TinybirdClient, + filters: CleanupFilters, dryRun = false, ): Promise<{ activities: DeletionStatus @@ -209,11 +221,11 @@ async function deleteActivitiesFromTinybird( activityRelations: { success: false } as DeletionStatus, } + const deleteCondition = buildWhereClause(filters) + if (dryRun) { - log.info('[DRY RUN] Would delete activities from Tinybird using Gerrit filters...') - log.info( - `[DRY RUN] Filters: platform='gerrit', type in ('changeset-merged', 'changeset-abandoned', 'patchset_approval-created'), updatedAt < '2025-12-15'`, - ) + log.info('[DRY RUN] Would delete activities from Tinybird...') + log.info(`[DRY RUN] Condition: ${deleteCondition}`) log.info(`[DRY RUN] Would delete from 'activities' datasource`) log.info(`[DRY RUN] Would delete from 'activityRelations' datasource`) return { @@ -223,20 +235,15 @@ async function deleteActivitiesFromTinybird( } } - log.info('Deleting activities from Tinybird using Gerrit filters...') + log.info(`Deleting activities from Tinybird with condition: ${deleteCondition}`) const triggeredJobIds: string[] = [] - // Define deletion conditions - const activitiesDeleteCondition = `platform = 'gerrit' AND type IN ('changeset-merged', 'changeset-abandoned', 'patchset_approval-created') AND updatedAt < '2025-12-15'` - const activityRelationsDeleteCondition = `platform = 'gerrit' AND type IN ('changeset-merged', 'changeset-abandoned', 'patchset_approval-created') AND updatedAt < '2025-12-15'` - - // Delete from activities datasource try { log.info('Triggering deletion job for activities datasource...') const activitiesJobResponse = await tinybird.deleteDatasource( 'activities', - activitiesDeleteCondition, + deleteCondition, true, false, // Don't wait ) @@ -254,12 +261,11 @@ async function deleteActivitiesFromTinybird( } } - // Delete from activityRelations datasource try { log.info('Triggering deletion job for activityRelations datasource...') const activityRelationsJobResponse = await tinybird.deleteDatasource( 'activityRelations', - activityRelationsDeleteCondition, + deleteCondition, true, false, // Don't wait ) @@ -287,30 +293,24 @@ async function deleteActivitiesFromTinybird( } } -/** - * Main cleanup process - */ -async function runCleanup(dryRun = false, tbToken?: string): Promise { +async function runCleanup( + filters: CleanupFilters, + dryRun = false, + tbToken?: string, +): Promise { const startTime = new Date().toISOString() let failedBatches = 0 let totalBatches = 0 - if (dryRun) { - log.info(`\n${'='.repeat(80)}`) - log.info(`[DRY RUN MODE] Gerrit Activities Cleanup`) - log.info(`${'='.repeat(80)}`) - } else { - log.info(`\n${'='.repeat(80)}`) - log.info(`Gerrit Activities Cleanup`) - log.info(`${'='.repeat(80)}`) - } + log.info(`\n${'='.repeat(80)}`) + log.info(`${dryRun ? '[DRY RUN MODE] ' : ''}Activities Cleanup`) + log.info(`Filters: ${buildWhereClause(filters)}`) + log.info(`${'='.repeat(80)}`) try { - // Initialize database clients const postgres = await initPostgresClient() const tinybird = new TinybirdClient(tbToken) - // Track deletion statuses const allDeletionStatuses = { postgres: { success: true } as DeletionStatus, tinybird: { @@ -319,13 +319,13 @@ async function runCleanup(dryRun = false, tbToken?: string): Promise { }, } - // Step 1: Query activity IDs from Tinybird in batches and delete from Postgres as we go log.info( 'Step 1: Processing activity IDs in batches from Tinybird and deleting from Postgres...', ) const totalProcessed = await queryAndProcessActivityIdsInBatches( tinybird, postgres, + filters, dryRun, () => { totalBatches++ @@ -334,17 +334,15 @@ async function runCleanup(dryRun = false, tbToken?: string): Promise { if (totalProcessed === 0) { log.info('No activities to delete, skipping Tinybird deletion steps') - log.info(`✓ Completed ${dryRun ? 'dry run for' : 'cleanup for'} Gerrit activities`) + log.info(`✓ Completed ${dryRun ? 'dry run' : 'cleanup'}`) return } log.info(`✓ Completed processing ${totalProcessed} activities from Postgres`) - // Step 2: Delete from Tinybird in a single operation per datasource log.info('Step 2: Triggering Tinybird deletions...') - const tinybirdStatuses = await deleteActivitiesFromTinybird(tinybird, dryRun) + const tinybirdStatuses = await deleteActivitiesFromTinybird(tinybird, filters, dryRun) - // Track failures from Tinybird if (!tinybirdStatuses.activities.success) { allDeletionStatuses.tinybird.activities = tinybirdStatuses.activities failedBatches++ @@ -354,7 +352,6 @@ async function runCleanup(dryRun = false, tbToken?: string): Promise { failedBatches++ } - // Wait for all Tinybird deletion jobs to complete if (!dryRun && tinybirdStatuses.jobIds.length > 0) { log.info( `Waiting for ${tinybirdStatuses.jobIds.length} Tinybird deletion job(s) to complete...`, @@ -368,21 +365,24 @@ async function runCleanup(dryRun = false, tbToken?: string): Promise { } } - // Create cleanup result const endTime = new Date().toISOString() const result: CleanupResult = { status: failedBatches > 0 ? 'failure' : 'success', startTime, endTime, + filters, totalBatches, failedBatches, deletions: allDeletionStatuses, } - // Save results to file + const safeSuffix = `${filters.platform}_${filters.types.join('-')}`.replace( + /[^A-Za-z0-9_-]/g, + '_', + ) const jsonFilePath = path.join( '/tmp', - `cleanup_gerrit_activities_${new Date().toISOString().replace(/[:.]/g, '-')}.json`, + `cleanup_activities_${safeSuffix}_${new Date().toISOString().replace(/[:.]/g, '-')}.json`, ) try { fs.writeFileSync(jsonFilePath, JSON.stringify(result, null, 2), 'utf-8') @@ -391,7 +391,6 @@ async function runCleanup(dryRun = false, tbToken?: string): Promise { log.error(`Failed to write cleanup results to ${jsonFilePath}: ${error.message}`) } - // Summary log.info(`\n${'='.repeat(80)}`) log.info('Cleanup Summary') log.info(`${'='.repeat(80)}`) @@ -423,62 +422,98 @@ async function runCleanup(dryRun = false, tbToken?: string): Promise { process.exit(1) } } catch (error) { - log.error(`Failed to run Gerrit cleanup: ${error.message}`) + log.error(`Failed to run cleanup: ${error.message}`) throw error } } -/** - * Main entry point - */ +function getFlagValue(args: string[], name: string): string | undefined { + const idx = args.indexOf(name) + if (idx === -1) return undefined + if (idx + 1 >= args.length) { + log.error(`Error: ${name} requires a value`) + process.exit(1) + } + return args[idx + 1] +} + +function printHelp(): void { + log.info(` + Usage: + pnpm run cleanup-activities-by-platform-and-type -- \\ + --platform \\ + --types \\ + [--before ] \\ + [--dry-run] \\ + [--tb-token ] + + Required: + --platform Platform name (e.g. 'gitlab', 'gerrit') + --types Comma-separated activity types (e.g. 'merge_request-closed') + + Optional: + --before Only delete rows with updatedAt < this date (YYYY-MM-DD) + --dry-run Report what would be deleted without deleting + --tb-token Override CROWD_TINYBIRD_ACTIVITIES_TOKEN + + Examples: + # Dry run: see how many gitlab merge_request-closed rows would be deleted + pnpm run cleanup-activities-by-platform-and-type -- \\ + --platform gitlab --types merge_request-closed --dry-run + + # Actual cleanup + pnpm run cleanup-activities-by-platform-and-type -- \\ + --platform gitlab --types merge_request-closed + `) +} + async function main() { const args = process.argv.slice(2) - // Parse flags - const dryRunIndex = args.indexOf('--dry-run') - const tbTokenIndex = args.indexOf('--tb-token') - const dryRun = dryRunIndex !== -1 + if (args.includes('--help') || args.includes('-h') || args.length === 0) { + printHelp() + process.exit(args.length === 0 ? 1 : 0) + } + + const dryRun = args.includes('--dry-run') + const tbToken = getFlagValue(args, '--tb-token') + const platform = getFlagValue(args, '--platform') + const typesArg = getFlagValue(args, '--types') + const before = getFlagValue(args, '--before') + + if (!platform) { + log.error('Error: --platform is required') + printHelp() + process.exit(1) + } + if (!IDENTIFIER_PATTERN.test(platform)) { + log.error(`Error: invalid --platform value '${platform}' (allowed: ${IDENTIFIER_PATTERN})`) + process.exit(1) + } - // Extract tb-token value if provided - let tbToken: string | undefined - if (tbTokenIndex !== -1) { - if (tbTokenIndex + 1 >= args.length) { - log.error('Error: --tb-token requires a value') + if (!typesArg) { + log.error('Error: --types is required (comma-separated)') + printHelp() + process.exit(1) + } + const types = typesArg + .split(',') + .map((t) => t.trim()) + .filter(Boolean) + if (types.length === 0) { + log.error('Error: --types must contain at least one value') + process.exit(1) + } + for (const t of types) { + if (!IDENTIFIER_PATTERN.test(t)) { + log.error(`Error: invalid type '${t}' (allowed: ${IDENTIFIER_PATTERN})`) process.exit(1) } - tbToken = args[tbTokenIndex + 1] } - // Check for help flag or no valid arguments - if (args.includes('--help') || args.includes('-h')) { - log.info(` - Usage: - # Via package.json script (recommended): - pnpm run cleanup-gerrit-activities -- [--dry-run] [--tb-token ] - - # Or directly with tsx: - npx tsx src/bin/cleanup-gerrit-activities.ts [--dry-run] [--tb-token ] - - Options: - --dry-run: (optional) Display what would be deleted without actually deleting anything - --tb-token: (optional) Tinybird API token to use (overrides CROWD_TINYBIRD_ACTIVITIES_TOKEN environment variable) - - Examples: - # Run cleanup - pnpm run cleanup-gerrit-activities - - # Dry run to preview what would be deleted - pnpm run cleanup-gerrit-activities -- --dry-run - - # Use custom Tinybird token - pnpm run cleanup-gerrit-activities -- --tb-token your-token-here - - Filters applied: - - platform = 'gerrit' - - type in ('changeset-merged', 'changeset-abandoned', 'patchset_approval-created') - - updatedAt < '2025-12-15' - `) - process.exit(0) + if (before && !DATE_PATTERN.test(before)) { + log.error(`Error: --before must match YYYY-MM-DD, got '${before}'`) + process.exit(1) } if (dryRun) { @@ -488,9 +523,9 @@ async function main() { } try { - await runCleanup(dryRun, tbToken) + await runCleanup({ platform, types, before }, dryRun, tbToken) } catch (error) { - log.error(error, 'Failed to run Gerrit cleanup script') + log.error(error, 'Failed to run cleanup script') log.error(`\n❌ Error: ${error.message}`) process.exit(1) } From 6c07a9cec804b7f9588dbbbb622d2e08e29c5ade Mon Sep 17 00:00:00 2001 From: Joana Maia Date: Fri, 26 Jun 2026 13:41:32 +0100 Subject: [PATCH 3/4] chore: harden cleanup-activities script with pre-flight count and direct PG delete (CM-1298) Ports safety improvements proposed in #4080 onto the parameterized script: - Split into separate Tinybird (unquoted) and Postgres (pg-promise params, "updatedAt" double-quoted) filter builders so each store gets its own dialect. - Add cheap pre-flight Tinybird count() on both datasources to show blast radius before any destructive action. - Interactive confirmation prompt with --yes / -y bypass for non-interactive runs. - Switch PG cleanup to direct chunked DELETE (fetch matching IDs from PG, delete by PK) rather than streaming IDs from Tinybird. Decouples PG cleanup throughput from Tinybird. - Extend Tinybird job wait timeout from 1h to 6h for large bulk deletes. - Persist result JSON immediately after triggering TB jobs so the job IDs survive a wait timeout. - Docstring note that derived MVs are not cascaded by raw datasource deletes. Supersedes #4080. Signed-off-by: Joana Maia --- ...cleanup-activities-by-platform-and-type.ts | 602 +++++++++--------- 1 file changed, 316 insertions(+), 286 deletions(-) diff --git a/services/apps/script_executor_worker/src/bin/cleanup-activities-by-platform-and-type.ts b/services/apps/script_executor_worker/src/bin/cleanup-activities-by-platform-and-type.ts index 7fc6f974e9..524749e2c9 100644 --- a/services/apps/script_executor_worker/src/bin/cleanup-activities-by-platform-and-type.ts +++ b/services/apps/script_executor_worker/src/bin/cleanup-activities-by-platform-and-type.ts @@ -1,16 +1,25 @@ /** * Activities Cleanup Script (by platform + type) * - * Deletes activities from PostgreSQL (activityRelations) and Tinybird - * (activities and activityRelations datasources) for a given platform and + * Deletes activities from PostgreSQL (`activityRelations`) and Tinybird + * (`activities` and `activityRelations` datasources) for a given platform and * one or more activity types, optionally bounded by a date cutoff. * + * Before any deletion, the script prints the affected row counts from Tinybird + * and prompts for confirmation (skip with --yes). + * + * NOTE: This script only purges the raw Tinybird datasources (`activities` and + * `activityRelations`). Derived materialized views (activities_backup, + * activities_deduplicated_ds, activityRelations_bucket_MV_ds_*, etc.) are NOT + * affected because Tinybird/ClickHouse MV deletes do not cascade. + * * Usage: * pnpm run cleanup-activities-by-platform-and-type -- \ * --platform \ * --types \ * [--before ] \ * [--dry-run] \ + * [--yes|-y] \ * [--tb-token ] * * Required: @@ -18,8 +27,9 @@ * --types Comma-separated activity types (e.g. 'merge_request-closed') * * Optional: - * --before Only delete rows with updatedAt < this date (YYYY-MM-DD) + * --before Only delete rows with "updatedAt" < this date (YYYY-MM-DD) * --dry-run Report what would be deleted without deleting + * --yes / -y Skip the interactive confirmation prompt * --tb-token Override CROWD_TINYBIRD_ACTIVITIES_TOKEN * * Environment Variables Required: @@ -28,6 +38,7 @@ */ import * as fs from 'fs' import * as path from 'path' +import * as readline from 'readline' import { TinybirdClient, @@ -44,11 +55,9 @@ const log = getServiceChildLogger('cleanup-activities-by-platform-and-type-scrip const IDENTIFIER_PATTERN = /^[A-Za-z0-9_-]+$/ const DATE_PATTERN = /^\d{4}-\d{2}-\d{2}$/ -interface DeletionStatus { - success: boolean - jobId?: string - error?: string -} +const POSTGRES_BATCH_SIZE = 10000 +const TINYBIRD_JOB_POLL_INTERVAL_MS = 60_000 +const TINYBIRD_JOB_TIMEOUT_MS = 6 * 60 * 60 * 1000 // 6h: bulk deletes can be slow interface CleanupFilters { platform: string @@ -56,13 +65,24 @@ interface CleanupFilters { before?: string } +interface DeletionStatus { + success: boolean + jobId?: string + error?: string +} + interface CleanupResult { - status: 'success' | 'failure' + status: 'success' | 'failure' | 'pending' startTime: string - endTime: string + endTime?: string filters: CleanupFilters - totalBatches: number - failedBatches: number + counts: { + tinybirdActivities: number + tinybirdActivityRelations: number + } + postgresDeleted: number + postgresFailedBatches: number + tinybirdJobIds: string[] deletions: { postgres: DeletionStatus tinybird: { @@ -72,361 +92,366 @@ interface CleanupResult { } } -function quote(value: string): string { - return `'${value.replace(/'/g, "''")}'` +// --------------------------------------------------------------------------- +// Filter clause builders +// --------------------------------------------------------------------------- + +/** Tinybird/ClickHouse WHERE clause — unquoted identifiers, single-quoted strings. */ +function buildTinybirdFilterClause(filters: CleanupFilters): string { + const parts = [`platform = '${filters.platform}'`] + const typesList = filters.types.map((t) => `'${t}'`).join(', ') + parts.push(`type IN (${typesList})`) + if (filters.before) { + parts.push(`updatedAt < '${filters.before}'`) + } + return parts.join(' AND ') } -function buildWhereClause(filters: CleanupFilters): string { - const clauses = [ - `platform = ${quote(filters.platform)}`, - `type IN (${filters.types.map(quote).join(', ')})`, - ] +/** + * Postgres WHERE clause + pg-promise param map. + * `"updatedAt"` is camelCase and must be double-quoted; `platform` and `type` + * are lowercase and unquoted-safe. + */ +function buildPostgresFilter(filters: CleanupFilters): { + where: string + values: Record +} { + const conditions: string[] = [`platform = $(platform)`, `type IN ($(types:csv))`] + const values: Record = { + platform: filters.platform, + types: filters.types, + } if (filters.before) { - clauses.push(`updatedAt < ${quote(filters.before)}`) + conditions.push(`"updatedAt" < $(beforeDate)`) + values.beforeDate = filters.before } - return clauses.join(' AND ') + return { where: conditions.join(' AND '), values } } +// --------------------------------------------------------------------------- +// Init +// --------------------------------------------------------------------------- + async function initPostgresClient(): Promise { log.info('Initializing Postgres connection...') - const dbConnection = await getDbConnection(WRITE_DB_CONFIG()) const queryExecutor = pgpQx(dbConnection) - log.info('Postgres connection established') return queryExecutor } -/** - * Query activity IDs from Tinybird in batches and delete from Postgres immediately - * Uses batched queries to avoid hitting Tinybird's result size limit (100 MiB) - */ -async function queryAndProcessActivityIdsInBatches( +// --------------------------------------------------------------------------- +// Count helpers +// --------------------------------------------------------------------------- + +async function countTinybirdRows( tinybird: TinybirdClient, - postgres: QueryExecutor, + datasource: string, filters: CleanupFilters, - dryRun: boolean, - onBatchProcessed: () => void, ): Promise { - log.info('Querying activity IDs from Tinybird...') - - const where = buildWhereClause(filters) - const BATCH_SIZE = 10000 - let offset = 0 - let hasMore = true - let totalProcessed = 0 - let batchNumber = 0 - - try { - while (hasMore) { - const query = `SELECT DISTINCT activityId FROM activityRelations WHERE ${where} ORDER BY activityId LIMIT ${BATCH_SIZE} OFFSET ${offset} FORMAT JSON` - log.info(`Querying batch: offset=${offset}, limit=${BATCH_SIZE}`) - - const result = await tinybird.executeSql<{ data: Array<{ activityId: string }> }>(query) - const batchActivityIds = result.data.map((row) => row.activityId) + const whereClause = buildTinybirdFilterClause(filters) + const query = `SELECT count() AS c FROM ${datasource} WHERE ${whereClause} FORMAT JSON` + const result = await tinybird.executeSql<{ data: Array<{ c: number }> }>(query) + return result.data[0]?.c ?? 0 +} - if (batchActivityIds.length === 0) { - hasMore = false +// --------------------------------------------------------------------------- +// Confirmation prompt +// --------------------------------------------------------------------------- + +async function confirmOrAbort(message: string): Promise { + const rl = readline.createInterface({ input: process.stdin, output: process.stdout }) + return new Promise((resolve, reject) => { + rl.question(`${message}\nType "yes" to proceed: `, (answer) => { + rl.close() + if (answer.trim().toLowerCase() === 'yes') { + resolve() } else { - batchNumber++ - log.info( - `Processing batch ${batchNumber} (${batchActivityIds.length} activities, total processed: ${totalProcessed})...`, - ) - - const postgresStatus = await deleteActivityRelationsFromPostgres( - postgres, - batchActivityIds, - dryRun, - ) - - if (!postgresStatus.success) { - log.error(`Failed to delete batch ${batchNumber} from Postgres: ${postgresStatus.error}`) - } - - totalProcessed += batchActivityIds.length - onBatchProcessed() - - // If we got fewer results than the batch size, we've reached the end - if (batchActivityIds.length < BATCH_SIZE) { - hasMore = false - } else { - offset += BATCH_SIZE - } + reject(new Error('Aborted by user')) } - } - - log.info(`Found and processed ${totalProcessed} total activity ID(s) in Tinybird`) - return totalProcessed - } catch (error) { - const statusCode = error?.response?.status || 'unknown' - const responseBody = error?.response?.data - ? JSON.stringify(error.response.data) - : error?.response?.body || 'no body' - log.error( - `Failed to query activity IDs from Tinybird: ${error.message} (status: ${statusCode}, body: ${responseBody})`, - ) - throw error - } + }) + }) } -async function deleteActivityRelationsFromPostgres( +// --------------------------------------------------------------------------- +// Postgres chunked delete +// --------------------------------------------------------------------------- + +/** + * Fetch matching IDs from `activityRelations` and delete by PK in batches. + * PK deletes are cheap index lookups; the filter scan happens once per batch. + * Returns { deleted, failedBatches }. Stops on the first failed batch so we + * don't re-fetch the same undeleted rows forever. + */ +async function deletePostgresInChunks( postgres: QueryExecutor, - activityIds: string[], - dryRun = false, -): Promise { - if (activityIds.length === 0) { - log.info(`No activity IDs to ${dryRun ? 'query' : 'delete'} from Postgres`) - return { success: true } - } + filters: CleanupFilters, + batchSize = POSTGRES_BATCH_SIZE, +): Promise<{ deleted: number; failedBatches: number }> { + const { where, values } = buildPostgresFilter(filters) + const fetchQuery = `SELECT "activityId" FROM "activityRelations" WHERE ${where} LIMIT ${batchSize}` - try { - if (dryRun) { - log.info(`[DRY RUN] Querying ${activityIds.length} activity relations from Postgres...`) - const query = ` - SELECT COUNT(*) as count - FROM "activityRelations" - WHERE "activityId" IN ($(activityIds:csv)) - ` - const result = (await postgres.selectOne(query, { activityIds })) as { count: string } - const rowCount = parseInt(result.count, 10) - log.info(`[DRY RUN] Would delete ${rowCount} activity relation(s) from Postgres`) - return { success: true } + let total = 0 + let batch = 0 + let failedBatches = 0 + let rows: Array<{ activityId: string }> + + do { + rows = (await postgres.select(fetchQuery, values)) as Array<{ activityId: string }> + if (rows.length === 0) break + + const ids = rows.map((r) => r.activityId) + batch++ + + try { + await postgres.result(`DELETE FROM "activityRelations" WHERE "activityId" IN ($(ids:csv))`, { + ids, + }) + total += ids.length + } catch (error) { + log.error( + ` Batch ${batch} delete failed (sample IDs: ${ids.slice(0, 3).join(', ')}): ${error.message}`, + ) + failedBatches++ + break } - log.info(`Deleting ${activityIds.length} activity relations from Postgres...`) - const query = ` - DELETE FROM "activityRelations" - WHERE "activityId" IN ($(activityIds:csv)) - ` - const rowCount = await postgres.result(query, { activityIds }) - log.info(`✓ Deleted ${rowCount} activity relation(s) from Postgres`) - return { success: true } - } catch (error) { - log.error(`Failed to delete activity relations from Postgres: ${error.message}`) - return { success: false, error: error.message } - } + if (batch % 10 === 0) { + log.info(` … deleted ${total.toLocaleString()} rows so far (batch ${batch})`) + } + } while (rows.length === batchSize) + + return { deleted: total, failedBatches } } +// --------------------------------------------------------------------------- +// Tinybird delete jobs +// --------------------------------------------------------------------------- + async function deleteActivitiesFromTinybird( tinybird: TinybirdClient, filters: CleanupFilters, - dryRun = false, ): Promise<{ activities: DeletionStatus activityRelations: DeletionStatus jobIds: string[] }> { + const deleteCondition = buildTinybirdFilterClause(filters) const results = { activities: { success: false } as DeletionStatus, activityRelations: { success: false } as DeletionStatus, } - - const deleteCondition = buildWhereClause(filters) - - if (dryRun) { - log.info('[DRY RUN] Would delete activities from Tinybird...') - log.info(`[DRY RUN] Condition: ${deleteCondition}`) - log.info(`[DRY RUN] Would delete from 'activities' datasource`) - log.info(`[DRY RUN] Would delete from 'activityRelations' datasource`) - return { - activities: { success: true }, - activityRelations: { success: true }, - jobIds: [], - } - } - - log.info(`Deleting activities from Tinybird with condition: ${deleteCondition}`) - const triggeredJobIds: string[] = [] + log.info('Triggering deletion job for Tinybird activities datasource...') try { - log.info('Triggering deletion job for activities datasource...') - const activitiesJobResponse = await tinybird.deleteDatasource( - 'activities', - deleteCondition, - true, - false, // Don't wait - ) - log.info(`✓ Triggered deletion job for activities (job_id: ${activitiesJobResponse.job_id})`) - triggeredJobIds.push(activitiesJobResponse.job_id) - results.activities = { - success: true, - jobId: activitiesJobResponse.job_id, - } + const resp = await tinybird.deleteDatasource('activities', deleteCondition, true, false) + log.info(`✓ Triggered activities deletion job (job_id: ${resp.job_id})`) + triggeredJobIds.push(resp.job_id) + results.activities = { success: true, jobId: resp.job_id } } catch (error) { - log.error(`Failed to trigger deletion job for activities datasource: ${error.message}`) - results.activities = { - success: false, - error: error.message, - } + log.error(`Failed to trigger deletion job for activities: ${error.message}`) + results.activities = { success: false, error: error.message } } + log.info('Triggering deletion job for Tinybird activityRelations datasource...') try { - log.info('Triggering deletion job for activityRelations datasource...') - const activityRelationsJobResponse = await tinybird.deleteDatasource( - 'activityRelations', - deleteCondition, - true, - false, // Don't wait - ) - log.info( - `✓ Triggered deletion job for activityRelations (job_id: ${activityRelationsJobResponse.job_id})`, - ) - triggeredJobIds.push(activityRelationsJobResponse.job_id) - results.activityRelations = { - success: true, - jobId: activityRelationsJobResponse.job_id, - } + const resp = await tinybird.deleteDatasource('activityRelations', deleteCondition, true, false) + log.info(`✓ Triggered activityRelations deletion job (job_id: ${resp.job_id})`) + triggeredJobIds.push(resp.job_id) + results.activityRelations = { success: true, jobId: resp.job_id } } catch (error) { - log.error(`Failed to trigger deletion job for activityRelations datasource: ${error.message}`) - results.activityRelations = { - success: false, - error: error.message, - } + log.error(`Failed to trigger deletion job for activityRelations: ${error.message}`) + results.activityRelations = { success: false, error: error.message } } - log.info(`✓ All deletion jobs triggered (${triggeredJobIds.length} running in background)`) + return { ...results, jobIds: triggeredJobIds } +} + +// --------------------------------------------------------------------------- +// Result JSON persistence +// --------------------------------------------------------------------------- + +function resultJsonPath(filters: CleanupFilters, startTime: string): string { + const safeSuffix = `${filters.platform}_${filters.types.join('-')}`.replace( + /[^A-Za-z0-9_-]/g, + '_', + ) + return path.join( + '/tmp', + `cleanup_activities_${safeSuffix}_${startTime.replace(/[:.]/g, '-')}.json`, + ) +} - return { - ...results, - jobIds: triggeredJobIds, +function writeResult(filePath: string, result: CleanupResult): void { + try { + fs.writeFileSync(filePath, JSON.stringify(result, null, 2), 'utf-8') + log.info(`✓ Cleanup results saved to: ${filePath}`) + } catch (error) { + log.error(`Failed to write cleanup results to ${filePath}: ${error.message}`) } } +// --------------------------------------------------------------------------- +// Main orchestration +// --------------------------------------------------------------------------- + async function runCleanup( filters: CleanupFilters, - dryRun = false, + dryRun: boolean, + skipConfirm: boolean, tbToken?: string, ): Promise { const startTime = new Date().toISOString() - let failedBatches = 0 - let totalBatches = 0 log.info(`\n${'='.repeat(80)}`) log.info(`${dryRun ? '[DRY RUN MODE] ' : ''}Activities Cleanup`) - log.info(`Filters: ${buildWhereClause(filters)}`) + log.info(`Filters: ${buildTinybirdFilterClause(filters)}`) log.info(`${'='.repeat(80)}`) - try { - const postgres = await initPostgresClient() - const tinybird = new TinybirdClient(tbToken) + const postgres = await initPostgresClient() + const tinybird = new TinybirdClient(tbToken) - const allDeletionStatuses = { - postgres: { success: true } as DeletionStatus, - tinybird: { - activities: { success: true } as DeletionStatus, - activityRelations: { success: true } as DeletionStatus, - }, - } + log.info('Counting affected rows in Tinybird...') + const [tbActivitiesCount, tbRelationsCount] = await Promise.all([ + countTinybirdRows(tinybird, 'activities', filters), + countTinybirdRows(tinybird, 'activityRelations', filters), + ]) + log.info(` Tinybird activities : ${tbActivitiesCount.toLocaleString()} rows`) + log.info(` Tinybird activityRelations : ${tbRelationsCount.toLocaleString()} rows`) + log.info(` PostgreSQL activityRelations : will be deleted by streaming batches (no pre-count)`) + + if (dryRun) { + log.info(`\n[DRY RUN] Would delete:`) log.info( - 'Step 1: Processing activity IDs in batches from Tinybird and deleting from Postgres...', - ) - const totalProcessed = await queryAndProcessActivityIdsInBatches( - tinybird, - postgres, - filters, - dryRun, - () => { - totalBatches++ - }, + ` PostgreSQL activityRelations matching filter (actual count reported during real execution)`, ) + log.info(` ${tbActivitiesCount.toLocaleString()} rows from Tinybird activities`) + log.info(` ${tbRelationsCount.toLocaleString()} rows from Tinybird activityRelations`) + log.info('[DRY RUN] No data was deleted.') + return + } - if (totalProcessed === 0) { - log.info('No activities to delete, skipping Tinybird deletion steps') - log.info(`✓ Completed ${dryRun ? 'dry run' : 'cleanup'}`) - return - } - - log.info(`✓ Completed processing ${totalProcessed} activities from Postgres`) + if (tbActivitiesCount === 0 && tbRelationsCount === 0) { + log.info('No matching rows found in Tinybird. Nothing to delete.') + return + } - log.info('Step 2: Triggering Tinybird deletions...') - const tinybirdStatuses = await deleteActivitiesFromTinybird(tinybird, filters, dryRun) + if (!skipConfirm) { + await confirmOrAbort( + `\nAbout to permanently delete PG rows matching filter, ${tbActivitiesCount.toLocaleString()} TB activities, ${tbRelationsCount.toLocaleString()} TB activityRelations.`, + ) + } - if (!tinybirdStatuses.activities.success) { - allDeletionStatuses.tinybird.activities = tinybirdStatuses.activities - failedBatches++ - } - if (!tinybirdStatuses.activityRelations.success) { - allDeletionStatuses.tinybird.activityRelations = tinybirdStatuses.activityRelations - failedBatches++ - } + const result: CleanupResult = { + status: 'pending', + startTime, + filters, + counts: { + tinybirdActivities: tbActivitiesCount, + tinybirdActivityRelations: tbRelationsCount, + }, + postgresDeleted: 0, + postgresFailedBatches: 0, + tinybirdJobIds: [], + deletions: { + postgres: { success: true }, + tinybird: { + activities: { success: true }, + activityRelations: { success: true }, + }, + }, + } + const jsonFilePath = resultJsonPath(filters, startTime) - if (!dryRun && tinybirdStatuses.jobIds.length > 0) { - log.info( - `Waiting for ${tinybirdStatuses.jobIds.length} Tinybird deletion job(s) to complete...`, + log.info(`\nStep 1: Deleting matching rows from PostgreSQL in ${POSTGRES_BATCH_SIZE} batches...`) + try { + const pgResult = await deletePostgresInChunks(postgres, filters) + result.postgresDeleted = pgResult.deleted + result.postgresFailedBatches = pgResult.failedBatches + if (pgResult.failedBatches > 0) { + log.warn( + ` ${pgResult.failedBatches} batch(es) failed — ${pgResult.deleted.toLocaleString()} rows deleted successfully`, ) - try { - await tinybird.waitForJobs(tinybirdStatuses.jobIds, 60000, 3600000) // 1min interval, 1h timeout - log.info(`✓ All Tinybird deletion jobs completed`) - } catch (error) { - log.error(`Failed to wait for Tinybird deletion jobs: ${error.message}`) - // Continue anyway - jobs are still running in background + result.deletions.postgres = { + success: false, + error: `${pgResult.failedBatches} batch(es) failed`, } + } else { + log.info(`✓ Deleted ${pgResult.deleted.toLocaleString()} row(s) from PostgreSQL`) } + } catch (error) { + log.error(`Postgres deletion failed: ${error.message}`) + result.deletions.postgres = { success: false, error: error.message } + } - const endTime = new Date().toISOString() - const result: CleanupResult = { - status: failedBatches > 0 ? 'failure' : 'success', - startTime, - endTime, - filters, - totalBatches, - failedBatches, - deletions: allDeletionStatuses, - } + log.info('\nStep 2: Triggering Tinybird deletions...') + const tinybirdStatuses = await deleteActivitiesFromTinybird(tinybird, filters) + result.deletions.tinybird.activities = tinybirdStatuses.activities + result.deletions.tinybird.activityRelations = tinybirdStatuses.activityRelations + result.tinybirdJobIds = tinybirdStatuses.jobIds - const safeSuffix = `${filters.platform}_${filters.types.join('-')}`.replace( - /[^A-Za-z0-9_-]/g, - '_', - ) - const jsonFilePath = path.join( - '/tmp', - `cleanup_activities_${safeSuffix}_${new Date().toISOString().replace(/[:.]/g, '-')}.json`, + // Persist result BEFORE waiting on TB jobs so the job IDs survive a timeout. + writeResult(jsonFilePath, result) + + if (tinybirdStatuses.jobIds.length > 0) { + log.info( + `Waiting for ${tinybirdStatuses.jobIds.length} Tinybird deletion job(s) to complete (timeout: ${TINYBIRD_JOB_TIMEOUT_MS / 1000 / 60} min)...`, ) try { - fs.writeFileSync(jsonFilePath, JSON.stringify(result, null, 2), 'utf-8') - log.info(`✓ Cleanup results saved to: ${jsonFilePath}`) - } catch (error) { - log.error(`Failed to write cleanup results to ${jsonFilePath}: ${error.message}`) - } - - log.info(`\n${'='.repeat(80)}`) - log.info('Cleanup Summary') - log.info(`${'='.repeat(80)}`) - log.info(`✓ Activities ${dryRun ? 'found' : 'deleted'}: ${totalProcessed}`) - log.info(`✓ Batches processed: ${totalBatches}`) - if (failedBatches > 0) { - log.warn(`✗ Failed batches: ${failedBatches}`) - } - - if (tinybirdStatuses.activities.success) { - log.info( - `✓ Tinybird activities deletion job ${dryRun ? 'would be' : 'was'} triggered: ${tinybirdStatuses.activities.jobId || 'N/A'}`, + await tinybird.waitForJobs( + tinybirdStatuses.jobIds, + TINYBIRD_JOB_POLL_INTERVAL_MS, + TINYBIRD_JOB_TIMEOUT_MS, ) - } else { - log.error(`✗ Tinybird activities deletion failed: ${tinybirdStatuses.activities.error}`) - } - - if (tinybirdStatuses.activityRelations.success) { - log.info( - `✓ Tinybird activityRelations deletion job ${dryRun ? 'would be' : 'was'} triggered: ${tinybirdStatuses.activityRelations.jobId || 'N/A'}`, - ) - } else { + log.info(`✓ All Tinybird deletion jobs completed`) + } catch (error) { log.error( - `✗ Tinybird activityRelations deletion failed: ${tinybirdStatuses.activityRelations.error}`, + `Failed to wait for Tinybird deletion jobs: ${error.message} (jobs are still running in background; IDs persisted to ${jsonFilePath})`, ) } + } - if (result.status === 'failure') { - process.exit(1) - } - } catch (error) { - log.error(`Failed to run cleanup: ${error.message}`) - throw error + result.endTime = new Date().toISOString() + const anyFailure = + !result.deletions.postgres.success || + !result.deletions.tinybird.activities.success || + !result.deletions.tinybird.activityRelations.success + result.status = anyFailure ? 'failure' : 'success' + writeResult(jsonFilePath, result) + + log.info(`\n${'='.repeat(80)}`) + log.info('Cleanup Summary') + log.info(`${'='.repeat(80)}`) + log.info(`✓ PostgreSQL rows deleted: ${result.postgresDeleted.toLocaleString()}`) + if (result.postgresFailedBatches > 0) { + log.warn(`✗ PostgreSQL failed batches: ${result.postgresFailedBatches}`) + } + if (tinybirdStatuses.activities.success) { + log.info(`✓ Tinybird activities deletion job: ${tinybirdStatuses.activities.jobId || 'N/A'}`) + } else { + log.error(`✗ Tinybird activities deletion failed: ${tinybirdStatuses.activities.error}`) + } + if (tinybirdStatuses.activityRelations.success) { + log.info( + `✓ Tinybird activityRelations deletion job: ${tinybirdStatuses.activityRelations.jobId || 'N/A'}`, + ) + } else { + log.error( + `✗ Tinybird activityRelations deletion failed: ${tinybirdStatuses.activityRelations.error}`, + ) + } + + if (result.status === 'failure') { + process.exit(1) } } +// --------------------------------------------------------------------------- +// CLI +// --------------------------------------------------------------------------- + function getFlagValue(args: string[], name: string): string | undefined { const idx = args.indexOf(name) if (idx === -1) return undefined @@ -445,6 +470,7 @@ function printHelp(): void { --types \\ [--before ] \\ [--dry-run] \\ + [--yes|-y] \\ [--tb-token ] Required: @@ -452,8 +478,9 @@ function printHelp(): void { --types Comma-separated activity types (e.g. 'merge_request-closed') Optional: - --before Only delete rows with updatedAt < this date (YYYY-MM-DD) + --before Only delete rows with "updatedAt" < this date (YYYY-MM-DD) --dry-run Report what would be deleted without deleting + --yes / -y Skip the interactive confirmation prompt --tb-token Override CROWD_TINYBIRD_ACTIVITIES_TOKEN Examples: @@ -461,7 +488,7 @@ function printHelp(): void { pnpm run cleanup-activities-by-platform-and-type -- \\ --platform gitlab --types merge_request-closed --dry-run - # Actual cleanup + # Actual cleanup (will prompt for confirmation) pnpm run cleanup-activities-by-platform-and-type -- \\ --platform gitlab --types merge_request-closed `) @@ -476,6 +503,7 @@ async function main() { } const dryRun = args.includes('--dry-run') + const skipConfirm = args.includes('--yes') || args.includes('-y') const tbToken = getFlagValue(args, '--tb-token') const platform = getFlagValue(args, '--platform') const typesArg = getFlagValue(args, '--types') @@ -511,9 +539,11 @@ async function main() { } } - if (before && !DATE_PATTERN.test(before)) { - log.error(`Error: --before must match YYYY-MM-DD, got '${before}'`) - process.exit(1) + if (before) { + if (!DATE_PATTERN.test(before) || Number.isNaN(Date.parse(before))) { + log.error(`Error: --before must be a valid YYYY-MM-DD date, got '${before}'`) + process.exit(1) + } } if (dryRun) { @@ -523,7 +553,7 @@ async function main() { } try { - await runCleanup({ platform, types, before }, dryRun, tbToken) + await runCleanup({ platform, types, before }, dryRun, skipConfirm, tbToken) } catch (error) { log.error(error, 'Failed to run cleanup script') log.error(`\n❌ Error: ${error.message}`) From 24e4b7ec20445bd4b95102ac3200afc179eee27a Mon Sep 17 00:00:00 2001 From: Joana Maia Date: Fri, 26 Jun 2026 14:57:50 +0100 Subject: [PATCH 4/4] chore: add --segment-id filter to cleanup-activities script (CM-1298) Signed-off-by: Joana Maia --- ...cleanup-activities-by-platform-and-type.ts | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/services/apps/script_executor_worker/src/bin/cleanup-activities-by-platform-and-type.ts b/services/apps/script_executor_worker/src/bin/cleanup-activities-by-platform-and-type.ts index 524749e2c9..d97992fe63 100644 --- a/services/apps/script_executor_worker/src/bin/cleanup-activities-by-platform-and-type.ts +++ b/services/apps/script_executor_worker/src/bin/cleanup-activities-by-platform-and-type.ts @@ -17,6 +17,7 @@ * pnpm run cleanup-activities-by-platform-and-type -- \ * --platform \ * --types \ + * [--segment-id ] \ * [--before ] \ * [--dry-run] \ * [--yes|-y] \ @@ -27,6 +28,7 @@ * --types Comma-separated activity types (e.g. 'merge_request-closed') * * Optional: + * --segment-id Restrict deletion to a single segment (UUID) * --before Only delete rows with "updatedAt" < this date (YYYY-MM-DD) * --dry-run Report what would be deleted without deleting * --yes / -y Skip the interactive confirmation prompt @@ -54,6 +56,7 @@ const log = getServiceChildLogger('cleanup-activities-by-platform-and-type-scrip // so guard against injection by requiring conservative character sets. const IDENTIFIER_PATTERN = /^[A-Za-z0-9_-]+$/ const DATE_PATTERN = /^\d{4}-\d{2}-\d{2}$/ +const UUID_PATTERN = /^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$/ const POSTGRES_BATCH_SIZE = 10000 const TINYBIRD_JOB_POLL_INTERVAL_MS = 60_000 @@ -62,6 +65,7 @@ const TINYBIRD_JOB_TIMEOUT_MS = 6 * 60 * 60 * 1000 // 6h: bulk deletes can be sl interface CleanupFilters { platform: string types: string[] + segmentId?: string before?: string } @@ -101,6 +105,9 @@ function buildTinybirdFilterClause(filters: CleanupFilters): string { const parts = [`platform = '${filters.platform}'`] const typesList = filters.types.map((t) => `'${t}'`).join(', ') parts.push(`type IN (${typesList})`) + if (filters.segmentId) { + parts.push(`segmentId = '${filters.segmentId}'`) + } if (filters.before) { parts.push(`updatedAt < '${filters.before}'`) } @@ -121,6 +128,10 @@ function buildPostgresFilter(filters: CleanupFilters): { platform: filters.platform, types: filters.types, } + if (filters.segmentId) { + conditions.push(`"segmentId" = $(segmentId)`) + values.segmentId = filters.segmentId + } if (filters.before) { conditions.push(`"updatedAt" < $(beforeDate)`) values.beforeDate = filters.before @@ -468,6 +479,7 @@ function printHelp(): void { pnpm run cleanup-activities-by-platform-and-type -- \\ --platform \\ --types \\ + [--segment-id ] \\ [--before ] \\ [--dry-run] \\ [--yes|-y] \\ @@ -478,6 +490,7 @@ function printHelp(): void { --types Comma-separated activity types (e.g. 'merge_request-closed') Optional: + --segment-id Restrict deletion to a single segment (UUID) --before Only delete rows with "updatedAt" < this date (YYYY-MM-DD) --dry-run Report what would be deleted without deleting --yes / -y Skip the interactive confirmation prompt @@ -507,6 +520,7 @@ async function main() { const tbToken = getFlagValue(args, '--tb-token') const platform = getFlagValue(args, '--platform') const typesArg = getFlagValue(args, '--types') + const segmentId = getFlagValue(args, '--segment-id') const before = getFlagValue(args, '--before') if (!platform) { @@ -546,6 +560,11 @@ async function main() { } } + if (segmentId && !UUID_PATTERN.test(segmentId)) { + log.error(`Error: --segment-id must be a UUID, got '${segmentId}'`) + process.exit(1) + } + if (dryRun) { log.info(`\n${'='.repeat(80)}`) log.info('🧪 DRY RUN MODE - No data will be deleted') @@ -553,7 +572,7 @@ async function main() { } try { - await runCleanup({ platform, types, before }, dryRun, skipConfirm, tbToken) + await runCleanup({ platform, types, segmentId, before }, dryRun, skipConfirm, tbToken) } catch (error) { log.error(error, 'Failed to run cleanup script') log.error(`\n❌ Error: ${error.message}`)