-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcommand.go
123 lines (103 loc) · 3.01 KB
/
command.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package main
import (
"bufio"
"bytes"
"fmt"
"io"
"os"
"os/exec"
"strings"
"syscall"
"text/template"
"github.com/nsqio/go-nsq"
)
// byLineCopy reads data from the pipe, prepends every line with a given prefix string
// and writes results to the Writer (e.g. stdout/stderr/file).
func byLineCopy(prefix string, sink io.Writer, pipe io.Reader) {
wg.Add(1)
defer wg.Done()
prefix += " "
buf := []byte(prefix)
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
buf := buf[:len(prefix)]
buf = append(buf, scanner.Bytes()...)
buf = append(buf, '\n')
// this is safe to write a single line to stdout/stderr without
// additional locking from multiple goroutines as OS guarantees
// those writes are atomic (for stdout/stderr only)
sink.Write(buf)
}
if err := scanner.Err(); err != nil {
switch err.(type) {
case *os.PathError:
default:
Logger.Println("string scanner error:", err)
}
}
}
// executeCommand takes a command template, a payload from the NSQ message,
// merges them and trying to execute a resulting command.
//
// A payload data is also passed to the stdin of the executing command.
func executeCommand(cmdTpl, prefix string, data map[string]interface{}, msg *nsq.Message, envs []string) (int, error) {
t := template.Must(template.New("cmd").Parse(cmdTpl))
var cmdB bytes.Buffer
if err := t.Execute(&cmdB, data); err != nil {
return -1, err
}
cmdArgs := strings.Fields(cmdB.String())
cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...)
cmd.SysProcAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGTERM}
cmd.Stdin = bytes.NewReader(append(msg.Body, '\n'))
stdoutPipe, err := cmd.StdoutPipe()
if err != nil {
return -1, fmt.Errorf("cannot create stdout pipe: %s", err)
}
stderrPipe, err := cmd.StderrPipe()
if err != nil {
return -1, fmt.Errorf("cannot create stderr pipe: %s", err)
}
go byLineCopy(prefix, os.Stdout, stdoutPipe)
go byLineCopy(prefix, os.Stderr, stderrPipe)
cmd.Env = append(os.Environ(), envs...)
if err := cmd.Start(); err != nil {
return -1, err
}
if err := cmd.Wait(); err != nil {
var exitCode int
if exiterr, ok := err.(*exec.ExitError); ok {
status := exiterr.Sys().(syscall.WaitStatus)
switch {
case status.Exited():
exitCode = status.ExitStatus()
case status.Signaled():
exitCode = 128 + int(status.Signal())
}
if exitCode == 100 {
return exitCode, NewRequeueError()
}
}
return exitCode, err
}
return 0, nil
}
// RequeueError means that the worker command should be restarted
// if it allows the maximum number of attempts specified for this topic.
type RequeueError struct {
}
// NewRequeueError returns a new instance of error.
func NewRequeueError() error {
return &RequeueError{}
}
// Error returns a string representation of error.
func (e *RequeueError) Error() string {
return fmt.Sprintf("re-queue needed")
}
// IsRequeueError returns a boolean indicating whether the type of error is RequeueError.
func IsRequeueError(err error) bool {
if _, ok := err.(*RequeueError); ok {
return true
}
return false
}