Skip to content

Commit

Permalink
Pass-through flushing parameters, add FlushMode
Browse files Browse the repository at this point in the history
  • Loading branch information
franzpoeschel committed Feb 24, 2022
1 parent 8793fe8 commit cb3ed7b
Show file tree
Hide file tree
Showing 48 changed files with 287 additions and 227 deletions.
4 changes: 2 additions & 2 deletions include/openPMD/IO/ADIOS/ADIOS1IOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class OPENPMDAPI_EXPORT ADIOS1IOHandler : public AbstractIOHandler
return "ADIOS1";
}

std::future<void> flush() override;
std::future<void> flush(internal::FlushParams const &) override;

void enqueue(IOTask const &) override;

Expand All @@ -72,7 +72,7 @@ class OPENPMDAPI_EXPORT ADIOS1IOHandler : public AbstractIOHandler
return "DUMMY_ADIOS1";
}

std::future<void> flush() override;
std::future<void> flush(internal::FlushParams const &) override;

private:
std::unique_ptr<ADIOS1IOHandlerImpl> m_impl;
Expand Down
2 changes: 1 addition & 1 deletion include/openPMD/IO/ADIOS/ADIOS1IOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class OPENPMDAPI_EXPORT ADIOS1IOHandlerImpl

virtual void init();

std::future<void> flush() override;
std::future<void> flush();

virtual int64_t open_write(Writable *);
virtual ADIOS_FILE *open_read(std::string const &name);
Expand Down
6 changes: 3 additions & 3 deletions include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class ADIOS2IOHandlerImpl

~ADIOS2IOHandlerImpl() override;

std::future<void> flush() override;
std::future<void> flush(internal::FlushParams const &);

void
createFile(Writable *, Parameter<Operation::CREATE_FILE> const &) override;
Expand Down Expand Up @@ -1239,7 +1239,7 @@ class ADIOS2IOHandler : public AbstractIOHandler
// we must not throw in a destructor
try
{
this->flush();
this->flush(internal::defaultFlushParams);
}
catch (std::exception const &ex)
{
Expand Down Expand Up @@ -1278,6 +1278,6 @@ class ADIOS2IOHandler : public AbstractIOHandler
return "ADIOS2";
}

std::future<void> flush() override;
std::future<void> flush(internal::FlushParams const &) override;
}; // ADIOS2IOHandler
} // namespace openPMD
2 changes: 1 addition & 1 deletion include/openPMD/IO/ADIOS/ParallelADIOS1IOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class OPENPMDAPI_EXPORT ParallelADIOS1IOHandler : public AbstractIOHandler
return "MPI_ADIOS1";
}

std::future<void> flush() override;
std::future<void> flush(internal::FlushParams const &) override;
#if openPMD_HAVE_ADIOS1
void enqueue(IOTask const &) override;
#endif
Expand Down
2 changes: 1 addition & 1 deletion include/openPMD/IO/ADIOS/ParallelADIOS1IOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class OPENPMDAPI_EXPORT ParallelADIOS1IOHandlerImpl

virtual void init();

std::future<void> flush() override;
std::future<void> flush();

virtual int64_t open_write(Writable *);
virtual ADIOS_FILE *open_read(std::string const &name);
Expand Down
28 changes: 26 additions & 2 deletions include/openPMD/IO/AbstractIOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ class unsupported_data_error : public std::runtime_error
{}
};

enum class FlushMode : bool
{
NonCollective,
Collective
};

/**
* @brief Determine what items should be flushed upon Series::flush()
*
Expand Down Expand Up @@ -87,6 +93,25 @@ enum class FlushLevel : unsigned char
SkeletonOnly
};

namespace internal
{
/**
* Parameters recursively passed through the openPMD hierarchy when
* flushing.
*
*/
struct FlushParams
{
FlushLevel flushLevel = FlushLevel::InternalFlush;
FlushMode flushMode = FlushMode::NonCollective;
};

