Skip to content

Commit

Permalink
Merge pull request #254 from cmssnu/jyang-packet-size-write
Browse files Browse the repository at this point in the history
Send a packet to the cache server even when there's no data to send
  • Loading branch information
bchocho committed May 19, 2015
2 parents 2b6a53c + 0534513 commit e69db49
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
*/
public final class SurfFSOutputStream extends OutputStream {
private static final Logger LOG = Logger.getLogger(SurfFSOutputStream.class.getName());
private static final int PACKET_SIZE = 4194304; // 4MB
private static final int PACKET_SIZE = 4 * 1024 * 1024; // 4MB
private static final int MAX_PACKETS = 80;
private static final int COMPLETE_FILE_RETRY_NUM = 5;
private static final int COMPLETE_FILE_RETRY_INTERVAL = 400;
Expand Down Expand Up @@ -131,13 +131,15 @@ public void close() throws IOException {
}

private void flush(final boolean close) throws IOException {
flushLocalBuf(close);
if (!nothingWasWritten()) { // if nothing got written at all, there's no need to send anything to a CacheServer
flushLocalBuf(close);

while (packetQueue.size() > 0) {
try {
Thread.sleep(FLUSH_CHECK_INTERVAL);
} catch (InterruptedException e) {
throw new IOException(e);
while (packetQueue.size() > 0) {
try {
Thread.sleep(FLUSH_CHECK_INTERVAL);
} catch (InterruptedException e) {
throw new IOException(e);
}
}
}
}
Expand All @@ -158,22 +160,18 @@ private void flushLocalBuf(final boolean close) throws IOException {
*/
private void flushBuf(final byte[] b, final int start, final int end, final boolean close) throws IOException {
final int len = end - start;
final boolean somethingToWrite = len > 0;

if (somethingToWrite) {
if (curBlockInnerOffset == 0) {
curWritableBlockMeta = allocateBlockAtMetaServer(curBlockOffset);
}
if (curBlockInnerOffset == 0 && len != 0) { // only when there's some more to write...
curWritableBlockMeta = allocateBlockAtMetaServer(curBlockOffset);
}

if (curBlockInnerOffset + len < blockSize) {
sendPacket(ByteBuffer.wrap(b, start, len), len, close);
} else if (curBlockInnerOffset + len == blockSize) {
sendPacket(ByteBuffer.wrap(b, start, len), len, true);
} else {
final int possibleLen = (int) (blockSize - curBlockInnerOffset); // this must be int because "possibleLen < len"
sendPacket(ByteBuffer.wrap(b, start, possibleLen), possibleLen, true);
flushBuf(b, start + possibleLen, end, close); // Create another packet with the leftovers
}
if (curBlockInnerOffset + len < blockSize) {
sendPacket(ByteBuffer.wrap(b, start, len), len, close);
} else if (curBlockInnerOffset + len == blockSize) {
sendPacket(ByteBuffer.wrap(b, start, len), len, true);
} else {
final int possibleLen = (int) (blockSize - curBlockInnerOffset); // this must be int because "possibleLen < len"
sendPacket(ByteBuffer.wrap(b, start, possibleLen), possibleLen, true);
flushBuf(b, start + possibleLen, end, close); // Create another packet with the leftovers
}
}

Expand Down Expand Up @@ -231,7 +229,7 @@ public void run() {
while(!isClosed) {
try {
final Packet packet = packetQueue.take();
if (packet.blockInnerOffset == 0) {
if (packet.blockInnerOffset == 0 && packet.buf.limit() > 0) { // only when we need to actually write sth...
// Initialize block at the cache server for the first packet of a block
initCacheClient(packet.writeableBlockMeta);
}
Expand Down Expand Up @@ -263,7 +261,14 @@ private void initCacheClient(final WriteableBlockMeta writableBlockMeta) {
}
}

private boolean nothingWasWritten() {
return localBufWriteCount == 0 && curBlockOffset == 0 && curBlockInnerOffset == 0;
}

// Methods used for unit tests.
protected static int getPacketSize() {
return PACKET_SIZE;
}

protected int getLocalBufWriteCount() {
return localBufWriteCount;
Expand All @@ -273,10 +278,6 @@ protected long getCurBlockInnerOffset() {
return curBlockInnerOffset;
}

protected int getPacketSize() {
return PACKET_SIZE;
}

protected long getCurBlockOffset() {
return curBlockOffset;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void testWrite(int fileSize) throws IOException {
final byte[] data = new byte[fileSize];

surfFSOutputStream.write(data);
assertEquals(fileSize % surfFSOutputStream.getPacketSize(), surfFSOutputStream.getLocalBufWriteCount());
assertEquals(fileSize % SurfFSOutputStream.getPacketSize(), surfFSOutputStream.getLocalBufWriteCount());

surfFSOutputStream.flush();
assertEquals(0, surfFSOutputStream.getLocalBufWriteCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,20 @@ public final class SurfFSWriteITCase {

private static final String TESTDIR = ITUtils.getTestDir();

private static final String SHORT = TESTDIR+"/"+"WRITE.short";
private static final int SIZE1 = 1;
private static final String SMALL = TESTDIR+"/"+"WRITE.short";
private static final int SMALL_SIZE = 1;

private static final String LONG = TESTDIR+"/"+"WRITE.long";
private static final int SIZE2 = 200;
private static final String ONE_MB = TESTDIR+"/"+"WRITE.onemb";
private static final int ONE_MB_SIZE = 1024 * 1024 / b.length;

private static final String PACKET = TESTDIR+"/"+"WRITE.packet";
private static final int PACKET_SIZE = SurfFSOutputStream.getPacketSize() / b.length;

private static final String SURF = "surf";
private static final String SURF_ADDRESS = "localhost:18000";

private static final short REPLICATION = 3;
private static final int BLOCK_SIZE = 1024; // Need to be a multiple of 512 (Hadoop Checksum Policy)
private static final int BLOCK_SIZE = 1024 * 1024; // 1MB, Need to be a multiple of 512 (Hadoop Checksum Policy)
private static final int BUFFER_SIZE = 4096;

private static final SurfLauncher surfLauncher = new SurfLauncher();
Expand All @@ -63,17 +66,26 @@ public static void setUpClass() throws IOException, InjectionException {
surfFs = new SurfFS();
surfFs.initialize(URI.create(SURF + "://" + SURF_ADDRESS), conf);

final FSDataOutputStream stream1 = surfFs.create(new Path(SHORT), true, BUFFER_SIZE, REPLICATION, BLOCK_SIZE);
for (int i = 0; i < SIZE1; i++) {
// FILE_SIZE(8B) < BLOCK_SIZE(1MB) < PACKET_SIZE(4MB)
final FSDataOutputStream stream1 = surfFs.create(new Path(SMALL), true, BUFFER_SIZE, REPLICATION, BLOCK_SIZE);
for (int i = 0; i < SMALL_SIZE; i++) {
stream1.write(b);
}
stream1.close();

final FSDataOutputStream stream2 = surfFs.create(new Path(LONG), true, BUFFER_SIZE, REPLICATION, BLOCK_SIZE);
for (int i = 0; i < SIZE2; i++) {
// FILE_SIZE(1MB) == BLOCK_SIZE(1MB) < PACKET_SIZE(4MB)
final FSDataOutputStream stream2 = surfFs.create(new Path(ONE_MB), true, BUFFER_SIZE, REPLICATION, BLOCK_SIZE);
for (int i = 0; i < ONE_MB_SIZE; i++) {
stream2.write(b);
}
stream2.close();

// FILE_SIZE(4MB) == PACKET_SIZE(4MB) < BLOCK_SIZE(64MB)
final FSDataOutputStream stream3 = surfFs.create(new Path(PACKET)); // use the default BLOCK_SIZE (64MB)
for (int i = 0; i < PACKET_SIZE; i++) {
stream3.write(b);
}
stream3.close();
}

@AfterClass
Expand Down Expand Up @@ -101,14 +113,16 @@ private void read(final String path, final int size) throws IOException {

@Test
public void testRead() throws IOException {
read(SHORT, SIZE1);
read(LONG, SIZE2);
read(SMALL, SMALL_SIZE);
read(ONE_MB, ONE_MB_SIZE);
read(PACKET, PACKET_SIZE);
}

@Test
public void testExists() throws IOException {
assertTrue(surfFs.exists(new Path(SHORT)));
assertTrue(surfFs.exists(new Path(LONG)));
assertFalse(surfFs.exists(new Path("Not_exists")));
assertTrue("Should exist", surfFs.exists(new Path(SMALL)));
assertTrue("Should exist", surfFs.exists(new Path(ONE_MB)));
assertTrue("Should exist", surfFs.exists(new Path(PACKET)));
assertFalse("Should not exist", surfFs.exists(new Path("Not_exists")));
}
}

0 comments on commit e69db49

Please sign in to comment.