Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add simple RMGIpc for remage-cpp -> python intercom #246

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions include/RMGIpc.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (C) 2025 Manuel Huber
//
// This program is free software: you can redistribute it and/or modify it under
// the terms of the GNU Lesser General Public License as published by the Free
// Software Foundation, either version 3 of the License, or (at your option) any
// later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
// FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
// details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

#ifndef _RMG_IPC_HH_
#define _RMG_IPC_HH_

#include <atomic>
#include <string>

class RMGIpc final {

public:

RMGIpc() = delete;

static void Setup(int ipc_pipe_fd);

static std::string CreateMessage(const std::string& command, const std::string& param) {
// \x1e (record separator = end of entry)
// TODO: also implement a CreateMessage variant with \x1f (unit separator = key/value delimiter)
return command + "\x1e" + param;
}

static bool SendIpcNonBlocking(std::string msg);
static bool SendIpcBlocking(std::string msg);

inline static std::atomic<bool> fWaitForIpc = false;

private:

inline static int fIpcFd = -1;
};

#endif

// vim: tabstop=2 shiftwidth=2 expandtab
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ classifiers = [
dynamic = ["version"]
dependencies = [
"legend-pydataobj",
"colorlog",
]

[project.optional-dependencies]
Expand Down
83 changes: 79 additions & 4 deletions python/remage/cli.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,49 @@
from __future__ import annotations

import logging
import os
import shutil
import signal
import subprocess
import sys
import threading
from pathlib import Path

import colorlog

from .cpp_config import REMAGE_CPP_EXE_PATH
from .ipc import ipc_thread_fn

log = logging.getLogger(__name__)


def _run_remage_cpp() -> int:
"""run the remage-cpp executable and return the exit code as seen in bash."""

# open pipe for IPC C++ -> python.
pipe_r, pipe_w = os.pipe()
os.set_inheritable(pipe_r, False)
os.set_inheritable(pipe_w, True)

# reuse our own argv[0] to have helpful help messages.
# but is is expanded (by the kernel?), so find out if we are in $PATH.
argv = list(sys.argv)
exe_name = Path(argv[0]).name
if shutil.which(exe_name) == sys.argv[0]:
argv[0] = exe_name
proc = subprocess.Popen(argv, executable=REMAGE_CPP_EXE_PATH)

if "--pipe-fd" in argv[1:]:
msg = "cannot pass --pipe-fd"
raise RuntimeError(msg)

proc = subprocess.Popen(
[argv[0], "--pipe-fd", str(pipe_w), *argv[1:]],
executable=REMAGE_CPP_EXE_PATH,
pass_fds=(pipe_w,),
)

# close _our_ writing end of the pipe.
os.close(pipe_w)

# propagate signals to the C++ executable.
def new_signal_handler(sig: int, _):
Expand All @@ -31,29 +57,78 @@ def new_signal_handler(sig: int, _):
signal.SIGTSTP, # SIGSTOP cannot be caught, and will do nothing...
signal.SIGCONT,
signal.SIGUSR1,
signal.SIGUSR2,
# signal.SIGUSR2 is for internal IPC communication.
signal.SIGWINCH,
]

old_signal_handlers = [signal.signal(sig, new_signal_handler) for sig in signals]

# start a thread listening for IPC messages.
# remage-cpp will only continue to do real work after we handled one sync message.
unhandled_ipc_messages = []
ipc_thread = threading.Thread(
target=ipc_thread_fn, args=(pipe_r, proc, unhandled_ipc_messages)
)
ipc_thread.start()

# wait for C++ executable to finish.
proc.wait()

# restore signal handlers again, before running more python code.
for sig, handler in zip(signals, old_signal_handlers):
signal.signal(sig, handler)

return 128 - proc.returncode if proc.returncode < 0 else proc.returncode
# close the IPC pipe and end IPC handling.
ipc_thread.join()