/*
* To be used for reading
*/
constexpr FlushParams defaultFlushParams{};
} // namespace internal

/** Interface for communicating between logical and physically persistent data.
*
* Input and output operations are channeled through a task queue that is
Expand Down Expand Up @@ -123,7 +148,7 @@ class AbstractIOHandler
* @return Future indicating the completion state of the operation for
* backends that decide to implement this operation asynchronously.
*/
virtual std::future<void> flush() = 0;
virtual std::future<void> flush(internal::FlushParams const &) = 0;

/** The currently used backend */
virtual std::string backendName() const = 0;
Expand All @@ -132,7 +157,6 @@ class AbstractIOHandler
Access const m_backendAccess;
Access const m_frontendAccess;
std::queue<IOTask> m_work;
FlushLevel m_flushLevel = FlushLevel::InternalFlush;
}; // AbstractIOHandler

} // namespace openPMD
2 changes: 1 addition & 1 deletion include/openPMD/IO/AbstractIOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class AbstractIOHandlerImpl

virtual ~AbstractIOHandlerImpl() = default;

virtual std::future<void> flush()
std::future<void> flush()
{
using namespace auxiliary;

Expand Down
2 changes: 1 addition & 1 deletion include/openPMD/IO/DummyIOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ class DummyIOHandler : public AbstractIOHandler
/** No-op consistent with the IOHandler interface to enable library use
* without IO.
*/
std::future<void> flush() override;
std::future<void> flush(internal::FlushParams const &) override;
}; // DummyIOHandler
} // namespace openPMD
2 changes: 1 addition & 1 deletion include/openPMD/IO/HDF5/HDF5IOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class HDF5IOHandler : public AbstractIOHandler
return "HDF5";
}

std::future<void> flush() override;
std::future<void> flush(internal::FlushParams const &) override;

private:
std::unique_ptr<HDF5IOHandlerImpl> m_impl;
Expand Down
2 changes: 1 addition & 1 deletion include/openPMD/IO/HDF5/ParallelHDF5IOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class ParallelHDF5IOHandler : public AbstractIOHandler
return "MPI_HDF5";
}

std::future<void> flush() override;
std::future<void> flush(internal::FlushParams const &) override;

private:
std::unique_ptr<ParallelHDF5IOHandlerImpl> m_impl;
Expand Down
2 changes: 1 addition & 1 deletion include/openPMD/IO/JSON/JSONIOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class JSONIOHandler : public AbstractIOHandler
return "JSON";
}

std::future<void> flush() override;
std::future<void> flush(internal::FlushParams const &) override;

private:
JSONIOHandlerImpl m_impl;
Expand Down
2 changes: 1 addition & 1 deletion include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ class JSONIOHandlerImpl : public AbstractIOHandlerImpl

void listAttributes(Writable *, Parameter<Operation::LIST_ATTS> &) override;

std::future<void> flush() override;
std::future<void> flush();

private:
using FILEHANDLE = std::fstream;
Expand Down
9 changes: 5 additions & 4 deletions include/openPMD/Iteration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,11 @@ class Iteration : public Attributable
return *m_iterationData;
}

