Berserk Docs

Meta gRPC API

Catalog management, ingestion, merge tasks, and cluster operations

The meta service exposes a gRPC API on port 9500 (configurable via META_BIND). It manages all cluster metadata: datasets, schemas, views, projections, ingestion streams, merge tasks, and segment lifecycle. gRPC reflection is enabled for tooling support.

Services Overview

ServiceDescription
CatalogServiceManages the data catalog: datasets, schemas, views, projections, and their relationships.
IngestionServiceManages data ingestion: worker configuration, segment registration, and stream lifecycle.
IngestTokenServiceManages ingest tokens for authentication and signal routing.
NurseryServiceManages ingest stream lifecycle, merged segment registration, and offset tracking.
MergeTaskServiceCoordinates segment merge tasks between the janitor and merge workers.
SegmentDeletionServiceManages cleanup of tombstoned segments from storage.
SegmentLookupServiceDiscovers segments matching a time range and dataset filter.
JanitorServiceProvides cluster statistics and merge task management for maintenance operations.

CatalogService

Manages the data catalog: datasets, schemas, views, projections, and their relationships.

RPCs

RPCDescription
ListDatasourcesList all datasources (legacy aggregated view)
CreateDatasetCreate a new dataset
GetDatasetGet dataset by ID or name
ListDatasetsList datasets with pagination
DeleteDatasetDelete a dataset
SetShardingFieldsConfigure dataset sharding fields
GetShardingFieldsGet dataset sharding configuration
CreateSchemaCreate a new schema
GetSchemaGet schema by ID or name
ListSchemasList schemas with pagination
UpdateSchemaUpdate schema (rename, add/remove columns)
DeleteSchemaDelete a schema
CreateViewCreate a new view
GetViewGet view by ID or name
ListViewsList views with pagination
UpdateViewUpdate view properties
DeleteViewDelete a view
CreateProjectionCreate a new projection
GetProjectionGet projection by ID or name
ListProjectionsList projections with pagination
UpdateProjectionUpdate projection definition
DeleteProjectionDelete a projection
LinkViewDatasetProjectionLink a view, dataset, and projection together
UnlinkViewDatasetProjectionRemove a view-dataset-projection link
ListViewDatasetProjectionsList view-dataset-projection links

Proto Definition

// Catalog CRUD for datasets, schemas, views, projections, and the links
// that tie a view and dataset to a projection.
service CatalogService {
  // Legacy aggregated view of the catalog, kept for backward compatibility
  // (see the "Legacy Messages" section below).
  rpc ListDatasources(ListDatasourcesRequest) returns (ListDatasourcesResponse);

  // Dataset CRUD
  rpc CreateDataset(CreateDatasetRequest) returns (CreateDatasetResponse);
  rpc GetDataset(GetDatasetRequest) returns (GetDatasetResponse);
  rpc ListDatasets(ListDatasetsRequest) returns (ListDatasetsResponse);
  rpc DeleteDataset(DeleteDatasetRequest) returns (DeleteDatasetResponse);

  // Dataset Sharding Fields
  rpc SetShardingFields(SetShardingFieldsRequest) returns (SetShardingFieldsResponse);
  rpc GetShardingFields(GetShardingFieldsRequest) returns (GetShardingFieldsResponse);

  // Schema CRUD
  rpc CreateSchema(CreateSchemaRequest) returns (CreateSchemaResponse);
  rpc GetSchema(GetSchemaRequest) returns (GetSchemaResponse);
  rpc ListSchemas(ListSchemasRequest) returns (ListSchemasResponse);
  rpc UpdateSchema(UpdateSchemaRequest) returns (UpdateSchemaResponse);
  rpc DeleteSchema(DeleteSchemaRequest) returns (DeleteSchemaResponse);

  // View CRUD
  rpc CreateView(CreateViewRequest) returns (CreateViewResponse);
  rpc GetView(GetViewRequest) returns (GetViewResponse);
  rpc ListViews(ListViewsRequest) returns (ListViewsResponse);
  rpc UpdateView(UpdateViewRequest) returns (UpdateViewResponse);
  rpc DeleteView(DeleteViewRequest) returns (DeleteViewResponse);

  // Projection CRUD
  rpc CreateProjection(CreateProjectionRequest) returns (CreateProjectionResponse);
  rpc GetProjection(GetProjectionRequest) returns (GetProjectionResponse);
  rpc ListProjections(ListProjectionsRequest) returns (ListProjectionsResponse);
  rpc UpdateProjection(UpdateProjectionRequest) returns (UpdateProjectionResponse);
  rpc DeleteProjection(DeleteProjectionRequest) returns (DeleteProjectionResponse);

  // View-Dataset-Projection Links
  rpc LinkViewDatasetProjection(LinkViewDatasetProjectionRequest) returns (LinkViewDatasetProjectionResponse);
  rpc UnlinkViewDatasetProjection(UnlinkViewDatasetProjectionRequest) returns (UnlinkViewDatasetProjectionResponse);
  rpc ListViewDatasetProjections(ListViewDatasetProjectionsRequest) returns (ListViewDatasetProjectionsResponse);
}

// region: Common Types

// Scalar/value types a schema Column can have (KQL-style type system).
// NOTE(review): 0 carries real meaning (BOOL) instead of the conventional
// KQL_TYPE_UNSPECIFIED zero value, so an unset field decodes as
// KQL_TYPE_BOOL. Renumbering now would be a wire-breaking change; callers
// must be aware of this default.
enum KqlType {
  KQL_TYPE_BOOL = 0;
  KQL_TYPE_INT = 1;
  KQL_TYPE_LONG = 2;
  KQL_TYPE_REAL = 3;
  KQL_TYPE_STRING = 4;
  KQL_TYPE_DATETIME = 5;
  KQL_TYPE_TIMESPAN = 6;
  KQL_TYPE_GUID = 7;
  KQL_TYPE_DYNAMIC = 8;
}

