Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: dead code removal and app router & rest handler restructuring #4595

Merged
merged 8 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
chore: removed support for unused Prometheus and ArgoCd APIs
  • Loading branch information
Ash-exp committed Jan 25, 2024
commit ad9a8b072e12db499e244027d6efdf02d60c5a9e
5 changes: 1 addition & 4 deletions Wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,10 +361,7 @@ func InitializeApp() (*App, error) {
repository2.NewServiceClientImpl,
wire.Bind(new(repository2.ServiceClient), new(*repository2.ServiceClientImpl)),
wire.Bind(new(connector.Pump), new(*connector.PumpImpl)),
restHandler.NewArgoApplicationRestHandlerImpl,
wire.Bind(new(restHandler.ArgoApplicationRestHandler), new(*restHandler.ArgoApplicationRestHandlerImpl)),
router.NewApplicationRouterImpl,
wire.Bind(new(router.ApplicationRouter), new(*router.ApplicationRouterImpl)),

//app.GetConfig,

router.NewCDRouterImpl,
Expand Down
194 changes: 8 additions & 186 deletions api/connector/Connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"bufio"
"encoding/json"
"fmt"
"github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
"github.com/devtron-labs/devtron/api/bean"
"github.com/gogo/protobuf/proto"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
Expand All @@ -41,9 +40,6 @@ import (
var delimiter = []byte("\n\n")

type Pump interface {
StartStream(w http.ResponseWriter, recv func() (proto.Message, error), err error)
StartStreamWithHeartBeat(w http.ResponseWriter, isReconnect bool, recv func() (*application.LogEntry, error), err error)
StartMessage(w http.ResponseWriter, resp proto.Message, perr error)
StartStreamWithTransformer(w http.ResponseWriter, recv func() (proto.Message, error), err error, transformer func(interface{}) interface{})
StartK8sStreamWithHeartBeat(w http.ResponseWriter, isReconnect bool, stream io.ReadCloser, err error)
}
Expand Down Expand Up @@ -148,7 +144,7 @@ func (impl PumpImpl) StartK8sStreamWithHeartBeat(w http.ResponseWriter, isReconn
// heartbeat end
}

func (impl PumpImpl) StartStreamWithHeartBeat(w http.ResponseWriter, isReconnect bool, recv func() (*application.LogEntry, error), err error) {
func (impl PumpImpl) StartStreamWithTransformer(w http.ResponseWriter, recv func() (proto.Message, error), err error, transformer func(interface{}) interface{}) {
f, ok := w.(http.Flusher)
if !ok {
http.Error(w, "unexpected server doesnt support streaming", http.StatusInternalServerError)
Expand All @@ -162,41 +158,6 @@ func (impl PumpImpl) StartStreamWithHeartBeat(w http.ResponseWriter, isReconnect
w.Header().Set("X-Content-Type-Options", "nosniff")

var wroteHeader bool
if isReconnect {
err := impl.sendEvent(nil, []byte("RECONNECT_STREAM"), []byte("RECONNECT_STREAM"), w)
if err != nil {
impl.logger.Errorw("error in writing data over sse", "err", err)
return
}
}
// heartbeat start
ticker := time.NewTicker(30 * time.Second)
done := make(chan bool)
var mux sync.Mutex
go func() error {
for {
select {
case <-done:
return nil
case t := <-ticker.C:
mux.Lock()
err := impl.sendEvent(nil, []byte("PING"), []byte(t.String()), w)
mux.Unlock()
if err != nil {
impl.logger.Errorw("error in writing PING over sse", "err", err)
return err
}
f.Flush()
}
}
}()
defer func() {
ticker.Stop()
done <- true
}()

// heartbeat end

for {
resp, err := recv()
if err == io.EOF {
Expand All @@ -208,20 +169,18 @@ func (impl PumpImpl) StartStreamWithHeartBeat(w http.ResponseWriter, isReconnect
return
}
response := bean.Response{}
response.Result = resp
response.Result = transformer(resp)
buf, err := json.Marshal(response)
if err != nil {
impl.logger.Errorw("error in marshaling data", "err", err)
data := "data: " + string(buf)
if _, err = w.Write([]byte(data)); err != nil {
impl.logger.Errorf("Failed to send response chunk: %v", err)
return
}
mux.Lock()
err = impl.sendEvent([]byte(strconv.FormatInt(resp.GetTimeStamp().UnixNano(), 10)), nil, buf, w)
mux.Unlock()
if err != nil {
impl.logger.Errorw("error in writing data over sse", "err", err)
wroteHeader = true
if _, err = w.Write(delimiter); err != nil {
impl.logger.Errorf("Failed to send delimiter chunk: %v", err)
return
}
wroteHeader = true
f.Flush()
}
}
Expand Down Expand Up @@ -251,88 +210,6 @@ func (impl *PumpImpl) sendEvent(eventId []byte, eventName []byte, payload []byte
return nil
}

func (impl PumpImpl) StartStream(w http.ResponseWriter, recv func() (proto.Message, error), err error) {
f, ok := w.(http.Flusher)
if !ok {
http.Error(w, "unexpected server doesnt support streaming", http.StatusInternalServerError)
}
if err != nil {
http.Error(w, errors.Details(err), http.StatusInternalServerError)
}
w.Header().Set("Transfer-Encoding", "chunked")
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("X-Accel-Buffering", "no")
w.Header().Set("X-Content-Type-Options", "nosniff")

var wroteHeader bool
for {
resp, err := recv()
if err == io.EOF {
return
}
if err != nil {
impl.logger.Errorf("Error occurred while reading data from argocd %+v\n", err)
impl.handleForwardResponseStreamError(wroteHeader, w, err)
return
}
response := bean.Response{}
response.Result = resp
buf, err := json.Marshal(response)
data := "data: " + string(buf)
if _, err = w.Write([]byte(data)); err != nil {
impl.logger.Errorf("Failed to send response chunk: %v", err)
return
}
wroteHeader = true
if _, err = w.Write(delimiter); err != nil {
impl.logger.Errorf("Failed to send delimiter chunk: %v", err)
return
}
f.Flush()
}
}

func (impl PumpImpl) StartStreamWithTransformer(w http.ResponseWriter, recv func() (proto.Message, error), err error, transformer func(interface{}) interface{}) {
f, ok := w.(http.Flusher)
if !ok {
http.Error(w, "unexpected server doesnt support streaming", http.StatusInternalServerError)
}
if err != nil {
http.Error(w, errors.Details(err), http.StatusInternalServerError)
}
w.Header().Set("Transfer-Encoding", "chunked")
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("X-Accel-Buffering", "no")
w.Header().Set("X-Content-Type-Options", "nosniff")

var wroteHeader bool
for {
resp, err := recv()
if err == io.EOF {
return
}
if err != nil {
impl.logger.Errorf("Error occurred while reading data from argocd %+v\n", err)
impl.handleForwardResponseStreamError(wroteHeader, w, err)
return
}
response := bean.Response{}
response.Result = transformer(resp)
buf, err := json.Marshal(response)
data := "data: " + string(buf)
if _, err = w.Write([]byte(data)); err != nil {
impl.logger.Errorf("Failed to send response chunk: %v", err)
return
}
wroteHeader = true
if _, err = w.Write(delimiter); err != nil {
impl.logger.Errorf("Failed to send delimiter chunk: %v", err)
return
}
f.Flush()
}
}

func (impl PumpImpl) handleForwardResponseStreamError(wroteHeader bool, w http.ResponseWriter, err error) {
code := "000"
if !wroteHeader {
Expand All @@ -357,58 +234,3 @@ func (impl PumpImpl) handleForwardResponseStreamError(wroteHeader bool, w http.R
return
}
}

func (impl PumpImpl) StartMessage(w http.ResponseWriter, resp proto.Message, perr error) {
//w.Header().Set("Transfer-Encoding", "chunked")
w.Header().Set("Content-Type", "application/json")

response := bean.Response{}
if perr != nil {
impl.handleForwardResponseMessageError(w, perr)
return
}
var buf []byte
var err error
if rb, ok := resp.(responseBody); ok {
response.Result = rb.XXX_ResponseBody()
buf, err = json.Marshal(response)
} else {
response.Result = resp
buf, err = json.Marshal(response)
}
if err != nil {
impl.logger.Errorf("Marshal error: %v", err)
return
}

if _, err = w.Write(buf); err != nil {
impl.logger.Errorf("Failed to write response: %v", err)
}
}

func (impl PumpImpl) handleForwardResponseMessageError(w http.ResponseWriter, err error) {
code := "000"
s, ok := status.FromError(err)
if !ok {
s = status.New(codes.Unknown, err.Error())
}
w.WriteHeader(runtime.HTTPStatusFromCode(s.Code()))
code = fmt.Sprint(s.Code())
response := bean.Response{}
apiErr := bean.ApiError{}
apiErr.Code = code // 000=unknown
apiErr.InternalMessage = errors.Details(err)
response.Errors = []bean.ApiError{apiErr}
buf, merr := json.Marshal(response)
if merr != nil {
impl.logger.Errorf("Failed to marshal response %+v\n", merr)
}
if _, werr := w.Write(buf); werr != nil {
impl.logger.Errorf("Failed to notify error to client: %v", werr)
return
}
}

type responseBody interface {
XXX_ResponseBody() interface{}
}
Loading