Runlet is a small JVM library for embeddable, batch-oriented stream processing pipelines.
It is for jobs that need more structure than hand-written loops or `Flow`, but
do not justify operating Flink, Kafka Streams, or Spark Streaming. Runlet runs
inside your process: no broker, no cluster, no daemon.
Runlet is pre-release. APIs, module names, and behavior may change before a stable release.
Current v0 scope:
- single JVM process
- one source, one linear pipeline, one sink
- chunked execution with `Chunk<T>`
- `map`, `filter`, and `evalMap`
- source factories for common chunked and cursor-paged sources
- bounded channels for uncheckpointed pipelines
- serial checkpointed execution for ordered, resumable sources
- file line source, file checkpoint store, and chunk-file sink
- blocking adapters for Java and other blocking JVM integrations
- Spring `SmartLifecycle` adapter
- Spring Boot starter and autoconfiguration
- optional Micrometer metrics integration for Spring Boot apps
Not implemented yet:
- windowing or `groupBy`
- event-time semantics or watermarks
- exactly-once semantics
- distributed execution

| Module | Purpose |
|---|---|
| `runlet-core` | Core API, DSL, runtime, and blocking adapters. |
| `runlet-connector-file` | File source, file checkpoint store, and chunk-file sink. |
| `runlet-connector-jackson` | Jackson-backed JSON Lines source and sink helpers. |
| `runlet-adapter-spring` | Spring Framework `SmartLifecycle` integration. |
| `runlet-spring-boot-autoconfigure` | Spring Boot autoconfiguration. |
| `runlet-spring-boot-starter` | Convenience dependency for Spring Boot applications. |
| `runlet-sample-spring-boot` | Runnable Spring Boot sample application. |

Runlet is not published to a remote Maven repository yet. For local use, publish the artifacts to your Maven local repository:

```bash
./gradlew check
./gradlew publishToMavenLocal
```

Then add `mavenLocal()` and the modules you need:

```kotlin
repositories {
    mavenLocal()
    mavenCentral()
}

dependencies {
    implementation("org.aetherlink:runlet-core:1.0-SNAPSHOT")
    implementation("org.aetherlink:runlet-connector-file:1.0-SNAPSHOT")
    implementation("org.aetherlink:runlet-connector-jackson:1.0-SNAPSHOT")
}
```

For Spring Boot applications, prefer the starter:

```kotlin
dependencies {
    implementation("org.aetherlink:runlet-spring-boot-starter:1.0-SNAPSHOT")
    implementation("org.aetherlink:runlet-connector-file:1.0-SNAPSHOT")
    implementation("org.aetherlink:runlet-connector-jackson:1.0-SNAPSHOT")
}
```

A runnable Spring Boot sample lives in `runlet-sample-spring-boot`.

This checkpointed file pipeline reads lines from a file, keeps completed records, transforms them, and writes replay-safe chunk files.

```kotlin
import kotlinx.coroutines.runBlocking
import org.aetherlink.runlet.connector.file.ChunkFileSink
import org.aetherlink.runlet.connector.file.FileCheckpointStore
import org.aetherlink.runlet.connector.file.FileSource
import org.aetherlink.runlet.dsl.Runlet

fun main() = runBlocking {
    Runlet("orders") {
        source(FileSource.lines("orders.txt", chunkSize = 1024))
            .checkpoint(FileCheckpointStore("orders.ckpt"))
            .filter { line -> line.contains("completed") }
            .map { line -> line.uppercase() }
            .sink(ChunkFileSink.lines("summaries"))
    }.run()
}
```

Checkpointed pipelines run one chunk at a time:

```text
read -> transform -> write -> commit -> persist cursor
```

The checkpoint cursor advances only after the sink's `commit()` returns. If `write()`
or `commit()` fails, the checkpoint does not advance.
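
The ordering can be shown as a small in-memory simulation of the serial checkpoint loop. This is a sketch, not Runlet's runtime. `InMemorySink`, `runCheckpointed`, and the index-based cursor are hypothetical names chosen for illustration. A failure here ends the run and returns the last persisted cursor; in Runlet it would fail the pipeline.

```kotlin
// Hypothetical in-memory sink: written records become visible only after commit().
class InMemorySink<T> {
    val committed = mutableListOf<T>()
    private val pending = mutableListOf<T>()

    // Test hook: makes the next commit() throw once.
    var failNextCommit = false

    fun write(records: List<T>) {
        pending.addAll(records)
    }

    fun commit() {
        if (failNextCommit) {
            failNextCommit = false
            pending.clear()
            throw IllegalStateException("simulated commit failure")
        }
        committed.addAll(pending)
        pending.clear()
    }
}

// Serial checkpointed loop: read -> transform -> write -> commit -> persist cursor.
// The cursor is the index of the next unread record. The function returns the cursor
// that would be persisted when the run ends, whether it finished or failed.
fun <T, R> runCheckpointed(
    input: List<T>,
    startCursor: Int,
    chunkSize: Int,
    sink: InMemorySink<R>,
    transform: (T) -> R,
): Int {
    var persistedCursor = startCursor
    while (persistedCursor < input.size) {
        val end = minOf(persistedCursor + chunkSize, input.size)
        val chunk = input.subList(persistedCursor, end) // read
        val output = chunk.map(transform) // transform
        try {
            sink.write(output) // write
            sink.commit() // commit
        } catch (e: Exception) {
            // write() or commit() failed: stop without advancing the cursor.
            return persistedCursor
        }
        persistedCursor = end // persist the cursor only after commit() returned
    }
    return persistedCursor
}
```

After a failed first commit the cursor stays at `0`. Resuming from that cursor reprocesses the same chunk and then continues to the end.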
For checkpointable sources, `.sink(...)` is only available after
`.checkpoint(...)` has been called. The DSL enforces this with types rather than
a runtime capability check.
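
Enforcing this with types is an instance of the type-state pattern. The stage returned before checkpointing has no `sink(...)` method, so the compiler rejects a pipeline that skips the checkpoint. The sketch below shows the general technique with hypothetical classes (`UncheckpointedStage`, `CheckpointedStage`). It is not Runlet's actual DSL, and for brevity it evaluates eagerly over an in-memory list.

```kotlin
// Hypothetical type-state sketch; these classes are illustrative, not Runlet's DSL.

// Returned by source(...): it can be checkpointed but deliberately has no sink(...).
class UncheckpointedStage<T>(private val records: List<T>) {
    fun checkpoint(storePath: String): CheckpointedStage<T> =
        CheckpointedStage(records, storePath)
}

// Returned by checkpoint(...): transformations keep this type, and only it exposes sink(...).
class CheckpointedStage<T>(private val records: List<T>, private val storePath: String) {
    fun <R> map(transform: (T) -> R): CheckpointedStage<R> =
        CheckpointedStage(records.map(transform), storePath)

    fun filter(predicate: (T) -> Boolean): CheckpointedStage<T> =
        CheckpointedStage(records.filter(predicate), storePath)

    // Writes the records and returns the checkpoint location they were tracked under.
    fun sink(write: (List<T>) -> Unit): String {
        write(records)
        return storePath
    }
}

fun <T> source(records: List<T>): UncheckpointedStage<T> = UncheckpointedStage(records)

// source(listOf("x")).sink { } does not compile: UncheckpointedStage has no sink method.
```

Because `map` and `filter` return `CheckpointedStage`, the checkpoint "capability" is carried through every transformation without any runtime flag.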
Runlet is useful for local or embedded jobs where the input has an ordered cursor and replay is acceptable. A common shape is:

```text
source file/API export -> validate/filter -> transform -> durable sink
```

For example, a service can process a partner-provided JSON Lines export at startup or on a schedule:

```kotlin
import kotlinx.coroutines.runBlocking
import org.aetherlink.runlet.connector.file.FileCheckpointStore
import org.aetherlink.runlet.connector.jackson.JacksonChunkFileSink
import org.aetherlink.runlet.connector.jackson.JacksonFileSource
import org.aetherlink.runlet.dsl.Runlet

data class PartnerOrder(
    val id: String,
    val status: String,
    val totalCents: Long,
)

data class OrderSummary(
    val id: String,
    val totalCents: Long,
)

fun main() = runBlocking {
    Runlet("partner-orders") {
        source(JacksonFileSource.jsonLines<PartnerOrder>("imports/orders.jsonl"))
            .checkpoint(FileCheckpointStore("state/partner-orders.ckpt"))
            .filter { order -> order.status == "completed" }
            .map { order -> OrderSummary(order.id, order.totalCents) }
            .sink(JacksonChunkFileSink.jsonLines("exports/order-summaries"))
    }.run()
}
```

Operationally, treat Runlet like an embedded job runner:

- Put checkpoint files on durable storage if resumability matters.
- Make checkpointed sinks replay-safe. Runlet advances the checkpoint only after
  `commit()` succeeds, so a chunk can be written again after a failure. This
  includes a crash between the sink commit and the cursor being persisted.
- Keep `chunkSize` large enough to amortize per-chunk overhead, but small enough that replaying one chunk is acceptable.
- Use Spring Boot lifecycle integration for long-running application pipelines.
- Watch the `runletHealthIndicator` in Spring Boot apps; a failed pipeline is reported as `DOWN`.
- Keep distributed coordination outside Runlet. If several app instances run the same pipeline against the same input and checkpoint, use an external lock or run only one active instance.
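
One way to make a sink replay-safe is to make its writes idempotent, for example with an upsert keyed by a stable record id. The hypothetical in-memory sketch below contrasts an upsert with a plain append when the same chunk is written twice. `UpsertSink` and `AppendSink` are illustrative classes, not Runlet APIs.

```kotlin
// Hypothetical record type and sinks for illustration; not Runlet APIs.
data class OrderSummary(val id: String, val totalCents: Long)

// Replay-safe: an upsert keyed by id, so writing the same chunk again leaves the same state.
class UpsertSink {
    private val rows = LinkedHashMap<String, OrderSummary>()

    fun write(chunk: List<OrderSummary>) {
        for (record in chunk) rows[record.id] = record
    }

    fun snapshot(): Map<String, OrderSummary> = rows.toMap()
}

// Not replay-safe: a plain append duplicates every record of a replayed chunk.
class AppendSink {
    val rows = mutableListOf<OrderSummary>()

    fun write(chunk: List<OrderSummary>) {
        rows.addAll(chunk)
    }
}
```

Writing a two-record chunk twice leaves `UpsertSink` with two rows and `AppendSink` with four. For file-based sinks, deterministic output names are another common way to get the same property. For example, one file per chunk can be named after the chunk's starting cursor, so a replay overwrites the file instead of adding a new one.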
Production shapes where Runlet fits well:

| Shape | Source | Sink | Checkpoint |
|---|---|---|---|
| Database backfill | Ordered primary-key scan | Table/index upsert | Last processed primary key |
| Object storage JSONL import | Object manifest or JSONL object | Database or clean output prefix | Object key plus byte offset |
| API cursor poller | Paginated API cursor | Durable storage | API cursor or page token |
| Search index backfill | Database or object storage | Elasticsearch/OpenSearch bulk index | Last primary key or object offset after successful bulk index |
These shapes describe where the model fits. Today, only file and Jackson JSONL connectors are implemented; database, object storage, API, and search connectors would live in separate optional modules.
Most application code should not implement `SourceReader` directly. Use the
highest-level API that fits:

| Need | Use |
|---|---|
| Built-in file or JSONL processing | `FileSource`, `JacksonFileSource`, and the matching sinks |
| App-specific non-checkpointed reads | `Sources.records(...)` or `Sources.chunks(...)` |
| App-specific resumable reads | `CheckpointableSources.byLongCursor(...)` or `CheckpointableSources.chunks(...)` |
| Reusable connector modules | Implement `Source`, `CheckpointableSource`, `Sink`, or `CheckpointStore` |

For example, a database backfill can be expressed as "read the next page after this cursor" without writing a custom reader class:

```kotlin
import org.aetherlink.runlet.api.CheckpointableSources
import org.aetherlink.runlet.connector.file.FileCheckpointStore
import org.aetherlink.runlet.dsl.Runlet

// orderDao, toSearchDocument, and searchIndexSink are application-provided.
val source =
    CheckpointableSources.byLongCursor(
        chunkSize = 500,
        // Read the next page of up to `limit` orders after the checkpointed id.
        read = { afterId, limit ->
            orderDao.fetchOrdersAfter(id = afterId, limit = limit)
        },
        // Extract the ordering key that serves as the checkpoint cursor.
        cursorOf = { order -> order.id },
    )

Runlet("orders-search-backfill") {
    source(source)
        .checkpoint(FileCheckpointStore("state/orders-search-backfill.ckpt"))
        .map(::toSearchDocument)
        .sink(searchIndexSink)
}
```

The low-level reader interfaces are still public because connector modules need them, but they are not the ergonomic starting point for application pipelines.
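
The `read(afterId, limit)` contract in the backfill example is keyset pagination. Each page starts strictly after the last cursor, so resuming from a checkpoint skips rows that were already processed. The sketch below simulates it in memory. `InMemoryOrderDao`, `readAllPages`, and the use of `Long.MIN_VALUE` as the starting cursor are assumptions for illustration. The sketch does not describe how `byLongCursor` is implemented.

```kotlin
// Hypothetical record type and DAO for illustration; not Runlet APIs.
data class Order(val id: Long, val status: String)

// In-memory stand-in for a keyset query such as
// SELECT ... WHERE id > :afterId ORDER BY id LIMIT :limit
class InMemoryOrderDao(rows: List<Order>) {
    private val sorted = rows.sortedBy { it.id }

    fun fetchOrdersAfter(id: Long, limit: Int): List<Order> =
        sorted.asSequence().filter { it.id > id }.take(limit).toList()
}

// Reads pages until an empty page is returned. Each next cursor is cursorOf(last record),
// the value a checkpoint would persist once that page has been committed.
fun <T> readAllPages(
    startAfter: Long,
    limit: Int,
    read: (afterId: Long, limit: Int) -> List<T>,
    cursorOf: (T) -> Long,
): List<List<T>> {
    val pages = mutableListOf<List<T>>()
    var cursor = startAfter
    while (true) {
        val page = read(cursor, limit)
        if (page.isEmpty()) break
        pages.add(page)
        cursor = cursorOf(page.last())
    }
    return pages
}
```

This only works when the cursor column is unique and increases in scan order. With duplicate keys, the rows that share the last key on a page but fall on the next page would be skipped.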
Spring Boot applications can register Runlet pipelines as beans. The starter
creates a shared coroutine scope, wraps each registration in a SmartLifecycle,
and starts/stops pipelines with the application context.

```kotlin
import org.aetherlink.runlet.adapter.spring.boot.RunletPipelineRegistration
import org.aetherlink.runlet.api.RunletRuntimeConfig
import org.aetherlink.runlet.connector.file.FileCheckpointStore
import org.aetherlink.runlet.dsl.Runlet
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class PipelineConfiguration {
    @Bean
    fun orderCheckpointStore(): FileCheckpointStore =
        FileCheckpointStore("state/orders.ckpt")

    @Bean
    fun ordersPipeline(
        runletRuntimeConfig: RunletRuntimeConfig,
        orderCheckpointStore: FileCheckpointStore,
    ): RunletPipelineRegistration =
        RunletPipelineRegistration("orders") {
            // orderSource, summarize, and orderSink are application-provided.
            Runlet("orders", config = runletRuntimeConfig) {
                source(orderSource)
                    .checkpoint(orderCheckpointStore)
                    .map(::summarize)
                    .sink(orderSink)
            }
        }
}
```

`RunletRuntimeConfig` is auto-configured by the starter from
`runlet.runtime.*`. Today it controls the bounded channel capacity used by
uncheckpointed pipelines. You can inject it into `Runlet(...)` as shown above or
construct your own config manually outside Spring Boot.
`orderCheckpointStore` is application-owned checkpoint storage. The example uses
`FileCheckpointStore`, which persists the last completed cursor to
`state/orders.ckpt`. In production, put that file on durable storage or provide
your own `CheckpointStore` backed by a database or object storage.
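
If you write your own file-backed cursor storage, one approach is to write to a temporary file and atomically rename it over the checkpoint. This avoids leaving a truncated cursor behind after a crash. The sketch below is hypothetical and uses only `java.nio.file`. `AtomicFileCursorStore` does not implement Runlet's `CheckpointStore` interface, whose methods are not shown in this README. It also does not fsync, so durability across power loss depends on the filesystem.

```kotlin
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption

// Hypothetical cursor storage; it does not implement Runlet's CheckpointStore interface.
class AtomicFileCursorStore(private val path: Path) {
    // Returns the last saved cursor, or null if nothing has been checkpointed yet.
    fun load(): String? =
        if (Files.exists(path)) String(Files.readAllBytes(path), Charsets.UTF_8) else null

    fun save(cursor: String) {
        val dir = path.toAbsolutePath().parent
        Files.createDirectories(dir)
        // Write the new cursor to a temp file in the same directory...
        val tmp = Files.createTempFile(dir, path.fileName.toString(), ".tmp")
        Files.write(tmp, cursor.toByteArray(Charsets.UTF_8))
        // ...then rename it over the checkpoint, so readers see the old or the new
        // cursor, never a partially written one. No fsync is performed here.
        Files.move(tmp, path, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING)
    }
}
```

The temp file is created in the same directory as the target because an atomic rename generally requires source and target to be on the same filesystem.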

`application.yml`:

```yaml
runlet:
  enabled: true
  threads: 4
  shutdown-timeout: 30s
  health:
    enabled: true
  metrics:
    enabled: true
  runtime:
    channel-capacity: 4
```

Connector-specific settings, such as `FileSource.lines(..., chunkSize = 1024)`,
are still chosen when constructing that source.

When Spring Boot's health module is on the classpath, Runlet contributes a
`runletHealthIndicator`. It reports `UP` when no registered pipeline has a
recorded failure and `DOWN` when one or more pipelines have failed.

When Micrometer is on the classpath and a `MeterRegistry` bean exists, Runlet
also contributes a pipeline metrics observer. Actuator-enabled Spring Boot apps
typically provide that registry. Metrics can be disabled with
`runlet.metrics.enabled=false`.

Published meters include:

| Meter | Type | Tags | Meaning |
|---|---|---|---|
| `runlet.pipeline.starts` | counter | `pipeline` | Pipeline run starts. |
| `runlet.pipeline.completions` | counter | `pipeline` | Pipeline run completions. |
| `runlet.pipeline.failures` | counter | `pipeline`, `exception` | Pipeline run failures. |
| `runlet.pipeline.chunks` | counter | `pipeline` | Chunks committed after a successful sink commit. |
| `runlet.pipeline.records` | counter | `pipeline` | Records committed after a successful sink commit. |
| `runlet.pipeline.running` | gauge | `pipeline` | `1` while a pipeline is running, otherwise `0`. |
| `runlet.pipeline.last.success.epoch.seconds` | gauge | `pipeline` | Last successful completion time, in epoch seconds. |
| `runlet.pipeline.last.failure.epoch.seconds` | gauge | `pipeline` | Last failure time, in epoch seconds. |

Runlet moves records through a pipeline as chunks, not one record at a time. Stages still use ordinary per-record functions, but the runtime batches the plumbing around them.
Uncheckpointed pipelines run stages concurrently with bounded channels between the source, stages, and sink:

```kotlin
import org.aetherlink.runlet.api.RunletRuntimeConfig
import org.aetherlink.runlet.dsl.Runlet

// mySource, normalize, enrich, and mySink are application-provided.
Runlet(
    name = "fast-path",
    config = RunletRuntimeConfig(channelCapacity = 4),
) {
    source(mySource)
        .map(::normalize)
        .evalMap(::enrich)
        .sink(mySink)
}.run()
```

Checkpointed pipelines intentionally stay serial in v0 because cursor advancement depends on sink durability.
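
The uncheckpointed model runs stages concurrently with bounded buffers between them. It can be illustrated without Runlet. This sketch substitutes JDK `ArrayBlockingQueue`s and plain threads for Runlet's coroutine channels; it is an assumption for illustration, not Runlet's runtime. `put` blocks while a queue is full, which gives the source backpressure, and `capacity` plays roughly the role that `channelCapacity` plays above. Error propagation is omitted, so an exception in the stage thread would leave the sink waiting.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread

// Marker that tells the next stage no more chunks are coming.
private object EndOfStream

// Sketch: a source thread, one stage thread, and the calling thread as the sink,
// connected by bounded queues. put() blocks while a queue is full (backpressure).
fun <A, B> runBoundedPipeline(
    chunks: List<List<A>>,
    capacity: Int,
    stage: (A) -> B,
): List<List<B>> {
    val toStage = ArrayBlockingQueue<Any>(capacity)
    val toSink = ArrayBlockingQueue<Any>(capacity)

    val source = thread {
        for (chunk in chunks) toStage.put(chunk)
        toStage.put(EndOfStream)
    }
    val worker = thread {
        while (true) {
            val item = toStage.take()
            if (item === EndOfStream) {
                toSink.put(EndOfStream)
                break
            }
            @Suppress("UNCHECKED_CAST")
            val chunk = item as List<A>
            // The per-record function is applied to a whole chunk at a time.
            toSink.put(chunk.map(stage))
        }
    }

    val received = mutableListOf<List<B>>()
    while (true) {
        val item = toSink.take()
        if (item === EndOfStream) break
        @Suppress("UNCHECKED_CAST")
        val chunk = item as List<B>
        received.add(chunk)
    }
    source.join()
    worker.join()
    return received
}
```

With a single worker and FIFO queues, chunk order is preserved end to end. For example, `runBoundedPipeline(listOf(listOf(1, 2), listOf(3)), capacity = 1) { it * 2 }` returns `[[2, 4], [6]]`.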
The file connector supports serializer-agnostic JSON Lines by accepting decode/encode functions:

```kotlin
import org.aetherlink.runlet.connector.file.ChunkFileSink
import org.aetherlink.runlet.connector.file.FileSource

// decodeOrder and encodeSummary are application-provided line decode/encode functions.
val source = FileSource.jsonLines(
    path = "orders.jsonl",
    decode = ::decodeOrder,
)
val sink = ChunkFileSink.jsonLines(
    directory = "summaries",
    encode = ::encodeSummary,
)
```

For Jackson, add `runlet-connector-jackson` and use the Jackson factories:

```kotlin
import org.aetherlink.runlet.connector.jackson.JacksonChunkFileSink
import org.aetherlink.runlet.connector.jackson.JacksonFileSource

val source = JacksonFileSource.jsonLines<Order>("orders.jsonl")
val sink = JacksonChunkFileSink.jsonLines<OrderSummary>("summaries")
```

Java and blocking JVM integrations can implement blocking interfaces and adapt them into Runlet's coroutine contracts:

```kotlin
import org.aetherlink.runlet.adapter.blocking.BlockingSink
import org.aetherlink.runlet.adapter.blocking.asSink
import org.aetherlink.runlet.api.Chunk

class ConsoleBlockingSink : BlockingSink<String> {
    override fun write(chunk: Chunk<String>) {
        chunk.records.forEach(::println)
    }
}

val sink = ConsoleBlockingSink().asSink()
```

Blocking adapter calls run on `Dispatchers.IO`.

Applications that use Spring Framework without Spring Boot can wrap a pipeline
as a `SmartLifecycle` bean:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import org.aetherlink.runlet.adapter.spring.SpringPipelineLifecycle
import java.util.concurrent.Executors

// `pipeline` is a pipeline built with Runlet(...); `logger` is the application's logger.
val dispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
val scope = CoroutineScope(SupervisorJob() + dispatcher)
val lifecycle = SpringPipelineLifecycle(
    pipeline = pipeline,
    scope = scope,
    onFailure = { failure -> logger.error("Runlet pipeline failed", failure) },
)
```

Run the full verification suite:

```bash
./gradlew check
```

This runs compilation, tests, and ktlint.

Useful tasks:

```bash
./gradlew test
./gradlew ktlintCheck
./gradlew ktlintFormat
./gradlew publishToMavenLocal
```

If you need event-time correctness, exactly-once distributed processing, or horizontal scale, use Flink, Kafka Streams, or Spark Streaming. Runlet is for small, local, embeddable JVM pipelines.