diff --git a/services/apps/script_executor_worker/package.json b/services/apps/script_executor_worker/package.json index d528cdd285..82a87bc110 100644 --- a/services/apps/script_executor_worker/package.json +++ b/services/apps/script_executor_worker/package.json @@ -9,6 +9,7 @@ "dev": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug", "cleanup-fork-activities": "npx tsx src/bin/cleanup-fork-activities-and-maintainers.ts", "cleanup-member-aggregates": "npx tsx src/bin/cleanup-member-aggregates.ts", + "cleanup-activities-by-platform-and-type": "npx tsx src/bin/cleanup-activities-by-platform-and-type.ts", "recalculate-enrichment-affiliations": "npx tsx src/bin/recalculate-enrichment-affiliations.ts", "recalculate-all-affiliations": "npx tsx src/bin/recalculate-all-affiliations.ts", "add-lf-projects-to-collection": "npx tsx src/bin/add-lf-projects-to-collection.ts", diff --git a/services/apps/script_executor_worker/src/bin/cleanup-activities-by-platform-and-type.ts b/services/apps/script_executor_worker/src/bin/cleanup-activities-by-platform-and-type.ts new file mode 100644 index 0000000000..d97992fe63 --- /dev/null +++ b/services/apps/script_executor_worker/src/bin/cleanup-activities-by-platform-and-type.ts @@ -0,0 +1,586 @@ +/** + * Activities Cleanup Script (by platform + type) + * + * Deletes activities from PostgreSQL (`activityRelations`) and Tinybird + * (`activities` and `activityRelations` datasources) for a given platform and + * one or more activity types, optionally bounded by a date cutoff. + * + * Before any deletion, the script prints the affected row counts from Tinybird + * and prompts for confirmation (skip with --yes). + * + * NOTE: This script only purges the raw Tinybird datasources (`activities` and + * `activityRelations`). Derived materialized views (activities_backup, + * activities_deduplicated_ds, activityRelations_bucket_MV_ds_*, etc.) are NOT + * affected because Tinybird/ClickHouse MV deletes do not cascade. + * + * Usage: + * pnpm run cleanup-activities-by-platform-and-type -- \ + * --platform \ + * --types \ + * [--segment-id ] \ + * [--before ] \ + * [--dry-run] \ + * [--yes|-y] \ + * [--tb-token ] + * + * Required: + * --platform Platform name (e.g. 'gitlab', 'gerrit') + * --types Comma-separated activity types (e.g. 'merge_request-closed') + * + * Optional: + * --segment-id Restrict deletion to a single segment (UUID) + * --before Only delete rows with "updatedAt" < this date (YYYY-MM-DD) + * --dry-run Report what would be deleted without deleting + * --yes / -y Skip the interactive confirmation prompt + * --tb-token Override CROWD_TINYBIRD_ACTIVITIES_TOKEN + * + * Environment Variables Required: + * CROWD_DB_WRITE_HOST, CROWD_DB_PORT, CROWD_DB_USERNAME, CROWD_DB_PASSWORD, + * CROWD_DB_DATABASE, CROWD_TINYBIRD_BASE_URL, CROWD_TINYBIRD_ACTIVITIES_TOKEN + */ +import * as fs from 'fs' +import * as path from 'path' +import * as readline from 'readline' + +import { + TinybirdClient, + WRITE_DB_CONFIG, + getDbConnection, +} from '@crowd/data-access-layer/src/database' +import { QueryExecutor, pgpQx } from '@crowd/data-access-layer/src/queryExecutor' +import { getServiceChildLogger } from '@crowd/logging' + +const log = getServiceChildLogger('cleanup-activities-by-platform-and-type-script') + +// Identifiers passed to Tinybird filters are interpolated, not parameterized, +// so guard against injection by requiring conservative character sets. +const IDENTIFIER_PATTERN = /^[A-Za-z0-9_-]+$/ +const DATE_PATTERN = /^\d{4}-\d{2}-\d{2}$/ +const UUID_PATTERN = /^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$/ + +const POSTGRES_BATCH_SIZE = 10000 +const TINYBIRD_JOB_POLL_INTERVAL_MS = 60_000 +const TINYBIRD_JOB_TIMEOUT_MS = 6 * 60 * 60 * 1000 // 6h: bulk deletes can be slow + +interface CleanupFilters { + platform: string + types: string[] + segmentId?: string + before?: string +} + +interface DeletionStatus { + success: boolean + jobId?: string + error?: string +} + +interface CleanupResult { + status: 'success' | 'failure' | 'pending' + startTime: string + endTime?: string + filters: CleanupFilters + counts: { + tinybirdActivities: number + tinybirdActivityRelations: number + } + postgresDeleted: number + postgresFailedBatches: number + tinybirdJobIds: string[] + deletions: { + postgres: DeletionStatus + tinybird: { + activities: DeletionStatus + activityRelations: DeletionStatus + } + } +} + +// --------------------------------------------------------------------------- +// Filter clause builders +// --------------------------------------------------------------------------- + +/** Tinybird/ClickHouse WHERE clause — unquoted identifiers, single-quoted strings. */ +function buildTinybirdFilterClause(filters: CleanupFilters): string { + const parts = [`platform = '${filters.platform}'`] + const typesList = filters.types.map((t) => `'${t}'`).join(', ') + parts.push(`type IN (${typesList})`) + if (filters.segmentId) { + parts.push(`segmentId = '${filters.segmentId}'`) + } + if (filters.before) { + parts.push(`updatedAt < '${filters.before}'`) + } + return parts.join(' AND ') +} + +/** + * Postgres WHERE clause + pg-promise param map. + * `"updatedAt"` is camelCase and must be double-quoted; `platform` and `type` + * are lowercase and unquoted-safe. + */ +function buildPostgresFilter(filters: CleanupFilters): { + where: string + values: Record +} { + const conditions: string[] = [`platform = $(platform)`, `type IN ($(types:csv))`] + const values: Record = { + platform: filters.platform, + types: filters.types, + } + if (filters.segmentId) { + conditions.push(`"segmentId" = $(segmentId)`) + values.segmentId = filters.segmentId + } + if (filters.before) { + conditions.push(`"updatedAt" < $(beforeDate)`) + values.beforeDate = filters.before + } + return { where: conditions.join(' AND '), values } +} + +// --------------------------------------------------------------------------- +// Init +// --------------------------------------------------------------------------- + +async function initPostgresClient(): Promise { + log.info('Initializing Postgres connection...') + const dbConnection = await getDbConnection(WRITE_DB_CONFIG()) + const queryExecutor = pgpQx(dbConnection) + log.info('Postgres connection established') + return queryExecutor +} + +// --------------------------------------------------------------------------- +// Count helpers +// --------------------------------------------------------------------------- + +async function countTinybirdRows( + tinybird: TinybirdClient, + datasource: string, + filters: CleanupFilters, +): Promise { + const whereClause = buildTinybirdFilterClause(filters) + const query = `SELECT count() AS c FROM ${datasource} WHERE ${whereClause} FORMAT JSON` + const result = await tinybird.executeSql<{ data: Array<{ c: number }> }>(query) + return result.data[0]?.c ?? 0 +} + +// --------------------------------------------------------------------------- +// Confirmation prompt +// --------------------------------------------------------------------------- + +async function confirmOrAbort(message: string): Promise { + const rl = readline.createInterface({ input: process.stdin, output: process.stdout }) + return new Promise((resolve, reject) => { + rl.question(`${message}\nType "yes" to proceed: `, (answer) => { + rl.close() + if (answer.trim().toLowerCase() === 'yes') { + resolve() + } else { + reject(new Error('Aborted by user')) + } + }) + }) +} + +// --------------------------------------------------------------------------- +// Postgres chunked delete +// --------------------------------------------------------------------------- + +/** + * Fetch matching IDs from `activityRelations` and delete by PK in batches. + * PK deletes are cheap index lookups; the filter scan happens once per batch. + * Returns { deleted, failedBatches }. Stops on the first failed batch so we + * don't re-fetch the same undeleted rows forever. + */ +async function deletePostgresInChunks( + postgres: QueryExecutor, + filters: CleanupFilters, + batchSize = POSTGRES_BATCH_SIZE, +): Promise<{ deleted: number; failedBatches: number }> { + const { where, values } = buildPostgresFilter(filters) + const fetchQuery = `SELECT "activityId" FROM "activityRelations" WHERE ${where} LIMIT ${batchSize}` + + let total = 0 + let batch = 0 + let failedBatches = 0 + let rows: Array<{ activityId: string }> + + do { + rows = (await postgres.select(fetchQuery, values)) as Array<{ activityId: string }> + if (rows.length === 0) break + + const ids = rows.map((r) => r.activityId) + batch++ + + try { + await postgres.result(`DELETE FROM "activityRelations" WHERE "activityId" IN ($(ids:csv))`, { + ids, + }) + total += ids.length + } catch (error) { + log.error( + ` Batch ${batch} delete failed (sample IDs: ${ids.slice(0, 3).join(', ')}): ${error.message}`, + ) + failedBatches++ + break + } + + if (batch % 10 === 0) { + log.info(` … deleted ${total.toLocaleString()} rows so far (batch ${batch})`) + } + } while (rows.length === batchSize) + + return { deleted: total, failedBatches } +} + +// --------------------------------------------------------------------------- +// Tinybird delete jobs +// --------------------------------------------------------------------------- + +async function deleteActivitiesFromTinybird( + tinybird: TinybirdClient, + filters: CleanupFilters, +): Promise<{ + activities: DeletionStatus + activityRelations: DeletionStatus + jobIds: string[] +}> { + const deleteCondition = buildTinybirdFilterClause(filters) + const results = { + activities: { success: false } as DeletionStatus, + activityRelations: { success: false } as DeletionStatus, + } + const triggeredJobIds: string[] = [] + + log.info('Triggering deletion job for Tinybird activities datasource...') + try { + const resp = await tinybird.deleteDatasource('activities', deleteCondition, true, false) + log.info(`✓ Triggered activities deletion job (job_id: ${resp.job_id})`) + triggeredJobIds.push(resp.job_id) + results.activities = { success: true, jobId: resp.job_id } + } catch (error) { + log.error(`Failed to trigger deletion job for activities: ${error.message}`) + results.activities = { success: false, error: error.message } + } + + log.info('Triggering deletion job for Tinybird activityRelations datasource...') + try { + const resp = await tinybird.deleteDatasource('activityRelations', deleteCondition, true, false) + log.info(`✓ Triggered activityRelations deletion job (job_id: ${resp.job_id})`) + triggeredJobIds.push(resp.job_id) + results.activityRelations = { success: true, jobId: resp.job_id } + } catch (error) { + log.error(`Failed to trigger deletion job for activityRelations: ${error.message}`) + results.activityRelations = { success: false, error: error.message } + } + + return { ...results, jobIds: triggeredJobIds } +} + +// --------------------------------------------------------------------------- +// Result JSON persistence +// --------------------------------------------------------------------------- + +function resultJsonPath(filters: CleanupFilters, startTime: string): string { + const safeSuffix = `${filters.platform}_${filters.types.join('-')}`.replace( + /[^A-Za-z0-9_-]/g, + '_', + ) + return path.join( + '/tmp', + `cleanup_activities_${safeSuffix}_${startTime.replace(/[:.]/g, '-')}.json`, + ) +} + +function writeResult(filePath: string, result: CleanupResult): void { + try { + fs.writeFileSync(filePath, JSON.stringify(result, null, 2), 'utf-8') + log.info(`✓ Cleanup results saved to: ${filePath}`) + } catch (error) { + log.error(`Failed to write cleanup results to ${filePath}: ${error.message}`) + } +} + +// --------------------------------------------------------------------------- +// Main orchestration +// --------------------------------------------------------------------------- + +async function runCleanup( + filters: CleanupFilters, + dryRun: boolean, + skipConfirm: boolean, + tbToken?: string, +): Promise { + const startTime = new Date().toISOString() + + log.info(`\n${'='.repeat(80)}`) + log.info(`${dryRun ? '[DRY RUN MODE] ' : ''}Activities Cleanup`) + log.info(`Filters: ${buildTinybirdFilterClause(filters)}`) + log.info(`${'='.repeat(80)}`) + + const postgres = await initPostgresClient() + const tinybird = new TinybirdClient(tbToken) + + log.info('Counting affected rows in Tinybird...') + const [tbActivitiesCount, tbRelationsCount] = await Promise.all([ + countTinybirdRows(tinybird, 'activities', filters), + countTinybirdRows(tinybird, 'activityRelations', filters), + ]) + + log.info(` Tinybird activities : ${tbActivitiesCount.toLocaleString()} rows`) + log.info(` Tinybird activityRelations : ${tbRelationsCount.toLocaleString()} rows`) + log.info(` PostgreSQL activityRelations : will be deleted by streaming batches (no pre-count)`) + + if (dryRun) { + log.info(`\n[DRY RUN] Would delete:`) + log.info( + ` PostgreSQL activityRelations matching filter (actual count reported during real execution)`, + ) + log.info(` ${tbActivitiesCount.toLocaleString()} rows from Tinybird activities`) + log.info(` ${tbRelationsCount.toLocaleString()} rows from Tinybird activityRelations`) + log.info('[DRY RUN] No data was deleted.') + return + } + + if (tbActivitiesCount === 0 && tbRelationsCount === 0) { + log.info('No matching rows found in Tinybird. Nothing to delete.') + return + } + + if (!skipConfirm) { + await confirmOrAbort( + `\nAbout to permanently delete PG rows matching filter, ${tbActivitiesCount.toLocaleString()} TB activities, ${tbRelationsCount.toLocaleString()} TB activityRelations.`, + ) + } + + const result: CleanupResult = { + status: 'pending', + startTime, + filters, + counts: { + tinybirdActivities: tbActivitiesCount, + tinybirdActivityRelations: tbRelationsCount, + }, + postgresDeleted: 0, + postgresFailedBatches: 0, + tinybirdJobIds: [], + deletions: { + postgres: { success: true }, + tinybird: { + activities: { success: true }, + activityRelations: { success: true }, + }, + }, + } + const jsonFilePath = resultJsonPath(filters, startTime) + + log.info(`\nStep 1: Deleting matching rows from PostgreSQL in ${POSTGRES_BATCH_SIZE} batches...`) + try { + const pgResult = await deletePostgresInChunks(postgres, filters) + result.postgresDeleted = pgResult.deleted + result.postgresFailedBatches = pgResult.failedBatches + if (pgResult.failedBatches > 0) { + log.warn( + ` ${pgResult.failedBatches} batch(es) failed — ${pgResult.deleted.toLocaleString()} rows deleted successfully`, + ) + result.deletions.postgres = { + success: false, + error: `${pgResult.failedBatches} batch(es) failed`, + } + } else { + log.info(`✓ Deleted ${pgResult.deleted.toLocaleString()} row(s) from PostgreSQL`) + } + } catch (error) { + log.error(`Postgres deletion failed: ${error.message}`) + result.deletions.postgres = { success: false, error: error.message } + } + + log.info('\nStep 2: Triggering Tinybird deletions...') + const tinybirdStatuses = await deleteActivitiesFromTinybird(tinybird, filters) + result.deletions.tinybird.activities = tinybirdStatuses.activities + result.deletions.tinybird.activityRelations = tinybirdStatuses.activityRelations + result.tinybirdJobIds = tinybirdStatuses.jobIds + + // Persist result BEFORE waiting on TB jobs so the job IDs survive a timeout. + writeResult(jsonFilePath, result) + + if (tinybirdStatuses.jobIds.length > 0) { + log.info( + `Waiting for ${tinybirdStatuses.jobIds.length} Tinybird deletion job(s) to complete (timeout: ${TINYBIRD_JOB_TIMEOUT_MS / 1000 / 60} min)...`, + ) + try { + await tinybird.waitForJobs( + tinybirdStatuses.jobIds, + TINYBIRD_JOB_POLL_INTERVAL_MS, + TINYBIRD_JOB_TIMEOUT_MS, + ) + log.info(`✓ All Tinybird deletion jobs completed`) + } catch (error) { + log.error( + `Failed to wait for Tinybird deletion jobs: ${error.message} (jobs are still running in background; IDs persisted to ${jsonFilePath})`, + ) + } + } + + result.endTime = new Date().toISOString() + const anyFailure = + !result.deletions.postgres.success || + !result.deletions.tinybird.activities.success || + !result.deletions.tinybird.activityRelations.success + result.status = anyFailure ? 'failure' : 'success' + writeResult(jsonFilePath, result) + + log.info(`\n${'='.repeat(80)}`) + log.info('Cleanup Summary') + log.info(`${'='.repeat(80)}`) + log.info(`✓ PostgreSQL rows deleted: ${result.postgresDeleted.toLocaleString()}`) + if (result.postgresFailedBatches > 0) { + log.warn(`✗ PostgreSQL failed batches: ${result.postgresFailedBatches}`) + } + if (tinybirdStatuses.activities.success) { + log.info(`✓ Tinybird activities deletion job: ${tinybirdStatuses.activities.jobId || 'N/A'}`) + } else { + log.error(`✗ Tinybird activities deletion failed: ${tinybirdStatuses.activities.error}`) + } + if (tinybirdStatuses.activityRelations.success) { + log.info( + `✓ Tinybird activityRelations deletion job: ${tinybirdStatuses.activityRelations.jobId || 'N/A'}`, + ) + } else { + log.error( + `✗ Tinybird activityRelations deletion failed: ${tinybirdStatuses.activityRelations.error}`, + ) + } + + if (result.status === 'failure') { + process.exit(1) + } +} + +// --------------------------------------------------------------------------- +// CLI +// --------------------------------------------------------------------------- + +function getFlagValue(args: string[], name: string): string | undefined { + const idx = args.indexOf(name) + if (idx === -1) return undefined + if (idx + 1 >= args.length) { + log.error(`Error: ${name} requires a value`) + process.exit(1) + } + return args[idx + 1] +} + +function printHelp(): void { + log.info(` + Usage: + pnpm run cleanup-activities-by-platform-and-type -- \\ + --platform \\ + --types \\ + [--segment-id ] \\ + [--before ] \\ + [--dry-run] \\ + [--yes|-y] \\ + [--tb-token ] + + Required: + --platform Platform name (e.g. 'gitlab', 'gerrit') + --types Comma-separated activity types (e.g. 'merge_request-closed') + + Optional: + --segment-id Restrict deletion to a single segment (UUID) + --before Only delete rows with "updatedAt" < this date (YYYY-MM-DD) + --dry-run Report what would be deleted without deleting + --yes / -y Skip the interactive confirmation prompt + --tb-token Override CROWD_TINYBIRD_ACTIVITIES_TOKEN + + Examples: + # Dry run: see how many gitlab merge_request-closed rows would be deleted + pnpm run cleanup-activities-by-platform-and-type -- \\ + --platform gitlab --types merge_request-closed --dry-run + + # Actual cleanup (will prompt for confirmation) + pnpm run cleanup-activities-by-platform-and-type -- \\ + --platform gitlab --types merge_request-closed + `) +} + +async function main() { + const args = process.argv.slice(2) + + if (args.includes('--help') || args.includes('-h') || args.length === 0) { + printHelp() + process.exit(args.length === 0 ? 1 : 0) + } + + const dryRun = args.includes('--dry-run') + const skipConfirm = args.includes('--yes') || args.includes('-y') + const tbToken = getFlagValue(args, '--tb-token') + const platform = getFlagValue(args, '--platform') + const typesArg = getFlagValue(args, '--types') + const segmentId = getFlagValue(args, '--segment-id') + const before = getFlagValue(args, '--before') + + if (!platform) { + log.error('Error: --platform is required') + printHelp() + process.exit(1) + } + if (!IDENTIFIER_PATTERN.test(platform)) { + log.error(`Error: invalid --platform value '${platform}' (allowed: ${IDENTIFIER_PATTERN})`) + process.exit(1) + } + + if (!typesArg) { + log.error('Error: --types is required (comma-separated)') + printHelp() + process.exit(1) + } + const types = typesArg + .split(',') + .map((t) => t.trim()) + .filter(Boolean) + if (types.length === 0) { + log.error('Error: --types must contain at least one value') + process.exit(1) + } + for (const t of types) { + if (!IDENTIFIER_PATTERN.test(t)) { + log.error(`Error: invalid type '${t}' (allowed: ${IDENTIFIER_PATTERN})`) + process.exit(1) + } + } + + if (before) { + if (!DATE_PATTERN.test(before) || Number.isNaN(Date.parse(before))) { + log.error(`Error: --before must be a valid YYYY-MM-DD date, got '${before}'`) + process.exit(1) + } + } + + if (segmentId && !UUID_PATTERN.test(segmentId)) { + log.error(`Error: --segment-id must be a UUID, got '${segmentId}'`) + process.exit(1) + } + + if (dryRun) { + log.info(`\n${'='.repeat(80)}`) + log.info('🧪 DRY RUN MODE - No data will be deleted') + log.info(`${'='.repeat(80)}\n`) + } + + try { + await runCleanup({ platform, types, segmentId, before }, dryRun, skipConfirm, tbToken) + } catch (error) { + log.error(error, 'Failed to run cleanup script') + log.error(`\n❌ Error: ${error.message}`) + process.exit(1) + } +} + +main().catch((error) => { + log.error('Unexpected error:', error) + process.exit(1) +}) diff --git a/services/apps/script_executor_worker/src/bin/cleanup-gerrit-activities.ts b/services/apps/script_executor_worker/src/bin/cleanup-gerrit-activities.ts deleted file mode 100644 index 93b056f566..0000000000 --- a/services/apps/script_executor_worker/src/bin/cleanup-gerrit-activities.ts +++ /dev/null @@ -1,502 +0,0 @@ -/** - * Gerrit Activities Cleanup Script - * - * PROBLEM: - * Gerrit activities need to be cleaned up from both PostgreSQL and Tinybird - * based on specific platform and type filters with a date cutoff. - * - * SOLUTION: - * This script deletes activities from Gerrit platform across: - * - PostgreSQL (activityRelations table only) - * - Tinybird (activities and activityRelations tables) - * - * Filters: - * - platform = 'gerrit' - * - type in ('changeset-merged', 'changeset-abandoned', 'patchset_approval-created') - * - updatedAt < '2025-12-15' - * - * Usage: - * # Via package.json script (recommended): - * pnpm run cleanup-gerrit-activities -- [--dry-run] [--tb-token ] - * - * # Or directly with tsx: - * npx tsx src/bin/cleanup-gerrit-activities.ts [--dry-run] [--tb-token ] - * - * Options: - * --dry-run Display what would be deleted without actually deleting anything - * --tb-token Tinybird API token to use (overrides CROWD_TINYBIRD_ACTIVITIES_TOKEN environment variable) - * - * Environment Variables Required: - * CROWD_DB_WRITE_HOST - Postgres write host - * CROWD_DB_PORT - Postgres port - * CROWD_DB_USERNAME - Postgres username - * CROWD_DB_PASSWORD - Postgres password - * CROWD_DB_DATABASE - Postgres database name - * CROWD_TINYBIRD_BASE_URL - Tinybird API base URL - * CROWD_TINYBIRD_ACTIVITIES_TOKEN - Tinybird API token - */ -import * as fs from 'fs' -import * as path from 'path' - -import { - TinybirdClient, - WRITE_DB_CONFIG, - getDbConnection, -} from '@crowd/data-access-layer/src/database' -import { QueryExecutor, pgpQx } from '@crowd/data-access-layer/src/queryExecutor' -import { getServiceChildLogger } from '@crowd/logging' - -const log = getServiceChildLogger('cleanup-gerrit-activities-script') - -interface DeletionStatus { - success: boolean - jobId?: string - error?: string -} - -interface CleanupResult { - status: 'success' | 'failure' - startTime: string - endTime: string - totalBatches: number - failedBatches: number - deletions: { - postgres: DeletionStatus - tinybird: { - activities: DeletionStatus - activityRelations: DeletionStatus - } - } -} - -/** - * Initialize Postgres connection using QueryExecutor - */ -async function initPostgresClient(): Promise { - log.info('Initializing Postgres connection...') - - const dbConnection = await getDbConnection(WRITE_DB_CONFIG()) - const queryExecutor = pgpQx(dbConnection) - - log.info('Postgres connection established') - return queryExecutor -} - -/** - * Query activity IDs from Tinybird in batches and delete from Postgres immediately - * Uses batched queries to avoid hitting Tinybird's result size limit (100 MiB) - */ -async function queryAndProcessActivityIdsInBatches( - tinybird: TinybirdClient, - postgres: QueryExecutor, - dryRun: boolean, - onBatchProcessed: () => void, -): Promise { - log.info('Querying activity IDs from Tinybird for Gerrit cleanup...') - - const BATCH_SIZE = 10000 - let offset = 0 - let hasMore = true - let totalProcessed = 0 - let batchNumber = 0 - - try { - while (hasMore) { - const query = `SELECT DISTINCT activityId FROM activityRelations WHERE platform = 'gerrit' AND type IN ('changeset-merged', 'changeset-abandoned', 'patchset_approval-created') AND updatedAt < '2025-12-15' ORDER BY activityId LIMIT ${BATCH_SIZE} OFFSET ${offset} FORMAT JSON` - log.info(`Querying batch: offset=${offset}, limit=${BATCH_SIZE}`) - - const result = await tinybird.executeSql<{ data: Array<{ activityId: string }> }>(query) - const batchActivityIds = result.data.map((row) => row.activityId) - - if (batchActivityIds.length === 0) { - hasMore = false - } else { - batchNumber++ - log.info( - `Processing batch ${batchNumber} (${batchActivityIds.length} activities, total processed: ${totalProcessed})...`, - ) - - const postgresStatus = await deleteActivityRelationsFromPostgres( - postgres, - batchActivityIds, - dryRun, - ) - - if (!postgresStatus.success) { - log.error(`Failed to delete batch ${batchNumber} from Postgres: ${postgresStatus.error}`) - } - - totalProcessed += batchActivityIds.length - onBatchProcessed() - - // If we got fewer results than the batch size, we've reached the end - if (batchActivityIds.length < BATCH_SIZE) { - hasMore = false - } else { - offset += BATCH_SIZE - } - } - } - - log.info(`Found and processed ${totalProcessed} total activity ID(s) in Tinybird`) - return totalProcessed - } catch (error) { - const statusCode = error?.response?.status || 'unknown' - const responseBody = error?.response?.data - ? JSON.stringify(error.response.data) - : error?.response?.body || 'no body' - log.error( - `Failed to query activity IDs from Tinybird: ${error.message} (status: ${statusCode}, body: ${responseBody})`, - ) - throw error - } -} - -/** - * Delete activity relations from Postgres using activity IDs - */ -async function deleteActivityRelationsFromPostgres( - postgres: QueryExecutor, - activityIds: string[], - dryRun = false, -): Promise { - if (activityIds.length === 0) { - log.info(`No activity IDs to ${dryRun ? 'query' : 'delete'} from Postgres`) - return { success: true } - } - - try { - if (dryRun) { - log.info(`[DRY RUN] Querying ${activityIds.length} activity relations from Postgres...`) - const query = ` - SELECT COUNT(*) as count - FROM "activityRelations" - WHERE "activityId" IN ($(activityIds:csv)) - ` - const result = (await postgres.selectOne(query, { activityIds })) as { count: string } - const rowCount = parseInt(result.count, 10) - log.info(`[DRY RUN] Would delete ${rowCount} activity relation(s) from Postgres`) - return { success: true } - } - - log.info(`Deleting ${activityIds.length} activity relations from Postgres...`) - const query = ` - DELETE FROM "activityRelations" - WHERE "activityId" IN ($(activityIds:csv)) - ` - const rowCount = await postgres.result(query, { activityIds }) - log.info(`✓ Deleted ${rowCount} activity relation(s) from Postgres`) - return { success: true } - } catch (error) { - log.error(`Failed to delete activity relations from Postgres: ${error.message}`) - return { success: false, error: error.message } - } -} - -/** - * Delete activities from Tinybird using the delete API - */ -async function deleteActivitiesFromTinybird( - tinybird: TinybirdClient, - dryRun = false, -): Promise<{ - activities: DeletionStatus - activityRelations: DeletionStatus - jobIds: string[] -}> { - const results = { - activities: { success: false } as DeletionStatus, - activityRelations: { success: false } as DeletionStatus, - } - - if (dryRun) { - log.info('[DRY RUN] Would delete activities from Tinybird using Gerrit filters...') - log.info( - `[DRY RUN] Filters: platform='gerrit', type in ('changeset-merged', 'changeset-abandoned', 'patchset_approval-created'), updatedAt < '2025-12-15'`, - ) - log.info(`[DRY RUN] Would delete from 'activities' datasource`) - log.info(`[DRY RUN] Would delete from 'activityRelations' datasource`) - return { - activities: { success: true }, - activityRelations: { success: true }, - jobIds: [], - } - } - - log.info('Deleting activities from Tinybird using Gerrit filters...') - - const triggeredJobIds: string[] = [] - - // Define deletion conditions - const activitiesDeleteCondition = `platform = 'gerrit' AND type IN ('changeset-merged', 'changeset-abandoned', 'patchset_approval-created') AND updatedAt < '2025-12-15'` - const activityRelationsDeleteCondition = `platform = 'gerrit' AND type IN ('changeset-merged', 'changeset-abandoned', 'patchset_approval-created') AND updatedAt < '2025-12-15'` - - // Delete from activities datasource - try { - log.info('Triggering deletion job for activities datasource...') - const activitiesJobResponse = await tinybird.deleteDatasource( - 'activities', - activitiesDeleteCondition, - true, - false, // Don't wait - ) - log.info(`✓ Triggered deletion job for activities (job_id: ${activitiesJobResponse.job_id})`) - triggeredJobIds.push(activitiesJobResponse.job_id) - results.activities = { - success: true, - jobId: activitiesJobResponse.job_id, - } - } catch (error) { - log.error(`Failed to trigger deletion job for activities datasource: ${error.message}`) - results.activities = { - success: false, - error: error.message, - } - } - - // Delete from activityRelations datasource - try { - log.info('Triggering deletion job for activityRelations datasource...') - const activityRelationsJobResponse = await tinybird.deleteDatasource( - 'activityRelations', - activityRelationsDeleteCondition, - true, - false, // Don't wait - ) - log.info( - `✓ Triggered deletion job for activityRelations (job_id: ${activityRelationsJobResponse.job_id})`, - ) - triggeredJobIds.push(activityRelationsJobResponse.job_id) - results.activityRelations = { - success: true, - jobId: activityRelationsJobResponse.job_id, - } - } catch (error) { - log.error(`Failed to trigger deletion job for activityRelations datasource: ${error.message}`) - results.activityRelations = { - success: false, - error: error.message, - } - } - - log.info(`✓ All deletion jobs triggered (${triggeredJobIds.length} running in background)`) - - return { - ...results, - jobIds: triggeredJobIds, - } -} - -/** - * Main cleanup process - */ -async function runCleanup(dryRun = false, tbToken?: string): Promise { - const startTime = new Date().toISOString() - let failedBatches = 0 - let totalBatches = 0 - - if (dryRun) { - log.info(`\n${'='.repeat(80)}`) - log.info(`[DRY RUN MODE] Gerrit Activities Cleanup`) - log.info(`${'='.repeat(80)}`) - } else { - log.info(`\n${'='.repeat(80)}`) - log.info(`Gerrit Activities Cleanup`) - log.info(`${'='.repeat(80)}`) - } - - try { - // Initialize database clients - const postgres = await initPostgresClient() - const tinybird = new TinybirdClient(tbToken) - - // Track deletion statuses - const allDeletionStatuses = { - postgres: { success: true } as DeletionStatus, - tinybird: { - activities: { success: true } as DeletionStatus, - activityRelations: { success: true } as DeletionStatus, - }, - } - - // Step 1: Query activity IDs from Tinybird in batches and delete from Postgres as we go - log.info( - 'Step 1: Processing activity IDs in batches from Tinybird and deleting from Postgres...', - ) - const totalProcessed = await queryAndProcessActivityIdsInBatches( - tinybird, - postgres, - dryRun, - () => { - totalBatches++ - }, - ) - - if (totalProcessed === 0) { - log.info('No activities to delete, skipping Tinybird deletion steps') - log.info(`✓ Completed ${dryRun ? 'dry run for' : 'cleanup for'} Gerrit activities`) - return - } - - log.info(`✓ Completed processing ${totalProcessed} activities from Postgres`) - - // Step 2: Delete from Tinybird in a single operation per datasource - log.info('Step 2: Triggering Tinybird deletions...') - const tinybirdStatuses = await deleteActivitiesFromTinybird(tinybird, dryRun) - - // Track failures from Tinybird - if (!tinybirdStatuses.activities.success) { - allDeletionStatuses.tinybird.activities = tinybirdStatuses.activities - failedBatches++ - } - if (!tinybirdStatuses.activityRelations.success) { - allDeletionStatuses.tinybird.activityRelations = tinybirdStatuses.activityRelations - failedBatches++ - } - - // Wait for all Tinybird deletion jobs to complete - if (!dryRun && tinybirdStatuses.jobIds.length > 0) { - log.info( - `Waiting for ${tinybirdStatuses.jobIds.length} Tinybird deletion job(s) to complete...`, - ) - try { - await tinybird.waitForJobs(tinybirdStatuses.jobIds, 60000, 3600000) // 1min interval, 1h timeout - log.info(`✓ All Tinybird deletion jobs completed`) - } catch (error) { - log.error(`Failed to wait for Tinybird deletion jobs: ${error.message}`) - // Continue anyway - jobs are still running in background - } - } - - // Create cleanup result - const endTime = new Date().toISOString() - const result: CleanupResult = { - status: failedBatches > 0 ? 'failure' : 'success', - startTime, - endTime, - totalBatches, - failedBatches, - deletions: allDeletionStatuses, - } - - // Save results to file - const jsonFilePath = path.join( - '/tmp', - `cleanup_gerrit_activities_${new Date().toISOString().replace(/[:.]/g, '-')}.json`, - ) - try { - fs.writeFileSync(jsonFilePath, JSON.stringify(result, null, 2), 'utf-8') - log.info(`✓ Cleanup results saved to: ${jsonFilePath}`) - } catch (error) { - log.error(`Failed to write cleanup results to ${jsonFilePath}: ${error.message}`) - } - - // Summary - log.info(`\n${'='.repeat(80)}`) - log.info('Cleanup Summary') - log.info(`${'='.repeat(80)}`) - log.info(`✓ Activities ${dryRun ? 'found' : 'deleted'}: ${totalProcessed}`) - log.info(`✓ Batches processed: ${totalBatches}`) - if (failedBatches > 0) { - log.warn(`✗ Failed batches: ${failedBatches}`) - } - - if (tinybirdStatuses.activities.success) { - log.info( - `✓ Tinybird activities deletion job ${dryRun ? 'would be' : 'was'} triggered: ${tinybirdStatuses.activities.jobId || 'N/A'}`, - ) - } else { - log.error(`✗ Tinybird activities deletion failed: ${tinybirdStatuses.activities.error}`) - } - - if (tinybirdStatuses.activityRelations.success) { - log.info( - `✓ Tinybird activityRelations deletion job ${dryRun ? 'would be' : 'was'} triggered: ${tinybirdStatuses.activityRelations.jobId || 'N/A'}`, - ) - } else { - log.error( - `✗ Tinybird activityRelations deletion failed: ${tinybirdStatuses.activityRelations.error}`, - ) - } - - if (result.status === 'failure') { - process.exit(1) - } - } catch (error) { - log.error(`Failed to run Gerrit cleanup: ${error.message}`) - throw error - } -} - -/** - * Main entry point - */ -async function main() { - const args = process.argv.slice(2) - - // Parse flags - const dryRunIndex = args.indexOf('--dry-run') - const tbTokenIndex = args.indexOf('--tb-token') - const dryRun = dryRunIndex !== -1 - - // Extract tb-token value if provided - let tbToken: string | undefined - if (tbTokenIndex !== -1) { - if (tbTokenIndex + 1 >= args.length) { - log.error('Error: --tb-token requires a value') - process.exit(1) - } - tbToken = args[tbTokenIndex + 1] - } - - // Check for help flag or no valid arguments - if (args.includes('--help') || args.includes('-h')) { - log.info(` - Usage: - # Via package.json script (recommended): - pnpm run cleanup-gerrit-activities -- [--dry-run] [--tb-token ] - - # Or directly with tsx: - npx tsx src/bin/cleanup-gerrit-activities.ts [--dry-run] [--tb-token ] - - Options: - --dry-run: (optional) Display what would be deleted without actually deleting anything - --tb-token: (optional) Tinybird API token to use (overrides CROWD_TINYBIRD_ACTIVITIES_TOKEN environment variable) - - Examples: - # Run cleanup - pnpm run cleanup-gerrit-activities - - # Dry run to preview what would be deleted - pnpm run cleanup-gerrit-activities -- --dry-run - - # Use custom Tinybird token - pnpm run cleanup-gerrit-activities -- --tb-token your-token-here - - Filters applied: - - platform = 'gerrit' - - type in ('changeset-merged', 'changeset-abandoned', 'patchset_approval-created') - - updatedAt < '2025-12-15' - `) - process.exit(0) - } - - if (dryRun) { - log.info(`\n${'='.repeat(80)}`) - log.info('🧪 DRY RUN MODE - No data will be deleted') - log.info(`${'='.repeat(80)}\n`) - } - - try { - await runCleanup(dryRun, tbToken) - } catch (error) { - log.error(error, 'Failed to run Gerrit cleanup script') - log.error(`\n❌ Error: ${error.message}`) - process.exit(1) - } -} - -main().catch((error) => { - log.error('Unexpected error:', error) - process.exit(1) -}) diff --git a/services/libs/integrations/src/integrations/gitlab/processStream.ts b/services/libs/integrations/src/integrations/gitlab/processStream.ts index 5af8196efd..8f41a49724 100644 --- a/services/libs/integrations/src/integrations/gitlab/processStream.ts +++ b/services/libs/integrations/src/integrations/gitlab/processStream.ts @@ -192,7 +192,7 @@ const handleMergeRequestsStream: GitlabStreamHandler = async (ctx, api, data) => data: item.data, user, }, - type: GitlabActivityType.MERGE_REQUEST_CLOSED, + type: GitlabActivityType.MERGE_REQUEST_MERGED, projectId: data.projectId, pathWithNamespace: data.pathWithNamespace, })