void flushFileBased(std::string const &, uint64_t);
void flushGroupBased(uint64_t);
void flushVariableBased(uint64_t);
void flush();
void flushFileBased(
std::string const &, uint64_t, internal::FlushParams const &);
void flushGroupBased(uint64_t, internal::FlushParams const &);
void flushVariableBased(uint64_t, internal::FlushParams const &);
void flush(internal::FlushParams const &);
void deferParseAccess(internal::DeferredParseAccess);
/*
* Control flow for read(), readFileBased(), readGroupBased() and
Expand Down
3 changes: 2 additions & 1 deletion include/openPMD/Mesh.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ class Mesh : public BaseRecord<MeshRecordComponent>
private:
Mesh();

void flush_impl(std::string const &) override;
void
flush_impl(std::string const &, internal::FlushParams const &) override;
void read() override;
}; // Mesh

Expand Down
2 changes: 1 addition & 1 deletion include/openPMD/ParticleSpecies.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ParticleSpecies : public Container<Record>
ParticleSpecies();

void read();
void flush(std::string const &) override;
void flush(std::string const &, internal::FlushParams const &) override;

/**
* @brief Check recursively whether this ParticleSpecies is dirty.
Expand Down
3 changes: 2 additions & 1 deletion include/openPMD/Record.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ class Record : public BaseRecord<RecordComponent>
private:
Record();

void flush_impl(std::string const &) override;
void
flush_impl(std::string const &, internal::FlushParams const &) override;
void read() override;
}; // Record

Expand Down
2 changes: 1 addition & 1 deletion include/openPMD/RecordComponent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ class RecordComponent : public BaseRecordComponent
static constexpr char const *const SCALAR = "\vScalar";

private:
void flush(std::string const &);
void flush(std::string const &, internal::FlushParams const &);
virtual void read();

/**
Expand Down
16 changes: 8 additions & 8 deletions include/openPMD/RecordComponent.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ RecordComponent::storeChunk( Offset o, Extent e, F && createBuffer )
* Flush the openPMD hierarchy to the backend without flushing any actual
* data yet.
*/
seriesFlush( FlushLevel::SkeletonOnly );
seriesFlush( {FlushLevel::SkeletonOnly, FlushMode::NonCollective} );

size_t size = 1;
for( auto ext : e )
Expand All @@ -305,16 +305,16 @@ RecordComponent::storeChunk( Offset o, Extent e, F && createBuffer )
getBufferView.offset = o;
getBufferView.extent = e;
getBufferView.dtype = getDatatype();
IOHandler()->enqueue( IOTask( this, getBufferView ) );
IOHandler()->flush();
IOHandler()->enqueue(IOTask(this, getBufferView));
IOHandler()->flush(internal::defaultFlushParams);
auto &out = *getBufferView.out;
if( !out.backendManagedBuffer )
if (!out.backendManagedBuffer)
{
auto data = std::forward< F >( createBuffer )( size );
out.ptr = static_cast< void * >( data.get() );
storeChunk( std::move( data ), std::move( o ), std::move( e ) );
auto data = std::forward<F>(createBuffer)(size);
out.ptr = static_cast<void *>(data.get());
storeChunk(std::move(data), std::move(o), std::move(e));
}
return DynamicMemoryView< T >{ std::move( getBufferView ), size, *this };
return DynamicMemoryView<T>{std::move(getBufferView), size, *this};
}

template< typename T >
Expand Down
14 changes: 10 additions & 4 deletions include/openPMD/Series.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ class Series : public Attributable

/** Execute all required remaining IO operations to write or read data.
*/
void flush();
void flush(FlushMode = FlushMode::NonCollective);

/**
* @brief Entry point to the reading end of the streaming API.
Expand Down Expand Up @@ -539,17 +539,23 @@ OPENPMD_private
std::future<void> flush_impl(
iterations_iterator begin,
iterations_iterator end,
FlushLevel level,
internal::FlushParams flushParams,
bool flushIOHandler = true);
void flushFileBased(iterations_iterator begin, iterations_iterator end);
void flushFileBased(
iterations_iterator begin,
iterations_iterator end,
internal::FlushParams flushParams);
/*
* Group-based and variable-based iteration layouts share a lot of logic
* (realistically, the variable-based iteration layout only throws out
* one layer in the hierarchy).
* As a convention, methods that deal with both layouts are called
* .*GorVBased, short for .*GroupOrVariableBased
*/
void flushGorVBased(iterations_iterator begin, iterations_iterator end);
void flushGorVBased(
iterations_iterator begin,
iterations_iterator end,
internal::FlushParams flushParams);
void flushMeshesPath();
void flushParticlesPath();
void readFileBased();
Expand Down
2 changes: 1 addition & 1 deletion include/openPMD/Span.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ template <typename T> class DynamicMemoryView
// might need to update
m_recordComponent.IOHandler()->enqueue(
IOTask(&m_recordComponent, m_param));
m_recordComponent.IOHandler()->flush();
m_recordComponent.IOHandler()->flush(internal::defaultFlushParams);
}
return Span<T>{static_cast<T *>(m_param.out->ptr), m_size};
}
Expand Down
6 changes: 3 additions & 3 deletions include/openPMD/backend/Attributable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class Attributable
* an object that has no parent, which is the Series object, and flush()-es
* it.
*/
void seriesFlush();
void seriesFlush(FlushMode = FlushMode::NonCollective);

