Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,8 @@ iotdb-core/tsfile/src/main/antlr4/org/apache/tsfile/parser/gen/

# Relational Grammar ANTLR
iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/.antlr/

# Claude Code
CLAUDE.md
.omc/
.claude/
6 changes: 6 additions & 0 deletions distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@
<version>2.0.7-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>flight-sql</artifactId>
<version>2.0.7-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
4 changes: 4 additions & 0 deletions distribution/src/assembly/external-service-impl.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,9 @@
<source>${maven.multiModuleProjectDirectory}/external-service-impl/rest/target/rest-${project.version}-jar-with-dependencies.jar</source>
<outputDirectory>/</outputDirectory>
</file>
<file>
<source>${maven.multiModuleProjectDirectory}/external-service-impl/flight-sql/target/flight-sql-${project.version}-jar-with-dependencies.jar</source>
<outputDirectory>/</outputDirectory>
</file>
</files>
</assembly>
17 changes: 16 additions & 1 deletion external-service-impl/flight-sql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,25 @@
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>flight-sql</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty-buffer-patch</artifactId>
</exclusion>
<!-- Exclude Netty: standalone Netty JARs already on DataNode classpath -->
<exclusion>
<groupId>io.netty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
<artifactId>arrow-memory-unsafe</artifactId>
<scope>runtime</scope>
</dependency>
<!-- IoTDB dependencies (provided at runtime by DataNode classloader) -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,117 @@

package org.apache.iotdb.flight;

import org.apache.arrow.flight.CallHeaders;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.auth2.BasicCallHeaderAuthenticator;
import org.apache.arrow.flight.auth2.CallHeaderAuthenticator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Base64;

/**
* Arrow Flight SQL credential validator using Arrow's built-in auth2 framework. Validates
* username/password credentials via IoTDB's SessionManager and returns a Bearer token string as the
* peer identity for subsequent requests.
*
* <p>Used with {@link BasicCallHeaderAuthenticator} and {@link
* org.apache.arrow.flight.auth2.GeneratedBearerTokenAuthenticator} to provide Basic → Bearer token
* authentication flow.
* Arrow Flight SQL authenticator that supports both Basic and Bearer token authentication. On the
* first call, Basic credentials are validated and a Bearer token is returned. On subsequent calls,
* the Bearer token is used to look up the existing session, avoiding creating a new session per
* call.
*/
public class FlightSqlAuthHandler implements BasicCallHeaderAuthenticator.CredentialValidator {
public class FlightSqlAuthHandler implements CallHeaderAuthenticator {

private static final Logger LOGGER = LoggerFactory.getLogger(FlightSqlAuthHandler.class);
private static final String AUTHORIZATION_HEADER = "authorization";
private static final String BASIC_PREFIX = "Basic ";
private static final String BEARER_PREFIX = "Bearer ";

private final FlightSqlSessionManager sessionManager;

public FlightSqlAuthHandler(FlightSqlSessionManager sessionManager) {
this.sessionManager = sessionManager;
}

@Override
public org.apache.arrow.flight.auth2.CallHeaderAuthenticator.AuthResult validate(
String username, String password) {
LOGGER.debug("Validating credentials for user: {}", username);

public AuthResult authenticate(CallHeaders headers) {
Iterable<String> authHeaders;
try {
String token = sessionManager.authenticate(username, password, "unknown");
// Return the token as the peer identity; GeneratedBearerTokenAuthenticator
// wraps it in a Bearer token and sets it in the response header.
return () -> token;
} catch (SecurityException e) {
throw CallStatus.UNAUTHENTICATED.withDescription(e.getMessage()).toRuntimeException();
authHeaders = headers.getAll(AUTHORIZATION_HEADER);
} catch (NullPointerException e) {
throw CallStatus.UNAUTHENTICATED
.withDescription("Missing Authorization header (null header map)")
.toRuntimeException();
}

// First pass: check for Bearer token (reuse existing session)
String basicHeader = null;
if (authHeaders == null) {
throw CallStatus.UNAUTHENTICATED
.withDescription("Missing Authorization header")
.toRuntimeException();
}
for (String authHeader : authHeaders) {
if (authHeader.startsWith(BEARER_PREFIX)) {
String token = authHeader.substring(BEARER_PREFIX.length());
try {
sessionManager.getSessionByToken(token);
return bearerResult(token);
} catch (SecurityException e) {
// Bearer token invalid/expired, fall through to Basic auth
LOGGER.debug("Bearer token invalid, falling back to Basic auth");
}
} else if (authHeader.startsWith(BASIC_PREFIX) && basicHeader == null) {
basicHeader = authHeader;
}
}

// Second pass: fall back to Basic auth (create new session)
if (basicHeader != null) {
String encoded = basicHeader.substring(BASIC_PREFIX.length());
String decoded = new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
int colonIdx = decoded.indexOf(':');
if (colonIdx < 0) {
throw CallStatus.UNAUTHENTICATED
.withDescription("Invalid Basic credentials format")
.toRuntimeException();
}
String username = decoded.substring(0, colonIdx);
String password = decoded.substring(colonIdx + 1);

LOGGER.debug("Validating credentials for user: {}", username);
try {
String token = sessionManager.authenticate(username, password, "unknown");
return bearerResult(token);
} catch (SecurityException e) {
throw CallStatus.UNAUTHENTICATED.withDescription(e.getMessage()).toRuntimeException();
}
}

throw CallStatus.UNAUTHENTICATED
.withDescription("Missing or unsupported Authorization header")
.toRuntimeException();
}

/**
* Creates an AuthResult that sends the Bearer token back in response headers. The client's
* ClientIncomingAuthHeaderMiddleware captures this token for use on subsequent calls.
*/
private static AuthResult bearerResult(String token) {
return new AuthResult() {
@Override
public String getPeerIdentity() {
return token;
}

@Override
public void appendToOutgoingHeaders(CallHeaders outgoingHeaders) {
if (outgoingHeaders == null) {
return;
}
try {
outgoingHeaders.insert(AUTHORIZATION_HEADER, BEARER_PREFIX + token);
} catch (NullPointerException e) {
// Some CallHeaders implementations have null internal maps for certain RPCs
LOGGER.debug("Could not append Bearer token to outgoing headers", e);
}
}
};
}
}

This file was deleted.

Loading