hydro_lang/live_collections/stream/networking.rs
1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(feature = "sim")]
18use crate::location::LocationKey;
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::DynLocation;
21use crate::location::external_process::ExternalBincodeStream;
22use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
23use crate::networking::{NetworkFor, TCP};
24use crate::nondet::NonDet;
25#[cfg(feature = "sim")]
26use crate::sim::SimReceiver;
27use crate::staging_util::get_this_crate;
28
29// same as the one in `hydro_std`, but internal use only
30fn track_membership<'a, C, L: Location<'a> + NoTick>(
31 membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
32) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
33 membership.fold(
34 q!(|| false),
35 q!(|present, event| {
36 match event {
37 MembershipEvent::Joined => *present = true,
38 MembershipEvent::Left => *present = false,
39 }
40 }),
41 )
42}
43
44fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
45 let root = get_this_crate();
46
47 if is_demux {
48 parse_quote! {
49 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
50 |(id, data)| {
51 (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
52 }
53 )
54 }
55 } else {
56 parse_quote! {
57 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
58 |data| {
59 #root::runtime_support::bincode::serialize(&data).unwrap().into()
60 }
61 )
62 }
63 }
64}
65
66pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
67 serialize_bincode_with_type(is_demux, "e_type::<T>())
68}
69
70fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
71 let root = get_this_crate();
72 if let Some(c_type) = tagged {
73 parse_quote! {
74 |res| {
75 let (id, b) = res.unwrap();
76 (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
77 }
78 }
79 } else {
80 parse_quote! {
81 |res| {
82 #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
83 }
84 }
85 }
86}
87
88pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
89 deserialize_bincode_with_type(tagged, "e_type::<T>())
90}
91
92impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
93 #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
94 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
95 /// using [`bincode`] to serialize/deserialize messages.
96 ///
97 /// The returned stream captures the elements received at the destination, where values will
98 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
99 /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
    /// recipient is guaranteed to receive a _prefix_ of the sent messages; if the TCP connection is
