Chapter 24 - 时序数据库Clickhouse¶
ClickHouse 是一个开源的列式时序数据库管理系统(DBMS),专为高性能和低延迟的数据分析而设计。它最初由俄罗斯的互联网公司 Yandex 开发,用于处理海量的数据分析工作负载。以下是 ClickHouse 的主要特点和介绍:
-
列式存储:ClickHouse 采用列式存储,这意味着它将数据按列存储在磁盘上,而不是按行存储。这种存储方式对于数据分析非常高效,因为它允许查询只读取所需的列,而不必读取整个行。这导致了更快的查询性能和更小的存储空间占用。
-
分布式架构:ClickHouse 具有分布式架构,可以轻松扩展以处理大规模数据集。它支持数据分片、分布式复制和负载均衡,以确保高可用性和容错性。
-
支持 SQL 查询:ClickHouse 支持标准的 SQL 查询语言,使用户可以使用熟悉的查询语法执行数据分析操作。它还支持复杂的查询,如聚合、窗口函数和子查询。
-
高性能:ClickHouse 以查询性能和吞吐量为重点进行了优化。它专为快速的数据分析查询而设计,可以在毫秒级别内处理数十亿行的数据。
-
实时数据注入:ClickHouse 支持实时数据注入,允许将新数据迅速插入到表中,并能够在不停机的情况下进行数据更新。
-
支持多种数据格式:ClickHouse 可以处理多种数据格式,包括 JSON、CSV、Parquet 等,使其能够与各种数据源无缝集成。
-
可扩展性:ClickHouse 具有可扩展性,可以与其他工具和框架(如 Apache Kafka、Spark、Presto)集成,以满足各种数据处理需求。
-
开源和活跃的社区:ClickHouse 是一个开源项目,拥有活跃的社区支持。这意味着你可以免费获取并使用它,并且有一个庞大的开发者社区,提供了大量的文档和资源。
ClickHouse 在大数据分析、日志处理、事件追踪、时序数据分析等场景中得到了广泛的应用。它的高性能、可扩展性和强大的查询功能使其成为处理大规模数据的理想选择。如果你需要处理大量时序数据并进行快速数据分析,那么 ClickHouse 可能是一个非常有价值的数据库管理系统。
24.1 安装和配置ClickHouse数据库¶
24.1.1 安装¶
在Ubuntu上安装ClickHouse:¶
- 打开终端并更新包列表:
- 安装ClickHouse的APT存储库:
sudo apt install apt-transport-https ca-certificates dirmngr
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv E0C56BD4
echo "deb https://repo.clickhouse.tech/deb/stable/ main/" | sudo tee /etc/apt/sources.list.d/clickhouse.list
- 再次更新包列表以获取ClickHouse包:
- 安装ClickHouse Server:
- 启动ClickHouse服务:
- 我们可以使用以下命令检查ClickHouse服务器的状态:
在Manjaro / Arch Linux上安装ClickHouse:¶
- 打开终端并使用以下命令安装ClickHouse:
- 启动ClickHouse服务:
- 我们可以使用以下命令检查ClickHouse服务器的状态:
这样ClickHouse就已经安装在你的Ubuntu或Arch Linux系统上了,并且服务已启动。
此时如果我们如果访问本地host上的这个网址:http://localhost:8123 ,会看到服务器返回了一个'Ok'给我们。
24.1.2 配置clickhouse的密码¶
还是不要忘记,生产环境中安全是至关重要的,在ClickHouse中配置密码需要完成以下步骤:
- 创建用户和设置密码: 首先,我们需要登录到ClickHouse服务器上,并使用管理员权限创建用户并设置密码。我们可以使用ClickHouse客户端或者通过在配置文件中执行SQL来完成这一步骤。
使用ClickHouse客户端:
请将 'your_username' 替换为我们要创建的用户名,将 'your_password' 替换为用户的密码。
- 分配权限:
创建用户后,需要分配相应的权限。通常,我们可以使用
GRANT语句来为用户分配权限。以下是一个示例,将允许用户对特定表执行SELECT操作:
这将授予 'your_username' 用户对 'database_name.table_name' 表的SELECT权限。我们可以根据需要为用户分配不同的权限。
- 配置ClickHouse服务:
接下来,我们需要配置ClickHouse服务器以启用身份验证。在ClickHouse的配置文件中,找到并编辑
users.xml文件。通常,该文件的位置是/etc/clickhouse-server/users.xml。在该文件中,我们可以为刚刚创建的用户添加相应的配置。
<yandex>
<profiles>
<!-- 添加用户配置 -->
<your_username>
<password>your_password</password>
<networks>
<ip>::/0</ip> <!-- 允许所有IP连接 -->
</networks>
</your_username>
</profiles>
</yandex>
请注意,这只是一个示例配置,我们需要将 'your_username' 和 'your_password' 替换为实际的用户名和密码。此外,上述配置允许来自所有IP地址的连接,这可能不是最安全的配置。我们可以根据需要限制连接的IP地址范围。
- 重启ClickHouse服务: 最后,重新启动ClickHouse服务器以使配置更改生效:
这会重新加载配置文件并应用新的用户和权限设置。
完成上述步骤后,我们的ClickHouse服务器将配置了用户名和密码的身份验证机制,并且只有具有正确凭据的用户才能访问相应的数据库和表。请确保密码强度足够,以增强安全性。
24.2 ClickHouse for Rust: clickhouse.rs库¶
clickhouse.rs 是一个网友 Paul Loyd 开发的比较成熟的第三方 Rust 库,旨在与 ClickHouse 数据库进行交互,提供了便捷的查询执行、数据处理和连接管理功能。以下是该库的一些主要特点:
主要特点¶
- 异步支持:
clickhouse.rs利用了 Rust 的异步编程能力,非常适合需要非阻塞数据库操作的高性能应用程序。 - 类型化接口:该库提供了强类型接口,使数据库交互更加安全和可预测,减少运行时错误并提高代码的健壮性。
- 支持 ClickHouse 特性:库支持多种 ClickHouse 特性,包括批量插入、复杂查询和不同的数据类型。
- 连接池:
clickhouse.rs包含连接池功能,在高负载场景下实现高效的数据库连接管理。 - 易用性:该库设计简洁明了的 API,使各级 Rust 开发者都能轻松上手。
安装¶
要使用 clickhouse.rs,需要在 Cargo.toml 中添加依赖项:
基本用法¶
以下是使用 clickhouse.rs 连接 ClickHouse 数据库并执行查询的简单示例:
use clickhouse::{Client, Row};
use futures::stream::StreamExt;
use tokio;
#[tokio::main]
async fn main() {
// 初始化客户端
let client = Client::default()
.with_url("http://localhost:8123")
.with_database("default");
// 执行查询示例
let mut cursor = client.query("SELECT number FROM system.numbers LIMIT 10").fetch().unwrap();
while let Some(row) = cursor.next().await {
let number: u64 = row.unwrap().get("number").unwrap();
println!("{}", number);
}
}
错误处理¶
该库使用标准的 Rust 错误处理机制,使得管理潜在问题变得简单。以下是处理查询执行错误的示例:
use clickhouse::{Client, Error};
use tokio;
#[tokio::main]
async fn main() -> Result<(), Error> {
let client = Client::default().with_url("http://localhost:8123");
let result = client.query("SELECT number FROM system.numbers LIMIT 10")
.fetch()
.await;
match result {
Ok(mut cursor) => {
while let Some(row) = cursor.next().await {
let number: u64 = row.unwrap().get("number").unwrap();
println!("{}", number);
}
}
Err(e) => eprintln!("查询执行错误: {:?}", e),
}
Ok(())
}
高级用法¶
对于批量插入或处理特定数据类型等复杂场景,请参阅库的文档和示例。该库支持多种 ClickHouse 特性,可以适应各种使用场景。
24.3 备份 ClickHouse 数据库的教程¶
备份 ClickHouse 数据库对于数据安全和业务连续性至关重要。通过定期备份,可以在数据丢失、硬件故障或人为错误时快速恢复,确保数据完整性和可用性。此外,备份有助于在系统升级或迁移过程中保护数据,避免意外损失。备份还支持增量备份和压缩,优化存储空间和备份速度,为企业提供灵活、高效的数据管理解决方案。通过良好的备份策略,可以大大降低数据丢失风险,保障业务稳定运行。
配置备份目的地¶
首先,在 /etc/clickhouse-server/config.d/backup_disk.xml 中添加备份目的地配置:
<clickhouse>
<storage_configuration>
<disks>
<backups>
<type>local</type>
<path>/backups/</path>
</backups>
</disks>
</storage_configuration>
<backups>
<allowed_disk>backups</allowed_disk>
<allowed_path>/backups/</allowed_path>
</backups>
</clickhouse>
备份整个数据库¶
执行以下命令将整个数据库备份到指定磁盘:
恢复整个数据库¶
从备份文件中恢复整个数据库:
备份表¶
执行以下命令将表备份到指定磁盘:
恢复表¶
从备份文件中恢复表:
增量备份¶
指定基础备份进行增量备份:
BACKUP DATABASE my_database TO Disk('backups', 'incremental_backup.zip') SETTINGS base_backup = Disk('backups', 'database_backup.zip');
使用密码保护备份¶
备份文件使用密码保护:
BACKUP DATABASE my_database TO Disk('backups', 'protected_backup.zip') SETTINGS password='yourpassword';
压缩设置¶
指定压缩方法和级别:
BACKUP DATABASE my_database TO Disk('backups', 'compressed_backup.zip') SETTINGS compression_method='lzma', compression_level=3;
恢复特定分区¶
从备份中恢复特定分区:
RESTORE TABLE my_database.my_table PARTITIONS 'partition_id' FROM Disk('backups', 'table_backup.zip');
更多详细信息请参考 ClickHouse 文档。
24.4 关于Clickhouse的优化¶
在量化金融领域,处理大量实时数据至关重要。ClickHouse作为一款高性能列式数据库,提供了高效的查询和存储方案。然而,为了充分发挥其性能,必须对其进行优化。
硬件优化¶
- 存储设备:选择高性能 SSD,可以显著提高数据读取和写入速度。
- 内存:增加内存容量,有助于更快地处理大量数据。
- 网络:优化网络带宽和延迟,确保分布式集群间的数据传输效率。
配置优化¶
- 设置合适的分区策略:根据时间或其他关键维度分区,提高查询性能。
- 合并设置:配置合适的
merge_tree设置,优化数据合并过程,减少碎片。 - 缓存和内存设置:调整
mark_cache_size和max_memory_usage等参数,提升缓存命中率和内存使用效率。
查询优化¶
- 索引:利用主键和二级索引,加速查询。
- 并行查询:启用
max_threads参数,充分利用多核 CPU 并行处理查询。 - 物化视图:预计算常用查询结果,减少实时计算开销。
数据模型优化¶
- 列存储设计:尽量将频繁查询的列存储在一起,减少 I/O 开销。
- 压缩算法:选择合适的压缩算法,如
LZ4或ZSTD,在压缩率和性能之间取得平衡。
实践案例¶
以量化金融中的市场数据分析为例,优化 ClickHouse 的关键步骤如下:
- 分区策略:按日期分区,使查询特定时间段数据时更高效。
- 物化视图:预计算每日交易量、价格波动等关键指标,减少实时计算负担。
- 并行查询:调整
max_threads,确保查询时充分利用服务器多核资源。
监控和维护¶
- 监控工具:使用 ClickHouse 提供的
system表监控系统性能和查询效率。 - 定期维护:定期检查并优化分区和索引,防止性能下降。
通过合理的硬件配置、优化查询和数据模型设计,以及持续的监控和维护,ClickHouse 可以在量化金融领域中提供卓越的性能和可靠性,支持高频交易、实时数据分析等应用场景。以下我将介绍一些优化实例:
24.4.1. 硬件配置¶
<clickhouse>
<storage_configuration>
<disks>
<default>
<path>/var/lib/clickhouse/</path>
</default>
<ssd>
<type>local</type>
<path>/mnt/ssd/</path>
</ssd>
</disks>
</storage_configuration>
</clickhouse>
24.4.2. 创建分区表¶
CREATE TABLE market_data (
date Date,
symbol String,
price Float64,
volume UInt64
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (symbol, date);
24.4.3. 使用物化视图¶
CREATE MATERIALIZED VIEW market_summary
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (symbol, date)
AS
SELECT
symbol,
toYYYYMM(date) as month,
avg(price) as avg_price,
sum(volume) as total_volume
FROM market_data
GROUP BY symbol, month;
24.4.4. 并行查询¶
SET max_threads = 8;
SELECT
symbol,
avg(price) as avg_price
FROM market_data
WHERE date >= '2023-01-01' AND date <= '2023-12-31'
GROUP BY symbol;
24.4.5. 压缩算法¶
CREATE TABLE compressed_data (
date Date,
symbol String,
price Float64,
volume UInt64
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (symbol, date)
SETTINGS index_granularity = 8192,
compress_on_write = 1,
compression = 'lz4';
通过合理的硬件配置、优化查询和数据模型设计,以及持续的监控和维护,ClickHouse 可以在量化金融领域中提供卓越的性能和可靠性,支持高频交易、实时数据分析等应用场景。
案例1 通过Rust脚本在Clickhouse数据库中建表、删表、查询¶
在量化金融领域中,使用 Rust 脚本管理 ClickHouse 数据库可以实现高效的数据处理和管理。以下是一个基本案例 。
准备工作¶
首先,确保在你的 Cargo.toml 中添加 clickhouse 依赖:
创建 ClickHouse 客户端¶
定义并初始化一个 ClickHouse 客户端:
use clickhouse::{Client, Row};
use lazy_static::lazy_static;
lazy_static! {
pub static ref CLICKHOUSE_CLIENT: ClickHouseClient = ClickHouseClient::new();
}
pub struct ClickHouseClient {
pub client: Client,
}
impl ClickHouseClient {
pub fn new() -> Self {
let client = Client::default().with_url("http://localhost:8123").with_database("default");
ClickHouseClient { client }
}
}
创建表¶
创建一个新的表 market_data:
impl ClickHouseClient {
pub async fn create_table(&self) -> Result<(), clickhouse::error::Error> {
let query = r#"
CREATE TABLE market_data (
date Date,
symbol String,
price Float64,
volume UInt64
) ENGINE = MergeTree()
PARTITION BY date
ORDER BY (symbol, date)
"#;
self.client.query(query).execute().await
}
}
删除表¶
删除一个已存在的表:
impl ClickHouseClient {
pub async fn drop_table(&self, table_name: &str) -> Result<(), clickhouse::error::Error> {
let query = format!("DROP TABLE IF EXISTS {}", table_name);
self.client.query(&query).execute().await
}
}
查询数据¶
从表中查询数据:
#[derive(Debug, Serialize, Deserialize, Row)]
pub struct MarketData {
pub date: String,
pub symbol: String,
pub price: f64,
pub volume: u64,
}
impl ClickHouseClient {
pub async fn query_data(&self) -> Result<Vec<MarketData>, clickhouse::error::Error> {
let query = "SELECT * FROM market_data LIMIT 10";
let result = self.client.query(query).fetch_all::<MarketData>().await?;
Ok(result)
}
}
示例用法¶
完整的示例代码展示了如何使用这些功能:
#[tokio::main]
async fn main() -> Result<(), clickhouse::error::Error> {
let client = ClickHouseClient::new();
// 创建表
client.create_table().await?;
println!("Table created successfully.");
// 查询数据
let data = client.query_data().await?;
for row in data {
println!("{:?}", row);
}
// 删除表
client.drop_table("market_data").await?;
println!("Table dropped successfully.");
Ok(())
}
通过这个基本教程,你可以在 Rust 脚本中实现对 ClickHouse 数据库的基本管理操作。这些示例代码可以根据具体需求进行扩展和优化,以满足量化金融领域的复杂数据处理需求。
案例2 创建布林带表的 SQL 脚本示例¶
本案例展示如何利用 Rust 脚本与 ClickHouse 交互,计算布林带 (Bollinger Bands) 和其他技术指标,帮助金融分析师和量化交易员优化他们的交易策略。
-- 创建名为 AG2305_TEST 的表,使用 MergeTree 引擎
CREATE TABLE AG2305_TEST
ENGINE = MergeTree()
ORDER BY (minute, mean_lastprice) -- 按 minute 和 mean_lastprice 排序
AS
-- 从子查询中选择字段
SELECT outer_query.minute,
CAST(outer_query.mean_lastprice AS Float32) AS mean_lastprice, -- 将 mean_lastprice 转换为 Float32 类型
CAST(sma AS Float32) AS sma, -- 将 sma 转换为 Float32 类型
CAST(stddev AS Float32) AS stddev, -- 将 stddev 转换为 Float32 类型
CAST(upper AS Float32) AS upper, -- 将 upper 转换为 Float32 类型
CAST(lower AS Float32) AS lower, -- 将 lower 转换为 Float32 类型
CAST(super_upper AS Float32) AS super_upper, -- 将 super_upper 转换为 Float32 类型
CAST(super_lower AS Float32) AS super_lower, -- 将 super_lower 转换为 Float32 类型
CAST(ssuper_upper AS Float32) AS ssuper_upper, -- 将 ssuper_upper 转换为 Float32 类型
CAST(ssuper_lower AS Float32) AS ssuper_lower, -- 将 ssuper_lower 转换为 Float32 类型
CASE
-- 根据价格突破的不同情况赋值 bollinger_band_status
WHEN super_upper > mean_lastprice AND mean_lastprice > upper THEN 1 -- 向上突破2个标准差
WHEN super_lower < mean_lastprice AND mean_lastprice < lower THEN 2 -- 向下突破2个标准差
WHEN ssuper_upper > mean_lastprice AND mean_lastprice > super_upper THEN 3 -- 向上突破4个标准差
WHEN ssuper_lower < mean_lastprice AND mean_lastprice < super_lower THEN 4 -- 向下突破4个标准差
WHEN mean_lastprice > ssuper_upper THEN 5 -- 向上突破5个标准差
WHEN mean_lastprice < ssuper_lower AND upper != lower THEN 6 -- 向下突破5个标准差
ELSE 0 -- 在布林带内及其他情况
END AS bollinger_band_status
FROM (
-- 从内层查询中选择字段
SELECT subquery.minute,
subquery.mean_lastprice,
subquery.start_time,
-- 计算简单移动平均线 (SMA)
avg(subquery.mean_lastprice)
OVER (PARTITION BY subquery.start_time ORDER BY subquery.minute ASC) AS sma,
-- 计算标准差
stddevPop(subquery.mean_lastprice)
OVER (PARTITION BY subquery.start_time ORDER BY subquery.minute ASC) AS stddev,
-- 计算布林带上下轨
sma + 1.5 * stddevPop(subquery.mean_lastprice)
OVER (PARTITION BY subquery.start_time ORDER BY subquery.minute ASC) AS upper,
sma - 1.5 * stddevPop(subquery.mean_lastprice)
OVER (PARTITION BY subquery.start_time ORDER BY subquery.minute ASC) AS lower,
-- 计算更高和更低的布林带轨道
sma + 3 * stddevPop(subquery.mean_lastprice)
OVER (PARTITION BY subquery.start_time ORDER BY subquery.minute ASC) AS super_upper,
sma - 3 * stddevPop(subquery.mean_lastprice)
OVER (PARTITION BY subquery.start_time ORDER BY subquery.minute ASC) AS super_lower,
sma + 4 * stddevPop(subquery.mean_lastprice)
OVER (PARTITION BY subquery.start_time ORDER BY subquery.minute ASC) AS ssuper_upper,
sma - 4 * stddevPop(subquery.mean_lastprice)
OVER (PARTITION BY subquery.start_time ORDER BY subquery.minute ASC) AS ssuper_lower
FROM (
-- 最内层查询,按分钟聚合数据并计算均价
SELECT toStartOfMinute(datetime) AS minute,
AVG(lastprice) AS mean_lastprice,
-- 计算开始时间,根据 datetime 决定是当前日期的 21 点还是前一天的 21 点
CASE
WHEN toHour(datetime) < 21 THEN toDate(datetime) - INTERVAL 1 DAY + INTERVAL 21 HOUR
ELSE toDate(datetime) + INTERVAL 21 HOUR
END AS start_time
FROM futures.AG2305
GROUP BY minute, start_time
) subquery
) outer_query
ORDER BY outer_query.minute;
脚本解析¶
- 创建表:使用 MergeTree 引擎创建表
AG2305_TEST,并按minute和mean_lastprice排序。 - 选择字段:从内部查询中选择字段并进行类型转换。
- 布林带状态:根据价格与布林带上下轨的关系,确定
bollinger_band_status的值。 - 内部查询:
- 子查询:聚合每分钟的平均价格
mean_lastprice,并计算开始时间start_time。 - 布林带计算:计算简单移动平均线(SMA)和不同标准差倍数的上下轨。
通过这个逐行注释的 SQL 脚本,可以更好地理解如何在 ClickHouse 中创建复杂的计算表,并应用于量化金融领域的数据处理。