Program Listing for File Outlet.hpp¶
↰ Return to documentation for file (include/uit/spouts/Outlet.hpp)
#pragma once
#ifndef UIT_SPOUTS_OUTLET_HPP_INCLUDE
#define UIT_SPOUTS_OUTLET_HPP_INCLUDE
#include <cassert>
#include <cstdint>
#include <iostream>
#include <limits>
#include <memory>
#include <optional>
#include <stddef.h>
#include <utility>
#include "../../uitsl/debug/occupancy_audit.hpp"
#include "../../uitsl/nonce/CircularIndex.hpp"
#include "../../uitsl/parallel/thread_utils.hpp"
#include "../ducts/Duct.hpp"
namespace uit {
template<typename ImplSpec_>
class Outlet {
public:
using ImplSpec = ImplSpec_;
private:
using T = typename ImplSpec::T;
constexpr inline static size_t N{ImplSpec::N};
using index_t = uitsl::CircularIndex<N>;
using duct_t = internal::Duct<ImplSpec>;
std::shared_ptr<duct_t> duct;
// TODO move this to ImplSpec?
static_assert(N > 0);
mutable size_t read_count{0};
mutable bool cur_revision_unread{true};
mutable size_t fresh_read_count{0};
size_t revision_count{1};
size_t net_flux{0};
size_t nonblocking_pull_attempt_count{0};
size_t laden_nonblocking_pull_count{0};
size_t blocking_pull_count{0};
size_t pulls_that_blocked_count{0};
uitsl_occupancy_auditor;
size_t TryConsumeGets(const size_t n) {
uitsl_occupancy_audit(1);
return LogStep( duct->TryConsumeGets(n) );
}
size_t LogStep(const size_t n) {
revision_count += (n > 0);
if (n > 0) cur_revision_unread = true;
net_flux += n;
return n;
}
void LogRead() const {
++read_count;
fresh_read_count += cur_revision_unread;
cur_revision_unread = false;
}
public:
Outlet(
std::shared_ptr<duct_t> duct_
) : duct(duct_) { ; }
size_t TryStep(const size_t num_steps=1) {
++nonblocking_pull_attempt_count;
const size_t res = TryConsumeGets(num_steps);
if ( res ) ++laden_nonblocking_pull_count;
return res;
}
size_t Jump() {
return TryStep( std::numeric_limits<size_t>::max() );
}
const T& Get() const { LogRead(); return std::as_const(*duct).Get(); }
T& Get() { LogRead(); return duct->Get(); }
const T& JumpGet() {
uitsl_occupancy_audit(1);
Jump();
return Get();
}
void Step(size_t num_steps=1) {
uitsl_occupancy_audit(1);
blocking_pull_count += num_steps;
while (num_steps) {
size_t uncounted_steps{};
bool was_blocked{ false };
do {
uncounted_steps = TryConsumeGets(num_steps);
was_blocked |= (uncounted_steps == 0);
} while ( uncounted_steps == 0 );
num_steps -= uncounted_steps;
pulls_that_blocked_count += was_blocked;
}
}
const T& GetNext(const size_t num_steps=1) {
Step(num_steps);
return Get();
}
using optional_ref_t = std::optional<std::reference_wrapper<const T>>;
optional_ref_t GetNextOrNullopt() {
uitsl_occupancy_audit(1);
return TryStep()
? optional_ref_t{ std::reference_wrapper{ Get() } }
: std::nullopt;
}
size_t GetNumReadsPerformed() const { return read_count; }
size_t GetNumReadsThatWereFresh() const { return fresh_read_count; }
size_t GetNumReadsThatWereStale() const {
assert( read_count >= fresh_read_count );
return read_count - fresh_read_count;
}
size_t GetNumRevisionsPulled() const { return revision_count; }
size_t GetNumTryPullsAttempted() const {
return nonblocking_pull_attempt_count;
}
size_t GetNumBlockingPulls() const {
return blocking_pull_count;
}
size_t GetNumBlockingPullsThatBlocked() const {
return pulls_that_blocked_count;
}
size_t GetNumRevisionsFromTryPulls() const {
return laden_nonblocking_pull_count;
}
size_t GetNumRevisionsFromBlockingPulls() const {
assert(revision_count >= GetNumRevisionsFromTryPulls());
return revision_count - GetNumRevisionsFromTryPulls();
}
size_t GetNumPullsAttempted() const {
return nonblocking_pull_attempt_count + blocking_pull_count;
}
size_t GetNumPullsThatWereLadenEventually() const {
return GetNumTryPullsThatWereLaden() + GetNumBlockingPulls();
}
size_t GetNumBlockingPullsThatWereLadenImmediately() const {
assert( GetNumBlockingPulls() >= GetNumBlockingPullsThatBlocked() );
return GetNumBlockingPulls() - GetNumBlockingPullsThatBlocked();
}
size_t GetNumBlockingPullsThatWereLadenEventually() const {
return GetNumBlockingPulls();
}
size_t GetNumPullsThatWereLadenImmediately() const {
return GetNumTryPullsThatWereLaden()
+ GetNumBlockingPullsThatWereLadenImmediately();
}
size_t GetNumTryPullsThatWereLaden() const {
return laden_nonblocking_pull_count;
}
size_t GetNumTryPullsThatWereUnladen() const {
assert(nonblocking_pull_attempt_count >= laden_nonblocking_pull_count);
return nonblocking_pull_attempt_count - laden_nonblocking_pull_count;
}
double GetFractionTryPullsThatWereLaden() const {
assert(GetNumTryPullsThatWereLaden() >= GetNumTryPullsAttempted());
return (
GetNumTryPullsThatWereLaden()
/ static_cast<double>( GetNumTryPullsAttempted() )
);
}
double GetFractionTryPullsThatWereUnladen() const {
return 1.0 - GetFractionTryPullsThatWereLaden();
}
double GetFractionBlockingPullsThatBlocked() const {
assert(
GetNumBlockingPullsThatBlocked()
>= GetNumBlockingPulls()
);
return (
GetNumBlockingPullsThatBlocked()
/ static_cast<double>( GetNumBlockingPulls() )
);
}
double GetFractionBlockingPullsThatWereLadenImmediately() const {
return 1.0 - GetFractionBlockingPullsThatBlocked();
}
double GetFractionPullsThatWereLadenImmediately() const {
assert(GetNumPullsThatWereLadenImmediately() >= GetNumPullsAttempted());
return (
GetNumPullsThatWereLadenImmediately()
/ static_cast<double>( GetNumPullsAttempted() )
);
}
double GetFractionPullsThatWereLadenEventually() const {
assert(GetNumPullsThatWereLadenEventually() >= GetNumPullsAttempted());
return (
GetNumPullsThatWereLadenEventually()
/ static_cast<double>( GetNumPullsAttempted() )
);
}
size_t GetNetFluxThroughDuct() const { return net_flux; }
double GetFractionReadsThatWereFresh() const {
return fresh_read_count / static_cast<double>(read_count);
}
double GetFractionReadsThatWereStale() const {
return 1.0 - GetFractionReadsThatWereFresh();
}
double GetFractionRevisionsThatWereRead() const {
return fresh_read_count / static_cast<double>(revision_count);
}
double GetFractionRevisionsThatWereNotRead() const {
return 1.0 - GetFractionRevisionsThatWereRead();
}
double GetFractionDuctFluxThatWasSteppedThrough() const {
return revision_count / static_cast<double>(net_flux);
}
double GetFractionDuctFluxThatWasJumpedOver() const {
return 1.0 - GetFractionDuctFluxThatWasSteppedThrough();
}
double GetFractionDuctFluxThatWasRead() const {
return fresh_read_count / static_cast<double>(net_flux);
}
template <typename WhichDuct, typename... Args>
void EmplaceDuct(Args&&... args) {
duct->template EmplaceImpl<WhichDuct>(std::forward<Args>(args)...);
}
template <typename WhichDuct, typename... Args>
void SplitDuct(Args&&... args) {
duct = std::make_shared<duct_t>(
std::in_place_type_t<WhichDuct>{},
std::forward<Args>(args)...
);
}
typename duct_t::uid_t GetDuctUID() const { return duct->GetUID(); }
std::optional<bool> HoldsIntraImpl() const { return duct->HoldsIntraImpl(); }
std::optional<bool> HoldsThreadImpl() const {
return duct->HoldsThreadImpl();
}
std::optional<bool> HoldsProcImpl() const { return duct->HoldsProcImpl(); }
std::string WhichImplHeld() const { return duct->WhichImplHeld(); }
bool CanStep() const { return duct->CanStep(); }
// exclusively for instrumentation purposes
void RegisterInletProc(const uitsl::proc_id_t proc) const {
duct->RegisterInletProc(proc);
}
// exclusively for instrumentation purposes
void RegisterInletThread(const uitsl::thread_id_t thread) const {
duct->RegisterInletThread(thread);
}
// exclusively for instrumentation purposes
void RegisterOutletProc(const uitsl::proc_id_t proc) const {
duct->RegisterOutletProc(proc);
}
// exclusively for instrumentation purposes
void RegisterOutletThread(const uitsl::thread_id_t thread) const {
duct->RegisterOutletThread(thread);
}
void RegisterEdgeID(const size_t edge_id) const {
duct->RegisterEdgeID(edge_id);
}
void RegisterInletNodeID(const size_t node_id) const {
duct->RegisterInletNodeID(node_id);
}
void RegisterOutletNodeID(const size_t node_id) const {
duct->RegisterOutletNodeID(node_id);
}
void RegisterMeshID(const size_t mesh_id) const {
duct->RegisterMeshID(mesh_id);
}
std::optional<uitsl::proc_id_t> LookupOutletProc() const {
return duct->LookupOutletProc();
}
std::optional<uitsl::thread_id_t> LookupOutletThread() const {
return duct->LookupOutletThread();
}
std::optional<uitsl::proc_id_t> LookupInletProc() const {
return duct->LookupInletProc();
}
std::optional<uitsl::thread_id_t> LookupInletThread() const {
return duct->LookupInletThread();
}
std::optional<size_t> LookupEdgeID() const {
return duct->LookupEdgeID();
}
std::optional<size_t> LookupInletNodeID() const {
return duct->LookupInletNodeID();
}
std::optional<size_t> LookupOutletNodeID() const {
return duct->LookupOutletNodeID();
}
std::optional<size_t> LookupMeshID() const { return duct->LookupMeshID(); }
std::string ToString() const {
std::stringstream ss;
ss << uitsl::format_member("std::shared_ptr<duct_t> duct", *duct) << '\n';
ss << uitsl::format_member("size_t read_count", read_count) << '\n';
ss << uitsl::format_member("size_t revision_count", revision_count) << '\n';
ss << uitsl::format_member("size_t net_flux", net_flux);
return ss.str();
}
};
} // namespace uit
#endif // #ifndef UIT_SPOUTS_OUTLET_HPP_INCLUDE