Wait folder insync before sending exit event.
[src/xds/xds-server.git] / lib / common / execPipeWs.go
1 package common
2
3 import (
4         "bufio"
5         "fmt"
6         "io"
7         "os"
8         "strings"
9         "time"
10
11         "syscall"
12
13         "github.com/Sirupsen/logrus"
14         "github.com/googollee/go-socket.io"
15 )
16
17 // EmitOutputCB is the function callback used to emit data
18 type EmitOutputCB func(sid string, cmdID int, stdout, stderr string, data *map[string]interface{})
19
20 // EmitExitCB is the function callback used to emit exit proc code
21 type EmitExitCB func(sid string, cmdID int, code int, err error, data *map[string]interface{})
22
23 // Inspired by :
24 // https://github.com/gorilla/websocket/blob/master/examples/command/main.go
25
26 // ExecPipeWs executes a command and redirect stdout/stderr into a WebSocket
27 func ExecPipeWs(cmd []string, env []string, so *socketio.Socket, sid string, cmdID int,
28         cmdExecTimeout int, log *logrus.Logger, eoCB EmitOutputCB, eeCB EmitExitCB, data *map[string]interface{}) error {
29
30         outr, outw, err := os.Pipe()
31         if err != nil {
32                 return fmt.Errorf("Pipe stdout error: " + err.Error())
33         }
34
35         // XXX - do we need to pipe stdin one day ?
36         inr, inw, err := os.Pipe()
37         if err != nil {
38                 outr.Close()
39                 outw.Close()
40                 return fmt.Errorf("Pipe stdin error: " + err.Error())
41         }
42
43         bashArgs := []string{"/bin/bash", "-c", strings.Join(cmd, " ")}
44         proc, err := os.StartProcess("/bin/bash", bashArgs, &os.ProcAttr{
45                 Files: []*os.File{inr, outw, outw},
46                 Env:   append(os.Environ(), env...),
47         })
48         if err != nil {
49                 outr.Close()
50                 outw.Close()
51                 inr.Close()
52                 inw.Close()
53                 return fmt.Errorf("Process start error: " + err.Error())
54         }
55
56         go func() {
57                 defer outr.Close()
58                 defer outw.Close()
59                 defer inr.Close()
60                 defer inw.Close()
61
62                 stdoutDone := make(chan struct{})
63                 go cmdPumpStdout(so, outr, stdoutDone, sid, cmdID, log, eoCB, data)
64
65                 // Blocking function that poll input or wait for end of process
66                 cmdPumpStdin(so, inw, proc, sid, cmdID, cmdExecTimeout, log, eeCB, data)
67
68                 // Some commands will exit when stdin is closed.
69                 inw.Close()
70
71                 defer outr.Close()
72
73                 if status, err := proc.Wait(); err == nil {
74                         // Other commands need a bonk on the head.
75                         if !status.Exited() {
76                                 if err := proc.Signal(os.Interrupt); err != nil {
77                                         log.Errorln("Proc interrupt:", err)
78                                 }
79
80                                 select {
81                                 case <-stdoutDone:
82                                 case <-time.After(time.Second):
83                                         // A bigger bonk on the head.
84                                         if err := proc.Signal(os.Kill); err != nil {
85                                                 log.Errorln("Proc term:", err)
86                                         }
87                                         <-stdoutDone
88                                 }
89                         }
90                 }
91         }()
92
93         return nil
94 }
95
96 func cmdPumpStdin(so *socketio.Socket, w io.Writer, proc *os.Process,
97         sid string, cmdID int, tmo int, log *logrus.Logger, exitFuncCB EmitExitCB,
98         data *map[string]interface{}) {
99         /* XXX - code to add to support stdin through WS
100         for {
101                 _, message, err := so. ?? ReadMessage()
102                 if err != nil {
103                         break
104                 }
105                 message = append(message, '\n')
106                 if _, err := w.Write(message); err != nil {
107                         break
108                 }
109         }
110         */
111
112         // Monitor process exit
113         type DoneChan struct {
114                 status int
115                 err    error
116         }
117         done := make(chan DoneChan, 1)
118         go func() {
119                 status := 0
120                 sts, err := proc.Wait()
121                 if !sts.Success() {
122                         s := sts.Sys().(syscall.WaitStatus)
123                         status = s.ExitStatus()
124                 }
125                 done <- DoneChan{status, err}
126         }()
127
128         // Wait cmd complete
129         select {
130         case dC := <-done:
131                 exitFuncCB(sid, cmdID, dC.status, dC.err, data)
132         case <-time.After(time.Duration(tmo) * time.Second):
133                 exitFuncCB(sid, cmdID, -99,
134                         fmt.Errorf("Exit Timeout for command ID %v", cmdID), data)
135         }
136 }
137
138 func cmdPumpStdout(so *socketio.Socket, r io.Reader, done chan struct{},
139         sid string, cmdID int, log *logrus.Logger, emitFuncCB EmitOutputCB, data *map[string]interface{}) {
140         defer func() {
141         }()
142
143         sc := bufio.NewScanner(r)
144         for sc.Scan() {
145                 emitFuncCB(sid, cmdID, string(sc.Bytes()), "", data)
146         }
147         if sc.Err() != nil {
148                 log.Errorln("scan:", sc.Err())
149         }
150         close(done)
151 }