feat(connectors): add OTLP gRPC source connector #3516

Open
mfyuce wants to merge 8 commits into apache:master from mfyuce:feat/otlp-source-connector

@mfyuce

@mfyuce mfyuce commented Jun 20, 2026


Summary

  • Adds iggy_connector_otlp_source, a cdylib connector plugin that binds port 4317 (OTLP standard) and receives logs, metrics, and traces from any OpenTelemetry SDK or Collector.
  • Each incoming gRPC export request is deserialized from the opentelemetry-proto wire format (already a transitive dep of opentelemetry-otlp) and serialized as JSON into the connector channel — no build-time proto compilation required.
  • All three OTLP services (LogsService, MetricsService, TraceService) share one listener, with gzip compression enabled on every service (OTel SDKs and the Collector compress payloads by default).
  • Follows the pull-model Source trait: poll() blocks on the first message then drains up to batch_size additional messages via try_recv() before returning.
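The poll() behaviour described in the last bullet can be sketched with the standard library alone. This is a minimal illustration, not the connector's code: it assumes a `std::sync::mpsc` channel in place of the tokio channel used in the PR, and the name `poll_batch` is hypothetical.

```rust
use std::sync::mpsc::{Receiver, RecvError, TryRecvError};

/// Blocks until one message is available, then drains up to `batch_size`
/// additional messages without blocking.
/// Returns `Err(RecvError)` if every sender is gone and the channel is empty.
fn poll_batch<T>(rx: &Receiver<T>, batch_size: usize) -> Result<Vec<T>, RecvError> {
    // Blocking receive: waits for the first message of the batch.
    let first = rx.recv()?;
    let mut batch = vec![first];

    // Non-blocking drain: stop once `batch_size` extra messages were taken,
    // or as soon as the channel is empty or disconnected.
    while batch.len() <= batch_size {
        match rx.try_recv() {
            Ok(msg) => batch.push(msg),
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
        }
    }
    Ok(batch)
}
```

Blocking only on the first message keeps an idle source from spinning, while the bounded drain caps how much work a single poll() returns.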

Design notes

Why opentelemetry-proto instead of tonic-build?
The opentelemetry-proto crate already generates the tonic stubs and ships them as a library. Using it avoids a build.rs / protoc dependency and keeps the build hermetic.

Why a single gRPC listener for all three signals?
Standard OTLP deployments use one endpoint for all telemetry. Separating them would require operators to configure three addresses, matching no existing tooling convention.

JSON schema (documented in README.md):
Each message carries a top-level "signal" field ("log", "metric", or "trace") plus signal-specific fields derived from the OTLP proto structs.

Configuration

[plugin_config]
listen_addr = "0.0.0.0:4317"
channel_capacity = 50000
batch_size = 1000

Point any OTel SDK or Collector at grpc://host:4317.

Test plan

  • cargo clippy -p iggy_connector_otlp_source --all-features --all-targets -- -D warnings — clean
  • cargo test -p iggy_connector_otlp_source — passes
  • License headers present on all new .rs and Cargo.toml files
  • Taplo and markdownlint pass on new TOML/Markdown files
  • Deployed and validated against a live OTel Collector in a Kubernetes environment: logs, metrics, and traces all fan-out correctly into Iggy topics

🤖 Generated with Claude Code

@github-actions


Thanks for the PR. It is labeled S-waiting-on-review and queued for review.

Slash commands (own line, regular comment) move it around the queue:

  • /ready - back to S-waiting-on-review after addressing feedback
  • /author - flip to S-waiting-on-author while you finish changes
  • /request-review @user-or-team - request a reviewer

See CONTRIBUTING.md for details.

@github-actions github-actions Bot added the S-waiting-on-review PR is waiting on a reviewer label Jun 20, 2026
@mfyuce mfyuce force-pushed the feat/otlp-source-connector branch from d6d1751 to 133d6e6 Compare June 20, 2026 20:00
mfyuce added a commit to mfyuce/iggy that referenced this pull request Jun 20, 2026
mfyuce added a commit to mfyuce/iggy that referenced this pull request Jun 21, 2026
Mark COOP_TASKRUN PR apache#3517 as submitted; clear TOBEDECIDED.md.
Both apache#3516 and apache#3517 are now S-waiting-on-review on apache/iggy.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01QBxbPbdKXzoMdvLBeugNBX
mfyuce added a commit to mfyuce/iggy that referenced this pull request Jun 21, 2026
- AGENTS.md: 104→75 lines. Removed redundant repo structure (derivable
  by ls), collapsed principles to iggy-specific rules only, merged
  Jenkins/QW infra into Infra section, updated handover block.
- TODO.md: replaced stale checked items with 4 open PRs (apache#3516 apache#3517
  apache#3523 apache#3525) + QW 0.9 upgrade task.
- DONE.md: added sessions 5-10 block (QW sink pipeline, collector
  cutover, InvalidOffset bug + fix).
- quickwit_sink/src/lib.rs: cargo fmt reformatting only.

let (tx, rx) = mpsc::channel(self.config.channel_capacity);
let (shutdown_tx, shutdown_rx) = oneshot::channel();

Contributor

Bind race: tokio::spawn(server::run_grpc_server(addr, tx, shutdown_rx)) at line 80, then Ok(()) returned at line 90 without any signal from the server that bind succeeded. If bind fails (EADDRINUSE, EPERM), server task exits, tx dropped, rx.recv() at line 99 returns None. poll() returns Ok(empty) (line 102-106). SDK loop serializes empty batch, runtime source_forwarding_loop sends 0 Iggy messages, loops forever at CPU speed. Connector status stays Running with no data and no error. Fix: bind TcpListener synchronously inside open() before tokio::spawn, pass bound listener to server task.

Author

Fixed in ae6cf6df5: TcpIncoming is now created synchronously inside open() before spawning the task -- bind failure surfaces immediately as an Err from open(), not silently after the task is running.
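The bind-before-spawn idea can be sketched without tonic or tokio. The example below is a std-only analogue under stated assumptions: it uses `std::net::TcpListener` and a plain thread instead of `TcpIncoming` and `tokio::spawn`, and the function name `open` and its return type are illustrative only.

```rust
use std::io;
use std::net::{SocketAddr, TcpListener};
use std::thread::{self, JoinHandle};

/// Binds synchronously, then hands the bound listener to a worker thread.
/// A bind failure (for example EADDRINUSE) is returned to the caller here
/// instead of being lost inside the spawned task.
fn open(addr: &str) -> io::Result<(SocketAddr, JoinHandle<()>)> {
    // Fails fast, before anything is spawned.
    let listener = TcpListener::bind(addr)?;
    let local = listener.local_addr()?;
    let handle = thread::spawn(move || {
        // Hypothetical accept loop: a real server would serve each connection.
        for conn in listener.incoming() {
            if conn.is_err() {
                break;
            }
        }
    });
    Ok((local, handle))
}
```

Because the listener is created before the worker exists, a second call on the same address reports the error directly to the caller rather than leaving a dead worker behind.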

request: Request<ExportLogsServiceRequest>,
) -> Result<Response<ExportLogsServiceResponse>, Status> {
let messages = convert::export_logs_to_messages(request.into_inner());
send_messages(&self.tx, messages, "logs").await;

Contributor

This happens in multiple places, once for each OTLP signal (logs, metrics, and traces). I am commenting here only and listing the line numbers of the other places.
97-99, 111-113, 125-127, 137-138. Channel-full → false SUCCESS: try_send at line 137 fails, the message is dropped, and warn! at line 138 fires. All three service impls return Ok(Response::new(...{ partial_success: None })): lines 97-99 (logs), 111-113 (metrics), 125-127 (traces). The OTel SDK sees full acceptance, so it does not retry and does not back off. Data is permanently lost with no client-visible signal. The OTLP gRPC spec requires partial_success.rejected_log_records / rejected_spans / rejected_data_points to be nonzero when records in an export are rejected. Fix: count dropped messages in send_messages, return the count, and populate the partial_success rejection field in each service impl.

Author

Fixed in 3e3b053b7: each service impl now populates the OTLP rejection field (rejected_log_records, rejected_data_points, rejected_spans) when the drop count is nonzero. The try_send approach with partial_success reporting was kept over the RESOURCE_EXHAUSTED alternative to avoid holding the gRPC connection open under sustained load (details in the PR discussion).
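A rough sketch of the drop-counting approach follows. It is not the PR's code: `PartialSuccess` is a local stand-in for the opentelemetry-proto partial-success messages, `std::sync::mpsc::SyncSender` replaces the tokio sender, and `eprintln!` stands in for `warn!`.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

/// Stand-in for the OTLP partial-success message; the real types live in
/// opentelemetry-proto and use signal-specific field names.
#[derive(Debug, PartialEq)]
struct PartialSuccess {
    rejected: i64,
    error_message: String,
}

/// Tries to enqueue every message without blocking and returns how many
/// were dropped because the channel was full or closed.
fn send_messages<T>(tx: &SyncSender<T>, messages: Vec<T>, signal: &str) -> usize {
    let total = messages.len();
    let mut dropped = 0;
    for msg in messages {
        match tx.try_send(msg) {
            Ok(()) => {}
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => dropped += 1,
        }
    }
    if dropped > 0 {
        // One summary line per export instead of one per dropped message.
        eprintln!("dropped {dropped}/{total} {signal} messages");
    }
    dropped
}

/// Maps a drop count to the value a service impl would put in its response:
/// `None` when everything was accepted, a rejection count otherwise.
fn partial_success(dropped: usize) -> Option<PartialSuccess> {
    (dropped > 0).then(|| PartialSuccess {
        rejected: dropped as i64,
        error_message: "channel full".to_string(),
    })
}
```

With a channel of capacity 2 and an export of 5 messages, this reports 3 rejections to the client, which is the signal the SDK or Collector needs in order to retry or alert.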

@mfyuce

mfyuce commented Jun 21, 2026

Author

Good catch @ryerraguntla — fixed in the latest push.

Root cause: tokio::spawn(run_grpc_server(addr, ...)) returned immediately; if bind failed, the server task exited silently, tx was dropped, and poll() returned Ok(empty) forever at CPU speed with connector status stuck at Running.

Fix: TcpIncoming::bind(addr) is called synchronously in open() before tokio::spawn. Any bind failure (EADDRINUSE, EPERM, …) now propagates as Error::InitError from open(), which surfaces in the connector lifecycle and stops the source cleanly. run_grpc_server now takes the pre-bound TcpIncoming and uses serve_with_incoming_shutdown.

/ready

let mut rx_guard = self.rx.lock().await;
let rx = rx_guard.as_mut().ok_or_else(|| {
Error::InitError("OTLP source connector is not initialized".to_string())
})?;

Contributor

Channel None returns Ok(empty) not Err: When gRPC server task dies (any reason), tx dropped, rx.recv().await at line 99 returns None. Lines 101-107 return Ok(ProducedMessages { messages: vec![], ... }). SDK loop treats this as success, busy-loops sending empty batches forever. Connector never signals error. Fix: None branch → Err(Error::Connection("gRPC server terminated unexpectedly".into())).

Author

Fixed in 3e3b053b7: when rx.recv().await returns None (channel closed because the server task exited), poll() now returns Err(Error::Connection(...)) so the runtime knows the source is broken rather than silently producing empty results.

}

#[async_trait]
impl Source for OtlpSource {

Contributor

Double open() leaks server task: Second call to open() overwrites Mutex<Option<...>> at lines 82-84. Old rx dropped → old tx all-receivers-gone on next send. Old server_task JoinHandle dropped without abort → task keeps running on same addr. Second server's bind fails with EADDRINUSE → bind-race applies → permanent silent failure. Fix: at start of open(), check self.rx.lock().await.is_some(), return Err(Error::InitError("already open".into())) or call close() first.

Author

Fixed in 5494b99f4: open() now returns Err(Error::InitError(...)) immediately if rx is already Some (already-open guard). close() takes the handle out of the mutex before calling abort() to release the lock, then awaits the task to drain it, and finally clears rx so a subsequent open() starts clean.
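The lifecycle described here (guarded open(), close() that waits for the worker) can be sketched with threads. This is an assumption-laden analogue: `Source`, `LifecycleError`, and the field names are hypothetical, and `JoinHandle::join` plays the role of `abort()` followed by awaiting the tokio task.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

#[derive(Debug, PartialEq)]
enum LifecycleError {
    AlreadyOpen,
}

#[derive(Default)]
struct Source {
    // Both fields are Some only while the worker is running.
    shutdown_tx: Option<Sender<()>>,
    worker: Option<JoinHandle<()>>,
}

impl Source {
    fn open(&mut self) -> Result<(), LifecycleError> {
        // Guard: refuse to overwrite a live worker handle.
        if self.worker.is_some() {
            return Err(LifecycleError::AlreadyOpen);
        }
        let (tx, rx) = mpsc::channel::<()>();
        // The worker parks until it is told to stop or the sender is dropped.
        self.worker = Some(thread::spawn(move || {
            let _ = rx.recv();
        }));
        self.shutdown_tx = Some(tx);
        Ok(())
    }

    fn close(&mut self) {
        if let Some(tx) = self.shutdown_tx.take() {
            let _ = tx.send(());
        }
        // Wait for the worker to finish so its resources are released
        // before close() returns, leaving clean state for the next open().
        if let Some(worker) = self.worker.take() {
            let _ = worker.join();
        }
    }
}
```

Taking the handle out of the `Option` before waiting mirrors the PR's approach of removing it from the mutex first, so the state is already cleared when the wait completes.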

}
}

pub fn bytes_to_hex(bytes: &[u8]) -> String {

Contributor

bytes_to_hex 16 allocs per call: line 292 bytes.iter().map(|b| format!("{b:02x}")).collect() allocates one String per byte. For 16-byte trace_id: 16 heap allocs per call. Called 3x per span (lines 120-122: trace_id, span_id, parent_span_id) = 48 allocs per span. Called 2x per log record (lines 49-50). Fix: let mut s = String::with_capacity(bytes.len() * 2); for b in bytes { let _ = write!(s, "{b:02x}"); } s — one alloc.

Author

Fixed in 5494b99f4: bytes_to_hex now pre-allocates String::with_capacity(bytes.len() * 2) and writes into it with write! -- one allocation per call regardless of input length.
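For reference, the single-allocation version suggested in the review looks roughly like this. It follows the reviewer's snippet; only the `use` line and the comments are added here.

```rust
use std::fmt::Write;

pub fn bytes_to_hex(bytes: &[u8]) -> String {
    // Two hex digits per byte, reserved up front so the loop never reallocates.
    let mut s = String::with_capacity(bytes.len() * 2);
    for b in bytes {
        // Writing into a String cannot fail, so the Result is ignored.
        let _ = write!(s, "{b:02x}");
    }
    s
}
```

A 16-byte trace_id therefore produces one 32-character String with one heap allocation.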

@mfyuce

mfyuce commented Jun 21, 2026

Author

Both issues addressed in this push:

Channel-full → false success (server.rs): send_messages now returns the drop count. Each service impl populates the OTLP-spec rejection field when nonzero — rejected_log_records, rejected_data_points, rejected_spans — so the OTel SDK/Collector sees a partial success and can retry or alert accordingly.

rx.recv() == None → busy-loop (lib.rs): The None branch now returns Err(Error::Connection("OTLP gRPC server terminated unexpectedly")) instead of Ok(empty), so the runtime propagates the failure and stops the connector cleanly instead of spinning.

/ready

"trace_id": bytes_to_hex(&record.trace_id),
"span_id": bytes_to_hex(&record.span_id),
"service_name": service_name,
"resource": resource_attrs.clone(),

Contributor

Applies at both line 52 and line 130. resource_attrs cloned per record/span: line 52 has "resource": resource_attrs.clone() inside the innermost loop over log records; line 130 has the same pattern for spans. resource_attrs is Map<String, Value> = BTreeMap. For a batch of 1000 log records sharing one resource with 20 attrs: 1000 BTreeMap clones. Fix: serialize resource_attrs to Value once before the inner loop, clone the pre-built Value per record.

Author

Fixed in 73f6f825e: resource_attrs is now extracted once before the scope/record/span loop and moved into Value::Object(resource_attrs) before the inner loop begins. The inner loop clones the pre-built Value rather than the raw map; note that serde_json's Value::clone still deep-copies the map, so the per-record copy itself remains. Metrics path already passed resource_attrs by reference and was not affected.

}
Some(any_value::Value::KvlistValue(kvlist)) => Value::Object(extract_attrs(&kvlist.values)),
Some(any_value::Value::BytesValue(bytes)) => Value::String(bytes_to_hex(bytes)),
Some(any_value::Value::StringValueStrindex(_)) | None => Value::Null,

Contributor

StringValueStrindex → Value::Null silently: Some(any_value::Value::StringValueStrindex(_)) | None => Value::Null at line 284. An unrecognized oneof variant silently becomes JSON null. Attribute data discarded with no log, no metric. Fix: add tracing::warn! on this arm logging the attribute key so operators know data was dropped.

Author

Fixed in 73f6f825e: the StringValueStrindex arm now emits warn!(key = %kv.key, "dropping attribute with unrecognized AnyValue variant") before returning Value::Null, so unknown proto extension variants are visible in logs rather than silently discarded.

@ryerraguntla

ryerraguntla commented Jun 21, 2026

Contributor

@mfyuce I spent some time today getting to know (not learning yet :) ) OTLP. This is a very good use case: pump data into Iggy and let Iggy fan it out to the appropriate dashboards and analytics. The first couple of findings are critical. There are a few more I need to look at, which I will do later tonight or tomorrow. You can wait for the second review iteration or start fixing these now.

@mfyuce

mfyuce commented Jun 21, 2026

Author

Both addressed in this push:

resource_attrs cloned per record/span: the resource_attrs map (Map<String, Value>) is now moved into Value::Object(resource_attrs) once before the scope loops; the inner loop clones the Value instead of the raw Map. The metrics path already passes resource_attrs by reference to metric_to_data_points, so it was not affected.

StringValueStrindex silent drop: extract_attrs now matches StringValueStrindex explicitly before calling any_value_to_json, which gives access to kv.key for the warn!. The any_value_to_json return path for StringValueStrindex (and recursive KvlistValue / ArrayValue nesting) still returns Value::Null; only the top-level attribute key context is logged here.

/ready

@ryerraguntla

Contributor

is it an agent at work? :)

@mfyuce

mfyuce commented Jun 21, 2026

Author

> is it an agent at work? :)

Yes, Claude Code as a pair programmer :); it proposes the fixes, I review and understand each one before it goes in. Your feedback has been the real driver here; the bind race, partial_success, and channel-close issues were genuine bugs I wouldn't have caught this fast on my own. Looking forward to the rest of the review.

async fn close(&mut self) -> Result<(), Error> {
if let Some(tx) = self.shutdown_tx.lock().await.take() {
let _ = tx.send(());
}

Contributor

task.abort() without .await: abort() at line 144 schedules cancellation but returns immediately. close() returns Ok(()) while gRPC server task may still hold the TCP port. A subsequent open() call (restart) hits EADDRINUSE → combines with double-open finding, restart permanently broken. Fix: task.abort(); let _ = task.await; (ignore JoinError::Cancelled).

Author

Fixed in cfd47d1d2: close() now takes the handle out of the mutex first (releases the lock), calls abort(), then let _ = task.await to drain the cancelled task before returning. rx is also cleared so a subsequent open() after restart passes the already-open guard.

}))
}
}

Contributor

Log flood + misleading async: try_send failure at line 144 logs warn! per dropped message. Batch of 1000 saturated-channel messages → 1000 warn! calls per export. Additionally async fn send_messages contains no .await — misleading signature, function never yields. Fix: make it fn send_messages (non-async); count drops in loop, single warn!("dropped {dropped}/{total} {signal} messages") after loop.

Author

Fixed in cfd47d1d2: send_messages is now a plain fn (no .await). Drops are counted silently in the loop; a single warn!("dropped {dropped}/{total} {signal} messages") fires once after the loop.

# under the License.

[package]
name = "iggy_connector_otlp_source"

Contributor

Please cross check this comment - version = "0.1.0" diverges from workspace pattern 0.4.1-edge.1. Version scripts (scripts/extract-version.sh, sync-rustc-version.sh) key off workspace-aligned versions; this crate will be skipped or produce wrong tags. Fix: version = "0.4.1-edge.1".

Author

Fixed in cfd47d1d2: bumped to 0.4.1-edge.1 to align with the workspace connector pattern.

async-trait = { workspace = true }
dashmap = { workspace = true }
iggy_connector_sdk = { workspace = true }
once_cell = { workspace = true }

Contributor

opentelemetry-proto = { version = "0.32.0", ... } not in [workspace.dependencies]. Workspace carries this transitively via opentelemetry-otlp (workspace line 218). Direct pin unmanaged by workspace tooling; future OTel family bumps silently diverge this pin. scripts/ci/third-party-licenses.sh depends on workspace dep resolution; out-of-workspace pin can corrupt generated manifest. Fix: move to [workspace.dependencies], reference { workspace = true }.

Author

Fixed in cfd47d1d2: opentelemetry-proto added to [workspace.dependencies] with default-features = false. The crate now references it via { workspace = true, features = [...] }.

use tokio::task::JoinHandle;
use tonic::transport::server::TcpIncoming;
use tracing::info;

Contributor

pub mod convert; pub mod server; should be pub(crate): Plugin .so loaded via FFI; only intended public surface is source_connector!-generated symbols. All other source connectors use private internal modules. Fix: pub(crate) mod convert; pub(crate) mod server. Use #[cfg(test)] pub use for test-specific exposure if needed.

Author

Fixed in cfd47d1d2: both convert and server narrowed to pub(crate). Only the FFI symbols emitted by source_connector! remain public.

match serde_json::to_vec(&doc) {
Ok(payload) => messages.push(ProducedMessage {
id: None,
checksum: None,

Contributor

Zero time_unix_nano treated as valid timestamp: timestamp: Some(record.time_unix_nano) at line 62. Proto scalar default for unset timestamp is 0. Code emits timestamp: Some(0) = Unix epoch 1970-01-01T00:00:00Z instead of "unset". Corrupts time-based queries. Fix: timestamp: if record.time_unix_nano == 0 { None } else { Some(record.time_unix_nano) }. Same applies to line 140 (span.start_time_unix_nano).

Author

Fixed in cfd47d1d2: both log records and spans now use .then_some() so proto default 0 maps to None instead of Some(0). Same guard applied to span.start_time_unix_nano.
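The `.then_some()` guard mentioned here is a one-liner on `bool`. A minimal sketch, with the helper name `timestamp_or_unset` chosen for illustration only:

```rust
/// Maps the proto3 scalar default 0 to `None` ("unset") and any other
/// value to `Some(value)`.
fn timestamp_or_unset(unix_nano: u64) -> Option<u64> {
    (unix_nano != 0).then_some(unix_nano)
}
```

This is equivalent to the `if … == 0 { None } else { Some(…) }` form from the review, just more compact.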

@mfyuce

mfyuce commented Jun 22, 2026

Author

All six addressed in this push:

task.abort() without await: close() now takes the handle out of the mutex first (to release the lock), calls abort(), then let _ = task.await to drain the cancelled task before returning. rx is also cleared so a subsequent open() after restart passes the already-open guard.

Log flood / misleading async: send_messages is now a plain fn (no .await inside). Drops are counted silently in the loop; a single warn!("dropped {dropped}/{total} {signal} messages") fires once after the loop.

Version mismatch: bumped to 0.4.1-edge.1 to match workspace connector pattern.

opentelemetry-proto not in workspace: added to [workspace.dependencies] with default-features = false; crate now uses { workspace = true, features = [...] }.

pub mod visibility: convert and server narrowed to pub(crate); only the FFI symbols from source_connector! remain public.

Zero time_unix_nano as epoch: both log records and spans now use .then_some() so proto default 0 maps to None instead of Some(0).

/ready

@mfyuce

mfyuce commented Jun 22, 2026

Author

The partial_success: None concern and the log-flood / misleading-async comment were both addressed in the push from earlier today (commit cfd47d1):

  • send_messages is now a plain fn (no .await), counts drops silently in the loop, and fires a single warn!("dropped {dropped}/{total} {signal} messages") after the loop.
  • Each service impl now populates the OTLP-spec rejection field (rejected_log_records, rejected_data_points, rejected_spans) when the drop count is nonzero.

On the try_send vs send().await + timeout + RESOURCE_EXHAUSTED question: both are valid. The current approach uses non-blocking try_send and reports drops via partial_success, which keeps the gRPC handler non-blocking and is spec-compliant. The RESOURCE_EXHAUSTED path forces the client to back off and retry, which avoids data loss but holds the gRPC connection open for up to the timeout duration and can cause connection pile-up under sustained load. Happy to switch if you prefer the stronger backpressure guarantee.

/ready

@mfyuce

mfyuce commented Jun 22, 2026

Author

Added proto storage format in the latest push.

New config field: format = "proto" on OtlpSourceConfig (default: "json", fully backward-compatible).

In proto mode the entire Export*ServiceRequest is prost-encoded as a single raw message per gRPC call instead of fanning out to one JSON document per signal item. Combined with otlp_sink's existing format = "proto" this cuts storage by ~4-5x (JSON field-name overhead eliminated, one message per batch instead of one per span/data-point/record).

prost added as a workspace dep since opentelemetry-proto re-exports the generated types but not the Message encode trait.

mfyuce added a commit to mfyuce/iggy that referenced this pull request Jun 22, 2026
- AGENTS.md: updated READY FOR HANDOVER (5 PRs, segment cleaner note,
  connectors list). Added segment cleaner + connectors to infra quick-ref.
- TODO.md: added apache#3529, reviewer action items for apache#3525 and apache#3516,
  segment cleaner task, TBD investigations.
- DONE.md: added sessions 12-13 block (per-partition DashMap, pre-arm,
  last(), otlp_sink, proto format).
- TOBEDECIDED.md: documented TCP first() bug and otlp_source backpressure.
mfyuce added a commit to mfyuce/iggy that referenced this pull request Jun 22, 2026
- Dockerfile: add Apache license header, drop otlp_source build (belongs
  to PR apache#3516), remove EXPOSE 4317 (sink dials out, does not listen)
- Cargo.toml: move otlp_sink to correct alphabetical position in workspace
  members; add opentelemetry-proto to workspace.dependencies
- otlp_sink/Cargo.toml: bump version to 0.4.1-edge.1; use workspace
  opentelemetry-proto; drop unused simd-json dependency
- lib.rs: use owned_value_to_serde_json instead of simd_json::to_string +
  serde_json::from_str (one allocation instead of two tree walks); change
  gRPC export errors from HttpRequestFailed to CannotStoreData; change
  proto encode errors on HTTP path to WriteFailure; export() now returns
  u64 so messages_sent counts only messages actually forwarded, not empty
  batches where all messages failed to decode
- from_json.rs: fix b.to_string() double-encoding of log body (use
  json_to_any_value instead of StringValue(b.to_string())); make
  severity_from_text case-insensitive; fix hex_to_bytes to return empty
  vec on odd-length input instead of silently dropping last nibble;
  replace magic number 2 with AggregationTemporality::Cumulative; default
  is_monotonic to true and read from JSON when present; hoist ArrayValue
  and KeyValueList imports to module level; add tests for new behaviour
mfyuce added a commit to mfyuce/iggy that referenced this pull request Jun 23, 2026
mfyuce and others added 8 commits June 23, 2026 16:23
Adds iggy_connector_otlp_source, a cdylib connector plugin that binds
port 4317 (OTLP standard) and receives logs, metrics, and traces from
any OpenTelemetry SDK or Collector.

Each incoming gRPC export call is deserialized via the opentelemetry-proto
crate (already a transitive dep of opentelemetry-otlp) and serialized as
JSON messages into the connector channel, eliminating any build-time proto
compilation step. All three OTLP services (LogsService, MetricsService,
TraceService) are served on the same listener, with gzip compression enabled
on every service (OTel SDKs and the Collector compress payloads by default).

The connector follows the pull-model Source trait: poll() blocks on the
first message, then drains up to batch_size additional messages via
try_recv() before returning. A tokio oneshot channel carries the shutdown
signal from close() to the gRPC server task.
Spawn raced with bind: if EADDRINUSE/EPERM, the server task exited
silently, tx was dropped, and poll() returned Ok(empty) forever at CPU
speed with the connector status stuck at Running.

Bind with TcpIncoming::bind() before tokio::spawn so any port error
surfaces as InitError in open() instead of a silent busy-loop.
Switch run_grpc_server to serve_with_incoming_shutdown.
… closed channel

Two correctness issues:

1. Channel-full drops were silent successes: try_send failure dropped the
   message but each service impl returned partial_success: None, so the
   OTel SDK saw full acceptance and did not retry. Now send_messages
   returns the drop count and each impl populates the OTLP-spec
   rejected_log_records / rejected_data_points / rejected_spans field.

2. rx.recv() == None returned Ok(empty), causing a CPU-speed busy-loop
   when the gRPC server task exited for any reason (panic, OS signal).
   Now returns Err(Error::Connection(...)) so the runtime propagates the
   failure and stops the connector.
…to_hex

Double open() overwrote Mutex fields without aborting the old server task,
leaving a dangling task on the same port (EADDRINUSE on restart). Added an
is_some() guard at the top of open() that returns InitError immediately.

bytes_to_hex allocated one String per byte via format!("{b:02x}") — 16
heap allocs per trace_id/span_id call, 48 per span. Replaced with a
single String::with_capacity(len*2) + write! loop.
…n AnyValue

resource_attrs (Map<String, Value>) was cloned inside the innermost loop
over log records and spans — 1000 Map clones per batch for a shared
resource. Convert to Value::Object once before the scope loops; the inner
loop clones the Value instead.

extract_attrs now matches StringValueStrindex before calling
any_value_to_json, so the attribute key is included in the warn! when an
unrecognized AnyValue variant silently drops data.
…e dep, visibility, zero timestamp

close() now awaits the aborted task so the TCP port is released before
returning; rx is also cleared to allow a subsequent open() after restart.

send_messages is non-async (it has no .await); drops are counted and
emitted as a single warn! instead of one per message.

Version aligned to workspace pattern 0.4.1-edge.1.

opentelemetry-proto added to workspace deps; crate references it via
workspace = true.

convert and server modules narrowed to pub(crate); only FFI symbols
generated by source_connector! remain part of the public surface.

Proto scalar default 0 for time_unix_nano / start_time_unix_nano is now
treated as unset and emits None instead of Some(0) / Unix epoch.
Adds `format = "proto"` (default: json) to OtlpSourceConfig. In proto
mode the entire Export*ServiceRequest is prost-encoded as a single raw
message instead of fanning out to per-item JSON documents. Paired with
otlp_sink's proto mode this yields 4-5x storage reduction in Iggy
with zero application-level parsing overhead on the sink side.

The default remains "json" so existing deployments are unaffected.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
`iggy_connector_otlp_source` was not being built or shipped in the
runtime image. Add it to the build stage and the runtime COPY step.
@mfyuce mfyuce force-pushed the feat/otlp-source-connector branch from d9eda8c to c54eb7c Compare June 23, 2026 13:24
mfyuce added a commit to mfyuce/iggy that referenced this pull request Jun 23, 2026