// Cursor-based pagination parameters shared by the List* RPCs.
message PaginationRequest {
  // Maximum number of items to return.
  optional uint32 limit = 1;
  // Forward cursor (see PaginationResponse.next_cursor).
  optional string after = 2;
  // Backward cursor (see PaginationResponse.prev_cursor).
  optional string before = 3;
}

// Cursors and total count returned by the List* RPCs.
message PaginationResponse {
  // Unset when there is no next page.
  optional string next_cursor = 1;
  // Unset when there is no previous page.
  optional string prev_cursor = 2;
  // Total number of matching items.
  uint64 total = 3;
}

// A named, typed column of a schema.
message Column {
  string name = 1;
  KqlType type = 2;
}

// endregion

// region: Dataset Messages

message CreateDatasetRequest {
  // Name for the new dataset.
  string name = 1;
}

message CreateDatasetResponse {
  // Server-assigned dataset id.
  string id = 1;
  string name = 2;
}

// Datasets are addressable either by id or by name.
message GetDatasetRequest {
  oneof identifier {
    string id = 1;
    string name = 2;
  }
}

message GetDatasetResponse {
  string id = 1;
  string name = 2;
}

message ListDatasetsRequest {
  PaginationRequest pagination = 1;
}

message ListDatasetsResponse {
  repeated Dataset items = 1;
  PaginationResponse pagination = 2;
}

message Dataset {
  string id = 1;
  string name = 2;
}

message DeleteDatasetRequest {
  oneof identifier {
    string id = 1;
    string name = 2;
  }
}

message DeleteDatasetResponse {
  // True on successful deletion.
  bool success = 1;
}

message SetShardingFieldsRequest {
  oneof identifier {
    string id = 1;
    string name = 2;
  }
  // Sharding fields to set for the dataset.
  repeated ShardingField fields = 3;
}

message SetShardingFieldsResponse {
  // The sharding configuration after the update.
  repeated ShardingField fields = 1;
}

message GetShardingFieldsRequest {
  oneof identifier {
    string id = 1;
    string name = 2;
  }
}

message GetShardingFieldsResponse {
  repeated ShardingField fields = 1;
}

// endregion

// region: Schema Messages

message CreateSchemaRequest {
  // Name for the new schema.
  string name = 1;
  // Columns the schema starts with.
  repeated Column columns = 2;
}

message CreateSchemaResponse {
  // Server-assigned schema id.
  string id = 1;
  string name = 2;
  repeated Column columns = 3;
}

// Schemas are addressable either by id or by name.
message GetSchemaRequest {
  oneof identifier {
    string id = 1;
    string name = 2;
  }
}

message GetSchemaResponse {
  string id = 1;
  string name = 2;
  repeated Column columns = 3;
}

message ListSchemasRequest {
  PaginationRequest pagination = 1;
}

message ListSchemasResponse {
  repeated Schema items = 1;
  PaginationResponse pagination = 2;
}

message Schema {
  string id = 1;
  string name = 2;
  repeated Column columns = 3;
}

message UpdateSchemaRequest {
  oneof identifier {
    string id = 1;
    string name = 2;
  }
  // New name for the schema, if renaming.
  optional string new_name = 3;
  // Columns to add to the schema.
  repeated Column add_columns = 4;
  // Names of columns to remove from the schema.
  repeated string remove_columns = 5;
}

message UpdateSchemaResponse {
  string id = 1;
  string name = 2;
  // Columns after the update has been applied.
  repeated Column columns = 3;
}

message DeleteSchemaRequest {
  oneof identifier {
    string id = 1;
    string name = 2;
  }
}

message DeleteSchemaResponse {
  // True on successful deletion.
  bool success = 1;
}

// endregion

// region: View Messages

message CreateViewRequest {
  // Name for the new view.
  string name = 1;
  optional string description = 2;
  // Optional schema this view is associated with.
  optional string schema_id = 3;
}

message CreateViewResponse {
  // Server-assigned view id.
  string id = 1;
  string name = 2;
  optional string description = 3;
  optional string schema_id = 4;
  // Creation timestamp (string-encoded; exact format not specified here).
  string created_at = 5;
}

// Views are addressable either by id or by name.
message GetViewRequest {
  oneof identifier {
    string id = 1;
    string name = 2;
  }
}

message GetViewResponse {
  string id = 1;
  string name = 2;
  optional string description = 3;
  optional string schema_id = 4;
  string created_at = 5;
}

message ListViewsRequest {
  PaginationRequest pagination = 1;
  // Optional filter: only views associated with this schema.
  optional string schema_id = 2;
}

message ListViewsResponse {
  repeated View items = 1;
  PaginationResponse pagination = 2;
}

message View {
  string id = 1;
  string name = 2;
  optional string description = 3;
  optional string schema_id = 4;
  string created_at = 5;
}

message UpdateViewRequest {
  oneof identifier {
    string id = 1;
    string name = 2;
  }
  // New name for the view, if renaming.
  optional string new_name = 3;
  optional string description = 4;
  optional string schema_id = 5;
}

message UpdateViewResponse {
  string id = 1;
  string name = 2;
  optional string description = 3;
  optional string schema_id = 4;
  string created_at = 5;
}

message DeleteViewRequest {
  oneof identifier {
    string id = 1;
    string name = 2;
  }
}

message DeleteViewResponse {
  // True on successful deletion.
  bool success = 1;
}

// endregion

// region: Projection Messages

