Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.0.0 #32

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Better logging (#31)
* Switch to slf4j

* Adjust some levels

* Fix some JsonBox bugs and add logs
  • Loading branch information
SpencerPark authored Feb 10, 2024
commit 242f8f3ccddf03322724ade9af70791462dfa50a
6 changes: 4 additions & 2 deletions basekernel/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,23 @@ plugins {
}

group = "io.github.spencerpark"
version = "2.3.0-SNAPSHOT"
version = "3.0.0-SNAPSHOT"

repositories {
mavenLocal()
mavenCentral()
}

dependencies {
implementation(project(":jupyter-jvm-runtime"))
api(project(":jupyter-jvm-runtime"))
implementation(libs.jeromq)
api(libs.gson)
api(libs.slf4jApi)

testImplementation(testLibs.junit)
testRuntimeOnly("org.junit.platform:junit-platform-launcher")
testImplementation(testLibs.hamcrest)
testRuntimeOnly(testLibs.slf4jImpl)

testImplementation(testLibs.jimfs)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,21 @@
import io.github.spencerpark.jupyter.messages.MessageType;
import io.github.spencerpark.jupyter.messages.publish.PublishStatus;
import io.github.spencerpark.jupyter.messages.reply.ErrorReply;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Deque;
import java.util.LinkedList;

public class DefaultReplyEnvironment implements ReplyEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(DefaultReplyEnvironment.class);

private final JupyterSocket shell;
private final JupyterSocket iopub;

private final MessageContext context;

private Deque<Runnable> deferred = new LinkedList<>();
private final Deque<Runnable> deferred = new LinkedList<>();
private boolean defer = false;

public DefaultReplyEnvironment(JupyterSocket shell, JupyterSocket iopub, MessageContext context) {
Expand Down Expand Up @@ -73,8 +77,13 @@ public void resolveDeferrals() {
if (this.defer)
throw new IllegalStateException("Reply environment is in defer mode but a resolution was request.");

while (!deferred.isEmpty())
deferred.pop().run();
while (!deferred.isEmpty()) {
try {
deferred.pop().run();
} catch (Exception e) {
LOG.warn("Ignored exception while resolving deferral for " + this.context.getHeader().getType().getName() + ":", e);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@

import io.github.spencerpark.jupyter.kernel.KernelConnectionProperties;
import io.github.spencerpark.jupyter.messages.HMACGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.SocketType;
import org.zeromq.ZMQ;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

public class HeartbeatChannel extends JupyterSocket {
private static final Logger LOG = LoggerFactory.getLogger(HeartbeatChannel.class);

private static final long HB_DEFAULT_SLEEP_MS = 500;

private static final AtomicInteger HEARTBEAT_ID = new AtomicInteger();
Expand All @@ -18,7 +20,7 @@ public class HeartbeatChannel extends JupyterSocket {
private volatile Loop pulse;

public HeartbeatChannel(ZMQ.Context context, HMACGenerator hmacGenerator, long sleep) {
super(context, SocketType.REP, hmacGenerator, Logger.getLogger("HeartbeatChannel"));
super(context, SocketType.REP, hmacGenerator);
this.sleep = sleep;
}

Expand All @@ -38,7 +40,7 @@ public void bind(KernelConnectionProperties connProps) {
String channelThreadName = "Heartbeat-" + HEARTBEAT_ID.getAndIncrement();
String addr = JupyterSocket.formatAddress(connProps.getTransport(), connProps.getIp(), connProps.getHbPort());

logger.log(Level.INFO, String.format("Binding %s to %s.", channelThreadName, addr));
LOG.info("Binding {} to {}.", channelThreadName, addr);
super.bind(addr);

ZMQ.Poller poller = super.ctx.poller(1);
Expand All @@ -49,22 +51,22 @@ public void bind(KernelConnectionProperties connProps) {
if (events > 0) {
byte[] msg = this.recv();
if (msg == null) {
//Error during receive, just continue
super.logger.log(Level.SEVERE, "Poll returned 1 event but could not read the echo string");
// Error during receive, just continue
LOG.error("Poll returned 1 event but could not read the echo string");
return;
}
if (!this.send(msg)) {
super.logger.log(Level.SEVERE, "Could not send heartbeat reply");
LOG.error("Could not send heartbeat reply");
}
super.logger.log(Level.FINEST, "Heartbeat pulse");
LOG.trace("Heartbeat pulse");
}
});
this.pulse.onClose(() -> {
logger.log(Level.INFO, channelThreadName + " shutdown.");
LOG.info("{} shutdown.", channelThreadName);
this.pulse = null;
});
this.pulse.start();
logger.log(Level.INFO, "Polling on " + channelThreadName);
LOG.info("Polling on {}", channelThreadName);
}

@Override
Expand All @@ -80,7 +82,8 @@ public void waitUntilClose() {
if (this.pulse != null) {
try {
this.pulse.join();
} catch (InterruptedException ignored) { }
} catch (InterruptedException ignored) {
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,23 @@

import io.github.spencerpark.jupyter.kernel.KernelConnectionProperties;
import io.github.spencerpark.jupyter.messages.HMACGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.SocketType;
import org.zeromq.ZMQ;

import java.util.logging.Level;
import java.util.logging.Logger;

public class IOPubChannel extends JupyterSocket {
private static final Logger LOG = LoggerFactory.getLogger(IOPubChannel.class);

public IOPubChannel(ZMQ.Context context, HMACGenerator hmacGenerator) {
super(context, SocketType.PUB, hmacGenerator, Logger.getLogger("IOPubChannel"));
super(context, SocketType.PUB, hmacGenerator);
}

@Override
public void bind(KernelConnectionProperties connProps) {
String addr = JupyterSocket.formatAddress(connProps.getTransport(), connProps.getIp(), connProps.getIopubPort());

logger.log(Level.INFO, String.format("Binding iopub to %s.", addr));
LOG.info("Binding iopub to {}.", addr);
super.bind(addr);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,17 @@
import io.github.spencerpark.jupyter.messages.adapters.MessageTypeAdapter;
import io.github.spencerpark.jupyter.messages.adapters.PublishStatusAdapter;
import io.github.spencerpark.jupyter.messages.adapters.ReplyTypeAdapter;
import io.github.spencerpark.jupyter.messages.debug.DapCommandType;
import io.github.spencerpark.jupyter.messages.debug.DapEventType;
import io.github.spencerpark.jupyter.messages.debug.DapProtocolMessage;
import io.github.spencerpark.jupyter.messages.debug.adapters.DapCommandTypeAdapter;
import io.github.spencerpark.jupyter.messages.debug.adapters.DapEventTypeAdapter;
import io.github.spencerpark.jupyter.messages.debug.adapters.DapProtocolMessageAdapter;
import io.github.spencerpark.jupyter.messages.publish.PublishStatus;
import io.github.spencerpark.jupyter.messages.reply.ErrorReply;
import io.github.spencerpark.jupyter.messages.request.HistoryRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.SocketType;
import org.zeromq.ZMQ;

Expand All @@ -37,9 +45,10 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

public abstract class JupyterSocket extends ZMQ.Socket {
private static final Logger LOG = LoggerFactory.getLogger(JupyterSocket.class);

protected static String formatAddress(String transport, String ip, int port) {
return transport + "://" + ip + ":" + Integer.toString(port);
}
Expand All @@ -51,6 +60,9 @@ protected static String formatAddress(String transport, String ip, int port) {
private static final Gson replyGson = JsonBox.registerTypeAdapters(new GsonBuilder())
.registerTypeAdapter(HistoryEntry.class, HistoryEntryAdapter.INSTANCE)
.registerTypeAdapter(ExpressionValue.class, ExpressionValueAdapter.INSTANCE)
.registerTypeAdapter(DapCommandType.class, DapCommandTypeAdapter.INSTANCE)
.registerTypeAdapter(DapEventType.class, DapEventTypeAdapter.INSTANCE)
.registerTypeAdapter(DapProtocolMessage.class, DapProtocolMessageAdapter.INSTANCE)
.create();
private static final Gson gson = JsonBox.registerTypeAdapters(new GsonBuilder())
.registerTypeAdapter(KernelTimestamp.class, KernelTimestampAdapter.INSTANCE)
Expand All @@ -59,25 +71,22 @@ protected static String formatAddress(String transport, String ip, int port) {
.registerTypeAdapter(PublishStatus.class, PublishStatusAdapter.INSTANCE)
.registerTypeAdapter(HistoryRequest.class, HistoryRequestAdapter.INSTANCE)
.registerTypeHierarchyAdapter(ReplyType.class, new ReplyTypeAdapter(replyGson))
.registerTypeAdapter(DapCommandType.class, DapCommandTypeAdapter.INSTANCE)
.registerTypeAdapter(DapEventType.class, DapEventTypeAdapter.INSTANCE)
.registerTypeAdapter(DapProtocolMessage.class, DapProtocolMessageAdapter.INSTANCE)
//.setPrettyPrinting()
.create();
private static final byte[] EMPTY_JSON_OBJECT = "{}".getBytes(UTF_8);
private static final Type JSON_OBJ_AS_MAP = new TypeToken<Map<String, Object>>() {
}.getType();

public static final Logger JUPYTER_LOGGER = Logger.getLogger("Jupyter");
private static final Type JSON_OBJ_AS_MAP = new TypeToken<Map<String, Object>>() {}.getType();

protected final ZMQ.Context ctx;
protected final HMACGenerator hmacGenerator;
protected final Logger logger;
protected boolean closed;

protected JupyterSocket(ZMQ.Context context, SocketType type, HMACGenerator hmacGenerator, Logger logger) {
protected JupyterSocket(ZMQ.Context context, SocketType type, HMACGenerator hmacGenerator) {
super(context, type);
this.ctx = context;
this.hmacGenerator = hmacGenerator;
logger.setParent(JUPYTER_LOGGER);
this.logger = logger;
this.closed = false;
}

Expand Down Expand Up @@ -125,7 +134,9 @@ public synchronized Message<?> readMessage() {
@SuppressWarnings("unchecked")
Message<?> message = new Message(identities, header, parentHeader, metadata, content, blobs);

logger.finer(() -> "Received from " + super.base().getSocketOptx(zmq.ZMQ.ZMQ_LAST_ENDPOINT) + ":\n" + gson.toJson(message));
if (LOG.isTraceEnabled()) {
LOG.trace("Received from " + super.base().getSocketOptx(zmq.ZMQ.ZMQ_LAST_ENDPOINT) + ":\n" + gson.toJson(message));
}

return message;
}
Expand Down Expand Up @@ -154,7 +165,9 @@ public synchronized void sendMessage(Message<?> message) {

String hmac = hmacGenerator.calculateSignature(headerRaw, parentHeaderRaw, metadata, content);

logger.finer(() -> "Sending to " + super.base().getSocketOptx(zmq.ZMQ.ZMQ_LAST_ENDPOINT) + ":\n" + gson.toJson(message));
if (LOG.isTraceEnabled()) {
LOG.trace("Sending to " + super.base().getSocketOptx(zmq.ZMQ.ZMQ_LAST_ENDPOINT) + ":\n" + gson.toJson(message));
}

message.getIdentities().forEach(super::sendMore);
super.sendMore(IDENTITY_BLOB_DELIMITER);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package io.github.spencerpark.jupyter.channels;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.LongSupplier;
import java.util.function.ToLongFunction;
import java.util.logging.Logger;

public class Loop extends Thread {
private final Logger logger;
private static final Logger LOG = LoggerFactory.getLogger(Loop.class);

private volatile boolean running = false;
private final LongSupplier loopBody;
Expand All @@ -29,8 +31,6 @@ public Loop(String name, LongSupplier target) {
this.loopBody = target;

this.runNextQueue = new LinkedBlockingQueue<>();

this.logger = Logger.getLogger("Loop-" + name);
}

public void onClose(Runnable callback) {
Expand Down Expand Up @@ -91,36 +91,36 @@ public void run() {
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
this.logger.info("Loop interrupted. Stopping...");
LOG.info("Loop interrupted. Stopping...");
this.running = false;
}
} else if (sleep < 0) {
this.logger.info("Loop interrupted by a negative sleep request. Stopping...");
LOG.info("Loop interrupted by a negative sleep request. Stopping...");
this.running = false;
}
}

this.logger.info("Running loop shutdown callback.");
LOG.info("Running loop shutdown callback.");

if (this.onCloseCb != null)
this.onCloseCb.run();
this.onCloseCb = null;

this.logger.info("Loop stopped.");
LOG.info("Loop stopped.");
}

@Override
public synchronized void start() {
this.logger.info("Loop starting...");
LOG.info("Loop starting...");

this.running = true;
super.start();

this.logger.info("Loop started.");
LOG.info("Loop started.");
}

public void shutdown() {
this.running = false;
this.logger.info("Loop shutdown.");
LOG.info("Loop shutdown.");
}
}
Loading