/** String serialization to describe an Attributable
*
Expand Down Expand Up @@ -266,9 +266,9 @@ OPENPMD_protected
Iteration &containingIteration();
/** @} */

void seriesFlush(FlushLevel);
void seriesFlush(internal::FlushParams);

void flushAttributes();
void flushAttributes(internal::FlushParams const &);
enum ReadMode
{
/**
Expand Down
18 changes: 10 additions & 8 deletions include/openPMD/backend/BaseRecord.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,9 @@ template <typename T_elem> class BaseRecord : public Container<T_elem>
void readBase();

private:
void flush(std::string const &) final;
virtual void flush_impl(std::string const &) = 0;
void flush(std::string const &, internal::FlushParams const &) final;
virtual void
flush_impl(std::string const &, internal::FlushParams const &) = 0;
virtual void read() = 0;

/**
Expand Down Expand Up @@ -248,7 +249,7 @@ BaseRecord<T_elem>::erase(key_type const &key)
Parameter<Operation::DELETE_DATASET> dDelete;
dDelete.name = ".";
this->IOHandler()->enqueue(IOTask(&rc, dDelete));
this->IOHandler()->flush();
this->IOHandler()->flush(internal::defaultFlushParams);
}
res = Container<T_elem>::erase(key);
}
Expand Down Expand Up @@ -278,7 +279,7 @@ BaseRecord<T_elem>::erase(iterator res)
Parameter<Operation::DELETE_DATASET> dDelete;
dDelete.name = ".";
this->IOHandler()->enqueue(IOTask(&rc, dDelete));
this->IOHandler()->flush();
this->IOHandler()->flush(internal::defaultFlushParams);
}
ret = Container<T_elem>::erase(res);
}
Expand Down Expand Up @@ -311,7 +312,7 @@ template <typename T_elem> inline void BaseRecord<T_elem>::readBase()

aRead.name = "unitDimension";
this->IOHandler()->enqueue(IOTask(this, aRead));
this->IOHandler()->flush();
this->IOHandler()->flush(internal::defaultFlushParams);
if (*aRead.dtype == DT::ARR_DBL_7)
this->setAttribute(
"unitDimension",
Expand All @@ -336,7 +337,7 @@ template <typename T_elem> inline void BaseRecord<T_elem>::readBase()

aRead.name = "timeOffset";
this->IOHandler()->enqueue(IOTask(this, aRead));
this->IOHandler()->flush();
this->IOHandler()->flush(internal::defaultFlushParams);
if (*aRead.dtype == DT::FLOAT)
this->setAttribute(
"timeOffset", Attribute(*aRead.resource).template get<float>());
Expand All @@ -349,15 +350,16 @@ template <typename T_elem> inline void BaseRecord<T_elem>::readBase()
}

template <typename T_elem>
inline void BaseRecord<T_elem>::flush(std::string const &name)
inline void BaseRecord<T_elem>::flush(
std::string const &name, internal::FlushParams const &flushParams)
{
if (!this->written() && this->empty())
throw std::runtime_error(
"A Record can not be written without any contained "
"RecordComponents: " +
name);

this->flush_impl(name);
this->flush_impl(name, flushParams);
// flush_impl must take care to correctly set the dirty() flag so this
// method doesn't do it
}
Expand Down
Loading

0 comments on commit cb3ed7b

Please sign in to comment.