< 返回版块

hezijiangjiang 发表于 2024-01-02 15:10

Tags:rust 学习,流数据库,RisingWave,开源

作者:温一鸣 RisingWave Labs 内核开发工程师

在 RisingWave 的存储代码中,我们使用 RAII [1] 的思想来对 LSM iterator 的 metrics 进行监控,从而避免在代码中忘记上报 metrics, 导致 metrics 丢失。在实现中,我们实现了一个 rust 的 struct MonitoredStateStoreIterStats 去收集 LSM iterator 的 metrics,去统计 iterator 中 key 的数量和长度,并为这个 struct 实现了 Drop,在这个 struct 被释放的时候把在本地收集的 metrics 上报 prometheus。通过这种方式,我们不需要在每次在 iterator 使用完后都手动上报 metrics,从而避免了由于代码的疏忽导致忘记上报 metrics。

以下是一段简化过的代码。我们通过 try_stream[2] 这个宏来封装一个 iterator 的 stream来收集这个 stream 的 metrics。在返回的 stream 被释放时,stats 随着 stream 被释放,并调用其 drop 方法来上报收集到的 metrics。

pub struct MonitoredStateStoreIter<S> {
    inner: S,
    stats: MonitoredStateStoreIterStats,
}

struct MonitoredStateStoreIterStats {
    total_items: usize,
    total_size: usize,
    storage_metrics: Arc<MonitoredStorageMetrics>,
}

impl<S: StateStoreIterItemStream> MonitoredStateStoreIter<S> {
    #[try_stream(ok = StateStoreIterItem, error = StorageError)]
    async fn into_stream_inner(mut self) {
        let inner = self.inner;
        futures::pin_mut!(inner);
        while let Some((key, value)) = inner
            .try_next()
            .await
            .inspect_err(|e| error!("Failed in next: {:?}", e))?
        {
            self.stats.total_items += 1;
            self.stats.total_size += key.encoded_len() + value.len();
            yield (key, value);
        }
    }
}

impl Drop for MonitoredStateStoreIterStats {
    fn drop(&mut self) {
        self.storage_metrics
            .iter_item
            .observe(self.total_items as f64);
        self.storage_metrics
            .iter_size
            .observe(self.total_size as f64);
    }
}

然而,在使用过程中,我们遇到了上报的 metrics 全部为0的问题。

最小复现

由于使用了 try_stream 宏来生成 stream,因此我们怀疑在 try_stream 生成的代码中有bug导致 metrics 丢失。于是我们用 cargo-expand [3] 来将查看宏生成的代码。展开后的代码如下

fn into_stream_inner(
    mut self,
) -> impl Stream<Item = StorageResult<StateStoreIterItem>
> {
    ::futures_async_stream::__private::try_stream::from_generator(static move |
        mut __task_context: ::futures_async_stream::__private::future::ResumeTy,
    | -> ::futures_async_stream::__private::Result<(), StorageError> {
        let (): () = {
            let inner = self.inner;
            let mut inner = inner;
            #[allow(unused_mut)]
                let mut inner = unsafe {
                ::pin_utils::core_reexport::pin::Pin::new_unchecked(
                    &mut inner,
                )
            };
            while let Some((key, value))
                = {
                let mut __pinned = inner.try_next();
                loop {
                    if let ::futures_async_stream::__private::Poll::Ready(
                        result,
                    ) = unsafe {
                        poll(Pin::as_mut(&mut __pinned), get_context(__task_context))
                    } {
                        break result;
                    }
                    __task_context = (yield ::futures_async_stream::__private::Poll::Pending);
                }
            }?
            {
                self.stats.total_items += 1;
                self.stats.total_size += key.encoded_len() + value.len();
                __task_context = (yield ::futures_async_stream::__private::Poll::Ready((
                    key,
                    value,
                )));
            }
        };
        #[allow(unreachable_code)]
        {
            return ::futures_async_stream::__private::Ok(());
            loop {
                __task_context = (yield ::futures_async_stream::__private::Poll::Pending);
            }
        }
    })
}

可以看到, try_stream 宏生成的代码中,包含了一个 rust generator 的闭包。闭包中收集和上报 metrics 的逻辑与原代码基本相同,按照我们对 rust 的理解,仍然不应该会出现 metrics 丢失的问题。因此我们怀疑是 rust 编译器中与 generator 相关的逻辑存在问题。在 rust playground 上,我们尝试了以下代码来对问题进行复现。

struct Stat {
    count: usize,
    vec: Vec<u8>,
}

impl Drop for Stat {
    fn drop(&mut self) {
        println!("count: {}", self.count);
    }
}

fn main() {

    let mut stat = Stat {
        count: 0,
        vec: Vec::new(),
    };

    let mut f = move || {
        stat.count += 1;
        1
    };

    println!("num: {}", f());
}

执行以后输出如下。

num: 1
count: 0

按照预期,输出的 num 和 count 应该都为1,因为在调用闭包 f 时 stat.count += 1被调用了,但是以上代码中遇到了和最开始同样的问题。因此以上代码可以作为我们问题的一个最小复现。

问题分析

对以上代码进行分析的话,我们看到闭包 f 的代码中使用了 move,因此在闭包中使用过的对象的 ownership 应该都会转移到闭包中。而 struct Stats 实现了 Drop,因此 Stats是不可以 partial move 的,其必须作为一个整体被 move 进入闭包。而在闭包中执行了 stats.count += 1,因此 stats 中的 count 应该被置为1。但是从程序的输出可以看到在 stats 被 drop 时, stats 的 count 是0。

