< 返回版块

griffenliu 发表于 2023-07-14 11:26

项目里面碰到一个奇怪的问题,在使用Process调用外部程序时,出现外部程序被卡住的情况,这时候线程也会被卡住,想找到一个可以强制终止执行线程的方法,Rust可以实现吗???

评论区

写评论
作者 griffenliu 2023-07-20 10:01

实际测试中还是碰到了其他的问题。我执行的这个外部命令是通过make调用gradle进行编译打包的,经过观察使用child.kill()确实会把make进程给干掉,但是gradle进程并没有被干掉,在gradle没被干掉的时候rust启动的线程还是在运行不会终止。各位大佬有解决这个问题的办法吗?(注:gradle概率假死,导致资源不会被释放) 下面是代码:

pub async fn execute_command(
    cmd: &str,
    work_dir: &str,
    args: Option<Vec<&str>>,
    log_name: String,
) -> AppResult<()> {
    info!("execute: {} {:?}", cmd, args);

    let mut command = Command::new(cmd);
    command
        .current_dir(work_dir)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());

    let key = format!("{} {:?}", cmd, args);
    if let Some(args) = args {
        command.args(args);
    }

    let mut child = command.spawn()?;

    let (send, recv) = channel::<()>();
    {
        get_signal_senders().insert(key.clone(), send);
    }

    let stdout = match child.stdout.take() {
        Some(s) => s,
        None => return Ok(()),
    };
    // 虽然后面的child.kill()执行了,但是这里因为gradle进程还在执行并不会终止
    std::thread::spawn(move || {
        tracing::subscriber::with_default(create_log_subscriber(&log_name), || {
            let mut reader = BufReader::new(stdout).lines();
            while let Some(line) = match get_runtime().block_on(reader.next_line()) {
                Ok(line) => line,
                Err(e) => {
                    warn!("读取错误: {}", e);
                    return;
                }
            } {
                info!("{}", line);
            }
            info!("===> 命令执行结束,输出线程退出!!!");
        });
    });
    tokio::select! {
        rstatus = child.wait() => {
            get_signal_senders().remove(&key);
            match rstatus {
                Ok(status) => {
                    if !status.success() {
                        match child.stderr.take() {
                            Some(stderr) => {
                                let mut reader = BufReader::new(stderr).lines();
                                while let Ok(Some(line)) = reader.next_line().await {
                                    error!("{}", line);
                                }
                            }
                            None => warn!("未获取到标准错误输出,忽略..."),
                        }
                        return Err(anyhow::anyhow!("命令执行失败: {}", status.to_string()));
                    }
                }
                Err(e) => {
                    error!("执行命令错误: {}", e);
                    return Err(anyhow::anyhow!("命令执行失败: {}", e));
                }
            }
        }
        _ = recv => {
            child.kill().await.expect("kill failed");
            return Err(anyhow::anyhow!("命令被强行终止,执行失败!"));
        }
    };
    Ok(())
}

--
👇
griffenliu: 在tokio的API中找到了这个,应该可以解决我的问题了。

/// Forces the child to exit.
    ///
    /// This is equivalent to sending a SIGKILL on unix platforms.
    ///
    /// If the child has to be killed remotely, it is possible to do it using
    /// a combination of the select! macro and a oneshot channel. In the following
    /// example, the child will run until completion unless a message is sent on
    /// the oneshot channel. If that happens, the child is killed immediately
    /// using the `.kill()` method.
    ///
    /// ```no_run
    /// use tokio::process::Command;
    /// use tokio::sync::oneshot::channel;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (send, recv) = channel::<()>();
    ///     let mut child = Command::new("sleep").arg("1").spawn().unwrap();
    ///     tokio::spawn(async move { send.send(()) });
    ///     tokio::select! {
    ///         _ = child.wait() => {}
    ///         _ = recv => child.kill().await.expect("kill failed"),
    ///     }
    /// }
    /// ```

--
👇
griffenliu: 嗯,确实,控制线程很难,综合前面几位大佬的回复,我基本确定了应该在适合的时候终止Command而不是想着怎么去杀死线程。我使用的是tokio::process:Command。下面是代码,大佬帮忙看看如何修改来在卡死的时候自动终止?万分感谢:

