/* * Copyright (C) 2019 "IoT.bzh" * Author Sebastien Douheret * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package agent import ( "fmt" "io" "strings" "time" "gerrit.automotivelinux.org/gerrit/src/xds/xds-agent.git/lib/aglafb" common "gerrit.automotivelinux.org/gerrit/src/xds/xds-common.git" uuid "github.com/satori/go.uuid" ) // XdsLowCollector . type XdsLowCollector struct { *Context ID string BaseURL string ConnRetry int Connected bool Disabled bool DefaultPlugins []string DefaultCollectTime int // Private fields client *common.HTTPClient logOut io.Writer cbOnConnect OnConnectedXdsAlcCB } // OnConnectedXdsAlcCB connect callback type OnConnectedXdsAlcCB func(svr *XdsLowCollector) error // NewXdsLowCollector creates an instance of XdsLowCollector func NewXdsLowCollector(ctx *Context) *XdsLowCollector { return &XdsLowCollector{ Context: ctx, ID: "XdsAlc-" + uuid.NewV1().String(), BaseURL: ctx.Config.FileConf.ProfileConf.XDSLowCollector.URL, ConnRetry: ctx.Config.FileConf.ProfileConf.XDSLowCollector.ConnRetry, Connected: false, Disabled: false, logOut: ctx.Log.Out, DefaultPlugins: []string{ "cpu", "memory", // SEB "processes", //"cpufreq", //"thermal", //"systemd_journal", // SEB "systemd_file", }, DefaultCollectTime: 5, } } // Connect Establish HTTP connection with XDS Low Collector Dameon func (xs *XdsLowCollector) Connect() error { var err error var retry int xs.Disabled = false xs.Connected = false err = nil for retry = xs.ConnRetry; retry > 0; retry-- { if err = xs._CreateConnectHTTP(); err == nil { break } if retry == xs.ConnRetry { // Notify only on the first conn error // doing that avoid 2 notifs (conn false; conn true) on startup xs._NotifyState() } xs.Log.Infof("Establishing connection to XDS Low Collector daemon (retry %d/%d)", retry, xs.ConnRetry) time.Sleep(time.Second) } if retry == 0 { // FIXME: re-use _Reconnect to wait longer in background return fmt.Errorf("Connection to XDS Low Collector daemon failure") } if err != nil { return err } // Check HTTP connection and establish WS connection err = xs._Connect(false) return err } // ConnectOn Register a callback on events reception func (xs *XdsLowCollector) ConnectOn(f OnConnectedXdsAlcCB) error { xs.cbOnConnect = f return nil } // GetVersion Send Get request to retrieve XDS Low Collector version func (xs *XdsLowCollector) GetVersion(res interface{}) error { // FIXME add suffix URLSuffix in common HTTP client lib instead of _BuildURL return xs.client.Get(xs._BuildURL("/version"), &res) } // Init Initialize collector plugins func (xs *XdsLowCollector) Init() error { var err error // Directly send config in order to init and config plugins type alcCfgPluginT struct { Plugin string `json:"plugin"` Config string `json:"config"` } cfg := []alcCfgPluginT{} for _, p := range xs.DefaultPlugins { cfg = append(cfg, alcCfgPluginT{ Plugin: p, Config: "default", }) } res := aglafb.NewAfbReply() xs.Log.Debugf("Low Collector /config %v", cfg) err = xs.client.Post(xs._BuildURL("/config"), cfg, res) if err == nil && !res.Success() { err = res.GetError() } return err } // Start data collection func (xs *XdsLowCollector) Start(time int) error { var err error // TODO - SEB : support start one or multiple plugins if time == 0 { time = xs.DefaultCollectTime } type alcStartT struct { Plugin string `json:"plugin"` Time int `json:"time"` } // TODO SEB : allow to start only 1 plugin allInOne := true if allInOne { cfg := []alcStartT{} for _, p := range xs.DefaultPlugins { cfg = append(cfg, alcStartT{Plugin: p, Time: time}) } res := aglafb.NewAfbReply() xs.Log.Debugf("Low Collector /start %v", cfg) err = xs.client.Post(xs._BuildURL("/start"), cfg, res) if err == nil && !res.Success() { err = res.GetError() } } else { for _, p := range xs.DefaultPlugins { cfg := alcStartT{Plugin: p, Time: time} res := aglafb.NewAfbReply() xs.Log.Debugf("Low Collector /start %v", cfg) err = xs.client.Post(xs._BuildURL("/start"), cfg, res) if err != nil { return err } if !res.Success() { return res.GetError() } } } return err } // Stop data collection func (xs *XdsLowCollector) Stop() error { // TODO - SEB : support start one or multiple plugins type alcStopT struct { Plugin []string `json:"plugin"` } cfg := alcStopT{} for _, p := range xs.DefaultPlugins { cfg.Plugin = append(cfg.Plugin, p) } res := aglafb.NewAfbReply() xs.Log.Debugf("Low Collector /stop %v", cfg) err := xs.client.Post(xs._BuildURL("/stop"), cfg, res) if err == nil && !res.Success() { err = res.GetError() } return err } // Read a single data of a specific plugin func (xs *XdsLowCollector) Read(data interface{}) error { return fmt.Errorf("No implemented") } /*** ** Private functions ***/ // _BuildURL . func (xs *XdsLowCollector) _BuildURL(url string) string { return url + "?token=HELLO&uuid=magic" } // Create HTTP client func (xs *XdsLowCollector) _CreateConnectHTTP() error { var err error // FIXME SEB - Client key not in header but in cookie // temporary workaround: used _BuildURL to append uuid=magic in URL // map[Set-Cookie:[x-afb-uuid-5678=2b185cc3-276b-4097-91fa-d607eaf937e6; Path=/api; Max-Age=32000000; ... //port := strings.Split(xs.BaseURL, ":")[2] //"x-afb-uuid-" + port xs.client, err = common.HTTPNewClient(xs.BaseURL, common.HTTPClientConfig{ //HeaderClientKeyName: "Xds-Sid", HeaderAPIKeyName: "token", Apikey: "HELLO", URLPrefix: "/api/alc", CsrfDisable: true, LogOut: xs.logOut, LogPrefix: "XDSALC: ", LogLevel: common.HTTPLogLevelWarning, }) xs.client.SetLogLevel(xs.Log.Level.String()) if err != nil { msg := ": " + err.Error() if strings.Contains(err.Error(), "connection refused") { msg = fmt.Sprintf("(url: %s)", xs.BaseURL) } return fmt.Errorf("ERROR: cannot connect to XDS Low Collector %s", msg) } if xs.client == nil { return fmt.Errorf("ERROR: cannot connect to XDS Low Collector (null client)") } return nil } // _Connect Established HTTP and WS connection func (xs *XdsLowCollector) _Connect(reConn bool) error { var res interface{} if err := xs.client.Get(xs._BuildURL("/ping"), &res); err != nil { // SEB FIXME tempo Hack time.Sleep(time.Microsecond * 300) if err := xs.client.Get(xs._BuildURL("/ping"), &res); err != nil { // SEB Hack tempo // xs.Connected = false // if !reConn { // xs._NotifyState() // } // return err } } xs.Connected = true // Call OnConnect callback if xs.cbOnConnect != nil { xs.cbOnConnect(xs) } xs._NotifyState() return nil } // _NotifyState Send event to notify changes func (xs *XdsLowCollector) _NotifyState() { /* TODO evSts := xaapiv1.ServerCfg{ ID: xs.ID, URL: xs.BaseURL, APIURL: xs.APIURL, PartialURL: xs.PartialURL, ConnRetry: xs.ConnRetry, Connected: xs.Connected, } if err := xs.events.Emit(xaapiv1.EVTServerConfig, evSts, ""); err != nil { xs.Log.Warningf("Cannot notify XdsServer state change: %v", err) } */ }