Program Listing for File Outlet.hpp

Return to documentation for file (include/uit/spouts/Outlet.hpp)

#pragma once
#ifndef UIT_SPOUTS_OUTLET_HPP_INCLUDE
#define UIT_SPOUTS_OUTLET_HPP_INCLUDE

#include <cassert>
#include <cstdint>
#include <iostream>
#include <limits>
#include <memory>
#include <optional>
#include <stddef.h>
#include <utility>

#include "../../uitsl/debug/occupancy_audit.hpp"
#include "../../uitsl/nonce/CircularIndex.hpp"
#include "../../uitsl/parallel/thread_utils.hpp"

#include "../ducts/Duct.hpp"

namespace uit {

template<typename ImplSpec_>
class Outlet {

public:
  using ImplSpec = ImplSpec_;

private:
  using T = typename ImplSpec::T;
  constexpr inline static size_t N{ImplSpec::N};

  using index_t = uitsl::CircularIndex<N>;

  using duct_t = internal::Duct<ImplSpec>;
  std::shared_ptr<duct_t> duct;

  // TODO move this to ImplSpec?
  static_assert(N > 0);

  mutable size_t read_count{0};

  mutable bool cur_revision_unread{true};

  mutable size_t fresh_read_count{0};

  size_t revision_count{1};

  size_t net_flux{0};

  size_t nonblocking_pull_attempt_count{0};

  size_t laden_nonblocking_pull_count{0};

  size_t blocking_pull_count{0};

  size_t pulls_that_blocked_count{0};

  uitsl_occupancy_auditor;

  size_t TryConsumeGets(const size_t n) {
    uitsl_occupancy_audit(1);
    return LogStep( duct->TryConsumeGets(n) );
  }

  size_t LogStep(const size_t n) {
    revision_count += (n > 0);
    if (n > 0) cur_revision_unread = true;
    net_flux += n;
    return n;
  }

  void LogRead() const {
    ++read_count;
    fresh_read_count += cur_revision_unread;
    cur_revision_unread = false;
  }

public:

  Outlet(
    std::shared_ptr<duct_t> duct_
  ) : duct(duct_) { ; }

  size_t TryStep(const size_t num_steps=1) {
    ++nonblocking_pull_attempt_count;
    const size_t res = TryConsumeGets(num_steps);
    if ( res ) ++laden_nonblocking_pull_count;
    return res;
  }

  size_t Jump() {
    return TryStep( std::numeric_limits<size_t>::max() );
  }

  const T& Get() const { LogRead(); return std::as_const(*duct).Get(); }

  T& Get() { LogRead(); return duct->Get(); }

  const T& JumpGet() {
    uitsl_occupancy_audit(1);
    Jump();
    return Get();
  }

  void Step(size_t num_steps=1) {
    uitsl_occupancy_audit(1);
    blocking_pull_count += num_steps;

    while (num_steps) {
      size_t uncounted_steps{};
      bool was_blocked{ false };
      do {
        uncounted_steps = TryConsumeGets(num_steps);
        was_blocked |= (uncounted_steps == 0);
      } while ( uncounted_steps == 0 );

      num_steps -= uncounted_steps;
      pulls_that_blocked_count += was_blocked;
    }

  }

  const T& GetNext(const size_t num_steps=1) {
    Step(num_steps);
    return Get();
  }

  using optional_ref_t = std::optional<std::reference_wrapper<const T>>;

  optional_ref_t GetNextOrNullopt() {
    uitsl_occupancy_audit(1);
    return TryStep()
      ? optional_ref_t{ std::reference_wrapper{ Get() } }
      : std::nullopt;
  }

  size_t GetNumReadsPerformed() const { return read_count; }

  size_t GetNumReadsThatWereFresh() const { return fresh_read_count; }

  size_t GetNumReadsThatWereStale() const {
    assert( read_count >= fresh_read_count );
    return read_count - fresh_read_count;
  }

