2 * Copyright (C) 2017 "IoT.bzh"
3 * Author Sebastien Douheret <sebastien@iot.bzh>
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
30 "github.com/gin-gonic/gin"
31 "github.com/iotbzh/xds-agent/lib/xaapiv1"
32 "github.com/iotbzh/xds-agent/lib/xdsconfig"
33 common "github.com/iotbzh/xds-common/golib"
34 "github.com/iotbzh/xds-server/lib/xsapiv1"
35 uuid "github.com/satori/go.uuid"
36 sio_client "github.com/sebd71/go-socket.io-client"
40 type XdsServer struct {
49 ServerConfig *xsapiv1.APIConfig
53 CBOnDisconnect func(error)
54 sockEvents map[string][]*caller
55 sockEventsLock *sync.Mutex
58 client *common.HTTPClient
59 ioSock *sio_client.Client
61 apiRouter *gin.RouterGroup
62 cmdList map[string]interface{}
65 // EventCB Event emitter callback
66 type EventCB func(privData interface{}, evtData interface{}) error
68 // caller Used to chain event listeners
73 PrivateData interface{}
76 const _IDTempoPrefix = "tempo-"
78 // NewXdsServer creates an instance of XdsServer
79 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
82 ID: _IDTempoPrefix + uuid.NewV1().String(),
84 APIURL: conf.APIBaseURL + conf.APIPartialURL,
85 PartialURL: conf.APIPartialURL,
86 ConnRetry: conf.ConnRetry,
90 sockEvents: make(map[string][]*caller),
91 sockEventsLock: &sync.Mutex{},
93 cmdList: make(map[string]interface{}),
97 // Close Free and close XDS Server connection
98 func (xs *XdsServer) Close() error {
99 err := xs._Disconnected()
104 // Connect Establish HTTP connection with XDS Server
105 func (xs *XdsServer) Connect() error {
113 for retry = xs.ConnRetry; retry > 0; retry-- {
114 if err = xs._CreateConnectHTTP(); err == nil {
117 if retry == xs.ConnRetry {
118 // Notify only on the first conn error
119 // doing that avoid 2 notifs (conn false; conn true) on startup
122 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
123 time.Sleep(time.Second)
126 // FIXME: re-use _Reconnect to wait longer in background
127 return fmt.Errorf("Connection to XDS Server failure")
133 // Check HTTP connection and establish WS connection
134 err = xs._Connect(false)
139 // IsTempoID returns true when server as a temporary id
140 func (xs *XdsServer) IsTempoID() bool {
141 return strings.HasPrefix(xs.ID, _IDTempoPrefix)
144 // SetLoggerOutput Set logger output
145 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
149 // SendCommand Send a command to XDS Server
150 func (xs *XdsServer) SendCommand(cmd string, body []byte, res interface{}) error {
152 if !strings.HasPrefix("/", cmd) {
155 return xs.client.Post(url, string(body), res)
158 // GetVersion Send Get request to retrieve XDS Server version
159 func (xs *XdsServer) GetVersion(res interface{}) error {
160 return xs.client.Get("/version", &res)
163 // GetFolders Send GET request to get current folder configuration
164 func (xs *XdsServer) GetFolders(folders *[]xsapiv1.FolderConfig) error {
165 return xs.client.Get("/folders", folders)
168 // FolderAdd Send POST request to add a folder
169 func (xs *XdsServer) FolderAdd(fld *xsapiv1.FolderConfig, res interface{}) error {
170 err := xs.client.Post("/folders", fld, res)
172 return fmt.Errorf("FolderAdd error: %s", err.Error())
177 // FolderDelete Send DELETE request to delete a folder
178 func (xs *XdsServer) FolderDelete(id string) error {
179 return xs.client.HTTPDelete("/folders/" + id)
182 // FolderSync Send POST request to force synchronization of a folder
183 func (xs *XdsServer) FolderSync(id string) error {
184 return xs.client.HTTPPost("/folders/sync/"+id, "")
187 // FolderUpdate Send PUT request to update a folder
188 func (xs *XdsServer) FolderUpdate(fld *xsapiv1.FolderConfig, resFld *xsapiv1.FolderConfig) error {
189 return xs.client.Put("/folders/"+fld.ID, fld, resFld)
192 // CommandExec Send POST request to execute a command
193 func (xs *XdsServer) CommandExec(args *xsapiv1.ExecArgs, res *xsapiv1.ExecResult) error {
194 return xs.client.Post("/exec", args, res)
197 // CommandSignal Send POST request to send a signal to a command
198 func (xs *XdsServer) CommandSignal(args *xsapiv1.ExecSignalArgs, res *xsapiv1.ExecSigResult) error {
199 return xs.client.Post("/signal", args, res)
202 // SetAPIRouterGroup Set the gin router group on which passthrough routes are declared
203 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
207 // PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
208 func (xs *XdsServer) PassthroughGet(url string) {
209 if xs.apiRouter == nil {
210 xs.Log.Errorf("apiRouter not set !")
214 xs.apiRouter.GET(url, func(c *gin.Context) {
216 // Take care of param (eg. id in /projects/:id)
218 if strings.Contains(url, ":") {
219 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
222 if err := xs.client.Get(nURL, &data); err != nil {
223 if strings.Contains(err.Error(), "connection refused") {
226 common.APIError(c, err.Error())
230 c.JSON(http.StatusOK, data)
234 // PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
235 func (xs *XdsServer) PassthroughPost(url string) {
236 if xs.apiRouter == nil {
237 xs.Log.Errorf("apiRouter not set !")
241 xs.apiRouter.POST(url, func(c *gin.Context) {
243 n, err := c.Request.Body.Read(bodyReq)
245 common.APIError(c, err.Error())
249 // Take care of param (eg. id in /projects/:id)
251 if strings.Contains(url, ":") {
252 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
256 body, err := json.Marshal(bodyReq[:n])
258 common.APIError(c, err.Error())
262 response, err := xs.client.HTTPPostWithRes(nURL, string(body))
264 common.APIError(c, err.Error())
268 bodyRes, err := ioutil.ReadAll(response.Body)
270 common.APIError(c, "Cannot read response body")
273 c.JSON(http.StatusOK, string(bodyRes))
277 // EventRegister Post a request to register to an XdsServer event
278 func (xs *XdsServer) EventRegister(evName string, id string) error {
279 return xs.client.Post(
281 xsapiv1.EventRegisterArgs{
288 // EventEmit Emit a event to XDS Server through WS
289 func (xs *XdsServer) EventEmit(message string, args ...interface{}) error {
290 if xs.ioSock == nil {
291 return fmt.Errorf("Io.Socket not initialized")
294 return xs.ioSock.Emit(message, args...)
297 // EventOn Register a callback on events reception
298 func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
299 if xs.ioSock == nil {
300 return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
303 xs.sockEventsLock.Lock()
304 defer xs.sockEventsLock.Unlock()
306 if _, exist := xs.sockEvents[evName]; !exist {
307 // Register listener only the first time
310 // FIXME: use generic type: data interface{} instead of data xsapiv1.EventMsg
312 if evName == "event:folder-state-change" {
313 err = xs.ioSock.On(evn, func(data xsapiv1.EventMsg) error {
314 xs.sockEventsLock.Lock()
315 sEvts := make([]*caller, len(xs.sockEvents[evn]))
316 copy(sEvts, xs.sockEvents[evn])
317 xs.sockEventsLock.Unlock()
318 for _, c := range sEvts {
319 c.Func(c.PrivateData, data)
324 err = xs.ioSock.On(evn, func(data interface{}) error {
325 xs.sockEventsLock.Lock()
326 sEvts := make([]*caller, len(xs.sockEvents[evn]))
327 copy(sEvts, xs.sockEvents[evn])
328 xs.sockEventsLock.Unlock()
329 for _, c := range sEvts {
330 c.Func(c.PrivateData, data)
344 PrivateData: privData,
347 xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
348 xs.LogSillyf("XS EventOn: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
352 // EventOff Un-register one (or all) of the callbacks associated with an event
353 func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
354 xs.sockEventsLock.Lock()
355 defer xs.sockEventsLock.Unlock()
356 if _, exist := xs.sockEvents[evName]; exist {
359 xs.sockEvents[evName] = []*caller{}
361 // Un-register only the specified callback
362 for i, ff := range xs.sockEvents[evName] {
363 if uuid.Equal(ff.id, id) {
364 xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
370 xs.LogSillyf("XS EventOff: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
374 // ProjectToFolder Convert Project structure to Folder structure
375 func (xs *XdsServer) ProjectToFolder(pPrj xaapiv1.ProjectConfig) *xsapiv1.FolderConfig {
377 if pPrj.Type == xsapiv1.TypeCloudSync {
378 stID, _ = xs.SThg.IDGet()
380 // TODO: limit ClientData size and gzip it (see https://golang.org/pkg/compress/gzip/)
381 fPrj := xsapiv1.FolderConfig{
384 ClientPath: pPrj.ClientPath,
385 Type: xsapiv1.FolderType(pPrj.Type),
387 IsInSync: pPrj.IsInSync,
388 DefaultSdk: pPrj.DefaultSdk,
389 ClientData: pPrj.ClientData,
390 DataPathMap: xsapiv1.PathMapConfig{
391 ServerPath: pPrj.ServerPath,
393 DataCloudSync: xsapiv1.CloudSyncConfig{
395 STLocIsInSync: pPrj.IsInSync,
396 STLocStatus: pPrj.Status,
397 STSvrIsInSync: pPrj.IsInSync,
398 STSvrStatus: pPrj.Status,
405 // FolderToProject Convert Folder structure to Project structure
406 func (xs *XdsServer) FolderToProject(fPrj xsapiv1.FolderConfig) xaapiv1.ProjectConfig {
407 inSync := fPrj.IsInSync
410 if fPrj.Type == xsapiv1.TypeCloudSync {
411 inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
413 sts = fPrj.DataCloudSync.STSvrStatus
414 switch fPrj.DataCloudSync.STLocStatus {
415 case xaapiv1.StatusErrorConfig, xaapiv1.StatusDisable, xaapiv1.StatusPause:
416 sts = fPrj.DataCloudSync.STLocStatus
418 case xaapiv1.StatusSyncing:
419 if sts != xaapiv1.StatusErrorConfig && sts != xaapiv1.StatusDisable && sts != xaapiv1.StatusPause {
420 sts = xaapiv1.StatusSyncing
423 case xaapiv1.StatusEnable:
429 pPrj := xaapiv1.ProjectConfig{
433 ClientPath: fPrj.ClientPath,
434 ServerPath: fPrj.DataPathMap.ServerPath,
435 Type: xaapiv1.ProjectType(fPrj.Type),
438 DefaultSdk: fPrj.DefaultSdk,
439 ClientData: fPrj.ClientData,
444 // CommandAdd Add a new command to the list of running commands
445 func (xs *XdsServer) CommandAdd(cmdID string, data interface{}) error {
446 if xs.CommandGet(cmdID) != nil {
447 return fmt.Errorf("command id already exist")
449 xs.cmdList[cmdID] = data
453 // CommandDelete Delete a command from the command list
454 func (xs *XdsServer) CommandDelete(cmdID string) error {
455 if xs.CommandGet(cmdID) == nil {
456 return fmt.Errorf("unknown command id")
458 delete(xs.cmdList, cmdID)
462 // CommandGet Retrieve a command data
463 func (xs *XdsServer) CommandGet(cmdID string) interface{} {
464 d, exist := xs.cmdList[cmdID]
475 // Create HTTP client
476 func (xs *XdsServer) _CreateConnectHTTP() error {
478 xs.client, err = common.HTTPNewClient(xs.BaseURL,
479 common.HTTPClientConfig{
480 URLPrefix: "/api/v1",
481 HeaderClientKeyName: "Xds-Sid",
484 LogPrefix: "XDSSERVER: ",
485 LogLevel: common.HTTPLogLevelWarning,
488 xs.client.SetLogLevel(xs.Log.Level.String())
491 msg := ": " + err.Error()
492 if strings.Contains(err.Error(), "connection refused") {
493 msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
495 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
497 if xs.client == nil {
498 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
504 // _Reconnect Re-established connection
505 func (xs *XdsServer) _Reconnect() error {
506 err := xs._Connect(true)
508 // Reload projects list for this server
509 err = xs.projects.Init(xs)
514 // _Connect Establish HTTP and WS connections and retrieve XDSServer config
515 func (xs *XdsServer) _Connect(reConn bool) error {
517 xdsCfg := xsapiv1.APIConfig{}
518 if err := xs.client.Get("/config", &xdsCfg); err != nil {
526 if reConn && xs.ID != xdsCfg.ServerUID {
527 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ServerUID)
530 // Update local XDS config
531 xs.ID = xdsCfg.ServerUID
532 xs.ServerConfig = &xdsCfg
534 // Establish WS connection and register listen
535 if err := xs._SocketConnect(); err != nil {
545 // _SocketConnect Create WebSocket (io.socket) connection
546 func (xs *XdsServer) _SocketConnect() error {
548 xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
550 opts := &sio_client.Options{
551 Transport: "websocket",
552 Header: make(map[string][]string),
554 opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
556 iosk, err := sio_client.NewClient(xs.BaseURL, opts)
558 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
562 // Register some listeners
564 iosk.On("error", func(err error) {
565 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
566 if xs.CBOnError != nil {
571 iosk.On("disconnection", func(err error) {
572 xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
573 if xs.CBOnDisconnect != nil {
574 xs.CBOnDisconnect(err)
578 // Try to reconnect during 15min (or at least while not disabled)
582 for !xs.Disabled && !xs.Connected {
587 if waitTime > 15*60 {
588 xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
591 time.Sleep(time.Second * time.Duration(waitTime))
592 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
594 err := xs._Reconnect()
596 !(strings.Contains(err.Error(), "dial tcp") && strings.Contains(err.Error(), "connection refused")) {
597 xs.Log.Errorf("ERROR while reconnecting: %v", err.Error())
604 // XXX - There is no connection event generated so, just consider that
605 // we are connected when NewClient return successfully
606 /* iosk.On("connection", func() { ... }) */
607 xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
612 // _Disconnected Set XDS Server as disconnected
613 func (xs *XdsServer) _Disconnected() error {
614 // Clear all register events as socket is closed
615 for k := range xs.sockEvents {
616 delete(xs.sockEvents, k)
624 // _NotifyState Send event to notify changes
625 func (xs *XdsServer) _NotifyState() {
627 evSts := xaapiv1.ServerCfg{
631 PartialURL: xs.PartialURL,
632 ConnRetry: xs.ConnRetry,
633 Connected: xs.Connected,
635 if err := xs.events.Emit(xaapiv1.EVTServerConfig, evSts, ""); err != nil {
636 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)