message CreateProjectionRequest {
  // Name for the new projection.
  string name = 1;
  // The projection definition (expression).
  string projection = 2;
  // Optional schema this projection is associated with.
  optional string schema_id = 3;
}

message CreateProjectionResponse {
  // Server-assigned projection id.
  string id = 1;
  string name = 2;
  string projection = 3;
  optional string schema_id = 4;
}

// Projections are addressable either by id or by name.
message GetProjectionRequest {
  oneof identifier {
    string id = 1;
    string name = 2;
  }
}

message GetProjectionResponse {
  string id = 1;
  string name = 2;
  string projection = 3;
  optional string schema_id = 4;
}

message ListProjectionsRequest {
  PaginationRequest pagination = 1;
  // Optional filter: only projections associated with this schema.
  optional string schema_id = 2;
}

message ListProjectionsResponse {
  repeated Projection items = 1;
  PaginationResponse pagination = 2;
}

message Projection {
  string id = 1;
  string name = 2;
  string projection = 3;
  optional string schema_id = 4;
}

message UpdateProjectionRequest {
  oneof identifier {
    string id = 1;
    string name = 2;
  }
  // New name for the projection, if renaming.
  optional string new_name = 3;
  // New projection definition, if changing it.
  optional string projection = 4;
  optional string schema_id = 5;
}

message UpdateProjectionResponse {
  string id = 1;
  string name = 2;
  string projection = 3;
  optional string schema_id = 4;
}

message DeleteProjectionRequest {
  oneof identifier {
    string id = 1;
    string name = 2;
  }
}

message DeleteProjectionResponse {
  // True on successful deletion.
  bool success = 1;
}

// endregion

// region: View-Dataset-Projection Link Messages

// Creates a three-way link between a dataset, a view, and a projection.
message LinkViewDatasetProjectionRequest {
  string dataset_id = 1;
  string view_id = 2;
  string projection_id = 3;
}

message LinkViewDatasetProjectionResponse {
  // True when the link was created.
  bool success = 1;
}

message UnlinkViewDatasetProjectionRequest {
  string dataset_id = 1;
  string view_id = 2;
  string projection_id = 3;
}

message UnlinkViewDatasetProjectionResponse {
  // True when the link was removed.
  bool success = 1;
}

message ListViewDatasetProjectionsRequest {
  PaginationRequest pagination = 1;
  // Optional filters; combine to narrow the result set.
  optional string dataset_id = 2;
  optional string view_id = 3;
  optional string projection_id = 4;
}

message ListViewDatasetProjectionsResponse {
  repeated ViewDatasetProjection items = 1;
  PaginationResponse pagination = 2;
}

// One dataset/view/projection link.
message ViewDatasetProjection {
  string dataset_id = 1;
  string view_id = 2;
  string projection_id = 3;
}

// endregion

// region: Legacy Messages (for backward compatibility)

// Legacy request with inline pagination fields (mirrors PaginationRequest).
message ListDatasourcesRequest {
  optional uint32 limit = 1;
  optional string after = 2;
  optional string before = 3;
}

// Legacy response with inline pagination fields (mirrors PaginationResponse).
message ListDatasourcesResponse {
  repeated DatasourceSummary items = 1;
  optional string next_cursor = 2;
  optional string prev_cursor = 3;
  uint64 total = 4;
}

// Aggregated catalog entry returned by the legacy ListDatasources RPC.
message DatasourceSummary {
  string id = 1;
  string name = 2;
  ViewSummary view_details = 3;
}

message ViewSummary {
  repeated ViewProjection projections = 1;
}

message ViewProjection {
  string dataset_name = 1;
  // Projection expression applied for this dataset.
  string expression = 2;
}

// endregion

IngestionService

Manages data ingestion: worker configuration, segment registration, and stream lifecycle.

RPCs

RPCDescription
DownloadConfigGet configuration for an ingest worker
SaveSegmentsRegister newly created segments
RegisterForStreamAuthenticate an ingest token and get/create a stream for a collector
CommitOffsetCommit a baby segment offset for a stream

Proto Definition

// Ingest-worker-facing API: worker configuration, segment registration,
// and stream lifecycle.
service IngestionService {
  // DownloadConfig returns the config for a single worker.
  rpc DownloadConfig(DownloadConfigRequest) returns (DownloadConfigResponse);
  // SaveSegments registers newly created segments in the catalog.
  rpc SaveSegments(SaveSegmentsRequest) returns (SaveSegmentsResponse);
  // RegisterForStream authenticates an ingest token and returns a stream for this collector.
  // Returns the existing active stream for the collector_id, or creates a new one.
  rpc RegisterForStream(RegisterForStreamRequest) returns (RegisterForStreamResponse);
  // CommitOffset commits a baby segment offset for a stream.
  rpc CommitOffset(CommitOffsetRequest) returns (CommitOffsetResponse);
}

message DownloadConfigRequest {
  // Identifier of the ingest worker requesting its configuration.
  string worker_id = 1;
}

message DownloadConfigResponse {
  // Opaque version identifier of the returned configuration.
  string config_version = 1;
  // Field 2 removed (was dataset_id) - use default_dataset.id instead.
  // Reserved so the number and name can never be reused with different
  // semantics (same convention as RegisterForStreamRequest).
  reserved 2;
  reserved "dataset_id";
  // Base cloud storage location for this worker's uploads.
  CloudObjectPrefix cloud_object_prefix = 3;
  // Default target dataset for this worker.
  DatasetInfo default_dataset = 4;
  // Additional datasets known to this worker.
  repeated DatasetInfo other_datasets = 5;
}

message SaveSegmentsRequest {
  // Segments to register in the catalog.
  repeated CreateSegment segments = 1;
}

