13 "github.com/gin-gonic/gin"
14 "github.com/iotbzh/xds-agent/lib/apiv1"
15 "github.com/iotbzh/xds-agent/lib/xdsconfig"
16 common "github.com/iotbzh/xds-common/golib"
17 uuid "github.com/satori/go.uuid"
18 sio_client "github.com/sebd71/go-socket.io-client"
22 type XdsServer struct {
31 ServerConfig *XdsServerConfig
35 CBOnDisconnect func(error)
36 sockEvents map[string][]*caller
37 sockEventsLock *sync.Mutex
40 client *common.HTTPClient
41 ioSock *sio_client.Client
43 apiRouter *gin.RouterGroup
46 // XdsServerConfig Data returned by GET /config
47 type XdsServerConfig struct {
49 Version string `json:"version"`
50 APIVersion string `json:"apiVersion"`
51 VersionGitTag string `json:"gitTag"`
52 SupportedSharing map[string]bool `json:"supportedSharing"`
53 Builder XdsBuilderConfig `json:"builder"`
56 // XdsBuilderConfig represents the builder container configuration
57 type XdsBuilderConfig struct {
59 Port string `json:"port"`
60 SyncThingID string `json:"syncThingID"`
// XdsFolderType XdsServer folder type.
// It is a plain string type; ProjectToFolder obtains it by converting the
// Type of an apiv1.ProjectConfig, and FolderToProject converts it back.
type XdsFolderType string
67 XdsTypePathMap = "PathMap"
68 XdsTypeCloudSync = "CloudSync"
69 XdsTypeCifsSmb = "CIFS"
72 // XdsFolderConfig XdsServer folder config
73 type XdsFolderConfig struct {
75 Label string `json:"label"`
76 ClientPath string `json:"path"`
77 Type XdsFolderType `json:"type"`
78 Status string `json:"status"`
79 IsInSync bool `json:"isInSync"`
80 DefaultSdk string `json:"defaultSdk"`
81 // Specific data depending on which Type is used
82 DataPathMap XdsPathMapConfig `json:"dataPathMap,omitempty"`
83 DataCloudSync XdsCloudSyncConfig `json:"dataCloudSync,omitempty"`
// XdsPathMapConfig Path mapping specific data.
// It is embedded in XdsFolderConfig as DataPathMap.
type XdsPathMapConfig struct {
	// ServerPath is exchanged with the project's ServerPath by
	// ProjectToFolder / FolderToProject (JSON key "serverPath").
	ServerPath string `json:"serverPath"`
	// CheckFile and CheckContent are serialized as "checkFile" and
	// "checkContent"; they are not read anywhere in this file.
	// NOTE(review): presumably a marker file and its expected content used to
	// validate the mapping — confirm against the XDS server side.
	CheckFile    string `json:"checkFile"`
	CheckContent string `json:"checkContent"`
}
// XdsCloudSyncConfig CloudSync (AKA Syncthing) specific data.
// It is embedded in XdsFolderConfig as DataCloudSync.
type XdsCloudSyncConfig struct {
	// SyncThingID is the only field of this struct that is part of the JSON
	// representation (key "syncThingID").
	SyncThingID string `json:"syncThingID"`

	// The four fields below are tagged `json:"-"`, so they are never
	// marshalled nor unmarshalled. FolderToProject combines them to compute
	// the project status and in-sync flag; ProjectToFolder fills all of them
	// from the project's Status / IsInSync.
	// NOTE(review): "Svr" / "Loc" presumably stand for the server-side and
	// local Syncthing instances — confirm.
	STSvrStatus   string `json:"-"`
	STSvrIsInSync bool   `json:"-"`
	STLocStatus   string `json:"-"`
	STLocIsInSync bool   `json:"-"`
}
// XdsEventRegisterArgs arguments used to register to XDS server events.
// EventRegister posts a value of this type to "/events/register".
type XdsEventRegisterArgs struct {
	// Name is serialized under the JSON key "name".
	Name string `json:"name"`
	// ProjectID is serialized under the JSON key "filterProjectID" (note the
	// key differs from the Go field name).
	ProjectID string `json:"filterProjectID"`
}
// XdsEventFolderChange Folder change event structure.
// EventOn uses it as the payload type of the "event:FolderStateChanged"
// socket listener.
type XdsEventFolderChange struct {
	// Time is kept as a plain string (JSON key "time"); its format is not
	// established in this file.
	Time string `json:"time"`
	// Type is a plain string (JSON key "type").
	Type string `json:"type"`
	// Folder is the folder configuration carried by the event.
	Folder XdsFolderConfig `json:"folder"`
}
115 // caller Used to chain event listeners
119 Func func(interface{})
122 const _IDTempoPrefix = "tempo-"
124 // NewXdsServer creates an instance of XdsServer
125 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
128 ID: _IDTempoPrefix + uuid.NewV1().String(),
130 APIURL: conf.APIBaseURL + conf.APIPartialURL,
131 PartialURL: conf.APIPartialURL,
132 ConnRetry: conf.ConnRetry,
136 sockEvents: make(map[string][]*caller),
137 sockEventsLock: &sync.Mutex{},
142 // Close Free and close XDS Server connection
143 func (xs *XdsServer) Close() error {
151 // Connect Establish HTTP connection with XDS Server
152 func (xs *XdsServer) Connect() error {
160 for retry = xs.ConnRetry; retry > 0; retry-- {
161 if err = xs._CreateConnectHTTP(); err == nil {
164 if retry == xs.ConnRetry {
165 // Notify only on the first conn error
166 // doing that avoids 2 notifications (conn false; conn true) on startup
169 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
170 time.Sleep(time.Second)
173 // FIXME: re-use _reconnect to wait longer in background
174 return fmt.Errorf("Connection to XDS Server failure")
180 // Check HTTP connection and establish WS connection
181 err = xs._connect(false)
186 // IsTempoID returns true when server as a temporary id
187 func (xs *XdsServer) IsTempoID() bool {
188 return strings.HasPrefix(xs.ID, _IDTempoPrefix)
191 // SetLoggerOutput Set logger output
192 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
196 // SendCommand Send a command to XDS Server
197 func (xs *XdsServer) SendCommand(cmd string, body []byte) (*http.Response, error) {
199 if !strings.HasPrefix("/", cmd) {
202 return xs.client.HTTPPostWithRes(url, string(body))
205 // GetVersion Send Get request to retrieve XDS Server version
206 func (xs *XdsServer) GetVersion(res interface{}) error {
207 return xs._HTTPGet("/version", &res)
210 // GetFolders Send GET request to get current folder configuration
211 func (xs *XdsServer) GetFolders(folders *[]XdsFolderConfig) error {
212 return xs._HTTPGet("/folders", folders)
215 // FolderAdd Send POST request to add a folder
216 func (xs *XdsServer) FolderAdd(fld *XdsFolderConfig, res interface{}) error {
217 response, err := xs._HTTPPost("/folder", fld)
221 if response.StatusCode != 200 {
222 return fmt.Errorf("FolderAdd error status=%s", response.Status)
224 // Result is a XdsFolderConfig that is equivalent to ProjectConfig
225 err = json.Unmarshal(xs.client.ResponseToBArray(response), res)
230 // FolderDelete Send DELETE request to delete a folder
231 func (xs *XdsServer) FolderDelete(id string) error {
232 return xs.client.HTTPDelete("/folder/" + id)
235 // FolderSync Send POST request to force synchronization of a folder
236 func (xs *XdsServer) FolderSync(id string) error {
237 return xs.client.HTTPPost("/folder/sync/"+id, "")
240 // SetAPIRouterGroup .
241 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
245 // PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
246 func (xs *XdsServer) PassthroughGet(url string) {
247 if xs.apiRouter == nil {
248 xs.Log.Errorf("apiRouter not set !")
252 xs.apiRouter.GET(url, func(c *gin.Context) {
254 if err := xs._HTTPGet(url, &data); err != nil {
255 if strings.Contains(err.Error(), "connection refused") {
259 common.APIError(c, err.Error())
263 c.JSON(http.StatusOK, data)
267 // PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
268 func (xs *XdsServer) PassthroughPost(url string) {
269 if xs.apiRouter == nil {
270 xs.Log.Errorf("apiRouter not set !")
274 xs.apiRouter.POST(url, func(c *gin.Context) {
276 n, err := c.Request.Body.Read(bodyReq)
278 common.APIError(c, err.Error())
282 response, err := xs._HTTPPost(url, bodyReq[:n])
284 common.APIError(c, err.Error())
287 bodyRes, err := ioutil.ReadAll(response.Body)
289 common.APIError(c, "Cannot read response body")
292 c.JSON(http.StatusOK, string(bodyRes))
296 // EventRegister Post a request to register to an XdsServer event
297 func (xs *XdsServer) EventRegister(evName string, id string) error {
299 _, err = xs._HTTPPost("/events/register", XdsEventRegisterArgs{
306 // EventOn Register a callback on events reception
307 func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, error) {
308 if xs.ioSock == nil {
309 return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
312 xs.sockEventsLock.Lock()
313 defer xs.sockEventsLock.Unlock()
315 if _, exist := xs.sockEvents[evName]; !exist {
316 // Register listener only the first time
319 // FIXME: use generic type: data interface{} instead of data XdsEventFolderChange
321 if evName == "event:FolderStateChanged" {
322 err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error {
323 xs.sockEventsLock.Lock()
324 defer xs.sockEventsLock.Unlock()
325 for _, c := range xs.sockEvents[evn] {
331 err = xs.ioSock.On(evn, f)
344 xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
348 // EventOff Un-register a (or all) callbacks associated to an event
349 func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
350 xs.sockEventsLock.Lock()
351 defer xs.sockEventsLock.Unlock()
352 if _, exist := xs.sockEvents[evName]; exist {
355 xs.sockEvents[evName] = []*caller{}
357 // Un-register only the specified callback
358 for i, ff := range xs.sockEvents[evName] {
359 if uuid.Equal(ff.id, id) {
360 xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
369 // ProjectToFolder Convert Project structure to Folder structure
370 func (xs *XdsServer) ProjectToFolder(pPrj apiv1.ProjectConfig) *XdsFolderConfig {
372 if pPrj.Type == XdsTypeCloudSync {
373 stID, _ = xs.SThg.IDGet()
375 fPrj := XdsFolderConfig{
378 ClientPath: pPrj.ClientPath,
379 Type: XdsFolderType(pPrj.Type),
381 IsInSync: pPrj.IsInSync,
382 DefaultSdk: pPrj.DefaultSdk,
383 DataPathMap: XdsPathMapConfig{
384 ServerPath: pPrj.ServerPath,
386 DataCloudSync: XdsCloudSyncConfig{
388 STLocIsInSync: pPrj.IsInSync,
389 STLocStatus: pPrj.Status,
390 STSvrIsInSync: pPrj.IsInSync,
391 STSvrStatus: pPrj.Status,
398 // FolderToProject Convert Folder structure to Project structure
399 func (xs *XdsServer) FolderToProject(fPrj XdsFolderConfig) apiv1.ProjectConfig {
400 inSync := fPrj.IsInSync
403 if fPrj.Type == XdsTypeCloudSync {
404 inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
406 sts = fPrj.DataCloudSync.STSvrStatus
407 switch fPrj.DataCloudSync.STLocStatus {
408 case apiv1.StatusErrorConfig, apiv1.StatusDisable, apiv1.StatusPause:
409 sts = fPrj.DataCloudSync.STLocStatus
411 case apiv1.StatusSyncing:
412 if sts != apiv1.StatusErrorConfig && sts != apiv1.StatusDisable && sts != apiv1.StatusPause {
413 sts = apiv1.StatusSyncing
416 case apiv1.StatusEnable:
422 pPrj := apiv1.ProjectConfig{
426 ClientPath: fPrj.ClientPath,
427 ServerPath: fPrj.DataPathMap.ServerPath,
428 Type: apiv1.ProjectType(fPrj.Type),
431 DefaultSdk: fPrj.DefaultSdk,
440 // Create HTTP client
441 func (xs *XdsServer) _CreateConnectHTTP() error {
443 xs.client, err = common.HTTPNewClient(xs.BaseURL,
444 common.HTTPClientConfig{
445 URLPrefix: "/api/v1",
446 HeaderClientKeyName: "Xds-Sid",
449 LogPrefix: "XDSSERVER: ",
450 LogLevel: common.HTTPLogLevelWarning,
453 xs.client.SetLogLevel(xs.Log.Level.String())
456 msg := ": " + err.Error()
457 if strings.Contains(err.Error(), "connection refused") {
458 msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
460 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
462 if xs.client == nil {
463 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
470 func (xs *XdsServer) _HTTPGet(url string, data interface{}) error {
472 if err := xs.client.HTTPGet(url, &dd); err != nil {
475 return json.Unmarshal(dd, &data)
479 func (xs *XdsServer) _HTTPPost(url string, data interface{}) (*http.Response, error) {
480 body, err := json.Marshal(data)
484 return xs.client.HTTPPostWithRes(url, string(body))
487 // Re-establish connection
488 func (xs *XdsServer) _reconnect() error {
489 err := xs._connect(true)
491 // Reload projects list for this server
492 err = xs.projects.Init(xs)
497 // Establish HTTP and WS connection and retrieve XDSServer config
498 func (xs *XdsServer) _connect(reConn bool) error {
500 xdsCfg := XdsServerConfig{}
501 if err := xs._HTTPGet("/config", &xdsCfg); err != nil {
509 if reConn && xs.ID != xdsCfg.ID {
510 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ID)
513 // Update local XDS config
515 xs.ServerConfig = &xdsCfg
517 // Establish WS connection and register listeners
518 if err := xs._SocketConnect(); err != nil {
529 // Create WebSocket (io.socket) connection
530 func (xs *XdsServer) _SocketConnect() error {
532 xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
534 opts := &sio_client.Options{
535 Transport: "websocket",
536 Header: make(map[string][]string),
538 opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
540 iosk, err := sio_client.NewClient(xs.BaseURL, opts)
542 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
546 // Register some listeners
548 iosk.On("error", func(err error) {
549 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
550 if xs.CBOnError != nil {
555 iosk.On("disconnection", func(err error) {
556 xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
557 if xs.CBOnDisconnect != nil {
558 xs.CBOnDisconnect(err)
563 // Try to reconnect during 15min (or at least while not disabled)
567 for !xs.Disabled && !xs.Connected {
572 if waitTime > 15*60 {
573 xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
576 time.Sleep(time.Second * time.Duration(waitTime))
577 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
584 // XXX - There is no connection event generated so, just consider that
585 // we are connected when NewClient return successfully
586 /* iosk.On("connection", func() { ... }) */
587 xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
592 // Send event to notify changes
593 func (xs *XdsServer) _NotifyState() {
595 evSts := apiv1.ServerCfg{
599 PartialURL: xs.PartialURL,
600 ConnRetry: xs.ConnRetry,
601 Connected: xs.Connected,
603 if err := xs.events.Emit(apiv1.EVTServerConfig, evSts); err != nil {
604 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)