13 "github.com/gin-gonic/gin"
14 "github.com/iotbzh/xds-agent/lib/apiv1"
15 "github.com/iotbzh/xds-agent/lib/xdsconfig"
16 common "github.com/iotbzh/xds-common/golib"
17 uuid "github.com/satori/go.uuid"
18 sio_client "github.com/sebd71/go-socket.io-client"
22 type XdsServer struct {
31 ServerConfig *XdsServerConfig
35 CBOnDisconnect func(error)
36 sockEvents map[string][]*caller
37 sockEventsLock *sync.Mutex
40 client *common.HTTPClient
41 ioSock *sio_client.Client
43 apiRouter *gin.RouterGroup
46 // XdsServerConfig Data return by GET /config
47 type XdsServerConfig struct {
49 Version string `json:"version"`
50 APIVersion string `json:"apiVersion"`
51 VersionGitTag string `json:"gitTag"`
52 SupportedSharing map[string]bool `json:"supportedSharing"`
53 Builder XdsBuilderConfig `json:"builder"`
56 // XdsBuilderConfig represents the builder container configuration
57 type XdsBuilderConfig struct {
59 Port string `json:"port"`
60 SyncThingID string `json:"syncThingID"`
63 // XdsFolderType XdsServer folder type
64 type XdsFolderType string
67 // XdsTypePathMap Path Mapping folder type
68 XdsTypePathMap = "PathMap"
69 // XdsTypeCloudSync Cloud synchronization (AKA syncthing) folder type
70 XdsTypeCloudSync = "CloudSync"
71 // XdsTypeCifsSmb CIFS (AKA samba) folder type
72 XdsTypeCifsSmb = "CIFS"
75 // XdsFolderConfig XdsServer folder config
76 type XdsFolderConfig struct {
78 Label string `json:"label"`
79 ClientPath string `json:"path"`
80 Type XdsFolderType `json:"type"`
81 Status string `json:"status"`
82 IsInSync bool `json:"isInSync"`
83 DefaultSdk string `json:"defaultSdk"`
84 ClientData string `json:"clientData"` // free form field that can used by client
86 // Specific data depending on which Type is used
87 DataPathMap XdsPathMapConfig `json:"dataPathMap,omitempty"`
88 DataCloudSync XdsCloudSyncConfig `json:"dataCloudSync,omitempty"`
91 // XdsPathMapConfig Path mapping specific data
92 type XdsPathMapConfig struct {
93 ServerPath string `json:"serverPath"`
94 CheckFile string `json:"checkFile"`
95 CheckContent string `json:"checkContent"`
98 // XdsCloudSyncConfig CloudSync (AKA Syncthing) specific data
99 type XdsCloudSyncConfig struct {
100 SyncThingID string `json:"syncThingID"`
101 STSvrStatus string `json:"-"`
102 STSvrIsInSync bool `json:"-"`
103 STLocStatus string `json:"-"`
104 STLocIsInSync bool `json:"-"`
107 // XdsEventRegisterArgs arguments used to register to XDS server events
108 type XdsEventRegisterArgs struct {
109 Name string `json:"name"`
110 ProjectID string `json:"filterProjectID"`
113 // XdsEventFolderChange Folder change event structure
114 type XdsEventFolderChange struct {
115 Time string `json:"time"`
116 Type string `json:"type"`
117 Folder XdsFolderConfig `json:"folder"`
120 // EventCB Event emitter callback
121 type EventCB func(privData interface{}, evtData interface{}) error
123 // caller Used to chain event listeners
128 PrivateData interface{}
131 const _IDTempoPrefix = "tempo-"
133 // NewXdsServer creates an instance of XdsServer
134 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
137 ID: _IDTempoPrefix + uuid.NewV1().String(),
139 APIURL: conf.APIBaseURL + conf.APIPartialURL,
140 PartialURL: conf.APIPartialURL,
141 ConnRetry: conf.ConnRetry,
145 sockEvents: make(map[string][]*caller),
146 sockEventsLock: &sync.Mutex{},
151 // Close Free and close XDS Server connection
152 func (xs *XdsServer) Close() error {
160 // Connect Establish HTTP connection with XDS Server
161 func (xs *XdsServer) Connect() error {
169 for retry = xs.ConnRetry; retry > 0; retry-- {
170 if err = xs._CreateConnectHTTP(); err == nil {
173 if retry == xs.ConnRetry {
174 // Notify only on the first conn error
175 // doing that avoid 2 notifs (conn false; conn true) on startup
178 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
179 time.Sleep(time.Second)
182 // FIXME: re-use _reconnect to wait longer in background
183 return fmt.Errorf("Connection to XDS Server failure")
189 // Check HTTP connection and establish WS connection
190 err = xs._connect(false)
195 // IsTempoID returns true when server as a temporary id
196 func (xs *XdsServer) IsTempoID() bool {
197 return strings.HasPrefix(xs.ID, _IDTempoPrefix)
200 // SetLoggerOutput Set logger ou
201 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
205 // SendCommand Send a command to XDS Server
206 func (xs *XdsServer) SendCommand(cmd string, body []byte) (*http.Response, error) {
208 if !strings.HasPrefix("/", cmd) {
211 return xs.client.HTTPPostWithRes(url, string(body))
214 // GetVersion Send Get request to retrieve XDS Server version
215 func (xs *XdsServer) GetVersion(res interface{}) error {
216 return xs._HTTPGet("/version", &res)
219 // GetFolders Send GET request to get current folder configuration
220 func (xs *XdsServer) GetFolders(folders *[]XdsFolderConfig) error {
221 return xs._HTTPGet("/folders", folders)
224 // FolderAdd Send POST request to add a folder
225 func (xs *XdsServer) FolderAdd(fld *XdsFolderConfig, res interface{}) error {
226 response, err := xs._HTTPPost("/folders", fld)
230 if response.StatusCode != 200 {
231 return fmt.Errorf("FolderAdd error status=%s", response.Status)
233 // Result is a XdsFolderConfig that is equivalent to ProjectConfig
234 err = json.Unmarshal(xs.client.ResponseToBArray(response), res)
239 // FolderDelete Send DELETE request to delete a folder
240 func (xs *XdsServer) FolderDelete(id string) error {
241 return xs.client.HTTPDelete("/folders/" + id)
244 // FolderSync Send POST request to force synchronization of a folder
245 func (xs *XdsServer) FolderSync(id string) error {
246 return xs.client.HTTPPost("/folders/sync/"+id, "")
249 // FolderUpdate Send PUT request to update a folder
250 func (xs *XdsServer) FolderUpdate(fld *XdsFolderConfig, resFld *XdsFolderConfig) error {
251 return xs.client.Put("/folders/"+fld.ID, fld, resFld)
254 // SetAPIRouterGroup .
255 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
259 // PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
260 func (xs *XdsServer) PassthroughGet(url string) {
261 if xs.apiRouter == nil {
262 xs.Log.Errorf("apiRouter not set !")
266 xs.apiRouter.GET(url, func(c *gin.Context) {
268 // Take care of param (eg. id in /projects/:id)
270 if strings.Contains(url, ":") {
271 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
274 if err := xs._HTTPGet(nURL, &data); err != nil {
275 if strings.Contains(err.Error(), "connection refused") {
279 common.APIError(c, err.Error())
283 c.JSON(http.StatusOK, data)
287 // PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
288 func (xs *XdsServer) PassthroughPost(url string) {
289 if xs.apiRouter == nil {
290 xs.Log.Errorf("apiRouter not set !")
294 xs.apiRouter.POST(url, func(c *gin.Context) {
296 n, err := c.Request.Body.Read(bodyReq)
298 common.APIError(c, err.Error())
302 // Take care of param (eg. id in /projects/:id)
304 if strings.Contains(url, ":") {
305 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
308 response, err := xs._HTTPPost(nURL, bodyReq[:n])
310 common.APIError(c, err.Error())
313 bodyRes, err := ioutil.ReadAll(response.Body)
315 common.APIError(c, "Cannot read response body")
318 c.JSON(http.StatusOK, string(bodyRes))
322 // EventRegister Post a request to register to an XdsServer event
323 func (xs *XdsServer) EventRegister(evName string, id string) error {
325 _, err = xs._HTTPPost("/events/register", XdsEventRegisterArgs{
332 // EventOn Register a callback on events reception
333 func (xs *XdsServer) EventOn(evName string, privData interface{}, f EventCB) (uuid.UUID, error) {
334 if xs.ioSock == nil {
335 return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
338 xs.sockEventsLock.Lock()
339 defer xs.sockEventsLock.Unlock()
341 if _, exist := xs.sockEvents[evName]; !exist {
342 // Register listener only the first time
345 // FIXME: use generic type: data interface{} instead of data XdsEventFolderChange
347 if evName == "event:folder-state-change" {
348 err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error {
349 xs.sockEventsLock.Lock()
350 sEvts := make([]*caller, len(xs.sockEvents[evn]))
351 copy(sEvts, xs.sockEvents[evn])
352 xs.sockEventsLock.Unlock()
353 for _, c := range sEvts {
354 c.Func(c.PrivateData, data)
359 err = xs.ioSock.On(evn, func(data interface{}) error {
360 xs.sockEventsLock.Lock()
361 sEvts := make([]*caller, len(xs.sockEvents[evn]))
362 copy(sEvts, xs.sockEvents[evn])
363 xs.sockEventsLock.Unlock()
364 for _, c := range sEvts {
365 c.Func(c.PrivateData, data)
379 PrivateData: privData,
382 xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
386 // EventOff Un-register a (or all) callbacks associated to an event
387 func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
388 xs.sockEventsLock.Lock()
389 defer xs.sockEventsLock.Unlock()
390 if _, exist := xs.sockEvents[evName]; exist {
393 xs.sockEvents[evName] = []*caller{}
395 // Un-register only the specified callback
396 for i, ff := range xs.sockEvents[evName] {
397 if uuid.Equal(ff.id, id) {
398 xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
407 // ProjectToFolder Convert Project structure to Folder structure
408 func (xs *XdsServer) ProjectToFolder(pPrj apiv1.ProjectConfig) *XdsFolderConfig {
410 if pPrj.Type == XdsTypeCloudSync {
411 stID, _ = xs.SThg.IDGet()
413 // TODO: limit ClientData size and gzip it (see https://golang.org/pkg/compress/gzip/)
414 fPrj := XdsFolderConfig{
417 ClientPath: pPrj.ClientPath,
418 Type: XdsFolderType(pPrj.Type),
420 IsInSync: pPrj.IsInSync,
421 DefaultSdk: pPrj.DefaultSdk,
422 ClientData: pPrj.ClientData,
423 DataPathMap: XdsPathMapConfig{
424 ServerPath: pPrj.ServerPath,
426 DataCloudSync: XdsCloudSyncConfig{
428 STLocIsInSync: pPrj.IsInSync,
429 STLocStatus: pPrj.Status,
430 STSvrIsInSync: pPrj.IsInSync,
431 STSvrStatus: pPrj.Status,
438 // FolderToProject Convert Folder structure to Project structure
439 func (xs *XdsServer) FolderToProject(fPrj XdsFolderConfig) apiv1.ProjectConfig {
440 inSync := fPrj.IsInSync
443 if fPrj.Type == XdsTypeCloudSync {
444 inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
446 sts = fPrj.DataCloudSync.STSvrStatus
447 switch fPrj.DataCloudSync.STLocStatus {
448 case apiv1.StatusErrorConfig, apiv1.StatusDisable, apiv1.StatusPause:
449 sts = fPrj.DataCloudSync.STLocStatus
451 case apiv1.StatusSyncing:
452 if sts != apiv1.StatusErrorConfig && sts != apiv1.StatusDisable && sts != apiv1.StatusPause {
453 sts = apiv1.StatusSyncing
456 case apiv1.StatusEnable:
462 pPrj := apiv1.ProjectConfig{
466 ClientPath: fPrj.ClientPath,
467 ServerPath: fPrj.DataPathMap.ServerPath,
468 Type: apiv1.ProjectType(fPrj.Type),
471 DefaultSdk: fPrj.DefaultSdk,
472 ClientData: fPrj.ClientData,
481 // Create HTTP client
482 func (xs *XdsServer) _CreateConnectHTTP() error {
484 xs.client, err = common.HTTPNewClient(xs.BaseURL,
485 common.HTTPClientConfig{
486 URLPrefix: "/api/v1",
487 HeaderClientKeyName: "Xds-Sid",
490 LogPrefix: "XDSSERVER: ",
491 LogLevel: common.HTTPLogLevelWarning,
494 xs.client.SetLogLevel(xs.Log.Level.String())
497 msg := ": " + err.Error()
498 if strings.Contains(err.Error(), "connection refused") {
499 msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
501 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
503 if xs.client == nil {
504 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
511 func (xs *XdsServer) _HTTPGet(url string, data interface{}) error {
513 if err := xs.client.HTTPGet(url, &dd); err != nil {
516 return json.Unmarshal(dd, &data)
520 func (xs *XdsServer) _HTTPPost(url string, data interface{}) (*http.Response, error) {
521 body, err := json.Marshal(data)
525 return xs.client.HTTPPostWithRes(url, string(body))
528 // Re-established connection
529 func (xs *XdsServer) _reconnect() error {
530 err := xs._connect(true)
532 // Reload projects list for this server
533 err = xs.projects.Init(xs)
538 // Established HTTP and WS connection and retrieve XDSServer config
539 func (xs *XdsServer) _connect(reConn bool) error {
541 xdsCfg := XdsServerConfig{}
542 if err := xs._HTTPGet("/config", &xdsCfg); err != nil {
550 if reConn && xs.ID != xdsCfg.ID {
551 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ID)
554 // Update local XDS config
556 xs.ServerConfig = &xdsCfg
558 // Establish WS connection and register listen
559 if err := xs._SocketConnect(); err != nil {
570 // Create WebSocket (io.socket) connection
571 func (xs *XdsServer) _SocketConnect() error {
573 xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
575 opts := &sio_client.Options{
576 Transport: "websocket",
577 Header: make(map[string][]string),
579 opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
581 iosk, err := sio_client.NewClient(xs.BaseURL, opts)
583 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
587 // Register some listeners
589 iosk.On("error", func(err error) {
590 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
591 if xs.CBOnError != nil {
596 iosk.On("disconnection", func(err error) {
597 xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
598 if xs.CBOnDisconnect != nil {
599 xs.CBOnDisconnect(err)
604 // Try to reconnect during 15min (or at least while not disabled)
608 for !xs.Disabled && !xs.Connected {
613 if waitTime > 15*60 {
614 xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
617 time.Sleep(time.Second * time.Duration(waitTime))
618 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
625 // XXX - There is no connection event generated so, just consider that
626 // we are connected when NewClient return successfully
627 /* iosk.On("connection", func() { ... }) */
628 xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
633 // Send event to notify changes
634 func (xs *XdsServer) _NotifyState() {
636 evSts := apiv1.ServerCfg{
640 PartialURL: xs.PartialURL,
641 ConnRetry: xs.ConnRetry,
642 Connected: xs.Connected,
644 if err := xs.events.Emit(apiv1.EVTServerConfig, evSts, ""); err != nil {
645 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)