Skip to content

Sp/issue102 #207

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: sp/issue102
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixed outstanding checkstyle issues. fixed producer config controller…
… tests. fixed unique producer/topic issue
  • Loading branch information
stuart-spradling committed Mar 31, 2020
commit 3a13fa7df80b51db415faac536799a7af14d7510
2 changes: 1 addition & 1 deletion kafka-webview-ui/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>kafka-webview</artifactId>
<groupId>org.sourcelab</groupId>
<version>2.6.0</version>
<version>2.5.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>kafka-webview-ui</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,19 @@
import org.sourcelab.kafka.webview.ui.controller.BaseController;
import org.sourcelab.kafka.webview.ui.controller.api.exceptions.ApiException;
import org.sourcelab.kafka.webview.ui.controller.api.exceptions.NotFoundApiException;
import org.sourcelab.kafka.webview.ui.controller.api.requests.*;
import org.sourcelab.kafka.webview.ui.controller.api.requests.CreateTopicRequest;
import org.sourcelab.kafka.webview.ui.controller.api.requests.ModifyTopicConfigRequest;
import org.sourcelab.kafka.webview.ui.controller.api.requests.SendMessageRequest;
import org.sourcelab.kafka.webview.ui.controller.api.requests.DeleteTopicRequest;
import org.sourcelab.kafka.webview.ui.controller.api.requests.ConsumerRemoveRequest;
import org.sourcelab.kafka.webview.ui.controller.api.requests.ConsumeRequest;
import org.sourcelab.kafka.webview.ui.controller.api.responses.ResultResponse;
import org.sourcelab.kafka.webview.ui.manager.kafka.*;
import org.sourcelab.kafka.webview.ui.manager.kafka.ViewCustomizer;
import org.sourcelab.kafka.webview.ui.manager.kafka.SessionIdentifier;
import org.sourcelab.kafka.webview.ui.manager.kafka.WebKafkaConsumer;
import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaOperations;
import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaOperationsFactory;
import org.sourcelab.kafka.webview.ui.manager.kafka.WebKafkaConsumerFactory;
import org.sourcelab.kafka.webview.ui.manager.kafka.config.FilterDefinition;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ApiErrorResponse;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConfigItem;
Expand All @@ -47,8 +57,6 @@
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.TopicDetails;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.TopicList;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.TopicListing;
import org.sourcelab.kafka.webview.ui.model.*;
import org.sourcelab.kafka.webview.ui.repository.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
Expand All @@ -57,16 +65,47 @@
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.*;

import java.util.*;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.ModelAttribute;


import org.sourcelab.kafka.webview.ui.repository.ViewRepository;
import org.sourcelab.kafka.webview.ui.repository.ClusterRepository;
import org.sourcelab.kafka.webview.ui.repository.FilterRepository;
import org.sourcelab.kafka.webview.ui.repository.ProducerRepository;

import org.sourcelab.kafka.webview.ui.model.View;
import org.sourcelab.kafka.webview.ui.model.Producer;
import org.sourcelab.kafka.webview.ui.model.Cluster;
import org.sourcelab.kafka.webview.ui.model.Filter;

import java.util.UUID;
import java.util.Comparator;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Optional;
import java.util.Collection;
import java.util.Set;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

