feat(connectors): add Meilisearch source connector #3498

Open
countradooku wants to merge 10 commits into apache:master from countradooku:feat/meilisearch-source-connector

Conversation

@countradooku

Contributor

Summary

  • add the Meilisearch source connector using the official meilisearch-sdk
  • document source configuration, retry behavior, and primary-key cursor requirements
  • add Docker-backed integration coverage for producing indexed Meilisearch documents into Iggy

Motivation

This splits the source portion out of #3404 so the Meilisearch source can be reviewed and merged independently from the sink connector.

Closes #3496.

Validation

  • cargo sort --no-format --workspace
  • cargo test -p iggy_connector_meilisearch_source
  • cargo clippy -p iggy_connector_meilisearch_source --all-targets -- -D warnings
  • env CARGO_BIN_EXE_iggy-server=... CARGO_BIN_EXE_iggy-connectors=... cargo test -p integration --test mod -- connectors::meilisearch::meilisearch_source
  • git diff --check

@countradooku countradooku marked this pull request as ready for review June 15, 2026 18:59
@github-actions github-actions Bot added the S-waiting-on-review PR is waiting on a reviewer label Jun 15, 2026
@codecov

codecov Bot commented Jun 15, 2026


Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.02%. Comparing base (74d62eb) to head (36a00cd).

Additional details and impacted files
@@             Coverage Diff              @@
##             master    #3498      +/-   ##
============================================
- Coverage     74.41%   73.02%   -1.39%     
  Complexity      937      937              
============================================
  Files          1243     1193      -50     
  Lines        125987   119755    -6232     
  Branches     101854   100060    -1794     
============================================
- Hits          93756    87457    -6299     
- Misses        29218    29393     +175     
+ Partials       3013     2905     -108     
Components Coverage Δ
Rust Core 75.87% <ø> (+0.69%) ⬆️
Java SDK 59.09% <ø> (-3.35%) ⬇️
C# SDK 72.10% <ø> (ø)
Python SDK 88.88% <ø> (ø)
PHP SDK 83.57% <ø> (-0.72%) ⬇️
Node SDK 87.27% <ø> (-3.86%) ⬇️
Go SDK 15.66% <ø> (-24.48%) ⬇️
see 186 files with indirect coverage changes

Comment thread Cargo.toml
Comment thread core/connectors/sources/meilisearch_source/src/lib.rs
Comment thread core/connectors/sources/meilisearch_source/src/lib.rs
Comment thread core/connectors/sources/meilisearch_source/src/lib.rs Outdated
Comment thread core/connectors/sources/meilisearch_source/src/lib.rs Outdated
Comment thread core/connectors/sources/meilisearch_source/src/lib.rs
Comment thread core/connectors/sources/meilisearch_source/src/lib.rs
Comment thread core/integration/tests/connectors/fixtures/meilisearch/container.rs
@ryerraguntla

Contributor

Overall this is the right direction: cursor-by-primary-key is the right shape for Meilisearch. Test coverage is present, but there is a gap: only the happy path is covered. There is no test for (a) state restored across a restart, where the cursor should start from last_primary_key rather than from zero, or (b) a second poll producing zero duplicates. See state_persists_across_connector_restart in elasticsearch_source.rs for a pattern. For a cursor-based source, duplicate-freedom is the core invariant, so it should be integration-tested.
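
The two invariants named in this comment can be illustrated with a small in-memory model. This is a minimal sketch, not the connector's code or the requested integration test: `FakeIndex`, `CursorState`, and `poll` are hypothetical names, and the assumption that each fetch selects keys strictly greater than the cursor in ascending order is an illustration of cursor-by-primary-key, not a confirmed detail of the implementation.

```rust
/// Hypothetical in-memory stand-in for a Meilisearch index.
/// Documents are represented only by their integer primary key.
struct FakeIndex {
    primary_keys: Vec<u64>,
}

impl FakeIndex {
    /// Returns up to `batch_size` keys strictly greater than `cursor`,
    /// in ascending order (assumed fetch semantics for this sketch).
    fn fetch_after(&self, cursor: Option<u64>, batch_size: usize) -> Vec<u64> {
        let mut keys: Vec<u64> = self
            .primary_keys
            .iter()
            .copied()
            .filter(|pk| cursor.map_or(true, |c| *pk > c))
            .collect();
        keys.sort_unstable();
        keys.truncate(batch_size);
        keys
    }
}

/// Hypothetical persisted state; the real state layout may differ.
#[derive(Clone, Debug, PartialEq)]
struct CursorState {
    last_primary_key: Option<u64>,
}

/// One poll: fetch a batch after the cursor, then advance the cursor
/// to the last key in that batch. An empty batch leaves the cursor alone.
fn poll(index: &FakeIndex, state: &mut CursorState, batch_size: usize) -> Vec<u64> {
    let batch = index.fetch_after(state.last_primary_key, batch_size);
    if let Some(last) = batch.last() {
        state.last_primary_key = Some(*last);
    }
    batch
}
```

A restart can be simulated by cloning the state after the first poll and polling again with the clone: the second batch should contain only keys the first batch did not, and a poll after the index is exhausted should be empty.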

@ryerraguntla

Contributor

/author

@github-actions github-actions Bot added S-waiting-on-author PR is waiting on author response and removed S-waiting-on-review PR is waiting on a reviewer labels Jun 16, 2026
@countradooku

Contributor Author

/ready

@github-actions github-actions Bot added S-waiting-on-review PR is waiting on a reviewer and removed S-waiting-on-author PR is waiting on author response labels Jun 16, 2026
Comment thread core/connectors/sources/meilisearch_source/src/lib.rs
Comment thread core/connectors/sources/meilisearch_source/src/lib.rs
Comment thread core/integration/tests/connectors/fixtures/meilisearch/container.rs
Comment thread core/connectors/sources/meilisearch_source/src/lib.rs
Comment thread core/connectors/sources/meilisearch_source/README.md
Comment thread core/connectors/sources/meilisearch_source/src/lib.rs
Comment thread core/connectors/sources/meilisearch_source/src/lib.rs
Comment thread core/connectors/sources/meilisearch_source/src/lib.rs Outdated
Comment thread core/connectors/sources/meilisearch_source/src/lib.rs
@github-actions github-actions Bot added the S-waiting-on-review PR is waiting on a reviewer label Jun 17, 2026
Comment thread core/connectors/sources/meilisearch_source/src/lib.rs
Comment thread core/connectors/sources/meilisearch_source/src/lib.rs
Comment thread core/connectors/sources/meilisearch_source/config.toml
Comment thread core/integration/tests/connectors/meilisearch/meilisearch_source.rs Outdated
Comment thread core/integration/tests/connectors/meilisearch/meilisearch_source.rs Outdated
@ryerraguntla

Contributor

/author

@github-actions github-actions Bot added S-waiting-on-author PR is waiting on author response and removed S-waiting-on-review PR is waiting on a reviewer labels Jun 19, 2026
@countradooku

Contributor Author

/ready

@github-actions github-actions Bot added S-waiting-on-review PR is waiting on a reviewer and removed S-waiting-on-author PR is waiting on author response labels Jun 23, 2026
    .client
    .as_ref()
    .ok_or_else(|| Error::Connection("Meilisearch client not initialized".to_string()))?;
let (messages, state_changed) = self.search_documents(client).await?;

Contributor

lib.rs:607-611 + lib.rs:418: a document with a bad primary key in mid-batch permanently stalls the connector.
document_primary_keys short-circuits on the first Err(InvalidConfigValue), and poll() returns that error. The runtime stops the connector because the error type is permanent, and the cursor is unchanged. A restart re-fetches the same batch and hits the same error, so the connector is stuck in a loop with no recovery. Fix: skip bad-PK documents with a warn log and advance the cursor to the last good integer PK seen, or return the good prefix up to the bad document.
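
The first suggested fix (skip and advance) might look roughly like the following. This is a sketch under stated assumptions: `PkValue`, `BatchScan`, and `scan_batch` are hypothetical names, `PkValue` stands in for the JSON values the connector actually handles, and `eprintln!` stands in for a warn-level log.

```rust
/// Hypothetical stand-in for a document's primary-key value.
#[derive(Clone, Debug, PartialEq)]
enum PkValue {
    Integer(u64),
    Text(String),
    Missing,
}

/// Result of scanning one batch of documents.
#[derive(Debug, PartialEq)]
struct BatchScan {
    /// Positions of documents with a usable integer primary key.
    accepted: Vec<usize>,
    /// Number of documents skipped because their key was unusable.
    skipped: usize,
    /// Last good integer key seen; the cursor would advance to this.
    last_good_key: Option<u64>,
}

/// Scans a batch without short-circuiting on the first bad key.
fn scan_batch(primary_keys: &[PkValue]) -> BatchScan {
    let mut scan = BatchScan {
        accepted: Vec::new(),
        skipped: 0,
        last_good_key: None,
    };
    for (index, pk) in primary_keys.iter().enumerate() {
        match pk {
            PkValue::Integer(key) => {
                scan.accepted.push(index);
                scan.last_good_key = Some(*key);
            }
            other => {
                // Assumption: a real connector would emit a warn-level log here.
                eprintln!("skipping document {index}: unusable primary key {other:?}");
                scan.skipped += 1;
            }
        }
    }
    scan
}
```

One caveat of this approach: if every document in a batch has an unusable key, `last_good_key` stays `None` and the cursor still cannot advance, so that case would need separate handling.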

Ok(value.clone())
}

fn document_primary_keys(documents: &[Value], primary_key: &str) -> Result<Vec<Value>, Error> {

Contributor

lib.rs:607-611 + lib.rs:418: a document with a bad primary key in mid-batch permanently stalls the connector.
document_primary_keys short-circuits on the first Err(InvalidConfigValue), and poll() returns that error. The runtime stops the connector because the error type is permanent, and the cursor is unchanged. A restart re-fetches the same batch and hits the same error, so the connector is stuck in a loop with no recovery. Fix: skip bad-PK documents with a warn log and advance the cursor to the last good integer PK seen, or return the good prefix up to the bad document.


#[async_trait]
impl Source for MeilisearchSource {
    async fn open(&mut self) -> Result<(), Error> {

Contributor

sortableAttributes is not validated at open().
The connector relies on the primary_key:asc sort for cursor monotonicity. Meilisearch silently ignores sort on a field not in sortableAttributes. Result: hits are returned in arbitrary order, the cursor advances to an arbitrary last-in-batch PK, and documents are skipped forever with no error signal. Fix: call GET /indexes/{uid}/settings/sortable-attributes at open() time and fail with InvalidConfigValue if the primary key field is absent.
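
The check itself is small once the attribute list has been fetched. The sketch below covers only that validation step and assumes the list has already been retrieved from the settings endpoint named above; `ConfigError` and `ensure_primary_key_sortable` are hypothetical names, not part of the connector or the SDK.

```rust
/// Hypothetical error type mirroring the InvalidConfigValue variant
/// mentioned in the review comment.
#[derive(Debug, PartialEq)]
enum ConfigError {
    InvalidConfigValue(String),
}

/// Fails if `primary_key` is not listed in the index's sortable attributes.
/// `sortable_attributes` is assumed to be the list returned by the
/// sortable-attributes settings endpoint.
fn ensure_primary_key_sortable(
    primary_key: &str,
    sortable_attributes: &[String],
) -> Result<(), ConfigError> {
    if sortable_attributes.iter().any(|attr| attr == primary_key) {
        Ok(())
    } else {
        Err(ConfigError::InvalidConfigValue(format!(
            "primary key '{primary_key}' is not in sortableAttributes {sortable_attributes:?}"
        )))
    }
}
```

Calling this from open() would surface the misconfiguration before the first poll rather than during cursor advancement.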

})
}

async fn check_connectivity(&self, client: &Client) -> Result<(), Error> {

Contributor

check_connectivity fails immediately on a non-"available" health status.
retry_sdk_open_operation only retries transient SDK errors. If Meilisearch returns 200 OK with {"status":"unknown"} (during startup), the retry loop never fires and check_connectivity returns Err(Connection(...)) immediately, so open() fails on a transient condition. Meilisearch can transiently return a non-available status during index loading. Fix: add an inner retry loop around the status check, up to max_open_retries, mirroring the sink pattern.
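
An inner retry loop of the kind suggested here could be sketched as follows. This is illustrative only: `wait_until_available` and `ConnectError` are hypothetical names, the health call is abstracted as a closure returning the status string, and a blocking `std::thread::sleep` is used to keep the example self-contained, whereas the connector is async and would presumably await a timer instead.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical error type mirroring the Connection variant in the snippet above.
#[derive(Debug, PartialEq)]
enum ConnectError {
    Connection(String),
}

/// Polls `fetch_status` until it reports "available", retrying up to
/// `max_open_retries` times. Returns the number of retries that were needed.
fn wait_until_available<F>(
    mut fetch_status: F,
    max_open_retries: u32,
    retry_delay: Duration,
) -> Result<u32, ConnectError>
where
    F: FnMut() -> String,
{
    let mut attempt = 0;
    loop {
        let status = fetch_status();
        if status == "available" {
            return Ok(attempt);
        }
        if attempt >= max_open_retries {
            return Err(ConnectError::Connection(format!(
                "Meilisearch health status is '{status}' after {attempt} retries"
            )));
        }
        attempt += 1;
        sleep(retry_delay);
    }
}
```

With this shape, a status of "unknown" during startup delays open() instead of failing it, while a persistently unhealthy server still produces a Connection error once the retry budget is spent.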

let primary_keys = document_primary_keys(&documents, primary_key)?;
let last_document_primary_key = primary_keys.into_iter().last();
let messages = self.documents_to_messages(documents)?;
let state_changed = last_document_primary_key.is_some();

Contributor

252 + lib.rs:258-259 + lib.rs:419: poll_count/documents_produced diverge from persisted state on empty polls.
Both fields are incremented in memory at :258-259 on every poll, including empty ones. On empty polls state_changed=false, so persisted_state = None and the runtime keeps the prior state. On restart, poll_count resets to the last-checkpointed value, not the current one. There is no functional correctness impact (these are metrics only), but persisted state and in-memory state diverge indefinitely. Fix: either always persist state (remove the if state_changed gate for these fields) or move poll_count/documents_produced to transient struct fields that are not serialized.
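
The second option (transient counters) amounts to splitting the struct in two. A minimal sketch, with hypothetical names throughout (`PersistedState`, `RuntimeCounters`, `SourceSketch`, `record_poll`) and with serialization left out so the example stays dependency-free:

```rust
/// State that would be checkpointed; only the cursor lives here.
#[derive(Clone, Debug, Default, PartialEq)]
struct PersistedState {
    last_primary_key: Option<u64>,
}

/// Metrics kept only in memory; they intentionally reset on restart.
#[derive(Debug, Default)]
struct RuntimeCounters {
    poll_count: u64,
    documents_produced: u64,
}

struct SourceSketch {
    persisted: PersistedState,
    counters: RuntimeCounters,
}

impl SourceSketch {
    /// Records one poll. Counters always move; a new checkpoint is
    /// returned only when the batch was non-empty and the cursor advanced.
    fn record_poll(&mut self, batch_keys: &[u64]) -> Option<PersistedState> {
        self.counters.poll_count += 1;
        self.counters.documents_produced += batch_keys.len() as u64;
        let last = batch_keys.last()?;
        self.persisted.last_primary_key = Some(*last);
        Some(self.persisted.clone())
    }
}
```

Because the counters never enter the checkpoint, there is nothing for them to diverge from, at the cost of losing their values across restarts.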

}
}
MeilisearchSdkError::MeilisearchCommunication(communication_error) => {
    if communication_error.status_code == 429 || communication_error.status_code >= 500 {

Contributor

lib.rs:626 + lib.rs:649: status_code == 0 is treated as a permanent error.
is_transient_sdk_error returns false for MeilisearchCommunication with status_code == 0, and map_sdk_error maps it to PermanentHttpError. The Meilisearch SDK produces status_code == 0 on pre-response connection failures (connection refused, DNS failure), and these are transient. Fix: add || communication_error.status_code == 0 at both :626 and :649.
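
One way to keep the two call sites from drifting apart again is to put the classification in a single helper. The function below is a hypothetical sketch (`is_transient_status` is not an existing function in the connector), and the `u16` parameter type is an assumption about how the status code is represented.

```rust
/// Classifies an HTTP status code from a communication error.
/// 0 is treated as "no response received" (per the review comment),
/// 429 as rate limiting, and 5xx as server-side failure.
fn is_transient_status(status_code: u16) -> bool {
    status_code == 0 || status_code == 429 || status_code >= 500
}
```

Both is_transient_sdk_error and map_sdk_error could then call the same predicate instead of repeating the condition.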

})
.await?;
let documents: Vec<Value> = results.hits.into_iter().map(|hit| hit.result).collect();
let primary_keys = document_primary_keys(&documents, primary_key)?;

Contributor

lib.rs:249-250: document_primary_keys allocates a Vec of N cloned PKs, but only last() is used.
document_primary_keys at :607-611 clones every PK value into a Vec, and into_iter().last() discards all but the last. For batch_size=100, 99 allocations are wasted per poll. Fix:

```
let last_document_primary_key = documents
    .iter()
    .map(|doc| document_primary_key(doc, primary_key))
    .try_fold(None::<Value>, |_, pk| pk.map(Some))?;
```

let client = self.create_client()?;
self.check_connectivity(&client).await?;
let primary_key = self.get_primary_key(&client).await?;
info!(

Contributor

lib.rs:396-398: the integer-only PK constraint fails too late.
open() succeeds on string-PK indexes because the PK name is returned fine. InvalidConfigValue fires only at the first poll(), when document values are seen to be non-integers. The operator configures the connector, open() returns OK, the connector runs, and then it stops permanently on the first poll. There is no way to detect a string PK at open() without fetching sample documents, so document this limitation prominently (it is currently buried in paragraph 2 of the README's Behavior section).

#[serde(serialize_with = "iggy_common::serde_secret::serialize_optional_secret")]
pub api_key: Option<SecretString>,
pub query: Option<String>,
pub filter: Option<Value>,

Contributor

filter: Option<Value> config type is undocumented for array syntax.
filter accepts a string or a nested array of strings/arrays. The array form only works with JSON config (plugin_config_format = "json"), not TOML. The README says "string or array" with no example, and config.toml shows only the string form, so operators won't know how to use the array syntax. Fix: add an array example to the README and note that it requires the JSON config format.

Additional Information:

  • lib.rs:59: pub filter: Option<Value> is the field declaration in the public config struct.

  • README.md:12: "filter: Optional Meilisearch filter expression or array." is a single-line doc with no example of array syntax and no note that the array form requires plugin_config_format = "json".

  • config.toml:24: plugin_config_format = "json" is set, but no filter field appears in the config example at all (lines 34-45).

    The gap: config.toml omits filter entirely, README:12 mentions the array form but gives no example, and nowhere is it stated that the array form requires the JSON config format (which is only set at config.toml:24). A string filter works in both TOML and JSON config; an array filter requires JSON. That distinction is never documented.
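
For readers unfamiliar with the array form: in Meilisearch's filter syntax, elements of the outer array are combined with AND, and elements of an inner array are combined with OR. The sketch below models that rule and renders the equivalent string expression. `FilterClause` and `render_filter` are hypothetical names used only for illustration; the connector itself passes the configured value through rather than rendering it, as far as this thread shows.

```rust
/// One element of the outer filter array.
enum FilterClause {
    /// A plain expression, e.g. "release_year > 2000".
    Single(String),
    /// An inner array: alternatives joined with OR.
    AnyOf(Vec<String>),
}

/// Renders the array form as the equivalent string filter:
/// outer elements joined with AND, inner elements joined with OR.
fn render_filter(clauses: &[FilterClause]) -> String {
    clauses
        .iter()
        .map(|clause| match clause {
            FilterClause::Single(expression) => expression.clone(),
            FilterClause::AnyOf(alternatives) => {
                format!("({})", alternatives.join(" OR "))
            }
        })
        .collect::<Vec<_>>()
        .join(" AND ")
}
```

For example, the array `[["genres = horror", "genres = comedy"], "release_year > 2000"]` corresponds to the string `(genres = horror OR genres = comedy) AND release_year > 2000`.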

@ryerraguntla

Contributor

/author

@github-actions github-actions Bot added S-waiting-on-author PR is waiting on author response and removed S-waiting-on-review PR is waiting on a reviewer labels Jun 23, 2026
Labels

S-waiting-on-author PR is waiting on author response

Development

Successfully merging this pull request may close these issues.

Add Meilisearch source connector

3 participants