Buffering functions
A buffering (Sink + Source) function must see all of its input before it produces any output. That's the defining difference from a table-in-out function, which emits per input batch. Reach for buffering when the operation is inherently whole-relation: sort, top-k, median, dedup, or an aggregation that emits many rows.
The three-phase lifecycle
Implement TableBufferingFunction:
| Phase | Method | Role |
|---|---|---|
| Sink | process(batch, params) | stash each input batch, return an opaque state_id |
| (end of input) | combine(stateIds, params) | group the state_ids into the finalize streams the Source will drain |
| Source | createFinalizeProducer(params) | emit the buffered rows back out, one batch per tick |
State is stashed in params.storage() — a durable, execution-scoped key/value and append-log store. Buffering through storage (rather than a field on your function) is what lets the engine spread the Sink across parallel workers and still have the Source see everything.
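Before the full example, the three phases and the role of shared storage can be sketched with the standard library alone. This is a minimal stand-in under stated assumptions, not VGI-Java code: `LifecycleSketch`, `SharedLog`, and the `"exec-1"` id are hypothetical names, and a thread pool plays the part of the engine's parallel Sink workers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the lifecycle; none of these names are VGI-Java API.
final class LifecycleSketch {
    // Stand-in for execution-scoped storage: one thread-safe append-log.
    static final class SharedLog {
        private final List<int[]> entries = new CopyOnWriteArrayList<>();
        void append(int[] batch) { entries.add(batch.clone()); }
        List<int[]> readAll() { return new ArrayList<>(entries); }
    }

    // Sink: called once per input batch, possibly from several threads at once.
    static String process(SharedLog log, String executionId, int[] batch) {
        log.append(batch);
        return executionId; // every batch of one execution shares one state id
    }

    // Combine: called once at end of input; collapses duplicate ids into streams.
    static List<String> combine(List<String> stateIds) {
        return new ArrayList<>(new LinkedHashSet<>(stateIds));
    }

    // Source: output is produced only now, after all input has been seen.
    static int[] finalizeSorted(SharedLog log) {
        return log.readAll().stream().flatMapToInt(Arrays::stream).sorted().toArray();
    }

    static int[] run(List<int[]> batches, int workers) {
        SharedLog log = new SharedLog();
        List<String> ids = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int[] b : batches) {
            pool.execute(() -> ids.add(process(log, "exec-1", b)));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("sink did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while sinking", e);
        }
        List<String> streams = combine(ids);
        if (streams.size() != 1) throw new IllegalStateException("expected one stream");
        return finalizeSorted(log);
    }

    public static void main(String[] args) {
        int[] out = run(List.of(new int[]{5, 1}, new int[]{4}, new int[]{3, 2}), 3);
        System.out.println(Arrays.toString(out)); // [1, 2, 3, 4, 5]
    }
}
```

Because the batches live in the shared log rather than in a field of any one worker, the order in which the workers finish does not matter: the Source sees every batch either way.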
```java
// VGI-Java example: a table-buffering (Sink + Source) function.
//
// A buffering function must see ALL input before it produces ANY output: think
// sort, top-k, or whole-relation aggregation. Unlike a TIO function (which emits
// per input batch), it has a three-phase lifecycle:
//
//   process()                - Sink: stash each input batch, return a state_id
//   combine()                - once at end-of-input: group state_ids into the
//                              finalize streams the Source will drain
//   createFinalizeProducer() - Source: emit the buffered rows back out
//
// State is stashed in `params.storage()`, a durable, execution-scoped key/value
// + append-log store, so buffering survives even when DuckDB spreads the Sink
// across parallel workers. This example is the canonical "collect every batch,
// replay it during finalize" (a passthrough that happens to fully buffer).
//
//   ATTACH 'demo' AS demo (TYPE vgi, LOCATION 'launch:/abs/path/bin/runBuffering');
//   SELECT * FROM demo.collect((SELECT * FROM range(3) t(x)));  -- 0,1,2
package farm.query.vgi.examples;

import farm.query.vgi.Worker;
import farm.query.vgi.buffering.BufferingFinalizeProducer;
import farm.query.vgi.buffering.TableBufferingCombineParams;
import farm.query.vgi.buffering.TableBufferingFinalizeParams;
import farm.query.vgi.buffering.TableBufferingFunction;
import farm.query.vgi.buffering.TableBufferingProcessParams;
import farm.query.vgi.function.FunctionMetadata;
import farm.query.vgi.function.FunctionSpec;
import farm.query.vgi.internal.BatchUtil;
import farm.query.vgi.internal.SchemaUtil;
import farm.query.vgi.protocol.BindResponse;
import farm.query.vgi.storage.FunctionStorage;
import farm.query.vgi.table.TableProducerState;
import farm.query.vgi.tableinout.TableInOutBindParams;
import farm.query.vgirpc.CallContext;
import farm.query.vgirpc.OutputCollector;
import farm.query.vgirpc.wire.Allocators;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;

import java.nio.charset.StandardCharsets;
import java.util.List;

/** {@code collect(data TABLE) -> *}: buffers every input batch, replays them. */
public final class BufferingExample implements TableBufferingFunction {

    // A namespace + key naming the append-log we buffer batches into.
    private static final byte[] NS = "buf".getBytes(StandardCharsets.UTF_8);
    private static final byte[] KEY = new byte[0];

    private static final FunctionSpec SPEC = FunctionSpec.builder("collect")
            .metadata(FunctionMetadata.describe("Buffer all input, then replay it")
                    .withCategories("utility"))
            .table("data")
            .build();

    @Override public FunctionSpec spec() { return SPEC; }

    // Output schema = input schema (passthrough).
    @Override public BindResponse onBind(TableInOutBindParams params) {
        Schema in = params.inputSchema();
        Schema out = (in == null || in.getFields().isEmpty()) ? new Schema(List.of()) : in;
        return BindResponse.forSchema(SchemaUtil.serializeSchema(out));
    }

    // Sink: append this batch's IPC bytes to the log, return our execution id as
    // the state_id (every batch of one execution shares the same log).
    @Override public byte[] process(VectorSchemaRoot batch, TableBufferingProcessParams params) {
        params.storage().stateAppend(NS, KEY, BatchUtil.writeSingleBatch(batch));
        return params.executionId();
    }

    // Combine: one output stream, keyed by the execution id.
    @Override public List<byte[]> combine(List<byte[]> stateIds, TableBufferingCombineParams params) {
        return List.of(params.executionId());
    }

    // Source: drain the log one buffered batch per tick.
    @Override public TableProducerState createFinalizeProducer(TableBufferingFinalizeParams params) {
        return new ReplayProducer(params);
    }

    private static final class ReplayProducer extends BufferingFinalizeProducer {
        private long afterId = -1;  // log cursor; -1 = before the first entry

        ReplayProducer(TableBufferingFinalizeParams params) { super(params); }

        @Override public void produceTick(OutputCollector out, CallContext ctx) {
            List<FunctionStorage.LogEntry> rows = storage.stateLogScan(NS, KEY, afterId, 1);
            if (rows.isEmpty()) { out.finish(); return; }
            FunctionStorage.LogEntry e = rows.get(0);
            VectorSchemaRoot full = BatchUtil.readSingleBatch(e.value(), Allocators.root());
            emitProjected(full, out);  // narrows to projected cols + applies filters
            full.close();
            afterId = e.id();
        }
    }

    public static void main(String[] args) {
        Worker.builder()
                .catalogName("demo")
                .registerTableBuffering(new BufferingExample())
                .runFromArgs(args);
    }
}
```

```sql
SELECT n FROM demo.collect((SELECT * FROM demo.numbers(4))) ORDER BY n;  -- 0,1,2,3
```

Walking the lifecycle
- process is called once per input batch. Here it appends the batch's IPC bytes to an append-log namespace (storage.stateAppend(ns, key, bytes)) and returns the execution id as the state_id, so every batch of one execution lands in the same log.
- combine runs once when input is exhausted. It returns the finalize_state_ids, one per output stream the Source will produce. The example returns a single stream keyed by the execution id.
- createFinalizeProducer returns a producer (extending BufferingFinalizeProducer) whose produceTick() cursor-drains the log, one buffered batch per tick, until the log is empty and it calls out.finish().
BufferingFinalizeProducer.emitProjected(full, out) narrows each buffered batch to the projected columns and applies pushed-down filters before emitting — the same projection/filter handling the engine expects, for free.
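The cursor-drain pattern in produceTick() can be seen in isolation with an in-memory log. This is a sketch, not the VGI-Java storage class: `CursorDrainSketch`, `Log`, and `Drainer` are hypothetical, and `scan(afterId, limit)` is assumed to return entries with ids strictly greater than the cursor, which is what the example's `afterId = -1` start and `afterId = e.id()` update suggest.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of "scan after cursor, emit one, advance cursor".
final class CursorDrainSketch {
    static final class Entry {
        final long id;
        final String value;
        Entry(long id, String value) { this.id = id; this.value = value; }
    }

    static final class Log {
        private final List<Entry> entries = new ArrayList<>();

        void append(String value) { entries.add(new Entry(entries.size(), value)); }

        // Assumed semantics: up to `limit` entries with id strictly greater than afterId.
        List<Entry> scan(long afterId, int limit) {
            List<Entry> out = new ArrayList<>();
            for (Entry e : entries) {
                if (e.id > afterId && out.size() < limit) out.add(e);
            }
            return out;
        }
    }

    static final class Drainer {
        private final Log log;
        private long afterId = -1; // -1 = before the first entry
        private boolean finished = false;
        final List<String> emitted = new ArrayList<>();

        Drainer(Log log) { this.log = log; }

        // One tick emits at most one entry; an empty scan marks the stream finished.
        void tick() {
            List<Entry> rows = log.scan(afterId, 1);
            if (rows.isEmpty()) { finished = true; return; }
            Entry e = rows.get(0);
            emitted.add(e.value);
            afterId = e.id;
        }

        boolean finished() { return finished; }
    }

    // Drives ticks until the drainer reports it is finished.
    static List<String> drain(List<String> values) {
        Log log = new Log();
        values.forEach(log::append);
        Drainer d = new Drainer(log);
        while (!d.finished()) d.tick();
        return d.emitted;
    }

    public static void main(String[] args) {
        System.out.println(drain(List.of("b0", "b1", "b2"))); // [b0, b1, b2]
    }
}
```

Keeping the cursor as the only producer state means each tick holds one batch in memory at a time, regardless of how much was buffered.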
Where real work goes
The example is a passthrough (collect) so the lifecycle is visible. A useful buffering function does its work where it has the whole picture:
- In process/combine for incremental reductions, e.g. accumulate a running sum or a heap of the top-K rows in storage as batches arrive, then have the Source emit just the result.
- In the finalize producer for whole-relation passes, e.g. read all buffered batches in produceTick, sort them, and emit in order.
storage gives you more than an append-log: scoped key/value blobs, atomic int64 counters, ranged scans, and a FIFO work-queue — enough to implement a distributed sort or a streaming top-K. See the accumulate fixtures in the vgi-java repo.
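As an illustration of the incremental-reduction style, here is a minimal top-K sketch using a bounded min-heap. It is a plain-Java model under assumptions: `TopKSketch` and its methods are hypothetical, and the heap is held in memory, whereas a real buffering function would keep this state in params.storage() so that parallel sinks can be merged at combine time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical top-K accumulator; not part of VGI-Java.
final class TopKSketch {
    private final int k;
    // Min-heap: the smallest of the current top K sits at the head.
    private final PriorityQueue<Long> heap = new PriorityQueue<>();

    TopKSketch(int k) {
        if (k <= 0) throw new IllegalArgumentException("k must be positive");
        this.k = k;
    }

    // Sink side: fold one batch into the heap, keeping at most k values.
    void process(long[] batch) {
        for (long v : batch) offer(v);
    }

    // Combine side: fold another worker's partial result into this one.
    void mergeFrom(TopKSketch other) {
        for (long v : other.heap) offer(v);
    }

    private void offer(long v) {
        if (heap.size() < k) {
            heap.add(v);
        } else if (v > heap.peek()) {
            heap.poll();
            heap.add(v);
        }
    }

    // Source side: emit only the result, largest first.
    List<Long> finish() {
        List<Long> out = new ArrayList<>(heap);
        out.sort(Collections.reverseOrder());
        return out;
    }

    public static void main(String[] args) {
        TopKSketch top = new TopKSketch(3);
        top.process(new long[]{5, 1, 9});
        top.process(new long[]{7, 3});
        top.process(new long[]{8});
        System.out.println(top.finish()); // [9, 8, 7]
    }
}
```

The state never grows past K values per worker, so the Source has almost nothing to drain. That is the trade-off against the passthrough example, which buffers every row.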
Storage notes
- Execution-scoped. params.storage() is pinned to one execution; keys you write are invisible to other queries and cleaned up afterwards.
- Namespaces starting with _vgi/ are reserved for the framework. Use your own.
- Order isn't guaranteed across parallel sinks unless you ask. Override sinkOrderDependent() / sourceOrderDependent() / requiresInputBatchIndex() to force ordered ingest or draining, or to receive the engine's global batch index in process.
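One way to use a global batch index is to key buffered batches by it and replay them in index order, whatever order the sinks ran in. The sketch below is a plain-Java model with loud assumptions: `OrderedReplaySketch` is hypothetical, the index is assumed to be a unique `long` per batch, and how VGI-Java actually delivers it to process is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical model: buffer by batch index, replay in index order.
final class OrderedReplaySketch {
    // A sorted, thread-safe map stands in for a ranged scan over storage keys.
    private final ConcurrentSkipListMap<Long, List<String>> byIndex = new ConcurrentSkipListMap<>();

    // Sink side: batches may arrive in any order, each tagged with its index.
    void process(long batchIndex, List<String> rows) {
        if (byIndex.putIfAbsent(batchIndex, List.copyOf(rows)) != null) {
            throw new IllegalStateException("duplicate batch index " + batchIndex);
        }
    }

    // Source side: iterate in ascending index order to restore input order.
    List<String> replay() {
        List<String> out = new ArrayList<>();
        for (List<String> rows : byIndex.values()) out.addAll(rows);
        return out;
    }

    public static void main(String[] args) {
        OrderedReplaySketch s = new OrderedReplaySketch();
        s.process(2, List.of("e"));
        s.process(0, List.of("a", "b"));
        s.process(1, List.of("c", "d"));
        System.out.println(s.replay()); // [a, b, c, d, e]
    }
}
```

This keeps the Sink fully parallel and pays for ordering only at drain time, which is usually cheaper than forcing ordered ingest.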
Going further
vgi-example-worker/src/main/java/farm/query/vgi/example/buffering/ has the full set: SumAllColumns (a real reduction), DistributedSum, ordered and batch-indexed variants, large-state and crash/cancellation fixtures, plus the AbstractBufferAndDrain helper this example is modeled on.
That's all five kinds. Next: parallelism →
