@@ -146,7 +146,7 @@ func (c *Client) handleResponse() {
146146 }
147147}
148148
149- func (c * Client ) handleSaslHandshakeResponse (streamingRes * ReaderProtocol , r * bufio.Reader ) interface {} {
149+ func (c * Client ) handleSaslHandshakeResponse (streamingRes * ReaderProtocol , r * bufio.Reader ) {
150150 streamingRes .CorrelationId , _ = readUInt (r )
151151 streamingRes .ResponseCode = uShortExtractResponseCode (readUShort (r ))
152152 mechanismsCount , _ := readUInt (r )
@@ -158,12 +158,11 @@ func (c *Client) handleSaslHandshakeResponse(streamingRes *ReaderProtocol, r *bu
158158
159159 res , err := c .coordinator .GetResponseById (streamingRes .CorrelationId )
160160 if err != nil {
161- // TODO handle response
162- return err
161+ logErrorCommand ( err , "handleSaslHandshakeResponse" )
162+ return
163163 }
164- res .data <- mechanisms
165164
166- return mechanisms
165+ res . data <- mechanisms
167166}
168167
169168func (c * Client ) handlePeerProperties (readProtocol * ReaderProtocol , r * bufio.Reader ) {
@@ -178,7 +177,11 @@ func (c *Client) handlePeerProperties(readProtocol *ReaderProtocol, r *bufio.Rea
178177 serverProperties [key ] = value
179178 }
180179 res , err := c .coordinator .GetResponseById (readProtocol .CorrelationId )
181- logErrorCommand (err , "handlePeerProperties" )
180+ if err != nil {
181+ logErrorCommand (err , "handlePeerProperties" )
182+ return
183+ }
184+
182185 res .code <- Code {id : readProtocol .ResponseCode }
183186 res .data <- serverProperties
184187
@@ -210,7 +213,11 @@ func (c *Client) handleGenericResponse(readProtocol *ReaderProtocol, r *bufio.Re
210213 readProtocol .CorrelationId , _ = readUInt (r )
211214 readProtocol .ResponseCode = uShortExtractResponseCode (readUShort (r ))
212215 res , err := c .coordinator .GetResponseById (readProtocol .CorrelationId )
213- logErrorCommand (err , "handleGenericResponse" )
216+ if err != nil {
217+ logErrorCommand (err , "handleGenericResponse" )
218+ return
219+ }
220+
214221 res .code <- Code {id : readProtocol .ResponseCode }
215222}
216223
@@ -237,7 +244,11 @@ func (c *Client) commandOpen(readProtocol *ReaderProtocol, r *bufio.Reader) {
237244 }
238245
239246 res , err := c .coordinator .GetResponseById (readProtocol .CorrelationId )
240- logErrorCommand (err , "commandOpen" )
247+ if err != nil {
248+ logErrorCommand (err , "commandOpen" )
249+ return
250+ }
251+
241252 res .code <- Code {id : readProtocol .ResponseCode }
242253 res .data <- clientProperties
243254
@@ -277,7 +288,11 @@ func (c *Client) queryPublisherSequenceFrameHandler(readProtocol *ReaderProtocol
277288 readProtocol .ResponseCode = uShortExtractResponseCode (readUShort (r ))
278289 sequence := readInt64 (r )
279290 res , err := c .coordinator .GetResponseById (readProtocol .CorrelationId )
280- logErrorCommand (err , "queryPublisherSequenceFrameHandler" )
291+ if err != nil {
292+ logErrorCommand (err , "queryPublisherSequenceFrameHandler" )
293+ return
294+ }
295+
281296 res .code <- Code {id : readProtocol .ResponseCode }
282297 res .data <- sequence
283298}
@@ -458,7 +473,11 @@ func (c *Client) queryOffsetFrameHandler(readProtocol *ReaderProtocol,
458473 c .handleGenericResponse (readProtocol , r )
459474 offset := readInt64 (r )
460475 res , err := c .coordinator .GetResponseById (readProtocol .CorrelationId )
461- logErrorCommand (err , "queryOffsetFrameHandler" )
476+ if err != nil {
477+ logErrorCommand (err , "queryOffsetFrameHandler" )
478+ return
479+ }
480+
462481 res .data <- offset
463482}
464483
@@ -516,7 +535,11 @@ func (c *Client) streamStatusFrameHandler(readProtocol *ReaderProtocol,
516535 streamStatus [key ] = value
517536 }
518537 res , err := c .coordinator .GetResponseById (readProtocol .CorrelationId )
519- logErrorCommand (err , "streamStatusFrameHandler" )
538+ if err != nil {
539+ logErrorCommand (err , "streamStatusFrameHandler" )
540+ return
541+ }
542+
520543 res .code <- Code {id : readProtocol .ResponseCode }
521544 res .data <- streamStatus
522545
@@ -553,7 +576,10 @@ func (c *Client) metadataFrameHandler(readProtocol *ReaderProtocol,
553576 }
554577
555578 res , err := c .coordinator .GetResponseById (readProtocol .CorrelationId )
556- logErrorCommand (err , "metadataFrameHandler" )
579+ if err != nil {
580+ logErrorCommand (err , "metadataFrameHandler" )
581+ return
582+ }
557583
558584 res .code <- Code {id : readProtocol .ResponseCode }
559585 res .data <- streamsMetadata
@@ -612,7 +638,11 @@ func (c *Client) handleQueryPartitions(readProtocol *ReaderProtocol, r *bufio.Re
612638 partitions = append (partitions , partition )
613639 }
614640 res , err := c .coordinator .GetResponseById (readProtocol .CorrelationId )
615- logErrorCommand (err , "handleQueryPartitions" )
641+ if err != nil {
642+ logErrorCommand (err , "handleQueryPartitions" )
643+ return
644+ }
645+
616646 res .code <- Code {id : readProtocol .ResponseCode }
617647 res .data <- partitions
618648}
@@ -629,7 +659,11 @@ func (c *Client) handleQueryRoute(readProtocol *ReaderProtocol, r *bufio.Reader)
629659 }
630660
631661 res , err := c .coordinator .GetResponseById (readProtocol .CorrelationId )
632- logErrorCommand (err , "handleQueryRoute" )
662+ if err != nil {
663+ logErrorCommand (err , "handleQueryRoute" )
664+ return
665+ }
666+
633667 res .code <- Code {id : readProtocol .ResponseCode }
634668 res .data <- routes
635669}
@@ -646,7 +680,11 @@ func (c *Client) handleExchangeVersionResponse(readProtocol *ReaderProtocol, r *
646680 commands = append (commands , newCommandVersionResponse (minVersion , maxVersion , commandKey ))
647681 }
648682 res , err := c .coordinator .GetResponseById (readProtocol .CorrelationId )
649- logErrorCommand (err , "handleExchangeVersionResponse" )
683+ if err != nil {
684+ logErrorCommand (err , "handleExchangeVersionResponse" )
685+ return
686+ }
687+
650688 res .code <- Code {id : readProtocol .ResponseCode }
651689 res .data <- commands
652690}
0 commit comments