< 返回版块

eweca 发表于 2020-07-07 09:34

Tags:mpsc; thread

我是一名rust新手,看到TRPL的16.2的消息传递里,里面的MPSC模型有如下范例:

 let (tx, rx) = mpsc::channel();

    let tx1 = mpsc::Sender::clone(&tx);
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }

这个代码运行良好,但是当我尝试用循环加更多线程的时候,问题就出来了: (已修改,主要是tx本身没被drop,手动drop就好了)

for _ in 0..4 {
        let tx1 = mpsc::Sender::clone(&tx);
        thread::spawn(move || {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("thread"),
            ];
        
            for val in vals {
                tx1.send(val).unwrap();
                thread::sleep(Duration::from_secs(1));
            }
        });
    }

    drop(tx);
    
    for received in rx {
        println!("Got: {}", received);
    }

当接听完所有的消息后,线程看起来似乎没有被关闭,然后就静止了。这是什么原理呢?该怎么解决呢?谢谢诸位了!

评论区

写评论
fakeshadow 2020-07-13 13:42

你这个应该是没有等待spawn的线程完成就退出了main。

fn main() {
    let handle = thread::spawn(|| { /* 代码 */ });
    handle.join(); //等待线程结束。
}

--
👇
pader: 相反,我这里单生产者,多消费者有问题:

use std::thread;
use std::sync::{mpsc, Arc, Mutex};
use std::time::Duration;
use threadpool::ThreadPool;

fn main() {
    let (tx, rx) = mpsc::channel();
    let rx = Arc::new(Mutex::new(rx));

    //引用计数形式获取互斥锁的共享接收器
    let rx1 = Arc::clone(&rx);
    thread::spawn(move || {
        loop {
            let v = rx1.lock().unwrap().recv().unwrap();
            println!("Thread 1 get {}", v);
            // thread::sleep(Duration::from_millis(10));
        }
    });

    let rx2 = Arc::clone(&rx);
    thread::spawn(move || {
        loop {
            let v = rx2.lock().unwrap().recv().unwrap();
            println!("Thread 2 get {}", v);
            // thread::sleep(Duration::from_millis(10));
        }
    });

    for i in 0..200 {
        tx.send(format!("Hello {}", i)).unwrap();
    }

    thread::sleep(Duration::from_millis(100));
}

当把子线程里的 sleep 放出来后,就不正常了,我这里觉得这个 tx,rx 本身并没有队列的功能,所以他们是无法堆积队列的。线程一旦进入休息,send 出去的东西无法被消费,tx 就停了。

pader 2020-07-12 22:56

相反,我这里单生产者,多消费者有问题:

use std::thread;
use std::sync::{mpsc, Arc, Mutex};
use std::time::Duration;
use threadpool::ThreadPool;

fn main() {
    let (tx, rx) = mpsc::channel();
    let rx = Arc::new(Mutex::new(rx));

    //引用计数形式获取互斥锁的共享接收器
    let rx1 = Arc::clone(&rx);
    thread::spawn(move || {
        loop {
            let v = rx1.lock().unwrap().recv().unwrap();
            println!("Thread 1 get {}", v);
            // thread::sleep(Duration::from_millis(10));
        }
    });

    let rx2 = Arc::clone(&rx);
    thread::spawn(move || {
        loop {
            let v = rx2.lock().unwrap().recv().unwrap();
            println!("Thread 2 get {}", v);
            // thread::sleep(Duration::from_millis(10));
        }
    });

    for i in 0..200 {
        tx.send(format!("Hello {}", i)).unwrap();
    }

    thread::sleep(Duration::from_millis(100));
}

当把子线程里的 sleep 放出来后,就不正常了,我这里觉得这个 tx,rx 本身并没有队列的功能,所以他们是无法堆积队列的。线程一旦进入休息,send 出去的东西无法被消费,tx 就停了。

作者 eweca 2020-07-07 10:21

我试了下,我懂你的意思了。。。我以为你说的tx1。。。你的意思是循环版本tx本身没被DROP,虽然其他所有cloned tx被drop了,但是tx本身没drop,所以管道没有被关闭是吧。谢谢!应该是这样的,我试了下。

--
👇
eweca1992: 抱歉,我太菜了,没懂,为什么能在第二个spawn里drop,那么第二个spawn里的tx怎么drop的呢?

--
👇
liyiheng: 没有循环的版本里, 在第二个 spawn 里 drop 的

--
👇
eweca1992: 我觉得应该是,但是我没搞懂为什么没有自动drop啊。。。如果不写循环就可以自动drop呀。

--
👇
liyiheng: 是不是因为 tx 没 drop ?

作者 eweca 2020-07-07 10:15

抱歉,我太菜了,没懂,为什么能在第二个spawn里drop,那么第二个spawn里的tx怎么drop的呢?

--
👇
liyiheng: 没有循环的版本里, 在第二个 spawn 里 drop 的

--
👇
eweca1992: 我觉得应该是,但是我没搞懂为什么没有自动drop啊。。。如果不写循环就可以自动drop呀。

--
👇
liyiheng: 是不是因为 tx 没 drop ?

liyiheng 2020-07-07 10:03

没有循环的版本里, 在第二个 spawn 里 drop 的

--
👇
eweca1992: 我觉得应该是,但是我没搞懂为什么没有自动drop啊。。。如果不写循环就可以自动drop呀。

--
👇
liyiheng: 是不是因为 tx 没 drop ?

作者 eweca 2020-07-07 09:59

我觉得应该是,但是我没搞懂为什么没有自动drop啊。。。如果不写循环就可以自动drop呀。

--
👇
liyiheng: 是不是因为 tx 没 drop ?

liyiheng 2020-07-07 09:53

是不是因为 tx 没 drop ?

1 共 7 条评论, 1 页