1use super::super::error;
2use crate::discover::{Change, Discover};
3use crate::load::Load;
4use crate::ready_cache::{error::Failed, ReadyCache};
5use crate::util::rng::{sample_floyd2, HasherRng, Rng};
6use futures_util::future::{self, TryFutureExt};
7use std::hash::Hash;
8use std::marker::PhantomData;
9use std::{
10 fmt,
11 pin::Pin,
12 task::{ready, Context, Poll},
13};
14use tower_service::Service;
15use tracing::{debug, trace};
16
17pub struct Balance<D, Req>
29where
30 D: Discover,
31 D::Key: Hash,
32{
33 discover: D,
34
35 services: ReadyCache<D::Key, D::Service, Req>,
36 ready_index: Option<usize>,
37
38 rng: Box<dyn Rng + Send + Sync>,
39
40 _req: PhantomData<Req>,
41}
42
43impl<D: Discover, Req> fmt::Debug for Balance<D, Req>
44where
45 D: fmt::Debug,
46 D::Key: Hash + fmt::Debug,
47 D::Service: fmt::Debug,
48{
49 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50 f.debug_struct("Balance")
51 .field("discover", &self.discover)
52 .field("services", &self.services)
53 .finish()
54 }
55}
56
57impl<D, Req> Balance<D, Req>
58where
59 D: Discover,
60 D::Key: Hash,
61 D::Service: Service<Req>,
62 <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
63{
64 pub fn new(discover: D) -> Self {
66 Self::from_rng(discover, HasherRng::default())
67 }
68
69 pub fn from_rng<R: Rng + Send + Sync + 'static>(discover: D, rng: R) -> Self {
71 let rng = Box::new(rng);
72 Self {
73 rng,
74 discover,
75 services: ReadyCache::default(),
76 ready_index: None,
77
78 _req: PhantomData,
79 }
80 }
81
82 pub fn len(&self) -> usize {
84 self.services.len()
85 }
86
87 pub fn is_empty(&self) -> bool {
89 self.services.is_empty()
90 }
91}
92
93impl<D, Req> Balance<D, Req>
94where
95 D: Discover + Unpin,
96 D::Key: Hash + Clone,
97 D::Error: Into<crate::BoxError>,
98 D::Service: Service<Req> + Load,
99 <D::Service as Load>::Metric: std::fmt::Debug,
100 <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
101{
102 fn update_pending_from_discover(
106 &mut self,
107 cx: &mut Context<'_>,
108 ) -> Poll<Option<Result<(), error::Discover>>> {
109 debug!("updating from discover");
110 loop {
111 match ready!(Pin::new(&mut self.discover).poll_discover(cx))
112 .transpose()
113 .map_err(|e| error::Discover(e.into()))?
114 {
115 None => return Poll::Ready(None),
116 Some(Change::Remove(key)) => {
117 trace!("remove");
118 self.services.evict(&key);
119 }
120 Some(Change::Insert(key, svc)) => {
121 trace!("insert");
122 self.services.push(key, svc);
125 }
126 }
127 }
128 }
129
130 fn promote_pending_to_ready(&mut self, cx: &mut Context<'_>) {
131 loop {
132 match self.services.poll_pending(cx) {
133 Poll::Ready(Ok(())) => {
134 debug_assert_eq!(self.services.pending_len(), 0);
136 break;
137 }
138 Poll::Pending => {
139 debug_assert!(self.services.pending_len() > 0);
141 break;
142 }
143 Poll::Ready(Err(error)) => {
144 debug!(%error, "dropping failed endpoint");
147 }
148 }
149 }
150 trace!(
151 ready = %self.services.ready_len(),
152 pending = %self.services.pending_len(),
153 "poll_unready"
154 );
155 }
156
157 fn p2c_ready_index(&mut self) -> Option<usize> {
159 match self.services.ready_len() {
160 0 => None,
161 1 => Some(0),
162 len => {
163 let [aidx, bidx] = sample_floyd2(&mut self.rng, len as u64);
166 debug_assert_ne!(aidx, bidx, "random indices must be distinct");
167
168 let aload = self.ready_index_load(aidx as usize);
169 let bload = self.ready_index_load(bidx as usize);
170 let chosen = if aload <= bload { aidx } else { bidx };
171
172 trace!(
173 a.index = aidx,
174 a.load = ?aload,
175 b.index = bidx,
176 b.load = ?bload,
177 chosen = if chosen == aidx { "a" } else { "b" },
178 "p2c",
179 );
180 Some(chosen as usize)
181 }
182 }
183 }
184
185 fn ready_index_load(&self, index: usize) -> <D::Service as Load>::Metric {
187 let (_, svc) = self.services.get_ready_index(index).expect("invalid index");
188 svc.load()
189 }
190}
191
192impl<D, Req> Service<Req> for Balance<D, Req>
193where
194 D: Discover + Unpin,
195 D::Key: Hash + Clone,
196 D::Error: Into<crate::BoxError>,
197 D::Service: Service<Req> + Load,
198 <D::Service as Load>::Metric: std::fmt::Debug,
199 <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
200{
201 type Response = <D::Service as Service<Req>>::Response;
202 type Error = crate::BoxError;
203 type Future = future::MapErr<
204 <D::Service as Service<Req>>::Future,
205 fn(<D::Service as Service<Req>>::Error) -> crate::BoxError,
206 >;
207
208 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
209 let _ = self.update_pending_from_discover(cx)?;
212 self.promote_pending_to_ready(cx);
213
214 loop {
215 if let Some(index) = self.ready_index.take() {
222 match self.services.check_ready_index(cx, index) {
223 Ok(true) => {
224 self.ready_index = Some(index);
226 return Poll::Ready(Ok(()));
227 }
228 Ok(false) => {
229 trace!("ready service became unavailable");
231 }
232 Err(Failed(_, error)) => {
233 debug!(%error, "endpoint failed");
236 }
237 }
238 }
239
240 self.ready_index = self.p2c_ready_index();
243 if self.ready_index.is_none() {
244 debug_assert_eq!(self.services.ready_len(), 0);
245 return Poll::Pending;
248 }
249 }
250 }
251
252 fn call(&mut self, request: Req) -> Self::Future {
253 let index = self.ready_index.take().expect("called before ready");
254 self.services
255 .call_ready_index(index, request)
256 .map_err(Into::into)
257 }
258}