UDFs
A UDF (user-defined function) is a stateless worker that preserves row count: one input row produces one derived attribute on the same row. Embeddings, classifications, tags, and attribute backfills all use the same primitive.
Use a pipeline when external data becomes rows, or when one row fans out into many rows. Use a UDF when rows already in Layer acquire derived attributes.
Gateway                                        Worker Deployment
  |                                                |
  | create ID scan                                 |
  | POST /v2/namespaces/{ns}/scans                 |
  | filters: spec.filter                           |
  |                                                |
  | enqueue (namespace, id) rows                   |
  | into the UDF queue                             |
  |                                                |
  | <------- POST /v2/udfs/{id}/claim -------------|
  | -------- rows + input columns ---------------->|
  |                                                | fn(*, id, title) -> list[str]
  | <------- POST /v2/udfs/{id}/items/complete ----|
  |                                                |
  | writeback: Turbopuffer patch_columns           |
Author a worker
The Python SDK turns a normal function into the claim/process/complete loop.
import asyncio

from hevlayer.udf import PermanentError, TransientError, run_udf_worker, udf


@udf(inputs=["id", "title", "description"], output="tags", kind="tags")
def tag_product(*, id: str, title: str | None, description: str | None) -> list[str]:
    if not title:
        raise PermanentError(f"{id}: missing title")
    try:
        text = f"{title} {description or ''}".lower()
    except TypeError as exc:
        raise TransientError(str(exc)) from exc
    tags: list[str] = []
    if "wireless" in text:
        tags.append("wireless")
    if "waterproof" in text:
        tags.append("waterproof")
    return tags or ["uncategorized"]


if __name__ == "__main__":
    asyncio.run(run_udf_worker(tag_product, udf_id="product-tags"))
Function parameters are keyword-only and named to match inputs.
Raise TransientError for retryable work and PermanentError for
unrecoverable input.
Declare the function
Apply a Function CRD. The operator emits a worker Deployment, an
optional Service for push dispatch, and a KEDA ScaledObject from
spec.scaling. The gateway uses the Function spec to register the UDF
queue and discovery policy.
apiVersion: hevlayer.com/v1alpha1
kind: Function
metadata:
  name: product-tags
  namespace: hev-shop
spec:
  paused: false
  targetNamespaces:
    - amazon-products
  inputs:
    - id
    - title
    - description
  output:
    attribute: tags
    kind: tags
    version: v1
  filter:
    - "Or"
    - - ["tags_v", "NotEq", "v1"]
      - ["tags_v", "Eq", null]
  triggers:
    - discovery
  worker:
    image: ghcr.io/hev/hev-shop-udf-product-tags:latest
    dispatch: pull
    batchSize: 16
    timeoutSeconds: 30
  schedule:
    discoveryIntervalSeconds: 300
    leaseSeconds: 120
    maxInFlightBatches: 4
    maxConcurrentScans: 1
  retry:
    maxAttempts: 6
    initialBackoffSeconds: 5
    maxBackoffSeconds: 300
  scaling:
    pool: cpu
    mode: autoscale
    replicas:
      min: 0
      max: 4
spec.filter is the same JSON tuple syntax used in Turbopuffer queries.
The 0.1 CRD preserves array-form filters, so compound expressions like
the example above can be applied directly.
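To show which rows the example filter selects, the sketch below evaluates a small subset of the tuple syntax (`And`, `Or`, `Eq`, `NotEq`) against plain dictionaries. It is an illustration only, not the gateway's or Turbopuffer's evaluator; the `matches` function is hypothetical, and the handling of `NotEq` on a missing attribute is an assumption.

```python
from typing import Any


def matches(filter_expr: list, row: dict[str, Any]) -> bool:
    """Evaluate a small subset of tuple filters against one row."""
    head = filter_expr[0]
    if head in ("And", "Or"):
        results = (matches(clause, row) for clause in filter_expr[1])
        return all(results) if head == "And" else any(results)
    attribute, op, value = filter_expr
    actual = row.get(attribute)  # a missing attribute reads as None
    if op == "Eq":
        return actual == value
    if op == "NotEq":
        # Assumption: a missing attribute does not satisfy NotEq against a
        # concrete value, which is why the filter tests null separately.
        if value is not None and actual is None:
            return False
        return actual != value
    raise ValueError(f"unsupported operator: {op}")


# The filter from the Function spec above, written as Python literals.
STALE = ["Or", [["tags_v", "NotEq", "v1"], ["tags_v", "Eq", None]]]

rows = [
    {"id": "a1", "tags_v": "v1"},  # already at the current version
    {"id": "a2", "tags_v": "v0"},  # stamped by an older version
    {"id": "a3"},                  # never processed, no marker
]
stale_ids = [row["id"] for row in rows if matches(STALE, row)]
```

Under these assumptions `stale_ids` contains `a2` and `a3`: rows with an old marker and rows with no marker are selected, while rows already at `v1` are skipped.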
The worker pod receives HEVLAYER_UDF_ID, HEVLAYER_BASE_URL,
HEVLAYER_UDF_BATCH_SIZE, HEVLAYER_UDF_TIMEOUT_SECONDS,
HEVLAYER_UDF_LEASE_SECONDS, and LAYER_GATEWAY_API_KEY.
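A custom worker that does not use `run_udf_worker` would need to read these variables itself. The sketch below is one way to do that; the `WorkerEnv` class is hypothetical, the integer parsing assumes the numeric variables are plain integer strings, and the example values (including the gateway URL) are made up.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class WorkerEnv:
    udf_id: str
    base_url: str
    batch_size: int
    timeout_seconds: int
    lease_seconds: int
    api_key: str

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "WorkerEnv":
        # A KeyError here means the pod is missing an expected variable.
        return cls(
            udf_id=env["HEVLAYER_UDF_ID"],
            base_url=env["HEVLAYER_BASE_URL"].rstrip("/"),
            batch_size=int(env["HEVLAYER_UDF_BATCH_SIZE"]),
            timeout_seconds=int(env["HEVLAYER_UDF_TIMEOUT_SECONDS"]),
            lease_seconds=int(env["HEVLAYER_UDF_LEASE_SECONDS"]),
            api_key=env["LAYER_GATEWAY_API_KEY"],
        )


# Example values only; in a pod, call WorkerEnv.from_env() with no arguments.
example = WorkerEnv.from_env({
    "HEVLAYER_UDF_ID": "product-tags",
    "HEVLAYER_BASE_URL": "http://layer-gateway.example:8080/",
    "HEVLAYER_UDF_BATCH_SIZE": "16",
    "HEVLAYER_UDF_TIMEOUT_SECONDS": "30",
    "HEVLAYER_UDF_LEASE_SECONDS": "120",
    "LAYER_GATEWAY_API_KEY": "test-key",
})
```

Failing fast on a missing variable at startup is usually preferable to discovering it on the first claim.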
The CRD is the source of truth for the worker shape. Use
POST /v2/udfs/{id}/discover, claim, and complete only for runtime
coordination and manual recovery; do not create a separate Deployment for
the same function unless you also take over scaling and placement.
Scaling and placement
spec.scaling is the scaling contract for the Function worker.
| Field | Purpose |
|---|---|
| pool | Name of a compute pool in InfraRules/default. |
| mode | autoscale, fixed, or disabled. |
| replicas.min | Minimum worker replicas. Use 1 for warm workers. |
| replicas.max | Maximum worker replicas; must not exceed the pool cap. |
InfraRules owns shared placement: node selectors, tolerations, resource
requests, and per-workload replica ceilings. Workload specs choose a
pool; they do not repeat placement rules.
For extra pod-level config, set spec.worker.podSpec. It is
deep-merged into the operator pod spec. Container array overrides are
not merged.
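The difference between deep-merging maps and not merging arrays can be sketched as follows. This is not the operator's code: the `deep_merge` function is hypothetical, and it assumes that a list in the override replaces the operator's list wholesale. The guide only says container arrays are not merged, so check the operator's actual behavior before relying on this.

```python
from copy import deepcopy


def deep_merge(base: dict, override: dict) -> dict:
    """Merge override into a copy of base; dicts recurse, other values replace."""
    merged = deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            # Assumption: lists (such as containers) are swapped in whole,
            # never merged element by element.
            merged[key] = deepcopy(value)
    return merged


operator_pod_spec = {
    "nodeSelector": {"pool": "cpu"},
    "containers": [{"name": "worker", "image": "example/worker:1"}],
}
user_pod_spec = {
    "nodeSelector": {"disk": "ssd"},
    "containers": [{"name": "worker", "env": [{"name": "LOG_LEVEL", "value": "debug"}]}],
}
result = deep_merge(operator_pod_spec, user_pod_spec)
```

In this sketch the two `nodeSelector` maps combine, but the overriding `containers` list drops the operator's `image` field because the list is replaced rather than merged. That hazard is a good reason to keep container-level changes out of `podSpec` where possible.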
Gateway API
In Kubernetes installs, the Function CRD is the source of truth and the runtime API below is registered from it. These routes are the same surface the Python SDK drives. Call them directly to register a UDF without the operator, or to coordinate and recover workers by hand.
Spec routes
| Route | Behavior |
|---|---|
| POST /v2/udfs | Create a UDF definition and queue. |
| GET /v2/udfs | List UDFs. |
| GET /v2/udfs/{id} | Read a UDF. |
| DELETE /v2/udfs/{id} | Delete a UDF and its queue (does not delete written output). |
| GET /v2/udfs/{id}/status | Queue depth, in-flight, failed counts. |
The create body carries the same shape the CRD spec expresses:
POST /v2/udfs
Content-Type: application/json

{
  "id": "product-tags",
  "spec": {
    "target_namespaces": ["amazon-products"],
    "inputs": ["id", "title", "description"],
    "output": {"attribute": "tags", "kind": "tags", "version": "v1"},
    "filter": ["Or", [["tags_v", "NotEq", "v1"], ["tags_v", "Eq", null]]],
    "triggers": ["discovery"],
    "worker": {
      "image": "ghcr.io/hev/hev-shop-udf-product-tags:latest",
      "port": 8080,
      "batch_size": 16,
      "timeout_seconds": 30
    },
    "schedule": {
      "discovery_interval_seconds": 300,
      "lease_seconds": 120,
      "max_in_flight_batches": 4,
      "max_concurrent_scans": 1
    },
    "retry": {"max_attempts": 6, "initial_backoff_seconds": 5, "max_backoff_seconds": 300}
  }
}
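The CRD example uses camelCase keys while the create body uses snake_case. If you keep a spec in CRD form and want to post it by hand, a key-renaming helper along these lines may be useful. The `to_snake` and `snake_keys` functions are hypothetical, and the two examples in this guide are not field-for-field identical (the CRD shows `dispatch` and `scaling`, the body shows `port`), so treat this as a renaming sketch rather than a complete converter.

```python
import re


def to_snake(name: str) -> str:
    """Convert a camelCase key such as batchSize to batch_size."""
    return re.sub(r"(?<=[a-z0-9])([A-Z])", r"_\1", name).lower()


def snake_keys(value):
    """Recursively rename dict keys; list items and scalars pass through."""
    if isinstance(value, dict):
        return {to_snake(k): snake_keys(v) for k, v in value.items()}
    if isinstance(value, list):
        return [snake_keys(item) for item in value]
    return value


crd_spec = {
    "targetNamespaces": ["amazon-products"],
    "worker": {"batchSize": 16, "timeoutSeconds": 30},
    "schedule": {"discoveryIntervalSeconds": 300, "maxInFlightBatches": 4},
    "filter": ["Or", [["tags_v", "NotEq", "v1"], ["tags_v", "Eq", None]]],
}
body = {"id": "product-tags", "spec": snake_keys(crd_spec)}
```

Only dictionary keys are renamed, so filter operators such as `NotEq` and attribute names such as `tags_v` pass through unchanged.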
Lifecycle routes
| Route | Behavior |
|---|---|
| POST /v2/udfs/{id}/pause | Stop both discovery and dispatch. Workers drain in-flight then idle. |
| POST /v2/udfs/{id}/resume | Resume discovery and dispatch. |
| POST /v2/udfs/{id}/reset-failed | Move every row in failed back to pending. |
| POST /v2/udfs/{id}/discover | Trigger a discovery sweep immediately. |
reset-failed is the recovery path after a transient upstream
incident. For permanent issues, fix the input shape or bump
spec.output.version and re-apply.
Worker coordination routes
| Route | Behavior |
|---|---|
| POST /v2/udfs/{id}/claim | Claim a batch of rows for processing. |
| POST /v2/udfs/{id}/items/heartbeat | Extend the lease on in-flight items. |
| POST /v2/udfs/{id}/items/complete | Report success and persist output. |
| POST /v2/udfs/{id}/items/fail | Report failure (transient or permanent). |
The Python SDK's run_udf_worker implements the full loop, so most
workloads should never call these routes directly. For manual
coordination, a complete call looks like this:
POST /v2/udfs/product-tags/items/complete
Content-Type: application/json

{
  "worker_id": "udf-worker-0",
  "items": [
    {"namespace": "amazon-products", "id": "asin-B08N5WRWNW", "output": ["wireless", "waterproof"]}
  ]
}
claim returns the batch as (namespace, id) pairs alongside the input
columns the spec declared. Rows the gateway can't bind from the index
(missing required inputs) surface as bind errors, not silent skips, so
the worker can fail them explicitly rather than retry forever. On fail,
kind: transient honors spec.retry, while kind: permanent dead-letters
the row immediately. The SDK derives kind from TransientError and
PermanentError.
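To get a feel for how the example retry settings might play out, the sketch below computes a delay schedule from `max_attempts`, `initial_backoff_seconds`, and `max_backoff_seconds`. The doubling growth is an assumption: the guide names the fields but does not state the growth rule, and the `backoff_schedule` function is hypothetical.

```python
def backoff_schedule(max_attempts: int, initial: int, maximum: int) -> list[int]:
    """Delays before attempts 2..max_attempts, doubling and capped at maximum."""
    # Assumption: exponential doubling; the gateway's real rule may differ.
    return [min(initial * 2 ** n, maximum) for n in range(max_attempts - 1)]


# Values from the example spec: 6 attempts, 5 s initial, 300 s cap.
delays = backoff_schedule(max_attempts=6, initial=5, maximum=300)
total_wait = sum(delays)
```

With the example values and this assumed rule, the five delays are 5, 10, 20, 40, and 80 seconds, so a row that keeps failing transiently waits about 155 seconds in total before it lands in failed. The 300-second cap only starts to matter with more attempts.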
Writeback and discovery
UDF outputs are patched onto the target row as the named attribute.
output.kind is an SDK type hint; writeback semantics are the same for
tags, classifications, scalars, and vectors. When spec.output.version
is set, the gateway atomically writes the output and the matching
{attribute}_v marker in a single patch.
Discovery sweeps create an ID scan with spec.filter against each
target_namespace. Returned IDs are enqueued and deduplicated. The first
sweep after create/apply is implicit; subsequent sweeps run on
schedule.discovery_interval_seconds.
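Deduplication on enqueue can be sketched with a set keyed by (namespace, id). The `UdfQueue` class below is a hypothetical in-memory model, not the gateway's queue, and it ignores what happens to a key once its row completes or fails.

```python
class UdfQueue:
    """Hypothetical in-memory queue keyed by (namespace, id)."""

    def __init__(self) -> None:
        self._seen: set[tuple[str, str]] = set()
        self.pending: list[tuple[str, str]] = []

    def enqueue(self, namespace: str, ids: list[str]) -> int:
        """Add unseen rows in order; return how many were newly enqueued."""
        added = 0
        for row_id in ids:
            key = (namespace, row_id)
            if key not in self._seen:
                self._seen.add(key)
                self.pending.append(key)
                added += 1
        return added


queue = UdfQueue()
first = queue.enqueue("amazon-products", ["a1", "a2"])
second = queue.enqueue("amazon-products", ["a2", "a3"])  # a2 is already queued
```

Two overlapping sweeps leave three pending rows rather than four, which is why a short discovery interval does not multiply work for rows that are still waiting.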
Lifecycle
# Inspect the Function resource
kubectl get function product-tags
kubectl describe function product-tags
# Read queue status from the gateway
curl -H "authorization: Bearer $LAYER_GATEWAY_API_KEY" \
  "$LAYER_GATEWAY_URL/v2/udfs/product-tags/status"
# Pause, then resume
kubectl patch function product-tags --type=merge -p '{"spec":{"paused":true}}'
kubectl patch function product-tags --type=merge -p '{"spec":{"paused":false}}'
# Move failed rows back to pending
curl -X POST -H "authorization: Bearer $LAYER_GATEWAY_API_KEY" \
  "$LAYER_GATEWAY_URL/v2/udfs/product-tags/reset-failed"
# Delete the Function
kubectl delete function product-tags
Deletion garbage-collects the operator-managed Deployment, Service, and ScaledObject. Written outputs are not deleted.
Version markers
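spec.output.version is the re-run safety rail. When set, the gateway
stamps {attribute}_v alongside every output write. When a model,
taxonomy, or prompt changes, bump the version and keep the canonical
stale filter ({attribute}_v not equal to the current version, or
unset), updated to compare against the new version. Rows stamped with
an older version then match the filter again and are picked up by the
next discovery sweep.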
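Because the stale filter names the version it compares against, generating it from the attribute and version keeps the two in step. The `stale_filter` helper below is hypothetical; it reproduces the filter shape from the Function example for any attribute and version.

```python
import json


def stale_filter(attribute: str, version: str) -> list:
    """Select rows whose marker differs from version or is missing."""
    marker = f"{attribute}_v"
    return ["Or", [[marker, "NotEq", version], [marker, "Eq", None]]]


v1_filter = stale_filter("tags", "v1")
v2_filter = stale_filter("tags", "v2")
v2_json = json.dumps(v2_filter)  # None serializes as JSON null
```

After a bump to `v2`, rows stamped `v1` satisfy the `NotEq` clause and rows with no marker satisfy the `Eq` null clause, so both are selected for reprocessing.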
Tuning knobs
| Knob | What it bounds |
|---|---|
| worker.batchSize | Rows per worker batch. |
| worker.timeoutSeconds | Worker call timeout. |
| schedule.leaseSeconds | How long a claim is held before reissue. |
| schedule.discoveryIntervalSeconds | Time between discovery scan jobs. |
| schedule.maxInFlightBatches | Concurrent worker batches per UDF. |
| schedule.maxConcurrentScans | Concurrent namespace discovery jobs. |
| retry.maxAttempts | Tries before a row lands in failed. |
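A little arithmetic shows how the knobs interact. The sketch below multiplies batch size by in-flight batches to get an upper bound on rows being processed at once, and checks that the lease is at least as long as the worker timeout. The `Knobs` class is hypothetical, and the lease check is a sanity rule assumed here, not a constraint stated by this guide.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Knobs:
    batch_size: int
    timeout_seconds: int
    lease_seconds: int
    max_in_flight_batches: int

    def max_rows_in_flight(self) -> int:
        """Upper bound on rows claimed at once for this UDF."""
        return self.batch_size * self.max_in_flight_batches

    def lease_covers_timeout(self) -> bool:
        # Assumed rule of thumb: a lease shorter than the call timeout could
        # be reissued while the first worker is still processing the batch.
        return self.lease_seconds >= self.timeout_seconds


# Values from the example Function spec.
knobs = Knobs(batch_size=16, timeout_seconds=30, lease_seconds=120, max_in_flight_batches=4)
rows_in_flight = knobs.max_rows_in_flight()
```

With the example values, at most 64 rows are in flight per UDF and the 120-second lease comfortably covers the 30-second timeout.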
Not in 0.1
- Cross-namespace aggregate UDFs.
- Chunkers or fan-out transforms; those remain pipelines.
- Multi-output UDFs.
- Managed image builds.