1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4use std::fmt::{Debug, Display};
5use std::hash::{Hash, Hasher};
6use std::ops::Deref;
7use std::rc::Rc;
8
9#[cfg(feature = "build")]
10use dfir_lang::graph::FlatGraphBuilder;
11#[cfg(feature = "build")]
12use proc_macro2::Span;
13use proc_macro2::TokenStream;
14use quote::ToTokens;
15#[cfg(feature = "build")]
16use quote::quote;
17#[cfg(feature = "build")]
18use slotmap::{SecondaryMap, SparseSecondaryMap};
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24use crate::compile::builder::ExternalPortId;
25#[cfg(feature = "build")]
26use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
27use crate::location::dynamic::LocationId;
28use crate::location::{LocationKey, NetworkHint};
29
30pub mod backtrace;
31use backtrace::Backtrace;
32
33#[derive(Clone, Hash)]
37pub struct DebugExpr(pub Box<syn::Expr>);
38
39impl From<syn::Expr> for DebugExpr {
40 fn from(expr: syn::Expr) -> Self {
41 Self(Box::new(expr))
42 }
43}
44
45impl Deref for DebugExpr {
46 type Target = syn::Expr;
47
48 fn deref(&self) -> &Self::Target {
49 &self.0
50 }
51}
52
53impl ToTokens for DebugExpr {
54 fn to_tokens(&self, tokens: &mut TokenStream) {
55 self.0.to_tokens(tokens);
56 }
57}
58
59impl Debug for DebugExpr {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 write!(f, "{}", self.0.to_token_stream())
62 }
63}
64
65impl Display for DebugExpr {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 let original = self.0.as_ref().clone();
68 let simplified = simplify_q_macro(original);
69
70 write!(f, "q!({})", quote::quote!(#simplified))
73 }
74}
75
76fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
78 let mut simplifier = QMacroSimplifier::new();
81 simplifier.visit_expr_mut(&mut expr);
82
83 if let Some(simplified) = simplifier.simplified_result {
85 simplified
86 } else {
87 expr
88 }
89}
90
91#[derive(Default)]
93pub struct QMacroSimplifier {
94 pub simplified_result: Option<syn::Expr>,
95}
96
97impl QMacroSimplifier {
98 pub fn new() -> Self {
99 Self::default()
100 }
101}
102
103impl VisitMut for QMacroSimplifier {
104 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
105 if self.simplified_result.is_some() {
107 return;
108 }
109
110 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
111 && self.is_stageleft_runtime_support_call(&path_expr.path)
113 && let Some(closure) = self.extract_closure_from_args(&call.args)
115 {
116 self.simplified_result = Some(closure);
117 return;
118 }
119
120 syn::visit_mut::visit_expr_mut(self, expr);
123 }
124}
125
126impl QMacroSimplifier {
127 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
128 if let Some(last_segment) = path.segments.last() {
130 let fn_name = last_segment.ident.to_string();
131 fn_name.contains("_type_hint")
133 && path.segments.len() > 2
134 && path.segments[0].ident == "stageleft"
135 && path.segments[1].ident == "runtime_support"
136 } else {
137 false
138 }
139 }
140
141 fn extract_closure_from_args(
142 &self,
143 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
144 ) -> Option<syn::Expr> {
145 for arg in args {
147 if let syn::Expr::Closure(_) = arg {
148 return Some(arg.clone());
149 }
150 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
152 return Some(closure_expr);
153 }
154 }
155 None
156 }
157
158 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
159 let mut visitor = ClosureFinder {
160 found_closure: None,
161 prefer_inner_blocks: true,
162 };
163 visitor.visit_expr(expr);
164 visitor.found_closure
165 }
166}
167
168struct ClosureFinder {
170 found_closure: Option<syn::Expr>,
171 prefer_inner_blocks: bool,
172}
173
174impl<'ast> Visit<'ast> for ClosureFinder {
175 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
176 if self.found_closure.is_some() {
178 return;
179 }
180
181 match expr {
182 syn::Expr::Closure(_) => {
183 self.found_closure = Some(expr.clone());
184 }
185 syn::Expr::Block(block) if self.prefer_inner_blocks => {
186 for stmt in &block.block.stmts {
188 if let syn::Stmt::Expr(stmt_expr, _) = stmt
189 && let syn::Expr::Block(_) = stmt_expr
190 {
191 let mut inner_visitor = ClosureFinder {
193 found_closure: None,
194 prefer_inner_blocks: false, };
196 inner_visitor.visit_expr(stmt_expr);
197 if inner_visitor.found_closure.is_some() {
198 self.found_closure = Some(stmt_expr.clone());
200 return;
201 }
202 }
203 }
204
205 visit::visit_expr(self, expr);
207
208 if self.found_closure.is_some() {
211 }
213 }
214 _ => {
215 visit::visit_expr(self, expr);
217 }
218 }
219 }
220}
221
222#[derive(Clone, PartialEq, Eq, Hash)]
226pub struct DebugType(pub Box<syn::Type>);
227
228impl From<syn::Type> for DebugType {
229 fn from(t: syn::Type) -> Self {
230 Self(Box::new(t))
231 }
232}
233
234impl Deref for DebugType {
235 type Target = syn::Type;
236
237 fn deref(&self) -> &Self::Target {
238 &self.0
239 }
240}
241
242impl ToTokens for DebugType {
243 fn to_tokens(&self, tokens: &mut TokenStream) {
244 self.0.to_tokens(tokens);
245 }
246}
247
248impl Debug for DebugType {
249 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
250 write!(f, "{}", self.0.to_token_stream())
251 }
252}
253
254pub enum DebugInstantiate {
255 Building,
256 Finalized(Box<DebugInstantiateFinalized>),
257}
258
259#[cfg_attr(
260 not(feature = "build"),
261 expect(
262 dead_code,
263 reason = "sink, source unused without `feature = \"build\"`."
264 )
265)]
266pub struct DebugInstantiateFinalized {
267 sink: syn::Expr,
268 source: syn::Expr,
269 connect_fn: Option<Box<dyn FnOnce()>>,
270}
271
272impl From<DebugInstantiateFinalized> for DebugInstantiate {
273 fn from(f: DebugInstantiateFinalized) -> Self {
274 Self::Finalized(Box::new(f))
275 }
276}
277
278impl Debug for DebugInstantiate {
279 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280 write!(f, "<network instantiate>")
281 }
282}
283
284impl Hash for DebugInstantiate {
285 fn hash<H: Hasher>(&self, _state: &mut H) {
286 }
288}
289
290impl Clone for DebugInstantiate {
291 fn clone(&self) -> Self {
292 match self {
293 DebugInstantiate::Building => DebugInstantiate::Building,
294 DebugInstantiate::Finalized(_) => {
295 panic!("DebugInstantiate::Finalized should not be cloned")
296 }
297 }
298 }
299}
300
301#[derive(Debug, Hash, Clone)]
303pub enum HydroSource {
304 Stream(DebugExpr),
305 ExternalNetwork(),
306 Iter(DebugExpr),
307 Spin(),
308 ClusterMembers(LocationId),
309}
310
311#[cfg(feature = "build")]
312pub trait DfirBuilder {
318 fn singleton_intermediates(&self) -> bool;
320
321 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
323
324 fn batch(
325 &mut self,
326 in_ident: syn::Ident,
327 in_location: &LocationId,
328 in_kind: &CollectionKind,
329 out_ident: &syn::Ident,
330 out_location: &LocationId,
331 op_meta: &HydroIrOpMetadata,
332 );
333 fn yield_from_tick(
334 &mut self,
335 in_ident: syn::Ident,
336 in_location: &LocationId,
337 in_kind: &CollectionKind,
338 out_ident: &syn::Ident,
339 out_location: &LocationId,
340 );
341
342 fn begin_atomic(
343 &mut self,
344 in_ident: syn::Ident,
345 in_location: &LocationId,
346 in_kind: &CollectionKind,
347 out_ident: &syn::Ident,
348 out_location: &LocationId,
349 op_meta: &HydroIrOpMetadata,
350 );
351 fn end_atomic(
352 &mut self,
353 in_ident: syn::Ident,
354 in_location: &LocationId,
355 in_kind: &CollectionKind,
356 out_ident: &syn::Ident,
357 );
358
359 #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
360 fn observe_nondet(
361 &mut self,
362 trusted: bool,
363 location: &LocationId,
364 in_ident: syn::Ident,
365 in_kind: &CollectionKind,
366 out_ident: &syn::Ident,
367 out_kind: &CollectionKind,
368 op_meta: &HydroIrOpMetadata,
369 );
370
371 #[expect(clippy::too_many_arguments, reason = "TODO")]
372 fn create_network(
373 &mut self,
374 from: &LocationId,
375 to: &LocationId,
376 input_ident: syn::Ident,
377 out_ident: &syn::Ident,
378 serialize: Option<&DebugExpr>,
379 sink: syn::Expr,
380 source: syn::Expr,
381 deserialize: Option<&DebugExpr>,
382 tag_id: usize,
383 );
384
385 fn create_external_source(
386 &mut self,
387 on: &LocationId,
388 source_expr: syn::Expr,
389 out_ident: &syn::Ident,
390 deserialize: Option<&DebugExpr>,
391 tag_id: usize,
392 );
393
394 fn create_external_output(
395 &mut self,
396 on: &LocationId,
397 sink_expr: syn::Expr,
398 input_ident: &syn::Ident,
399 serialize: Option<&DebugExpr>,
400 tag_id: usize,
401 );
402}
403
404#[cfg(feature = "build")]
405impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
406 fn singleton_intermediates(&self) -> bool {
407 false
408 }
409
410 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
411 self.entry(location.root().key())
412 .expect("location was removed")
413 .or_default()
414 }
415
416 fn batch(
417 &mut self,
418 in_ident: syn::Ident,
419 in_location: &LocationId,
420 in_kind: &CollectionKind,
421 out_ident: &syn::Ident,
422 _out_location: &LocationId,
423 _op_meta: &HydroIrOpMetadata,
424 ) {
425 let builder = self.get_dfir_mut(in_location.root());
426 if in_kind.is_bounded()
427 && matches!(
428 in_kind,
429 CollectionKind::Singleton { .. }
430 | CollectionKind::Optional { .. }
431 | CollectionKind::KeyedSingleton { .. }
432 )
433 {
434 assert!(in_location.is_top_level());
435 builder.add_dfir(
436 parse_quote! {
437 #out_ident = #in_ident -> persist::<'static>();
438 },
439 None,
440 None,
441 );
442 } else {
443 builder.add_dfir(
444 parse_quote! {
445 #out_ident = #in_ident;
446 },
447 None,
448 None,
449 );
450 }
451 }
452
453 fn yield_from_tick(
454 &mut self,
455 in_ident: syn::Ident,
456 in_location: &LocationId,
457 _in_kind: &CollectionKind,
458 out_ident: &syn::Ident,
459 _out_location: &LocationId,
460 ) {
461 let builder = self.get_dfir_mut(in_location.root());
462 builder.add_dfir(
463 parse_quote! {
464 #out_ident = #in_ident;
465 },
466 None,
467 None,
468 );
469 }
470
471 fn begin_atomic(
472 &mut self,
473 in_ident: syn::Ident,
474 in_location: &LocationId,
475 _in_kind: &CollectionKind,
476 out_ident: &syn::Ident,
477 _out_location: &LocationId,
478 _op_meta: &HydroIrOpMetadata,
479 ) {
480 let builder = self.get_dfir_mut(in_location.root());
481 builder.add_dfir(
482 parse_quote! {
483 #out_ident = #in_ident;
484 },
485 None,
486 None,
487 );
488 }
489
490 fn end_atomic(
491 &mut self,
492 in_ident: syn::Ident,
493 in_location: &LocationId,
494 _in_kind: &CollectionKind,
495 out_ident: &syn::Ident,
496 ) {
497 let builder = self.get_dfir_mut(in_location.root());
498 builder.add_dfir(
499 parse_quote! {
500 #out_ident = #in_ident;
501 },
502 None,
503 None,
504 );
505 }
506
507 fn observe_nondet(
508 &mut self,
509 _trusted: bool,
510 location: &LocationId,
511 in_ident: syn::Ident,
512 _in_kind: &CollectionKind,
513 out_ident: &syn::Ident,
514 _out_kind: &CollectionKind,
515 _op_meta: &HydroIrOpMetadata,
516 ) {
517 let builder = self.get_dfir_mut(location);
518 builder.add_dfir(
519 parse_quote! {
520 #out_ident = #in_ident;
521 },
522 None,
523 None,
524 );
525 }
526
527 fn create_network(
528 &mut self,
529 from: &LocationId,
530 to: &LocationId,
531 input_ident: syn::Ident,
532 out_ident: &syn::Ident,
533 serialize: Option<&DebugExpr>,
534 sink: syn::Expr,
535 source: syn::Expr,
536 deserialize: Option<&DebugExpr>,
537 tag_id: usize,
538 ) {
539 let sender_builder = self.get_dfir_mut(from);
540 if let Some(serialize_pipeline) = serialize {
541 sender_builder.add_dfir(
542 parse_quote! {
543 #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
544 },
545 None,
546 Some(&format!("send{}", tag_id)),
548 );
549 } else {
550 sender_builder.add_dfir(
551 parse_quote! {
552 #input_ident -> dest_sink(#sink);
553 },
554 None,
555 Some(&format!("send{}", tag_id)),
556 );
557 }
558
559 let receiver_builder = self.get_dfir_mut(to);
560 if let Some(deserialize_pipeline) = deserialize {
561 receiver_builder.add_dfir(
562 parse_quote! {
563 #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
564 },
565 None,
566 Some(&format!("recv{}", tag_id)),
567 );
568 } else {
569 receiver_builder.add_dfir(
570 parse_quote! {
571 #out_ident = source_stream(#source);
572 },
573 None,
574 Some(&format!("recv{}", tag_id)),
575 );
576 }
577 }
578
579 fn create_external_source(
580 &mut self,
581 on: &LocationId,
582 source_expr: syn::Expr,
583 out_ident: &syn::Ident,
584 deserialize: Option<&DebugExpr>,
585 tag_id: usize,
586 ) {
587 let receiver_builder = self.get_dfir_mut(on);
588 if let Some(deserialize_pipeline) = deserialize {
589 receiver_builder.add_dfir(
590 parse_quote! {
591 #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
592 },
593 None,
594 Some(&format!("recv{}", tag_id)),
595 );
596 } else {
597 receiver_builder.add_dfir(
598 parse_quote! {
599 #out_ident = source_stream(#source_expr);
600 },
601 None,
602 Some(&format!("recv{}", tag_id)),
603 );
604 }
605 }
606
607 fn create_external_output(
608 &mut self,
609 on: &LocationId,
610 sink_expr: syn::Expr,
611 input_ident: &syn::Ident,
612 serialize: Option<&DebugExpr>,
613 tag_id: usize,
614 ) {
615 let sender_builder = self.get_dfir_mut(on);
616 if let Some(serialize_fn) = serialize {
617 sender_builder.add_dfir(
618 parse_quote! {
619 #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
620 },
621 None,
622 Some(&format!("send{}", tag_id)),
624 );
625 } else {
626 sender_builder.add_dfir(
627 parse_quote! {
628 #input_ident -> dest_sink(#sink_expr);
629 },
630 None,
631 Some(&format!("send{}", tag_id)),
632 );
633 }
634 }
635}
636
637#[cfg(feature = "build")]
638pub enum BuildersOrCallback<'a, L, N>
639where
640 L: FnMut(&mut HydroRoot, &mut usize),
641 N: FnMut(&mut HydroNode, &mut usize),
642{
643 Builders(&'a mut dyn DfirBuilder),
644 Callback(L, N),
645}
646
647#[derive(Debug, Hash)]
651pub enum HydroRoot {
652 ForEach {
653 f: DebugExpr,
654 input: Box<HydroNode>,
655 op_metadata: HydroIrOpMetadata,
656 },
657 SendExternal {
658 to_external_key: LocationKey,
659 to_port_id: ExternalPortId,
660 to_many: bool,
661 unpaired: bool,
662 serialize_fn: Option<DebugExpr>,
663 instantiate_fn: DebugInstantiate,
664 input: Box<HydroNode>,
665 op_metadata: HydroIrOpMetadata,
666 },
667 DestSink {
668 sink: DebugExpr,
669 input: Box<HydroNode>,
670 op_metadata: HydroIrOpMetadata,
671 },
672 CycleSink {
673 ident: syn::Ident,
674 input: Box<HydroNode>,
675 op_metadata: HydroIrOpMetadata,
676 },
677}
678
679impl HydroRoot {
680 #[cfg(feature = "build")]
681 pub fn compile_network<'a, D>(
682 &mut self,
683 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
684 seen_tees: &mut SeenTees,
685 processes: &SparseSecondaryMap<LocationKey, D::Process>,
686 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
687 externals: &SparseSecondaryMap<LocationKey, D::External>,
688 env: &mut D::InstantiateEnv,
689 ) where
690 D: Deploy<'a>,
691 {
692 let refcell_extra_stmts = RefCell::new(extra_stmts);
693 let refcell_env = RefCell::new(env);
694 self.transform_bottom_up(
695 &mut |l| {
696 if let HydroRoot::SendExternal {
697 input,
698 to_external_key,
699 to_port_id,
700 to_many,
701 unpaired,
702 instantiate_fn,
703 ..
704 } = l
705 {
706 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
707 DebugInstantiate::Building => {
708 let to_node = externals
709 .get(*to_external_key)
710 .unwrap_or_else(|| {
711 panic!("A external used in the graph was not instantiated: {}", to_external_key)
712 })
713 .clone();
714
715 match input.metadata().location_id.root() {
716 &LocationId::Process(process_key) => {
717 if *to_many {
718 (
719 (
720 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
721 parse_quote!(DUMMY),
722 ),
723 Box::new(|| {}) as Box<dyn FnOnce()>,
724 )
725 } else {
726 let from_node = processes
727 .get(process_key)
728 .unwrap_or_else(|| {
729 panic!("A process used in the graph was not instantiated: {}", process_key)
730 })
731 .clone();
732
733 let sink_port = from_node.next_port();
734 let source_port = to_node.next_port();
735
736 if *unpaired {
737 use stageleft::quote_type;
738 use tokio_util::codec::LengthDelimitedCodec;
739
740 to_node.register(*to_port_id, source_port.clone());
741
742 let _ = D::e2o_source(
743 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
744 &to_node, &source_port,
745 &from_node, &sink_port,
746 "e_type::<LengthDelimitedCodec>(),
747 format!("{}_{}", *to_external_key, *to_port_id)
748 );
749 }
750
751 (
752 (
753 D::o2e_sink(
754 &from_node,
755 &sink_port,
756 &to_node,
757 &source_port,
758 format!("{}_{}", *to_external_key, *to_port_id)
759 ),
760 parse_quote!(DUMMY),
761 ),
762 if *unpaired {
763 D::e2o_connect(
764 &to_node,
765 &source_port,
766 &from_node,
767 &sink_port,
768 *to_many,
769 NetworkHint::Auto,
770 )
771 } else {
772 Box::new(|| {}) as Box<dyn FnOnce()>
773 },
774 )
775 }
776 }
777 LocationId::Cluster(_) => todo!(),
778 _ => panic!()
779 }
780 },
781
782 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
783 };
784
785 *instantiate_fn = DebugInstantiateFinalized {
786 sink: sink_expr,
787 source: source_expr,
788 connect_fn: Some(connect_fn),
789 }
790 .into();
791 }
792 },
793 &mut |n| {
794 if let HydroNode::Network {
795 input,
796 instantiate_fn,
797 metadata,
798 ..
799 } = n
800 {
801 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
802 DebugInstantiate::Building => instantiate_network::<D>(
803 input.metadata().location_id.root(),
804 metadata.location_id.root(),
805 processes,
806 clusters,
807 ),
808
809 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
810 };
811
812 *instantiate_fn = DebugInstantiateFinalized {
813 sink: sink_expr,
814 source: source_expr,
815 connect_fn: Some(connect_fn),
816 }
817 .into();
818 } else if let HydroNode::ExternalInput {
819 from_external_key,
820 from_port_id,
821 from_many,
822 codec_type,
823 port_hint,
824 instantiate_fn,
825 metadata,
826 ..
827 } = n
828 {
829 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
830 DebugInstantiate::Building => {
831 let from_node = externals
832 .get(*from_external_key)
833 .unwrap_or_else(|| {
834 panic!(
835 "A external used in the graph was not instantiated: {}",
836 from_external_key,
837 )
838 })
839 .clone();
840
841 match metadata.location_id.root() {
842 &LocationId::Process(process_key) => {
843 let to_node = processes
844 .get(process_key)
845 .unwrap_or_else(|| {
846 panic!("A process used in the graph was not instantiated: {}", process_key)
847 })
848 .clone();
849
850 let sink_port = from_node.next_port();
851 let source_port = to_node.next_port();
852
853 from_node.register(*from_port_id, sink_port.clone());
854
855 (
856 (
857 parse_quote!(DUMMY),
858 if *from_many {
859 D::e2o_many_source(
860 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
861 &to_node, &source_port,
862 codec_type.0.as_ref(),
863 format!("{}_{}", *from_external_key, *from_port_id)
864 )
865 } else {
866 D::e2o_source(
867 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
868 &from_node, &sink_port,
869 &to_node, &source_port,
870 codec_type.0.as_ref(),
871 format!("{}_{}", *from_external_key, *from_port_id)
872 )
873 },
874 ),
875 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
876 )
877 }
878 LocationId::Cluster(_) => todo!(),
879 _ => panic!()
880 }
881 },
882
883 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
884 };
885
886 *instantiate_fn = DebugInstantiateFinalized {
887 sink: sink_expr,
888 source: source_expr,
889 connect_fn: Some(connect_fn),
890 }
891 .into();
892 } else if let HydroNode::EmbeddedInput { ident, metadata } = n {
893 let element_type = match &metadata.collection_kind {
894 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
895 _ => panic!("EmbeddedInput must have Stream collection kind"),
896 };
897 D::register_embedded_input(
898 &mut refcell_env.borrow_mut(),
899 ident,
900 &element_type,
901 );
902 }
903 },
904 seen_tees,
905 false,
906 );
907 }
908
909 pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
910 self.transform_bottom_up(
911 &mut |l| {
912 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
913 match instantiate_fn {
914 DebugInstantiate::Building => panic!("network not built"),
915
916 DebugInstantiate::Finalized(finalized) => {
917 (finalized.connect_fn.take().unwrap())();
918 }
919 }
920 }
921 },
922 &mut |n| {
923 if let HydroNode::Network { instantiate_fn, .. }
924 | HydroNode::ExternalInput { instantiate_fn, .. } = n
925 {
926 match instantiate_fn {
927 DebugInstantiate::Building => panic!("network not built"),
928
929 DebugInstantiate::Finalized(finalized) => {
930 (finalized.connect_fn.take().unwrap())();
931 }
932 }
933 }
934 },
935 seen_tees,
936 false,
937 );
938 }
939
940 pub fn transform_bottom_up(
941 &mut self,
942 transform_root: &mut impl FnMut(&mut HydroRoot),
943 transform_node: &mut impl FnMut(&mut HydroNode),
944 seen_tees: &mut SeenTees,
945 check_well_formed: bool,
946 ) {
947 self.transform_children(
948 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
949 seen_tees,
950 );
951
952 transform_root(self);
953 }
954
955 pub fn transform_children(
956 &mut self,
957 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
958 seen_tees: &mut SeenTees,
959 ) {
960 match self {
961 HydroRoot::ForEach { input, .. }
962 | HydroRoot::SendExternal { input, .. }
963 | HydroRoot::DestSink { input, .. }
964 | HydroRoot::CycleSink { input, .. } => {
965 transform(input, seen_tees);
966 }
967 }
968 }
969
970 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
971 match self {
972 HydroRoot::ForEach {
973 f,
974 input,
975 op_metadata,
976 } => HydroRoot::ForEach {
977 f: f.clone(),
978 input: Box::new(input.deep_clone(seen_tees)),
979 op_metadata: op_metadata.clone(),
980 },
981 HydroRoot::SendExternal {
982 to_external_key,
983 to_port_id,
984 to_many,
985 unpaired,
986 serialize_fn,
987 instantiate_fn,
988 input,
989 op_metadata,
990 } => HydroRoot::SendExternal {
991 to_external_key: *to_external_key,
992 to_port_id: *to_port_id,
993 to_many: *to_many,
994 unpaired: *unpaired,
995 serialize_fn: serialize_fn.clone(),
996 instantiate_fn: instantiate_fn.clone(),
997 input: Box::new(input.deep_clone(seen_tees)),
998 op_metadata: op_metadata.clone(),
999 },
1000 HydroRoot::DestSink {
1001 sink,
1002 input,
1003 op_metadata,
1004 } => HydroRoot::DestSink {
1005 sink: sink.clone(),
1006 input: Box::new(input.deep_clone(seen_tees)),
1007 op_metadata: op_metadata.clone(),
1008 },
1009 HydroRoot::CycleSink {
1010 ident,
1011 input,
1012 op_metadata,
1013 } => HydroRoot::CycleSink {
1014 ident: ident.clone(),
1015 input: Box::new(input.deep_clone(seen_tees)),
1016 op_metadata: op_metadata.clone(),
1017 },
1018 }
1019 }
1020
1021 #[cfg(feature = "build")]
1022 pub fn emit<'a, D: Deploy<'a>>(
1023 &mut self,
1024 graph_builders: &mut dyn DfirBuilder,
1025 seen_tees: &mut SeenTees,
1026 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1027 next_stmt_id: &mut usize,
1028 ) {
1029 self.emit_core::<D>(
1030 &mut BuildersOrCallback::<
1031 fn(&mut HydroRoot, &mut usize),
1032 fn(&mut HydroNode, &mut usize),
1033 >::Builders(graph_builders),
1034 seen_tees,
1035 built_tees,
1036 next_stmt_id,
1037 );
1038 }
1039
1040 #[cfg(feature = "build")]
1041 pub fn emit_core<'a, D: Deploy<'a>>(
1042 &mut self,
1043 builders_or_callback: &mut BuildersOrCallback<
1044 impl FnMut(&mut HydroRoot, &mut usize),
1045 impl FnMut(&mut HydroNode, &mut usize),
1046 >,
1047 seen_tees: &mut SeenTees,
1048 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1049 next_stmt_id: &mut usize,
1050 ) {
1051 match self {
1052 HydroRoot::ForEach { f, input, .. } => {
1053 let input_ident =
1054 input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1055
1056 match builders_or_callback {
1057 BuildersOrCallback::Builders(graph_builders) => {
1058 graph_builders
1059 .get_dfir_mut(&input.metadata().location_id)
1060 .add_dfir(
1061 parse_quote! {
1062 #input_ident -> for_each(#f);
1063 },
1064 None,
1065 Some(&next_stmt_id.to_string()),
1066 );
1067 }
1068 BuildersOrCallback::Callback(leaf_callback, _) => {
1069 leaf_callback(self, next_stmt_id);
1070 }
1071 }
1072
1073 *next_stmt_id += 1;
1074 }
1075
1076 HydroRoot::SendExternal {
1077 serialize_fn,
1078 instantiate_fn,
1079 input,
1080 ..
1081 } => {
1082 let input_ident =
1083 input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1084
1085 match builders_or_callback {
1086 BuildersOrCallback::Builders(graph_builders) => {
1087 let (sink_expr, _) = match instantiate_fn {
1088 DebugInstantiate::Building => (
1089 syn::parse_quote!(DUMMY_SINK),
1090 syn::parse_quote!(DUMMY_SOURCE),
1091 ),
1092
1093 DebugInstantiate::Finalized(finalized) => {
1094 (finalized.sink.clone(), finalized.source.clone())
1095 }
1096 };
1097
1098 graph_builders.create_external_output(
1099 &input.metadata().location_id,
1100 sink_expr,
1101 &input_ident,
1102 serialize_fn.as_ref(),
1103 *next_stmt_id,
1104 );
1105 }
1106 BuildersOrCallback::Callback(leaf_callback, _) => {
1107 leaf_callback(self, next_stmt_id);
1108 }
1109 }
1110
1111 *next_stmt_id += 1;
1112 }
1113
1114 HydroRoot::DestSink { sink, input, .. } => {
1115 let input_ident =
1116 input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1117
1118 match builders_or_callback {
1119 BuildersOrCallback::Builders(graph_builders) => {
1120 graph_builders
1121 .get_dfir_mut(&input.metadata().location_id)
1122 .add_dfir(
1123 parse_quote! {
1124 #input_ident -> dest_sink(#sink);
1125 },
1126 None,
1127 Some(&next_stmt_id.to_string()),
1128 );
1129 }
1130 BuildersOrCallback::Callback(leaf_callback, _) => {
1131 leaf_callback(self, next_stmt_id);
1132 }
1133 }
1134
1135 *next_stmt_id += 1;
1136 }
1137
1138 HydroRoot::CycleSink { ident, input, .. } => {
1139 let input_ident =
1140 input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1141
1142 match builders_or_callback {
1143 BuildersOrCallback::Builders(graph_builders) => {
1144 let elem_type: syn::Type = match &input.metadata().collection_kind {
1145 CollectionKind::KeyedSingleton {
1146 key_type,
1147 value_type,
1148 ..
1149 }
1150 | CollectionKind::KeyedStream {
1151 key_type,
1152 value_type,
1153 ..
1154 } => {
1155 parse_quote!((#key_type, #value_type))
1156 }
1157 CollectionKind::Stream { element_type, .. }
1158 | CollectionKind::Singleton { element_type, .. }
1159 | CollectionKind::Optional { element_type, .. } => {
1160 parse_quote!(#element_type)
1161 }
1162 };
1163
1164 graph_builders
1165 .get_dfir_mut(&input.metadata().location_id)
1166 .add_dfir(
1167 parse_quote! {
1168 #ident = #input_ident -> identity::<#elem_type>();
1169 },
1170 None,
1171 None,
1172 );
1173 }
1174 BuildersOrCallback::Callback(_, _) => {}
1176 }
1177 }
1178 }
1179 }
1180
1181 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1182 match self {
1183 HydroRoot::ForEach { op_metadata, .. }
1184 | HydroRoot::SendExternal { op_metadata, .. }
1185 | HydroRoot::DestSink { op_metadata, .. }
1186 | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1187 }
1188 }
1189
1190 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1191 match self {
1192 HydroRoot::ForEach { op_metadata, .. }
1193 | HydroRoot::SendExternal { op_metadata, .. }
1194 | HydroRoot::DestSink { op_metadata, .. }
1195 | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1196 }
1197 }
1198
1199 pub fn input(&self) -> &HydroNode {
1200 match self {
1201 HydroRoot::ForEach { input, .. }
1202 | HydroRoot::SendExternal { input, .. }
1203 | HydroRoot::DestSink { input, .. }
1204 | HydroRoot::CycleSink { input, .. } => input,
1205 }
1206 }
1207
1208 pub fn input_metadata(&self) -> &HydroIrMetadata {
1209 self.input().metadata()
1210 }
1211
1212 pub fn print_root(&self) -> String {
1213 match self {
1214 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1215 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1216 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1217 HydroRoot::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
1218 }
1219 }
1220
1221 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1222 match self {
1223 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1224 transform(f);
1225 }
1226 HydroRoot::SendExternal { .. } | HydroRoot::CycleSink { .. } => {}
1227 }
1228 }
1229}
1230
1231#[cfg(feature = "build")]
1232pub fn emit<'a, D: Deploy<'a>>(
1233 ir: &mut Vec<HydroRoot>,
1234) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1235 let mut builders = SecondaryMap::new();
1236 let mut seen_tees = HashMap::new();
1237 let mut built_tees = HashMap::new();
1238 let mut next_stmt_id = 0;
1239 for leaf in ir {
1240 leaf.emit::<D>(
1241 &mut builders,
1242 &mut seen_tees,
1243 &mut built_tees,
1244 &mut next_stmt_id,
1245 );
1246 }
1247 builders
1248}
1249
1250#[cfg(feature = "build")]
1251pub fn traverse_dfir<'a, D: Deploy<'a>>(
1252 ir: &mut [HydroRoot],
1253 transform_root: impl FnMut(&mut HydroRoot, &mut usize),
1254 transform_node: impl FnMut(&mut HydroNode, &mut usize),
1255) {
1256 let mut seen_tees = HashMap::new();
1257 let mut built_tees = HashMap::new();
1258 let mut next_stmt_id = 0;
1259 let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1260 ir.iter_mut().for_each(|leaf| {
1261 leaf.emit_core::<D>(
1262 &mut callback,
1263 &mut seen_tees,
1264 &mut built_tees,
1265 &mut next_stmt_id,
1266 );
1267 });
1268}
1269
1270pub fn transform_bottom_up(
1271 ir: &mut [HydroRoot],
1272 transform_root: &mut impl FnMut(&mut HydroRoot),
1273 transform_node: &mut impl FnMut(&mut HydroNode),
1274 check_well_formed: bool,
1275) {
1276 let mut seen_tees = HashMap::new();
1277 ir.iter_mut().for_each(|leaf| {
1278 leaf.transform_bottom_up(
1279 transform_root,
1280 transform_node,
1281 &mut seen_tees,
1282 check_well_formed,
1283 );
1284 });
1285}
1286
1287pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1288 let mut seen_tees = HashMap::new();
1289 ir.iter()
1290 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1291 .collect()
1292}
1293
1294type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1295thread_local! {
1296 static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1297}
1298
1299pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1300 PRINTED_TEES.with(|printed_tees| {
1301 let mut printed_tees_mut = printed_tees.borrow_mut();
1302 *printed_tees_mut = Some((0, HashMap::new()));
1303 drop(printed_tees_mut);
1304
1305 let ret = f();
1306
1307 let mut printed_tees_mut = printed_tees.borrow_mut();
1308 *printed_tees_mut = None;
1309
1310 ret
1311 })
1312}
1313
1314pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1315
1316impl TeeNode {
1317 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1318 Rc::as_ptr(&self.0)
1319 }
1320}
1321
1322impl Debug for TeeNode {
1323 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1324 PRINTED_TEES.with(|printed_tees| {
1325 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1326 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1327
1328 if let Some(printed_tees_mut) = printed_tees_mut {
1329 if let Some(existing) = printed_tees_mut
1330 .1
1331 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1332 {
1333 write!(f, "<tee {}>", existing)
1334 } else {
1335 let next_id = printed_tees_mut.0;
1336 printed_tees_mut.0 += 1;
1337 printed_tees_mut
1338 .1
1339 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1340 drop(printed_tees_mut_borrow);
1341 write!(f, "<tee {}>: ", next_id)?;
1342 Debug::fmt(&self.0.borrow(), f)
1343 }
1344 } else {
1345 drop(printed_tees_mut_borrow);
1346 write!(f, "<tee>: ")?;
1347 Debug::fmt(&self.0.borrow(), f)
1348 }
1349 })
1350 }
1351}
1352
1353impl Hash for TeeNode {
1354 fn hash<H: Hasher>(&self, state: &mut H) {
1355 self.0.borrow_mut().hash(state);
1356 }
1357}
1358
1359#[derive(Clone, PartialEq, Eq, Debug)]
1360pub enum BoundKind {
1361 Unbounded,
1362 Bounded,
1363}
1364
1365#[derive(Clone, PartialEq, Eq, Debug)]
1366pub enum StreamOrder {
1367 NoOrder,
1368 TotalOrder,
1369}
1370
1371#[derive(Clone, PartialEq, Eq, Debug)]
1372pub enum StreamRetry {
1373 AtLeastOnce,
1374 ExactlyOnce,
1375}
1376
1377#[derive(Clone, PartialEq, Eq, Debug)]
1378pub enum KeyedSingletonBoundKind {
1379 Unbounded,
1380 BoundedValue,
1381 Bounded,
1382}
1383
1384#[derive(Clone, PartialEq, Eq, Debug)]
1385pub enum CollectionKind {
1386 Stream {
1387 bound: BoundKind,
1388 order: StreamOrder,
1389 retry: StreamRetry,
1390 element_type: DebugType,
1391 },
1392 Singleton {
1393 bound: BoundKind,
1394 element_type: DebugType,
1395 },
1396 Optional {
1397 bound: BoundKind,
1398 element_type: DebugType,
1399 },
1400 KeyedStream {
1401 bound: BoundKind,
1402 value_order: StreamOrder,
1403 value_retry: StreamRetry,
1404 key_type: DebugType,
1405 value_type: DebugType,
1406 },
1407 KeyedSingleton {
1408 bound: KeyedSingletonBoundKind,
1409 key_type: DebugType,
1410 value_type: DebugType,
1411 },
1412}
1413
1414impl CollectionKind {
1415 pub fn is_bounded(&self) -> bool {
1416 matches!(
1417 self,
1418 CollectionKind::Stream {
1419 bound: BoundKind::Bounded,
1420 ..
1421 } | CollectionKind::Singleton {
1422 bound: BoundKind::Bounded,
1423 ..
1424 } | CollectionKind::Optional {
1425 bound: BoundKind::Bounded,
1426 ..
1427 } | CollectionKind::KeyedStream {
1428 bound: BoundKind::Bounded,
1429 ..
1430 } | CollectionKind::KeyedSingleton {
1431 bound: KeyedSingletonBoundKind::Bounded,
1432 ..
1433 }
1434 )
1435 }
1436}
1437
1438#[derive(Clone)]
1439pub struct HydroIrMetadata {
1440 pub location_id: LocationId,
1441 pub collection_kind: CollectionKind,
1442 pub cardinality: Option<usize>,
1443 pub tag: Option<String>,
1444 pub op: HydroIrOpMetadata,
1445}
1446
1447impl Hash for HydroIrMetadata {
1449 fn hash<H: Hasher>(&self, _: &mut H) {}
1450}
1451
1452impl PartialEq for HydroIrMetadata {
1453 fn eq(&self, _: &Self) -> bool {
1454 true
1455 }
1456}
1457
1458impl Eq for HydroIrMetadata {}
1459
1460impl Debug for HydroIrMetadata {
1461 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1462 f.debug_struct("HydroIrMetadata")
1463 .field("location_id", &self.location_id)
1464 .field("collection_kind", &self.collection_kind)
1465 .finish()
1466 }
1467}
1468
1469#[derive(Clone)]
1472pub struct HydroIrOpMetadata {
1473 pub backtrace: Backtrace,
1474 pub cpu_usage: Option<f64>,
1475 pub network_recv_cpu_usage: Option<f64>,
1476 pub id: Option<usize>,
1477}
1478
1479impl HydroIrOpMetadata {
1480 #[expect(
1481 clippy::new_without_default,
1482 reason = "explicit calls to new ensure correct backtrace bounds"
1483 )]
1484 pub fn new() -> HydroIrOpMetadata {
1485 Self::new_with_skip(1)
1486 }
1487
1488 fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1489 HydroIrOpMetadata {
1490 backtrace: Backtrace::get_backtrace(2 + skip_count),
1491 cpu_usage: None,
1492 network_recv_cpu_usage: None,
1493 id: None,
1494 }
1495 }
1496}
1497
1498impl Debug for HydroIrOpMetadata {
1499 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1500 f.debug_struct("HydroIrOpMetadata").finish()
1501 }
1502}
1503
1504impl Hash for HydroIrOpMetadata {
1505 fn hash<H: Hasher>(&self, _: &mut H) {}
1506}
1507
1508#[derive(Debug, Hash)]
1511pub enum HydroNode {
1512 Placeholder,
1513
1514 Cast {
1522 inner: Box<HydroNode>,
1523 metadata: HydroIrMetadata,
1524 },
1525
1526 ObserveNonDet {
1532 inner: Box<HydroNode>,
1533 trusted: bool, metadata: HydroIrMetadata,
1535 },
1536
1537 Source {
1538 source: HydroSource,
1539 metadata: HydroIrMetadata,
1540 },
1541
1542 SingletonSource {
1543 value: DebugExpr,
1544 metadata: HydroIrMetadata,
1545 },
1546
1547 CycleSource {
1548 ident: syn::Ident,
1549 metadata: HydroIrMetadata,
1550 },
1551
1552 Tee {
1553 inner: TeeNode,
1554 metadata: HydroIrMetadata,
1555 },
1556
1557 BeginAtomic {
1558 inner: Box<HydroNode>,
1559 metadata: HydroIrMetadata,
1560 },
1561
1562 EndAtomic {
1563 inner: Box<HydroNode>,
1564 metadata: HydroIrMetadata,
1565 },
1566
1567 Batch {
1568 inner: Box<HydroNode>,
1569 metadata: HydroIrMetadata,
1570 },
1571
1572 YieldConcat {
1573 inner: Box<HydroNode>,
1574 metadata: HydroIrMetadata,
1575 },
1576
1577 Chain {
1578 first: Box<HydroNode>,
1579 second: Box<HydroNode>,
1580 metadata: HydroIrMetadata,
1581 },
1582
1583 ChainFirst {
1584 first: Box<HydroNode>,
1585 second: Box<HydroNode>,
1586 metadata: HydroIrMetadata,
1587 },
1588
1589 CrossProduct {
1590 left: Box<HydroNode>,
1591 right: Box<HydroNode>,
1592 metadata: HydroIrMetadata,
1593 },
1594
1595 CrossSingleton {
1596 left: Box<HydroNode>,
1597 right: Box<HydroNode>,
1598 metadata: HydroIrMetadata,
1599 },
1600
1601 Join {
1602 left: Box<HydroNode>,
1603 right: Box<HydroNode>,
1604 metadata: HydroIrMetadata,
1605 },
1606
1607 Difference {
1608 pos: Box<HydroNode>,
1609 neg: Box<HydroNode>,
1610 metadata: HydroIrMetadata,
1611 },
1612
1613 AntiJoin {
1614 pos: Box<HydroNode>,
1615 neg: Box<HydroNode>,
1616 metadata: HydroIrMetadata,
1617 },
1618
1619 ResolveFutures {
1620 input: Box<HydroNode>,
1621 metadata: HydroIrMetadata,
1622 },
1623 ResolveFuturesOrdered {
1624 input: Box<HydroNode>,
1625 metadata: HydroIrMetadata,
1626 },
1627
1628 Map {
1629 f: DebugExpr,
1630 input: Box<HydroNode>,
1631 metadata: HydroIrMetadata,
1632 },
1633 FlatMap {
1634 f: DebugExpr,
1635 input: Box<HydroNode>,
1636 metadata: HydroIrMetadata,
1637 },
1638 Filter {
1639 f: DebugExpr,
1640 input: Box<HydroNode>,
1641 metadata: HydroIrMetadata,
1642 },
1643 FilterMap {
1644 f: DebugExpr,
1645 input: Box<HydroNode>,
1646 metadata: HydroIrMetadata,
1647 },
1648
1649 DeferTick {
1650 input: Box<HydroNode>,
1651 metadata: HydroIrMetadata,
1652 },
1653 Enumerate {
1654 input: Box<HydroNode>,
1655 metadata: HydroIrMetadata,
1656 },
1657 Inspect {
1658 f: DebugExpr,
1659 input: Box<HydroNode>,
1660 metadata: HydroIrMetadata,
1661 },
1662
1663 Unique {
1664 input: Box<HydroNode>,
1665 metadata: HydroIrMetadata,
1666 },
1667
1668 Sort {
1669 input: Box<HydroNode>,
1670 metadata: HydroIrMetadata,
1671 },
1672 Fold {
1673 init: DebugExpr,
1674 acc: DebugExpr,
1675 input: Box<HydroNode>,
1676 metadata: HydroIrMetadata,
1677 },
1678
1679 Scan {
1680 init: DebugExpr,
1681 acc: DebugExpr,
1682 input: Box<HydroNode>,
1683 metadata: HydroIrMetadata,
1684 },
1685 FoldKeyed {
1686 init: DebugExpr,
1687 acc: DebugExpr,
1688 input: Box<HydroNode>,
1689 metadata: HydroIrMetadata,
1690 },
1691
1692 Reduce {
1693 f: DebugExpr,
1694 input: Box<HydroNode>,
1695 metadata: HydroIrMetadata,
1696 },
1697 ReduceKeyed {
1698 f: DebugExpr,
1699 input: Box<HydroNode>,
1700 metadata: HydroIrMetadata,
1701 },
1702 ReduceKeyedWatermark {
1703 f: DebugExpr,
1704 input: Box<HydroNode>,
1705 watermark: Box<HydroNode>,
1706 metadata: HydroIrMetadata,
1707 },
1708
1709 Network {
1710 name: Option<String>,
1711 serialize_fn: Option<DebugExpr>,
1712 instantiate_fn: DebugInstantiate,
1713 deserialize_fn: Option<DebugExpr>,
1714 input: Box<HydroNode>,
1715 metadata: HydroIrMetadata,
1716 },
1717
1718 ExternalInput {
1719 from_external_key: LocationKey,
1720 from_port_id: ExternalPortId,
1721 from_many: bool,
1722 codec_type: DebugType,
1723 port_hint: NetworkHint,
1724 instantiate_fn: DebugInstantiate,
1725 deserialize_fn: Option<DebugExpr>,
1726 metadata: HydroIrMetadata,
1727 },
1728
1729 Counter {
1730 tag: String,
1731 duration: DebugExpr,
1732 prefix: String,
1733 input: Box<HydroNode>,
1734 metadata: HydroIrMetadata,
1735 },
1736
1737 EmbeddedInput {
1742 ident: syn::Ident,
1743 metadata: HydroIrMetadata,
1744 },
1745}
1746
1747pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
1748pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1749
1750impl HydroNode {
1751 pub fn transform_bottom_up(
1752 &mut self,
1753 transform: &mut impl FnMut(&mut HydroNode),
1754 seen_tees: &mut SeenTees,
1755 check_well_formed: bool,
1756 ) {
1757 self.transform_children(
1758 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1759 seen_tees,
1760 );
1761
1762 transform(self);
1763
1764 let self_location = self.metadata().location_id.root();
1765
1766 if check_well_formed {
1767 match &*self {
1768 HydroNode::Network { .. } => {}
1769 _ => {
1770 self.input_metadata().iter().for_each(|i| {
1771 if i.location_id.root() != self_location {
1772 panic!(
1773 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1774 i,
1775 i.location_id.root(),
1776 self,
1777 self_location
1778 )
1779 }
1780 });
1781 }
1782 }
1783 }
1784 }
1785
1786 #[inline(always)]
1787 pub fn transform_children(
1788 &mut self,
1789 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1790 seen_tees: &mut SeenTees,
1791 ) {
1792 match self {
1793 HydroNode::Placeholder => {
1794 panic!();
1795 }
1796
1797 HydroNode::Source { .. }
1798 | HydroNode::SingletonSource { .. }
1799 | HydroNode::CycleSource { .. }
1800 | HydroNode::ExternalInput { .. }
1801 | HydroNode::EmbeddedInput { .. } => {}
1802
1803 HydroNode::Tee { inner, .. } => {
1804 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1805 *inner = TeeNode(transformed.clone());
1806 } else {
1807 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1808 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1809 let mut orig = inner.0.replace(HydroNode::Placeholder);
1810 transform(&mut orig, seen_tees);
1811 *transformed_cell.borrow_mut() = orig;
1812 *inner = TeeNode(transformed_cell);
1813 }
1814 }
1815
1816 HydroNode::Cast { inner, .. }
1817 | HydroNode::ObserveNonDet { inner, .. }
1818 | HydroNode::BeginAtomic { inner, .. }
1819 | HydroNode::EndAtomic { inner, .. }
1820 | HydroNode::Batch { inner, .. }
1821 | HydroNode::YieldConcat { inner, .. } => {
1822 transform(inner.as_mut(), seen_tees);
1823 }
1824
1825 HydroNode::Chain { first, second, .. } => {
1826 transform(first.as_mut(), seen_tees);
1827 transform(second.as_mut(), seen_tees);
1828 }
1829
1830 HydroNode::ChainFirst { first, second, .. } => {
1831 transform(first.as_mut(), seen_tees);
1832 transform(second.as_mut(), seen_tees);
1833 }
1834
1835 HydroNode::CrossSingleton { left, right, .. }
1836 | HydroNode::CrossProduct { left, right, .. }
1837 | HydroNode::Join { left, right, .. } => {
1838 transform(left.as_mut(), seen_tees);
1839 transform(right.as_mut(), seen_tees);
1840 }
1841
1842 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1843 transform(pos.as_mut(), seen_tees);
1844 transform(neg.as_mut(), seen_tees);
1845 }
1846
1847 HydroNode::ReduceKeyedWatermark {
1848 input, watermark, ..
1849 } => {
1850 transform(input.as_mut(), seen_tees);
1851 transform(watermark.as_mut(), seen_tees);
1852 }
1853
1854 HydroNode::Map { input, .. }
1855 | HydroNode::ResolveFutures { input, .. }
1856 | HydroNode::ResolveFuturesOrdered { input, .. }
1857 | HydroNode::FlatMap { input, .. }
1858 | HydroNode::Filter { input, .. }
1859 | HydroNode::FilterMap { input, .. }
1860 | HydroNode::Sort { input, .. }
1861 | HydroNode::DeferTick { input, .. }
1862 | HydroNode::Enumerate { input, .. }
1863 | HydroNode::Inspect { input, .. }
1864 | HydroNode::Unique { input, .. }
1865 | HydroNode::Network { input, .. }
1866 | HydroNode::Fold { input, .. }
1867 | HydroNode::Scan { input, .. }
1868 | HydroNode::FoldKeyed { input, .. }
1869 | HydroNode::Reduce { input, .. }
1870 | HydroNode::ReduceKeyed { input, .. }
1871 | HydroNode::Counter { input, .. } => {
1872 transform(input.as_mut(), seen_tees);
1873 }
1874 }
1875 }
1876
1877 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1878 match self {
1879 HydroNode::Placeholder => HydroNode::Placeholder,
1880 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1881 inner: Box::new(inner.deep_clone(seen_tees)),
1882 metadata: metadata.clone(),
1883 },
1884 HydroNode::ObserveNonDet {
1885 inner,
1886 trusted,
1887 metadata,
1888 } => HydroNode::ObserveNonDet {
1889 inner: Box::new(inner.deep_clone(seen_tees)),
1890 trusted: *trusted,
1891 metadata: metadata.clone(),
1892 },
1893 HydroNode::Source { source, metadata } => HydroNode::Source {
1894 source: source.clone(),
1895 metadata: metadata.clone(),
1896 },
1897 HydroNode::SingletonSource { value, metadata } => HydroNode::SingletonSource {
1898 value: value.clone(),
1899 metadata: metadata.clone(),
1900 },
1901 HydroNode::CycleSource { ident, metadata } => HydroNode::CycleSource {
1902 ident: ident.clone(),
1903 metadata: metadata.clone(),
1904 },
1905 HydroNode::Tee { inner, metadata } => {
1906 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1907 HydroNode::Tee {
1908 inner: TeeNode(transformed.clone()),
1909 metadata: metadata.clone(),
1910 }
1911 } else {
1912 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1913 seen_tees.insert(inner.as_ptr(), new_rc.clone());
1914 let cloned = inner.0.borrow().deep_clone(seen_tees);
1915 *new_rc.borrow_mut() = cloned;
1916 HydroNode::Tee {
1917 inner: TeeNode(new_rc),
1918 metadata: metadata.clone(),
1919 }
1920 }
1921 }
1922 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
1923 inner: Box::new(inner.deep_clone(seen_tees)),
1924 metadata: metadata.clone(),
1925 },
1926 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
1927 inner: Box::new(inner.deep_clone(seen_tees)),
1928 metadata: metadata.clone(),
1929 },
1930 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
1931 inner: Box::new(inner.deep_clone(seen_tees)),
1932 metadata: metadata.clone(),
1933 },
1934 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
1935 inner: Box::new(inner.deep_clone(seen_tees)),
1936 metadata: metadata.clone(),
1937 },
1938 HydroNode::Chain {
1939 first,
1940 second,
1941 metadata,
1942 } => HydroNode::Chain {
1943 first: Box::new(first.deep_clone(seen_tees)),
1944 second: Box::new(second.deep_clone(seen_tees)),
1945 metadata: metadata.clone(),
1946 },
1947 HydroNode::ChainFirst {
1948 first,
1949 second,
1950 metadata,
1951 } => HydroNode::ChainFirst {
1952 first: Box::new(first.deep_clone(seen_tees)),
1953 second: Box::new(second.deep_clone(seen_tees)),
1954 metadata: metadata.clone(),
1955 },
1956 HydroNode::CrossProduct {
1957 left,
1958 right,
1959 metadata,
1960 } => HydroNode::CrossProduct {
1961 left: Box::new(left.deep_clone(seen_tees)),
1962 right: Box::new(right.deep_clone(seen_tees)),
1963 metadata: metadata.clone(),
1964 },
1965 HydroNode::CrossSingleton {
1966 left,
1967 right,
1968 metadata,
1969 } => HydroNode::CrossSingleton {
1970 left: Box::new(left.deep_clone(seen_tees)),
1971 right: Box::new(right.deep_clone(seen_tees)),
1972 metadata: metadata.clone(),
1973 },
1974 HydroNode::Join {
1975 left,
1976 right,
1977 metadata,
1978 } => HydroNode::Join {
1979 left: Box::new(left.deep_clone(seen_tees)),
1980 right: Box::new(right.deep_clone(seen_tees)),
1981 metadata: metadata.clone(),
1982 },
1983 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
1984 pos: Box::new(pos.deep_clone(seen_tees)),
1985 neg: Box::new(neg.deep_clone(seen_tees)),
1986 metadata: metadata.clone(),
1987 },
1988 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
1989 pos: Box::new(pos.deep_clone(seen_tees)),
1990 neg: Box::new(neg.deep_clone(seen_tees)),
1991 metadata: metadata.clone(),
1992 },
1993 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
1994 input: Box::new(input.deep_clone(seen_tees)),
1995 metadata: metadata.clone(),
1996 },
1997 HydroNode::ResolveFuturesOrdered { input, metadata } => {
1998 HydroNode::ResolveFuturesOrdered {
1999 input: Box::new(input.deep_clone(seen_tees)),
2000 metadata: metadata.clone(),
2001 }
2002 }
2003 HydroNode::Map { f, input, metadata } => HydroNode::Map {
2004 f: f.clone(),
2005 input: Box::new(input.deep_clone(seen_tees)),
2006 metadata: metadata.clone(),
2007 },
2008 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2009 f: f.clone(),
2010 input: Box::new(input.deep_clone(seen_tees)),
2011 metadata: metadata.clone(),
2012 },
2013 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2014 f: f.clone(),
2015 input: Box::new(input.deep_clone(seen_tees)),
2016 metadata: metadata.clone(),
2017 },
2018 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2019 f: f.clone(),
2020 input: Box::new(input.deep_clone(seen_tees)),
2021 metadata: metadata.clone(),
2022 },
2023 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2024 input: Box::new(input.deep_clone(seen_tees)),
2025 metadata: metadata.clone(),
2026 },
2027 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2028 input: Box::new(input.deep_clone(seen_tees)),
2029 metadata: metadata.clone(),
2030 },
2031 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2032 f: f.clone(),
2033 input: Box::new(input.deep_clone(seen_tees)),
2034 metadata: metadata.clone(),
2035 },
2036 HydroNode::Unique { input, metadata } => HydroNode::Unique {
2037 input: Box::new(input.deep_clone(seen_tees)),
2038 metadata: metadata.clone(),
2039 },
2040 HydroNode::Sort { input, metadata } => HydroNode::Sort {
2041 input: Box::new(input.deep_clone(seen_tees)),
2042 metadata: metadata.clone(),
2043 },
2044 HydroNode::Fold {
2045 init,
2046 acc,
2047 input,
2048 metadata,
2049 } => HydroNode::Fold {
2050 init: init.clone(),
2051 acc: acc.clone(),
2052 input: Box::new(input.deep_clone(seen_tees)),
2053 metadata: metadata.clone(),
2054 },
2055 HydroNode::Scan {
2056 init,
2057 acc,
2058 input,
2059 metadata,
2060 } => HydroNode::Scan {
2061 init: init.clone(),
2062 acc: acc.clone(),
2063 input: Box::new(input.deep_clone(seen_tees)),
2064 metadata: metadata.clone(),
2065 },
2066 HydroNode::FoldKeyed {
2067 init,
2068 acc,
2069 input,
2070 metadata,
2071 } => HydroNode::FoldKeyed {
2072 init: init.clone(),
2073 acc: acc.clone(),
2074 input: Box::new(input.deep_clone(seen_tees)),
2075 metadata: metadata.clone(),
2076 },
2077 HydroNode::ReduceKeyedWatermark {
2078 f,
2079 input,
2080 watermark,
2081 metadata,
2082 } => HydroNode::ReduceKeyedWatermark {
2083 f: f.clone(),
2084 input: Box::new(input.deep_clone(seen_tees)),
2085 watermark: Box::new(watermark.deep_clone(seen_tees)),
2086 metadata: metadata.clone(),
2087 },
2088 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2089 f: f.clone(),
2090 input: Box::new(input.deep_clone(seen_tees)),
2091 metadata: metadata.clone(),
2092 },
2093 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2094 f: f.clone(),
2095 input: Box::new(input.deep_clone(seen_tees)),
2096 metadata: metadata.clone(),
2097 },
2098 HydroNode::Network {
2099 name,
2100 serialize_fn,
2101 instantiate_fn,
2102 deserialize_fn,
2103 input,
2104 metadata,
2105 } => HydroNode::Network {
2106 name: name.clone(),
2107 serialize_fn: serialize_fn.clone(),
2108 instantiate_fn: instantiate_fn.clone(),
2109 deserialize_fn: deserialize_fn.clone(),
2110 input: Box::new(input.deep_clone(seen_tees)),
2111 metadata: metadata.clone(),
2112 },
2113 HydroNode::ExternalInput {
2114 from_external_key,
2115 from_port_id,
2116 from_many,
2117 codec_type,
2118 port_hint,
2119 instantiate_fn,
2120 deserialize_fn,
2121 metadata,
2122 } => HydroNode::ExternalInput {
2123 from_external_key: *from_external_key,
2124 from_port_id: *from_port_id,
2125 from_many: *from_many,
2126 codec_type: codec_type.clone(),
2127 port_hint: *port_hint,
2128 instantiate_fn: instantiate_fn.clone(),
2129 deserialize_fn: deserialize_fn.clone(),
2130 metadata: metadata.clone(),
2131 },
2132 HydroNode::Counter {
2133 tag,
2134 duration,
2135 prefix,
2136 input,
2137 metadata,
2138 } => HydroNode::Counter {
2139 tag: tag.clone(),
2140 duration: duration.clone(),
2141 prefix: prefix.clone(),
2142 input: Box::new(input.deep_clone(seen_tees)),
2143 metadata: metadata.clone(),
2144 },
2145 HydroNode::EmbeddedInput { ident, metadata } => HydroNode::EmbeddedInput {
2146 ident: ident.clone(),
2147 metadata: metadata.clone(),
2148 },
2149 }
2150 }
2151
2152 #[cfg(feature = "build")]
2153 pub fn emit_core<'a, D: Deploy<'a>>(
2154 &mut self,
2155 builders_or_callback: &mut BuildersOrCallback<
2156 impl FnMut(&mut HydroRoot, &mut usize),
2157 impl FnMut(&mut HydroNode, &mut usize),
2158 >,
2159 seen_tees: &mut SeenTees,
2160 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
2161 next_stmt_id: &mut usize,
2162 ) -> syn::Ident {
2163 let mut ident_stack: Vec<syn::Ident> = Vec::new();
2164
2165 self.transform_bottom_up(
2166 &mut |node: &mut HydroNode| {
2167 let out_location = node.metadata().location_id.clone();
2168 match node {
2169 HydroNode::Placeholder => {
2170 panic!()
2171 }
2172
2173 HydroNode::Cast { .. } => {
2174 match builders_or_callback {
2177 BuildersOrCallback::Builders(_) => {}
2178 BuildersOrCallback::Callback(_, node_callback) => {
2179 node_callback(node, next_stmt_id);
2180 }
2181 }
2182
2183 *next_stmt_id += 1;
2184 }
2186
2187 HydroNode::ObserveNonDet {
2188 inner,
2189 trusted,
2190 metadata,
2191 ..
2192 } => {
2193 let inner_ident = ident_stack.pop().unwrap();
2194
2195 let observe_ident =
2196 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2197
2198 match builders_or_callback {
2199 BuildersOrCallback::Builders(graph_builders) => {
2200 graph_builders.observe_nondet(
2201 *trusted,
2202 &inner.metadata().location_id,
2203 inner_ident,
2204 &inner.metadata().collection_kind,
2205 &observe_ident,
2206 &metadata.collection_kind,
2207 &metadata.op,
2208 );
2209 }
2210 BuildersOrCallback::Callback(_, node_callback) => {
2211 node_callback(node, next_stmt_id);
2212 }
2213 }
2214
2215 *next_stmt_id += 1;
2216
2217 ident_stack.push(observe_ident);
2218 }
2219
2220 HydroNode::Batch {
2221 inner, metadata, ..
2222 } => {
2223 let inner_ident = ident_stack.pop().unwrap();
2224
2225 let batch_ident =
2226 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2227
2228 match builders_or_callback {
2229 BuildersOrCallback::Builders(graph_builders) => {
2230 graph_builders.batch(
2231 inner_ident,
2232 &inner.metadata().location_id,
2233 &inner.metadata().collection_kind,
2234 &batch_ident,
2235 &out_location,
2236 &metadata.op,
2237 );
2238 }
2239 BuildersOrCallback::Callback(_, node_callback) => {
2240 node_callback(node, next_stmt_id);
2241 }
2242 }
2243
2244 *next_stmt_id += 1;
2245
2246 ident_stack.push(batch_ident);
2247 }
2248
2249 HydroNode::YieldConcat { inner, .. } => {
2250 let inner_ident = ident_stack.pop().unwrap();
2251
2252 let yield_ident =
2253 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2254
2255 match builders_or_callback {
2256 BuildersOrCallback::Builders(graph_builders) => {
2257 graph_builders.yield_from_tick(
2258 inner_ident,
2259 &inner.metadata().location_id,
2260 &inner.metadata().collection_kind,
2261 &yield_ident,
2262 &out_location,
2263 );
2264 }
2265 BuildersOrCallback::Callback(_, node_callback) => {
2266 node_callback(node, next_stmt_id);
2267 }
2268 }
2269
2270 *next_stmt_id += 1;
2271
2272 ident_stack.push(yield_ident);
2273 }
2274
2275 HydroNode::BeginAtomic { inner, metadata } => {
2276 let inner_ident = ident_stack.pop().unwrap();
2277
2278 let begin_ident =
2279 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2280
2281 match builders_or_callback {
2282 BuildersOrCallback::Builders(graph_builders) => {
2283 graph_builders.begin_atomic(
2284 inner_ident,
2285 &inner.metadata().location_id,
2286 &inner.metadata().collection_kind,
2287 &begin_ident,
2288 &out_location,
2289 &metadata.op,
2290 );
2291 }
2292 BuildersOrCallback::Callback(_, node_callback) => {
2293 node_callback(node, next_stmt_id);
2294 }
2295 }
2296
2297 *next_stmt_id += 1;
2298
2299 ident_stack.push(begin_ident);
2300 }
2301
2302 HydroNode::EndAtomic { inner, .. } => {
2303 let inner_ident = ident_stack.pop().unwrap();
2304
2305 let end_ident =
2306 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2307
2308 match builders_or_callback {
2309 BuildersOrCallback::Builders(graph_builders) => {
2310 graph_builders.end_atomic(
2311 inner_ident,
2312 &inner.metadata().location_id,
2313 &inner.metadata().collection_kind,
2314 &end_ident,
2315 );
2316 }
2317 BuildersOrCallback::Callback(_, node_callback) => {
2318 node_callback(node, next_stmt_id);
2319 }
2320 }
2321
2322 *next_stmt_id += 1;
2323
2324 ident_stack.push(end_ident);
2325 }
2326
2327 HydroNode::Source {
2328 source, metadata, ..
2329 } => {
2330 if let HydroSource::ExternalNetwork() = source {
2331 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2332 } else {
2333 let source_ident =
2334 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2335
2336 let source_stmt = match source {
2337 HydroSource::Stream(expr) => {
2338 debug_assert!(metadata.location_id.is_top_level());
2339 parse_quote! {
2340 #source_ident = source_stream(#expr);
2341 }
2342 }
2343
2344 HydroSource::ExternalNetwork() => {
2345 unreachable!()
2346 }
2347
2348 HydroSource::Iter(expr) => {
2349 if metadata.location_id.is_top_level() {
2350 parse_quote! {
2351 #source_ident = source_iter(#expr);
2352 }
2353 } else {
2354 parse_quote! {
2356 #source_ident = source_iter(#expr) -> persist::<'static>();
2357 }
2358 }
2359 }
2360
2361 HydroSource::Spin() => {
2362 debug_assert!(metadata.location_id.is_top_level());
2363 parse_quote! {
2364 #source_ident = spin();
2365 }
2366 }
2367
2368 HydroSource::ClusterMembers(location_id) => {
2369 debug_assert!(metadata.location_id.is_top_level());
2370
2371 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
2372 D::cluster_membership_stream(location_id),
2373 &(),
2374 );
2375
2376 parse_quote! {
2377 #source_ident = source_stream(#expr);
2378 }
2379 }
2380 };
2381
2382 match builders_or_callback {
2383 BuildersOrCallback::Builders(graph_builders) => {
2384 let builder = graph_builders.get_dfir_mut(&out_location);
2385 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2386 }
2387 BuildersOrCallback::Callback(_, node_callback) => {
2388 node_callback(node, next_stmt_id);
2389 }
2390 }
2391
2392 *next_stmt_id += 1;
2393
2394 ident_stack.push(source_ident);
2395 }
2396 }
2397
2398 HydroNode::SingletonSource { value, metadata } => {
2399 let source_ident =
2400 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2401
2402 match builders_or_callback {
2403 BuildersOrCallback::Builders(graph_builders) => {
2404 let builder = graph_builders.get_dfir_mut(&out_location);
2405
2406 if metadata.location_id.is_top_level()
2407 && metadata.collection_kind.is_bounded()
2408 {
2409 builder.add_dfir(
2410 parse_quote! {
2411 #source_ident = source_iter([#value]);
2412 },
2413 None,
2414 Some(&next_stmt_id.to_string()),
2415 );
2416 } else {
2417 builder.add_dfir(
2418 parse_quote! {
2419 #source_ident = source_iter([#value]) -> persist::<'static>();
2420 },
2421 None,
2422 Some(&next_stmt_id.to_string()),
2423 );
2424 }
2425 }
2426 BuildersOrCallback::Callback(_, node_callback) => {
2427 node_callback(node, next_stmt_id);
2428 }
2429 }
2430
2431 *next_stmt_id += 1;
2432
2433 ident_stack.push(source_ident);
2434 }
2435
2436 HydroNode::CycleSource { ident, .. } => {
2437 let ident = ident.clone();
2438
2439 match builders_or_callback {
2440 BuildersOrCallback::Builders(_) => {}
2441 BuildersOrCallback::Callback(_, node_callback) => {
2442 node_callback(node, next_stmt_id);
2443 }
2444 }
2445
2446 *next_stmt_id += 1;
2448
2449 ident_stack.push(ident);
2450 }
2451
2452 HydroNode::Tee { inner, .. } => {
2453 let ret_ident = if let Some(teed_from) =
2454 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2455 {
2456 match builders_or_callback {
2457 BuildersOrCallback::Builders(_) => {}
2458 BuildersOrCallback::Callback(_, node_callback) => {
2459 node_callback(node, next_stmt_id);
2460 }
2461 }
2462
2463 teed_from.clone()
2464 } else {
2465 let inner_ident = ident_stack.pop().unwrap();
2468
2469 let tee_ident =
2470 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2471
2472 built_tees.insert(
2473 inner.0.as_ref() as *const RefCell<HydroNode>,
2474 tee_ident.clone(),
2475 );
2476
2477 match builders_or_callback {
2478 BuildersOrCallback::Builders(graph_builders) => {
2479 let builder = graph_builders.get_dfir_mut(&out_location);
2480 builder.add_dfir(
2481 parse_quote! {
2482 #tee_ident = #inner_ident -> tee();
2483 },
2484 None,
2485 Some(&next_stmt_id.to_string()),
2486 );
2487 }
2488 BuildersOrCallback::Callback(_, node_callback) => {
2489 node_callback(node, next_stmt_id);
2490 }
2491 }
2492
2493 tee_ident
2494 };
2495
2496 *next_stmt_id += 1;
2500 ident_stack.push(ret_ident);
2501 }
2502
2503 HydroNode::Chain { .. } => {
2504 let second_ident = ident_stack.pop().unwrap();
2506 let first_ident = ident_stack.pop().unwrap();
2507
2508 let chain_ident =
2509 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2510
2511 match builders_or_callback {
2512 BuildersOrCallback::Builders(graph_builders) => {
2513 let builder = graph_builders.get_dfir_mut(&out_location);
2514 builder.add_dfir(
2515 parse_quote! {
2516 #chain_ident = chain();
2517 #first_ident -> [0]#chain_ident;
2518 #second_ident -> [1]#chain_ident;
2519 },
2520 None,
2521 Some(&next_stmt_id.to_string()),
2522 );
2523 }
2524 BuildersOrCallback::Callback(_, node_callback) => {
2525 node_callback(node, next_stmt_id);
2526 }
2527 }
2528
2529 *next_stmt_id += 1;
2530
2531 ident_stack.push(chain_ident);
2532 }
2533
2534 HydroNode::ChainFirst { .. } => {
2535 let second_ident = ident_stack.pop().unwrap();
2536 let first_ident = ident_stack.pop().unwrap();
2537
2538 let chain_ident =
2539 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2540
2541 match builders_or_callback {
2542 BuildersOrCallback::Builders(graph_builders) => {
2543 let builder = graph_builders.get_dfir_mut(&out_location);
2544 builder.add_dfir(
2545 parse_quote! {
2546 #chain_ident = chain_first_n(1);
2547 #first_ident -> [0]#chain_ident;
2548 #second_ident -> [1]#chain_ident;
2549 },
2550 None,
2551 Some(&next_stmt_id.to_string()),
2552 );
2553 }
2554 BuildersOrCallback::Callback(_, node_callback) => {
2555 node_callback(node, next_stmt_id);
2556 }
2557 }
2558
2559 *next_stmt_id += 1;
2560
2561 ident_stack.push(chain_ident);
2562 }
2563
2564 HydroNode::CrossSingleton { right, .. } => {
2565 let right_ident = ident_stack.pop().unwrap();
2566 let left_ident = ident_stack.pop().unwrap();
2567
2568 let cross_ident =
2569 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2570
2571 match builders_or_callback {
2572 BuildersOrCallback::Builders(graph_builders) => {
2573 let builder = graph_builders.get_dfir_mut(&out_location);
2574
2575 if right.metadata().location_id.is_top_level()
2576 && right.metadata().collection_kind.is_bounded()
2577 {
2578 builder.add_dfir(
2579 parse_quote! {
2580 #cross_ident = cross_singleton();
2581 #left_ident -> [input]#cross_ident;
2582 #right_ident -> persist::<'static>() -> [single]#cross_ident;
2583 },
2584 None,
2585 Some(&next_stmt_id.to_string()),
2586 );
2587 } else {
2588 builder.add_dfir(
2589 parse_quote! {
2590 #cross_ident = cross_singleton();
2591 #left_ident -> [input]#cross_ident;
2592 #right_ident -> [single]#cross_ident;
2593 },
2594 None,
2595 Some(&next_stmt_id.to_string()),
2596 );
2597 }
2598 }
2599 BuildersOrCallback::Callback(_, node_callback) => {
2600 node_callback(node, next_stmt_id);
2601 }
2602 }
2603
2604 *next_stmt_id += 1;
2605
2606 ident_stack.push(cross_ident);
2607 }
2608
2609 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2610 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
2611 parse_quote!(cross_join_multiset)
2612 } else {
2613 parse_quote!(join_multiset)
2614 };
2615
2616 let (HydroNode::CrossProduct { left, right, .. }
2617 | HydroNode::Join { left, right, .. }) = node
2618 else {
2619 unreachable!()
2620 };
2621
2622 let is_top_level = left.metadata().location_id.is_top_level()
2623 && right.metadata().location_id.is_top_level();
2624 let left_lifetime = if left.metadata().location_id.is_top_level() {
2625 quote!('static)
2626 } else {
2627 quote!('tick)
2628 };
2629
2630 let right_lifetime = if right.metadata().location_id.is_top_level() {
2631 quote!('static)
2632 } else {
2633 quote!('tick)
2634 };
2635
2636 let right_ident = ident_stack.pop().unwrap();
2637 let left_ident = ident_stack.pop().unwrap();
2638
2639 let stream_ident =
2640 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2641
2642 match builders_or_callback {
2643 BuildersOrCallback::Builders(graph_builders) => {
2644 let builder = graph_builders.get_dfir_mut(&out_location);
2645 builder.add_dfir(
2646 if is_top_level {
2647 parse_quote! {
2650 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2651 #left_ident -> [0]#stream_ident;
2652 #right_ident -> [1]#stream_ident;
2653 }
2654 } else {
2655 parse_quote! {
2656 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2657 #left_ident -> [0]#stream_ident;
2658 #right_ident -> [1]#stream_ident;
2659 }
2660 }
2661 ,
2662 None,
2663 Some(&next_stmt_id.to_string()),
2664 );
2665 }
2666 BuildersOrCallback::Callback(_, node_callback) => {
2667 node_callback(node, next_stmt_id);
2668 }
2669 }
2670
2671 *next_stmt_id += 1;
2672
2673 ident_stack.push(stream_ident);
2674 }
2675
2676 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2677 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
2678 parse_quote!(difference)
2679 } else {
2680 parse_quote!(anti_join)
2681 };
2682
2683 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
2684 node
2685 else {
2686 unreachable!()
2687 };
2688
2689 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
2690 quote!('static)
2691 } else {
2692 quote!('tick)
2693 };
2694
2695 let neg_ident = ident_stack.pop().unwrap();
2696 let pos_ident = ident_stack.pop().unwrap();
2697
2698 let stream_ident =
2699 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2700
2701 match builders_or_callback {
2702 BuildersOrCallback::Builders(graph_builders) => {
2703 let builder = graph_builders.get_dfir_mut(&out_location);
2704 builder.add_dfir(
2705 parse_quote! {
2706 #stream_ident = #operator::<'tick, #neg_lifetime>();
2707 #pos_ident -> [pos]#stream_ident;
2708 #neg_ident -> [neg]#stream_ident;
2709 },
2710 None,
2711 Some(&next_stmt_id.to_string()),
2712 );
2713 }
2714 BuildersOrCallback::Callback(_, node_callback) => {
2715 node_callback(node, next_stmt_id);
2716 }
2717 }
2718
2719 *next_stmt_id += 1;
2720
2721 ident_stack.push(stream_ident);
2722 }
2723
2724 HydroNode::ResolveFutures { .. } => {
2725 let input_ident = ident_stack.pop().unwrap();
2726
2727 let futures_ident =
2728 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2729
2730 match builders_or_callback {
2731 BuildersOrCallback::Builders(graph_builders) => {
2732 let builder = graph_builders.get_dfir_mut(&out_location);
2733 builder.add_dfir(
2734 parse_quote! {
2735 #futures_ident = #input_ident -> resolve_futures();
2736 },
2737 None,
2738 Some(&next_stmt_id.to_string()),
2739 );
2740 }
2741 BuildersOrCallback::Callback(_, node_callback) => {
2742 node_callback(node, next_stmt_id);
2743 }
2744 }
2745
2746 *next_stmt_id += 1;
2747
2748 ident_stack.push(futures_ident);
2749 }
2750
2751 HydroNode::ResolveFuturesOrdered { .. } => {
2752 let input_ident = ident_stack.pop().unwrap();
2753
2754 let futures_ident =
2755 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2756
2757 match builders_or_callback {
2758 BuildersOrCallback::Builders(graph_builders) => {
2759 let builder = graph_builders.get_dfir_mut(&out_location);
2760 builder.add_dfir(
2761 parse_quote! {
2762 #futures_ident = #input_ident -> resolve_futures_ordered();
2763 },
2764 None,
2765 Some(&next_stmt_id.to_string()),
2766 );
2767 }
2768 BuildersOrCallback::Callback(_, node_callback) => {
2769 node_callback(node, next_stmt_id);
2770 }
2771 }
2772
2773 *next_stmt_id += 1;
2774
2775 ident_stack.push(futures_ident);
2776 }
2777
2778 HydroNode::Map { f, .. } => {
2779 let input_ident = ident_stack.pop().unwrap();
2780
2781 let map_ident =
2782 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2783
2784 match builders_or_callback {
2785 BuildersOrCallback::Builders(graph_builders) => {
2786 let builder = graph_builders.get_dfir_mut(&out_location);
2787 builder.add_dfir(
2788 parse_quote! {
2789 #map_ident = #input_ident -> map(#f);
2790 },
2791 None,
2792 Some(&next_stmt_id.to_string()),
2793 );
2794 }
2795 BuildersOrCallback::Callback(_, node_callback) => {
2796 node_callback(node, next_stmt_id);
2797 }
2798 }
2799
2800 *next_stmt_id += 1;
2801
2802 ident_stack.push(map_ident);
2803 }
2804
2805 HydroNode::FlatMap { f, .. } => {
2806 let input_ident = ident_stack.pop().unwrap();
2807
2808 let flat_map_ident =
2809 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2810
2811 match builders_or_callback {
2812 BuildersOrCallback::Builders(graph_builders) => {
2813 let builder = graph_builders.get_dfir_mut(&out_location);
2814 builder.add_dfir(
2815 parse_quote! {
2816 #flat_map_ident = #input_ident -> flat_map(#f);
2817 },
2818 None,
2819 Some(&next_stmt_id.to_string()),
2820 );
2821 }
2822 BuildersOrCallback::Callback(_, node_callback) => {
2823 node_callback(node, next_stmt_id);
2824 }
2825 }
2826
2827 *next_stmt_id += 1;
2828
2829 ident_stack.push(flat_map_ident);
2830 }
2831
2832 HydroNode::Filter { f, .. } => {
2833 let input_ident = ident_stack.pop().unwrap();
2834
2835 let filter_ident =
2836 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2837
2838 match builders_or_callback {
2839 BuildersOrCallback::Builders(graph_builders) => {
2840 let builder = graph_builders.get_dfir_mut(&out_location);
2841 builder.add_dfir(
2842 parse_quote! {
2843 #filter_ident = #input_ident -> filter(#f);
2844 },
2845 None,
2846 Some(&next_stmt_id.to_string()),
2847 );
2848 }
2849 BuildersOrCallback::Callback(_, node_callback) => {
2850 node_callback(node, next_stmt_id);
2851 }
2852 }
2853
2854 *next_stmt_id += 1;
2855
2856 ident_stack.push(filter_ident);
2857 }
2858
2859 HydroNode::FilterMap { f, .. } => {
2860 let input_ident = ident_stack.pop().unwrap();
2861
2862 let filter_map_ident =
2863 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2864
2865 match builders_or_callback {
2866 BuildersOrCallback::Builders(graph_builders) => {
2867 let builder = graph_builders.get_dfir_mut(&out_location);
2868 builder.add_dfir(
2869 parse_quote! {
2870 #filter_map_ident = #input_ident -> filter_map(#f);
2871 },
2872 None,
2873 Some(&next_stmt_id.to_string()),
2874 );
2875 }
2876 BuildersOrCallback::Callback(_, node_callback) => {
2877 node_callback(node, next_stmt_id);
2878 }
2879 }
2880
2881 *next_stmt_id += 1;
2882
2883 ident_stack.push(filter_map_ident);
2884 }
2885
2886 HydroNode::Sort { .. } => {
2887 let input_ident = ident_stack.pop().unwrap();
2888
2889 let sort_ident =
2890 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2891
2892 match builders_or_callback {
2893 BuildersOrCallback::Builders(graph_builders) => {
2894 let builder = graph_builders.get_dfir_mut(&out_location);
2895 builder.add_dfir(
2896 parse_quote! {
2897 #sort_ident = #input_ident -> sort();
2898 },
2899 None,
2900 Some(&next_stmt_id.to_string()),
2901 );
2902 }
2903 BuildersOrCallback::Callback(_, node_callback) => {
2904 node_callback(node, next_stmt_id);
2905 }
2906 }
2907
2908 *next_stmt_id += 1;
2909
2910 ident_stack.push(sort_ident);
2911 }
2912
2913 HydroNode::DeferTick { .. } => {
2914 let input_ident = ident_stack.pop().unwrap();
2915
2916 let defer_tick_ident =
2917 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2918
2919 match builders_or_callback {
2920 BuildersOrCallback::Builders(graph_builders) => {
2921 let builder = graph_builders.get_dfir_mut(&out_location);
2922 builder.add_dfir(
2923 parse_quote! {
2924 #defer_tick_ident = #input_ident -> defer_tick_lazy();
2925 },
2926 None,
2927 Some(&next_stmt_id.to_string()),
2928 );
2929 }
2930 BuildersOrCallback::Callback(_, node_callback) => {
2931 node_callback(node, next_stmt_id);
2932 }
2933 }
2934
2935 *next_stmt_id += 1;
2936
2937 ident_stack.push(defer_tick_ident);
2938 }
2939
2940 HydroNode::Enumerate { input, .. } => {
2941 let input_ident = ident_stack.pop().unwrap();
2942
2943 let enumerate_ident =
2944 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2945
2946 match builders_or_callback {
2947 BuildersOrCallback::Builders(graph_builders) => {
2948 let builder = graph_builders.get_dfir_mut(&out_location);
2949 let lifetime = if input.metadata().location_id.is_top_level() {
2950 quote!('static)
2951 } else {
2952 quote!('tick)
2953 };
2954 builder.add_dfir(
2955 parse_quote! {
2956 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
2957 },
2958 None,
2959 Some(&next_stmt_id.to_string()),
2960 );
2961 }
2962 BuildersOrCallback::Callback(_, node_callback) => {
2963 node_callback(node, next_stmt_id);
2964 }
2965 }
2966
2967 *next_stmt_id += 1;
2968
2969 ident_stack.push(enumerate_ident);
2970 }
2971
2972 HydroNode::Inspect { f, .. } => {
2973 let input_ident = ident_stack.pop().unwrap();
2974
2975 let inspect_ident =
2976 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2977
2978 match builders_or_callback {
2979 BuildersOrCallback::Builders(graph_builders) => {
2980 let builder = graph_builders.get_dfir_mut(&out_location);
2981 builder.add_dfir(
2982 parse_quote! {
2983 #inspect_ident = #input_ident -> inspect(#f);
2984 },
2985 None,
2986 Some(&next_stmt_id.to_string()),
2987 );
2988 }
2989 BuildersOrCallback::Callback(_, node_callback) => {
2990 node_callback(node, next_stmt_id);
2991 }
2992 }
2993
2994 *next_stmt_id += 1;
2995
2996 ident_stack.push(inspect_ident);
2997 }
2998
2999 HydroNode::Unique { input, .. } => {
3000 let input_ident = ident_stack.pop().unwrap();
3001
3002 let unique_ident =
3003 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3004
3005 match builders_or_callback {
3006 BuildersOrCallback::Builders(graph_builders) => {
3007 let builder = graph_builders.get_dfir_mut(&out_location);
3008 let lifetime = if input.metadata().location_id.is_top_level() {
3009 quote!('static)
3010 } else {
3011 quote!('tick)
3012 };
3013
3014 builder.add_dfir(
3015 parse_quote! {
3016 #unique_ident = #input_ident -> unique::<#lifetime>();
3017 },
3018 None,
3019 Some(&next_stmt_id.to_string()),
3020 );
3021 }
3022 BuildersOrCallback::Callback(_, node_callback) => {
3023 node_callback(node, next_stmt_id);
3024 }
3025 }
3026
3027 *next_stmt_id += 1;
3028
3029 ident_stack.push(unique_ident);
3030 }
3031
3032 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3033 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3034 if input.metadata().location_id.is_top_level()
3035 && input.metadata().collection_kind.is_bounded()
3036 {
3037 parse_quote!(fold_no_replay)
3038 } else {
3039 parse_quote!(fold)
3040 }
3041 } else if matches!(node, HydroNode::Scan { .. }) {
3042 parse_quote!(scan)
3043 } else if let HydroNode::FoldKeyed { input, .. } = node {
3044 if input.metadata().location_id.is_top_level()
3045 && input.metadata().collection_kind.is_bounded()
3046 {
3047 todo!("Fold keyed on a top-level bounded collection is not yet supported")
3048 } else {
3049 parse_quote!(fold_keyed)
3050 }
3051 } else {
3052 unreachable!()
3053 };
3054
3055 let (HydroNode::Fold { input, .. }
3056 | HydroNode::FoldKeyed { input, .. }
3057 | HydroNode::Scan { input, .. }) = node
3058 else {
3059 unreachable!()
3060 };
3061
3062 let lifetime = if input.metadata().location_id.is_top_level() {
3063 quote!('static)
3064 } else {
3065 quote!('tick)
3066 };
3067
3068 let input_ident = ident_stack.pop().unwrap();
3069
3070 let (HydroNode::Fold { init, acc, .. }
3071 | HydroNode::FoldKeyed { init, acc, .. }
3072 | HydroNode::Scan { init, acc, .. }) = &*node
3073 else {
3074 unreachable!()
3075 };
3076
3077 let fold_ident =
3078 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3079
3080 match builders_or_callback {
3081 BuildersOrCallback::Builders(graph_builders) => {
3082 if matches!(node, HydroNode::Fold { .. })
3083 && node.metadata().location_id.is_top_level()
3084 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3085 && graph_builders.singleton_intermediates()
3086 && !node.metadata().collection_kind.is_bounded()
3087 {
3088 let builder = graph_builders.get_dfir_mut(&out_location);
3089
3090 let acc: syn::Expr = parse_quote!({
3091 let mut __inner = #acc;
3092 move |__state, __value| {
3093 __inner(__state, __value);
3094 Some(__state.clone())
3095 }
3096 });
3097
3098 builder.add_dfir(
3099 parse_quote! {
3100 source_iter([(#init)()]) -> [0]#fold_ident;
3101 #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3102 #fold_ident = chain();
3103 },
3104 None,
3105 Some(&next_stmt_id.to_string()),
3106 );
3107 } else if matches!(node, HydroNode::FoldKeyed { .. })
3108 && node.metadata().location_id.is_top_level()
3109 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3110 && graph_builders.singleton_intermediates()
3111 && !node.metadata().collection_kind.is_bounded()
3112 {
3113 let builder = graph_builders.get_dfir_mut(&out_location);
3114
3115 let acc: syn::Expr = parse_quote!({
3116 let mut __init = #init;
3117 let mut __inner = #acc;
3118 move |__state, __kv: (_, _)| {
3119 let __state = __state
3121 .entry(::std::clone::Clone::clone(&__kv.0))
3122 .or_insert_with(|| (__init)());
3123 __inner(__state, __kv.1);
3124 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3125 }
3126 });
3127
3128 builder.add_dfir(
3129 parse_quote! {
3130 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3131 },
3132 None,
3133 Some(&next_stmt_id.to_string()),
3134 );
3135 } else {
3136 let builder = graph_builders.get_dfir_mut(&out_location);
3137 builder.add_dfir(
3138 parse_quote! {
3139 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3140 },
3141 None,
3142 Some(&next_stmt_id.to_string()),
3143 );
3144 }
3145 }
3146 BuildersOrCallback::Callback(_, node_callback) => {
3147 node_callback(node, next_stmt_id);
3148 }
3149 }
3150
3151 *next_stmt_id += 1;
3152
3153 ident_stack.push(fold_ident);
3154 }
3155
3156 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3157 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3158 if input.metadata().location_id.is_top_level()
3159 && input.metadata().collection_kind.is_bounded()
3160 {
3161 parse_quote!(reduce_no_replay)
3162 } else {
3163 parse_quote!(reduce)
3164 }
3165 } else if let HydroNode::ReduceKeyed { input, .. } = node {
3166 if input.metadata().location_id.is_top_level()
3167 && input.metadata().collection_kind.is_bounded()
3168 {
3169 todo!(
3170 "Calling keyed reduce on a top-level bounded collection is not supported"
3171 )
3172 } else {
3173 parse_quote!(reduce_keyed)
3174 }
3175 } else {
3176 unreachable!()
3177 };
3178
3179 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3180 else {
3181 unreachable!()
3182 };
3183
3184 let lifetime = if input.metadata().location_id.is_top_level() {
3185 quote!('static)
3186 } else {
3187 quote!('tick)
3188 };
3189
3190 let input_ident = ident_stack.pop().unwrap();
3191
3192 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3193 else {
3194 unreachable!()
3195 };
3196
3197 let reduce_ident =
3198 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3199
3200 match builders_or_callback {
3201 BuildersOrCallback::Builders(graph_builders) => {
3202 if matches!(node, HydroNode::Reduce { .. })
3203 && node.metadata().location_id.is_top_level()
3204 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3205 && graph_builders.singleton_intermediates()
3206 && !node.metadata().collection_kind.is_bounded()
3207 {
3208 todo!(
3209 "Reduce with optional intermediates is not yet supported in simulator"
3210 );
3211 } else if matches!(node, HydroNode::ReduceKeyed { .. })
3212 && node.metadata().location_id.is_top_level()
3213 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3214 && graph_builders.singleton_intermediates()
3215 && !node.metadata().collection_kind.is_bounded()
3216 {
3217 todo!(
3218 "Reduce keyed with optional intermediates is not yet supported in simulator"
3219 );
3220 } else {
3221 let builder = graph_builders.get_dfir_mut(&out_location);
3222 builder.add_dfir(
3223 parse_quote! {
3224 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3225 },
3226 None,
3227 Some(&next_stmt_id.to_string()),
3228 );
3229 }
3230 }
3231 BuildersOrCallback::Callback(_, node_callback) => {
3232 node_callback(node, next_stmt_id);
3233 }
3234 }
3235
3236 *next_stmt_id += 1;
3237
3238 ident_stack.push(reduce_ident);
3239 }
3240
3241 HydroNode::ReduceKeyedWatermark {
3242 f,
3243 input,
3244 metadata,
3245 ..
3246 } => {
3247 let lifetime = if input.metadata().location_id.is_top_level() {
3248 quote!('static)
3249 } else {
3250 quote!('tick)
3251 };
3252
3253 let watermark_ident = ident_stack.pop().unwrap();
3255 let input_ident = ident_stack.pop().unwrap();
3256
3257 let chain_ident = syn::Ident::new(
3258 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3259 Span::call_site(),
3260 );
3261
3262 let fold_ident =
3263 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3264
3265 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3266 && input.metadata().collection_kind.is_bounded()
3267 {
3268 parse_quote!(fold_no_replay)
3269 } else {
3270 parse_quote!(fold)
3271 };
3272
3273 match builders_or_callback {
3274 BuildersOrCallback::Builders(graph_builders) => {
3275 if metadata.location_id.is_top_level()
3276 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3277 && graph_builders.singleton_intermediates()
3278 && !metadata.collection_kind.is_bounded()
3279 {
3280 todo!(
3281 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3282 )
3283 } else {
3284 let builder = graph_builders.get_dfir_mut(&out_location);
3285 builder.add_dfir(
3286 parse_quote! {
3287 #chain_ident = chain();
3288 #input_ident
3289 -> map(|x| (Some(x), None))
3290 -> [0]#chain_ident;
3291 #watermark_ident
3292 -> map(|watermark| (None, Some(watermark)))
3293 -> [1]#chain_ident;
3294
3295 #fold_ident = #chain_ident
3296 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3297 let __reduce_keyed_fn = #f;
3298 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3299 if let Some((k, v)) = opt_payload {
3300 if let Some(curr_watermark) = *opt_curr_watermark {
3301 if k <= curr_watermark {
3302 return;
3303 }
3304 }
3305 match map.entry(k) {
3306 ::std::collections::hash_map::Entry::Vacant(e) => {
3307 e.insert(v);
3308 }
3309 ::std::collections::hash_map::Entry::Occupied(mut e) => {
3310 __reduce_keyed_fn(e.get_mut(), v);
3311 }
3312 }
3313 } else {
3314 let watermark = opt_watermark.unwrap();
3315 if let Some(curr_watermark) = *opt_curr_watermark {
3316 if watermark <= curr_watermark {
3317 return;
3318 }
3319 }
3320 *opt_curr_watermark = opt_watermark;
3321 map.retain(|k, _| *k > watermark);
3322 }
3323 }
3324 })
3325 -> flat_map(|(map, _curr_watermark)| map);
3326 },
3327 None,
3328 Some(&next_stmt_id.to_string()),
3329 );
3330 }
3331 }
3332 BuildersOrCallback::Callback(_, node_callback) => {
3333 node_callback(node, next_stmt_id);
3334 }
3335 }
3336
3337 *next_stmt_id += 1;
3338
3339 ident_stack.push(fold_ident);
3340 }
3341
3342 HydroNode::Network {
3343 serialize_fn: serialize_pipeline,
3344 instantiate_fn,
3345 deserialize_fn: deserialize_pipeline,
3346 input,
3347 ..
3348 } => {
3349 let input_ident = ident_stack.pop().unwrap();
3350
3351 let receiver_stream_ident =
3352 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3353
3354 match builders_or_callback {
3355 BuildersOrCallback::Builders(graph_builders) => {
3356 let (sink_expr, source_expr) = match instantiate_fn {
3357 DebugInstantiate::Building => (
3358 syn::parse_quote!(DUMMY_SINK),
3359 syn::parse_quote!(DUMMY_SOURCE),
3360 ),
3361
3362 DebugInstantiate::Finalized(finalized) => {
3363 (finalized.sink.clone(), finalized.source.clone())
3364 }
3365 };
3366
3367 graph_builders.create_network(
3368 &input.metadata().location_id,
3369 &out_location,
3370 input_ident,
3371 &receiver_stream_ident,
3372 serialize_pipeline.as_ref(),
3373 sink_expr,
3374 source_expr,
3375 deserialize_pipeline.as_ref(),
3376 *next_stmt_id,
3377 );
3378 }
3379 BuildersOrCallback::Callback(_, node_callback) => {
3380 node_callback(node, next_stmt_id);
3381 }
3382 }
3383
3384 *next_stmt_id += 1;
3385
3386 ident_stack.push(receiver_stream_ident);
3387 }
3388
3389 HydroNode::ExternalInput {
3390 instantiate_fn,
3391 deserialize_fn: deserialize_pipeline,
3392 ..
3393 } => {
3394 let receiver_stream_ident =
3395 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3396
3397 match builders_or_callback {
3398 BuildersOrCallback::Builders(graph_builders) => {
3399 let (_, source_expr) = match instantiate_fn {
3400 DebugInstantiate::Building => (
3401 syn::parse_quote!(DUMMY_SINK),
3402 syn::parse_quote!(DUMMY_SOURCE),
3403 ),
3404
3405 DebugInstantiate::Finalized(finalized) => {
3406 (finalized.sink.clone(), finalized.source.clone())
3407 }
3408 };
3409
3410 graph_builders.create_external_source(
3411 &out_location,
3412 source_expr,
3413 &receiver_stream_ident,
3414 deserialize_pipeline.as_ref(),
3415 *next_stmt_id,
3416 );
3417 }
3418 BuildersOrCallback::Callback(_, node_callback) => {
3419 node_callback(node, next_stmt_id);
3420 }
3421 }
3422
3423 *next_stmt_id += 1;
3424
3425 ident_stack.push(receiver_stream_ident);
3426 }
3427
3428 HydroNode::Counter {
3429 tag,
3430 duration,
3431 prefix,
3432 ..
3433 } => {
3434 let input_ident = ident_stack.pop().unwrap();
3435
3436 let counter_ident =
3437 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3438
3439 match builders_or_callback {
3440 BuildersOrCallback::Builders(graph_builders) => {
3441 let builder = graph_builders.get_dfir_mut(&out_location);
3442 builder.add_dfir(
3443 parse_quote! {
3444 #counter_ident = #input_ident -> _counter(#tag, #duration, #prefix);
3445 },
3446 None,
3447 Some(&next_stmt_id.to_string()),
3448 );
3449 }
3450 BuildersOrCallback::Callback(_, node_callback) => {
3451 node_callback(node, next_stmt_id);
3452 }
3453 }
3454
3455 *next_stmt_id += 1;
3456
3457 ident_stack.push(counter_ident);
3458 }
3459
3460 HydroNode::EmbeddedInput { ident, .. } => {
3461 let source_ident =
3462 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3463
3464 let ident = ident.clone();
3465
3466 match builders_or_callback {
3467 BuildersOrCallback::Builders(graph_builders) => {
3468 let builder = graph_builders.get_dfir_mut(&out_location);
3469 builder.add_dfir(
3470 parse_quote! {
3471 #source_ident = source_stream(#ident);
3472 },
3473 None,
3474 Some(&next_stmt_id.to_string()),
3475 );
3476 }
3477 BuildersOrCallback::Callback(_, node_callback) => {
3478 node_callback(node, next_stmt_id);
3479 }
3480 }
3481
3482 *next_stmt_id += 1;
3483
3484 ident_stack.push(source_ident);
3485 }
3486 }
3487 },
3488 seen_tees,
3489 false,
3490 );
3491
3492 ident_stack
3493 .pop()
3494 .expect("ident_stack should have exactly one element after traversal")
3495 }
3496
3497 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3498 match self {
3499 HydroNode::Placeholder => {
3500 panic!()
3501 }
3502 HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3503 HydroNode::Source { source, .. } => match source {
3504 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3505 HydroSource::ExternalNetwork()
3506 | HydroSource::Spin()
3507 | HydroSource::ClusterMembers(_) => {} },
3509 HydroNode::SingletonSource { value, .. } => {
3510 transform(value);
3511 }
3512 HydroNode::CycleSource { .. }
3513 | HydroNode::Tee { .. }
3514 | HydroNode::YieldConcat { .. }
3515 | HydroNode::BeginAtomic { .. }
3516 | HydroNode::EndAtomic { .. }
3517 | HydroNode::Batch { .. }
3518 | HydroNode::Chain { .. }
3519 | HydroNode::ChainFirst { .. }
3520 | HydroNode::CrossProduct { .. }
3521 | HydroNode::CrossSingleton { .. }
3522 | HydroNode::ResolveFutures { .. }
3523 | HydroNode::ResolveFuturesOrdered { .. }
3524 | HydroNode::Join { .. }
3525 | HydroNode::Difference { .. }
3526 | HydroNode::AntiJoin { .. }
3527 | HydroNode::DeferTick { .. }
3528 | HydroNode::Enumerate { .. }
3529 | HydroNode::Unique { .. }
3530 | HydroNode::Sort { .. } => {}
3531 HydroNode::Map { f, .. }
3532 | HydroNode::FlatMap { f, .. }
3533 | HydroNode::Filter { f, .. }
3534 | HydroNode::FilterMap { f, .. }
3535 | HydroNode::Inspect { f, .. }
3536 | HydroNode::Reduce { f, .. }
3537 | HydroNode::ReduceKeyed { f, .. }
3538 | HydroNode::ReduceKeyedWatermark { f, .. } => {
3539 transform(f);
3540 }
3541 HydroNode::Fold { init, acc, .. }
3542 | HydroNode::Scan { init, acc, .. }
3543 | HydroNode::FoldKeyed { init, acc, .. } => {
3544 transform(init);
3545 transform(acc);
3546 }
3547 HydroNode::Network {
3548 serialize_fn,
3549 deserialize_fn,
3550 ..
3551 } => {
3552 if let Some(serialize_fn) = serialize_fn {
3553 transform(serialize_fn);
3554 }
3555 if let Some(deserialize_fn) = deserialize_fn {
3556 transform(deserialize_fn);
3557 }
3558 }
3559 HydroNode::ExternalInput { deserialize_fn, .. } => {
3560 if let Some(deserialize_fn) = deserialize_fn {
3561 transform(deserialize_fn);
3562 }
3563 }
3564 HydroNode::EmbeddedInput { .. } => {}
3565 HydroNode::Counter { duration, .. } => {
3566 transform(duration);
3567 }
3568 }
3569 }
3570
3571 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3572 &self.metadata().op
3573 }
3574
3575 pub fn metadata(&self) -> &HydroIrMetadata {
3576 match self {
3577 HydroNode::Placeholder => {
3578 panic!()
3579 }
3580 HydroNode::Cast { metadata, .. } => metadata,
3581 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3582 HydroNode::Source { metadata, .. } => metadata,
3583 HydroNode::SingletonSource { metadata, .. } => metadata,
3584 HydroNode::CycleSource { metadata, .. } => metadata,
3585 HydroNode::Tee { metadata, .. } => metadata,
3586 HydroNode::YieldConcat { metadata, .. } => metadata,
3587 HydroNode::BeginAtomic { metadata, .. } => metadata,
3588 HydroNode::EndAtomic { metadata, .. } => metadata,
3589 HydroNode::Batch { metadata, .. } => metadata,
3590 HydroNode::Chain { metadata, .. } => metadata,
3591 HydroNode::ChainFirst { metadata, .. } => metadata,
3592 HydroNode::CrossProduct { metadata, .. } => metadata,
3593 HydroNode::CrossSingleton { metadata, .. } => metadata,
3594 HydroNode::Join { metadata, .. } => metadata,
3595 HydroNode::Difference { metadata, .. } => metadata,
3596 HydroNode::AntiJoin { metadata, .. } => metadata,
3597 HydroNode::ResolveFutures { metadata, .. } => metadata,
3598 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3599 HydroNode::Map { metadata, .. } => metadata,
3600 HydroNode::FlatMap { metadata, .. } => metadata,
3601 HydroNode::Filter { metadata, .. } => metadata,
3602 HydroNode::FilterMap { metadata, .. } => metadata,
3603 HydroNode::DeferTick { metadata, .. } => metadata,
3604 HydroNode::Enumerate { metadata, .. } => metadata,
3605 HydroNode::Inspect { metadata, .. } => metadata,
3606 HydroNode::Unique { metadata, .. } => metadata,
3607 HydroNode::Sort { metadata, .. } => metadata,
3608 HydroNode::Scan { metadata, .. } => metadata,
3609 HydroNode::Fold { metadata, .. } => metadata,
3610 HydroNode::FoldKeyed { metadata, .. } => metadata,
3611 HydroNode::Reduce { metadata, .. } => metadata,
3612 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3613 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3614 HydroNode::ExternalInput { metadata, .. } => metadata,
3615 HydroNode::Network { metadata, .. } => metadata,
3616 HydroNode::Counter { metadata, .. } => metadata,
3617 HydroNode::EmbeddedInput { metadata, .. } => metadata,
3618 }
3619 }
3620
3621 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3622 &mut self.metadata_mut().op
3623 }
3624
3625 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3626 match self {
3627 HydroNode::Placeholder => {
3628 panic!()
3629 }
3630 HydroNode::Cast { metadata, .. } => metadata,
3631 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3632 HydroNode::Source { metadata, .. } => metadata,
3633 HydroNode::SingletonSource { metadata, .. } => metadata,
3634 HydroNode::CycleSource { metadata, .. } => metadata,
3635 HydroNode::Tee { metadata, .. } => metadata,
3636 HydroNode::YieldConcat { metadata, .. } => metadata,
3637 HydroNode::BeginAtomic { metadata, .. } => metadata,
3638 HydroNode::EndAtomic { metadata, .. } => metadata,
3639 HydroNode::Batch { metadata, .. } => metadata,
3640 HydroNode::Chain { metadata, .. } => metadata,
3641 HydroNode::ChainFirst { metadata, .. } => metadata,
3642 HydroNode::CrossProduct { metadata, .. } => metadata,
3643 HydroNode::CrossSingleton { metadata, .. } => metadata,
3644 HydroNode::Join { metadata, .. } => metadata,
3645 HydroNode::Difference { metadata, .. } => metadata,
3646 HydroNode::AntiJoin { metadata, .. } => metadata,
3647 HydroNode::ResolveFutures { metadata, .. } => metadata,
3648 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3649 HydroNode::Map { metadata, .. } => metadata,
3650 HydroNode::FlatMap { metadata, .. } => metadata,
3651 HydroNode::Filter { metadata, .. } => metadata,
3652 HydroNode::FilterMap { metadata, .. } => metadata,
3653 HydroNode::DeferTick { metadata, .. } => metadata,
3654 HydroNode::Enumerate { metadata, .. } => metadata,
3655 HydroNode::Inspect { metadata, .. } => metadata,
3656 HydroNode::Unique { metadata, .. } => metadata,
3657 HydroNode::Sort { metadata, .. } => metadata,
3658 HydroNode::Scan { metadata, .. } => metadata,
3659 HydroNode::Fold { metadata, .. } => metadata,
3660 HydroNode::FoldKeyed { metadata, .. } => metadata,
3661 HydroNode::Reduce { metadata, .. } => metadata,
3662 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3663 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3664 HydroNode::ExternalInput { metadata, .. } => metadata,
3665 HydroNode::Network { metadata, .. } => metadata,
3666 HydroNode::Counter { metadata, .. } => metadata,
3667 HydroNode::EmbeddedInput { metadata, .. } => metadata,
3668 }
3669 }
3670
3671 pub fn input(&self) -> Vec<&HydroNode> {
3672 match self {
3673 HydroNode::Placeholder => {
3674 panic!()
3675 }
3676 HydroNode::Source { .. }
3677 | HydroNode::SingletonSource { .. }
3678 | HydroNode::ExternalInput { .. }
3679 | HydroNode::CycleSource { .. }
3680 | HydroNode::EmbeddedInput { .. }
3681 | HydroNode::Tee { .. } => {
3682 vec![]
3684 }
3685 HydroNode::Cast { inner, .. }
3686 | HydroNode::ObserveNonDet { inner, .. }
3687 | HydroNode::YieldConcat { inner, .. }
3688 | HydroNode::BeginAtomic { inner, .. }
3689 | HydroNode::EndAtomic { inner, .. }
3690 | HydroNode::Batch { inner, .. } => {
3691 vec![inner]
3692 }
3693 HydroNode::Chain { first, second, .. } => {
3694 vec![first, second]
3695 }
3696 HydroNode::ChainFirst { first, second, .. } => {
3697 vec![first, second]
3698 }
3699 HydroNode::CrossProduct { left, right, .. }
3700 | HydroNode::CrossSingleton { left, right, .. }
3701 | HydroNode::Join { left, right, .. } => {
3702 vec![left, right]
3703 }
3704 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3705 vec![pos, neg]
3706 }
3707 HydroNode::Map { input, .. }
3708 | HydroNode::FlatMap { input, .. }
3709 | HydroNode::Filter { input, .. }
3710 | HydroNode::FilterMap { input, .. }
3711 | HydroNode::Sort { input, .. }
3712 | HydroNode::DeferTick { input, .. }
3713 | HydroNode::Enumerate { input, .. }
3714 | HydroNode::Inspect { input, .. }
3715 | HydroNode::Unique { input, .. }
3716 | HydroNode::Network { input, .. }
3717 | HydroNode::Counter { input, .. }
3718 | HydroNode::ResolveFutures { input, .. }
3719 | HydroNode::ResolveFuturesOrdered { input, .. }
3720 | HydroNode::Fold { input, .. }
3721 | HydroNode::FoldKeyed { input, .. }
3722 | HydroNode::Reduce { input, .. }
3723 | HydroNode::ReduceKeyed { input, .. }
3724 | HydroNode::Scan { input, .. } => {
3725 vec![input]
3726 }
3727 HydroNode::ReduceKeyedWatermark {
3728 input, watermark, ..
3729 } => {
3730 vec![input, watermark]
3731 }
3732 }
3733 }
3734
3735 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3736 self.input()
3737 .iter()
3738 .map(|input_node| input_node.metadata())
3739 .collect()
3740 }
3741
3742 pub fn print_root(&self) -> String {
3743 match self {
3744 HydroNode::Placeholder => {
3745 panic!()
3746 }
3747 HydroNode::Cast { .. } => "Cast()".to_owned(),
3748 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
3749 HydroNode::Source { source, .. } => format!("Source({:?})", source),
3750 HydroNode::SingletonSource { value, .. } => format!("SingletonSource({:?})", value),
3751 HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
3752 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3753 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
3754 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
3755 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
3756 HydroNode::Batch { .. } => "Batch()".to_owned(),
3757 HydroNode::Chain { first, second, .. } => {
3758 format!("Chain({}, {})", first.print_root(), second.print_root())
3759 }
3760 HydroNode::ChainFirst { first, second, .. } => {
3761 format!(
3762 "ChainFirst({}, {})",
3763 first.print_root(),
3764 second.print_root()
3765 )
3766 }
3767 HydroNode::CrossProduct { left, right, .. } => {
3768 format!(
3769 "CrossProduct({}, {})",
3770 left.print_root(),
3771 right.print_root()
3772 )
3773 }
3774 HydroNode::CrossSingleton { left, right, .. } => {
3775 format!(
3776 "CrossSingleton({}, {})",
3777 left.print_root(),
3778 right.print_root()
3779 )
3780 }
3781 HydroNode::Join { left, right, .. } => {
3782 format!("Join({}, {})", left.print_root(), right.print_root())
3783 }
3784 HydroNode::Difference { pos, neg, .. } => {
3785 format!("Difference({}, {})", pos.print_root(), neg.print_root())
3786 }
3787 HydroNode::AntiJoin { pos, neg, .. } => {
3788 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3789 }
3790 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
3791 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
3792 HydroNode::Map { f, .. } => format!("Map({:?})", f),
3793 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3794 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3795 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3796 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
3797 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
3798 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3799 HydroNode::Unique { .. } => "Unique()".to_owned(),
3800 HydroNode::Sort { .. } => "Sort()".to_owned(),
3801 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3802 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3803 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3804 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3805 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3806 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3807 HydroNode::Network { .. } => "Network()".to_owned(),
3808 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
3809 HydroNode::Counter { tag, duration, .. } => {
3810 format!("Counter({:?}, {:?})", tag, duration)
3811 }
3812 HydroNode::EmbeddedInput { ident, .. } => format!("EmbeddedInput({})", ident),
3813 }
3814 }
3815}
3816
3817#[cfg(feature = "build")]
3818fn instantiate_network<'a, D>(
3819 from_location: &LocationId,
3820 to_location: &LocationId,
3821 processes: &SparseSecondaryMap<LocationKey, D::Process>,
3822 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
3823) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
3824where
3825 D: Deploy<'a>,
3826{
3827 let ((sink, source), connect_fn) = match (from_location, to_location) {
3828 (&LocationId::Process(from), &LocationId::Process(to)) => {
3829 let from_node = processes
3830 .get(from)
3831 .unwrap_or_else(|| {
3832 panic!("A process used in the graph was not instantiated: {}", from)
3833 })
3834 .clone();
3835 let to_node = processes
3836 .get(to)
3837 .unwrap_or_else(|| {
3838 panic!("A process used in the graph was not instantiated: {}", to)
3839 })
3840 .clone();
3841
3842 let sink_port = from_node.next_port();
3843 let source_port = to_node.next_port();
3844
3845 (
3846 D::o2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
3847 D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
3848 )
3849 }
3850 (&LocationId::Process(from), &LocationId::Cluster(to)) => {
3851 let from_node = processes
3852 .get(from)
3853 .unwrap_or_else(|| {
3854 panic!("A process used in the graph was not instantiated: {}", from)
3855 })
3856 .clone();
3857 let to_node = clusters
3858 .get(to)
3859 .unwrap_or_else(|| {
3860 panic!("A cluster used in the graph was not instantiated: {}", to)
3861 })
3862 .clone();
3863
3864 let sink_port = from_node.next_port();
3865 let source_port = to_node.next_port();
3866
3867 (
3868 D::o2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
3869 D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
3870 )
3871 }
3872 (&LocationId::Cluster(from), &LocationId::Process(to)) => {
3873 let from_node = clusters
3874 .get(from)
3875 .unwrap_or_else(|| {
3876 panic!("A cluster used in the graph was not instantiated: {}", from)
3877 })
3878 .clone();
3879 let to_node = processes
3880 .get(to)
3881 .unwrap_or_else(|| {
3882 panic!("A process used in the graph was not instantiated: {}", to)
3883 })
3884 .clone();
3885
3886 let sink_port = from_node.next_port();
3887 let source_port = to_node.next_port();
3888
3889 (
3890 D::m2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
3891 D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
3892 )
3893 }
3894 (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
3895 let from_node = clusters
3896 .get(from)
3897 .unwrap_or_else(|| {
3898 panic!("A cluster used in the graph was not instantiated: {}", from)
3899 })
3900 .clone();
3901 let to_node = clusters
3902 .get(to)
3903 .unwrap_or_else(|| {
3904 panic!("A cluster used in the graph was not instantiated: {}", to)
3905 })
3906 .clone();
3907
3908 let sink_port = from_node.next_port();
3909 let source_port = to_node.next_port();
3910
3911 (
3912 D::m2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
3913 D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
3914 )
3915 }
3916 (LocationId::Tick(_, _), _) => panic!(),
3917 (_, LocationId::Tick(_, _)) => panic!(),
3918 (LocationId::Atomic(_), _) => panic!(),
3919 (_, LocationId::Atomic(_)) => panic!(),
3920 };
3921 (sink, source, connect_fn)
3922}
3923
3924#[cfg(test)]
3925mod test {
3926 use std::mem::size_of;
3927
3928 use stageleft::{QuotedWithContext, q};
3929
3930 use super::*;
3931
3932 #[test]
3933 #[cfg_attr(
3934 not(feature = "build"),
3935 ignore = "expects inclusion of feature-gated fields"
3936 )]
3937 fn hydro_node_size() {
3938 assert_eq!(size_of::<HydroNode>(), 240);
3939 }
3940
3941 #[test]
3942 #[cfg_attr(
3943 not(feature = "build"),
3944 ignore = "expects inclusion of feature-gated fields"
3945 )]
3946 fn hydro_root_size() {
3947 assert_eq!(size_of::<HydroRoot>(), 136);
3948 }
3949
3950 #[test]
3951 fn test_simplify_q_macro_basic() {
3952 let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
3954 let result = simplify_q_macro(simple_expr.clone());
3955 assert_eq!(result, simple_expr);
3956 }
3957
3958 #[test]
3959 fn test_simplify_q_macro_actual_stageleft_call() {
3960 let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
3962 let result = simplify_q_macro(stageleft_call);
3963 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
3966 }
3967
3968 #[test]
3969 fn test_closure_no_pipe_at_start() {
3970 let stageleft_call = q!({
3972 let foo = 123;
3973 move |b: usize| b + foo
3974 })
3975 .splice_fn1_ctx(&());
3976 let result = simplify_q_macro(stageleft_call);
3977 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
3978 }
3979}