Initial main commit.
[src/xds/xds-server.git] / lib / common / execPipeWs.go
1 package common
2
3 import (
4         "bufio"
5         "fmt"
6         "io"
7         "os"
8         "time"
9
10         "syscall"
11
12         "github.com/Sirupsen/logrus"
13         "github.com/googollee/go-socket.io"
14 )
15
16 // EmitOutputCB is the function callback used to emit data
17 type EmitOutputCB func(sid string, cmdID int, stdout, stderr string)
18
19 // EmitExitCB is the function callback used to emit exit proc code
20 type EmitExitCB func(sid string, cmdID int, code int, err error)
21
22 // Inspired by :
23 // https://github.com/gorilla/websocket/blob/master/examples/command/main.go
24
25 // ExecPipeWs executes a command and redirect stdout/stderr into a WebSocket
26 func ExecPipeWs(cmd string, so *socketio.Socket, sid string, cmdID int,
27         cmdExecTimeout int, log *logrus.Logger, eoCB EmitOutputCB, eeCB EmitExitCB) error {
28
29         outr, outw, err := os.Pipe()
30         if err != nil {
31                 return fmt.Errorf("Pipe stdout error: " + err.Error())
32         }
33
34         // XXX - do we need to pipe stdin one day ?
35         inr, inw, err := os.Pipe()
36         if err != nil {
37                 outr.Close()
38                 outw.Close()
39                 return fmt.Errorf("Pipe stdin error: " + err.Error())
40         }
41
42         bashArgs := []string{"/bin/bash", "-c", cmd}
43         proc, err := os.StartProcess("/bin/bash", bashArgs, &os.ProcAttr{
44                 Files: []*os.File{inr, outw, outw},
45         })
46         if err != nil {
47                 outr.Close()
48                 outw.Close()
49                 inr.Close()
50                 inw.Close()
51                 return fmt.Errorf("Process start error: " + err.Error())
52         }
53
54         go func() {
55                 defer outr.Close()
56                 defer outw.Close()
57                 defer inr.Close()
58                 defer inw.Close()
59
60                 stdoutDone := make(chan struct{})
61                 go cmdPumpStdout(so, outr, stdoutDone, sid, cmdID, log, eoCB)
62
63                 // Blocking function that poll input or wait for end of process
64                 cmdPumpStdin(so, inw, proc, sid, cmdID, cmdExecTimeout, log, eeCB)
65
66                 // Some commands will exit when stdin is closed.
67                 inw.Close()
68
69                 defer outr.Close()
70
71                 if status, err := proc.Wait(); err == nil {
72                         // Other commands need a bonk on the head.
73                         if !status.Exited() {
74                                 if err := proc.Signal(os.Interrupt); err != nil {
75                                         log.Errorln("Proc interrupt:", err)
76                                 }
77
78                                 select {
79                                 case <-stdoutDone:
80                                 case <-time.After(time.Second):
81                                         // A bigger bonk on the head.
82                                         if err := proc.Signal(os.Kill); err != nil {
83                                                 log.Errorln("Proc term:", err)
84                                         }
85                                         <-stdoutDone
86                                 }
87                         }
88                 }
89         }()
90
91         return nil
92 }
93
94 func cmdPumpStdin(so *socketio.Socket, w io.Writer, proc *os.Process,
95         sid string, cmdID int, tmo int, log *logrus.Logger, exitFuncCB EmitExitCB) {
96         /* XXX - code to add to support stdin through WS
97         for {
98                 _, message, err := so. ?? ReadMessage()
99                 if err != nil {
100                         break
101                 }
102                 message = append(message, '\n')
103                 if _, err := w.Write(message); err != nil {
104                         break
105                 }
106         }
107         */
108
109         // Monitor process exit
110         type DoneChan struct {
111                 status int
112                 err    error
113         }
114         done := make(chan DoneChan, 1)
115         go func() {
116                 status := 0
117                 sts, err := proc.Wait()
118                 if !sts.Success() {
119                         s := sts.Sys().(syscall.WaitStatus)
120                         status = s.ExitStatus()
121                 }
122                 done <- DoneChan{status, err}
123         }()
124
125         // Wait cmd complete
126         select {
127         case dC := <-done:
128                 exitFuncCB(sid, cmdID, dC.status, dC.err)
129         case <-time.After(time.Duration(tmo) * time.Second):
130                 exitFuncCB(sid, cmdID, -99,
131                         fmt.Errorf("Exit Timeout for command ID %v", cmdID))
132         }
133 }
134
135 func cmdPumpStdout(so *socketio.Socket, r io.Reader, done chan struct{},
136         sid string, cmdID int, log *logrus.Logger, emitFuncCB EmitOutputCB) {
137         defer func() {
138         }()
139
140         sc := bufio.NewScanner(r)
141         for sc.Scan() {
142                 emitFuncCB(sid, cmdID, string(sc.Bytes()), "")
143         }
144         if sc.Err() != nil {
145                 log.Errorln("scan:", sc.Err())
146         }
147         close(done)
148 }