2 * Copyright (C) 2017 "IoT.bzh"
3 * Author Sebastien Douheret <sebastien@iot.bzh>
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
29 "github.com/gin-gonic/gin"
30 "github.com/iotbzh/xds-agent/lib/xaapiv1"
31 "github.com/iotbzh/xds-agent/lib/xdsconfig"
32 common "github.com/iotbzh/xds-common/golib"
33 "github.com/iotbzh/xds-server/lib/xsapiv1"
34 uuid "github.com/satori/go.uuid"
35 sio_client "github.com/sebd71/go-socket.io-client"
39 type XdsServer struct {
48 ServerConfig *xsapiv1.APIConfig
52 CBOnDisconnect func(error)
53 sockEvents map[string][]*caller
54 sockEventsLock *sync.Mutex
57 client *common.HTTPClient
58 ioSock *sio_client.Client
60 apiRouter *gin.RouterGroup
61 cmdList map[string]interface{}
64 // EventCB Event emitter callback
65 type EventCB func(privData interface{}, evtData interface{}) error
67 // caller Used to chain event listeners
72 PrivateData interface{}
75 const _IDTempoPrefix = "tempo-"
77 // NewXdsServer creates an instance of XdsServer
78 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
81 ID: _IDTempoPrefix + uuid.NewV1().String(),
83 APIURL: conf.APIBaseURL + conf.APIPartialURL,
84 PartialURL: conf.APIPartialURL,
85 ConnRetry: conf.ConnRetry,
89 sockEvents: make(map[string][]*caller),
90 sockEventsLock: &sync.Mutex{},
92 cmdList: make(map[string]interface{}),
96 // Close Free and close XDS Server connection
97 func (xs *XdsServer) Close() error {
98 err := xs._Disconnected()
103 // Connect Establish HTTP connection with XDS Server
104 func (xs *XdsServer) Connect() error {
112 for retry = xs.ConnRetry; retry > 0; retry-- {
113 if err = xs._CreateConnectHTTP(); err == nil {
116 if retry == xs.ConnRetry {
117 // Notify only on the first conn error
118 // doing that avoids 2 notifications (conn false; conn true) on startup
121 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
122 time.Sleep(time.Second)
125 // FIXME: re-use _Reconnect to wait longer in background
126 return fmt.Errorf("Connection to XDS Server failure")
132 // Check HTTP connection and establish WS connection
133 err = xs._Connect(false)
138 // IsTempoID returns true when the server has a temporary id
139 func (xs *XdsServer) IsTempoID() bool {
140 return strings.HasPrefix(xs.ID, _IDTempoPrefix)
143 // SetLoggerOutput Set logger output
144 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
148 // SendCommand Send a command to XDS Server
149 func (xs *XdsServer) SendCommand(cmd string, body []byte, res interface{}) error {
151 if !strings.HasPrefix("/", cmd) {
154 return xs.client.Post(url, string(body), res)
157 // GetVersion Send Get request to retrieve XDS Server version
158 func (xs *XdsServer) GetVersion(res interface{}) error {
159 return xs.client.Get("/version", &res)
162 // GetFolders Send GET request to get current folder configuration
163 func (xs *XdsServer) GetFolders(folders *[]xsapiv1.FolderConfig) error {
164 return xs.client.Get("/folders", folders)
167 // FolderAdd Send POST request to add a folder
168 func (xs *XdsServer) FolderAdd(fld *xsapiv1.FolderConfig, res interface{}) error {
169 err := xs.client.Post("/folders", fld, res)
171 return fmt.Errorf("FolderAdd error: %s", err.Error())
176 // FolderDelete Send DELETE request to delete a folder
177 func (xs *XdsServer) FolderDelete(id string) error {
178 return xs.client.HTTPDelete("/folders/" + id)
181 // FolderSync Send POST request to force synchronization of a folder
182 func (xs *XdsServer) FolderSync(id string) error {
183 return xs.client.HTTPPost("/folders/sync/"+id, "")
186 // FolderUpdate Send PUT request to update a folder
187 func (xs *XdsServer) FolderUpdate(fld *xsapiv1.FolderConfig, resFld *xsapiv1.FolderConfig) error {
188 return xs.client.Put("/folders/"+fld.ID, fld, resFld)
191 // CommandExec Send POST request to execute a command
192 func (xs *XdsServer) CommandExec(args *xsapiv1.ExecArgs, res *xsapiv1.ExecResult) error {
193 return xs.client.Post("/exec", args, res)
196 // CommandSignal Send POST request to send a signal to a command
197 func (xs *XdsServer) CommandSignal(args *xsapiv1.ExecSignalArgs, res *xsapiv1.ExecSigResult) error {
198 return xs.client.Post("/signal", args, res)
201 // SetAPIRouterGroup .
202 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
206 // PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
207 func (xs *XdsServer) PassthroughGet(url string) {
208 if xs.apiRouter == nil {
209 xs.Log.Errorf("apiRouter not set !")
213 xs.apiRouter.GET(url, func(c *gin.Context) {
215 // Take care of param (eg. id in /projects/:id)
217 if strings.Contains(url, ":") {
218 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
221 if err := xs.client.Get(nURL, &data); err != nil {
222 if strings.Contains(err.Error(), "connection refused") {
225 common.APIError(c, err.Error())
229 c.JSON(http.StatusOK, data)
233 // PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
234 func (xs *XdsServer) PassthroughPost(url string) {
235 if xs.apiRouter == nil {
236 xs.Log.Errorf("apiRouter not set !")
240 xs.apiRouter.POST(url, func(c *gin.Context) {
245 body, err := c.GetRawData()
247 common.APIError(c, err.Error())
251 // Take care of param (eg. id in /projects/:id)
253 if strings.Contains(url, ":") {
254 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
258 response, err := xs.client.HTTPPostWithRes(nURL, string(body))
262 if response.StatusCode != 200 {
263 err = fmt.Errorf(response.Status)
266 err = json.Unmarshal(xs.client.ResponseToBArray(response), &data)
271 c.JSON(http.StatusOK, data)
274 /* Handle error case */
276 if strings.Contains(err.Error(), "connection refused") {
279 common.APIError(c, err.Error())
283 // PassthroughDelete Used to declare a route that sends directly a Delete request to XDS Server
284 func (xs *XdsServer) PassthroughDelete(url string) {
285 if xs.apiRouter == nil {
286 xs.Log.Errorf("apiRouter not set !")
290 xs.apiRouter.DELETE(url, func(c *gin.Context) {
294 // Take care of param (eg. id in /projects/:id)
296 if strings.Contains(url, ":") {
297 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
301 response, err := xs.client.HTTPDeleteWithRes(nURL)
305 if response.StatusCode != 200 {
306 err = fmt.Errorf(response.Status)
309 err = json.Unmarshal(xs.client.ResponseToBArray(response), &data)
314 c.JSON(http.StatusOK, data)
317 /* Handle error case */
319 if strings.Contains(err.Error(), "connection refused") {
322 common.APIError(c, err.Error())
326 // EventRegister Post a request to register to an XdsServer event
327 func (xs *XdsServer) EventRegister(evName string, filter string) error {
328 return xs.client.Post("/events/register",
329 xsapiv1.EventRegisterArgs{
336 // EventEmit Emit an event to XDS Server through WS
337 func (xs *XdsServer) EventEmit(message string, args ...interface{}) error {
338 if xs.ioSock == nil {
339 return fmt.Errorf("Io.Socket not initialized")
342 return xs.ioSock.Emit(message, args...)
345 // EventOn Register a callback on events reception
346 func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
347 if xs.ioSock == nil {
348 return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
351 xs.sockEventsLock.Lock()
352 defer xs.sockEventsLock.Unlock()
354 if _, exist := xs.sockEvents[evName]; !exist {
355 // Register listener only the first time
358 err := xs.ioSock.On(evn, func(data interface{}) error {
359 xs.sockEventsLock.Lock()
360 sEvts := make([]*caller, len(xs.sockEvents[evn]))
361 copy(sEvts, xs.sockEvents[evn])
362 xs.sockEventsLock.Unlock()
363 for _, c := range sEvts {
364 c.Func(c.PrivateData, data)
377 PrivateData: privData,
380 xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
381 xs.LogSillyf("XS EventOn: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
385 // EventOff Un-register one (or all) callback(s) associated with an event
386 func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
387 xs.sockEventsLock.Lock()
388 defer xs.sockEventsLock.Unlock()
389 if _, exist := xs.sockEvents[evName]; exist {
392 xs.sockEvents[evName] = []*caller{}
394 // Un-register only the specified callback
395 for i, ff := range xs.sockEvents[evName] {
396 if uuid.Equal(ff.id, id) {
397 xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
403 xs.LogSillyf("XS EventOff: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
407 // ProjectToFolder Convert Project structure to Folder structure
408 func (xs *XdsServer) ProjectToFolder(pPrj xaapiv1.ProjectConfig) *xsapiv1.FolderConfig {
410 if pPrj.Type == xsapiv1.TypeCloudSync {
411 stID, _ = xs.SThg.IDGet()
413 // TODO: limit ClientData size and gzip it (see https://golang.org/pkg/compress/gzip/)
414 fPrj := xsapiv1.FolderConfig{
417 ClientPath: pPrj.ClientPath,
418 Type: xsapiv1.FolderType(pPrj.Type),
420 IsInSync: pPrj.IsInSync,
421 DefaultSdk: pPrj.DefaultSdk,
422 ClientData: pPrj.ClientData,
423 DataPathMap: xsapiv1.PathMapConfig{
424 ServerPath: pPrj.ServerPath,
426 DataCloudSync: xsapiv1.CloudSyncConfig{
428 STLocIsInSync: pPrj.IsInSync,
429 STLocStatus: pPrj.Status,
430 STSvrIsInSync: pPrj.IsInSync,
431 STSvrStatus: pPrj.Status,
438 // FolderToProject Convert Folder structure to Project structure
439 func (xs *XdsServer) FolderToProject(fPrj xsapiv1.FolderConfig) xaapiv1.ProjectConfig {
440 inSync := fPrj.IsInSync
443 if fPrj.Type == xsapiv1.TypeCloudSync {
444 inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
446 sts = fPrj.DataCloudSync.STSvrStatus
447 switch fPrj.DataCloudSync.STLocStatus {
448 case xaapiv1.StatusErrorConfig, xaapiv1.StatusDisable, xaapiv1.StatusPause:
449 sts = fPrj.DataCloudSync.STLocStatus
451 case xaapiv1.StatusSyncing:
452 if sts != xaapiv1.StatusErrorConfig && sts != xaapiv1.StatusDisable && sts != xaapiv1.StatusPause {
453 sts = xaapiv1.StatusSyncing
456 case xaapiv1.StatusEnable:
462 pPrj := xaapiv1.ProjectConfig{
466 ClientPath: fPrj.ClientPath,
467 ServerPath: fPrj.DataPathMap.ServerPath,
468 Type: xaapiv1.ProjectType(fPrj.Type),
471 DefaultSdk: fPrj.DefaultSdk,
472 ClientData: fPrj.ClientData,
477 // CommandAdd Add a new command to the list of running commands
478 func (xs *XdsServer) CommandAdd(cmdID string, data interface{}) error {
479 if xs.CommandGet(cmdID) != nil {
480 return fmt.Errorf("command id already exist")
482 xs.cmdList[cmdID] = data
486 // CommandDelete Delete a command from the command list
487 func (xs *XdsServer) CommandDelete(cmdID string) error {
488 if xs.CommandGet(cmdID) == nil {
489 return fmt.Errorf("unknown command id")
491 delete(xs.cmdList, cmdID)
495 // CommandGet Retrieve a command data
496 func (xs *XdsServer) CommandGet(cmdID string) interface{} {
497 d, exist := xs.cmdList[cmdID]
508 // Create HTTP client
509 func (xs *XdsServer) _CreateConnectHTTP() error {
511 xs.client, err = common.HTTPNewClient(xs.BaseURL,
512 common.HTTPClientConfig{
513 URLPrefix: "/api/v1",
514 HeaderClientKeyName: "Xds-Sid",
517 LogPrefix: "XDSSERVER: ",
518 LogLevel: common.HTTPLogLevelWarning,
521 xs.client.SetLogLevel(xs.Log.Level.String())
524 msg := ": " + err.Error()
525 if strings.Contains(err.Error(), "connection refused") {
526 msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
528 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
530 if xs.client == nil {
531 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
537 // _Reconnect Re-establish connection
538 func (xs *XdsServer) _Reconnect() error {
539 err := xs._Connect(true)
541 // Reload projects list for this server
542 err = xs.projects.Init(xs)
545 // Register again to all events
546 err = xs.EventRegister(xsapiv1.EVTAll, "")
551 // _Connect Establish HTTP and WS connection and retrieve XDSServer config
552 func (xs *XdsServer) _Connect(reConn bool) error {
554 xdsCfg := xsapiv1.APIConfig{}
555 if err := xs.client.Get("/config", &xdsCfg); err != nil {
563 if reConn && xs.ID != xdsCfg.ServerUID {
564 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ServerUID)
567 // Update local XDS config
568 xs.ID = xdsCfg.ServerUID
569 xs.ServerConfig = &xdsCfg
571 // Establish WS connection and register listeners
572 if err := xs._SocketConnect(); err != nil {
582 // _SocketConnect Create WebSocket (io.socket) connection
583 func (xs *XdsServer) _SocketConnect() error {
585 xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
587 opts := &sio_client.Options{
588 Transport: "websocket",
589 Header: make(map[string][]string),
591 opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
593 iosk, err := sio_client.NewClient(xs.BaseURL, opts)
595 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
599 // Register some listeners
601 iosk.On("error", func(err error) {
602 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
603 if xs.CBOnError != nil {
608 iosk.On("disconnection", func(err error) {
609 xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
610 if xs.CBOnDisconnect != nil {
611 xs.CBOnDisconnect(err)
615 // Try to reconnect during 15min (or at least while not disabled)
619 for !xs.Disabled && !xs.Connected {
624 if waitTime > 15*60 {
625 xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
628 time.Sleep(time.Second * time.Duration(waitTime))
629 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
631 err := xs._Reconnect()
633 !(strings.Contains(err.Error(), "dial tcp") && strings.Contains(err.Error(), "connection refused")) {
634 xs.Log.Errorf("ERROR while reconnecting: %v", err.Error())
641 // XXX - There is no connection event generated so, just consider that
642 // we are connected when NewClient return successfully
643 /* iosk.On("connection", func() { ... }) */
644 xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
649 // _Disconnected Set XDS Server as disconnected
650 func (xs *XdsServer) _Disconnected() error {
651 // Clear all registered events as socket is closed
652 for k := range xs.sockEvents {
653 delete(xs.sockEvents, k)
661 // _NotifyState Send event to notify changes
662 func (xs *XdsServer) _NotifyState() {
664 evSts := xaapiv1.ServerCfg{
668 PartialURL: xs.PartialURL,
669 ConnRetry: xs.ConnRetry,
670 Connected: xs.Connected,
672 if err := xs.events.Emit(xaapiv1.EVTServerConfig, evSts, ""); err != nil {
673 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)