  size_t GetNumRevisionsPulled() const { return revision_count; }

  size_t GetNumTryPullsAttempted() const {
    return nonblocking_pull_attempt_count;
  }

  size_t GetNumBlockingPulls() const {
    return blocking_pull_count;
  }

  size_t GetNumBlockingPullsThatBlocked() const {
    return pulls_that_blocked_count;
  }

  size_t GetNumRevisionsFromTryPulls() const {
    return laden_nonblocking_pull_count;
  }

  size_t GetNumRevisionsFromBlockingPulls() const {
    assert(revision_count >= GetNumRevisionsFromTryPulls());
    return revision_count - GetNumRevisionsFromTryPulls();
  }

  size_t GetNumPullsAttempted() const {
    return nonblocking_pull_attempt_count + blocking_pull_count;
  }

  size_t GetNumPullsThatWereLadenEventually() const {
    return GetNumTryPullsThatWereLaden() + GetNumBlockingPulls();
  }

  size_t GetNumBlockingPullsThatWereLadenImmediately() const {
    assert( GetNumBlockingPulls() >= GetNumBlockingPullsThatBlocked() );
    return GetNumBlockingPulls() - GetNumBlockingPullsThatBlocked();
  }

  size_t GetNumBlockingPullsThatWereLadenEventually() const {
    return GetNumBlockingPulls();
  }

  size_t GetNumPullsThatWereLadenImmediately() const {
    return GetNumTryPullsThatWereLaden()
    + GetNumBlockingPullsThatWereLadenImmediately();
  }

  size_t GetNumTryPullsThatWereLaden() const {
    return laden_nonblocking_pull_count;
  }

  size_t GetNumTryPullsThatWereUnladen() const {
    assert(nonblocking_pull_attempt_count >= laden_nonblocking_pull_count);
    return nonblocking_pull_attempt_count - laden_nonblocking_pull_count;
  }

  double GetFractionTryPullsThatWereLaden() const {
    assert(GetNumTryPullsThatWereLaden() >= GetNumTryPullsAttempted());
    return (
      GetNumTryPullsThatWereLaden()
      / static_cast<double>( GetNumTryPullsAttempted() )
    );
  }

  double GetFractionTryPullsThatWereUnladen() const {
    return 1.0 - GetFractionTryPullsThatWereLaden();
  }

  double GetFractionBlockingPullsThatBlocked() const {
    assert(
      GetNumBlockingPullsThatBlocked()
      >= GetNumBlockingPulls()
    );
    return (
      GetNumBlockingPullsThatBlocked()
      / static_cast<double>( GetNumBlockingPulls() )
    );
  }

  double GetFractionBlockingPullsThatWereLadenImmediately() const {
    return 1.0 - GetFractionBlockingPullsThatBlocked();
  }

  double GetFractionPullsThatWereLadenImmediately() const {
    assert(GetNumPullsThatWereLadenImmediately() >= GetNumPullsAttempted());
    return (
      GetNumPullsThatWereLadenImmediately()
      / static_cast<double>( GetNumPullsAttempted() )
    );
  }

  double GetFractionPullsThatWereLadenEventually() const {
    assert(GetNumPullsThatWereLadenEventually() >= GetNumPullsAttempted());
    return (
      GetNumPullsThatWereLadenEventually()
      / static_cast<double>( GetNumPullsAttempted() )
    );
  }


  size_t GetNetFluxThroughDuct() const { return net_flux; }

  double GetFractionReadsThatWereFresh() const {
    return fresh_read_count / static_cast<double>(read_count);
  }

  double GetFractionReadsThatWereStale() const {
    return 1.0 - GetFractionReadsThatWereFresh();
  }

  double GetFractionRevisionsThatWereRead() const {
    return fresh_read_count / static_cast<double>(revision_count);
  }

  double GetFractionRevisionsThatWereNotRead() const {
    return 1.0 - GetFractionRevisionsThatWereRead();
  }