message SaveSegmentsResponse {
  // Server-assigned segment ids.
  repeated string ids = 1;
}

message RegisterForStreamRequest {
  // Field 1 removed; number reserved so it cannot be reused.
  reserved 1;
  string collector_string = 2;
  // Stable collector identity; an existing active stream for this id is
  // reused instead of creating a new one (see RegisterForStream RPC).
  string collector_id = 3;
  // Plaintext ingest token used for authentication.
  string ingest_token = 4;
}

// Per-stream ingest settings handed back to the collector.
message StreamConfig {
  // Cloud storage location for this stream's baby segments.
  CloudObjectPrefix cloud_object_prefix = 1;
  // Flush timeout in milliseconds.
  uint64 flush_timeout_ms = 2;
  // Upper bound on a single segment's size in bytes.
  uint64 max_segment_size_bytes = 3;
}

message RegisterForStreamResponse {
  string stream_id = 1;
  StreamConfig config = 2;
  // UUID of the ingest token (from the meta database).
  // Tjalfe uses this as the key in CombinedSignals.token_signals
  // so the plaintext token is never persisted to S3.
  string token_id = 3;
}

// Baby segment offset commit
message CommitOffsetRequest {
  string stream_id = 1;
  // Offset of the committed baby segment.
  int64 committed_offset = 2;
  // Time range covered by the segment.
  // NOTE(review): units not stated here; other time fields in this API
  // are nanoseconds since the unix epoch — confirm.
  int64 start_time = 3;
  int64 end_time = 4;
  int64 size_bytes = 5;
  int64 num_records = 6;
}

message CommitOffsetResponse {
  // Empty on success
}

IngestTokenService

Manages ingest tokens for authentication and signal routing.

RPCs

RPCDescription
CreateIngestTokenCreate a new token (plaintext shown only once)
ValidateIngestTokenValidate a token and return its routing
RevokeIngestTokenRevoke a token by ID
DeleteIngestTokenPermanently delete a token by ID
ListIngestTokensList all tokens
GetIngestTokenGet token details by ID
UpdateIngestTokenRoutingUpdate signal routing for a token

Proto Definition

// Lifecycle management for ingest tokens: creation, validation,
// revocation, deletion, and per-signal routing updates.
service IngestTokenService {
  // Create a new token; the plaintext is returned only once.
  rpc CreateIngestToken(CreateIngestTokenRequest) returns (CreateIngestTokenResponse);
  // Validate a plaintext token and return its id and routing.
  rpc ValidateIngestToken(ValidateIngestTokenRequest) returns (ValidateIngestTokenResponse);
  rpc RevokeIngestToken(RevokeIngestTokenRequest) returns (RevokeIngestTokenResponse);
  rpc DeleteIngestToken(DeleteIngestTokenRequest) returns (DeleteIngestTokenResponse);
  rpc ListIngestTokens(ListIngestTokensRequest) returns (ListIngestTokensResponse);
  rpc GetIngestToken(GetIngestTokenRequest) returns (GetIngestTokenResponse);
  // Update the per-signal dataset routing for a token.
  rpc UpdateIngestTokenRouting(UpdateIngestTokenRoutingRequest) returns (UpdateIngestTokenRoutingResponse);
}

// Maps each telemetry signal type to the dataset it should land in.
message IngestTokenRouting {
  string traces_dataset_id = 1;
  string logs_dataset_id = 2;
  string metrics_dataset_id = 3;
}

// Token metadata; never contains the plaintext token itself.
message IngestTokenInfo {
  string id = 1;
  string name = 2;
  string created_at = 3;
  string revoked_at = 4;  // empty if not revoked
  IngestTokenRouting routing = 5;
  string token_hint = 6;  // last 4 chars of the plaintext token
}

message CreateIngestTokenRequest {
  string name = 1;
  // Dataset name the new token routes to.
  // NOTE(review): routing has three per-signal datasets; confirm how this
  // single name maps onto them.
  string dataset_name = 2;
}

message CreateIngestTokenResponse {
  string plaintext_token = 1;  // shown only once
  IngestTokenInfo token = 2;
}

message ValidateIngestTokenRequest {
  string plaintext_token = 1;
}

message ValidateIngestTokenResponse {
  // UUID of the validated token.
  string token_id = 1;
  IngestTokenRouting routing = 2;
}

message RevokeIngestTokenRequest {
  string token_id = 1;
}

message RevokeIngestTokenResponse {
  // empty on success
}

message DeleteIngestTokenRequest {
  string token_id = 1;
}

message DeleteIngestTokenResponse {
  // empty on success
}

message ListIngestTokensRequest {
  // no parameters
}

message ListIngestTokensResponse {
  repeated IngestTokenInfo tokens = 1;
}

message GetIngestTokenRequest {
  string token_id = 1;
}

message GetIngestTokenResponse {
  IngestTokenInfo token = 1;
}

message UpdateIngestTokenRoutingRequest {
  string token_id = 1;
  // Replacement routing for the token.
  IngestTokenRouting routing = 2;
}

message UpdateIngestTokenRoutingResponse {
  // empty on success
}

NurseryService

Manages ingest stream lifecycle, merged segment registration, and offset tracking.

RPCs

RPCDescription
GetIngestStreamsForNurseryGet all active streams and dataset routing info
MarkStreamStoppedMark a stream as stopped (client disconnect or timeout)
RegisterMergedSegmentRegister a newly merged segment and update offsets
UpdateDeletedOffsetUpdate the deleted offset after cleaning baby segments
DeleteIngestStreamDatasetRemove a fully-processed dataset entry from a stream

Proto Definition

