diff --git a/.gitignore b/.gitignore index 963073e..1692abc 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ # Go vendor +rainbow.db # Test binaries *.test diff --git a/README.md b/README.md index d8c595d..250a1c3 100755 --- a/README.md +++ b/README.md @@ -42,10 +42,19 @@ make server ``` ```console go run cmd/server/server.go -2024/02/11 15:36:19 creating server... -2024/02/11 15:36:19 starting server: server (development) vv0.0.1-default -2024/02/11 15:36:19 server listening: [::]:50051 -2024/02/11 15:39:08 received register: type_url:"type.googleapis.com/google.protobuf.StringValue" value:"\n\x07keebler" +2024/02/11 18:06:57 creating ๐ŸŒˆ๏ธ server... +2024/02/11 18:06:57 ๐Ÿงน๏ธ cleaning up rainbow.db... +2024/02/11 18:06:57 โœจ๏ธ creating rainbow.db... +2024/02/11 18:06:57 rainbow.db file created +2024/02/11 18:06:57 create jobs table... +2024/02/11 18:06:57 jobs table created +2024/02/11 18:06:57 create cluster table... +2024/02/11 18:06:57 cluster table created +2024/02/11 18:06:57 starting scheduler server: rainbow vv0.0.1-default +2024/02/11 18:06:57 server listening: [::]:50051 +2024/02/11 18:06:59 ๐Ÿ“๏ธ received register: keebler +SELECT count(*) from clusters WHERE name LIKE "keebler": (0) +INSERT into clusters VALUES ("keebler", "2804a013-89df-433d-a904-4666a6415ad0"): (1) ``` And then mock a registration: @@ -55,14 +64,15 @@ make register ``` ```console go run cmd/register/register.go -2024/02/11 15:46:53 creating client (v0.0.1-default)... -2024/02/11 15:46:53 starting client (localhost:50051)... -2024/02/11 15:46:53 registering cluster: keebler -request_id:"0f7a0e7d-c2ed-4a46-9eaa-3d554349244e" -2024/02/11 15:46:53 received response: register success +2024/02/11 18:06:59 creating client (v0.0.1-default)... +2024/02/11 18:06:59 ๐ŸŒˆ๏ธ starting client (localhost:50051)... +2024/02/11 18:06:59 registering cluster: keebler +2024/02/11 18:06:59 status: REGISTER_SUCCESS +2024/02/11 18:06:59 token: 2804a013-89df-433d-a904-4666a6415ad0 ``` -Nothing meaningful is happening yet - I'm just creating a skeleton (and learning about servers / services in Go with grpc more) and am going to add meat to this. My plan is below in [TODO](#TODO). +In the above, we are providing a cluster name (keebler) and it is being registered to the database, and a token and status returned. +We would then use this token for subsequent requests to interact with the cluster. ## Container Images @@ -70,10 +80,8 @@ Nothing meaningful is happening yet - I'm just creating a skeleton (and learning ## TODO -- Add an actual database (sqlite) to the server, which should init, create tables for clusters, jobs (ids and cluster assignment) -- When a registration is done, it should check against this database (and add a new cluster or determine already registered) -- Add a secret to validate that, and generation of a cluster-specific token to validate further responses. - Write the job submission endpoint, which should take a cluster name and command, and return status (success, denied, etc.) +- Make a nicer (single UI entrypoint) for client with different functions At this point we will have a dumb little database with jobs assigned to clusters. We can then modify the client to add a polling command (intended to be run on a flux instance) that will use the cluster-specific token to say "Do you have any jobs for me?" at some interval. This can run anywhere there is a Flux instance. It can receive the job, and run it. When it receives the job, the job will be deleted from the database, because we don't care anymore. diff --git a/api/v1/api.proto b/api/v1/api.proto index f68a123..8370176 100755 --- a/api/v1/api.proto +++ b/api/v1/api.proto @@ -12,7 +12,7 @@ option go_package = "github.com/converged-computing/rainbow/pkg/api/v1"; service Service { // Register cluster - request to register a new cluster - rpc Register(Request) returns (RegisterResponse); + rpc Register(RegisterRequest) returns (RegisterResponse); // Job Submission - request for submitting a job to a named cluster rpc SubmitJob(Request) returns (SubmitJobResponse); @@ -48,14 +48,21 @@ message Request { google.protobuf.Timestamp sent = 2; } +message RegisterRequest { + string name = 1; + string secret = 2; + google.protobuf.Timestamp sent = 3; +} + + // Testing response - the server's response to a request. message Response { // Enum to represent the result types of the operation. enum ResultType { - RESULT_TYPE_UNSPECIFIED = 0; // Default value, unspecified result type. - RESULT_TYPE_SUCCESS = 1; // Indicates successful processing. - RESULT_TYPE_ERROR = 2; // Indicates an error occurred. + RESULT_TYPE_UNSPECIFIED = 0; + RESULT_TYPE_SUCCESS = 1; + RESULT_TYPE_ERROR = 2; } // Unique identifier correlating to the request. @@ -74,14 +81,17 @@ message Response { // Register Response message RegisterResponse { - // Enum to represent the result types of the operation. + // Registration statuses enum ResultType { REGISTER_UNSPECIFIED = 0; REGISTER_SUCCESS = 1; REGISTER_ERROR = 2; REGISTER_DENIED = 3; + REGISTER_EXISTS = 4; } string request_id = 1; + string token = 2; + ResultType status = 3; } // Submit Job Response diff --git a/cmd/register/register.go b/cmd/register/register.go index 5fda47c..6531ea4 100755 --- a/cmd/register/register.go +++ b/cmd/register/register.go @@ -11,14 +11,14 @@ import ( var ( host string clusterName string - - // set at build time - version = "v0.0.1-default" + secret string + version = "v0.0.1-default" ) func main() { flag.StringVar(&host, "host", "localhost:50051", "Scheduler server address (host:port)") flag.StringVar(&clusterName, "cluster", "keebler", "Name of cluster to register") + flag.StringVar(&secret, "secret", "chocolate-cookies", "Registration 'secret'") flag.Parse() log.Printf("creating client (%s)...", version) @@ -31,9 +31,12 @@ func main() { log.Printf("registering cluster: %s", clusterName) // Last argument is secret, empty for now - m, err := c.Register(context.Background(), clusterName, "") + response, err := c.Register(context.Background(), clusterName, secret) if err != nil { log.Fatalf("error while running client: %v", err) } - log.Printf("received response: %s", m) + + // If we get here, success! Dump all the stuff. + log.Printf("status: %s", response.Status) + log.Printf(" token: %s", response.Token) } diff --git a/cmd/server/server.go b/cmd/server/server.go index ffb8cdd..53e84bf 100755 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -10,32 +10,37 @@ import ( var ( address string - name = "server" + name = "rainbow" + sqliteFile = "rainbow.db" environment = "development" - // set at build time - version = "v0.0.1-default" + // Remove the previous database + skipCleanup = false + secret = "chocolate-cookies" + version = "v0.0.1-default" ) func main() { flag.StringVar(&address, "address", ":50051", "Server address (host:port)") - flag.StringVar(&name, "name", name, "Server name (default: server)") - flag.StringVar(&environment, "environment", environment, "Server environment (default: development)") + flag.StringVar(&name, "name", name, "Server name (default: rainbow)") + flag.StringVar(&sqliteFile, "db", sqliteFile, "sqlite3 database file (default: rainbow.db)") + flag.StringVar(&secret, "secret", secret, "secret to validate registration (default: chocolate-cookies)") + flag.StringVar(&environment, "environment", environment, "environment (default: development)") + flag.BoolVar(&skipCleanup, "skip-cleanup", skipCleanup, "skip cleanup of previous sqlite database (default: false)") flag.Parse() // create server - log.Print("creating server...") - s, err := server.NewServer(name, version, environment) + log.Print("creating ๐ŸŒˆ๏ธ server...") + s, err := server.NewServer(name, version, environment, sqliteFile, !skipCleanup, secret) if err != nil { log.Fatalf("error while creating server: %v", err) } defer s.Stop() // run server - log.Printf("starting server: %s", s.String()) + log.Printf("starting scheduler server: %s", s.String()) if err := s.Start(context.Background(), address); err != nil { - log.Fatalf("error while running server: %v", err) + log.Fatalf("error while running scheduler server: %v", err) } - - log.Printf("done") + log.Printf("๐ŸŒˆ๏ธ done ๐ŸŒˆ๏ธ") } diff --git a/go.mod b/go.mod index 30d2090..067ddb6 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.20 require ( github.com/google/uuid v1.6.0 + github.com/mattn/go-sqlite3 v1.14.22 github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.8.4 google.golang.org/grpc v1.61.0 diff --git a/go.sum b/go.sum index 973671a..e9aeb56 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/pkg/api/v1/api.pb.go b/pkg/api/v1/api.pb.go index d616361..6c5909f 100644 --- a/pkg/api/v1/api.pb.go +++ b/pkg/api/v1/api.pb.go @@ -26,9 +26,9 @@ const ( type Response_ResultType int32 const ( - Response_RESULT_TYPE_UNSPECIFIED Response_ResultType = 0 // Default value, unspecified result type. - Response_RESULT_TYPE_SUCCESS Response_ResultType = 1 // Indicates successful processing. - Response_RESULT_TYPE_ERROR Response_ResultType = 2 // Indicates an error occurred. + Response_RESULT_TYPE_UNSPECIFIED Response_ResultType = 0 + Response_RESULT_TYPE_SUCCESS Response_ResultType = 1 + Response_RESULT_TYPE_ERROR Response_ResultType = 2 ) // Enum value maps for Response_ResultType. @@ -69,10 +69,10 @@ func (x Response_ResultType) Number() protoreflect.EnumNumber { // Deprecated: Use Response_ResultType.Descriptor instead. func (Response_ResultType) EnumDescriptor() ([]byte, []int) { - return file_api_proto_rawDescGZIP(), []int{2, 0} + return file_api_proto_rawDescGZIP(), []int{3, 0} } -// Enum to represent the result types of the operation. +// Registration statuses type RegisterResponse_ResultType int32 const ( @@ -80,6 +80,7 @@ const ( RegisterResponse_REGISTER_SUCCESS RegisterResponse_ResultType = 1 RegisterResponse_REGISTER_ERROR RegisterResponse_ResultType = 2 RegisterResponse_REGISTER_DENIED RegisterResponse_ResultType = 3 + RegisterResponse_REGISTER_EXISTS RegisterResponse_ResultType = 4 ) // Enum value maps for RegisterResponse_ResultType. @@ -89,12 +90,14 @@ var ( 1: "REGISTER_SUCCESS", 2: "REGISTER_ERROR", 3: "REGISTER_DENIED", + 4: "REGISTER_EXISTS", } RegisterResponse_ResultType_value = map[string]int32{ "REGISTER_UNSPECIFIED": 0, "REGISTER_SUCCESS": 1, "REGISTER_ERROR": 2, "REGISTER_DENIED": 3, + "REGISTER_EXISTS": 4, } ) @@ -122,7 +125,7 @@ func (x RegisterResponse_ResultType) Number() protoreflect.EnumNumber { // Deprecated: Use RegisterResponse_ResultType.Descriptor instead. func (RegisterResponse_ResultType) EnumDescriptor() ([]byte, []int) { - return file_api_proto_rawDescGZIP(), []int{3, 0} + return file_api_proto_rawDescGZIP(), []int{4, 0} } // Enum to represent the result types of the operation. @@ -175,7 +178,7 @@ func (x SubmitJobResponse_ResultType) Number() protoreflect.EnumNumber { // Deprecated: Use SubmitJobResponse_ResultType.Descriptor instead. func (SubmitJobResponse_ResultType) EnumDescriptor() ([]byte, []int) { - return file_api_proto_rawDescGZIP(), []int{4, 0} + return file_api_proto_rawDescGZIP(), []int{5, 0} } // Content represents the message content with metadata. @@ -304,6 +307,69 @@ func (x *Request) GetSent() *timestamppb.Timestamp { return nil } +type RegisterRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Secret string `protobuf:"bytes,2,opt,name=secret,proto3" json:"secret,omitempty"` + Sent *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=sent,proto3" json:"sent,omitempty"` +} + +func (x *RegisterRequest) Reset() { + *x = RegisterRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_api_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RegisterRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RegisterRequest) ProtoMessage() {} + +func (x *RegisterRequest) ProtoReflect() protoreflect.Message { + mi := &file_api_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RegisterRequest.ProtoReflect.Descriptor instead. +func (*RegisterRequest) Descriptor() ([]byte, []int) { + return file_api_proto_rawDescGZIP(), []int{2} +} + +func (x *RegisterRequest) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *RegisterRequest) GetSecret() string { + if x != nil { + return x.Secret + } + return "" +} + +func (x *RegisterRequest) GetSent() *timestamppb.Timestamp { + if x != nil { + return x.Sent + } + return nil +} + // Testing response - the server's response to a request. type Response struct { state protoimpl.MessageState @@ -323,7 +389,7 @@ type Response struct { func (x *Response) Reset() { *x = Response{} if protoimpl.UnsafeEnabled { - mi := &file_api_proto_msgTypes[2] + mi := &file_api_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -336,7 +402,7 @@ func (x *Response) String() string { func (*Response) ProtoMessage() {} func (x *Response) ProtoReflect() protoreflect.Message { - mi := &file_api_proto_msgTypes[2] + mi := &file_api_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -349,7 +415,7 @@ func (x *Response) ProtoReflect() protoreflect.Message { // Deprecated: Use Response.ProtoReflect.Descriptor instead. func (*Response) Descriptor() ([]byte, []int) { - return file_api_proto_rawDescGZIP(), []int{2} + return file_api_proto_rawDescGZIP(), []int{3} } func (x *Response) GetRequestId() string { @@ -386,13 +452,15 @@ type RegisterResponse struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` + RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` + Token string `protobuf:"bytes,2,opt,name=token,proto3" json:"token,omitempty"` + Status RegisterResponse_ResultType `protobuf:"varint,3,opt,name=status,proto3,enum=convergedcomputing.org.grpc.v1.RegisterResponse_ResultType" json:"status,omitempty"` } func (x *RegisterResponse) Reset() { *x = RegisterResponse{} if protoimpl.UnsafeEnabled { - mi := &file_api_proto_msgTypes[3] + mi := &file_api_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -405,7 +473,7 @@ func (x *RegisterResponse) String() string { func (*RegisterResponse) ProtoMessage() {} func (x *RegisterResponse) ProtoReflect() protoreflect.Message { - mi := &file_api_proto_msgTypes[3] + mi := &file_api_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -418,7 +486,7 @@ func (x *RegisterResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use RegisterResponse.ProtoReflect.Descriptor instead. func (*RegisterResponse) Descriptor() ([]byte, []int) { - return file_api_proto_rawDescGZIP(), []int{3} + return file_api_proto_rawDescGZIP(), []int{4} } func (x *RegisterResponse) GetRequestId() string { @@ -428,6 +496,20 @@ func (x *RegisterResponse) GetRequestId() string { return "" } +func (x *RegisterResponse) GetToken() string { + if x != nil { + return x.Token + } + return "" +} + +func (x *RegisterResponse) GetStatus() RegisterResponse_ResultType { + if x != nil { + return x.Status + } + return RegisterResponse_REGISTER_UNSPECIFIED +} + // Submit Job Response type SubmitJobResponse struct { state protoimpl.MessageState @@ -441,7 +523,7 @@ type SubmitJobResponse struct { func (x *SubmitJobResponse) Reset() { *x = SubmitJobResponse{} if protoimpl.UnsafeEnabled { - mi := &file_api_proto_msgTypes[4] + mi := &file_api_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -454,7 +536,7 @@ func (x *SubmitJobResponse) String() string { func (*SubmitJobResponse) ProtoMessage() {} func (x *SubmitJobResponse) ProtoReflect() protoreflect.Message { - mi := &file_api_proto_msgTypes[4] + mi := &file_api_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -467,7 +549,7 @@ func (x *SubmitJobResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use SubmitJobResponse.ProtoReflect.Descriptor instead. func (*SubmitJobResponse) Descriptor() ([]byte, []int) { - return file_api_proto_rawDescGZIP(), []int{4} + return file_api_proto_rawDescGZIP(), []int{5} } func (x *SubmitJobResponse) GetRequestId() string { @@ -514,74 +596,89 @@ var file_api_proto_rawDesc = []byte{ 0x6e, 0x74, 0x52, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x12, 0x2e, 0x0a, 0x04, 0x73, 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, - 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x04, 0x73, 0x65, 0x6e, 0x74, 0x22, 0x87, 0x02, 0x0a, 0x08, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x23, 0x0a, 0x0d, 0x6d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, - 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x2d, 0x0a, 0x12, - 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x5f, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, - 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x11, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x73, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x12, 0x2d, 0x0a, 0x12, 0x70, - 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x69, 0x6e, 0x67, 0x5f, 0x64, 0x65, 0x74, 0x61, 0x69, 0x6c, - 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x11, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, - 0x69, 0x6e, 0x67, 0x44, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, 0x22, 0x59, 0x0a, 0x0a, 0x52, 0x65, - 0x73, 0x75, 0x6c, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1b, 0x0a, 0x17, 0x52, 0x45, 0x53, 0x55, - 0x4c, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, - 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x17, 0x0a, 0x13, 0x52, 0x45, 0x53, 0x55, 0x4c, 0x54, 0x5f, - 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x55, 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, 0x01, 0x12, 0x15, - 0x0a, 0x11, 0x52, 0x45, 0x53, 0x55, 0x4c, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, 0x52, - 0x52, 0x4f, 0x52, 0x10, 0x02, 0x22, 0x98, 0x01, 0x0a, 0x10, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, - 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, - 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x22, 0x65, 0x0a, 0x0a, 0x52, 0x65, 0x73, - 0x75, 0x6c, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x14, 0x52, 0x45, 0x47, 0x49, 0x53, - 0x54, 0x45, 0x52, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, - 0x00, 0x12, 0x14, 0x0a, 0x10, 0x52, 0x45, 0x47, 0x49, 0x53, 0x54, 0x45, 0x52, 0x5f, 0x53, 0x55, - 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, 0x01, 0x12, 0x12, 0x0a, 0x0e, 0x52, 0x45, 0x47, 0x49, 0x53, - 0x54, 0x45, 0x52, 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x02, 0x12, 0x13, 0x0a, 0x0f, 0x52, - 0x45, 0x47, 0x49, 0x53, 0x54, 0x45, 0x52, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x03, - 0x22, 0xa8, 0x01, 0x0a, 0x11, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x4a, 0x6f, 0x62, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x15, 0x0a, 0x06, 0x6a, 0x6f, 0x62, 0x5f, 0x69, 0x64, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6a, 0x6f, 0x62, 0x49, 0x64, 0x22, 0x5d, 0x0a, 0x0a, - 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x16, 0x0a, 0x12, 0x53, 0x55, - 0x42, 0x4d, 0x49, 0x54, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, - 0x10, 0x00, 0x12, 0x12, 0x0a, 0x0e, 0x53, 0x55, 0x42, 0x4d, 0x49, 0x54, 0x5f, 0x53, 0x55, 0x43, - 0x43, 0x45, 0x53, 0x53, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x53, 0x55, 0x42, 0x4d, 0x49, 0x54, - 0x5f, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x02, 0x12, 0x11, 0x0a, 0x0d, 0x53, 0x55, 0x42, 0x4d, - 0x49, 0x54, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x03, 0x32, 0x97, 0x03, 0x0a, 0x07, - 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x65, 0x0a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, - 0x74, 0x65, 0x72, 0x12, 0x27, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x63, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x04, 0x73, 0x65, 0x6e, 0x74, 0x22, 0x6d, 0x0a, 0x0f, 0x52, + 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, + 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, + 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x65, 0x63, 0x72, 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x06, 0x73, 0x65, 0x63, 0x72, 0x65, 0x74, 0x12, 0x2e, 0x0a, 0x04, 0x73, 0x65, + 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x52, 0x04, 0x73, 0x65, 0x6e, 0x74, 0x22, 0x87, 0x02, 0x0a, 0x08, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x23, 0x0a, 0x0d, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x2d, 0x0a, 0x12, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x5f, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, + 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x11, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x73, 0x50, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x12, 0x2d, 0x0a, 0x12, 0x70, 0x72, + 0x6f, 0x63, 0x65, 0x73, 0x73, 0x69, 0x6e, 0x67, 0x5f, 0x64, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x11, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x69, + 0x6e, 0x67, 0x44, 0x65, 0x74, 0x61, 0x69, 0x6c, 0x73, 0x22, 0x59, 0x0a, 0x0a, 0x52, 0x65, 0x73, + 0x75, 0x6c, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1b, 0x0a, 0x17, 0x52, 0x45, 0x53, 0x55, 0x4c, + 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, + 0x45, 0x44, 0x10, 0x00, 0x12, 0x17, 0x0a, 0x13, 0x52, 0x45, 0x53, 0x55, 0x4c, 0x54, 0x5f, 0x54, + 0x59, 0x50, 0x45, 0x5f, 0x53, 0x55, 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, 0x01, 0x12, 0x15, 0x0a, + 0x11, 0x52, 0x45, 0x53, 0x55, 0x4c, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, 0x52, 0x52, + 0x4f, 0x52, 0x10, 0x02, 0x22, 0x98, 0x02, 0x0a, 0x10, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, + 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x6b, 0x65, + 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x53, + 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x3b, + 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, + 0x69, 0x6e, 0x67, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x76, 0x31, 0x2e, + 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x06, 0x73, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x22, 0x7a, 0x0a, 0x0a, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x54, 0x79, 0x70, + 0x65, 0x12, 0x18, 0x0a, 0x14, 0x52, 0x45, 0x47, 0x49, 0x53, 0x54, 0x45, 0x52, 0x5f, 0x55, 0x4e, + 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x14, 0x0a, 0x10, 0x52, + 0x45, 0x47, 0x49, 0x53, 0x54, 0x45, 0x52, 0x5f, 0x53, 0x55, 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, + 0x01, 0x12, 0x12, 0x0a, 0x0e, 0x52, 0x45, 0x47, 0x49, 0x53, 0x54, 0x45, 0x52, 0x5f, 0x45, 0x52, + 0x52, 0x4f, 0x52, 0x10, 0x02, 0x12, 0x13, 0x0a, 0x0f, 0x52, 0x45, 0x47, 0x49, 0x53, 0x54, 0x45, + 0x52, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x03, 0x12, 0x13, 0x0a, 0x0f, 0x52, 0x45, + 0x47, 0x49, 0x53, 0x54, 0x45, 0x52, 0x5f, 0x45, 0x58, 0x49, 0x53, 0x54, 0x53, 0x10, 0x04, 0x22, + 0xa8, 0x01, 0x0a, 0x11, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x4a, 0x6f, 0x62, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x49, 0x64, 0x12, 0x15, 0x0a, 0x06, 0x6a, 0x6f, 0x62, 0x5f, 0x69, 0x64, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6a, 0x6f, 0x62, 0x49, 0x64, 0x22, 0x5d, 0x0a, 0x0a, 0x52, + 0x65, 0x73, 0x75, 0x6c, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x16, 0x0a, 0x12, 0x53, 0x55, 0x42, + 0x4d, 0x49, 0x54, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, + 0x00, 0x12, 0x12, 0x0a, 0x0e, 0x53, 0x55, 0x42, 0x4d, 0x49, 0x54, 0x5f, 0x53, 0x55, 0x43, 0x43, + 0x45, 0x53, 0x53, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x53, 0x55, 0x42, 0x4d, 0x49, 0x54, 0x5f, + 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x02, 0x12, 0x11, 0x0a, 0x0d, 0x53, 0x55, 0x42, 0x4d, 0x49, + 0x54, 0x5f, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x03, 0x32, 0x9f, 0x03, 0x0a, 0x07, 0x53, + 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x6d, 0x0a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, + 0x65, 0x72, 0x12, 0x2f, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x63, 0x6f, + 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x67, 0x72, 0x70, 0x63, + 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x30, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x67, 0x72, 0x70, - 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x30, 0x2e, 0x63, - 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, - 0x67, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, - 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x67, - 0x0a, 0x09, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x4a, 0x6f, 0x62, 0x12, 0x27, 0x2e, 0x63, 0x6f, + 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x67, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x4a, + 0x6f, 0x62, 0x12, 0x27, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x63, 0x6f, + 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x67, 0x72, 0x70, 0x63, + 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x31, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, - 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x31, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, - 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x67, 0x72, - 0x70, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x4a, 0x6f, 0x62, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x5b, 0x0a, 0x06, 0x53, 0x65, 0x72, 0x69, 0x61, - 0x6c, 0x12, 0x27, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x63, 0x6f, 0x6d, + 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x75, 0x62, + 0x6d, 0x69, 0x74, 0x4a, 0x6f, 0x62, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x5b, + 0x0a, 0x06, 0x53, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x12, 0x27, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, + 0x72, 0x67, 0x65, 0x64, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x6f, 0x72, + 0x67, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x28, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, - 0x76, 0x31, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x63, 0x6f, 0x6e, - 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, - 0x6f, 0x72, 0x67, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x5f, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x27, + 0x76, 0x31, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x5f, 0x0a, 0x06, 0x53, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x27, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, + 0x64, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x67, + 0x72, 0x70, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x76, 0x31, 0x2e, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, - 0x67, 0x65, 0x64, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x6f, 0x72, 0x67, - 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x33, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, - 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x2d, 0x63, 0x6f, - 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2f, 0x72, 0x61, 0x69, 0x6e, 0x62, 0x6f, 0x77, 0x2f, - 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x33, 0x5a, 0x31, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x76, 0x65, + 0x72, 0x67, 0x65, 0x64, 0x2d, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2f, 0x72, + 0x61, 0x69, 0x6e, 0x62, 0x6f, 0x77, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, + 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -597,38 +694,41 @@ func file_api_proto_rawDescGZIP() []byte { } var file_api_proto_enumTypes = make([]protoimpl.EnumInfo, 3) -var file_api_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_api_proto_msgTypes = make([]protoimpl.MessageInfo, 7) var file_api_proto_goTypes = []interface{}{ (Response_ResultType)(0), // 0: convergedcomputing.org.grpc.v1.Response.ResultType (RegisterResponse_ResultType)(0), // 1: convergedcomputing.org.grpc.v1.RegisterResponse.ResultType (SubmitJobResponse_ResultType)(0), // 2: convergedcomputing.org.grpc.v1.SubmitJobResponse.ResultType (*Content)(nil), // 3: convergedcomputing.org.grpc.v1.Content (*Request)(nil), // 4: convergedcomputing.org.grpc.v1.Request - (*Response)(nil), // 5: convergedcomputing.org.grpc.v1.Response - (*RegisterResponse)(nil), // 6: convergedcomputing.org.grpc.v1.RegisterResponse - (*SubmitJobResponse)(nil), // 7: convergedcomputing.org.grpc.v1.SubmitJobResponse - nil, // 8: convergedcomputing.org.grpc.v1.Content.MetadataEntry - (*anypb.Any)(nil), // 9: google.protobuf.Any - (*timestamppb.Timestamp)(nil), // 10: google.protobuf.Timestamp + (*RegisterRequest)(nil), // 5: convergedcomputing.org.grpc.v1.RegisterRequest + (*Response)(nil), // 6: convergedcomputing.org.grpc.v1.Response + (*RegisterResponse)(nil), // 7: convergedcomputing.org.grpc.v1.RegisterResponse + (*SubmitJobResponse)(nil), // 8: convergedcomputing.org.grpc.v1.SubmitJobResponse + nil, // 9: convergedcomputing.org.grpc.v1.Content.MetadataEntry + (*anypb.Any)(nil), // 10: google.protobuf.Any + (*timestamppb.Timestamp)(nil), // 11: google.protobuf.Timestamp } var file_api_proto_depIdxs = []int32{ - 9, // 0: convergedcomputing.org.grpc.v1.Content.data:type_name -> google.protobuf.Any - 8, // 1: convergedcomputing.org.grpc.v1.Content.metadata:type_name -> convergedcomputing.org.grpc.v1.Content.MetadataEntry + 10, // 0: convergedcomputing.org.grpc.v1.Content.data:type_name -> google.protobuf.Any + 9, // 1: convergedcomputing.org.grpc.v1.Content.metadata:type_name -> convergedcomputing.org.grpc.v1.Content.MetadataEntry 3, // 2: convergedcomputing.org.grpc.v1.Request.content:type_name -> convergedcomputing.org.grpc.v1.Content - 10, // 3: convergedcomputing.org.grpc.v1.Request.sent:type_name -> google.protobuf.Timestamp - 4, // 4: convergedcomputing.org.grpc.v1.Service.Register:input_type -> convergedcomputing.org.grpc.v1.Request - 4, // 5: convergedcomputing.org.grpc.v1.Service.SubmitJob:input_type -> convergedcomputing.org.grpc.v1.Request - 4, // 6: convergedcomputing.org.grpc.v1.Service.Serial:input_type -> convergedcomputing.org.grpc.v1.Request - 4, // 7: convergedcomputing.org.grpc.v1.Service.Stream:input_type -> convergedcomputing.org.grpc.v1.Request - 6, // 8: convergedcomputing.org.grpc.v1.Service.Register:output_type -> convergedcomputing.org.grpc.v1.RegisterResponse - 7, // 9: convergedcomputing.org.grpc.v1.Service.SubmitJob:output_type -> convergedcomputing.org.grpc.v1.SubmitJobResponse - 5, // 10: convergedcomputing.org.grpc.v1.Service.Serial:output_type -> convergedcomputing.org.grpc.v1.Response - 5, // 11: convergedcomputing.org.grpc.v1.Service.Stream:output_type -> convergedcomputing.org.grpc.v1.Response - 8, // [8:12] is the sub-list for method output_type - 4, // [4:8] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name + 11, // 3: convergedcomputing.org.grpc.v1.Request.sent:type_name -> google.protobuf.Timestamp + 11, // 4: convergedcomputing.org.grpc.v1.RegisterRequest.sent:type_name -> google.protobuf.Timestamp + 1, // 5: convergedcomputing.org.grpc.v1.RegisterResponse.status:type_name -> convergedcomputing.org.grpc.v1.RegisterResponse.ResultType + 5, // 6: convergedcomputing.org.grpc.v1.Service.Register:input_type -> convergedcomputing.org.grpc.v1.RegisterRequest + 4, // 7: convergedcomputing.org.grpc.v1.Service.SubmitJob:input_type -> convergedcomputing.org.grpc.v1.Request + 4, // 8: convergedcomputing.org.grpc.v1.Service.Serial:input_type -> convergedcomputing.org.grpc.v1.Request + 4, // 9: convergedcomputing.org.grpc.v1.Service.Stream:input_type -> convergedcomputing.org.grpc.v1.Request + 7, // 10: convergedcomputing.org.grpc.v1.Service.Register:output_type -> convergedcomputing.org.grpc.v1.RegisterResponse + 8, // 11: convergedcomputing.org.grpc.v1.Service.SubmitJob:output_type -> convergedcomputing.org.grpc.v1.SubmitJobResponse + 6, // 12: convergedcomputing.org.grpc.v1.Service.Serial:output_type -> convergedcomputing.org.grpc.v1.Response + 6, // 13: convergedcomputing.org.grpc.v1.Service.Stream:output_type -> convergedcomputing.org.grpc.v1.Response + 10, // [10:14] is the sub-list for method output_type + 6, // [6:10] is the sub-list for method input_type + 6, // [6:6] is the sub-list for extension type_name + 6, // [6:6] is the sub-list for extension extendee + 0, // [0:6] is the sub-list for field type_name } func init() { file_api_proto_init() } @@ -662,7 +762,7 @@ func file_api_proto_init() { } } file_api_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Response); i { + switch v := v.(*RegisterRequest); i { case 0: return &v.state case 1: @@ -674,7 +774,7 @@ func file_api_proto_init() { } } file_api_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*RegisterResponse); i { + switch v := v.(*Response); i { case 0: return &v.state case 1: @@ -686,6 +786,18 @@ func file_api_proto_init() { } } file_api_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RegisterResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_api_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*SubmitJobResponse); i { case 0: return &v.state @@ -704,7 +816,7 @@ func file_api_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_api_proto_rawDesc, NumEnums: 3, - NumMessages: 6, + NumMessages: 7, NumExtensions: 0, NumServices: 1, }, diff --git a/pkg/api/v1/api_grpc.pb.go b/pkg/api/v1/api_grpc.pb.go index 88756e7..ddcfe54 100644 --- a/pkg/api/v1/api_grpc.pb.go +++ b/pkg/api/v1/api_grpc.pb.go @@ -23,7 +23,7 @@ const _ = grpc.SupportPackageIsVersion7 // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type ServiceClient interface { // Register cluster - request to register a new cluster - Register(ctx context.Context, in *Request, opts ...grpc.CallOption) (*RegisterResponse, error) + Register(ctx context.Context, in *RegisterRequest, opts ...grpc.CallOption) (*RegisterResponse, error) // Job Submission - request for submitting a job to a named cluster SubmitJob(ctx context.Context, in *Request, opts ...grpc.CallOption) (*SubmitJobResponse, error) // TESTING ENDPOINTS @@ -42,7 +42,7 @@ func NewServiceClient(cc grpc.ClientConnInterface) ServiceClient { return &serviceClient{cc} } -func (c *serviceClient) Register(ctx context.Context, in *Request, opts ...grpc.CallOption) (*RegisterResponse, error) { +func (c *serviceClient) Register(ctx context.Context, in *RegisterRequest, opts ...grpc.CallOption) (*RegisterResponse, error) { out := new(RegisterResponse) err := c.cc.Invoke(ctx, "/convergedcomputing.org.grpc.v1.Service/Register", in, out, opts...) if err != nil { @@ -105,7 +105,7 @@ func (x *serviceStreamClient) Recv() (*Response, error) { // for forward compatibility type ServiceServer interface { // Register cluster - request to register a new cluster - Register(context.Context, *Request) (*RegisterResponse, error) + Register(context.Context, *RegisterRequest) (*RegisterResponse, error) // Job Submission - request for submitting a job to a named cluster SubmitJob(context.Context, *Request) (*SubmitJobResponse, error) // TESTING ENDPOINTS @@ -121,7 +121,7 @@ type ServiceServer interface { type UnimplementedServiceServer struct { } -func (UnimplementedServiceServer) Register(context.Context, *Request) (*RegisterResponse, error) { +func (UnimplementedServiceServer) Register(context.Context, *RegisterRequest) (*RegisterResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Register not implemented") } func (UnimplementedServiceServer) SubmitJob(context.Context, *Request) (*SubmitJobResponse, error) { @@ -147,7 +147,7 @@ func RegisterServiceServer(s grpc.ServiceRegistrar, srv ServiceServer) { } func _Service_Register_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(Request) + in := new(RegisterRequest) if err := dec(in); err != nil { return nil, err } @@ -159,7 +159,7 @@ func _Service_Register_Handler(srv interface{}, ctx context.Context, dec func(in FullMethod: "/convergedcomputing.org.grpc.v1.Service/Register", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ServiceServer).Register(ctx, req.(*Request)) + return srv.(ServiceServer).Register(ctx, req.(*RegisterRequest)) } return interceptor(ctx, in, info, handler) } diff --git a/pkg/client/client.go b/pkg/client/client.go index 5f76c54..f658a39 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -12,30 +12,36 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -var _ Client = (*SimpleClient)(nil) +// RainbowClient is our instantiation of Client +type RainbowClient struct { + host string + connection *grpc.ClientConn + service pb.ServiceClient +} +var _ Client = (*RainbowClient)(nil) + +// Client interface defines functions required for a valid client type Client interface { Serial(ctx context.Context, message string) (string, error) Stream(ctx context.Context, it provider.MessageIterator) error - Register(ctx context.Context, clusterName, secret string) (string, error) + Register(ctx context.Context, clusterName, secret string) (*pb.RegisterResponse, error) } -func NewClient(target string) (Client, error) { - if target == "" { - return nil, errors.New("target is required") +// NewClient creates a new RainbowClient +func NewClient(host string) (Client, error) { + if host == "" { + return nil, errors.New("host is required") } - log.Printf("starting client (%s)...", target) - - c := &SimpleClient{ - target: target, - } + log.Printf("๐ŸŒˆ๏ธ starting client (%s)...", host) + c := &RainbowClient{host: host} // Set up a connection to the server. creds := grpc.WithTransportCredentials(insecure.NewCredentials()) - conn, err := grpc.Dial(c.GetTarget(), creds, grpc.WithBlock()) + conn, err := grpc.Dial(c.GetHost(), creds, grpc.WithBlock()) if err != nil { - return nil, errors.Wrapf(err, "unable to connect to %s", target) + return nil, errors.Wrapf(err, "unable to connect to %s", host) } c.connection = conn @@ -44,24 +50,20 @@ func NewClient(target string) (Client, error) { return c, nil } -type SimpleClient struct { - target string - connection *grpc.ClientConn - service pb.ServiceClient -} - // Close closes the created resources (e.g. connection). -func (c *SimpleClient) Close() error { +func (c *RainbowClient) Close() error { if c.connection != nil { return c.connection.Close() } return nil } -func (c *SimpleClient) Connected() bool { +// Connected returns true if we are connected and the connection is ready +func (c *RainbowClient) Connected() bool { return c.service != nil && c.connection != nil && c.connection.GetState() == connectivity.Ready } -func (c *SimpleClient) GetTarget() string { - return c.target +// GetHost returns the private hostn name +func (c *RainbowClient) GetHost() string { + return c.host } diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go deleted file mode 100644 index a94c64b..0000000 --- a/pkg/client/client_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package client - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestClient(t *testing.T) { - t.Run("new client sans address", func(t *testing.T) { - _, err := NewClient("") - assert.Error(t, err) - }) - - c := &SimpleClient{} - - t.Run("close client", func(t *testing.T) { - err := c.Close() - assert.NoError(t, err) - assert.Empty(t, c.GetTarget()) - }) - - t.Run("serial sans message", func(t *testing.T) { - _, err := c.Serial(context.TODO(), "") - assert.Error(t, err) - }) - - t.Run("stream sans iterator", func(t *testing.T) { - err := c.Stream(context.TODO(), nil) - assert.Error(t, err) - }) -} diff --git a/pkg/client/endpoint.go b/pkg/client/endpoint.go index 6bc71aa..6890077 100644 --- a/pkg/client/endpoint.go +++ b/pkg/client/endpoint.go @@ -2,55 +2,49 @@ package client import ( "context" - "fmt" "time" pb "github.com/converged-computing/rainbow/pkg/api/v1" - "github.com/google/uuid" "github.com/pkg/errors" - "google.golang.org/protobuf/types/known/anypb" ts "google.golang.org/protobuf/types/known/timestamppb" - wrapper "google.golang.org/protobuf/types/known/wrapperspb" ) +type RegisterRequest struct { + Name string + Secret string +} + // Register makes a request to register a new cluster -func (c *SimpleClient) Register( +func (c *RainbowClient) Register( ctx context.Context, clusterName string, secret string, -) (string, error) { +) (*pb.RegisterResponse, error) { + + response := &pb.RegisterResponse{} // TODO add secret requirement when server has database if clusterName == "" { - return "", errors.New("message is required") + return response, errors.New("message is required") } if !c.Connected() { - return "", errors.New("client is not connected") + return response, errors.New("client is not connected") } // Contact the server and print out its response. ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() - // create cluster name with wrapper - a, err := anypb.New(wrapper.String(clusterName)) - if err != nil { - return "", errors.Wrap(err, "unable to create message") - } - // Hit the register endpoint - r, err := c.service.Register(ctx, &pb.Request{ - Content: &pb.Content{ - Id: uuid.New().String(), - Data: a, - }, - Sent: ts.Now(), + response, err := c.service.Register(ctx, &pb.RegisterRequest{ + Name: clusterName, + Secret: secret, + Sent: ts.Now(), }) // For now we blindly accept all register, it's a fake endpoint if err != nil { - return "register failed", errors.Wrap(err, "could not register cluster") + return response, errors.Wrap(err, "could not register cluster") } - fmt.Println(r) - return "register success", nil + return response, nil } diff --git a/pkg/client/serial.go b/pkg/client/serial.go index b54a44b..42c949c 100644 --- a/pkg/client/serial.go +++ b/pkg/client/serial.go @@ -13,7 +13,7 @@ import ( ) // Scalar sends a message to the server and returns the response. -func (c *SimpleClient) Serial(ctx context.Context, message string) (string, error) { +func (c *RainbowClient) Serial(ctx context.Context, message string) (string, error) { if message == "" { return "", errors.New("message is required") } diff --git a/pkg/client/stream.go b/pkg/client/stream.go index 313c285..8bd32c3 100644 --- a/pkg/client/stream.go +++ b/pkg/client/stream.go @@ -14,7 +14,7 @@ import ( ) // Stream sends a message to the server and receives the response. -func (c *SimpleClient) Stream(ctx context.Context, it provider.MessageIterator) error { +func (c *RainbowClient) Stream(ctx context.Context, it provider.MessageIterator) error { if it == nil { return errors.New("message provider is required") } diff --git a/pkg/provider/message_test.go b/pkg/provider/message_test.go deleted file mode 100644 index a22885f..0000000 --- a/pkg/provider/message_test.go +++ /dev/null @@ -1,26 +0,0 @@ -package provider - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestMessageIterator(t *testing.T) { - it := &CountedMessageProvider{} - assert.NotNil(t, it) - - // Test the iterator - p := "" - c := 0 - for it.HasNext() { - m := it.Next() - assert.NotEmpty(t, m) - assert.NotEqual(t, p, m) - p = m - c++ - if c > 10 { - break - } - } -} diff --git a/pkg/server/database.go b/pkg/server/database.go new file mode 100644 index 0000000..b815723 --- /dev/null +++ b/pkg/server/database.go @@ -0,0 +1,162 @@ +package server + +import ( + "fmt" + + pb "github.com/converged-computing/rainbow/pkg/api/v1" + "github.com/google/uuid" + _ "github.com/mattn/go-sqlite3" + + "database/sql" + "log" + "os" +) + +type Database struct { + filepath string +} + +// cleanup removes the filepath +func (db *Database) cleanup() { + // Delete a previous database that exists + // Note that in the future we might not want to do this + log.Printf("๐Ÿงน๏ธ cleaning up %s...", db.filepath) + os.Remove(db.filepath) +} + +// create the database +func (db *Database) create() error { + + log.Printf("โœจ๏ธ creating %s...", db.filepath) + + // Create SQLite file (ensures that we can!) + file, err := os.Create(db.filepath) + if err != nil { + return err + } + file.Close() + log.Printf(" %s file created", db.filepath) + + // Open the created SQLite File (to test) + conn, err := db.connect() + defer conn.Close() + return err +} + +// Connect to the database - the caller is responsible for closing +func (db *Database) connect() (*sql.DB, error) { + conn, err := sql.Open("sqlite3", db.filepath) + if err != nil { + return nil, err + } + return conn, err +} + +// RegisterCluster registers a cluster or returns another status +// REGISTER_SUCCESS = 1; +// REGISTER_ERROR = 2; +// REGISTER_DENIED = 3; +// REGISTER_EXISTS = 4; +func (db *Database) RegisterCluster(name string) (pb.RegisterResponse_ResultType, string, error) { + + // Connect! + conn, err := db.connect() + if err != nil { + return 0, "", err + } + defer conn.Close() + + // First determine if it exists + query := fmt.Sprintf("SELECT count(*) from clusters WHERE name LIKE \"%s\"", name) + result, err := conn.Exec(query) + count, err := result.RowsAffected() + + // Debugging extra for now + fmt.Printf("%s: (%d)\n", query, count) + + // Case 1: already exists + if count > 0 { + return 4, "", nil + } + + // Generate a "secret" token, lol + token := uuid.New().String() + query = fmt.Sprintf("INSERT into clusters VALUES (\"%s\", \"%s\")", name, token) + result, err = conn.Exec(query) + if err != nil { + return 2, "", err + } + count, err = result.RowsAffected() + fmt.Printf("%s: (%d)\n", query, count) + + // REGISTER_SUCCESS + if count > 0 { + return 1, token, nil + } + + // REGISTER_ERROR + return 2, "", err +} + +// create the database +func (db *Database) createTables() error { + + conn, err := db.connect() + if err != nil { + return err + } + defer conn.Close() + + // Create the clusters table, where we store the name and secret + // obviously the secret should not be stored in plain text - it's fine for now + createClusterTableSQL := ` + CREATE TABLE clusters ( + name TEXT NOT NULL PRIMARY KEY, + secret TEXT + );` + + createJobsTableSQL := ` + CREATE TABLE jobs ( + idJob integer NOT NULL PRIMARY KEY AUTOINCREMENT, + jobspec TEXT, + cluster TEXT, + FOREIGN KEY(cluster) REFERENCES clusters(name) + );` + + for table, statement := range map[string]string{"cluster": createClusterTableSQL, "jobs": createJobsTableSQL} { + log.Printf(" create %s table...\n", table) + query, err := conn.Prepare(statement) // Prepare SQL Statement + if err != nil { + return err + } + // Execute SQL query + _, err = query.Exec() + if err != nil { + return err + } + log.Printf(" %s table created\n", table) + } + return nil +} + +func initDatabase(filepath string, cleanup bool) (*Database, error) { + + // Create a new database (todo, add cleanupc check) + db := Database{filepath: filepath} + + if cleanup { + db.cleanup() + } + + // Create the database + err := db.create() + if err != nil { + return nil, err + } + + if err != nil { + return nil, err + } + err = db.createTables() + return &db, err +} diff --git a/pkg/server/endpoint.go b/pkg/server/endpoint.go index e60577f..cfc5471 100644 --- a/pkg/server/endpoint.go +++ b/pkg/server/endpoint.go @@ -10,16 +10,25 @@ import ( ) // Register a new cluster with the server -func (s *Server) Register(_ context.Context, in *pb.Request) (*pb.RegisterResponse, error) { +func (s *Server) Register(_ context.Context, in *pb.RegisterRequest) (*pb.RegisterResponse, error) { if in == nil { return nil, errors.New("request is required") } - c := in.GetContent() - log.Printf("received register: %v", c.GetData()) + // Validate the secret + if in.Secret == "" || (in.Secret != s.secret) { + return nil, errors.New("request denied") + } + + // Convert data to string, should be the cluster name + // TODO make this better, I'm sure there is a better way + log.Printf("๐Ÿ“๏ธ received register: %s", in.Name) + + status, token, err := s.db.RegisterCluster(in.Name) return &pb.RegisterResponse{ - RequestId: c.GetId(), - }, nil + Status: status, + Token: token, + }, err } // TEST ENDPOINTS ------------------------------------------------ diff --git a/pkg/server/endpoint_test.go b/pkg/server/endpoint_test.go deleted file mode 100644 index e3addc3..0000000 --- a/pkg/server/endpoint_test.go +++ /dev/null @@ -1,99 +0,0 @@ -package server - -import ( - "context" - "log" - "testing" - "time" - - pb "github.com/converged-computing/rainbow/pkg/api/v1" - "github.com/google/uuid" - "github.com/stretchr/testify/assert" - "google.golang.org/grpc/test/bufconn" - anypb "google.golang.org/protobuf/types/known/anypb" - tspb "google.golang.org/protobuf/types/known/timestamppb" - wrbp "google.golang.org/protobuf/types/known/wrapperspb" -) - -var ( - maxTestRunDuration = 180 * time.Second // 3 minutes -) - -func TestScalar(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), maxTestRunDuration) - defer cancel() - - s := startTestServer(ctx, t) - assert.NotNil(t, s) - defer s.Stop() - - t.Run("scalar sans args", func(t *testing.T) { - _, err := NewServer("", "", "") - assert.Error(t, err) - _, err = NewServer("test", "", "") - assert.Error(t, err) - _, err = NewServer("test", "test", "") - assert.Error(t, err) - _, err = NewServer("test", "", "test") - assert.Error(t, err) - _, err = NewServer("", "", "test") - assert.Error(t, err) - }) - - t.Run("scalar sans args", func(t *testing.T) { - if _, err := s.Serial(ctx, nil); err == nil { - t.Fatalf("expected error on scalar without args") - } - }) - - t.Run("scalar with args", func(t *testing.T) { - data, err := anypb.New(wrbp.String("test")) - if err != nil { - t.Fatalf("error creating data: %v", err) - } - - req := &pb.Request{ - Content: &pb.Content{ - Id: uuid.New().String(), - Data: data, - }, - Sent: tspb.Now(), - } - - // Scalar example - res, err := s.Serial(ctx, req) - if err != nil { - t.Fatalf("error on scalar: %v", err) - } - - assert.NotEmpty(t, res.GetRequestId()) - assert.Greater(t, res.GetMessageCount(), int64(0)) - assert.Equal(t, res.GetMessagesProcessed(), res.GetMessageCount()) - assert.Equal(t, success, res.GetProcessingDetails()) - }) - - t.Run("stream sans args", func(t *testing.T) { - if err := s.Stream(nil); err == nil { - t.Fatalf("expected error on stream without args") - } - }) -} - -func startTestServer(ctx context.Context, t *testing.T) *Server { - buf := 101024 * 1024 - lis := bufconn.Listen(buf) - defer lis.Close() - - s, err := NewServer("test-server", "v0.0.0-test", "test") - if err != nil { - t.Fatalf("error while creating server: %v", err) - } - - go func() { - if err := s.serve(ctx, lis); err != nil && err.Error() != "closed" { - log.Printf("error on server: %v", err) - } - }() - - return s -} diff --git a/pkg/server/server.go b/pkg/server/server.go index ac3725e..db9c02d 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -18,22 +18,43 @@ const ( success = "processed successfully" ) +var ( + defaultEnv = "development" + defaultName = "rainbow" +) + // NewServer creates a new "scheduler" server // The scheduler server registers clusters and then accepts jobs -func NewServer(name, version, environment string) (*Server, error) { - if name == "" { - return nil, errors.New("name is required") +func NewServer( + name, version, environment, sqliteFile string, + cleanup bool, + secret string, +) (*Server, error) { + + if secret == "" { + return nil, errors.New("secret is required") } if version == "" { return nil, errors.New("version is required") } + if name == "" { + name = defaultName + } if environment == "" { - return nil, errors.New("environment is required") + environment = defaultEnv + } + + // init the database, creating jobs and clusters tables + db, err := initDatabase(sqliteFile, cleanup) + if err != nil { + log.Fatal(err) } return &Server{ + db: db, name: name, version: version, + secret: secret, environment: environment, }, nil } @@ -41,16 +62,20 @@ func NewServer(name, version, environment string) (*Server, error) { // Server is used to implement your Service. type Server struct { pb.UnimplementedServiceServer - server *grpc.Server - listener net.Listener - counter atomic.Uint64 // counter for messages - name string // server name - version string // server version - environment string // server environment + server *grpc.Server + listener net.Listener + + // counter will be for job ids + counter atomic.Uint64 + name string + version string + environment string + secret string + db *Database } func (s *Server) String() string { - return fmt.Sprintf("%s (%s) v%s", s.name, s.environment, s.version) + return fmt.Sprintf("%s v%s", s.name, s.version) } func (s *Server) GetCounter() int64 { @@ -65,10 +90,6 @@ func (s *Server) GetVersion() string { return s.version } -func (s *Server) GetEnvironment() string { - return s.environment -} - func (s *Server) Stop() { log.Printf("stopping server: %s", s.String()) if s.listener != nil {