Skip to main content

hydro_lang/compile/ir/
mod.rs

1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4use std::fmt::{Debug, Display};
5use std::hash::{Hash, Hasher};
6use std::ops::Deref;
7use std::rc::Rc;
8
9#[cfg(feature = "build")]
10use dfir_lang::graph::FlatGraphBuilder;
11#[cfg(feature = "build")]
12use proc_macro2::Span;
13use proc_macro2::TokenStream;
14use quote::ToTokens;
15#[cfg(feature = "build")]
16use quote::quote;
17#[cfg(feature = "build")]
18use slotmap::{SecondaryMap, SparseSecondaryMap};
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24use crate::compile::builder::ExternalPortId;
25#[cfg(feature = "build")]
26use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
27use crate::location::dynamic::LocationId;
28use crate::location::{LocationKey, NetworkHint};
29
30pub mod backtrace;
31use backtrace::Backtrace;
32
33/// Wrapper around a parsed expression whose `Debug` output is only the expression's tokens.
34///
35/// The inner `syn::Expr` is boxed; the original size note on this type quotes ~240 bytes.
36#[derive(Clone, Hash)]
37pub struct DebugExpr(pub Box<syn::Expr>);
38
39impl From<syn::Expr> for DebugExpr {
40    fn from(expr: syn::Expr) -> Self {
41        Self(Box::new(expr))
42    }
43}
44
45impl Deref for DebugExpr {
46    type Target = syn::Expr;
47
48    fn deref(&self) -> &Self::Target {
49        &self.0
50    }
51}
52
53impl ToTokens for DebugExpr {
54    fn to_tokens(&self, tokens: &mut TokenStream) {
55        self.0.to_tokens(tokens);
56    }
57}
58
59impl Debug for DebugExpr {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        write!(f, "{}", self.0.to_token_stream())
62    }
63}
64
65impl Display for DebugExpr {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        let original = self.0.as_ref().clone();
68        let simplified = simplify_q_macro(original);
69
70        // For now, just use quote formatting without trying to parse as a statement
71        // This avoids the syn::parse_quote! issues entirely
72        write!(f, "q!({})", quote::quote!(#simplified))
73    }
74}
75
76/// Simplify expanded q! macro calls back to q!(...) syntax for better readability
77fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
78    // Try to parse the token string as a syn::Expr
79    // Use a visitor to simplify q! macro expansions
80    let mut simplifier = QMacroSimplifier::new();
81    simplifier.visit_expr_mut(&mut expr);
82
83    // If we found and simplified a q! macro, return the simplified version
84    if let Some(simplified) = simplifier.simplified_result {
85        simplified
86    } else {
87        expr
88    }
89}
90
91/// AST visitor that simplifies q! macro expansions
92#[derive(Default)]
93pub struct QMacroSimplifier {
94    pub simplified_result: Option<syn::Expr>,
95}
96
97impl QMacroSimplifier {
98    pub fn new() -> Self {
99        Self::default()
100    }
101}
102
103impl VisitMut for QMacroSimplifier {
104    fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
105        // Check if we already found a result to avoid further processing
106        if self.simplified_result.is_some() {
107            return;
108        }
109
110        if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
111            // Look for calls to stageleft::runtime_support::fn*
112            && self.is_stageleft_runtime_support_call(&path_expr.path)
113            // Try to extract the closure from the arguments
114            && let Some(closure) = self.extract_closure_from_args(&call.args)
115        {
116            self.simplified_result = Some(closure);
117            return;
118        }
119
120        // Continue visiting child expressions using the default implementation
121        // Use the default visitor to avoid infinite recursion
122        syn::visit_mut::visit_expr_mut(self, expr);
123    }
124}
125
126impl QMacroSimplifier {
127    fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
128        // Check if this is a call to stageleft::runtime_support::fn*
129        if let Some(last_segment) = path.segments.last() {
130            let fn_name = last_segment.ident.to_string();
131            // if fn_name.starts_with("fn") && fn_name.contains("_expr") {
132            fn_name.contains("_type_hint")
133                && path.segments.len() > 2
134                && path.segments[0].ident == "stageleft"
135                && path.segments[1].ident == "runtime_support"
136        } else {
137            false
138        }
139    }
140
141    fn extract_closure_from_args(
142        &self,
143        args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
144    ) -> Option<syn::Expr> {
145        // Look through the arguments for a closure expression
146        for arg in args {
147            if let syn::Expr::Closure(_) = arg {
148                return Some(arg.clone());
149            }
150            // Also check for closures nested in other expressions (like blocks)
151            if let Some(closure_expr) = self.find_closure_in_expr(arg) {
152                return Some(closure_expr);
153            }
154        }
155        None
156    }
157
158    fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
159        let mut visitor = ClosureFinder {
160            found_closure: None,
161            prefer_inner_blocks: true,
162        };
163        visitor.visit_expr(expr);
164        visitor.found_closure
165    }
166}
167
168/// Visitor that finds closures in expressions with special block handling
169struct ClosureFinder {
170    found_closure: Option<syn::Expr>,
171    prefer_inner_blocks: bool,
172}
173
174impl<'ast> Visit<'ast> for ClosureFinder {
175    fn visit_expr(&mut self, expr: &'ast syn::Expr) {
176        // If we already found a closure, don't continue searching
177        if self.found_closure.is_some() {
178            return;
179        }
180
181        match expr {
182            syn::Expr::Closure(_) => {
183                self.found_closure = Some(expr.clone());
184            }
185            syn::Expr::Block(block) if self.prefer_inner_blocks => {
186                // Special handling for blocks - look for inner blocks that contain closures
187                for stmt in &block.block.stmts {
188                    if let syn::Stmt::Expr(stmt_expr, _) = stmt
189                        && let syn::Expr::Block(_) = stmt_expr
190                    {
191                        // Check if this nested block contains a closure
192                        let mut inner_visitor = ClosureFinder {
193                            found_closure: None,
194                            prefer_inner_blocks: false, // Avoid infinite recursion
195                        };
196                        inner_visitor.visit_expr(stmt_expr);
197                        if inner_visitor.found_closure.is_some() {
198                            // Found a closure in an inner block, return that block
199                            self.found_closure = Some(stmt_expr.clone());
200                            return;
201                        }
202                    }
203                }
204
205                // If no inner block with closure found, continue with normal visitation
206                visit::visit_expr(self, expr);
207
208                // If we found a closure, just return the closure itself, not the whole block
209                // unless we're in the special case where we want the containing block
210                if self.found_closure.is_some() {
211                    // The closure was found during visitation, no need to wrap in block
212                }
213            }
214            _ => {
215                // Use default visitor behavior for all other expressions
216                visit::visit_expr(self, expr);
217            }
218        }
219    }
220}
221
222/// Wrapper around a parsed type whose `Debug` output is only the type's tokens.
223///
224/// Boxes `syn::Type` which is ~320 bytes.
225#[derive(Clone, PartialEq, Eq, Hash)]
226pub struct DebugType(pub Box<syn::Type>);
227
228impl From<syn::Type> for DebugType {
229    fn from(t: syn::Type) -> Self {
230        Self(Box::new(t))
231    }
232}
233
234impl Deref for DebugType {
235    type Target = syn::Type;
236
237    fn deref(&self) -> &Self::Target {
238        &self.0
239    }
240}
241
242impl ToTokens for DebugType {
243    fn to_tokens(&self, tokens: &mut TokenStream) {
244        self.0.to_tokens(tokens);
245    }
246}
247
248impl Debug for DebugType {
249    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
250        write!(f, "{}", self.0.to_token_stream())
251    }
252}
253
/// Instantiation state carried by network-related IR entries.
///
/// `HydroRoot::compile_network` replaces `Building` with `Finalized`; it panics with
/// "network already finalized" if the value is already `Finalized`.
254pub enum DebugInstantiate {
    /// Not yet instantiated.
255    Building,
    /// Instantiated; holds the sink/source expressions and the optional connect callback.
256    Finalized(Box<DebugInstantiateFinalized>),
257}
258
259#[cfg_attr(
260    not(feature = "build"),
261    expect(
262        dead_code,
263        reason = "sink, source unused without `feature = \"build\"`."
264    )
265)]
/// Payload of `DebugInstantiate::Finalized`: the expressions and callback produced when a
/// connection is instantiated.
266pub struct DebugInstantiateFinalized {
    /// Expression for the sending side of the connection.
    // NOTE(review): presumably spliced into `dest_sink(...)` during code-gen — confirm.
267    sink: syn::Expr,
    /// Expression for the receiving side of the connection.
    // NOTE(review): presumably spliced into `source_stream(...)` during code-gen — confirm.
268    source: syn::Expr,
    /// One-shot callback associated with the connection, if any.
    // NOTE(review): presumably invoked (and taken) when the network is connected — confirm.
269    connect_fn: Option<Box<dyn FnOnce()>>,
270}
271
272impl From<DebugInstantiateFinalized> for DebugInstantiate {
273    fn from(f: DebugInstantiateFinalized) -> Self {
274        Self::Finalized(Box::new(f))
275    }
276}
277
278impl Debug for DebugInstantiate {
279    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280        write!(f, "<network instantiate>")
281    }
282}
283
284impl Hash for DebugInstantiate {
285    fn hash<H: Hasher>(&self, _state: &mut H) {
286        // Do nothing
287    }
288}
289
290impl Clone for DebugInstantiate {
291    fn clone(&self) -> Self {
292        match self {
293            DebugInstantiate::Building => DebugInstantiate::Building,
294            DebugInstantiate::Finalized(_) => {
295                panic!("DebugInstantiate::Finalized should not be cloned")
296            }
297        }
298    }
299}
300
301/// A source in a Hydro graph, where data enters the graph.
302#[derive(Debug, Hash, Clone)]
303pub enum HydroSource {
    /// Source described by an expression.
    // NOTE(review): presumably an expression evaluating to an async stream — confirm.
304    Stream(DebugExpr),
    /// Source with no payload.
    // NOTE(review): presumably data arriving from an external network port — confirm.
305    ExternalNetwork(),
    /// Source described by an expression.
    // NOTE(review): presumably an expression evaluating to an iterator — confirm.
306    Iter(DebugExpr),
    /// Source with no payload.
    // NOTE(review): semantics not visible in this file section — confirm.
307    Spin(),
    /// Source parameterized by a location.
    // NOTE(review): presumably the membership of the cluster at that location — confirm.
308    ClusterMembers(LocationId),
309}
310
311#[cfg(feature = "build")]
312/// A trait that abstracts over elements of DFIR code-gen that differ between production deployment
313/// and simulations.
314///
315/// In particular, this lets the simulator fuse together all locations into one DFIR graph, spit
316/// out separate graphs for each tick, and emit hooks for controlling non-deterministic operators.
317pub trait DfirBuilder {
318    /// Whether the representation of singletons should include intermediate states.
319    fn singleton_intermediates(&self) -> bool;
320
321    /// Gets the DFIR builder for the given location, creating it if necessary.
322    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
323
    /// Emits the code that connects `in_ident` (at `in_location`, of kind `in_kind`) to
    /// `out_ident` (at `out_location`) for a batch operation.
324    fn batch(
325        &mut self,
326        in_ident: syn::Ident,
327        in_location: &LocationId,
328        in_kind: &CollectionKind,
329        out_ident: &syn::Ident,
330        out_location: &LocationId,
331        op_meta: &HydroIrOpMetadata,
332    );
    /// Emits the code that connects `in_ident` to `out_ident` where a collection is yielded
    /// out of a tick.
333    fn yield_from_tick(
334        &mut self,
335        in_ident: syn::Ident,
336        in_location: &LocationId,
337        in_kind: &CollectionKind,
338        out_ident: &syn::Ident,
339        out_location: &LocationId,
340    );
341
    /// Emits the code that connects `in_ident` to `out_ident` at the start of an atomic
    /// section.
342    fn begin_atomic(
343        &mut self,
344        in_ident: syn::Ident,
345        in_location: &LocationId,
346        in_kind: &CollectionKind,
347        out_ident: &syn::Ident,
348        out_location: &LocationId,
349        op_meta: &HydroIrOpMetadata,
350    );
    /// Emits the code that connects `in_ident` to `out_ident` at the end of an atomic section.
351    fn end_atomic(
352        &mut self,
353        in_ident: syn::Ident,
354        in_location: &LocationId,
355        in_kind: &CollectionKind,
356        out_ident: &syn::Ident,
357    );
358
    /// Emits the code that connects `in_ident` to `out_ident` at a point where
    /// non-determinism is observed.
    // NOTE(review): the meaning of `trusted` is not visible in this file section — confirm.
359    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
360    fn observe_nondet(
361        &mut self,
362        trusted: bool,
363        location: &LocationId,
364        in_ident: syn::Ident,
365        in_kind: &CollectionKind,
366        out_ident: &syn::Ident,
367        out_kind: &CollectionKind,
368        op_meta: &HydroIrOpMetadata,
369    );
370
    /// Emits both halves of a network link: a sender on `from` fed by `input_ident` and a
    /// receiver on `to` bound to `out_ident`.
    ///
    /// `serialize` / `deserialize`, when present, are applied on the sender / receiver side
    /// respectively; `tag_id` distinguishes the emitted send and receive statements.
371    #[expect(clippy::too_many_arguments, reason = "TODO")]
372    fn create_network(
373        &mut self,
374        from: &LocationId,
375        to: &LocationId,
376        input_ident: syn::Ident,
377        out_ident: &syn::Ident,
378        serialize: Option<&DebugExpr>,
379        sink: syn::Expr,
380        source: syn::Expr,
381        deserialize: Option<&DebugExpr>,
382        tag_id: usize,
383    );
384
    /// Emits a receiver at location `on` that reads from `source_expr`, optionally applies
    /// `deserialize`, and binds the result to `out_ident`.
385    fn create_external_source(
386        &mut self,
387        on: &LocationId,
388        source_expr: syn::Expr,
389        out_ident: &syn::Ident,
390        deserialize: Option<&DebugExpr>,
391        tag_id: usize,
392    );
393
    /// Emits a sender at location `on` that takes `input_ident`, optionally applies
    /// `serialize`, and writes to `sink_expr`.
394    fn create_external_output(
395        &mut self,
396        on: &LocationId,
397        sink_expr: syn::Expr,
398        input_ident: &syn::Ident,
399        serialize: Option<&DebugExpr>,
400        tag_id: usize,
401    );
402}
403
/// Code-gen backend that keeps one `FlatGraphBuilder` per root location key.
///
/// Most hooks simply alias the output identifier to the input identifier; the
/// network/external hooks emit `dest_sink` / `source_stream` pipelines.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Looks up (or default-creates) the builder stored under the root key of `location`.
    ///
    /// # Panics
    ///
    /// Panics with "location was removed" if the map's `entry` call yields `None`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_key = location.root().key();
        self.entry(root_key)
            .expect("location was removed")
            .or_default()
    }

    /// Bounded singleton / optional / keyed-singleton inputs are routed through
    /// `persist::<'static>()` (and must be at a top-level location); everything
    /// else is a plain alias.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let graph = self.get_dfir_mut(in_location.root());

        let needs_persist = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );

        if needs_persist {
            assert!(in_location.is_top_level());
            graph.add_dfir(
                parse_quote! {
                    #out_ident = #in_ident -> persist::<'static>();
                },
                None,
                None,
            );
            return;
        }

        graph.add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Plain alias of `in_ident` to `out_ident` in the input location's root graph.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        let graph = self.get_dfir_mut(in_location.root());
        graph.add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Plain alias of `in_ident` to `out_ident` in the input location's root graph.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let graph = self.get_dfir_mut(in_location.root());
        graph.add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Plain alias of `in_ident` to `out_ident` in the input location's root graph.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        let graph = self.get_dfir_mut(in_location.root());
        graph.add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Plain alias of `in_ident` to `out_ident` in the graph for `location`.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let graph = self.get_dfir_mut(location);
        graph.add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Emits the sender pipeline into the graph for `from` and the receiver
    /// pipeline into the graph for `to`.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        // operator tag separates send and receive, which otherwise have the same next_stmt_id
        let send_tag = format!("send{tag_id}");
        let recv_tag = format!("recv{tag_id}");

        let sender = self.get_dfir_mut(from);
        match serialize {
            Some(ser) => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#ser) -> dest_sink(#sink);
                    },
                    None,
                    Some(&send_tag),
                );
            }
            None => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink);
                    },
                    None,
                    Some(&send_tag),
                );
            }
        }

        let receiver = self.get_dfir_mut(to);
        match deserialize {
            Some(de) => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source) -> map(#de);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
            None => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
        }
    }

    /// Emits a `source_stream` (optionally followed by a `map` with
    /// `deserialize`) into the graph for `on`, bound to `out_ident`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        let recv_tag = format!("recv{tag_id}");
        let receiver = self.get_dfir_mut(on);
        match deserialize {
            Some(de) => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr) -> map(#de);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
            None => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
        }
    }

    /// Emits a `dest_sink` (optionally preceded by a `map` with `serialize`)
    /// into the graph for `on`, fed by `input_ident`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: usize,
    ) {
        // operator tag separates send and receive, which otherwise have the same next_stmt_id
        let send_tag = format!("send{tag_id}");
        let sender = self.get_dfir_mut(on);
        match serialize {
            Some(ser) => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#ser) -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(&send_tag),
                );
            }
            None => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(&send_tag),
                );
            }
        }
    }
}
636
637#[cfg(feature = "build")]
/// Either a DFIR builder to emit code into, or a pair of callbacks: one taking roots
/// (`L`) and one taking nodes (`N`).
// NOTE(review): the `&mut usize` passed to both callbacks is presumably a running
// statement/ID counter — confirm against the call sites.
638pub enum BuildersOrCallback<'a, L, N>
639where
640    L: FnMut(&mut HydroRoot, &mut usize),
641    N: FnMut(&mut HydroNode, &mut usize),
642{
    /// Emit code through the borrowed builder.
643    Builders(&'a mut dyn DfirBuilder),
    /// Hand each root to the first closure and each node to the second.
644    Callback(L, N),
645}
646
647/// A root in a Hydro graph, which is a pipeline that doesn't emit
648/// any downstream values. Traversals over the dataflow graph and
649/// generating DFIR IR start from roots.
650#[derive(Debug, Hash)]
651pub enum HydroRoot {
    /// Applies `f` to the elements produced by `input`.
    // NOTE(review): inferred from the name and field shapes — confirm.
652    ForEach {
653        f: DebugExpr,
654        input: Box<HydroNode>,
655        op_metadata: HydroIrOpMetadata,
656    },
    /// Sends `input` to a port on an external location. `instantiate_fn` starts as
    /// `Building` and is finalized by `compile_network`.
657    SendExternal {
658        to_external_key: LocationKey,
659        to_port_id: ExternalPortId,
660        to_many: bool,
661        unpaired: bool,
662        serialize_fn: Option<DebugExpr>,
663        instantiate_fn: DebugInstantiate,
664        input: Box<HydroNode>,
665        op_metadata: HydroIrOpMetadata,
666    },
    /// Feeds `input` into the sink described by the `sink` expression.
    // NOTE(review): inferred from the name and field shapes — confirm.
667    DestSink {
668        sink: DebugExpr,
669        input: Box<HydroNode>,
670        op_metadata: HydroIrOpMetadata,
671    },
    /// Feeds `input` into the cycle identified by `ident`.
    // NOTE(review): inferred from the name and field shapes — confirm.
672    CycleSink {
673        ident: syn::Ident,
674        input: Box<HydroNode>,
675        op_metadata: HydroIrOpMetadata,
676    },
677}
678
679impl HydroRoot {
680    #[cfg(feature = "build")]
    /// Finalizes every network-related entry reachable from this root for deployment `D`.
    ///
    /// Traverses with `transform_bottom_up`, using one closure for roots and one for nodes:
    ///
    /// * `HydroRoot::SendExternal`: looks up the target external and, for a process-located
    ///   input, builds the sink expression (`D::e2o_many_sink` when `to_many`, otherwise
    ///   `D::o2e_sink`, after allocating ports on both sides). When `unpaired`, it also
    ///   registers the port on the external, calls `D::e2o_source` (discarding its result)
    ///   and uses `D::e2o_connect` as the connect callback.
    /// * `HydroNode::Network`: delegates to `instantiate_network::<D>`.
    /// * `HydroNode::ExternalInput`: looks up the source external and target process,
    ///   registers the port on the external, builds the source expression via
    ///   `D::e2o_many_source` or `D::e2o_source`, and uses `D::e2o_connect` as the callback.
    /// * `HydroNode::EmbeddedInput`: registers the input's element type with
    ///   `D::register_embedded_input`.
    ///
    /// In the first three cases `instantiate_fn` is overwritten with a
    /// `DebugInstantiateFinalized`.
    ///
    /// # Panics
    ///
    /// * if an entry is already `DebugInstantiate::Finalized` ("network already finalized");
    /// * if a referenced external or process is missing from `externals` / `processes`;
    /// * if `extra_stmts` has no entry slot for a process key ("location was removed");
    /// * for cluster locations (`todo!()`) and any other non-process location;
    /// * if an `EmbeddedInput` does not have a `Stream` collection kind.
681    pub fn compile_network<'a, D>(
682        &mut self,
683        extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
684        seen_tees: &mut SeenTees,
685        processes: &SparseSecondaryMap<LocationKey, D::Process>,
686        clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
687        externals: &SparseSecondaryMap<LocationKey, D::External>,
688        env: &mut D::InstantiateEnv,
689    ) where
690        D: Deploy<'a>,
691    {
        // `extra_stmts` is mutated from both closures handed to `transform_bottom_up`, so it
        // is shared through a `RefCell`; `env` is wrapped the same way.
692        let refcell_extra_stmts = RefCell::new(extra_stmts);
693        let refcell_env = RefCell::new(env);
694        self.transform_bottom_up(
            // Root closure: finalize `SendExternal` roots.
695            &mut |l| {
696                if let HydroRoot::SendExternal {
697                    input,
698                    to_external_key,
699                    to_port_id,
700                    to_many,
701                    unpaired,
702                    instantiate_fn,
703                    ..
704                } = l
705                {
706                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
707                        DebugInstantiate::Building => {
708                            let to_node = externals
709                                .get(*to_external_key)
710                                .unwrap_or_else(|| {
711                                    panic!("A external used in the graph was not instantiated: {}", to_external_key)
712                                })
713                                .clone();
714
715                            match input.metadata().location_id.root() {
716                                &LocationId::Process(process_key) => {
717                                    if *to_many {
718                                        (
719                                            (
720                                                D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
                                                // Only the sink side is produced here; `DUMMY` fills the source slot.
721                                                parse_quote!(DUMMY),
722                                            ),
723                                            Box::new(|| {}) as Box<dyn FnOnce()>,
724                                        )
725                                    } else {
726                                        let from_node = processes
727                                            .get(process_key)
728                                            .unwrap_or_else(|| {
729                                                panic!("A process used in the graph was not instantiated: {}", process_key)
730                                            })
731                                            .clone();
732
733                                        let sink_port = from_node.next_port();
734                                        let source_port = to_node.next_port();
735
736                                        if *unpaired {
737                                            use stageleft::quote_type;
738                                            use tokio_util::codec::LengthDelimitedCodec;
739
740                                            to_node.register(*to_port_id, source_port.clone());
741
                                            // The returned source expression is discarded; the call is
                                            // made with this process's `extra_stmts` entry.
742                                            let _ = D::e2o_source(
743                                                refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
744                                                &to_node, &source_port,
745                                                &from_node, &sink_port,
746                                                &quote_type::<LengthDelimitedCodec>(),
747                                                format!("{}_{}", *to_external_key, *to_port_id)
748                                            );
749                                        }
750
751                                        (
752                                            (
753                                                D::o2e_sink(
754                                                    &from_node,
755                                                    &sink_port,
756                                                    &to_node,
757                                                    &source_port,
758                                                    format!("{}_{}", *to_external_key, *to_port_id)
759                                                ),
760                                                parse_quote!(DUMMY),
761                                            ),
762                                            if *unpaired {
763                                                D::e2o_connect(
764                                                    &to_node,
765                                                    &source_port,
766                                                    &from_node,
767                                                    &sink_port,
768                                                    *to_many,
769                                                    NetworkHint::Auto,
770                                                )
771                                            } else {
772                                                Box::new(|| {}) as Box<dyn FnOnce()>
773                                            },
774                                        )
775                                    }
776                                }
777                                LocationId::Cluster(_) => todo!(),
778                                _ => panic!()
779                            }
780                        },
781
782                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
783                    };
784
785                    *instantiate_fn = DebugInstantiateFinalized {
786                        sink: sink_expr,
787                        source: source_expr,
788                        connect_fn: Some(connect_fn),
789                    }
790                    .into();
791                }
792            },
            // Node closure: finalize `Network` and `ExternalInput` nodes and register
            // `EmbeddedInput` nodes.
793            &mut |n| {
794                if let HydroNode::Network {
795                    input,
796                    instantiate_fn,
797                    metadata,
798                    ..
799                } = n
800                {
801                    let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
802                        DebugInstantiate::Building => instantiate_network::<D>(
803                            input.metadata().location_id.root(),
804                            metadata.location_id.root(),
805                            processes,
806                            clusters,
807                        ),
808
809                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
810                    };
811
812                    *instantiate_fn = DebugInstantiateFinalized {
813                        sink: sink_expr,
814                        source: source_expr,
815                        connect_fn: Some(connect_fn),
816                    }
817                    .into();
818                } else if let HydroNode::ExternalInput {
819                    from_external_key,
820                    from_port_id,
821                    from_many,
822                    codec_type,
823                    port_hint,
824                    instantiate_fn,
825                    metadata,
826                    ..
827                } = n
828                {
829                    let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
830                        DebugInstantiate::Building => {
831                            let from_node = externals
832                                .get(*from_external_key)
833                                .unwrap_or_else(|| {
834                                    panic!(
835                                        "A external used in the graph was not instantiated: {}",
836                                        from_external_key,
837                                    )
838                                })
839                                .clone();
840
841                            match metadata.location_id.root() {
842                                &LocationId::Process(process_key) => {
843                                    let to_node = processes
844                                        .get(process_key)
845                                        .unwrap_or_else(|| {
846                                            panic!("A process used in the graph was not instantiated: {}", process_key)
847                                        })
848                                        .clone();
849
850                                    let sink_port = from_node.next_port();
851                                    let source_port = to_node.next_port();
852
853                                    from_node.register(*from_port_id, sink_port.clone());
854
855                                    (
856                                        (
                                            // Only the source side is produced here; `DUMMY` fills the sink slot.
857                                            parse_quote!(DUMMY),
858                                            if *from_many {
859                                                D::e2o_many_source(
860                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
861                                                    &to_node, &source_port,
862                                                    codec_type.0.as_ref(),
863                                                    format!("{}_{}", *from_external_key, *from_port_id)
864                                                )
865                                            } else {
866                                                D::e2o_source(
867                                                    refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
868                                                    &from_node, &sink_port,
869                                                    &to_node, &source_port,
870                                                    codec_type.0.as_ref(),
871                                                    format!("{}_{}", *from_external_key, *from_port_id)
872                                                )
873                                            },
874                                        ),
875                                        D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
876                                    )
877                                }
878                                LocationId::Cluster(_) => todo!(),
879                                _ => panic!()
880                            }
881                        },
882
883                        DebugInstantiate::Finalized(_) => panic!("network already finalized"),
884                    };
885
886                    *instantiate_fn = DebugInstantiateFinalized {
887                        sink: sink_expr,
888                        source: source_expr,
889                        connect_fn: Some(connect_fn),
890                    }
891                    .into();
892                } else if let HydroNode::EmbeddedInput { ident, metadata } = n {
893                    let element_type = match &metadata.collection_kind {
894                        CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
895                        _ => panic!("EmbeddedInput must have Stream collection kind"),
896                    };
897                    D::register_embedded_input(
898                        &mut refcell_env.borrow_mut(),
899                        ident,
900                        &element_type,
901                    );
902                }
903            },
904            seen_tees,
905            false,
906        );
907    }
908
909    pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
910        self.transform_bottom_up(
911            &mut |l| {
912                if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
913                    match instantiate_fn {
914                        DebugInstantiate::Building => panic!("network not built"),
915
916                        DebugInstantiate::Finalized(finalized) => {
917                            (finalized.connect_fn.take().unwrap())();
918                        }
919                    }
920                }
921            },
922            &mut |n| {
923                if let HydroNode::Network { instantiate_fn, .. }
924                | HydroNode::ExternalInput { instantiate_fn, .. } = n
925                {
926                    match instantiate_fn {
927                        DebugInstantiate::Building => panic!("network not built"),
928
929                        DebugInstantiate::Finalized(finalized) => {
930                            (finalized.connect_fn.take().unwrap())();
931                        }
932                    }
933                }
934            },
935            seen_tees,
936            false,
937        );
938    }
939
940    pub fn transform_bottom_up(
941        &mut self,
942        transform_root: &mut impl FnMut(&mut HydroRoot),
943        transform_node: &mut impl FnMut(&mut HydroNode),
944        seen_tees: &mut SeenTees,
945        check_well_formed: bool,
946    ) {
947        self.transform_children(
948            |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
949            seen_tees,
950        );
951
952        transform_root(self);
953    }
954
955    pub fn transform_children(
956        &mut self,
957        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
958        seen_tees: &mut SeenTees,
959    ) {
960        match self {
961            HydroRoot::ForEach { input, .. }
962            | HydroRoot::SendExternal { input, .. }
963            | HydroRoot::DestSink { input, .. }
964            | HydroRoot::CycleSink { input, .. } => {
965                transform(input, seen_tees);
966            }
967        }
968    }
969
970    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
971        match self {
972            HydroRoot::ForEach {
973                f,
974                input,
975                op_metadata,
976            } => HydroRoot::ForEach {
977                f: f.clone(),
978                input: Box::new(input.deep_clone(seen_tees)),
979                op_metadata: op_metadata.clone(),
980            },
981            HydroRoot::SendExternal {
982                to_external_key,
983                to_port_id,
984                to_many,
985                unpaired,
986                serialize_fn,
987                instantiate_fn,
988                input,
989                op_metadata,
990            } => HydroRoot::SendExternal {
991                to_external_key: *to_external_key,
992                to_port_id: *to_port_id,
993                to_many: *to_many,
994                unpaired: *unpaired,
995                serialize_fn: serialize_fn.clone(),
996                instantiate_fn: instantiate_fn.clone(),
997                input: Box::new(input.deep_clone(seen_tees)),
998                op_metadata: op_metadata.clone(),
999            },
1000            HydroRoot::DestSink {
1001                sink,
1002                input,
1003                op_metadata,
1004            } => HydroRoot::DestSink {
1005                sink: sink.clone(),
1006                input: Box::new(input.deep_clone(seen_tees)),
1007                op_metadata: op_metadata.clone(),
1008            },
1009            HydroRoot::CycleSink {
1010                ident,
1011                input,
1012                op_metadata,
1013            } => HydroRoot::CycleSink {
1014                ident: ident.clone(),
1015                input: Box::new(input.deep_clone(seen_tees)),
1016                op_metadata: op_metadata.clone(),
1017            },
1018        }
1019    }
1020
1021    #[cfg(feature = "build")]
1022    pub fn emit<'a, D: Deploy<'a>>(
1023        &mut self,
1024        graph_builders: &mut dyn DfirBuilder,
1025        seen_tees: &mut SeenTees,
1026        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1027        next_stmt_id: &mut usize,
1028    ) {
1029        self.emit_core::<D>(
1030            &mut BuildersOrCallback::<
1031                fn(&mut HydroRoot, &mut usize),
1032                fn(&mut HydroNode, &mut usize),
1033            >::Builders(graph_builders),
1034            seen_tees,
1035            built_tees,
1036            next_stmt_id,
1037        );
1038    }
1039
    /// Emits this root after first emitting its `input` subtree.
    ///
    /// Every arm starts by calling `emit_core` on `input`, which yields the identifier
    /// (`input_ident`) that the root's own statement reads from. What happens next
    /// depends on `builders_or_callback`:
    ///
    /// * `Builders`: a statement for this root is added to the builder selected by the
    ///   input's `location_id` (or, for `SendExternal`, handed to
    ///   `create_external_output`).
    /// * `Callback`: the root callback is invoked with `self` and `next_stmt_id`, and
    ///   nothing is added to any builder.
    ///
    /// `next_stmt_id` is incremented once after `ForEach`, `SendExternal` and
    /// `DestSink`. `CycleSink` neither increments it nor invokes the callback.
    #[cfg(feature = "build")]
    pub fn emit_core<'a, D: Deploy<'a>>(
        &mut self,
        builders_or_callback: &mut BuildersOrCallback<
            impl FnMut(&mut HydroRoot, &mut usize),
            impl FnMut(&mut HydroNode, &mut usize),
        >,
        seen_tees: &mut SeenTees,
        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
        next_stmt_id: &mut usize,
    ) {
        match self {
            HydroRoot::ForEach { f, input, .. } => {
                let input_ident =
                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // The current statement id is passed along rendered as a string.
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(#f);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::SendExternal {
                serialize_fn,
                instantiate_fn,
                input,
                ..
            } => {
                let input_ident =
                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Only the sink half of the pair is used here. While the network
                        // is still `Building`, a `DUMMY_SINK` placeholder expression
                        // stands in for it.
                        let (sink_expr, _) = match instantiate_fn {
                            DebugInstantiate::Building => (
                                syn::parse_quote!(DUMMY_SINK),
                                syn::parse_quote!(DUMMY_SOURCE),
                            ),

                            DebugInstantiate::Finalized(finalized) => {
                                (finalized.sink.clone(), finalized.source.clone())
                            }
                        };

                        graph_builders.create_external_output(
                            &input.metadata().location_id,
                            sink_expr,
                            &input_ident,
                            serialize_fn.as_ref(),
                            *next_stmt_id,
                        );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::DestSink { sink, input, .. } => {
                let input_ident =
                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> dest_sink(#sink);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::CycleSink { ident, input, .. } => {
                let input_ident =
                    input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // The `identity` operator is given an explicit element type:
                        // a `(key, value)` tuple for keyed collections, otherwise the
                        // plain element type.
                        let elem_type: syn::Type = match &input.metadata().collection_kind {
                            CollectionKind::KeyedSingleton {
                                key_type,
                                value_type,
                                ..
                            }
                            | CollectionKind::KeyedStream {
                                key_type,
                                value_type,
                                ..
                            } => {
                                parse_quote!((#key_type, #value_type))
                            }
                            CollectionKind::Stream { element_type, .. }
                            | CollectionKind::Singleton { element_type, .. }
                            | CollectionKind::Optional { element_type, .. } => {
                                parse_quote!(#element_type)
                            }
                        };

                        // Binds the cycle's identifier to the input. Both optional
                        // arguments are `None` here, unlike the other arms.
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #ident = #input_ident -> identity::<#elem_type>();
                                },
                                None,
                                None,
                            );
                    }
                    // No ID, no callback
                    BuildersOrCallback::Callback(_, _) => {}
                }
            }
        }
    }
1180
1181    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1182        match self {
1183            HydroRoot::ForEach { op_metadata, .. }
1184            | HydroRoot::SendExternal { op_metadata, .. }
1185            | HydroRoot::DestSink { op_metadata, .. }
1186            | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1187        }
1188    }
1189
1190    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1191        match self {
1192            HydroRoot::ForEach { op_metadata, .. }
1193            | HydroRoot::SendExternal { op_metadata, .. }
1194            | HydroRoot::DestSink { op_metadata, .. }
1195            | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1196        }
1197    }
1198
1199    pub fn input(&self) -> &HydroNode {
1200        match self {
1201            HydroRoot::ForEach { input, .. }
1202            | HydroRoot::SendExternal { input, .. }
1203            | HydroRoot::DestSink { input, .. }
1204            | HydroRoot::CycleSink { input, .. } => input,
1205        }
1206    }
1207
1208    pub fn input_metadata(&self) -> &HydroIrMetadata {
1209        self.input().metadata()
1210    }
1211
1212    pub fn print_root(&self) -> String {
1213        match self {
1214            HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1215            HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1216            HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1217            HydroRoot::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
1218        }
1219    }
1220
1221    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1222        match self {
1223            HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1224                transform(f);
1225            }
1226            HydroRoot::SendExternal { .. } | HydroRoot::CycleSink { .. } => {}
1227        }
1228    }
1229}
1230
/// Emits every root in `ir` and returns the resulting per-location graph builders.
///
/// The tee bookkeeping maps and the statement-id counter (starting at 0) are shared
/// across all roots.
#[cfg(feature = "build")]
pub fn emit<'a, D: Deploy<'a>>(
    ir: &mut Vec<HydroRoot>,
) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders: SecondaryMap<LocationKey, FlatGraphBuilder> = SecondaryMap::new();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;

    ir.iter_mut().for_each(|root| {
        root.emit::<D>(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        );
    });

    builders
}
1249
/// Walks every root in `ir` through `emit_core` in callback mode, so that
/// `transform_root` and `transform_node` are invoked instead of emitting into builders.
///
/// The tee bookkeeping maps and the statement-id counter (starting at 0) are shared
/// across all roots.
#[cfg(feature = "build")]
pub fn traverse_dfir<'a, D: Deploy<'a>>(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
    let mut next_stmt_id = 0;
    let mut built_tees = HashMap::new();
    let mut seen_tees = HashMap::new();

    for root in ir.iter_mut() {
        root.emit_core::<D>(
            &mut callback,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
        );
    }
}
1269
1270pub fn transform_bottom_up(
1271    ir: &mut [HydroRoot],
1272    transform_root: &mut impl FnMut(&mut HydroRoot),
1273    transform_node: &mut impl FnMut(&mut HydroNode),
1274    check_well_formed: bool,
1275) {
1276    let mut seen_tees = HashMap::new();
1277    ir.iter_mut().for_each(|leaf| {
1278        leaf.transform_bottom_up(
1279            transform_root,
1280            transform_node,
1281            &mut seen_tees,
1282            check_well_formed,
1283        );
1284    });
1285}
1286
1287pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1288    let mut seen_tees = HashMap::new();
1289    ir.iter()
1290        .map(|leaf| leaf.deep_clone(&mut seen_tees))
1291        .collect()
1292}
1293
/// State used to de-duplicate tees while debug-printing: the next id to hand out, and
/// a map from a tee's cell pointer to the id it was assigned. `None` means
/// de-duplication is switched off, in which case tees are printed without an id.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // Per-thread de-duplication state. It starts out as `None`; `dbg_dedup_tee`
    // populates it and the `Debug` impl of `TeeNode` reads and updates it.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
}
1298
1299pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1300    PRINTED_TEES.with(|printed_tees| {
1301        let mut printed_tees_mut = printed_tees.borrow_mut();
1302        *printed_tees_mut = Some((0, HashMap::new()));
1303        drop(printed_tees_mut);
1304
1305        let ret = f();
1306
1307        let mut printed_tees_mut = printed_tees.borrow_mut();
1308        *printed_tees_mut = None;
1309
1310        ret
1311    })
1312}
1313
1314pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1315
1316impl TeeNode {
1317    pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1318        Rc::as_ptr(&self.0)
1319    }
1320}
1321
1322impl Debug for TeeNode {
1323    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1324        PRINTED_TEES.with(|printed_tees| {
1325            let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1326            let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1327
1328            if let Some(printed_tees_mut) = printed_tees_mut {
1329                if let Some(existing) = printed_tees_mut
1330                    .1
1331                    .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1332                {
1333                    write!(f, "<tee {}>", existing)
1334                } else {
1335                    let next_id = printed_tees_mut.0;
1336                    printed_tees_mut.0 += 1;
1337                    printed_tees_mut
1338                        .1
1339                        .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1340                    drop(printed_tees_mut_borrow);
1341                    write!(f, "<tee {}>: ", next_id)?;
1342                    Debug::fmt(&self.0.borrow(), f)
1343                }
1344            } else {
1345                drop(printed_tees_mut_borrow);
1346                write!(f, "<tee>: ")?;
1347                Debug::fmt(&self.0.borrow(), f)
1348            }
1349        })
1350    }
1351}
1352
1353impl Hash for TeeNode {
1354    fn hash<H: Hasher>(&self, state: &mut H) {
1355        self.0.borrow_mut().hash(state);
1356    }
1357}
1358
/// Boundedness marker stored in the `bound` field of the `Stream`, `Singleton`,
/// `Optional` and `KeyedStream` variants of [`CollectionKind`].
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    /// The value for which `CollectionKind::is_bounded` returns `true`.
    Bounded,
}
1364
/// Ordering marker stored in `CollectionKind::Stream::order` and
/// `CollectionKind::KeyedStream::value_order`.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
1370
/// Retry marker stored in `CollectionKind::Stream::retry` and
/// `CollectionKind::KeyedStream::value_retry`.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
1376
/// Boundedness marker stored in the `bound` field of `CollectionKind::KeyedSingleton`.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    /// Not counted as bounded by `CollectionKind::is_bounded`.
    BoundedValue,
    /// The only value for which `CollectionKind::is_bounded` returns `true`.
    Bounded,
}
1383
/// The kind of collection recorded in `HydroIrMetadata::collection_kind`, together with
/// its boundedness and its element type (or key and value types for the keyed kinds).
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum CollectionKind {
    /// A stream, additionally tagged with an ordering and a retry marker.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    Singleton {
        bound: BoundKind,
        element_type: DebugType,
    },
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A keyed stream; the ordering and retry markers are named for the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// A keyed singleton; note that its `bound` uses [`KeyedSingletonBoundKind`]
    /// rather than [`BoundKind`].
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
1413
1414impl CollectionKind {
1415    pub fn is_bounded(&self) -> bool {
1416        matches!(
1417            self,
1418            CollectionKind::Stream {
1419                bound: BoundKind::Bounded,
1420                ..
1421            } | CollectionKind::Singleton {
1422                bound: BoundKind::Bounded,
1423                ..
1424            } | CollectionKind::Optional {
1425                bound: BoundKind::Bounded,
1426                ..
1427            } | CollectionKind::KeyedStream {
1428                bound: BoundKind::Bounded,
1429                ..
1430            } | CollectionKind::KeyedSingleton {
1431                bound: KeyedSingletonBoundKind::Bounded,
1432                ..
1433            }
1434        )
1435    }
1436}
1437
/// Metadata attached to every [`HydroNode`] variant except `Placeholder`.
///
/// The `Hash`, `PartialEq` and `Eq` impls ignore every field, and the `Debug` impl
/// prints only `location_id` and `collection_kind`.
#[derive(Clone)]
pub struct HydroIrMetadata {
    pub location_id: LocationId,
    pub collection_kind: CollectionKind,
    pub cardinality: Option<usize>,
    pub tag: Option<String>,
    /// Operator-level metadata; see [`HydroIrOpMetadata`].
    pub op: HydroIrOpMetadata,
}
1446
1447// HydroIrMetadata shouldn't be used to hash or compare
1448impl Hash for HydroIrMetadata {
1449    fn hash<H: Hasher>(&self, _: &mut H) {}
1450}
1451
1452impl PartialEq for HydroIrMetadata {
1453    fn eq(&self, _: &Self) -> bool {
1454        true
1455    }
1456}
1457
1458impl Eq for HydroIrMetadata {}
1459
1460impl Debug for HydroIrMetadata {
1461    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1462        f.debug_struct("HydroIrMetadata")
1463            .field("location_id", &self.location_id)
1464            .field("collection_kind", &self.collection_kind)
1465            .finish()
1466    }
1467}
1468
/// Metadata that is specific to the operator itself, rather than its outputs.
/// This is available on _both_ inner nodes and roots.
///
/// Freshly constructed values carry only a backtrace; every `Option` field starts out
/// as `None`.
#[derive(Clone)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was constructed.
    pub backtrace: Backtrace,
    pub cpu_usage: Option<f64>,
    pub network_recv_cpu_usage: Option<f64>,
    pub id: Option<usize>,
}
1478
impl HydroIrOpMetadata {
    /// Creates operator metadata with a freshly captured backtrace and every other
    /// field set to `None`.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        Self::new_with_skip(1)
    }

    /// Same as [`Self::new`], but with a caller-chosen `skip_count`, which is added to
    /// a fixed 2 before being passed to `Backtrace::get_backtrace`.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            // NOTE(review): the argument is presumably a number of stack frames to
            // leave out of the capture, with the fixed 2 covering this helper and its
            // caller. Confirm against `Backtrace::get_backtrace`.
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
1497
1498impl Debug for HydroIrOpMetadata {
1499    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1500        f.debug_struct("HydroIrOpMetadata").finish()
1501    }
1502}
1503
1504impl Hash for HydroIrOpMetadata {
1505    fn hash<H: Hasher>(&self, _: &mut H) {}
1506}
1507
/// An intermediate node in a Hydro graph, which consumes data
/// from upstream nodes and emits data to downstream nodes.
///
/// Every variant except `Placeholder` carries a [`HydroIrMetadata`]. Upstream nodes are
/// owned through `Box<HydroNode>` fields, except for `Tee`, whose upstream node is
/// shared through a [`TeeNode`].
#[derive(Debug, Hash)]
pub enum HydroNode {
    /// Temporary stand-in value. `transform_children` swaps it into a tee's cell while
    /// the real node is being transformed, and panics if asked to traverse it.
    Placeholder,

