Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 5 additions & 30 deletions include/exec/detail/shared.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "../../stdexec/__detail/__optional.hpp"
#include "../../stdexec/__detail/__queries.hpp"
#include "../../stdexec/__detail/__receivers.hpp"
#include "../../stdexec/__detail/__storage.hpp"
#include "../../stdexec/__detail/__transform_completion_signatures.hpp"
#include "../../stdexec/__detail/__tuple.hpp"
#include "../../stdexec/__detail/__variant.hpp" // IWYU pragma: keep
Expand Down Expand Up @@ -77,24 +78,6 @@ namespace experimental::execution::__shared
template <class _Env>
using __env_t = __join_env_t<prop<get_stop_token_t, inplace_stop_token>, _Env>;

struct __notify_fn
{
template <class _Receiver, class _Tag, class... _Args>
constexpr void operator()(_Receiver& __rcvr, _Tag, _Args&&... __args) const noexcept
{
_Tag()(static_cast<_Receiver&&>(__rcvr), static_cast<_Args&&>(__args)...);
}
};

struct __notify_visitor
{
template <class _Receiver, class _Tuple>
constexpr void operator()(_Receiver& __rcvr, _Tuple&& __tupl) const noexcept
{
STDEXEC::__apply(__notify_fn(), static_cast<_Tuple&&>(__tupl), __rcvr);
};
};

////////////////////////////////////////////////////////////////////////////////////////
template <class _Env, class _Variant>
struct __receiver
Expand Down Expand Up @@ -133,13 +116,8 @@ namespace experimental::execution::__shared
////////////////////////////////////////////////////////////////////////////////////////
template <class _CvSender, class _Env>
using __result_variant_t =
__transform_reduce_completion_signatures_t<__completion_signatures_of_t<_CvSender, _Env>,
__mbind_front_q<__decayed_tuple, set_value_t>::__f,
__mbind_front_q<__decayed_tuple, set_error_t>::__f,
__tuple<set_stopped_t>,
__munique<__qq<__variant>>::__f,
__tuple<set_error_t, std::exception_ptr>,
__tuple<set_stopped_t>>;
__mapply<__mbind_front_q<__results_storage, set_stopped_t(), set_error_t(std::exception_ptr)>,
__completion_signatures_of_t<_CvSender, _Env>>;

////////////////////////////////////////////////////////////////////////////////////////
template <class _CvChild, class _Env>
Expand Down Expand Up @@ -269,10 +247,7 @@ namespace experimental::execution::__shared
using __cv_variant_t = __if_c<__is_split, __variant_t const &, __variant_t>;

__on_stop_.reset();

STDEXEC::__visit(__notify_visitor(),
static_cast<__cv_variant_t&&>(__sh_state_->__results_),
__rcvr_);
static_cast<__cv_variant_t&&>(__sh_state_->__results_).__complete(__rcvr_);
}

_Receiver __rcvr_;
Expand Down Expand Up @@ -353,7 +328,7 @@ namespace experimental::execution::__shared
__waiters_list_t __waiters_{};
inplace_stop_source __stop_source_{};
__env_t<_Env> __env_;
_Variant __results_{__no_init}; // Initialized to the "set_stopped" state in the ctor.
_Variant __results_; // Initialized to the "set_stopped" state in the ctor.
};

template <class _Env, class _Variant>
Expand Down
37 changes: 4 additions & 33 deletions include/exec/fork_join.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#pragma once

#include "../stdexec/__detail/__receiver_ref.hpp"
#include "../stdexec/__detail/__storage.hpp"
#include "../stdexec/execution.hpp"

#include <exception>
Expand All @@ -27,26 +28,6 @@ namespace experimental::execution

