In the latest released version, GreptimeDB has implemented the Flow Engine to support continuous aggregations. Users can define a flow task to calculate sums, averages, or other aggregations into a sink table and query the aggregated result in real time.
This article introduces the basic usage and features of continuous aggregations in GreptimeDB and provides examples to illustrate the process of creating, using, and deleting Flow tasks.
What is Continuous Aggregation
Continuous Aggregation is a query that continuously runs to provide incrementally updated, consistent, materialized views.
Continuous aggregation has many practical applications, such as Streaming ETL, real-time analysis, monitoring, alerting, etc. One common use case is downsampling, where window functions can be used to downsample a signal with a millisecond-level output frequency to a second-level frequency (e.g., calculating the average value within one second), thus saving storage and computational costs.
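For instance, that kind of downsampling could be expressed as a flow along the following lines (a minimal sketch only: the raw_signal and signal_1s table names and the value column are hypothetical, and the exact syntax is introduced later in this article):

-- Hypothetical downsampling flow: average a millisecond-level signal
-- into one row per second; table and column names are made up.
CREATE FLOW downsample_signal
SINK TO signal_1s
AS
SELECT avg(value)
FROM raw_signal
GROUP BY tumble(ts, '1 second');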
Furthermore, suppose a speed sensor is reading high-frequency data. Continuous aggregation can filter out data points whose speed is below or above a certain value, calculate the average speed every five minutes, and then output the results to a result table.
Continuous aggregations are supported by the Flow engine. The Flow engine is a built-in, lightweight stream processing engine that offers continuous aggregation, window calculation, and other functions. Users can create a Flow task for continuous aggregation directly using SQL statements, without writing any additional business code. Flow tasks can be used for real-time data processing and calculations.
Application Example
Continuous aggregations can be defined using SQL. This article demonstrates the entire process from creating a Flow task, receiving data for stream processing, to deleting the Flow task.
As an example, let's consider a speed sensor that reads the instantaneous speeds of the left and right wheels. We filter out abnormal values that are too high or too low, and then calculate the average speed over a five-second interval.
First, create a source data table as input:
CREATE TABLE velocity (
    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    left_wheel FLOAT,
    right_wheel FLOAT,
    TIME INDEX(ts)
);
And create a result table for the Flow task output:
CREATE TABLE avg_speed (
    avg_speed DOUBLE,
    start_window TIMESTAMP TIME INDEX,
    end_window TIMESTAMP,
    update_at TIMESTAMP
);
Now you can create a Flow task using the SQL extension we provide, as illustrated in the example below:
CREATE FLOW calc_avg_speed
SINK TO avg_speed
AS
SELECT avg((left_wheel+right_wheel)/2)
FROM velocity
WHERE left_wheel > 0.5 AND right_wheel > 0.5 AND left_wheel < 60 AND right_wheel < 60
GROUP BY tumble(ts, '5 second');
The SQL statement above creates a Flow task named calc_avg_speed and outputs the results to the avg_speed table. The query it runs is defined by the SELECT statement following AS.
In this example, the specific query operates as follows:
First, it filters out speed values for the left and right wheels that are too small or too large (less than or equal to 0.5, or greater than or equal to 60).
Then, based on the input table velocity, it calculates the average speed within each five-second window, using the ts column as the time index.
The query in the Flow job is entirely based on SQL syntax, with necessary extensions implemented as needed (e.g., the tumble window).
Now that the Flow task is created, to observe the continuous aggregation results in avg_speed, simply insert data into the source data table velocity:
INSERT INTO velocity
VALUES
("2021-07-01 00:00:00.200", 0.0, 0.7),
("2021-07-01 00:00:00.200", 0.0, 61.0),
("2021-07-01 00:00:02.500", 2.0, 1.0,);
Note that the first two rows are filtered out because they do not meet the conditions, leaving only the third row for calculation. Querying the output table will then yield the computed result.
SELECT * FROM avg_speed;
avg_speed | start_window | end_window | update_at
-----------+----------------------------+----------------------------+----------------------------
1.5 | 2021-07-01 00:00:00.000000 | 2021-07-01 00:00:05.000000 | 2024-06-04 03:35:20.670000
(1 row)
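As a sanity check, the same aggregation can be run as an ordinary one-off query against the source table (this is a plain SELECT, not part of the Flow task):

-- Only the third inserted row passes the filter,
-- so the result is (2.0 + 1.0) / 2 = 1.5, matching the window above.
SELECT avg((left_wheel + right_wheel) / 2)
FROM velocity
WHERE left_wheel > 0.5 AND right_wheel > 0.5 AND left_wheel < 60 AND right_wheel < 60;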
Try inserting more data into the velocity table:
INSERT INTO velocity
VALUES
("2021-07-01 00:00:05.100", 5.0, 4.0),
("2021-07-01 00:00:09.600", 2.3, 2.1);
The avg_speed table now contains two rows, representing the average speeds for two 5-second windows, which are 1.5 and 3.35 (calculated as (4.5 + 2.2) / 2), respectively.
SELECT * FROM avg_speed;
avg_speed | start_window | end_window | update_at
-----------+----------------------------+----------------------------+----------------------------
1.5 | 2021-07-01 00:00:00.000000 | 2021-07-01 00:00:05.000000 | 2024-06-04 03:35:20.670000
3.35 | 2021-07-01 00:00:05.000000 | 2021-07-01 00:00:10.000000 | 2024-06-04 03:35:34.693000
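The value for the second window can again be reproduced with a one-off query restricted to that window (shown purely for verification; the Flow task computes it incrementally):

-- The two rows in this window contribute (5.0 + 4.0) / 2 = 4.5 and
-- (2.3 + 2.1) / 2 = 2.2, whose average is 3.35.
SELECT avg((left_wheel + right_wheel) / 2)
FROM velocity
WHERE ts >= '2021-07-01 00:00:05' AND ts < '2021-07-01 00:00:10'
AND left_wheel > 0.5 AND right_wheel > 0.5 AND left_wheel < 60 AND right_wheel < 60;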
The columns in the avg_speed table are explained as follows:
- avg_speed: The average speed within the window.
- start_window: The start time of the window.
- end_window: The end time of the window.
- update_at: The time the row data was updated.
The start_window and end_window columns are automatically added by the Flow engine's window function tumble. The update_at column is automatically added by the Flow engine to the Flow task's output table; it marks the latest update time of each row of data and helps with understanding the operational status of the Flow task.
Lastly, use DROP FLOW to delete this Flow task:
DROP FLOW calc_avg_speed;
Flow Management and Advanced Features
Create or update a Flow
CREATE FLOW [ IF NOT EXISTS ] <flow-name>
SINK TO <sink-table-name>
[ EXPIRE AFTER <expr> ]
[ COMMENT = "<string>" ]
AS
<SQL>;
The syntax for creating a Flow task can be explained as follows:
- flow-name: A globally unique identifier.
- sink-table-name: The name of the table where the aggregated data is stored. It can be either an existing table or a new table; if the target table does not exist, Flow will automatically create one.
- EXPIRE AFTER: An optional time interval (using the SQL INTERVAL syntax) used to clear expired intermediate states from the Flow engine.
- COMMENT: A descriptive comment for the Flow task.
- <SQL>: Defines the specific continuous aggregation query. The Flow engine will extract the referenced tables as the source tables for the Flow task.
A simple example:
CREATE FLOW IF NOT EXISTS my_flow
SINK TO my_sink_table
EXPIRE AFTER INTERVAL '1 hour'
COMMENT = "My first flow in GreptimeDB"
AS
SELECT count(item) from my_source_table GROUP BY tumble(time_index, '5 minutes');
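If my_sink_table did not exist beforehand, Flow creates it automatically; you can then inspect the generated schema, for example with:

-- Show the schema of the automatically created sink table.
SHOW CREATE TABLE my_sink_table;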
Let's explain the EXPIRE AFTER clause in a bit more detail. In simple terms, like all modern stream processing systems, the Flow engine has two important concepts of time: system time and event time.
- System time: Also called processing time, this refers to the system time of the machine executing the respective operation.
- Event time: The time at which the event represented by a row of data happened, usually recorded in a column of that row. Flow treats the TIME INDEX column as the event time.
The EXPIRE AFTER expiration mechanism uses the difference between system time and event time to discard outdated rows in the Flow intermediate state. In the SQL example above, rows with event times more than one hour prior to the system time will be discarded and will not participate in calculations.
TIP: Note that EXPIRE AFTER only applies to newly arriving data. The results already in the output table therefore do not change simply due to the passage of time; rather, the mechanism ensures that data arriving too late will not be used to update the output table.
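To make this concrete, here is a variant of the earlier speed example with expiration enabled (the flow name is hypothetical; everything else mirrors the original task). Rows whose ts is more than 10 minutes older than the system time at which they arrive are dropped from the intermediate state instead of being aggregated:

CREATE FLOW calc_avg_speed_expiring
SINK TO avg_speed
EXPIRE AFTER INTERVAL '10 minutes'
AS
SELECT avg((left_wheel + right_wheel) / 2)
FROM velocity
WHERE left_wheel > 0.5 AND right_wheel > 0.5 AND left_wheel < 60 AND right_wheel < 60
GROUP BY tumble(ts, '5 second');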
Additionally, the intermediate state of Flow is currently not persisted; it is stored purely in memory. State persistence will be added in future releases to ensure data correctness even after a restart.
Delete Flow
To delete a Flow task, use the following statement:
DROP FLOW [IF EXISTS] <name>;
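For example, to remove the my_flow task created above without raising an error if it has already been dropped:

DROP FLOW IF EXISTS my_flow;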
Supported Flow Functions
Currently, Flow supports:
- Aggregation functions: count, sum, avg, min, and max;
- Scalar functions: addition, subtraction, multiplication, division, comparison, and logical operations;
- The fixed-window tumble function.
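As a small illustration of these building blocks, the following sketch reuses the velocity table from the example above to track the per-minute maximum speed (the flow and sink table names are hypothetical):

-- Maximum average-of-both-wheels speed per one-minute window.
CREATE FLOW max_speed
SINK TO max_speed_per_minute
AS
SELECT max((left_wheel + right_wheel) / 2)
FROM velocity
GROUP BY tumble(ts, '1 minute');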
In the future, more aggregation functions, scalar functions, and window functions will be supported for continuous aggregation.
Summary
This article introduces the basic usage and features of continuous aggregations in GreptimeDB. It provides examples to illustrate the process of creating, using, and deleting Flow tasks.
Continuous aggregation allows for real-time, low-latency (second-level or even sub-second) access to the information users care about, while avoiding the extra memory and computational overhead of repeatedly querying the raw data.
In the future, in addition to supporting more functions, we will also support the persistence of intermediate states in stream processing and advanced features such as Temporal Filter. For more detailed information, please refer to the relevant documentation and development guides.
About Greptime
We help industries that generate large amounts of time-series data, such as Connected Vehicles (CV), IoT, and Observability, to efficiently uncover the hidden value of data in real-time.
Visit the latest version from any device to get started and get the most out of your data.
- GreptimeDB, written in Rust, is a distributed, open-source, time-series database designed for scalability, efficiency, and powerful analytics.
- GreptimeCloud is a fully-managed cloud database-as-a-service (DBaaS) solution built on GreptimeDB. It efficiently supports applications in fields such as observability, IoT, and finance. The built-in observability solution, GreptimeAI, helps users comprehensively monitor the cost, performance, traffic, and security of LLM applications.
- Vehicle-Cloud Integrated TSDB solution is tailored for business scenarios of automotive enterprises. It addresses the practical business pain points that arise when enterprise vehicle data grows exponentially.
If anything above draws your attention, don't hesitate to star us on GitHub or join GreptimeDB Community on Slack. Also, you can go to our contribution page to find some interesting issues to start with.