// Internal API consumed by the nursery: stream discovery, merged segment
// registration, deleted-offset tracking, and stream/dataset cleanup.
service NurseryService {
  // Get all active streams plus dataset and token routing info.
  rpc GetIngestStreamsForNursery(GetIngestStreamsRequest)
      returns (GetIngestStreamsResponse);
  // Mark a stream stopped (client disconnect or inactivity timeout).
  rpc MarkStreamStopped(MarkStreamStoppedRequest)
      returns (MarkStreamStoppedResponse);
  // Register a newly merged segment and advance merged offsets.
  rpc RegisterMergedSegment(RegisterMergedSegmentRequest)
      returns (RegisterMergedSegmentResponse);
  // Advance the deleted offset after cleaning baby segments from S3.
  rpc UpdateDeletedOffset(UpdateDeletedOffsetRequest)
      returns (UpdateDeletedOffsetResponse);
  // Remove a fully-processed dataset entry from a stream.
  rpc DeleteIngestStreamDataset(DeleteIngestStreamDatasetRequest)
      returns (DeleteIngestStreamDatasetResponse);
}

// Empty by design — the nursery always fetches all active streams.
message GetIngestStreamsRequest {}

// Checkpoint tracking the mapping between a merged segment offset and stream offset
message OffsetCheckpoint {
  // The MVCC version of the merged segment
  int64 segment_offset = 1;
  // The stream offset at the time of this merge
  int64 stream_offset = 2;
  // Ingest timestamp of the baby segment (nanoseconds since unix epoch)
  int64 ingest_timestamp_nanos = 3;
}

// Dataset-specific routing information within a stream
message IngestStreamDatasetInfo {
  string dataset_id = 1;
  // Last merged offset. Nursery should fetch from merged_offset + 1
  int64 merged_offset = 2;
  // Ingest timestamp of the last merged baby segment (nanoseconds since unix epoch)
  // NOTE: Source precision is seconds (from S3 last_modified), sub-second portion is always zero
  // Used by query engine to determine which segments to query
  int64 merged_ingest_time_nanos = 3;
  // Highest offset that has been deleted from S3 (default -1 = no data deleted yet)
  // Nursery should only delete offsets <= merged_offset
  int64 deleted_offset = 4;
  // Ingest timestamp of the baby segment at deleted_offset (nanoseconds since unix epoch)
  // NOTE: Source precision is seconds (from S3 last_modified), sub-second portion is always zero
  int64 deleted_ingest_time_nanos = 5;
  // Dataset stream creation time in nanoseconds since unix epoch
  int64 created_at_nanos = 6;
  // S3 location for uploading merged segments: endpoint, bucket, path prefix
  // Path format: dataset/<dataset_id>/merged/
  CloudObjectPrefix cloud_merged_segments_prefix = 7;
  // Checkpoints tracking recent merge points (segment_offset, stream_offset, ingest_timestamp)
  // Pruned to keep only checkpoints from the last 30 minutes
  repeated OffsetCheckpoint offset_checkpoints = 8;
}

// External stream information - matches two-table design (ingest_streams + ingest_stream_datasets)
message IngestStreamInfo {
  string stream_id = 1;
  // S3 location for baby segments: endpoint, bucket, path prefix
  // Path format: ingest-stream/<stream_id>/
  CloudObjectPrefix cloud_stream_prefix = 2;
  // If set, stream is stopped (no more baby segments will arrive)
  // Nursery can release stopped streams after processing remaining segments
  optional int64 stopped_at_nanos = 3;
  // Offset of the stop message (only set if stopped_at_nanos is set)
  // A fully-merged stream has merged_offset == stopped_offset
  optional int64 stopped_offset = 4;
  // Dataset-specific routing information for this stream
  repeated IngestStreamDatasetInfo datasets = 5;
}

// Per-token routing: maps each signal type to a target dataset.
// Keyed by token_id (UUID), matching the key in CombinedSignals.token_signals.
message IngestTokenRoutingInfo {
  // UUID of the ingest token (from the meta database).
  // Matches the key used in CombinedSignals.token_signals.
  string token_id = 1;
  string traces_dataset_id = 2;
  string logs_dataset_id = 3;
  string metrics_dataset_id = 4;
}

message GetIngestStreamsResponse {
  repeated IngestStreamInfo streams = 1;
  // All known datasets (name → id mapping for routing resolution)
  repeated DatasetInfo datasets = 2;
  // Monotonic sequence number for staleness detection.
  // Nursery should discard responses with a lower sequence than already seen.
  int64 response_seq = 3;
  // Active ingest token routing for token-based signal routing
  repeated IngestTokenRoutingInfo token_routing = 4;
}

// Reason why a stream was stopped
// Reason why a stream was stopped
enum StopReason {
  STOP_REASON_UNSPECIFIED = 0;
  // Client (ingester) wrote a stop message
  STOP_REASON_CLIENT = 1;
  // Nursery detected inactivity timeout
  STOP_REASON_TIMEOUT = 2;
}

message MarkStreamStoppedRequest {
  string stream_id = 1;
  // When the stream was stopped (nanoseconds since unix epoch)
  int64 stopped_at_nanos = 2;
  // Final processed offset in the stream
  int64 final_offset = 3;
  // Reason for stopping
  StopReason reason = 4;
}

// Empty on success.
message MarkStreamStoppedResponse {}

// Per-stream offset info for merged segment registration
message StreamOffsetInfo {
  string stream_id = 1;
  string dataset_id = 2;
  // New merged_offset after this merge (max baby segment offset included)
  int64 new_merged_offset = 3;
  // Ingest timestamp (S3 last_modified) of the highest-offset baby segment
  // NOTE: S3 provides second-level precision, sub-second portion is always zero
  int64 ingest_time_nanos = 4;
  // What merged_offset was when nursery selected segments for merge.
  // Meta validates this matches current value to detect conflicts.
  int64 start_merged_offset = 5;
}

