Skip to content

Commit

Permalink
ledger: mgmt protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
tianyuan129 committed Sep 20, 2022
1 parent 723c089 commit dee7ff3
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 58 deletions.
4 changes: 0 additions & 4 deletions src/dag/edge-state-list.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@

namespace cledger::dag {

static const std::string stateListNameHeader = "/32=EdgeStateList";

static const std::string stateListNull = "/32=EdgeStateList/null";

enum : uint32_t {
TLV_EDGE_STATE_LIST_TYPE = 351,
TLV_EDGE_STATE_LIST_NAME = 352,
Expand Down
2 changes: 2 additions & 0 deletions src/dag/edge-state-list.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include "edge-state.hpp"
namespace cledger::dag {

const std::string stateListNameHeader = "/32=EdgeStateList";
const std::string stateListNull = "/32=EdgeStateList/null";
const uint32_t globalTracker = 0;

struct EdgeStateList
Expand Down
3 changes: 1 addition & 2 deletions src/dag/edge-state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

namespace cledger::dag {

static const std::string stateNameHeader = "/32=EdgeState";

enum : uint32_t {
TLV_EDGE_STATE_TYPE= 311,
TLV_EDGE_STATE_STATUS = 312,
Expand Down Expand Up @@ -97,6 +95,7 @@ std::ostream&
operator<<(std::ostream& os, const EdgeState& state)
{
os << "Edge State Name: " << state.stateName << "\n";
os << " Record Payload Data Name:" << Data(Block(state.record.getPayload())).getName() << "\n";
for (auto& p : state.record.getPointers()) {
os << " Pointer: " << p << "\n";
}
Expand Down
2 changes: 2 additions & 0 deletions src/dag/edge-state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include "record.hpp"
namespace cledger::dag {

const std::string stateNameHeader = "/32=EdgeState";

struct EdgeState
{
enum Status {
Expand Down
54 changes: 37 additions & 17 deletions src/ledger-module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ LedgerModule::registerPrefix()
{
// register prefixes
Name prefix = m_config.ledgerPrefix;
// let's first use "LEDGER" in protocol
// handle record zone
prefix.append("LEDGER");
auto prefixId = m_face.registerPrefix(
prefix,
Expand All @@ -109,6 +109,7 @@ LedgerModule::registerPrefix()
);
m_handle.handlePrefix(prefixId);

// serving own certificate
auto cert = m_keyChain.getPib().getIdentity(Name(m_config.ledgerPrefix).append(m_config.instanceSuffix))
.getDefaultKey()
.getDefaultCertificate();
Expand All @@ -121,6 +122,25 @@ LedgerModule::registerPrefix()
[this] (auto&&, const auto& reason) { onRegisterFailed(reason); }
);
m_handle.handlePrefix(prefixId);

// serving internal data structure: EdgeState and EdgeStateList
auto dsResponder = [this] (auto& name) {
auto filterId = m_face.setInterestFilter(
name,
[this] (auto&&, auto& i) {
try {
auto block = m_storage->getBlock(i.getName());
sendResponse(i.getName(), block, true);
}
catch (const std::runtime_error&) {
sendNack(i.getName());
}
}
);
m_handle.handleFilter(filterId);
};
dsResponder(dag::stateNameHeader);
dsResponder(dag::stateListNameHeader);
}

AppendStatus
Expand Down Expand Up @@ -199,16 +219,7 @@ LedgerModule::onQuery(const Interest& query)
encoder(content, dag::fromStateName(des));
}
}

Name dataName(query.getName());
dataName.append("data").appendTimestamp();
Data data(dataName);
// this deserves some considerations
data.setFreshnessPeriod(m_config.nackFreshnessPeriod);
data.setContent(content);
m_keyChain.sign(data, signingByIdentity(Name(m_config.ledgerPrefix).append(m_config.instanceSuffix)));
NDN_LOG_TRACE("Ledger replies with: " << data.getName());
m_face.put(data);
sendResponse(query.getName(), content);
}
catch (const std::exception& e) {
NDN_LOG_DEBUG("Ledger storage cannot get the Data for reason: " << e.what());
Expand All @@ -224,12 +235,7 @@ LedgerModule::onQuery(const Interest& query)
catch (std::exception& e) {
NDN_LOG_DEBUG("Ledger storage cannot get the Data for reason: " << e.what());
// reply with app layer nack
Nack nack;
auto data = nack.prepareData(query.getName(), time::toUnixTimestamp(time::system_clock::now()));
data->setFreshnessPeriod(m_config.nackFreshnessPeriod);
m_keyChain.sign(*data, signingByIdentity(Name(m_config.ledgerPrefix).append(m_config.instanceSuffix)));
NDN_LOG_TRACE("Ledger replies with: " << data->getName());
m_face.put(*data);
sendNack(query.getName());
}
}
}
Expand Down Expand Up @@ -410,4 +416,18 @@ LedgerModule::updateStatesTracker(const Name& stateName, bool interlocked)
m_storage->addBlock(trackerName, dag::encodeEdgeStateList(statesTracker));
}

void
LedgerModule::sendResponse(const Name& name, const Block& block, bool realtime)
{
Name dataName(name);
dataName.append("data").appendTimestamp();
Data data(dataName);
// this deserves some considerations
data.setFreshnessPeriod((realtime? 100_ms : m_config.nackFreshnessPeriod));
data.setContent(makeBinaryBlock(ndn::tlv::Content, block));
m_keyChain.sign(data, signingByIdentity(Name(m_config.ledgerPrefix).append(m_config.instanceSuffix)));
NDN_LOG_TRACE("Ledger replies with: " << data.getName());
m_face.put(data);
}

} // namespace cledger::ledger
3 changes: 3 additions & 0 deletions src/ledger-module.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ class LedgerModule : boost::noncopyable
void
replyOrSendNack(const Name& name);

void
sendResponse(const Name& name, const Block& block, bool realtime = false);

void
dagHarvest();

Expand Down
93 changes: 58 additions & 35 deletions tools/ndncledger-ledger-status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,71 @@

namespace cledger::ledger {

static ndn::Face face;

static void
fetchEdgeState(const Name& ledgerPrefix, const Name& stateName, bool isBenchmark)
{
Interest listFetcher(stateName);
listFetcher.setCanBePrefix(true);
listFetcher.setForwardingHint({ledgerPrefix});
listFetcher.setMustBeFresh(true);
face.expressInterest(listFetcher,
[isBenchmark] (auto&&, auto& data) {
Block contentBlock = data.getContent();
Block stateBlock = contentBlock.blockFromValue();
dag::EdgeState state = dag::decodeEdgeState(stateBlock);
if (!isBenchmark) {
std::cerr << "***************************************\n"
<< state
<< "***************************************\n";
}
if (state.interlocked > state.created) {
std::cerr << "=======================================\n"
<< state.stateName << " Interlock Latency: "
<< ndn::time::toUnixTimestamp(state.interlocked) - ndn::time::toUnixTimestamp(state.created) << " ms\n"
<< "=======================================\n";
}
},
[] (auto&&...) {},
[] (auto&&...) {});
}

static void
fetchEdgeStateList(const Name& ledgerPrefix, bool isBenchmark)
{
Interest listFetcher(dag::toStateListName(dag::globalTracker));
listFetcher.setCanBePrefix(true);
listFetcher.setForwardingHint({ledgerPrefix});
listFetcher.setMustBeFresh(true);
face.expressInterest(listFetcher,
[ledgerPrefix, isBenchmark] (auto&&, auto& data) {
Block contentBlock = data.getContent();
Block trackerBlock = contentBlock.blockFromValue();
dag::EdgeStateList statesTracker = dag::decodeEdgeStateList(trackerBlock);
std::cerr << "There are " << statesTracker.value.size() << " EdgeStates: " << std::endl;
for (const auto& entry : statesTracker.value) {
fetchEdgeState(ledgerPrefix, entry, isBenchmark);
}
},
[] (auto&&...) {},
[] (auto&&...) {});
}

static int
main(int argc, char* argv[])
{
namespace po = boost::program_options;
std::string ledgerConfigString = "";
std::string ledgerPrefixString = "";
bool isBenchmark = false;
po::options_description description(
"Usage: ndncledger-ledger-status [-h] -c ledgerConfig\n"
"Usage: ndncledger-ledger-status [-h] -l ledgerPrefix\n"
"\n"
"Options");
description.add_options()
("help,h", "produce help message")
("ledgerConfig,c", po::value<std::string>(&ledgerConfigString),
"ledger config file name (e.g., ledger.conf.sample)")
("ledgerPrefix,l", po::value<std::string>(&ledgerPrefixString),
"ledger prefix (e.g., /ndn/site1/LEDGER)")
("benchmark,b", po::bool_switch(&isBenchmark), "only print interlock latency");
po::positional_options_description p;
po::variables_map vm;
Expand All @@ -37,40 +88,12 @@ main(int argc, char* argv[])
std::cerr << description << std::endl;
return 0;
}
if (vm.count("ledgerConfig") == 0) {
if (vm.count("ledgerPrefix") == 0) {
std::cerr << "ERROR: you must specify a ledger configuration." << std::endl;
return 2;
}

LedgerConfig config;
config.load(ledgerConfigString);
// access to corresponding memory
if (config.storageType == "storage-memory") {
std::cerr << "ERROR: does not support in-memory (default) ledger storage." << std::endl;
return 2;
}

auto storage = storage::LedgerStorage::createLedgerStorage(config.storageType, config.ledgerPrefix, config.storagePath);
Name trackerName = dag::toStateListName(dag::globalTracker);
auto trackerBlock = storage->getBlock(trackerName);
dag::EdgeStateList statesTracker = dag::decodeEdgeStateList(trackerBlock);
std::cerr << "There are " << statesTracker.value.size() << " EdgeStates: " << std::endl;
for (const auto& entry : statesTracker.value) {
auto stateBlock = storage->getBlock(entry);
dag::EdgeState state = dag::decodeEdgeState(stateBlock);
if (!isBenchmark) {
std::cerr << "***************************************\n"
<< state
<< "***************************************\n";
}
if (state.interlocked > state.created) {
std::cerr << "=======================================\n"
<< state.stateName << " Interlock Latency: "
<< ndn::time::toUnixTimestamp(state.interlocked) - ndn::time::toUnixTimestamp(state.created) << " ms\n"
<< "=======================================\n";
}

}
fetchEdgeStateList(Name(ledgerPrefixString), isBenchmark);
face.processEvents();
return 0;
}

Expand Down

0 comments on commit dee7ff3

Please sign in to comment.