Skip to content

Commit

Permalink
refactor: add influxdb module
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 committed Mar 10, 2023
1 parent a803efc commit cdfbacd
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 44 deletions.
2 changes: 1 addition & 1 deletion integration_tests/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ impl CeresDB {
}

async fn execute_influxql(query: String, http_client: HttpClient) -> Box<dyn Display> {
let url = format!("http://{}/influxql", http_client.endpoint);
let url = format!("http://{}/influxdb/v1/query", http_client.endpoint);
let query_request = InfluxQLRequest { query };
let resp = http_client
.client
Expand Down
3 changes: 3 additions & 0 deletions server/src/handlers/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! Error of handlers

use snafu::{Backtrace, Snafu};
use warp::reject::Reject;

use crate::limiter;
// TODO(yingwen): Avoid printing huge sql string
Expand Down Expand Up @@ -73,3 +74,5 @@ pub enum Error {
}

define_result!(Error);

impl Reject for Error {}
84 changes: 84 additions & 0 deletions server/src/handlers/influxdb.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

//! This module implements [write][1] and [query][2] for InfluxDB.
//! [1]: https://docs.influxdata.com/influxdb/v1.8/tools/api/#write-http-endpoint
//! [2]: https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-http-endpoint

use std::sync::Arc;

use bytes::Bytes;
use handlers::{error::Result, query::QueryRequest};
use query_engine::executor::Executor as QueryExecutor;
use warp::{reject, reply, Rejection, Reply};

use crate::{
context::RequestContext, handlers, instance::InstanceRef,
schema_config_provider::SchemaConfigProviderRef,
};

pub struct Influxdb<Q> {
instance: InstanceRef<Q>,
#[allow(dead_code)]
schema_config_provider: SchemaConfigProviderRef,
}

/// Line protocol
pub struct WriteRequest {
pub payload: String,
}

impl From<Bytes> for WriteRequest {
fn from(bytes: Bytes) -> Self {
WriteRequest {
payload: String::from_utf8_lossy(&bytes).to_string(),
}
}
}

#[allow(dead_code)]
type WriteResponse = String;

impl<Q: QueryExecutor + 'static> Influxdb<Q> {
pub fn new(instance: InstanceRef<Q>, schema_config_provider: SchemaConfigProviderRef) -> Self {
Self {
instance,
schema_config_provider,
}
}

async fn query(
&self,
ctx: RequestContext,
req: QueryRequest,
) -> Result<handlers::query::Response> {
handlers::query::handle_query(&ctx, self.instance.clone(), req)
.await
.map(handlers::query::convert_output)
}

async fn write(&self, _ctx: RequestContext, _req: WriteRequest) -> Result<WriteResponse> {
todo!()
}
}

// TODO: Request and response type don't match influxdb's API now.
pub async fn query<Q: QueryExecutor + 'static>(
ctx: RequestContext,
db: Arc<Influxdb<Q>>,
req: QueryRequest,
) -> std::result::Result<impl Reply, Rejection> {
db.query(ctx, req)
.await
.map_err(reject::custom)
.map(|v| reply::json(&v))
}

// TODO: Request and response type don't match influxdb's API now.
#[allow(dead_code)]
pub async fn write<Q: QueryExecutor + 'static>(
ctx: RequestContext,
db: Arc<Influxdb<Q>>,
req: WriteRequest,
) -> std::result::Result<impl Reply, Rejection> {
db.write(ctx, req).await.map_err(reject::custom)
}
1 change: 1 addition & 0 deletions server/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

pub mod admin;
pub mod error;
pub mod influxdb;
pub mod prom;
pub mod query;

