Go Client

Go client for the Hugr Data Mesh platform. Execute GraphQL queries and subscriptions with results delivered as Apache Arrow record readers.

Installation

go get github.com/hugr-lab/query-engine/client

Quick Start

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hugr-lab/query-engine/client"
)

func main() {
	c := client.NewClient("http://localhost:15000/ipc",
		client.WithApiKey("sk-..."),
	)

	resp, err := c.Query(context.Background(),
		`{ core { data_sources { name type } } }`,
		nil,
	)
	if err != nil {
		log.Fatal(err)
	}

	for _, part := range resp.Parts {
		fmt.Printf("Path: %s, Rows: %d\n", part.Path, part.Reader.NumRows())
	}
}

Client Options

c := client.NewClient(url,
	client.WithApiKey("sk-..."),
	client.WithTimeout(30*time.Second),
	client.WithSubscriptionPool(10, 5),
)

| Option | Description |
| --- | --- |
| WithApiKey(key) | API key sent via the X-Hugr-Api-Key header |
| WithTimeout(d) | HTTP request timeout |
| WithSubscriptionPool(max, idle) | WebSocket connection pool for subscriptions |
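
For orientation, the two HTTP-level options can be pictured in terms of the standard library. The sketch below is not the client's implementation. It only shows, assuming a plain net/http transport, what sending the X-Hugr-Api-Key header and applying a request timeout amount to. The helper names newHTTPClient and newRequest are hypothetical, and the request body is a placeholder.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"
)

// newHTTPClient is a hypothetical helper: it returns an http.Client whose
// Timeout bounds the whole request, the effect WithTimeout(d) describes.
func newHTTPClient(d time.Duration) *http.Client {
	return &http.Client{Timeout: d}
}

// newRequest is a hypothetical helper: it builds a POST request that carries
// the API key in the X-Hugr-Api-Key header named in the options table.
func newRequest(url, apiKey string, body io.Reader) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, url, body)
	if err != nil {
		return nil, err
	}
	req.Header.Set("X-Hugr-Api-Key", apiKey)
	return req, nil
}

func main() {
	hc := newHTTPClient(30 * time.Second)
	// The body is a placeholder; the wire format is not covered here.
	req, err := newRequest("http://localhost:15000/ipc", "sk-...", strings.NewReader("{}"))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(req.Method, req.Header.Get("X-Hugr-Api-Key"), hc.Timeout)
}
```

In normal use none of this is needed: passing WithApiKey and WithTimeout to NewClient is enough.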

Queries

The Query method executes a GraphQL query and returns a *types.Response containing one or more Arrow parts.

resp, err := c.Query(ctx, query, variables)

| Parameter | Type | Description |
| --- | --- | --- |
| query | string | GraphQL query string |
| variables | map[string]any | Query variables (nil if none) |

The response contains Parts. Each part has a Path (e.g., data.devices) and a Reader (arrow.RecordReader) that must be released once it has been read.

resp, err := c.Query(ctx, `
	query($limit: Int!) {
		devices(limit: $limit) { id name status }
	}
`, map[string]any{"limit": 100})
if err != nil {
	log.Fatal(err)
}

for _, part := range resp.Parts {
	reader := part.Reader
	for reader.Next() {
		rec := reader.Record()
		fmt.Printf("%s: %d rows\n", part.Path, rec.NumRows())
	}
	// Release inside the loop: a defer here would not run until the
	// enclosing function returns, keeping every reader alive until then.
	reader.Release()
}
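
Because each part owns its reader, it can be convenient to drain a reader inside a helper so that Release runs as soon as that part is done. The following is a minimal sketch of that pattern. The record and recordReader interfaces and the fakeReader type are hypothetical stand-ins that mirror only the methods used above (Next, Record, Release), so the example runs without the Arrow dependency.

```go
package main

import "fmt"

// record and recordReader are hypothetical stand-ins for the Arrow types.
type record interface {
	NumRows() int64
}

type recordReader interface {
	Next() bool
	Record() record
	Release()
}

// countRows drains r and returns the total row count. The defer is scoped
// to this call, so each reader is released as soon as it has been drained.
func countRows(r recordReader) int64 {
	defer r.Release()
	var total int64
	for r.Next() {
		total += r.Record().NumRows()
	}
	return total
}

// fakeRecord and fakeReader exist only to make the sketch runnable.
type fakeRecord int64

func (f fakeRecord) NumRows() int64 { return int64(f) }

type fakeReader struct {
	batches  []int64 // row count of each remaining batch
	current  int64
	released bool
}

func (f *fakeReader) Next() bool {
	if len(f.batches) == 0 {
		return false
	}
	f.current, f.batches = f.batches[0], f.batches[1:]
	return true
}

func (f *fakeReader) Record() record { return fakeRecord(f.current) }
func (f *fakeReader) Release()       { f.released = true }

func main() {
	r := &fakeReader{batches: []int64{2, 3}}
	fmt.Println(countRows(r), r.released) // prints "5 true"
}
```

With the real client, the same shape would apply to each part.Reader in resp.Parts, with the helper taking the Arrow reader type instead of the stand-in interface.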

Subscriptions

Connection Pool

When WithSubscriptionPool is configured, subscriptions share a pool of WebSocket connections. This is suitable for many short-lived or concurrent subscriptions.

c := client.NewClient(url,
	client.WithSubscriptionPool(10, 5), // max 10 connections, 5 idle
)

sub, err := c.Subscribe(ctx, `
	subscription {
		query(interval: 5) {
			devices { id status }
		}
	}
`, nil)
if err != nil {
	log.Fatal(err)
}
defer sub.Close()

for event := range sub.Events() {
	fmt.Printf("Path: %s\n", event.Path)
	reader := event.Reader
	for reader.Next() {
		rec := reader.Record()
		fmt.Printf("  %d rows\n", rec.NumRows())
	}
	reader.Release()
}

Subscription Events

Each SubscriptionEvent contains:

| Field | Type | Description |
| --- | --- | --- |
| Path | string | Data path (e.g., data.devices) |
| Reader | arrow.RecordReader | Arrow record reader for this path's data |

For multi-path subscriptions, events arrive interleaved, with one event per path per tick.
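
When a consumer needs all paths of a tick together, the interleaved events can be regrouped. The sketch below shows one way to do that, assuming each tick delivers exactly one event per expected path. The event struct is a hypothetical stand-in for SubscriptionEvent (its Payload field replaces the Arrow reader), and groupTicks is a hypothetical helper, not part of the client.

```go
package main

import "fmt"

// event is a hypothetical stand-in for SubscriptionEvent. Payload replaces
// the Arrow reader so the sketch runs without dependencies.
type event struct {
	Path    string
	Payload string
}

// groupTicks reads interleaved events and emits one map per tick, keyed by
// path, once every expected path has been seen. Events with unexpected
// paths are skipped.
func groupTicks(events <-chan event, paths []string) <-chan map[string]event {
	expected := make(map[string]bool, len(paths))
	for _, p := range paths {
		expected[p] = true
	}
	out := make(chan map[string]event)
	go func() {
		defer close(out)
		tick := make(map[string]event, len(expected))
		for ev := range events {
			if !expected[ev.Path] {
				continue
			}
			tick[ev.Path] = ev
			if len(tick) == len(expected) {
				out <- tick
				tick = make(map[string]event, len(expected))
			}
		}
	}()
	return out
}

func main() {
	in := make(chan event, 4)
	in <- event{"data.devices", "devices@1"}
	in <- event{"data.alerts", "alerts@1"}
	in <- event{"data.alerts", "alerts@2"}
	in <- event{"data.devices", "devices@2"}
	close(in)

	for tick := range groupTicks(in, []string{"data.devices", "data.alerts"}) {
		fmt.Println(tick["data.devices"].Payload, tick["data.alerts"].Payload)
	}
}
```

With real events, a version of this helper would also need to release the reader of any event it skips or overwrites. An incomplete tick left over when the channel closes is dropped here.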

Dedicated Connections

For long-running subscriptions or when you need full control over the WebSocket lifecycle, use a dedicated connection:

conn, err := c.NewSubscriptionConn(ctx)
if err != nil {
	log.Fatal(err)
}
defer conn.Close()

sub, err := conn.Subscribe(ctx, `
	subscription {
		core {
			models {
				completion(model: "my_llm", prompt: "Explain GraphQL") {
					content
					finish_reason
				}
			}
		}
	}
`, nil)
if err != nil {
	log.Fatal(err)
}
defer sub.Close()

for event := range sub.Events() {
	fmt.Printf("[%s] data received\n", event.Path)
	// Release the reader even when its records are not consumed.
	event.Reader.Release()
}

A dedicated connection supports multiple concurrent subscriptions and is not returned to the pool.
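
A range loop over the events channel runs until that channel is closed. For a long-running subscription it can be useful to stop on context cancellation as well. The sketch below shows the pattern with a plain channel. The helpers consume and runDemo are hypothetical, and since this page does not state whether the client closes the events channel when the context ends, the select handles both cases.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// consume calls handle for each value received from events. It returns nil
// when the channel is closed and ctx.Err() when the context ends first.
func consume[T any](ctx context.Context, events <-chan T, handle func(T)) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case ev, ok := <-events:
			if !ok {
				return nil
			}
			handle(ev)
		}
	}
}

// runDemo feeds three values through consume, then lets a short timeout
// stop a second call whose channel never delivers anything.
func runDemo() (handled int, err error) {
	finite := make(chan string, 3)
	finite <- "a"
	finite <- "b"
	finite <- "c"
	close(finite)
	if err = consume(context.Background(), finite, func(string) { handled++ }); err != nil {
		return handled, err
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	idle := make(chan string) // never receives a value
	err = consume(ctx, idle, func(string) { handled++ })
	return handled, err
}

func main() {
	n, err := runDemo()
	fmt.Println(n, err) // prints "3 context deadline exceeded"
}
```

Applied to a subscription, events would be sub.Events() and handle would process and release each event's reader.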

Periodic Subscription

sub, err := c.Subscribe(ctx, `
	subscription {
		query(interval: 10, count: 60) {
			sensor_readings { sensor_id temperature }
		}
	}
`, nil)
if err != nil {
	log.Fatal(err)
}
defer sub.Close()

for event := range sub.Events() {
	reader := event.Reader
	for reader.Next() {
		rec := reader.Record()
		_ = rec // process each tick's data here
	}
	reader.Release()
}

Multi-Path Subscription

sub, err := c.Subscribe(ctx, `
	subscription {
		query(interval: 5) {
			devices { id status }
			alerts { id severity message }
		}
	}
`, nil)
if err != nil {
	log.Fatal(err)
}
defer sub.Close()

for event := range sub.Events() {
	switch event.Path {
	case "data.devices":
		// handle device updates
	case "data.alerts":
		// handle alert updates
	}
	event.Reader.Release()
}

Cleanup

Close all pooled subscription connections when the client is no longer needed:

c.CloseSubscriptions()

This drains the connection pool and closes all idle WebSocket connections. Active subscriptions are cancelled.
