X-Git-Url: https://gerrit.automotivelinux.org/gerrit/gitweb?a=blobdiff_plain;f=lib%2Fagent%2Fxds-low-collector.go;fp=lib%2Fagent%2Fxds-low-collector.go;h=fdf696db5d01ff1486391df330f8558ae4012c82;hb=701548c5b25efba70c3818e96a4394701cfb913e;hp=0000000000000000000000000000000000000000;hpb=247bb7c2db5f0d48178398599348249bf886ebbc;p=src%2Fxds%2Fxds-agent.git diff --git a/lib/agent/xds-low-collector.go b/lib/agent/xds-low-collector.go new file mode 100644 index 0000000..fdf696d --- /dev/null +++ b/lib/agent/xds-low-collector.go @@ -0,0 +1,321 @@ +/* + * Copyright (C) 2019 "IoT.bzh" + * Author Sebastien Douheret + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package agent + +import ( + "fmt" + "io" + "strings" + "time" + + "gerrit.automotivelinux.org/gerrit/src/xds/xds-agent.git/lib/aglafb" + common "gerrit.automotivelinux.org/gerrit/src/xds/xds-common.git" + uuid "github.com/satori/go.uuid" +) + +// XdsLowCollector . +type XdsLowCollector struct { + *Context + ID string + BaseURL string + ConnRetry int + Connected bool + Disabled bool + DefaultPlugins []string + DefaultCollectTime int + // Private fields + client *common.HTTPClient + logOut io.Writer + cbOnConnect OnConnectedXdsAlcCB +} + +// OnConnectedXdsAlcCB connect callback +type OnConnectedXdsAlcCB func(svr *XdsLowCollector) error + +// NewXdsLowCollector creates an instance of XdsLowCollector +func NewXdsLowCollector(ctx *Context) *XdsLowCollector { + return &XdsLowCollector{ + Context: ctx, + ID: "XdsAlc-" + uuid.NewV1().String(), + BaseURL: ctx.Config.FileConf.ProfileConf.XDSLowCollector.URL, + ConnRetry: ctx.Config.FileConf.ProfileConf.XDSLowCollector.ConnRetry, + Connected: false, + Disabled: false, + logOut: ctx.Log.Out, + DefaultPlugins: []string{ + "cpu", + "memory", + // SEB "processes", + //"cpufreq", + //"thermal", + //"systemd_journal", + // SEB "systemd_file", + }, + DefaultCollectTime: 5, + } +} + +// Connect Establish HTTP connection with XDS Low Collector Dameon +func (xs *XdsLowCollector) Connect() error { + var err error + var retry int + + xs.Disabled = false + xs.Connected = false + + err = nil + for retry = xs.ConnRetry; retry > 0; retry-- { + if err = xs._CreateConnectHTTP(); err == nil { + break + } + if retry == xs.ConnRetry { + // Notify only on the first conn error + // doing that avoid 2 notifs (conn false; conn true) on startup + xs._NotifyState() + } + xs.Log.Infof("Establishing connection to XDS Low Collector daemon (retry %d/%d)", retry, xs.ConnRetry) + time.Sleep(time.Second) + } + if retry == 0 { + // FIXME: re-use _Reconnect to wait longer in background + return fmt.Errorf("Connection to XDS Low Collector daemon failure") + } + if err != nil { + return err + } + + // Check HTTP connection and establish WS connection + err = xs._Connect(false) + + return err +} + +// ConnectOn Register a callback on events reception +func (xs *XdsLowCollector) ConnectOn(f OnConnectedXdsAlcCB) error { + xs.cbOnConnect = f + return nil +} + +// GetVersion Send Get request to retrieve XDS Low Collector version +func (xs *XdsLowCollector) GetVersion(res interface{}) error { + // FIXME add suffix URLSuffix in common HTTP client lib instead of _BuildURL + return xs.client.Get(xs._BuildURL("/version"), &res) +} + +// Init Initialize collector plugins +func (xs *XdsLowCollector) Init() error { + var err error + + // Directly send config in order to init and config plugins + + type alcCfgPluginT struct { + Plugin string `json:"plugin"` + Config string `json:"config"` + } + + cfg := []alcCfgPluginT{} + + for _, p := range xs.DefaultPlugins { + cfg = append(cfg, alcCfgPluginT{ + Plugin: p, + Config: "default", + }) + } + + res := aglafb.NewAfbReply() + xs.Log.Debugf("Low Collector /config %v", cfg) + err = xs.client.Post(xs._BuildURL("/config"), cfg, res) + + if err == nil && !res.Success() { + err = res.GetError() + } + + return err +} + +// Start data collection +func (xs *XdsLowCollector) Start(time int) error { + var err error + + // TODO - SEB : support start one or multiple plugins + + if time == 0 { + time = xs.DefaultCollectTime + } + + type alcStartT struct { + Plugin string `json:"plugin"` + Time int `json:"time"` + } + + // TODO SEB : allow to start only 1 plugin + allInOne := true + if allInOne { + + cfg := []alcStartT{} + for _, p := range xs.DefaultPlugins { + cfg = append(cfg, alcStartT{Plugin: p, Time: time}) + } + + res := aglafb.NewAfbReply() + xs.Log.Debugf("Low Collector /start %v", cfg) + err = xs.client.Post(xs._BuildURL("/start"), cfg, res) + + if err == nil && !res.Success() { + err = res.GetError() + } + } else { + for _, p := range xs.DefaultPlugins { + cfg := alcStartT{Plugin: p, Time: time} + + res := aglafb.NewAfbReply() + xs.Log.Debugf("Low Collector /start %v", cfg) + err = xs.client.Post(xs._BuildURL("/start"), cfg, res) + if err != nil { + return err + } + if !res.Success() { + return res.GetError() + } + } + } + + return err +} + +// Stop data collection +func (xs *XdsLowCollector) Stop() error { + + // TODO - SEB : support start one or multiple plugins + + type alcStopT struct { + Plugin []string `json:"plugin"` + } + + cfg := alcStopT{} + for _, p := range xs.DefaultPlugins { + cfg.Plugin = append(cfg.Plugin, p) + } + + res := aglafb.NewAfbReply() + xs.Log.Debugf("Low Collector /stop %v", cfg) + err := xs.client.Post(xs._BuildURL("/stop"), cfg, res) + + if err == nil && !res.Success() { + err = res.GetError() + } + + return err +} + +// Read a single data of a specific plugin +func (xs *XdsLowCollector) Read(data interface{}) error { + return fmt.Errorf("No implemented") +} + +/*** +** Private functions +***/ + +// _BuildURL . +func (xs *XdsLowCollector) _BuildURL(url string) string { + return url + "?token=HELLO&uuid=magic" +} + +// Create HTTP client +func (xs *XdsLowCollector) _CreateConnectHTTP() error { + var err error + // FIXME SEB - Client key not in header but in cookie + // temporary workaround: used _BuildURL to append uuid=magic in URL + // map[Set-Cookie:[x-afb-uuid-5678=2b185cc3-276b-4097-91fa-d607eaf937e6; Path=/api; Max-Age=32000000; ... + //port := strings.Split(xs.BaseURL, ":")[2] + //"x-afb-uuid-" + port + + xs.client, err = common.HTTPNewClient(xs.BaseURL, + common.HTTPClientConfig{ + //HeaderClientKeyName: "Xds-Sid", + HeaderAPIKeyName: "token", + Apikey: "HELLO", + URLPrefix: "/api/alc", + CsrfDisable: true, + LogOut: xs.logOut, + LogPrefix: "XDSALC: ", + LogLevel: common.HTTPLogLevelWarning, + }) + + xs.client.SetLogLevel(xs.Log.Level.String()) + + if err != nil { + msg := ": " + err.Error() + if strings.Contains(err.Error(), "connection refused") { + msg = fmt.Sprintf("(url: %s)", xs.BaseURL) + } + return fmt.Errorf("ERROR: cannot connect to XDS Low Collector %s", msg) + } + if xs.client == nil { + return fmt.Errorf("ERROR: cannot connect to XDS Low Collector (null client)") + } + + return nil +} + +// _Connect Established HTTP and WS connection +func (xs *XdsLowCollector) _Connect(reConn bool) error { + var res interface{} + if err := xs.client.Get(xs._BuildURL("/ping"), &res); err != nil { + + // SEB FIXME tempo Hack + time.Sleep(time.Microsecond * 300) + if err := xs.client.Get(xs._BuildURL("/ping"), &res); err != nil { + // SEB Hack tempo + // xs.Connected = false + // if !reConn { + // xs._NotifyState() + // } + // return err + } + } + + xs.Connected = true + + // Call OnConnect callback + if xs.cbOnConnect != nil { + xs.cbOnConnect(xs) + } + + xs._NotifyState() + return nil +} + +// _NotifyState Send event to notify changes +func (xs *XdsLowCollector) _NotifyState() { + + /* TODO + evSts := xaapiv1.ServerCfg{ + ID: xs.ID, + URL: xs.BaseURL, + APIURL: xs.APIURL, + PartialURL: xs.PartialURL, + ConnRetry: xs.ConnRetry, + Connected: xs.Connected, + } + if err := xs.events.Emit(xaapiv1.EVTServerConfig, evSts, ""); err != nil { + xs.Log.Warningf("Cannot notify XdsServer state change: %v", err) + } + */ +}