message RegisterMergedSegmentRequest {
  // Segment metadata
  CreateSegment segment = 1;
  // Per-stream offset and ingest time info
  repeated StreamOffsetInfo stream_offsets = 2;
}

message RegisterMergedSegmentResponse {
  // The MVCC version assigned to the segment
  int64 segment_version = 1;
  // Updated stream info for nursery to cache (avoids separate poll)
  repeated IngestStreamInfo streams = 2;
  // Monotonic sequence number for staleness detection (same counter as GetIngestStreamsResponse)
  int64 response_seq = 3;
}

message UpdateDeletedOffsetRequest {
  string stream_id = 1;
  // Field 2 is skipped in the original numbering; reserve it so the number
  // can never be reused with different semantics (same convention as
  // RegisterForStreamRequest / DownloadConfigResponse).
  reserved 2;
  // Highest baby segment offset that has been deleted from S3
  // (see IngestStreamDatasetInfo.deleted_offset).
  int64 deleted_offset = 3;
  // Ingest timestamp of the baby segment at deleted_offset (nanoseconds since unix epoch)
  int64 deleted_ingest_time_nanos = 4;
}

// Empty on success.
message UpdateDeletedOffsetResponse {}

// Removes a fully-processed dataset entry from a stream
// (see DeleteIngestStreamDataset RPC).
message DeleteIngestStreamDatasetRequest {
  string stream_id = 1;
  string dataset_id = 2;
  // The current (highest) offset nursery is processing for this stream.
  // Meta validates this is >= the dataset's merged_offset as a sanity check.
  int64 current_offset = 3;
  // Nursery's view of the dataset's merged_offset.
  // Meta validates this matches its own merged_offset and deleted_offset
  // to catch stale nursery state.
  int64 merged_offset = 4;
}

// Empty on success.
message DeleteIngestStreamDatasetResponse {}

MergeTaskService

Coordinates segment merge tasks between the janitor and merge workers.

RPCs

RPCDescription
PollForSegmentMergeTaskWorker polls for an available merge task
CompleteSegmentMergeTaskWorker reports successful merge with the new segment
FailSegmentMergeTaskWorker reports merge failure

Proto Definition

// Work queue protocol between the janitor (task producer) and merge
// workers (task consumers).
service MergeTaskService {
  // Worker polls for an available merge task.
  rpc PollForSegmentMergeTask(PollForSegmentMergeTaskRequest) returns (PollForSegmentMergeTaskResponse);
  // Worker reports a successful merge together with the new segment.
  rpc CompleteSegmentMergeTask(CompleteSegmentMergeTaskRequest) returns (CompleteSegmentMergeTaskResponse);
  // Worker reports a merge failure.
  rpc FailSegmentMergeTask(FailSegmentMergeTaskRequest) returns (FailSegmentMergeTaskResponse);
}

message PollForSegmentMergeTaskRequest {
  // Identifier of the polling worker.
  string worker_id = 1;
}

message PollForSegmentMergeTaskResponse {
  // Unset when no task is currently available.
  optional SegmentMergeTaskInfo task = 1;
}

message CompleteSegmentMergeTaskRequest {
  string task_id = 1;
  // Metadata of the newly produced merged segment.
  CreateSegment merged_segment = 2;
}

message CompleteSegmentMergeTaskResponse {
  oneof result {
    CompleteSegmentMergeTaskSuccess success = 1;
    CompleteSegmentMergeTaskError error = 2;
  }
}

message CompleteSegmentMergeTaskSuccess {}

message CompleteSegmentMergeTaskError {
  // Error description.
  string message = 1;
}

message FailSegmentMergeTaskRequest {
  string task_id = 1;
  // Description of the merge failure.
  string error_message = 2;
}

message FailSegmentMergeTaskResponse {}

SegmentDeletionService

Manages cleanup of tombstoned segments from storage.

RPCs

RPCDescription
GetTombstonedSegmentsForDeletionGet segments marked for deletion
ConfirmTombstoneDeletionConfirm segments have been deleted from storage

Proto Definition

// Cleanup protocol for tombstoned segments: fetch candidates, delete the
// objects from storage, then confirm deletion.
service SegmentDeletionService {
  // Get segments marked for deletion.
  rpc GetTombstonedSegmentsForDeletion(GetTombstonedSegmentsForDeletionRequest) returns (GetTombstonedSegmentsForDeletionResponse);
  // Confirm segments have been deleted from storage.
  rpc ConfirmTombstoneDeletion(ConfirmTombstoneDeletionRequest) returns (ConfirmTombstoneDeletionResponse);
}

message GetTombstonedSegmentsForDeletionRequest {
  // Empty - service uses internal configuration
}

message GetTombstonedSegmentsForDeletionResponse {
  repeated TombstonedSegmentInfo segments = 1;
}

message TombstonedSegmentInfo {
  string segment_id = 1;
  string dataset_id = 2;
  // Storage location of the segment object to delete.
  CloudObjectKey cloud_object_key = 3;
  // When the segment was tombstoned (nanoseconds since unix epoch,
  // consistent with other *_nanos fields in this API — verify).
  int64 tombstone_time_nanos = 4;
  int64 tombstone_version = 5;
}

message ConfirmTombstoneDeletionRequest {
  // Ids of segments whose objects have been deleted from storage.
  repeated string segment_ids = 1;
}

message ConfirmTombstoneDeletionResponse {
  oneof result {
    ConfirmTombstoneDeletionSuccess success = 1;
    ConfirmTombstoneDeletionError error = 2;
  }
}

