diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java index 5c6454e69..b8166ed2c 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java @@ -21,7 +21,7 @@ * MessageBufferInput adapter for byte arrays */ public class ArrayBufferInput - implements MessageBufferInput + implements MessageBufferInput { private MessageBuffer buffer; private boolean isEmpty; @@ -66,9 +66,14 @@ public MessageBuffer reset(MessageBuffer buf) return old; } - public void reset(byte[] arr) + @Override + public byte[] reset(byte[] arr) { - reset(MessageBuffer.wrap(checkNotNull(arr, "input array is null"))); + final MessageBuffer messageBuffer = reset(MessageBuffer.wrap(checkNotNull(arr, "input array is null"))); + if (messageBuffer == null) { + return null; + } + return messageBuffer.array(); } public void reset(byte[] arr, int offset, int len) diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java index fd0311b83..0cb5d4238 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java @@ -23,7 +23,7 @@ * {@link MessageBufferInput} adapter for {@link java.nio.ByteBuffer} */ public class ByteBufferInput - implements MessageBufferInput + implements MessageBufferInput { private ByteBuffer input; private boolean isRead = false; @@ -39,6 +39,7 @@ public ByteBufferInput(ByteBuffer input) * @param input new buffer * @return the old buffer */ + @Override public ByteBuffer reset(ByteBuffer input) { ByteBuffer old = this.input; diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferInput.java index e8d7c1de8..ea636240e 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferInput.java @@ -26,7 +26,7 @@ * {@link MessageBufferInput} adapter for {@link java.nio.channels.ReadableByteChannel} */ public class ChannelBufferInput - implements MessageBufferInput + implements MessageBufferInput { private ReadableByteChannel channel; private final MessageBuffer buffer; @@ -49,8 +49,8 @@ public ChannelBufferInput(ReadableByteChannel channel, int bufferSize) * @param channel new channel * @return the old resource */ + @Override public ReadableByteChannel reset(ReadableByteChannel channel) - throws IOException { ReadableByteChannel old = this.channel; this.channel = channel; diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java index d605fec3a..39d93309a 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java @@ -26,7 +26,7 @@ * {@link MessageBufferInput} adapter for {@link InputStream} */ public class InputStreamBufferInput - implements MessageBufferInput + implements MessageBufferInput { private InputStream in; private final byte[] buffer; @@ -60,8 +60,8 @@ public InputStreamBufferInput(InputStream in, int bufferSize) * @param in new stream * @return the old resource */ + @Override public InputStream reset(InputStream in) - throws IOException { InputStream old = this.in; this.in = in; diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferInput.java index 77f7a06f6..716933928 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferInput.java @@ -24,7 +24,7 @@ * A MessageBufferInput implementation has control of lifecycle of the memory so that it can reuse previously * allocated memory, use memory pools, or use memory-mapped files. */ -public interface MessageBufferInput +public interface MessageBufferInput extends Closeable { /** @@ -40,6 +40,8 @@ public interface MessageBufferInput MessageBuffer next() throws IOException; + T reset(T input); + /** * Closes the input. *

diff --git a/msgpack-core/src/test/java/org/msgpack/core/buffer/SequenceMessageBufferInput.java b/msgpack-core/src/test/java/org/msgpack/core/buffer/SequenceMessageBufferInput.java index 10b91d20a..6b11915ea 100644 --- a/msgpack-core/src/test/java/org/msgpack/core/buffer/SequenceMessageBufferInput.java +++ b/msgpack-core/src/test/java/org/msgpack/core/buffer/SequenceMessageBufferInput.java @@ -24,7 +24,7 @@ * {@link MessageBufferInput} adapter for {@link MessageBufferInput} Enumeration */ public class SequenceMessageBufferInput - implements MessageBufferInput + implements MessageBufferInput { private Enumeration sequence; private MessageBufferInput input; @@ -54,6 +54,11 @@ public MessageBuffer next() throws IOException return buffer; } + @Override + public Void reset(Void input) { + throw new UnsupportedOperationException("reset"); + } + private void nextInput() throws IOException { if (input != null) { diff --git a/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala index 3ea5e911d..1fb2ae4d1 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala @@ -29,7 +29,7 @@ import scala.jdk.CollectionConverters._ import scala.util.Random object MessageUnpackerTest { - class SplitMessageBufferInput(array: Array[Array[Byte]]) extends MessageBufferInput { + class SplitMessageBufferInput(array: Array[Array[Byte]]) extends MessageBufferInput[Void] { var cursor = 0 override def next(): MessageBuffer = { if (cursor < array.length) { @@ -41,6 +41,9 @@ object MessageUnpackerTest { } } + + override def reset(input: Void): Void = throw new UnsupportedOperationException("reset") + override def close(): Unit = {} } } diff --git a/msgpack-core/src/test/scala/org/msgpack/core/buffer/ByteStringTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/buffer/ByteStringTest.scala index 42872fc44..109566973 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/buffer/ByteStringTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/buffer/ByteStringTest.scala @@ -26,7 +26,7 @@ class ByteStringTest extends AirSpec { private val byteString = ByteString(createMessagePackData(_.packString(unpackedString))) private def unpackString(messageBuffer: MessageBuffer) = { - val input = new MessageBufferInput { + val input = new MessageBufferInput[Void] { private var isRead = false @@ -38,6 +38,8 @@ class ByteStringTest extends AirSpec { messageBuffer } override def close(): Unit = {} + + override def reset(input: Void): Void = throw new UnsupportedOperationException("reset") } MessagePack.newDefaultUnpacker(input).unpackString() diff --git a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputLocator.java b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputLocator.java new file mode 100644 index 000000000..a431a4e0a --- /dev/null +++ b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputLocator.java @@ -0,0 +1,23 @@ +// +// MessagePack for Java +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +package org.msgpack.jackson.dataformat; + +import org.msgpack.core.buffer.MessageBufferInput; + +public interface MessageBufferInputLocator +{ + MessageBufferInput get(Class clazz); +} diff --git a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputProvider.java b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputProvider.java new file mode 100644 index 000000000..96ce91ab8 --- /dev/null +++ b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputProvider.java @@ -0,0 +1,23 @@ +// +// MessagePack for Java +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +package org.msgpack.jackson.dataformat; + +import org.msgpack.core.buffer.MessageBufferInput; + +interface MessageBufferInputProvider +{ + MessageBufferInput provide(); +} diff --git a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputRegistry.java b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputRegistry.java new file mode 100644 index 000000000..06f3409c3 --- /dev/null +++ b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputRegistry.java @@ -0,0 +1,46 @@ +// +// MessagePack for Java +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +package org.msgpack.jackson.dataformat; + +import org.msgpack.core.buffer.MessageBufferInput; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +public class MessageBufferInputRegistry implements MessageBufferInputLocator +{ + private final Map messageBufferInputMap = new HashMap<>(1); + + @Override + public MessageBufferInput get(Class clazz) + { + return messageBufferInputMap.get(clazz); + } + + public boolean register(Class clazz, MessageBufferInputProvider provider) + { + Objects.requireNonNull(clazz, "clazz"); + Objects.requireNonNull(provider, "provider"); + + if (messageBufferInputMap.containsKey(clazz)) { + return false; + } + + messageBufferInputMap.put(clazz, provider.provide()); + return true; + } +} diff --git a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessagePackParser.java b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessagePackParser.java index 2a95b69a0..9f40b0d11 100644 --- a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessagePackParser.java +++ b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessagePackParser.java @@ -47,8 +47,8 @@ public class MessagePackParser extends ParserMinimalBase { - private static final ThreadLocal> messageUnpackerHolder = - new ThreadLocal>(); + private static final ThreadLocal> reuseObjectHolder = + new ThreadLocal<>(); private final MessageUnpacker messageUnpacker; private static final BigInteger LONG_MIN = BigInteger.valueOf((long) Long.MIN_VALUE); @@ -126,11 +126,17 @@ public MessagePackParser( IOContext ctxt, int features, ObjectCodec objectCodec, - InputStream in, + final InputStream in, boolean reuseResourceInParser) throws IOException { - this(ctxt, features, new InputStreamBufferInput(in), objectCodec, in, reuseResourceInParser); + this(ctxt, features, new MessageBufferInputProvider() { + @Override + public MessageBufferInput provide() + { + return new InputStreamBufferInput(in); + } + }, objectCodec, in, reuseResourceInParser); } public MessagePackParser(IOContext ctxt, int features, ObjectCodec objectCodec, byte[] bytes) @@ -143,16 +149,22 @@ public MessagePackParser( IOContext ctxt, int features, ObjectCodec objectCodec, - byte[] bytes, + final byte[] bytes, boolean reuseResourceInParser) throws IOException { - this(ctxt, features, new ArrayBufferInput(bytes), objectCodec, bytes, reuseResourceInParser); + this(ctxt, features, new MessageBufferInputProvider() { + @Override + public MessageBufferInput provide() + { + return new ArrayBufferInput(bytes); + } + }, objectCodec, bytes, reuseResourceInParser); } private MessagePackParser(IOContext ctxt, int features, - MessageBufferInput input, + MessageBufferInputProvider bufferInputProvider, ObjectCodec objectCodec, Object src, boolean reuseResourceInParser) @@ -167,7 +179,7 @@ private MessagePackParser(IOContext ctxt, parsingContext = JsonReadContext.createRootContext(dups); this.reuseResourceInParser = reuseResourceInParser; if (!reuseResourceInParser) { - this.messageUnpacker = MessagePack.newDefaultUnpacker(input); + this.messageUnpacker = MessagePack.newDefaultUnpacker(bufferInputProvider.provide()); return; } else { @@ -175,21 +187,40 @@ private MessagePackParser(IOContext ctxt, } MessageUnpacker messageUnpacker; - Tuple messageUnpackerTuple = messageUnpackerHolder.get(); - if (messageUnpackerTuple == null) { - messageUnpacker = MessagePack.newDefaultUnpacker(input); + MessageBufferInputLocator messageBufferInputLocator; + Triple messageUnpackerTriple = reuseObjectHolder.get(); + if (messageUnpackerTriple == null) { + final MessageBufferInputRegistry messageBufferInputRegistry = new MessageBufferInputRegistry(); + messageBufferInputRegistry.register(src.getClass(), bufferInputProvider); + messageBufferInputLocator = messageBufferInputRegistry; + messageUnpacker = MessagePack.newDefaultUnpacker(messageBufferInputRegistry.get(src.getClass())); } else { // Considering to reuse InputStream with JsonParser.Feature.AUTO_CLOSE_SOURCE, // MessagePackParser needs to use the MessageUnpacker that has the same InputStream // since it has buffer which has loaded the InputStream data ahead. // However, it needs to call MessageUnpacker#reset when the source is different from the previous one. - if (isEnabled(JsonParser.Feature.AUTO_CLOSE_SOURCE) || messageUnpackerTuple.first() != src) { - messageUnpackerTuple.second().reset(input); + if (isEnabled(JsonParser.Feature.AUTO_CLOSE_SOURCE) || messageUnpackerTriple.first() != src) { + final MessageBufferInputLocator bufferInputLocator = messageUnpackerTriple.third(); + MessageBufferInput messageBufferInput = bufferInputLocator.get(src.getClass()); + if (messageBufferInput != null) { + messageBufferInput.reset(src); + } + else { + if (bufferInputLocator instanceof MessageBufferInputRegistry) { + ((MessageBufferInputRegistry) bufferInputLocator).register(src.getClass(), bufferInputProvider); + messageBufferInput = bufferInputLocator.get(src.getClass()); + } + else { + messageBufferInput = bufferInputProvider.provide(); + } + } + messageUnpackerTriple.second().reset(messageBufferInput); } - messageUnpacker = messageUnpackerTuple.second(); + messageUnpacker = messageUnpackerTriple.second(); + messageBufferInputLocator = messageUnpackerTriple.third(); } - messageUnpackerHolder.set(new Tuple(src, messageUnpacker)); + reuseObjectHolder.set(new Triple(src, messageUnpacker, messageBufferInputLocator)); } public void setExtensionTypeCustomDeserializers(ExtensionTypeCustomDeserializers extTypeCustomDesers) @@ -690,7 +721,7 @@ private MessageUnpacker getMessageUnpacker() return this.messageUnpacker; } - Tuple messageUnpackerTuple = messageUnpackerHolder.get(); + Triple messageUnpackerTuple = reuseObjectHolder.get(); if (messageUnpackerTuple == null) { throw new IllegalStateException("messageUnpacker is null"); } diff --git a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/Triple.java b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/Triple.java new file mode 100644 index 000000000..db630ae4a --- /dev/null +++ b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/Triple.java @@ -0,0 +1,49 @@ +// +// MessagePack for Java +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +package org.msgpack.jackson.dataformat; + +/** + * Created by komamitsu on 5/28/15. + */ +public class Triple +{ + private final F first; + private final S second; + + private final T third; + + public Triple(F first, S second, T third) + { + this.first = first; + this.second = second; + this.third = third; + } + + public F first() + { + return first; + } + + public S second() + { + return second; + } + + public T third() + { + return third; + } +}