    /// Manually "casts" between two different collection kinds.
    ///
    /// Using this IR node requires special care, since it bypasses many of Hydro's core
    /// correctness checks. In particular, the user must ensure that every possible
    /// "interpretation" of the input corresponds to a distinct "interpretation" of the output,
    /// where an "interpretation" is a possible output of `ObserveNonDet` applied to the
    /// collection. This ensures that the simulator does not miss any possible outputs.
    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Strengthens the guarantees of a stream by non-deterministically selecting a possible
    /// interpretation of the input stream.
    ///
    /// In production, this simply passes through the input, but in simulation, this operator
    /// explicitly selects a randomized interpretation.
    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool, // if true, we do not need to simulate non-determinism
        metadata: HydroIrMetadata,
    },

    /// A leaf: `transform_children` visits no upstream node for this variant.
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// A leaf: `transform_children` visits no upstream node for this variant.
    SingletonSource {
        value: DebugExpr,
        metadata: HydroIrMetadata,
    },

    /// A leaf: `transform_children` visits no upstream node for this variant.
    CycleSource {
        ident: syn::Ident,
        metadata: HydroIrMetadata,
    },

    /// A node whose upstream `inner` is held through a shared [`TeeNode`] handle rather
    /// than an owning `Box`.
    Tee {
        inner: TeeNode,
        metadata: HydroIrMetadata,
    },

    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// The one variant exempt from the location check in `transform_bottom_up`: its
    /// input is allowed to sit at a different root location than the node itself.
    Network {
        name: Option<String>,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// A leaf: `transform_children` visits no upstream node for this variant.
    ExternalInput {
        from_external_key: LocationKey,
        from_port_id: ExternalPortId,
        from_many: bool,
        codec_type: DebugType,
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// An external input for embedded deployment mode.
    ///
    /// This node compiles to `source_stream(ident)` where `ident` is a parameter
    /// added to the generated function signature.
    EmbeddedInput {
        ident: syn::Ident,
        metadata: HydroIrMetadata,
    },
}
1746
/// Bookkeeping for traversals that meet the same tee more than once.
/// `HydroNode::transform_children` keys it by the pointer of a tee's original cell and
/// stores the replacement cell, so a tee reached again is re-pointed at that
/// replacement instead of being transformed a second time.
pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Map from a tee's cell pointer to a [`LocationId`].
pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1749
1750impl HydroNode {
1751    pub fn transform_bottom_up(
1752        &mut self,
1753        transform: &mut impl FnMut(&mut HydroNode),
1754        seen_tees: &mut SeenTees,
1755        check_well_formed: bool,
1756    ) {
1757        self.transform_children(
1758            |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1759            seen_tees,
1760        );
1761
1762        transform(self);
1763
1764        let self_location = self.metadata().location_id.root();
1765
1766        if check_well_formed {
1767            match &*self {
1768                HydroNode::Network { .. } => {}
1769                _ => {
1770                    self.input_metadata().iter().for_each(|i| {
1771                        if i.location_id.root() != self_location {
1772                            panic!(
1773                                "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1774                                i,
1775                                i.location_id.root(),
1776                                self,
1777                                self_location
1778                            )
1779                        }
1780                    });
1781                }
1782            }
1783        }
1784    }
1785
1786    #[inline(always)]
1787    pub fn transform_children(
1788        &mut self,
1789        mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1790        seen_tees: &mut SeenTees,
1791    ) {
1792        match self {
1793            HydroNode::Placeholder => {
1794                panic!();
1795            }
1796
1797            HydroNode::Source { .. }
1798            | HydroNode::SingletonSource { .. }
1799            | HydroNode::CycleSource { .. }
1800            | HydroNode::ExternalInput { .. }
1801            | HydroNode::EmbeddedInput { .. } => {}
1802
1803            HydroNode::Tee { inner, .. } => {
1804                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1805                    *inner = TeeNode(transformed.clone());
1806                } else {
1807                    let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1808                    seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1809                    let mut orig = inner.0.replace(HydroNode::Placeholder);
1810                    transform(&mut orig, seen_tees);
1811                    *transformed_cell.borrow_mut() = orig;
1812                    *inner = TeeNode(transformed_cell);
1813                }
1814            }
1815
1816            HydroNode::Cast { inner, .. }
1817            | HydroNode::ObserveNonDet { inner, .. }
1818            | HydroNode::BeginAtomic { inner, .. }
1819            | HydroNode::EndAtomic { inner, .. }
1820            | HydroNode::Batch { inner, .. }
1821            | HydroNode::YieldConcat { inner, .. } => {
1822                transform(inner.as_mut(), seen_tees);
1823            }
1824
1825            HydroNode::Chain { first, second, .. } => {
1826                transform(first.as_mut(), seen_tees);
1827                transform(second.as_mut(), seen_tees);
1828            }
1829
1830            HydroNode::ChainFirst { first, second, .. } => {
1831                transform(first.as_mut(), seen_tees);
1832                transform(second.as_mut(), seen_tees);
1833            }
1834
1835            HydroNode::CrossSingleton { left, right, .. }
1836            | HydroNode::CrossProduct { left, right, .. }
1837            | HydroNode::Join { left, right, .. } => {
1838                transform(left.as_mut(), seen_tees);
1839                transform(right.as_mut(), seen_tees);
1840            }
1841
1842            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1843                transform(pos.as_mut(), seen_tees);
1844                transform(neg.as_mut(), seen_tees);
1845            }
1846
1847            HydroNode::ReduceKeyedWatermark {
1848                input, watermark, ..
1849            } => {
1850                transform(input.as_mut(), seen_tees);
1851                transform(watermark.as_mut(), seen_tees);
1852            }
1853
1854            HydroNode::Map { input, .. }
1855            | HydroNode::ResolveFutures { input, .. }
1856            | HydroNode::ResolveFuturesOrdered { input, .. }
1857            | HydroNode::FlatMap { input, .. }
1858            | HydroNode::Filter { input, .. }
1859            | HydroNode::FilterMap { input, .. }
1860            | HydroNode::Sort { input, .. }
1861            | HydroNode::DeferTick { input, .. }
1862            | HydroNode::Enumerate { input, .. }
1863            | HydroNode::Inspect { input, .. }
1864            | HydroNode::Unique { input, .. }
1865            | HydroNode::Network { input, .. }
1866            | HydroNode::Fold { input, .. }
1867            | HydroNode::Scan { input, .. }
1868            | HydroNode::FoldKeyed { input, .. }
1869            | HydroNode::Reduce { input, .. }
1870            | HydroNode::ReduceKeyed { input, .. }
1871            | HydroNode::Counter { input, .. } => {
1872                transform(input.as_mut(), seen_tees);
1873            }
1874        }
1875    }
1876
1877    pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1878        match self {
1879            HydroNode::Placeholder => HydroNode::Placeholder,
1880            HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1881                inner: Box::new(inner.deep_clone(seen_tees)),
1882                metadata: metadata.clone(),
1883            },
1884            HydroNode::ObserveNonDet {
1885                inner,
1886                trusted,
1887                metadata,
1888            } => HydroNode::ObserveNonDet {
1889                inner: Box::new(inner.deep_clone(seen_tees)),
1890                trusted: *trusted,
1891                metadata: metadata.clone(),
1892            },
1893            HydroNode::Source { source, metadata } => HydroNode::Source {
1894                source: source.clone(),
1895                metadata: metadata.clone(),
1896            },
1897            HydroNode::SingletonSource { value, metadata } => HydroNode::SingletonSource {
1898                value: value.clone(),
1899                metadata: metadata.clone(),
1900            },
1901            HydroNode::CycleSource { ident, metadata } => HydroNode::CycleSource {
1902                ident: ident.clone(),
1903                metadata: metadata.clone(),
1904            },
1905            HydroNode::Tee { inner, metadata } => {
1906                if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1907                    HydroNode::Tee {
1908                        inner: TeeNode(transformed.clone()),
1909                        metadata: metadata.clone(),
1910                    }
1911                } else {
1912                    let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1913                    seen_tees.insert(inner.as_ptr(), new_rc.clone());
1914                    let cloned = inner.0.borrow().deep_clone(seen_tees);
1915                    *new_rc.borrow_mut() = cloned;
1916                    HydroNode::Tee {
1917                        inner: TeeNode(new_rc),
1918                        metadata: metadata.clone(),
1919                    }
1920                }
1921            }
1922            HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
1923                inner: Box::new(inner.deep_clone(seen_tees)),
1924                metadata: metadata.clone(),
1925            },
1926            HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
1927                inner: Box::new(inner.deep_clone(seen_tees)),
1928                metadata: metadata.clone(),
1929            },
1930            HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
1931                inner: Box::new(inner.deep_clone(seen_tees)),
1932                metadata: metadata.clone(),
1933            },
1934            HydroNode::Batch { inner, metadata } => HydroNode::Batch {
1935                inner: Box::new(inner.deep_clone(seen_tees)),
1936                metadata: metadata.clone(),
1937            },
1938            HydroNode::Chain {
1939                first,
1940                second,
1941                metadata,
1942            } => HydroNode::Chain {
1943                first: Box::new(first.deep_clone(seen_tees)),
1944                second: Box::new(second.deep_clone(seen_tees)),
1945                metadata: metadata.clone(),
1946            },
1947            HydroNode::ChainFirst {
1948                first,
1949                second,
1950                metadata,
1951            } => HydroNode::ChainFirst {
1952                first: Box::new(first.deep_clone(seen_tees)),
1953                second: Box::new(second.deep_clone(seen_tees)),
1954                metadata: metadata.clone(),
1955            },
1956            HydroNode::CrossProduct {
1957                left,
1958                right,
1959                metadata,
1960            } => HydroNode::CrossProduct {
1961                left: Box::new(left.deep_clone(seen_tees)),
1962                right: Box::new(right.deep_clone(seen_tees)),
1963                metadata: metadata.clone(),
1964            },
1965            HydroNode::CrossSingleton {
1966                left,
1967                right,
1968                metadata,
1969            } => HydroNode::CrossSingleton {
1970                left: Box::new(left.deep_clone(seen_tees)),
1971                right: Box::new(right.deep_clone(seen_tees)),
1972                metadata: metadata.clone(),
1973            },
1974            HydroNode::Join {
1975                left,
1976                right,
1977                metadata,
1978            } => HydroNode::Join {
1979                left: Box::new(left.deep_clone(seen_tees)),
1980                right: Box::new(right.deep_clone(seen_tees)),
1981                metadata: metadata.clone(),
1982            },
1983            HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
1984                pos: Box::new(pos.deep_clone(seen_tees)),
1985                neg: Box::new(neg.deep_clone(seen_tees)),
1986                metadata: metadata.clone(),
1987            },
1988            HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
1989                pos: Box::new(pos.deep_clone(seen_tees)),
1990                neg: Box::new(neg.deep_clone(seen_tees)),
1991                metadata: metadata.clone(),
1992            },
1993            HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
1994                input: Box::new(input.deep_clone(seen_tees)),
1995                metadata: metadata.clone(),
1996            },
1997            HydroNode::ResolveFuturesOrdered { input, metadata } => {
1998                HydroNode::ResolveFuturesOrdered {
1999                    input: Box::new(input.deep_clone(seen_tees)),
2000                    metadata: metadata.clone(),
2001                }
2002            }
2003            HydroNode::Map { f, input, metadata } => HydroNode::Map {
2004                f: f.clone(),
2005                input: Box::new(input.deep_clone(seen_tees)),
2006                metadata: metadata.clone(),
2007            },
2008            HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2009                f: f.clone(),
2010                input: Box::new(input.deep_clone(seen_tees)),
2011                metadata: metadata.clone(),
2012            },
2013            HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2014                f: f.clone(),
2015                input: Box::new(input.deep_clone(seen_tees)),
2016                metadata: metadata.clone(),
2017            },
2018            HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2019                f: f.clone(),
2020                input: Box::new(input.deep_clone(seen_tees)),
2021                metadata: metadata.clone(),
2022            },
2023            HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2024                input: Box::new(input.deep_clone(seen_tees)),
2025                metadata: metadata.clone(),
2026            },
2027            HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2028                input: Box::new(input.deep_clone(seen_tees)),
2029                metadata: metadata.clone(),
2030            },
2031            HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2032                f: f.clone(),
2033                input: Box::new(input.deep_clone(seen_tees)),
2034                metadata: metadata.clone(),
2035            },
2036            HydroNode::Unique { input, metadata } => HydroNode::Unique {
2037                input: Box::new(input.deep_clone(seen_tees)),
2038                metadata: metadata.clone(),
2039            },
2040            HydroNode::Sort { input, metadata } => HydroNode::Sort {
2041                input: Box::new(input.deep_clone(seen_tees)),
2042                metadata: metadata.clone(),
2043            },
2044            HydroNode::Fold {
2045                init,
2046                acc,
2047                input,
2048                metadata,
2049            } => HydroNode::Fold {
2050                init: init.clone(),
2051                acc: acc.clone(),
2052                input: Box::new(input.deep_clone(seen_tees)),
2053                metadata: metadata.clone(),
2054            },
2055            HydroNode::Scan {
2056                init,
2057                acc,
2058                input,
2059                metadata,
2060            } => HydroNode::Scan {
2061                init: init.clone(),
2062                acc: acc.clone(),
2063                input: Box::new(input.deep_clone(seen_tees)),
2064                metadata: metadata.clone(),
2065            },
2066            HydroNode::FoldKeyed {
2067                init,
2068                acc,
2069                input,
2070                metadata,
2071            } => HydroNode::FoldKeyed {
2072                init: init.clone(),
2073                acc: acc.clone(),
2074                input: Box::new(input.deep_clone(seen_tees)),
2075                metadata: metadata.clone(),
2076            },
2077            HydroNode::ReduceKeyedWatermark {
2078                f,
2079                input,
2080                watermark,
2081                metadata,
2082            } => HydroNode::ReduceKeyedWatermark {
2083                f: f.clone(),
2084                input: Box::new(input.deep_clone(seen_tees)),
2085                watermark: Box::new(watermark.deep_clone(seen_tees)),
2086                metadata: metadata.clone(),
2087            },
2088            HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2089                f: f.clone(),
2090                input: Box::new(input.deep_clone(seen_tees)),
2091                metadata: metadata.clone(),
2092            },
2093            HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2094                f: f.clone(),
2095                input: Box::new(input.deep_clone(seen_tees)),
2096                metadata: metadata.clone(),
2097            },
2098            HydroNode::Network {
2099                name,
2100                serialize_fn,
2101                instantiate_fn,
2102                deserialize_fn,
2103                input,
2104                metadata,
2105            } => HydroNode::Network {
2106                name: name.clone(),
2107                serialize_fn: serialize_fn.clone(),
2108                instantiate_fn: instantiate_fn.clone(),
2109                deserialize_fn: deserialize_fn.clone(),
2110                input: Box::new(input.deep_clone(seen_tees)),
2111                metadata: metadata.clone(),
2112            },
2113            HydroNode::ExternalInput {
2114                from_external_key,
2115                from_port_id,
2116                from_many,
2117                codec_type,
2118                port_hint,
2119                instantiate_fn,
2120                deserialize_fn,
2121                metadata,
2122            } => HydroNode::ExternalInput {
2123                from_external_key: *from_external_key,
2124                from_port_id: *from_port_id,
2125                from_many: *from_many,
2126                codec_type: codec_type.clone(),
2127                port_hint: *port_hint,
2128                instantiate_fn: instantiate_fn.clone(),
2129                deserialize_fn: deserialize_fn.clone(),
2130                metadata: metadata.clone(),
2131            },
2132            HydroNode::Counter {
2133                tag,
2134                duration,
2135                prefix,
2136                input,
2137                metadata,
2138            } => HydroNode::Counter {
2139                tag: tag.clone(),
2140                duration: duration.clone(),
2141                prefix: prefix.clone(),
2142                input: Box::new(input.deep_clone(seen_tees)),
2143                metadata: metadata.clone(),
2144            },
2145            HydroNode::EmbeddedInput { ident, metadata } => HydroNode::EmbeddedInput {
2146                ident: ident.clone(),
2147                metadata: metadata.clone(),
2148            },
2149        }
2150    }
2151
2152    #[cfg(feature = "build")]
2153    pub fn emit_core<'a, D: Deploy<'a>>(
2154        &mut self,
2155        builders_or_callback: &mut BuildersOrCallback<
2156            impl FnMut(&mut HydroRoot, &mut usize),
2157            impl FnMut(&mut HydroNode, &mut usize),
2158        >,
2159        seen_tees: &mut SeenTees,
2160        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
2161        next_stmt_id: &mut usize,
2162    ) -> syn::Ident {
2163        let mut ident_stack: Vec<syn::Ident> = Vec::new();
2164
2165        self.transform_bottom_up(
2166            &mut |node: &mut HydroNode| {
2167                let out_location = node.metadata().location_id.clone();
2168                match node {
2169                    HydroNode::Placeholder => {
2170                        panic!()
2171                    }
2172
2173                    HydroNode::Cast { .. } => {
2174                        // Cast passes through the input ident unchanged
2175                        // The input ident is already on the stack from processing the child
2176                        match builders_or_callback {
2177                            BuildersOrCallback::Builders(_) => {}
2178                            BuildersOrCallback::Callback(_, node_callback) => {
2179                                node_callback(node, next_stmt_id);
2180                            }
2181                        }
2182
2183                        *next_stmt_id += 1;
2184                        // input_ident stays on stack as output
2185                    }
2186
2187                    HydroNode::ObserveNonDet {
2188                        inner,
2189                        trusted,
2190                        metadata,
2191                        ..
2192                    } => {
2193                        let inner_ident = ident_stack.pop().unwrap();
2194
2195                        let observe_ident =
2196                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2197
2198                        match builders_or_callback {
2199                            BuildersOrCallback::Builders(graph_builders) => {
2200                                graph_builders.observe_nondet(
2201                                    *trusted,
2202                                    &inner.metadata().location_id,
2203                                    inner_ident,
2204                                    &inner.metadata().collection_kind,
2205                                    &observe_ident,
2206                                    &metadata.collection_kind,
2207                                    &metadata.op,
2208                                );
2209                            }
2210                            BuildersOrCallback::Callback(_, node_callback) => {
2211                                node_callback(node, next_stmt_id);
2212                            }
2213                        }
2214
2215                        *next_stmt_id += 1;
2216
2217                        ident_stack.push(observe_ident);
2218                    }
2219
2220                    HydroNode::Batch {
2221                        inner, metadata, ..
2222                    } => {
2223                        let inner_ident = ident_stack.pop().unwrap();
2224
2225                        let batch_ident =
2226                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2227
2228                        match builders_or_callback {
2229                            BuildersOrCallback::Builders(graph_builders) => {
2230                                graph_builders.batch(
2231                                    inner_ident,
2232                                    &inner.metadata().location_id,
2233                                    &inner.metadata().collection_kind,
2234                                    &batch_ident,
2235                                    &out_location,
2236                                    &metadata.op,
2237                                );
2238                            }
2239                            BuildersOrCallback::Callback(_, node_callback) => {
2240                                node_callback(node, next_stmt_id);
2241                            }
2242                        }
2243
2244                        *next_stmt_id += 1;
2245
2246                        ident_stack.push(batch_ident);
2247                    }
2248
2249                    HydroNode::YieldConcat { inner, .. } => {
2250                        let inner_ident = ident_stack.pop().unwrap();
2251
2252                        let yield_ident =
2253                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2254
2255                        match builders_or_callback {
2256                            BuildersOrCallback::Builders(graph_builders) => {
2257                                graph_builders.yield_from_tick(
2258                                    inner_ident,
2259                                    &inner.metadata().location_id,
2260                                    &inner.metadata().collection_kind,
2261                                    &yield_ident,
2262                                    &out_location,
2263                                );
2264                            }
2265                            BuildersOrCallback::Callback(_, node_callback) => {
2266                                node_callback(node, next_stmt_id);
2267                            }
2268                        }
2269
2270                        *next_stmt_id += 1;
2271
2272                        ident_stack.push(yield_ident);
2273                    }
2274
2275                    HydroNode::BeginAtomic { inner, metadata } => {
2276                        let inner_ident = ident_stack.pop().unwrap();
2277
2278                        let begin_ident =
2279                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2280
2281                        match builders_or_callback {
2282                            BuildersOrCallback::Builders(graph_builders) => {
2283                                graph_builders.begin_atomic(
2284                                    inner_ident,
2285                                    &inner.metadata().location_id,
2286                                    &inner.metadata().collection_kind,
2287                                    &begin_ident,
2288                                    &out_location,
2289                                    &metadata.op,
2290                                );
2291                            }
2292                            BuildersOrCallback::Callback(_, node_callback) => {
2293                                node_callback(node, next_stmt_id);
2294                            }
2295                        }
2296
2297                        *next_stmt_id += 1;
2298
2299                        ident_stack.push(begin_ident);
2300                    }
2301
2302                    HydroNode::EndAtomic { inner, .. } => {
2303                        let inner_ident = ident_stack.pop().unwrap();
2304
2305                        let end_ident =
2306                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2307
2308                        match builders_or_callback {
2309                            BuildersOrCallback::Builders(graph_builders) => {
2310                                graph_builders.end_atomic(
2311                                    inner_ident,
2312                                    &inner.metadata().location_id,
2313                                    &inner.metadata().collection_kind,
2314                                    &end_ident,
2315                                );
2316                            }
2317                            BuildersOrCallback::Callback(_, node_callback) => {
2318                                node_callback(node, next_stmt_id);
2319                            }
2320                        }
2321
2322                        *next_stmt_id += 1;
2323
2324                        ident_stack.push(end_ident);
2325                    }
2326
2327                    HydroNode::Source {
2328                        source, metadata, ..
2329                    } => {
2330                        if let HydroSource::ExternalNetwork() = source {
2331                            ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2332                        } else {
2333                            let source_ident =
2334                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2335
2336                            let source_stmt = match source {
2337                                HydroSource::Stream(expr) => {
2338                                    debug_assert!(metadata.location_id.is_top_level());
2339                                    parse_quote! {
2340                                        #source_ident = source_stream(#expr);
2341                                    }
2342                                }
2343
2344                                HydroSource::ExternalNetwork() => {
2345                                    unreachable!()
2346                                }
2347
2348                                HydroSource::Iter(expr) => {
2349                                    if metadata.location_id.is_top_level() {
2350                                        parse_quote! {
2351                                            #source_ident = source_iter(#expr);
2352                                        }
2353                                    } else {
2354                                        // TODO(shadaj): a more natural semantics would be to to re-evaluate the expression on each tick
2355                                        parse_quote! {
2356                                            #source_ident = source_iter(#expr) -> persist::<'static>();
2357                                        }
2358                                    }
2359                                }
2360
2361                                HydroSource::Spin() => {
2362                                    debug_assert!(metadata.location_id.is_top_level());
2363                                    parse_quote! {
2364                                        #source_ident = spin();
2365                                    }
2366                                }
2367
2368                                HydroSource::ClusterMembers(location_id) => {
2369                                    debug_assert!(metadata.location_id.is_top_level());
2370
2371                                    let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
2372                                        D::cluster_membership_stream(location_id),
2373                                        &(),
2374                                    );
2375
2376                                    parse_quote! {
2377                                        #source_ident = source_stream(#expr);
2378                                    }
2379                                }
2380                            };
2381
2382                            match builders_or_callback {
2383                                BuildersOrCallback::Builders(graph_builders) => {
2384                                    let builder = graph_builders.get_dfir_mut(&out_location);
2385                                    builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2386                                }
2387                                BuildersOrCallback::Callback(_, node_callback) => {
2388                                    node_callback(node, next_stmt_id);
2389                                }
2390                            }
2391
2392                            *next_stmt_id += 1;
2393
2394                            ident_stack.push(source_ident);
2395                        }
2396                    }
2397
2398                    HydroNode::SingletonSource { value, metadata } => {
2399                        let source_ident =
2400                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2401
2402                        match builders_or_callback {
2403                            BuildersOrCallback::Builders(graph_builders) => {
2404                                let builder = graph_builders.get_dfir_mut(&out_location);
2405
2406                                if metadata.location_id.is_top_level()
2407                                    && metadata.collection_kind.is_bounded()
2408                                {
2409                                    builder.add_dfir(
2410                                        parse_quote! {
2411                                            #source_ident = source_iter([#value]);
2412                                        },
2413                                        None,
2414                                        Some(&next_stmt_id.to_string()),
2415                                    );
2416                                } else {
2417                                    builder.add_dfir(
2418                                        parse_quote! {
2419                                            #source_ident = source_iter([#value]) -> persist::<'static>();
2420                                        },
2421                                        None,
2422                                        Some(&next_stmt_id.to_string()),
2423                                    );
2424                                }
2425                            }
2426                            BuildersOrCallback::Callback(_, node_callback) => {
2427                                node_callback(node, next_stmt_id);
2428                            }
2429                        }
2430
2431                        *next_stmt_id += 1;
2432
2433                        ident_stack.push(source_ident);
2434                    }
2435
2436                    HydroNode::CycleSource { ident, .. } => {
2437                        let ident = ident.clone();
2438
2439                        match builders_or_callback {
2440                            BuildersOrCallback::Builders(_) => {}
2441                            BuildersOrCallback::Callback(_, node_callback) => {
2442                                node_callback(node, next_stmt_id);
2443                            }
2444                        }
2445
2446                        // consume a stmt id even though we did not emit anything so that we can instrument this
2447                        *next_stmt_id += 1;
2448
2449                        ident_stack.push(ident);
2450                    }
2451
2452                    HydroNode::Tee { inner, .. } => {
2453                        let ret_ident = if let Some(teed_from) =
2454                            built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2455                        {
2456                            match builders_or_callback {
2457                                BuildersOrCallback::Builders(_) => {}
2458                                BuildersOrCallback::Callback(_, node_callback) => {
2459                                    node_callback(node, next_stmt_id);
2460                                }
2461                            }
2462
2463                            teed_from.clone()
2464                        } else {
2465                            // The inner node was already processed by transform_bottom_up,
2466                            // so its ident is on the stack
2467                            let inner_ident = ident_stack.pop().unwrap();
2468
2469                            let tee_ident =
2470                                syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2471
2472                            built_tees.insert(
2473                                inner.0.as_ref() as *const RefCell<HydroNode>,
2474                                tee_ident.clone(),
2475                            );
2476
2477                            match builders_or_callback {
2478                                BuildersOrCallback::Builders(graph_builders) => {
2479                                    let builder = graph_builders.get_dfir_mut(&out_location);
2480                                    builder.add_dfir(
2481                                        parse_quote! {
2482                                            #tee_ident = #inner_ident -> tee();
2483                                        },
2484                                        None,
2485                                        Some(&next_stmt_id.to_string()),
2486                                    );
2487                                }
2488                                BuildersOrCallback::Callback(_, node_callback) => {
2489                                    node_callback(node, next_stmt_id);
2490                                }
2491                            }
2492
2493                            tee_ident
2494                        };
2495
2496                        // we consume a stmt id regardless of whether we emit the tee() operator,
2497                        // so that during rewrites we touch all recipients of the tee()
2498
2499                        *next_stmt_id += 1;
2500                        ident_stack.push(ret_ident);
2501                    }
2502
2503                    HydroNode::Chain { .. } => {
2504                        // Children are processed left-to-right, so second is on top
2505                        let second_ident = ident_stack.pop().unwrap();
2506                        let first_ident = ident_stack.pop().unwrap();
2507
2508                        let chain_ident =
2509                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2510
2511                        match builders_or_callback {
2512                            BuildersOrCallback::Builders(graph_builders) => {
2513                                let builder = graph_builders.get_dfir_mut(&out_location);
2514                                builder.add_dfir(
2515                                    parse_quote! {
2516                                        #chain_ident = chain();
2517                                        #first_ident -> [0]#chain_ident;
2518                                        #second_ident -> [1]#chain_ident;
2519                                    },
2520                                    None,
2521                                    Some(&next_stmt_id.to_string()),
2522                                );
2523                            }
2524                            BuildersOrCallback::Callback(_, node_callback) => {
2525                                node_callback(node, next_stmt_id);
2526                            }
2527                        }
2528
2529                        *next_stmt_id += 1;
2530
2531                        ident_stack.push(chain_ident);
2532                    }
2533
2534                    HydroNode::ChainFirst { .. } => {
2535                        let second_ident = ident_stack.pop().unwrap();
2536                        let first_ident = ident_stack.pop().unwrap();
2537
2538                        let chain_ident =
2539                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2540
2541                        match builders_or_callback {
2542                            BuildersOrCallback::Builders(graph_builders) => {
2543                                let builder = graph_builders.get_dfir_mut(&out_location);
2544                                builder.add_dfir(
2545                                    parse_quote! {
2546                                        #chain_ident = chain_first_n(1);
2547                                        #first_ident -> [0]#chain_ident;
2548                                        #second_ident -> [1]#chain_ident;
2549                                    },
2550                                    None,
2551                                    Some(&next_stmt_id.to_string()),
2552                                );
2553                            }
2554                            BuildersOrCallback::Callback(_, node_callback) => {
2555                                node_callback(node, next_stmt_id);
2556                            }
2557                        }
2558
2559                        *next_stmt_id += 1;
2560
2561                        ident_stack.push(chain_ident);
2562                    }
2563
2564                    HydroNode::CrossSingleton { right, .. } => {
2565                        let right_ident = ident_stack.pop().unwrap();
2566                        let left_ident = ident_stack.pop().unwrap();
2567
2568                        let cross_ident =
2569                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2570
2571                        match builders_or_callback {
2572                            BuildersOrCallback::Builders(graph_builders) => {
2573                                let builder = graph_builders.get_dfir_mut(&out_location);
2574
2575                                if right.metadata().location_id.is_top_level()
2576                                    && right.metadata().collection_kind.is_bounded()
2577                                {
2578                                    builder.add_dfir(
2579                                        parse_quote! {
2580                                            #cross_ident = cross_singleton();
2581                                            #left_ident -> [input]#cross_ident;
2582                                            #right_ident -> persist::<'static>() -> [single]#cross_ident;
2583                                        },
2584                                        None,
2585                                        Some(&next_stmt_id.to_string()),
2586                                    );
2587                                } else {
2588                                    builder.add_dfir(
2589                                        parse_quote! {
2590                                            #cross_ident = cross_singleton();
2591                                            #left_ident -> [input]#cross_ident;
2592                                            #right_ident -> [single]#cross_ident;
2593                                        },
2594                                        None,
2595                                        Some(&next_stmt_id.to_string()),
2596                                    );
2597                                }
2598                            }
2599                            BuildersOrCallback::Callback(_, node_callback) => {
2600                                node_callback(node, next_stmt_id);
2601                            }
2602                        }
2603
2604                        *next_stmt_id += 1;
2605
2606                        ident_stack.push(cross_ident);
2607                    }
2608
2609                    HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2610                        let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
2611                            parse_quote!(cross_join_multiset)
2612                        } else {
2613                            parse_quote!(join_multiset)
2614                        };
2615
2616                        let (HydroNode::CrossProduct { left, right, .. }
2617                        | HydroNode::Join { left, right, .. }) = node
2618                        else {
2619                            unreachable!()
2620                        };
2621
2622                        let is_top_level = left.metadata().location_id.is_top_level()
2623                            && right.metadata().location_id.is_top_level();
2624                        let left_lifetime = if left.metadata().location_id.is_top_level() {
2625                            quote!('static)
2626                        } else {
2627                            quote!('tick)
2628                        };
2629
2630                        let right_lifetime = if right.metadata().location_id.is_top_level() {
2631                            quote!('static)
2632                        } else {
2633                            quote!('tick)
2634                        };
2635
2636                        let right_ident = ident_stack.pop().unwrap();
2637                        let left_ident = ident_stack.pop().unwrap();
2638
2639                        let stream_ident =
2640                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2641
2642                        match builders_or_callback {
2643                            BuildersOrCallback::Builders(graph_builders) => {
2644                                let builder = graph_builders.get_dfir_mut(&out_location);
2645                                builder.add_dfir(
2646                                    if is_top_level {
2647                                        // if both inputs are root, the output is expected to have streamy semantics, so we need
2648                                        // a multiset_delta() to negate the replay behavior
2649                                        parse_quote! {
2650                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2651                                            #left_ident -> [0]#stream_ident;
2652                                            #right_ident -> [1]#stream_ident;
2653                                        }
2654                                    } else {
2655                                        parse_quote! {
2656                                            #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2657                                            #left_ident -> [0]#stream_ident;
2658                                            #right_ident -> [1]#stream_ident;
2659                                        }
2660                                    }
2661                                    ,
2662                                    None,
2663                                    Some(&next_stmt_id.to_string()),
2664                                );
2665                            }
2666                            BuildersOrCallback::Callback(_, node_callback) => {
2667                                node_callback(node, next_stmt_id);
2668                            }
2669                        }
2670
2671                        *next_stmt_id += 1;
2672
2673                        ident_stack.push(stream_ident);
2674                    }
2675
2676                    HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2677                        let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
2678                            parse_quote!(difference)
2679                        } else {
2680                            parse_quote!(anti_join)
2681                        };
2682
2683                        let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
2684                            node
2685                        else {
2686                            unreachable!()
2687                        };
2688
2689                        let neg_lifetime = if neg.metadata().location_id.is_top_level() {
2690                            quote!('static)
2691                        } else {
2692                            quote!('tick)
2693                        };
2694
2695                        let neg_ident = ident_stack.pop().unwrap();
2696                        let pos_ident = ident_stack.pop().unwrap();
2697
2698                        let stream_ident =
2699                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2700
2701                        match builders_or_callback {
2702                            BuildersOrCallback::Builders(graph_builders) => {
2703                                let builder = graph_builders.get_dfir_mut(&out_location);
2704                                builder.add_dfir(
2705                                    parse_quote! {
2706                                        #stream_ident = #operator::<'tick, #neg_lifetime>();
2707                                        #pos_ident -> [pos]#stream_ident;
2708                                        #neg_ident -> [neg]#stream_ident;
2709                                    },
2710                                    None,
2711                                    Some(&next_stmt_id.to_string()),
2712                                );
2713                            }
2714                            BuildersOrCallback::Callback(_, node_callback) => {
2715                                node_callback(node, next_stmt_id);
2716                            }
2717                        }
2718
2719                        *next_stmt_id += 1;
2720
2721                        ident_stack.push(stream_ident);
2722                    }
2723
2724                    HydroNode::ResolveFutures { .. } => {
2725                        let input_ident = ident_stack.pop().unwrap();
2726
2727                        let futures_ident =
2728                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2729
2730                        match builders_or_callback {
2731                            BuildersOrCallback::Builders(graph_builders) => {
2732                                let builder = graph_builders.get_dfir_mut(&out_location);
2733                                builder.add_dfir(
2734                                    parse_quote! {
2735                                        #futures_ident = #input_ident -> resolve_futures();
2736                                    },
2737                                    None,
2738                                    Some(&next_stmt_id.to_string()),
2739                                );
2740                            }
2741                            BuildersOrCallback::Callback(_, node_callback) => {
2742                                node_callback(node, next_stmt_id);
2743                            }
2744                        }
2745
2746                        *next_stmt_id += 1;
2747
2748                        ident_stack.push(futures_ident);
2749                    }
2750
2751                    HydroNode::ResolveFuturesOrdered { .. } => {
2752                        let input_ident = ident_stack.pop().unwrap();
2753
2754                        let futures_ident =
2755                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2756
2757                        match builders_or_callback {
2758                            BuildersOrCallback::Builders(graph_builders) => {
2759                                let builder = graph_builders.get_dfir_mut(&out_location);
2760                                builder.add_dfir(
2761                                    parse_quote! {
2762                                        #futures_ident = #input_ident -> resolve_futures_ordered();
2763                                    },
2764                                    None,
2765                                    Some(&next_stmt_id.to_string()),
2766                                );
2767                            }
2768                            BuildersOrCallback::Callback(_, node_callback) => {
2769                                node_callback(node, next_stmt_id);
2770                            }
2771                        }
2772
2773                        *next_stmt_id += 1;
2774
2775                        ident_stack.push(futures_ident);
2776                    }
2777
2778                    HydroNode::Map { f, .. } => {
2779                        let input_ident = ident_stack.pop().unwrap();
2780
2781                        let map_ident =
2782                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2783
2784                        match builders_or_callback {
2785                            BuildersOrCallback::Builders(graph_builders) => {
2786                                let builder = graph_builders.get_dfir_mut(&out_location);
2787                                builder.add_dfir(
2788                                    parse_quote! {
2789                                        #map_ident = #input_ident -> map(#f);
2790                                    },
2791                                    None,
2792                                    Some(&next_stmt_id.to_string()),
2793                                );
2794                            }
2795                            BuildersOrCallback::Callback(_, node_callback) => {
2796                                node_callback(node, next_stmt_id);
2797                            }
2798                        }
2799
2800                        *next_stmt_id += 1;
2801
2802                        ident_stack.push(map_ident);
2803                    }
2804
2805                    HydroNode::FlatMap { f, .. } => {
2806                        let input_ident = ident_stack.pop().unwrap();
2807
2808                        let flat_map_ident =
2809                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2810
2811                        match builders_or_callback {
2812                            BuildersOrCallback::Builders(graph_builders) => {
2813                                let builder = graph_builders.get_dfir_mut(&out_location);
2814                                builder.add_dfir(
2815                                    parse_quote! {
2816                                        #flat_map_ident = #input_ident -> flat_map(#f);
2817                                    },
2818                                    None,
2819                                    Some(&next_stmt_id.to_string()),
2820                                );
2821                            }
2822                            BuildersOrCallback::Callback(_, node_callback) => {
2823                                node_callback(node, next_stmt_id);
2824                            }
2825                        }
2826
2827                        *next_stmt_id += 1;
2828
2829                        ident_stack.push(flat_map_ident);
2830                    }
2831
2832                    HydroNode::Filter { f, .. } => {
2833                        let input_ident = ident_stack.pop().unwrap();
2834
2835                        let filter_ident =
2836                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2837
2838                        match builders_or_callback {
2839                            BuildersOrCallback::Builders(graph_builders) => {
2840                                let builder = graph_builders.get_dfir_mut(&out_location);
2841                                builder.add_dfir(
2842                                    parse_quote! {
2843                                        #filter_ident = #input_ident -> filter(#f);
2844                                    },
2845                                    None,
2846                                    Some(&next_stmt_id.to_string()),
2847                                );
2848                            }
2849                            BuildersOrCallback::Callback(_, node_callback) => {
2850                                node_callback(node, next_stmt_id);
2851                            }
2852                        }
2853
2854                        *next_stmt_id += 1;
2855
2856                        ident_stack.push(filter_ident);
2857                    }
2858
2859                    HydroNode::FilterMap { f, .. } => {
2860                        let input_ident = ident_stack.pop().unwrap();
2861
2862                        let filter_map_ident =
2863                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2864
2865                        match builders_or_callback {
2866                            BuildersOrCallback::Builders(graph_builders) => {
2867                                let builder = graph_builders.get_dfir_mut(&out_location);
2868                                builder.add_dfir(
2869                                    parse_quote! {
2870                                        #filter_map_ident = #input_ident -> filter_map(#f);
2871                                    },
2872                                    None,
2873                                    Some(&next_stmt_id.to_string()),
2874                                );
2875                            }
2876                            BuildersOrCallback::Callback(_, node_callback) => {
2877                                node_callback(node, next_stmt_id);
2878                            }
2879                        }
2880
2881                        *next_stmt_id += 1;
2882
2883                        ident_stack.push(filter_map_ident);
2884                    }
2885
2886                    HydroNode::Sort { .. } => {
2887                        let input_ident = ident_stack.pop().unwrap();
2888
2889                        let sort_ident =
2890                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2891
2892                        match builders_or_callback {
2893                            BuildersOrCallback::Builders(graph_builders) => {
2894                                let builder = graph_builders.get_dfir_mut(&out_location);
2895                                builder.add_dfir(
2896                                    parse_quote! {
2897                                        #sort_ident = #input_ident -> sort();
2898                                    },
2899                                    None,
2900                                    Some(&next_stmt_id.to_string()),
2901                                );
2902                            }
2903                            BuildersOrCallback::Callback(_, node_callback) => {
2904                                node_callback(node, next_stmt_id);
2905                            }
2906                        }
2907
2908                        *next_stmt_id += 1;
2909
2910                        ident_stack.push(sort_ident);
2911                    }
2912
2913                    HydroNode::DeferTick { .. } => {
2914                        let input_ident = ident_stack.pop().unwrap();
2915
2916                        let defer_tick_ident =
2917                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2918
2919                        match builders_or_callback {
2920                            BuildersOrCallback::Builders(graph_builders) => {
2921                                let builder = graph_builders.get_dfir_mut(&out_location);
2922                                builder.add_dfir(
2923                                    parse_quote! {
2924                                        #defer_tick_ident = #input_ident -> defer_tick_lazy();
2925                                    },
2926                                    None,
2927                                    Some(&next_stmt_id.to_string()),
2928                                );
2929                            }
2930                            BuildersOrCallback::Callback(_, node_callback) => {
2931                                node_callback(node, next_stmt_id);
2932                            }
2933                        }
2934
2935                        *next_stmt_id += 1;
2936
2937                        ident_stack.push(defer_tick_ident);
2938                    }
2939
2940                    HydroNode::Enumerate { input, .. } => {
2941                        let input_ident = ident_stack.pop().unwrap();
2942
2943                        let enumerate_ident =
2944                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2945
2946                        match builders_or_callback {
2947                            BuildersOrCallback::Builders(graph_builders) => {
2948                                let builder = graph_builders.get_dfir_mut(&out_location);
2949                                let lifetime = if input.metadata().location_id.is_top_level() {
2950                                    quote!('static)
2951                                } else {
2952                                    quote!('tick)
2953                                };
2954                                builder.add_dfir(
2955                                    parse_quote! {
2956                                        #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
2957                                    },
2958                                    None,
2959                                    Some(&next_stmt_id.to_string()),
2960                                );
2961                            }
2962                            BuildersOrCallback::Callback(_, node_callback) => {
2963                                node_callback(node, next_stmt_id);
2964                            }
2965                        }
2966
2967                        *next_stmt_id += 1;
2968
2969                        ident_stack.push(enumerate_ident);
2970                    }
2971
2972                    HydroNode::Inspect { f, .. } => {
2973                        let input_ident = ident_stack.pop().unwrap();
2974
2975                        let inspect_ident =
2976                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2977
2978                        match builders_or_callback {
2979                            BuildersOrCallback::Builders(graph_builders) => {
2980                                let builder = graph_builders.get_dfir_mut(&out_location);
2981                                builder.add_dfir(
2982                                    parse_quote! {
2983                                        #inspect_ident = #input_ident -> inspect(#f);
2984                                    },
2985                                    None,
2986                                    Some(&next_stmt_id.to_string()),
2987                                );
2988                            }
2989                            BuildersOrCallback::Callback(_, node_callback) => {
2990                                node_callback(node, next_stmt_id);
2991                            }
2992                        }
2993
2994                        *next_stmt_id += 1;
2995
2996                        ident_stack.push(inspect_ident);
2997                    }
2998
2999                    HydroNode::Unique { input, .. } => {
3000                        let input_ident = ident_stack.pop().unwrap();
3001
3002                        let unique_ident =
3003                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3004
3005                        match builders_or_callback {
3006                            BuildersOrCallback::Builders(graph_builders) => {
3007                                let builder = graph_builders.get_dfir_mut(&out_location);
3008                                let lifetime = if input.metadata().location_id.is_top_level() {
3009                                    quote!('static)
3010                                } else {
3011                                    quote!('tick)
3012                                };
3013
3014                                builder.add_dfir(
3015                                    parse_quote! {
3016                                        #unique_ident = #input_ident -> unique::<#lifetime>();
3017                                    },
3018                                    None,
3019                                    Some(&next_stmt_id.to_string()),
3020                                );
3021                            }
3022                            BuildersOrCallback::Callback(_, node_callback) => {
3023                                node_callback(node, next_stmt_id);
3024                            }
3025                        }
3026
3027                        *next_stmt_id += 1;
3028
3029                        ident_stack.push(unique_ident);
3030                    }
3031
3032                    HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3033                        let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3034                            if input.metadata().location_id.is_top_level()
3035                                && input.metadata().collection_kind.is_bounded()
3036                            {
3037                                parse_quote!(fold_no_replay)
3038                            } else {
3039                                parse_quote!(fold)
3040                            }
3041                        } else if matches!(node, HydroNode::Scan { .. }) {
3042                            parse_quote!(scan)
3043                        } else if let HydroNode::FoldKeyed { input, .. } = node {
3044                            if input.metadata().location_id.is_top_level()
3045                                && input.metadata().collection_kind.is_bounded()
3046                            {
3047                                todo!("Fold keyed on a top-level bounded collection is not yet supported")
3048                            } else {
3049                                parse_quote!(fold_keyed)
3050                            }
3051                        } else {
3052                            unreachable!()
3053                        };
3054
3055                        let (HydroNode::Fold { input, .. }
3056                        | HydroNode::FoldKeyed { input, .. }
3057                        | HydroNode::Scan { input, .. }) = node
3058                        else {
3059                            unreachable!()
3060                        };
3061
3062                        let lifetime = if input.metadata().location_id.is_top_level() {
3063                            quote!('static)
3064                        } else {
3065                            quote!('tick)
3066                        };
3067
3068                        let input_ident = ident_stack.pop().unwrap();
3069
3070                        let (HydroNode::Fold { init, acc, .. }
3071                        | HydroNode::FoldKeyed { init, acc, .. }
3072                        | HydroNode::Scan { init, acc, .. }) = &*node
3073                        else {
3074                            unreachable!()
3075                        };
3076
3077                        let fold_ident =
3078                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3079
3080                        match builders_or_callback {
3081                            BuildersOrCallback::Builders(graph_builders) => {
3082                                if matches!(node, HydroNode::Fold { .. })
3083                                    && node.metadata().location_id.is_top_level()
3084                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3085                                    && graph_builders.singleton_intermediates()
3086                                    && !node.metadata().collection_kind.is_bounded()
3087                                {
3088                                    let builder = graph_builders.get_dfir_mut(&out_location);
3089
3090                                    let acc: syn::Expr = parse_quote!({
3091                                        let mut __inner = #acc;
3092                                        move |__state, __value| {
3093                                            __inner(__state, __value);
3094                                            Some(__state.clone())
3095                                        }
3096                                    });
3097
3098                                    builder.add_dfir(
3099                                        parse_quote! {
3100                                            source_iter([(#init)()]) -> [0]#fold_ident;
3101                                            #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3102                                            #fold_ident = chain();
3103                                        },
3104                                        None,
3105                                        Some(&next_stmt_id.to_string()),
3106                                    );
3107                                } else if matches!(node, HydroNode::FoldKeyed { .. })
3108                                    && node.metadata().location_id.is_top_level()
3109                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3110                                    && graph_builders.singleton_intermediates()
3111                                    && !node.metadata().collection_kind.is_bounded()
3112                                {
3113                                    let builder = graph_builders.get_dfir_mut(&out_location);
3114
3115                                    let acc: syn::Expr = parse_quote!({
3116                                        let mut __init = #init;
3117                                        let mut __inner = #acc;
3118                                        move |__state, __kv: (_, _)| {
3119                                            // TODO(shadaj): we can avoid the clone when the entry exists
3120                                            let __state = __state
3121                                                .entry(::std::clone::Clone::clone(&__kv.0))
3122                                                .or_insert_with(|| (__init)());
3123                                            __inner(__state, __kv.1);
3124                                            Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3125                                        }
3126                                    });
3127
3128                                    builder.add_dfir(
3129                                        parse_quote! {
3130                                            #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3131                                        },
3132                                        None,
3133                                        Some(&next_stmt_id.to_string()),
3134                                    );
3135                                } else {
3136                                    let builder = graph_builders.get_dfir_mut(&out_location);
3137                                    builder.add_dfir(
3138                                        parse_quote! {
3139                                            #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3140                                        },
3141                                        None,
3142                                        Some(&next_stmt_id.to_string()),
3143                                    );
3144                                }
3145                            }
3146                            BuildersOrCallback::Callback(_, node_callback) => {
3147                                node_callback(node, next_stmt_id);
3148                            }
3149                        }
3150
3151                        *next_stmt_id += 1;
3152
3153                        ident_stack.push(fold_ident);
3154                    }
3155
3156                    HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3157                        let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3158                            if input.metadata().location_id.is_top_level()
3159                                && input.metadata().collection_kind.is_bounded()
3160                            {
3161                                parse_quote!(reduce_no_replay)
3162                            } else {
3163                                parse_quote!(reduce)
3164                            }
3165                        } else if let HydroNode::ReduceKeyed { input, .. } = node {
3166                            if input.metadata().location_id.is_top_level()
3167                                && input.metadata().collection_kind.is_bounded()
3168                            {
3169                                todo!(
3170                                    "Calling keyed reduce on a top-level bounded collection is not supported"
3171                                )
3172                            } else {
3173                                parse_quote!(reduce_keyed)
3174                            }
3175                        } else {
3176                            unreachable!()
3177                        };
3178
3179                        let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3180                        else {
3181                            unreachable!()
3182                        };
3183
3184                        let lifetime = if input.metadata().location_id.is_top_level() {
3185                            quote!('static)
3186                        } else {
3187                            quote!('tick)
3188                        };
3189
3190                        let input_ident = ident_stack.pop().unwrap();
3191
3192                        let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3193                        else {
3194                            unreachable!()
3195                        };
3196
3197                        let reduce_ident =
3198                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3199
3200                        match builders_or_callback {
3201                            BuildersOrCallback::Builders(graph_builders) => {
3202                                if matches!(node, HydroNode::Reduce { .. })
3203                                    && node.metadata().location_id.is_top_level()
3204                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3205                                    && graph_builders.singleton_intermediates()
3206                                    && !node.metadata().collection_kind.is_bounded()
3207                                {
3208                                    todo!(
3209                                        "Reduce with optional intermediates is not yet supported in simulator"
3210                                    );
3211                                } else if matches!(node, HydroNode::ReduceKeyed { .. })
3212                                    && node.metadata().location_id.is_top_level()
3213                                    && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3214                                    && graph_builders.singleton_intermediates()
3215                                    && !node.metadata().collection_kind.is_bounded()
3216                                {
3217                                    todo!(
3218                                        "Reduce keyed with optional intermediates is not yet supported in simulator"
3219                                    );
3220                                } else {
3221                                    let builder = graph_builders.get_dfir_mut(&out_location);
3222                                    builder.add_dfir(
3223                                        parse_quote! {
3224                                            #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3225                                        },
3226                                        None,
3227                                        Some(&next_stmt_id.to_string()),
3228                                    );
3229                                }
3230                            }
3231                            BuildersOrCallback::Callback(_, node_callback) => {
3232                                node_callback(node, next_stmt_id);
3233                            }
3234                        }
3235
3236                        *next_stmt_id += 1;
3237
3238                        ident_stack.push(reduce_ident);
3239                    }
3240
3241                    HydroNode::ReduceKeyedWatermark {
3242                        f,
3243                        input,
3244                        metadata,
3245                        ..
3246                    } => {
3247                        let lifetime = if input.metadata().location_id.is_top_level() {
3248                            quote!('static)
3249                        } else {
3250                            quote!('tick)
3251                        };
3252
3253                        // watermark is processed second, so it's on top
3254                        let watermark_ident = ident_stack.pop().unwrap();
3255                        let input_ident = ident_stack.pop().unwrap();
3256
3257                        let chain_ident = syn::Ident::new(
3258                            &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3259                            Span::call_site(),
3260                        );
3261
3262                        let fold_ident =
3263                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3264
3265                        let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3266                            && input.metadata().collection_kind.is_bounded()
3267                        {
3268                            parse_quote!(fold_no_replay)
3269                        } else {
3270                            parse_quote!(fold)
3271                        };
3272
3273                        match builders_or_callback {
3274                            BuildersOrCallback::Builders(graph_builders) => {
3275                                if metadata.location_id.is_top_level()
3276                                    && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3277                                    && graph_builders.singleton_intermediates()
3278                                    && !metadata.collection_kind.is_bounded()
3279                                {
3280                                    todo!(
3281                                        "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3282                                    )
3283                                } else {
3284                                    let builder = graph_builders.get_dfir_mut(&out_location);
3285                                    builder.add_dfir(
3286                                        parse_quote! {
3287                                            #chain_ident = chain();
3288                                            #input_ident
3289                                                -> map(|x| (Some(x), None))
3290                                                -> [0]#chain_ident;
3291                                            #watermark_ident
3292                                                -> map(|watermark| (None, Some(watermark)))
3293                                                -> [1]#chain_ident;
3294
3295                                            #fold_ident = #chain_ident
3296                                                -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3297                                                    let __reduce_keyed_fn = #f;
3298                                                    move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3299                                                        if let Some((k, v)) = opt_payload {
3300                                                            if let Some(curr_watermark) = *opt_curr_watermark {
3301                                                                if k <= curr_watermark {
3302                                                                    return;
3303                                                                }
3304                                                            }
3305                                                            match map.entry(k) {
3306                                                                ::std::collections::hash_map::Entry::Vacant(e) => {
3307                                                                    e.insert(v);
3308                                                                }
3309                                                                ::std::collections::hash_map::Entry::Occupied(mut e) => {
3310                                                                    __reduce_keyed_fn(e.get_mut(), v);
3311                                                                }
3312                                                            }
3313                                                        } else {
3314                                                            let watermark = opt_watermark.unwrap();
3315                                                            if let Some(curr_watermark) = *opt_curr_watermark {
3316                                                                if watermark <= curr_watermark {
3317                                                                    return;
3318                                                                }
3319                                                            }
3320                                                            *opt_curr_watermark = opt_watermark;
3321                                                            map.retain(|k, _| *k > watermark);
3322                                                        }
3323                                                    }
3324                                                })
3325                                                -> flat_map(|(map, _curr_watermark)| map);
3326                                        },
3327                                        None,
3328                                        Some(&next_stmt_id.to_string()),
3329                                    );
3330                                }
3331                            }
3332                            BuildersOrCallback::Callback(_, node_callback) => {
3333                                node_callback(node, next_stmt_id);
3334                            }
3335                        }
3336
3337                        *next_stmt_id += 1;
3338
3339                        ident_stack.push(fold_ident);
3340                    }
3341
3342                    HydroNode::Network {
3343                        serialize_fn: serialize_pipeline,
3344                        instantiate_fn,
3345                        deserialize_fn: deserialize_pipeline,
3346                        input,
3347                        ..
3348                    } => {
3349                        let input_ident = ident_stack.pop().unwrap();
3350
3351                        let receiver_stream_ident =
3352                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3353
3354                        match builders_or_callback {
3355                            BuildersOrCallback::Builders(graph_builders) => {
3356                                let (sink_expr, source_expr) = match instantiate_fn {
3357                                    DebugInstantiate::Building => (
3358                                        syn::parse_quote!(DUMMY_SINK),
3359                                        syn::parse_quote!(DUMMY_SOURCE),
3360                                    ),
3361
3362                                    DebugInstantiate::Finalized(finalized) => {
3363                                        (finalized.sink.clone(), finalized.source.clone())
3364                                    }
3365                                };
3366
3367                                graph_builders.create_network(
3368                                    &input.metadata().location_id,
3369                                    &out_location,
3370                                    input_ident,
3371                                    &receiver_stream_ident,
3372                                    serialize_pipeline.as_ref(),
3373                                    sink_expr,
3374                                    source_expr,
3375                                    deserialize_pipeline.as_ref(),
3376                                    *next_stmt_id,
3377                                );
3378                            }
3379                            BuildersOrCallback::Callback(_, node_callback) => {
3380                                node_callback(node, next_stmt_id);
3381                            }
3382                        }
3383
3384                        *next_stmt_id += 1;
3385
3386                        ident_stack.push(receiver_stream_ident);
3387                    }
3388
3389                    HydroNode::ExternalInput {
3390                        instantiate_fn,
3391                        deserialize_fn: deserialize_pipeline,
3392                        ..
3393                    } => {
3394                        let receiver_stream_ident =
3395                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3396
3397                        match builders_or_callback {
3398                            BuildersOrCallback::Builders(graph_builders) => {
3399                                let (_, source_expr) = match instantiate_fn {
3400                                    DebugInstantiate::Building => (
3401                                        syn::parse_quote!(DUMMY_SINK),
3402                                        syn::parse_quote!(DUMMY_SOURCE),
3403                                    ),
3404
3405                                    DebugInstantiate::Finalized(finalized) => {
3406                                        (finalized.sink.clone(), finalized.source.clone())
3407                                    }
3408                                };
3409
3410                                graph_builders.create_external_source(
3411                                    &out_location,
3412                                    source_expr,
3413                                    &receiver_stream_ident,
3414                                    deserialize_pipeline.as_ref(),
3415                                    *next_stmt_id,
3416                                );
3417                            }
3418                            BuildersOrCallback::Callback(_, node_callback) => {
3419                                node_callback(node, next_stmt_id);
3420                            }
3421                        }
3422
3423                        *next_stmt_id += 1;
3424
3425                        ident_stack.push(receiver_stream_ident);
3426                    }
3427
3428                    HydroNode::Counter {
3429                        tag,
3430                        duration,
3431                        prefix,
3432                        ..
3433                    } => {
3434                        let input_ident = ident_stack.pop().unwrap();
3435
3436                        let counter_ident =
3437                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3438
3439                        match builders_or_callback {
3440                            BuildersOrCallback::Builders(graph_builders) => {
3441                                let builder = graph_builders.get_dfir_mut(&out_location);
3442                                builder.add_dfir(
3443                                    parse_quote! {
3444                                        #counter_ident = #input_ident -> _counter(#tag, #duration, #prefix);
3445                                    },
3446                                    None,
3447                                    Some(&next_stmt_id.to_string()),
3448                                );
3449                            }
3450                            BuildersOrCallback::Callback(_, node_callback) => {
3451                                node_callback(node, next_stmt_id);
3452                            }
3453                        }
3454
3455                        *next_stmt_id += 1;
3456
3457                        ident_stack.push(counter_ident);
3458                    }
3459
3460                    HydroNode::EmbeddedInput { ident, .. } => {
3461                        let source_ident =
3462                            syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3463
3464                        let ident = ident.clone();
3465
3466                        match builders_or_callback {
3467                            BuildersOrCallback::Builders(graph_builders) => {
3468                                let builder = graph_builders.get_dfir_mut(&out_location);
3469                                builder.add_dfir(
3470                                    parse_quote! {
3471                                        #source_ident = source_stream(#ident);
3472                                    },
3473                                    None,
3474                                    Some(&next_stmt_id.to_string()),
3475                                );
3476                            }
3477                            BuildersOrCallback::Callback(_, node_callback) => {
3478                                node_callback(node, next_stmt_id);
3479                            }
3480                        }
3481
3482                        *next_stmt_id += 1;
3483
3484                        ident_stack.push(source_ident);
3485                    }
3486                }
3487            },
3488            seen_tees,
3489            false,
3490        );
3491
3492        ident_stack
3493            .pop()
3494            .expect("ident_stack should have exactly one element after traversal")
3495    }
3496
3497    pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3498        match self {
3499            HydroNode::Placeholder => {
3500                panic!()
3501            }
3502            HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3503            HydroNode::Source { source, .. } => match source {
3504                HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3505                HydroSource::ExternalNetwork()
3506                | HydroSource::Spin()
3507                | HydroSource::ClusterMembers(_) => {} // TODO: what goes here?
3508            },
3509            HydroNode::SingletonSource { value, .. } => {
3510                transform(value);
3511            }
3512            HydroNode::CycleSource { .. }
3513            | HydroNode::Tee { .. }
3514            | HydroNode::YieldConcat { .. }
3515            | HydroNode::BeginAtomic { .. }
3516            | HydroNode::EndAtomic { .. }
3517            | HydroNode::Batch { .. }
3518            | HydroNode::Chain { .. }
3519            | HydroNode::ChainFirst { .. }
3520            | HydroNode::CrossProduct { .. }
3521            | HydroNode::CrossSingleton { .. }
3522            | HydroNode::ResolveFutures { .. }
3523            | HydroNode::ResolveFuturesOrdered { .. }
3524            | HydroNode::Join { .. }
3525            | HydroNode::Difference { .. }
3526            | HydroNode::AntiJoin { .. }
3527            | HydroNode::DeferTick { .. }
3528            | HydroNode::Enumerate { .. }
3529            | HydroNode::Unique { .. }
3530            | HydroNode::Sort { .. } => {}
3531            HydroNode::Map { f, .. }
3532            | HydroNode::FlatMap { f, .. }
3533            | HydroNode::Filter { f, .. }
3534            | HydroNode::FilterMap { f, .. }
3535            | HydroNode::Inspect { f, .. }
3536            | HydroNode::Reduce { f, .. }
3537            | HydroNode::ReduceKeyed { f, .. }
3538            | HydroNode::ReduceKeyedWatermark { f, .. } => {
3539                transform(f);
3540            }
3541            HydroNode::Fold { init, acc, .. }
3542            | HydroNode::Scan { init, acc, .. }
3543            | HydroNode::FoldKeyed { init, acc, .. } => {
3544                transform(init);
3545                transform(acc);
3546            }
3547            HydroNode::Network {
3548                serialize_fn,
3549                deserialize_fn,
3550                ..
3551            } => {
3552                if let Some(serialize_fn) = serialize_fn {
3553                    transform(serialize_fn);
3554                }
3555                if let Some(deserialize_fn) = deserialize_fn {
3556                    transform(deserialize_fn);
3557                }
3558            }
3559            HydroNode::ExternalInput { deserialize_fn, .. } => {
3560                if let Some(deserialize_fn) = deserialize_fn {
3561                    transform(deserialize_fn);
3562                }
3563            }
3564            HydroNode::EmbeddedInput { .. } => {}
3565            HydroNode::Counter { duration, .. } => {
3566                transform(duration);
3567            }
3568        }
3569    }
3570
3571    pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3572        &self.metadata().op
3573    }
3574
3575    pub fn metadata(&self) -> &HydroIrMetadata {
3576        match self {
3577            HydroNode::Placeholder => {
3578                panic!()
3579            }
3580            HydroNode::Cast { metadata, .. } => metadata,
3581            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3582            HydroNode::Source { metadata, .. } => metadata,
3583            HydroNode::SingletonSource { metadata, .. } => metadata,
3584            HydroNode::CycleSource { metadata, .. } => metadata,
3585            HydroNode::Tee { metadata, .. } => metadata,
3586            HydroNode::YieldConcat { metadata, .. } => metadata,
3587            HydroNode::BeginAtomic { metadata, .. } => metadata,
3588            HydroNode::EndAtomic { metadata, .. } => metadata,
3589            HydroNode::Batch { metadata, .. } => metadata,
3590            HydroNode::Chain { metadata, .. } => metadata,
3591            HydroNode::ChainFirst { metadata, .. } => metadata,
3592            HydroNode::CrossProduct { metadata, .. } => metadata,
3593            HydroNode::CrossSingleton { metadata, .. } => metadata,
3594            HydroNode::Join { metadata, .. } => metadata,
3595            HydroNode::Difference { metadata, .. } => metadata,
3596            HydroNode::AntiJoin { metadata, .. } => metadata,
3597            HydroNode::ResolveFutures { metadata, .. } => metadata,
3598            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3599            HydroNode::Map { metadata, .. } => metadata,
3600            HydroNode::FlatMap { metadata, .. } => metadata,
3601            HydroNode::Filter { metadata, .. } => metadata,
3602            HydroNode::FilterMap { metadata, .. } => metadata,
3603            HydroNode::DeferTick { metadata, .. } => metadata,
3604            HydroNode::Enumerate { metadata, .. } => metadata,
3605            HydroNode::Inspect { metadata, .. } => metadata,
3606            HydroNode::Unique { metadata, .. } => metadata,
3607            HydroNode::Sort { metadata, .. } => metadata,
3608            HydroNode::Scan { metadata, .. } => metadata,
3609            HydroNode::Fold { metadata, .. } => metadata,
3610            HydroNode::FoldKeyed { metadata, .. } => metadata,
3611            HydroNode::Reduce { metadata, .. } => metadata,
3612            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3613            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3614            HydroNode::ExternalInput { metadata, .. } => metadata,
3615            HydroNode::Network { metadata, .. } => metadata,
3616            HydroNode::Counter { metadata, .. } => metadata,
3617            HydroNode::EmbeddedInput { metadata, .. } => metadata,
3618        }
3619    }
3620
3621    pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3622        &mut self.metadata_mut().op
3623    }
3624
3625    pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3626        match self {
3627            HydroNode::Placeholder => {
3628                panic!()
3629            }
3630            HydroNode::Cast { metadata, .. } => metadata,
3631            HydroNode::ObserveNonDet { metadata, .. } => metadata,
3632            HydroNode::Source { metadata, .. } => metadata,
3633            HydroNode::SingletonSource { metadata, .. } => metadata,
3634            HydroNode::CycleSource { metadata, .. } => metadata,
3635            HydroNode::Tee { metadata, .. } => metadata,
3636            HydroNode::YieldConcat { metadata, .. } => metadata,
3637            HydroNode::BeginAtomic { metadata, .. } => metadata,
3638            HydroNode::EndAtomic { metadata, .. } => metadata,
3639            HydroNode::Batch { metadata, .. } => metadata,
3640            HydroNode::Chain { metadata, .. } => metadata,
3641            HydroNode::ChainFirst { metadata, .. } => metadata,
3642            HydroNode::CrossProduct { metadata, .. } => metadata,
3643            HydroNode::CrossSingleton { metadata, .. } => metadata,
3644            HydroNode::Join { metadata, .. } => metadata,
3645            HydroNode::Difference { metadata, .. } => metadata,
3646            HydroNode::AntiJoin { metadata, .. } => metadata,
3647            HydroNode::ResolveFutures { metadata, .. } => metadata,
3648            HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3649            HydroNode::Map { metadata, .. } => metadata,
3650            HydroNode::FlatMap { metadata, .. } => metadata,
3651            HydroNode::Filter { metadata, .. } => metadata,
3652            HydroNode::FilterMap { metadata, .. } => metadata,
3653            HydroNode::DeferTick { metadata, .. } => metadata,
3654            HydroNode::Enumerate { metadata, .. } => metadata,
3655            HydroNode::Inspect { metadata, .. } => metadata,
3656            HydroNode::Unique { metadata, .. } => metadata,
3657            HydroNode::Sort { metadata, .. } => metadata,
3658            HydroNode::Scan { metadata, .. } => metadata,
3659            HydroNode::Fold { metadata, .. } => metadata,
3660            HydroNode::FoldKeyed { metadata, .. } => metadata,
3661            HydroNode::Reduce { metadata, .. } => metadata,
3662            HydroNode::ReduceKeyed { metadata, .. } => metadata,
3663            HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3664            HydroNode::ExternalInput { metadata, .. } => metadata,
3665            HydroNode::Network { metadata, .. } => metadata,
3666            HydroNode::Counter { metadata, .. } => metadata,
3667            HydroNode::EmbeddedInput { metadata, .. } => metadata,
3668        }
3669    }
3670
3671    pub fn input(&self) -> Vec<&HydroNode> {
3672        match self {
3673            HydroNode::Placeholder => {
3674                panic!()
3675            }
3676            HydroNode::Source { .. }
3677            | HydroNode::SingletonSource { .. }
3678            | HydroNode::ExternalInput { .. }
3679            | HydroNode::CycleSource { .. }
3680            | HydroNode::EmbeddedInput { .. }
3681            | HydroNode::Tee { .. } => {
3682                // Tee should find its input in separate special ways
3683                vec![]
3684            }
3685            HydroNode::Cast { inner, .. }
3686            | HydroNode::ObserveNonDet { inner, .. }
3687            | HydroNode::YieldConcat { inner, .. }
3688            | HydroNode::BeginAtomic { inner, .. }
3689            | HydroNode::EndAtomic { inner, .. }
3690            | HydroNode::Batch { inner, .. } => {
3691                vec![inner]
3692            }
3693            HydroNode::Chain { first, second, .. } => {
3694                vec![first, second]
3695            }
3696            HydroNode::ChainFirst { first, second, .. } => {
3697                vec![first, second]
3698            }
3699            HydroNode::CrossProduct { left, right, .. }
3700            | HydroNode::CrossSingleton { left, right, .. }
3701            | HydroNode::Join { left, right, .. } => {
3702                vec![left, right]
3703            }
3704            HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3705                vec![pos, neg]
3706            }
3707            HydroNode::Map { input, .. }
3708            | HydroNode::FlatMap { input, .. }
3709            | HydroNode::Filter { input, .. }
3710            | HydroNode::FilterMap { input, .. }
3711            | HydroNode::Sort { input, .. }
3712            | HydroNode::DeferTick { input, .. }
3713            | HydroNode::Enumerate { input, .. }
3714            | HydroNode::Inspect { input, .. }
3715            | HydroNode::Unique { input, .. }
3716            | HydroNode::Network { input, .. }
3717            | HydroNode::Counter { input, .. }
3718            | HydroNode::ResolveFutures { input, .. }
3719            | HydroNode::ResolveFuturesOrdered { input, .. }
3720            | HydroNode::Fold { input, .. }
3721            | HydroNode::FoldKeyed { input, .. }
3722            | HydroNode::Reduce { input, .. }
3723            | HydroNode::ReduceKeyed { input, .. }
3724            | HydroNode::Scan { input, .. } => {
3725                vec![input]
3726            }
3727            HydroNode::ReduceKeyedWatermark {
3728                input, watermark, ..
3729            } => {
3730                vec![input, watermark]
3731            }
3732        }
3733    }
3734
3735    pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3736        self.input()
3737            .iter()
3738            .map(|input_node| input_node.metadata())
3739            .collect()
3740    }
3741
3742    pub fn print_root(&self) -> String {
3743        match self {
3744            HydroNode::Placeholder => {
3745                panic!()
3746            }
3747            HydroNode::Cast { .. } => "Cast()".to_owned(),
3748            HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
3749            HydroNode::Source { source, .. } => format!("Source({:?})", source),
3750            HydroNode::SingletonSource { value, .. } => format!("SingletonSource({:?})", value),
3751            HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
3752            HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3753            HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
3754            HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
3755            HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
3756            HydroNode::Batch { .. } => "Batch()".to_owned(),
3757            HydroNode::Chain { first, second, .. } => {
3758                format!("Chain({}, {})", first.print_root(), second.print_root())
3759            }
3760            HydroNode::ChainFirst { first, second, .. } => {
3761                format!(
3762                    "ChainFirst({}, {})",
3763                    first.print_root(),
3764                    second.print_root()
3765                )
3766            }
3767            HydroNode::CrossProduct { left, right, .. } => {
3768                format!(
3769                    "CrossProduct({}, {})",
3770                    left.print_root(),
3771                    right.print_root()
3772                )
3773            }
3774            HydroNode::CrossSingleton { left, right, .. } => {
3775                format!(
3776                    "CrossSingleton({}, {})",
3777                    left.print_root(),
3778                    right.print_root()
3779                )
3780            }
3781            HydroNode::Join { left, right, .. } => {
3782                format!("Join({}, {})", left.print_root(), right.print_root())
3783            }
3784            HydroNode::Difference { pos, neg, .. } => {
3785                format!("Difference({}, {})", pos.print_root(), neg.print_root())
3786            }
3787            HydroNode::AntiJoin { pos, neg, .. } => {
3788                format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3789            }
3790            HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
3791            HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
3792            HydroNode::Map { f, .. } => format!("Map({:?})", f),
3793            HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3794            HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3795            HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3796            HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
3797            HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
3798            HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3799            HydroNode::Unique { .. } => "Unique()".to_owned(),
3800            HydroNode::Sort { .. } => "Sort()".to_owned(),
3801            HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3802            HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3803            HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3804            HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3805            HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3806            HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3807            HydroNode::Network { .. } => "Network()".to_owned(),
3808            HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
3809            HydroNode::Counter { tag, duration, .. } => {
3810                format!("Counter({:?}, {:?})", tag, duration)
3811            }
3812            HydroNode::EmbeddedInput { ident, .. } => format!("EmbeddedInput({})", ident),
3813        }
3814    }
3815}
3816
/// Builds the two endpoint expressions for a network channel from `from_location`
/// to `to_location`, plus a callback produced by the matching `D::*_connect`
/// function.
///
/// The pair of endpoint kinds selects which `Deploy` functions are used:
///
/// | from    | to      | functions                             |
/// |---------|---------|---------------------------------------|
/// | process | process | `o2o_sink_source` / `o2o_connect`     |
/// | process | cluster | `o2m_sink_source` / `o2m_connect`     |
/// | cluster | process | `m2o_sink_source` / `m2o_connect`     |
/// | cluster | cluster | `m2m_sink_source` / `m2m_connect`     |
///
/// For every channel a new port is requested with `next_port()` on the `from`
/// node first and on the `to` node second; both ports are passed to the
/// `*_sink_source` and `*_connect` functions.
///
/// Returns `(sink, source, connect_fn)`, where `(sink, source)` is the pair
/// returned by `*_sink_source` and `connect_fn` is the value returned by
/// `*_connect`.
///
/// # Panics
///
/// - If a process or cluster key is missing from `processes` / `clusters`.
/// - If either location is a `LocationId::Tick` or `LocationId::Atomic`.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Shared lookups: every arm resolves its endpoints the same way and reports a
    // missing node with the same message.
    let process_node = |key: LocationKey| {
        processes
            .get(key)
            .unwrap_or_else(|| panic!("A process used in the graph was not instantiated: {}", key))
            .clone()
    };
    let cluster_node = |key: LocationKey| {
        clusters
            .get(key)
            .unwrap_or_else(|| panic!("A cluster used in the graph was not instantiated: {}", key))
            .clone()
    };

    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(from), &LocationId::Process(to)) => {
            let from_node = process_node(from);
            let to_node = process_node(to);

            // Port order matters when both endpoints are the same node: the
            // `from` side takes its port first.
            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Process(from), &LocationId::Cluster(to)) => {
            let from_node = process_node(from);
            let to_node = cluster_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Process(to)) => {
            let from_node = cluster_node(from);
            let to_node = process_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
            let from_node = cluster_node(from);
            let to_node = cluster_node(to);

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        // Only process and cluster locations are handled above; report which
        // side was unsupported instead of panicking without a message.
        (LocationId::Tick(..) | LocationId::Atomic(..), _) => panic!(
            "instantiate_network: `from_location` must be a process or cluster, \
             not a tick or atomic location"
        ),
        (_, LocationId::Tick(..) | LocationId::Atomic(..)) => panic!(
            "instantiate_network: `to_location` must be a process or cluster, \
             not a tick or atomic location"
        ),
    };
    (sink, source, connect_fn)
}
3923
#[cfg(test)]
mod test {
    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins the in-memory size of `HydroNode` at 240 bytes. Ignored without the
    /// `build` feature because the expected size includes feature-gated fields.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        assert_eq!(std::mem::size_of::<HydroNode>(), 240);
    }

    /// Pins the in-memory size of `HydroRoot` at 136 bytes. Ignored without the
    /// `build` feature because the expected size includes feature-gated fields.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        assert_eq!(std::mem::size_of::<HydroRoot>(), 136);
    }

    #[test]
    fn test_simplify_q_macro_basic() {
        // An expression that contains no q! expansion must come back unchanged.
        let plain: syn::Expr = syn::parse_str("x + y").unwrap();
        let result = simplify_q_macro(plain.clone());
        assert_eq!(result, plain);
    }

    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        // A small stand-in for what a real stageleft call produces.
        let spliced = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        // The visitor is expected to recognise the
        // stageleft::runtime_support::fn_* pattern and collapse it to q!(...);
        // the snapshot records the exact output.
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    #[test]
    fn test_closure_no_pipe_at_start() {
        // The quoted code is a block that ends in a closure, so it does not start
        // with a pipe.
        let spliced = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}