diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/logcache/v1/ReactorLogCacheEndpoints.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/logcache/v1/ReactorLogCacheEndpoints.java index 2e68c52538..6df4093ed5 100644 --- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/logcache/v1/ReactorLogCacheEndpoints.java +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/logcache/v1/ReactorLogCacheEndpoints.java @@ -16,15 +16,23 @@ package org.cloudfoundry.reactor.logcache.v1; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import org.cloudfoundry.logcache.v1.Envelope; +import org.cloudfoundry.logcache.v1.EnvelopeType; import org.cloudfoundry.logcache.v1.InfoRequest; import org.cloudfoundry.logcache.v1.InfoResponse; import org.cloudfoundry.logcache.v1.MetaRequest; import org.cloudfoundry.logcache.v1.MetaResponse; import org.cloudfoundry.logcache.v1.ReadRequest; import org.cloudfoundry.logcache.v1.ReadResponse; +import org.cloudfoundry.logcache.v1.TailLogsRequest; import org.cloudfoundry.reactor.ConnectionContext; import org.cloudfoundry.reactor.TokenProvider; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; final class ReactorLogCacheEndpoints extends AbstractLogCacheOperations { @@ -48,4 +56,111 @@ Mono meta(MetaRequest request) { Mono read(ReadRequest request) { return get(request, ReadResponse.class, "read", request.getSourceId()).checkpoint(); } + + Mono recentLogs(ReadRequest request) { + return read(request); + } + + /** + * Continuously polls Log Cache and emits new {@link Envelope}s as they arrive. + * + *

Mirrors the Go {@code logcache.Walk()} / {@code cf tail --follow} semantics: + *

    + *
  1. Start the cursor at {@code startTime} (defaults to now − 5 s in + * nanoseconds).
  2. + *
  3. Issue {@code GET /api/v1/read/{sourceId}?start_time=cursor}.
  4. + *
  5. Emit every returned envelope in ascending timestamp order and advance + * the cursor to {@code lastTimestamp + 1}.
  6. + *
  7. When the batch is empty, wait {@code pollInterval} before the next poll.
  8. + *
  9. Repeat forever – the caller cancels the subscription to stop.
  10. + *
+ * Fully non-blocking: no {@code Thread.sleep}. + */ + Flux logsTail(TailLogsRequest request) { + long defaultStartNanos = (System.currentTimeMillis() - 5_000L) * 1_000_000L; + AtomicLong cursor = + new AtomicLong( + request.getStartTime() != null + ? request.getStartTime() + : defaultStartNanos); + + List envelopeTypes = + request.getEnvelopeTypes() != null + ? request.getEnvelopeTypes() + : Collections.emptyList(); + String nameFilter = request.getNameFilter(); + + /* + * Strategy (mirrors Go's logcache.Walk): + * – Mono.defer builds a fresh ReadRequest from the mutable cursor on every repetition. + * – The Mono returns either the sorted batch (non-empty) or an empty list. + * – flatMapMany turns each batch into a stream of individual Envelope items. + * – repeat() subscribes again after each completion. + * – When the batch was empty we insert a delay via Mono.delay before the next + * repetition so we do not hammer the server. We signal "empty" by returning + * a sentinel Mono (false = was empty, true = had data) and use + * repeatWhen to conditionally delay. + */ + return Flux.defer( + () -> { + // Build the read request from the current cursor position. + ReadRequest.Builder builder = + ReadRequest.builder() + .sourceId(request.getSourceId()) + .startTime(cursor.get()); + if (!envelopeTypes.isEmpty()) { + builder.envelopeTypes(envelopeTypes); + } + if (nameFilter != null && !nameFilter.isEmpty()) { + builder.nameFilter(nameFilter); + } + + return read(builder.build()) + .onErrorReturn(ReadResponse.builder().build()) + .flatMapMany( + resp -> { + List raw = + resp.getEnvelopes() != null + ? resp.getEnvelopes().getBatch() + : Collections.emptyList(); + + if (raw.isEmpty()) { + // Signal "no data" so repeatWhen can insert the + // back-off delay. + return Flux.empty(); + } + + // Sort ascending by timestamp and advance the + // cursor. + List sorted = new ArrayList<>(raw); + sorted.sort( + (a, b) -> + Long.compare( + a.getTimestamp() != null + ? a.getTimestamp() + : 0L, + b.getTimestamp() != null + ? b.getTimestamp() + : 0L)); + + Envelope last = sorted.get(sorted.size() - 1); + cursor.set( + (last.getTimestamp() != null + ? last.getTimestamp() + : cursor.get()) + + 1); + + return Flux.fromIterable(sorted); + }); + }) + // repeatWhen receives a Flux where each element is the count of items + // emitted in the previous cycle (0 = empty batch → insert delay). + .repeatWhen( + companion -> + companion.flatMap( + count -> + count == 0 + ? Mono.delay(request.getPollInterval()) + : Mono.just(count))); + } } diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/logcache/v1/_ReactorLogCacheClient.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/logcache/v1/_ReactorLogCacheClient.java index d9460476ea..0236da2188 100644 --- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/logcache/v1/_ReactorLogCacheClient.java +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/logcache/v1/_ReactorLogCacheClient.java @@ -16,6 +16,7 @@ package org.cloudfoundry.reactor.logcache.v1; +import org.cloudfoundry.logcache.v1.Envelope; import org.cloudfoundry.logcache.v1.InfoRequest; import org.cloudfoundry.logcache.v1.InfoResponse; import org.cloudfoundry.logcache.v1.LogCacheClient; @@ -23,9 +24,11 @@ import org.cloudfoundry.logcache.v1.MetaResponse; import org.cloudfoundry.logcache.v1.ReadRequest; import org.cloudfoundry.logcache.v1.ReadResponse; +import org.cloudfoundry.logcache.v1.TailLogsRequest; import org.cloudfoundry.reactor.ConnectionContext; import org.cloudfoundry.reactor.TokenProvider; import org.immutables.value.Value; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.net.URI; @@ -53,6 +56,16 @@ public Mono read(ReadRequest request) { return getReactorLogCacheEndpoints().read(request); } + @Override + public Mono recentLogs(ReadRequest request) { + return getReactorLogCacheEndpoints().recentLogs(request); + } + + @Override + public Flux logsTail(TailLogsRequest request) { + return getReactorLogCacheEndpoints().logsTail(request); + } + /** * The connection context */ diff --git a/cloudfoundry-client/src/main/java/org/cloudfoundry/logcache/v1/LogCacheClient.java b/cloudfoundry-client/src/main/java/org/cloudfoundry/logcache/v1/LogCacheClient.java index e455db220a..6b91461fe3 100644 --- a/cloudfoundry-client/src/main/java/org/cloudfoundry/logcache/v1/LogCacheClient.java +++ b/cloudfoundry-client/src/main/java/org/cloudfoundry/logcache/v1/LogCacheClient.java @@ -16,6 +16,7 @@ package org.cloudfoundry.logcache.v1; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -46,4 +47,25 @@ public interface LogCacheClient { * @return the read response */ Mono read(ReadRequest request); + + /** + * Makes the Log Cache RecentLogs /api/v1/read request + * + * @param request the Recent Logs request + * @return the events from the recent logs + */ + Mono recentLogs(ReadRequest request); + + /** + * Continuously polls the Log Cache /api/v1/read endpoint and streams new {@link Envelope}s + * as they appear. This is the Java equivalent of the Go {@code logcache.Walk()} API and + * {@code cf tail --follow}. + *

+ * The returned {@link Flux} will never complete on its own – unsubscribe (or cancel) it to + * stop streaming. + * + * @param request the tail request (source id, optional filters, poll interval) + * @return an infinite stream of envelopes + */ + Flux logsTail(TailLogsRequest request); } diff --git a/cloudfoundry-client/src/main/java/org/cloudfoundry/logcache/v1/_TailLogsRequest.java b/cloudfoundry-client/src/main/java/org/cloudfoundry/logcache/v1/_TailLogsRequest.java new file mode 100644 index 0000000000..1ca32d388a --- /dev/null +++ b/cloudfoundry-client/src/main/java/org/cloudfoundry/logcache/v1/_TailLogsRequest.java @@ -0,0 +1,65 @@ +/* + * Copyright 2013-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cloudfoundry.logcache.v1; + +import org.cloudfoundry.Nullable; +import org.immutables.value.Value; + +import java.time.Duration; +import java.util.List; + +/** + * The request options for the Log Cache tail (streaming follow) operation. + * This continuously polls the Log Cache /api/v1/read endpoint, emitting new envelopes + * as they appear – equivalent to {@code cf tail --follow} or the Go {@code logcache.Walk()} API. + */ +@Value.Immutable +abstract class _TailLogsRequest { + + /** + * The source id (application guid or service guid) to stream logs for. + */ + abstract String getSourceId(); + + /** + * Optional start time (UNIX nanoseconds). Defaults to "now – 5 seconds" when not set. + */ + @Nullable + abstract Long getStartTime(); + + /** + * Optional envelope type filter. + */ + @Nullable + abstract List getEnvelopeTypes(); + + /** + * Optional regex name filter (requires Log Cache ≥ 2.1.0). + */ + @Nullable + abstract String getNameFilter(); + + /** + * How long to wait between successive polls when no new envelopes are available. + * Defaults to 250 ms (matching the Go client's {@code AlwaysRetryBackoff}). + */ + @Value.Default + Duration getPollInterval() { + return Duration.ofMillis(250); + } +} + diff --git a/cloudfoundry-operations/src/main/java/org/cloudfoundry/operations/applications/Applications.java b/cloudfoundry-operations/src/main/java/org/cloudfoundry/operations/applications/Applications.java index 56aba6af64..5d95db5d01 100644 --- a/cloudfoundry-operations/src/main/java/org/cloudfoundry/operations/applications/Applications.java +++ b/cloudfoundry-operations/src/main/java/org/cloudfoundry/operations/applications/Applications.java @@ -17,6 +17,10 @@ package org.cloudfoundry.operations.applications; import org.cloudfoundry.doppler.LogMessage; +import org.cloudfoundry.logcache.v1.Envelope; +import org.cloudfoundry.logcache.v1.Log; +import org.cloudfoundry.logcache.v1.ReadRequest; +import org.cloudfoundry.logcache.v1.TailLogsRequest; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -126,11 +130,29 @@ public interface Applications { @Deprecated Flux logs(LogsRequest request); + /** + * List the applications logs from logCacheClient. + * If no messages are available, an empty Flux is returned. + * + * @param request the application logs request + * @return the applications logs + */ + Flux logsRecent(ReadRequest request); + + /** + * Continuously streams application log envelopes from Log Cache by repeatedly polling + * the {@code /api/v1/read} endpoint. The returned {@link Flux} is infinite – cancel it + * to stop streaming. This is the Java equivalent of {@code cf tail --follow}. + * + * @param request the tail request (source id, optional filters, poll interval) + * @return an infinite stream of envelopes + */ + Flux logsTail(TailLogsRequest request); + /** * List the applications logs. - * Uses Log Cache under the hood when {@link ApplicationLogsRequest#getRecent()} is {@code true}. - * Log streaming still uses Doppler, which is not available in CF deployments following - * shared-nothing architecture. + * Only works with {@code Loggregator < 107.0}, shipped in {@code CFD < 24.3} + * and {@code TAS < 4.0}. * * @param request the application logs request * @return the applications logs diff --git a/cloudfoundry-operations/src/main/java/org/cloudfoundry/operations/applications/DefaultApplications.java b/cloudfoundry-operations/src/main/java/org/cloudfoundry/operations/applications/DefaultApplications.java index c56ee52554..23d1fb70cd 100644 --- a/cloudfoundry-operations/src/main/java/org/cloudfoundry/operations/applications/DefaultApplications.java +++ b/cloudfoundry-operations/src/main/java/org/cloudfoundry/operations/applications/DefaultApplications.java @@ -155,8 +155,10 @@ import org.cloudfoundry.doppler.RecentLogsRequest; import org.cloudfoundry.doppler.StreamRequest; import org.cloudfoundry.logcache.v1.EnvelopeBatch; +import org.cloudfoundry.logcache.v1.Log; import org.cloudfoundry.logcache.v1.LogCacheClient; import org.cloudfoundry.logcache.v1.ReadRequest; +import org.cloudfoundry.logcache.v1.TailLogsRequest; import org.cloudfoundry.operations.util.OperationsLogging; import org.cloudfoundry.util.DateUtils; import org.cloudfoundry.util.DelayTimeoutException; @@ -557,25 +559,39 @@ public Flux logs(LogsRequest request) { .checkpoint(); } + @Override + public Flux logsRecent(ReadRequest request) { + return getRecentLogsLogCache(this.logCacheClient, request) + .transform(OperationsLogging.log("Get Application Logs")) + .checkpoint(); + } + + @Override + public Flux logsTail(TailLogsRequest request) { + return this.logCacheClient + .flatMapMany(client -> client.logsTail(request)) + .transform(OperationsLogging.log("Tail Application Logs")) + .checkpoint(); + } + @Override public Flux logs(ApplicationLogsRequest request) { - if (request.getRecent() != null && request.getRecent()) { - return Mono.zip(this.cloudFoundryClient, this.spaceId) - .flatMap( - function( - (cloudFoundryClient, spaceId) -> - getApplicationId( - cloudFoundryClient, - request.getName(), - spaceId))) - .flatMapMany( - applicationId -> getLogsLogCache(this.logCacheClient, applicationId)) - .transform(OperationsLogging.log("Get Application Logs")) - .checkpoint(); - } else { - return logs(LogsRequest.builder().name(request.getName()).recent(false).build()) - .map(DefaultApplications::toApplicationLog); - } + return logs(LogsRequest.builder() + .name(request.getName()) + .recent(request.getRecent()) + .build()) + .map( + logMessage -> + ApplicationLog.builder() + .sourceId(logMessage.getApplicationId()) + .sourceType(logMessage.getSourceType()) + .instanceId(logMessage.getSourceInstance()) + .message(logMessage.getMessage()) + .timestamp(logMessage.getTimestamp()) + .logType( + ApplicationLogType.from( + logMessage.getMessageType().name())) + .build()); } @Override @@ -1630,30 +1646,15 @@ private static Flux getLogs( } } - private static Flux getLogsLogCache( - Mono logCacheClient, String applicationId) { - return requestLogsRecentLogCache(logCacheClient, applicationId) + private static Flux getRecentLogsLogCache( + Mono logCacheClient, ReadRequest readRequest) { + return requestLogsRecentLogCache(logCacheClient, readRequest) + .map(EnvelopeBatch::getBatch) + .map(List::stream) + .flatMapIterable(envelopeStream -> envelopeStream.collect(Collectors.toList())) .filter(e -> e.getLog() != null) .sort(LOG_MESSAGE_COMPARATOR_LOG_CACHE) - .map( - envelope -> - ApplicationLog.builder() - .sourceId( - Optional.ofNullable(envelope.getSourceId()) - .orElse("")) - .sourceType( - envelope.getTags().getOrDefault("source_type", "")) - .instanceId( - Optional.ofNullable(envelope.getInstanceId()) - .orElse("")) - .message(envelope.getLog().getPayloadAsText()) - .timestamp( - Optional.ofNullable(envelope.getTimestamp()) - .orElse(0L)) - .logType( - ApplicationLogType.from( - envelope.getLog().getType().name())) - .build()); + .map(org.cloudfoundry.logcache.v1.Envelope::getLog); } @SuppressWarnings("unchecked") @@ -2549,14 +2550,12 @@ private static Flux requestLogsRecent( RecentLogsRequest.builder().applicationId(applicationId).build())); } - private static Flux requestLogsRecentLogCache( - Mono logCacheClient, String applicationId) { - return logCacheClient - .flatMap( - client -> - client.read(ReadRequest.builder().sourceId(applicationId).build())) - .flatMap(response -> Mono.justOrEmpty(response.getEnvelopes())) - .flatMapIterable(EnvelopeBatch::getBatch); + private static Mono requestLogsRecentLogCache( + Mono logCacheClient, ReadRequest readRequest) { + return logCacheClient.flatMap( + client -> + client.recentLogs(readRequest) + .flatMap(response -> Mono.justOrEmpty(response.getEnvelopes()))); } private static Flux requestLogsStream( @@ -2964,17 +2963,6 @@ private static Mono stopApplicationIfNotStopped( : Mono.just(resource); } - private static ApplicationLog toApplicationLog(LogMessage logMessage) { - return ApplicationLog.builder() - .sourceId(logMessage.getApplicationId()) - .sourceType(logMessage.getSourceType()) - .instanceId(logMessage.getSourceInstance()) - .message(logMessage.getMessage()) - .timestamp(logMessage.getTimestamp()) - .logType(ApplicationLogType.from(logMessage.getMessageType().name())) - .build(); - } - private static ApplicationDetail toApplicationDetail( List buildpacks, SummaryApplicationResponse summaryApplicationResponse, diff --git a/cloudfoundry-operations/src/test/java/org/cloudfoundry/operations/applications/DefaultApplicationsTest.java b/cloudfoundry-operations/src/test/java/org/cloudfoundry/operations/applications/DefaultApplicationsTest.java index f137ac2d84..24508271df 100644 --- a/cloudfoundry-operations/src/test/java/org/cloudfoundry/operations/applications/DefaultApplicationsTest.java +++ b/cloudfoundry-operations/src/test/java/org/cloudfoundry/operations/applications/DefaultApplicationsTest.java @@ -152,6 +152,7 @@ import org.cloudfoundry.logcache.v1.LogType; import org.cloudfoundry.logcache.v1.ReadRequest; import org.cloudfoundry.logcache.v1.ReadResponse; +import org.cloudfoundry.logcache.v1.TailLogsRequest; import org.cloudfoundry.operations.AbstractOperationsTest; import org.cloudfoundry.util.DateUtils; import org.cloudfoundry.util.FluentMap; @@ -1370,28 +1371,81 @@ void logsRecentDoppler() { @Test void logsLogCache() { - requestApplications( - this.cloudFoundryClient, - "test-application-name", - TEST_SPACE_ID, - "test-metadata-id"); - requestLogsRecentLogCache(this.logCacheClient, "test-metadata-id"); + ReadRequest readRequest = ReadRequest.builder().sourceId("test-metadata-id").build(); + requestLogsRecentLogCache(this.logCacheClient, readRequest); this.applications - .logs( - ApplicationLogsRequest.builder() - .name("test-application-name") - .recent(true) - .build()) + .logsRecent(readRequest) .as(StepVerifier::create) .expectNextMatches( log -> - log.getMessage().equals("test-payload") - && log.getLogType() == ApplicationLogType.OUT - && log.getSourceId().equals("test-sourceId") - && log.getInstanceId().equals("test-instanceId") - && log.getSourceType().equals("APP/PROC/WEB") - && log.getTimestamp() == 1L) + log.getPayloadAsText().equals("test-payload") + && log.getType() == LogType.OUT) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + @Test + void logsTailLogCache() { + TailLogsRequest tailRequest = TailLogsRequest.builder().sourceId("test-source-id").build(); + requestLogsTailLogCache(this.logCacheClient, tailRequest, "test-tail-payload"); + + this.applications + .logsTail(tailRequest) + .take(1) + .as(StepVerifier::create) + .expectNextMatches( + envelope -> + envelope.getLog() != null + && LogType.OUT.equals(envelope.getLog().getType())) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + @Test + void logsTailLogCacheMultipleEnvelopes() { + TailLogsRequest tailRequest = TailLogsRequest.builder().sourceId("test-source-id").build(); + requestLogsTailLogCacheMultiple(this.logCacheClient, tailRequest); + + this.applications + .logsTail(tailRequest) + .take(3) + .map(e -> e.getLog().getType()) + .as(StepVerifier::create) + .expectNext(LogType.OUT) + .expectNext(LogType.ERR) + .expectNext(LogType.OUT) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + @Test + void logsTailLogCacheError() { + TailLogsRequest tailRequest = TailLogsRequest.builder().sourceId("test-source-id").build(); + when(this.logCacheClient.logsTail(tailRequest)) + .thenReturn(Flux.error(new RuntimeException("log-cache unavailable"))); + + this.applications + .logsTail(tailRequest) + .as(StepVerifier::create) + .expectErrorMatches( + t -> + t instanceof RuntimeException + && "log-cache unavailable".equals(t.getMessage())) + .verify(Duration.ofSeconds(5)); + } + + @Test + void logsTailLogCacheOutAndErrEnvelopes() { + TailLogsRequest tailRequest = TailLogsRequest.builder().sourceId("test-source-id").build(); + requestLogsTailLogCacheOutAndErr(this.logCacheClient, tailRequest); + + this.applications + .logsTail(tailRequest) + .take(2) + .as(StepVerifier::create) + .expectNextMatches(e -> LogType.OUT.equals(e.getLog().getType())) + .expectNextMatches(e -> LogType.ERR.equals(e.getLog().getType())) .expectComplete() .verify(Duration.ofSeconds(5)); } @@ -5372,10 +5426,11 @@ private static void requestLogsRecent(DopplerClient dopplerClient, String applic .build())); } - private static void requestLogsRecentLogCache(LogCacheClient logCacheClient, String sourceId) { + private static void requestLogsRecentLogCache( + LogCacheClient logCacheClient, ReadRequest readRequest) { String base64Payload = Base64.getEncoder().encodeToString("test-payload".getBytes(StandardCharsets.UTF_8)); - when(logCacheClient.read(ReadRequest.builder().sourceId(sourceId).build())) + when(logCacheClient.recentLogs(readRequest)) .thenReturn( Mono.just( fill(ReadResponse.builder()) @@ -5403,6 +5458,90 @@ private static void requestLogsRecentLogCache(LogCacheClient logCacheClient, Str .build())); } + private static void requestLogsTailLogCache( + LogCacheClient logCacheClient, TailLogsRequest tailRequest, String payload) { + when(logCacheClient.logsTail(tailRequest)) + .thenReturn( + Flux.just( + Envelope.builder() + .sourceId(tailRequest.getSourceId()) + .timestamp(System.nanoTime()) + .log( + Log.builder() + .payload(payload) + .type(LogType.OUT) + .build()) + .build())); + } + + /** + * Three envelopes with types OUT, ERR, OUT and strictly ascending timestamps so ordering + * is deterministic. + */ + private static void requestLogsTailLogCacheMultiple( + LogCacheClient logCacheClient, TailLogsRequest tailRequest) { + long base = System.nanoTime(); + when(logCacheClient.logsTail(tailRequest)) + .thenReturn( + Flux.just( + Envelope.builder() + .sourceId(tailRequest.getSourceId()) + .timestamp(base) + .log( + Log.builder() + .payload("msg1") + .type(LogType.OUT) + .build()) + .build(), + Envelope.builder() + .sourceId(tailRequest.getSourceId()) + .timestamp(base + 1) + .log( + Log.builder() + .payload("msg2") + .type(LogType.ERR) + .build()) + .build(), + Envelope.builder() + .sourceId(tailRequest.getSourceId()) + .timestamp(base + 2) + .log( + Log.builder() + .payload("msg3") + .type(LogType.OUT) + .build()) + .build())); + } + + /** + * Two envelopes – one STDOUT, one STDERR – to verify both log types are forwarded. + */ + private static void requestLogsTailLogCacheOutAndErr( + LogCacheClient logCacheClient, TailLogsRequest tailRequest) { + long base = System.nanoTime(); + when(logCacheClient.logsTail(tailRequest)) + .thenReturn( + Flux.just( + Envelope.builder() + .sourceId(tailRequest.getSourceId()) + .timestamp(base) + .log( + Log.builder() + .payload("stdout") + .type(LogType.OUT) + .build()) + .build(), + Envelope.builder() + .sourceId(tailRequest.getSourceId()) + .timestamp(base + 1) + .log( + Log.builder() + .payload("stderr") + .type(LogType.ERR) + .build()) + .build())); + } + private static void requestLogsStream(DopplerClient dopplerClient, String applicationId) { when(dopplerClient.stream(StreamRequest.builder().applicationId(applicationId).build())) .thenReturn( diff --git a/integration-test/src/test/java/org/cloudfoundry/operations/ApplicationsTest.java b/integration-test/src/test/java/org/cloudfoundry/operations/ApplicationsTest.java index 13b9fc1d3b..bc6d6826d8 100644 --- a/integration-test/src/test/java/org/cloudfoundry/operations/ApplicationsTest.java +++ b/integration-test/src/test/java/org/cloudfoundry/operations/ApplicationsTest.java @@ -25,15 +25,21 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.logging.Level; import org.cloudfoundry.AbstractIntegrationTest; import org.cloudfoundry.CleanupCloudFoundryAfterClass; import org.cloudfoundry.CloudFoundryVersion; import org.cloudfoundry.IfCloudFoundryVersion; -import org.cloudfoundry.RequiresTcpRouting; -import org.cloudfoundry.RequiresV2Api; import org.cloudfoundry.client.CloudFoundryClient; -import org.cloudfoundry.client.v3.applications.ApplicationFeatureResource; -import org.cloudfoundry.client.v3.applications.ListApplicationFeaturesRequest; +import org.cloudfoundry.logcache.v1.Envelope; +import org.cloudfoundry.logcache.v1.EnvelopeBatch; +import org.cloudfoundry.logcache.v1.EnvelopeType; +import org.cloudfoundry.logcache.v1.Log; +import org.cloudfoundry.logcache.v1.LogCacheClient; +import org.cloudfoundry.logcache.v1.LogType; +import org.cloudfoundry.logcache.v1.ReadRequest; +import org.cloudfoundry.logcache.v1.ReadResponse; +import org.cloudfoundry.logcache.v1.TailLogsRequest; import org.cloudfoundry.operations.applications.ApplicationDetail; import org.cloudfoundry.operations.applications.ApplicationEnvironments; import org.cloudfoundry.operations.applications.ApplicationEvent; @@ -82,18 +88,18 @@ import org.cloudfoundry.operations.services.CreateUserProvidedServiceInstanceRequest; import org.cloudfoundry.operations.services.GetServiceInstanceRequest; import org.cloudfoundry.operations.services.ServiceInstance; +import org.cloudfoundry.operations.util.OperationsLogging; import org.cloudfoundry.util.FluentMap; -import org.cloudfoundry.util.PaginationUtils; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.core.io.ClassPathResource; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.SignalType; import reactor.test.StepVerifier; @CleanupCloudFoundryAfterClass -@RequiresV2Api public final class ApplicationsTest extends AbstractIntegrationTest { private static final String DEFAULT_ROUTER_GROUP = "default-tcp"; @@ -106,6 +112,7 @@ public final class ApplicationsTest extends AbstractIntegrationTest { @Autowired private String serviceName; + @Autowired private LogCacheClient logCacheClient; @Autowired private CloudFoundryClient cloudFoundryClient; // To create a service in #pushBindService, the Service Broker must be installed first. @@ -356,7 +363,6 @@ public void getManifest() throws IOException { } @Test - @RequiresTcpRouting public void getManifestForTcpRoute() throws IOException { String applicationName = this.nameFactory.getApplicationName(); @@ -443,7 +449,6 @@ public void getStopped() throws IOException { } @Test - @RequiresTcpRouting public void getTcp() throws IOException { String applicationName = this.nameFactory.getApplicationName(); String domainName = this.nameFactory.getDomainName(); @@ -507,13 +512,12 @@ public void listTasks() throws IOException { } /** - * Exercise the LogCache client via {@code logs(ApplicationLogsRequest)}. - * LogCache has been a default cf-deployment component since v3.0.0 (July 2018), - * with the {@code /api/v1/read} endpoint available since log-cache-release v2.0.0 - * (October 2018). + * Doppler was dropped in PCF 4.x in favor of logcache. This test does not work + * on TAS 4.x. */ + @Deprecated @Test - @IfCloudFoundryVersion(greaterThanOrEqualTo = CloudFoundryVersion.PCF_2_3) + @IfCloudFoundryVersion(lessThan = CloudFoundryVersion.PCF_4_v2) public void logs() throws IOException { String applicationName = this.nameFactory.getApplicationName(); @@ -538,6 +542,159 @@ public void logs() throws IOException { .verify(Duration.ofMinutes(5)); } + @Test + public void logsRecent() throws IOException { + String applicationName = this.nameFactory.getApplicationName(); + Mono applicationGuid = + getAppGuidFromAppName(cloudFoundryOperations, applicationName); + createApplication( + this.cloudFoundryOperations, + new ClassPathResource("test-application.zip").getFile().toPath(), + applicationName, + false) + .then( + applicationGuid + .map(ApplicationsTest::getReadRequest) + .flatMapMany( + readRequest -> + callLogsRecent( + this.cloudFoundryOperations, + readRequest) + .log(null, Level.ALL, SignalType.ON_NEXT)) + .map(ApplicationsTest::checkOneLogEntry) + .then()) + .as(StepVerifier::create) + .expectComplete() + .verify(Duration.ofMinutes(5)); + } + + /** + * Exercise the LogCache client. Serves as a reference for using the logcache client, + * and will help with the transition to the new + * {@link org.cloudfoundry.operations.applications.Applications#logs(ApplicationLogsRequest)}. + */ + @Test + public void logCacheLogs() throws IOException { + String applicationName = this.nameFactory.getApplicationName(); + + createApplication( + this.cloudFoundryOperations, + new ClassPathResource("test-application.zip").getFile().toPath(), + applicationName, + false) + .then( + this.cloudFoundryOperations + .applications() + .get(GetApplicationRequest.builder().name(applicationName).build())) + .map(ApplicationDetail::getId) + .flatMapMany( + appGuid -> + this.logCacheClient.read( + ReadRequest.builder() + .sourceId(appGuid) + .envelopeType(EnvelopeType.LOG) + .limit(1) + .build())) + .map(ReadResponse::getEnvelopes) + .map(EnvelopeBatch::getBatch) + .flatMap(Flux::fromIterable) + .map(Envelope::getLog) + .map(Log::getType) + .next() + .as(StepVerifier::create) + .expectNext(LogType.OUT) + .expectComplete() + .verify(Duration.ofMinutes(5)); + } + + /** + * Integration test for {@link org.cloudfoundry.operations.applications.Applications#logsTail}. + * Verifies that streaming a single LOG envelope from a running application succeeds. + */ + @Test + public void logsTail() throws IOException { + String applicationName = this.nameFactory.getApplicationName(); + + createApplication( + this.cloudFoundryOperations, + new ClassPathResource("test-application.zip").getFile().toPath(), + applicationName, + false) + .then( + this.cloudFoundryOperations + .applications() + .get( + GetApplicationRequest.builder() + .name(applicationName) + .build())) + .map(ApplicationDetail::getId) + .flatMapMany( + appGuid -> + this.cloudFoundryOperations + .applications() + .logsTail( + TailLogsRequest.builder() + .sourceId(appGuid) + .envelopeTypes( + Collections.singletonList( + EnvelopeType.LOG)) + .build()) + .take(1)) + .map(Envelope::getLog) + .map(Log::getType) + .as(StepVerifier::create) + .expectNextMatches( + logType -> LogType.OUT.equals(logType) || LogType.ERR.equals(logType)) + .expectComplete() + .verify(Duration.ofMinutes(5)); + } + + /** + * Integration test for {@link org.cloudfoundry.operations.applications.Applications#logsTail} + * verifying that multiple LOG envelopes can be streamed from a running application. + */ + @Test + public void logsTailMultipleEnvelopes() throws IOException { + String applicationName = this.nameFactory.getApplicationName(); + + createApplication( + this.cloudFoundryOperations, + new ClassPathResource("test-application.zip").getFile().toPath(), + applicationName, + false) + .then( + this.cloudFoundryOperations + .applications() + .get( + GetApplicationRequest.builder() + .name(applicationName) + .build())) + .map(ApplicationDetail::getId) + .flatMapMany( + appGuid -> + this.cloudFoundryOperations + .applications() + .logsTail( + TailLogsRequest.builder() + .sourceId(appGuid) + .envelopeTypes( + Collections.singletonList( + EnvelopeType.LOG)) + .build()) + .take(3)) + .map(Envelope::getLog) + .map(Log::getType) + .as(StepVerifier::create) + .expectNextMatches( + logType -> LogType.OUT.equals(logType) || LogType.ERR.equals(logType)) + .expectNextMatches( + logType -> LogType.OUT.equals(logType) || LogType.ERR.equals(logType)) + .expectNextMatches( + logType -> LogType.OUT.equals(logType) || LogType.ERR.equals(logType)) + .expectComplete() + .verify(Duration.ofMinutes(5)); + } + @Test public void pushBindServices() throws IOException { String applicationName = this.nameFactory.getApplicationName(); @@ -753,59 +910,6 @@ public void pushManifestV3() throws IOException { .verify(Duration.ofMinutes(5)); } - @Test - @IfCloudFoundryVersion(greaterThanOrEqualTo = CloudFoundryVersion.PCF_4_v2) - public void pushManifestV3WithFeature() throws IOException { - String applicationName = this.nameFactory.getApplicationName(); - - final String featureKey = "ssh"; - final boolean featureValue = false; - ManifestV3 manifest = - ManifestV3.builder() - .application( - ManifestV3Application.builder() - .buildpack("staticfile_buildpack") - .disk(512) - .healthCheckType(ApplicationHealthCheck.PORT) - .memory(64) - .name(applicationName) - .feature(featureKey, false) - .path( - new ClassPathResource("test-application.zip") - .getFile() - .toPath()) - .build()) - .build(); - - this.cloudFoundryOperations - .applications() - .pushManifestV3(PushManifestV3Request.builder().manifest(manifest).build()) - .then( - this.cloudFoundryOperations - .applications() - .get(GetApplicationRequest.builder().name(applicationName).build())) - .map(ApplicationDetail::getId) - .flatMapMany( - applicationId -> - PaginationUtils.requestClientV3Resources( - page -> - this.cloudFoundryClient - .applicationsV3() - .listFeatures( - ListApplicationFeaturesRequest - .builder() - .applicationId( - applicationId) - .page(page) - .build()))) - .filter(feature -> featureKey.equals(feature.getName())) - .map(ApplicationFeatureResource::getEnabled) - .as(StepVerifier::create) - .expectNext(featureValue) - .expectComplete() - .verify(Duration.ofMinutes(5)); - } - @Test @IfCloudFoundryVersion(greaterThanOrEqualTo = CloudFoundryVersion.PCF_4_v2) public void pushManifestV3WithMetadata() throws IOException { @@ -1201,7 +1305,6 @@ public void pushRoutePath() throws IOException { } @Test - @RequiresTcpRouting public void pushTcpRoute() throws IOException { String applicationName = this.nameFactory.getApplicationName(); String domainName = this.nameFactory.getDomainName(); @@ -1325,7 +1428,6 @@ public void pushUpdateRoute() throws IOException { } @Test - @RequiresTcpRouting public void pushUpdateTcpRoute() throws IOException { String applicationName = this.nameFactory.getApplicationName(); String domainName = this.nameFactory.getDomainName(); @@ -2173,4 +2275,27 @@ private static Mono requestSshEnabled( .applications() .sshEnabled(ApplicationSshEnabledRequest.builder().name(applicationName).build()); } + + private static ReadRequest getReadRequest(String applicationId) { + return ReadRequest.builder().sourceId(applicationId).build(); + } + + private static Flux callLogsRecent( + CloudFoundryOperations cloudFoundryOperations, ReadRequest readRequest) { + return cloudFoundryOperations.applications().logsRecent(readRequest); + } + + private static Mono getAppGuidFromAppName( + CloudFoundryOperations cloudFoundryOperations, String applicationName) { + return cloudFoundryOperations + .applications() + .get(GetApplicationRequest.builder().name(applicationName).build()) + .map(ApplicationDetail::getId); + } + + private static Log checkOneLogEntry(Log log) { + OperationsLogging.log("one log entry: " + log.getType() + " " + log.getPayloadAsText()); + assertThat(log.getType()).isIn(LogType.OUT, LogType.ERR); + return log; + } }