Skip to content

Commit

Permalink
[taucorder] connect-rpc service #274 (#275)
Browse files Browse the repository at this point in the history
  • Loading branch information
samyfodil authored Dec 9, 2024
1 parent 1d84a0a commit 9de33d7
Show file tree
Hide file tree
Showing 135 changed files with 16,319 additions and 181 deletions.
29 changes: 19 additions & 10 deletions clients/http/dream/status.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,34 @@
package http

import "github.com/taubyte/tau/dream/api"
import (
"fmt"

type Status map[string]UniverseStatus
"github.com/taubyte/tau/dream"
"github.com/taubyte/tau/dream/api"
)

type UniverseStatus struct {
NodeCount int `json:"node-count"`
Nodes map[string][]string
func (c *Client) Status() (resp dream.Status, err error) {
err = c.get("/status", &resp)
if err != nil {
return
}
return
}

func (c *Client) Status() (Status, error) {
resp := make(Status)
err := c.get("/status", &resp)
func (u *Universe) Status() (resp *dream.UniverseStatus, err error) {
s, err := u.client.Status()
if err != nil {
return nil, err
}

return resp, nil
if us, ok := s[u.Name]; ok {
return &us, nil
}

return nil, fmt.Errorf("universe `%s` not found", u.Name)
}

func (u *Universe) Status() (resp api.Echart, err error) {
func (u *Universe) Chart() (resp api.Echart, err error) {
err = u.client.get("/les/miserables/"+u.Name, &resp)
return
}
Expand Down
2 changes: 1 addition & 1 deletion clients/http/dream/universes.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package http

type UniverseInfo struct {
SwarmKey []byte `json:"swarm"`
SwarmKey []byte `json:"swarm-key"`
NodeCount int `json:"node-count"`
}

Expand Down
40 changes: 33 additions & 7 deletions clients/p2p/auth/certificates.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,32 @@ func (c *Client) InjectKey(domain string, data []byte) error {
return nil
}

// Getting from /acme
func (c *Client) GetCertificate(domain string) ([]byte, error) {
func (c *Client) GetRawCertificate(domain string) ([]byte, error) {
resp, err := c.client.Send("acme", command.Body{"action": "get", "fqdn": domain}, c.peers...)
if err != nil {
return nil, fmt.Errorf("failed get certificate for %s with %v", domain, err)
}

return maps.ByteArray(resp, "certificate")
certData, err := maps.ByteArray(resp, "certificate")
if err != nil {
return nil, fmt.Errorf("failed finding certificate with %v", err)
}

return certData, nil
}

// Getting from /acme
func (c *Client) GetCertificate(domain string) (*tls.Certificate, error) {
certData, err := c.GetRawCertificate(domain)
if err != nil {
return nil, err
}

return decodeX509(certData)
}

// Getting from /static
func (c *Client) GetStaticCertificate(domain string) (*tls.Certificate, error) {
func (c *Client) GetRawStaticCertificate(domain string) ([]byte, error) {
var err error
if !strings.Contains(strings.Trim(domain, "."), ".") {
return nil, errors.New("acme/autocert: server name component count invalid")
Expand All @@ -58,7 +72,20 @@ func (c *Client) GetStaticCertificate(domain string) (*tls.Certificate, error) {
return nil, fmt.Errorf("failed finding certificate with %v", err)
}

cert := &tls.Certificate{}
return certData, nil
}

func (c *Client) GetStaticCertificate(domain string) (*tls.Certificate, error) {
certData, err := c.GetRawStaticCertificate(domain)
if err != nil {
return nil, err
}

return decodeX509(certData)
}

func decodeX509(certData []byte) (cert *tls.Certificate, err error) {
cert = &tls.Certificate{}
for {
block, rest := pem.Decode(certData)
if block == nil {
Expand All @@ -77,6 +104,5 @@ func (c *Client) GetStaticCertificate(domain string) (*tls.Certificate, error) {
}
certData = rest
}

return cert, nil
return
}
6 changes: 6 additions & 0 deletions clients/p2p/auth/projects.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func (p *Projects) New(obj map[string]interface{}) *iface.Project {
if err != nil {
return nil
}

configID, _ := maps.Int(obj, "config")
codeID, _ := maps.Int(obj, "code")
prj.Git.Config = &GithubRepository{
Expand All @@ -41,6 +42,11 @@ func (p *Projects) New(obj map[string]interface{}) *iface.Project {
return nil
}

prj.Provider, err = maps.String(obj, "provider")
if err != nil {
prj.Provider = "github"
}

return &prj
}

Expand Down
4 changes: 2 additions & 2 deletions clients/p2p/hoarder/stash.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"github.com/taubyte/tau/p2p/streams/command/response"
)

func (c *Client) Stash(cid string) (response.Response, error) {
func (c *Client) Stash(cid string, peers ...string) (response.Response, error) {
// sends to signal a peer and tells them to stash the cid
resp, err := c.Send("hoarder", command.Body{"cid": cid, "action": "stash"}, c.peers...)
resp, err := c.Send("hoarder", command.Body{"cid": cid, "peers": peers, "action": "stash"}, c.peers...)
if err != nil {
logger.Errorf("Failed stashing cid %s with: %s", cid, err.Error())
return nil, fmt.Errorf("failed calling send with: %w", err)
Expand Down
20 changes: 15 additions & 5 deletions clients/p2p/patrick/mock/patrick_struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mock

import (
"context"
"errors"
"fmt"
"testing"

Expand Down Expand Up @@ -65,8 +66,10 @@ func (s *Starfish) IsLocked(jid string) (bool, error) {

func (s *Starfish) Done(jid string, cid_log map[string]string, assetCid map[string]string) error {
job := s.Jobs[jid]
job.Logs = cid_log
job.Status = patrick.JobStatusSuccess
if job != nil {
job.Logs = cid_log
job.Status = patrick.JobStatusSuccess
}
return nil
}

Expand All @@ -79,12 +82,19 @@ func (s *Starfish) Failed(jid string, cid_log map[string]string, assetCid map[st

// added to satisfy the patrick interface
func (s *Starfish) Get(jid string) (*patrick.Job, error) {
return nil, fmt.Errorf("get not implemented")
job, ok := s.Jobs[jid]
if !ok {
return nil, errors.New("job not found")
}
return job, nil
}

// added to satisfy the patrick interface
func (s *Starfish) List() ([]string, error) {
return nil, fmt.Errorf("list not implemented")
func (s *Starfish) List() (ret []string, err error) {
for k := range s.Jobs {
ret = append(ret, k)
}
return
}

// added to satisfy the patrick interface
Expand Down
11 changes: 8 additions & 3 deletions clients/p2p/seer/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,17 @@ func (u *Usage) Get(id string) (*iface.UsageReturn, error) {
return usage, nil
}

func (u *Usage) ListServiceId(name string) (response.Response, error) {
func (u *Usage) ListServiceId(name string) ([]string, error) {
resp, err := u.client.Send("heartbeat", command.Body{"action": "listService", "name": name})
if err != nil {
logger.Error(fmt.Sprintf("List Specific for %s failed with: %s", name, err.Error()))
return nil, fmt.Errorf("calling heartbeat listService send failed with: %s", err)
return nil, fmt.Errorf("calling heartbeat listService send failed with: %w", err)
}

return resp, nil
ret, err := maps.StringArray(resp, "ids")
if err != nil {
return nil, fmt.Errorf("calling heartbeat listService failed with: %w", err)
}

return ret, nil
}
8 changes: 3 additions & 5 deletions clients/p2p/seer/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ import (
)

func TestHeartBeat(t *testing.T) {
defaultInterval := seerClient.DefaultUsageBeaconInterval
seerClient.DefaultUsageBeaconInterval = time.Second
defer func() {
seerClient.DefaultUsageBeaconInterval = defaultInterval
}()
seerClient.DefaultUsageBeaconInterval = 100 * time.Millisecond
seerClient.DefaultAnnounceBeaconInterval = 100 * time.Millisecond
seerClient.DefaultGeoBeaconInterval = 100 * time.Millisecond

u := dream.New(dream.UniverseConfig{Name: t.Name()})
defer u.Stop()
Expand Down
27 changes: 7 additions & 20 deletions clients/p2p/seer/tests/p2p_calls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@ import (
_ "github.com/taubyte/tau/services/substrate"
_ "github.com/taubyte/tau/services/tns"

seer "github.com/taubyte/tau/clients/p2p/seer"
seerClient "github.com/taubyte/tau/clients/p2p/seer"
)

func TestCalls(t *testing.T) {
defaultInterval := seer.DefaultUsageBeaconInterval
seer.DefaultUsageBeaconInterval = time.Millisecond * 100
defer func() {
seer.DefaultUsageBeaconInterval = defaultInterval
}()
seerClient.DefaultUsageBeaconInterval = 100 * time.Millisecond
seerClient.DefaultAnnounceBeaconInterval = 100 * time.Millisecond
seerClient.DefaultGeoBeaconInterval = 100 * time.Millisecond

u := dream.New(dream.UniverseConfig{Name: t.Name()})
defer u.Stop()
Expand Down Expand Up @@ -60,27 +58,16 @@ func TestCalls(t *testing.T) {
return
}

ids, err := seerClient.Usage().ListServiceId("auth")
serviceIds, err := seerClient.Usage().ListServiceId("auth")
if err != nil {
t.Error(err)
return
}

serviceIds, err := ids.Get("ids")
if err != nil {
t.Error(err)
return
}
fmt.Println("serviceIds: ", serviceIds)

serviceIds2, ok := serviceIds.([]interface{})
if !ok {
t.Errorf("serviceIds %#v is not []interface{}", nil)
return
}

if len(serviceIds2) != 2 {
t.Errorf("Expected 2 nodes got %d", len(serviceIds2))
if len(serviceIds) != 2 {
t.Errorf("Expected 2 nodes got %d", len(serviceIds))
}

}
3 changes: 0 additions & 3 deletions clients/p2p/seer/usage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,6 @@ func (u *UsageBeacon) Start() {
go func() {
var err error

// First update as soon as we start
time.Sleep(3 * time.Second)

_, err = u.usage.updateUsage(u.hostname, u.nodeId, u.clientNodeId, u.signature)
if err != nil {
u._status <- err
Expand Down
33 changes: 29 additions & 4 deletions clients/p2p/tns/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tns

import (
"context"
"errors"
"fmt"

"github.com/mitchellh/mapstructure"
Expand Down Expand Up @@ -43,16 +44,40 @@ func (c *Client) Close() {
}

/****** LIST *******/
func (c *Client) List(depth int) ([]string, error) {
func (c *Client) List(depth int) ([][]string, error) {
response, err := c.client.Send("list", command.Body{"depth": depth}, c.peers...)
if err != nil {
logger.Error(err)
return nil, err
}

keys, err := maps.StringArray(response, "keys")
if err != nil {
return nil, fmt.Errorf("failed string array in list with error: %v", err)
keysIface, ok := response["keys"]
if !ok {
return nil, errors.New("no keys found")
}

// TODO: Use generics so streams client Do() can unmarshal directly to needed type
keysCont, ok := keysIface.([]any)
if !ok {
return nil, errors.New("returned keys have wrong type")
}

keys := make([][]string, 0, len(keysCont))
for _, k := range keysCont {
kc, ok := k.([]any)
if !ok {
return nil, errors.New("returned key have wrong type")
}

key := make([]string, 0, len(kc))
for _, vc := range kc {
v, ok := vc.(string)
if !ok {
return nil, errors.New("returned leaf have wrong type")
}
key = append(key, v)
}
keys = append(keys, key)
}

return keys, nil
Expand Down
3 changes: 3 additions & 0 deletions core/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/taubyte/tau/core/kvdb"
seerIface "github.com/taubyte/tau/core/services/seer"
)

type CommonConfig struct {
Expand All @@ -20,6 +21,7 @@ type ServiceConfig struct {
PrivateKey []byte
SwarmKey []byte
Databases kvdb.Factory
Location seerIface.Location
}

type SimpleConfig struct {
Expand All @@ -35,6 +37,7 @@ func (c *ServiceConfig) Clone() *ServiceConfig {
PrivateKey: c.PrivateKey,
PublicKey: c.PublicKey,
SwarmKey: c.SwarmKey,
Location: c.Location,
}

for key, value := range c.Others {
Expand Down
6 changes: 4 additions & 2 deletions core/services/auth/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import (

type Client interface {
InjectStaticCertificate(domain string, data []byte) error
GetCertificate(domain string) ([]byte, error)
GetCertificate(domain string) (*tls.Certificate, error)
GetStaticCertificate(domain string) (*tls.Certificate, error)
GetRawCertificate(domain string) ([]byte, error)
GetRawStaticCertificate(domain string) ([]byte, error)
Hooks() Hooks
Projects() Projects
Repositories() Repositories
Stats() Stats
Stats() Stats // TODO: rename State
Peers(...peerCore.ID) Client
Close()
}
Expand Down
2 changes: 1 addition & 1 deletion core/services/hoarder/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

type Client interface {
Rare() ([]string, error)
Stash(cid string) (response.Response, error)
Stash(cid string, peers ...string) (response.Response, error)
List() ([]string, error)
Peers(...peerCore.ID) Client
Close()
Expand Down
Loading

0 comments on commit 9de33d7

Please sign in to comment.