diff --git a/integration-tests/smoke/capabilities/amd64_capproxy b/integration-tests/smoke/capabilities/amd64_capproxy new file mode 100755 index 00000000000..73d7d56204b Binary files /dev/null and b/integration-tests/smoke/capabilities/amd64_capproxy differ diff --git a/integration-tests/smoke/capabilities/environment_proxycap.toml b/integration-tests/smoke/capabilities/environment_proxycap.toml new file mode 100644 index 00000000000..0b1d885d296 --- /dev/null +++ b/integration-tests/smoke/capabilities/environment_proxycap.toml @@ -0,0 +1,128 @@ +[blockchain_a] +type = "anvil" +docker_cmd_params = ["-b", "5"] + +[jd] +# change to your version +image = "localhost:5000/job-distributor:sha-707362c-amd64" + +[workflow_config] +don_id = 1 +workflow_name = "abcdefgasd" +# without 0x prefix! +feed_id = "018bfe8840700040000000000000000000000000000000000000000000000000" + +use_cre_cli = true +should_compile_new_workflow = false +workflow_folder_location = "path-to-folder-with-main.go-of-your-workflow" + +[workflow_config.dependencies] +capabilities_version = "v1.0.0-alpha" +cre_cli_version = "v0.0.2" + +[workflow_config.compiled_config] +binary_url = "https://gist.githubusercontent.com/Tofel/8a39af5b68c213d2200446c175b5c99e/raw/cb7b2a56b37e333fe0bdce07b79538c4ce332f5f/binary.wasm.br" +config_url = "https://gist.githubusercontent.com/Tofel/19c80e6297914a79449f916e5e65dfdd/raw/1344c259ef7e970dbabaa1e9e885845b8eba5da9/config.json3674692696" + +[nodeset] +nodes = 5 +override_mode = "each" + +[nodeset.db] +image = "postgres:15.6" + +[[nodeset.node_specs]] + +[nodeset.node_specs.node] +custom_ports = ["13300:3456"] +# docker_ctx = "../../.." +# docker_file = "plugins/chainlink.Dockerfile" +image = "localhost:5000/chainlink:develop" +user_config_overrides = """ + [Feature] + LogPoller = true + + [OCR2] + Enabled = true + DatabaseTimeout = '1s' + + [P2P.V2] + Enabled = true + ListenAddresses = ['0.0.0.0:5001'] + """ + +[[nodeset.node_specs]] + +[nodeset.node_specs.node] +custom_ports = ["13301:3456"] +capabilities = [ "./amd64_capproxy"] +image = "localhost:5000/chainlink:develop" +user_config_overrides = """ + [Feature] + LogPoller = true + + [OCR2] + Enabled = true + DatabaseTimeout = '1s' + + [P2P.V2] + Enabled = true + ListenAddresses = ['0.0.0.0:5001'] + """ + +[[nodeset.node_specs]] + +[nodeset.node_specs.node] +custom_ports = ["13302:3456"] +capabilities = [ "./amd64_capproxy"] +image = "localhost:5000/chainlink:develop" +user_config_overrides = """ + [Feature] + LogPoller = true + + [OCR2] + Enabled = true + DatabaseTimeout = '1s' + + [P2P.V2] + Enabled = true + ListenAddresses = ['0.0.0.0:5001'] + """ + +[[nodeset.node_specs]] + +[nodeset.node_specs.node] +custom_ports = ["13303:3456"] +capabilities = [ "./amd64_capproxy"] +image = "localhost:5000/chainlink:develop" +user_config_overrides = """ + [Feature] + LogPoller = true + + [OCR2] + Enabled = true + DatabaseTimeout = '1s' + + [P2P.V2] + Enabled = true + ListenAddresses = ['0.0.0.0:5001'] + """ + +[[nodeset.node_specs]] + +[nodeset.node_specs.node] +custom_ports = ["13304:3456"] +capabilities = [ "./amd64_capproxy"] +image = "localhost:5000/chainlink:develop" +user_config_overrides = """ + [Feature] + LogPoller = true + + [OCR2] + Enabled = true + DatabaseTimeout = '1s' + + [P2P.V2] + Enabled = true + ListenAddresses = ['0.0.0.0:5001'] + """ diff --git a/integration-tests/smoke/capabilities/proxy.pb.go b/integration-tests/smoke/capabilities/proxy.pb.go new file mode 100644 index 00000000000..b604e43053a --- /dev/null +++ b/integration-tests/smoke/capabilities/proxy.pb.go @@ -0,0 +1,649 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.31.0 +// protoc v4.24.3 +// source: proxy.proto + +package capabilities + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + _ "google.golang.org/protobuf/types/known/emptypb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type CapabilityType int32 + +const ( + CapabilityType_Unknown CapabilityType = 0 + CapabilityType_Trigger CapabilityType = 1 + CapabilityType_Action CapabilityType = 2 + CapabilityType_Consensus CapabilityType = 3 + CapabilityType_Target CapabilityType = 4 +) + +// Enum value maps for CapabilityType. +var ( + CapabilityType_name = map[int32]string{ + 0: "Unknown", + 1: "Trigger", + 2: "Action", + 3: "Consensus", + 4: "Target", + } + CapabilityType_value = map[string]int32{ + "Unknown": 0, + "Trigger": 1, + "Action": 2, + "Consensus": 3, + "Target": 4, + } +) + +func (x CapabilityType) Enum() *CapabilityType { + p := new(CapabilityType) + *p = x + return p +} + +func (x CapabilityType) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (CapabilityType) Descriptor() protoreflect.EnumDescriptor { + return file_proxy_proto_enumTypes[0].Descriptor() +} + +func (CapabilityType) Type() protoreflect.EnumType { + return &file_proxy_proto_enumTypes[0] +} + +func (x CapabilityType) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use CapabilityType.Descriptor instead. +func (CapabilityType) EnumDescriptor() ([]byte, []int) { + return file_proxy_proto_rawDescGZIP(), []int{0} +} + +type ListRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *ListRequest) Reset() { + *x = ListRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proxy_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ListRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListRequest) ProtoMessage() {} + +func (x *ListRequest) ProtoReflect() protoreflect.Message { + mi := &file_proxy_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListRequest.ProtoReflect.Descriptor instead. +func (*ListRequest) Descriptor() ([]byte, []int) { + return file_proxy_proto_rawDescGZIP(), []int{0} +} + +type ListResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + CapInfos []*CapabilityInfo `protobuf:"bytes,1,rep,name=capInfos,proto3" json:"capInfos,omitempty"` +} + +func (x *ListResponse) Reset() { + *x = ListResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proxy_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ListResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListResponse) ProtoMessage() {} + +func (x *ListResponse) ProtoReflect() protoreflect.Message { + mi := &file_proxy_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ListResponse.ProtoReflect.Descriptor instead. +func (*ListResponse) Descriptor() ([]byte, []int) { + return file_proxy_proto_rawDescGZIP(), []int{1} +} + +func (x *ListResponse) GetCapInfos() []*CapabilityInfo { + if x != nil { + return x.CapInfos + } + return nil +} + +type DON struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *DON) Reset() { + *x = DON{} + if protoimpl.UnsafeEnabled { + mi := &file_proxy_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DON) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DON) ProtoMessage() {} + +func (x *DON) ProtoReflect() protoreflect.Message { + mi := &file_proxy_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DON.ProtoReflect.Descriptor instead. +func (*DON) Descriptor() ([]byte, []int) { + return file_proxy_proto_rawDescGZIP(), []int{2} +} + +type CapabilityInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ProxyID string `protobuf:"bytes,1,opt,name=ProxyID,proto3" json:"ProxyID,omitempty"` + ID string `protobuf:"bytes,2,opt,name=ID,proto3" json:"ID,omitempty"` + CapabilityType CapabilityType `protobuf:"varint,3,opt,name=CapabilityType,proto3,enum=proxy.CapabilityType" json:"CapabilityType,omitempty"` + Description string `protobuf:"bytes,4,opt,name=Description,proto3" json:"Description,omitempty"` + DON *DON `protobuf:"bytes,5,opt,name=DON,proto3" json:"DON,omitempty"` + IsLocal bool `protobuf:"varint,6,opt,name=IsLocal,proto3" json:"IsLocal,omitempty"` +} + +func (x *CapabilityInfo) Reset() { + *x = CapabilityInfo{} + if protoimpl.UnsafeEnabled { + mi := &file_proxy_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CapabilityInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CapabilityInfo) ProtoMessage() {} + +func (x *CapabilityInfo) ProtoReflect() protoreflect.Message { + mi := &file_proxy_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CapabilityInfo.ProtoReflect.Descriptor instead. +func (*CapabilityInfo) Descriptor() ([]byte, []int) { + return file_proxy_proto_rawDescGZIP(), []int{3} +} + +func (x *CapabilityInfo) GetProxyID() string { + if x != nil { + return x.ProxyID + } + return "" +} + +func (x *CapabilityInfo) GetID() string { + if x != nil { + return x.ID + } + return "" +} + +func (x *CapabilityInfo) GetCapabilityType() CapabilityType { + if x != nil { + return x.CapabilityType + } + return CapabilityType_Unknown +} + +func (x *CapabilityInfo) GetDescription() string { + if x != nil { + return x.Description + } + return "" +} + +func (x *CapabilityInfo) GetDON() *DON { + if x != nil { + return x.DON + } + return nil +} + +func (x *CapabilityInfo) GetIsLocal() bool { + if x != nil { + return x.IsLocal + } + return false +} + +type CreateTriggerResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ProxyID string `protobuf:"bytes,1,opt,name=ProxyID,proto3" json:"ProxyID,omitempty"` +} + +func (x *CreateTriggerResponse) Reset() { + *x = CreateTriggerResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proxy_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CreateTriggerResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CreateTriggerResponse) ProtoMessage() {} + +func (x *CreateTriggerResponse) ProtoReflect() protoreflect.Message { + mi := &file_proxy_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CreateTriggerResponse.ProtoReflect.Descriptor instead. +func (*CreateTriggerResponse) Descriptor() ([]byte, []int) { + return file_proxy_proto_rawDescGZIP(), []int{4} +} + +func (x *CreateTriggerResponse) GetProxyID() string { + if x != nil { + return x.ProxyID + } + return "" +} + +type SendTriggerRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ProxyID string `protobuf:"bytes,1,opt,name=ProxyID,proto3" json:"ProxyID,omitempty"` + EventID string `protobuf:"bytes,2,opt,name=EventID,proto3" json:"EventID,omitempty"` + Payload []byte `protobuf:"bytes,3,opt,name=Payload,proto3" json:"Payload,omitempty"` +} + +func (x *SendTriggerRequest) Reset() { + *x = SendTriggerRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proxy_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SendTriggerRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SendTriggerRequest) ProtoMessage() {} + +func (x *SendTriggerRequest) ProtoReflect() protoreflect.Message { + mi := &file_proxy_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SendTriggerRequest.ProtoReflect.Descriptor instead. +func (*SendTriggerRequest) Descriptor() ([]byte, []int) { + return file_proxy_proto_rawDescGZIP(), []int{5} +} + +func (x *SendTriggerRequest) GetProxyID() string { + if x != nil { + return x.ProxyID + } + return "" +} + +func (x *SendTriggerRequest) GetEventID() string { + if x != nil { + return x.EventID + } + return "" +} + +func (x *SendTriggerRequest) GetPayload() []byte { + if x != nil { + return x.Payload + } + return nil +} + +type SendTriggerResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *SendTriggerResponse) Reset() { + *x = SendTriggerResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proxy_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SendTriggerResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SendTriggerResponse) ProtoMessage() {} + +func (x *SendTriggerResponse) ProtoReflect() protoreflect.Message { + mi := &file_proxy_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SendTriggerResponse.ProtoReflect.Descriptor instead. +func (*SendTriggerResponse) Descriptor() ([]byte, []int) { + return file_proxy_proto_rawDescGZIP(), []int{6} +} + +var File_proxy_proto protoreflect.FileDescriptor + +var file_proxy_proto_rawDesc = []byte{ + 0x0a, 0x0b, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x70, + 0x72, 0x6f, 0x78, 0x79, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x22, 0x0d, 0x0a, 0x0b, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x22, 0x41, 0x0a, 0x0c, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x31, 0x0a, 0x08, 0x63, 0x61, 0x70, 0x49, 0x6e, 0x66, 0x6f, 0x73, 0x18, 0x01, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x43, 0x61, 0x70, 0x61, 0x62, + 0x69, 0x6c, 0x69, 0x74, 0x79, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x08, 0x63, 0x61, 0x70, 0x49, 0x6e, + 0x66, 0x6f, 0x73, 0x22, 0x05, 0x0a, 0x03, 0x44, 0x4f, 0x4e, 0x22, 0xd3, 0x01, 0x0a, 0x0e, 0x43, + 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x18, 0x0a, + 0x07, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, + 0x50, 0x72, 0x6f, 0x78, 0x79, 0x49, 0x44, 0x12, 0x0e, 0x0a, 0x02, 0x49, 0x44, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x02, 0x49, 0x44, 0x12, 0x3d, 0x0a, 0x0e, 0x43, 0x61, 0x70, 0x61, 0x62, + 0x69, 0x6c, 0x69, 0x74, 0x79, 0x54, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, + 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x43, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, + 0x74, 0x79, 0x54, 0x79, 0x70, 0x65, 0x52, 0x0e, 0x43, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, + 0x74, 0x79, 0x54, 0x79, 0x70, 0x65, 0x12, 0x20, 0x0a, 0x0b, 0x44, 0x65, 0x73, 0x63, 0x72, 0x69, + 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x44, 0x65, 0x73, + 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1c, 0x0a, 0x03, 0x44, 0x4f, 0x4e, 0x18, + 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x44, 0x4f, + 0x4e, 0x52, 0x03, 0x44, 0x4f, 0x4e, 0x12, 0x18, 0x0a, 0x07, 0x49, 0x73, 0x4c, 0x6f, 0x63, 0x61, + 0x6c, 0x18, 0x06, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x49, 0x73, 0x4c, 0x6f, 0x63, 0x61, 0x6c, + 0x22, 0x31, 0x0a, 0x15, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x54, 0x72, 0x69, 0x67, 0x67, 0x65, + 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x50, 0x72, 0x6f, + 0x78, 0x79, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x50, 0x72, 0x6f, 0x78, + 0x79, 0x49, 0x44, 0x22, 0x62, 0x0a, 0x12, 0x53, 0x65, 0x6e, 0x64, 0x54, 0x72, 0x69, 0x67, 0x67, + 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x50, 0x72, 0x6f, + 0x78, 0x79, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x50, 0x72, 0x6f, 0x78, + 0x79, 0x49, 0x44, 0x12, 0x18, 0x0a, 0x07, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x44, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x44, 0x12, 0x18, 0x0a, + 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, + 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x15, 0x0a, 0x13, 0x53, 0x65, 0x6e, 0x64, 0x54, + 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2a, 0x51, + 0x0a, 0x0e, 0x43, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x79, 0x54, 0x79, 0x70, 0x65, + 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x6e, 0x6b, 0x6e, 0x6f, 0x77, 0x6e, 0x10, 0x00, 0x12, 0x0b, 0x0a, + 0x07, 0x54, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x63, + 0x74, 0x69, 0x6f, 0x6e, 0x10, 0x02, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x73, 0x65, 0x6e, + 0x73, 0x75, 0x73, 0x10, 0x03, 0x12, 0x0a, 0x0a, 0x06, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x10, + 0x04, 0x32, 0xca, 0x01, 0x0a, 0x05, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x12, 0x31, 0x0a, 0x04, 0x4c, + 0x69, 0x73, 0x74, 0x12, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x4c, 0x69, 0x73, 0x74, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, + 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x46, + 0x0a, 0x0d, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x54, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x12, + 0x15, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x43, 0x61, 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, + 0x74, 0x79, 0x49, 0x6e, 0x66, 0x6f, 0x1a, 0x1c, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x43, + 0x72, 0x65, 0x61, 0x74, 0x65, 0x54, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x46, 0x0a, 0x0b, 0x53, 0x65, 0x6e, 0x64, 0x54, 0x72, + 0x69, 0x67, 0x67, 0x65, 0x72, 0x12, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x53, 0x65, + 0x6e, 0x64, 0x54, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x54, 0x72, 0x69, + 0x67, 0x67, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x44, + 0x5a, 0x42, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x6d, 0x61, + 0x72, 0x74, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x6b, 0x69, 0x74, 0x2f, 0x63, 0x61, + 0x70, 0x61, 0x62, 0x69, 0x6c, 0x69, 0x74, 0x69, 0x65, 0x73, 0x2f, 0x6c, 0x6f, 0x61, 0x64, 0x74, + 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, + 0x6c, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_proxy_proto_rawDescOnce sync.Once + file_proxy_proto_rawDescData = file_proxy_proto_rawDesc +) + +func file_proxy_proto_rawDescGZIP() []byte { + file_proxy_proto_rawDescOnce.Do(func() { + file_proxy_proto_rawDescData = protoimpl.X.CompressGZIP(file_proxy_proto_rawDescData) + }) + return file_proxy_proto_rawDescData +} + +var file_proxy_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_proxy_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_proxy_proto_goTypes = []interface{}{ + (CapabilityType)(0), // 0: proxy.CapabilityType + (*ListRequest)(nil), // 1: proxy.ListRequest + (*ListResponse)(nil), // 2: proxy.ListResponse + (*DON)(nil), // 3: proxy.DON + (*CapabilityInfo)(nil), // 4: proxy.CapabilityInfo + (*CreateTriggerResponse)(nil), // 5: proxy.CreateTriggerResponse + (*SendTriggerRequest)(nil), // 6: proxy.SendTriggerRequest + (*SendTriggerResponse)(nil), // 7: proxy.SendTriggerResponse +} +var file_proxy_proto_depIdxs = []int32{ + 4, // 0: proxy.ListResponse.capInfos:type_name -> proxy.CapabilityInfo + 0, // 1: proxy.CapabilityInfo.CapabilityType:type_name -> proxy.CapabilityType + 3, // 2: proxy.CapabilityInfo.DON:type_name -> proxy.DON + 1, // 3: proxy.Proxy.List:input_type -> proxy.ListRequest + 4, // 4: proxy.Proxy.CreateTrigger:input_type -> proxy.CapabilityInfo + 6, // 5: proxy.Proxy.SendTrigger:input_type -> proxy.SendTriggerRequest + 2, // 6: proxy.Proxy.List:output_type -> proxy.ListResponse + 5, // 7: proxy.Proxy.CreateTrigger:output_type -> proxy.CreateTriggerResponse + 7, // 8: proxy.Proxy.SendTrigger:output_type -> proxy.SendTriggerResponse + 6, // [6:9] is the sub-list for method output_type + 3, // [3:6] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name +} + +func init() { file_proxy_proto_init() } +func file_proxy_proto_init() { + if File_proxy_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proxy_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ListRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proxy_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ListResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proxy_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*DON); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proxy_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CapabilityInfo); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proxy_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CreateTriggerResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proxy_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SendTriggerRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proxy_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SendTriggerResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proxy_proto_rawDesc, + NumEnums: 1, + NumMessages: 7, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_proxy_proto_goTypes, + DependencyIndexes: file_proxy_proto_depIdxs, + EnumInfos: file_proxy_proto_enumTypes, + MessageInfos: file_proxy_proto_msgTypes, + }.Build() + File_proxy_proto = out.File + file_proxy_proto_rawDesc = nil + file_proxy_proto_goTypes = nil + file_proxy_proto_depIdxs = nil +} diff --git a/integration-tests/smoke/capabilities/proxy.proto b/integration-tests/smoke/capabilities/proxy.proto new file mode 100644 index 00000000000..eb04994b9c3 --- /dev/null +++ b/integration-tests/smoke/capabilities/proxy.proto @@ -0,0 +1,47 @@ +syntax = "proto3"; +option go_package = "github.com/smartcontractkit/capabilities/loadtestproxy/internal/pb"; + +package capabilities; + +import "google/protobuf/empty.proto"; + + +service Proxy { + rpc List(ListRequest) returns (ListResponse){} + rpc CreateTrigger(CapabilityInfo) returns(CreateTriggerResponse) {} + rpc SendTrigger(SendTriggerRequest)returns(SendTriggerResponse) {} +} + +message ListRequest {} +message ListResponse { + repeated CapabilityInfo capInfos = 1; +} +enum CapabilityType { + Unknown = 0; + Trigger = 1; + Action = 2; + Consensus = 3; + Target = 4; +} +message DON{ + +} +message CapabilityInfo{ + string ProxyID = 1; + string ID = 2; + CapabilityType CapabilityType = 3; + string Description = 4; + DON DON = 5; + bool IsLocal = 6; +} + + +message CreateTriggerResponse{ + string ProxyID=1; +} +message SendTriggerRequest{ + string ProxyID=1; + string EventID=2; + bytes Payload=3; +} +message SendTriggerResponse{} diff --git a/integration-tests/smoke/capabilities/proxy_grpc.pb.go b/integration-tests/smoke/capabilities/proxy_grpc.pb.go new file mode 100644 index 00000000000..45e1140a3cc --- /dev/null +++ b/integration-tests/smoke/capabilities/proxy_grpc.pb.go @@ -0,0 +1,183 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v4.24.3 +// source: proxy.proto + +package capabilities + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + Proxy_List_FullMethodName = "/proxy.Proxy/List" + Proxy_CreateTrigger_FullMethodName = "/proxy.Proxy/CreateTrigger" + Proxy_SendTrigger_FullMethodName = "/proxy.Proxy/SendTrigger" +) + +// ProxyClient is the client API for Proxy service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type ProxyClient interface { + List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error) + CreateTrigger(ctx context.Context, in *CapabilityInfo, opts ...grpc.CallOption) (*CreateTriggerResponse, error) + SendTrigger(ctx context.Context, in *SendTriggerRequest, opts ...grpc.CallOption) (*SendTriggerResponse, error) +} + +type proxyClient struct { + cc grpc.ClientConnInterface +} + +func NewProxyClient(cc grpc.ClientConnInterface) ProxyClient { + return &proxyClient{cc} +} + +func (c *proxyClient) List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error) { + out := new(ListResponse) + err := c.cc.Invoke(ctx, Proxy_List_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *proxyClient) CreateTrigger(ctx context.Context, in *CapabilityInfo, opts ...grpc.CallOption) (*CreateTriggerResponse, error) { + out := new(CreateTriggerResponse) + err := c.cc.Invoke(ctx, Proxy_CreateTrigger_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *proxyClient) SendTrigger(ctx context.Context, in *SendTriggerRequest, opts ...grpc.CallOption) (*SendTriggerResponse, error) { + out := new(SendTriggerResponse) + err := c.cc.Invoke(ctx, Proxy_SendTrigger_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// ProxyServer is the server API for Proxy service. +// All implementations must embed UnimplementedProxyServer +// for forward compatibility +type ProxyServer interface { + List(context.Context, *ListRequest) (*ListResponse, error) + CreateTrigger(context.Context, *CapabilityInfo) (*CreateTriggerResponse, error) + SendTrigger(context.Context, *SendTriggerRequest) (*SendTriggerResponse, error) + mustEmbedUnimplementedProxyServer() +} + +// UnimplementedProxyServer must be embedded to have forward compatible implementations. +type UnimplementedProxyServer struct { +} + +func (UnimplementedProxyServer) List(context.Context, *ListRequest) (*ListResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method List not implemented") +} +func (UnimplementedProxyServer) CreateTrigger(context.Context, *CapabilityInfo) (*CreateTriggerResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method CreateTrigger not implemented") +} +func (UnimplementedProxyServer) SendTrigger(context.Context, *SendTriggerRequest) (*SendTriggerResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method SendTrigger not implemented") +} +func (UnimplementedProxyServer) mustEmbedUnimplementedProxyServer() {} + +// UnsafeProxyServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to ProxyServer will +// result in compilation errors. +type UnsafeProxyServer interface { + mustEmbedUnimplementedProxyServer() +} + +func RegisterProxyServer(s grpc.ServiceRegistrar, srv ProxyServer) { + s.RegisterService(&Proxy_ServiceDesc, srv) +} + +func _Proxy_List_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ListRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ProxyServer).List(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Proxy_List_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ProxyServer).List(ctx, req.(*ListRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Proxy_CreateTrigger_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CapabilityInfo) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ProxyServer).CreateTrigger(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Proxy_CreateTrigger_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ProxyServer).CreateTrigger(ctx, req.(*CapabilityInfo)) + } + return interceptor(ctx, in, info, handler) +} + +func _Proxy_SendTrigger_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(SendTriggerRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ProxyServer).SendTrigger(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Proxy_SendTrigger_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ProxyServer).SendTrigger(ctx, req.(*SendTriggerRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// Proxy_ServiceDesc is the grpc.ServiceDesc for Proxy service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Proxy_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "proxy.Proxy", + HandlerType: (*ProxyServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "List", + Handler: _Proxy_List_Handler, + }, + { + MethodName: "CreateTrigger", + Handler: _Proxy_CreateTrigger_Handler, + }, + { + MethodName: "SendTrigger", + Handler: _Proxy_SendTrigger_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "proxy.proto", +} diff --git a/integration-tests/smoke/capabilities/proxy_test.go b/integration-tests/smoke/capabilities/proxy_test.go new file mode 100644 index 00000000000..2f938651560 --- /dev/null +++ b/integration-tests/smoke/capabilities/proxy_test.go @@ -0,0 +1,267 @@ +package capabilities_test + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/proto" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers/cron" + "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink-testing-framework/framework" + "github.com/smartcontractkit/chainlink-testing-framework/framework/components/blockchain" + ns "github.com/smartcontractkit/chainlink-testing-framework/framework/components/simple_node_set" + "github.com/smartcontractkit/chainlink-testing-framework/lib/utils/ptr" + "github.com/smartcontractkit/chainlink-testing-framework/seth" + "github.com/smartcontractkit/chainlink/integration-tests/smoke/capabilities" + + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/feeds_consumer" +) + +// Copy-paste of the OCR3 Workflow test, used for development of the proxy capability +func TestKeystoneWithOCR3WorkflowWithProxy(t *testing.T) { + testLogger := framework.L + + // we need to use double-pointers, so that what's captured in the cleanup function is a pointer, not the actual object, + // which is only set later in the test, after the cleanup function is defined + var nodes **ns.Output + var wsRPCURL *string + + // clean up is LIFO, so we need to make sure we execute the debug report transmission after logs are written down + // by function added to clean up by framework.Load() method + t.Cleanup(func() { + if t.Failed() { + if nodes == nil { + testLogger.Warn().Msg("nodeset output is nil, skipping debug report transmission") + return + } + printTestDebug(t, testLogger, *nodes, *wsRPCURL) + } + }) + + // Load test configuration + in, err := framework.Load[WorkflowTestConfig](t) + require.NoError(t, err, "couldn't load test config") + validateInputsAndEnvVars(t, in) + + pkey := os.Getenv("PRIVATE_KEY") + + // Create a new blockchain network and Seth client to interact with it + bc, err := blockchain.NewBlockchainNetwork(in.BlockchainA) + require.NoError(t, err) + + sc, err := seth.NewClientBuilder(). + WithRpcUrl(bc.Nodes[0].HostWSUrl). + WithPrivateKeys([]string{pkey}). + Build() + require.NoError(t, err, "failed to create seth client") + + // Start job distributor + jdOutput := startJobDistributor(t, in) + + // Deploy the DON + nodeOutput := startNodes(t, in, bc) + + // Prepare the chainlink/deployment environment + ctfEnv, don, chainSelector := buildChainlinkDeploymentEnv(t, jdOutput, nodeOutput, bc, sc) + + // Fund the nodes¡™¡ + fundNodes(t, don, sc) + + // Deploy keystone contracts (forwarder, capability registry, ocr3 capability) + keystoneContractSet := deployKeystoneContracts(t, testLogger, ctfEnv, chainSelector) + + // Deploy and pre-configure workflow registry contract + workflowRegistryAddr := prepareWorkflowRegistry(t, testLogger, ctfEnv, chainSelector, sc, in.WorkflowConfig.DonID) + + // Deploy and configure Keystone Feeds Consumer contract + feedsConsumerAddress := prepareFeedsConsumer(t, testLogger, ctfEnv, chainSelector, sc, keystoneContractSet.Forwarder.Address(), in.WorkflowConfig.WorkflowName) + + // Register the workflow (either via CRE CLI or by calling the workflow registry directly) + registerWorkflow(t, in, sc, keystoneContractSet.CapabilitiesRegistry.Address(), workflowRegistryAddr, feedsConsumerAddress, in.WorkflowConfig.DonID, chainSelector, in.WorkflowConfig.WorkflowName, pkey, bc.Nodes[0].HostHTTPUrl) + + // Create OCR3 and capability jobs for each node JD + ns, nodeClient := configureNodes(t, don, in, bc, keystoneContractSet.CapabilitiesRegistry.Address(), workflowRegistryAddr, keystoneContractSet.Forwarder.Address()) + // JD client needs to be reinitialised after restarting nodes + ctfEnv = ptr.Ptr(reinitialiseJDClient(t, ctfEnv, jdOutput, nodeOutput)) + createNodeJobsWithJd(t, ctfEnv, don, bc, keystoneContractSet) //TODO @george-dorin: Split this so we don't create the cron cap job + + // Log extra information that might help debugging + t.Cleanup(func() { + if t.Failed() { + logTestInfo(testLogger, in.WorkflowConfig.FeedID, in.WorkflowConfig.WorkflowName, feedsConsumerAddress.Hex(), keystoneContractSet.Forwarder.Address().Hex()) + } + }) + + //Start proxy cap job + //TODO @george-dorin: Switch to jd and don't create on bootstrap node + for i := range nodeClient { + _, _, err2 := nodeClient[i].CreateJobRaw(` + type = "standardcapabilities" + schemaVersion = 1 + externalJobID = "44fc1902-82ef-4cb6-b94f-03d7dc1617f2" + name = "proxy-capability" + forwardingAllowed = false + command = "/home/capabilities/amd64_capproxy" + config = "" + `) + require.NoError(t, err2) + } + + //TODO @george-dorin: FixME! + time.Sleep(time.Second * 10) + //We have hardcode the grpc server ports as 13300-13304 + //1. Connect to each grpc server + proxyClients := newCapProxyClient() + require.NoError(t, proxyClients.connectAll([]int{13301, 13302, 13303, 13304})) + //2. List all capabilities + for _, c := range proxyClients.Clients { + r, err2 := c.List(context.TODO(), &capabilities.ListRequest{}) + require.NoError(t, err2) + framework.L.Info().Msg(fmt.Sprintf("Cap-Proxy got List() response %+v", r.CapInfos)) + } + //3. Register as cron-trigger@1.0.0 + require.NoError(t, proxyClients.CreateTriggerCap("cron-trigger@1.0.0")) + + //4. Send data to all fake crons + payload := cron.Payload{ + ScheduledExecutionTime: time.Now().UTC().Format(time.RFC3339Nano), + ActualExecutionTime: time.Now().UTC().Format(time.RFC3339Nano), + } + wrappedPayload, err := values.WrapMap(payload) + + bytes, err := proto.Marshal(values.Proto(wrappedPayload)) + require.NoError(t, err) + require.NoError(t, proxyClients.SendTrigger("cron-trigger@1.0.0", uuid.New().String(), bytes)) + + // set variables that are needed for the cleanup function, which debugs report transmissions + nodes = &ns + wsRPCURL = &bc.Nodes[0].HostWSUrl + + // CAUTION: It is crucial to configure OCR3 jobs on nodes before configuring the workflow contracts. + // Wait for OCR listeners to be ready before setting the configuration. + // If the ConfigSet event is missed, OCR protocol will not start. + // TODO make it fluent! + testLogger.Info().Msg("Waiting 30s for OCR listeners to be ready...") + time.Sleep(30 * time.Second) + testLogger.Info().Msg("Proceeding to set OCR3 configuration.") + + // Configure the workflow DON and contracts + configureWorkflowDON(t, ctfEnv, don, chainSelector) + + // It can take a while before the first report is produced, particularly on CI. + timeout := 10 * time.Minute + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + feedsConsumerInstance, err := feeds_consumer.NewKeystoneFeedsConsumer(feedsConsumerAddress, sc.Client) + require.NoError(t, err, "failed to create feeds consumer instance") + + testLogger.Info().Msg("Waiting for feed to update...") + startTime := time.Now() + feedBytes := common.HexToHash(in.WorkflowConfig.FeedID) + + for { + select { + case <-ctx.Done(): + testLogger.Error().Msgf("feed did not update, timeout after %s", timeout) + t.FailNow() + case <-time.After(10 * time.Second): + //Send trigger message + payload = cron.Payload{ + ScheduledExecutionTime: time.Now().UTC().Format(time.RFC3339Nano), + ActualExecutionTime: time.Now().UTC().Format(time.RFC3339Nano), + } + wrappedPayload, err = values.WrapMap(payload) + + bytes, err = proto.Marshal(values.Proto(wrappedPayload)) + require.NoError(t, err) + require.NoError(t, proxyClients.SendTrigger("cron-trigger@1.0.0", uuid.New().String(), bytes)) + + elapsed := time.Since(startTime).Round(time.Second) + price, _, err := feedsConsumerInstance.GetPrice( + sc.NewCallOpts(), + feedBytes, + ) + require.NoError(t, err, "failed to get price from Keystone Consumer contract") + + if price.String() != "0" { + testLogger.Info().Msgf("Feed updated after %s - price set, price=%s", elapsed, price) + return + } + testLogger.Info().Msgf("Feed not updated yet, waiting for %s", elapsed) + } + } +} + +// TODO @george-dorin: Refactor! +type capProxy struct { + Clients []capabilities.ProxyClient + pID2CapID map[string][]string +} + +func newCapProxyClient() *capProxy { + return &capProxy{Clients: make([]capabilities.ProxyClient, 0), pID2CapID: make(map[string][]string, 0)} +} + +func (c *capProxy) connectAll(ports []int) error { + for _, p := range ports { + client, err := proxyConnectToOne(p) + if err != nil { + return err + } + c.Clients = append(c.Clients, client) + } + return nil +} + +func (c *capProxy) CreateTriggerCap(id string) error { + for _, client := range c.Clients { + r, err := client.CreateTrigger(context.TODO(), &capabilities.CapabilityInfo{ + ProxyID: "", + ID: id, + CapabilityType: capabilities.CapabilityType(1), + Description: "", + DON: nil, + IsLocal: true, + }) + if err != nil { + return err + } + c.pID2CapID[id] = append(c.pID2CapID[id], r.ProxyID) + } + return nil +} + +func (c *capProxy) SendTrigger(id string, eventID string, payload []byte) error { + for i, client := range c.Clients { + data := capabilities.SendTriggerRequest{ + ProxyID: c.pID2CapID[id][i], + EventID: eventID, + Payload: payload, + } + framework.L.Info().Msg(fmt.Sprintf("Sending trigger response to %s: %v+", c.pID2CapID[id][i], data)) + _, err := client.SendTrigger(context.TODO(), &data) + if err != nil { + return err + } + } + return nil +} + +func proxyConnectToOne(port int) (capabilities.ProxyClient, error) { + conn, err := grpc.NewClient(fmt.Sprintf("localhost:%d", port), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, err + } + client := capabilities.NewProxyClient(conn) //TODO @george-dorin: Move the proxy pb file + return client, nil +}