  double GetFractionDuctFluxThatWasSteppedThrough() const {
    return revision_count / static_cast<double>(net_flux);
  }

  double GetFractionDuctFluxThatWasJumpedOver() const {
    return 1.0 - GetFractionDuctFluxThatWasSteppedThrough();
  }

  double GetFractionDuctFluxThatWasRead() const {
    return fresh_read_count / static_cast<double>(net_flux);
  }

  template <typename WhichDuct, typename... Args>
  void EmplaceDuct(Args&&... args) {
    duct->template EmplaceImpl<WhichDuct>(std::forward<Args>(args)...);
  }

  template <typename WhichDuct, typename... Args>
  void SplitDuct(Args&&... args) {
    duct = std::make_shared<duct_t>(
      std::in_place_type_t<WhichDuct>{},
      std::forward<Args>(args)...
    );
  }

  typename duct_t::uid_t GetDuctUID() const { return duct->GetUID(); }

  std::optional<bool> HoldsIntraImpl() const { return duct->HoldsIntraImpl(); }

  std::optional<bool> HoldsThreadImpl() const {
    return duct->HoldsThreadImpl();
  }

  std::optional<bool> HoldsProcImpl() const { return duct->HoldsProcImpl(); }

  std::string WhichImplHeld() const { return duct->WhichImplHeld(); }

  bool CanStep() const { return duct->CanStep(); }

  // exclusively for instrumentation purposes
  void RegisterInletProc(const uitsl::proc_id_t proc) const {
    duct->RegisterInletProc(proc);
  }

  // exclusively for instrumentation purposes
  void RegisterInletThread(const uitsl::thread_id_t thread) const {
    duct->RegisterInletThread(thread);
  }

  // exclusively for instrumentation purposes
  void RegisterOutletProc(const uitsl::proc_id_t proc) const {
    duct->RegisterOutletProc(proc);
  }

  // exclusively for instrumentation purposes
  void RegisterOutletThread(const uitsl::thread_id_t thread) const {
    duct->RegisterOutletThread(thread);
  }

  void RegisterEdgeID(const size_t edge_id) const {
    duct->RegisterEdgeID(edge_id);
  }

  void RegisterInletNodeID(const size_t node_id) const {
    duct->RegisterInletNodeID(node_id);
  }

  void RegisterOutletNodeID(const size_t node_id) const {
    duct->RegisterOutletNodeID(node_id);
  }

  void RegisterMeshID(const size_t mesh_id) const {
    duct->RegisterMeshID(mesh_id);
  }

  std::optional<uitsl::proc_id_t> LookupOutletProc() const {
    return duct->LookupOutletProc();
  }

  std::optional<uitsl::thread_id_t> LookupOutletThread() const {
    return duct->LookupOutletThread();
  }

  std::optional<uitsl::proc_id_t> LookupInletProc() const {
    return duct->LookupInletProc();
  }

  std::optional<uitsl::thread_id_t> LookupInletThread() const {
    return duct->LookupInletThread();
  }

  std::optional<size_t> LookupEdgeID() const {
    return duct->LookupEdgeID();
  }

  std::optional<size_t> LookupInletNodeID() const {
    return duct->LookupInletNodeID();
  }

  std::optional<size_t> LookupOutletNodeID() const {
    return duct->LookupOutletNodeID();
  }

  std::optional<size_t> LookupMeshID() const { return duct->LookupMeshID(); }

  std::string ToString() const {
    std::stringstream ss;
    ss << uitsl::format_member("std::shared_ptr<duct_t> duct", *duct) << '\n';
    ss << uitsl::format_member("size_t read_count", read_count) << '\n';
    ss << uitsl::format_member("size_t revision_count", revision_count) << '\n';
    ss << uitsl::format_member("size_t net_flux", net_flux);
    return ss.str();
  }

};

} // namespace uit

#endif // #ifndef UIT_SPOUTS_OUTLET_HPP_INCLUDE