13 "github.com/gin-gonic/gin"
14 "github.com/iotbzh/xds-agent/lib/apiv1"
15 "github.com/iotbzh/xds-agent/lib/xdsconfig"
16 common "github.com/iotbzh/xds-common/golib"
17 uuid "github.com/satori/go.uuid"
18 sio_client "github.com/sebd71/go-socket.io-client"
22 type XdsServer struct {
31 ServerConfig *XdsServerConfig
35 CBOnDisconnect func(error)
36 sockEvents map[string][]*caller
37 sockEventsLock *sync.Mutex
40 client *common.HTTPClient
41 ioSock *sio_client.Client
43 apiRouter *gin.RouterGroup
46 // XdsServerConfig Data return by GET /config
47 type XdsServerConfig struct {
49 Version string `json:"version"`
50 APIVersion string `json:"apiVersion"`
51 VersionGitTag string `json:"gitTag"`
52 SupportedSharing map[string]bool `json:"supportedSharing"`
53 Builder XdsBuilderConfig `json:"builder"`
56 // XdsBuilderConfig represents the builder container configuration
57 type XdsBuilderConfig struct {
59 Port string `json:"port"`
60 SyncThingID string `json:"syncThingID"`
63 // XdsFolderType XdsServer folder type
64 type XdsFolderType string
67 XdsTypePathMap = "PathMap"
68 XdsTypeCloudSync = "CloudSync"
69 XdsTypeCifsSmb = "CIFS"
72 // XdsFolderConfig XdsServer folder config
73 type XdsFolderConfig struct {
75 Label string `json:"label"`
76 ClientPath string `json:"path"`
77 Type XdsFolderType `json:"type"`
78 Status string `json:"status"`
79 IsInSync bool `json:"isInSync"`
80 DefaultSdk string `json:"defaultSdk"`
81 // Specific data depending on which Type is used
82 DataPathMap XdsPathMapConfig `json:"dataPathMap,omitempty"`
83 DataCloudSync XdsCloudSyncConfig `json:"dataCloudSync,omitempty"`
86 // XdsPathMapConfig Path mapping specific data
87 type XdsPathMapConfig struct {
88 ServerPath string `json:"serverPath"`
89 CheckFile string `json:"checkFile"`
90 CheckContent string `json:"checkContent"`
93 // XdsCloudSyncConfig CloudSync (AKA Syncthing) specific data
94 type XdsCloudSyncConfig struct {
95 SyncThingID string `json:"syncThingID"`
96 STSvrStatus string `json:"-"`
97 STSvrIsInSync bool `json:"-"`
98 STLocStatus string `json:"-"`
99 STLocIsInSync bool `json:"-"`
102 // XdsEventRegisterArgs arguments used to register to XDS server events
103 type XdsEventRegisterArgs struct {
104 Name string `json:"name"`
105 ProjectID string `json:"filterProjectID"`
108 // XdsEventFolderChange Folder change event structure
109 type XdsEventFolderChange struct {
110 Time string `json:"time"`
111 Type string `json:"type"`
112 Folder XdsFolderConfig `json:"folder"`
115 // caller Used to chain event listeners
119 Func func(interface{})
122 const _IDTempoPrefix = "tempo-"
124 // NewXdsServer creates an instance of XdsServer
125 func NewXdsServer(ctx *Context, conf xdsconfig.XDSServerConf) *XdsServer {
128 ID: _IDTempoPrefix + uuid.NewV1().String(),
130 APIURL: conf.APIBaseURL + conf.APIPartialURL,
131 PartialURL: conf.APIPartialURL,
132 ConnRetry: conf.ConnRetry,
136 sockEvents: make(map[string][]*caller),
137 sockEventsLock: &sync.Mutex{},
142 // Close Free and close XDS Server connection
143 func (xs *XdsServer) Close() error {
151 // Connect Establish HTTP connection with XDS Server
152 func (xs *XdsServer) Connect() error {
160 for retry = xs.ConnRetry; retry > 0; retry-- {
161 if err = xs._CreateConnectHTTP(); err == nil {
164 if retry == xs.ConnRetry {
165 // Notify only on the first conn error
166 // doing that avoid 2 notifs (conn false; conn true) on startup
169 xs.Log.Infof("Establishing connection to XDS Server (retry %d/%d)", retry, xs.ConnRetry)
170 time.Sleep(time.Second)
173 // FIXME: re-use _reconnect to wait longer in background
174 return fmt.Errorf("Connection to XDS Server failure")
180 // Check HTTP connection and establish WS connection
181 err = xs._connect(false)
186 // IsTempoID returns true when server as a temporary id
187 func (xs *XdsServer) IsTempoID() bool {
188 return strings.HasPrefix(xs.ID, _IDTempoPrefix)
191 // SetLoggerOutput Set logger ou
192 func (xs *XdsServer) SetLoggerOutput(out io.Writer) {
196 // SendCommand Send a command to XDS Server
197 func (xs *XdsServer) SendCommand(cmd string, body []byte) (*http.Response, error) {
199 if !strings.HasPrefix("/", cmd) {
202 return xs.client.HTTPPostWithRes(url, string(body))
205 // GetVersion Send Get request to retrieve XDS Server version
206 func (xs *XdsServer) GetVersion(res interface{}) error {
207 return xs._HTTPGet("/version", &res)
210 // GetFolders Send GET request to get current folder configuration
211 func (xs *XdsServer) GetFolders(folders *[]XdsFolderConfig) error {
212 return xs._HTTPGet("/folders", folders)
215 // FolderAdd Send POST request to add a folder
216 func (xs *XdsServer) FolderAdd(fld *XdsFolderConfig, res interface{}) error {
217 response, err := xs._HTTPPost("/folders", fld)
221 if response.StatusCode != 200 {
222 return fmt.Errorf("FolderAdd error status=%s", response.Status)
224 // Result is a XdsFolderConfig that is equivalent to ProjectConfig
225 err = json.Unmarshal(xs.client.ResponseToBArray(response), res)
230 // FolderDelete Send DELETE request to delete a folder
231 func (xs *XdsServer) FolderDelete(id string) error {
232 return xs.client.HTTPDelete("/folders/" + id)
235 // FolderSync Send POST request to force synchronization of a folder
236 func (xs *XdsServer) FolderSync(id string) error {
237 return xs.client.HTTPPost("/folders/sync/"+id, "")
240 // SetAPIRouterGroup .
241 func (xs *XdsServer) SetAPIRouterGroup(r *gin.RouterGroup) {
245 // PassthroughGet Used to declare a route that sends directly a GET request to XDS Server
246 func (xs *XdsServer) PassthroughGet(url string) {
247 if xs.apiRouter == nil {
248 xs.Log.Errorf("apiRouter not set !")
252 xs.apiRouter.GET(url, func(c *gin.Context) {
254 // Take care of param (eg. id in /projects/:id)
256 if strings.Contains(url, ":") {
257 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
260 if err := xs._HTTPGet(nURL, &data); err != nil {
261 if strings.Contains(err.Error(), "connection refused") {
265 common.APIError(c, err.Error())
269 c.JSON(http.StatusOK, data)
273 // PassthroughPost Used to declare a route that sends directly a POST request to XDS Server
274 func (xs *XdsServer) PassthroughPost(url string) {
275 if xs.apiRouter == nil {
276 xs.Log.Errorf("apiRouter not set !")
280 xs.apiRouter.POST(url, func(c *gin.Context) {
282 n, err := c.Request.Body.Read(bodyReq)
284 common.APIError(c, err.Error())
288 // Take care of param (eg. id in /projects/:id)
290 if strings.Contains(url, ":") {
291 nURL = strings.TrimPrefix(c.Request.URL.Path, xs.APIURL)
294 response, err := xs._HTTPPost(nURL, bodyReq[:n])
296 common.APIError(c, err.Error())
299 bodyRes, err := ioutil.ReadAll(response.Body)
301 common.APIError(c, "Cannot read response body")
304 c.JSON(http.StatusOK, string(bodyRes))
308 // EventRegister Post a request to register to an XdsServer event
309 func (xs *XdsServer) EventRegister(evName string, id string) error {
311 _, err = xs._HTTPPost("/events/register", XdsEventRegisterArgs{
318 // EventOn Register a callback on events reception
319 func (xs *XdsServer) EventOn(evName string, f func(interface{})) (uuid.UUID, error) {
320 if xs.ioSock == nil {
321 return uuid.Nil, fmt.Errorf("Io.Socket not initialized")
324 xs.sockEventsLock.Lock()
325 defer xs.sockEventsLock.Unlock()
327 if _, exist := xs.sockEvents[evName]; !exist {
328 // Register listener only the first time
331 // FIXME: use generic type: data interface{} instead of data XdsEventFolderChange
333 if evName == "event:FolderStateChanged" {
334 err = xs.ioSock.On(evn, func(data XdsEventFolderChange) error {
335 xs.sockEventsLock.Lock()
336 defer xs.sockEventsLock.Unlock()
337 for _, c := range xs.sockEvents[evn] {
343 err = xs.ioSock.On(evn, f)
356 xs.sockEvents[evName] = append(xs.sockEvents[evName], c)
360 // EventOff Un-register a (or all) callbacks associated to an event
361 func (xs *XdsServer) EventOff(evName string, id uuid.UUID) error {
362 xs.sockEventsLock.Lock()
363 defer xs.sockEventsLock.Unlock()
364 if _, exist := xs.sockEvents[evName]; exist {
367 xs.sockEvents[evName] = []*caller{}
369 // Un-register only the specified callback
370 for i, ff := range xs.sockEvents[evName] {
371 if uuid.Equal(ff.id, id) {
372 xs.sockEvents[evName] = append(xs.sockEvents[evName][:i], xs.sockEvents[evName][i+1:]...)
381 // ProjectToFolder Convert Project structure to Folder structure
382 func (xs *XdsServer) ProjectToFolder(pPrj apiv1.ProjectConfig) *XdsFolderConfig {
384 if pPrj.Type == XdsTypeCloudSync {
385 stID, _ = xs.SThg.IDGet()
387 fPrj := XdsFolderConfig{
390 ClientPath: pPrj.ClientPath,
391 Type: XdsFolderType(pPrj.Type),
393 IsInSync: pPrj.IsInSync,
394 DefaultSdk: pPrj.DefaultSdk,
395 DataPathMap: XdsPathMapConfig{
396 ServerPath: pPrj.ServerPath,
398 DataCloudSync: XdsCloudSyncConfig{
400 STLocIsInSync: pPrj.IsInSync,
401 STLocStatus: pPrj.Status,
402 STSvrIsInSync: pPrj.IsInSync,
403 STSvrStatus: pPrj.Status,
410 // FolderToProject Convert Folder structure to Project structure
411 func (xs *XdsServer) FolderToProject(fPrj XdsFolderConfig) apiv1.ProjectConfig {
412 inSync := fPrj.IsInSync
415 if fPrj.Type == XdsTypeCloudSync {
416 inSync = fPrj.DataCloudSync.STSvrIsInSync && fPrj.DataCloudSync.STLocIsInSync
418 sts = fPrj.DataCloudSync.STSvrStatus
419 switch fPrj.DataCloudSync.STLocStatus {
420 case apiv1.StatusErrorConfig, apiv1.StatusDisable, apiv1.StatusPause:
421 sts = fPrj.DataCloudSync.STLocStatus
423 case apiv1.StatusSyncing:
424 if sts != apiv1.StatusErrorConfig && sts != apiv1.StatusDisable && sts != apiv1.StatusPause {
425 sts = apiv1.StatusSyncing
428 case apiv1.StatusEnable:
434 pPrj := apiv1.ProjectConfig{
438 ClientPath: fPrj.ClientPath,
439 ServerPath: fPrj.DataPathMap.ServerPath,
440 Type: apiv1.ProjectType(fPrj.Type),
443 DefaultSdk: fPrj.DefaultSdk,
452 // Create HTTP client
453 func (xs *XdsServer) _CreateConnectHTTP() error {
455 xs.client, err = common.HTTPNewClient(xs.BaseURL,
456 common.HTTPClientConfig{
457 URLPrefix: "/api/v1",
458 HeaderClientKeyName: "Xds-Sid",
461 LogPrefix: "XDSSERVER: ",
462 LogLevel: common.HTTPLogLevelWarning,
465 xs.client.SetLogLevel(xs.Log.Level.String())
468 msg := ": " + err.Error()
469 if strings.Contains(err.Error(), "connection refused") {
470 msg = fmt.Sprintf("(url: %s)", xs.BaseURL)
472 return fmt.Errorf("ERROR: cannot connect to XDS Server %s", msg)
474 if xs.client == nil {
475 return fmt.Errorf("ERROR: cannot connect to XDS Server (null client)")
482 func (xs *XdsServer) _HTTPGet(url string, data interface{}) error {
484 if err := xs.client.HTTPGet(url, &dd); err != nil {
487 return json.Unmarshal(dd, &data)
491 func (xs *XdsServer) _HTTPPost(url string, data interface{}) (*http.Response, error) {
492 body, err := json.Marshal(data)
496 return xs.client.HTTPPostWithRes(url, string(body))
499 // Re-established connection
500 func (xs *XdsServer) _reconnect() error {
501 err := xs._connect(true)
503 // Reload projects list for this server
504 err = xs.projects.Init(xs)
509 // Established HTTP and WS connection and retrieve XDSServer config
510 func (xs *XdsServer) _connect(reConn bool) error {
512 xdsCfg := XdsServerConfig{}
513 if err := xs._HTTPGet("/config", &xdsCfg); err != nil {
521 if reConn && xs.ID != xdsCfg.ID {
522 xs.Log.Warningf("Reconnected to server but ID differs: old=%s, new=%s", xs.ID, xdsCfg.ID)
525 // Update local XDS config
527 xs.ServerConfig = &xdsCfg
529 // Establish WS connection and register listen
530 if err := xs._SocketConnect(); err != nil {
541 // Create WebSocket (io.socket) connection
542 func (xs *XdsServer) _SocketConnect() error {
544 xs.Log.Infof("Connecting IO.socket for server %s (url %s)", xs.ID, xs.BaseURL)
546 opts := &sio_client.Options{
547 Transport: "websocket",
548 Header: make(map[string][]string),
550 opts.Header["XDS-SID"] = []string{xs.client.GetClientID()}
552 iosk, err := sio_client.NewClient(xs.BaseURL, opts)
554 return fmt.Errorf("IO.socket connection error for server %s: %v", xs.ID, err)
558 // Register some listeners
560 iosk.On("error", func(err error) {
561 xs.Log.Infof("IO.socket Error server %s; err: %v", xs.ID, err)
562 if xs.CBOnError != nil {
567 iosk.On("disconnection", func(err error) {
568 xs.Log.Infof("IO.socket disconnection server %s", xs.ID)
569 if xs.CBOnDisconnect != nil {
570 xs.CBOnDisconnect(err)
575 // Try to reconnect during 15min (or at least while not disabled)
579 for !xs.Disabled && !xs.Connected {
584 if waitTime > 15*60 {
585 xs.Log.Infof("Stop reconnection to server url=%s id=%s !", xs.BaseURL, xs.ID)
588 time.Sleep(time.Second * time.Duration(waitTime))
589 xs.Log.Infof("Try to reconnect to server %s (%d)", xs.BaseURL, count)
596 // XXX - There is no connection event generated so, just consider that
597 // we are connected when NewClient return successfully
598 /* iosk.On("connection", func() { ... }) */
599 xs.Log.Infof("IO.socket connected server url=%s id=%s", xs.BaseURL, xs.ID)
604 // Send event to notify changes
605 func (xs *XdsServer) _NotifyState() {
607 evSts := apiv1.ServerCfg{
611 PartialURL: xs.PartialURL,
612 ConnRetry: xs.ConnRetry,
613 Connected: xs.Connected,
615 if err := xs.events.Emit(apiv1.EVTServerConfig, evSts); err != nil {
616 xs.Log.Warningf("Cannot notify XdsServer state change: %v", err)