Skip to main content

hydro_lang/compile/
deploy_provider.rs

1use std::io::Error;
2use std::pin::Pin;
3
4use bytes::{Bytes, BytesMut};
5use dfir_lang::graph::DfirGraph;
6use futures::{Sink, Stream};
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9use stageleft::QuotedWithContext;
10
11use crate::compile::builder::ExternalPortId;
12use crate::location::dynamic::LocationId;
13use crate::location::member_id::TaglessMemberId;
14use crate::location::{LocationKey, MembershipEvent, NetworkHint};
15
/// Contract implemented by a deployment backend.
///
/// The trait is expressed over three node kinds — [`Self::Process`],
/// [`Self::Cluster`] and [`Self::External`] — which are all required to share
/// the same `Meta` and `InstantiateEnv` types. Every method is an associated
/// function (there is no `self` receiver); most of them hand back `syn`
/// syntax trees, quoted expressions, or boxed one-shot closures.
///
/// NOTE(review): the method prefixes `o2o`, `o2m`, `m2o`, `m2m`, `e2o` and
/// `o2e` presumably abbreviate the endpoint kinds ("one" = process,
/// "many" = cluster, "e" = external), judging by the parameter types of each
/// method — confirm against the implementors.
pub trait Deploy<'a> {
    /// Metadata type shared by all three node kinds; must implement `Default`.
    type Meta: Default;
    /// Environment type shared by all three node kinds. It is passed mutably
    /// to [`Node::instantiate`] and to [`Self::register_embedded_input`].
    type InstantiateEnv;

    /// Node type used for processes.
    type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
    /// Node type used for clusters.
    type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
    /// Node type used for externals; additionally supports [`RegisterPort`].
    type External: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
        + RegisterPort<'a, Self>;

    /// Produces a pair of expressions for a process-to-process link, given
    /// each side's node and port.
    ///
    /// NOTE(review): by the method name the tuple is presumably
    /// `(sink, source)` — confirm the ordering against implementors.
    fn o2o_sink_source(
        p1: &Self::Process,
        p1_port: &<Self::Process as Node>::Port,
        p2: &Self::Process,
        p2_port: &<Self::Process as Node>::Port,
    ) -> (syn::Expr, syn::Expr);
    /// Returns a boxed one-shot closure for a process-to-process link.
    ///
    /// NOTE(review): presumably invoking the closure wires `p1_port` to
    /// `p2_port`; the effect is not visible in this file.
    fn o2o_connect(
        p1: &Self::Process,
        p1_port: &<Self::Process as Node>::Port,
        p2: &Self::Process,
        p2_port: &<Self::Process as Node>::Port,
    ) -> Box<dyn FnOnce()>;

    /// Process-to-cluster counterpart of [`Self::o2o_sink_source`].
    fn o2m_sink_source(
        p1: &Self::Process,
        p1_port: &<Self::Process as Node>::Port,
        c2: &Self::Cluster,
        c2_port: &<Self::Cluster as Node>::Port,
    ) -> (syn::Expr, syn::Expr);
    /// Process-to-cluster counterpart of [`Self::o2o_connect`].
    fn o2m_connect(
        p1: &Self::Process,
        p1_port: &<Self::Process as Node>::Port,
        c2: &Self::Cluster,
        c2_port: &<Self::Cluster as Node>::Port,
    ) -> Box<dyn FnOnce()>;

    /// Cluster-to-process counterpart of [`Self::o2o_sink_source`].
    fn m2o_sink_source(
        c1: &Self::Cluster,
        c1_port: &<Self::Cluster as Node>::Port,
        p2: &Self::Process,
        p2_port: &<Self::Process as Node>::Port,
    ) -> (syn::Expr, syn::Expr);
    /// Cluster-to-process counterpart of [`Self::o2o_connect`].
    fn m2o_connect(
        c1: &Self::Cluster,
        c1_port: &<Self::Cluster as Node>::Port,
        p2: &Self::Process,
        p2_port: &<Self::Process as Node>::Port,
    ) -> Box<dyn FnOnce()>;

    /// Cluster-to-cluster counterpart of [`Self::o2o_sink_source`].
    fn m2m_sink_source(
        c1: &Self::Cluster,
        c1_port: &<Self::Cluster as Node>::Port,
        c2: &Self::Cluster,
        c2_port: &<Self::Cluster as Node>::Port,
    ) -> (syn::Expr, syn::Expr);
    /// Cluster-to-cluster counterpart of [`Self::o2o_connect`].
    fn m2m_connect(
        c1: &Self::Cluster,
        c1_port: &<Self::Cluster as Node>::Port,
        c2: &Self::Cluster,
        c2_port: &<Self::Cluster as Node>::Port,
    ) -> Box<dyn FnOnce()>;

    /// Produces the source expression for the "many" variant of an
    /// external-to-process link.
    ///
    /// The implementation receives `extra_stmts` mutably, so it is able to
    /// append statements to it. `shared_handle` is a name that is also passed
    /// to [`Self::e2o_many_sink`].
    /// NOTE(review): presumably the handle ties the generated source and sink
    /// together — confirm.
    fn e2o_many_source(
        extra_stmts: &mut Vec<syn::Stmt>,
        p2: &Self::Process,
        p2_port: &<Self::Process as Node>::Port,
        codec_type: &syn::Type,
        shared_handle: String,
    ) -> syn::Expr;
    /// Produces the sink expression matching [`Self::e2o_many_source`] for the
    /// given `shared_handle`.
    fn e2o_many_sink(shared_handle: String) -> syn::Expr;

    /// Produces the source expression for an external-to-process link.
    ///
    /// The implementation receives `extra_stmts` mutably, so it is able to
    /// append statements to it.
    fn e2o_source(
        extra_stmts: &mut Vec<syn::Stmt>,
        p1: &Self::External,
        p1_port: &<Self::External as Node>::Port,
        p2: &Self::Process,
        p2_port: &<Self::Process as Node>::Port,
        codec_type: &syn::Type,
        shared_handle: String,
    ) -> syn::Expr;
    /// Returns a boxed one-shot closure for an external-to-process link.
    ///
    /// NOTE(review): `many` presumably selects the behavior paired with
    /// [`Self::e2o_many_source`], and `server_hint` presumably influences how
    /// the listening side is set up — neither is established by this file.
    fn e2o_connect(
        p1: &Self::External,
        p1_port: &<Self::External as Node>::Port,
        p2: &Self::Process,
        p2_port: &<Self::Process as Node>::Port,
        many: bool,
        server_hint: NetworkHint,
    ) -> Box<dyn FnOnce()>;

    /// Produces the sink expression for a process-to-external link.
    fn o2e_sink(
        p1: &Self::Process,
        p1_port: &<Self::Process as Node>::Port,
        p2: &Self::External,
        p2_port: &<Self::External as Node>::Port,
        shared_handle: String,
    ) -> syn::Expr;

    /// Returns a quoted expression that evaluates to a slice of
    /// [`TaglessMemberId`]s for the cluster identified by `of_cluster`.
    fn cluster_ids(
        of_cluster: LocationKey,
    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a;

    /// Returns a quoted expression that evaluates to a single
    /// [`TaglessMemberId`].
    ///
    /// NOTE(review): by name, this is presumably the id of the cluster member
    /// the generated code runs on — confirm.
    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a;

    /// Returns a quoted expression that evaluates to a boxed, `Unpin` stream
    /// of `(TaglessMemberId, MembershipEvent)` pairs for `location_id`.
    fn cluster_membership_stream(
        location_id: &LocationId,
    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>;

    /// Registers an embedded input for the given ident and element type.
    ///
    /// Only meaningful for the embedded deployment backend. The default
    /// implementation panics.
    ///
    /// # Panics
    ///
    /// The default implementation always panics; backends that support
    /// embedded inputs must override it.
    fn register_embedded_input(
        _env: &mut Self::InstantiateEnv,
        _ident: &syn::Ident,
        _element_type: &syn::Type,
    ) {
        panic!("register_embedded_input is only supported by EmbeddedDeploy");
    }
}
134
135pub trait ProcessSpec<'a, D>
136where
137    D: Deploy<'a> + ?Sized,
138{
139    fn build(self, location_key: LocationKey, name_hint: &str) -> D::Process;
140}
141
142pub trait IntoProcessSpec<'a, D>
143where
144    D: Deploy<'a> + ?Sized,
145{
146    type ProcessSpec: ProcessSpec<'a, D>;
147    fn into_process_spec(self) -> Self::ProcessSpec;
148}
149
150impl<'a, D, T> IntoProcessSpec<'a, D> for T
151where
152    D: Deploy<'a> + ?Sized,
153    T: ProcessSpec<'a, D>,
154{
155    type ProcessSpec = T;
156    fn into_process_spec(self) -> Self::ProcessSpec {
157        self
158    }
159}
160
161pub trait ClusterSpec<'a, D>
162where
163    D: Deploy<'a> + ?Sized,
164{
165    fn build(self, location_key: LocationKey, name_hint: &str) -> D::Cluster;
166}
167
168pub trait ExternalSpec<'a, D>
169where
170    D: Deploy<'a> + ?Sized,
171{
172    fn build(self, location_key: LocationKey, name_hint: &str) -> D::External;
173}
174
/// Common interface of every node kind used by [`Deploy`] (its `Process`,
/// `Cluster` and `External` associated types are all bounded by this trait).
pub trait Node {
    /// A logical communication endpoint for this node.
    ///
    /// Implementors are free to choose the concrete representation (for example,
    /// a handle or identifier), but it must be `Clone` so that a single logical
    /// port can be duplicated and passed to multiple consumers. New ports are
    /// allocated via [`Self::next_port`].
    type Port: Clone;
    /// Metadata type accepted by [`Self::update_meta`] and
    /// [`Self::instantiate`]; must implement `Default`.
    type Meta: Default;
    /// Environment type passed mutably to [`Self::instantiate`].
    type InstantiateEnv;

    /// Allocates and returns a new port.
    fn next_port(&self) -> Self::Port;

    /// Gives the node read access to `meta`.
    ///
    /// Takes `&self`, so any state the node records must use interior
    /// mutability. NOTE(review): what a node does with the metadata is
    /// implementation-defined and not visible in this file.
    fn update_meta(&self, meta: &Self::Meta);

    /// Instantiates this node from a [`DfirGraph`].
    ///
    /// The implementation receives the environment and the metadata mutably,
    /// takes ownership of `graph`, and gets read-only access to `extra_stmts`
    /// and `sidecars`.
    /// NOTE(review): presumably `extra_stmts` and `sidecars` are emitted
    /// alongside the code generated from `graph` — confirm against
    /// implementors.
    fn instantiate(
        &self,
        env: &mut Self::InstantiateEnv,
        meta: &mut Self::Meta,
        graph: DfirGraph,
        extra_stmts: &[syn::Stmt],
        sidecars: &[syn::Expr],
    );
}
200
/// A type-erased source/sink pair.
///
/// The first element is a pinned, boxed [`Stream`] yielding `Out` items; the
/// second is a pinned, boxed [`Sink`] accepting `In` items and reporting
/// failures as `InErr`.
pub type DynSourceSink<Out, In, InErr> = (
    Pin<Box<dyn Stream<Item = Out>>>,
    Pin<Box<dyn Sink<In, Error = InErr>>>,
);
205
/// Extension of [`Node`] that associates [`ExternalPortId`]s with ports and
/// hands out stream/sink endpoints for them.
///
/// [`Deploy::External`] is required to implement this trait.
pub trait RegisterPort<'a, D>: Node + Clone
where
    D: Deploy<'a> + ?Sized,
{
    /// Associates `port` with `external_port_id` on this node.
    ///
    /// Takes `&self`, so the association must be stored through interior
    /// mutability.
    fn register(&self, external_port_id: ExternalPortId, port: Self::Port);

    /// Returns a future resolving to a bidirectional raw-bytes endpoint for
    /// `external_port_id`: a stream of `Result<BytesMut, Error>` and a sink
    /// accepting `Bytes` (see [`DynSourceSink`]).
    fn as_bytes_bidi(
        &self,
        external_port_id: ExternalPortId,
    ) -> impl Future<Output = DynSourceSink<Result<BytesMut, Error>, Bytes, Error>> + 'a;

    /// Returns a future resolving to a bidirectional typed endpoint for
    /// `external_port_id`: a stream of `OutT` and a sink accepting `InT`.
    ///
    /// NOTE(review): by name, values are presumably encoded with bincode; the
    /// bounds only require `InT: Serialize` and `OutT: DeserializeOwned`.
    fn as_bincode_bidi<InT, OutT>(
        &self,
        external_port_id: ExternalPortId,
    ) -> impl Future<Output = DynSourceSink<OutT, InT, Error>> + 'a
    where
        InT: Serialize + 'static,
        OutT: DeserializeOwned + 'static;

    /// Returns a future resolving to a send-only typed endpoint for
    /// `external_port_id`: a pinned, boxed sink accepting `T`.
    fn as_bincode_sink<T>(
        &self,
        external_port_id: ExternalPortId,
    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
    where
        T: Serialize + 'static;

    /// Returns a future resolving to a receive-only typed endpoint for
    /// `external_port_id`: a pinned, boxed stream yielding `T`.
    fn as_bincode_source<T>(
        &self,
        external_port_id: ExternalPortId,
    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
    where
        T: DeserializeOwned + 'static;
}