From 858da9d1661b09a7aabf0fa16f5552e9ca275a30 Mon Sep 17 00:00:00 2001 From: John Yang Date: Mon, 18 May 2015 21:21:18 +0900 Subject: [PATCH 1/6] fix writing a file whose size is a multiple of PACKET_SIZE --- .../inmemory/client/SurfFSOutputStream.java | 53 ++++++++++--------- .../client/SurfFSOutputStreamTest.java | 2 +- .../inmemory/client/SurfFSWriteITCase.java | 12 ++--- 3 files changed, 34 insertions(+), 33 deletions(-) diff --git a/reef-inmemory/src/main/java/org/apache/reef/inmemory/client/SurfFSOutputStream.java b/reef-inmemory/src/main/java/org/apache/reef/inmemory/client/SurfFSOutputStream.java index 9daf5f9e8..e3abc8749 100644 --- a/reef-inmemory/src/main/java/org/apache/reef/inmemory/client/SurfFSOutputStream.java +++ b/reef-inmemory/src/main/java/org/apache/reef/inmemory/client/SurfFSOutputStream.java @@ -131,13 +131,15 @@ public void close() throws IOException { } private void flush(final boolean close) throws IOException { - flushLocalBuf(close); + if (!nothingGotWritten()) { // if nothing got written at all, there's no need to send anything to a CacheServer + flushLocalBuf(close); - while (packetQueue.size() > 0) { - try { - Thread.sleep(FLUSH_CHECK_INTERVAL); - } catch (InterruptedException e) { - throw new IOException(e); + while (packetQueue.size() > 0) { + try { + Thread.sleep(FLUSH_CHECK_INTERVAL); + } catch (InterruptedException e) { + throw new IOException(e); + } } } } @@ -158,22 +160,18 @@ private void flushLocalBuf(final boolean close) throws IOException { */ private void flushBuf(final byte[] b, final int start, final int end, final boolean close) throws IOException { final int len = end - start; - final boolean somethingToWrite = len > 0; - - if (somethingToWrite) { - if (curBlockInnerOffset == 0) { - curWritableBlockMeta = allocateBlockAtMetaServer(curBlockOffset); - } + if (curBlockInnerOffset == 0 && len != 0) { // only when there's some more to write... + curWritableBlockMeta = allocateBlockAtMetaServer(curBlockOffset); + } - if (curBlockInnerOffset + len < blockSize) { - sendPacket(ByteBuffer.wrap(b, start, len), len, close); - } else if (curBlockInnerOffset + len == blockSize) { - sendPacket(ByteBuffer.wrap(b, start, len), len, true); - } else { - final int possibleLen = (int) (blockSize - curBlockInnerOffset); // this must be int because "possibleLen < len" - sendPacket(ByteBuffer.wrap(b, start, possibleLen), possibleLen, true); - flushBuf(b, start + possibleLen, end, close); // Create another packet with the leftovers - } + if (curBlockInnerOffset + len < blockSize) { + sendPacket(ByteBuffer.wrap(b, start, len), len, close); + } else if (curBlockInnerOffset + len == blockSize) { + sendPacket(ByteBuffer.wrap(b, start, len), len, true); + } else { + final int possibleLen = (int) (blockSize - curBlockInnerOffset); // this must be int because "possibleLen < len" + sendPacket(ByteBuffer.wrap(b, start, possibleLen), possibleLen, true); + flushBuf(b, start + possibleLen, end, close); // Create another packet with the leftovers } } @@ -231,7 +229,7 @@ public void run() { while(!isClosed) { try { final Packet packet = packetQueue.take(); - if (packet.blockInnerOffset == 0) { + if (packet.blockInnerOffset == 0 && packet.buf.limit() > 0) { // only when we need to actually write sth... // Initialize block at the cache server for the first packet of a block initCacheClient(packet.writeableBlockMeta); } @@ -263,7 +261,14 @@ private void initCacheClient(final WriteableBlockMeta writableBlockMeta) { } } + private boolean nothingGotWritten() { + return localBufWriteCount == 0 && curBlockOffset == 0 && curBlockInnerOffset == 0; + } + // Methods used for unit tests. + protected static int getPacketSize() { + return PACKET_SIZE; + } protected int getLocalBufWriteCount() { return localBufWriteCount; @@ -273,10 +278,6 @@ protected long getCurBlockInnerOffset() { return curBlockInnerOffset; } - protected int getPacketSize() { - return PACKET_SIZE; - } - protected long getCurBlockOffset() { return curBlockOffset; } diff --git a/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSOutputStreamTest.java b/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSOutputStreamTest.java index 4876023cb..aea53bb52 100644 --- a/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSOutputStreamTest.java +++ b/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSOutputStreamTest.java @@ -69,7 +69,7 @@ public void testWrite(int fileSize) throws IOException { final byte[] data = new byte[fileSize]; surfFSOutputStream.write(data); - assertEquals(fileSize % surfFSOutputStream.getPacketSize(), surfFSOutputStream.getLocalBufWriteCount()); + assertEquals(fileSize % SurfFSOutputStream.getPacketSize(), surfFSOutputStream.getLocalBufWriteCount()); surfFSOutputStream.flush(); assertEquals(0, surfFSOutputStream.getLocalBufWriteCount()); diff --git a/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSWriteITCase.java b/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSWriteITCase.java index 1ac1a1b14..9064e35c8 100644 --- a/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSWriteITCase.java +++ b/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSWriteITCase.java @@ -35,10 +35,10 @@ public final class SurfFSWriteITCase { private static final String TESTDIR = ITUtils.getTestDir(); private static final String SHORT = TESTDIR+"/"+"WRITE.short"; - private static final int SIZE1 = 1; + private static final int SMALL_SIZE = 1; private static final String LONG = TESTDIR+"/"+"WRITE.long"; - private static final int SIZE2 = 200; + private static final int PACKET_SIZE = SurfFSOutputStream.getPacketSize() / b.length; private static final String SURF = "surf"; private static final String SURF_ADDRESS = "localhost:18000"; @@ -64,13 +64,13 @@ public static void setUpClass() throws IOException, InjectionException { surfFs.initialize(URI.create(SURF + "://" + SURF_ADDRESS), conf); final FSDataOutputStream stream1 = surfFs.create(new Path(SHORT), true, BUFFER_SIZE, REPLICATION, BLOCK_SIZE); - for (int i = 0; i < SIZE1; i++) { + for (int i = 0; i < SMALL_SIZE; i++) { stream1.write(b); } stream1.close(); final FSDataOutputStream stream2 = surfFs.create(new Path(LONG), true, BUFFER_SIZE, REPLICATION, BLOCK_SIZE); - for (int i = 0; i < SIZE2; i++) { + for (int i = 0; i < PACKET_SIZE; i++) { stream2.write(b); } stream2.close(); @@ -101,8 +101,8 @@ private void read(final String path, final int size) throws IOException { @Test public void testRead() throws IOException { - read(SHORT, SIZE1); - read(LONG, SIZE2); + read(SHORT, SMALL_SIZE); + read(LONG, PACKET_SIZE); } @Test From accf1748c5b87f369d50dbd94222b8617040b237 Mon Sep 17 00:00:00 2001 From: John Yang Date: Mon, 18 May 2015 21:43:44 +0900 Subject: [PATCH 2/6] refine test to cover corner cases --- .../inmemory/client/SurfFSWriteITCase.java | 32 +++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSWriteITCase.java b/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSWriteITCase.java index 9064e35c8..5719fb3b3 100644 --- a/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSWriteITCase.java +++ b/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSWriteITCase.java @@ -34,10 +34,13 @@ public final class SurfFSWriteITCase { private static final String TESTDIR = ITUtils.getTestDir(); - private static final String SHORT = TESTDIR+"/"+"WRITE.short"; + private static final String SMALL = TESTDIR+"/"+"WRITE.short"; private static final int SMALL_SIZE = 1; - private static final String LONG = TESTDIR+"/"+"WRITE.long"; + private static final String ONE_MB = TESTDIR+"/"+"WRITE.onemb"; + private static final int ONE_MB_SIZE = 1024 * 1024 / b.length; + + private static final String PACKET = TESTDIR+"/"+"WRITE.packet"; private static final int PACKET_SIZE = SurfFSOutputStream.getPacketSize() / b.length; private static final String SURF = "surf"; @@ -63,17 +66,26 @@ public static void setUpClass() throws IOException, InjectionException { surfFs = new SurfFS(); surfFs.initialize(URI.create(SURF + "://" + SURF_ADDRESS), conf); - final FSDataOutputStream stream1 = surfFs.create(new Path(SHORT), true, BUFFER_SIZE, REPLICATION, BLOCK_SIZE); + // FILE_SIZE < BLOCK_SIZE < PACKET_SIZE + final FSDataOutputStream stream1 = surfFs.create(new Path(SMALL), true, BUFFER_SIZE, REPLICATION, BLOCK_SIZE); for (int i = 0; i < SMALL_SIZE; i++) { stream1.write(b); } stream1.close(); - final FSDataOutputStream stream2 = surfFs.create(new Path(LONG), true, BUFFER_SIZE, REPLICATION, BLOCK_SIZE); - for (int i = 0; i < PACKET_SIZE; i++) { + // PACKET_SIZE > FILE_SIZE == BLOCK_SIZE + final FSDataOutputStream stream2 = surfFs.create(new Path(ONE_MB), true, BUFFER_SIZE, REPLICATION, BLOCK_SIZE); + for (int i = 0; i < ONE_MB_SIZE; i++) { stream2.write(b); } stream2.close(); + + // FILE_SIZE == PACKET_SIZE < BLOCK_SIZE + final FSDataOutputStream stream3 = surfFs.create(new Path(PACKET)); + for (int i = 0; i < PACKET_SIZE; i++) { + stream3.write(b); + } + stream3.close(); } @AfterClass @@ -101,14 +113,16 @@ private void read(final String path, final int size) throws IOException { @Test public void testRead() throws IOException { - read(SHORT, SMALL_SIZE); - read(LONG, PACKET_SIZE); + read(SMALL, SMALL_SIZE); + read(ONE_MB, ONE_MB_SIZE); + read(PACKET, PACKET_SIZE); } @Test public void testExists() throws IOException { - assertTrue(surfFs.exists(new Path(SHORT))); - assertTrue(surfFs.exists(new Path(LONG))); + assertTrue(surfFs.exists(new Path(SMALL))); + read(ONE_MB, ONE_MB_SIZE); + assertTrue(surfFs.exists(new Path(PACKET))); assertFalse(surfFs.exists(new Path("Not_exists"))); } } \ No newline at end of file From 7bdcaef7f98e69e93f292bfa09970cd7ee81b531 Mon Sep 17 00:00:00 2001 From: John Yang Date: Tue, 19 May 2015 12:55:33 +0900 Subject: [PATCH 3/6] change to nothingWasWritten --- .../org/apache/reef/inmemory/client/SurfFSOutputStream.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/reef-inmemory/src/main/java/org/apache/reef/inmemory/client/SurfFSOutputStream.java b/reef-inmemory/src/main/java/org/apache/reef/inmemory/client/SurfFSOutputStream.java index e3abc8749..1738f1ad9 100644 --- a/reef-inmemory/src/main/java/org/apache/reef/inmemory/client/SurfFSOutputStream.java +++ b/reef-inmemory/src/main/java/org/apache/reef/inmemory/client/SurfFSOutputStream.java @@ -131,7 +131,7 @@ public void close() throws IOException { } private void flush(final boolean close) throws IOException { - if (!nothingGotWritten()) { // if nothing got written at all, there's no need to send anything to a CacheServer + if (!nothingWasWritten()) { // if nothing got written at all, there's no need to send anything to a CacheServer flushLocalBuf(close); while (packetQueue.size() > 0) { @@ -261,7 +261,7 @@ private void initCacheClient(final WriteableBlockMeta writableBlockMeta) { } } - private boolean nothingGotWritten() { + private boolean nothingWasWritten() { return localBufWriteCount == 0 && curBlockOffset == 0 && curBlockInnerOffset == 0; } From 26ec35aad553ea4c689f962984049fadaab9002f Mon Sep 17 00:00:00 2001 From: John Yang Date: Tue, 19 May 2015 13:01:15 +0900 Subject: [PATCH 4/6] address brian's comments --- .../apache/reef/inmemory/client/SurfFSWriteITCase.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSWriteITCase.java b/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSWriteITCase.java index 5719fb3b3..dc776fa24 100644 --- a/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSWriteITCase.java +++ b/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSWriteITCase.java @@ -120,9 +120,9 @@ public void testRead() throws IOException { @Test public void testExists() throws IOException { - assertTrue(surfFs.exists(new Path(SMALL))); - read(ONE_MB, ONE_MB_SIZE); - assertTrue(surfFs.exists(new Path(PACKET))); - assertFalse(surfFs.exists(new Path("Not_exists"))); + assertTrue("Should exist", surfFs.exists(new Path(SMALL))); + assertTrue("Should exist", surfFs.exists(new Path(ONE_MB))); + assertTrue("Should exist", surfFs.exists(new Path(PACKET))); + assertFalse("Should not exist", surfFs.exists(new Path("Not_exists"))); } } \ No newline at end of file From e0257a6428f529f4109efd4f569d033cadc2faf3 Mon Sep 17 00:00:00 2001 From: John Yang Date: Tue, 19 May 2015 13:03:04 +0900 Subject: [PATCH 5/6] comment corrections --- .../org/apache/reef/inmemory/client/SurfFSWriteITCase.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSWriteITCase.java b/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSWriteITCase.java index dc776fa24..889bf8dcd 100644 --- a/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSWriteITCase.java +++ b/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSWriteITCase.java @@ -66,14 +66,14 @@ public static void setUpClass() throws IOException, InjectionException { surfFs = new SurfFS(); surfFs.initialize(URI.create(SURF + "://" + SURF_ADDRESS), conf); - // FILE_SIZE < BLOCK_SIZE < PACKET_SIZE + // FILE_SIZE < PACKET_SIZE final FSDataOutputStream stream1 = surfFs.create(new Path(SMALL), true, BUFFER_SIZE, REPLICATION, BLOCK_SIZE); for (int i = 0; i < SMALL_SIZE; i++) { stream1.write(b); } stream1.close(); - // PACKET_SIZE > FILE_SIZE == BLOCK_SIZE + // PACKET_SIZE > FILE_SIZE final FSDataOutputStream stream2 = surfFs.create(new Path(ONE_MB), true, BUFFER_SIZE, REPLICATION, BLOCK_SIZE); for (int i = 0; i < ONE_MB_SIZE; i++) { stream2.write(b); From 0534513b9c918862c5a5eafa3c5d3d52cbd62d57 Mon Sep 17 00:00:00 2001 From: John Yang Date: Tue, 19 May 2015 14:04:39 +0900 Subject: [PATCH 6/6] another comment corrections --- .../reef/inmemory/client/SurfFSOutputStream.java | 2 +- .../apache/reef/inmemory/client/SurfFSWriteITCase.java | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/reef-inmemory/src/main/java/org/apache/reef/inmemory/client/SurfFSOutputStream.java b/reef-inmemory/src/main/java/org/apache/reef/inmemory/client/SurfFSOutputStream.java index 1738f1ad9..12cb9b811 100644 --- a/reef-inmemory/src/main/java/org/apache/reef/inmemory/client/SurfFSOutputStream.java +++ b/reef-inmemory/src/main/java/org/apache/reef/inmemory/client/SurfFSOutputStream.java @@ -25,7 +25,7 @@ */ public final class SurfFSOutputStream extends OutputStream { private static final Logger LOG = Logger.getLogger(SurfFSOutputStream.class.getName()); - private static final int PACKET_SIZE = 4194304; // 4MB + private static final int PACKET_SIZE = 4 * 1024 * 1024; // 4MB private static final int MAX_PACKETS = 80; private static final int COMPLETE_FILE_RETRY_NUM = 5; private static final int COMPLETE_FILE_RETRY_INTERVAL = 400; diff --git a/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSWriteITCase.java b/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSWriteITCase.java index 889bf8dcd..e75a0624c 100644 --- a/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSWriteITCase.java +++ b/reef-inmemory/src/test/java/org/apache/reef/inmemory/client/SurfFSWriteITCase.java @@ -47,7 +47,7 @@ public final class SurfFSWriteITCase { private static final String SURF_ADDRESS = "localhost:18000"; private static final short REPLICATION = 3; - private static final int BLOCK_SIZE = 1024; // Need to be a multiple of 512 (Hadoop Checksum Policy) + private static final int BLOCK_SIZE = 1024 * 1024; // 1MB, Need to be a multiple of 512 (Hadoop Checksum Policy) private static final int BUFFER_SIZE = 4096; private static final SurfLauncher surfLauncher = new SurfLauncher(); @@ -66,22 +66,22 @@ public static void setUpClass() throws IOException, InjectionException { surfFs = new SurfFS(); surfFs.initialize(URI.create(SURF + "://" + SURF_ADDRESS), conf); - // FILE_SIZE < PACKET_SIZE + // FILE_SIZE(8B) < BLOCK_SIZE(1MB) < PACKET_SIZE(4MB) final FSDataOutputStream stream1 = surfFs.create(new Path(SMALL), true, BUFFER_SIZE, REPLICATION, BLOCK_SIZE); for (int i = 0; i < SMALL_SIZE; i++) { stream1.write(b); } stream1.close(); - // PACKET_SIZE > FILE_SIZE + // FILE_SIZE(1MB) == BLOCK_SIZE(1MB) < PACKET_SIZE(4MB) final FSDataOutputStream stream2 = surfFs.create(new Path(ONE_MB), true, BUFFER_SIZE, REPLICATION, BLOCK_SIZE); for (int i = 0; i < ONE_MB_SIZE; i++) { stream2.write(b); } stream2.close(); - // FILE_SIZE == PACKET_SIZE < BLOCK_SIZE - final FSDataOutputStream stream3 = surfFs.create(new Path(PACKET)); + // FILE_SIZE(4MB) == PACKET_SIZE(4MB) < BLOCK_SIZE(64MB) + final FSDataOutputStream stream3 = surfFs.create(new Path(PACKET)); // use the default BLOCK_SIZE (64MB) for (int i = 0; i < PACKET_SIZE; i++) { stream3.write(b); }