struct fork_join_impl_t
{
struct _dematerialize_fn
{
struct _impl_fn
{
template <class Rcvr, class Tag, class... Args>
STDEXEC_ATTRIBUTE(always_inline, host, device)
constexpr void operator()(Rcvr& rcvr, Tag, Args const &... args) const noexcept
{
Tag{}(static_cast<Rcvr&&>(rcvr), args...);
}
};

template <class Rcvr, class Tuple>
STDEXEC_ATTRIBUTE(always_inline, host, device)
constexpr void operator()(Rcvr& rcvr, Tuple const & tupl) const noexcept
{
STDEXEC::__apply(_impl_fn{}, tupl, rcvr);
}
};

struct _mk_when_all_fn
{
template <class CacheSndr, class... Closures>
Expand All @@ -58,17 +39,7 @@ namespace experimental::execution
};

template <class Completions>
using _maybe_eptr_completion_t =
STDEXEC::__if_c<STDEXEC::__nothrow_decay_copyable_results_t<Completions>::value,
STDEXEC::__mset_nil,
STDEXEC::__tuple<STDEXEC::set_error_t, ::std::exception_ptr>>;

template <class Completions>
using _variant_t = STDEXEC::__mset_insert<
STDEXEC::__for_each_completion_signature_t<Completions,
STDEXEC::__decayed_tuple,
STDEXEC::__mset>,
_maybe_eptr_completion_t<Completions>>::template rebind<STDEXEC::__variant>;
using _variant_t = STDEXEC::__mapply_q<STDEXEC::__results_storage, Completions>;

template <class Domain>
struct _env_t
Expand Down Expand Up @@ -102,7 +73,7 @@ namespace experimental::execution

STDEXEC_ATTRIBUTE(host, device) void start() noexcept
{
STDEXEC::__visit(_dematerialize_fn{}, *_results_, _rcvr_);
std::as_const(*_results_).__complete(_rcvr_);
}

Rcvr _rcvr_;
Expand Down Expand Up @@ -230,7 +201,7 @@ namespace experimental::execution
}

Rcvr _rcvr_;
_variant_t<_child_completions_t> _cache_{STDEXEC::__no_init};
_variant_t<_child_completions_t> _cache_;
STDEXEC::__manual_lifetime<_child_opstate_t> _child_opstate_{};
_fork_opstate_t _fork_opstate_;
};
Expand Down
2 changes: 1 addition & 1 deletion include/exec/just_from.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ namespace experimental::execution
{
// Extract the tags from the completion signatures and use them to construct the attributes.
return STDEXEC::__mapply<
STDEXEC::__mtransform<STDEXEC::__q1<STDEXEC::__detail::__tag_of_sig_t>,
STDEXEC::__mtransform<STDEXEC::__q1<STDEXEC::__signature_tag_t>,
STDEXEC::__munique<STDEXEC::__qq<_just_from::_attrs>>>,
completion_signatures>();
}
Expand Down
85 changes: 41 additions & 44 deletions include/exec/sequence/ignore_all_values.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
#include "../../stdexec/execution.hpp"

// include these after execution.hpp
#include "../../stdexec/__detail/__senders.hpp"
#include "../../stdexec/__detail/__tuple.hpp"
#include "../../stdexec/__detail/__variant.hpp"
#include "../sender_for.hpp"
#include "../sequence_senders.hpp"

Expand Down Expand Up @@ -58,7 +58,7 @@ namespace experimental::execution
static_cast<_Receiver&&>(__rcvr));
};

template <class _ResultVariant>
template <class _ResultsStorage>
struct __result_type
{
template <class... _Args>
Expand All @@ -83,31 +83,28 @@ namespace experimental::execution
{
STDEXEC::set_value(static_cast<_Receiver&&>(__rcvr));
}
else if constexpr (STDEXEC::__mapply<STDEXEC::__msize, _ResultVariant>::value != 0)
else if constexpr (STDEXEC::__mapply<STDEXEC::__msize, _ResultsStorage>::value != 0)
{
STDEXEC_ASSERT(__result_.index() != __variant_npos);
STDEXEC::__visit(__visit_fn,
static_cast<_ResultVariant&&>(__result_),
static_cast<_Receiver&&>(__rcvr));
static_cast<_ResultsStorage&&>(__result_).__complete(__rcvr);
}
}

_ResultVariant __result_{STDEXEC::__no_init};
_ResultsStorage __result_;
__std::atomic<int> __emplaced_{0};
};