message ConfirmTombstoneDeletionSuccess {
  // Number of segments confirmed deleted.
  int32 deleted_count = 1;
}

message ConfirmTombstoneDeletionError {
  // Error description.
  string message = 1;
}

SegmentLookupService

Discovers segments matching a time range and dataset filter.

RPCs

RPCDescription
FindSegmentsFind segments by time range and dataset names (paginated)
GetSegmentsByIdsGet specific segments by their IDs

Proto Definition

// Segment discovery: find segments matching a time range and dataset
// filter, or fetch specific segments by id.
service SegmentLookupService {
  // Find segments by time range and dataset names (paginated).
  rpc FindSegments(FindSegmentsRequest) returns (FindSegmentsResponse);
  // Get specific segments by their ids.
  rpc GetSegmentsByIds(GetSegmentsByIdsRequest) returns (GetSegmentsByIdsResponse);
}

message FindSegmentsRequest {
  // Time window to search (nanoseconds since unix epoch; see TimeRange).
  TimeRange time_range = 1;
  // Field 2 is skipped in the original numbering; reserve it so the number
  // can never be reused with different semantics.
  reserved 2;
  // Restrict the search to these datasets.
  repeated string dataset_names = 3;
  // Maximum number of segments to return per page.
  int32 limit = 4;
  // Opaque forward cursor from PagedResponse.next_cursor.
  optional string cursor = 5;
}

// Forward-only pagination for FindSegments.
message PagedResponse {
  // Unset when there are no further pages.
  optional string next_cursor = 1;
}

message FindSegmentsResponse {
  repeated SegmentInfo segments = 1;
  PagedResponse page = 2;
}

message GetSegmentsByIdsRequest {
  repeated string segment_ids = 1;
}

message GetSegmentsByIdsResponse {
  // Segments found for the requested ids.
  // NOTE(review): behavior for unknown ids (omitted vs. error) is not
  // specified here — confirm against the server implementation.
  repeated SegmentInfo segments = 1;
}

JanitorService

Provides cluster statistics and merge task management for maintenance operations.

RPCs

RPCDescription
GetJanitorStatsGet segment count, sizes, and outstanding merge tasks
GetSegmentSizeStatsGet segment size distribution by tier
ListMergeTasksList active merge tasks
GetMergeTaskGet detailed info about a specific merge task
UnclaimMergeTaskRelease a stale merge task claim
CreateMergeTasksCreate new merge tasks for eligible segments
RewriteAllSegmentsCreate one single-segment merge task per existing segment not already in a merge task. This forces every segment through the rewrite pipeline to pick up index fixes.

Proto Definition

// Operator/maintenance API: cluster statistics plus manual merge task
// management.
service JanitorService {
  // Get segment count, sizes, and outstanding merge tasks.
  rpc GetJanitorStats(GetJanitorStatsRequest) returns (GetJanitorStatsResponse);
  // Get segment size distribution by tier.
  rpc GetSegmentSizeStats(GetSegmentSizeStatsRequest) returns (GetSegmentSizeStatsResponse);
  // List active merge tasks.
  rpc ListMergeTasks(ListMergeTasksRequest) returns (ListMergeTasksResponse);
  // Get detailed info about a specific merge task.
  rpc GetMergeTask(GetMergeTaskRequest) returns (GetMergeTaskResponse);
  // Release a stale merge task claim.
  rpc UnclaimMergeTask(UnclaimMergeTaskRequest) returns (UnclaimMergeTaskResponse);
  // Create new merge tasks for eligible segments.
  rpc CreateMergeTasks(CreateMergeTasksRequest) returns (CreateMergeTasksResponse);
  // Create one single-segment merge task per existing segment not already in a merge task.
  // This forces every segment through the rewrite pipeline to pick up index fixes.
  rpc RewriteAllSegments(RewriteAllSegmentsRequest) returns (RewriteAllSegmentsResponse);
}

message GetSegmentSizeStatsRequest {
  // Optional maximum compressed size in bytes (e.g., 10485760 for 10MB).
  // Only segments with compressed_size <= this value are included.
  optional int64 max_compressed_size = 1;
  // Optional minimum age in seconds. Only segments created before now - min_age
  // (i.e. older than min_age) are included.
  optional int64 min_age_secs = 2;
  // Optional maximum age in seconds. Only segments created after now - max_age
  // (i.e. newer than max_age) are included.
  optional int64 max_age_secs = 3;
  // Optional dataset name filter.
  optional string dataset_name = 4;
}

// One bucket of the segment size distribution.
message SegmentSizeTier {
  int32 tier = 1;
  // Lower/upper size bounds of this tier (string-encoded).
  string tier_min = 2;
  string tier_max = 3;
  // Number of segments falling into this tier.
  int64 segment_count = 4;
  // Aggregate and extreme segment sizes within the tier
  // (presumably bytes, matching *_size_bytes fields elsewhere — verify).
  int64 total_size = 5;
  int64 smallest = 6;
  int64 largest = 7;
  int64 avg_size = 8;
  // Earliest/latest segment times within the tier (string-encoded).
  string earliest_time = 9;
  string latest_time = 10;
}

message GetSegmentSizeStatsResponse {
  repeated SegmentSizeTier tiers = 1;
}

message GetJanitorStatsRequest {
  // Optional dataset name filter.
  optional string dataset_name = 1;
}

message GetJanitorStatsResponse {
  // Number of live (non-tombstoned) segments.
  int64 segment_count = 1;
  // Merge tasks not yet completed.
  int64 outstanding_merge_tasks = 2;
  int64 total_size_bytes = 3;
  int64 total_compressed_size_bytes = 4;
  // Segment count including tombstoned segments.
  int64 segments_incl_tombstoned = 5;
}

