Verified commit 7c8ee23a, authored by Michael Usachenko and committed by GitLab
Browse files

feat(pipeline): hoist up query pipeline as its own crate + decouple formatting

parent 970dde94
Loading
Loading
Loading
Loading
+55 −0
Original line number Diff line number Diff line
@@ -2753,6 +2753,10 @@ dependencies = [
 "prost-build",
 "prost-types",
 "query-engine",
 "querying-formatters",
 "querying-pipeline",
 "querying-shared-stages",
 "querying-types",
 "reqwest 0.13.2",
 "rustc-hash",
 "rustls",
@@ -3408,6 +3412,8 @@ dependencies = [
 "gkg-server",
 "gkg-utils",
 "query-engine",
 "querying-formatters",
 "querying-shared-stages",
 "serde_json",
 "testcontainers",
 "tokio",
@@ -3439,6 +3445,9 @@ dependencies = [
 "parking_lot",
 "prost",
 "query-engine",
 "querying-formatters",
 "querying-pipeline",
 "querying-shared-stages",
 "serde",
 "serde_json",
 "sha2",
@@ -5176,6 +5185,52 @@ dependencies = [
 "thiserror 2.0.18",
]

[[package]]
name = "querying-formatters"
version = "0.1.0"
dependencies = [
 "arrow",
 "gkg-utils",
 "indexmap 2.13.0",
 "query-engine",
 "querying-shared-stages",
 "querying-types",
 "serde",
 "serde_json",
]

[[package]]
name = "querying-pipeline"
version = "0.1.0"
dependencies = [
 "ontology",
 "query-engine",
 "thiserror 2.0.18",
]

[[package]]
name = "querying-shared-stages"
version = "0.1.0"
dependencies = [
 "arrow",
 "ontology",
 "query-engine",
 "querying-pipeline",
 "querying-types",
 "tokio",
]

[[package]]
name = "querying-types"
version = "0.1.0"
dependencies = [
 "arrow",
 "gkg-utils",
 "query-engine",
 "serde",
 "serde_json",
]

[[package]]
name = "quote"
version = "1.0.45"
+4 −0
Original line number Diff line number Diff line
@@ -24,6 +24,10 @@ members = [
  "crates/integration-testkit",
  "crates/utils",
  "crates/integration-tests",
  "crates/querying-types",
  "crates/querying-pipeline",
  "crates/querying-shared-stages",
  "crates/querying-formatters",
]

[workspace.dependencies]
+4 −0
Original line number Diff line number Diff line
@@ -35,6 +35,10 @@ reqwest = { workspace = true }
rustls = { workspace = true }
parking_lot = { workspace = true }
query-engine = { path = "../query-engine" }
querying-types = { path = "../querying-types" }
querying-pipeline = { path = "../querying-pipeline" }
querying-shared-stages = { path = "../querying-shared-stages" }
querying-formatters = { path = "../querying-formatters" }
gitlab-client = { workspace = true }
jsonwebtoken = { workspace = true }
labkit-rs = { path = "../labkit-rs", features = ["http", "grpc", "metrics-axum", "metrics-grpc"] }
+17 −23
Original line number Diff line number Diff line
@@ -14,6 +14,7 @@ use tracing::{info, instrument};

use crate::auth::JwtValidator;
use crate::cluster_health::ClusterHealthChecker;
use crate::pipeline::{QueryPipelineService, receive_query_request, send_query_error};
use crate::proto::{
    ExecuteQueryMessage, ExecuteQueryResult, GetClusterHealthRequest, GetClusterHealthResponse,
    GetGraphSchemaRequest, GetGraphSchemaResponse, ListToolsRequest, ListToolsResponse,
@@ -21,10 +22,8 @@ use crate::proto::{
    SchemaNodeStyle, SchemaProperty, StructuredSchema, ToolDefinition as ProtoToolDefinition,
    execute_query_message, get_graph_schema_response,
};
use crate::query_pipeline::{
    GoonFormatter, GraphFormatter, QueryPipelineService, receive_query_request, send_query_error,
};
use crate::tools::{ToolRegistry, ToolService};
use querying_formatters::{GoonFormatter, GraphFormatter, ResultFormatter};

use super::auth::extract_claims;

@@ -36,8 +35,7 @@ pub struct KnowledgeGraphServiceImpl {
    validator: Arc<JwtValidator>,
    ontology: Arc<Ontology>,
    tool_service: ToolService,
    query_pipeline: QueryPipelineService<GraphFormatter>,
    llm_pipeline: QueryPipelineService<GoonFormatter>,
    pipeline: QueryPipelineService,
    cluster_health: Arc<ClusterHealthChecker>,
}

@@ -50,15 +48,12 @@ impl KnowledgeGraphServiceImpl {
    ) -> Self {
        let client = Arc::new(clickhouse_config.build_client());
        let tool_service = ToolService::new(Arc::clone(&ontology));
        let query_pipeline =
            QueryPipelineService::new(Arc::clone(&ontology), Arc::clone(&client), GraphFormatter);
        let llm_pipeline = QueryPipelineService::new(Arc::clone(&ontology), client, GoonFormatter);
        let pipeline = QueryPipelineService::new(Arc::clone(&ontology), client);
        Self {
            validator,
            ontology,
            tool_service,
            query_pipeline,
            llm_pipeline,
            pipeline,
            cluster_health,
        }
    }
@@ -113,8 +108,7 @@ impl crate::proto::knowledge_graph_service_server::KnowledgeGraphService
        let mut stream = request.into_inner();
        let (tx, rx) = mpsc::channel(4);

        let raw_pipeline = self.query_pipeline.clone();
        let llm_pipeline = self.llm_pipeline.clone();
        let pipeline = self.pipeline.clone();

        tokio::spawn(async move {
            let req = match receive_query_request(&mut stream, &tx).await {
@@ -126,15 +120,9 @@ impl crate::proto::knowledge_graph_service_server::KnowledgeGraphService

            let use_llm_format = req.format == ResponseFormat::Llm as i32;

            let result = if use_llm_format {
                llm_pipeline
                    .run_query(&claims, &req.query, &tx, &mut stream)
                    .await
            } else {
                raw_pipeline
                    .run_query(&claims, &req.query, &tx, &mut stream)
                    .await
            };
            let result = pipeline
                .run_query(claims, &req.query, tx.clone(), stream)
                .await;

            match result {
                Ok(output) => {
@@ -142,10 +130,16 @@ impl crate::proto::knowledge_graph_service_server::KnowledgeGraphService

                    use crate::proto::execute_query_result::Content;

                    let formatted = if use_llm_format {
                        GoonFormatter.format(&output)
                    } else {
                        GraphFormatter.format(&output)
                    };

                    let content = if use_llm_format {
                        Some(Content::FormattedText(output.formatted_result.to_string()))
                        Some(Content::FormattedText(formatted.to_string()))
                    } else {
                        Some(Content::ResultJson(output.formatted_result.to_string()))
                        Some(Content::ResultJson(formatted.to_string()))
                    };

                    let metadata = Some(QueryMetadata {
+1 −1
Original line number Diff line number Diff line
@@ -5,8 +5,8 @@ pub mod config;
pub mod constants;
pub mod grpc;
pub mod health_check;
pub mod pipeline;
pub mod proto;
pub mod query_pipeline;
pub mod redaction;
pub mod secret_file_source;
pub mod shutdown;
Loading