Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,41 @@ The `cf-java-client` project is a Java language binding for interacting with a C
## Versions
The Cloud Foundry Java Client has two active versions. The `5.x` line is compatible with Spring Boot `2.4.x - 2.6.x` just to manage its dependencies, while the `4.x` line uses Spring Boot `2.3.x`.

## Deprecations

### `DopplerClient.recentLogs()` — Recent Logs via Doppler

> [!WARNING]
> **Deprecated since cf-java-client `5.17.x`**
>
> The `DopplerClient.recentLogs()` endpoint (and the related `RecentLogsRequest` / `LogMessage` types from the `org.cloudfoundry.doppler` package) are **deprecated** and will be removed in a future release.
>
> This API relies on the [Loggregator][loggregator] Doppler/Traffic Controller endpoint `/apps/{id}/recentlogs`, which was removed in **Loggregator ≥ 107.0**.
> The affected platform versions are:
>
> | Platform | Last version with Doppler recent-logs support |
> | -------- | --------------------------------------------- |
> | CF Deployment (CFD) | `< 24.3` |
> | Tanzu Application Service (TAS) | `< 4.0` |
>
> **Migration:** Replace any call to `DopplerClient.recentLogs()` with [`LogCacheClient.read()`][log-cache-api] (available via `org.cloudfoundry.logcache.v1.LogCacheClient`).
>
> ```java
> // Before (deprecated)
> dopplerClient.recentLogs(RecentLogsRequest.builder()
> .applicationId(appId)
> .build());
>
> // After
> logCacheClient.read(ReadRequest.builder()
> .sourceId(appId)
> .envelopeTypes(EnvelopeType.LOG)
> .build());
> ```
[loggregator]: https://github.com/cloudfoundry/loggregator
[log-cache-api]: https://github.com/cloudfoundry/log-cache
## Dependencies
Most projects will need two dependencies; the Operations API and an implementation of the Client API. For Maven, the dependencies would be defined like this:
Expand Down Expand Up @@ -76,6 +111,9 @@ Both the `cloudfoundry-operations` and `cloudfoundry-client` projects follow a [

### `CloudFoundryClient`, `DopplerClient`, `UaaClient` Builders

> [!NOTE]
> **`DopplerClient` — partial deprecation:** The `recentLogs()` method on `DopplerClient` is deprecated and only works against Loggregator \< 107.0 (CFD \< 24.3 / TAS \< 4.0). See the [Deprecations](#deprecations) section above for the migration path to `LogCacheClient`.
The lowest-level building blocks of the API are `ConnectionContext` and `TokenProvider`. These types are intended to be shared between instances of the clients, and come with out of the box implementations. To instantiate them, you configure them with builders:

```java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,23 @@

package org.cloudfoundry.reactor.logcache.v1;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.cloudfoundry.logcache.v1.Envelope;
import org.cloudfoundry.logcache.v1.EnvelopeType;
import org.cloudfoundry.logcache.v1.InfoRequest;
import org.cloudfoundry.logcache.v1.InfoResponse;
import org.cloudfoundry.logcache.v1.MetaRequest;
import org.cloudfoundry.logcache.v1.MetaResponse;
import org.cloudfoundry.logcache.v1.ReadRequest;
import org.cloudfoundry.logcache.v1.ReadResponse;
import org.cloudfoundry.logcache.v1.TailLogsRequest;
import org.cloudfoundry.reactor.ConnectionContext;
import org.cloudfoundry.reactor.TokenProvider;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class ReactorLogCacheEndpoints extends AbstractLogCacheOperations {
Expand All @@ -48,4 +56,111 @@ Mono<MetaResponse> meta(MetaRequest request) {
Mono<ReadResponse> read(ReadRequest request) {
return get(request, ReadResponse.class, "read", request.getSourceId()).checkpoint();
}

Mono<ReadResponse> recentLogs(ReadRequest request) {
return read(request);
}

/**
* Continuously polls Log Cache and emits new {@link Envelope}s as they arrive.
*
* <p>Mirrors the Go {@code logcache.Walk()} / {@code cf tail --follow} semantics:
* <ol>
* <li>Start the cursor at {@code startTime} (defaults to now&nbsp;&minus;&nbsp;5&nbsp;s in
* nanoseconds).</li>
* <li>Issue {@code GET /api/v1/read/{sourceId}?start_time=cursor}.</li>
* <li>Emit every returned envelope in ascending timestamp order and advance
* the cursor to {@code lastTimestamp + 1}.</li>
* <li>When the batch is empty, wait {@code pollInterval} before the next poll.</li>
* <li>Repeat forever – the caller cancels the subscription to stop.</li>
* </ol>
* Fully non-blocking: no {@code Thread.sleep}.
*/
Flux<Envelope> logsTail(TailLogsRequest request) {
long defaultStartNanos = (System.currentTimeMillis() - 5_000L) * 1_000_000L;
AtomicLong cursor =
new AtomicLong(
request.getStartTime() != null
? request.getStartTime()
: defaultStartNanos);

List<EnvelopeType> envelopeTypes =
request.getEnvelopeTypes() != null
? request.getEnvelopeTypes()
: Collections.emptyList();
String nameFilter = request.getNameFilter();

/*
* Strategy (mirrors Go's logcache.Walk):
* – Mono.defer builds a fresh ReadRequest from the mutable cursor on every repetition.
* – The Mono returns either the sorted batch (non-empty) or an empty list.
* – flatMapMany turns each batch into a stream of individual Envelope items.
* – repeat() subscribes again after each completion.
* – When the batch was empty we insert a delay via Mono.delay before the next
* repetition so we do not hammer the server. We signal "empty" by returning
* a sentinel Mono<Boolean> (false = was empty, true = had data) and use
* repeatWhen to conditionally delay.
*/
return Flux.defer(
() -> {
// Build the read request from the current cursor position.
ReadRequest.Builder builder =
ReadRequest.builder()
.sourceId(request.getSourceId())
.startTime(cursor.get());
if (!envelopeTypes.isEmpty()) {
builder.envelopeTypes(envelopeTypes);
}
if (nameFilter != null && !nameFilter.isEmpty()) {
builder.nameFilter(nameFilter);
}

return read(builder.build())
.onErrorReturn(ReadResponse.builder().build())
.flatMapMany(
resp -> {
List<Envelope> raw =
resp.getEnvelopes() != null
? resp.getEnvelopes().getBatch()
: Collections.emptyList();

if (raw.isEmpty()) {
// Signal "no data" so repeatWhen can insert the
// back-off delay.
return Flux.empty();
}

// Sort ascending by timestamp and advance the
// cursor.
List<Envelope> sorted = new ArrayList<>(raw);
sorted.sort(
(a, b) ->
Long.compare(
a.getTimestamp() != null
? a.getTimestamp()
: 0L,
b.getTimestamp() != null
? b.getTimestamp()
: 0L));

Envelope last = sorted.get(sorted.size() - 1);
cursor.set(
(last.getTimestamp() != null
? last.getTimestamp()
: cursor.get())
+ 1);

return Flux.fromIterable(sorted);
});
})
// repeatWhen receives a Flux<Long> where each element is the count of items
// emitted in the previous cycle (0 = empty batch → insert delay).
.repeatWhen(
companion ->
companion.flatMap(
count ->
count == 0
? Mono.delay(request.getPollInterval())
: Mono.just(count)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@

package org.cloudfoundry.reactor.logcache.v1;

import org.cloudfoundry.logcache.v1.Envelope;
import org.cloudfoundry.logcache.v1.InfoRequest;
import org.cloudfoundry.logcache.v1.InfoResponse;
import org.cloudfoundry.logcache.v1.LogCacheClient;
import org.cloudfoundry.logcache.v1.MetaRequest;
import org.cloudfoundry.logcache.v1.MetaResponse;
import org.cloudfoundry.logcache.v1.ReadRequest;
import org.cloudfoundry.logcache.v1.ReadResponse;
import org.cloudfoundry.logcache.v1.TailLogsRequest;
import org.cloudfoundry.reactor.ConnectionContext;
import org.cloudfoundry.reactor.TokenProvider;
import org.immutables.value.Value;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.net.URI;
Expand Down Expand Up @@ -53,6 +56,16 @@ public Mono<ReadResponse> read(ReadRequest request) {
return getReactorLogCacheEndpoints().read(request);
}

@Override
public Mono<ReadResponse> recentLogs(ReadRequest request) {
return getReactorLogCacheEndpoints().recentLogs(request);
}

@Override
public Flux<Envelope> logsTail(TailLogsRequest request) {
return getReactorLogCacheEndpoints().logsTail(request);
}

/**
* The connection context
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,15 @@ public interface DopplerClient {
/**
* Makes the <a href="https://github.com/cloudfoundry/loggregator/tree/develop/src/trafficcontroller#endpoints">Recent Logs</a> request
*
* @deprecated Use {@link org.cloudfoundry.logcache.v1.LogCacheClient#recentLogs(org.cloudfoundry.logcache.v1.ReadRequest)} instead.
* Only works with {@code Loggregator < 107.0}, shipped in {@code CFD < 24.3} and {@code TAS < 4.0}.
* @param request the Recent Logs request
* @return the events from the recent logs
* @return a flux of events from the recent logs
* @see <a href="https://github.com/cloudfoundry/loggregator">Loggregator</a>
* @see <a href="https://github.com/cloudfoundry/log-cache">Log Cache</a>
* @see org.cloudfoundry.logcache.v1.LogCacheClient#recentLogs(org.cloudfoundry.logcache.v1.ReadRequest)
*/
@Deprecated
Flux<Envelope> recentLogs(RecentLogsRequest request);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.cloudfoundry.logcache.v1;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
Expand Down Expand Up @@ -46,4 +47,25 @@ public interface LogCacheClient {
* @return the read response
*/
Mono<ReadResponse> read(ReadRequest request);

/**
* Makes the Log Cache RecentLogs /api/v1/read request
*
* @param request the Recent Logs request
* @return the events from the recent logs
*/
Mono<ReadResponse> recentLogs(ReadRequest request);

/**
* Continuously polls the Log Cache /api/v1/read endpoint and streams new {@link Envelope}s
* as they appear. This is the Java equivalent of the Go {@code logcache.Walk()} API and
* {@code cf tail --follow}.
* <p>
* The returned {@link Flux} will never complete on its own – unsubscribe (or cancel) it to
* stop streaming.
*
* @param request the tail request (source id, optional filters, poll interval)
* @return an infinite stream of envelopes
*/
Flux<Envelope> logsTail(TailLogsRequest request);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2013-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.cloudfoundry.logcache.v1;

import org.cloudfoundry.Nullable;
import org.immutables.value.Value;

import java.time.Duration;
import java.util.List;

/**
* The request options for the Log Cache tail (streaming follow) operation.
* This continuously polls the Log Cache /api/v1/read endpoint, emitting new envelopes
* as they appear – equivalent to {@code cf tail --follow} or the Go {@code logcache.Walk()} API.
*/
@Value.Immutable
abstract class _TailLogsRequest {

/**
* The source id (application guid or service guid) to stream logs for.
*/
abstract String getSourceId();

/**
* Optional start time (UNIX nanoseconds). Defaults to "now – 5 seconds" when not set.
*/
@Nullable
abstract Long getStartTime();

/**
* Optional envelope type filter.
*/
@Nullable
abstract List<EnvelopeType> getEnvelopeTypes();

/**
* Optional regex name filter (requires Log Cache ≥ 2.1.0).
*/
@Nullable
abstract String getNameFilter();

/**
* How long to wait between successive polls when no new envelopes are available.
* Defaults to 250 ms (matching the Go client's {@code AlwaysRetryBackoff}).
*/
@Value.Default
Duration getPollInterval() {
return Duration.ofMillis(250);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.cloudfoundry.client.v3.spaces.ListSpacesRequest;
import org.cloudfoundry.client.v3.spaces.SpaceResource;
import org.cloudfoundry.doppler.DopplerClient;
import org.cloudfoundry.logcache.v1.LogCacheClient;
import org.cloudfoundry.networking.NetworkingClient;
import org.cloudfoundry.operations.advanced.Advanced;
import org.cloudfoundry.operations.advanced.DefaultAdvanced;
Expand Down Expand Up @@ -79,7 +80,7 @@ public Advanced advanced() {
@Override
@Value.Derived
public Applications applications() {
return new DefaultApplications(getCloudFoundryClientPublisher(), getDopplerClientPublisher(), getSpaceId());
return new DefaultApplications(getCloudFoundryClientPublisher(), getDopplerClientPublisher(), getLogCacheClientPublisher(), getSpaceId());
}

@Override
Expand Down Expand Up @@ -197,6 +198,19 @@ Mono<DopplerClient> getDopplerClientPublisher() {
.orElse(Mono.error(new IllegalStateException("DopplerClient must be set")));
}

/**
* The {@link LogCacheClient} to use for operations functionality
*/
@Nullable
abstract LogCacheClient getLogCacheClient();

@Value.Derived
Mono<LogCacheClient> getLogCacheClientPublisher() {
return Optional.ofNullable(getLogCacheClient())
.map(Mono::just)
.orElse(Mono.error(new IllegalStateException("LogCacheClient must be set")));
}

/**
* The {@link NetworkingClient} to use for operations functionality
*/
Expand Down
Loading