/**
* Handles API requests.
*/
@Controller
@RequestMapping("/api")
@RequestMapping( "/api")
public class ApiController extends BaseController {
@Autowired
private ViewRepository viewRepository;
Expand Down Expand Up @@ -585,15 +624,19 @@ public boolean removeConsumer(
}

/**
* POST put a message on kafka bus
* POST put a message on kafka bus.
*/
@ResponseBody
@PostMapping(path="/producer/{id}/send-message", produces = "application/json")
public void sendMessage(@PathVariable final Long id, @RequestBody final SendMessageRequest request )
{
@PostMapping(path = "/producer/{id}/send-message", produces = "application/json")
public void sendMessage(@PathVariable final Long id, @RequestBody final SendMessageRequest request ) {

final Producer producer = producerRepository
.findById( id )
.orElseThrow( () -> new NotFoundApiException( "Producer", "Unable to find producer" ) );

final Producer producer = producerRepository.findById( id ).orElseThrow( () -> new NotFoundApiException( "Producer", "Unable to find producer" ) );
final Cluster cluster = clusterRepository.findById( producer.getCluster().getId() ).orElseThrow( () -> new NotFoundApiException( "Producer", "Unable to find cluster" ) );
final Cluster cluster = clusterRepository
.findById( producer.getCluster().getId() )
.orElseThrow( () -> new NotFoundApiException( "Producer", "Unable to find cluster" ) );


Map<String, Object> producerFactoryConfigs = new HashMap<>();
Expand All @@ -603,15 +646,14 @@ public void sendMessage(@PathVariable final Long id, @RequestBody final SendMess

KafkaTemplate<String, String> template = new KafkaTemplate<>( new DefaultKafkaProducerFactory<>( producerFactoryConfigs ) );

ProducerRecord<String, String> record = new ProducerRecord<>( producer.getTopic(), UUID.randomUUID().toString(), request.getMessageAsJson() );
ProducerRecord<String, String> record =
new ProducerRecord<>( producer.getTopic(), UUID.randomUUID().toString(), request.getMessageAsJson() );

ListenableFuture<SendResult<String, String>> future = template.send( record );

try
{
try {
future.get(); // ensure it was successful. will throw exception otherwise
} catch( ExecutionException | InterruptedException e )
{
} catch ( ExecutionException | InterruptedException e ) {
//pfffft?????
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,31 @@

import java.util.Map;

public class SendMessageRequest
{
/**
* An API request body to put a message on the Kafka Bus.
*/
public class SendMessageRequest {

private Map<String, Object> messageMap;

public Map<String, Object> getMessageMap()
{
public Map<String, Object> getMessageMap() {
return messageMap;
}

public void setMessageMap( Map<String, Object> messageMap )
{
public void setMessageMap( Map<String, Object> messageMap ) {
this.messageMap = messageMap;
}

public String getMessageAsJson()
{
/**
* Get the message map as a JSON representation string.
* @return a JSON representation of the message map
*/
public String getMessageAsJson() {
String result = "";
ObjectMapper mapper = new ObjectMapper();
try
{
try {
result = mapper.writeValueAsString( messageMap );
}catch ( JsonProcessingException e )
{
} catch ( JsonProcessingException e ) {
//lol oops
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
package org.sourcelab.kafka.webview.ui.controller.configuration.producer;

/**
* MIT License
*
Expand All @@ -21,11 +23,9 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package org.sourcelab.kafka.webview.ui.controller.configuration.producer;

import org.sourcelab.kafka.webview.ui.controller.BaseController;
import org.sourcelab.kafka.webview.ui.controller.configuration.producer.forms.ProducerForm;
import org.sourcelab.kafka.webview.ui.controller.configuration.view.forms.ViewForm;
import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaOperations;
import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaOperationsFactory;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.TopicDetails;
Expand All @@ -44,20 +44,26 @@
import org.springframework.ui.Model;
import org.springframework.validation.BindingResult;
import org.springframework.validation.FieldError;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.servlet.mvc.support.RedirectAttributes;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PathVariable;


import javax.validation.Valid;
import java.sql.Timestamp;
import java.util.*;
import java.util.ArrayList;
import java.util.Optional;

/**
* Controller for CRUD over Producer entities.
*/
@Controller
@RequestMapping("/configuration/producer")
public class ProducerConfigController extends BaseController
{
@RequestMapping( "/configuration/producer")
public class ProducerConfigController extends BaseController {
@Autowired
private ClusterRepository clusterRepository;

Expand All @@ -73,7 +79,9 @@ public class ProducerConfigController extends BaseController
@Autowired
private KafkaOperationsFactory kafkaOperationsFactory;


/**
* Initialize the index page for Configuring Producers.
*/
@GetMapping
public String index(final Model model) {
// Setup breadcrumbs
Expand All @@ -87,14 +95,12 @@ public String index(final Model model) {
}

/**
* GET Displays create producer form.
* Get the Create Producer page.
*/
@GetMapping( "/create")
public String createProducerForm( final ProducerForm producerForm, final Model model)
{
public String createProducerForm( final ProducerForm producerForm, final Model model) {
// Setup breadcrubs
if(!model.containsAttribute( "BreadCrumbs" ))
{
if (!model.containsAttribute( "BreadCrumbs" )) {
setupBreadCrumbs( model, "Create", null );
}

Expand All @@ -107,8 +113,11 @@ public String createProducerForm( final ProducerForm producerForm, final Model m

model.addAttribute("topics", new ArrayList<>());

if( producerForm.getClusterId() != null )
{
if ( producerForm.hasPropertyList() ) {
model.addAttribute( "producerMessagePropertyNames", producerForm.getPropertyNameListAsArray() );
}

if ( producerForm.getClusterId() != null ) {
clusterRepository.findById( producerForm.getClusterId() ).ifPresent( (cluster) -> {
try (final KafkaOperations operations = kafkaOperationsFactory.create(cluster, getLoggedInUserId())) {
final TopicList topics = operations.getAvailableTopics();
Expand All @@ -126,14 +135,15 @@ public String createProducerForm( final ProducerForm producerForm, final Model m
return "configuration/producer/create";
}

/**
* Edit a Producer by Id.
*/
@GetMapping("/edit/{id}")
public String editProducer(@PathVariable final Long id, final ProducerForm producerForm,
final RedirectAttributes redirectAttributes,
final Model model)
{
final Model model) {
final Optional<Producer> producerOptional = producerRepository.findById( id );
if(!producerOptional.isPresent())
{
if (!producerOptional.isPresent()) {
// Set flash message
redirectAttributes.addFlashAttribute("FlashMessage", FlashMessage.newWarning("Unable to find producer!"));

Expand All @@ -153,7 +163,10 @@ public String editProducer(@PathVariable final Long id, final ProducerForm produ
return createProducerForm( producerForm, model );
}

@RequestMapping(path = "/delete/{id}", method = RequestMethod.POST)
/**
* Delete a Producer by Id.
*/
@PostMapping( path = "/delete/{id}" )
public String deleteProducer(@PathVariable final Long id, final RedirectAttributes redirectAttributes) {
// Retrieve it
if (!producerRepository.existsById(id)) {
Expand All @@ -170,13 +183,15 @@ public String deleteProducer(@PathVariable final Long id, final RedirectAttribut
return "redirect:/configuration/producer";
}

@RequestMapping(path = "/update", method = RequestMethod.POST)
/**
* Update a producer.
*/
@PostMapping(path = "/update")
public String updateProducer(
@Valid final ProducerForm producerForm,
final BindingResult bindingResult,
final RedirectAttributes redirectAttributes,
final Model model)
{
final Model model) {
// Determine if we're updating or creating
final boolean updateExisting = producerForm.exists();

Expand Down Expand Up @@ -215,8 +230,7 @@ public String updateProducer(

//Retrieve producer message
final Optional<ProducerMessage> producerMessageOptional = producerMessageRepository.findByProducer( producer );
if(!producerMessageOptional.isPresent())
{
if (!producerMessageOptional.isPresent()) {
// Set flash message and redirect
redirectAttributes.addFlashAttribute("FlashMessage", FlashMessage.newWarning("Unable to find producer's message!"));

Expand Down
Loading