Add doc to create & start XDS AGL docker container.
[src/xds/xds-server.git] / lib / common / execPipeWs.go
1 package common
2
3 import (
4         "bufio"
5         "fmt"
6         "io"
7         "os"
8         "strings"
9         "time"
10
11         "syscall"
12
13         "github.com/Sirupsen/logrus"
14         "github.com/googollee/go-socket.io"
15 )
16
17 // EmitOutputCB is the function callback used to emit data
18 type EmitOutputCB func(sid string, cmdID int, stdout, stderr string, data *map[string]interface{})
19
20 // EmitExitCB is the function callback used to emit exit proc code
21 type EmitExitCB func(sid string, cmdID int, code int, err error)
22
23 // Inspired by :
24 // https://github.com/gorilla/websocket/blob/master/examples/command/main.go
25
26 // ExecPipeWs executes a command and redirect stdout/stderr into a WebSocket
27 func ExecPipeWs(cmd []string, env []string, so *socketio.Socket, sid string, cmdID int,
28         cmdExecTimeout int, log *logrus.Logger, eoCB EmitOutputCB, eeCB EmitExitCB, data *map[string]interface{}) error {
29
30         outr, outw, err := os.Pipe()
31         if err != nil {
32                 return fmt.Errorf("Pipe stdout error: " + err.Error())
33         }
34
35         // XXX - do we need to pipe stdin one day ?
36         inr, inw, err := os.Pipe()
37         if err != nil {
38                 outr.Close()
39                 outw.Close()
40                 return fmt.Errorf("Pipe stdin error: " + err.Error())
41         }
42
43         bashArgs := []string{"/bin/bash", "-c", strings.Join(cmd, " ")}
44         proc, err := os.StartProcess("/bin/bash", bashArgs, &os.ProcAttr{
45                 Files: []*os.File{inr, outw, outw},
46                 Env:   append(os.Environ(), env...),
47         })
48         if err != nil {
49                 outr.Close()
50                 outw.Close()
51                 inr.Close()
52                 inw.Close()
53                 return fmt.Errorf("Process start error: " + err.Error())
54         }
55
56         go func() {
57                 defer outr.Close()
58                 defer outw.Close()
59                 defer inr.Close()
60                 defer inw.Close()
61
62                 stdoutDone := make(chan struct{})
63                 go cmdPumpStdout(so, outr, stdoutDone, sid, cmdID, log, eoCB, data)
64
65                 // Blocking function that poll input or wait for end of process
66                 cmdPumpStdin(so, inw, proc, sid, cmdID, cmdExecTimeout, log, eeCB)
67
68                 // Some commands will exit when stdin is closed.
69                 inw.Close()
70
71                 defer outr.Close()
72
73                 if status, err := proc.Wait(); err == nil {
74                         // Other commands need a bonk on the head.
75                         if !status.Exited() {
76                                 if err := proc.Signal(os.Interrupt); err != nil {
77                                         log.Errorln("Proc interrupt:", err)
78                                 }
79
80                                 select {
81                                 case <-stdoutDone:
82                                 case <-time.After(time.Second):
83                                         // A bigger bonk on the head.
84                                         if err := proc.Signal(os.Kill); err != nil {
85                                                 log.Errorln("Proc term:", err)
86                                         }
87                                         <-stdoutDone
88                                 }
89                         }
90                 }
91         }()
92
93         return nil
94 }
95
96 func cmdPumpStdin(so *socketio.Socket, w io.Writer, proc *os.Process,
97         sid string, cmdID int, tmo int, log *logrus.Logger, exitFuncCB EmitExitCB) {
98         /* XXX - code to add to support stdin through WS
99         for {
100                 _, message, err := so. ?? ReadMessage()
101                 if err != nil {
102                         break
103                 }
104                 message = append(message, '\n')
105                 if _, err := w.Write(message); err != nil {
106                         break
107                 }
108         }
109         */
110
111         // Monitor process exit
112         type DoneChan struct {
113                 status int
114                 err    error
115         }
116         done := make(chan DoneChan, 1)
117         go func() {
118                 status := 0
119                 sts, err := proc.Wait()
120                 if !sts.Success() {
121                         s := sts.Sys().(syscall.WaitStatus)
122                         status = s.ExitStatus()
123                 }
124                 done <- DoneChan{status, err}
125         }()
126
127         // Wait cmd complete
128         select {
129         case dC := <-done:
130                 exitFuncCB(sid, cmdID, dC.status, dC.err)
131         case <-time.After(time.Duration(tmo) * time.Second):
132                 exitFuncCB(sid, cmdID, -99,
133                         fmt.Errorf("Exit Timeout for command ID %v", cmdID))
134         }
135 }
136
137 func cmdPumpStdout(so *socketio.Socket, r io.Reader, done chan struct{},
138         sid string, cmdID int, log *logrus.Logger, emitFuncCB EmitOutputCB, data *map[string]interface{}) {
139         defer func() {
140         }()
141
142         sc := bufio.NewScanner(r)
143         for sc.Scan() {
144                 emitFuncCB(sid, cmdID, string(sc.Bytes()), "", data)
145         }
146         if sc.Err() != nil {
147                 log.Errorln("scan:", sc.Err())
148         }
149         close(done)
150 }