message ListMergeTasksRequest {
  // Optional dataset name filter.
  optional string dataset_name = 1;
}

message ListMergeTasksResponse {
  repeated MergeTaskSummary tasks = 1;
}

// Compact per-task view returned by ListMergeTasks.
message MergeTaskSummary {
  string task_id = 1;
  string dataset_id = 2;
  // Number of segments included in the task.
  int32 segment_count = 3;
  // Worker currently assigned (assumed empty when unclaimed — verify).
  string assigned_to_worker = 4;
  string created_at = 5;
  string assigned_at = 6;
}

message GetMergeTaskRequest {
  string task_id = 1;
}

message UnclaimMergeTaskRequest {
  string task_id = 1;
  // Minimum duration in seconds the task must have been claimed before it can be unclaimed.
  // Defaults to 3600 (1 hour) if not set.
  // NOTE(review): this field has proto3 implicit presence, so an explicit 0
  // is indistinguishable from unset and will also receive the default.
  int64 min_claimed_secs = 2;
}

message UnclaimMergeTaskResponse {
  // True if the task was unclaimed, false if it was not found or not yet old enough.
  bool unclaimed = 1;
}

message CreateMergeTasksRequest {
  // Optional maximum compressed size in bytes. Only segments with compressed_size <= this value.
  optional int64 max_compressed_size = 1;
  // Optional minimum age in seconds. Only segments older than now - min_age.
  optional int64 min_age_secs = 2;
  // Optional maximum age in seconds. Only segments newer than now - max_age.
  optional int64 max_age_secs = 3;
  // Optional dataset name filter.
  optional string dataset_name = 4;
}

message CreateMergeTasksResponse {
  // Number of merge tasks created
  int32 tasks_created = 1;
  // Total number of segments included across all tasks
  int64 segments_included = 2;
}

// Detailed task view returned by GetMergeTask.
message GetMergeTaskResponse {
  string task_id = 1;
  string dataset_id = 2;
  // Number of segments included in the task.
  int32 segment_count = 3;
  // Worker currently assigned (assumed empty when unclaimed — verify).
  string assigned_to_worker = 4;
  string created_at = 5;
  // Per-segment size extremes and totals (presumably bytes, matching
  // *_size_bytes fields elsewhere — verify).
  int64 smallest_segment_size = 6;
  int64 largest_segment_size = 7;
  int64 smallest_segment_compressed_size = 8;
  int64 largest_segment_compressed_size = 9;
  // Earliest/latest segment times across the task (string-encoded).
  string earliest_time = 10;
  string latest_time = 11;
  int64 total_size = 12;
  int64 total_compressed_size = 13;
  string assigned_at = 14;
}

message RewriteAllSegmentsRequest {
  // Optional: limit to a specific dataset
  optional string dataset_name = 1;
}

message RewriteAllSegmentsResponse {
  // Number of single-segment merge tasks created
  int32 tasks_created = 1;
  // Total number of segments that will be rewritten
  int64 segments_included = 2;
}

Common Types

Shared message types used across multiple services.

// Cloud storage bucket coordinates (S3-compatible or GCS).
message CloudBucket {
  string endpoint = 1;
  string bucket = 2;
  string region = 3;       // empty = provider default
  string path = 4;         // customer-configured base path within bucket (can be empty)
  string provider = 5;     // "s3" (default) or "gcs"
}

// A bucket plus a path prefix under which objects are stored.
message CloudObjectPrefix {
  CloudBucket bucket = 1;
  string prefix = 2;       // e.g. "datasets/ds-123/segments"
}

// A fully-qualified object location: prefix plus object name.
message CloudObjectKey {
  CloudObjectPrefix prefix = 1;
  string key = 2;           // e.g. "seg-456.ttseg"
}

message TimeRange {
  int64 start_time = 1; // in nanoseconds since the unix epoch
  int64 end_time = 2; // in nanoseconds since the unix epoch
}

// Segment metadata supplied when registering a new segment; the server
// assigns the id (see SaveSegmentsResponse.ids).
message CreateSegment {
  string dataset_id = 1;
  TimeRange time_range = 2;
  int64 size_bytes = 3;
  int64 compressed_size_bytes = 4;
  // NOTE(review): int32 here vs int64 num_records in CommitOffsetRequest;
  // this caps a segment at ~2.1B events.
  int32 number_of_events = 5;
  CloudObjectKey cloud_object_key = 6;
}

// A registered segment as returned by lookup and merge task RPCs.
message SegmentInfo {
  string id = 1;
  TimeRange time_range = 2;
  int64 size_bytes = 3;
  int64 compressed_size_bytes = 4;
  int32 number_of_events = 5;
  string dataset_id = 6;
  CloudObjectKey cloud_object_key = 7;
}

// A field used for dataset sharding, with its relative weight.
message ShardingField {
  string field_name = 1;
  double weight = 2;
}

// A merge task handed to a worker by PollForSegmentMergeTask.
message SegmentMergeTaskInfo {
  string task_id = 1;
  string dataset_id = 2;
  // Field 3 is skipped in the original numbering; reserve it so the number
  // can never be reused with different semantics.
  reserved 3;
  // Segments to merge into a single output segment.
  repeated SegmentInfo segments = 4;
  // Destination prefix for the merged segment upload.
  CloudObjectPrefix cloud_object_prefix = 5;
  // Sharding configuration of the dataset (see ShardingField).
  repeated ShardingField sharding_fields = 6;
}

// Dataset identity plus its cloud storage location.
message DatasetInfo {
  string id = 1;
  string name = 2;
  CloudObjectPrefix cloud_object_prefix = 3;
}

On this page