template <class _ItemReceiver, class _ResultVariant>
template <class _ItemReceiver, class _ResultsStorage>
struct __item_operation_base
{
STDEXEC_ATTRIBUTE(no_unique_address) _ItemReceiver __rcvr_;
__result_type<_ResultVariant>* __result_;
__result_type<_ResultsStorage>* __result_;
};

template <class _ItemReceiver, class _ResultVariant>
template <class _ItemReceiver, class _ResultsStorage>
struct __item_receiver
{
using receiver_concept = STDEXEC::receiver_tag;
__item_operation_base<_ItemReceiver, _ResultVariant>* __op_;
__item_operation_base<_ItemReceiver, _ResultsStorage>* __op_;

template <class... _Args>
void set_value([[maybe_unused]] _Args&&... __args) noexcept
Expand All @@ -117,7 +114,7 @@ namespace experimental::execution
}

template <class _Error>
requires __variant_emplaceable<_ResultVariant,
requires __variant_emplaceable<_ResultsStorage,
__decayed_tuple<set_error_t, _Error>,
set_error_t,
_Error>
Expand All @@ -130,7 +127,9 @@ namespace experimental::execution
}

void set_stopped() noexcept
requires __variant_emplaceable<_ResultVariant, __decayed_tuple<set_stopped_t>, set_stopped_t>
requires __variant_emplaceable<_ResultsStorage,
__decayed_tuple<set_stopped_t>,
set_stopped_t>
&& __callable<set_stopped_t, _ItemReceiver>
{
// stop without error
Expand All @@ -144,15 +143,15 @@ namespace experimental::execution
}
};

template <class _Sender, class _ItemReceiver, class _ResultVariant>
struct __item_operation : __item_operation_base<_ItemReceiver, _ResultVariant>
template <class _Sender, class _ItemReceiver, class _ResultsStorage>
struct __item_operation : __item_operation_base<_ItemReceiver, _ResultsStorage>
{
using __base_t = __item_operation_base<_ItemReceiver, _ResultVariant>;
using __item_receiver_t = __item_receiver<_ItemReceiver, _ResultVariant>;
using __base_t = __item_operation_base<_ItemReceiver, _ResultsStorage>;
using __item_receiver_t = __item_receiver<_ItemReceiver, _ResultsStorage>;

__item_operation(__result_type<_ResultVariant>* __parent,
_Sender&& __sndr,
_ItemReceiver __rcvr)
__item_operation(__result_type<_ResultsStorage>* __parent,
_Sender&& __sndr,
_ItemReceiver __rcvr)
noexcept(__nothrow_decay_copyable<_ItemReceiver>
&& __nothrow_connectable<_Sender, __item_receiver_t>)
: __base_t{static_cast<_ItemReceiver&&>(__rcvr), __parent}
Expand All @@ -167,18 +166,18 @@ namespace experimental::execution
connect_result_t<_Sender, __item_receiver_t> __op_;
};

template <class _Sender, class _ResultVariant>
template <class _Sender, class _ResultsStorage>
struct __item_sender
{
using sender_concept = STDEXEC::sender_tag;
using completion_signatures = STDEXEC::completion_signatures<set_value_t(), set_stopped_t()>;

template <class _Self, class _Receiver>
using __operation_t =
__item_operation<__copy_cvref_t<_Self, _Sender>, _Receiver, _ResultVariant>;
__item_operation<__copy_cvref_t<_Self, _Sender>, _Receiver, _ResultsStorage>;

template <class _Receiver>
using __item_receiver_t = __item_receiver<_Receiver, _ResultVariant>;
using __item_receiver_t = __item_receiver<_Receiver, _ResultsStorage>;

template <__decays_to<__item_sender> _Self,
STDEXEC::receiver_of<completion_signatures> _Receiver>
Expand All @@ -192,23 +191,23 @@ namespace experimental::execution
}
STDEXEC_EXPLICIT_THIS_END(connect)

_Sender __sender_;
__result_type<_ResultVariant>* __parent_;
_Sender __sender_;
__result_type<_ResultsStorage>* __parent_;
};

