如何在Rust中使用ClickHouse

2021-04-16 12:54:02 浏览数 (1)

由于公众号不再按时间线推送,如果不想错过精彩内容,请在关注公众号后,点击右上角 ... 设为星标,感谢支持。


最近沉迷于学习 Rust,简单分享一篇在 Rust 中使用 ClickHouse 的方法。

Example 工程连接会放在末尾。

目前已经有实现的 clickhouse crates库,地址如下:

https://crates.io/crates/clickhouse-rs/

crates.io 上的版本有点问题,所以直接依赖它在 GitHub 上的版本

Cargo.toml:

代码语言:javascript复制
[dependencies]
clickhouse-rs = { git = "https://github.com/suharev7/clickhouse-rs", rev = "67c3bcb14751e05b83b3592fa9ee9f6ad2f408c6" }
tokio = {version = "*", features = ["full"]}

clickhouse-rs 基于异步编程实现,默认使用的是 tokio, 所以在依赖中需要一并添加。

新建一个 lib.rs,实现简单的连接逻辑,首先新建一个结构体:

代码语言:javascript复制
pub struct ClickHouseEngine {
    pool: Pool,
}

接着为其定义关联函数:

代码语言:javascript复制
impl ClickHouseEngine {
    pub fn new(database_url: &str) -> Self {
        let pool = Pool::new(database_url);
        ClickHouseEngine { pool }
    }
 }

实现简单的 ddl、insert 和 select 方法,支持链式调用:

代码语言:javascript复制
impl ClickHouseEngine {
    pub fn new(database_url: &str) -> Self {
        let pool = Pool::new(database_url);
        ClickHouseEngine { pool }
    }

   pub async fn ddl_str(&self, ddl: &str) -> Result<(), Box<dyn Error>> {
        let mut client = self.pool.get_handle().await?;
        client.execute(ddl).await?;
        Ok(())
    }

   pub async fn query_str(&self, sql: &str) -> Result<Block<Complex>, Box<dyn Error>> {
        let mut client = self.pool.get_handle().await?;
        let block = client.query(sql).fetch_all().await?;
        Ok(block)
    }

    pub async fn insert_block(&self, table_name: &str, block: Block) -> Result<(), Box<dyn Error>> {
        let mut client = self.pool.get_handle().await?;
        client.insert(table_name, block).await?;
        Ok(())
    }

}

完成之后,直接在 lib.rs 中为其编写单元测试:

代码语言:javascript复制
#[cfg(test)]
mod tests {

    use super::*;

    async fn print_row(block: Block<Complex>) -> Result<(), Box<dyn Error>> {
        println!("count:{} ", block.rows().count());
        for row in block.rows() {
            let id: u32 = row.get("product_id")?;
            let amount: u32 = row.get("price")?;
            let name: Option<&str> = row.get("product_name")?;
            println!("Found  {}: {} {:?}", id, amount, name);
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_qb() -> Result<(), Box<dyn Error>> {
        let ddl = r"
        CREATE TABLE IF NOT EXISTS t_product (
            product_id  UInt32,
            price       UInt32,
            product_name Nullable(FixedString(5))
        ) Engine=Memory";

        let block = Block::new()
            .column("product_id", vec![1_u32, 3, 5, 7, 9])
            .column("price", vec![2_u32, 4, 6, 8, 10])
            .column(
                "product_name",
                vec![Some("foo"), Some("foo"), None, None, Some("bar")],
            );

        let database_url = "tcp://10.37.129.9:9000/default?compression=lz4&ping_timeout=42ms";

        let ce = ClickHouseEngine::new(database_url);
        ce.ddl_str(ddl).await?;
        ce.insert_block("t_product", block).await?;

        let block = ce.query_str("SELECT * FROM t_product").await?;
        print_row(block).await?;
        Ok(())
    }

}

执行单元测试,可以看到功能正常:

代码语言:javascript复制
    Finished test [unoptimized   debuginfo] target(s) in 0.12s
     Running target/debug/deps/rust2ch-7eda52001fbe25f8
count:5 
Found  1: 2 Some("foo")
Found  3: 4 Some("foo")
Found  5: 6 None
Found  7: 8 None
Found  9: 10 Some("bar")

Process finished with exit code 0

接下来,我们将新增和查询功能制作成命令行工具,在 main.rs 中定义结构体和枚举,用来封装 shell 接收的参数:

代码语言:javascript复制
/// A rust2ch example
#[derive(StructOpt,Debug)]
struct Args {
    #[structopt(subcommand)]
    cmd: Option<Command>,
}

#[derive(StructOpt,Debug)]
enum Command {
    /// Add new product
    Add {product_name: String},
    /// List products
    List {id: String},
}

这里使用了非常好用的 StructOpt,它可以帮助我们生成一系列帮助功能,例如对编译好的二进制执行 -help 命令:

代码语言:javascript复制
(base) nauu@Boness-MBP debug % ./rust2ch -h    
rust2ch 0.1.0
A rust2ch example

USAGE:
    rust2ch [SUBCOMMAND]

FLAGS:
    -h, --help       Prints help information
    -V, --version    Prints version information

SUBCOMMANDS:
    add     Add new product
    help    Prints this message or the help of the given subcommand(s)
    list    List products

是不是有点那意思了?

接着在通过枚举匹配输入:

代码语言:javascript复制
 match opt.cmd {
        Some(Command::Add { product_name, .. }) => {
            println!("Add new product with product_name '{}'", &product_name);
            ce.ddl_str(ddl).await?;
            ce.insert_block("t_product", block).await?;
        }
        Some(Command::List { id }) => {
            let block = ce.query_str("SELECT * FROM t_product").await?;
            println!("count:{} ", block.rows().count());
            for row in block.rows() {
                let id: u32 = row.get("product_id")?;
                let amount: u32 = row.get("price")?;
                let name: Option<&str> = row.get("product_name")?;
                println!("Found  {}: {} {:?}", id, amount, name);
            }
        }
        _ => {}
    }

将工程 Build 成二进制文件之后,就可以直接使用了,例如:

代码语言:javascript复制
(base) nauu@Boness-MBP debug % ./rust2ch list 1
count:5 
Found  1: 2 Some("foo")
Found  3: 4 Some("foo")
Found  5: 6 None
Found  7: 8 None
Found  9: 10 Some("bar")

好了今天的分享就到这里吧,代码工程比较简单,只做抛砖引玉之用,地址如下:https://github.com/nauu/rust2ch

Rust 作为一款新兴的语言十分强大,未来肯定会有所作为,赶快学起来吧

原创不易,如果这篇文章对你有帮助,欢迎 点赞、转发、在看 三连击

欢迎大家扫码关注我的公众号和视频号:

0 人点赞