Berserk Docs

Query gRPC API

Streaming query execution over gRPC

The query service exposes a gRPC API on port 9510 alongside the HTTP API. The gRPC API provides streaming query results, making it suitable for large result sets and real-time progress updates.

QueryService

// Streaming query execution. A single request produces a stream of frames
// (schema, row batches, progress, errors, metadata, completion).
service QueryService {
  rpc ExecuteQuery(ExecuteQueryRequest) returns (stream ExecuteQueryResultFrame);
}

ExecuteQuery returns a stream of frames. Clients receive schema, row batches, progress updates, and metadata as separate frames, allowing incremental rendering of results.

Frame Types

Frame — Description
schema — Table schema (sent before any row batches for that table)
batch — Row data. Each batch has a result_iteration_id — when the ID changes, discard previous rows
progress — Execution statistics: rows processed, chunks scanned/skipped, timing
done — Query complete, close the stream
error — Error with code, message, and source location
metadata — Warnings and visualization hints

Result Iterations

Row batches carry a result_iteration_id. When a new iteration starts, all previous rows should be discarded. This enables incremental refinement of results. The is_iteration_complete flag on each batch indicates whether the iteration is fully delivered.

Proto Definition

// Streaming query execution. A single request produces a stream of
// ExecuteQueryResultFrame messages until a `done` frame is sent.
service QueryService {
  rpc ExecuteQuery(ExecuteQueryRequest) returns (stream ExecuteQueryResultFrame);
  // TODO: Plan endpoint
}

// Request for QueryService.ExecuteQuery.
message ExecuteQueryRequest {
  // The query text to execute.
  string query = 1;

  // Time-range bounds for the query.
  // NOTE(review): the string encoding (ISO 8601 timestamp? relative
  // expression?) is not shown in this file — confirm against the HTTP API.
  string since = 2;
  string until = 3;
  // NOTE(review): presumably an IANA timezone name (e.g. "Europe/Oslo")
  // used to interpret since/until — confirm.
  string timezone = 4;
}

// A single frame in the stream of results for a query.
message ExecuteQueryResultFrame {
  // Identifies the request this frame belongs to.
  // NOTE(review): presumably constant across all frames of one
  // ExecuteQuery call — confirm against the server implementation.
  string request_id = 1;
  // Exactly one payload variant is set per frame.
  oneof payload {
    // TableSchema contains the schema of one table in the results and will always be
    // sent before the associated RowBatches.
    TableSchema schema = 2;
    // RowBatch contains the rows. Note that each RowBatch belongs to a result iteration
    // and for every new result iteration, the old set of RowBatches should be discarded.
    // We need this setup of a result set of multiple rowbatches to avoid hitting the 4MB
    // limit in gRPC messages.
    RowBatch batch = 3;

    // If done is sent, then this stream can be closed and no more events will be sent.
    // TODO: We need to be able to signal that a result iteration is done. We should use completion for that.
    Completion done = 5;

    // Aux events for progress, etc.
    // progress contains the progress since the last progress was sent.
    Progress progress = 4;
    // error indicates that some error happened that impacted the result set.
    Error error = 6;
    // metadata carries warnings or partial failures for the current result set.
    ResultMetadata metadata = 7;
    // TODO: Add healthcheck frame
  }
}

// Schema of one table in the result set. Always streamed before any
// RowBatch that references the table by name.
message TableSchema {
  // Table name; RowBatch.table_name refers back to this value.
  string name = 1;
  // Ordered column definitions; ValueRow values follow this order.
  repeated Column columns = 2;
}

// A single column definition within a TableSchema.
message Column {
  // Column name.
  string name = 1;
  // Data type of the values in this column.
  ColumnType type = 2;
  // Whether values in this column may be null.
  bool nullable = 3;
}

