veecle_os_runtime/
actor.rs1use core::convert::Infallible;
3use core::pin::Pin;
4
5#[doc(inline)]
6pub use veecle_os_runtime_macros::actor;
7
8use crate::datastore::{ExclusiveReader, InitializedReader, Reader, Storable, Writer};
9use crate::datastore::{Slot, generational};
10
/// Private home of the marker trait used to seal the public traits of this module.
///
/// The module itself is not `pub`, so although `Sealed` is declared `pub` it can only be named
/// (and therefore implemented) from inside this module tree.
mod sealed {
    /// Marker supertrait: a type can implement a sealed trait only if it also implements this.
    pub trait Sealed {}
}
14
15pub trait Actor<'a> {
112 type StoreRequest: StoreRequest<'a>;
114
115 type InitContext;
117
118 type Error: core::error::Error;
122
123 fn new(input: Self::StoreRequest, init_context: Self::InitContext) -> Self;
127
128 fn run(
132 self,
133 ) -> impl core::future::Future<Output = Result<core::convert::Infallible, Self::Error>>;
134}
135
136pub trait StoreRequest<'a>: sealed::Sealed {
142 #[doc(hidden)]
144 #[allow(async_fn_in_trait)] async fn request(datastore: Pin<&'a impl Datastore>) -> Self;
146}
147
148impl sealed::Sealed for () {}
149
150pub trait Datastore {
152 fn source(self: Pin<&Self>) -> Pin<&generational::Source>;
157
158 #[expect(rustdoc::private_intra_doc_links)] #[expect(private_interfaces)] fn slot<T>(self: Pin<&Self>) -> Pin<&Slot<T>>
166 where
167 T: Storable + 'static;
168}
169
170impl<S> Datastore for Pin<&S>
171where
172 S: Datastore,
173{
174 fn source(self: Pin<&Self>) -> Pin<&generational::Source> {
175 Pin::into_inner(self).source()
176 }
177
178 #[expect(private_interfaces)] fn slot<T>(self: Pin<&Self>) -> Pin<&Slot<T>>
180 where
181 T: Storable + 'static,
182 {
183 Pin::into_inner(self).slot()
184 }
185}
186
187pub(crate) trait DatastoreExt<'a>: Copy {
188 #[cfg(test)]
189 fn increment_generation(self);
193
194 fn reader<T>(self) -> Reader<'a, T>
200 where
201 T: Storable + 'static;
202
203 fn exclusive_reader<T>(self) -> ExclusiveReader<'a, T>
212 where
213 T: Storable + 'static;
214
215 fn writer<T>(self) -> Writer<'a, T>
223 where
224 T: Storable + 'static;
225}
226
227impl<'a, S> DatastoreExt<'a> for Pin<&'a S>
228where
229 S: Datastore,
230{
231 #[cfg(test)]
232 #[cfg_attr(coverage_nightly, coverage(off))]
233 fn increment_generation(self) {
234 self.source().increment_generation()
235 }
236
237 fn reader<T>(self) -> Reader<'a, T>
238 where
239 T: Storable + 'static,
240 {
241 Reader::from_slot(self.slot::<T>())
242 }
243
244 fn exclusive_reader<T>(self) -> ExclusiveReader<'a, T>
245 where
246 T: Storable + 'static,
247 {
248 ExclusiveReader::from_slot(self.slot::<T>())
249 }
250
251 fn writer<T>(self) -> Writer<'a, T>
252 where
253 T: Storable + 'static,
254 {
255 Writer::new(self.source().waiter(), self.slot::<T>())
256 }
257}
258
259impl<'a> StoreRequest<'a> for () {
261 async fn request(_store: Pin<&'a impl Datastore>) -> Self {}
262}
263
264impl<T> sealed::Sealed for Reader<'_, T> where T: Storable + 'static {}
265
266impl<'a, T> StoreRequest<'a> for Reader<'a, T>
267where
268 T: Storable + 'static,
269{
270 async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
271 datastore.reader()
272 }
273}
274
275impl<T> sealed::Sealed for ExclusiveReader<'_, T> where T: Storable + 'static {}
276
277impl<'a, T> StoreRequest<'a> for ExclusiveReader<'a, T>
278where
279 T: Storable + 'static,
280{
281 async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
282 datastore.exclusive_reader()
283 }
284}
285
286impl<T> sealed::Sealed for InitializedReader<'_, T> where T: Storable + 'static {}
287
288impl<'a, T> StoreRequest<'a> for InitializedReader<'a, T>
289where
290 T: Storable + 'static,
291{
292 async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
293 Reader::from_slot(datastore.slot()).wait_init().await
294 }
295}
296
297impl<T> sealed::Sealed for Writer<'_, T> where T: Storable + 'static {}
298
299impl<'a, T> StoreRequest<'a> for Writer<'a, T>
300where
301 T: Storable + 'static,
302{
303 async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
304 datastore.writer()
305 }
306}
307
308macro_rules! impl_request_helper {
310 ($t:ident) => {
311 #[cfg_attr(docsrs, doc(fake_variadic))]
312 impl<'a, $t> sealed::Sealed for ($t,) { }
314
315 #[cfg_attr(docsrs, doc(fake_variadic))]
316 impl<'a, $t> StoreRequest<'a> for ($t,)
318 where
319 $t: StoreRequest<'a>,
320 {
321 async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
322 (<$t as StoreRequest>::request(datastore).await,)
323 }
324 }
325 };
326
327 (@impl $($t:ident)*) => {
328 #[cfg_attr(docsrs, doc(hidden))]
329 impl<'a, $($t),*> sealed::Sealed for ( $( $t, )* )
330 where
331 $($t: sealed::Sealed),*
332 { }
333
334 #[cfg_attr(docsrs, doc(hidden))]
335 impl<'a, $($t),*> StoreRequest<'a> for ( $( $t, )* )
336 where
337 $($t: StoreRequest<'a>),*
338 {
339 async fn request(datastore: Pin<&'a impl Datastore>) -> Self {
340 futures::join!($( <$t as StoreRequest>::request(datastore), )*)
346 }
347 }
348 };
349
350 ($head:ident $($rest:ident)*) => {
351 impl_request_helper!(@impl $head $($rest)*);
352 impl_request_helper!($($rest)*);
353 };
354}
355
356impl_request_helper!(Z Y X W V U T);
357
358#[diagnostic::on_unimplemented(
360 message = "#[veecle_os_runtime::actor] functions should return either a `Result<Infallible, _>` or `Infallible`",
361 label = "not a valid actor return type"
362)]
363pub trait IsActorResult: sealed::Sealed {
364 type Error;
366
367 fn into_result(self) -> Result<Infallible, Self::Error>;
369}
370
371impl<E> sealed::Sealed for Result<Infallible, E> {}
372
373impl<E> IsActorResult for Result<Infallible, E> {
374 type Error = E;
375
376 fn into_result(self) -> Result<Infallible, E> {
377 self
378 }
379}
380
381impl sealed::Sealed for Infallible {}
382
383impl IsActorResult for Infallible {
384 type Error = Infallible;
385
386 fn into_result(self) -> Result<Infallible, Self::Error> {
387 match self {}
388 }
389}
390
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use core::future::Future;
    use core::pin::pin;
    use core::task::{Context, Poll};

    use futures::future::FutureExt;

    use crate::actor::{DatastoreExt, StoreRequest};
    use crate::cons::{Cons, Nil};
    use crate::datastore::{InitializedReader, Storable};

    /// Two tuple requests for the same pair of initialized readers, listed in opposite orders,
    /// must both become ready once both values have been written, regardless of tuple order.
    #[test]
    fn multi_request_order_independence() {
        #[derive(Debug, Storable)]
        #[storable(crate = crate)]
        struct A;

        #[derive(Debug, Storable)]
        #[storable(crate = crate)]
        struct B;

        let datastore = pin!(crate::execute::make_store::<Cons<A, Cons<B, Nil>>>());

        let mut a_writer = datastore.as_ref().writer::<A>();
        let mut b_writer = datastore.as_ref().writer::<B>();

        // Same two readers, requested as (A, B) and as (B, A).
        let mut request_ab = pin!(<(InitializedReader<A>, InitializedReader<B>)>::request(
            datastore.as_ref()
        ));
        let mut request_ba = pin!(<(InitializedReader<B>, InitializedReader<A>)>::request(
            datastore.as_ref()
        ));

        // Each request gets its own counting waker so wake-ups can be attributed.
        let (waker_ab, wakes_ab) = futures_test::task::new_count_waker();
        let (waker_ba, wakes_ba) = futures_test::task::new_count_waker();

        let mut cx_ab = Context::from_waker(&waker_ab);
        let mut cx_ba = Context::from_waker(&waker_ba);

        // Nothing has been written yet, so neither request can complete.
        assert!(request_ab.as_mut().poll(&mut cx_ab).is_pending());
        assert!(request_ba.as_mut().poll(&mut cx_ba).is_pending());

        let wakes_ab_before = wakes_ab.get();
        let wakes_ba_before = wakes_ba.get();

        datastore.as_ref().increment_generation();

        a_writer.write(A).now_or_never().unwrap();

        // After only `A` is written a request may or may not have been woken; if it was, it is
        // polled again and must still be pending because `B` is missing.
        if wakes_ab.get() > wakes_ab_before {
            assert!(request_ab.as_mut().poll(&mut cx_ab).is_pending());
        }
        if wakes_ba.get() > wakes_ba_before {
            assert!(request_ba.as_mut().poll(&mut cx_ba).is_pending());
        }

        let wakes_ab_before = wakes_ab.get();
        let wakes_ba_before = wakes_ba.get();

        datastore.as_ref().increment_generation();

        b_writer.write(B).now_or_never().unwrap();

        // With the second value written, both requests must have been woken.
        assert!(wakes_ab.get() > wakes_ab_before);
        assert!(wakes_ba.get() > wakes_ba_before);

        let Poll::Ready((mut ab_reader_a, mut ab_reader_b)) = request_ab.as_mut().poll(&mut cx_ab)
        else {
            panic!("request 1 was not ready")
        };

        // Note the reversed element order of the second request: `B` first, then `A`.
        let Poll::Ready((mut ba_reader_b, mut ba_reader_a)) = request_ba.as_mut().poll(&mut cx_ba)
        else {
            panic!("request 2 was not ready")
        };

        // Every freshly obtained reader must report an update without having to wait.
        assert!(ab_reader_a.wait_for_update().now_or_never().is_some());
        assert!(ab_reader_b.wait_for_update().now_or_never().is_some());

        assert!(ba_reader_b.wait_for_update().now_or_never().is_some());
        assert!(ba_reader_a.wait_for_update().now_or_never().is_some());
    }
}