X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=lib%2Fagent%2Fxdsserver.go;h=3ec612380585fcf019cc609d4bf132f4a3242fb4;hb=45f6472d1e8ecad428da314a6d762143f033865d;hp=620bae9e8b5813b75b458c360579751953f59c98;hpb=7c7d90a781082c6bd22d12a5e2451ca61a5198af;p=src%2Fxds%2Fxds-agent.git diff --git a/lib/agent/xdsserver.go b/lib/agent/xdsserver.go index 620bae9..3ec6123 100644 --- a/lib/agent/xdsserver.go +++ b/lib/agent/xdsserver.go @@ -1,10 +1,26 @@ +/* + * Copyright (C) 2017 "IoT.bzh" + * Author Sebastien Douheret + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package agent import ( "encoding/json" "fmt" "io" - "io/ioutil" "net/http" "strings" "sync" @@ -42,6 +58,7 @@ type XdsServer struct { ioSock *sio_client.Client logOut io.Writer apiRouter *gin.RouterGroup + cmdList map[string]interface{} } // EventCB Event emitter callback @@ -72,16 +89,15 @@ func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer { sockEvents: make(map[string][]*caller), sockEventsLock: &sync.Mutex{}, logOut: ctx.Log.Out, + cmdList: make(map[string]interface{}), } } // Close Free and close XDS Server connection func (xs *XdsServer) Close() error { - xs.Connected = false + err := xs._Disconnected() xs.Disabled = true - xs.ioSock = nil - xs._NotifyState() - return nil + return err } // Connect Establish HTTP connection with XDS Server @@ -106,7 +122,7 @@ func (xs *XdsServer) Connect() error { time.Sleep(time.Second) } if retry == 0 { - // FIXME: re-use _reconnect to wait longer in background + // FIXME: re-use _Reconnect to wait longer in background return fmt.Errorf("Connection to XDS Server failure") } if err != nil { @@ -114,7 +130,7 @@ func (xs *XdsServer) Connect() error { } // Check HTTP connection and establish WS connection - err = xs._connect(false) + err = xs._Connect(false) return err } @@ -172,6 +188,16 @@ func (xs *XdsServer) FolderUpdate(fld *xsapiv1.FolderConfig, resFld *xsapiv1.Fol return xs.client.Put("/folders/"+fld.ID, fld, resFld) } +// CommandExec Send POST request to execute a command +func (xs *XdsServer) CommandExec(args *xsapiv1.ExecArgs, res *xsapiv1.ExecResult) error { + return xs.client.Post("/exec", args, res) +} + +// CommandSignal Send POST request to send a signal to a command +func (xs *XdsServer) CommandSignal(args *xsapiv1.ExecSignalArgs, res *xsapiv1.ExecSigResult) error { + return xs.client.Post("/signal", args, res) +} + // SetAPIRouterGroup . func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) { xs.apiRouter = r @@ -194,8 +220,7 @@ func (xs *XdsServer) PassthroughGet(url string) { // Send Get request if err := xs.client.Get(nURL, &data); err != nil { if strings.Contains(err.Error(), "connection refused") { - xs.Connected = false - xs._NotifyState() + xs._Disconnected() } common.APIError(c, err.Error()) return @@ -213,8 +238,11 @@ func (xs *XdsServer) PassthroughPost(url string) { } xs.apiRouter.POST(url, func(c *gin.Context) { - bodyReq := []byte{} - n, err := c.Request.Body.Read(bodyReq) + var err error + var data interface{} + + // Get raw body + body, err := c.GetRawData() if err != nil { common.APIError(c, err.Error()) return @@ -227,38 +255,93 @@ func (xs *XdsServer) PassthroughPost(url string) { } // Send Post request - body, err := json.Marshal(bodyReq[:n]) + response, err := xs.client.HTTPPostWithRes(nURL, string(body)) if err != nil { - common.APIError(c, err.Error()) + goto httpError + } + if response.StatusCode != 200 { + err = fmt.Errorf(response.Status) return } - - response, err := xs.client.HTTPPostWithRes(nURL, string(body)) + err = json.Unmarshal(xs.client.ResponseToBArray(response), &data) if err != nil { - common.APIError(c, err.Error()) - return + goto httpError } - bodyRes, err := ioutil.ReadAll(response.Body) + c.JSON(http.StatusOK, data) + return + + /* Handle error case */ + httpError: + if strings.Contains(err.Error(), "connection refused") { + xs._Disconnected() + } + common.APIError(c, err.Error()) + }) +} + +// PassthroughDelete Used to declare a route that sends directly a Delete request to XDS Server +func (xs *XdsServer) PassthroughDelete(url string) { + if xs.apiRouter == nil { + xs.Log.Errorf("apiRouter not set !") + return + } + + xs.apiRouter.DELETE(url, func(c *gin.Context) { + var err error + var data interface{} + + // Take care of param (eg. id in /projects/:id) + nURL := url + if strings.Contains(url, ":") { + nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL) + } + + // Send Post request + response, err := xs.client.HTTPDeleteWithRes(nURL) if err != nil { - common.APIError(c, "Cannot read response body") + goto httpError + } + if response.StatusCode != 200 { + err = fmt.Errorf(response.Status) return } - c.JSON(http.StatusOK, string(bodyRes)) + err = json.Unmarshal(xs.client.ResponseToBArray(response), &data) + if err != nil { + goto httpError + } + + c.JSON(http.StatusOK, data) + return + + /* Handle error case */ + httpError: + if strings.Contains(err.Error(), "connection refused") { + xs._Disconnected() + } + common.APIError(c, err.Error()) }) } // EventRegister Post a request to register to an XdsServer event -func (xs *XdsServer) EventRegister(evName string, id string) error { - return xs.client.Post( - "/events/register", +func (xs *XdsServer) EventRegister(evName string, filter string) error { + return xs.client.Post("/events/register", xsapiv1.EventRegisterArgs{ - Name: evName, - ProjectID: id, + Name: evName, + Filter: filter, }, nil) } +// EventEmit Emit a event to XDS Server through WS +func (xs *XdsServer) EventEmit(message string, args ...interface{}) error { + if xs.ioSock == nil { + return fmt.Errorf("Io.Socket not initialized") + } + + return xs.ioSock.Emit(message, args...) +} + // EventOn Register a callback on events reception func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) { if xs.ioSock == nil { @@ -272,31 +355,16 @@ func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uu // Register listener only the first time evn := evName - // FIXME: use generic type: data interface{} instead of data xsapiv1.EventMsg - var err error - if evName == "event:folder-state-change" { - err = xs.ioSock.On(evn, func(data xsapiv1.EventMsg) error { - xs.sockEventsLock.Lock() - sEvts := make([]*caller, len(xs.sockEvents[evn])) - copy(sEvts, xs.sockEvents[evn]) - xs.sockEventsLock.Unlock() - for _, c := range sEvts { - c.Func(c.PrivateData, data) - } - return nil - }) - } else { - err = xs.ioSock.On(evn, func(data interface{}) error { - xs.sockEventsLock.Lock() - sEvts := make([]*caller, len(xs.sockEvents[evn])) - copy(sEvts, xs.sockEvents[evn]) - xs.sockEventsLock.Unlock() - for _, c := range sEvts { - c.Func(c.PrivateData, data) - } - return nil - }) - } + err := xs.ioSock.On(evn, func(data interface{}) error { + xs.sockEventsLock.Lock() + sEvts := make([]*caller, len(xs.sockEvents[evn])) + copy(sEvts, xs.sockEvents[evn]) + xs.sockEventsLock.Unlock() + for _, c := range sEvts { + c.Func(c.PrivateData, data) + } + return nil + }) if err != nil { return uuid.Nil, err } @@ -310,6 +378,7 @@ func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uu } xs.sockEvents[evName] = append(xs.sockEvents[evName], c) + xs.LogSillyf("XS EventOn: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName])) return c.id, nil } @@ -331,6 +400,7 @@ func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error { } } } + xs.LogSillyf("XS EventOff: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName])) return nil } @@ -404,6 +474,33 @@ func (xs *XdsServer) FolderToProject(fPrj xsapiv1.FolderConfig) xaapiv1.ProjectC return pPrj } +// CommandAdd Add a new command to the list of running commands +func (xs *XdsServer) CommandAdd(cmdID string, data interface{}) error { + if xs.CommandGet(cmdID) != nil { + return fmt.Errorf("command id already exist") + } + xs.cmdList[cmdID] = data + return nil +} + +// CommandDelete Delete a command from the command list +func (xs *XdsServer) CommandDelete(cmdID string) error { + if xs.CommandGet(cmdID) == nil { + return fmt.Errorf("unknown command id") + } + delete(xs.cmdList, cmdID) + return nil +} + +// CommandGet Retrieve a command data +func (xs *XdsServer) CommandGet(cmdID string) interface{} { + d, exist := xs.cmdList[cmdID] + if exist { + return d + } + return nil +} + /*** ** Private functions ***/ @@ -437,18 +534,22 @@ func (xs *XdsServer) _CreateConnectHTTP() error { return nil } -// Re-established connection -func (xs *XdsServer) _reconnect() error { - err := xs._connect(true) +// _Reconnect Re-established connection +func (xs *XdsServer) _Reconnect() error { + err := xs._Connect(true) if err == nil { // Reload projects list for this server err = xs.projects.Init(xs) } + if err == nil { + // Register again to all events + err = xs.EventRegister(xsapiv1.EVTAll, "") + } return err } -// Established HTTP and WS connection and retrieve XDSServer config -func (xs *XdsServer) _connect(reConn bool) error { +// _Connect Established HTTP and WS connection and retrieve XDSServer config +func (xs *XdsServer) _Connect(reConn bool) error { xdsCfg := xsapiv1.APIConfig{} if err := xs.client.Get("/config", &xdsCfg); err != nil { @@ -469,8 +570,7 @@ func (xs *XdsServer) _connect(reConn bool) error { // Establish WS connection and register listen if err := xs._SocketConnect(); err != nil { - xs.Connected = false - xs._NotifyState() + xs._Disconnected() return err } @@ -479,7 +579,7 @@ func (xs *XdsServer) _connect(reConn bool) error { return nil } -// Create WebSocket (io.socket) connection +// _SocketConnect Create WebSocket (io.socket) connection func (xs *XdsServer) _SocketConnect() error { xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL) @@ -510,8 +610,7 @@ func (xs *XdsServer) _SocketConnect() error { if xs.CBOnDisconnect != nil { xs.CBOnDisconnect(err) } - xs.Connected = false - xs._NotifyState() + xs._Disconnected() // Try to reconnect during 15min (or at least while not disabled) go func() { @@ -529,7 +628,12 @@ func (xs *XdsServer) _SocketConnect() error { time.Sleep(time.Second * time.Duration(waitTime)) xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count) - xs._reconnect() + err := xs._Reconnect() + if err != nil && + !(strings.Contains(err.Error(), "dial tcp") && strings.Contains(err.Error(), "connection refused")) { + xs.Log.Errorf("ERROR while reconnecting: %v", err.Error()) + } + } }() }) @@ -542,7 +646,19 @@ func (xs *XdsServer) _SocketConnect() error { return nil } -// Send event to notify changes +// _Disconnected Set XDS Server as disconnected +func (xs *XdsServer) _Disconnected() error { + // Clear all register events as socket is closed + for k := range xs.sockEvents { + delete(xs.sockEvents, k) + } + xs.Connected = false + xs.ioSock = nil + xs._NotifyState() + return nil +} + +// _NotifyState Send event to notify changes func (xs *XdsServer) _NotifyState() { evSts := xaapiv1.ServerCfg{