diff --git a/.kokoro/build.sh b/.kokoro/build.sh index 41f29c8dc5..d87b771e56 100755 --- a/.kokoro/build.sh +++ b/.kokoro/build.sh @@ -43,15 +43,17 @@ export DOCKER_API_VERSION=1.39 python3 -m pip install --upgrade --quiet uv nox python3 -m nox --version -# If this is a continuous build, send the test log to the FlakyBot. +# Clean up the heavy .nox/ environment directory before artifact collection. +# If this is a continuous build, also send the test log to the FlakyBot. # See https://github.com/googleapis/repo-automation-bots/tree/main/packages/flakybot. -if [[ $KOKORO_BUILD_ARTIFACTS_SUBDIR = *"continuous"* ]]; then - cleanup() { +cleanup() { + rm -rf .nox + if [[ $KOKORO_BUILD_ARTIFACTS_SUBDIR = *"continuous"* ]]; then chmod +x $KOKORO_GFILE_DIR/linux_amd64/flakybot $KOKORO_GFILE_DIR/linux_amd64/flakybot - } - trap cleanup EXIT HUP -fi + fi +} +trap cleanup EXIT HUP # If NOX_SESSION is set, it only runs the specified session, # otherwise run all the sessions. diff --git a/noxfile.py b/noxfile.py index c6b7d84b1c..f9e853be4d 100644 --- a/noxfile.py +++ b/noxfile.py @@ -219,6 +219,13 @@ def default(session): "-n", "auto", # Use all available CPU cores "--quiet", + "--durations=50", + # Suppress redundant deprecation warnings to reduce JUnit XML size. + "-W", + "ignore:You are using a Python version (3.10.19):FutureWarning", + # Disable capturing stdout/stderr for passed tests to reduce XML bloat. + "-o", + "junit_log_passing_tests=False", f"--junitxml=unit_{session.python}_sponge_log.xml", "--ignore=tests/unit/vertex_ray", "--ignore=tests/unit/vertex_adk", diff --git a/tests/unit/aiplatform/test_autologging.py b/tests/unit/aiplatform/test_autologging.py index 71a5c06190..da91b5591f 100644 --- a/tests/unit/aiplatform/test_autologging.py +++ b/tests/unit/aiplatform/test_autologging.py @@ -144,7 +144,7 @@ _TEST_TF_EXPERIMENT_RUN_PARAMS = { "batch_size": "None", "class_weight": "None", - "epochs": "5", + "epochs": "1", "initial_epoch": "0", "max_queue_size": "10", "sample_weight": "None", @@ -589,14 +589,6 @@ def build_and_train_test_tf_model(): [1, 2], [2, 2], [2, 3], - [1, 1], - [1, 2], - [2, 2], - [2, 3], - [1, 1], - [1, 2], - [2, 2], - [2, 3], ] ) y = np.dot(X, np.array([1, 2])) + 3 @@ -605,7 +597,6 @@ def build_and_train_test_tf_model(): [ tf.keras.layers.Flatten(input_shape=(2,)), tf.keras.layers.Dense(128, activation="relu"), - tf.keras.layers.Dropout(0.2), tf.keras.layers.Dense(1), ] ) @@ -616,7 +607,7 @@ def build_and_train_test_tf_model(): metrics=["accuracy"], ) - model.fit(X, y, epochs=5) + model.fit(X, y, epochs=1) @pytest.mark.usefixtures("google_auth_mock") diff --git a/tests/unit/aiplatform/test_automl_forecasting_training_jobs.py b/tests/unit/aiplatform/test_automl_forecasting_training_jobs.py index c08c70381a..38d715b72f 100644 --- a/tests/unit/aiplatform/test_automl_forecasting_training_jobs.py +++ b/tests/unit/aiplatform/test_automl_forecasting_training_jobs.py @@ -303,12 +303,18 @@ class TestForecastingTrainingJob: def setup_method(self): importlib.reload(initializer) importlib.reload(aiplatform) + self._job_wait_patcher = mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + self._log_wait_patcher = mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) + self._job_wait_patcher.start() + self._log_wait_patcher.start() def teardown_method(self): + self._job_wait_patcher.stop() + self._log_wait_patcher.stop() initializer.global_pool.shutdown(wait=True) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) @pytest.mark.parametrize("training_job", _FORECASTING_JOB_MODEL_TYPES) def test_run_call_pipeline_service_create( @@ -409,8 +415,8 @@ def test_run_call_pipeline_service_create( assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) @pytest.mark.parametrize("training_job", _FORECASTING_JOB_MODEL_TYPES) def test_run_call_pipeline_service_create_with_timeout( @@ -497,8 +503,8 @@ def test_run_call_pipeline_service_create_with_timeout( timeout=180.0, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures("mock_pipeline_service_get") @pytest.mark.parametrize("sync", [True, False]) @pytest.mark.parametrize("training_job", _FORECASTING_JOB_MODEL_TYPES) @@ -579,8 +585,8 @@ def test_run_call_pipeline_if_no_model_display_name_nor_model_labels( timeout=None, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures("mock_pipeline_service_get") @pytest.mark.parametrize("sync", [True, False]) @pytest.mark.parametrize("training_job", _FORECASTING_JOB_MODEL_TYPES) @@ -660,8 +666,8 @@ def test_run_call_pipeline_if_set_additional_experiments( timeout=None, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create", "mock_pipeline_service_get", @@ -746,8 +752,8 @@ def test_run_called_twice_raises( holiday_regions=_TEST_TRAINING_HOLIDAY_REGIONS, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) @pytest.mark.parametrize("training_job", _FORECASTING_JOB_MODEL_TYPES) def test_run_raises_if_pipeline_fails( @@ -827,8 +833,8 @@ def test_raises_before_run_is_called( with pytest.raises(RuntimeError): job.state - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) @pytest.mark.parametrize("training_job", _FORECASTING_JOB_MODEL_TYPES) def test_splits_fraction( @@ -926,8 +932,8 @@ def test_splits_fraction( timeout=None, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) @pytest.mark.parametrize("training_job", _FORECASTING_JOB_MODEL_TYPES) def test_splits_timestamp( @@ -1027,8 +1033,8 @@ def test_splits_timestamp( timeout=None, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) @pytest.mark.parametrize("training_job", _FORECASTING_JOB_MODEL_TYPES) def test_splits_predefined( @@ -1122,8 +1128,8 @@ def test_splits_predefined( timeout=None, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) @pytest.mark.parametrize("training_job", _FORECASTING_JOB_MODEL_TYPES) def test_splits_default( @@ -1211,8 +1217,8 @@ def test_splits_default( timeout=None, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures("mock_pipeline_service_get") @pytest.mark.parametrize("sync", [True, False]) @pytest.mark.parametrize("training_job", _FORECASTING_JOB_MODEL_TYPES) @@ -1294,8 +1300,8 @@ def test_run_call_pipeline_if_set_additional_experiments_probabilistic_inference timeout=None, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures("mock_pipeline_service_get") @pytest.mark.parametrize("sync", [True, False]) @pytest.mark.parametrize("training_job", _FORECASTING_JOB_MODEL_TYPES) diff --git a/tests/unit/aiplatform/test_base.py b/tests/unit/aiplatform/test_base.py index e0b9b52464..be3b50a1e9 100644 --- a/tests/unit/aiplatform/test_base.py +++ b/tests/unit/aiplatform/test_base.py @@ -46,12 +46,12 @@ def _sync_object_with_future_result(self, result): @classmethod @base.optional_sync() def create(cls, x: int, sync=True) -> "_TestClass": - time.sleep(1) + time.sleep(0.05) return cls(x) @base.optional_sync() def add(self, a: "_TestClass", sync=True) -> None: - time.sleep(1) + time.sleep(0.05) return self._add(a=a, sync=sync) def _add(self, a: "_TestClass", sync=True) -> None: @@ -63,14 +63,14 @@ class _TestClassDownStream(_TestClass): def add_and_create_new( self, a: Optional["_TestClass"] = None, sync=True ) -> _TestClass: - time.sleep(1) + time.sleep(0.05) if a: return _TestClass(self.x + a.x) return None @base.optional_sync(return_input_arg="a", bind_future_to_self=False) def add_to_input_arg(self, a: "_TestClass", sync=True) -> _TestClass: - time.sleep(1) + time.sleep(0.05) a._add(self) return a diff --git a/tests/unit/aiplatform/test_batch_prediction_job_preview.py b/tests/unit/aiplatform/test_batch_prediction_job_preview.py index 9e8c28902f..5fb19b299d 100644 --- a/tests/unit/aiplatform/test_batch_prediction_job_preview.py +++ b/tests/unit/aiplatform/test_batch_prediction_job_preview.py @@ -123,10 +123,23 @@ def create_batch_prediction_job_mock(): class TestBatchPredictionJobPreview: def setup_method(self): + from google.cloud.aiplatform import jobs reload(initializer) reload(aiplatform) + self._preview_job_wait_patcher = mock.patch.object(preview_jobs, "_JOB_WAIT_TIME", 0.05) + self._preview_log_wait_patcher = mock.patch.object(preview_jobs, "_LOG_WAIT_TIME", 0.05) + self._job_wait_patcher = mock.patch.object(jobs, "_JOB_WAIT_TIME", 0.05) + self._log_wait_patcher = mock.patch.object(jobs, "_LOG_WAIT_TIME", 0.05) + self._preview_job_wait_patcher.start() + self._preview_log_wait_patcher.start() + self._job_wait_patcher.start() + self._log_wait_patcher.start() def teardown_method(self): + self._preview_job_wait_patcher.stop() + self._preview_log_wait_patcher.stop() + self._job_wait_patcher.stop() + self._log_wait_patcher.stop() initializer.global_pool.shutdown(wait=True) def test_init_batch_prediction_job(self, get_batch_prediction_job_mock): @@ -160,8 +173,8 @@ def test_batch_prediction_job_done_get(self, get_batch_prediction_job_mock): assert bp.done() is False assert get_batch_prediction_job_mock.call_count == 2 - @mock.patch.object(preview_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(preview_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(preview_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(preview_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) @pytest.mark.usefixtures("get_batch_prediction_job_mock") def test_batch_predict_create_with_reservation( @@ -228,8 +241,8 @@ def test_batch_predict_create_with_reservation( timeout=None, ) - @mock.patch.object(preview_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(preview_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(preview_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(preview_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures("get_batch_prediction_job_mock") def test_batch_predict_job_submit(self, create_batch_prediction_job_mock): aiplatform.init(project=_TEST_PROJECT, location=_TEST_LOCATION) diff --git a/tests/unit/aiplatform/test_custom_job.py b/tests/unit/aiplatform/test_custom_job.py index 46028a562a..5cf2758498 100644 --- a/tests/unit/aiplatform/test_custom_job.py +++ b/tests/unit/aiplatform/test_custom_job.py @@ -633,8 +633,14 @@ class TestCustomJob: def setup_method(self): reload(aiplatform.initializer) reload(aiplatform) + self._job_wait_patcher = mock.patch.object(jobs, "_JOB_WAIT_TIME", 0.05) + self._log_wait_patcher = mock.patch.object(jobs, "_LOG_WAIT_TIME", 0.05) + self._job_wait_patcher.start() + self._log_wait_patcher.start() def teardown_method(self): + self._job_wait_patcher.stop() + self._log_wait_patcher.stop() aiplatform.initializer.global_pool.shutdown(wait=True) @pytest.mark.parametrize("sync", [True, False]) @@ -874,8 +880,8 @@ def test_submit_custom_job_with_experiments( ) @pytest.mark.parametrize("sync", [True, False]) - @mock.patch.object(jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(jobs, "_LOG_WAIT_TIME", 0.05) def test_create_custom_job_with_timeout( self, create_custom_job_mock, get_custom_job_mock, sync ): @@ -1317,8 +1323,8 @@ def test_create_from_local_script_raises_with_no_staging_bucket( "update_context_mock", ) @pytest.mark.parametrize("sync", [True, False]) - @mock.patch.object(jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(jobs, "_LOG_WAIT_TIME", 0.05) def test_create_from_local_script_prebuilt_container_with_all_args( self, get_custom_job_mock, create_custom_job_mock, sync ): @@ -1381,8 +1387,8 @@ def test_create_from_local_script_prebuilt_container_with_all_args( "update_context_mock", ) @pytest.mark.parametrize("sync", [True, False]) - @mock.patch.object(jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(jobs, "_LOG_WAIT_TIME", 0.05) def test_create_from_local_script_custom_container_with_all_args( self, get_custom_job_mock, create_custom_job_mock, sync ): @@ -1458,8 +1464,8 @@ def test_create_from_local_script_enable_autolog_no_experiment_error(self): job.run() @pytest.mark.parametrize("sync", [True, False]) - @mock.patch.object(jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(jobs, "_LOG_WAIT_TIME", 0.05) def test_create_custom_job_with_enable_web_access( self, create_custom_job_mock_with_enable_web_access, @@ -1525,8 +1531,8 @@ def test_get_web_access_uris(self, get_custom_job_mock_with_enable_web_access): assert job.web_access_uris == _TEST_WEB_ACCESS_URIS break - @mock.patch.object(jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(jobs, "_LOG_WAIT_TIME", 0.05) def test_log_access_web_uris_after_get( self, get_custom_job_mock_with_enable_web_access ): diff --git a/tests/unit/aiplatform/test_custom_job_persistent_resource.py b/tests/unit/aiplatform/test_custom_job_persistent_resource.py index bfdab91c02..a10408c97a 100644 --- a/tests/unit/aiplatform/test_custom_job_persistent_resource.py +++ b/tests/unit/aiplatform/test_custom_job_persistent_resource.py @@ -152,8 +152,14 @@ class TestCustomJobPersistentResource: def setup_method(self): reload(aiplatform.initializer) reload(aiplatform) + self._job_wait_patcher = mock.patch.object(jobs, "_JOB_WAIT_TIME", 0.05) + self._log_wait_patcher = mock.patch.object(jobs, "_LOG_WAIT_TIME", 0.05) + self._job_wait_patcher.start() + self._log_wait_patcher.start() def teardown_method(self): + self._job_wait_patcher.stop() + self._log_wait_patcher.stop() aiplatform.initializer.global_pool.shutdown(wait=True) @pytest.mark.parametrize("sync", [True, False]) diff --git a/tests/unit/aiplatform/test_end_to_end.py b/tests/unit/aiplatform/test_end_to_end.py index 85f127a1f6..a0c2bd2f20 100644 --- a/tests/unit/aiplatform/test_end_to_end.py +++ b/tests/unit/aiplatform/test_end_to_end.py @@ -108,10 +108,24 @@ def make_training_pipeline(state, add_training_task_metadata=True): @pytest.mark.usefixtures("google_auth_mock") class TestEndToEnd: def setup_method(self): + from unittest import mock + from google.cloud.aiplatform import training_jobs, jobs reload(initializer) reload(aiplatform) + self._tj_wait_patcher = mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + self._tj_log_patcher = mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) + self._jb_wait_patcher = mock.patch.object(jobs, "_JOB_WAIT_TIME", 0.05) + self._jb_log_patcher = mock.patch.object(jobs, "_LOG_WAIT_TIME", 0.05) + self._tj_wait_patcher.start() + self._tj_log_patcher.start() + self._jb_wait_patcher.start() + self._jb_log_patcher.start() def teardown_method(self): + self._tj_wait_patcher.stop() + self._tj_log_patcher.stop() + self._jb_wait_patcher.stop() + self._jb_log_patcher.stop() initializer.global_pool.shutdown(wait=True) @pytest.mark.usefixtures( diff --git a/tests/unit/aiplatform/test_hyperparameter_tuning_job.py b/tests/unit/aiplatform/test_hyperparameter_tuning_job.py index 8c546389f3..6baae0081b 100644 --- a/tests/unit/aiplatform/test_hyperparameter_tuning_job.py +++ b/tests/unit/aiplatform/test_hyperparameter_tuning_job.py @@ -445,13 +445,19 @@ class TestHyperparameterTuningJob: def setup_method(self): reload(aiplatform.initializer) reload(aiplatform) + self._job_wait_patcher = mock.patch.object(jobs, "_JOB_WAIT_TIME", 0.05) + self._log_wait_patcher = mock.patch.object(jobs, "_LOG_WAIT_TIME", 0.05) + self._job_wait_patcher.start() + self._log_wait_patcher.start() def teardown_method(self): + self._job_wait_patcher.stop() + self._log_wait_patcher.stop() aiplatform.initializer.global_pool.shutdown(wait=True) @pytest.mark.parametrize("sync", [True, False]) - @mock.patch.object(jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(jobs, "_LOG_WAIT_TIME", 0.05) def test_create_hyperparameter_tuning_job( self, create_hyperparameter_tuning_job_mock, @@ -1017,8 +1023,8 @@ def test_create_hyperparameter_tuning_job_with_tensorboard( ) @pytest.mark.parametrize("sync", [True, False]) - @mock.patch.object(jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(jobs, "_LOG_WAIT_TIME", 0.05) def test_create_hyperparameter_tuning_job_with_enable_web_access( self, create_hyperparameter_tuning_job_mock_with_enable_web_access, @@ -1100,8 +1106,8 @@ def test_create_hyperparameter_tuning_job_with_enable_web_access( caplog.clear() - @mock.patch.object(jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(jobs, "_LOG_WAIT_TIME", 0.05) def test_log_enable_web_access_after_get_hyperparameter_tuning_job( self, get_hyperparameter_tuning_job_mock_with_enable_web_access, diff --git a/tests/unit/aiplatform/test_hyperparameter_tuning_job_persistent_resource.py b/tests/unit/aiplatform/test_hyperparameter_tuning_job_persistent_resource.py index 792d9c51b0..f8241fdc6d 100644 --- a/tests/unit/aiplatform/test_hyperparameter_tuning_job_persistent_resource.py +++ b/tests/unit/aiplatform/test_hyperparameter_tuning_job_persistent_resource.py @@ -241,8 +241,14 @@ class TestHyperparameterTuningJobPersistentResource: def setup_method(self): reload(aiplatform.initializer) reload(aiplatform) + self._job_wait_patcher = mock.patch.object(jobs, "_JOB_WAIT_TIME", 0.05) + self._log_wait_patcher = mock.patch.object(jobs, "_LOG_WAIT_TIME", 0.05) + self._job_wait_patcher.start() + self._log_wait_patcher.start() def teardown_method(self): + self._job_wait_patcher.stop() + self._log_wait_patcher.stop() aiplatform.initializer.global_pool.shutdown(wait=True) @pytest.mark.parametrize("sync", [True, False]) diff --git a/tests/unit/aiplatform/test_jobs.py b/tests/unit/aiplatform/test_jobs.py index 78067fdb37..a12c182cc6 100644 --- a/tests/unit/aiplatform/test_jobs.py +++ b/tests/unit/aiplatform/test_jobs.py @@ -642,8 +642,14 @@ class TestBatchPredictionJob: def setup_method(self): reload(initializer) reload(aiplatform) + self._job_wait_patcher = mock.patch.object(jobs, "_JOB_WAIT_TIME", 0.05) + self._log_wait_patcher = mock.patch.object(jobs, "_LOG_WAIT_TIME", 0.05) + self._job_wait_patcher.start() + self._log_wait_patcher.start() def teardown_method(self): + self._job_wait_patcher.stop() + self._log_wait_patcher.stop() initializer.global_pool.shutdown(wait=True) def test_init_batch_prediction_job(self, get_batch_prediction_job_mock): @@ -755,8 +761,8 @@ def test_batch_prediction_iter_dirs_invalid_output_info(self): ) bp.iter_outputs() - @mock.patch.object(jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) @pytest.mark.usefixtures("get_batch_prediction_job_mock") def test_batch_predict_gcs_source_and_dest( @@ -804,8 +810,8 @@ def test_batch_predict_gcs_source_and_dest( timeout=None, ) - @mock.patch.object(jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) @pytest.mark.usefixtures("get_batch_prediction_job_mock") def test_batch_predict_gcs_source_and_dest_with_timeout( @@ -853,8 +859,8 @@ def test_batch_predict_gcs_source_and_dest_with_timeout( timeout=180.0, ) - @mock.patch.object(jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) @pytest.mark.usefixtures("get_batch_prediction_job_mock") def test_batch_predict_gcs_source_and_dest_with_timeout_not_explicitly_set( @@ -902,7 +908,7 @@ def test_batch_predict_gcs_source_and_dest_with_timeout_not_explicitly_set( ) @mock.patch.object(jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures("get_batch_prediction_job_mock") def test_batch_predict_job_done_create(self, create_batch_prediction_job_mock): aiplatform.init(project=_TEST_PROJECT, location=_TEST_LOCATION) @@ -925,8 +931,8 @@ def test_batch_predict_job_done_create(self, create_batch_prediction_job_mock): assert batch_prediction_job.done() is True - @mock.patch.object(jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures("get_batch_prediction_job_mock") def test_batch_predict_job_submit(self, create_batch_prediction_job_mock): aiplatform.init(project=_TEST_PROJECT, location=_TEST_LOCATION) @@ -953,8 +959,8 @@ def test_batch_predict_job_submit(self, create_batch_prediction_job_mock): == jobs.gca_job_state.JobState.JOB_STATE_SUCCEEDED ) - @mock.patch.object(jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) @pytest.mark.usefixtures("get_batch_prediction_job_mock") def test_batch_predict_gcs_source_bq_dest( @@ -1006,8 +1012,8 @@ def test_batch_predict_gcs_source_bq_dest( timeout=None, ) - @mock.patch.object(jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) @pytest.mark.usefixtures("get_batch_prediction_job_mock") def test_batch_predict_with_all_args( @@ -1086,8 +1092,8 @@ def test_batch_predict_with_all_args( timeout=None, ) - @mock.patch.object(jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) @pytest.mark.usefixtures("get_batch_prediction_job_v1beta1_mock") def test_batch_predict_with_all_args_and_model_monitoring( diff --git a/tests/unit/aiplatform/test_language_models.py b/tests/unit/aiplatform/test_language_models.py index 74c264d907..2709976d07 100644 --- a/tests/unit/aiplatform/test_language_models.py +++ b/tests/unit/aiplatform/test_language_models.py @@ -1793,14 +1793,21 @@ class TestLanguageModels: """Unit tests for the language models.""" def setup_method(self): + from google.cloud.aiplatform import pipeline_jobs reload(initializer) reload(aiplatform) aiplatform.init( project=_TEST_PROJECT, location=_TEST_LOCATION, ) + self._job_wait_patcher = mock.patch.object(pipeline_jobs, "_JOB_WAIT_TIME", 0.05) + self._log_wait_patcher = mock.patch.object(pipeline_jobs, "_LOG_WAIT_TIME", 0.05) + self._job_wait_patcher.start() + self._log_wait_patcher.start() def teardown_method(self): + self._job_wait_patcher.stop() + self._log_wait_patcher.stop() initializer.global_pool.shutdown(wait=True) @pytest.mark.parametrize("api_transport", ["grpc", "rest"]) @@ -4865,6 +4872,17 @@ def test_batch_prediction_for_text_embedding_preview(self): # TODO (b/285946649): add more test coverage before public preview release @pytest.mark.usefixtures("google_auth_mock") class TestLanguageModelEvaluation: + def setup_method(self): + from google.cloud.aiplatform import pipeline_jobs + self._job_wait_patcher = mock.patch.object(pipeline_jobs, "_JOB_WAIT_TIME", 0.05) + self._log_wait_patcher = mock.patch.object(pipeline_jobs, "_LOG_WAIT_TIME", 0.05) + self._job_wait_patcher.start() + self._log_wait_patcher.start() + + def teardown_method(self): + self._job_wait_patcher.stop() + self._log_wait_patcher.stop() + @pytest.mark.usefixtures( "get_model_with_tuned_version_label_mock", "get_endpoint_with_models_mock", diff --git a/tests/unit/aiplatform/test_matching_engine_index_endpoint.py b/tests/unit/aiplatform/test_matching_engine_index_endpoint.py index 6c4454401a..bb8d3642a8 100644 --- a/tests/unit/aiplatform/test_matching_engine_index_endpoint.py +++ b/tests/unit/aiplatform/test_matching_engine_index_endpoint.py @@ -908,8 +908,12 @@ class TestMatchingEngineIndexEndpoint: def setup_method(self): reload(initializer) reload(aiplatform) + # Patch sleep to speed up polling in tests + self._sleep_patcher = mock.patch("time.sleep", side_effect=lambda seconds: None) + self._sleep_patcher.start() def teardown_method(self): + self._sleep_patcher.stop() initializer.global_pool.shutdown(wait=True) @pytest.mark.parametrize( diff --git a/tests/unit/aiplatform/test_pipeline_jobs.py b/tests/unit/aiplatform/test_pipeline_jobs.py index 221a4435a1..d7addc86a9 100644 --- a/tests/unit/aiplatform/test_pipeline_jobs.py +++ b/tests/unit/aiplatform/test_pipeline_jobs.py @@ -704,11 +704,17 @@ def setup_method(self): reload(initializer) reload(aiplatform) aiplatform.init(project=_TEST_PROJECT, location=_TEST_LOCATION) + self._job_wait_patcher = mock.patch.object(pipeline_jobs, "_JOB_WAIT_TIME", 0.05) + self._log_wait_patcher = mock.patch.object(pipeline_jobs, "_LOG_WAIT_TIME", 0.05) + self._job_wait_patcher.start() + self._log_wait_patcher.start() def teardown_method(self): + self._job_wait_patcher.stop() + self._log_wait_patcher.stop() initializer.global_pool.shutdown(wait=True) - @mock.patch.object(pipeline_jobs, "_JOB_WAIT_TIME", 0) + @mock.patch.object(pipeline_jobs, "_JOB_WAIT_TIME", 0.05) @mock.patch.object(pipeline_jobs, "_LOG_WAIT_TIME", 0) def test_block_until_complete_logs_symbolic_state_name( self, @@ -752,8 +758,8 @@ def test_block_until_complete_logs_symbolic_state_name( [_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB], ) @pytest.mark.parametrize("sync", [True, False]) - @mock.patch.object(pipeline_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(pipeline_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(pipeline_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(pipeline_jobs, "_LOG_WAIT_TIME", 0.05) def test_run_call_pipeline_service_create( self, mock_pipeline_service_create, @@ -1256,8 +1262,8 @@ def test_run_call_pipeline_service_create_with_timeout_not_explicitly_set( ], ) @pytest.mark.parametrize("sync", [True, False]) - @mock.patch.object(pipeline_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(pipeline_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(pipeline_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(pipeline_jobs, "_LOG_WAIT_TIME", 0.05) def test_run_call_pipeline_service_create_with_failure_policy( self, mock_pipeline_service_create, @@ -1912,8 +1918,8 @@ def test_cancel_pipeline_job_without_running( [_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB], ) @pytest.mark.parametrize("sync", [True, False]) - @mock.patch.object(pipeline_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(pipeline_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(pipeline_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(pipeline_jobs, "_LOG_WAIT_TIME", 0.05) def test_pipeline_failure_raises(self, mock_load_yaml_and_json, sync): aiplatform.init( project=_TEST_PROJECT, @@ -2416,8 +2422,8 @@ def test_submit_v1beta1_pipeline_job_returns_response( "job_spec", [_TEST_PIPELINE_SPEC_JSON], ) - @mock.patch.object(pipeline_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(pipeline_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(pipeline_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(pipeline_jobs, "_LOG_WAIT_TIME", 0.05) def test_run_call_pipeline_service_create_with_default_runtime( self, mock_load_yaml_and_json, diff --git a/tests/unit/aiplatform/test_training_jobs.py b/tests/unit/aiplatform/test_training_jobs.py index be534e1f4c..14519a908a 100644 --- a/tests/unit/aiplatform/test_training_jobs.py +++ b/tests/unit/aiplatform/test_training_jobs.py @@ -1349,8 +1349,14 @@ def setup_method(self): ) with open(self._local_script_file_name, "w") as fp: fp.write(_TEST_PYTHON_SOURCE) + self._job_wait_patcher = mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + self._log_wait_patcher = mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) + self._job_wait_patcher.start() + self._log_wait_patcher.start() def teardown_method(self): + self._job_wait_patcher.stop() + self._log_wait_patcher.stop() pathlib.Path(self._local_script_file_name).unlink() initializer.global_pool.shutdown(wait=True) @@ -1407,8 +1413,8 @@ def test_block_until_complete_logs_symbolic_state_name( assert "PIPELINE_STATE_RUNNING" in state_log assert "current state:\n3" not in state_log - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_tabular_dataset( self, @@ -1600,8 +1606,8 @@ def test_run_call_pipeline_service_create_with_tabular_dataset( assert job._has_logged_custom_job - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) def test_custom_training_job_run_raises_with_impartial_explanation_spec( self, mock_pipeline_service_create, @@ -1664,8 +1670,8 @@ def test_custom_training_job_run_raises_with_impartial_explanation_spec( "must be specified." ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) def test_custom_training_tabular_done( self, mock_pipeline_service_create, @@ -1726,8 +1732,8 @@ def test_custom_training_tabular_done( assert job.done() is True - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_tabular_dataset_and_timeout( self, @@ -1889,8 +1895,8 @@ def test_run_call_pipeline_service_create_with_tabular_dataset_and_timeout( timeout=180.0, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_tabular_dataset_and_timeout_not_explicitly_set( self, @@ -2051,8 +2057,8 @@ def test_run_call_pipeline_service_create_with_tabular_dataset_and_timeout_not_e timeout=None, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_bigquery_destination( self, @@ -2213,8 +2219,8 @@ def test_run_call_pipeline_service_create_with_bigquery_destination( assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create", "mock_pipeline_service_get", @@ -2494,8 +2500,8 @@ def test_run_call_pipeline_service_create_with_no_dataset( assert model_from_job._gca_resource is mock_model_service_get.return_value - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create_with_enable_web_access", "mock_pipeline_service_get_with_enable_web_access", @@ -2543,8 +2549,8 @@ def test_run_call_pipeline_service_create_with_enable_web_access( ) # TODO: Update test to address Mutant issue b/270708320 - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create_with_enable_dashboard_access", "mock_pipeline_service_get_with_enable_dashboard_access", @@ -2591,8 +2597,8 @@ def test_run_call_pipeline_service_create_with_enable_dashboard_access( gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create_with_scheduling", "mock_pipeline_service_get_with_scheduling", @@ -2652,8 +2658,8 @@ def test_run_call_pipeline_service_create_with_scheduling(self, sync, caplog): == f"{_TEST_MAX_WAIT_DURATION}s" ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create_with_spot_strategy", "mock_pipeline_service_get_with_spot_strategy", @@ -2704,8 +2710,8 @@ def test_run_call_pipeline_service_create_with_spot_strategy(self, sync): == _TEST_SPOT_STRATEGY ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create_with_psc_interface_config", "mock_pipeline_service_get_with_psc_interface_config", @@ -2757,8 +2763,8 @@ def test_run_call_pipeline_service_create_with_psc_interface_config(self, sync): == _TEST_PSC_INTERFACE_CONFIG ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create_with_no_model_to_upload", "mock_pipeline_service_get_with_no_model_to_upload", @@ -2795,8 +2801,8 @@ def test_run_returns_none_if_no_model_to_upload( assert model is None - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create_with_no_model_to_upload", "mock_pipeline_service_get_with_no_model_to_upload", @@ -2910,8 +2916,8 @@ def test_run_raises_if_no_staging_bucket(self): container_uri=_TEST_TRAINING_CONTAINER_IMAGE, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_distributed_training( self, @@ -3072,8 +3078,8 @@ def test_run_call_pipeline_service_create_distributed_training( assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_distributed_training_with_reduction_server( self, @@ -3324,8 +3330,8 @@ def test_get_and_return_subclass_custom(self): assert isinstance(subcls, aiplatform.training_jobs.CustomTrainingJob) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_nontabular_dataset_without_model_display_name_nor_model_labels( self, @@ -3531,8 +3537,8 @@ def test_run_call_pipeline_service_create_with_nontabular_dataset_raises_if_anno create_request_timeout=None, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create", "mock_pipeline_service_get", @@ -3580,8 +3586,8 @@ def test_cancel_training_job_without_running(self, mock_pipeline_service_cancel) assert e.match(regexp=r"TrainingJob has not been launched") - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create_with_persistent_resource_id", "mock_pipeline_service_get_with_persistent_resource_id", @@ -3625,8 +3631,8 @@ def test_run_call_pipeline_service_create_with_persistent_resource_id( gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create", "mock_pipeline_service_get", @@ -3716,8 +3722,8 @@ def test_training_job_tpu_v5e(self, mock_pipeline_service_create): timeout=None, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create", "mock_pipeline_service_get", @@ -3809,8 +3815,8 @@ def test_training_job_tpu_v3_pod(self, mock_pipeline_service_create): timeout=None, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create", "mock_pipeline_service_get", @@ -3908,12 +3914,18 @@ class TestCustomContainerTrainingJob: def setup_method(self): importlib.reload(initializer) importlib.reload(aiplatform) + self._job_wait_patcher = mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + self._log_wait_patcher = mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) + self._job_wait_patcher.start() + self._log_wait_patcher.start() def teardown_method(self): + self._job_wait_patcher.stop() + self._log_wait_patcher.stop() initializer.global_pool.shutdown(wait=True) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) def test_custom_container_training_tabular_done( self, mock_pipeline_service_create, @@ -3962,14 +3974,17 @@ def test_custom_container_training_tabular_done( create_request_timeout=None, ) + mock_pipeline_service_create.return_value.state = \ + gca_pipeline_state.PipelineState.PIPELINE_STATE_RUNNING + assert job.done() is False job.wait() assert job.done() is True - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_tabular_dataset( self, @@ -4142,8 +4157,8 @@ def test_run_call_pipeline_service_create_with_tabular_dataset( assert job._has_logged_custom_job - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) def test_custom_container_training_job_run_raises_with_impartial_explanation_spec( self, mock_pipeline_service_create, @@ -4198,8 +4213,8 @@ def test_custom_container_training_job_run_raises_with_impartial_explanation_spe "must be specified." ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_tabular_dataset_and_timeout( self, @@ -4366,8 +4381,8 @@ def test_run_call_pipeline_service_create_with_tabular_dataset_and_timeout( # assert job._has_logged_custom_job - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_tabular_dataset_and_timeout_not_explicitly_set( self, @@ -4515,8 +4530,8 @@ def test_run_call_pipeline_service_create_with_tabular_dataset_and_timeout_not_e timeout=None, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_bigquery_destination( self, @@ -4674,8 +4689,8 @@ def test_run_call_pipeline_service_create_with_bigquery_destination( assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create", "mock_pipeline_service_get", @@ -4834,8 +4849,8 @@ def test_run_with_incomplete_model_info_raises_with_model_to_upload( create_request_timeout=None, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_no_dataset( self, @@ -4935,8 +4950,8 @@ def test_run_call_pipeline_service_create_with_no_dataset( assert model_from_job._gca_resource is mock_model_service_get.return_value - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create_with_enable_web_access", "mock_pipeline_service_get_with_enable_web_access", @@ -4983,8 +4998,8 @@ def test_run_call_pipeline_service_create_with_enable_web_access( ) # TODO: Update test to address Mutant issue b/270708320 - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create_with_enable_dashboard_access", "mock_pipeline_service_get_with_enable_dashboard_access", @@ -5030,8 +5045,8 @@ def test_run_call_pipeline_service_create_with_enable_dashboard_access( gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create_with_scheduling", "mock_pipeline_service_get_with_scheduling", @@ -5090,8 +5105,8 @@ def test_run_call_pipeline_service_create_with_scheduling(self, sync, caplog): == f"{_TEST_MAX_WAIT_DURATION}s" ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_returns_none_if_no_model_to_upload( self, @@ -5122,8 +5137,8 @@ def test_run_returns_none_if_no_model_to_upload( assert model is None - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create_with_no_model_to_upload", "mock_pipeline_service_get_with_no_model_to_upload", @@ -5163,8 +5178,8 @@ def test_get_model_raises_if_no_model_to_upload( with pytest.raises(RuntimeError): job.get_model() - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_raises_if_pipeline_fails( self, @@ -5235,8 +5250,8 @@ def test_run_raises_if_no_staging_bucket(self): command=_TEST_TRAINING_CONTAINER_CMD, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_distributed_training( self, @@ -5387,8 +5402,8 @@ def test_run_call_pipeline_service_create_distributed_training( assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_distributed_training_with_reduction_server( self, @@ -5501,8 +5516,8 @@ def test_run_call_pipeline_service_create_distributed_training_with_reduction_se assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_nontabular_dataset( self, @@ -5706,8 +5721,8 @@ def test_run_call_pipeline_service_create_with_nontabular_dataset_raises_if_anno create_request_timeout=None, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create_with_persistent_resource_id", "mock_pipeline_service_get_with_persistent_resource_id", @@ -5751,8 +5766,8 @@ def test_run_call_pipeline_service_create_with_persistent_resource_id( gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create", "mock_pipeline_service_get", @@ -5838,8 +5853,8 @@ def test_training_job_tpu_v5e(self, mock_pipeline_service_create): timeout=None, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create", "mock_pipeline_service_get", @@ -5927,8 +5942,8 @@ def test_training_job_tpu_v3_pod(self, mock_pipeline_service_create): timeout=None, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create", "mock_pipeline_service_get", @@ -6382,12 +6397,18 @@ class TestCustomPythonPackageTrainingJob: def setup_method(self): importlib.reload(initializer) importlib.reload(aiplatform) + self._job_wait_patcher = mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + self._log_wait_patcher = mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) + self._job_wait_patcher.start() + self._log_wait_patcher.start() def teardown_method(self): + self._job_wait_patcher.stop() + self._log_wait_patcher.stop() initializer.global_pool.shutdown(wait=True) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) @pytest.mark.parametrize( "python_package_gcs_uri", @@ -6576,8 +6597,8 @@ def test_run_call_pipeline_service_create_with_tabular_dataset( assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) def test_custom_python_package_training_job_run_raises_with_wrong_package_uris( self, mock_pipeline_service_create, @@ -6617,8 +6638,8 @@ def test_custom_python_package_training_job_run_raises_with_wrong_package_uris( assert e.match("'python_package_gcs_uri' must be a string or list.") - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) def test_custom_python_package_training_job_run_raises_with_impartial_explanation_spec( self, mock_pipeline_service_create, @@ -6675,8 +6696,8 @@ def test_custom_python_package_training_job_run_raises_with_impartial_explanatio "must be specified." ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_tabular_dataset_with_timeout( self, @@ -6833,8 +6854,8 @@ def test_run_call_pipeline_service_create_with_tabular_dataset_with_timeout( timeout=180.0, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_tabular_dataset_with_timeout_not_explicitly_set( self, @@ -6990,8 +7011,8 @@ def test_run_call_pipeline_service_create_with_tabular_dataset_with_timeout_not_ timeout=None, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_tabular_dataset_without_model_display_name_nor_model_labels( self, @@ -7147,8 +7168,8 @@ def test_run_call_pipeline_service_create_with_tabular_dataset_without_model_dis assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_bigquery_destination( self, @@ -7308,8 +7329,8 @@ def test_run_call_pipeline_service_create_with_bigquery_destination( assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create", "mock_pipeline_service_get", @@ -7435,8 +7456,8 @@ def test_run_with_two_split_raises( create_request_timeout=None, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_with_incomplete_model_info_raises_with_model_to_upload( self, @@ -7472,8 +7493,8 @@ def test_run_with_incomplete_model_info_raises_with_model_to_upload( create_request_timeout=None, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_no_dataset( self, @@ -7575,8 +7596,8 @@ def test_run_call_pipeline_service_create_with_no_dataset( assert model_from_job._gca_resource is mock_model_service_get.return_value - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create_with_enable_web_access", "mock_pipeline_service_get_with_enable_web_access", @@ -7624,8 +7645,8 @@ def test_run_call_pipeline_service_create_with_enable_web_access( ) # TODO: Update test to address Mutant issue b/270708320 - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create_with_enable_dashboard_access", "mock_pipeline_service_get_with_enable_dashboard_access", @@ -7671,8 +7692,8 @@ def test_run_call_pipeline_service_create_with_enable_dashboard_access( gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create_with_scheduling", "mock_pipeline_service_get_with_scheduling", @@ -7732,8 +7753,8 @@ def test_run_call_pipeline_service_create_with_scheduling(self, sync, caplog): == f"{_TEST_MAX_WAIT_DURATION}s" ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create_with_no_model_to_upload", "mock_pipeline_service_get_with_no_model_to_upload", @@ -7770,8 +7791,8 @@ def test_run_returns_none_if_no_model_to_upload( assert model is None - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create_with_no_model_to_upload", "mock_pipeline_service_get_with_no_model_to_upload", @@ -7812,8 +7833,8 @@ def test_get_model_raises_if_no_model_to_upload( with pytest.raises(RuntimeError): job.get_model() - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_raises_if_pipeline_fails( self, @@ -7883,8 +7904,8 @@ def test_run_raises_if_no_staging_bucket(self): container_uri=_TEST_TRAINING_CONTAINER_IMAGE, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_distributed_training( self, @@ -8038,8 +8059,8 @@ def test_run_call_pipeline_service_create_distributed_training( assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_distributed_training_with_reduction_server( self, @@ -8155,8 +8176,8 @@ def test_run_call_pipeline_service_create_distributed_training_with_reduction_se assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_nontabular_dataset_without_model_display_name_nor_model_labels( self, @@ -8362,8 +8383,8 @@ def test_run_call_pipeline_service_create_with_nontabular_dataset_raises_if_anno model_display_name=_TEST_MODEL_DISPLAY_NAME, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create_with_persistent_resource_id", "mock_pipeline_service_get_with_persistent_resource_id", @@ -8408,8 +8429,8 @@ def test_run_call_pipeline_service_create_with_persistent_resource_id( gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create", "mock_pipeline_service_get", @@ -8497,8 +8518,8 @@ def test_training_job_tpu_v5e(self, mock_pipeline_service_create): timeout=None, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create", "mock_pipeline_service_get", @@ -8588,8 +8609,8 @@ def test_training_job_tpu_v3_pod(self, mock_pipeline_service_create): timeout=None, ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) @pytest.mark.usefixtures( "mock_pipeline_service_create", "mock_pipeline_service_get", @@ -8718,8 +8739,8 @@ class TestVersionedTrainingJobs: training_jobs.CustomPythonPackageTrainingJob, ], ) - @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1) - @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1) + @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 0.05) + @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 0.05) def test_run_pipeline_for_versioned_model( self, mock_pipeline_service_create_with_version, diff --git a/tests/unit/aiplatform/test_uploader.py b/tests/unit/aiplatform/test_uploader.py index aa46b49f71..f35ed1f50b 100644 --- a/tests/unit/aiplatform/test_uploader.py +++ b/tests/unit/aiplatform/test_uploader.py @@ -1612,7 +1612,10 @@ def test_thread_continuously_uploads( uploader_thread = threading.Thread(target=uploader.start_uploading) uploader_thread.start() - time.sleep(5) + for _ in range(50): + if mock_client.write_tensorboard_experiment_data.call_count >= 1: + break + time.sleep(0.1) # Check create_time_series calls self.assertEqual(4, mock_client.create_tensorboard_time_series.call_count) @@ -1670,7 +1673,7 @@ def test_thread_continuously_uploads( with mock.patch.object(uploader, "_end_experiment_runs", return_value=None): uploader._end_uploading() uploader._end_experiment_runs.assert_called_once() - time.sleep(1) + uploader_thread.join(timeout=2) self.assertFalse(uploader_thread.is_alive()) mock_client.write_tensorboard_experiment_data.reset_mock() @@ -1680,7 +1683,7 @@ def test_thread_continuously_uploads( with mock.patch.object(uploader, "_end_experiment_runs", return_value=None): uploader._end_uploading() uploader._end_experiment_runs.assert_called_once() - time.sleep(1) + uploader_thread.join(timeout=2) self.assertFalse(uploader_thread.is_alive()) experiment_tracker_mock.set_experiment.assert_called_once() diff --git a/tests/unit/vertexai/test_evaluation.py b/tests/unit/vertexai/test_evaluation.py index 37789dadb5..677d6b3786 100644 --- a/tests/unit/vertexai/test_evaluation.py +++ b/tests/unit/vertexai/test_evaluation.py @@ -816,12 +816,19 @@ def mock_storage_blob_from_string(): @pytest.mark.usefixtures("google_auth_mock") class TestEvaluation: def setup_method(self): + from google.cloud.aiplatform import pipeline_jobs vertexai.init( project=_TEST_PROJECT, location=_TEST_LOCATION, ) + self._job_wait_patcher = mock.patch.object(pipeline_jobs, "_JOB_WAIT_TIME", 0.05) + self._log_wait_patcher = mock.patch.object(pipeline_jobs, "_LOG_WAIT_TIME", 0.05) + self._job_wait_patcher.start() + self._log_wait_patcher.start() def teardown_method(self): + self._job_wait_patcher.stop() + self._log_wait_patcher.stop() initializer.global_pool.shutdown(wait=True) def test_create_eval_task(self): @@ -1836,12 +1843,19 @@ def test_compute_rubric_based_metric(self, api_transport): @pytest.mark.usefixtures("google_auth_mock") class TestAgentEvaluation: def setup_method(self): + from google.cloud.aiplatform import pipeline_jobs vertexai.init( project=_TEST_PROJECT, location=_TEST_LOCATION, ) + self._job_wait_patcher = mock.patch.object(pipeline_jobs, "_JOB_WAIT_TIME", 0.05) + self._log_wait_patcher = mock.patch.object(pipeline_jobs, "_LOG_WAIT_TIME", 0.05) + self._job_wait_patcher.start() + self._log_wait_patcher.start() def teardown_method(self): + self._job_wait_patcher.stop() + self._log_wait_patcher.stop() initializer.global_pool.shutdown(wait=True) @pytest.mark.parametrize("api_transport", ["grpc", "rest"]) @@ -2674,7 +2688,7 @@ def test_sleep_and_advance(self): assert (time.time() - start_time) >= 0.5 def test_thread_safety(self): - rate_limiter = utils.RateLimiter(rate=2) + rate_limiter = utils.RateLimiter(rate=20) start_time = time.time() def target(): @@ -2686,10 +2700,10 @@ def target(): for thread in threads: thread.join() - # Verify that the total minimum time should be 4.5 seconds - # (9 intervals of 0.5 seconds each). + # Verify that the total minimum time should be 0.45 seconds + # (9 intervals of 0.05 seconds each). total_time = time.time() - start_time - assert total_time >= 4.5 + assert total_time >= 0.45 # TODO(b/361123127) Add test_to_metrics_spec back