// Data types a result column may carry.
// NOTE(review): DATETIME/TIMESPAN/GUID/DYNAMIC suggest Kusto-style scalar
// types (the Error message mentions KustoError) — confirm exact semantics.
enum ColumnType {
  // Default; type was not set. Never a valid column type.
  COLUMN_TYPE_UNSPECIFIED = 0;
  COLUMN_TYPE_BOOL = 1;
  COLUMN_TYPE_INT32 = 2;
  COLUMN_TYPE_INT64 = 3;
  COLUMN_TYPE_DOUBLE = 4;
  COLUMN_TYPE_STRING = 5;
  COLUMN_TYPE_DATETIME = 6;
  COLUMN_TYPE_TIMESPAN = 7;
  COLUMN_TYPE_GUID = 8;
  // Dynamic/semi-structured value (carried as segment_files.TTDynamic in rows).
  COLUMN_TYPE_DYNAMIC = 9;
}

// One batch of rows for a single table within a single result iteration.
message RowBatch {
  // table_name refers to the name of the table as specified in a TableSchema frame.
  string table_name = 1;
  // result_iteration_id is an opaque string that identifies a specific result iteration.
  // Whenever a new result iteration starts, all previous rows should be discarded. Thus
  // a result iteration is a complete set of rows for a query.
  string result_iteration_id = 2;
  // The rows in this batch, in delivery order.
  repeated ValueRow rows = 3;
  // True when this is the last batch for this result_iteration_id.
  // UI should only render complete iterations (where is_iteration_complete=true),
  // except for the first batch which can be shown immediately for fast feedback.
  bool is_iteration_complete = 4;
}

// A single row of values for one table.
message ValueRow {
  // Values contains the values for this row. The order of values matches the order of
  // columns in the TableSchema for the table this row belongs to.
  repeated segment_files.TTDynamic values = 1;
}

// Execution progress since the last Progress frame was sent
// (per the frame-type description: "the progress since the last progress").
message Progress {
  // Field 8 (bloom_checks) was removed as dead code. Reserve the number and
  // name so they can never be reused with a different meaning — reusing a
  // deleted field number silently reinterprets old wire data.
  reserved 8;
  reserved "bloom_checks";

  // Execution statistics
  uint64 rows_processed = 1;
  uint64 chunks_total = 2;
  uint64 chunks_scanned = 3;
  uint64 chunks_skipped_range = 4;
  uint64 chunks_skipped_bloom = 5;
  // NOTE(review): "shar" reads like a truncated "shard"; the field name is
  // frozen in generated code, so only this comment can clarify — confirm intent.
  uint64 chunks_skipped_shar = 6;
  uint64 predicate_checks = 7;
  bool short_circuit_completion = 9;
  uint64 chunk_scanned_raw_body_size = 10;
  uint64 chunk_skipped_raw_body_size = 11;
  uint64 chunk_skipped_compressed_size = 12;
  uint64 chunk_scan_time_nanos = 13;
  uint64 query_time_nanos = 14;
  uint64 chunk_scanned_compressed_size = 15;
  // Bin progress for queries using bin() on numeric or time fields
  optional BinProgress bin_progress = 16;
  // Time spent waiting in queue before execution started (nanoseconds).
  // Present when the query was queued due to concurrent query limits.
  optional uint64 queue_wait_nanos = 17;
  // Planning progress during segment processing phase.
  // Present during planning, None after planning completes.
  optional PlanningProgress planning_progress = 18;
  // Total bytes of BLOM chunks evaluated during bloom filtering.
  uint64 bloom_filter_bytes = 19;
  // Wall-clock time spent in merge+delivery across all query threads (nanoseconds).
  uint64 merge_time_nanos = 20;
  // Chunks scanned that yielded zero rows passing filters (false positives from pre-filtering).
  uint64 chunks_empty_scan = 21;
  // Chunks that passed pre-filters but failed during row processing (e.g. type conversion error).
  uint64 chunks_errored = 22;
  // Chunks skipped because required input fields were absent from the chunk.
  uint64 chunks_skipped_required_fields = 23;
  // Extended per-operator diagnostics in a stable key/value envelope.
  repeated OperatorDiagnostics operator_diagnostics = 24;
}

// Extended diagnostics for a single query operator, delivered as a stable
// key/value envelope (see Progress.operator_diagnostics).
message OperatorDiagnostics {
  // Operator kind discriminator (free-form string).
  string kind = 1;
  // Operator ID for source correlation (matches QueryWarning.operator_id).
  uint32 operator_id = 2;
  // Diagnostic entries for this operator.
  repeated KeyValue values = 3;
}

