Skip to content

Commit

Permalink
Made throughput handling with RPC Slaves more robust
Browse files Browse the repository at this point in the history
  • Loading branch information
lonelycode committed Jun 17, 2016
1 parent 6c33347 commit 30757ce
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 41 deletions.
12 changes: 6 additions & 6 deletions handler_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,12 @@ func (e ErrorHandler) HandleError(w http.ResponseWriter, r *http.Request, err st
expiresAfter := e.Spec.ExpireAnalyticsAfter
if config.EnforceOrgDataAge {
thisOrg := e.Spec.OrgID
orgSessionState, found := e.GetOrgSession(thisOrg)
if found {
if orgSessionState.DataExpires > 0 {
expiresAfter = orgSessionState.DataExpires
}
orgExpireDataTime := e.GetOrgSessionExpiry(thisOrg)

if orgExpireDataTime > 0 {
expiresAfter = orgExpireDataTime
}

}

thisRecord.SetExpiry(expiresAfter)
Expand All @@ -144,7 +144,7 @@ func (e ErrorHandler) HandleError(w http.ResponseWriter, r *http.Request, err st
}

var obfuscated string

if len(keyName) > 4 {
obfuscated = "****" + keyName[len(keyName)-4:]
}
Expand Down
36 changes: 29 additions & 7 deletions handler_success.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
)

var SessionCache *cache.Cache = cache.New(10*time.Second, 5*time.Second)
var ExpiryCache *cache.Cache = cache.New(600*time.Second, 5*time.Second)

type ReturningHttpHandler interface {
ServeHTTP(http.ResponseWriter, *http.Request) *http.Response
Expand Down Expand Up @@ -64,12 +65,32 @@ func (t TykMiddleware) GetOrgSession(key string) (SessionState, bool) {
thisSession, found = t.Spec.OrgSessionManager.GetSessionDetail(key)
if found {
// If exists, assume it has been authorized and pass on
if config.EnforceOrgDataAge {
// We cache org expiry data
log.Debug("Setting data expiry: ", thisSession.OrgID)
go t.SetOrgExpiry(thisSession.OrgID, thisSession.DataExpires)
}
return thisSession, true
}

return thisSession, found
}

func (t TykMiddleware) SetOrgExpiry(orgid string, expiry int64) {
ExpiryCache.Set(orgid, expiry, cache.DefaultExpiration)
}

func (t TykMiddleware) GetOrgSessionExpiry(orgid string) int64 {
log.Debug("Checking: ", orgid)
cachedVal, found := ExpiryCache.Get(orgid)
if !found {
log.Debug("no cached entry found, returning 7 days")
return 604800
}

return cachedVal.(int64)
}

// ApplyPolicyIfExists will check if a policy is loaded, if it is, it will overwrite the session state to use the policy values
func (t TykMiddleware) ApplyPolicyIfExists(key string, thisSession *SessionState) {
if thisSession.ApplyPolicyID != "" {
Expand Down Expand Up @@ -161,6 +182,7 @@ func (t TykMiddleware) CheckSessionAndIdentityForValidKey(key string) (SessionSt
}

// Check session store
log.Debug("Querying keystore")
thisSession, found = t.Spec.SessionManager.GetSessionDetail(key)
if found {
// If exists, assume it has been authorized and pass on
Expand All @@ -169,11 +191,11 @@ func (t TykMiddleware) CheckSessionAndIdentityForValidKey(key string) (SessionSt

// Check for a policy, if there is a policy, pull it and overwrite the session values
t.ApplyPolicyIfExists(key, &thisSession)
log.Debug("Got key")
return thisSession, true
}

// 2. If not there, get it from the AuthorizationHandler

thisSession, found = t.Spec.AuthManager.IsKeyAuthorised(key)
if found {
// If not in Session, and got it from AuthHandler, create a session with a new TTL
Expand Down Expand Up @@ -279,11 +301,10 @@ func (s SuccessHandler) RecordHit(w http.ResponseWriter, r *http.Request, timing
expiresAfter := s.Spec.ExpireAnalyticsAfter
if config.EnforceOrgDataAge {
thisOrg := s.Spec.OrgID
orgSessionState, found := s.GetOrgSession(thisOrg)
if found {
if orgSessionState.DataExpires > 0 {
expiresAfter = orgSessionState.DataExpires
}
orgExpireDataTime := s.GetOrgSessionExpiry(thisOrg)

if orgExpireDataTime > 0 {
expiresAfter = orgExpireDataTime
}
}

Expand All @@ -306,6 +327,7 @@ func (s SuccessHandler) RecordHit(w http.ResponseWriter, r *http.Request, timing
// final destination, this is invoked by the ProxyHandler or right at the start of a request chain if the URL
// Spec states the path is Ignored
func (s SuccessHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) *http.Response {
log.Debug("Started proxy")
// Make sure we get the correct target URL
if s.Spec.APIDefinition.Proxy.StripListenPath {
log.Debug("Stripping: ", s.Spec.Proxy.ListenPath)
Expand Down Expand Up @@ -333,7 +355,7 @@ func (s SuccessHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) *http.
if resp != nil {
s.RecordHit(w, r, int64(millisec), resp.StatusCode, copiedRequest, copiedResponse)
}

log.Debug("Done proxy")
return nil
}

Expand Down
1 change: 0 additions & 1 deletion middleware_auth_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ func (k *AuthKey) ProcessRequest(w http.ResponseWriter, r *http.Request, configu
// Set session state on context, we will need it later
context.Set(r, SessionData, thisSessionState)
context.Set(r, AuthHeaderValue, key)

return nil, 200
}

Expand Down
1 change: 0 additions & 1 deletion middleware_ip_whitelist.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ func (i *IPWhiteListMiddleware) GetConfig() (interface{}, error) {

// ProcessRequest will run any checks on the request on the way through the system, return an error to have the chain fail
func (i *IPWhiteListMiddleware) ProcessRequest(w http.ResponseWriter, r *http.Request, configuration interface{}) (error, int) {

// Disabled, pass through
if !i.TykMiddleware.Spec.EnableIpWhiteListing {
return nil, 200
Expand Down
1 change: 0 additions & 1 deletion middleware_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ func (t *TransformMiddleware) ProcessRequest(w http.ResponseWriter, r *http.Requ
}
r.Body = ioutil.NopCloser(&bodyBuffer)
r.ContentLength = int64(bodyBuffer.Len())

}

return nil, 200
Expand Down
33 changes: 8 additions & 25 deletions res_handler_transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package main
import (
"bytes"
"encoding/json"
"github.com/lonelycode/tykcommon"
"github.com/Sirupsen/logrus"
"github.com/clbanning/mxj"
"github.com/lonelycode/tykcommon"
"github.com/mitchellh/mapstructure"
"io/ioutil"
"github.com/clbanning/mxj"
"net/http"
"strconv"
)
Expand All @@ -34,28 +34,11 @@ func (rt ResponseTransformMiddleware) New(c interface{}, spec *APISpec) (TykResp
thisHandler.config = thisModuleConfig
thisHandler.Spec = spec

log.Warning("Response body transform processor initialised")
log.Debug("Response body transform processor initialised")

return thisHandler, nil
}

// func (rt ResponseTransformMiddleware) copyResponse(dst io.Writer, src io.Reader) {
// if rt.FlushInterval != 0 {
// if wf, ok := dst.(writeFlusher); ok {
// mlw := &maxLatencyWriter{
// dst: wf,
// latency: p.FlushInterval,
// done: make(chan bool),
// }
// go mlw.flushLoop()
// defer mlw.stop()
// dst = mlw
// }
// }

// io.Copy(dst, src)
// }

func (rt ResponseTransformMiddleware) HandleResponse(rw http.ResponseWriter, res *http.Response, req *http.Request, ses *SessionState) error {
// New request checker, more targetted, less likely to fail
var stat RequestStatus
Expand Down Expand Up @@ -102,11 +85,11 @@ func (rt ResponseTransformMiddleware) HandleResponse(rw http.ResponseWriter, res

if err != nil {
log.WithFields(logrus.Fields{
"prefix": "outbound-transform",
"server_name": rt.Spec.APIDefinition.Proxy.TargetURL,
"api_id": rt.Spec.APIDefinition.APIID,
"path": req.URL.Path,
}).Error("Failed to apply template to request: ", err)
"prefix": "outbound-transform",
"server_name": rt.Spec.APIDefinition.Proxy.TargetURL,
"api_id": rt.Spec.APIDefinition.APIID,
"path": req.URL.Path,
}).Error("Failed to apply template to request: ", err)
}

res.ContentLength = int64(bodyBuffer.Len())
Expand Down

0 comments on commit 30757ce

Please sign in to comment.