fix(connectors): bring quickwit_sink up to convention#3523
Conversation
- Defer index_id YAML parse to open(); return InvalidConfigValue instead of panicking via expect() - Use ClientWithMiddleware (build_retry_client) for ingest retries with exponential backoff on 5xx / connection errors - check_connectivity_with_retry on open() against /health/livez - Map 4xx responses (except 429) to PermanentHttpError so the circuit breaker is not tripped by bad payloads - Add verbose_logging / max_retries / retry_delay / max_retry_delay / max_open_retries / open_retry_max_delay to QuickwitSinkConfig - Downgrade per-batch consume() log to debug! (info! when verbose) - Set Content-Type: application/x-ndjson on ingest requests - Drop unused dashmap / once_cell deps; add reqwest-middleware - 5 unit tests: verbose flag, client init, index_id init Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX
|
Thanks for the PR. It is labeled Slash commands (own line, regular comment) move it around the queue:
See CONTRIBUTING.md for details. |
- AGENTS.md: 104→75 lines. Removed redundant repo structure (derivable by ls), collapsed principles to iggy-specific rules only, merged Jenkins/QW infra into Infra section, updated handover block. - TODO.md: replaced stale checked items with 4 open PRs (apache#3516 apache#3517 apache#3523 apache#3525) + QW 0.9 upgrade task. - DONE.md: added sessions 5-10 block (QW sink pipeline, collector cutover, InvalidOffset bug + fix). - quickwit_sink/src/lib.rs: cargo fmt reformatting only.
| info!("Created index: {}", self.index_id); | ||
| Ok(()) | ||
| } | ||
|
|
There was a problem hiding this comment.
lib.rs:ingest() — create_index() treats 409 Conflict as InitError; concurrent open() calls (multi-instance, restart race) both see has_index()=false, both POST, second gets 4xx → InitError → connector
never opens. Fix: absorb 409 (and 400 "already exists") as Ok(()) in create_index(). Retry middleware also retries a 5xx create that succeeded server-side; on retry, server returns 409 → same path. Same fix
covers both.
There was a problem hiding this comment.
Sorry for the late response -- I was running a local benchmark to make sure the setup is working end-to-end.
Fixed in 512b71f04: create_index now handles 409 CONFLICT as a success case -- another instance beat us to it, but the index exists, which is what we wanted. Only other 4xx errors propagate as InitError.
| self.config.open_retry_max_delay.as_deref(), | ||
| DEFAULT_OPEN_RETRY_MAX_DELAY, | ||
| ); | ||
|
|
There was a problem hiding this comment.
reqwest::Client::new() has no timeout; health probe and has_index()/create_index() can hang indefinitely under network partition or slow-starting Quickwit. Fix: reqwest::Client::builder().timeout(...) with configurable or sensible default (e.g. 30s)
There was a problem hiding this comment.
Sorry for the late response -- I was running a local benchmark to make sure the setup is working end-to-end.
Fixed in 512b71f04: added request_timeout: Option<String> to QuickwitSinkConfig (default "30s") and wired it into reqwest::Client::builder().timeout(request_timeout).build() in open(). The example config TOML has the field commented in as a reference.
|
@mfyuce - all the above are minors. I am not sure of quickwit s production data set distribution to mention about the need for circuit breakers (if there are huge number of records/documents for the same cursor key for a given batch size) . Please make a judgement call about the need for circuit breaker implementation . otherwise it is all set for second reviewer's comments before merging. |
|
/author |
…t timeout 409 CONFLICT (and 400 "already exists" for older QW) returned by create_index() no longer fails connector open. This covers two races: concurrent open() calls where both see has_index()=false, and retry middleware retrying a 5xx create that already succeeded server-side. Add request_timeout (default 30s) on the underlying reqwest Client so health probes and index management calls time out under network partition instead of hanging indefinitely. Fixes review feedback from ryerraguntla on PR apache#3523.
|
Thanks for the thorough review @ryerraguntla! Both issues addressed in the latest push: 409 / "already exists" on No timeout on /ready |
|
@ryerraguntla — judgment call on the circuit breaker: The existing QuickWit is typically an internal service in the same cluster, so prolonged partition is rare, and the runtime already isolates failures per connector. A full half-open / trip-threshold circuit breaker on top of this would add complexity without clear benefit for this specific sink. Happy to revisit if the second reviewer sees a concrete failure mode that the existing retry policy doesn't cover. |
apache#3523 review addressed (409 absorb + request_timeout). Four PRs all S-waiting-on-review; pipeline live and clean.
72→60 lines: removed iggy-bench infra line, dropped unwrap/expect and BDD-naming rules (standard Rust / discoverable from tests), folded connectors-overview note into Skills section header. Handover updated: apache#3523 review addressed (409 + timeout), five PRs all S-waiting-on-review, next steps clarified.
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #3523 +/- ##
============================================
- Coverage 74.46% 74.40% -0.06%
Complexity 937 937
============================================
Files 1243 1243
Lines 125937 125520 -417
Branches 101857 101484 -373
============================================
- Hits 93775 93396 -379
+ Misses 29152 29071 -81
- Partials 3010 3053 +43
🚀 New features to boost your workflow:
|
|
Can you please add a brief rationale/problem statement/why behind the change? I am finding the PR hard to review without clarity on the goal being achieved. |
…t_timeout Concurrent open() calls (multi-instance restart race) can both see has_index()=false and both POST to create the index; the second request gets 409 Conflict, which is not an error condition -- the index exists, which is exactly what we want. Absorb 409 as Ok(()). reqwest::Client::new() has no timeout, leaving health-probe and index management calls able to hang indefinitely under network partition. Add request_timeout (default 30s) wired into Client::builder().timeout(). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
Sorry for the late response -- was doing a local benchmark to make sure the setup is working. @kriti-sc good call, here is the rationale: The original 1. No request timeout -- 2. No connectivity check on 3. No retry middleware -- transient 5xx or 429 responses from QuickWit caused immediate batch failure and offset advancement. This PR wires 4. 409 Conflict on The goal is to bring |
- AGENTS.md: 245→74 lines. Removed ToC, Structure tree, Where-to-look table, Tooling table, Discussion section (all derivable or static). Compressed principles to iggy-specific rules only. - TOBEDECIDED.md: commit unstaged segment compression design section. - Handover updated: apache#3523 review addressed (409 absorb, request_timeout, circuit-breaker judgment); all 5 PRs S-waiting-on-review.
- AGENTS.md: 245→74 lines. Removed ToC, Structure tree, Where-to-look table, Tooling table, Discussion section (all derivable or static). Compressed principles to iggy-specific rules only. - TOBEDECIDED.md: commit unstaged segment compression design section. - Handover updated: apache#3523 review addressed (409 absorb, request_timeout, circuit-breaker judgment); all 5 PRs S-waiting-on-review.
| DEFAULT_OPEN_RETRY_MAX_DELAY, | ||
| ); | ||
|
|
||
| let request_timeout = parse_duration(self.config.request_timeout.as_deref(), "30s"); |
There was a problem hiding this comment.
instead of 30s, define a const like the DEFAULT_* defined at the top of this file, and use that here
Summary
index_idYAML parse toopen()— returnInvalidConfigValueinstead of panicking viaexpect()ClientWithMiddleware(build_retry_client) for ingest retries with exponential backoff on 5xx / connection errorscheck_connectivity_with_retryinopen()against/health/livezPermanentHttpErrorso the circuit breaker is not tripped by bad payloadsverbose_logging/max_retries/retry_delay/max_retry_delay/max_open_retries/open_retry_max_delaytoQuickwitSinkConfig(all optional, forward-compatible)consume()log todebug!(upgraded toinfo!whenverbose_logging = true)Content-Type: application/x-ndjsonon ingest requestsdashmap/once_celldeps; addreqwest-middlewareTest plan
cargo clippy -p iggy_connector_quickwit_sink --all-targets -- -D warnings— cleancargo test -p iggy_connector_quickwit_sink— 5 tests passcargo test -p integration -- connectors::quickwit🤖 Generated with Claude Code