// A single string key/value diagnostic entry.
message KeyValue {
  string key = 1;
  string value = 2;
}

// Progress information for the planning phase.
// During planning, workers process segments (loading MIMX, applying filters,
// computing chunk/slice overlaps). This tracks that progress.
// Progress information for the planning phase.
// During planning, workers process segments (loading MIMX, applying filters,
// computing chunk/slice overlaps). This tracks that progress.
message PlanningProgress {
  // Number of segments that have completed planning across all workers.
  uint64 segments_done = 1;
  // Total number of segments to process across all workers.
  uint64 segments_total = 2;
}

// Progress information for bin() aggregations.
// Reports completion progress per bin for queries using `summarize ... by bin(field, span)`.
// Progress information for bin() aggregations.
// Reports completion progress per bin for queries using `summarize ... by bin(field, span)`.
message BinProgress {
  // Start value of the first bin (the bin boundary).
  // Bin N covers range [first_bin_start + N * bin_span, first_bin_start + (N+1) * bin_span)
  // (signed: time/numeric bins may start below zero).
  sint64 first_bin_start = 1;
  // Span (width) of each bin.
  uint64 bin_span = 2;
  // Completion percentage (0-100) for each bin, one byte per bin.
  // Index i corresponds to bin i. Value 100 means all workers have reported.
  bytes completion_percentages = 3;
}

// Empty marker sent as the `done` frame: the query is complete and the
// stream can be closed. Kept as a message (not google.protobuf.Empty) so
// fields can be added later without breaking the frame contract.
message Completion {
  // Intentionally empty.
}

// An error that impacted the result set, delivered as an `error` frame.
message Error {
  // Numbers 3 and 6 are unused gaps in this message. Reserve them so they
  // cannot be silently reused with new meanings.
  // NOTE(review): presumed to be deleted fields — confirm against history
  // and reserve their names too if known.
  reserved 3, 6;

  // Error code — discriminator for the JSON payload in `details`
  // (e.g., "UnknownFunction", "TypeMismatch")
  string code = 1;
  // Brief title
  string title = 2;
  // Support ticket ID for tracking
  string support_ticket_id = 4;
  // Pre-rendered error message with span annotations (for simple clients)
  string message = 5;
  // Source location
  Location location = 7;
  // JSON-encoded error details (actual KustoError for reconstruction)
  string details = 8;
}

// Source code location
message Location {
  uint32 start_byte = 1;
  uint32 end_byte = 2;
  uint32 start_line = 3;
  uint32 start_column = 4;
  uint32 end_line = 5;
  uint32 end_column = 6;
}

// A partial failure: some segments could not contribute to the result set
// (reported via ResultMetadata.partial_failures).
message PartialFailure {
  // IDs of the segments affected by this failure.
  repeated string segment_ids = 1;
  // Human-readable description of the failure.
  string message = 2;
}

// Visualization metadata from render operator
message VisualizationMetadata {
  // Visualization type: "table", "timechart", "linechart", etc.
  optional string visualization_type = 1;
  map<string, string> properties = 2;
}

// Metadata for the current result set, delivered as a `metadata` frame.
message ResultMetadata {
  // Segments that failed and are not represented in the results.
  repeated PartialFailure partial_failures = 1;
  // Visualization metadata from render operator (if present)
  optional VisualizationMetadata visualization = 2;
  // Execution warnings (limits hit, etc.)
  repeated QueryWarning warnings = 3;
}

// Warning produced during query execution (e.g., limit was hit).
message QueryWarning {
  // Operator ID for source correlation
  uint32 operator_id = 1;
  // Branch index for fork queries (0 = not a fork)
  optional uint32 branch_index = 2;
  // Warning kind discriminator (e.g., "SummarizeMemoryLimit")
  string kind = 3;
  // Source location (resolved from span registry)
  optional Location location = 4;
  // Human-readable message
  string message = 5;
  // Structured details as JSON
  string details = 6;
}

On this page