Skip to content

Commit

Permalink
- stdio client never was completed, remove
Browse files Browse the repository at this point in the history
- remove tcp mode, we either do over http or over ssh/stdio
  • Loading branch information
poolpOrg committed Feb 2, 2025
1 parent b08ecf1 commit 3858701
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 144 deletions.
21 changes: 14 additions & 7 deletions cmd/plakar/subcommands/stdio/stdio.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,29 @@ import (
"flag"

"github.com/PlakarKorp/plakar/appcontext"
"github.com/PlakarKorp/plakar/cmd/plakar/subcommands"
"github.com/PlakarKorp/plakar/repository"
"github.com/PlakarKorp/plakar/server/plakard"
)

func cmd_stdio(ctx *appcontext.AppContext, args []string) (int, error) {
_ = ctx
func init() {
subcommands.Register("stdio", cmd_stdio)
}

var noDelete bool
func cmd_stdio(ctx *appcontext.AppContext, _ *repository.Repository, args []string) (int, error) {
var opt_allowdelete bool

flags := flag.NewFlagSet("stdio", flag.ExitOnError)
flags.BoolVar(&noDelete, "no-delete", false, "disable delete operations")
flags.BoolVar(&opt_allowdelete, "allow-delete", false, "disable delete operations")
flags.Parse(args)

options := &plakard.ServerOptions{
NoDelete: noDelete,
noDelete := true
if opt_allowdelete {
noDelete = false
}
if err := plakard.Stdio(ctx, options); err != nil {
if err := plakard.Stdio(ctx, &plakard.ServerOptions{
NoDelete: noDelete,
}); err != nil {
return 1, err
}
return 0, nil
Expand Down
139 changes: 2 additions & 137 deletions storage/backends/plakard/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,9 @@ import (
"encoding/gob"
"fmt"
"io"
"log"
"net"
"net/url"
"os"
"os/exec"
"strings"

"github.com/PlakarKorp/plakar/network"
"github.com/PlakarKorp/plakar/objects"
Expand Down Expand Up @@ -69,125 +66,18 @@ func (repo *Repository) Location() string {
func (repository *Repository) connect(location *url.URL) error {
scheme := location.Scheme
switch scheme {
case "tcp":
err := repository.connectTCP(location)
if err != nil {
return err
}
case "ssh":
err := repository.connectSSH(location)
if err != nil {
return err
}
case "stdio":
err := repository.connectStdio()
if err != nil {
return err
}
default:
return fmt.Errorf("unsupported protocol: %s", scheme)
}

return nil
}

func (repository *Repository) connectTCP(location *url.URL) error {
port := location.Port()
if port == "" {
port = "9876"
}

tcpAddr, err := net.ResolveTCPAddr("tcp", location.Hostname()+":"+port)
if err != nil {
log.Fatal(err)
}

conn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
log.Fatal(err)
}

repository.encoder = gob.NewEncoder(conn)
repository.decoder = gob.NewDecoder(conn)

repository.inflightRequests = make(map[uuid.UUID]chan network.Request)
repository.notifications = make(chan network.Request)

//repository.maxConcurrentRequest = make(chan bool, 1024)

go func() {
for m := range repository.notifications {
repository.mu.Lock()
notify := repository.inflightRequests[m.Uuid]
repository.mu.Unlock()
notify <- m
}
}()

go func() {
for {
result := network.Request{}
err = repository.decoder.Decode(&result)
if err != nil {
conn.Close()
return
}
repository.notifications <- result
}
}()

return err
}

func (repository *Repository) connectStdio() error {
subProcess := exec.Command("plakar", "stdio")

stdin, err := subProcess.StdinPipe()
if err != nil {
return err
}

stdout, err := subProcess.StdoutPipe()
if err != nil {
return err
}
subProcess.Stderr = os.Stderr

repository.encoder = gob.NewEncoder(stdin)
repository.decoder = gob.NewDecoder(stdout)

if err = subProcess.Start(); err != nil {
return err
}

repository.inflightRequests = make(map[uuid.UUID]chan network.Request)
repository.notifications = make(chan network.Request)

go func() {
for m := range repository.notifications {
repository.mu.Lock()
notify := repository.inflightRequests[m.Uuid]
repository.mu.Unlock()
notify <- m
}
}()

go func() {
for {
result := network.Request{}
err = repository.decoder.Decode(&result)
if err != nil {
stdin.Close()
subProcess.Wait()
return
}
repository.notifications <- result
}
}()

return nil
}

func (repository *Repository) connectSSH(location *url.URL) error {
connectUrl := "ssh://"
if location.User != nil {
Expand Down Expand Up @@ -280,21 +170,11 @@ func (repository *Repository) sendRequest(Type string, Payload interface{}) (*ne
}

func (repository *Repository) Create(location string, config storage.Configuration) error {
isTcp := false
if strings.HasPrefix(location, "tcp://") {
isTcp = true
location = "ssh://" + location[6:]
}

parsed, err := giturls.Parse(location)
if err != nil {
return err
}

if isTcp {
parsed.Scheme = "tcp"
}

err = repository.connect(parsed)
if err != nil {
return err
Expand All @@ -317,22 +197,11 @@ func (repository *Repository) Create(location string, config storage.Configurati
}

func (repository *Repository) Open(location string) error {

isTcp := false
if strings.HasPrefix(location, "tcp://") {
isTcp = true
location = "ssh://" + location[6:]
}

parsed, err := giturls.Parse(location)
if err != nil {
return err
}

if isTcp {
parsed.Scheme = "tcp"
}

err = repository.connect(parsed)
if err != nil {
return err
Expand Down Expand Up @@ -380,9 +249,7 @@ func (repository *Repository) GetStates() ([]objects.Checksum, error) {
}

ret := make([]objects.Checksum, len(result.Payload.(network.ResGetStates).Checksums))
for i, checksum := range result.Payload.(network.ResGetStates).Checksums {
ret[i] = checksum
}
copy(ret, result.Payload.(network.ResGetStates).Checksums)
return ret, nil
}

Expand Down Expand Up @@ -445,9 +312,7 @@ func (repository *Repository) GetPackfiles() ([]objects.Checksum, error) {
}

ret := make([]objects.Checksum, len(result.Payload.(network.ResGetPackfiles).Checksums))
for i, checksum := range result.Payload.(network.ResGetPackfiles).Checksums {
ret[i] = checksum
}
copy(ret, result.Payload.(network.ResGetPackfiles).Checksums)
return ret, nil
}

Expand Down

0 comments on commit 3858701

Please sign in to comment.