选择RisingWave是因为我本身对Flink有些了解,再加上RW的代码本身写得还不错,比较易读,因此以它来做为生产级Rust代码的学习对象。
本文基于RWv2.2.0。面向的读者主要是Rust初学者,希望通过这个系列文章可以让你快速了解生产级的Rust代码是什么样的。
为了方便大家理清思路,这边直接给出一个简单的代码地图。
rust 体验AI代码助手 复制代码 --utils/pgwite/pg_server.rs --pg_serve --handle_connection --utils/pgwite/pg_protocol.rs --process --do_process#分析见 2.1 --do_process_inner --process_query_msg#选择query分支 --inner_process_query_msg#SqlParser在这里做 --inner_process_query_msg_one_stmt --frontend/session.rs --run_one_query --handle --handler/ mod.rs --handle#选择QueryInsertDeleteUpdate分支 --handle_query#在别的系列文章中会分析 --frontend/handler/query.rs --execute --create_stream --distribute_execute --fronted/scheduler/distributed/query_manager.rs --schedule --fronted/scheduler/distributed/query.rs --start#分析见 2.2 --run --fronted/scheduler/distributed/stage.rs --start --StageRunner.run#分析见 2.3 --schedule_tasks_for_all --schedule_tasks --schedule_task#分析见 2.4 --rpc_client/compute_client.rs --create_task --prost/task_service.rs --create_task --fire_task --prost/task_execution.rs --async_execute --run
这个函数主要用于处理来自客户端的消息。在函数的一开始我们可以看到一些奇怪的代码。
rust 体验AI代码助手 复制代码 async fn do_process(& mut self,msg:FeMessage) -> Option<()>{ let span= self. root_span_for_msg(&msg); let weak_session= self .session . as_ref() . map(|s|Arc:: downgrade(s) asWeak< dynAny+ Send+ Sync>); //Processingthemessageitself. // //Note:pinthefuturetoavoidstackoverflowaswe'llwrapitmultipletimes //inthefollowingcode. let fut= Box:: pin( self. do_process_inner(msg));
这个代码有两个点可以拎出来说一说:
- Weak是干啥的
- Box::pin是干啥的
那为什么要从强引用变成weak_session呢?
因为上层的PgProtocol调用的这个代码,然后会有多个session进来,PgProtocol里维护了一个session:Option<Arc<SM::Session>>
,每个session进来相当于会赋到这个成员上。而使用weak并不会添加Arc的引用,不过我并不觉得在这里用Arc会有什么问题,因为这个栈调用完以后,引用肯定会被释放掉。
那么Arc
是啥?(AtomicReferenceCounting,原子引用计数)是标准库提供的一个智能指针类型,用于在多线程环境中安全地共享所有权。
Box::pin是干啥的?Box::pin用于将一个异步操作(future)分配到堆上并将其固定(pin),确保该future在其生命周期内不会被移动。这是为了满足某些异步操作的要求,特别是当这些操作依赖于它们的内存位置不变时。
通过Box::pin,代码可以安全地对future进行多次转换和包装,而不会导致不可预期的行为或性能问题。
这段代码主要负责查询的启动逻辑,检查并更新查询状态,创建阶段执行计划,设置超时任务以取消查询,启动查询执行器并返回结果获取器。
rust 体验AI代码助手 复制代码 ///Startexecutionofthisquery. ///Notethetwoshutdownchannelsenderandreceiversarenotdual. ///Oneisusedforpropagateerrorto`QueryResultFetcher`,oneisusedforlisteningon ///cancelrequest(fromctrl-c,cli,uietc). #[allow(clippy::too_many_arguments)] pub async fn start( self:Arc< Self>, context:ExecutionContextRef, worker_node_manager:WorkerNodeSelector, batch_query_epoch:BatchQueryEpoch, compute_client_pool:ComputeClientPoolRef, catalog_reader:CatalogReader, query_execution_info:QueryExecutionInfoRef, query_metrics:Arc<DistributedQueryMetrics>, ) ->SchedulerResult<QueryResultFetcher>{ //忽略一些代码 matchcur_state{ QueryState::Pending{msg_receiver}=>{ *state=QueryState::Running; //Startatimertocancelthequery let mut timeout_abort_task_handle: Option<JoinHandle<()>>= None; if let Some(timeout)=context. timeout(){ let this= self. clone(); timeout_abort_task_handle= Some(tokio:: spawn( async move{ tokio::time:: sleep(timeout). await; warn!( "Query{:?}timeoutafter{}seconds,sendingcancelmessage.", this.query.query_id, timeout. as_secs(), ); this. abort( format!( "timeoutafter{}seconds",timeout. as_secs())) . await; })); }
- QueryState::Pending{msg_receiver}=>{…}表示当cur_state是QueryState::Pending变体时,会进入这个分支。同时,它将msg_receiver字段从Pending变体中解构出来,并绑定到同名变量msg_receiver,以便在后续代码中使用。
//Startatimertocancelthequery
是用来做超时检测,sleep几秒后开始检测,而tokio::spawn是起一个以协程为单位的异步任务。
实现了查询执行的异步运行逻辑。启动查询的叶子阶段,处理消息队列中的事件,如阶段调度、根阶段完成、阶段失败和查询取消,根据事件更新查询状态并处理相应的清理工作。
rust 体验AI代码助手 复制代码 impl StageRunner{ async fn run( mut self,shutdown_rx:ShutdownToken){ if let Err(e)= self. schedule_tasks_for_all(shutdown_rx). await{ error!( error=%e. as_report(), query_id=? self.stage.query_id, stage_id=? self.stage.id, "Failedtoscheduletasks" ); self. send_event(QueryMessage:: Stage(Failed{ id: self.stage.id, reason:e, })) . await; } }
这里的代码是rust的iflet形式,熟悉go语言的同学应该会觉得有点像。
rust 体验AI代码助手 复制代码 if str,ok:=i.(string);ok{ fmt. Printf( "Thestringis:%s ", str) } else{ fmt. Println( "Notastring.") }
- 调用self.schedule_tasks_for_all(shutdown_rx)并等待其完成。
- 如果该调用返回的结果是Err(e)(即发生了错误),则将错误值绑定到变量e,并执行后续的大括号内的代码块。
- 如果返回的是Ok,则跳过这个iflet块。
用于调度任务到计算节点。获取或选择一个工作节点,创建计算客户端并获取任务流状态,更新任务状态,返回任务流状态。
rust 体验AI代码助手 复制代码 async fn schedule_task( & self, task_id:PbTaskId, plan_fragment:PlanFragment, worker: Option<WorkerNode>, expr_context:ExprContext, ) ->SchedulerResult<Fuse<Streaming<TaskInfoResponse>>>{ let mut worker=worker. unwrap_or( self.worker_node_manager. next_random_worker()?); let worker_node_addr=worker.host. take(). unwrap(); let compute_client= self .compute_client_pool . get_by_addr((&worker_node_addr). into()) . await . inspect_err(|_| self. mask_failed_serving_worker(&worker)) . map_err(|e|anyhow!(e))?; let t_id=task_id.task_id; let stream_status:Fuse<Streaming<TaskInfoResponse>>=compute_client . create_task(task_id,plan_fragment, self.epoch,expr_context) . await . inspect_err(|_| self. mask_failed_serving_worker(&worker)) . map_err(|e|anyhow!(e))? . fuse(); self.tasks[&t_id].inner. store(Arc:: new(TaskStatus{ _task_id:t_id, location: Some(worker_node_addr), })); Ok(stream_status) }
这里面出现一个有趣的结构体,叫做fuse。
fuse是Rust中流(Stream)处理的一个常用方法,通常用于确保流在完成或出错后不再产生额外的项。具体来说:
-
当流调用fuse方法后,会返回一个新的Fuse包装器。
-
如果流已经完成(即调用了poll_next并返回了None),那么后续的所有轮询都会立即返回None,而不会再次尝试从原始流中获取数据。
-
这样可以避免对已完成流的重复轮询,防止不必要的计算或副作用。
简单来说,fuse方法确保流一旦结束就不会再被重新激活,从而提供更好的行为控制和性能优化。
rust 体验AI代码助手 复制代码 pin_project!{ ///Streamforthe[`fuse`](super::StreamExt::fuse)method. #[derive(Debug)] #[must_use= "streamsdonothingunlesspolled"] pub struct Fuse<St>{ #[pin] stream:St, done: bool, } }
异步任务的执行和状态管理。初始化任务并增加任务计数,处理数据流。发送数据块并处理发送错误。监听关闭信号并根据信号类型设置任务状态。关闭通道并通知状态变化。减少任务计数。
ini 体验AI代码助手 复制代码 asyncfnrun( &self, root:BoxedExecutor, mutsender:ChanSenderImpl, state_tx:Option<&mutStateReporter>, ){ self.context .batch_metrics() .as_ref() .inspect(|m|m.batch_manager_metrics().task_num.inc()) ; letmut data_chunk_stream=root.execute() ; letmutstate ; letmut error=None ; letmut shutdown_rx=self.shutdown_rx.clone() ; loop{ select!{ biased ; //`shutdown_rx`can'tberemovedheretoavoid`sender.send(data_chunk)`blockedwhole execution. _=shutdown_rx.cancelled()=>{ matchself.shutdown_rx.message(){ ShutdownMsg::Abort(e)=>{ error=Some(BatchError::Aborted(e)) ; state=TaskStatus::Aborted ; break ; } ShutdownMsg:: Cancel=>{ state=TaskStatus::Cancelled ; break ; } ShutdownMsg:: Init=>{ unreachable!("Initmessageshouldnotbereceivedhere!") } } } data_chunk=data_chunk_stream.next()=>{ matchdata_chunk{ Some(Ok(data_chunk))=>{ ifletErr(e)=sender.send(data_chunk).await{ matche{ BatchError:: SenderError=>{ //Thisispossiblesincewhenwehavelimitexecutorinparent //stage,itmayearlystopreceivingdatafromdownstream,which //leadstocloseofchannel. warn!("Taskreceiverclosed!") ; state=TaskStatus::Finished ; break ; } x=>{ error!("Failedtosenddata!") ; error=Some(x) ; state=TaskStatus::Failed ; break ; } } } } Some(Err(e))=>matchself.shutdown_rx.message(){ ShutdownMsg:: Init=>{ //Thereisnomessagereceivedfromshutdownchannel,whichmeansitcaused //taskfailed. error!( error=%e.as_report(), "Batchtaskfailed") ; error=Some(e) ; state=TaskStatus::Failed ; break ; } ShutdownMsg::Abort(_)=>{ error=Some(e) ; state=TaskStatus::Aborted ; break ; } ShutdownMsg:: Cancel=>{ state=TaskStatus::Cancelled ; break ; } }, None=>{ debug!("Batchtask{:?}finishedsuccessfully.",self.task_id) ; state=TaskStatus::Finished ; break ; } } } } } //忽略一些代码 }
相信很多人看到select!{
会很懵逼。select!是个宏,具体含义如下:
- loop{…}创建一个无限循环,持续执行其中的代码块,直到遇到break语句。
- select!{biased;…}是Tokio提供的一个宏,用于在多个异步操作之间进行选择。biased关键字确保优先处理第一个分支,避免其他分支抢占先机。
宏就是在编译期时展开的代码逻辑增强。写过Java的同学可以理解为AOP的AspectJ实现。