
Table-in-out functions

N rows → M rows

Consumes a relation and streams a transformed relation back, batch by batch.

A table-in-out function consumes a relation and streams a relation back — a batch-by-batch transform you'd rather express in Java than SQL: SELECT * FROM demo.echo((SELECT * FROM t)).

The defining property: a table-in-out function emits per input batch. It sees a batch, emits a batch, and never needs to hold the whole input. (When you do need the whole input — sort, top-k — use a buffering function instead.)
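The per-batch contract can be modeled in plain Java. This is only a sketch: `StreamingModel`, `BatchHandler`, and `Doubler` are hypothetical stand-ins, not VGI-Java classes, and a batch is modeled as a list of integers. The point it illustrates is that the handler emits as soon as it has seen one batch and keeps no state between calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of per-batch emission; none of these types are VGI-Java classes. */
final class StreamingModel {

    /** Hypothetical stand-in for an exchange: called once per input batch. */
    interface BatchHandler {
        void onInputBatch(List<Integer> batch, Consumer<List<Integer>> out);
    }

    /** Streaming transform: doubles each value and emits immediately; holds no state. */
    static final class Doubler implements BatchHandler {
        @Override
        public void onInputBatch(List<Integer> batch, Consumer<List<Integer>> out) {
            List<Integer> result = new ArrayList<>(batch.size());
            for (int v : batch) {
                result.add(v * 2);
            }
            out.accept(result); // one output batch per input batch
        }
    }

    /** Feeds batches to the handler one at a time and collects what it emits. */
    static List<List<Integer>> run(BatchHandler handler, List<List<Integer>> input) {
        List<List<Integer>> emitted = new ArrayList<>();
        for (List<Integer> batch : input) {
            handler.onInputBatch(batch, emitted::add);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<List<Integer>> input = List.of(List.of(1, 2), List.of(3));
        System.out.println(run(new Doubler(), input)); // prints [[2, 4], [6]]
    }
}
```

A sort or top-k could not be written this way, because its first output row may depend on the last input batch; that is the case the buffering function covers.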

The model

The easiest base is PassthroughTIOFunction, for functions whose output schema equals their input schema (echo, filter, enrich-in-place). It supplies the bind; you write the exchange — an object whose onInputBatch() runs once per input batch.

java
// VGI-Java example: a table-in-out (TIO) function.
//
// A TIO function consumes a relation and streams a relation back — a row-by-row
// (really batch-by-batch) transform. DuckDB feeds you input batches; you emit
// output batches. Use it for streaming reshapes, enrichment, or filtering that
// you'd rather express in Java than SQL.
//
// This example is the canonical `echo`: output schema == input schema, every
// input batch passes through unchanged. `PassthroughTIOFunction` supplies the
// "output schema = input schema" bind, so you only write the exchange.
//
//   ATTACH 'demo' AS demo (TYPE vgi, LOCATION 'launch:/abs/path/bin/runTableInOut');
//   SELECT * FROM demo.echo((SELECT * FROM range(3) t(x)));   -- 0,1,2
package farm.query.vgi.examples;

import farm.query.vgi.Worker;
import farm.query.vgi.function.ArgSpec;
import farm.query.vgi.function.FunctionMetadata;
import farm.query.vgi.tableinout.PassthroughTIOFunction;
import farm.query.vgi.tableinout.TableInOutExchangeState;
import farm.query.vgi.tableinout.TableInOutInitParams;
import farm.query.vgirpc.AnnotatedBatch;
import farm.query.vgirpc.CallContext;
import farm.query.vgirpc.OutputCollector;
import farm.query.vgirpc.wire.Allocators;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.util.TransferPair;

import java.util.ArrayList;
import java.util.List;

/** {@code echo(data TABLE) -> *}: passes every input batch through unchanged. */
public final class TableInOutExample extends PassthroughTIOFunction {

    @Override public String name() { return "echo"; }

    @Override public FunctionMetadata metadata() {
        return FunctionMetadata.describe("Emit each input batch unchanged")
                .withCategories("utility");
    }

    // Declare the single table-valued argument. TIO functions take a relation.
    @Override public List<ArgSpec> argumentSpecs() {
        return List.of(ArgSpec.table("data", 0));
    }

    @Override public TableInOutExchangeState createExchange(TableInOutInitParams params) {
        return new EchoState();
    }

    /** One exchange instance per execution; {@code onInputBatch} runs per input batch. */
    public static final class EchoState extends TableInOutExchangeState {
        @Override
        public void onInputBatch(AnnotatedBatch input, OutputCollector out, CallContext ctx) {
            // Transfer the input vectors into a fresh root before emitting.
            //
            // Why not just `out.emit(input.root())`? The framework close()s each
            // emitted root after writing it. The input root is owned by the
            // reader and reused for the NEXT batch — closing it would corrupt the
            // stream. TransferPair moves the buffers into a root we own, leaving
            // the reader intact. (TransferPair, not a row copy, also preserves
            // dictionary-encoded children.)
            VectorSchemaRoot in = input.root();
            List<FieldVector> outVectors = new ArrayList<>();
            for (FieldVector v : in.getFieldVectors()) {
                TransferPair tp = v.getTransferPair(Allocators.root());
                tp.transfer();
                outVectors.add((FieldVector) tp.getTo());
            }
            VectorSchemaRoot copy = new VectorSchemaRoot(outVectors);
            copy.setRowCount(in.getRowCount());
            out.emit(copy);   // emit() takes ownership; do not close `copy` yourself
        }
    }

    public static void main(String[] args) {
        Worker.builder()
                .catalogName("demo")
                .registerTableInOut(new TableInOutExample())
                .runFromArgs(args);
    }
}
sql
SELECT n FROM demo.echo((SELECT * FROM demo.numbers(3))) ORDER BY n;  -- 0,1,2

The ownership rule that bites everyone

The one subtlety in table-in-out is buffer ownership:

You cannot out.emit(input.root()).

The framework close()s every root you emit, after writing it. But the input root is owned by the reader and reused for the next batch — closing it corrupts the stream (you'll see the second batch come back truncated or empty).

The fix, shown in the example, is to TransferPair the input vectors into a fresh root that you own and emit:

java
// `in` is input.root(); `out` is the OutputCollector passed to onInputBatch.
List<FieldVector> outVectors = new ArrayList<>();
for (FieldVector v : in.getFieldVectors()) {
    TransferPair tp = v.getTransferPair(Allocators.root());
    tp.transfer();                       // move buffers; reader keeps its schema
    outVectors.add((FieldVector) tp.getTo());
}
VectorSchemaRoot copy = new VectorSchemaRoot(outVectors);
copy.setRowCount(in.getRowCount());
out.emit(copy);                          // emit() now owns and will close `copy`

TransferPair (not a row-by-row copy) is also what preserves dictionary-encoded (e.g. ENUM) children correctly.
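To see why the rule exists, here is a small plain-Java model of the ownership problem. It is a sketch under loud assumptions: `Root` and `Collector` are hypothetical stand-ins, not Arrow or VGI-Java classes, and the model raises an error on reuse of a closed root, whereas the real symptom described above is a truncated or empty second batch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of the ownership rule; these are stand-ins, not Arrow or VGI classes. */
final class OwnershipModel {

    /** Stand-in for a root: holds values until closed. */
    static final class Root {
        private int[] values = new int[0];
        private boolean closed;

        void load(int[] next) {
            if (closed) throw new IllegalStateException("root already closed");
            values = next;
        }

        /** Moves the values into a new root, leaving this one empty but still open. */
        Root transfer() {
            Root target = new Root();
            target.values = values;
            values = new int[0];
            return target;
        }

        void close() {
            closed = true;
            values = new int[0];
        }
    }

    /** Stand-in for the framework: writes the emitted root, then closes it. */
    static final class Collector {
        final List<String> written = new ArrayList<>();

        void emit(Root root) {
            written.add(Arrays.toString(root.values));
            root.close();
        }
    }

    /** Runs two batches through one reused input root. */
    static List<String> run(boolean transferFirst) {
        Root input = new Root();          // owned by the reader, reused per batch
        Collector out = new Collector();
        for (int[] batch : new int[][] {{0, 1}, {2, 3}}) {
            try {
                input.load(batch);
            } catch (IllegalStateException e) {
                out.written.add("error: " + e.getMessage());
                continue;
            }
            out.emit(transferFirst ? input.transfer() : input);
        }
        return out.written;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [[0, 1], error: root already closed]
        System.out.println(run(true));  // prints [[0, 1], [2, 3]]
    }
}
```

In the first run the collector closes the reader's own root, so the second batch has nowhere to land. In the second run only the transferred root is closed, and the reader's root survives for the next batch.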

Doing actual work

onInputBatch is where a real transform lives. To filter, build an output root holding only the rows you keep. To enrich, append computed columns. To reshape, emit a different (declared) schema. A few patterns from the reference worker:

  • filter_by_setting — drop rows failing a session-configured predicate.
  • repeat_inputs — emit each input batch N times.
  • echo_witness — passthrough that also reports projection-pushdown witness columns.

When your output schema differs from the input, implement TableInOutFunction directly (not PassthroughTIOFunction) and return the real schema from onBind().
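The filter case ("build an output root holding only the rows you keep") can be sketched in plain Java. This is a model, not reference-worker code: `FilterModel` is a hypothetical name, and a batch is represented as a map from column name to a list of values rather than as an Arrow root. In a real exchange the same loop would copy kept rows into freshly allocated vectors.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntPredicate;

/** Plain-Java model of a columnar filter; not Arrow or VGI-Java code. */
final class FilterModel {

    /** Copies only the rows accepted by keepRow into a new batch with the same columns. */
    static Map<String, List<Object>> filter(
            Map<String, List<Object>> batch, int rowCount, IntPredicate keepRow) {
        Map<String, List<Object>> out = new LinkedHashMap<>();
        for (String column : batch.keySet()) {
            out.put(column, new ArrayList<>());   // same schema as the input
        }
        for (int row = 0; row < rowCount; row++) {
            if (!keepRow.test(row)) {
                continue;                          // dropped row: copy nothing
            }
            for (Map.Entry<String, List<Object>> col : batch.entrySet()) {
                out.get(col.getKey()).add(col.getValue().get(row));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<Object>> batch = new LinkedHashMap<>();
        batch.put("n", List.of(0, 1, 2, 3));
        batch.put("label", List.of("a", "b", "c", "d"));
        List<Object> n = batch.get("n");
        // Keep the rows whose n is even.
        System.out.println(filter(batch, 4, row -> (Integer) n.get(row) % 2 == 0));
        // prints {n=[0, 2], label=[a, c]}
    }
}
```

Because the output columns match the input columns, a filter like this still fits the passthrough bind; only the row count changes.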

Projection pushdown

metadata().withPushdown(true, false, false) opts into projection pushdown. The engine sends the set of columns it actually needs, the framework narrows the declared output schema to match, and params.outputSchema() in createExchange reflects the narrowed schema. Emit only those columns, and no narrowing PROJECTION node sits above your operator. Filter pushdown is intentionally off for table-in-out (the engine always runs a FILTER node above the operator). See pushdown.
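What "emit only those columns" amounts to can be modeled in a few lines of plain Java. This is a hedged sketch: `ProjectionModel` is a hypothetical name, the batch is a map from column name to values instead of an Arrow root, and the `wanted` list stands in for the field names you would read from the narrowed schema. Emitting columns in the order of `wanted` is an assumption of the model.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of narrowing a batch to the projected columns; not VGI-Java code. */
final class ProjectionModel {

    /** Returns a batch holding only the wanted columns, in the order they were requested. */
    static Map<String, List<Object>> project(
            Map<String, List<Object>> batch, List<String> wanted) {
        Map<String, List<Object>> out = new LinkedHashMap<>();
        for (String name : wanted) {
            List<Object> column = batch.get(name);
            if (column == null) {
                throw new IllegalArgumentException("unknown column: " + name);
            }
            out.put(name, column);   // unrequested columns are never touched
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<Object>> batch = new LinkedHashMap<>();
        batch.put("n", List.of(0, 1));
        batch.put("label", List.of("a", "b"));
        System.out.println(project(batch, List.of("label"))); // prints {label=[a, b]}
    }
}
```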

Going further

The full table-in-out surface — projection witnesses, ordering modes, partitioned output, batch-index tagging, cancellation — is in vgi-example-worker/src/main/java/farm/query/vgi/example/tableinout/ in the vgi-java repo. A few worth reading:

Next: aggregate functions →