13 "github.com/gin-gonic/gin"
14 "github.com/iotbzh/xds-agent/lib/apiv1"
15 "github.com/iotbzh/xds-agent/lib/xdsconfig"
16 common "github.com/iotbzh/xds-common/golib"
17 uuid "github.com/satori/go.uuid"
18 sio_client "github.com/sebd71/go-socket.io-client"
// XdsServer holds the state of one XDS Server connection: the HTTP client,
// the IO.socket client and the callbacks registered on socket events.
22 type XdsServer struct {
// ServerConfig is the configuration retrieved with GET /config in _connect.
31 ServerConfig *XdsServerConfig
// CBOnDisconnect, when set, is called with the error received by the
// IO.socket "disconnection" handler.
35 CBOnDisconnect func(error)
// sockEvents maps an event name to the callers registered through EventOn.
36 sockEvents map[string][]*caller
// sockEventsLock guards accesses to sockEvents.
37 sockEventsLock *sync.Mutex
// client is the HTTP client created by _CreateConnectHTTP.
40 client *common.HTTPClient
// ioSock is the IO.socket client; EventOn fails while it is nil.
41 ioSock *sio_client.Client
// apiRouter is the gin router group used by PassthroughGet/PassthroughPost
// to declare routes.
43 apiRouter *gin.RouterGroup
46 // XdsServerConfig holds the data returned by GET /config.
// It is decoded in _connect and stored in XdsServer.ServerConfig.
47 type XdsServerConfig struct {
49 Version string `json:"version"`
50 APIVersion string `json:"apiVersion"`
51 VersionGitTag string `json:"gitTag"`
// SupportedSharing is a set of flags keyed by name
// (presumably folder sharing types — TODO confirm against the server).
52 SupportedSharing map[string]bool `json:"supportedSharing"`
53 Builder XdsBuilderConfig `json:"builder"`
56 // XdsBuilderConfig represents the builder container configuration.
// It is embedded in XdsServerConfig under the "builder" JSON key.
57 type XdsBuilderConfig struct {
59 Port string `json:"port"`
60 SyncThingID string `json:"syncThingID"`
// XdsFolderType is the type of a folder managed by an XdsServer.
// It is stored in XdsFolderConfig.Type and serialized as a plain string.
type XdsFolderType string
// Folder type names. They are untyped string constants so that they can be
// compared with both XdsFolderType and project type values.
const (
	// XdsTypePathMap is the Path Mapping folder type.
	XdsTypePathMap = "PathMap"
	// XdsTypeCloudSync is the Cloud synchronization (AKA syncthing) folder type.
	XdsTypeCloudSync = "CloudSync"
	// XdsTypeCifsSmb is the CIFS (AKA samba) folder type.
	XdsTypeCifsSmb = "CIFS"
)
75 // XdsFolderConfig is the XdsServer folder configuration.
// ProjectToFolder and FolderToProject convert it from/to apiv1.ProjectConfig.
76 type XdsFolderConfig struct {
78 Label string `json:"label"`
79 ClientPath string `json:"path"`
80 Type XdsFolderType `json:"type"`
81 Status string `json:"status"`
82 IsInSync bool `json:"isInSync"`
83 DefaultSdk string `json:"defaultSdk"`
84 ClientData string `json:"clientData"` // free-form field that can be used by the client
86 // Specific data depending on which Type is used
87 DataPathMap XdsPathMapConfig `json:"dataPathMap,omitempty"`
88 DataCloudSync XdsCloudSyncConfig `json:"dataCloudSync,omitempty"`
// XdsPathMapConfig holds the data specific to Path Mapping folders.
type XdsPathMapConfig struct {
	// ServerPath is the folder path on the server side.
	ServerPath string `json:"serverPath"`
	// CheckFile and CheckContent are exchanged as-is with the server
	// (presumably used to verify the path mapping — TODO confirm).
	CheckFile    string `json:"checkFile"`
	CheckContent string `json:"checkContent"`
}
// XdsCloudSyncConfig holds the data specific to CloudSync (AKA Syncthing)
// folders. Only SyncThingID is serialized; the ST* fields are tagged
// `json:"-"` and therefore never appear in JSON.
type XdsCloudSyncConfig struct {
	SyncThingID string `json:"syncThingID"`

	// Server side status and sync flag.
	STSvrStatus   string `json:"-"`
	STSvrIsInSync bool   `json:"-"`
	// Local side status and sync flag.
	STLocStatus   string `json:"-"`
	STLocIsInSync bool   `json:"-"`
}
// XdsEventRegisterArgs holds the arguments used to register to XDS server
// events.
type XdsEventRegisterArgs struct {
	// Name is the name of the event to register to.
	Name string `json:"name"`
	// ProjectID is sent as "filterProjectID" in the JSON payload.
	ProjectID string `json:"filterProjectID"`
}
113 // XdsEventFolderChange Folder change event structure
114 type XdsEventFolderChange struct {
115 Time string `json:"time"`
116 Type string `json:"type"`
117 Folder XdsFolderConfig `json:"folder"`
// EventCB is the callback invoked when a registered event is received.
// privData is the private data given at registration time and evtData is
// the data carried by the event.
type EventCB func(privData interface{}, evtData interface{}) error
123 // caller is used to chain event listeners.
// PrivateData is passed back as the first argument of the listener callback.
128 PrivateData interface{}
// _IDTempoPrefix is the prefix of the temporary ID given to a server by
// NewXdsServer; IsTempoID tests for it.
const _IDTempoPrefix = "tempo-"
133 // NewXdsServer creates an instance of XdsServer from the server
// configuration conf. No connection is established here.
134 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
// Temporary ID (prefix + time-based UUID); see IsTempoID.
137 ID: _IDTempoPrefix + uuid.NewV1().String(),
139 APIURL: conf.APIBaseURL + conf.APIPartialURL,
140 PartialURL: conf.APIPartialURL,
141 ConnRetry: conf.ConnRetry,
// Event listener registry and its lock, used by EventOn/EventOff.
145 sockEvents: make(map[string][]*caller),
146 sockEventsLock: &sync.Mutex{},
151 // Close frees and closes the XDS Server connection.
152 func (xs *XdsServer) Close() error {
160 // Connect establishes the HTTP connection with the XDS Server, then the
// WS connection through _connect.
161 func (xs *XdsServer) Connect() error {
// Try up to xs.ConnRetry times, sleeping one second between attempts.
169 for retry = xs.ConnRetry; retry > 0; retry-- {
170 if err = xs._CreateConnectHTTP(); err == nil {
173 if retry == xs.ConnRetry {
174 // Notify only on the first conn error
175 // doing that avoid 2 notifs (conn false; conn true) on startup
178 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
179 time.Sleep(time.Second)
182 // FIXME: re-use _reconnect to wait longer in background
183 return fmt.Errorf("Connection to XDS Server failure")
189 // Check HTTP connection and establish WS connection
190 err = xs._connect(false)
195 // IsTempoID returns true when server as a temporary id
196 func (xs *XdsServer) IsTempoID() bool {
197 return strings.HasPrefix(xs.ID, _IDTempoPrefix)
200 // SetLoggerOutput sets the logger output (writer out).
201 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
205 // SendCommand Send a command to XDS Server
206 func (xs *XdsServer) SendCommand(cmd string, body []byte) (*http.Response, error) {
208 if !strings.HasPrefix("/", cmd) {
211 return xs.client.HTTPPostWithRes(url, string(body))
214 // GetVersion Send Get request to retrieve XDS Server version
215 func (xs *XdsServer) GetVersion(res interface{}) error {
216 return xs.client.Get("/version", &res)
219 // GetFolders Send GET request to get current folder configuration
220 func (xs *XdsServer) GetFolders(folders *[]XdsFolderConfig) error {
221 return xs.client.Get("/folders", folders)
224 // FolderAdd Send POST request to add a folder
225 func (xs *XdsServer) FolderAdd(fld *XdsFolderConfig, res interface{}) error {
226 err := xs.client.Post("/folders", fld, res)
228 return fmt.Errorf("FolderAdd error: %s", err.Error())
233 // FolderDelete Send DELETE request to delete a folder
234 func (xs *XdsServer) FolderDelete(id string) error {
235 return xs.client.HTTPDelete("/folders/" + id)
238 // FolderSync Send POST request to force synchronization of a folder
239 func (xs *XdsServer) FolderSync(id string) error {
240 return xs.client.HTTPPost("/folders/sync/"+id, "")
243 // FolderUpdate Send PUT request to update a folder
244 func (xs *XdsServer) FolderUpdate(fld *XdsFolderConfig, resFld *XdsFolderConfig) error {
245 return xs.client.Put("/folders/"+fld.ID, fld, resFld)
248 // SetAPIRouterGroup takes the gin router group r on which the Passthrough*
// routes are declared (presumably stored in xs.apiRouter — TODO confirm).
249 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
253 // PassthroughGet declares a route on the API router that forwards a GET
// request directly to the XDS Server and replies with the decoded result.
254 func (xs *XdsServer) PassthroughGet(url string) {
// The router group must have been set beforehand.
255 if xs.apiRouter == nil {
256 xs.Log.Errorf("apiRouter not set !")
260 xs.apiRouter.GET(url, func(c *gin.Context) {
262 // Take care of param (eg. id in /projects/:id)
// For parameterized routes the effective request path (minus the xs.APIURL
// prefix) is forwarded instead of the route pattern.
264 if strings.Contains(url, ":") {
265 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
268 if err := xs.client.Get(nURL, &data); err != nil {
// "connection refused" errors get a dedicated handling.
269 if strings.Contains(err.Error(), "connection refused") {
273 common.APIError(c, err.Error())
277 c.JSON(http.StatusOK, data)
281 // PassthroughPost declares a route on the API router that forwards a POST
// request directly to the XDS Server and replies with the response body.
282 func (xs *XdsServer) PassthroughPost(url string) {
// The router group must have been set beforehand.
283 if xs.apiRouter == nil {
284 xs.Log.Errorf("apiRouter not set !")
288 xs.apiRouter.POST(url, func(c *gin.Context) {
// NOTE(review): a single Read call is not guaranteed to return the whole
// request body (see io.Reader); large bodies may be truncated.
290 n, err := c.Request.Body.Read(bodyReq)
292 common.APIError(c, err.Error())
296 // Take care of param (eg. id in /projects/:id)
298 if strings.Contains(url, ":") {
299 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
// NOTE(review): if bodyReq is a []byte, json.Marshal encodes it as a
// base64 JSON string rather than forwarding the raw payload — confirm this
// is what the server expects.
303 body, err := json.Marshal(bodyReq[:n])
305 common.APIError(c, err.Error())
309 response, err := xs.client.HTTPPostWithRes(nURL, string(body))
311 common.APIError(c, err.Error())
315 bodyRes, err := ioutil.ReadAll(response.Body)
317 common.APIError(c, "Cannot read response body")
// The response body is sent back as a JSON string.
320 c.JSON(http.StatusOK, string(bodyRes))
324 // EventRegister posts a request to register to an XdsServer event.
// The request payload is an XdsEventRegisterArgs value.
325 func (xs *XdsServer) EventRegister(evName string, id string) error {
326 return xs.client.Post(
328 XdsEventRegisterArgs{
335 // EventOn registers the callback f, with its private data privData, on
// the reception of event evName. It fails when the IO.socket client is not
// initialized.
336 func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
337 if xs.ioSock == nil {
338 return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
// sockEvents is protected by sockEventsLock for the rest of the function.
341 xs.sockEventsLock.Lock()
342 defer xs.sockEventsLock.Unlock()
344 if _, exist := xs.sockEvents[evName]; !exist {
345 // Register listener only the first time
348 // FIXME: use generic type: data interface{} instead of data XdsEventFolderChange
// Folder state changes use a typed listener; other events use a generic one.
350 if evName == "event:folder-state-change" {
351 err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error {
// Copy the callers list under lock, then call the callbacks without
// holding the lock.
352 xs.sockEventsLock.Lock()
353 sEvts := make([]*caller, len(xs.sockEvents[evn]))
354 copy(sEvts, xs.sockEvents[evn])
355 xs.sockEventsLock.Unlock()
356 for _, c := range sEvts {
// The error returned by the callback is not checked.
357 c.Func(c.PrivateData, data)
362 err = xs.ioSock.On(evn, func(data interface{}) error {
// Same copy-then-call scheme as the typed listener.
363 xs.sockEventsLock.Lock()
364 sEvts := make([]*caller, len(xs.sockEvents[evn]))
365 copy(sEvts, xs.sockEvents[evn])
366 xs.sockEventsLock.Unlock()
367 for _, c := range sEvts {
368 c.Func(c.PrivateData, data)
382 PrivateData: privData,
// Chain the new caller after the ones already registered on this event.
385 xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
389 // EventOff un-registers one callback (identified by id), or all callbacks,
// associated to the event evName.
390 func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
391 xs.sockEventsLock.Lock()
392 defer xs.sockEventsLock.Unlock()
393 if _, exist := xs.sockEvents[evName]; exist {
// Drop every callback registered on this event.
396 xs.sockEvents[evName] = []*caller{}
398 // Un-register only the specified callback
399 for i, ff := range xs.sockEvents[evName] {
400 if uuid.Equal(ff.id, id) {
// NOTE(review): the slice is modified while being ranged over; make sure
// the loop is left right after the removal.
401 xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
410 // ProjectToFolder converts a Project structure (apiv1.ProjectConfig) to a
// Folder structure (XdsFolderConfig).
411 func (xs *XdsServer) ProjectToFolder(pPrj apiv1.ProjectConfig) *XdsFolderConfig {
// The local Syncthing ID is only fetched for CloudSync projects; the error
// returned by IDGet is ignored.
413 if pPrj.Type == XdsTypeCloudSync {
414 stID, _ = xs.SThg.IDGet()
416 // TODO: limit ClientData size and gzip it (see https://golang.org/pkg/compress/gzip/)
417 fPrj := XdsFolderConfig{
420 ClientPath: pPrj.ClientPath,
421 Type: XdsFolderType(pPrj.Type),
423 IsInSync: pPrj.IsInSync,
424 DefaultSdk: pPrj.DefaultSdk,
425 ClientData: pPrj.ClientData,
426 DataPathMap: XdsPathMapConfig{
427 ServerPath: pPrj.ServerPath,
429 DataCloudSync: XdsCloudSyncConfig{
// Local and server sync states both start from the project state.
431 STLocIsInSync: pPrj.IsInSync,
432 STLocStatus: pPrj.Status,
433 STSvrIsInSync: pPrj.IsInSync,
434 STSvrStatus: pPrj.Status,
441 // FolderToProject converts a Folder structure (XdsFolderConfig) to a
// Project structure (apiv1.ProjectConfig).
442 func (xs *XdsServer) FolderToProject(fPrj XdsFolderConfig) apiv1.ProjectConfig {
443 inSync := fPrj.IsInSync
// For CloudSync folders, the sync flag and status are computed from both
// the server side (STSvr*) and the local side (STLoc*) states.
446 if fPrj.Type == XdsTypeCloudSync {
// In sync only when both sides are in sync.
447 inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
// Start from the server status, then let the local status override it.
449 sts = fPrj.DataCloudSync.STSvrStatus
450 switch fPrj.DataCloudSync.STLocStatus {
// A local error/disable/pause status always wins.
451 case apiv1.StatusErrorConfig, apiv1.StatusDisable, apiv1.StatusPause:
452 sts = fPrj.DataCloudSync.STLocStatus
// Local syncing is reported unless the server is in error/disable/pause.
454 case apiv1.StatusSyncing:
455 if sts != apiv1.StatusErrorConfig && sts != apiv1.StatusDisable && sts != apiv1.StatusPause {
456 sts = apiv1.StatusSyncing
459 case apiv1.StatusEnable:
465 pPrj := apiv1.ProjectConfig{
469 ClientPath: fPrj.ClientPath,
470 ServerPath: fPrj.DataPathMap.ServerPath,
471 Type: apiv1.ProjectType(fPrj.Type),
474 DefaultSdk: fPrj.DefaultSdk,
475 ClientData: fPrj.ClientData,
484 // _CreateConnectHTTP creates the HTTP client used to talk to the XDS
// Server (base URL xs.BaseURL, URL prefix "/api/v1") and stores it in
// xs.client.
485 func (xs *XdsServer) _CreateConnectHTTP() error {
487 xs.client, err = common.HTTPNewClient(xs.BaseURL,
488 common.HTTPClientConfig{
489 URLPrefix: "/api/v1",
490 HeaderClientKeyName: "Xds-Sid",
493 LogPrefix: "XDSSERVER: ",
494 LogLevel: common.HTTPLogLevelWarning,
// Align the HTTP client log level with the server logger level.
497 xs.client.SetLogLevel(xs.Log.Level.String())
// Build the error message: "connection refused" errors only report the URL,
// other errors report the error text.
500 msg := ": " + err.Error()
501 if strings.Contains(err.Error(), "connection refused") {
502 msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
504 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
506 if xs.client == nil {
507 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
513 // _reconnect re-establishes the connection (through _connect in
// reconnection mode) and reloads the projects list.
514 func (xs *XdsServer) _reconnect() error {
515 err := xs._connect(true)
517 // Reload projects list for this server
518 err = xs.projects.Init(xs)
523 // _connect establishes the HTTP and WS connections and retrieves the
// XDS Server config. reConn is true when called for a reconnection.
524 func (xs *XdsServer) _connect(reConn bool) error {
// Getting /config also checks that the HTTP connection works.
526 xdsCfg := XdsServerConfig{}
527 if err := xs.client.Get("/config", &xdsCfg); err != nil {
// On reconnection, warn when the server no longer reports the same ID.
535 if reConn && xs.ID != xdsCfg.ID {
536 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ID)
539 // Update local XDS config
541 xs.ServerConfig = &xdsCfg
543 // Establish WS connection and register listen
544 if err := xs._SocketConnect(); err != nil {
555 // _SocketConnect creates the WebSocket (io.socket) connection to the
// XDS Server and registers the "error" and "disconnection" listeners.
556 func (xs *XdsServer) _SocketConnect() error {
558 xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
560 opts := &sio_client.Options{
561 Transport: "websocket",
562 Header: make(map[string][]string),
// Send the HTTP client ID in the XDS-SID header of the socket connection.
564 opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
566 iosk, err := sio_client.NewClient(xs.BaseURL, opts)
568 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
572 // Register some listeners
574 iosk.On("error", func(err error) {
575 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
576 if xs.CBOnError != nil {
581 iosk.On("disconnection", func(err error) {
582 xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
// Let the owner know about the disconnection when a callback is set.
583 if xs.CBOnDisconnect != nil {
584 xs.CBOnDisconnect(err)
589 // Try to reconnect during 15min (or at least while not disabled)
// Loop while the server is neither disabled nor connected.
593 for !xs.Disabled && !xs.Connected {
// waitTime is expressed in seconds; give up above 15 minutes.
598 if waitTime > 15*60 {
599 xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
602 time.Sleep(time.Second * time.Duration(waitTime))
603 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
610 // XXX - There is no connection event generated so, just consider that
611 // we are connected when NewClient return successfully
612 /* iosk.On("connection", func() { ... }) */
613 xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
618 // _NotifyState emits an apiv1.EVTServerConfig event carrying the current
// server state. An emit failure is only logged as a warning.
619 func (xs *XdsServer) _NotifyState() {
621 evSts := apiv1.ServerCfg{
625 PartialURL: xs.PartialURL,
626 ConnRetry: xs.ConnRetry,
627 Connected: xs.Connected,
629 if err := xs.events.Emit(apiv1.EVTServerConfig, evSts, ""); err != nil {
630 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)