From adab294d67b8ced87d41484da82d35389bf4d4ea Mon Sep 17 00:00:00 2001 From: Sayak Maity Date: Thu, 2 Jul 2026 15:16:14 -0400 Subject: [PATCH 1/3] fix: make DB migrations idempotent and stamp alembic head after init_database - Initial revision adopts pre-existing schemas (skips initial.sql when hosted_model_inference.endpoints already exists) instead of failing on DuplicateSchema. - All add-column revisions guard add_column/drop_column with inspector existence checks, so they no-op on schemas created via create_all. - init_database entrypoint stamps 'alembic head' after a successful create_all so initialized databases are never left unstamped, and no longer prints the full connection URL (host/database only). --- .../model_engine_server/db/migrations/README | 17 +++++++ .../2024_09_09_1736-fa3267c80731_initial.py | 11 +++++ ...711e35_chat_completion_add_extra_routes.py | 27 ++++++---- ...9_24_1456-f55525c81eb5_multinode_bundle.py | 49 +++++++++++-------- ...-e580182d6bfd_add_passthrough_forwarder.py | 27 ++++++---- ..._25_1940-221aa19d3f32_add_routes_column.py | 27 ++++++---- ...f8b3403_add_task_expires_seconds_column.py | 27 ++++++---- ...dd_queue_message_timeout_seconds_column.py | 27 ++++++---- ...0-c4d5e6f7a8b9_add_status_reason_column.py | 27 ++++++---- .../entrypoints/init_database.py | 22 ++++++++- 10 files changed, 179 insertions(+), 82 deletions(-) diff --git a/model-engine/model_engine_server/db/migrations/README b/model-engine/model_engine_server/db/migrations/README index 7863d56d2..204906731 100644 --- a/model-engine/model_engine_server/db/migrations/README +++ b/model-engine/model_engine_server/db/migrations/README @@ -19,6 +19,23 @@ alembic revision -m “initial” alembic stamp fa3267c80731 ``` +# Idempotency and adoption of pre-alembic databases + +Migrations are self-adopting and safe to re-run: + +1. The initial revision (fa3267c80731) checks whether + hosted_model_inference.endpoints already exists. If it does (e.g. the + database was initialized via the init_database entrypoint's create_all + before alembic was ever run), it skips initial.sql and just records the + revision instead of failing on CREATE SCHEMA. +2. Subsequent add-column revisions check for column existence before + adding/dropping, so they no-op on schemas that already have the columns. +3. The init_database entrypoint stamps `alembic head` after a successful + create_all, so freshly initialized databases are never left unstamped. + +Manual stamping (stamp_initial_schema.sh) is therefore only needed for +databases created before this behavior existed. + # Steps to make generic database schema changes Steps can be found here: https://alembic.sqlalchemy.org/en/latest/tutorial.html#running-our-second-migration diff --git a/model-engine/model_engine_server/db/migrations/alembic/versions/2024_09_09_1736-fa3267c80731_initial.py b/model-engine/model_engine_server/db/migrations/alembic/versions/2024_09_09_1736-fa3267c80731_initial.py index efee89638..0026c19f2 100644 --- a/model-engine/model_engine_server/db/migrations/alembic/versions/2024_09_09_1736-fa3267c80731_initial.py +++ b/model-engine/model_engine_server/db/migrations/alembic/versions/2024_09_09_1736-fa3267c80731_initial.py @@ -22,6 +22,17 @@ def upgrade() -> None: + # If the schema already exists (e.g. the database was initialized by + # init_database's create_all before alembic was stamped), adopt the + # existing schema instead of replaying initial.sql, whose bare + # CREATE SCHEMA statements would fail with DuplicateSchema. + inspector = sa.inspect(op.get_bind()) + if inspector.has_table("endpoints", schema="hosted_model_inference"): + print( + "Table hosted_model_inference.endpoints already exists; " + "adopting existing schema and skipping initial.sql." + ) + return with open(INITIAL_MIGRATION_PATH) as fd: op.execute(fd.read()) diff --git a/model-engine/model_engine_server/db/migrations/alembic/versions/2024_09_09_1831-b574e9711e35_chat_completion_add_extra_routes.py b/model-engine/model_engine_server/db/migrations/alembic/versions/2024_09_09_1831-b574e9711e35_chat_completion_add_extra_routes.py index 43279e0f9..24bf5701f 100644 --- a/model-engine/model_engine_server/db/migrations/alembic/versions/2024_09_09_1831-b574e9711e35_chat_completion_add_extra_routes.py +++ b/model-engine/model_engine_server/db/migrations/alembic/versions/2024_09_09_1831-b574e9711e35_chat_completion_add_extra_routes.py @@ -16,17 +16,24 @@ depends_on = None +def _has_column(table: str, column: str, schema: str) -> bool: + inspector = sa.inspect(op.get_bind()) + return any(col["name"] == column for col in inspector.get_columns(table, schema=schema)) + + def upgrade() -> None: - op.add_column( - "bundles", - sa.Column("runnable_image_extra_routes", ARRAY(sa.Text), nullable=True), - schema="hosted_model_inference", - ) + if not _has_column("bundles", "runnable_image_extra_routes", "hosted_model_inference"): + op.add_column( + "bundles", + sa.Column("runnable_image_extra_routes", ARRAY(sa.Text), nullable=True), + schema="hosted_model_inference", + ) def downgrade(): - op.drop_column( - "bundles", - "runnable_image_extra_routes", - schema="hosted_model_inference", - ) + if _has_column("bundles", "runnable_image_extra_routes", "hosted_model_inference"): + op.drop_column( + "bundles", + "runnable_image_extra_routes", + schema="hosted_model_inference", + ) diff --git a/model-engine/model_engine_server/db/migrations/alembic/versions/2024_09_24_1456-f55525c81eb5_multinode_bundle.py b/model-engine/model_engine_server/db/migrations/alembic/versions/2024_09_24_1456-f55525c81eb5_multinode_bundle.py index 532b0e386..010e53050 100644 --- a/model-engine/model_engine_server/db/migrations/alembic/versions/2024_09_24_1456-f55525c81eb5_multinode_bundle.py +++ b/model-engine/model_engine_server/db/migrations/alembic/versions/2024_09_24_1456-f55525c81eb5_multinode_bundle.py @@ -16,27 +16,36 @@ depends_on = None +def _has_column(table: str, column: str, schema: str) -> bool: + inspector = sa.inspect(op.get_bind()) + return any(col["name"] == column for col in inspector.get_columns(table, schema=schema)) + + def upgrade() -> None: - op.add_column( - "bundles", - sa.Column("runnable_image_worker_command", ARRAY(sa.Text), nullable=True), - schema="hosted_model_inference", - ) - op.add_column( - "bundles", - sa.Column("runnable_image_worker_env", sa.JSON, nullable=True), - schema="hosted_model_inference", - ) + if not _has_column("bundles", "runnable_image_worker_command", "hosted_model_inference"): + op.add_column( + "bundles", + sa.Column("runnable_image_worker_command", ARRAY(sa.Text), nullable=True), + schema="hosted_model_inference", + ) + if not _has_column("bundles", "runnable_image_worker_env", "hosted_model_inference"): + op.add_column( + "bundles", + sa.Column("runnable_image_worker_env", sa.JSON, nullable=True), + schema="hosted_model_inference", + ) def downgrade() -> None: - op.drop_column( - "bundles", - "runnable_image_worker_command", - schema="hosted_model_inference", - ) - op.drop_column( - "bundles", - "runnable_image_worker_env", - schema="hosted_model_inference", - ) + if _has_column("bundles", "runnable_image_worker_command", "hosted_model_inference"): + op.drop_column( + "bundles", + "runnable_image_worker_command", + schema="hosted_model_inference", + ) + if _has_column("bundles", "runnable_image_worker_env", "hosted_model_inference"): + op.drop_column( + "bundles", + "runnable_image_worker_env", + schema="hosted_model_inference", + ) diff --git a/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_16_1741-e580182d6bfd_add_passthrough_forwarder.py b/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_16_1741-e580182d6bfd_add_passthrough_forwarder.py index a40d2266a..eacdc9c24 100644 --- a/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_16_1741-e580182d6bfd_add_passthrough_forwarder.py +++ b/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_16_1741-e580182d6bfd_add_passthrough_forwarder.py @@ -15,17 +15,24 @@ depends_on = None +def _has_column(table: str, column: str, schema: str) -> bool: + inspector = sa.inspect(op.get_bind()) + return any(col["name"] == column for col in inspector.get_columns(table, schema=schema)) + + def upgrade() -> None: - op.add_column( - "bundles", - sa.Column("runnable_image_forwarder_type", sa.String(), nullable=True), - schema="hosted_model_inference", - ) + if not _has_column("bundles", "runnable_image_forwarder_type", "hosted_model_inference"): + op.add_column( + "bundles", + sa.Column("runnable_image_forwarder_type", sa.String(), nullable=True), + schema="hosted_model_inference", + ) def downgrade() -> None: - op.drop_column( - "bundles", - "runnable_image_forwarder_type", - schema="hosted_model_inference", - ) \ No newline at end of file + if _has_column("bundles", "runnable_image_forwarder_type", "hosted_model_inference"): + op.drop_column( + "bundles", + "runnable_image_forwarder_type", + schema="hosted_model_inference", + ) \ No newline at end of file diff --git a/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_25_1940-221aa19d3f32_add_routes_column.py b/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_25_1940-221aa19d3f32_add_routes_column.py index 50de5f543..10f86c515 100644 --- a/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_25_1940-221aa19d3f32_add_routes_column.py +++ b/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_25_1940-221aa19d3f32_add_routes_column.py @@ -15,17 +15,24 @@ depends_on = None +def _has_column(table: str, column: str, schema: str) -> bool: + inspector = sa.inspect(op.get_bind()) + return any(col['name'] == column for col in inspector.get_columns(table, schema=schema)) + + def upgrade() -> None: - op.add_column( - 'bundles', - sa.Column('runnable_image_routes', sa.ARRAY(sa.Text), nullable=True), - schema='hosted_model_inference', - ) + if not _has_column('bundles', 'runnable_image_routes', 'hosted_model_inference'): + op.add_column( + 'bundles', + sa.Column('runnable_image_routes', sa.ARRAY(sa.Text), nullable=True), + schema='hosted_model_inference', + ) def downgrade() -> None: - op.drop_column( - 'bundles', - 'runnable_image_routes', - schema='hosted_model_inference', - ) + if _has_column('bundles', 'runnable_image_routes', 'hosted_model_inference'): + op.drop_column( + 'bundles', + 'runnable_image_routes', + schema='hosted_model_inference', + ) diff --git a/model-engine/model_engine_server/db/migrations/alembic/versions/2026_02_10_1920-62da4f8b3403_add_task_expires_seconds_column.py b/model-engine/model_engine_server/db/migrations/alembic/versions/2026_02_10_1920-62da4f8b3403_add_task_expires_seconds_column.py index 8991ff590..65e4384e1 100644 --- a/model-engine/model_engine_server/db/migrations/alembic/versions/2026_02_10_1920-62da4f8b3403_add_task_expires_seconds_column.py +++ b/model-engine/model_engine_server/db/migrations/alembic/versions/2026_02_10_1920-62da4f8b3403_add_task_expires_seconds_column.py @@ -15,17 +15,24 @@ depends_on = None +def _has_column(table: str, column: str, schema: str) -> bool: + inspector = sa.inspect(op.get_bind()) + return any(col['name'] == column for col in inspector.get_columns(table, schema=schema)) + + def upgrade() -> None: - op.add_column( - 'endpoints', - sa.Column('task_expires_seconds', sa.Integer, nullable=True), - schema='hosted_model_inference', - ) + if not _has_column('endpoints', 'task_expires_seconds', 'hosted_model_inference'): + op.add_column( + 'endpoints', + sa.Column('task_expires_seconds', sa.Integer, nullable=True), + schema='hosted_model_inference', + ) def downgrade() -> None: - op.drop_column( - 'endpoints', - 'task_expires_seconds', - schema='hosted_model_inference', - ) + if _has_column('endpoints', 'task_expires_seconds', 'hosted_model_inference'): + op.drop_column( + 'endpoints', + 'task_expires_seconds', + schema='hosted_model_inference', + ) diff --git a/model-engine/model_engine_server/db/migrations/alembic/versions/2026_02_20_1200-a1b2c3d4e5f6_add_queue_message_timeout_seconds_column.py b/model-engine/model_engine_server/db/migrations/alembic/versions/2026_02_20_1200-a1b2c3d4e5f6_add_queue_message_timeout_seconds_column.py index d43a8eab4..6ca8a2e2d 100644 --- a/model-engine/model_engine_server/db/migrations/alembic/versions/2026_02_20_1200-a1b2c3d4e5f6_add_queue_message_timeout_seconds_column.py +++ b/model-engine/model_engine_server/db/migrations/alembic/versions/2026_02_20_1200-a1b2c3d4e5f6_add_queue_message_timeout_seconds_column.py @@ -15,17 +15,24 @@ depends_on = None +def _has_column(table: str, column: str, schema: str) -> bool: + inspector = sa.inspect(op.get_bind()) + return any(col['name'] == column for col in inspector.get_columns(table, schema=schema)) + + def upgrade() -> None: - op.add_column( - 'endpoints', - sa.Column('queue_message_timeout_seconds', sa.Integer, nullable=True), - schema='hosted_model_inference', - ) + if not _has_column('endpoints', 'queue_message_timeout_seconds', 'hosted_model_inference'): + op.add_column( + 'endpoints', + sa.Column('queue_message_timeout_seconds', sa.Integer, nullable=True), + schema='hosted_model_inference', + ) def downgrade() -> None: - op.drop_column( - 'endpoints', - 'queue_message_timeout_seconds', - schema='hosted_model_inference', - ) + if _has_column('endpoints', 'queue_message_timeout_seconds', 'hosted_model_inference'): + op.drop_column( + 'endpoints', + 'queue_message_timeout_seconds', + schema='hosted_model_inference', + ) diff --git a/model-engine/model_engine_server/db/migrations/alembic/versions/2026_06_16_1200-c4d5e6f7a8b9_add_status_reason_column.py b/model-engine/model_engine_server/db/migrations/alembic/versions/2026_06_16_1200-c4d5e6f7a8b9_add_status_reason_column.py index d332cb4e2..a5d13eef7 100644 --- a/model-engine/model_engine_server/db/migrations/alembic/versions/2026_06_16_1200-c4d5e6f7a8b9_add_status_reason_column.py +++ b/model-engine/model_engine_server/db/migrations/alembic/versions/2026_06_16_1200-c4d5e6f7a8b9_add_status_reason_column.py @@ -15,17 +15,24 @@ depends_on = None +def _has_column(table: str, column: str, schema: str) -> bool: + inspector = sa.inspect(op.get_bind()) + return any(col['name'] == column for col in inspector.get_columns(table, schema=schema)) + + def upgrade() -> None: - op.add_column( - 'endpoints', - sa.Column('status_reason', sa.Text, nullable=True), - schema='hosted_model_inference', - ) + if not _has_column('endpoints', 'status_reason', 'hosted_model_inference'): + op.add_column( + 'endpoints', + sa.Column('status_reason', sa.Text, nullable=True), + schema='hosted_model_inference', + ) def downgrade() -> None: - op.drop_column( - 'endpoints', - 'status_reason', - schema='hosted_model_inference', - ) + if _has_column('endpoints', 'status_reason', 'hosted_model_inference'): + op.drop_column( + 'endpoints', + 'status_reason', + schema='hosted_model_inference', + ) diff --git a/model-engine/model_engine_server/entrypoints/init_database.py b/model-engine/model_engine_server/entrypoints/init_database.py index 14f7ac777..5cf3b12b4 100644 --- a/model-engine/model_engine_server/entrypoints/init_database.py +++ b/model-engine/model_engine_server/entrypoints/init_database.py @@ -1,15 +1,19 @@ # flake8: noqa import os +import subprocess +from pathlib import Path import psycopg2 from model_engine_server.db.base import Base, get_engine_url from model_engine_server.db.models import * from sqlalchemy import create_engine -from sqlalchemy.engine import Engine +from sqlalchemy.engine import Engine, make_url from tenacity import Retrying, stop_after_attempt, wait_exponential SCHEMAS = ["hosted_model_inference", "model"] +MIGRATIONS_DIR = Path(__file__).resolve().parents[1] / "db" / "migrations" + def init_database(database_url: str, psycopg_connection): with psycopg_connection.cursor() as cursor: @@ -36,6 +40,17 @@ def init_database_and_engine(database_url) -> Engine: return engine +def stamp_alembic_head() -> None: + # Mark the database as being at the latest alembic revision, so that a + # database initialized via create_all is not left unstamped (which would + # cause a subsequent `alembic upgrade head` to replay migrations from the + # very beginning). This mirrors run_database_migration.sh, which invokes + # alembic from the migrations directory; env.py resolves the database URL + # itself (from ML_INFRA_DATABASE_URL if set, otherwise from cloud secrets), + # and the subprocess inherits our environment. + subprocess.run(["alembic", "stamp", "head"], cwd=MIGRATIONS_DIR, check=True) + + if __name__ == "__main__": url = os.getenv("ML_INFRA_DATABASE_URL") # If we are at this point, we want to init the db. @@ -50,4 +65,7 @@ def init_database_and_engine(database_url) -> Engine: with attempt: init_database_and_engine(url) - print(f"Successfully initialized database at {url}") + stamp_alembic_head() + + safe_url = make_url(url) + print(f"Successfully initialized database {safe_url.database} at {safe_url.host}") From b4dd29c7ee84b5e023764fe01294cd30402f902c Mon Sep 17 00:00:00 2001 From: Sayak Maity Date: Thu, 2 Jul 2026 15:16:14 -0400 Subject: [PATCH 2/3] fix: allow retries for DB init/migration helm hook jobs Bump backoffLimit 0 -> 2 on both database hook Jobs now that the migrations are idempotent and safe to retry; bump chart to 0.2.8. --- charts/model-engine/Chart.yaml | 2 +- charts/model-engine/templates/database_init_job.yaml | 2 +- charts/model-engine/templates/database_migration_job.yaml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/charts/model-engine/Chart.yaml b/charts/model-engine/Chart.yaml index 0fe620d0b..06c16fca5 100644 --- a/charts/model-engine/Chart.yaml +++ b/charts/model-engine/Chart.yaml @@ -15,7 +15,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.2.7 +version: 0.2.8 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to diff --git a/charts/model-engine/templates/database_init_job.yaml b/charts/model-engine/templates/database_init_job.yaml index 8e26c1763..0ffde5189 100644 --- a/charts/model-engine/templates/database_init_job.yaml +++ b/charts/model-engine/templates/database_init_job.yaml @@ -10,7 +10,7 @@ metadata: "helm.sh/hook-weight": "-2" "helm.sh/hook-delete-policy": before-hook-creation,hook-succeeded spec: - backoffLimit: 0 + backoffLimit: 2 activeDeadlineSeconds: 600 template: metadata: diff --git a/charts/model-engine/templates/database_migration_job.yaml b/charts/model-engine/templates/database_migration_job.yaml index 0e95ce0be..7a8a1fff2 100644 --- a/charts/model-engine/templates/database_migration_job.yaml +++ b/charts/model-engine/templates/database_migration_job.yaml @@ -10,7 +10,7 @@ metadata: "helm.sh/hook-weight": "-1" "helm.sh/hook-delete-policy": before-hook-creation,hook-succeeded spec: - backoffLimit: 0 + backoffLimit: 2 activeDeadlineSeconds: 600 template: metadata: From 7d003ed1137673f537796d328fe6d3693c8e8e18 Mon Sep 17 00:00:00 2001 From: Sayak Maity Date: Thu, 2 Jul 2026 15:29:28 -0400 Subject: [PATCH 3/3] fix: only stamp alembic head when init_database created the schema fresh Unconditionally running `alembic stamp head` in the init entrypoint regressed the normal upgrade path: a database already stamped at the previous head would be fast-forwarded to the new head by the init job (hook weight -2) before the migration job (weight -1) ran, so pending add-column revisions were silently skipped forever and the app failed at runtime with UndefinedColumn. Stamp only when this run initialized the schema from scratch: - skip if public.alembic_version_model_engine already records a revision (migrations manage this DB; let `alembic upgrade head` run) - skip if hosted_model_inference.endpoints existed before create_all (legacy unstamped create_all DB, possibly from an older image missing columns; leave it unstamped so the adopting initial revision and guarded add-column revisions bring it to head) Verified against a throwaway postgres:13: fresh init stamps head; stamped-at-previous-head DB is left alone and the migration job applies the pending revision; at-head re-run no-ops; legacy unstamped schema is adopted by the migration job. --- .../model_engine_server/db/migrations/README | 11 ++- .../entrypoints/init_database.py | 77 +++++++++++++++++-- 2 files changed, 81 insertions(+), 7 deletions(-) diff --git a/model-engine/model_engine_server/db/migrations/README b/model-engine/model_engine_server/db/migrations/README index 204906731..e95c775d0 100644 --- a/model-engine/model_engine_server/db/migrations/README +++ b/model-engine/model_engine_server/db/migrations/README @@ -31,7 +31,16 @@ Migrations are self-adopting and safe to re-run: 2. Subsequent add-column revisions check for column existence before adding/dropping, so they no-op on schemas that already have the columns. 3. The init_database entrypoint stamps `alembic head` after a successful - create_all, so freshly initialized databases are never left unstamped. + create_all, but ONLY when this run created the schema from scratch: + it skips stamping if the database already has an alembic revision + recorded (public.alembic_version_model_engine has a row), or if + hosted_model_inference.endpoints existed before create_all ran. + Stamping an already-stamped database would fast-forward the version + table past unapplied migrations and silently skip them; stamping a + pre-existing unstamped schema would over-claim, since create_all only + creates missing tables and never adds columns to existing ones. In both + cases the migration job's `alembic upgrade head` does the right thing + (applies pending revisions / adopts the schema via the guards above). Manual stamping (stamp_initial_schema.sh) is therefore only needed for databases created before this behavior existed. diff --git a/model-engine/model_engine_server/entrypoints/init_database.py b/model-engine/model_engine_server/entrypoints/init_database.py index 5cf3b12b4..6dbf3236f 100644 --- a/model-engine/model_engine_server/entrypoints/init_database.py +++ b/model-engine/model_engine_server/entrypoints/init_database.py @@ -6,7 +6,7 @@ import psycopg2 from model_engine_server.db.base import Base, get_engine_url from model_engine_server.db.models import * -from sqlalchemy import create_engine +from sqlalchemy import create_engine, inspect, text from sqlalchemy.engine import Engine, make_url from tenacity import Retrying, stop_after_attempt, wait_exponential @@ -14,6 +14,10 @@ MIGRATIONS_DIR = Path(__file__).resolve().parents[1] / "db" / "migrations" +# Must match ALEMBIC_TABLE_NAME / version_table_schema in db/migrations/alembic/env.py. +ALEMBIC_VERSION_TABLE = "alembic_version_model_engine" +ALEMBIC_VERSION_TABLE_SCHEMA = "public" + def init_database(database_url: str, psycopg_connection): with psycopg_connection.cursor() as cursor: @@ -40,14 +44,52 @@ def init_database_and_engine(database_url) -> Engine: return engine +def schema_already_initialized(database_url: str) -> bool: + # Return True if hosted_model_inference.endpoints already exists, i.e. the + # schema was initialized by a previous run (possibly of an older image whose + # models lacked columns that current migrations would add). In that case + # this run's create_all only created missing tables, so we must not claim + # the database is at head. + engine = create_engine(database_url, echo=False, future=True) + try: + with engine.connect() as connection: + return inspect(connection).has_table("endpoints", schema="hosted_model_inference") + finally: + engine.dispose() + + +def alembic_is_stamped(database_url: str) -> bool: + # Return True if the alembic version table already records a revision. + # In that case we must NOT `alembic stamp head`: stamping moves the version + # table without running migrations, so a database stamped at an older + # revision would silently skip every pending (and future) migration. + engine = create_engine(database_url, echo=False, future=True) + try: + with engine.connect() as connection: + if not inspect(connection).has_table( + ALEMBIC_VERSION_TABLE, schema=ALEMBIC_VERSION_TABLE_SCHEMA + ): + return False + row = connection.execute( + text( + f"SELECT version_num FROM " # nosec: identifiers are constants + f"{ALEMBIC_VERSION_TABLE_SCHEMA}.{ALEMBIC_VERSION_TABLE} LIMIT 1" + ) + ).first() + return row is not None + finally: + engine.dispose() + + def stamp_alembic_head() -> None: # Mark the database as being at the latest alembic revision, so that a # database initialized via create_all is not left unstamped (which would # cause a subsequent `alembic upgrade head` to replay migrations from the - # very beginning). This mirrors run_database_migration.sh, which invokes - # alembic from the migrations directory; env.py resolves the database URL - # itself (from ML_INFRA_DATABASE_URL if set, otherwise from cloud secrets), - # and the subprocess inherits our environment. + # very beginning). Only call this when the database has no alembic revision + # yet (see alembic_is_stamped). This mirrors run_database_migration.sh, + # which invokes alembic from the migrations directory; env.py resolves the + # database URL itself (from ML_INFRA_DATABASE_URL if set, otherwise from + # cloud secrets), and the subprocess inherits our environment. subprocess.run(["alembic", "stamp", "head"], cwd=MIGRATIONS_DIR, check=True) @@ -57,15 +99,38 @@ def stamp_alembic_head() -> None: if url is None: print("No k8s secret for DB url found, trying AWS secret") url = get_engine_url(read_only=False, sync=True).url + schema_pre_existed = None for attempt in Retrying( stop=stop_after_attempt(6), wait=wait_exponential(), reraise=True, ): with attempt: + # Record (once, on the first attempt that can connect) whether the + # schema existed before this run's create_all, so we only stamp + # databases we actually initialized from scratch. + if schema_pre_existed is None: + schema_pre_existed = schema_already_initialized(url) init_database_and_engine(url) - stamp_alembic_head() + if alembic_is_stamped(url): + # An existing revision means migrations manage this database already. + # Stamping here would fast-forward the version table past unapplied + # migrations (e.g. new add-column revisions shipped in this image), + # so leave it alone and let the migration job run `alembic upgrade head`. + print("Database already has an alembic revision; skipping `alembic stamp head`.") + elif schema_pre_existed: + # Unstamped, but the schema predates this run (likely create_all from an + # older image): its tables may be missing columns that migrations would + # add, and create_all never alters existing tables. Leave it unstamped so + # the migration job's initial revision adopts it and the guarded + # add-column revisions bring it to head. + print( + "Schema already existed but has no alembic revision; skipping " + "`alembic stamp head` so migrations can adopt it." + ) + else: + stamp_alembic_head() safe_url = make_url(url) print(f"Successfully initialized database {safe_url.database} at {safe_url.host}")