ec = 128 - proc.returncode if proc.returncode < 0 else proc.returncode
return ec, unhandled_ipc_messages


def _setup_log() -> None:
"""Setup a colored logger for this package."""

logger = logging.getLogger("remage")

if sys.stderr.isatty():
fmt = "%(log_color)s[%(levelname)s %(name)s%(reset)s %(message)s"

handler = colorlog.StreamHandler()
handler.setFormatter(colorlog.ColoredFormatter(fmt))
logger.addHandler(handler)

logger.setLevel(logging.DEBUG)

return logger


def remage_cli() -> None:
ec = _run_remage_cpp()
logger = _setup_log()

ec, ipc_info = _run_remage_cpp()
if ec not in [0, 2]:
# remage had an error (::fatal -> ec==134 (SIGABRT); ::error -> ec==1)
# ec==2 is just a warning, continue in the execution flow.
sys.exit(ec)

# setup logging based on log level from C++.
log_level = next(
(msg[1] for msg in ipc_info if len(msg) == 2 and msg[0] == "loglevel"),
"summary",
)
levels_rmg_to_py = {
"debug": logging.DEBUG,
"detail": logging.INFO,
"summary": logging.INFO,
"warning": logging.WARNING,
"error": logging.ERROR,
"fatal": logging.CRITICAL,
"nothing": logging.CRITICAL,
}
logger.setLevel(levels_rmg_to_py[log_level])

# TODO: further post-processing
output_files = [msg[1] for msg in ipc_info if len(msg) == 2 and msg[0] == "output"]
print("output files to merge:", *output_files) # noqa: T201

sys.exit(ec)
65 changes: 65 additions & 0 deletions python/remage/ipc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from __future__ import annotations

import logging
import os
import signal
import subprocess

log = logging.getLogger(__name__)


def handle_ipc_message(msg: bytes) -> tuple[bool, list]:
# parse the IPC message structure.
is_blocking = False
msg = msg[0:-1] # strip trailing ASCII GS ("group separator")
if msg[-1] == "\x05": # ASCII ENQ ("enquiry")
msg = msg[0:-1]
is_blocking = True
msg = msg.split("\x1e") # ASCII RS ("record separator")
msg = [record.split("\x1f") for record in msg] # ASCII US ("unit separator")
msg = [tuple(record) if len(record) > 1 else record[0] for record in msg]

msg_ret = msg
# handle blocking messages, if necessary.
if msg == ["ipc_available", "1"]:
msg_ret = None
elif is_blocking:
log.warning("unhandled blocking IPC message %s", str(msg))

return is_blocking, msg_ret


def ipc_thread_fn(
pipe_r: int, proc: subprocess.Popen, unhandled_ipc_messages: list
) -> None:
try:
msg_buf = b""
with os.fdopen(pipe_r, "br", 0) as pipe_file:
while True:
line = pipe_file.read(1024)
if not line:
return

msg_buf += line
if b"\x1d" not in msg_buf:
# not a full message yet.
continue

# handle message buffer.
for _ in range(msg_buf.count(b"\x1d")):
msg_end = msg_buf.index(b"\x1d") + 1
# note: switching between binary and strting mode is safe here. ASCII
# bytes are binary equal to their utf-8 encoding, and their bytes will
# not be part of any utf-8 multibyte sequence.
msg = msg_buf[0:msg_end].decode("utf-8")
msg_buf = msg_buf[msg_end:]

