tjb/Runlet

Runlet

Runlet is a small JVM library for embeddable, batch-oriented stream processing pipelines.

It is for jobs that need more structure than hand-written loops or Kotlin Flow, but do not justify operating Flink, Kafka Streams, or Spark Streaming. Runlet runs inside your process: no broker, no cluster, no daemon.

Status

Runlet is pre-release. APIs, module names, and behavior may change before a stable release.

Current v0 scope:

  • single JVM process
  • one source, one linear pipeline, one sink
  • chunked execution with Chunk<T>
  • map, filter, and evalMap
  • source factories for common chunked and cursor-paged sources
  • bounded channels for uncheckpointed pipelines
  • serial checkpointed execution for ordered, resumable sources
  • file line source, file checkpoint store, and chunk-file sink
  • blocking adapters for Java and other blocking JVM integrations
  • Spring SmartLifecycle adapter
  • Spring Boot starter and autoconfiguration
  • optional Micrometer metrics integration for Spring Boot apps

Not implemented yet:

  • windowing or groupBy
  • event-time semantics or watermarks
  • exactly-once semantics
  • distributed execution

Modules

Module | Purpose
runlet-core | Core API, DSL, runtime, and blocking adapters.
runlet-connector-file | File source, file checkpoint store, and chunk-file sink.
runlet-connector-jackson | Jackson-backed JSON Lines source and sink helpers.
runlet-adapter-spring | Spring Framework SmartLifecycle integration.
runlet-spring-boot-autoconfigure | Spring Boot autoconfiguration.
runlet-spring-boot-starter | Convenience dependency for Spring Boot applications.
runlet-sample-spring-boot | Runnable Spring Boot sample application.

Install

Runlet is not published to a remote Maven repository yet. For local use, publish the artifacts to your Maven local repository:

./gradlew check
./gradlew publishToMavenLocal

Then add mavenLocal() and the modules you need:

repositories {
    mavenLocal()
    mavenCentral()
}

dependencies {
    implementation("org.aetherlink:runlet-core:1.0-SNAPSHOT")
    implementation("org.aetherlink:runlet-connector-file:1.0-SNAPSHOT")
    implementation("org.aetherlink:runlet-connector-jackson:1.0-SNAPSHOT")
}

For Spring Boot applications, prefer the starter:

dependencies {
    implementation("org.aetherlink:runlet-spring-boot-starter:1.0-SNAPSHOT")
    implementation("org.aetherlink:runlet-connector-file:1.0-SNAPSHOT")
    implementation("org.aetherlink:runlet-connector-jackson:1.0-SNAPSHOT")
}

A runnable Spring Boot sample lives in runlet-sample-spring-boot.

Quick Start

This checkpointed file pipeline reads lines from a file, keeps completed records, transforms them, and writes replay-safe chunk files.

import kotlinx.coroutines.runBlocking
import org.aetherlink.runlet.connector.file.ChunkFileSink
import org.aetherlink.runlet.connector.file.FileCheckpointStore
import org.aetherlink.runlet.connector.file.FileSource
import org.aetherlink.runlet.dsl.Runlet

fun main() = runBlocking {
    Runlet("orders") {
        source(FileSource.lines("orders.txt", chunkSize = 1024))
            .checkpoint(FileCheckpointStore("orders.ckpt"))
            .filter { line -> line.contains("completed") }
            .map { line -> line.uppercase() }
            .sink(ChunkFileSink.lines("summaries"))
    }.run()
}

Checkpointed pipelines run one chunk at a time:

read -> transform -> write -> commit -> persist cursor

The checkpoint cursor advances only after the sink's commit() returns successfully. If write() or commit() fails, the checkpoint does not advance, and the chunk is read again on the next run.
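The ordering above can be sketched in plain Kotlin. `ChunkSource`, `CommitSink`, `MemoryCheckpointStore`, and `runCheckpointed` are hypothetical stand-ins, not Runlet's API. The sketch shows one property only: the cursor is stored after commit() returns. Any exception thrown earlier leaves the stored cursor unchanged, so that chunk is replayed.

```kotlin
// Hypothetical stand-ins for Runlet's source/sink contracts (not the real API).
interface ChunkSource<T> {
    /** Returns the chunk after [cursor] plus the cursor to resume from, or null at end of input. */
    fun read(cursor: Long): Pair<List<T>, Long>?
}

interface CommitSink<T> {
    fun write(chunk: List<T>)
    fun commit()
}

/** In-memory checkpoint store; a real one would persist the cursor durably. */
class MemoryCheckpointStore(var cursor: Long = 0L)

fun <T, R> runCheckpointed(
    source: ChunkSource<T>,
    store: MemoryCheckpointStore,
    transform: (T) -> R?, // returning null drops the record (filter)
    sink: CommitSink<R>,
) {
    while (true) {
        val (records, nextCursor) = source.read(store.cursor) ?: return // read
        val output = records.mapNotNull(transform)                    // transform
        sink.write(output)                                            // write
        sink.commit()                                                 // commit
        store.cursor = nextCursor // persist cursor: reached only if write and commit succeeded
    }
}
```

If commit() fails on the second chunk, a rerun starts from the first chunk's end cursor and writes the second chunk again. This is why checkpointed sinks need to be replay-safe.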

For checkpointable sources, .sink(...) is only available after .checkpoint(...) has been called. The DSL enforces this with types rather than a runtime capability check.
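A type-state builder is one way to get this compile-time guarantee. The sketch below uses hypothetical classes, not Runlet's DSL types. The stage for a checkpointable source has no `sink` method. Only the type returned by `checkpoint` exposes one, so a missing checkpoint is a compile error rather than a runtime check.

```kotlin
// Hypothetical type-state builder; names are illustrative, not Runlet's DSL.
class CheckpointStoreRef(val path: String)

/** Stage for a checkpointable source before a checkpoint store is attached: no sink() here. */
class UncheckpointedStage<T>(private val records: List<T>) {
    fun <R> map(f: (T) -> R): UncheckpointedStage<R> = UncheckpointedStage(records.map(f))
    fun filter(p: (T) -> Boolean): UncheckpointedStage<T> = UncheckpointedStage(records.filter(p))
    fun checkpoint(store: CheckpointStoreRef): CheckpointedStage<T> = CheckpointedStage(records, store)
}

/** Stage with a checkpoint store attached: only this type exposes sink(). */
class CheckpointedStage<T>(private val records: List<T>, val store: CheckpointStoreRef) {
    fun <R> map(f: (T) -> R): CheckpointedStage<R> = CheckpointedStage(records.map(f), store)
    fun filter(p: (T) -> Boolean): CheckpointedStage<T> = CheckpointedStage(records.filter(p), store)
    fun sink(consumer: (List<T>) -> Unit) = consumer(records)
}
```

With these types, `UncheckpointedStage(listOf("x")).sink { }` fails to compile because no such member exists.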

Real-World Usage

Runlet is useful for local or embedded jobs where the input has an ordered cursor and replay is acceptable. A common shape is:

source file/API export -> validate/filter -> transform -> durable sink

For example, a service can process a partner-provided JSON Lines export at startup or on a schedule:

import kotlinx.coroutines.runBlocking
import org.aetherlink.runlet.connector.file.FileCheckpointStore
import org.aetherlink.runlet.connector.jackson.JacksonChunkFileSink
import org.aetherlink.runlet.connector.jackson.JacksonFileSource
import org.aetherlink.runlet.dsl.Runlet

data class PartnerOrder(
    val id: String,
    val status: String,
    val totalCents: Long,
)

data class OrderSummary(
    val id: String,
    val totalCents: Long,
)

fun main() = runBlocking {
    Runlet("partner-orders") {
        source(JacksonFileSource.jsonLines<PartnerOrder>("imports/orders.jsonl"))
            .checkpoint(FileCheckpointStore("state/partner-orders.ckpt"))
            .filter { order -> order.status == "completed" }
            .map { order -> OrderSummary(order.id, order.totalCents) }
            .sink(JacksonChunkFileSink.jsonLines("exports/order-summaries"))
    }.run()
}

Operationally, treat Runlet like an embedded job runner:

  • Put checkpoint files on durable storage if resumability matters.
  • Make checkpointed sinks replay-safe. A failed chunk may be written again because Runlet advances the checkpoint only after commit() succeeds.
  • Keep chunkSize large enough to amortize per-chunk overhead, but small enough that replaying one chunk is acceptable.
  • Use Spring Boot lifecycle integration for long-running application pipelines.
  • Watch the runletHealthIndicator in Spring Boot apps; a failed pipeline is reported as DOWN.
  • Keep distributed coordination outside Runlet. If several app instances run the same pipeline against the same input/checkpoint, use an external lock or run only one active instance.
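One way to make a file sink replay-safe is to name each output file after the chunk's starting cursor. A replayed chunk then overwrites its earlier output instead of adding a duplicate. The sketch below uses only `java.nio`. The naming scheme and the `ReplaySafeChunkWriter` class are assumptions for illustration, not a description of how ChunkFileSink works.

```kotlin
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption

// Hypothetical replay-safe writer: one file per chunk, named by the chunk's first cursor.
class ReplaySafeChunkWriter(private val directory: Path) {
    init { Files.createDirectories(directory) }

    fun write(startCursor: Long, lines: List<String>): Path {
        // Zero-padded names keep lexical order equal to cursor order.
        val target = directory.resolve("chunk-%020d.txt".format(startCursor))
        val tmp = directory.resolve("${target.fileName}.tmp")
        Files.write(tmp, lines)
        // Rename into place, replacing any file left by an earlier attempt at the same chunk.
        // Replacement under ATOMIC_MOVE is filesystem-dependent; POSIX rename replaces.
        Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE)
        return target
    }
}
```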

Strong production shapes for Runlet:

Shape | Source | Sink | Checkpoint
Database backfill | Ordered primary-key scan | Table/index upsert | Last processed primary key
Object storage JSONL import | Object manifest or JSONL object | Database or clean output prefix | Object key plus byte offset
API cursor poller | Paginated API cursor | Durable storage | API cursor or page token
Search index backfill | Database or object storage | Elasticsearch/OpenSearch bulk index | Last primary key or object offset after successful bulk index

These shapes describe where the model fits. Today, only file and Jackson JSONL connectors are implemented; database, object storage, API, and search connectors would live in separate optional modules.
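In the object-storage shape, one checkpoint value must capture two positions. A sketch of a composite cursor encoded as a single string follows. The `ObjectCursor` type and its `key@offset` encoding are hypothetical, since no object-storage connector exists yet.

```kotlin
// Hypothetical composite cursor for the "object key plus byte offset" shape.
data class ObjectCursor(val key: String, val byteOffset: Long) : Comparable<ObjectCursor> {
    // Objects are processed in key order; within one object, by byte offset.
    override fun compareTo(other: ObjectCursor): Int =
        compareValuesBy(this, other, { it.key }, { it.byteOffset })

    fun encode(): String = "$key@$byteOffset"

    companion object {
        fun decode(text: String): ObjectCursor {
            // Split on the last '@' so object keys may themselves contain '@'.
            val at = text.lastIndexOf('@')
            require(at >= 0) { "not an ObjectCursor: $text" }
            return ObjectCursor(text.substring(0, at), text.substring(at + 1).toLong())
        }
    }
}
```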

Custom Sources

Most application code should not implement SourceReader directly. Use the highest-level API that fits:

Need | Use
Built-in file or JSONL processing | FileSource, JacksonFileSource, and the matching sinks
App-specific non-checkpointed reads | Sources.records(...) or Sources.chunks(...)
App-specific resumable reads | CheckpointableSources.byLongCursor(...) or CheckpointableSources.chunks(...)
Reusable connector modules | Implement Source, CheckpointableSource, Sink, or CheckpointStore

For example, a database backfill can be expressed as "read the next page after this cursor" without writing a custom reader class:

import org.aetherlink.runlet.api.CheckpointableSources
import org.aetherlink.runlet.connector.file.FileCheckpointStore
import org.aetherlink.runlet.dsl.Runlet

// orderDao, toSearchDocument, and searchIndexSink are application-defined.
val ordersSource =
    CheckpointableSources.byLongCursor(
        chunkSize = 500,
        read = { afterId, limit ->
            orderDao.fetchOrdersAfter(id = afterId, limit = limit)
        },
        cursorOf = { order -> order.id },
    )

// Builds the pipeline; call .run() on it to execute, as in the Quick Start.
Runlet("orders-search-backfill") {
    source(ordersSource)
        .checkpoint(FileCheckpointStore("state/orders-search-backfill.ckpt"))
        .map(::toSearchDocument)
        .sink(searchIndexSink)
}
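Conceptually, a long-cursor source repeats "fetch up to chunkSize records after the last cursor" until a page comes back empty. It takes the next cursor from the last record of each page. The stdlib-only sketch below illustrates that keyset-pagination loop against an in-memory list. `pagesAfter` and its parameters are hypothetical, and the sketch says nothing about how byLongCursor chooses the initial cursor or handles errors.

```kotlin
// Hypothetical illustration of keyset (cursor) pagination, not Runlet's implementation.
fun <T> pagesAfter(
    start: Long,
    chunkSize: Int,
    read: (afterId: Long, limit: Int) -> List<T>,
    cursorOf: (T) -> Long,
): Sequence<List<T>> = sequence {
    var cursor = start
    while (true) {
        val page = read(cursor, chunkSize)
        if (page.isEmpty()) break
        yield(page)
        // The next query resumes strictly after the last record seen.
        cursor = cursorOf(page.last())
    }
}
```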

The low-level reader interfaces are still public because connector modules need them, but they are not the ergonomic starting point for application pipelines.

Spring Boot

Spring Boot applications can register Runlet pipelines as beans. The starter creates a shared coroutine scope, wraps each registration in a SmartLifecycle, and starts/stops pipelines with the application context.

import org.aetherlink.runlet.adapter.spring.boot.RunletPipelineRegistration
import org.aetherlink.runlet.api.RunletRuntimeConfig
import org.aetherlink.runlet.connector.file.FileCheckpointStore
import org.aetherlink.runlet.dsl.Runlet
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

// orderSource, orderSink, and summarize are application-defined.
@Configuration
class PipelineConfiguration {
    @Bean
    fun orderCheckpointStore(): FileCheckpointStore =
        FileCheckpointStore("state/orders.ckpt")

    @Bean
    fun ordersPipeline(
        runletRuntimeConfig: RunletRuntimeConfig,
        orderCheckpointStore: FileCheckpointStore,
    ): RunletPipelineRegistration =
        RunletPipelineRegistration("orders") {
            Runlet("orders", config = runletRuntimeConfig) {
                source(orderSource)
                    .checkpoint(orderCheckpointStore)
                    .map(::summarize)
                    .sink(orderSink)
            }
        }
}

RunletRuntimeConfig is auto-configured by the starter from runlet.runtime.*. Today it controls the bounded channel capacity used by uncheckpointed pipelines. You can inject it into Runlet(...) as shown above or construct your own config manually outside Spring Boot.

orderCheckpointStore is application-owned checkpoint storage. The example uses FileCheckpointStore, which persists the last completed cursor to state/orders.ckpt. In production, put that file on durable storage or provide your own CheckpointStore backed by a database or object storage.
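A custom store should guarantee that a crash during a save never leaves a torn cursor. The sketch below shows that idea for a local file: write to a temporary file, then rename it over the old one. `AtomicFileCursor` is hypothetical, and Runlet's CheckpointStore interface and FileCheckpointStore format may differ. A production store would also fsync before the rename.

```kotlin
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption

// Hypothetical file-backed cursor store; not Runlet's CheckpointStore interface.
class AtomicFileCursor(private val file: Path) {
    /** Returns the last saved cursor, or null if nothing has been saved yet. */
    fun load(): String? =
        if (Files.exists(file)) String(Files.readAllBytes(file), Charsets.UTF_8) else null

    fun save(cursor: String) {
        file.toAbsolutePath().parent?.let { Files.createDirectories(it) }
        val tmp = file.resolveSibling("${file.fileName}.tmp")
        Files.write(tmp, cursor.toByteArray(Charsets.UTF_8))
        // Atomic rename: readers see either the old cursor or the new one, never a partial write.
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE)
    }
}
```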

application.yml:

runlet:
  enabled: true
  threads: 4
  shutdown-timeout: 30s
  health:
    enabled: true
  metrics:
    enabled: true
  runtime:
    channel-capacity: 4

Connector-specific settings, such as FileSource.lines(..., chunkSize = 1024), are still chosen when constructing that source.

When Spring Boot's health module is on the classpath, Runlet contributes a runletHealthIndicator. It reports UP when no registered pipeline has a recorded failure, and DOWN when one or more pipelines have failed.
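The rule amounts to a simple aggregation over per-pipeline state. Below is a plain-Kotlin sketch. `PipelineState`, `Status`, and `aggregate` are hypothetical stand-ins for whatever Runlet records internally, and the real indicator's detail fields are not documented here.

```kotlin
// Hypothetical aggregation; names are illustrative, not Runlet's health classes.
enum class Status { UP, DOWN }

data class PipelineState(val name: String, val lastFailure: Throwable? = null)

/** DOWN (with the failed pipeline names) if any pipeline has a recorded failure, otherwise UP. */
fun aggregate(pipelines: Collection<PipelineState>): Pair<Status, List<String>> {
    val failed = pipelines.filter { it.lastFailure != null }.map { it.name }
    return (if (failed.isEmpty()) Status.UP else Status.DOWN) to failed
}
```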

When Micrometer is on the classpath and a MeterRegistry bean exists, Runlet also contributes a pipeline metrics observer. Actuator-enabled Spring Boot apps typically provide that registry. Metrics can be disabled with runlet.metrics.enabled=false.

Published meters include:

Meter | Type | Tags | Meaning
runlet.pipeline.starts | counter | pipeline | Pipeline run starts.
runlet.pipeline.completions | counter | pipeline | Pipeline run completions.
runlet.pipeline.failures | counter | pipeline, exception | Pipeline run failures.
runlet.pipeline.chunks | counter | pipeline | Chunks committed after a successful sink commit.
runlet.pipeline.records | counter | pipeline | Records committed after a successful sink commit.
runlet.pipeline.running | gauge | pipeline | 1 while a pipeline is running, otherwise 0.
runlet.pipeline.last.success.epoch.seconds | gauge | pipeline | Last successful completion time, in epoch seconds.
runlet.pipeline.last.failure.epoch.seconds | gauge | pipeline | Last failure time, in epoch seconds.
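The chunk and record counters move only on a successful sink commit, and the running gauge brackets each run. The in-memory sketch below mirrors those semantics without Micrometer. The `MetricsObserver` class and its callbacks are hypothetical, not Runlet's observer interface.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong

// Hypothetical observer mirroring the documented meter semantics (no Micrometer).
class MetricsObserver(private val clock: () -> Long = { System.currentTimeMillis() / 1000 }) {
    val starts = AtomicLong()
    val completions = AtomicLong()
    val failures = AtomicLong()
    val chunks = AtomicLong()
    val records = AtomicLong()
    val running = AtomicInteger() // gauge: 1 while running, otherwise 0
    val lastSuccessEpochSeconds = AtomicLong()
    val lastFailureEpochSeconds = AtomicLong()

    fun onStart() { starts.incrementAndGet(); running.set(1) }

    // Called only after the sink's commit() has returned successfully.
    fun onChunkCommitted(recordCount: Int) {
        chunks.incrementAndGet()
        records.addAndGet(recordCount.toLong())
    }

    fun onComplete() { completions.incrementAndGet(); running.set(0); lastSuccessEpochSeconds.set(clock()) }

    fun onFailure() { failures.incrementAndGet(); running.set(0); lastFailureEpochSeconds.set(clock()) }
}
```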

Runtime Model

Runlet moves records through a pipeline as chunks, not one record at a time. Stages still use ordinary per-record functions, but the runtime batches the plumbing around them.
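The split between per-record user code and chunk-level plumbing can be sketched with a minimal chunk type. The `Chunk` class here is a hypothetical stand-in. Runlet's own Chunk<T> exposes `records`, as the blocking sink example in this README shows, but its other members may differ. `mapStage`, `filterStage`, and `toChunks` are also illustrative names.

```kotlin
// Hypothetical chunk type; Runlet's Chunk<T> may differ beyond having `records`.
class Chunk<T>(val records: List<T>)

// A stage wraps an ordinary per-record function and applies it across a whole chunk.
fun <T, R> mapStage(f: (T) -> R): (Chunk<T>) -> Chunk<R> = { chunk -> Chunk(chunk.records.map(f)) }

fun <T> filterStage(p: (T) -> Boolean): (Chunk<T>) -> Chunk<T> = { chunk -> Chunk(chunk.records.filter(p)) }

/** Splits input into fixed-size chunks, the unit handed between stages. */
fun <T> toChunks(input: List<T>, chunkSize: Int): List<Chunk<T>> =
    input.chunked(chunkSize).map { Chunk(it) }
```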

Uncheckpointed pipelines run stages concurrently with bounded channels between the source, stages, and sink:

import org.aetherlink.runlet.api.RunletRuntimeConfig

Runlet(
    name = "fast-path",
    config = RunletRuntimeConfig(channelCapacity = 4),
) {
    source(mySource)
        .map(::normalize)
        .evalMap(::enrich)
        .sink(mySink)
}.run()
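Bounded channels between stages give backpressure. When a downstream stage falls behind, the upstream stage waits once `channelCapacity` chunks are buffered. The sketch below demonstrates the same idea with the JDK's `ArrayBlockingQueue` and a thread, not whatever Runlet uses internally, so it needs no extra dependencies. `runBounded` and the end-of-stream sentinel are assumptions for illustration.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread

// JDK analogue of a bounded channel between a source stage and a transform stage.
fun <T, R> runBounded(
    chunks: List<List<T>>,
    channelCapacity: Int,
    transform: (T) -> R,
): List<List<R>> {
    val channel = ArrayBlockingQueue<Any>(channelCapacity)
    val endOfStream = Any() // sentinel marking the end of input
    // Producer: put() blocks while channelCapacity chunks are already buffered.
    val producer = thread(isDaemon = true) {
        for (chunk in chunks) channel.put(chunk)
        channel.put(endOfStream)
    }
    // Consumer: take() blocks until a chunk is available.
    val out = mutableListOf<List<R>>()
    while (true) {
        val item = channel.take()
        if (item === endOfStream) break
        @Suppress("UNCHECKED_CAST")
        val chunk = item as List<T>
        out.add(chunk.map(transform))
    }
    producer.join()
    return out
}
```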

Checkpointed pipelines intentionally stay serial in v0 because cursor advancement depends on sink durability.

JSON Lines

The file connector supports serializer-agnostic JSON Lines by accepting decode/encode functions:

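import org.aetherlink.runlet.connector.file.ChunkFileSink
import org.aetherlink.runlet.connector.file.FileSource

// decodeOrder and encodeSummary are your own functions, backed by any JSON library.
val source = FileSource.jsonLines(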
    path = "orders.jsonl",
    decode = ::decodeOrder,
)

val sink = ChunkFileSink.jsonLines(
    directory = "summaries",
    encode = ::encodeSummary,
)
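JSON Lines framing is simple: one JSON value per line. The sketch below shows roughly how a decode/encode-based source and sink apply your functions. `readJsonLines` and `writeJsonLines` are hypothetical helpers, and skipping blank lines is an assumption, not documented FileSource behavior.

```kotlin
// Hypothetical JSON Lines framing around caller-supplied decode/encode functions.
fun <T> readJsonLines(lines: Sequence<String>, decode: (String) -> T): List<T> =
    lines
        .filter { it.isNotBlank() } // assumption: blank lines are ignored
        .map(decode)
        .toList()

fun <T> writeJsonLines(records: List<T>, encode: (T) -> String): String =
    buildString {
        for (record in records) {
            val line = encode(record)
            // A JSON serializer escapes newlines inside strings, so a raw '\n' means a framing bug.
            require('\n' !in line) { "encoded record must fit on one line" }
            append(line).append('\n')
        }
    }
```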

For Jackson, add runlet-connector-jackson and use the Jackson factories:

import org.aetherlink.runlet.connector.jackson.JacksonChunkFileSink
import org.aetherlink.runlet.connector.jackson.JacksonFileSource

val source = JacksonFileSource.jsonLines<Order>("orders.jsonl")
val sink = JacksonChunkFileSink.jsonLines<OrderSummary>("summaries")

Blocking Adapters

Java and blocking JVM integrations can implement blocking interfaces and adapt them into Runlet's coroutine contracts:

import org.aetherlink.runlet.adapter.blocking.BlockingSink
import org.aetherlink.runlet.adapter.blocking.asSink
import org.aetherlink.runlet.api.Chunk

class ConsoleBlockingSink : BlockingSink<String> {
    override fun write(chunk: Chunk<String>) {
        chunk.records.forEach(::println)
    }
}

val sink = ConsoleBlockingSink().asSink()

Blocking adapter calls run on Dispatchers.IO.

Spring Framework

Applications that use Spring Framework without Spring Boot can wrap a pipeline as a SmartLifecycle bean:

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import org.aetherlink.runlet.adapter.spring.SpringPipelineLifecycle
import java.util.concurrent.Executors

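// pipeline and logger are assumed to be defined elsewhere in your configuration.
// The dispatcher is Closeable; close it when the application shuts down.
val dispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()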
val scope = CoroutineScope(SupervisorJob() + dispatcher)

val lifecycle = SpringPipelineLifecycle(
    pipeline = pipeline,
    scope = scope,
    onFailure = { failure -> logger.error("Runlet pipeline failed", failure) },
)

Development

Run the full verification suite:

./gradlew check

This runs compilation, tests, and ktlint.

Useful tasks:

./gradlew test
./gradlew ktlintCheck
./gradlew ktlintFormat
./gradlew publishToMavenLocal

Design Notes

Non-Goals

If you need event-time correctness, exactly-once distributed processing, or horizontal scale, use Flink, Kafka Streams, or Spark Streaming. Runlet is for small, local, embeddable JVM pipelines.

About

Embeddable JVM pipelines with chunked processing, checkpointing, and Spring Boot integration.