Skip to content

Commit

Permalink
Add logging infra to PineconeClient and Index. (#126)
Browse files Browse the repository at this point in the history
Fixes #118

* Add logging infra to PineconeClient and Index.

The idea is, each operation has a pair of log entries - start is logged at Trace level (and should be ignored in most cases), and then we either log success at Debug level, or error at Error level.
Only logging basic information like operation type and the index name - we want to avoid printing sensitive data in the logs for security reasons.

Also wired up logger(factory) to GrpcChannel and HttpClient for more detailed logging.

* Iterate on logging impl and more
- Remove excessive logging for now (tentative)
- Skip .Trace log level in tests which outputs sensitive http headers
- Introduce exception types  to preserve and expose partial success state on parallel batch operation failure
- Remove dependency on CommunityToolkit.Diagnostics

* Further work:
- Redact API key from logged headers
- Match request headers behavior of the official clients
- Add support for client-side gRPC load balancing (it does not help much because Pinecone uses a misconfigured reverse-proxy that has barely any streams over a single H2 connection)
- Add batching and parallelization to Delete(vectors)
- Update dependencies

---------

Co-authored-by: neon-sunset <[email protected]>
  • Loading branch information
maumar and neon-sunset authored Sep 18, 2024
1 parent 3eab568 commit d462352
Show file tree
Hide file tree
Showing 18 changed files with 586 additions and 164 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ In the absence of an official SDK, it provides first-class support for Pinecone
- Standard operations on pod-based and serverless indexes
- gRPC and REST transports for vector operations
- Sparse-dense vectors
- Parallel vector upsert and fetch
- Automatic batching and parallelization for upsert, fetch and delete operations
- Efficient vector serialization
- Metadata support
- NativeAOT compatibility (e.g. for AWS Lambda)
Expand Down
10 changes: 10 additions & 0 deletions src/Constants.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
using Microsoft.Extensions.Http;

namespace Pinecone;

internal static class Constants
{
public const string RestApiKey = "Api-Key";
public const string GrpcApiKey = "api-key";

public static readonly string Version =
typeof(Constants).Assembly.GetName().Version?.ToString(3) ?? "0.0.0";

public static readonly HttpClientFactoryOptions RedactApiKeyOptions = new()
{
ShouldRedactHeaderValue = h => h.Equals(RestApiKey, StringComparison.OrdinalIgnoreCase)
};
}
37 changes: 29 additions & 8 deletions src/Extensions.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,35 @@
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using Grpc.Core;

namespace Pinecone;

internal static class Extensions
{
internal static void AddPineconeHeaders(this HttpClient http, string apiKey)
{
var headers = http.DefaultRequestHeaders;

if (!headers.Contains(Constants.RestApiKey))
headers.Add(Constants.RestApiKey, apiKey);
if (!headers.Contains("X-Pinecone-Api-Version"))
headers.Add("X-Pinecone-Api-Version", "2024-07");
if (!headers.Contains("User-Agent"))
headers.TryAddWithoutValidation("User-Agent", $"lang=C#; Pinecone.NET/{Constants.Version}");
}

internal static Metadata WithPineconeProps(this Metadata metadata, string apiKey)
{
metadata.Add(Constants.GrpcApiKey, apiKey);
metadata.Add("X-Pinecone-Api-Version", "2024-07");
metadata.Add("User-Agent", $"lang=C#; Pinecone.NET/{Constants.Version}");

return metadata;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static ValueTask CheckStatusCode(
this HttpResponseMessage response, CancellationToken ct, [CallerMemberName] string requestName = "")
internal static ValueTask CheckStatusCode(this HttpResponseMessage response, CancellationToken ct, [CallerMemberName] string requestName = "")
{
#if NETSTANDARD2_0
return response.IsSuccessStatusCode ? default : ThrowOnFailedResponse(response, requestName, ct);
Expand All @@ -17,15 +38,15 @@ internal static ValueTask CheckStatusCode(
#endif

[DoesNotReturn, StackTraceHidden]
static async ValueTask ThrowOnFailedResponse(
HttpResponseMessage response, string requestName, CancellationToken ct)
static async ValueTask ThrowOnFailedResponse(HttpResponseMessage response, string requestName, CancellationToken ct)
{
throw new HttpRequestException($"{requestName} request has failed. " +
var message = $"{requestName} request has failed. " +
#if NETSTANDARD2_0
$"Code: {response.StatusCode}. Message: {await response.Content.ReadAsStringAsync().ConfigureAwait(false)}");
$"Code: {response.StatusCode}. Message: {await response.Content.ReadAsStringAsync().ConfigureAwait(false)}";
#else
$"Code: {response.StatusCode}. Message: {await response.Content.ReadAsStringAsync(ct).ConfigureAwait(false)}");
$"Code: {response.StatusCode}. Message: {await response.Content.ReadAsStringAsync(ct).ConfigureAwait(false)}";
#endif
throw new HttpRequestException(message);
}
}

Expand All @@ -36,4 +57,4 @@ internal static void Deconstruct<T1, T2>(this KeyValuePair<T1, T2> tuple, out T1
value = tuple.Value;
}
#endif
}
}
6 changes: 2 additions & 4 deletions src/Grpc/Converters.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using System.Globalization;
using System.Reflection;
using System.Runtime.CompilerServices;
using CommunityToolkit.Diagnostics;
using Google.Protobuf.Collections;
using Google.Protobuf.WellKnownTypes;

Expand Down Expand Up @@ -32,7 +30,7 @@ public static Value ToProtoValue(this MetadataValue source)
bool boolean => Value.ForBool(boolean),
MetadataMap nested => Value.ForStruct(nested.ToProtoStruct()),
IEnumerable<MetadataValue> list => Value.ForList(list.Select(v => v.ToProtoValue()).ToArray()),
_ => ThrowHelper.ThrowArgumentException<Value>($"Unsupported metadata type: {source.Inner!.GetType()}")
_ => ThrowHelpers.ArgumentException<Value>($"Unsupported metadata type: {source.Inner!.GetType()}")
};
}

