Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extended view detection #44

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
45 changes: 44 additions & 1 deletion src/resilience/AutomaticCheckpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,50 @@
* Questions? Contact Christian R. Trott ([email protected])
*/
#include "AutomaticCheckpoint.hpp"
#include <algorithm>
#include <functional>

namespace KokkosResilience
{
}
namespace Detail
{
bool iter_is_unfiltered;

//Equality predicate used for duplicate detection: two holders count as the same view only when
//both their labels and their data pointers compare equal.
bool viewHolderEQ(KokkosResilience::ViewHolder &a, KokkosResilience::ViewHolder &b){
  //Entries that share a label but point at different data are deliberately NOT equal here;
  //they are left for a later pass so that both copies can be removed.
  if (!(a->label() == b->label())) {
    return false;
  }
  return a->data() == b->data();
}

//Ordering used to sort views before duplicate removal.
//Views are ordered by label first; views with equal labels are ordered by their data pointer.
//The tie-break is required because viewHolderEQ needs label AND pointer to match and std::unique
//only collapses adjacent elements: sorting on the label alone can leave two identical
//(label, pointer) entries separated by a same-label entry with a different pointer, in which
//case the duplicate would survive.
bool viewHolderLT(KokkosResilience::ViewHolder &a, KokkosResilience::ViewHolder &b){
  const auto &labelA = a->label();
  const auto &labelB = b->label();

  if (labelA < labelB) {
    return true;
  }
  if (labelB < labelA) {
    return false;
  }

  //Labels are equal. std::less yields a total order over pointers even when they do not
  //point into the same allocation, which a raw '<' does not guarantee.
  return std::less< const void * >{}( a->data(), b->data() );
}

//Removes repeated entries from viewVec in place.
//The vector is first reordered with viewHolderLT; afterwards every run of adjacent entries that
//viewHolderEQ reports as equal is collapsed to its first element and the leftover tail is erased.
void removeDuplicateViews(std::vector< KokkosResilience::ViewHolder> &viewVec){
  std::sort(viewVec.begin(), viewVec.end(), viewHolderLT);

  const auto duplicatesBegin = std::unique(viewVec.begin(), viewVec.end(), viewHolderEQ);
  viewVec.erase(duplicatesBegin, viewVec.end());

  //Disabled second pass, kept for reference:
  //Remove all copies of views matching labels but with different pointers
  /*for(int i = 0; i < viewVec.size()-1; ++i){
  int nDups = 0;
  fprintf(stderr, "Checking %d (%p) vs %d (%p) w/ size %d\n", i, viewVec[i].get(), i+1, viewVec[i+1].get(), viewVec.size());
  fprintf(stderr, "Checking \"%s\" vs \"%s\"\n", viewVec[i]->label().c_str(), viewVec[i+1]->label().c_str());
  while( (i+nDups+1 < viewVec.size()) &&
  (viewVec[i]->label() == viewVec[i+1+nDups]->label())){
  nDups++;
  fprintf(stderr, "Match! checking %d vs %d (%p)\n", i, i+1+nDups, viewVec[i+1+nDups].get());
  fprintf(stderr, "Checking \"%s\" vs \"%s\"\n", viewVec[i]->label().c_str(), viewVec[i+1+nDups]->label().c_str());
  }

  if(nDups > 0){
  nDups++; //Count the original too
  std::cerr << "Warning: Found " << nDups << " views w/ same label (\"" << viewVec[i]->label() << "\") and different pointers in same context!" << std::endl;
  viewVec.erase(viewVec.begin()+i, viewVec.begin()+i+nDups);
  --i;
  }
  }*/
}

}
}
149 changes: 117 additions & 32 deletions src/resilience/AutomaticCheckpoint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
namespace KokkosResilience
{


template< typename Context >
int latest_version( Context &ctx, const std::string &label )
{
Expand All @@ -75,9 +76,19 @@ namespace KokkosResilience
}

namespace Detail
{
{
void removeDuplicateViews(std::vector< KokkosResilience::ViewHolder> &viewVec);

extern std::vector< KokkosResilience::ViewHolder > views;
extern bool iter_is_unfiltered;

//capture_only: do no checkpointing or recovering in this call; the views are only captured on behalf of a
//larger, enclosing checkpoint scope.
//chkpt_gathering: indicates that gather_views is called within this function, so the function must be run
//before a restart is attempted.
template< typename Context, typename F, typename FilterFunc >
void checkpoint_impl( Context &ctx, const std::string &label, int iteration, F &&fun, FilterFunc &&filter )
void checkpoint_impl( Context &ctx, const std::string &label, int iteration, F &&fun, FilterFunc &&filter,
bool chkpt_internal, bool chkpt_captured, bool capture_only, bool chkpt_gathering)
{
#if defined( KOKKOS_ACTIVE_EXECUTION_MEMORY_SPACE_HOST )

Expand All @@ -92,70 +103,119 @@ namespace KokkosResilience

using fun_type = typename std::remove_reference< F >::type;

if ( filter( iteration ) )
{
// Copy the functor, since if it has any views we can turn on view tracking
std::vector< KokkosResilience::ViewHolder > views;
//On capture only calls use the surrounding scope's filter.
if(!capture_only){
iter_is_unfiltered = filter(iteration);
}

if ( iter_is_unfiltered )
{
// Don't do anything with const views since they can never be checkpointed in this context
KokkosResilience::DynamicViewHooks::copy_constructor_set.set_callback( [&views]( const KokkosResilience::ViewHolder &view ) {
views.emplace_back( view );
KokkosResilience::DynamicViewHooks::copy_constructor_set.set_callback( [&ctx]( const KokkosResilience::ViewHolder &view ) {
ctx.view_holders.emplace_back( view );
} );

std::vector< Detail::CrefImpl > crefs;
Detail::Cref::check_ref_list = &crefs;
//Detail::Cref::check_ref_list = &crefs; //TODO: Figure out why this is broken w/ chckpt_internal

if(chkpt_captured){
// Copy the functor, since if it has any views we can turn on view tracking
fun_type f = fun;
}

if(!chkpt_internal){
Detail::Cref::check_ref_list = nullptr;

KokkosResilience::DynamicViewHooks::copy_constructor_set.reset();
}

fun_type f = fun;
if(chkpt_internal || chkpt_gathering){
// Execute functor and checkpoint
#ifdef KR_ENABLE_TRACING
auto function_trace = Util::begin_trace< Util::TimingTrace< std::string > >( ctx, "function" );
#endif
fun();
#ifdef KR_ENABLE_TRACING
Kokkos::fence(); // Get accurate measurements for function_trace end
function_trace.end();
#endif
}

if(chkpt_internal){
Detail::Cref::check_ref_list = nullptr;

Detail::Cref::check_ref_list = nullptr;
KokkosResilience::DynamicViewHooks::copy_constructor_set.reset();
}


KokkosResilience::DynamicViewHooks::copy_constructor_set.reset();
if(!capture_only && (chkpt_internal || chkpt_gathering)){
removeDuplicateViews(ctx.view_holders);
}

#ifdef KR_ENABLE_TRACING
auto reg_hashes = Util::begin_trace< Util::TimingTrace< std::string > >( ctx, "register" );
#endif
// Register any views that haven't already been registered
ctx.register_hashes( views, crefs );

ctx.register_hashes( ctx.view_holders, crefs );
#ifdef KR_ENABLE_TRACING
reg_hashes.end();
auto check_restart = Util::begin_trace< Util::TimingTrace< std::string > >( ctx, "check" );
reg_hashes.end();
#endif

bool restart_available = ctx.restart_available( label, iteration );
bool restart_available;
if(!capture_only){
#ifdef KR_ENABLE_TRACING
check_restart.end();
overhead_trace.end();
auto check_restart = Util::begin_trace< Util::TimingTrace< std::string > >( ctx, "check" );
#endif
restart_available = ctx.restart_available( label, iteration );
#ifdef KR_ENABLE_TRACING
check_restart.end();
overhead_trace.end();
#endif
} else {
restart_available = false;
}

if ( restart_available )
if (restart_available)
{
// Load views with data
#ifdef KR_ENABLE_TRACING
auto restart_trace = Util::begin_trace< Util::TimingTrace< std::string > >( ctx, "restart" );
#endif
ctx.restart( label, iteration, views );
auto ts = std::chrono::system_clock::to_time_t( std::chrono::system_clock::now() );
std::cout << '[' << std::put_time( std::localtime( &ts ), "%c" ) << "] initiating restart\n";
ctx.restart( label, iteration, ctx.view_holders );
} else
{
// Execute functor and checkpoint
if(!chkpt_internal && !chkpt_gathering){
// Execute functor and checkpoint
#ifdef KR_ENABLE_TRACING
auto function_trace = Util::begin_trace< Util::TimingTrace< std::string > >( ctx, "function" );
auto function_trace = Util::begin_trace< Util::TimingTrace< std::string > >( ctx, "function" );
#endif
fun();
fun();
#ifdef KR_ENABLE_TRACING
Kokkos::fence(); // Get accurate measurements for function_trace end
function_trace.end();
Kokkos::fence(); // Get accurate measurements for function_trace end
function_trace.end();
#endif
}

if(!capture_only){
if(!chkpt_internal){
removeDuplicateViews(ctx.view_holders);
}

{
#ifdef KR_ENABLE_TRACING
auto write_trace = Util::begin_trace< Util::TimingTrace< std::string > >( ctx, "checkpoint" );
#endif
auto ts = std::chrono::system_clock::to_time_t( std::chrono::system_clock::now() );
std::cout << '[' << std::put_time( std::localtime( &ts ), "%c" ) << "] initiating checkpoint\n";
ctx.checkpoint( label, iteration, views );
ctx.checkpoint( label, iteration, ctx.view_holders );
}
}

if(!capture_only){
ctx.view_holders.clear();
}
} else
{ // Iteration is filtered, just execute
#ifdef KR_ENABLE_TRACING
Expand All @@ -180,18 +240,43 @@ namespace KokkosResilience
}
}

//Checkpoint entry point taking a caller-supplied iteration filter.
//  ctx, label, iteration, fun, filter: passed straight through to Detail::checkpoint_impl.
//  chkpt_internal: capture all views constructed within the function for checkpointing.
//  chkpt_captured: capture all views captured by the function for checkpointing.
//The 9-argument call below passes capture_only = false and chkpt_gathering = false.
//NOTE(review): this span appears to hold both the pre-change (5-parameter) and post-change signature/call
//lines of a rendered diff - confirm which lines belong in the final file.
template< typename Context, typename F, typename FilterFunc >
void checkpoint( Context &ctx, const std::string &label, int iteration, F &&fun, FilterFunc &&filter )
void checkpoint( Context &ctx, const std::string &label, int iteration, F &&fun, FilterFunc &&filter,
bool chkpt_internal = false, bool chkpt_captured = true)
{
Detail::checkpoint_impl( ctx, label, iteration, std::forward< F >( fun ), std::forward< FilterFunc >( filter ) );
Detail::checkpoint_impl( ctx, label, iteration, std::forward< F >( fun ), std::forward< FilterFunc >( filter ), chkpt_internal, chkpt_captured, false, false);
}

//Checkpoint entry point that uses the context's default filter (ctx.default_filter()).
//chkpt_internal and chkpt_captured are passed through to Detail::checkpoint_impl unchanged; the call
//passes capture_only = false and chkpt_gathering = false.
//NOTE(review): this span appears to hold both the pre-change (4-parameter) and post-change signature
//lines of a rendered diff - confirm which line belongs in the final file.
template< typename Context, typename F >
void checkpoint( Context &ctx, const std::string &label, int iteration, F &&fun )
void checkpoint( Context &ctx, const std::string &label, int iteration, F &&fun, bool chkpt_internal = false,
bool chkpt_captured = true)
{
Detail::checkpoint_impl( ctx, label, iteration, std::forward< F >( fun ), ctx.default_filter(), chkpt_internal, chkpt_captured, false, false);
}

//Checkpoint entry point for a function that itself calls gather_views (the chkpt_gathering case of
//Detail::checkpoint_impl), with a caller-supplied iteration filter.
//Forwards to Detail::checkpoint_impl with chkpt_internal = false, capture_only = false and
//chkpt_gathering = true; chkpt_captured is passed through unchanged.
template< typename Context, typename F, typename FilterFunc >
void checkpoint_gather( Context &ctx, const std::string &label, int iteration, F &&fun, FilterFunc &&filter,
bool chkpt_captured = true)
{
Detail::checkpoint_impl( ctx, label, iteration, std::forward< F >( fun ), std::forward< FilterFunc >( filter ), false, chkpt_captured, false, true);
}

//Checkpoint entry point for a function that itself calls gather_views (the chkpt_gathering case of
//Detail::checkpoint_impl), using the context's default filter (ctx.default_filter()).
//Forwards exactly once to Detail::checkpoint_impl with chkpt_internal = false, capture_only = false and
//chkpt_gathering = true; chkpt_captured is passed through unchanged.
template< typename Context, typename F>
void checkpoint_gather( Context &ctx, const std::string &label, int iteration, F &&fun, bool chkpt_captured = true)
{
  Detail::checkpoint_impl( ctx, label, iteration, std::forward< F >( fun ), ctx.default_filter(), false, chkpt_captured, false, true);
}

//Capture-only entry point: forwards to Detail::checkpoint_impl with capture_only = true and
//chkpt_gathering = false, using the fixed label "gather", iteration 0 and ctx.default_filter().
//With capture_only set, checkpoint_impl does not evaluate the filter it is handed (it reuses the
//surrounding scope's filter result), performs no restart check and writes no checkpoint, and it leaves
//ctx.view_holders uncleared so the captured views remain available to the enclosing checkpoint scope.
//chkpt_internal and chkpt_captured are passed through unchanged.
template< typename Context, typename F>
void gather_views( Context &ctx, F &&fun, bool chkpt_internal = false, bool chkpt_captured = true)
{
Detail::checkpoint_impl( ctx, "gather", 0, std::forward< F >( fun ), ctx.default_filter(), chkpt_internal, chkpt_captured, true, false);
}



}

namespace kr = KokkosResilience;
Expand Down
2 changes: 2 additions & 0 deletions src/resilience/Context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ namespace KokkosResilience
Util::detail::TraceStack &trace() { return m_trace; };
#endif

std::vector<KokkosResilience::ViewHolder> view_holders;

private:

Config m_config;
Expand Down
37 changes: 26 additions & 11 deletions src/resilience/stdfile/StdFileBackend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,13 @@ namespace KokkosResilience {

namespace detail {

// Builds the checkpoint name stem "<filename>.<label>", i.e. the file name
// without its trailing ".<version>" component.
std::string versionless_filename(std::string const &filename, std::string const &label) {
  std::string stem;
  stem.reserve(filename.size() + 1 + label.size());
  stem.append(filename);
  stem.push_back('.');
  stem.append(label);
  return stem;
}

// Builds the full checkpoint file name "<filename>.<label>.<version>".
// The "<filename>.<label>" stem comes from versionless_filename, so the stem is
// defined in one place and matches what latest_version scans the directory for.
std::string full_filename(std::string const &filename, std::string const &label,
                          int version) {
  return versionless_filename(filename, label) + "." + std::to_string(version);
}
} // namespace detail

Expand All @@ -71,7 +75,6 @@ StdFileBackend::~StdFileBackend() = default;
void StdFileBackend::checkpoint(
const std::string &label, int version,
const std::vector< KokkosResilience::ViewHolder > &views) {
bool status = true;
try {
std::string filename = detail::full_filename(m_filename, label, version);
std::ofstream file(filename, std::ios::binary);
Expand All @@ -90,7 +93,7 @@ void StdFileBackend::checkpoint(
write_trace.end();
#endif
} catch (...) {
status = false;
return; //TODO: error handling?
}
}

Expand All @@ -101,11 +104,24 @@ bool StdFileBackend::restart_available(const std::string &label, int version) {

int StdFileBackend::latest_version(const std::string &label) const noexcept {
int result = -1;
for (int version = 0; /**/; ++version) {
std::string filename = detail::full_filename(m_filename, label, version);
if (!boost::filesystem::exists(filename)) {
result = version - 1;
break;
std::string filename = detail::versionless_filename(m_filename, label);
boost::filesystem::path dir(filename);
filename = dir.filename().string();

dir = boost::filesystem::absolute(dir).parent_path();

for(auto &entry : boost::filesystem::directory_iterator(dir)){
if (!boost::filesystem::is_regular_file(entry)) {
continue;
}
if(filename == entry.path().filename().stem().string()){
//This is a checkpoint, probably.
try{
int vers = std::stoi(entry.path().filename().extension().string().substr(1));
result = std::max(result,vers);
} catch(...) {
//Just not the filename format we expected, could be unrelated.
}
}
}
return result;
Expand All @@ -114,7 +130,6 @@ int StdFileBackend::latest_version(const std::string &label) const noexcept {
void StdFileBackend::restart(
const std::string &label, int version,
const std::vector< KokkosResilience::ViewHolder > &views) {
bool status = true;
try {
std::string filename = detail::full_filename(m_filename, label, version);
std::ifstream file(filename, std::ios::binary);
Expand All @@ -126,14 +141,14 @@ void StdFileBackend::restart(
for (auto &&v : views) {
char *bytes = static_cast<char *>(v->data());
std::size_t len = v->span() * v->data_type_size();

file.read(bytes, len);
}
#ifdef KR_ENABLE_TRACING
read_trace.end();
#endif
} catch (...) {
status = false;
return; //TODO: Error handling?
}
}
} // namespace KokkosResilience