template <class _Receiver, class _ResultVariant>
struct __operation_base : __result_type<_ResultVariant>
template <class _Receiver, class _ResultsStorage>
struct __operation_base : __result_type<_ResultsStorage>
{
STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
_Receiver __rcvr_;
};

template <class _Receiver, class _ResultVariant>
template <class _Receiver, class _ResultsStorage>
struct __receiver
{
using receiver_concept = STDEXEC::receiver_tag;

constexpr explicit __receiver(__operation_base<_Receiver, _ResultVariant>* __op) noexcept
constexpr explicit __receiver(__operation_base<_Receiver, _ResultsStorage>* __op) noexcept
: __op_{__op}
{}

Expand All @@ -218,7 +217,7 @@ namespace experimental::execution
template <sender _Item>
[[nodiscard]]
auto set_next(_Item&& __item) & noexcept(__nothrow_decay_copyable<_Item>)
-> __item_sender<__decay_t<_Item>, _ResultVariant>
-> __item_sender<__decay_t<_Item>, _ResultsStorage>
{
return {static_cast<_Item&&>(__item), __op_};
}
Expand All @@ -245,32 +244,30 @@ namespace experimental::execution
}

private:
__operation_base<_Receiver, _ResultVariant>* __op_;
__operation_base<_Receiver, _ResultsStorage>* __op_;
};

template <class _Sigs>
using __result_variant_ = __transform_reduce_completion_signatures_t<
_Sigs,
__mconst<__mlist<>>::__f,
__mcompose_q<__mlist, __mbind_front_q<__decayed_tuple, set_error_t>::__f>::__f,
__mlist<__tuple<set_stopped_t>>,
__mconcat<__qq<__variant>>::__f>;
template <class _Signature>
using __is_set_value_signature_t =
__mbool<__same_as<set_value_t, __signature_tag_t<_Signature>>>;

// Storage for the non-set_value completions
template <class _Sender, class _Env>
using __result_variant_t =
__result_variant_<__sequence_completion_signatures_of_t<_Sender, _Env>>;
__mapply<__mremove_if<__q1<__is_set_value_signature_t>, __qq<__results_storage>>,
__sequence_completion_signatures_of_t<_Sender, _Env>>;

template <class _Sender, class _Receiver>
struct __operation
: __operation_base<_Receiver, __result_variant_t<_Sender, env_of_t<_Receiver>>>
{
using _ResultVariant = __result_variant_t<_Sender, env_of_t<_Receiver>>;
using __base_type = __operation_base<_Receiver, _ResultVariant>;
using __receiver_t = __receiver<_Receiver, _ResultVariant>;
using __variant_t = __result_variant_t<_Sender, env_of_t<_Receiver>>;
using __base_t = __operation_base<_Receiver, __variant_t>;
using __receiver_t = __receiver<_Receiver, __variant_t>;

explicit __operation(_Sender&& __sndr, _Receiver __rcvr)
noexcept(__nothrow_subscribable<_Sender, __receiver_t>)
: __base_type{{}, static_cast<_Receiver&&>(__rcvr)}
: __base_t{{}, static_cast<_Receiver&&>(__rcvr)}
, __op_{exec::subscribe(static_cast<_Sender&&>(__sndr), __receiver_t{this})}
{}

Expand Down
9 changes: 2 additions & 7 deletions include/exec/sequence/merge_each.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,13 +238,8 @@ namespace experimental::execution
void start() & noexcept
{
// emit delayed error into the sequence
STDEXEC::__visit(
[this](auto&& __error) noexcept
{
STDEXEC::set_error(static_cast<_ErrorReceiver&&>(__rcvr_),
static_cast<decltype(__error)&&>(__error));
},
static_cast<_ErrorStorage&&>(*__op_->__error_storage_));
STDEXEC::__visit(STDEXEC::__mk_completion_fn(STDEXEC::set_error, __rcvr_),
static_cast<_ErrorStorage&&>(*__op_->__error_storage_));
}

_ErrorReceiver __rcvr_;
Expand Down
Loading
Loading