Query gRPC API
Streaming query execution over gRPC
The query service exposes a gRPC API on port 9510 alongside the HTTP API. The gRPC API provides streaming query results, making it suitable for large result sets and real-time progress updates.
QueryService
service QueryService {
rpc ExecuteQuery(ExecuteQueryRequest) returns (stream ExecuteQueryResultFrame);
}ExecuteQuery returns a stream of frames. Clients receive schema, row batches, progress updates, and metadata as separate frames, allowing incremental rendering of results.
Frame Types
| Frame | Description |
|---|---|
schema | Table schema (sent before any row batches for that table) |
batch | Row data. Each batch has a result_iteration_id — when the ID changes, discard previous rows |
progress | Execution statistics: rows processed, chunks scanned/skipped, timing |
done | Query complete, close the stream |
error | Error with code, message, and source location |
metadata | Warnings and visualization hints |
Result Iterations
Row batches carry a result_iteration_id. When a new iteration starts, all previous rows should be discarded. This enables incremental refinement of results. The is_iteration_complete flag on each batch indicates whether the iteration is fully delivered.
Proto Definition
// The query service executes BQL queries and streams results back to the client.
service QueryService {
// Execute a BQL query and receive results as a stream of typed frames.
// The stream delivers Schema, RowBatch, Progress, and Metadata frames,
// terminated by a Completion frame or an Error frame.
rpc ExecuteQuery(ExecuteQueryRequest) returns (stream ExecuteQueryResultFrame);
}
// Request to execute a BQL query.
message ExecuteQueryRequest {
// The BQL query string to execute.
string query = 1;
// Start of the time range (inclusive). Accepts relative expressions like "1h ago"
// or absolute timestamps like "2024-01-01T00:00:00Z".
string since = 2;
// End of the time range (exclusive). Same format as `since`. Defaults to now.
string until = 3;
// IANA timezone name for time-based operations (e.g., "America/New_York").
// Defaults to UTC if not specified.
string timezone = 4;
}
// A single frame in the streaming response for a query.
// Frames arrive in order: Schema frames first, then interleaved RowBatch and Progress frames,
// and finally a Completion or Error frame.
message ExecuteQueryResultFrame {
// Unique identifier for this request, echoed from the server.
string request_id = 1;
oneof payload {
// Table schema — sent once per table before any RowBatch frames for that table.
TableSchema schema = 2;
// A batch of result rows. Multiple batches may arrive for the same table and iteration.
// When `result_iteration_id` changes, discard all previous rows and start fresh.
RowBatch batch = 3;
// Signals that the query has completed and no more frames will be sent.
Completion done = 5;
// Cumulative execution statistics. Sent periodically during query execution.
// Each Progress frame supersedes the previous one.
Progress progress = 4;
// A query execution error. The stream ends after this frame.
Error error = 6;
// Warnings, partial failures, and visualization hints for the current result set.
ResultMetadata metadata = 7;
}
}
// Schema definition for a result table.
message TableSchema {
// Table name (e.g., "PrimaryResult", "ExtraTable_0", or a fork branch name).
string name = 1;
// Ordered list of columns in this table.
repeated Column columns = 2;
}
// A column definition within a table schema.
message Column {
// Column name.
string name = 1;
// Data type of the column.
ColumnType type = 2;
// Whether the column may contain null values.
bool nullable = 3;
// Berserk extension. Optional structural type for `dynamic` columns
// when the engine knows the inner shape (e.g. an `Array<Real>`
// produced by `make-series`, or an `Object{...}` produced by
// `bag_pack`). Absent means "opaque dynamic, infer by convention"
// — the same shape ADX returns. Clients that don't know this
// field ignore it (proto3 unknown-field tolerance) and continue
// operating against a plain `dynamic` column type.
optional StructuralType structural_type = 4;
}
// Structural type information for a `dynamic` column. Recursive
// because dynamics can be arrays of objects, objects with array
// fields, etc. Mirrors the engine-internal `AnnotationType` ADT.
message StructuralType {
oneof kind {
// Element is a scalar of the given type. The outer `Column.type`
// is `dynamic`; this `scalar` carries the inner type.
ColumnType scalar = 1;
// Element is a homogeneous array of the given inner shape.
// E.g. `make-series s = sum(v) on t step 5s` produces:
// - `s`: array_elem = scalar(LONG) — array of longs
// - `t`: array_elem = scalar(DATETIME) — array of datetimes
StructuralType array_elem = 2;
// Element is an object (property bag) with named fields. Each
// field carries its own structural type. E.g. `series_decompose`
// returns `{baseline, seasonal, trend, residual}` each of which
// is an array of reals.
ObjectSchema object = 3;
}
}
// Object schema for a `dynamic` column whose inner shape is a
// property bag with known named fields.
message ObjectSchema {
repeated ObjectField fields = 1;
}
// One named field within an `ObjectSchema`.
message ObjectField {
string name = 1;
StructuralType type = 2;
}
// BQL data types for column definitions.
enum ColumnType {
COLUMN_TYPE_UNSPECIFIED = 0;
COLUMN_TYPE_BOOL = 1;
COLUMN_TYPE_INT = 2;
COLUMN_TYPE_LONG = 3;
COLUMN_TYPE_REAL = 4;
COLUMN_TYPE_STRING = 5;
COLUMN_TYPE_DATETIME = 6;
COLUMN_TYPE_TIMESPAN = 7;
COLUMN_TYPE_GUID = 8;
COLUMN_TYPE_DYNAMIC = 9;
}
// A batch of rows belonging to a single table and result iteration.
message RowBatch {
// Name of the table this batch belongs to (matches a TableSchema.name).
string table_name = 1;
// Opaque identifier for the current result iteration. When this value changes,
// all previously received rows for all tables must be discarded — the new iteration
// represents a more complete result set that supersedes the previous one.
string result_iteration_id = 2;
// Rows in this batch. Column order matches the TableSchema for this table.
repeated ValueRow rows = 3;
// True when this is the last batch for this table in the current iteration.
// Clients should show the first batch immediately for fast feedback, then
// accumulate subsequent batches until this flag is true.
bool is_iteration_complete = 4;
}
// A single row of dynamically-typed values.
message ValueRow {
// Cell values in column order. Each value corresponds to the column at the same
// index in the TableSchema.
repeated berserk.BqlValue values = 1;
}
// Cumulative execution statistics for the running query.
// Each Progress frame contains the total counts since the query started —
// always use the latest frame and discard earlier ones.
message Progress {
// Total rows processed across all chunks.
uint64 rows_processed = 1;
// Total number of chunks in the query's time range.
uint64 chunks_total = 2;
// Chunks that were scanned (read and evaluated).
uint64 chunks_scanned = 3;
// Chunks skipped because their time range didn't overlap the query range.
uint64 chunks_skipped_range = 4;
// Chunks skipped by bloom filter (no matching values).
uint64 chunks_skipped_bloom = 5;
// Chunks skipped by shard hash (not matching the target shard).
uint64 chunks_skipped_shard = 6;
// Total predicate evaluations performed during scanning.
uint64 predicate_checks = 7;
reserved 8;
reserved "bloom_checks";
// True when the query completed early via short-circuit optimization.
bool short_circuit_completion = 9;
// Total uncompressed bytes of scanned chunk bodies.
uint64 chunk_scanned_raw_body_size = 10;
// Total uncompressed bytes of skipped chunk bodies.
uint64 chunk_skipped_raw_body_size = 11;
// Total compressed bytes of skipped chunks.
uint64 chunk_skipped_compressed_size = 12;
// Wall-clock time spent scanning chunks (nanoseconds).
uint64 chunk_scan_time_nanos = 13;
// Total query execution time (nanoseconds).
uint64 query_time_nanos = 14;
// Total compressed bytes of scanned chunks.
uint64 chunk_scanned_compressed_size = 15;
// Per-bin completion progress for `summarize ... by bin()` queries.
optional BinProgress bin_progress = 16;
// Time spent waiting in the query queue before execution started (nanoseconds).
// Present when the query was queued due to concurrent query limits.
optional uint64 queue_wait_nanos = 17;
// Segment planning progress. Present during planning, absent after planning completes.
optional PlanningProgress planning_progress = 18;
// Total bytes of bloom filter data evaluated during bloom filtering.
uint64 bloom_filter_bytes = 19;
// Wall-clock time spent in merge and delivery across all query threads (nanoseconds).
uint64 merge_time_nanos = 20;
// Chunks that were scanned but yielded zero matching rows (false positives from pre-filtering).
uint64 chunks_empty_scan = 21;
// Chunks that encountered errors during row processing (e.g., type conversion failure).
uint64 chunks_errored = 22;
// Chunks skipped because required input fields were absent from the chunk schema.
uint64 chunks_skipped_required_fields = 23;
// Per-operator diagnostic telemetry in a stable key-value envelope.
repeated OperatorDiagnostics operator_diagnostics = 24;
// Chunks scanned where the range predicate could not be resolved at chunk level and
// required per-row evaluation (i.e., the hoisted range did not fully cover the chunk).
uint64 chunks_range_per_row = 25;
// Number of merge+delivery invocations summed across all query threads.
// Pair counter for `merge_time_nanos`.
uint64 merge_count = 26;
// Wall-clock time spent cloning reducer state during merge invocations
// (nanoseconds, summed across all threads).
uint64 reducer_clone_time_nanos = 27;
// Number of reducer-state clones recorded into `reducer_clone_time_nanos`.
uint64 reducer_clone_count = 28;
// Cumulative wall-clock time the coordinator spent building intermediate
// streaming snapshots (excludes the final result build). Nanoseconds.
uint64 snapshot_build_time_nanos = 29;
// Number of intermediate snapshot builds counted into
// `snapshot_build_time_nanos`. Pair counter; consumers can compute the
// average build duration that drives adaptive snapshot pacing.
uint64 snapshot_build_count = 30;
// Sum of thread CPU time spent inside the chunks arm of
// `query_thread_loop` (nanoseconds, across all worker threads). Pair
// with the chunks-arm wall (`worker_processing_time_nanos` engine-side):
// wall ≫ CPU implies threads are parked on I/O / mutexes /
// oversubscription; wall ≈ CPU implies CPU is the bottleneck.
uint64 worker_chunks_arm_cpu_nanos = 31;
// Sum of wall-clock time worker threads spent inside
// `reader.fetch_chunk_data().await` — i.e. the cache_server fetch path
// for chunk bytes. Nanoseconds, across all worker threads.
uint64 worker_fetching_chunk_nanos = 32;
// Sum of wall-clock time inside the synchronous chunk-body closure
// (decompression + per-row scan + branch accounting). Nanoseconds,
// across all worker threads. Pair with `worker_fetching_chunk_nanos`
// to split chunk-arm wall into I/O wait vs scan body.
uint64 worker_chunk_body_nanos = 33;
// Generic counter bag (`CustomStats` from #2830). Lets engine emit
// new measurements without a proto schema change for each one.
// Keys are dotted lowercase namespaces — `bloom.short_circuit_chunks`,
// `storage.bytes_fetched_cold`, `predicate.<i>.rows_kept`. Values are
// sums across all worker threads; the coordinator merges per-key
// before serializing.
map<string, uint64> custom_stats = 34;
// Cumulative cache-layer S3 retries observed answering this query.
// Surfaced by `SegmentCacheTrait::fetch_stats_snapshot`; the
// cache_server reports per-`OpenedItem` retries from its bounded
// `get_cache_handle_inner` loop and the client tallies them on the
// session. Non-zero is normal under flaky-backend conditions; a
// sustained delta says the backend is the bottleneck.
uint64 s3_retries_total = 35;
// Cumulative cache-layer S3 give-ups for this query — one per cache
// open that hit the bounded retry deadline or returned a
// non-retriable error. Each give-up typically corresponds to a
// PartialFailure for the segment it was trying to open.
uint64 s3_giveups_total = 36;
}
// Diagnostic telemetry for a specific query operator.
message OperatorDiagnostics {
// Operator kind (e.g., "summarize", "join").
string kind = 1;
// Operator ID within the query plan.
uint32 operator_id = 2;
// Key-value diagnostic entries.
repeated KeyValue values = 3;
}
// A string key-value pair.
message KeyValue {
string key = 1;
string value = 2;
}
// Segment planning progress.
message PlanningProgress {
// Number of segments that have completed planning.
uint64 segments_done = 1;
// Total number of segments to plan.
uint64 segments_total = 2;
}
// Per-bin completion progress for `summarize ... by bin()` queries.
message BinProgress {
// Start value of the first bin boundary.
// Bin N covers [first_bin_start + N * bin_span, first_bin_start + (N+1) * bin_span).
sint64 first_bin_start = 1;
// Width of each bin.
uint64 bin_span = 2;
// Completion percentage (0-100) for each bin, one byte per bin.
// Index i corresponds to bin i. Value 100 means fully scanned.
bytes completion_percentages = 3;
}
// Signals that the query has completed successfully.
message Completion {}
// A query execution error.
message Error {
// Error code discriminator (e.g., "UnknownFunction", "TypeMismatch").
string code = 1;
// Brief error title.
string title = 2;
// Support ticket ID for tracking.
string support_ticket_id = 4;
// Human-readable error message with source annotations.
string message = 5;
// Source code location where the error occurred.
Location location = 7;
// Structured error details as JSON.
string details = 8;
}
// A range within the query source text.
message Location {
uint32 start_byte = 1;
uint32 end_byte = 2;
uint32 start_line = 3;
uint32 start_column = 4;
uint32 end_line = 5;
uint32 end_column = 6;
}
// A partial failure for one or more segments that could not be read.
message PartialFailure {
// IDs of the affected segments.
repeated string segment_ids = 1;
// Human-readable error description.
string message = 2;
}
// Visualization metadata from the `render` operator.
message VisualizationMetadata {
// Visualization type (e.g., "table", "timechart", "piechart", "linechart").
optional string visualization_type = 1;
// Visualization properties (e.g., x-column, y-columns, legend).
map<string, string> properties = 2;
}
// Metadata for the current result set.
message ResultMetadata {
// Segments that could not be read (partial data loss).
repeated PartialFailure partial_failures = 1;
// Visualization hints from the `render` operator, if present.
optional VisualizationMetadata visualization = 2;
// Execution warnings (e.g., "summarize memory limit reached", "result truncated").
repeated QueryWarning warnings = 3;
}
// A warning produced during query execution.
message QueryWarning {
// Operator ID for source correlation.
uint32 operator_id = 1;
// Branch index for fork queries (0 = main branch).
optional uint32 branch_index = 2;
// Warning kind discriminator (e.g., "SummarizeMemoryLimit", "ResultTruncated").
string kind = 3;
// Source location where the warning originated.
optional Location location = 4;
// Human-readable warning message.
string message = 5;
// Structured warning details as JSON.
string details = 6;
}