Expand Down Expand Up @@ -122,7 +120,7 @@ Value.KindOneofCase.None or
Value.KindOneofCase.BoolValue => new(source.BoolValue),
Value.KindOneofCase.StructValue => new(source.StructValue.Fields.ToPublicType()),
Value.KindOneofCase.ListValue => new(source.ListValue.Values.Select(v => v.ToPublicType()).ToArray()),
_ => ThrowHelper.ThrowArgumentException<MetadataValue>($"Unsupported metadata type: {source.KindCase}")
_ => ThrowHelpers.ArgumentException<MetadataValue>($"Unsupported metadata type: {source.KindCase}")
};
}

Expand Down
52 changes: 30 additions & 22 deletions src/Grpc/GrpcTransport.cs
Original file line number Diff line number Diff line change
@@ -1,28 +1,36 @@
using CommunityToolkit.Diagnostics;
using Grpc.Core;
using Grpc.Net.Client;
using Grpc.Net.Client.Configuration;
using Microsoft.Extensions.Logging;

namespace Pinecone.Grpc;

public readonly record struct GrpcTransport : ITransport<GrpcTransport>
{
private readonly Metadata Auth;
readonly Metadata Metadata;
readonly GrpcChannel Channel;
readonly VectorService.VectorServiceClient Grpc;

private readonly GrpcChannel Channel;

private readonly VectorService.VectorServiceClient Grpc;

public GrpcTransport(string host, string apiKey)
public GrpcTransport(string host, string apiKey, ILoggerFactory? loggerFactory = null)
{
Guard.IsNotNullOrWhiteSpace(host);
Guard.IsNotNullOrWhiteSpace(apiKey);
ThrowHelpers.CheckNullOrWhiteSpace(host);
ThrowHelpers.CheckNullOrWhiteSpace(apiKey);

Auth = new() { { Constants.GrpcApiKey, apiKey } };
Channel = GrpcChannel.ForAddress($"https://{host}");
Metadata = new Metadata().WithPineconeProps(apiKey);
Channel = GrpcChannel.ForAddress($"dns:///{host}", new()
{
Credentials = ChannelCredentials.SecureSsl,
#if NET6_0_OR_GREATER
DisposeHttpClient = true,
HttpHandler = new SocketsHttpHandler { EnableMultipleHttp2Connections = true },
#endif
ServiceConfig = new() { LoadBalancingConfigs = { new RoundRobinConfig() } },
LoggerFactory = loggerFactory
});
Grpc = new(Channel);
}

public static GrpcTransport Create(string host, string apiKey) => new(host, apiKey);
public static GrpcTransport Create(string host, string apiKey, ILoggerFactory? loggerFactory) => new(host, apiKey, loggerFactory);

public async Task<IndexStats> DescribeStats(MetadataMap? filter = null, CancellationToken ct = default)
{
Expand All @@ -32,7 +40,7 @@ public async Task<IndexStats> DescribeStats(MetadataMap? filter = null, Cancella
request.Filter = filter.ToProtoStruct();
}

using var call = Grpc.DescribeIndexStatsAsync(request, Auth, cancellationToken: ct);
using var call = Grpc.DescribeIndexStatsAsync(request, Metadata, cancellationToken: ct);

return (await call.ConfigureAwait(false)).ToPublicType();
}
Expand Down Expand Up @@ -68,11 +76,11 @@ public async Task<ScoredVector[]> Query(
}
else
{
ThrowHelper.ThrowArgumentException(
"At least one of the following parameters must be non-null: id, values, sparseValues");
ThrowHelpers.ArgumentException(
"At least one of the following parameters must be non-null: id, values, sparseValues.");
}

using var call = Grpc.QueryAsync(request, Auth, cancellationToken: ct);
using var call = Grpc.QueryAsync(request, Metadata, cancellationToken: ct);
var response = await call.ConfigureAwait(false);

var matches = response.Matches;
Expand All @@ -90,7 +98,7 @@ public async Task<uint> Upsert(IEnumerable<Vector> vectors, string? indexNamespa
var request = new UpsertRequest { Namespace = indexNamespace ?? "" };
request.Vectors.AddRange(vectors.Select(v => v.ToProtoVector()));

using var call = Grpc.UpsertAsync(request, Auth, cancellationToken: ct);
using var call = Grpc.UpsertAsync(request, Metadata, cancellationToken: ct);

return (await call.ConfigureAwait(false)).UpsertedCount;
}
Expand All @@ -113,8 +121,8 @@ public async Task Update(
{
if (values is null && sparseValues is null && metadata is null)
{
ThrowHelper.ThrowArgumentException(
"At least one of the following parameters must be non-null: values, sparseValues, metadata");
ThrowHelpers.ArgumentException(
"At least one of the following parameters must be non-null: values, sparseValues, metadata.");
}

var request = new UpdateRequest
Expand All @@ -126,7 +134,7 @@ public async Task Update(
};
request.Values.OverwriteWith(values);

using var call = Grpc.UpdateAsync(request, Auth, cancellationToken: ct);
using var call = Grpc.UpdateAsync(request, Metadata, cancellationToken: ct);
_ = await call.ConfigureAwait(false);
}

Expand All @@ -139,7 +147,7 @@ public async Task<Dictionary<string, Vector>> Fetch(
Namespace = indexNamespace ?? ""
};

using var call = Grpc.FetchAsync(request, Auth, cancellationToken: ct);
using var call = Grpc.FetchAsync(request, Metadata, cancellationToken: ct);
var response = await call.ConfigureAwait(false);

return response.Vectors.ToDictionary(
Expand Down Expand Up @@ -168,7 +176,7 @@ public Task DeleteAll(string? indexNamespace = null, CancellationToken ct = defa

private async Task Delete(DeleteRequest request, CancellationToken ct)
{
using var call = Grpc.DeleteAsync(request, Auth, cancellationToken: ct);
using var call = Grpc.DeleteAsync(request, Metadata, cancellationToken: ct);
_ = await call.ConfigureAwait(false);
}

Expand Down
5 changes: 3 additions & 2 deletions src/ITransport.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Diagnostics.CodeAnalysis;
using Microsoft.Extensions.Logging;
using Pinecone.Grpc;
using Pinecone.Rest;

Expand All @@ -12,9 +13,9 @@ public interface ITransport<
#endif
{
#if NET7_0_OR_GREATER
static abstract T Create(string host, string apiKey);
static abstract T Create(string host, string apiKey, ILoggerFactory? loggerFactory = null);
#elif NET6_0
static T Create(string host, string apiKey) => PineconeClient.CreateTransport<T>(host, apiKey);
static T Create(string host, string apiKey, ILoggerFactory? loggerFactory = null) => PineconeClient.CreateTransport<T>(host, apiKey, loggerFactory);
#endif

Task<IndexStats> DescribeStats(MetadataMap? filter = null, CancellationToken ct = default);
Expand Down
Loading

0 comments on commit d462352

Please sign in to comment.