2 * Copyright (C) 2017 "IoT.bzh"
3 * Author Sebastien Douheret <sebastien@iot.bzh>
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
29 "github.com/gin-gonic/gin"
30 "github.com/iotbzh/xds-agent/lib/xaapiv1"
31 "github.com/iotbzh/xds-agent/lib/xdsconfig"
32 common "github.com/iotbzh/xds-common/golib"
33 "github.com/iotbzh/xds-server/lib/xsapiv1"
34 uuid "github.com/satori/go.uuid"
35 sio_client "github.com/sebd71/go-socket.io-client"
39 type XdsServer struct {
48 ServerConfig *xsapiv1.APIConfig
52 CBOnDisconnect func(error)
53 sockEvents map[string][]*caller
54 sockEventsLock *sync.Mutex
57 client *common.HTTPClient
58 ioSock *sio_client.Client
60 apiRouter *gin.RouterGroup
61 cmdList map[string]interface{}
62 cbOnConnect OnConnectedCB
// EventCB is the signature of an event listener callback. privData is the
// private value that was supplied when the listener was registered and
// evtData is the payload of the received event.
type EventCB func(privData, evtData interface{}) error
// OnConnectedCB is the signature of the connect callback accepted by
// ConnectOn; it receives the XdsServer instance and may return an error.
type OnConnectedCB func(svr *XdsServer) error
71 // caller Used to chain event listeners
76 PrivateData interface{}
// _IDTempoPrefix is the prefix of the temporary ID that NewXdsServer gives to
// a server (prefix + UUID); IsTempoID tests for it.
const _IDTempoPrefix = "tempo-"
81 // NewXdsServer creates an instance of XdsServer
82 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
85 ID: _IDTempoPrefix + uuid.NewV1().String(),
87 APIURL: conf.APIBaseURL + conf.APIPartialURL,
88 PartialURL: conf.APIPartialURL,
89 ConnRetry: conf.ConnRetry,
93 sockEvents: make(map[string][]*caller),
94 sockEventsLock: &sync.Mutex{},
96 cmdList: make(map[string]interface{}),
100 // Close Free and close XDS Server connection
101 func (xs *XdsServer) Close() error {
102 err := xs._Disconnected()
107 // Connect Establish HTTP connection with XDS Server
108 func (xs *XdsServer) Connect() error {
116 for retry = xs.ConnRetry; retry > 0; retry-- {
117 if err = xs._CreateConnectHTTP(); err == nil {
120 if retry == xs.ConnRetry {
121 // Notify only on the first conn error
122 // doing that avoid 2 notifs (conn false; conn true) on startup
125 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
126 time.Sleep(time.Second)
129 // FIXME: re-use _Reconnect to wait longer in background
130 return fmt.Errorf("Connection to XDS Server failure")
136 // Check HTTP connection and establish WS connection
137 err = xs._Connect(false)
// ConnectOn Register the callback to call once connected to the XDS Server
143 func (xs *XdsServer) ConnectOn(f OnConnectedCB) error {
148 // IsTempoID returns true when server as a temporary id
149 func (xs *XdsServer) IsTempoID() bool {
150 return strings.HasPrefix(xs.ID, _IDTempoPrefix)
// SetLoggerOutput Set the logger output writer
154 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
158 // SendCommand Send a command to XDS Server
159 func (xs *XdsServer) SendCommand(cmd string, body []byte, res interface{}) error {
161 if !strings.HasPrefix("/", cmd) {
164 return xs.client.Post(url, string(body), res)
167 // GetVersion Send Get request to retrieve XDS Server version
168 func (xs *XdsServer) GetVersion(res interface{}) error {
169 return xs.client.Get("/version", &res)
172 // GetFolders Send GET request to get current folder configuration
173 func (xs *XdsServer) GetFolders(folders *[]xsapiv1.FolderConfig) error {
174 return xs.client.Get("/folders", folders)
177 // FolderAdd Send POST request to add a folder
178 func (xs *XdsServer) FolderAdd(fld *xsapiv1.FolderConfig, res interface{}) error {
179 err := xs.client.Post("/folders", fld, res)
181 return fmt.Errorf("FolderAdd error: %s", err.Error())
186 // FolderDelete Send DELETE request to delete a folder
187 func (xs *XdsServer) FolderDelete(id string) error {
188 return xs.client.HTTPDelete("/folders/" + id)
191 // FolderSync Send POST request to force synchronization of a folder
192 func (xs *XdsServer) FolderSync(id string) error {
193 return xs.client.HTTPPost("/folders/sync/"+id, "")
196 // FolderUpdate Send PUT request to update a folder
197 func (xs *XdsServer) FolderUpdate(fld *xsapiv1.FolderConfig, resFld *xsapiv1.FolderConfig) error {
198 return xs.client.Put("/folders/"+fld.ID, fld, resFld)
201 // CommandExec Send POST request to execute a command
202 func (xs *XdsServer) CommandExec(args *xsapiv1.ExecArgs, res *xsapiv1.ExecResult) error {
203 return xs.client.Post("/exec", args, res)
206 // CommandSignal Send POST request to send a signal to a command
207 func (xs *XdsServer) CommandSignal(args *xsapiv1.ExecSignalArgs, res *xsapiv1.ExecSigResult) error {
208 return xs.client.Post("/signal", args, res)
// SetAPIRouterGroup Set the gin router group used to declare the passthrough routes
212 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
216 // PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
217 func (xs *XdsServer) PassthroughGet(url string) {
218 if xs.apiRouter == nil {
219 xs.Log.Errorf("apiRouter not set !")
223 xs.apiRouter.GET(url, func(c *gin.Context) {
225 // Take care of param (eg. id in /projects/:id)
227 if strings.Contains(url, ":") {
228 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
231 if err := xs.client.Get(nURL, &data); err != nil {
232 if strings.Contains(err.Error(), "connection refused") {
235 common.APIError(c, err.Error())
239 c.JSON(http.StatusOK, data)
243 // PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
244 func (xs *XdsServer) PassthroughPost(url string) {
245 if xs.apiRouter == nil {
246 xs.Log.Errorf("apiRouter not set !")
250 xs.apiRouter.POST(url, func(c *gin.Context) {
255 body, err := c.GetRawData()
257 common.APIError(c, err.Error())
261 // Take care of param (eg. id in /projects/:id)
263 if strings.Contains(url, ":") {
264 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
268 response, err := xs.client.HTTPPostWithRes(nURL, string(body))
272 if response.StatusCode != 200 {
273 err = fmt.Errorf(response.Status)
276 err = json.Unmarshal(xs.client.ResponseToBArray(response), &data)
281 c.JSON(http.StatusOK, data)
284 /* Handle error case */
286 if strings.Contains(err.Error(), "connection refused") {
289 common.APIError(c, err.Error())
293 // PassthroughDelete Used to declare a route that sends directly a Delete request to XDS Server
294 func (xs *XdsServer) PassthroughDelete(url string) {
295 if xs.apiRouter == nil {
296 xs.Log.Errorf("apiRouter not set !")
300 xs.apiRouter.DELETE(url, func(c *gin.Context) {
304 // Take care of param (eg. id in /projects/:id)
306 if strings.Contains(url, ":") {
307 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
311 response, err := xs.client.HTTPDeleteWithRes(nURL)
315 if response.StatusCode != 200 {
316 err = fmt.Errorf(response.Status)
319 err = json.Unmarshal(xs.client.ResponseToBArray(response), &data)
324 c.JSON(http.StatusOK, data)
327 /* Handle error case */
329 if strings.Contains(err.Error(), "connection refused") {
332 common.APIError(c, err.Error())
336 // EventRegister Post a request to register to an XdsServer event
337 func (xs *XdsServer) EventRegister(evName string, filter string) error {
338 return xs.client.Post("/events/register",
339 xsapiv1.EventRegisterArgs{
346 // EventEmit Emit a event to XDS Server through WS
347 func (xs *XdsServer) EventEmit(message string, args ...interface{}) error {
348 if xs.ioSock == nil {
349 return fmt.Errorf("Io.Socket not initialized")
352 return xs.ioSock.Emit(message, args...)
355 // EventOn Register a callback on events reception
356 func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
357 if xs.ioSock == nil {
358 return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
361 xs.sockEventsLock.Lock()
362 defer xs.sockEventsLock.Unlock()
364 if _, exist := xs.sockEvents[evName]; !exist {
365 // Register listener only the first time
368 err := xs.ioSock.On(evn, func(data interface{}) error {
369 xs.sockEventsLock.Lock()
370 sEvts := make([]*caller, len(xs.sockEvents[evn]))
371 copy(sEvts, xs.sockEvents[evn])
372 xs.sockEventsLock.Unlock()
373 for _, c := range sEvts {
374 c.Func(c.PrivateData, data)
387 PrivateData: privData,
390 xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
391 xs.LogSillyf("XS EventOn: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
395 // EventOff Un-register a (or all) callbacks associated to an event
396 func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
397 xs.sockEventsLock.Lock()
398 defer xs.sockEventsLock.Unlock()
399 if _, exist := xs.sockEvents[evName]; exist {
402 xs.sockEvents[evName] = []*caller{}
404 // Un-register only the specified callback
405 for i, ff := range xs.sockEvents[evName] {
406 if uuid.Equal(ff.id, id) {
407 xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
413 xs.LogSillyf("XS EventOff: sockEvents[\"%s\"]: len %d", evName, len(xs.sockEvents[evName]))
417 // ProjectToFolder Convert Project structure to Folder structure
418 func (xs *XdsServer) ProjectToFolder(pPrj xaapiv1.ProjectConfig) *xsapiv1.FolderConfig {
420 if pPrj.Type == xsapiv1.TypeCloudSync {
421 stID, _ = xs.SThg.IDGet()
423 // TODO: limit ClientData size and gzip it (see https://golang.org/pkg/compress/gzip/)
424 fPrj := xsapiv1.FolderConfig{
427 ClientPath: pPrj.ClientPath,
428 Type: xsapiv1.FolderType(pPrj.Type),
430 IsInSync: pPrj.IsInSync,
431 DefaultSdk: pPrj.DefaultSdk,
432 ClientData: pPrj.ClientData,
433 DataPathMap: xsapiv1.PathMapConfig{
434 ServerPath: pPrj.ServerPath,
436 DataCloudSync: xsapiv1.CloudSyncConfig{
438 STLocIsInSync: pPrj.IsInSync,
439 STLocStatus: pPrj.Status,
440 STSvrIsInSync: pPrj.IsInSync,
441 STSvrStatus: pPrj.Status,
448 // FolderToProject Convert Folder structure to Project structure
449 func (xs *XdsServer) FolderToProject(fPrj xsapiv1.FolderConfig) xaapiv1.ProjectConfig {
450 inSync := fPrj.IsInSync
453 if fPrj.Type == xsapiv1.TypeCloudSync {
454 inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
456 sts = fPrj.DataCloudSync.STSvrStatus
457 switch fPrj.DataCloudSync.STLocStatus {
458 case xaapiv1.StatusErrorConfig, xaapiv1.StatusDisable, xaapiv1.StatusPause:
459 sts = fPrj.DataCloudSync.STLocStatus
461 case xaapiv1.StatusSyncing:
462 if sts != xaapiv1.StatusErrorConfig && sts != xaapiv1.StatusDisable && sts != xaapiv1.StatusPause {
463 sts = xaapiv1.StatusSyncing
466 case xaapiv1.StatusEnable:
472 pPrj := xaapiv1.ProjectConfig{
476 ClientPath: fPrj.ClientPath,
477 ServerPath: fPrj.DataPathMap.ServerPath,
478 Type: xaapiv1.ProjectType(fPrj.Type),
481 DefaultSdk: fPrj.DefaultSdk,
482 ClientData: fPrj.ClientData,
487 // CommandAdd Add a new command to the list of running commands
488 func (xs *XdsServer) CommandAdd(cmdID string, data interface{}) error {
489 if xs.CommandGet(cmdID) != nil {
490 return fmt.Errorf("command id already exist")
492 xs.cmdList[cmdID] = data
496 // CommandDelete Delete a command from the command list
497 func (xs *XdsServer) CommandDelete(cmdID string) error {
498 if xs.CommandGet(cmdID) == nil {
499 return fmt.Errorf("unknown command id")
501 delete(xs.cmdList, cmdID)
505 // CommandGet Retrieve a command data
506 func (xs *XdsServer) CommandGet(cmdID string) interface{} {
507 d, exist := xs.cmdList[cmdID]
518 // Create HTTP client
519 func (xs *XdsServer) _CreateConnectHTTP() error {
521 xs.client, err = common.HTTPNewClient(xs.BaseURL,
522 common.HTTPClientConfig{
523 URLPrefix: "/api/v1",
524 HeaderClientKeyName: "Xds-Sid",
527 LogPrefix: "XDSSERVER: ",
528 LogLevel: common.HTTPLogLevelWarning,
531 xs.client.SetLogLevel(xs.Log.Level.String())
534 msg := ": " + err.Error()
535 if strings.Contains(err.Error(), "connection refused") {
536 msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
538 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
540 if xs.client == nil {
541 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
547 // _Reconnect Re-established connection
548 func (xs *XdsServer) _Reconnect() error {
550 // Note that ConnectOn callback will be called (see apiv1.go file)
551 err := xs._Connect(true)
// _Connect Establish the HTTP and WS connections and retrieve the XDS Server config
557 func (xs *XdsServer) _Connect(reConn bool) error {
559 xdsCfg := xsapiv1.APIConfig{}
560 if err := xs.client.Get("/config", &xdsCfg); err != nil {
568 if reConn && xs.ID != xdsCfg.ServerUID {
569 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ServerUID)
572 // Update local XDS config
573 xs.ID = xdsCfg.ServerUID
574 xs.ServerConfig = &xdsCfg
576 // Establish WS connection and register listen
577 if err := xs._SocketConnect(); err != nil {
584 // Call OnConnect callback
585 if xs.cbOnConnect != nil {
593 // _SocketConnect Create WebSocket (io.socket) connection
594 func (xs *XdsServer) _SocketConnect() error {
596 xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
598 opts := &sio_client.Options{
599 Transport: "websocket",
600 Header: make(map[string][]string),
602 opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
604 iosk, err := sio_client.NewClient(xs.BaseURL, opts)
606 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
610 // Register some listeners
612 iosk.On("error", func(err error) {
613 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
614 if xs.CBOnError != nil {
619 iosk.On("disconnection", func(err error) {
620 xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
621 if xs.CBOnDisconnect != nil {
622 xs.CBOnDisconnect(err)
626 // Try to reconnect during 15min (or at least while not disabled)
630 for !xs.Disabled && !xs.Connected {
635 if waitTime > 15*60 {
636 xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
639 time.Sleep(time.Second * time.Duration(waitTime))
640 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
642 err := xs._Reconnect()
644 !(strings.Contains(err.Error(), "dial tcp") && strings.Contains(err.Error(), "connection refused")) {
645 xs.Log.Errorf("ERROR while reconnecting: %v", err.Error())
652 // XXX - There is no connection event generated so, just consider that
653 // we are connected when NewClient return successfully
654 /* iosk.On("connection", func() { ... }) */
655 xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
660 // _Disconnected Set XDS Server as disconnected
661 func (xs *XdsServer) _Disconnected() error {
662 // Clear all register events as socket is closed
663 for k := range xs.sockEvents {
664 delete(xs.sockEvents, k)
672 // _NotifyState Send event to notify changes
673 func (xs *XdsServer) _NotifyState() {
675 evSts := xaapiv1.ServerCfg{
679 PartialURL: xs.PartialURL,
680 ConnRetry: xs.ConnRetry,
681 Connected: xs.Connected,
683 if err := xs.events.Emit(xaapiv1.EVTServerConfig, evSts, ""); err != nil {
684 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)