pub async fn execute_command(cmd: &str, work_dir: &str, args: Option<Vec<&str>>) -> AppResult<()> {
    info!("execute: {} {:?}", cmd, args);

    let mut command = Command::new(cmd);
    command
        .current_dir(work_dir)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());
    if let Some(args) = args {
        for arg in args {
            command.arg(arg);
        }
    }

    let mut child = command.spawn()?;

    let stdout = match child.stdout.take() {
        Some(s) => s,
        None => return Ok(()),
    };

    let mut reader = BufReader::new(stdout).lines();
    while let Some(line) = reader.next_line().await? {
        info!("{}", line);
    }

    let status = child.wait().await?;
    if !status.success() {
        match child.stderr.take() {
            Some(stderr) => {
                let mut reader = BufReader::new(stderr).lines();
                while let Some(line) = reader.next_line().await? {
                    error!("{}", line);
                }
            }
            None => warn!("未获取到标准错误输出,忽略..."),
        };

        return Err(anyhow!("Execute Command Error!"));
    }
    Ok(())
}

--
👇
Pikachu: 我不理解。

你调用外部程序用的是std::process::Command吗?如果是的话,这个spawn出来的Child是支持try_wait的。

从线程外部取消线程在哪个语言里面都是个难题,因为很难做资源释放之类的。

作者 griffenliu 2023-07-18 18:01

在tokio的API中找到了这个,应该可以解决我的问题了。

/// Forces the child to exit.
    ///
    /// This is equivalent to sending a SIGKILL on unix platforms.
    ///
    /// If the child has to be killed remotely, it is possible to do it using
    /// a combination of the select! macro and a oneshot channel. In the following
    /// example, the child will run until completion unless a message is sent on
    /// the oneshot channel. If that happens, the child is killed immediately
    /// using the `.kill()` method.
    ///
    /// ```no_run
    /// use tokio::process::Command;
    /// use tokio::sync::oneshot::channel;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let (send, recv) = channel::<()>();
    ///     let mut child = Command::new("sleep").arg("1").spawn().unwrap();
    ///     tokio::spawn(async move { send.send(()) });
    ///     tokio::select! {
    ///         _ = child.wait() => {}
    ///         _ = recv => child.kill().await.expect("kill failed"),
    ///     }
    /// }
    /// ```

--
👇
griffenliu: 嗯,确实,控制线程很难,综合前面几位大佬的回复,我基本确定了应该在适合的时候终止Command而不是想着怎么去杀死线程。我使用的是tokio::process:Command。下面是代码,大佬帮忙看看如何修改来在卡死的时候自动终止?万分感谢:

pub async fn execute_command(cmd: &str, work_dir: &str, args: Option<Vec<&str>>) -> AppResult<()> {
    info!("execute: {} {:?}", cmd, args);

    let mut command = Command::new(cmd);
    command
        .current_dir(work_dir)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());
    if let Some(args) = args {
        for arg in args {
            command.arg(arg);
        }
    }

    let mut child = command.spawn()?;

    let stdout = match child.stdout.take() {
        Some(s) => s,
        None => return Ok(()),
    };

    let mut reader = BufReader::new(stdout).lines();
    while let Some(line) = reader.next_line().await? {
        info!("{}", line);
    }

    let status = child.wait().await?;
    if !status.success() {
        match child.stderr.take() {
            Some(stderr) => {
                let mut reader = BufReader::new(stderr).lines();
                while let Some(line) = reader.next_line().await? {
                    error!("{}", line);
                }
            }
            None => warn!("未获取到标准错误输出,忽略..."),
        };

        return Err(anyhow!("Execute Command Error!"));
    }
    Ok(())
}

--
👇
Pikachu: 我不理解。

你调用外部程序用的是std::process::Command吗?如果是的话,这个spawn出来的Child是支持try_wait的。

从线程外部取消线程在哪个语言里面都是个难题,因为很难做资源释放之类的。

作者 griffenliu 2023-07-18 10:17

嗯,确实,控制线程很难,综合前面几位大佬的回复,我基本确定了应该在适合的时候终止Command而不是想着怎么去杀死线程。我使用的是tokio::process:Command。下面是代码,大佬帮忙看看如何修改来在卡死的时候自动终止?万分感谢:

pub async fn execute_command(cmd: &str, work_dir: &str, args: Option<Vec<&str>>) -> AppResult<()> {
    info!("execute: {} {:?}", cmd, args);

    let mut command = Command::new(cmd);
    command
        .current_dir(work_dir)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());
    if let Some(args) = args {
        for arg in args {
            command.arg(arg);
        }
    }

    let mut child = command.spawn()?;

    let stdout = match child.stdout.take() {
        Some(s) => s,
        None => return Ok(()),
    };

    let mut reader = BufReader::new(stdout).lines();
    while let Some(line) = reader.next_line().await? {
        info!("{}", line);
    }

    let status = child.wait().await?;
    if !status.success() {
        match child.stderr.take() {
            Some(stderr) => {
                let mut reader = BufReader::new(stderr).lines();
                while let Some(line) = reader.next_line().await? {
                    error!("{}", line);
                }
            }
            None => warn!("未获取到标准错误输出,忽略..."),
        };

        return Err(anyhow!("Execute Command Error!"));
    }
    Ok(())
}

