Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## 1.2.0 [unreleased]

### Features

1. [#209](https://github.com/InfluxCommunity/influxdb3-java/pull/209) Add query function returning row as map.

## 1.1.0 [2025-05-22]

### Features
Expand Down
81 changes: 81 additions & 0 deletions src/main/java/com/influxdb/v3/client/InfluxDBClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,87 @@ Stream<Object[]> query(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <p>
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
* <pre>
* try (Stream&lt;Map&lt;String, Object&gt;&gt; rows = client.queryRows("select * from cpu where host=intel")) {
* rows.forEach(row -&gt; {
* // process row
* });
* };
* </pre>
*
* @param query the query string to execute, cannot be null
* @return Batches of rows returned by the query
*/
@Nonnull
Stream<Map<String, Object>> queryRows(@Nonnull final String query);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <p>
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
* <pre>
* try (Stream&lt;Map&lt;String, Object&gt;&gt; rows = client.queryRows("select * from cpu where host=$host",
* Map.of("host", "server-a"))) {
* rows.forEach(row -&gt; {
* // process row
* })
* };
* </pre>
*
* @param query the query string to execute, cannot be null
* @param parameters query named parameters
* @return Batches of rows returned by the query
*/
@Nonnull
Stream<Map<String, Object>> queryRows(@Nonnull final String query, @Nonnull final Map<String, Object> parameters);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <p>
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
* <pre>
* try (Stream&lt;Map&lt;String, Object&gt;&gt; rows = client.queryRows("select * from cpu where host=intel",
* options)) {
* rows.forEach(row -&gt; {
* // process row
* })
* };
* </pre>
*
* @param query the query string to execute, cannot be null
* @param options the options for querying data from InfluxDB
* @return Batches of rows returned by the query
*/
@Nonnull
Stream<Map<String, Object>> queryRows(@Nonnull final String query, @Nonnull final QueryOptions options);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <p>
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
* <pre>
* try (Stream&lt;Map&lt;String, Object&gt;&gt; rows = client.queryRows("select * from cpu where host=$host",
* Map.of("host", "server-a"), options)) {
* rows.forEach(row -&gt; {
* // process row
* })
* };
* </pre>
*
* @param query the query string to execute, cannot be null
* @param parameters query named parameters
* @param options the options for querying data from InfluxDB
* @return Batches of rows returned by the query
*/
@Nonnull
Stream<Map<String, Object>> queryRows(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options);

/**
* Query data from InfluxDB IOx into Point structure using FlightSQL.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,41 @@ public Stream<Object[]> query(@Nonnull final String query,
)));
}

@Nonnull
@Override
public Stream<Map<String, Object>> queryRows(@Nonnull final String query) {
return queryRows(query, NO_PARAMETERS, QueryOptions.DEFAULTS);
}

@Nonnull
@Override
public Stream<Map<String, Object>> queryRows(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters
) {
return queryRows(query, parameters, QueryOptions.DEFAULTS);
}

@Nonnull
@Override
public Stream<Map<String, Object>> queryRows(@Nonnull final String query, @Nonnull final QueryOptions options) {
return queryRows(query, NO_PARAMETERS, options);
}

@Nonnull
@Override
public Stream<Map<String, Object>> queryRows(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options) {
return queryData(query, parameters, options)
.flatMap(vector -> IntStream.range(0, vector.getRowCount())
.mapToObj(rowNumber ->
VectorSchemaRootConverter.INSTANCE
.getMapFromVectorSchemaRoot(
vector,
rowNumber
)));
}

@Nonnull
@Override
public Stream<PointValues> queryPoints(@Nonnull final String query) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@

import java.math.BigInteger;
import java.time.LocalDateTime;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -195,4 +197,25 @@ public Object[] getArrayObjectFromVectorSchemaRoot(@Nonnull final VectorSchemaRo

return row;
}

/**
* Get a Map from VectorSchemaRoot.
*
* @param vector The data return from InfluxDB.
* @param rowNumber The row number of data
* @return A Map represents a row of data
*/
public Map<String, Object> getMapFromVectorSchemaRoot(@Nonnull final VectorSchemaRoot vector, final int rowNumber) {
Map<String, Object> row = new LinkedHashMap<>();
for (FieldVector fieldVector : vector.getFieldVectors()) {
Object mappedValue = getMappedValue(
fieldVector.getField(),
fieldVector.getObject(rowNumber)
);
row.put(fieldVector.getName(), mappedValue);

}

return row;
}
}
9 changes: 0 additions & 9 deletions src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,6 @@ void requiredHostConnectionString() {
.hasMessageContaining("no protocol");
}

@Test
void requiredHostEnvOrProperties() {

Assertions.assertThatThrownBy(InfluxDBClient::getInstance)
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("The URL of the InfluxDB server has to be defined.");
}

@Test
void fromParameters() throws Exception {

Expand Down Expand Up @@ -136,5 +128,4 @@ public void unsupportedQueryParams() throws Exception {
+ "class com.influxdb.v3.client.internal.InfluxDBClientImpl");
}
}

}
Loading
Loading