feat(connectors): add Meilisearch sink connector#3497
Conversation
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #3497 +/- ##
=============================================
- Coverage 74.41% 39.58% -34.83%
Complexity 937 937
=============================================
Files 1243 1178 -65
Lines 125698 105778 -19920
Branches 101616 86290 -15326
=============================================
- Hits 93541 41876 -51665
- Misses 29141 61468 +32327
+ Partials 3016 2434 -582
🚀 New features to boost your workflow:
|
|
overall correct approach - JSON-object document model with generated PK is sound, wait_for_tasks + partial-error accounting is thoughtful. test coverage is present but there is |
|
/author |
|
/ready |
|
/author |
|
/ready |
| } | ||
| } | ||
|
|
||
| async fn index_documents( |
There was a problem hiding this comment.
batch partial failure with document_action=update: when chunk N fails, chunks 0..N-1 are already indexed; runtime retries entire batch; update action applies partial updates a second time to already-succeeded chunks. replace action is idempotent; update is not. No warn at open() when document_action=update. Fix: add warn! at open() when action is Update, documenting at-least-once + non-idempotent.
|
|
||
| #[async_trait] | ||
| impl Sink for MeilisearchSink { | ||
| async fn open(&mut self) -> Result<(), Error> { |
There was a problem hiding this comment.
open() has no warn when wait_for_tasks=false. close() now warns, but operators see that too late. Fix: add warn! early in open() when !self.config.wait_for_tasks.
| } | ||
| } | ||
|
|
||
| async fn ensure_index_exists(&self, client: &Client) -> Result<(), Error> { |
There was a problem hiding this comment.
ensure_index_exists on Some(_) only logs "already exists"; does not validate existing index's actual primary key against self.config.primary_key. A misconfigured primary_key on an existing index fails silently at first consume() with a permanent Meilisearch error, not at open(). Fix: read index.primary_key from the returned Index and warn! (or Err) on mismatch.
| config.task_poll_interval.as_deref(), | ||
| DEFAULT_TASK_POLL_INTERVAL, | ||
| ); | ||
| let max_retries = config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES); |
There was a problem hiding this comment.
retry_delay > max_retry_delay not guarded. exponential_backoff clamps to max, so behavior is technically correct, but operator who sets retry_delay="10s", max_retry_delay="1s" gets silent misconfiguration. Fix: swap + warn! (cf. http_sink pattern).
| async move { | ||
| let mut last_count = 0usize; | ||
| for _ in 0..POLL_ATTEMPTS { | ||
| if let Ok(documents) = self.list_documents(TEST_INDEX).await { |
There was a problem hiding this comment.
wait_for_documents hardcodes TEST_INDEX instead of forwarding the index_name param to list_documents.
|
@countradooku Looks good except for the few warnings around back indexing and open calls. After the above requested changes, good to go for second reviewer. |
|
/author |
Summary
meilisearch-sdkMotivation
This splits the sink portion out of #3404 so the Meilisearch sink can be reviewed and merged independently from the source connector.
Closes #3495.
Validation
cargo sort --no-format --workspacecargo test -p iggy_connector_meilisearch_sinkcargo clippy -p iggy_connector_meilisearch_sink --all-targets -- -D warningsenv CARGO_BIN_EXE_iggy-server=... CARGO_BIN_EXE_iggy-connectors=... cargo test -p integration --test mod -- connectors::meilisearch::meilisearch_sinkgit diff --check