is_blocking, unhandled_msg = handle_ipc_message(msg)
if unhandled_msg is not None:
unhandled_ipc_messages.append(unhandled_msg)
if is_blocking:
proc.send_signal(signal.SIGUSR2) # send continuation signal.
except OSError as e:
if e.errno == 9: # bad file descriptor.
return
raise e
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ set(PROJECT_PUBLIC_HEADERS
${_root}/include/RMGGermaniumOutputScheme.hh
${_root}/include/RMGGrabmayrGCReader.hh
${_root}/include/RMGIsotopeFilterOutputScheme.hh
${_root}/include/RMGIpc.hh
${_root}/include/RMGLog.hh
${_root}/include/RMGManager.hh
${_root}/include/RMGMasterGenerator.hh
Expand Down Expand Up @@ -59,6 +60,7 @@ set(PROJECT_SOURCES
${_root}/src/RMGGermaniumDetector.cc
${_root}/src/RMGGermaniumOutputScheme.cc
${_root}/src/RMGGrabmayrGCReader.cc
${_root}/src/RMGIpc.cc
${_root}/src/RMGIsotopeFilterOutputScheme.cc
${_root}/src/RMGLog.cc
${_root}/src/RMGManager.cc
Expand Down
105 changes: 105 additions & 0 deletions src/RMGIpc.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright (C) 2025 Manuel Huber
//
// This program is free software: you can redistribute it and/or modify it under
// the terms of the GNU Lesser General Public License as published by the Free
// Software Foundation, either version 3 of the License, or (at your option) any
// later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
// FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
// details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

#include "RMGIpc.hh"

#include <csignal>
#include <unistd.h>

#include "G4Threading.hh"

#include "RMGLog.hh"

namespace {
void ipc_signal_handler(int signal) {
if (!RMGIpc::fWaitForIpc.is_lock_free()) return;
RMGIpc::fWaitForIpc = true;
}
} // namespace

void RMGIpc::Setup(int ipc_pipe_fd) {
if (!G4Threading::IsMasterThread()) {
RMGLog::OutDev(RMGLog::fatal, "can only be used on the master thread");
}

fIpcFd = ipc_pipe_fd;
if (fIpcFd < 0) return;

struct sigaction sig{};
sig.sa_handler = &ipc_signal_handler;
sigemptyset(&sig.sa_mask);
sig.sa_flags = 0;

if (sigaction(SIGUSR2, &sig, nullptr) != 0) {
RMGLog::Out(RMGLog::error, "IPC SIGUSR2 signal handler install failed.");
fIpcFd = -1;
}

// note: this is just a test for the blocking mode.
if (!SendIpcBlocking(CreateMessage("ipc_available", "1"))) {
RMGLog::Out(RMGLog::error, "blocking test IPC call failed, disabling.");
fIpcFd = -1;
}
}

bool RMGIpc::SendIpcBlocking(std::string msg) {
if (!G4Threading::IsMasterThread()) {
RMGLog::OutDev(RMGLog::fatal, "can only be used on the master thread");
}
if (fIpcFd < 0) return false;

msg += "\x05"; // ASCII ENQ enquiry = ask for continuation.

sigset_t sig2, sigold;
sigemptyset(&sig2);
sigaddset(&sig2, SIGUSR2);

// Block SIGUSR2 _before_ writing message.
pthread_sigmask(SIG_BLOCK, &sig2, &sigold);

if (!SendIpcNonBlocking(msg)) {
pthread_sigmask(SIG_UNBLOCK, &sig2, nullptr);
return false;
}

// now wait for continuation signal.
while (!fWaitForIpc) sigsuspend(&sigold);
fWaitForIpc = false;
pthread_sigmask(SIG_UNBLOCK, &sig2, nullptr);

return true;
}

bool RMGIpc::SendIpcNonBlocking(std::string msg) {
if (fIpcFd < 0) return false;

msg += "\x1d"; // ASCII GS group separator = end of message.
auto len = write(fIpcFd, msg.c_str(), msg.size());

if (len < 0) {
RMGLog::Out(RMGLog::error, "IPC message transmit failed with errno=", errno);
return false;
}

if (len != msg.size()) {
// TODO: better handle this case.
RMGLog::Out(RMGLog::error, "IPC message not fully transmitted. missing=", msg.size() - len);
return false;
}

return true;
}

// vim: tabstop=2 shiftwidth=2 expandtab
Loading
Loading