13 "github.com/gin-gonic/gin"
14 "github.com/iotbzh/xds-agent/lib/apiv1"
15 "github.com/iotbzh/xds-agent/lib/xdsconfig"
16 common "github.com/iotbzh/xds-common/golib"
17 uuid "github.com/satori/go.uuid"
18 sio_client "github.com/sebd71/go-socket.io-client"
22 type XdsServer struct {
31 ServerConfig *XdsServerConfig
35 CBOnDisconnect func(error)
36 sockEvents map[string][]*caller
37 sockEventsLock *sync.Mutex
40 client *common.HTTPClient
41 ioSock *sio_client.Client
43 apiRouter *gin.RouterGroup
46 // XdsServerConfig Data return by GET /config
47 type XdsServerConfig struct {
49 Version string `json:"version"`
50 APIVersion string `json:"apiVersion"`
51 VersionGitTag string `json:"gitTag"`
52 SupportedSharing map[string]bool `json:"supportedSharing"`
53 Builder XdsBuilderConfig `json:"builder"`
56 // XdsBuilderConfig represents the builder container configuration
57 type XdsBuilderConfig struct {
59 Port string `json:"port"`
60 SyncThingID string `json:"syncThingID"`
63 // XdsFolderType XdsServer folder type
64 type XdsFolderType string
67 XdsTypePathMap = "PathMap"
68 XdsTypeCloudSync = "CloudSync"
69 XdsTypeCifsSmb = "CIFS"
72 // XdsFolderConfig XdsServer folder config
73 type XdsFolderConfig struct {
75 Label string `json:"label"`
76 ClientPath string `json:"path"`
77 Type XdsFolderType `json:"type"`
78 Status string `json:"status"`
79 IsInSync bool `json:"isInSync"`
80 DefaultSdk string `json:"defaultSdk"`
81 // Specific data depending on which Type is used
82 DataPathMap XdsPathMapConfig `json:"dataPathMap,omitempty"`
83 DataCloudSync XdsCloudSyncConfig `json:"dataCloudSync,omitempty"`
86 // XdsPathMapConfig Path mapping specific data
87 type XdsPathMapConfig struct {
88 ServerPath string `json:"serverPath"`
89 CheckFile string `json:"checkFile"`
90 CheckContent string `json:"checkContent"`
93 // XdsCloudSyncConfig CloudSync (AKA Syncthing) specific data
94 type XdsCloudSyncConfig struct {
95 SyncThingID string `json:"syncThingID"`
96 STSvrStatus string `json:"-"`
97 STSvrIsInSync bool `json:"-"`
98 STLocStatus string `json:"-"`
99 STLocIsInSync bool `json:"-"`
102 // XdsEventRegisterArgs arguments used to register to XDS server events
103 type XdsEventRegisterArgs struct {
104 Name string `json:"name"`
105 ProjectID string `json:"filterProjectID"`
108 // XdsEventFolderChange Folder change event structure
109 type XdsEventFolderChange struct {
110 Time string `json:"time"`
111 Type string `json:"type"`
112 Folder XdsFolderConfig `json:"folder"`
115 // Event emitter callback
116 type EventCB func(privData interface{}, evtData interface{}) error
118 // caller Used to chain event listeners
123 PrivateData interface{}
126 const _IDTempoPrefix = "tempo-"
128 // NewXdsServer creates an instance of XdsServer
129 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
132 ID: _IDTempoPrefix + uuid.NewV1().String(),
134 APIURL: conf.APIBaseURL + conf.APIPartialURL,
135 PartialURL: conf.APIPartialURL,
136 ConnRetry: conf.ConnRetry,
140 sockEvents: make(map[string][]*caller),
141 sockEventsLock: &sync.Mutex{},
146 // Close Free and close XDS Server connection
147 func (xs *XdsServer) Close() error {
155 // Connect Establish HTTP connection with XDS Server
156 func (xs *XdsServer) Connect() error {
164 for retry = xs.ConnRetry; retry > 0; retry-- {
165 if err = xs._CreateConnectHTTP(); err == nil {
168 if retry == xs.ConnRetry {
169 // Notify only on the first conn error
170 // doing that avoid 2 notifs (conn false; conn true) on startup
173 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
174 time.Sleep(time.Second)
177 // FIXME: re-use _reconnect to wait longer in background
178 return fmt.Errorf("Connection to XDS Server failure")
184 // Check HTTP connection and establish WS connection
185 err = xs._connect(false)
190 // IsTempoID returns true when server as a temporary id
191 func (xs *XdsServer) IsTempoID() bool {
192 return strings.HasPrefix(xs.ID, _IDTempoPrefix)
195 // SetLoggerOutput Set logger ou
196 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
200 // SendCommand Send a command to XDS Server
201 func (xs *XdsServer) SendCommand(cmd string, body []byte) (*http.Response, error) {
203 if !strings.HasPrefix("/", cmd) {
206 return xs.client.HTTPPostWithRes(url, string(body))
209 // GetVersion Send Get request to retrieve XDS Server version
210 func (xs *XdsServer) GetVersion(res interface{}) error {
211 return xs._HTTPGet("/version", &res)
214 // GetFolders Send GET request to get current folder configuration
215 func (xs *XdsServer) GetFolders(folders *[]XdsFolderConfig) error {
216 return xs._HTTPGet("/folders", folders)
219 // FolderAdd Send POST request to add a folder
220 func (xs *XdsServer) FolderAdd(fld *XdsFolderConfig, res interface{}) error {
221 response, err := xs._HTTPPost("/folders", fld)
225 if response.StatusCode != 200 {
226 return fmt.Errorf("FolderAdd error status=%s", response.Status)
228 // Result is a XdsFolderConfig that is equivalent to ProjectConfig
229 err = json.Unmarshal(xs.client.ResponseToBArray(response), res)
234 // FolderDelete Send DELETE request to delete a folder
235 func (xs *XdsServer) FolderDelete(id string) error {
236 return xs.client.HTTPDelete("/folders/" + id)
239 // FolderSync Send POST request to force synchronization of a folder
240 func (xs *XdsServer) FolderSync(id string) error {
241 return xs.client.HTTPPost("/folders/sync/"+id, "")
244 // SetAPIRouterGroup .
245 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
249 // PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
250 func (xs *XdsServer) PassthroughGet(url string) {
251 if xs.apiRouter == nil {
252 xs.Log.Errorf("apiRouter not set !")
256 xs.apiRouter.GET(url, func(c *gin.Context) {
258 // Take care of param (eg. id in /projects/:id)
260 if strings.Contains(url, ":") {
261 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
264 if err := xs._HTTPGet(nURL, &data); err != nil {
265 if strings.Contains(err.Error(), "connection refused") {
269 common.APIError(c, err.Error())
273 c.JSON(http.StatusOK, data)
277 // PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
278 func (xs *XdsServer) PassthroughPost(url string) {
279 if xs.apiRouter == nil {
280 xs.Log.Errorf("apiRouter not set !")
284 xs.apiRouter.POST(url, func(c *gin.Context) {
286 n, err := c.Request.Body.Read(bodyReq)
288 common.APIError(c, err.Error())
292 // Take care of param (eg. id in /projects/:id)
294 if strings.Contains(url, ":") {
295 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
298 response, err := xs._HTTPPost(nURL, bodyReq[:n])
300 common.APIError(c, err.Error())
303 bodyRes, err := ioutil.ReadAll(response.Body)
305 common.APIError(c, "Cannot read response body")
308 c.JSON(http.StatusOK, string(bodyRes))
312 // EventRegister Post a request to register to an XdsServer event
313 func (xs *XdsServer) EventRegister(evName string, id string) error {
315 _, err = xs._HTTPPost("/events/register", XdsEventRegisterArgs{
322 // EventOn Register a callback on events reception
323 func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
324 if xs.ioSock == nil {
325 return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
328 xs.sockEventsLock.Lock()
329 defer xs.sockEventsLock.Unlock()
331 if _, exist := xs.sockEvents[evName]; !exist {
332 // Register listener only the first time
335 // FIXME: use generic type: data interface{} instead of data XdsEventFolderChange
337 if evName == "event:FolderStateChanged" {
338 err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error {
339 xs.sockEventsLock.Lock()
340 sEvts := make([]*caller, len(xs.sockEvents[evn]))
341 copy(sEvts, xs.sockEvents[evn])
342 xs.sockEventsLock.Unlock()
343 for _, c := range sEvts {
344 c.Func(c.PrivateData, data)
349 err = xs.ioSock.On(evn, func(data interface{}) error {
350 xs.sockEventsLock.Lock()
351 sEvts := make([]*caller, len(xs.sockEvents[evn]))
352 copy(sEvts, xs.sockEvents[evn])
353 xs.sockEventsLock.Unlock()
354 for _, c := range sEvts {
355 c.Func(c.PrivateData, data)
369 PrivateData: privData,
372 xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
376 // EventOff Un-register a (or all) callbacks associated to an event
377 func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
378 xs.sockEventsLock.Lock()
379 defer xs.sockEventsLock.Unlock()
380 if _, exist := xs.sockEvents[evName]; exist {
383 xs.sockEvents[evName] = []*caller{}
385 // Un-register only the specified callback
386 for i, ff := range xs.sockEvents[evName] {
387 if uuid.Equal(ff.id, id) {
388 xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
397 // ProjectToFolder Convert Project structure to Folder structure
398 func (xs *XdsServer) ProjectToFolder(pPrj apiv1.ProjectConfig) *XdsFolderConfig {
400 if pPrj.Type == XdsTypeCloudSync {
401 stID, _ = xs.SThg.IDGet()
403 fPrj := XdsFolderConfig{
406 ClientPath: pPrj.ClientPath,
407 Type: XdsFolderType(pPrj.Type),
409 IsInSync: pPrj.IsInSync,
410 DefaultSdk: pPrj.DefaultSdk,
411 DataPathMap: XdsPathMapConfig{
412 ServerPath: pPrj.ServerPath,
414 DataCloudSync: XdsCloudSyncConfig{
416 STLocIsInSync: pPrj.IsInSync,
417 STLocStatus: pPrj.Status,
418 STSvrIsInSync: pPrj.IsInSync,
419 STSvrStatus: pPrj.Status,
426 // FolderToProject Convert Folder structure to Project structure
427 func (xs *XdsServer) FolderToProject(fPrj XdsFolderConfig) apiv1.ProjectConfig {
428 inSync := fPrj.IsInSync
431 if fPrj.Type == XdsTypeCloudSync {
432 inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
434 sts = fPrj.DataCloudSync.STSvrStatus
435 switch fPrj.DataCloudSync.STLocStatus {
436 case apiv1.StatusErrorConfig, apiv1.StatusDisable, apiv1.StatusPause:
437 sts = fPrj.DataCloudSync.STLocStatus
439 case apiv1.StatusSyncing:
440 if sts != apiv1.StatusErrorConfig && sts != apiv1.StatusDisable && sts != apiv1.StatusPause {
441 sts = apiv1.StatusSyncing
444 case apiv1.StatusEnable:
450 pPrj := apiv1.ProjectConfig{
454 ClientPath: fPrj.ClientPath,
455 ServerPath: fPrj.DataPathMap.ServerPath,
456 Type: apiv1.ProjectType(fPrj.Type),
459 DefaultSdk: fPrj.DefaultSdk,
468 // Create HTTP client
469 func (xs *XdsServer) _CreateConnectHTTP() error {
471 xs.client, err = common.HTTPNewClient(xs.BaseURL,
472 common.HTTPClientConfig{
473 URLPrefix: "/api/v1",
474 HeaderClientKeyName: "Xds-Sid",
477 LogPrefix: "XDSSERVER: ",
478 LogLevel: common.HTTPLogLevelWarning,
481 xs.client.SetLogLevel(xs.Log.Level.String())
484 msg := ": " + err.Error()
485 if strings.Contains(err.Error(), "connection refused") {
486 msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
488 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
490 if xs.client == nil {
491 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
498 func (xs *XdsServer) _HTTPGet(url string, data interface{}) error {
500 if err := xs.client.HTTPGet(url, &dd); err != nil {
503 return json.Unmarshal(dd, &data)
507 func (xs *XdsServer) _HTTPPost(url string, data interface{}) (*http.Response, error) {
508 body, err := json.Marshal(data)
512 return xs.client.HTTPPostWithRes(url, string(body))
515 // Re-established connection
516 func (xs *XdsServer) _reconnect() error {
517 err := xs._connect(true)
519 // Reload projects list for this server
520 err = xs.projects.Init(xs)
525 // Established HTTP and WS connection and retrieve XDSServer config
526 func (xs *XdsServer) _connect(reConn bool) error {
528 xdsCfg := XdsServerConfig{}
529 if err := xs._HTTPGet("/config", &xdsCfg); err != nil {
537 if reConn && xs.ID != xdsCfg.ID {
538 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ID)
541 // Update local XDS config
543 xs.ServerConfig = &xdsCfg
545 // Establish WS connection and register listen
546 if err := xs._SocketConnect(); err != nil {
557 // Create WebSocket (io.socket) connection
558 func (xs *XdsServer) _SocketConnect() error {
560 xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
562 opts := &sio_client.Options{
563 Transport: "websocket",
564 Header: make(map[string][]string),
566 opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
568 iosk, err := sio_client.NewClient(xs.BaseURL, opts)
570 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
574 // Register some listeners
576 iosk.On("error", func(err error) {
577 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
578 if xs.CBOnError != nil {
583 iosk.On("disconnection", func(err error) {
584 xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
585 if xs.CBOnDisconnect != nil {
586 xs.CBOnDisconnect(err)
591 // Try to reconnect during 15min (or at least while not disabled)
595 for !xs.Disabled && !xs.Connected {
600 if waitTime > 15*60 {
601 xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
604 time.Sleep(time.Second * time.Duration(waitTime))
605 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
612 // XXX - There is no connection event generated so, just consider that
613 // we are connected when NewClient return successfully
614 /* iosk.On("connection", func() { ... }) */
615 xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
620 // Send event to notify changes
621 func (xs *XdsServer) _NotifyState() {
623 evSts := apiv1.ServerCfg{
627 PartialURL: xs.PartialURL,
628 ConnRetry: xs.ConnRetry,
629 Connected: xs.Connected,
631 if err := xs.events.Emit(apiv1.EVTServerConfig, evSts); err != nil {
632 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)