diff --git a/charts/model-engine/Chart.yaml b/charts/model-engine/Chart.yaml index 0fe620d0..06c16fca 100644 --- a/charts/model-engine/Chart.yaml +++ b/charts/model-engine/Chart.yaml @@ -15,7 +15,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.2.7 +version: 0.2.8 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to diff --git a/charts/model-engine/templates/database_init_job.yaml b/charts/model-engine/templates/database_init_job.yaml index 8e26c176..0ffde518 100644 --- a/charts/model-engine/templates/database_init_job.yaml +++ b/charts/model-engine/templates/database_init_job.yaml @@ -10,7 +10,7 @@ metadata: "helm.sh/hook-weight": "-2" "helm.sh/hook-delete-policy": before-hook-creation,hook-succeeded spec: - backoffLimit: 0 + backoffLimit: 2 activeDeadlineSeconds: 600 template: metadata: diff --git a/charts/model-engine/templates/database_migration_job.yaml b/charts/model-engine/templates/database_migration_job.yaml index 0e95ce0b..7a8a1fff 100644 --- a/charts/model-engine/templates/database_migration_job.yaml +++ b/charts/model-engine/templates/database_migration_job.yaml @@ -10,7 +10,7 @@ metadata: "helm.sh/hook-weight": "-1" "helm.sh/hook-delete-policy": before-hook-creation,hook-succeeded spec: - backoffLimit: 0 + backoffLimit: 2 activeDeadlineSeconds: 600 template: metadata: diff --git a/model-engine/model_engine_server/db/migrations/README b/model-engine/model_engine_server/db/migrations/README index 7863d56d..e95c775d 100644 --- a/model-engine/model_engine_server/db/migrations/README +++ b/model-engine/model_engine_server/db/migrations/README @@ -19,6 +19,32 @@ alembic revision -m “initial” alembic stamp fa3267c80731 ``` +# Idempotency and adoption of pre-alembic databases + +Migrations are self-adopting and safe to re-run: + +1. The initial revision (fa3267c80731) checks whether + hosted_model_inference.endpoints already exists. If it does (e.g. the + database was initialized via the init_database entrypoint's create_all + before alembic was ever run), it skips initial.sql and just records the + revision instead of failing on CREATE SCHEMA. +2. Subsequent add-column revisions check for column existence before + adding/dropping, so they no-op on schemas that already have the columns. +3. The init_database entrypoint stamps `alembic head` after a successful + create_all, but ONLY when this run created the schema from scratch: + it skips stamping if the database already has an alembic revision + recorded (public.alembic_version_model_engine has a row), or if + hosted_model_inference.endpoints existed before create_all ran. + Stamping an already-stamped database would fast-forward the version + table past unapplied migrations and silently skip them; stamping a + pre-existing unstamped schema would over-claim, since create_all only + creates missing tables and never adds columns to existing ones. In both + cases the migration job's `alembic upgrade head` does the right thing + (applies pending revisions / adopts the schema via the guards above). + +Manual stamping (stamp_initial_schema.sh) is therefore only needed for +databases created before this behavior existed. + # Steps to make generic database schema changes Steps can be found here: https://alembic.sqlalchemy.org/en/latest/tutorial.html#running-our-second-migration diff --git a/model-engine/model_engine_server/db/migrations/alembic/versions/2024_09_09_1736-fa3267c80731_initial.py b/model-engine/model_engine_server/db/migrations/alembic/versions/2024_09_09_1736-fa3267c80731_initial.py index efee8963..0026c19f 100644 --- a/model-engine/model_engine_server/db/migrations/alembic/versions/2024_09_09_1736-fa3267c80731_initial.py +++ b/model-engine/model_engine_server/db/migrations/alembic/versions/2024_09_09_1736-fa3267c80731_initial.py @@ -22,6 +22,17 @@ def upgrade() -> None: + # If the schema already exists (e.g. the database was initialized by + # init_database's create_all before alembic was stamped), adopt the + # existing schema instead of replaying initial.sql, whose bare + # CREATE SCHEMA statements would fail with DuplicateSchema. + inspector = sa.inspect(op.get_bind()) + if inspector.has_table("endpoints", schema="hosted_model_inference"): + print( + "Table hosted_model_inference.endpoints already exists; " + "adopting existing schema and skipping initial.sql." + ) + return with open(INITIAL_MIGRATION_PATH) as fd: op.execute(fd.read()) diff --git a/model-engine/model_engine_server/db/migrations/alembic/versions/2024_09_09_1831-b574e9711e35_chat_completion_add_extra_routes.py b/model-engine/model_engine_server/db/migrations/alembic/versions/2024_09_09_1831-b574e9711e35_chat_completion_add_extra_routes.py index 43279e0f..24bf5701 100644 --- a/model-engine/model_engine_server/db/migrations/alembic/versions/2024_09_09_1831-b574e9711e35_chat_completion_add_extra_routes.py +++ b/model-engine/model_engine_server/db/migrations/alembic/versions/2024_09_09_1831-b574e9711e35_chat_completion_add_extra_routes.py @@ -16,17 +16,24 @@ depends_on = None +def _has_column(table: str, column: str, schema: str) -> bool: + inspector = sa.inspect(op.get_bind()) + return any(col["name"] == column for col in inspector.get_columns(table, schema=schema)) + + def upgrade() -> None: - op.add_column( - "bundles", - sa.Column("runnable_image_extra_routes", ARRAY(sa.Text), nullable=True), - schema="hosted_model_inference", - ) + if not _has_column("bundles", "runnable_image_extra_routes", "hosted_model_inference"): + op.add_column( + "bundles", + sa.Column("runnable_image_extra_routes", ARRAY(sa.Text), nullable=True), + schema="hosted_model_inference", + ) def downgrade(): - op.drop_column( - "bundles", - "runnable_image_extra_routes", - schema="hosted_model_inference", - ) + if _has_column("bundles", "runnable_image_extra_routes", "hosted_model_inference"): + op.drop_column( + "bundles", + "runnable_image_extra_routes", + schema="hosted_model_inference", + ) diff --git a/model-engine/model_engine_server/db/migrations/alembic/versions/2024_09_24_1456-f55525c81eb5_multinode_bundle.py b/model-engine/model_engine_server/db/migrations/alembic/versions/2024_09_24_1456-f55525c81eb5_multinode_bundle.py index 532b0e38..010e5305 100644 --- a/model-engine/model_engine_server/db/migrations/alembic/versions/2024_09_24_1456-f55525c81eb5_multinode_bundle.py +++ b/model-engine/model_engine_server/db/migrations/alembic/versions/2024_09_24_1456-f55525c81eb5_multinode_bundle.py @@ -16,27 +16,36 @@ depends_on = None +def _has_column(table: str, column: str, schema: str) -> bool: + inspector = sa.inspect(op.get_bind()) + return any(col["name"] == column for col in inspector.get_columns(table, schema=schema)) + + def upgrade() -> None: - op.add_column( - "bundles", - sa.Column("runnable_image_worker_command", ARRAY(sa.Text), nullable=True), - schema="hosted_model_inference", - ) - op.add_column( - "bundles", - sa.Column("runnable_image_worker_env", sa.JSON, nullable=True), - schema="hosted_model_inference", - ) + if not _has_column("bundles", "runnable_image_worker_command", "hosted_model_inference"): + op.add_column( + "bundles", + sa.Column("runnable_image_worker_command", ARRAY(sa.Text), nullable=True), + schema="hosted_model_inference", + ) + if not _has_column("bundles", "runnable_image_worker_env", "hosted_model_inference"): + op.add_column( + "bundles", + sa.Column("runnable_image_worker_env", sa.JSON, nullable=True), + schema="hosted_model_inference", + ) def downgrade() -> None: - op.drop_column( - "bundles", - "runnable_image_worker_command", - schema="hosted_model_inference", - ) - op.drop_column( - "bundles", - "runnable_image_worker_env", - schema="hosted_model_inference", - ) + if _has_column("bundles", "runnable_image_worker_command", "hosted_model_inference"): + op.drop_column( + "bundles", + "runnable_image_worker_command", + schema="hosted_model_inference", + ) + if _has_column("bundles", "runnable_image_worker_env", "hosted_model_inference"): + op.drop_column( + "bundles", + "runnable_image_worker_env", + schema="hosted_model_inference", + ) diff --git a/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_16_1741-e580182d6bfd_add_passthrough_forwarder.py b/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_16_1741-e580182d6bfd_add_passthrough_forwarder.py index a40d2266..eacdc9c2 100644 --- a/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_16_1741-e580182d6bfd_add_passthrough_forwarder.py +++ b/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_16_1741-e580182d6bfd_add_passthrough_forwarder.py @@ -15,17 +15,24 @@ depends_on = None +def _has_column(table: str, column: str, schema: str) -> bool: + inspector = sa.inspect(op.get_bind()) + return any(col["name"] == column for col in inspector.get_columns(table, schema=schema)) + + def upgrade() -> None: - op.add_column( - "bundles", - sa.Column("runnable_image_forwarder_type", sa.String(), nullable=True), - schema="hosted_model_inference", - ) + if not _has_column("bundles", "runnable_image_forwarder_type", "hosted_model_inference"): + op.add_column( + "bundles", + sa.Column("runnable_image_forwarder_type", sa.String(), nullable=True), + schema="hosted_model_inference", + ) def downgrade() -> None: - op.drop_column( - "bundles", - "runnable_image_forwarder_type", - schema="hosted_model_inference", - ) \ No newline at end of file + if _has_column("bundles", "runnable_image_forwarder_type", "hosted_model_inference"): + op.drop_column( + "bundles", + "runnable_image_forwarder_type", + schema="hosted_model_inference", + ) \ No newline at end of file diff --git a/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_25_1940-221aa19d3f32_add_routes_column.py b/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_25_1940-221aa19d3f32_add_routes_column.py index 50de5f54..10f86c51 100644 --- a/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_25_1940-221aa19d3f32_add_routes_column.py +++ b/model-engine/model_engine_server/db/migrations/alembic/versions/2025_09_25_1940-221aa19d3f32_add_routes_column.py @@ -15,17 +15,24 @@ depends_on = None +def _has_column(table: str, column: str, schema: str) -> bool: + inspector = sa.inspect(op.get_bind()) + return any(col['name'] == column for col in inspector.get_columns(table, schema=schema)) + + def upgrade() -> None: - op.add_column( - 'bundles', - sa.Column('runnable_image_routes', sa.ARRAY(sa.Text), nullable=True), - schema='hosted_model_inference', - ) + if not _has_column('bundles', 'runnable_image_routes', 'hosted_model_inference'): + op.add_column( + 'bundles', + sa.Column('runnable_image_routes', sa.ARRAY(sa.Text), nullable=True), + schema='hosted_model_inference', + ) def downgrade() -> None: - op.drop_column( - 'bundles', - 'runnable_image_routes', - schema='hosted_model_inference', - ) + if _has_column('bundles', 'runnable_image_routes', 'hosted_model_inference'): + op.drop_column( + 'bundles', + 'runnable_image_routes', + schema='hosted_model_inference', + ) diff --git a/model-engine/model_engine_server/db/migrations/alembic/versions/2026_02_10_1920-62da4f8b3403_add_task_expires_seconds_column.py b/model-engine/model_engine_server/db/migrations/alembic/versions/2026_02_10_1920-62da4f8b3403_add_task_expires_seconds_column.py index 8991ff59..65e4384e 100644 --- a/model-engine/model_engine_server/db/migrations/alembic/versions/2026_02_10_1920-62da4f8b3403_add_task_expires_seconds_column.py +++ b/model-engine/model_engine_server/db/migrations/alembic/versions/2026_02_10_1920-62da4f8b3403_add_task_expires_seconds_column.py @@ -15,17 +15,24 @@ depends_on = None +def _has_column(table: str, column: str, schema: str) -> bool: + inspector = sa.inspect(op.get_bind()) + return any(col['name'] == column for col in inspector.get_columns(table, schema=schema)) + + def upgrade() -> None: - op.add_column( - 'endpoints', - sa.Column('task_expires_seconds', sa.Integer, nullable=True), - schema='hosted_model_inference', - ) + if not _has_column('endpoints', 'task_expires_seconds', 'hosted_model_inference'): + op.add_column( + 'endpoints', + sa.Column('task_expires_seconds', sa.Integer, nullable=True), + schema='hosted_model_inference', + ) def downgrade() -> None: - op.drop_column( - 'endpoints', - 'task_expires_seconds', - schema='hosted_model_inference', - ) + if _has_column('endpoints', 'task_expires_seconds', 'hosted_model_inference'): + op.drop_column( + 'endpoints', + 'task_expires_seconds', + schema='hosted_model_inference', + ) diff --git a/model-engine/model_engine_server/db/migrations/alembic/versions/2026_02_20_1200-a1b2c3d4e5f6_add_queue_message_timeout_seconds_column.py b/model-engine/model_engine_server/db/migrations/alembic/versions/2026_02_20_1200-a1b2c3d4e5f6_add_queue_message_timeout_seconds_column.py index d43a8eab..6ca8a2e2 100644 --- a/model-engine/model_engine_server/db/migrations/alembic/versions/2026_02_20_1200-a1b2c3d4e5f6_add_queue_message_timeout_seconds_column.py +++ b/model-engine/model_engine_server/db/migrations/alembic/versions/2026_02_20_1200-a1b2c3d4e5f6_add_queue_message_timeout_seconds_column.py @@ -15,17 +15,24 @@ depends_on = None +def _has_column(table: str, column: str, schema: str) -> bool: + inspector = sa.inspect(op.get_bind()) + return any(col['name'] == column for col in inspector.get_columns(table, schema=schema)) + + def upgrade() -> None: - op.add_column( - 'endpoints', - sa.Column('queue_message_timeout_seconds', sa.Integer, nullable=True), - schema='hosted_model_inference', - ) + if not _has_column('endpoints', 'queue_message_timeout_seconds', 'hosted_model_inference'): + op.add_column( + 'endpoints', + sa.Column('queue_message_timeout_seconds', sa.Integer, nullable=True), + schema='hosted_model_inference', + ) def downgrade() -> None: - op.drop_column( - 'endpoints', - 'queue_message_timeout_seconds', - schema='hosted_model_inference', - ) + if _has_column('endpoints', 'queue_message_timeout_seconds', 'hosted_model_inference'): + op.drop_column( + 'endpoints', + 'queue_message_timeout_seconds', + schema='hosted_model_inference', + ) diff --git a/model-engine/model_engine_server/db/migrations/alembic/versions/2026_06_16_1200-c4d5e6f7a8b9_add_status_reason_column.py b/model-engine/model_engine_server/db/migrations/alembic/versions/2026_06_16_1200-c4d5e6f7a8b9_add_status_reason_column.py index d332cb4e..a5d13eef 100644 --- a/model-engine/model_engine_server/db/migrations/alembic/versions/2026_06_16_1200-c4d5e6f7a8b9_add_status_reason_column.py +++ b/model-engine/model_engine_server/db/migrations/alembic/versions/2026_06_16_1200-c4d5e6f7a8b9_add_status_reason_column.py @@ -15,17 +15,24 @@ depends_on = None +def _has_column(table: str, column: str, schema: str) -> bool: + inspector = sa.inspect(op.get_bind()) + return any(col['name'] == column for col in inspector.get_columns(table, schema=schema)) + + def upgrade() -> None: - op.add_column( - 'endpoints', - sa.Column('status_reason', sa.Text, nullable=True), - schema='hosted_model_inference', - ) + if not _has_column('endpoints', 'status_reason', 'hosted_model_inference'): + op.add_column( + 'endpoints', + sa.Column('status_reason', sa.Text, nullable=True), + schema='hosted_model_inference', + ) def downgrade() -> None: - op.drop_column( - 'endpoints', - 'status_reason', - schema='hosted_model_inference', - ) + if _has_column('endpoints', 'status_reason', 'hosted_model_inference'): + op.drop_column( + 'endpoints', + 'status_reason', + schema='hosted_model_inference', + ) diff --git a/model-engine/model_engine_server/entrypoints/init_database.py b/model-engine/model_engine_server/entrypoints/init_database.py index 14f7ac77..6dbf3236 100644 --- a/model-engine/model_engine_server/entrypoints/init_database.py +++ b/model-engine/model_engine_server/entrypoints/init_database.py @@ -1,15 +1,23 @@ # flake8: noqa import os +import subprocess +from pathlib import Path import psycopg2 from model_engine_server.db.base import Base, get_engine_url from model_engine_server.db.models import * -from sqlalchemy import create_engine -from sqlalchemy.engine import Engine +from sqlalchemy import create_engine, inspect, text +from sqlalchemy.engine import Engine, make_url from tenacity import Retrying, stop_after_attempt, wait_exponential SCHEMAS = ["hosted_model_inference", "model"] +MIGRATIONS_DIR = Path(__file__).resolve().parents[1] / "db" / "migrations" + +# Must match ALEMBIC_TABLE_NAME / version_table_schema in db/migrations/alembic/env.py. +ALEMBIC_VERSION_TABLE = "alembic_version_model_engine" +ALEMBIC_VERSION_TABLE_SCHEMA = "public" + def init_database(database_url: str, psycopg_connection): with psycopg_connection.cursor() as cursor: @@ -36,18 +44,93 @@ def init_database_and_engine(database_url) -> Engine: return engine +def schema_already_initialized(database_url: str) -> bool: + # Return True if hosted_model_inference.endpoints already exists, i.e. the + # schema was initialized by a previous run (possibly of an older image whose + # models lacked columns that current migrations would add). In that case + # this run's create_all only created missing tables, so we must not claim + # the database is at head. + engine = create_engine(database_url, echo=False, future=True) + try: + with engine.connect() as connection: + return inspect(connection).has_table("endpoints", schema="hosted_model_inference") + finally: + engine.dispose() + + +def alembic_is_stamped(database_url: str) -> bool: + # Return True if the alembic version table already records a revision. + # In that case we must NOT `alembic stamp head`: stamping moves the version + # table without running migrations, so a database stamped at an older + # revision would silently skip every pending (and future) migration. + engine = create_engine(database_url, echo=False, future=True) + try: + with engine.connect() as connection: + if not inspect(connection).has_table( + ALEMBIC_VERSION_TABLE, schema=ALEMBIC_VERSION_TABLE_SCHEMA + ): + return False + row = connection.execute( + text( + f"SELECT version_num FROM " # nosec: identifiers are constants + f"{ALEMBIC_VERSION_TABLE_SCHEMA}.{ALEMBIC_VERSION_TABLE} LIMIT 1" + ) + ).first() + return row is not None + finally: + engine.dispose() + + +def stamp_alembic_head() -> None: + # Mark the database as being at the latest alembic revision, so that a + # database initialized via create_all is not left unstamped (which would + # cause a subsequent `alembic upgrade head` to replay migrations from the + # very beginning). Only call this when the database has no alembic revision + # yet (see alembic_is_stamped). This mirrors run_database_migration.sh, + # which invokes alembic from the migrations directory; env.py resolves the + # database URL itself (from ML_INFRA_DATABASE_URL if set, otherwise from + # cloud secrets), and the subprocess inherits our environment. + subprocess.run(["alembic", "stamp", "head"], cwd=MIGRATIONS_DIR, check=True) + + if __name__ == "__main__": url = os.getenv("ML_INFRA_DATABASE_URL") # If we are at this point, we want to init the db. if url is None: print("No k8s secret for DB url found, trying AWS secret") url = get_engine_url(read_only=False, sync=True).url + schema_pre_existed = None for attempt in Retrying( stop=stop_after_attempt(6), wait=wait_exponential(), reraise=True, ): with attempt: + # Record (once, on the first attempt that can connect) whether the + # schema existed before this run's create_all, so we only stamp + # databases we actually initialized from scratch. + if schema_pre_existed is None: + schema_pre_existed = schema_already_initialized(url) init_database_and_engine(url) - print(f"Successfully initialized database at {url}") + if alembic_is_stamped(url): + # An existing revision means migrations manage this database already. + # Stamping here would fast-forward the version table past unapplied + # migrations (e.g. new add-column revisions shipped in this image), + # so leave it alone and let the migration job run `alembic upgrade head`. + print("Database already has an alembic revision; skipping `alembic stamp head`.") + elif schema_pre_existed: + # Unstamped, but the schema predates this run (likely create_all from an + # older image): its tables may be missing columns that migrations would + # add, and create_all never alters existing tables. Leave it unstamped so + # the migration job's initial revision adopts it and the guarded + # add-column revisions bring it to head. + print( + "Schema already existed but has no alembic revision; skipping " + "`alembic stamp head` so migrations can adopt it." + ) + else: + stamp_alembic_head() + + safe_url = make_url(url) + print(f"Successfully initialized database {safe_url.database} at {safe_url.host}")