生产级Rust代码品鉴(一)RisingWave一条SQL到运行的流程

.markdown-bodypre,.markdown-bodypre>code.hljs{color:#333;background:#f8f8f8}.hljs-comment,.hljs-quote{color:#998;font-style:italic}.hljs-keyword,.hljs-selector-tag,.hljs-subst{color:#333;font-weight:700}.hljs-literal,.hljs-number,.hljs-tag.hljs-attr,.hljs-template-variable,.hljs-variable{color:teal}.hljs-doctag,.hljs-string{color:#d14}.hljs-section,.hljs-selector-id,.hljs-title{color:#900;font-weight:700}.hljs-subst{font-weight:400}.hljs-class.hljs-title,.hljs-type{color:#458;font-weight:700}.hljs-attribute,.hljs-name,.hljs-tag{color:navy;font-weight:400}.hljs-link,.hljs-regexp{color:#009926}.hljs-bullet,.hljs-symbol{color:#990073}.hljs-built_in,.hljs-builtin-name{color:#0086b3}.hljs-meta{color:#999;font-weight:700}.hljs-deletion{background:#fdd}.hljs-addition{background:#dfd}.hljs-emphasis{font-style:italic}.hljs-strong{font-weight:700} 1.前言

选择RisingWave是因为我本身对Flink有些了解,再加上RW的代码本身写得还不错,比较易读,因此以它来做为生产级Rust代码的学习对象。

本文基于RWv2.2.0。面向的读者主要是Rust初学者,希望通过这个系列文章可以让你快速了解生产级的Rust代码是什么样的。

2.读代码

为了方便大家理清思路,这边直接给出一个简单的代码地图。

rust体验AI代码助手复制代码--utils/pgwite/pg_server.rs --pg_serve --handle_connection--utils/pgwite/pg_protocol.rs --process --do_process#分析见2.1 --do_process_inner --process_query_msg#选择query分支 --inner_process_query_msg#SqlParser在这里做 --inner_process_query_msg_one_stmt--frontend/session.rs --run_one_query --handle--handler/mod.rs --handle#选择QueryInsertDeleteUpdate分支 --handle_query#在别的系列文章中会分析--frontend/handler/query.rs --execute --create_stream --distribute_execute--fronted/scheduler/distributed/query_manager.rs --schedule--fronted/scheduler/distributed/query.rs --start#分析见2.2 --run--fronted/scheduler/distributed/stage.rs --start --StageRunner.run#分析见2.3 --schedule_tasks_for_all --schedule_tasks --schedule_task#分析见2.4--rpc_client/compute_client.rs --create_task--prost/task_service.rs --create_task --fire_task--prost/task_execution.rs --async_execute  --run 
2.1do_pcocess:weak与pin

这个函数主要用于处理来自客户端的消息。在函数的一开始我们可以看到一些奇怪的代码。

rust体验AI代码助手复制代码asyncfndo_process(&mutself,msg:FeMessage)->Option<()>{  letspan=self.root_span_for_msg(&msg);  letweak_session=self    .session    .as_ref()    .map(|s|Arc::downgrade(s)asWeak<dynAny+Send+Sync>);  //Processingthemessageitself.  //  //Note:pinthefuturetoavoidstackoverflowaswe'llwrapitmultipletimes  //inthefollowingcode.  letfut=Box::pin(self.do_process_inner(msg));

这个代码有两个点可以拎出来说一说:

  1. Weak是干啥的
  2. Box::pin是干啥的

那为什么要从强引用变成weak_session呢?

因为上层的PgProtocol调用的这个代码,然后会有多个session进来,PgProtocol里维护了一个session:Option<Arc<SM::Session>>,每个session进来相当于会赋到这个成员上。而使用weak并不会添加Arc的引用,不过我并不觉得在这里用Arc会有什么问题,因为这个栈调用完以后,引用肯定会被释放掉。

那么Arc是啥?(AtomicReferenceCounting,原子引用计数)是标准库提供的一个智能指针类型,用于在多线程环境中安全地共享所有权。

Box::pin是干啥的?Box::pin用于将一个异步操作(future)分配到堆上并将其固定(pin),确保该future在其生命周期内不会被移动。这是为了满足某些异步操作的要求,特别是当这些操作依赖于它们的内存位置不变时。

通过Box::pin,代码可以安全地对future进行多次转换和包装,而不会导致不可预期的行为或性能问题。

2.2start

这段代码主要负责查询的启动逻辑,检查并更新查询状态,创建阶段执行计划,设置超时任务以取消查询,启动查询执行器并返回结果获取器。

rust体验AI代码助手复制代码///Startexecutionofthisquery.///Notethetwoshutdownchannelsenderandreceiversarenotdual.///Oneisusedforpropagateerrorto`QueryResultFetcher`,oneisusedforlisteningon///cancelrequest(fromctrl-c,cli,uietc).#[allow(clippy::too_many_arguments)]pubasyncfnstart(  self:Arc<Self>,  context:ExecutionContextRef,  worker_node_manager:WorkerNodeSelector,  batch_query_epoch:BatchQueryEpoch,  compute_client_pool:ComputeClientPoolRef,  catalog_reader:CatalogReader,  query_execution_info:QueryExecutionInfoRef,  query_metrics:Arc<DistributedQueryMetrics>,)->SchedulerResult<QueryResultFetcher>{  //忽略一些代码  matchcur_state{    QueryState::Pending{msg_receiver}=>{      *state=QueryState::Running;      //Startatimertocancelthequery      letmuttimeout_abort_task_handle:Option<JoinHandle<()>>=None;      ifletSome(timeout)=context.timeout(){        letthis=self.clone();        timeout_abort_task_handle=Some(tokio::spawn(asyncmove{          tokio::time::sleep(timeout).await;          warn!(            "Query{:?}timeoutafter{}seconds,sendingcancelmessage.",            this.query.query_id,            timeout.as_secs(),          );          this.abort(format!("timeoutafter{}seconds",timeout.as_secs()))            .await;        }));      }
  • QueryState::Pending{msg_receiver}=>{…}表示当cur_state是QueryState::Pending变体时,会进入这个分支。同时,它将msg_receiver字段从Pending变体中解构出来,并绑定到同名变量msg_receiver,以便在后续代码中使用。
  • //Startatimertocancelthequery是用来做超时检测,sleep几秒后开始检测,而tokio::spawn是起一个以协程为单位的异步任务。
2.3StageRunner.run

实现了查询执行的异步运行逻辑。启动查询的叶子阶段,处理消息队列中的事件,如阶段调度、根阶段完成、阶段失败和查询取消,根据事件更新查询状态并处理相应的清理工作。

rust体验AI代码助手复制代码implStageRunner{  asyncfnrun(mutself,shutdown_rx:ShutdownToken){    ifletErr(e)=self.schedule_tasks_for_all(shutdown_rx).await{      error!(        error=%e.as_report(),        query_id=?self.stage.query_id,        stage_id=?self.stage.id,        "Failedtoscheduletasks"      );      self.send_event(QueryMessage::Stage(Failed{        id:self.stage.id,        reason:e,      }))      .await;    }  }

这里的代码是rust的iflet形式,熟悉go语言的同学应该会觉得有点像。

rust体验AI代码助手复制代码ifstr,ok:=i.(string);ok{  fmt.Printf("Thestringis:%s
",str)}else{  fmt.Println("Notastring.")}
  • 调用self.schedule_tasks_for_all(shutdown_rx)并等待其完成。
  • 如果该调用返回的结果是Err(e)(即发生了错误),则将错误值绑定到变量e,并执行后续的大括号内的代码块。
  • 如果返回的是Ok,则跳过这个iflet块。
2.4schedule_task

用于调度任务到计算节点。获取或选择一个工作节点,创建计算客户端并获取任务流状态,更新任务状态,返回任务流状态。

rust体验AI代码助手复制代码asyncfnschedule_task(  &self,  task_id:PbTaskId,  plan_fragment:PlanFragment,  worker:Option<WorkerNode>,  expr_context:ExprContext,)->SchedulerResult<Fuse<Streaming<TaskInfoResponse>>>{  letmutworker=worker.unwrap_or(self.worker_node_manager.next_random_worker()?);  letworker_node_addr=worker.host.take().unwrap();  letcompute_client=self    .compute_client_pool    .get_by_addr((&worker_node_addr).into())    .await    .inspect_err(|_|self.mask_failed_serving_worker(&worker))    .map_err(|e|anyhow!(e))?;  lett_id=task_id.task_id;  letstream_status:Fuse<Streaming<TaskInfoResponse>>=compute_client    .create_task(task_id,plan_fragment,self.epoch,expr_context)    .await    .inspect_err(|_|self.mask_failed_serving_worker(&worker))    .map_err(|e|anyhow!(e))?    .fuse();  self.tasks[&t_id].inner.store(Arc::new(TaskStatus{    _task_id:t_id,    location:Some(worker_node_addr),  }));  Ok(stream_status)}

这里面出现一个有趣的结构体,叫做fuse。

fuse是Rust中流(Stream)处理的一个常用方法,通常用于确保流在完成或出错后不再产生额外的项。具体来说:

  • 当流调用fuse方法后,会返回一个新的Fuse包装器。

  • 如果流已经完成(即调用了poll_next并返回了None),那么后续的所有轮询都会立即返回None,而不会再次尝试从原始流中获取数据。

  • 这样可以避免对已完成流的重复轮询,防止不必要的计算或副作用。

简单来说,fuse方法确保流一旦结束就不会再被重新激活,从而提供更好的行为控制和性能优化。

rust体验AI代码助手复制代码pin_project!{  ///Streamforthe[`fuse`](super::StreamExt::fuse)method.  #[derive(Debug)]  #[must_use="streamsdonothingunlesspolled"]  pubstructFuse<St>{    #[pin]    stream:St,    done:bool,  }}
2.5run

异步任务的执行和状态管理。初始化任务并增加任务计数,处理数据流。发送数据块并处理发送错误。监听关闭信号并根据信号类型设置任务状态。关闭通道并通知状态变化。减少任务计数。

ini体验AI代码助手复制代码asyncfnrun(  &self,  root:BoxedExecutor,  mutsender:ChanSenderImpl,  state_tx:Option<&mutStateReporter>,){  self.context    .batch_metrics()    .as_ref()    .inspect(|m|m.batch_manager_metrics().task_num.inc());  letmutdata_chunk_stream=root.execute();  letmutstate;  letmuterror=None;  letmutshutdown_rx=self.shutdown_rx.clone();  loop{    select!{      biased;      //`shutdown_rx`can'tberemovedheretoavoid`sender.send(data_chunk)`blockedwholeexecution.      _=shutdown_rx.cancelled()=>{        matchself.shutdown_rx.message(){          ShutdownMsg::Abort(e)=>{            error=Some(BatchError::Aborted(e));            state=TaskStatus::Aborted;            break;          }          ShutdownMsg::Cancel=>{            state=TaskStatus::Cancelled;            break;          }          ShutdownMsg::Init=>{            unreachable!("Initmessageshouldnotbereceivedhere!")          }        }      }      data_chunk=data_chunk_stream.next()=>{        matchdata_chunk{          Some(Ok(data_chunk))=>{            ifletErr(e)=sender.send(data_chunk).await{              matche{                BatchError::SenderError=>{                  //Thisispossiblesincewhenwehavelimitexecutorinparent                  //stage,itmayearlystopreceivingdatafromdownstream,which                  //leadstocloseofchannel.                  warn!("Taskreceiverclosed!");                  state=TaskStatus::Finished;                  break;                }                x=>{                  error!("Failedtosenddata!");                  error=Some(x);                  state=TaskStatus::Failed;                  break;                }              }            }          }          Some(Err(e))=>matchself.shutdown_rx.message(){            ShutdownMsg::Init=>{              //Thereisnomessagereceivedfromshutdownchannel,whichmeansitcaused              //taskfailed.              error!(error=%e.as_report(),"Batchtaskfailed");              error=Some(e);                state=TaskStatus::Failed;                break;              }              ShutdownMsg::Abort(_)=>{                error=Some(e);                state=TaskStatus::Aborted;                break;              }              ShutdownMsg::Cancel=>{                state=TaskStatus::Cancelled;                break;              }            },            None=>{              debug!("Batchtask{:?}finishedsuccessfully.",self.task_id);              state=TaskStatus::Finished;              break;            }          }        }      }    }  //忽略一些代码  }

相信很多人看到select!{会很懵逼。select!是个宏,具体含义如下:

  • loop{…}创建一个无限循环,持续执行其中的代码块,直到遇到break语句。
  • select!{biased;…}是Tokio提供的一个宏,用于在多个异步操作之间进行选择。biased关键字确保优先处理第一个分支,避免其他分支抢占先机。

宏就是在编译期时展开的代码逻辑增强。写过Java的同学可以理解为AOP的AspectJ实现。

<
后端

XXL-JOB:揭秘定时机制

2025-7-10 21:46:03

后端

Spring Boot 极速解析身份证 & 营业执照

2025-7-10 21:45:46

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索