Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Azure Device Behavior for Non-Existent Files #1066

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -440,29 +440,56 @@ public override unsafe void ReadAsync(int segmentId, ulong sourceAddress, IntPtr
// Lazily cache the blob entry for the segment being read
if (!blobs.TryGetValue(segmentId, out BlobEntry blobEntry))
{
var blobClients = pageBlobDirectory.GetPageBlobClient(GetSegmentBlobName(segmentId));
var entry = new BlobEntry(blobClients, blobClients.Default.GetProperties().Value.ETag, this);
blobs.TryAdd(segmentId, entry);
BlobEntry entry = new(this);
if (blobs.TryAdd(segmentId, entry))
{
var pageBlob = pageBlobDirectory.GetPageBlobClient(GetSegmentBlobName(segmentId));

// If segment size is -1 we use a default
var size = segmentSize == -1 ? MAX_PAGEBLOB_SIZE : segmentSize;

// If no blob exists for the segment, we must first create the segment asynchronouly. (Create call takes ~70 ms by measurement)
// After creation is done, we can call read.
_ = entry.CreateAsync(size, pageBlob);
}
// Otherwise, some other thread beat us to it. Okay to use their blobs.
blobEntry = blobs[segmentId];
}

ReadFromBlobUnsafeAsync(blobEntry.PageBlob, (long)sourceAddress, (long)destinationAddress, readLength, id)
.ContinueWith((Task t) =>
{
if (pendingReadWriteOperations.TryRemove(id, out ReadWriteRequestInfo request))
{
if (t.IsFaulted)
{
BlobManager?.StorageTracer?.TsavoriteStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id} (Failure)");
request.Callback(uint.MaxValue, request.NumBytes, request.Context);
}
else
{
BlobManager?.StorageTracer?.TsavoriteStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id}");
request.Callback(0, request.NumBytes, request.Context);
}
}
}, TaskContinuationOptions.ExecuteSynchronously);
TryReadAsync(blobEntry, (long)sourceAddress, (long)destinationAddress, readLength, id);
}

void TryReadAsync(BlobEntry blobEntry, long sourceAddress, long destinationAddress, uint readLength, long id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

naming: This is not an async method (same goes for ReadFromBlobAsync)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The convention in AzureStorageDevice currently is that methods that perform work asynchronously end in Async, even if they are not strictly within the async-await paradigm. So I would leave it as is for this PR.

{
// If pageBlob is null, it means the blob has not been created yet. We should wait for it to be created.
if (blobEntry.PageBlob.Default == null
&& blobEntry.TryQueueAction(() => ReadFromBlobAsync(blobEntry, sourceAddress, destinationAddress, readLength, id)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indent

{
return;
}
// Otherwise, we can proceed with the read.
ReadFromBlobAsync(blobEntry, sourceAddress, destinationAddress, readLength, id);
}

unsafe void ReadFromBlobAsync(BlobEntry blobEntry, long sourceAddress, long destinationAddress, uint readLength, long id)
{
ReadFromBlobUnsafeAsync(blobEntry, sourceAddress, destinationAddress, readLength, id)
.ContinueWith((Task t) =>
{
if (pendingReadWriteOperations.TryRemove(id, out ReadWriteRequestInfo request))
{
if (t.IsFaulted)
{
BlobManager?.StorageTracer?.TsavoriteStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id} (Failure)");
request.Callback(uint.MaxValue, request.NumBytes, request.Context);
}
else
{
BlobManager?.StorageTracer?.TsavoriteStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id} for blob={blobEntry.ETag}");
request.Callback(0, request.NumBytes, request.Context);
}
}
}, TaskContinuationOptions.ExecuteSynchronously);
}

/// <summary>
Expand Down Expand Up @@ -557,12 +584,12 @@ await BlobManager.PerformWithRetriesAsync(
}
}

unsafe Task ReadFromBlobUnsafeAsync(BlobUtilsV12.PageBlobClients blob, long sourceAddress, long destinationAddress, uint readLength, long id)
unsafe Task ReadFromBlobUnsafeAsync(BlobEntry blob, long sourceAddress, long destinationAddress, uint readLength, long id)
{
return ReadFromBlobAsync(new UnmanagedMemoryStream((byte*)destinationAddress, readLength, readLength, FileAccess.Write), blob, sourceAddress, readLength, id);
}

async Task ReadFromBlobAsync(UnmanagedMemoryStream stream, BlobUtilsV12.PageBlobClients blob, long sourceAddress, uint readLength, long id)
async Task ReadFromBlobAsync(UnmanagedMemoryStream stream, BlobEntry blob, long sourceAddress, uint readLength, long id)
{
using (stream)
{
Expand All @@ -577,7 +604,7 @@ await BlobManager.PerformWithRetriesAsync(
"PageBlobClient.DownloadStreamingAsync",
"ReadFromDevice",
$"id={id} readLength={length} sourceAddress={sourceAddress + offset}",
blob.Default.Name,
blob.PageBlob.Default.Name,
1000 + (int)length / 1000,
true,
async (numAttempts) =>
Expand All @@ -589,7 +616,7 @@ await BlobManager.PerformWithRetriesAsync(

if (length > 0)
{
var client = (numAttempts > 1 || length == MAX_DOWNLOAD_SIZE) ? blob.Default : blob.Aggressive;
var client = (numAttempts > 1 || length == MAX_DOWNLOAD_SIZE) ? blob.PageBlob.Default : blob.PageBlob.Aggressive;

var response = await client.DownloadStreamingAsync(
range: new Azure.HttpRange(sourceAddress + offset, length),
Expand Down