Expand Down
88 changes: 45 additions & 43 deletions server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@ use crate::{
consts,
context::RequestContext,
error_util,
handlers::{self, prom::CeresDBStorage, query::Request},
handlers::{
self,
influxdb::{self, Influxdb},
prom::CeresDBStorage,
query::Request,
},
instance::InstanceRef,
metrics,
schema_config_provider::SchemaConfigProviderRef,
Expand Down Expand Up @@ -109,6 +114,7 @@ pub struct Service<Q> {
instance: InstanceRef<Q>,
profiler: Arc<Profiler>,
prom_remote_storage: RemoteStorageRef<RequestContext, crate::handlers::prom::Error>,
influxdb: Arc<Influxdb<Q>>,
tx: Sender<()>,
config: HttpConfig,
}
Expand All @@ -124,15 +130,20 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
fn routes(
&self,
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
self.home()
.or(self.metrics())
.or(self.sql())
.or(self.influxql())
.or(self.heap_profile())
.or(self.admin_block())
.or(self.flush_memtable())
.or(self.update_log_level())
.or(self.prom_api())
warp::body::content_length_limit(self.config.max_body_size).and(
self.home()
// public APIs
.or(self.metrics())
.or(self.sql())
.or(self.influxdb_api())
.or(self.prom_api())
// admin APIs
.or(self.admin_block())
// debug APIs
.or(self.flush_memtable())
.or(self.update_log_level())
.or(self.heap_profile()),
)
}

/// Expose `/prom/v1/read` and `/prom/v1/write` to serve Prometheus remote
Expand Down Expand Up @@ -178,7 +189,6 @@ impl<Q: QueryExecutor + 'static> Service<Q> {

warp::path!("sql")
.and(warp::post())
.and(warp::body::content_length_limit(self.config.max_body_size))
.and(extract_request)
.and(self.with_context())
.and(self.with_instance())
Expand All @@ -200,41 +210,24 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
})
}

// POST /influxql
// this request type is not what influxdb API expected, the one in influxdb:
// https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-http-endpoint
fn influxql(
/// POST `/influxdb/v1/query` and `/influxdb/v1/write`
fn influxdb_api(
&self,
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
// accept json or plain text
let extract_request = warp::body::json()
.or(warp::body::bytes().map(Request::from))
.unify();
let write_api = warp::path!("write")
.and(self.with_context())
.and(self.with_influxdb())
.and(warp::body::bytes().map(influxdb::WriteRequest::from))
.and_then(influxdb::write);
let query_api = warp::path!("query")
.and(self.with_context())
.and(self.with_influxdb())
.and(warp::body::bytes().map(|bytes| QueryRequest::Influxql(Request::from(bytes))))
.and_then(influxdb::query);

warp::path!("influxql")
warp::path!("influxdb" / "v1" / ..)
.and(warp::post())
.and(warp::body::content_length_limit(self.config.max_body_size))
.and(extract_request)
.and(self.with_context())
.and(self.with_instance())
.and_then(|req, ctx, instance| async move {
let req = QueryRequest::Influxql(req);
let result = handlers::query::handle_query(&ctx, instance, req)
.await
// TODO: the sql's `convert_output` function may be not suitable to influxql.
// We should implement influxql's related function in later.
.map(handlers::query::convert_output)
.map_err(|e| {
// TODO(yingwen): Maybe truncate and print the sql
error!("Http service Failed to handle sql, err:{}", e);
Box::new(e)
})
.context(HandleRequest);
match result {
Ok(res) => Ok(reply::json(&res)),
Err(e) => Err(reject::custom(e)),
}
})
.and(write_api.or(query_api))
}

// POST /debug/flush_memtable
Expand Down Expand Up @@ -407,6 +400,13 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
warp::any().map(move || profiler.clone())
}

fn with_influxdb(
&self,
) -> impl Filter<Extract = (Arc<Influxdb<Q>>,), Error = Infallible> + Clone {
let influxdb = self.influxdb.clone();
warp::any().map(move || influxdb.clone())
}

fn with_instance(
&self,
) -> impl Filter<Extract = (InstanceRef<Q>,), Error = Infallible> + Clone {
Expand Down Expand Up @@ -474,15 +474,17 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
.context(MissingSchemaConfigProvider)?;
let prom_remote_storage = Arc::new(CeresDBStorage::new(
instance.clone(),
schema_config_provider,
schema_config_provider.clone(),
));
let influxdb = Arc::new(Influxdb::new(instance.clone(), schema_config_provider));
let (tx, rx) = oneshot::channel();

let service = Service {
engine_runtimes: engine_runtime.clone(),
log_runtime,
instance,
prom_remote_storage,
influxdb,
profiler: Arc::new(Profiler::default()),
tx,
config: self.config.clone(),
Expand Down

0 comments on commit cdfbacd

Please sign in to comment.