我们尝试将闭包 f 改为如下代码,来显式的将 stats 的 ownership 给 move 进闭包里。

let mut f = move || {
    let mut stat = stat;
    stat.count += 1;
    1
};

输出恢复正常。

num: 1
count: 1

我们再次尝试在闭包 f 中使用 stat 中的另一个字段 vec

let mut f = move || {
    let _ = stat.vec.len();
    stat.count += 1;
    1
};

输出同样恢复正常。

num: 1
count: 1

可以看到,我们显式地将 stat 整个 move 进闭包,或者在闭包中使用类型为 vec 的字段,都会使得 stat 的ownership 被 move 进闭包。

于是我们推测,尽管 stat 实现了自己的 drop 导致不能被 partial move,但是如果我们在 move 的闭包中只使用了 stat 中实现了 Copy类型的字段,则这个字段的值会被 Copy 到闭包中,而闭包中对这个字段的修改只会作用于被 Copy 后的值,而原字段并不会改变。

验证猜想

我们可以通过将以上代码编译成二进制代码后,对其汇编代码进行分析,从而验证我们的猜想。然而,编译后的汇编代码会过于复杂且晦涩难懂,同时编译器对其进行的一些优化也会改变原有的逻辑导致汇编代码难以理解。因此我们打算通过分析在编译过程中产生的 MIR 中间代码来对问题进行分析。在 rust playground 上可以十分方便地生成 MIR 代码。

首先我们对存在问题的最小复现代码生成 MIR,生成后与闭包相关的 MIR 如下。可以看到这个闭包确实只包含了一个类型为 usize 的字段,这个字段的值取的是 stat 中的 count值。

bb1: {
    _1 = Stat { count: const 0_usize, vec: move _2 };
    _3 = {closure@src/main.rs:19:17: 19:24} { stat: (_1.0: usize) };
}

而我们对后续测试中有正常输出的代码生成 MIR,生成后与闭包相关的 MIR 如下。可以看到这个闭包将整个 stat 的ownership 给 move 了进去。

bb1: {
		_1 = Stat { count: const 0_usize, vec: move _2 };
		_3 = {closure@src/main.rs:19:17: 19:24} { stat: move _1 };
}

于是,我们的猜想得到了验证,在我们出现问题的代码中,闭包确实没有捕获 stat 的 ownership。

后续与总结

我们将这个问题向 rust 社区反映了这个问题,得到的反馈是,这个是 rust 2021 后实现的一个 feature [4]。在 rust 2021 中,一个使用了 move 的闭包在捕获一个 struct 的时候,会尽可能少地去捕获 struct 中的字段。

  • 如果一个 struct 没有实现 Drop,这意味着他里面的字段可以被分开 move,而闭包只会捕获闭包中用到的字段,
  • 如果某个被闭包使用的字段实现了 Copy,那他闭包并不会捕获这个字段的 ownership,而是将这个字段 copy 一份放在闭包中。
  • 如果一个struct 实现了 Drop,那他里面的字段只能作为一个整体被捕获。但如果闭包只使用了这个闭包中实现了 Copy 的字段,那这个闭包不会捕获这个 struct,而是将使用到的字段 copy 一份。

我们的代码中,正是因为这个行为,导致我们的代码产生了歧义,而出现了 metrics 的丢失。

针对这个问题,我们认为有两个地方有提升的空间。

  • 首先, try_stream 这个宏的封装存在一定的问题。在使用宏来声明代码中,其暴露出来的使用方法是通过调用一个方法来生成 stream,而在调用方法时,如果参数是通过 move ownership 的形式传入的,同时在生成 stream 的代码中我们使用了这个参数,那我们应该认为这个 stream 包含了这个参数的 ownership。然而,由于这个宏在实现的时候使用了闭包,导致这个 stream 并没有包含这个参数的 ownership,从而导致问题。这个是宏封装逻辑时的问题。
  • 其次,rust 在语言设计上,由于引入了这个闭包捕获 ownership 的特殊逻辑,导致会写出有歧义的代码。例如,在上述代码中,很难想象 stat.count += 1 并没有去修改 stat 中的 count 值。我们也向 rust 社区反映了这个问题 [5]。

最后,回到我们最开始的问题中。要想解决 metrics 丢失的问题,在我们的代码中,我们只需要做以下修改就能让代码正常运行 [6]。

#[try_stream(ok = StateStoreIterItem, error = StorageError)]
async fn into_stream_inner(mut self) {
    let inner = self.inner;
    ...
    self.stats.total_items += 1;
    self.stats.total_size += key.encoded_len() + value.len();
}

修改为

#[try_stream(ok = StateStoreIterItem, error = StorageError)]
async fn into_stream_inner(self) {
    let inner = self.inner;
    let mut stats = self.stats;
    ...
    stats.total_items += 1;
    stats.total_size += key.encoded_len() + value.len();
}

引用

[1] https://en.wikipedia.org/wiki/Resource_acquisition_is_initialization

[2] https://docs.rs/futures-async-stream/latest/futures_async_stream/index.html

[3] https://crates.io/crates/cargo-expand

[4] https://doc.rust-lang.org/edition-guide/rust-2021/disjoint-capture-in-closures.html

[5] https://github.com/rust-lang/rust/issues/108808

[6] https://github.com/risingwavelabs/risingwave/pull/8372

评论区

写评论

还没有评论

1 共 0 条评论, 1 页