1use std::future::Future;
17use std::io::Error;
18use std::pin::Pin;
19
20use bytes::{Bytes, BytesMut};
21use dfir_lang::diagnostic::Diagnostics;
22use dfir_lang::graph::DfirGraph;
23use futures::{Sink, Stream};
24use proc_macro2::Span;
25use quote::quote;
26use serde::Serialize;
27use serde::de::DeserializeOwned;
28use stageleft::{QuotedWithContext, q};
29
30use super::deploy_provider::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
31use crate::compile::builder::ExternalPortId;
32use crate::location::dynamic::LocationId;
33use crate::location::member_id::TaglessMemberId;
34use crate::location::{LocationKey, MembershipEvent, NetworkHint};
35
/// Marker type that selects the embedded deployment backend.
///
/// This is an empty enum, so no value of it can ever be constructed; it is only
/// used at the type level (for example as the deploy parameter of the
/// `RegisterPort`, `ProcessSpec`, `ClusterSpec`, `ExternalSpec` and `Deploy`
/// implementations in this file).
pub enum EmbeddedDeploy {}
40
/// A location (process, cluster, or external) in an embedded deployment.
///
/// The same node type is used for all three location kinds of
/// [`EmbeddedDeploy`].
#[derive(Clone)]
pub struct EmbeddedNode {
    /// Name of the function that code generation emits for this location.
    pub fn_name: String,
}
47
48impl Node for EmbeddedNode {
49 type Port = ();
50 type Meta = ();
51 type InstantiateEnv = EmbeddedInstantiateEnv;
52
53 fn next_port(&self) -> Self::Port {}
54
55 fn update_meta(&self, _meta: &Self::Meta) {}
56
57 fn instantiate(
58 &self,
59 _env: &mut Self::InstantiateEnv,
60 _meta: &mut Self::Meta,
61 _graph: DfirGraph,
62 _extra_stmts: &[syn::Stmt],
63 _sidecars: &[syn::Expr],
64 ) {
65 }
67}
68
69impl<'a> RegisterPort<'a, EmbeddedDeploy> for EmbeddedNode {
70 fn register(&self, _external_port_id: ExternalPortId, _port: Self::Port) {
71 panic!("EmbeddedDeploy does not support external ports");
72 }
73
74 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
75 fn as_bytes_bidi(
76 &self,
77 _external_port_id: ExternalPortId,
78 ) -> impl Future<
79 Output = super::deploy_provider::DynSourceSink<Result<BytesMut, Error>, Bytes, Error>,
80 > + 'a {
81 async { panic!("EmbeddedDeploy does not support external ports") }
82 }
83
84 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
85 fn as_bincode_bidi<InT, OutT>(
86 &self,
87 _external_port_id: ExternalPortId,
88 ) -> impl Future<Output = super::deploy_provider::DynSourceSink<OutT, InT, Error>> + 'a
89 where
90 InT: Serialize + 'static,
91 OutT: DeserializeOwned + 'static,
92 {
93 async { panic!("EmbeddedDeploy does not support external ports") }
94 }
95
96 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
97 fn as_bincode_sink<T>(
98 &self,
99 _external_port_id: ExternalPortId,
100 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
101 where
102 T: Serialize + 'static,
103 {
104 async { panic!("EmbeddedDeploy does not support external ports") }
105 }
106
107 #[expect(clippy::manual_async_fn, reason = "false positive, involves lifetimes")]
108 fn as_bincode_source<T>(
109 &self,
110 _external_port_id: ExternalPortId,
111 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
112 where
113 T: DeserializeOwned + 'static,
114 {
115 async { panic!("EmbeddedDeploy does not support external ports") }
116 }
117}
118
119impl<S: Into<String>> ProcessSpec<'_, EmbeddedDeploy> for S {
120 fn build(self, _location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
121 EmbeddedNode {
122 fn_name: self.into(),
123 }
124 }
125}
126
127impl<S: Into<String>> ClusterSpec<'_, EmbeddedDeploy> for S {
128 fn build(self, _location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
129 EmbeddedNode {
130 fn_name: self.into(),
131 }
132 }
133}
134
135impl<S: Into<String>> ExternalSpec<'_, EmbeddedDeploy> for S {
136 fn build(self, _location_key: LocationKey, _name_hint: &str) -> EmbeddedNode {
137 EmbeddedNode {
138 fn_name: self.into(),
139 }
140 }
141}
142
143#[derive(Default)]
149pub struct EmbeddedInstantiateEnv {
150 pub inputs: Vec<(syn::Ident, syn::Type)>,
152}
153
154impl<'a> Deploy<'a> for EmbeddedDeploy {
155 type Meta = ();
156 type InstantiateEnv = EmbeddedInstantiateEnv;
157
158 type Process = EmbeddedNode;
159 type Cluster = EmbeddedNode;
160 type External = EmbeddedNode;
161
162 fn o2o_sink_source(
163 _p1: &Self::Process,
164 _p1_port: &(),
165 _p2: &Self::Process,
166 _p2_port: &(),
167 ) -> (syn::Expr, syn::Expr) {
168 panic!("EmbeddedDeploy does not support networking (o2o)")
169 }
170
171 fn o2o_connect(
172 _p1: &Self::Process,
173 _p1_port: &(),
174 _p2: &Self::Process,
175 _p2_port: &(),
176 ) -> Box<dyn FnOnce()> {
177 panic!("EmbeddedDeploy does not support networking (o2o)")
178 }
179
180 fn o2m_sink_source(
181 _p1: &Self::Process,
182 _p1_port: &(),
183 _c2: &Self::Cluster,
184 _c2_port: &(),
185 ) -> (syn::Expr, syn::Expr) {
186 panic!("EmbeddedDeploy does not support networking (o2m)")
187 }
188
189 fn o2m_connect(
190 _p1: &Self::Process,
191 _p1_port: &(),
192 _c2: &Self::Cluster,
193 _c2_port: &(),
194 ) -> Box<dyn FnOnce()> {
195 panic!("EmbeddedDeploy does not support networking (o2m)")
196 }
197
198 fn m2o_sink_source(
199 _c1: &Self::Cluster,
200 _c1_port: &(),
201 _p2: &Self::Process,
202 _p2_port: &(),
203 ) -> (syn::Expr, syn::Expr) {
204 panic!("EmbeddedDeploy does not support networking (m2o)")
205 }
206
207 fn m2o_connect(
208 _c1: &Self::Cluster,
209 _c1_port: &(),
210 _p2: &Self::Process,
211 _p2_port: &(),
212 ) -> Box<dyn FnOnce()> {
213 panic!("EmbeddedDeploy does not support networking (m2o)")
214 }
215
216 fn m2m_sink_source(
217 _c1: &Self::Cluster,
218 _c1_port: &(),
219 _c2: &Self::Cluster,
220 _c2_port: &(),
221 ) -> (syn::Expr, syn::Expr) {
222 panic!("EmbeddedDeploy does not support networking (m2m)")
223 }
224
225 fn m2m_connect(
226 _c1: &Self::Cluster,
227 _c1_port: &(),
228 _c2: &Self::Cluster,
229 _c2_port: &(),
230 ) -> Box<dyn FnOnce()> {
231 panic!("EmbeddedDeploy does not support networking (m2m)")
232 }
233
234 fn e2o_many_source(
235 _extra_stmts: &mut Vec<syn::Stmt>,
236 _p2: &Self::Process,
237 _p2_port: &(),
238 _codec_type: &syn::Type,
239 _shared_handle: String,
240 ) -> syn::Expr {
241 panic!("EmbeddedDeploy does not support networking (e2o)")
242 }
243
244 fn e2o_many_sink(_shared_handle: String) -> syn::Expr {
245 panic!("EmbeddedDeploy does not support networking (e2o)")
246 }
247
248 fn e2o_source(
249 _extra_stmts: &mut Vec<syn::Stmt>,
250 _p1: &Self::External,
251 _p1_port: &(),
252 _p2: &Self::Process,
253 _p2_port: &(),
254 _codec_type: &syn::Type,
255 _shared_handle: String,
256 ) -> syn::Expr {
257 panic!("EmbeddedDeploy does not support networking (e2o)")
258 }
259
260 fn e2o_connect(
261 _p1: &Self::External,
262 _p1_port: &(),
263 _p2: &Self::Process,
264 _p2_port: &(),
265 _many: bool,
266 _server_hint: NetworkHint,
267 ) -> Box<dyn FnOnce()> {
268 panic!("EmbeddedDeploy does not support networking (e2o)")
269 }
270
271 fn o2e_sink(
272 _p1: &Self::Process,
273 _p1_port: &(),
274 _p2: &Self::External,
275 _p2_port: &(),
276 _shared_handle: String,
277 ) -> syn::Expr {
278 panic!("EmbeddedDeploy does not support networking (o2e)")
279 }
280
281 #[expect(
282 unreachable_code,
283 reason = "panic before q! which is only for return type"
284 )]
285 fn cluster_ids(
286 _of_cluster: LocationKey,
287 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
288 panic!("EmbeddedDeploy does not support cluster IDs");
289 q!(unreachable!("EmbeddedDeploy does not support cluster IDs"))
290 }
291
292 #[expect(
293 unreachable_code,
294 reason = "panic before q! which is only for return type"
295 )]
296 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
297 panic!("EmbeddedDeploy does not support cluster self ID");
298 q!(unreachable!(
299 "EmbeddedDeploy does not support cluster self ID"
300 ))
301 }
302
303 #[expect(
304 unreachable_code,
305 reason = "panic before q! which is only for return type"
306 )]
307 fn cluster_membership_stream(
308 _location_id: &LocationId,
309 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
310 {
311 panic!("EmbeddedDeploy does not support cluster membership streams");
312 q!(unreachable!(
313 "EmbeddedDeploy does not support cluster membership streams"
314 ))
315 }
316
317 fn register_embedded_input(
318 env: &mut Self::InstantiateEnv,
319 ident: &syn::Ident,
320 element_type: &syn::Type,
321 ) {
322 env.inputs.push((ident.clone(), element_type.clone()));
323 }
324}
325
326impl super::deploy::DeployFlow<'_, EmbeddedDeploy> {
327 pub fn generate_embedded(mut self, crate_name: &str) -> syn::File {
358 let mut env = EmbeddedInstantiateEnv::default();
359 let compiled = self.compile_internal(&mut env);
360
361 let mut inputs = env.inputs;
363 inputs.sort_by(|a, b| a.0.to_string().cmp(&b.0.to_string()));
364
365 let root = crate::staging_util::get_this_crate();
366 let orig_crate_name = quote::format_ident!("{}", crate_name.replace('-', "_"));
367
368 let mut functions: Vec<syn::Item> = Vec::new();
369
370 let mut location_keys: Vec<_> = compiled.all_dfir().keys().collect();
372 location_keys.sort();
373
374 let input_params: Vec<proc_macro2::TokenStream> = inputs
376 .iter()
377 .map(|(ident, element_type)| {
378 quote! { #ident: impl __root_dfir_rs::futures::Stream<Item = #element_type> + Unpin + 'a }
379 })
380 .collect();
381
382 for location_key in location_keys {
383 let graph = &compiled.all_dfir()[location_key];
384
385 let fn_name = self
387 .processes
388 .get(location_key)
389 .map(|n| &n.fn_name)
390 .or_else(|| self.clusters.get(location_key).map(|n| &n.fn_name))
391 .or_else(|| self.externals.get(location_key).map(|n| &n.fn_name))
392 .expect("location key not found in any node map");
393
394 let fn_ident = syn::Ident::new(fn_name, Span::call_site());
395
396 let mut diagnostics = Diagnostics::new();
397 let dfir_tokens = graph
398 .as_code("e! { __root_dfir_rs }, true, quote!(), &mut diagnostics)
399 .expect("DFIR code generation failed with diagnostics.");
400
401 let func: syn::Item = syn::parse_quote! {
402 #[allow(unused, non_snake_case, clippy::suspicious_else_formatting)]
403 pub fn #fn_ident<'a>(#(#input_params),*) -> #root::runtime_support::dfir_rs::scheduled::graph::Dfir<'a> {
404 #dfir_tokens
405 }
406 };
407 functions.push(func);
408 }
409
410 syn::parse_quote! {
411 use #orig_crate_name::__staged::__deps::*;
412 use #root::prelude::*;
413 use #root::runtime_support::dfir_rs as __root_dfir_rs;
414 pub use #orig_crate_name::__staged;
415
416 #( #functions )*
417 }
418 }
419}