diff --git a/reef-inmemory/src/main/java/org/apache/reef/inmemory/client/SurfFSOutputStream.java b/reef-inmemory/src/main/java/org/apache/reef/inmemory/client/SurfFSOutputStream.java index 9daf5f9e8..12cb9b811 100644 --- a/reef-inmemory/src/main/java/org/apache/reef/inmemory/client/SurfFSOutputStream.java +++ b/reef-inmemory/src/main/java/org/apache/reef/inmemory/client/SurfFSOutputStream.java @@ -25,7 +25,7 @@ */ public final class SurfFSOutputStream extends OutputStream { private static final Logger LOG = Logger.getLogger(SurfFSOutputStream.class.getName()); - private static final int PACKET_SIZE = 4194304; // 4MB + private static final int PACKET_SIZE = 4 * 1024 * 1024; // 4MB private static final int MAX_PACKETS = 80; private static final int COMPLETE_FILE_RETRY_NUM = 5; private static final int COMPLETE_FILE_RETRY_INTERVAL = 400; @@ -131,13 +131,15 @@ public void close() throws IOException { } private void flush(final boolean close) throws IOException { - flushLocalBuf(close); + if (!nothingWasWritten()) { // if nothing got written at all, there's no need to send anything to a CacheServer + flushLocalBuf(close); - while (packetQueue.size() > 0) { - try { - Thread.sleep(FLUSH_CHECK_INTERVAL); - } catch (InterruptedException e) { - throw new IOException(e); + while (packetQueue.size() > 0) { + try { + Thread.sleep(FLUSH_CHECK_INTERVAL); + } catch (InterruptedException e) { + throw new IOException(e); + } } } } @@ -158,22 +160,18 @@ private void flushLocalBuf(final boolean close) throws IOException { */ private void flushBuf(final byte[] b, final int start, final int end, final boolean close) throws IOException { final int len = end - start; - final boolean somethingToWrite = len > 0; - - if (somethingToWrite) { - if (curBlockInnerOffset == 0) { - curWritableBlockMeta = allocateBlockAtMetaServer(curBlockOffset); - } + if (curBlockInnerOffset == 0 && len != 0) { // only when there's some more to write... + curWritableBlockMeta = allocateBlockAtMetaServer(curBlockOffset); + } - if (curBlockInnerOffset + len < blockSize) { - sendPacket(ByteBuffer.wrap(b, start, len), len, close); - } else if (curBlockInnerOffset + len == blockSize) { - sendPacket(ByteBuffer.wrap(b, start, len), len, true); - } else { - final int possibleLen = (int) (blockSize - curBlockInnerOffset); // this must be int because "possibleLen < len" - sendPacket(ByteBuffer.wrap(b, start, possibleLen), possibleLen, true); - flushBuf(b, start + possibleLen, end, close); // Create another packet with the leftovers - } + if (curBlockInnerOffset + len < blockSize) { + sendPacket(ByteBuffer.wrap(b, start, len), len, close); + } else if (curBlockInnerOffset + len == blockSize) { + sendPacket(ByteBuffer.wrap(b, start, len), len, true); + } else { + final int possibleLen = (int) (blockSize - curBlockInnerOffset); // this must be int because "possibleLen < len" + sendPacket(ByteBuffer.wrap(b, start, possibleLen), possibleLen, true); + flushBuf(b, start + possibleLen, end, close); // Create another packet with the leftovers } } @@ -231,7 +229,7 @@ public void run() { while(!isClosed) { try { final Packet packet = packetQueue.take(); - if (packet.blockInnerOffset == 0) { + if (packet.blockInnerOffset == 0 && packet.buf.limit() > 0) { // only when we need to actually write sth... // Initialize block at the cache server for the first packet of a block initCacheClient(packet.writeableBlockMeta); } @@ -263,7 +261,14 @@ private void initCacheClient(final WriteableBlockMeta writableBlockMeta) { } } + private boolean nothingWasWritten() { + return localBufWriteCount == 0 && curBlockOffset == 0 && curBlockInnerOffset == 0; + } + // Methods used for unit tests. + protected static int getPacketSize() { + return PACKET_SIZE; + } protected int getLocalBufWriteCount() { return localBufWriteCount; @@ -273,10 +278,6 @@ protected long getCurBlockInnerOffset() { return curBlockInnerOffset; } - protected int getPacketSize() { - return PACKET_SIZE; - } - protected long getCurBlockOffset() { return curBlockOffset; } diff --git a/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSOutputStreamTest.java b/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSOutputStreamTest.java index 4876023cb..aea53bb52 100644 --- a/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSOutputStreamTest.java +++ b/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSOutputStreamTest.java @@ -69,7 +69,7 @@ public void testWrite(int fileSize) throws IOException { final byte[] data = new byte[fileSize]; surfFSOutputStream.write(data); - assertEquals(fileSize % surfFSOutputStream.getPacketSize(), surfFSOutputStream.getLocalBufWriteCount()); + assertEquals(fileSize % SurfFSOutputStream.getPacketSize(), surfFSOutputStream.getLocalBufWriteCount()); surfFSOutputStream.flush(); assertEquals(0, surfFSOutputStream.getLocalBufWriteCount()); diff --git a/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSWriteITCase.java b/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSWriteITCase.java index 1ac1a1b14..e75a0624c 100644 --- a/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSWriteITCase.java +++ b/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSWriteITCase.java @@ -34,17 +34,20 @@ public final class SurfFSWriteITCase { private static final String TESTDIR = ITUtils.getTestDir(); - private static final String SHORT = TESTDIR+"/"+"WRITE.short"; - private static final int SIZE1 = 1; + private static final String SMALL = TESTDIR+"/"+"WRITE.short"; + private static final int SMALL_SIZE = 1; - private static final String LONG = TESTDIR+"/"+"WRITE.long"; - private static final int SIZE2 = 200; + private static final String ONE_MB = TESTDIR+"/"+"WRITE.onemb"; + private static final int ONE_MB_SIZE = 1024 * 1024 / b.length; + + private static final String PACKET = TESTDIR+"/"+"WRITE.packet"; + private static final int PACKET_SIZE = SurfFSOutputStream.getPacketSize() / b.length; private static final String SURF = "surf"; private static final String SURF_ADDRESS = "localhost:18000"; private static final short REPLICATION = 3; - private static final int BLOCK_SIZE = 1024; // Need to be a multiple of 512 (Hadoop Checksum Policy) + private static final int BLOCK_SIZE = 1024 * 1024; // 1MB, Need to be a multiple of 512 (Hadoop Checksum Policy) private static final int BUFFER_SIZE = 4096; private static final SurfLauncher surfLauncher = new SurfLauncher(); @@ -63,17 +66,26 @@ public static void setUpClass() throws IOException, InjectionException { surfFs = new SurfFS(); surfFs.initialize(URI.create(SURF + "://" + SURF_ADDRESS), conf); - final FSDataOutputStream stream1 = surfFs.create(new Path(SHORT), true, BUFFER_SIZE, REPLICATION, BLOCK_SIZE); - for (int i = 0; i < SIZE1; i++) { + // FILE_SIZE(8B) < BLOCK_SIZE(1MB) < PACKET_SIZE(4MB) + final FSDataOutputStream stream1 = surfFs.create(new Path(SMALL), true, BUFFER_SIZE, REPLICATION, BLOCK_SIZE); + for (int i = 0; i < SMALL_SIZE; i++) { stream1.write(b); } stream1.close(); - final FSDataOutputStream stream2 = surfFs.create(new Path(LONG), true, BUFFER_SIZE, REPLICATION, BLOCK_SIZE); - for (int i = 0; i < SIZE2; i++) { + // FILE_SIZE(1MB) == BLOCK_SIZE(1MB) < PACKET_SIZE(4MB) + final FSDataOutputStream stream2 = surfFs.create(new Path(ONE_MB), true, BUFFER_SIZE, REPLICATION, BLOCK_SIZE); + for (int i = 0; i < ONE_MB_SIZE; i++) { stream2.write(b); } stream2.close(); + + // FILE_SIZE(4MB) == PACKET_SIZE(4MB) < BLOCK_SIZE(64MB) + final FSDataOutputStream stream3 = surfFs.create(new Path(PACKET)); // use the default BLOCK_SIZE (64MB) + for (int i = 0; i < PACKET_SIZE; i++) { + stream3.write(b); + } + stream3.close(); } @AfterClass @@ -101,14 +113,16 @@ private void read(final String path, final int size) throws IOException { @Test public void testRead() throws IOException { - read(SHORT, SIZE1); - read(LONG, SIZE2); + read(SMALL, SMALL_SIZE); + read(ONE_MB, ONE_MB_SIZE); + read(PACKET, PACKET_SIZE); } @Test public void testExists() throws IOException { - assertTrue(surfFs.exists(new Path(SHORT))); - assertTrue(surfFs.exists(new Path(LONG))); - assertFalse(surfFs.exists(new Path("Not_exists"))); + assertTrue("Should exist", surfFs.exists(new Path(SMALL))); + assertTrue("Should exist", surfFs.exists(new Path(ONE_MB))); + assertTrue("Should exist", surfFs.exists(new Path(PACKET))); + assertFalse("Should not exist", surfFs.exists(new Path("Not_exists"))); } } \ No newline at end of file