Aggregate
A wrapper for the generic SQL aggregate transform that carries the common pipeline fields.
Configuration
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| inputs | [string] | yes | — | List of upstream component names to receive events from. |
| error_policy | ErrorPolicy | no | drop_on_error | Policy for handling events that cause errors during aggregation. |
| sql | string | yes | — | SQL query to execute against the sealed DuckDB table. |
| window_duration | string | yes | — | Window duration (e.g., "15s", "60s", "5m"). |
| memory_limit | string (optional) | no | 1GB | DuckDB memory limit (e.g., "8GB"). |
| temp_directory | file path (optional) | no | — | Directory for DuckDB temporary files (spill-to-disk). |
| outputs | {string: string} | no | — | Named output labels for forking the transform output. |
Telemetry
This component emits the following standard telemetry metrics and events.
Metrics
| Name | Type | Description |
|---|---|---|
| component_received_events_total | Counter | Total number of events received from upstream. |
| component_sent_events_total | Counter | Total number of events sent downstream. |
| component_errors_total | Counter | Total number of errors encountered during processing. |
Events
| Name | Description |
|---|---|
| ComponentError | Emitted when an error occurs during component execution. |
| ComponentEventsDropped | Emitted when events are intentionally or unintentionally dropped. |
Example
transforms:
  my_aggregate:
    type: aggregate
    inputs: # required
    sql: SELECT count(*) AS total FROM events
    window_duration: 60s
    memory_limit: 8GB
    temp_directory: /var/tmp/kinetic
    outputs: {"errors": "error_branch"}
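As a further illustration, the sketch below shows a grouped query with `inputs` filled in and the SQL written as a YAML block scalar for readability. It is a hedged example, not a reference configuration: the component name `requests_per_service`, the upstream name `app_logs`, and the `service` column are hypothetical, and the table name `events` is taken from the example above rather than from a documented guarantee.

```yaml
transforms:
  requests_per_service:          # hypothetical component name
    type: aggregate
    inputs: [app_logs]           # hypothetical upstream component
    # `service` is an assumed event field; `events` is the table name
    # used in the example above.
    sql: |
      SELECT service, count(*) AS total
      FROM events
      GROUP BY service
    window_duration: 5m          # one of the documented duration forms
    error_policy: drop_on_error  # the documented default, written out explicitly
    memory_limit: 2GB
    temp_directory: /var/tmp/kinetic
```

Setting `temp_directory` alongside `memory_limit` is presumably what allows a window larger than the memory limit to spill to disk instead of failing, since the field is documented as the location for DuckDB's spill-to-disk files.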