< 返回版块

kwsc98 发表于 2024-01-28 21:59

Tags:Rust,RPC,Dubbo,Volo

krpc-rust 一个最像RPC框架的Rust-RPC框架

krpc-rust是一个高性能,轻量级的rpc框架,通过使用Rust宏来解决目前主流rpc框架使用复杂,性能低等问题,不需要通过脚本和脚手架生成rpc调用代码,通过宏来进行编译期"反射"来实现高性能的调用,来满足rpc调用的简易性,同时支持服务的注册发现和断线重连等。

上篇文章我们对比了目前主流rust-rpc框架所存在的问题,比如说必须通过脚本或者脚手架先生成代码才能使用,并且也提出了使用Rust宏来实现一个更高效优雅的方案,并且实现了一个rpc框架,并进行了压力测试,事实证明了Rust宏的强大之处,本次我们继续讨论一个rpc框架还需要有那些优化。

RPC(Remote Procedure Call)框架的定义是:一种实现RPC协议的软件框架,它提供了一套工具和库,用于简化远程调用的过程。RPC框架的主要目标是隐藏底层网络通信的细节,使开发人员能够像调用本地函数一样调用远程函数,提供了一种透明的方式来实现跨网络的函数调用,使得分布式系统的开发更加简单和高效。

如果只解读这个定义的话,那实际上我们之前已经实现了一个完善的rpc框架,但其实现代定义下的rpc框架不只是简化调用这么简单,而是应该是一个具有扩展性,多组件并且具有服务注册和发现等一系列服务治理能力的框架,实际上就是我们所说的微服务框架,那么今天我们就来实现其中重要的一环,服务注册与发现。

我们直接看如果让我们的rpc框架具有服务注册与发现的能力。

快速开始

https://github.com/kwsc98/krpc-rust

Server

#[derive(Serialize, Deserialize, Default, Debug)]
struct ReqDto {
    str: String,
}
#[derive(Serialize, Deserialize, Default)]
struct ResDto {
    str: String,
}
#[derive(Clone)]
struct TestServer {
    _db: String,
}
//通过宏声明Server
krpc_server! {
   TestServer,
   //定义版本号
   "1.0.0",
   //实现rpc接口(错误响应)
   async fn do_run1(&self,res : ReqDto) -> Result<ResDto> {
      println!("{:?}" ,res);
      return Err("错误".to_string());
   }
   //实现rpc接口(正常响应)
   async fn do_run2(&self,res : ReqDto) -> Result<ResDto> {
     println!("{:?}" ,res);
     return Ok(ResDto { str : "TestServer say hello 1".to_string()});
    }
}

#[tokio::main(worker_threads = 512)]
async fn main() {
    //实例化Server
    let server: TestServer = TestServer {
        _db: "我是一个DB数据库".to_string(),
    };
    //启动rpc服务
    KrpcServer::build(
        //设置注册中心配置(地址,工作空间,注册中心类型)
        RegisterBuilder::new(
            &format!("127.0.0.1:{}", "2181"),
            "default",
            RegisterType::ZooKeeper,
        ),
        //设置服务端口
        "8081",
    )
    //注册服务
    .add_rpc_server(Box::new(server))
    .run()
    .await;
}

Client

//初始化RPC-Client
lazy_static! {
    static ref CLI: KrpcClient = KrpcClient::build(
        //设置注册中心配置(地址,工作空间,注册中心类型)
        RegisterBuilder::new(
            &format!("127.0.0.1:{}", "2181"),
            "default",
            RegisterType::ZooKeeper,
        )
    );
}
#[derive(Serialize, Deserialize, Default, Debug)]
struct ReqDto {
    str: String,
}
#[derive(Serialize, Deserialize, Default,Debug)]
struct ResDto {
    str: String,
}
struct TestServer;

//通过宏声明Client
krpc_client! {
   CLI,
   TestServer,
   "1.0.0",
   async fn do_run1(&self,res : ReqDto) -> Result<ResDto>
   async fn do_run2(&self,res : ReqDto) -> Result<ResDto> 
} 

#[tokio::main(worker_threads = 512)]
async fn main() {
    //实例化rpc接口
    let client = TestServer;
    //直接进行调用
    let res = client.do_run1(ReqDto{str : "client say hello 1".to_string()}).await;
    println!("{:?}",res);
    let res = client.do_run2(ReqDto{str : "client say hello 2".to_string()}).await;
    println!("{:?}",res);
}

