diff --git a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs index 0413245f9c..9825a09248 100644 --- a/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs +++ b/libs/storage/Tsavorite/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs @@ -440,29 +440,56 @@ public override unsafe void ReadAsync(int segmentId, ulong sourceAddress, IntPtr // Lazily cache the blob entry for the segment being read if (!blobs.TryGetValue(segmentId, out BlobEntry blobEntry)) { - var blobClients = pageBlobDirectory.GetPageBlobClient(GetSegmentBlobName(segmentId)); - var entry = new BlobEntry(blobClients, blobClients.Default.GetProperties().Value.ETag, this); - blobs.TryAdd(segmentId, entry); + BlobEntry entry = new(this); + if (blobs.TryAdd(segmentId, entry)) + { + var pageBlob = pageBlobDirectory.GetPageBlobClient(GetSegmentBlobName(segmentId)); + + // If segment size is -1 we use a default + var size = segmentSize == -1 ? MAX_PAGEBLOB_SIZE : segmentSize; + + // If no blob exists for the segment, we must first create the segment asynchronouly. (Create call takes ~70 ms by measurement) + // After creation is done, we can call read. + _ = entry.CreateAsync(size, pageBlob); + } + // Otherwise, some other thread beat us to it. Okay to use their blobs. blobEntry = blobs[segmentId]; } - ReadFromBlobUnsafeAsync(blobEntry.PageBlob, (long)sourceAddress, (long)destinationAddress, readLength, id) - .ContinueWith((Task t) => - { - if (pendingReadWriteOperations.TryRemove(id, out ReadWriteRequestInfo request)) - { - if (t.IsFaulted) - { - BlobManager?.StorageTracer?.TsavoriteStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id} (Failure)"); - request.Callback(uint.MaxValue, request.NumBytes, request.Context); - } - else - { - BlobManager?.StorageTracer?.TsavoriteStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id}"); - request.Callback(0, request.NumBytes, request.Context); - } - } - }, TaskContinuationOptions.ExecuteSynchronously); + TryReadAsync(blobEntry, (long)sourceAddress, (long)destinationAddress, readLength, id); + } + + void TryReadAsync(BlobEntry blobEntry, long sourceAddress, long destinationAddress, uint readLength, long id) + { + // If pageBlob is null, it means the blob has not been created yet. We should wait for it to be created. + if (blobEntry.PageBlob.Default == null + && blobEntry.TryQueueAction(() => ReadFromBlobAsync(blobEntry, sourceAddress, destinationAddress, readLength, id))) + { + return; + } + // Otherwise, we can proceed with the read. + ReadFromBlobAsync(blobEntry, sourceAddress, destinationAddress, readLength, id); + } + + unsafe void ReadFromBlobAsync(BlobEntry blobEntry, long sourceAddress, long destinationAddress, uint readLength, long id) + { + ReadFromBlobUnsafeAsync(blobEntry, sourceAddress, destinationAddress, readLength, id) + .ContinueWith((Task t) => + { + if (pendingReadWriteOperations.TryRemove(id, out ReadWriteRequestInfo request)) + { + if (t.IsFaulted) + { + BlobManager?.StorageTracer?.TsavoriteStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id} (Failure)"); + request.Callback(uint.MaxValue, request.NumBytes, request.Context); + } + else + { + BlobManager?.StorageTracer?.TsavoriteStorageProgress($"StorageOpReturned AzureStorageDevice.ReadAsync id={id} for blob={blobEntry.ETag}"); + request.Callback(0, request.NumBytes, request.Context); + } + } + }, TaskContinuationOptions.ExecuteSynchronously); } /// @@ -557,12 +584,12 @@ await BlobManager.PerformWithRetriesAsync( } } - unsafe Task ReadFromBlobUnsafeAsync(BlobUtilsV12.PageBlobClients blob, long sourceAddress, long destinationAddress, uint readLength, long id) + unsafe Task ReadFromBlobUnsafeAsync(BlobEntry blob, long sourceAddress, long destinationAddress, uint readLength, long id) { return ReadFromBlobAsync(new UnmanagedMemoryStream((byte*)destinationAddress, readLength, readLength, FileAccess.Write), blob, sourceAddress, readLength, id); } - async Task ReadFromBlobAsync(UnmanagedMemoryStream stream, BlobUtilsV12.PageBlobClients blob, long sourceAddress, uint readLength, long id) + async Task ReadFromBlobAsync(UnmanagedMemoryStream stream, BlobEntry blob, long sourceAddress, uint readLength, long id) { using (stream) { @@ -577,7 +604,7 @@ await BlobManager.PerformWithRetriesAsync( "PageBlobClient.DownloadStreamingAsync", "ReadFromDevice", $"id={id} readLength={length} sourceAddress={sourceAddress + offset}", - blob.Default.Name, + blob.PageBlob.Default.Name, 1000 + (int)length / 1000, true, async (numAttempts) => @@ -589,7 +616,7 @@ await BlobManager.PerformWithRetriesAsync( if (length > 0) { - var client = (numAttempts > 1 || length == MAX_DOWNLOAD_SIZE) ? blob.Default : blob.Aggressive; + var client = (numAttempts > 1 || length == MAX_DOWNLOAD_SIZE) ? blob.PageBlob.Default : blob.PageBlob.Aggressive; var response = await client.DownloadStreamingAsync( range: new Azure.HttpRange(sourceAddress + offset, length),