Python Client (hugr-client)

Python client for the Hugr Data Mesh platform. Query data via GraphQL, get results as Arrow tables, pandas DataFrames, or interactive Perspective viewers.

Installation

pip install hugr-client

For interactive map visualizations (KeplerGL):

pip install "hugr-client[viz]"

Quick Start

from hugr import HugrClient

client = HugrClient() # reads connection from ~/.hugr/connections.json
result = client.query("{ core { data_sources { name } } }")

# Interactive Perspective viewer in JupyterLab
result

# pandas DataFrame
df = result.df("data.core.data_sources")

# pyarrow Table (zero-copy, no pandas overhead)
table = result.parts["data.core.data_sources"].to_arrow()

Connection

When using JupyterLab with hugr-kernel, connections are managed via the connection manager UI. hugr-client reads the same ~/.hugr/connections.json:

# Default connection
client = HugrClient()

# Named connection
client = HugrClient.from_connection("production")

# With connection= parameter
client = HugrClient(connection="staging")

From Environment Variables

# Uses HUGR_URL, HUGR_API_KEY, HUGR_TOKEN env vars
client = HugrClient()

Variable               Description
HUGR_URL               Hugr server URL (e.g., http://localhost:15000/ipc)
HUGR_API_KEY           API key for authentication
HUGR_TOKEN             Bearer token for authentication
HUGR_API_KEY_HEADER    Custom API key header name (default: X-Hugr-Api-Key)
HUGR_ROLE_HEADER       Custom role header name (default: X-Hugr-Role)
HUGR_TIMEZONE          IANA timezone name (e.g., Europe/Moscow). Auto-detected from the system if not set.
HUGR_TIMEZONE_HEADER   Custom timezone header name (default: X-Hugr-Timezone)
HUGR_CONFIG_PATH       Custom path to connections.json
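For example, a headless job can be configured entirely from the shell before starting Python (the values below are placeholders):

```shell
export HUGR_URL=http://localhost:15000/ipc
export HUGR_API_KEY=sk-example
export HUGR_TIMEZONE=Europe/Moscow
```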

Explicit Parameters

client = HugrClient(
    url="http://localhost:15000/ipc",
    api_key="sk-...",
    api_key_header="X-Custom-Key",  # optional custom header
    role="analyst",
)

Priority: explicit parameters > environment variables > connections.json
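This resolution order can be sketched in plain Python; `resolve` is a hypothetical helper for illustration, not part of hugr-client's API:

```python
import os

def resolve(explicit, env_var, file_value):
    # Hypothetical helper showing the documented order:
    # explicit parameter > environment variable > connections.json entry
    if explicit is not None:
        return explicit
    env = os.environ.get(env_var)
    if env:
        return env
    return file_value

os.environ["HUGR_URL"] = "http://env-host:15000/ipc"
print(resolve("http://explicit-host:15000/ipc", "HUGR_URL", "http://file-host:15000/ipc"))
print(resolve(None, "HUGR_URL", "http://file-host:15000/ipc"))
```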

Timezone

The client automatically detects the local system timezone and sends it with every request via the X-Hugr-Timezone header. This affects how Timestamp (TIMESTAMPTZ) values are displayed in query results.

# Auto-detected from system (default behavior)
client = HugrClient(url="...")

# Explicit timezone
client = HugrClient(url="...", timezone="Europe/Moscow")

# Disable timezone (server default will be used)
client = HugrClient(url="...", timezone="UTC")

DateTime values are never affected by the timezone setting.
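The effect on TIMESTAMPTZ display can be illustrated with plain pandas (not the hugr API): one instant, rendered in UTC and in Europe/Moscow.

```python
import pandas as pd

# The same TIMESTAMPTZ instant displayed in two timezones.
ts = pd.Series(pd.to_datetime(["2024-01-01 12:00:00"], utc=True))
moscow = ts.dt.tz_convert("Europe/Moscow")
print(ts[0])      # 2024-01-01 12:00:00+00:00
print(moscow[0])  # 2024-01-01 15:00:00+03:00
```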

Authentication

# API Key
client = HugrClient(url="...", api_key="key", api_key_header="X-My-Key")

# Bearer Token
client = HugrClient(url="...", token="eyJ...")

# Role-based access
client = HugrClient(url="...", api_key="key", role="analyst")

When using the connection manager, auth is configured in the UI — hugr-client reads credentials from connections.json automatically. If a token expires (the server returns 401), the client re-reads the file and retries, since the connection service may have refreshed the token.
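The retry-on-401 behavior can be sketched with stubs; `query_with_refresh`, `send`, and `read_token` are hypothetical names, not hugr-client internals:

```python
def query_with_refresh(send, read_token, gql):
    # Sketch of the documented behavior: on a 401, re-read the (possibly
    # refreshed) token from disk and retry once.
    status, body = send(read_token(), gql)
    if status == 401:
        status, body = send(read_token(), gql)
    return status, body

# Stub transport: the first token is expired, the refreshed one succeeds.
tokens = iter(["expired", "fresh"])
read_token = lambda: next(tokens)
send = lambda token, gql: (200, {"data": {}}) if token == "fresh" else (401, None)

status, body = query_with_refresh(send, read_token, "{ core { data_sources { name } } }")
print(status)  # 200
```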

Working with Results

Multipart Responses

Hugr returns multipart responses — one query can produce multiple data parts:

result = client.query("""
{
  devices { id name geom }
  drivers { id name }
}
""")

# Access individual parts
result.parts["data.devices"].df()
result.parts["data.drivers"].to_arrow()

# Display all parts (Perspective viewer in JupyterLab, HTML elsewhere)
result

Data Access Methods

part = result.parts["data.devices"]

# pyarrow Table (zero-copy, most efficient)
table = part.to_arrow()

# pandas DataFrame (fresh copy each call)
df = part.df()

# GeoDataFrame (with geometry decoding)
gdf = part.to_geo_dataframe("geom")

# Shortcuts on response object
df = result.df("data.devices")
gdf = result.gdf("data.devices", "geom")
record = result.record("data.device_by_pk")

Perspective Viewer

In JupyterLab with hugr-perspective-viewer installed, results render as interactive Perspective tables with sorting, filtering, and map visualization:

# Full response — tabs for each part
result

# Single part
result.parts["data.devices"]

This works by writing Arrow data to temporary spool files and emitting the application/vnd.hugr.result+json MIME type — the same mechanism used by hugr-kernel and duckdb-kernel.

In VS Code or environments without Perspective, results fall back to HTML table display.
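The display hook behind this fallback can be sketched with IPython's standard `_repr_mimebundle_` protocol; the class name and payload shape below are hypothetical, only the MIME type string comes from the text above:

```python
class FakeResult:
    # Hypothetical result object: JupyterLab renders the richest MIME type it
    # understands (the Perspective extension handles the custom type), while
    # other frontends fall back to text/html.
    def _repr_mimebundle_(self, include=None, exclude=None):
        return {
            "application/vnd.hugr.result+json": {"parts": ["data.devices"]},
            "text/html": "<table><tr><td>fallback</td></tr></table>",
        }

bundle = FakeResult()._repr_mimebundle_()
print(sorted(bundle))
```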

Geometry Support

Geometry fields are automatically detected from server metadata. Supported formats: WKB, GeoJSON, H3Cell.

# GeoDataFrame with CRS
gdf = result.gdf("data.devices", "geom")
print(gdf.crs) # EPSG:4326

# Nested geometry (auto-flattens to target field)
gdf = result.gdf("data.drivers", "devices.geom")

# GeoJSON export
layers = result.geojson_layers()

When writing to spool files for Perspective viewer, WKB geometry is automatically converted to native GeoArrow format (same as Go kernels).
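As a standalone illustration of the WKB format listed above, a minimal round-trip with shapely (a required dependency of hugr-client):

```python
import shapely.wkb
from shapely.geometry import Point

# Encode a point to WKB bytes, then decode it back.
raw = Point(37.6176, 55.7558).wkb  # bytes
geom = shapely.wkb.loads(raw)
print(geom.x, geom.y)
```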

Map Visualization

With hugr-client[viz]:

result.explore_map() # KeplerGL interactive map

Streaming API

For large datasets, use WebSocket streaming to process data in batches without loading everything into memory:

import asyncio
from hugr import connect_stream

async def main():
    client = connect_stream()  # reads from connections.json

    # Stream Arrow batches
    async with await client.stream("{ devices { id name geom } }") as stream:
        async for batch in stream.chunks():
            print(f"Batch: {batch.num_rows} rows")

    # Collect into a DataFrame
    async with await client.stream("{ devices { id name } }") as stream:
        df = await stream.to_pandas()

    # Row-by-row processing
    async with await client.stream("{ devices { id status } }") as stream:
        async for row in stream.rows():
            if row["status"] == "active":
                print(row["id"])

asyncio.run(main())

Stream Methods

Method                          Description
stream.chunks()                 Async generator of Arrow RecordBatch
stream.rows()                   Async generator of dict rows
stream.to_pandas()              Collect all batches into a DataFrame
stream.count()                  Count total rows
client.cancel_current_query()   Cancel the running query

Streaming with Authentication

from hugr import connect_stream

# From connections.json
client = connect_stream()

# Explicit auth
client = connect_stream(
    url="http://localhost:15000/ipc",
    api_key="sk-...",
    api_key_header="X-Custom-Key",
)

Using in Jupyter Notebooks

In JupyterLab, await works directly in cells:

from hugr import connect_stream

client = connect_stream()

async with await client.stream("{ devices { id name } }") as stream:
    df = await stream.to_pandas()

df.head()

ETL / Headless Usage

hugr-client works without Jupyter — no spool files, no display overhead, no Jupyter imports:

from hugr import HugrClient

client = HugrClient()
result = client.query("{ data_source { id value } }")

# Pure data access — no side effects
table = result.to_arrow("data.data_source") # pyarrow.Table
df = result.df("data.data_source") # pandas.DataFrame

Incremental ML Training

from hugr import connect_stream
from sklearn.linear_model import SGDClassifier

async def train():
    client = connect_stream()
    model = SGDClassifier()

    async with await client.stream("{ training_data { features label } }") as stream:
        async for batch in stream.chunks():
            df = batch.to_pandas()
            model.partial_fit(df[["f1", "f2"]], df["label"], classes=[0, 1])

    return model

JupyterLab Integration

Perspective Viewer Setup

To get interactive Perspective viewer rendering (same as hugr-kernel and duckdb-kernel), install the hugr-perspective-viewer extension:

pip install jupyterlab hugr-client "hugr-perspective-viewer>=0.3.2"

Then set the HUGR_SPOOL_EXTRA_DIRS environment variable so the spool proxy can find hugr-client's temporary Arrow files:

HUGR_SPOOL_EXTRA_DIRS=hugr-client jupyter lab

Now query results displayed in notebook cells render as interactive Perspective tables with sorting, filtering, column reordering, and map visualization for geometry data.

Without Perspective Viewer

If hugr-perspective-viewer is not installed, results display as HTML tables (pandas .to_html()). All data access methods (df(), to_arrow(), gdf()) work regardless.

Connection Manager

When hugr-perspective-viewer is installed, it also provides the Hugr connection manager UI in JupyterLab. Configure connections in the sidebar — hugr-client reads the same ~/.hugr/connections.json.

API Reference

HugrClient

HugrClient(
    url=None,             # Server URL (or from env/connections.json)
    api_key=None,         # API key
    api_key_header=None,  # Custom header name (default: X-Hugr-Api-Key)
    token=None,           # Bearer token
    role=None,            # User role
    connection=None,      # Connection name or dict from connections.json
)

Method                    Returns           Description
query(query, variables)   HugrIPCResponse   Execute a GraphQL query
from_connection(name)     HugrClient        Create from connections.json

HugrIPCResponse

Method            Returns        Description
df(path)          DataFrame      Part as pandas DataFrame
gdf(path, field)  GeoDataFrame   Part as GeoDataFrame
to_arrow(path)    pa.Table       Part as Arrow Table
record(path)      dict           Part as dictionary
parts             dict           All parts by path
extensions()      dict           Query extensions
explore_map()     KeplerGl       Map visualization (requires [viz])

HugrIPCTable (individual part)

Method                   Returns        Description
df()                     DataFrame      Fresh pandas DataFrame
to_arrow()               pa.Table       Zero-copy Arrow Table
to_geo_dataframe(field)  GeoDataFrame   With geometry decoding
geojson_layers()         dict           GeoJSON FeatureCollections
explore_map()            KeplerGl       Map visualization (requires [viz])

Streaming

connect_stream(
    url=None, api_key=None, api_key_header=None,
    token=None, role=None, max_frame_size=128*1024*1024,
)

Method                            Description
stream(query, variables)          Create an async data stream
stream_data_object(obj, fields)   Stream a specific data object
cancel_current_query()            Cancel the running query
disconnect()                      Close the WebSocket

Dependencies

Required: requests, requests-toolbelt, pyarrow, pandas, numpy, geopandas, shapely, websockets

Optional ([viz]): keplergl, pydeck, folium, matplotlib, mapclassify

Subscriptions

The Python client supports GraphQL subscriptions over WebSocket, delivering incremental results as Arrow data.

Basic Usage

import asyncio
from hugr import connect_stream

async def main():
    client = connect_stream()

    sub = await client.subscribe("""
    subscription {
      core { store {
        subscribe(store: "redis", channel: "events") {
          channel message
        }
      } }
    }
    """)

    async for event in sub.events():
        # Each event corresponds to one subscription data push
        df = event.to_pandas()
        print(df)

    # Or access the Arrow data directly
    async for event in sub.events():
        for chunk in event.chunks():
            print(f"Batch: {chunk.num_rows} rows")
        for row in event.rows():
            print(row)

asyncio.run(main())

SubscriptionEvent

Each event yielded by sub.events() is a SubscriptionEvent representing one push from the server (one path):

Method             Returns             Description
event.chunks()     list[RecordBatch]   Arrow RecordBatches for this event
event.rows()       list[dict]          Rows as dictionaries
event.to_pandas()  DataFrame           Event data as a pandas DataFrame

Multiple Subscriptions

Multiple subscriptions can run concurrently on the same WebSocket connection. The client uses Arrow schema metadata (subscription_id, path) to route incoming binary frames to the correct subscription handler -- no text frame markers are needed for demultiplexing.

async def main():
    client = connect_stream()

    sub1 = await client.subscribe('subscription { core { store { subscribe(store: "redis", channel: "ch1") { channel message } } } }')
    sub2 = await client.subscribe('subscription { core { store { watch(store: "redis", pattern: "user:*") { key event } } } }')

    async def handle(sub, name):
        async for event in sub.events():
            print(f"[{name}]", event.to_pandas())

    await asyncio.gather(handle(sub1, "ch1"), handle(sub2, "watch"))

See Also