|
| 1 | +package main |
| 2 | + |
| 3 | +import ( |
| 4 | + "fmt" |
| 5 | + "net/rpc" |
| 6 | + "os" |
| 7 | + "flag" |
| 8 | + "encoding/json" |
| 9 | + "bufio" |
| 10 | + "io" |
| 11 | + "strconv" |
| 12 | + "regexp" |
| 13 | + "strings" |
| 14 | + "MP1/util" |
| 15 | +) |
| 16 | + |
// JSONConfigEntry is the decoding target for one entry of the server
// configuration file: a JSON object that names a remote grep server and the
// port it listens on. The expected file layout is described in main().
type JSONConfigEntry struct {
	// ServerHostname is the host name of the remote server.
	ServerHostname string
	// ServerPortNumber is the server's port; parse_json rejects values
	// outside 0-65535.
	ServerPortNumber int
}
| 22 | + |
| 23 | +//struct carrying the grep result together with the metainformation used to perform a RPC to do grep on a remote machine |
| 24 | +type RemoteQueryResult struct { |
| 25 | + ResultString string |
| 26 | + SourceServerQuerySpecification RemoteQuerySpecification |
| 27 | +} |
| 28 | + |
| 29 | +//struct with information needed to perform a RPC to do grep on a remote machine |
| 30 | +type RemoteQuerySpecification struct { |
| 31 | + RemoteServerAndPortName string |
| 32 | + RemoteVMNumber int |
| 33 | + ClientSentArguments util.ServerQueryArguments |
| 34 | +} |
| 35 | + |
| 36 | +//The thread spawned for each server, which performs a blocking RPC |
| 37 | +//There is no need for an explict timeout for two reasons: One, query time is variable conditional on pattern match freq |
| 38 | +//and file size and with no possible upper bound. Second, if a machine fails, net/RPC returns a error through the RPC |
| 39 | +//Call function, so we will not block, which means this is sufficient under the fail-stop regime assumed by the MP |
| 40 | +func query_goroutine(ThisServerQuery RemoteQuerySpecification, OutputQueue chan<- RemoteQueryResult, LogFilePrefix string) { |
| 41 | + //Open a connection to the remote server assigned to this goroutine |
| 42 | + |
| 43 | + RPCClient, RPCClientErr := rpc.DialHTTP("tcp", ThisServerQuery.RemoteServerAndPortName) |
| 44 | + //Return a null result if the RPC server cannot be contacted along with this request's metainformation |
| 45 | + if RPCClientErr != nil { |
| 46 | + fmt.Printf("Opening RPC Connection Failed! Error: %s, Server: %#v\n", RPCClientErr.Error(), |
| 47 | + ThisServerQuery) |
| 48 | + OutputQueue <- RemoteQueryResult{"", ThisServerQuery} |
| 49 | + return |
| 50 | + } |
| 51 | + |
| 52 | + //Send the grep pattern to the server |
| 53 | + ThisServerQuery.ClientSentArguments.LogFilePrefix = LogFilePrefix |
| 54 | + QueryServerCommand := ThisServerQuery.ClientSentArguments |
| 55 | + fmt.Printf("Querying Server: %#v, Command: %#v\n", ThisServerQuery, QueryServerCommand) |
| 56 | + |
| 57 | + //Allocate storage for the RPC return value and perform the RPC call |
| 58 | + var RPCResult string |
| 59 | + RPCCallErr := RPCClient.Call("Grepper.Grep", QueryServerCommand, &RPCResult) |
| 60 | + //Return a null result if the RPC server fails after the RPC call request is sent along with this request's |
| 61 | + //metainformation |
| 62 | + if RPCCallErr != nil { |
| 63 | + fmt.Printf("Getting RPC Response Failed! Error: %s, Server: %#v\n", RPCCallErr.Error(), ThisServerQuery) |
| 64 | + OutputQueue <- RemoteQueryResult{"", ThisServerQuery} |
| 65 | + return |
| 66 | + } |
| 67 | + |
| 68 | + //Clean up resources by closing the RPCClient object - even if we fail we should still return the RPC call's result |
| 69 | + //as it has succeeded at this point |
| 70 | + RPCClientCloseErr := RPCClient.Close() |
| 71 | + if RPCClientCloseErr != nil { |
| 72 | + fmt.Printf("Closing RPC Client Failed! Error: %s, Server: %#v\n", RPCClientCloseErr.Error(), |
| 73 | + ThisServerQuery) |
| 74 | + } |
| 75 | + |
| 76 | + //Return the remote Grep result along with this request's metainformation |
| 77 | + OutputQueue <- RemoteQueryResult{RPCResult, ThisServerQuery} |
| 78 | +} |
| 79 | + |
| 80 | +//Write result string to target output file |
| 81 | +func write(result RemoteQueryResult) { |
| 82 | + f, err := os.Create(os.Getenv("GOPATH") + "/src/MP1/resource/out" + strconv.Itoa(result.SourceServerQuerySpecification.RemoteVMNumber) + ".log") |
| 83 | + if err != nil { |
| 84 | + panic(err) |
| 85 | + } |
| 86 | + f.Write([]byte(strconv.Itoa(result.SourceServerQuerySpecification.RemoteVMNumber) + "\n")) |
| 87 | + f.Write([]byte(result.ResultString)) |
| 88 | +} |
| 89 | + |
| 90 | +//Launch one goroutine for each possible remote server to perform the grep query on the distributed logs |
| 91 | +func query(ServerList [] RemoteQuerySpecification, LogFilePrefix string) []RemoteQueryResult { |
| 92 | + //Use a buffered channel as a FIFO queue, in case goroutines return quicker than usual due to errors |
| 93 | + //Each goroutine will return one and only RemoteQueryResult on the channel, so the needed capacity is known upon |
| 94 | + //entry to this function |
| 95 | + OutputQueue := make(chan RemoteQueryResult, len(ServerList)) |
| 96 | + |
| 97 | + //Create a slice of strings with each server's returned output and query metainformation - this is for ease of unit |
| 98 | + //testing |
| 99 | + var OutputsCombined [] RemoteQueryResult |
| 100 | + |
| 101 | + //Launch one goroutine for each possible remote server - we are using blocking RPCs so threading is needed for |
| 102 | + //concurrency |
| 103 | + for _, ServerQuery := range ServerList { |
| 104 | + //fmt.Printf("Launching %#v\n", ServerQuery) |
| 105 | + go query_goroutine(ServerQuery, OutputQueue, LogFilePrefix) |
| 106 | + } |
| 107 | + |
| 108 | + TotalNumLinesReceived := 0 |
| 109 | + NumQueriesReceived := 0 |
| 110 | + //Receive the results of the remote queries on the main thread |
| 111 | + for NumQueriesReceived < len(ServerList) { |
| 112 | + RemoteQueryAnswer := <-OutputQueue |
| 113 | + write(RemoteQueryAnswer) |
| 114 | + //Count the number of lines received for this query |
| 115 | + NumLinesReceivedThisQuery := strings.Count(RemoteQueryAnswer.ResultString, "\n") |
| 116 | + fmt.Printf("Result for Remote Query %#v\n %s\n Line Count for This Result: %d\n", |
| 117 | + RemoteQueryAnswer.SourceServerQuerySpecification, RemoteQueryAnswer.ResultString, NumLinesReceivedThisQuery) |
| 118 | + |
| 119 | + //Keep track of the number of server we have heard from |
| 120 | + NumQueriesReceived++ |
| 121 | + |
| 122 | + //Count the number of lines received for this query in total from all the servers we have heard from up to now |
| 123 | + TotalNumLinesReceived += NumLinesReceivedThisQuery |
| 124 | + |
| 125 | + //Add this server's returned output and query metainformation to OutputsCombined |
| 126 | + OutputsCombined = append(OutputsCombined, RemoteQueryAnswer) |
| 127 | + } |
| 128 | + |
| 129 | + //Print the total line count as required by the MP |
| 130 | + fmt.Printf("Total Line Count: %d\n", TotalNumLinesReceived) |
| 131 | + |
| 132 | + return OutputsCombined |
| 133 | +} |
| 134 | + |
| 135 | +//Parse the server configuration file |
| 136 | +func parse_json(JSONConfigFilefd *os.File, GrepPattern string) []RemoteQuerySpecification { |
| 137 | + //Allow for a variable number of servers to send grep request to |
| 138 | + var ServerList []RemoteQuerySpecification |
| 139 | + |
| 140 | + //See below |
| 141 | + VMNumberRegex, RegexCompileErr := regexp.Compile( |
| 142 | + "fa17-cs425-g24-(?P<VMNumberGroup>[0-9]{2})\\.cs\\.illinois\\.edu") |
| 143 | + if RegexCompileErr != nil { |
| 144 | + fmt.Printf("Regex Compile Failed! Error: %s\n", RegexCompileErr.Error()) |
| 145 | + return ServerList |
| 146 | + } |
| 147 | + |
| 148 | + //We use a quasi-JSON format specified in main() |
| 149 | + JSONParser := json.NewDecoder(bufio.NewReader(JSONConfigFilefd)) |
| 150 | + for { |
| 151 | + var ServerEntry JSONConfigEntry |
| 152 | + JSONDecoderErr := JSONParser.Decode(&ServerEntry) |
| 153 | + if JSONDecoderErr == io.EOF { //We have hit the last entry of the configuration file and thus we should exit |
| 154 | + // this loop |
| 155 | + fmt.Printf("JSON Configuration Decoding Completed!\n") |
| 156 | + break |
| 157 | + } else if JSONDecoderErr != nil { |
| 158 | + //If the current entry is invalid, either because the fields are wrong |
| 159 | + //or the data in the fields are invalid, simply skip the current entry and treat them as if the |
| 160 | + //corresponding |
| 161 | + //servers are down, since there could still be other valid entries in the configuration file |
| 162 | + fmt.Printf("JSON Configuration Entry Decoding Failed! Entry Ignored, Error: %s, Entry: %#v\n", |
| 163 | + JSONDecoderErr.Error(), ServerEntry) |
| 164 | + continue |
| 165 | + } else if ServerEntry.ServerPortNumber < 0 || ServerEntry.ServerPortNumber > 65535 { //port numbers are 16-bit |
| 166 | + fmt.Printf( |
| 167 | + "JSON Configuration Entry Decoding Failed! Entry Ignored, Port Number Out of Range! Entry: %#v\n", |
| 168 | + ServerEntry) |
| 169 | + continue |
| 170 | + } |
| 171 | + |
| 172 | + //net.DialTimeout requires a hostname string in the form hostname:portnumber, so create such a string from the |
| 173 | + //config file read data |
| 174 | + ServerAndPortName := ServerEntry.ServerHostname + ":" + strconv.Itoa(ServerEntry.ServerPortNumber) |
| 175 | + |
| 176 | + //we need to get the virtual machine number for (see VMNumberGroup above) so that we can grep the correct |
| 177 | + //local logfile |
| 178 | + //as specified by the Piazza post for MP1 demo |
| 179 | + VMNumberString := VMNumberRegex.FindStringSubmatch(ServerEntry.ServerHostname) |
| 180 | + |
| 181 | + //If the regex does not match, we will get zero matching groups - so the data for this current entry is bad |
| 182 | + //If the regex does match, we will get two groups. The first match is unnamed and will be the whole string, |
| 183 | + //while the second group will be named VMNumberGroup and will contain the two digit virtual machine number |
| 184 | + if len(VMNumberString) != 2 { |
| 185 | + fmt.Printf( |
| 186 | + "JSON Configuration Entry Decoding Failed! Entry Ignored, Malformed Hostname! Entry: %#v\n", |
| 187 | + ServerEntry) |
| 188 | + continue |
| 189 | + } |
| 190 | + |
| 191 | + //The match from the regex call is a string and needs conversion |
| 192 | + VMNumber, VMNumberStringConvertErr := strconv.Atoi(VMNumberString[1]) |
| 193 | + if VMNumberStringConvertErr != nil { |
| 194 | + fmt.Printf( |
| 195 | + "JSON Configuration Entry Decoding Failed! Entry Ignored,"+ |
| 196 | + " VM Number to Int Conversion Error: %s, Entry: %#v\n", VMNumberStringConvertErr.Error(), |
| 197 | + ServerEntry) |
| 198 | + continue |
| 199 | + } |
| 200 | + |
| 201 | + //We don't to need check if the virtual machine number is negative since that can't match the regex that we |
| 202 | + //used, |
| 203 | + //while since we get the virtual machine number from the hostname, all other virtual machine numbers are |
| 204 | + //potentially possible |
| 205 | + //Regardless, the goroutine that does the actual RPC will gracefully fail if the virtual machine does not |
| 206 | + //actually exist. So we don't need extra error checking here. |
| 207 | + //We allow for potentially 100 servers with the above regex |
| 208 | + ServerSpec := RemoteQuerySpecification{ServerAndPortName, VMNumber, |
| 209 | + util.ServerQueryArguments{GrepPattern, ""}} |
| 210 | + ServerList = append(ServerList, ServerSpec) |
| 211 | + } |
| 212 | + |
| 213 | + return ServerList |
| 214 | +} |
| 215 | + |
| 216 | +func main() { |
| 217 | + //Two required arguments - default values are ok since the empty string is a valid grep pattern while the empty string |
| 218 | + //will be caught by the file validation logic below as an non-existent file |
| 219 | + //The config file format is as follows: Any number of lines in the following format: { |
| 220 | + // {"ServerHostname": |
| 221 | + // "fa17-cs425-g24-<Virtual Machine ID with leading zero from 01 to 10, inclusive>.cs.illinois.edu", |
| 222 | + // "ServerPortNumber": <any valid port number> } |
| 223 | + JSONConfigFilePath := flag.String("json_config_path", "", "Path to JSON Configuration File") |
| 224 | + GrepPatternString := flag.String("grep_pattern", "", "Pattern to Grep for") |
| 225 | + |
| 226 | + flag.Parse() |
| 227 | + |
| 228 | + // Check if the configuration file exists and is not a directory |
| 229 | + JSONConfigFileInfo, OSLstatErr := os.Lstat(*JSONConfigFilePath) |
| 230 | + if OSLstatErr != nil { |
| 231 | + fmt.Printf("Determining Type of File at JSON Configuration File Path Failed! Error: %s, Path: %s\n", |
| 232 | + OSLstatErr.Error(), *JSONConfigFilePath) |
| 233 | + return |
| 234 | + } else if JSONConfigFileMode := JSONConfigFileInfo.Mode(); JSONConfigFileMode.IsDir() { |
| 235 | + fmt.Printf("JSON Configuration File Path is a Directory! Path: %s\n", *JSONConfigFilePath) |
| 236 | + return |
| 237 | + } |
| 238 | + |
| 239 | + //Open the configuration file and pass it to the JSON decoding function |
| 240 | + JSONConfigFilefd, OSOpenErr := os.Open(*JSONConfigFilePath) |
| 241 | + if OSOpenErr != nil { |
| 242 | + fmt.Printf("Opening JSON Configuration File Failed! Error: %s, Path: %s\n", OSOpenErr.Error(), |
| 243 | + *JSONConfigFilePath) |
| 244 | + return |
| 245 | + } |
| 246 | + |
| 247 | + ServerList := parse_json(JSONConfigFilefd, *GrepPatternString) |
| 248 | + |
| 249 | + query(ServerList, "daemon") |
| 250 | + |
| 251 | + //close the configuration file to prevent leaks |
| 252 | + JSONConfigFileCloseErr := JSONConfigFilefd.Close() |
| 253 | + if JSONConfigFileCloseErr != nil { |
| 254 | + fmt.Printf("Closing JSON Configuration File Failed! Error: %s, Path: %s\n", |
| 255 | + JSONConfigFileCloseErr.Error(), JSONConfigFilePath) |
| 256 | + } |
| 257 | +} |
0 commit comments