我们可以看到,之前我们Server端的初始化只是指定的监听端口,而Client端只是配置了127.0.0.1:8081,由此可知我们之前是Client直连的Server进行了rpc调用。

lazy_static! {
    static ref CLI: KrpcClient = KrpcClient::build("http://127.0.0.1:8081".to_string());
}

KrpcServer::build()         
  .set_port("8081")         
  .add_rpc_server(Box::new(server))         
  .run()         
  .await;

而新版代码里Server端和Client端都配置了127.0.0.1:2181,熟悉dubbo的同学都知道这个是zookeeper的默认端口,我们这版的服务注册与发现也是通过zookeeper来实现的

//初始化RPC-Client
lazy_static! {
    static ref CLI: KrpcClient = KrpcClient::build(
        //设置注册中心配置(地址,工作空间,注册中心类型)
        RegisterBuilder::new(
            &format!("127.0.0.1:{}", "2181"),
            "default",
            RegisterType::ZooKeeper,
        )
    );
}


//启动rpc服务
KrpcServer::build(
    //设置注册中心配置(地址,工作空间,注册中心类型)
    RegisterBuilder::new(
        &format!("127.0.0.1:{}", "2181"),
         "default",
         RegisterType::ZooKeeper,
    ),
    //设置服务端口
    "8081",
)
//注册服务
.add_rpc_server(Box::new(server))
.run()
.await;  
    

并且本项目还为多注册中心做了支持,之后我计划会接入nacos或者reids作为注册中心。

//注册中心抽象接口
pub trait Register: Send + Sync {
    fn add_resource(&self, resource: Resource);
}
//注册中心类型
#[derive(Clone)]
pub enum RegisterType {
    ZooKeeper,
    Nacos,
    ...
}

同时为了实现高可用,本项目还支持了异常处理,比如说zookeeper节点下线重连,节点删除自动恢复等功能。

    tokio::spawn(async move {
        loop {
            let client = connect(&cluster, &path).await;
            match client
                .create(&node_name, node_data.as_bytes(), EPHEMERAL_OPEN)
                .await
            {
                Ok(_) => {}
                Err(_err) => {
                    tokio::time::sleep(Duration::from_secs(5)).await;
                    continue;
                }
            }
            match client.check_and_watch_stat(&node_name).await {
                Ok(watch) => {
                    let event = watch.1.changed().await;
                    info!("resource node event {:?}", event);
                }
                Err(err) => {
                    info!("resource node err {:?}", err);
                }
            };
            drop(client);
        }
    });
    
    tokio::spawn(async move {
        let mut client = connect(&cluster.clone(), &path).await;
        let map = map;
        let info = info;
        loop {
            let watcher: (Vec<String>, zk::Stat, OneshotWatcher) =
                match client.get_and_watch_children("/").await {
                    Ok(watcher) => watcher,
                    Err(_) => {
                        client = connect(&cluster.clone(), &path).await;
                        continue;
                    }
                };
            let mut server_list = vec![];
            for node in watcher.0 {
                let server_info: Vec<&str> = node.split(":").collect();
                let info = Info {
                    server_name: info.server_name.clone(),
                    version: info.version.clone(),
                    ip: server_info[0].to_string(),
                    port: Some(server_info[1].to_string()),
                };
                server_list.push(SocketInfo { info, sender: Arc::new(RwLock::new(None))});
            }
            let key = info.server_name.clone() + ":" + &info.version.clone();
            info!("update server cache {:?} : {:?}", key, server_list);
            let mut temp_map = map.write().await;
            temp_map.insert(key, server_list);
            drop(temp_map);
            let event: zk::WatchedEvent = watcher.2.changed().await;
            if let zk::EventType::NodeChildrenChanged = event.event_type {
                info!("Monitor node changes");
            } else {
                client = connect(&cluster.clone(), &path).await;
            }
        }
    });

有兴趣了解的同学可以阅读相关源码。

本项目接下来的计划还包括自定义组件支持等内容,希望大家多多点Star支持~

https://github.com/kwsc98/krpc-rust

评论区

写评论
langzi.me 2024-01-28 22:59

棒棒的

1 共 1 条评论, 1 页