Pipelines
A pipeline indexes documents through staged work in which the row count changes between stages (for example, one document fans out into many chunks). The common shape is extract (CPU) followed by embed (GPU). The gateway tracks document state in PostgreSQL and exports queue depth so the operator can autoscale workers through KEDA.
Once vectors land in Turbopuffer, query and fetch them through the namespace API — see Namespaces.
Pipeline flow
CPU worker            Gateway                           GPU worker
    |                 POST /v2/pipelines                     |
    |---- chunks -->  PUT /documents/{doc_id}                |
    |                 chunks -> S3 + NVMe cache              |
    |                 state  -> PostgreSQL                   |
    |                                                        |
    |                 GET /status  <------ KEDA              |
    |                                                        |
    |                 POST /claim   <------------------------|
    |                 GET /chunks   <------------------------|
    |                 PUT /vectors  <------------------------|
    |                 vectors -> Turbopuffer                 |
CPU worker — reads source data, extracts text/metadata, splits into chunks, calls the stage endpoint. Scales on input queue (e.g. SQS depth, Kafka lag).
GPU worker — polls the pipeline status endpoint for pending_count > 0, fetches chunks from the gateway, runs the embedding model, calls the vectors endpoint. Scales on pending_count via KEDA.
The gateway handles chunk storage (S3 backing plus embedded Aerospike cache), vector upsert (Turbopuffer), and state tracking (embedded PostgreSQL). Workers are stateless and never connect to gateway-internal stores.
Prerequisites
Pipeline routes are registered only when DATABASE_URL is configured. The Helm chart sets DATABASE_URL to the gateway pod’s loopback PostgreSQL sidecar. The migration runs automatically on startup.
export DATABASE_URL=postgres://hevlayer:hevlayer@localhost:5432/hevlayer
Pipeline CRD
Declare a Pipeline when the operator should own the worker Deployment and KEDA object. See Pipeline CRD for the full resource reference.
apiVersion: hevlayer.com/v1alpha1
kind: Pipeline
metadata:
  name: product-images
  namespace: layer
spec:
  target:
    namespace: products
  worker:
    image: ghcr.io/hev/product-image-worker:latest
    batchSize: 64
    timeoutSeconds: 60
  scaling:
    pool: cpu
    mode: autoscale
    replicas:
      min: 0
      max: 8
spec.scaling.pool must name a compute pool in InfraRules/default.
Setting mode: fixed pins the replica count to replicas.min; either
mode: disabled or spec.paused: true scales the worker to 0.
Gateway API
Create a pipeline
curl -X POST http://gateway:8080/v2/pipelines \
-H 'content-type: application/json' \
-d '{
"id": "product-images",
"target_namespace": "products",
"distance_metric": "cosine_distance"
}'
distance_metric defaults to cosine_distance. Returns 409 if the pipeline already exists.
Stage a document (CPU worker)
curl -X PUT http://gateway:8080/v2/pipelines/product-images/documents/asin-B08N5WRWNW \
-H 'content-type: application/json' \
-d '{
"chunks": [
{"id": "asin-B08N5WRWNW-0", "text": "Wireless noise-cancelling headphones"},
{"id": "asin-B08N5WRWNW-1", "text": "40-hour battery life", "metadata": {"page": 2}}
]
}'
Each chunk is stored durably in S3 and cached in Aerospike (set:
pipe_{target_namespace}). The document is marked pending. Re-staging the
same document ID replaces the previous chunk backing and resets it to
pending.
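A CPU worker can build the same request in code. The sketch below is a minimal illustration using only the Python standard library. The helper name build_stage_request and the "{doc_id}-{index}" chunk ID convention (copied from the example above) are assumptions, not part of the gateway API.

```python
import json
import urllib.request

GATEWAY = "http://gateway:8080"  # same host as the curl examples


def build_stage_request(pipeline_id: str, doc_id: str, texts: list[str]) -> urllib.request.Request:
    """Build (but do not send) the PUT request that stages one document."""
    chunks = [
        {"id": f"{doc_id}-{i}", "text": text}  # hypothetical ID convention
        for i, text in enumerate(texts)
    ]
    return urllib.request.Request(
        url=f"{GATEWAY}/v2/pipelines/{pipeline_id}/documents/{doc_id}",
        data=json.dumps({"chunks": chunks}).encode("utf-8"),
        headers={"content-type": "application/json"},
        method="PUT",
    )
```

Passing the returned object to urllib.request.urlopen sends it. Because re-staging replaces the previous chunks, sending the same request twice should be safe.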
Get pipeline status (KEDA polling)
curl http://gateway:8080/v2/pipelines/product-images/status
{
"pipeline_id": "product-images",
"counts": {"pending": 142, "indexed": 8530},
"pending_count": 142
}
pending_count is the field KEDA watches. When it hits zero, GPU workers scale to zero.
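A custom poller can read the same field. The sketch below parses the response shown above. The helper name pending_from_status and the fallback to counts["pending"] are assumptions made for illustration; the gateway may always return pending_count.

```python
import json


def pending_from_status(body: str) -> int:
    """Return the number of pending documents from a /status response body."""
    status = json.loads(body)
    # Prefer the top-level field; fall back to counts["pending"] if it is absent.
    if "pending_count" in status:
        return int(status["pending_count"])
    return int(status.get("counts", {}).get("pending", 0))


sample = (
    '{"pipeline_id": "product-images", '
    '"counts": {"pending": 142, "indexed": 8530}, "pending_count": 142}'
)
print(pending_from_status(sample))  # prints 142
```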
Read chunks and write vectors (GPU worker)
curl http://gateway:8080/v2/pipelines/product-images/documents/asin-B08N5WRWNW/chunks
After embedding, write vectors back. This upserts to Turbopuffer and marks the document indexed:
curl -X PUT http://gateway:8080/v2/pipelines/product-images/documents/asin-B08N5WRWNW/vectors \
-H 'content-type: application/json' \
-d '{
"vectors": [
{"id": "asin-B08N5WRWNW-0", "vector": [0.0012, -0.043], "attributes": {"text": "..."}}
]
}'
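The request body pairs each chunk ID with its embedding. The following sketch shows one way a GPU worker might assemble it. It assumes each chunk is a dict with the "id" and "text" keys used when staging (the exact shape of the /chunks response is not documented here), and build_vectors_payload is a hypothetical helper name.

```python
def build_vectors_payload(chunks: list[dict], embeddings: list[list[float]]) -> dict:
    """Pair chunks with their embeddings in the shape the vectors route expects."""
    if len(chunks) != len(embeddings):
        raise ValueError("one embedding is required per chunk")
    dims = {len(vector) for vector in embeddings}
    if len(dims) > 1:
        # Mixed dimensions usually mean the wrong model or a truncated batch.
        raise ValueError(f"inconsistent embedding dimensions: {sorted(dims)}")
    return {
        "vectors": [
            {"id": chunk["id"], "vector": list(vector), "attributes": {"text": chunk["text"]}}
            for chunk, vector in zip(chunks, embeddings)
        ]
    }
```

Validating counts and dimensions before the PUT keeps a malformed batch from reaching the vectors route.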
Claim, heartbeat, stage
Workers claim staged documents through Layer instead of mutating PostgreSQL directly. Layer sets claimed_by and claimed_at, moves rows to the requested claim stage, recovers stale claims older than the lease, and uses FOR UPDATE SKIP LOCKED so multiple workers can claim concurrently.
POST /v2/pipelines/product-images/claim
{
"stage": "pending",
"claim_stage": "embedding",
"limit": 2000,
"worker_id": "gpu-worker-0",
"lease_seconds": 900
}
Heartbeat long-running claims:
POST /v2/pipelines/product-images/documents/heartbeat
{
"document_ids": ["B07XYZ123"],
"stage": "embedding",
"worker_id": "gpu-worker-0"
}
Move claimed documents to a final stage:
POST /v2/pipelines/product-images/documents/stage
{
"document_ids": ["B07XYZ123"],
"stage": "indexed",
"from_stage": "embedding",
"worker_id": "gpu-worker-0"
}
Use stage: "pending" for release and stage: "failed" for permanent failures. Use create_missing: true without from_stage/worker_id when a pipeline enqueues lightweight document IDs without chunks (e.g. aggregate refresh jobs).
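The three calls above can be wrapped in small helpers. This is a sketch, not a client library: the transport is an injected post(path, body) callable (hypothetical) so no response shape is assumed, and the helper names claim, heartbeat, and settle are illustrative.

```python
from typing import Callable, Iterable

# (path, JSON body) -> parsed JSON response; supplied by the caller.
Post = Callable[[str, dict], dict]

BASE = "/v2/pipelines/product-images"
SETTLE_STAGES = {"indexed", "pending", "failed"}  # done, release, permanent failure


def claim(post: Post, worker_id: str, limit: int = 2000, lease_seconds: int = 900) -> dict:
    """Claim pending documents into the embedding stage."""
    return post(f"{BASE}/claim", {
        "stage": "pending",
        "claim_stage": "embedding",
        "limit": limit,
        "worker_id": worker_id,
        "lease_seconds": lease_seconds,
    })


def heartbeat(post: Post, worker_id: str, document_ids: Iterable[str]) -> dict:
    """Heartbeat documents this worker is still embedding."""
    return post(f"{BASE}/documents/heartbeat", {
        "document_ids": list(document_ids),
        "stage": "embedding",
        "worker_id": worker_id,
    })


def settle(post: Post, worker_id: str, document_ids: Iterable[str], stage: str) -> dict:
    """Move claimed documents out of embedding: indexed, pending, or failed."""
    if stage not in SETTLE_STAGES:
        raise ValueError(f"unsupported final stage: {stage!r}")
    return post(f"{BASE}/documents/stage", {
        "document_ids": list(document_ids),
        "stage": stage,
        "from_stage": "embedding",
        "worker_id": worker_id,
    })
```

A worker would typically call settle(..., "pending") when a batch hits a transient error and settle(..., "failed") when retrying cannot help.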
create_missing uses the segmented queue by default. Layer writes document
IDs into compressed S3 manifests and stores only segment leases and counters
in PostgreSQL, so large lightweight queues scale by segment count rather than
by one PostgreSQL row per document. Set PIPELINE_SEGMENT_SIZE to tune the
number of logical documents per segment. The Helm default segment size is
10,000, so 1,000,000 lightweight IDs become about 100 PostgreSQL segment rows.
PIPELINE_QUEUE_BACKEND=row keeps the legacy per-document PostgreSQL queue for
failure testing.
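The row-count arithmetic can be checked with a short calculation. The sketch assumes every segment is filled to capacity before a new one is opened, which is why the figure above is approximate; segment_rows is a hypothetical helper, not a Layer function.

```python
def segment_rows(document_count: int, segment_size: int = 10_000) -> int:
    """Estimate PostgreSQL segment rows for a queue of lightweight document IDs."""
    if segment_size <= 0:
        raise ValueError("segment_size must be positive")
    # Integer ceiling division: a partially filled segment still needs a row.
    return (document_count + segment_size - 1) // segment_size


print(segment_rows(1_000_000))  # prints 100
```

With the row backend, the same queue would need 1,000,000 PostgreSQL rows.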
Segment manifests are queue state, not durable history. Layer deletes
superseded manifests after segment splits, deletes completed manifests when
documents move to indexed, and removes the pipeline segment prefix when the
pipeline is deleted.
Document lifecycle
              stage_document()              write_vectors()
(new doc) ──────────────────► pending ──────────────────► indexed
                                 ▲
                                 │ re-stage (idempotent)
- pending — chunks stored in Aerospike, waiting for embedding.
- indexed — vectors written to Turbopuffer.
Re-staging a document resets it to pending with new chunks. Useful for reprocessing after source data changes.
Failure model
- Turbopuffer write failures are hard: the vectors route returns 502 and the document stays in embedding for re-claim.
- Aerospike cache failures do not block chunk reads when S3 backing is present; PostgreSQL connectivity failures surface as 500 and should be retried with backoff.
- Lease expiry is handled server-side. A worker that crashes mid-embedding has its rows recovered on the next claim sweep.
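Retrying 500 responses with backoff could look like the sketch below. The doubling schedule, jitter range, and attempt limit are assumptions rather than documented gateway requirements, and call_with_backoff is a hypothetical helper. A 502 is returned immediately, because the document stays in embedding and can be re-claimed.

```python
import random
import time
from typing import Callable


def call_with_backoff(
    send: Callable[[], int],  # performs the request, returns the HTTP status code
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call send() until it returns something other than 500 or attempts run out."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    status = send()
    for attempt in range(max_attempts - 1):
        if status != 500:
            break
        delay = min(max_delay, base_delay * 2 ** attempt)
        # Jitter spreads retries so workers do not hit the gateway in lockstep.
        sleep(delay * random.uniform(0.5, 1.0))
        status = send()
    return status
```

The sleep parameter is injectable so the retry schedule can be exercised in tests without waiting.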
Autoscaling
The operator generates the KEDA object directly from Pipeline.spec.scaling. For
manual workers that are not represented by a Pipeline CR, use the same
Prometheus signal:
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: gpu-embed-worker
spec:
  scaleTargetRef:
    name: gpu-embed-worker
  minReplicaCount: 0
  maxReplicaCount: 8
  triggers:
    - type: prometheus
      metadata:
        serverAddress: http://layer-gateway.layer.svc.cluster.local:8080/v2/metrics
        metricName: product_images_pending
        query: 'sum(layer_pipeline_stage_count{pipeline_id="product-images",stage="pending"}) or vector(0)'
        threshold: "50" # 1 replica per 50 pending docs
        activationThreshold: "0" # scale from 0 when any doc is pending (KEDA activates only when the metric exceeds this value)
This keeps autoscaling close to the same source of truth Layer uses for claims while keeping PostgreSQL private to the gateway pod.
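To get a feel for how a pending count maps to replicas, the sketch below approximates the calculation. It assumes the usual average-value rule of ceil(metric / threshold), clamped to the replica bounds, and KEDA's documented activation rule that a scaler is active only when the metric is greater than the activation threshold. It ignores HPA tolerance, stabilization windows, and cooldown, so treat the result as an estimate. The activation_threshold default of 0 is an assumption.

```python
import math


def desired_replicas(
    pending: int,
    threshold: int = 50,
    activation_threshold: int = 0,
    min_replicas: int = 0,
    max_replicas: int = 8,
) -> int:
    """Approximate the replica count for a given number of pending documents."""
    if pending <= activation_threshold:
        # Scaler inactive: the workload falls back to its minimum (possibly 0).
        return min_replicas
    wanted = math.ceil(pending / threshold)
    # Once active, at least one replica runs even when min_replicas is 0.
    return max(max(min_replicas, 1), min(max_replicas, wanted))


for pending in (0, 1, 142, 1000):
    print(pending, desired_replicas(pending))
```

With the values from the ScaledObject above, 142 pending documents map to 3 replicas and anything above 350 saturates at the maximum of 8.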
CPU workers — scale on input source
CPU workers scale on whatever feeds them — SQS queue depth, Kafka consumer lag, S3 event notifications, etc. This is independent of the pipeline API.
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: cpu-extract-worker
spec:
  scaleTargetRef:
    name: cpu-extract-worker # assumed: the Deployment shares this name, as in the GPU example
  triggers:
    - type: aws-sqs-queue
      metadata:
        queueURL: https://sqs.us-east-1.amazonaws.com/123456789/product-images
        queueLength: "10"
        awsRegion: us-east-1