--
👇
Pikachu: 我不理解。

你调用外部程序用的是std::process::Command吗?如果是的话,这个spawn出来的Child是支持try_wait的。

从线程外部取消线程在哪个语言里面都是个难题,因为很难做资源释放之类的。

Pikachu 2023-07-14 21:31

我不理解。

你调用外部程序用的是std::process::Command吗?如果是的话,这个spawn出来的Child是支持try_wait的。

从线程外部取消线程在哪个语言里面都是个难题,因为很难做资源释放之类的。

Bai-Jinlin 2023-07-14 17:55

别用pthread的线程取消,不可靠,要停止线程别用这种方法。

--
👇
griffenliu: 好的,有空尝试一下,之前找到一个很老的库 https://crates.io/crates/stop-thread 试了一下,线程是干掉了,但是同时也把主进程给干掉了😂

--
👇
ruby: libc::pthread_kill or libc::pthread_cancel 但不可靠所以标准库也没收录这两个方法

https://docs.rs/libc/latest/libc/fn.pthread_kill.html

作者 griffenliu 2023-07-14 17:44

好的,有空尝试一下,之前找到一个很老的库 https://crates.io/crates/stop-thread 试了一下,线程是干掉了,但是同时也把主进程给干掉了😂

--
👇
ruby: libc::pthread_kill or libc::pthread_cancel 但不可靠所以标准库也没收录这两个方法

https://docs.rs/libc/latest/libc/fn.pthread_kill.html

ruby 2023-07-14 17:02

libc::pthread_kill or libc::pthread_cancel 但不可靠所以标准库也没收录这两个方法

https://docs.rs/libc/latest/libc/fn.pthread_kill.html

ruby 2023-07-14 17:01

可以用 glibc 函数 pthread_kill/pthread_cancel 试试,但不保证好用,Linux 本身就没有提供终结线程的靠谱方法,一个卡住的线程真就没辙了只能重启

作者 griffenliu 2023-07-14 15:01

我这边也没有使用超时的机制,就是前台提供了一个接口,使用接口手动终止任务,如果实在实现不了终止线程,终止外部的命令也是可行的方案(就是感觉不爽,还要绕一下)。

--
👇
Bai-Jinlin: 设定个超时时间,要是超过超时时间外部程序还没有输出就终止这个外部程序程序,还是挺好写的。

作者 griffenliu 2023-07-14 14:56

thread_handle.thread().unpark(); 这个可以终止掉被Process阻塞的线程吗? 看文档说是和自旋锁相关,和park()成对使用的,这个需要测试一下。

👇
dierbei: 不知道这样的方法适合不

--
👇
dierbei:

use std::thread;
use std::time::{Duration, Instant};

fn main() {
    let thread_handle = thread::spawn(move || {
        // 线程执行体
        let start = Instant::now();
        let timeout = Duration::from_secs(10);

        while start.elapsed() < timeout {
            // 执行任务...
        }
    });

    // 设置超时时间
    thread::park_timeout(Duration::from_secs(5));

    // 终止线程
    thread_handle.thread().unpark();
    thread_handle.join().expect("Failed to join the thread");
}

Bai-Jinlin 2023-07-14 13:26

设定个超时时间,要是超过超时时间外部程序还没有输出就终止这个外部程序程序,还是挺好写的。

dierbei 2023-07-14 11:57

不知道这样的方法适合不

--
👇
dierbei:

use std::thread;
use std::time::{Duration, Instant};

fn main() {
    let thread_handle = thread::spawn(move || {
        // 线程执行体
        let start = Instant::now();
        let timeout = Duration::from_secs(10);

        while start.elapsed() < timeout {
            // 执行任务...
        }
    });

    // 设置超时时间
    thread::park_timeout(Duration::from_secs(5));

    // 终止线程
    thread_handle.thread().unpark();
    thread_handle.join().expect("Failed to join the thread");
}

dierbei 2023-07-14 11:56
use std::thread;
use std::time::{Duration, Instant};

fn main() {
    let thread_handle = thread::spawn(move || {
        // 线程执行体
        let start = Instant::now();
        let timeout = Duration::from_secs(10);

        while start.elapsed() < timeout {
            // 执行任务...
        }
    });

    // 设置超时时间
    thread::park_timeout(Duration::from_secs(5));

    // 终止线程
    thread_handle.thread().unpark();
    thread_handle.join().expect("Failed to join the thread");
}

1 共 13 条评论, 1 页