101 /// dropped no further messages will be sent.
102 ///
103 /// # Example
104 /// ```rust
105 /// # #[cfg(feature = "deploy")] {
106 /// # use hydro_lang::prelude::*;
107 /// # use futures::StreamExt;
108 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
109 /// let p1 = flow.process::<()>();
110 /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
111 /// let p2 = flow.process::<()>();
112 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
113 /// // 1, 2, 3
114 /// # on_p2.send_bincode(&p_out)
115 /// # }, |mut stream| async move {
116 /// # for w in 1..=3 {
117 /// # assert_eq!(stream.next().await, Some(w));
118 /// # }
119 /// # }));
120 /// # }
121 /// ```
122 pub fn send_bincode<L2>(
123 self,
124 other: &Process<'a, L2>,
125 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
126 where
127 T: Serialize + DeserializeOwned,
128 {
129 self.send(other, TCP.fail_stop().bincode())
130 }
131
132 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
133 /// using the configuration in `via` to set up the message transport.
134 ///
135 /// The returned stream captures the elements received at the destination, where values will
136 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
137 /// preserves ordering and retries guarantees when using a single TCP channel to send the values.
    /// The recipient is guaranteed to receive a _prefix_ of the sent messages; if the connection is
139 /// dropped no further messages will be sent.
140 ///
141 /// # Example
142 /// ```rust
143 /// # #[cfg(feature = "deploy")] {
144 /// # use hydro_lang::prelude::*;
145 /// # use futures::StreamExt;
146 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
147 /// let p1 = flow.process::<()>();
148 /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
149 /// let p2 = flow.process::<()>();
150 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.fail_stop().bincode());
151 /// // 1, 2, 3
152 /// # on_p2.send(&p_out, TCP.fail_stop().bincode())
153 /// # }, |mut stream| async move {
154 /// # for w in 1..=3 {
155 /// # assert_eq!(stream.next().await, Some(w));
156 /// # }
157 /// # }));
158 /// # }
159 /// ```
160 pub fn send<L2, N: NetworkFor<T>>(
161 self,
162 to: &Process<'a, L2>,
163 via: N,
164 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
165 where
166 T: Serialize + DeserializeOwned,
167 {
168 let serialize_pipeline = Some(N::serialize_thunk(false));
169 let deserialize_pipeline = Some(N::deserialize_thunk(None));
170
171 let name = via.name();
172 if to.multiversioned() && name.is_none() {
173 panic!(
174 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
175 );
176 }
177
178 Stream::new(
179 to.clone(),
180 HydroNode::Network {
181 name: name.map(ToOwned::to_owned),
182 serialize_fn: serialize_pipeline.map(|e| e.into()),
183 instantiate_fn: DebugInstantiate::Building,
184 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
185 input: Box::new(self.ir_node.into_inner()),
186 metadata: to.new_node_metadata(
187 Stream::<T, Process<'a, L2>, Unbounded, O, R>::collection_kind(),
188 ),
189 },
190 )
191 }
192
193 #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
194 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
195 /// using [`bincode`] to serialize/deserialize messages.
196 ///
197 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
198 /// membership information. This is a common pattern in distributed systems for broadcasting data to
199 /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
200 /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
201 /// each element to all cluster members.
202 ///
203 /// # Non-Determinism
204 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
205 /// to the current cluster members _at that point in time_. Depending on when we are notified of
206 /// membership changes, we will broadcast each element to different members.
207 ///
208 /// # Example
209 /// ```rust
210 /// # #[cfg(feature = "deploy")] {
211 /// # use hydro_lang::prelude::*;
212 /// # use futures::StreamExt;
213 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
214 /// let p1 = flow.process::<()>();
215 /// let workers: Cluster<()> = flow.cluster::<()>();
216 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
217 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
218 /// # on_worker.send_bincode(&p2).entries()
219 /// // if there are 4 members in the cluster, each receives one element
220 /// // - MemberId::<()>(0): [123]
221 /// // - MemberId::<()>(1): [123]
222 /// // - MemberId::<()>(2): [123]
223 /// // - MemberId::<()>(3): [123]
224 /// # }, |mut stream| async move {
225 /// # let mut results = Vec::new();
226 /// # for w in 0..4 {
227 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
228 /// # }
229 /// # results.sort();
230 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
231 /// # }));
232 /// # }
233 /// ```
234 pub fn broadcast_bincode<L2: 'a>(
235 self,
236 other: &Cluster<'a, L2>,
237 nondet_membership: NonDet,
238 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
239 where
240 T: Clone + Serialize + DeserializeOwned,
241 {
242 self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
243 }
244
245 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
246 /// using the configuration in `via` to set up the message transport.
247 ///
248 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
249 /// membership information. This is a common pattern in distributed systems for broadcasting data to
250 /// all nodes in a cluster. Unlike [`Stream::demux`], which requires `(MemberId, T)` tuples to
251 /// target specific members, `broadcast` takes a stream of **only data elements** and sends
252 /// each element to all cluster members.
253 ///
254 /// # Non-Determinism
255 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
256 /// to the current cluster members _at that point in time_. Depending on when we are notified of
257 /// membership changes, we will broadcast each element to different members.
258 ///
259 /// # Example
260 /// ```rust
261 /// # #[cfg(feature = "deploy")] {
262 /// # use hydro_lang::prelude::*;
263 /// # use futures::StreamExt;
264 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
265 /// let p1 = flow.process::<()>();
266 /// let workers: Cluster<()> = flow.cluster::<()>();
267 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
268 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
269 /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
270 /// // if there are 4 members in the cluster, each receives one element
271 /// // - MemberId::<()>(0): [123]
272 /// // - MemberId::<()>(1): [123]
273 /// // - MemberId::<()>(2): [123]
274 /// // - MemberId::<()>(3): [123]
275 /// # }, |mut stream| async move {
276 /// # let mut results = Vec::new();
277 /// # for w in 0..4 {
278 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
279 /// # }
280 /// # results.sort();
281 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
282 /// # }));
283 /// # }
284 /// ```
285 pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
286 self,
287 to: &Cluster<'a, L2>,
288 via: N,
289 nondet_membership: NonDet,
290 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
291 where
292 T: Clone + Serialize + DeserializeOwned,
293 {
294 let ids = track_membership(self.location.source_cluster_members(to));
295 sliced! {
296 let members_snapshot = use(ids, nondet_membership);
297 let elements = use(self, nondet_membership);
298
299 let current_members = members_snapshot.filter(q!(|b| *b));
300 elements.repeat_with_keys(current_members)
301 }
302 .demux(to, via)
303 }
304
305 /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
306 /// serialization. The external process can receive these elements by establishing a TCP
307 /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
308 ///
309 /// # Example
310 /// ```rust
311 /// # #[cfg(feature = "deploy")] {
312 /// # use hydro_lang::prelude::*;
313 /// # use futures::StreamExt;
314 /// # tokio_test::block_on(async move {
315 /// let mut flow = FlowBuilder::new();
316 /// let process = flow.process::<()>();
317 /// let numbers: Stream<_, Process<_>, Bounded> = process.source_iter(q!(vec![1, 2, 3]));
318 /// let external = flow.external::<()>();
319 /// let external_handle = numbers.send_bincode_external(&external);
320 ///
321 /// let mut deployment = hydro_deploy::Deployment::new();
322 /// let nodes = flow
323 /// .with_process(&process, deployment.Localhost())
324 /// .with_external(&external, deployment.Localhost())
325 /// .deploy(&mut deployment);
326 ///
327 /// deployment.deploy().await.unwrap();
328 /// // establish the TCP connection
329 /// let mut external_recv_stream = nodes.connect(external_handle).await;
330 /// deployment.start().await.unwrap();
331 ///
332 /// for w in 1..=3 {
333 /// assert_eq!(external_recv_stream.next().await, Some(w));
334 /// }
335 /// # });
336 /// # }
337 /// ```
338 pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
339 where
340 T: Serialize + DeserializeOwned,
341 {
342 let serialize_pipeline = Some(serialize_bincode::<T>(false));
343
344 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
345
346 let external_port_id = flow_state_borrow.next_external_port.get_and_increment();
347
348 flow_state_borrow.push_root(HydroRoot::SendExternal {
349 to_external_key: other.key,
350 to_port_id: external_port_id,
351 to_many: false,
352 unpaired: true,
353 serialize_fn: serialize_pipeline.map(|e| e.into()),
354 instantiate_fn: DebugInstantiate::Building,
355 input: Box::new(self.ir_node.into_inner()),
356 op_metadata: HydroIrOpMetadata::new(),
357 });
358
359 ExternalBincodeStream {
360 process_key: other.key,
361 port_id: external_port_id,
362 _phantom: PhantomData,
363 }
364 }
365
366 #[cfg(feature = "sim")]
367 /// Sets up a simulation output port for this stream, allowing test code to receive elements
368 /// sent to this stream during simulation.
369 pub fn sim_output(self) -> SimReceiver<T, O, R>
370 where
371 T: Serialize + DeserializeOwned,
372 {
373 let external_location: External<'a, ()> = External {
374 key: LocationKey::FIRST,
375 flow_state: self.location.flow_state().clone(),
376 _phantom: PhantomData,
377 };
378
379 let external = self.send_bincode_external(&external_location);
380
381 SimReceiver(external.port_id, PhantomData)
382 }
383}
384
385impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
386 Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
387{
388 #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
389 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
390 /// using [`bincode`] to serialize/deserialize messages.
391 ///
392 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
393 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
394 /// this API allows precise targeting of specific cluster members rather than broadcasting to
395 /// all members.
396 ///
397 /// # Example
398 /// ```rust
399 /// # #[cfg(feature = "deploy")] {
400 /// # use hydro_lang::prelude::*;
401 /// # use futures::StreamExt;
402 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
403 /// let p1 = flow.process::<()>();
404 /// let workers: Cluster<()> = flow.cluster::<()>();
405 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
406 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
407 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
408 /// .demux_bincode(&workers);
409 /// # on_worker.send_bincode(&p2).entries()
410 /// // if there are 4 members in the cluster, each receives one element
411 /// // - MemberId::<()>(0): [0]
412 /// // - MemberId::<()>(1): [1]
413 /// // - MemberId::<()>(2): [2]
414 /// // - MemberId::<()>(3): [3]
415 /// # }, |mut stream| async move {
416 /// # let mut results = Vec::new();
417 /// # for w in 0..4 {
418 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
419 /// # }
420 /// # results.sort();
421 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
422 /// # }));
423 /// # }
424 /// ```
425 pub fn demux_bincode(
426 self,
427 other: &Cluster<'a, L2>,
428 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
429 where
430 T: Serialize + DeserializeOwned,
431 {
432 self.demux(other, TCP.fail_stop().bincode())
433 }
434
435 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
436 /// using the configuration in `via` to set up the message transport.
437 ///
438 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
439 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
440 /// this API allows precise targeting of specific cluster members rather than broadcasting to
441 /// all members.
442 ///
443 /// # Example
444 /// ```rust
445 /// # #[cfg(feature = "deploy")] {
446 /// # use hydro_lang::prelude::*;
447 /// # use futures::StreamExt;
448 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
449 /// let p1 = flow.process::<()>();
450 /// let workers: Cluster<()> = flow.cluster::<()>();
451 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
452 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
453 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
454 /// .demux(&workers, TCP.fail_stop().bincode());
455 /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
456 /// // if there are 4 members in the cluster, each receives one element
457 /// // - MemberId::<()>(0): [0]
458 /// // - MemberId::<()>(1): [1]
459 /// // - MemberId::<()>(2): [2]
460 /// // - MemberId::<()>(3): [3]
461 /// # }, |mut stream| async move {
462 /// # let mut results = Vec::new();
463 /// # for w in 0..4 {
464 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
465 /// # }
466 /// # results.sort();
467 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
468 /// # }));
469 /// # }
470 /// ```
471 pub fn demux<N: NetworkFor<T>>(
472 self,
473 to: &Cluster<'a, L2>,
474 via: N,
475 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
476 where
477 T: Serialize + DeserializeOwned,
478 {
479 self.into_keyed().demux(to, via)
480 }
481}
482
483impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
484 #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
485 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
486 /// [`bincode`] to serialize/deserialize messages.
487 ///
488 /// This provides load balancing by evenly distributing work across cluster members. The
489 /// distribution is deterministic based on element order - the first element goes to member 0,
490 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
491 ///
492 /// # Non-Determinism
493 /// The set of cluster members may asynchronously change over time. Each element is distributed
494 /// based on the current cluster membership _at that point in time_. Depending on when cluster
495 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
496 /// membership is stable, the order of members in the round-robin pattern may change across runs.
497 ///
498 /// # Ordering Requirements
499 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
500 /// order of messages and retries affects the round-robin pattern.
501 ///
502 /// # Example
503 /// ```rust
504 /// # #[cfg(feature = "deploy")] {
505 /// # use hydro_lang::prelude::*;
506 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
507 /// # use futures::StreamExt;
508 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
509 /// let p1 = flow.process::<()>();
510 /// let workers: Cluster<()> = flow.cluster::<()>();
511 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
512 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
513 /// on_worker.send_bincode(&p2)
514 /// # .first().values() // we use first to assert that each member gets one element
515 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
516 /// // - MemberId::<()>(?): [1]
517 /// // - MemberId::<()>(?): [2]
518 /// // - MemberId::<()>(?): [3]
519 /// // - MemberId::<()>(?): [4]
520 /// # }, |mut stream| async move {
521 /// # let mut results = Vec::new();
522 /// # for w in 0..4 {
523 /// # results.push(stream.next().await.unwrap());
524 /// # }
525 /// # results.sort();
526 /// # assert_eq!(results, vec![1, 2, 3, 4]);
527 /// # }));
528 /// # }
529 /// ```
530 pub fn round_robin_bincode<L2: 'a>(
531 self,
532 other: &Cluster<'a, L2>,
533 nondet_membership: NonDet,
534 ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
535 where
536 T: Serialize + DeserializeOwned,
537 {
538 self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
539 }
540
541 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
542 /// the configuration in `via` to set up the message transport.
543 ///
544 /// This provides load balancing by evenly distributing work across cluster members. The
545 /// distribution is deterministic based on element order - the first element goes to member 0,
546 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
547 ///
548 /// # Non-Determinism
549 /// The set of cluster members may asynchronously change over time. Each element is distributed
550 /// based on the current cluster membership _at that point in time_. Depending on when cluster
551 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
552 /// membership is stable, the order of members in the round-robin pattern may change across runs.
553 ///
554 /// # Ordering Requirements
555 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
556 /// order of messages and retries affects the round-robin pattern.
557 ///
558 /// # Example
559 /// ```rust
560 /// # #[cfg(feature = "deploy")] {
561 /// # use hydro_lang::prelude::*;
562 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
563 /// # use futures::StreamExt;
564 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
565 /// let p1 = flow.process::<()>();
566 /// let workers: Cluster<()> = flow.cluster::<()>();
567 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
568 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
569 /// on_worker.send(&p2, TCP.fail_stop().bincode())
570 /// # .first().values() // we use first to assert that each member gets one element
571 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
572 /// // - MemberId::<()>(?): [1]
573 /// // - MemberId::<()>(?): [2]
574 /// // - MemberId::<()>(?): [3]
575 /// // - MemberId::<()>(?): [4]
576 /// # }, |mut stream| async move {
577 /// # let mut results = Vec::new();
578 /// # for w in 0..4 {
579 /// # results.push(stream.next().await.unwrap());
580 /// # }
581 /// # results.sort();
582 /// # assert_eq!(results, vec![1, 2, 3, 4]);
583 /// # }));
584 /// # }
585 /// ```
586 pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
587 self,
588 to: &Cluster<'a, L2>,
589 via: N,
590 nondet_membership: NonDet,
591 ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
592 where
593 T: Serialize + DeserializeOwned,
594 {
595 let ids = track_membership(self.location.source_cluster_members(to));
596 sliced! {
597 let members_snapshot = use(ids, nondet_membership);
598 let elements = use(self.enumerate(), nondet_membership);
599
600 let current_members = members_snapshot
601 .filter(q!(|b| *b))
602 .keys()
603 .assume_ordering(nondet_membership)
604 .collect_vec();
605
606 elements
607 .cross_singleton(current_members)
608 .map(q!(|(data, members)| (
609 members[data.0 % members.len()].clone(),
610 data.1
611 )))
612 }
613 .demux(to, via)
614 }
615}
616
617impl<'a, T, L, B: Boundedness> Stream<T, Cluster<'a, L>, B, TotalOrder, ExactlyOnce> {
618 #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
619 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
620 /// [`bincode`] to serialize/deserialize messages.
621 ///
622 /// This provides load balancing by evenly distributing work across cluster members. The
623 /// distribution is deterministic based on element order - the first element goes to member 0,
624 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
625 ///
626 /// # Non-Determinism
627 /// The set of cluster members may asynchronously change over time. Each element is distributed
628 /// based on the current cluster membership _at that point in time_. Depending on when cluster
629 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
630 /// membership is stable, the order of members in the round-robin pattern may change across runs.
631 ///
632 /// # Ordering Requirements
633 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
634 /// order of messages and retries affects the round-robin pattern.
635 ///
636 /// # Example
637 /// ```rust
638 /// # #[cfg(feature = "deploy")] {
639 /// # use hydro_lang::prelude::*;
640 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
641 /// # use hydro_lang::location::MemberId;
642 /// # use futures::StreamExt;
643 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
644 /// let p1 = flow.process::<()>();
645 /// let workers1: Cluster<()> = flow.cluster::<()>();
646 /// let workers2: Cluster<()> = flow.cluster::<()>();
647 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
648 /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
649 /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
650 /// on_worker2.send_bincode(&p2)
651 /// # .entries()
652 /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
653 /// # }, |mut stream| async move {
654 /// # let mut results = Vec::new();
655 /// # let mut locations = std::collections::HashSet::new();
656 /// # for w in 0..=16 {
657 /// # let (location, v) = stream.next().await.unwrap();
658 /// # locations.insert(location);
659 /// # results.push(v);
660 /// # }
661 /// # results.sort();
662 /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
663 /// # assert_eq!(locations.len(), 16);
664 /// # }));
665 /// # }
666 /// ```
667 pub fn round_robin_bincode<L2: 'a>(
668 self,
669 other: &Cluster<'a, L2>,
670 nondet_membership: NonDet,
671 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
672 where
673 T: Serialize + DeserializeOwned,
674 {
675 self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
676 }
677
678 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
679 /// the configuration in `via` to set up the message transport.
680 ///
681 /// This provides load balancing by evenly distributing work across cluster members. The
682 /// distribution is deterministic based on element order - the first element goes to member 0,
683 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
684 ///
685 /// # Non-Determinism
686 /// The set of cluster members may asynchronously change over time. Each element is distributed
687 /// based on the current cluster membership _at that point in time_. Depending on when cluster
688 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
689 /// membership is stable, the order of members in the round-robin pattern may change across runs.
690 ///
691 /// # Ordering Requirements
692 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
693 /// order of messages and retries affects the round-robin pattern.
694 ///
695 /// # Example
696 /// ```rust
697 /// # #[cfg(feature = "deploy")] {
698 /// # use hydro_lang::prelude::*;
699 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
700 /// # use hydro_lang::location::MemberId;
701 /// # use futures::StreamExt;
702 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
703 /// let p1 = flow.process::<()>();
704 /// let workers1: Cluster<()> = flow.cluster::<()>();
705 /// let workers2: Cluster<()> = flow.cluster::<()>();
706 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
707 /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers1, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
708 /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin(&workers2, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
709 /// on_worker2.send(&p2, TCP.fail_stop().bincode())
710 /// # .entries()
711 /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
712 /// # }, |mut stream| async move {
713 /// # let mut results = Vec::new();
714 /// # let mut locations = std::collections::HashSet::new();
715 /// # for w in 0..=16 {
716 /// # let (location, v) = stream.next().await.unwrap();
717 /// # locations.insert(location);
718 /// # results.push(v);
719 /// # }
720 /// # results.sort();
721 /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
722 /// # assert_eq!(locations.len(), 16);
723 /// # }));
724 /// # }
725 /// ```
726 pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
727 self,
728 to: &Cluster<'a, L2>,
729 via: N,
730 nondet_membership: NonDet,
731 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
732 where
733 T: Serialize + DeserializeOwned,
734 {
735 let ids = track_membership(self.location.source_cluster_members(to));
736 sliced! {
737 let members_snapshot = use(ids, nondet_membership);
738 let elements = use(self.enumerate(), nondet_membership);
739
740 let current_members = members_snapshot
741 .filter(q!(|b| *b))
742 .keys()
743 .assume_ordering(nondet_membership)
744 .collect_vec();
745
746 elements
747 .cross_singleton(current_members)
748 .map(q!(|(data, members)| (
749 members[data.0 % members.len()].clone(),
750 data.1
751 )))
752 }
753 .demux(to, via)
754 }
755}
756
757impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
758 #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
759 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
760 /// using [`bincode`] to serialize/deserialize messages.
761 ///
762 /// Each cluster member sends its local stream elements, and they are collected at the destination
763 /// as a [`KeyedStream`] where keys identify the source cluster member.
764 ///
765 /// # Example
766 /// ```rust
767 /// # #[cfg(feature = "deploy")] {
768 /// # use hydro_lang::prelude::*;
769 /// # use futures::StreamExt;
770 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
771 /// let workers: Cluster<()> = flow.cluster::<()>();
772 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
773 /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
774 /// # all_received.entries()
775 /// # }, |mut stream| async move {
776 /// // if there are 4 members in the cluster, we should receive 4 elements
777 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
778 /// # let mut results = Vec::new();
779 /// # for w in 0..4 {
780 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
781 /// # }
782 /// # results.sort();
783 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
784 /// # }));
785 /// # }
786 /// ```
787 ///
788 /// If you don't need to know the source for each element, you can use `.values()`
789 /// to get just the data:
790 /// ```rust
791 /// # #[cfg(feature = "deploy")] {
792 /// # use hydro_lang::prelude::*;
793 /// # use hydro_lang::live_collections::stream::NoOrder;
794 /// # use futures::StreamExt;
795 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
796 /// # let workers: Cluster<()> = flow.cluster::<()>();
797 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
798 /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
799 /// # values
800 /// # }, |mut stream| async move {
801 /// # let mut results = Vec::new();
802 /// # for w in 0..4 {
803 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
804 /// # }
805 /// # results.sort();
806 /// // if there are 4 members in the cluster, we should receive 4 elements
807 /// // 1, 1, 1, 1
808 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
809 /// # }));
810 /// # }
811 /// ```
812 pub fn send_bincode<L2>(
813 self,
814 other: &Process<'a, L2>,
815 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
816 where
817 T: Serialize + DeserializeOwned,
818 {
819 self.send(other, TCP.fail_stop().bincode())
820 }
821
822 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
823 /// using the configuration in `via` to set up the message transport.
824 ///
825 /// Each cluster member sends its local stream elements, and they are collected at the destination
826 /// as a [`KeyedStream`] where keys identify the source cluster member.
827 ///
828 /// # Example
829 /// ```rust
830 /// # #[cfg(feature = "deploy")] {
831 /// # use hydro_lang::prelude::*;
832 /// # use futures::StreamExt;
833 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
834 /// let workers: Cluster<()> = flow.cluster::<()>();
835 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
836 /// let all_received = numbers.send(&process, TCP.fail_stop().bincode()); // KeyedStream<MemberId<()>, i32, ...>
837 /// # all_received.entries()
838 /// # }, |mut stream| async move {
839 /// // if there are 4 members in the cluster, we should receive 4 elements
840 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
841 /// # let mut results = Vec::new();
842 /// # for w in 0..4 {
843 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
844 /// # }
845 /// # results.sort();
846 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
847 /// # }));
848 /// # }
849 /// ```
850 ///
851 /// If you don't need to know the source for each element, you can use `.values()`
852 /// to get just the data:
853 /// ```rust
854 /// # #[cfg(feature = "deploy")] {
855 /// # use hydro_lang::prelude::*;
856 /// # use hydro_lang::live_collections::stream::NoOrder;
857 /// # use futures::StreamExt;
858 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
859 /// # let workers: Cluster<()> = flow.cluster::<()>();
860 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
861 /// let values: Stream<i32, _, _, NoOrder> =
862 /// numbers.send(&process, TCP.fail_stop().bincode()).values();
863 /// # values
864 /// # }, |mut stream| async move {
865 /// # let mut results = Vec::new();
866 /// # for w in 0..4 {
867 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
868 /// # }
869 /// # results.sort();
870 /// // if there are 4 members in the cluster, we should receive 4 elements
871 /// // 1, 1, 1, 1
872 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
873 /// # }));
874 /// # }
875 /// ```
876 pub fn send<L2, N: NetworkFor<T>>(
877 self,
878 to: &Process<'a, L2>,
879 via: N,
880 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
881 where
882 T: Serialize + DeserializeOwned,
883 {
884 let serialize_pipeline = Some(N::serialize_thunk(false));
885
886 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
887
888 let name = via.name();
889 if to.multiversioned() && name.is_none() {
890 panic!(
891 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
892 );
893 }
894
895 let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
896 to.clone(),
897 HydroNode::Network {
898 name: name.map(ToOwned::to_owned),
899 serialize_fn: serialize_pipeline.map(|e| e.into()),
900 instantiate_fn: DebugInstantiate::Building,
901 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
902 input: Box::new(self.ir_node.into_inner()),
903 metadata: to.new_node_metadata(Stream::<
904 (MemberId<L>, T),
905 Process<'a, L2>,
906 Unbounded,
907 O,
908 R,
909 >::collection_kind()),
910 },
911 );
912
913 raw_stream.into_keyed()
914 }
915
916 #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
917 /// Broadcasts elements of this stream at each source member to all members of a destination
918 /// cluster, using [`bincode`] to serialize/deserialize messages.
919 ///
920 /// Each source member sends each of its stream elements to **every** member of the cluster
921 /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
922 /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
923 /// **only data elements** and sends each element to all cluster members.
924 ///
925 /// # Non-Determinism
926 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
927 /// to the current cluster members known _at that point in time_ at the source member. Depending
928 /// on when each source member is notified of membership changes, it will broadcast each element
929 /// to different members.
930 ///
931 /// # Example
932 /// ```rust
933 /// # #[cfg(feature = "deploy")] {
934 /// # use hydro_lang::prelude::*;
935 /// # use hydro_lang::location::MemberId;
936 /// # use futures::StreamExt;
937 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
938 /// # type Source = ();
939 /// # type Destination = ();
940 /// let source: Cluster<Source> = flow.cluster::<Source>();
941 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
942 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
943 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
944 /// # on_destination.entries().send_bincode(&p2).entries()
    /// // if there are 4 members in the destination, each receives one element from each source member
946 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
947 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
948 /// // - ...
949 /// # }, |mut stream| async move {
950 /// # let mut results = Vec::new();
951 /// # for w in 0..16 {
952 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
953 /// # }
954 /// # results.sort();
955 /// # assert_eq!(results, vec![
956 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
957 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
958 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
959 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
960 /// # ]);
961 /// # }));
962 /// # }
963 /// ```
964 pub fn broadcast_bincode<L2: 'a>(
965 self,
966 other: &Cluster<'a, L2>,
967 nondet_membership: NonDet,
968 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
969 where
970 T: Clone + Serialize + DeserializeOwned,
971 {
972 self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
973 }
974
975 /// Broadcasts elements of this stream at each source member to all members of a destination
976 /// cluster, using the configuration in `via` to set up the message transport.
977 ///
978 /// Each source member sends each of its stream elements to **every** member of the cluster
979 /// based on its latest membership information. Unlike [`Stream::demux`], which requires
980 /// `(MemberId, T)` tuples to target specific members, `broadcast` takes a stream of
981 /// **only data elements** and sends each element to all cluster members.
982 ///
983 /// # Non-Determinism
984 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
985 /// to the current cluster members known _at that point in time_ at the source member. Depending
986 /// on when each source member is notified of membership changes, it will broadcast each element
987 /// to different members.
988 ///
989 /// # Example
990 /// ```rust
991 /// # #[cfg(feature = "deploy")] {
992 /// # use hydro_lang::prelude::*;
993 /// # use hydro_lang::location::MemberId;
994 /// # use futures::StreamExt;
995 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
996 /// # type Source = ();
997 /// # type Destination = ();
998 /// let source: Cluster<Source> = flow.cluster::<Source>();
999 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1000 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1001 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast(&destination, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
1002 /// # on_destination.entries().send(&p2, TCP.fail_stop().bincode()).entries()
    /// // if there are 4 members in the destination, each receives one element from each source member
1004 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1005 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1006 /// // - ...
1007 /// # }, |mut stream| async move {
1008 /// # let mut results = Vec::new();
1009 /// # for w in 0..16 {
1010 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1011 /// # }
1012 /// # results.sort();
1013 /// # assert_eq!(results, vec![
1014 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1015 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1016 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1017 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1018 /// # ]);
1019 /// # }));
1020 /// # }
1021 /// ```
1022 pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
1023 self,
1024 to: &Cluster<'a, L2>,
1025 via: N,
1026 nondet_membership: NonDet,
1027 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1028 where
1029 T: Clone + Serialize + DeserializeOwned,
1030 {
1031 let ids = track_membership(self.location.source_cluster_members(to));
1032 sliced! {
1033 let members_snapshot = use(ids, nondet_membership);
1034 let elements = use(self, nondet_membership);
1035
1036 let current_members = members_snapshot.filter(q!(|b| *b));
1037 elements.repeat_with_keys(current_members)
1038 }
1039 .demux(to, via)
1040 }
1041}
1042
1043impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
1044 Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
1045{
1046 #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
1047 /// Sends elements of this stream at each source member to specific members of a destination
1048 /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
1049 ///
1050 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1051 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
1052 /// this API allows precise targeting of specific cluster members rather than broadcasting to
1053 /// all members.
1054 ///
1055 /// Each cluster member sends its local stream elements, and they are collected at each
1056 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1057 ///
1058 /// # Example
1059 /// ```rust
1060 /// # #[cfg(feature = "deploy")] {
1061 /// # use hydro_lang::prelude::*;
1062 /// # use futures::StreamExt;
1063 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1064 /// # type Source = ();
1065 /// # type Destination = ();
1066 /// let source: Cluster<Source> = flow.cluster::<Source>();
1067 /// let to_send: Stream<_, Cluster<_>, _> = source
1068 /// .source_iter(q!(vec![0, 1, 2, 3]))
1069 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1070 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1071 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
1072 /// # all_received.entries().send_bincode(&p2).entries()
1073 /// # }, |mut stream| async move {
1074 /// // if there are 4 members in the destination cluster, each receives one message from each source member
1075 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1076 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1077 /// // - ...
1078 /// # let mut results = Vec::new();
1079 /// # for w in 0..16 {
1080 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1081 /// # }
1082 /// # results.sort();
1083 /// # assert_eq!(results, vec![
1084 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1085 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1086 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1087 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1088 /// # ]);
1089 /// # }));
1090 /// # }
1091 /// ```
1092 pub fn demux_bincode(
1093 self,
1094 other: &Cluster<'a, L2>,
1095 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1096 where
1097 T: Serialize + DeserializeOwned,
1098 {
1099 self.demux(other, TCP.fail_stop().bincode())
1100 }
1101
1102 /// Sends elements of this stream at each source member to specific members of a destination
1103 /// cluster, identified by a [`MemberId`], using the configuration in `via` to set up the
1104 /// message transport.
1105 ///
1106 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1107 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
1108 /// this API allows precise targeting of specific cluster members rather than broadcasting to
1109 /// all members.
1110 ///
1111 /// Each cluster member sends its local stream elements, and they are collected at each
1112 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1113 ///
1114 /// # Example
1115 /// ```rust
1116 /// # #[cfg(feature = "deploy")] {
1117 /// # use hydro_lang::prelude::*;
1118 /// # use futures::StreamExt;
1119 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1120 /// # type Source = ();
1121 /// # type Destination = ();
1122 /// let source: Cluster<Source> = flow.cluster::<Source>();
1123 /// let to_send: Stream<_, Cluster<_>, _> = source
1124 /// .source_iter(q!(vec![0, 1, 2, 3]))
1125 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1126 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1127 /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
1128 /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1129 /// # }, |mut stream| async move {
1130 /// // if there are 4 members in the destination cluster, each receives one message from each source member
1131 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1132 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1133 /// // - ...
1134 /// # let mut results = Vec::new();
1135 /// # for w in 0..16 {
1136 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1137 /// # }
1138 /// # results.sort();
1139 /// # assert_eq!(results, vec![
1140 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1141 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1142 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1143 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1144 /// # ]);
1145 /// # }));
1146 /// # }
1147 /// ```
1148 pub fn demux<N: NetworkFor<T>>(
1149 self,
1150 to: &Cluster<'a, L2>,
1151 via: N,
1152 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1153 where
1154 T: Serialize + DeserializeOwned,
1155 {
1156 self.into_keyed().demux(to, via)
1157 }
1158}
1159
1160#[cfg(test)]
1161mod tests {
1162 #[cfg(feature = "sim")]
1163 use stageleft::q;
1164
1165 #[cfg(feature = "sim")]
1166 use crate::location::{Location, MemberId};
1167 #[cfg(feature = "sim")]
1168 use crate::networking::TCP;
1169 #[cfg(feature = "sim")]
1170 use crate::nondet::nondet;
1171 #[cfg(feature = "sim")]
1172 use crate::prelude::FlowBuilder;
1173
1174 #[cfg(feature = "sim")]
1175 #[test]
1176 fn sim_send_bincode_o2o() {
1177 use crate::networking::TCP;
1178
1179 let mut flow = FlowBuilder::new();
1180 let node = flow.process::<()>();
1181 let node2 = flow.process::<()>();
1182
1183 let (in_send, input) = node.sim_input();
1184
1185 let out_recv = input
1186 .send(&node2, TCP.fail_stop().bincode())
1187 .batch(&node2.tick(), nondet!(/** test */))
1188 .count()
1189 .all_ticks()
1190 .sim_output();
1191
1192 let instances = flow.sim().exhaustive(async || {
1193 in_send.send(());
1194 in_send.send(());
1195 in_send.send(());
1196
1197 let received = out_recv.collect::<Vec<_>>().await;
1198 assert!(received.into_iter().sum::<usize>() == 3);
1199 });
1200
1201 assert_eq!(instances, 4); // 2^{3 - 1}
1202 }
1203
1204 #[cfg(feature = "sim")]
1205 #[test]
1206 fn sim_send_bincode_m2o() {
1207 let mut flow = FlowBuilder::new();
1208 let cluster = flow.cluster::<()>();
1209 let node = flow.process::<()>();
1210
1211 let input = cluster.source_iter(q!(vec![1]));
1212
1213 let out_recv = input
1214 .send(&node, TCP.fail_stop().bincode())
1215 .entries()
1216 .batch(&node.tick(), nondet!(/** test */))
1217 .all_ticks()
1218 .sim_output();
1219
1220 let instances = flow
1221 .sim()
1222 .with_cluster_size(&cluster, 4)
1223 .exhaustive(async || {
1224 out_recv
1225 .assert_yields_only_unordered(vec![
1226 (MemberId::from_raw_id(0), 1),
1227 (MemberId::from_raw_id(1), 1),
1228 (MemberId::from_raw_id(2), 1),
1229 (MemberId::from_raw_id(3), 1),
1230 ])
1231 .await
1232 });
1233
1234 assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
1235 }
1236
1237 #[cfg(feature = "sim")]
1238 #[test]
1239 fn sim_send_bincode_multiple_m2o() {
1240 let mut flow = FlowBuilder::new();
1241 let cluster1 = flow.cluster::<()>();
1242 let cluster2 = flow.cluster::<()>();
1243 let node = flow.process::<()>();
1244
1245 let out_recv_1 = cluster1
1246 .source_iter(q!(vec![1]))
1247 .send(&node, TCP.fail_stop().bincode())
1248 .entries()
1249 .sim_output();
1250
1251 let out_recv_2 = cluster2
1252 .source_iter(q!(vec![2]))
1253 .send(&node, TCP.fail_stop().bincode())
1254 .entries()
1255 .sim_output();
1256
1257 let instances = flow
1258 .sim()
1259 .with_cluster_size(&cluster1, 3)
1260 .with_cluster_size(&cluster2, 4)
1261 .exhaustive(async || {
1262 out_recv_1
1263 .assert_yields_only_unordered(vec![
1264 (MemberId::from_raw_id(0), 1),
1265 (MemberId::from_raw_id(1), 1),
1266 (MemberId::from_raw_id(2), 1),
1267 ])
1268 .await;
1269
1270 out_recv_2
1271 .assert_yields_only_unordered(vec![
1272 (MemberId::from_raw_id(0), 2),
1273 (MemberId::from_raw_id(1), 2),
1274 (MemberId::from_raw_id(2), 2),
1275 (MemberId::from_raw_id(3), 2),
1276 ])
1277 .await;
1278 });
1279
1280 assert_eq!(instances, 1);
1281 }
1282
1283 #[cfg(feature = "sim")]
1284 #[test]
1285 fn sim_send_bincode_o2m() {
1286 let mut flow = FlowBuilder::new();
1287 let cluster = flow.cluster::<()>();
1288 let node = flow.process::<()>();
1289
1290 let input = node.source_iter(q!(vec![
1291 (MemberId::from_raw_id(0), 123),
1292 (MemberId::from_raw_id(1), 456),
1293 ]));
1294
1295 let out_recv = input
1296 .demux(&cluster, TCP.fail_stop().bincode())
1297 .map(q!(|x| x + 1))
1298 .send(&node, TCP.fail_stop().bincode())
1299 .entries()
1300 .sim_output();
1301
1302 flow.sim()
1303 .with_cluster_size(&cluster, 4)
1304 .exhaustive(async || {
1305 out_recv
1306 .assert_yields_only_unordered(vec![
1307 (MemberId::from_raw_id(0), 124),
1308 (MemberId::from_raw_id(1), 457),
1309 ])
1310 .await
1311 });
1312 }
1313
1314 #[cfg(feature = "sim")]
1315 #[test]
1316 fn sim_broadcast_bincode_o2m() {
1317 let mut flow = FlowBuilder::new();
1318 let cluster = flow.cluster::<()>();
1319 let node = flow.process::<()>();
1320
1321 let input = node.source_iter(q!(vec![123, 456]));
1322
1323 let out_recv = input
1324 .broadcast(&cluster, TCP.fail_stop().bincode(), nondet!(/** test */))
1325 .map(q!(|x| x + 1))
1326 .send(&node, TCP.fail_stop().bincode())
1327 .entries()
1328 .sim_output();
1329
1330 let mut c_1_produced = false;
1331 let mut c_2_produced = false;
1332
1333 flow.sim()
1334 .with_cluster_size(&cluster, 2)
1335 .exhaustive(async || {
1336 let all_out = out_recv.collect_sorted::<Vec<_>>().await;
1337
1338 // check that order is preserved
1339 if all_out.contains(&(MemberId::from_raw_id(0), 124)) {
1340 assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
1341 c_1_produced = true;
1342 }
1343
1344 if all_out.contains(&(MemberId::from_raw_id(1), 124)) {
1345 assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
1346 c_2_produced = true;
1347 }
1348 });
1349
1350 assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
1351 }
1352
1353 #[cfg(feature = "sim")]
1354 #[test]
1355 fn sim_send_bincode_m2m() {
1356 let mut flow = FlowBuilder::new();
1357 let cluster = flow.cluster::<()>();
1358 let node = flow.process::<()>();
1359
1360 let input = node.source_iter(q!(vec![
1361 (MemberId::from_raw_id(0), 123),
1362 (MemberId::from_raw_id(1), 456),
1363 ]));
1364
1365 let out_recv = input
1366 .demux(&cluster, TCP.fail_stop().bincode())
1367 .map(q!(|x| x + 1))
1368 .flat_map_ordered(q!(|x| vec![
1369 (MemberId::from_raw_id(0), x),
1370 (MemberId::from_raw_id(1), x),
1371 ]))
1372 .demux(&cluster, TCP.fail_stop().bincode())
1373 .entries()
1374 .send(&node, TCP.fail_stop().bincode())
1375 .entries()
1376 .sim_output();
1377
1378 flow.sim()
1379 .with_cluster_size(&cluster, 4)
1380 .exhaustive(async || {
1381 out_recv
1382 .assert_yields_only_unordered(vec![
1383 (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
1384 (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
1385 (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
1386 (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
1387 ])
1388 .await
1389 });
1390 }
1391}