< 返回版块

bianweiall 发表于 2022-08-13 16:52

Tags:channel,thread

我在结构体里定义了 crossbeam_channel 通道的发送端和接收端，由一个线程写入、3 个线程读取。我希望写完后关闭发送端，接收端在发现发送端已关闭后退出。现在程序可以运行，只是不知道如何关闭发送端：close_sender 要怎么写？

use crossbeam_channel::{bounded, select, Receiver, Sender};
use std::error::Error;
use std::sync::Mutex;
use std::thread;
use std::time;

/// Something that can be executed through a shared reference.
pub trait Exec {
    /// Performs the work; takes `&self`, so the implementor is not consumed.
    fn exec(&self);
}

/// A unit of work made of a numeric id and a text payload.
#[derive(Debug)]
pub struct Job {
    /// Numeric identifier of the job.
    id: u64,
    /// Text payload; the test in this file fills it with a small JSON object.
    data: String,
}

impl Exec for Job {
    fn exec(&self) {
        println!("job.id: {}, job.data: {}", self.id, self.data);
    }
}

pub struct Server {
    workers: u32,
    sender: Sender<Job>,
    receiver: Receiver<Job>,
}

impl Server {
    fn new(workers: u32) -> Self {
        let (s, r) = bounded::<Job>(0);
        Server {
            workers,
            sender: s,
            receiver: r,
        }
    }

    fn close_sender(&self) {
        // drop(&self.sender)
    }

    fn add_job(&self, job: Job) -> Result<(), Box<dyn Error>> {
        let s = self.sender.clone();
        if let Err(err) = s.send(job) {
            return Err(Box::new(err));
        };
        Ok(())
    }

    fn run(&self) -> thread::Result<()> {
        let mut threads: Vec<thread::JoinHandle<()>> = Vec::with_capacity(3);

        for ch in 0..self.workers {
            let rc = self.receiver.clone();

            let th = thread::spawn(move || {
                println!("channel[{}] start", ch);
                loop {
                    select! {
                        recv(rc) -> ret => {
                            match ret{
                                Ok(job) =>{
                                    println!("recveie job successed, channel: {}, job: {:?}",ch,job);
                                    job.exec();
                                },
                                Err(err)=>{
                                    println!("recveie job failed: {:?}, channel: {}",err,ch);
                                    return;
                                }
                            }
                            thread::sleep(time::Duration::from_secs(1));
                        },
                        default(time::Duration::from_secs(5)) => println!("channel: {}, timed out",ch),
                    }
                }
            });
            threads.push(th);
        }

        for th in threads {
            println!("join start");
            th.join()?;
        }

        println!("server run finished");

        Ok(())
    }
}

#[cfg(test)]
mod test {
    use super::{Job, Server};
    use std::time;
    use std::{panic, sync::Arc, thread};

    /// One producer thread submits ten jobs and then closes the sender while
    /// the main thread runs three workers until they all exit.
    #[test]
    fn test_worker2() {
        let server = Arc::new(Server::new(3));
        let s1 = Arc::clone(&server);
        // Keep the handle so a panic in the producer can fail the test.
        let producer = thread::spawn(move || {
            for i in 0..10 {
                let job = Job {
                    id: i as u64,
                    data: format!(r#"{{"id":{},"name":"name-{}"}}"#, i, i),
                };
                s1.add_job(job).unwrap();
                thread::sleep(time::Duration::from_millis(100));
            }
            s1.close_sender();
            println!("send finished");
        });

        match server.run() {
            Ok(_) => print!("server finished"),
            // Re-raise a worker panic instead of only printing it, so the
            // test actually fails.
            Err(err) => panic::resume_unwind(err),
        }

        // Surface a producer panic (e.g. a failed `add_job`) as a test failure.
        if let Err(payload) = producer.join() {
            panic::resume_unwind(payload);
        }
    }
}

如果发送端不放在结构体里面，而是在外面创建 channel，就可以正常关闭：

use crossbeam_channel::{select, Receiver};
use std::thread;
use std::time;

/// Something that can be executed through a shared reference.
pub trait Exec {
    /// Performs the work; takes `&self`, so the implementor is not consumed.
    fn exec(&self);
}

/// A unit of work made of a numeric id and a text payload.
#[derive(Debug)]
pub struct Job {
    /// Numeric identifier of the job.
    id: u64,
    /// Text payload; the test in this file fills it with a small JSON object.
    data: String,
}

impl Exec for Job {
    fn exec(&self) {
        println!("job.id: {}, job.data: {}", self.id, self.data);
    }
}

/// A pool of worker threads that consume `Job`s from an externally created
/// channel; only the receiving half is stored here.
pub struct Server {
    /// Number of worker threads spawned by `run`.
    workers: u32,
    /// Receiving half of the job channel, cloned once per worker thread.
    receiver: Receiver<Job>,
}

impl Server {
    fn new(workers: u32, r: Receiver<Job>) -> Self {
        Server {
            workers,
            receiver: r,
        }
    }

    fn run(&self) -> thread::Result<()> {
        let mut threads: Vec<thread::JoinHandle<()>> = Vec::with_capacity(3);

        for ch in 0..self.workers {
            let rc = self.receiver.clone();

            let th = thread::spawn(move || {
                println!("channel[{}] start", ch);
                loop {
                    select! {
                        recv(rc) -> ret => {
                            match ret{
                                Ok(job) =>{
                                    println!("recveie job successed, channel: {}, job: {:?}",ch,job);
                                    job.exec();
                                },
                                Err(err)=>{
                                    println!("recveie job failed: {:?}, channel: {}",err,ch);
                                    return;
                                }
                            }
                            thread::sleep(time::Duration::from_secs(1));
                        },
                        default(time::Duration::from_secs(5)) => println!("channel: {}, timed out",ch),
                    }
                }
            });
            threads.push(th);
        }

        for th in threads {
            println!("join start");
            th.join()?;
        }

        println!("server run finished");

        Ok(())
    }
}

#[cfg(test)]
mod test {
    use crossbeam_channel::bounded;
    use std::time;
    use std::{panic, thread};

    use super::{Job, Server};

    /// One producer thread sends ten jobs and then drops the sender while the
    /// main thread runs three workers until they all exit.
    #[test]
    fn test_worker() {
        let (s, r) = bounded::<Job>(0);
        // Keep the handle so a panic in the producer can fail the test.
        let producer = thread::spawn(move || {
            for i in 0..10 {
                let job = Job {
                    id: i as u64,
                    data: format!(r#"{{"id":{},"name":"name-{}"}}"#, i, i),
                };
                s.send(job).unwrap();
                thread::sleep(time::Duration::from_millis(100));
            }
            // Dropping the only sender disconnects the channel, which makes
            // the workers' receive fail so they return.
            drop(s);
            println!("send finished");
        });

        match Server::new(3, r).run() {
            Ok(_) => print!("server finished"),
            Err(err) => panic::resume_unwind(err),
        }

        // Surface a producer panic (e.g. a failed `send`) as a test failure.
        if let Err(payload) = producer.join() {
            panic::resume_unwind(payload);
        }
    }
}

评论区

写评论
Earthson 2022-08-23 19:06

直接drop掉发送端就行了, 接收端会收到Err, 判断下退出就可以了

1 共 1 条评论, 1 页