Aggregate functions
An aggregate collapses many rows into one value per group: SELECT g, demo.vgi_sum(v) ... GROUP BY g. VGI aggregates are built for DuckDB's parallel, partial aggregation model, which is why the interface has four methods rather than one.
The four methods
Implement AggregateFunction<State>:
| Method | Role |
|---|---|
| `newState()` | create a fresh, empty per-group accumulator |
| `update(states, groupIds, batch)` | fold a batch of rows into the accumulators |
| `combine(target, source)` | merge two partial accumulators |
| `finalize(result, rowIndex, state)` | write a group's accumulator as the output value |
combine() exists because the engine may aggregate in parallel: several threads (or processes) each build partial States over a slice of the data, then merge them. Your State is Serializable so a partial can cross a process boundary. Keep it small.
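To make the partial-aggregation idea concrete, here is a minimal sketch that uses only the JDK. It builds two partial sums over separate slices, pushes one of them through Java serialization as if it had crossed a process boundary, and merges the two. The class and method names (`PartialSumSketch`, `partial`, `roundTrip`, `sumViaPartials`) are hypothetical and not part of the VGI API. How VGI actually ships partials between workers is not shown here; plain Java serialization is an assumption based on `State` being `Serializable`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class PartialSumSketch {
    /** Plain-data accumulator with the same shape as the State in this page's example. */
    static final class State implements Serializable {
        private static final long serialVersionUID = 1L;
        long total;
    }

    /** Fold one slice of values into a fresh partial (stand-in for update()). */
    static State partial(long[] slice) {
        State s = new State();
        for (long v : slice) {
            s.total = Math.addExact(s.total, v);
        }
        return s;
    }

    /** Serialize and deserialize, as if the partial were sent to another process. */
    static State roundTrip(State s) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(s);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (State) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("state round trip failed", e);
        }
    }

    /** Merge source into target (stand-in for combine()). */
    static void combine(State target, State source) {
        target.total = Math.addExact(target.total, source.total);
    }

    /** Aggregate two slices independently, ship one partial, then merge. */
    static long sumViaPartials(long[] a, long[] b) {
        State left = partial(a);
        State right = roundTrip(partial(b));
        combine(left, right);
        return left.total;
    }

    public static void main(String[] args) {
        System.out.println(sumViaPartials(new long[] {10, 20}, new long[] {5})); // prints 35
    }
}
```

A state holding a single `long` costs only a few dozen bytes on the wire, which is the practical meaning of "keep it small".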
```java
// VGI-Java example: an aggregate function.
//
// An aggregate collapses many rows into one value per group. VGI aggregates are
// built for DuckDB's *parallel, partial* aggregation model, so you implement
// four pieces:
//
//   newState()  - a fresh, empty accumulator
//   update()    - fold a batch of rows into the per-group accumulators
//   combine()   - merge two partial accumulators (parallel workers / spill)
//   finalize()  - write a group's accumulator out as the result value
//
// The `State` is `Serializable` because partials may cross process boundaries
// when DuckDB parallelizes the aggregation. Keep it small.
//
//   ATTACH 'demo' AS demo (TYPE vgi, LOCATION 'launch:/abs/path/bin/runAggregate');
//   SELECT g, demo.vgi_sum(v) FROM (VALUES (1,10),(1,20),(2,5)) t(g,v) GROUP BY g;
//   -- 1 -> 30, 2 -> 5
package farm.query.vgi.examples;

import farm.query.vgi.Worker;
import farm.query.vgi.aggregate.AggregateFunction;
import farm.query.vgi.function.FunctionSpec;
import farm.query.vgi.types.Schemas;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

/** {@code vgi_sum(value BIGINT) -> BIGINT}: sum per group, overflow-checked. */
public final class AggregateExample implements AggregateFunction<AggregateExample.State> {

    /** Per-group accumulator. Serializable: partials may be merged across workers. */
    public static final class State implements Serializable {
        private static final long serialVersionUID = 1L;
        long total;
    }

    private static final Schema OUTPUT_SCHEMA =
            new Schema(List.of(Schemas.nullable("result", Schemas.INT64)));

    private static final FunctionSpec SPEC = FunctionSpec.builder("vgi_sum")
            .description("Sum integer values")
            .arg("value", Schemas.INT64)
            .build();

    @Override public FunctionSpec spec() { return SPEC; }
    @Override public Schema outputSchema() { return OUTPUT_SCHEMA; }
    @Override public State newState() { return new State(); }

    // Fold one input batch into the accumulators. `groupIds[i]` is the group of
    // row i; states.computeIfAbsent mints an accumulator the first time a group
    // is seen in this partition.
    @Override
    public void update(Map<Long, State> states, long[] groupIds, VectorSchemaRoot input) {
        FieldVector v = input.getFieldVectors().get(0);
        if (!(v instanceof BigIntVector b)) return;
        int rows = input.getRowCount();
        try {
            for (int i = 0; i < rows; i++) {
                if (b.isNull(i)) continue;
                State s = states.computeIfAbsent(groupIds[i], k -> new State());
                s.total = Math.addExact(s.total, b.get(i));
            }
        } catch (ArithmeticException e) {
            throw new IllegalArgumentException("vgi_sum: int64 overflow", e);
        }
    }

    // Merge a partial (`source`) produced by another worker into `target`.
    @Override
    public void combine(State target, State source) {
        target.total = Math.addExact(target.total, source.total);
    }

    // Write one group's final value into the output column at `rowIndex`.
    @Override
    public void finalize(FieldVector result, int rowIndex, State state) {
        ((BigIntVector) result).setSafe(rowIndex, state.total);
    }

    public static void main(String[] args) {
        Worker.builder()
                .catalogName("demo")
                .registerAggregate(new AggregateExample())
                .runFromArgs(args);
    }
}
```

```sql
SELECT g, demo.vgi_sum(v)
FROM (VALUES (1,10),(1,20),(2,5)) t(g,v) GROUP BY g ORDER BY g;  -- 1->30, 2->5
```

How the pieces fit
- `update` is the hot loop. `groupIds[i]` is row `i`'s group; mint an accumulator with `states.computeIfAbsent(gid, k -> new State())`. Read the input columns from `batch.getFieldVectors()`.
- `combine` must be associative and commutative, because it is called in an unspecified order across partials.
- `finalize` writes one output row. The output column is the one declared by `outputSchema()` (here a single `result BIGINT`).
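The associativity and commutativity requirement can be checked on paper or, as a rough illustration, in a few lines of plain Java. The sketch below assumes a hypothetical two-field state for an average (`AvgState`, with a running sum and a row count). It is not the `Avg` class from the example worker, and every name in it is made up for illustration. Merging by adding both fields gives the same answer whatever order the partials arrive in.

```java
import java.util.List;

final class CombineOrderSketch {
    /** Hypothetical two-field accumulator for an average: running sum and row count. */
    static final class AvgState {
        long sum;
        long count;

        AvgState(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    /** Merge source into target. Field-wise addition is associative and commutative. */
    static void combine(AvgState target, AvgState source) {
        target.sum = Math.addExact(target.sum, source.sum);
        target.count = Math.addExact(target.count, source.count);
    }

    /** Merge the partials in the given index order into a fresh state; return the average. */
    static double mergeInOrder(List<AvgState> partials, int... order) {
        AvgState acc = new AvgState(0, 0);
        for (int i : order) {
            combine(acc, partials.get(i));
        }
        return (double) acc.sum / acc.count;
    }

    public static void main(String[] args) {
        // Three partials, as if built by three workers: (sum, count).
        List<AvgState> partials =
                List.of(new AvgState(30, 2), new AvgState(5, 1), new AvgState(25, 2));
        System.out.println(mergeInOrder(partials, 0, 1, 2)); // prints 12.0
        System.out.println(mergeInOrder(partials, 2, 0, 1)); // prints 12.0
    }
}
```

A state that stored only the running average and merged by averaging two averages would fail this check, since the result would depend on how the partials were paired. Carrying the sum and the count separately avoids that.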
Arguments and output
The FunctionSpec declares the input arguments and outputSchema() the result:
```java
private static final FunctionSpec SPEC = FunctionSpec.builder("vgi_sum")
        .description("Sum integer values")
        .arg("value", Schemas.INT64) // one input column
        .build();
```

A nullary aggregate (like `vgi_count()`) declares no `arg` and counts rows. Override `finalizeEmpty(result, rowIndex)` to control the empty-group result. COUNT returns 0 there, while SUM returns NULL (the default):
```java
@Override public void finalizeEmpty(FieldVector result, int rowIndex) {
    ((BigIntVector) result).setSafe(rowIndex, 0L); // count of nothing is 0
}
```

Correctness notes
- Guard overflow. `Math.addExact` / `Math.multiplyExact` throw on overflow instead of wrapping silently; the example's `update` rethrows that as a clear `vgi_sum: int64 overflow` error.
- Don't stash Arrow vectors in `State`. The input batch is reused; copy out the scalar values you need. (`State` is serialized, so it must be plain data.)
- `combine` may run before or after any `update`. Never assume an order.
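As a small illustration of the overflow note, the sketch below contrasts plain `+` with `Math.addExact` on `long` values and rethrows the overflow with the same message the example uses. `OverflowGuardSketch`, `wrappingAdd`, and `checkedAdd` are hypothetical helper names, not part of VGI.

```java
final class OverflowGuardSketch {
    /** Unchecked addition: wraps around silently on overflow. */
    static long wrappingAdd(long a, long b) {
        return a + b;
    }

    /** Checked addition: overflow becomes an error that names the function. */
    static long checkedAdd(long a, long b) {
        try {
            return Math.addExact(a, b);
        } catch (ArithmeticException e) {
            throw new IllegalArgumentException("vgi_sum: int64 overflow", e);
        }
    }

    public static void main(String[] args) {
        // Plain + wraps Long.MAX_VALUE + 1 around to Long.MIN_VALUE.
        System.out.println(wrappingAdd(Long.MAX_VALUE, 1) == Long.MIN_VALUE); // prints true
        try {
            checkedAdd(Long.MAX_VALUE, 1);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints vgi_sum: int64 overflow
        }
    }
}
```

The same guard is worth applying wherever accumulators are added together, including when merging partials, because two partials that each fit in a `long` can still overflow when summed.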
Going further
vgi-example-worker/src/main/java/farm/query/vgi/example/aggregate/ has richer aggregates: Avg (a two-field running state), ListAgg (a growing list state), and Count. They all follow the same four-method shape.
