-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathzipout.py
165 lines (142 loc) · 7.7 KB
/
zipout.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
"""
First make sure you have a Python3 program for your answer in ./answer/
Then run:
python3 zipout.py
This will create a file `output.zip`.
To customize the files used by default, run:
python3 zipout.py -h
"""
import sys, os, argparse, logging, tempfile, subprocess, shutil
import iocollect
class ZipOutput:
def __init__(self, opts):
self.run_program = opts.run_program # solution to hw that is being tested
self.python_bin = opts.python_bin # Python binary to run
self.answer_dir = opts.answer_dir # name of directory where run_program exists
self.input_dir = opts.input_dir # directory where input files are placed
self.output_dir = opts.output_dir # directory for output files of your program
self.file_suffix = opts.file_suffix # file suffix for input files
self.device = opts.device # cuda device
self.model = opts.model # model
def mkdirp(self, path):
try:
os.makedirs(path)
except os.error:
print("Warning: {} already exists. Existing files will be over-written.".format(path), file=sys.stderr)
pass
def run(self, filename, path, output_path, base):
"""
Runs a command specified by an argument vector (including the program name)
and returns lists of lines from stdout and stderr.
"""
# create the output files
if output_path is not None:
stdout_path = os.path.join(output_path, "{}.out".format(base))
stderr_path = os.path.join(output_path, "{}.err".format(base))
# existing files are erased!
stdout_file = open(stdout_path, 'w')
stderr_file = open(stderr_path, 'w')
status_path = os.path.join(output_path, "{}.ret".format(base))
else:
stdout_file, stdout_path = tempfile.mkstemp("stdout")
stderr_file, stderr_path = tempfile.mkstemp("stderr")
status_path = None
run_program_path = os.path.abspath(os.path.join(self.answer_dir, self.run_program))
run_python = os.path.abspath(self.python_bin)
if os.path.exists(run_python) and os.access(run_python, os.X_OK):
argv = [ run_python, run_program_path, '-i', filename, '-d', self.device, '-m', self.model ]
else:
print("Did not find {}. Are you sure you set up a virtualenv? Run `python3 -m venv venv` in the current directory.".format(self.python_bin), file=sys.stderr)
if os.path.exists(self.run_program_path) and os.access(self.run_program_path, os.X_OK):
argv = [ run_program_path, '-i', filename, '-d', self.device, '-m', self.model ]
else:
raise ValueError("Could not run {} {}".format(self.python_bin, self.run_program_path))
stdin_file = open(filename, 'r')
try:
try:
prog = subprocess.Popen(argv, stdin=stdin_file or subprocess.PIPE, stdout=stdout_file, stderr=stderr_file)
if stdin_file is None:
prog.stdin.close()
prog.wait()
finally:
if output_path is not None:
stdout_file.close()
stderr_file.close()
else:
os.close(stdout_file)
os.close(stderr_file)
if status_path is not None:
with open(status_path, 'w') as status_file:
print(prog.returncode, file=status_file)
with open(stdout_path) as stdout_input:
stdout_lines = list(stdout_input)
with open(stderr_path) as stderr_input:
stderr_lines = list(stderr_input)
if prog.stdin != None:
prog.stdin.close()
return stdout_lines, stderr_lines, prog.returncode
except:
print("error: something went wrong when trying to run the following command:", file=sys.stderr)
print(argv, file=sys.stderr)
raise
#sys.exit(1)
finally:
if output_path is None:
os.remove(stdout_path)
os.remove(stderr_path)
def run_path(self, path, files):
# set up output directory
if path is None or path == '':
output_path = os.path.abspath(self.output_dir)
else:
output_path = os.path.abspath(os.path.join(self.output_dir, path))
self.mkdirp(output_path)
for filename in files:
if path is None or path == '':
testfile_path = os.path.abspath(os.path.join(self.input_dir, filename))
else:
testfile_path = os.path.abspath(os.path.join(self.input_dir, path, filename))
if filename[-len(self.file_suffix):] == self.file_suffix:
base = filename[:-len(self.file_suffix)]
if os.path.exists(testfile_path):
print("running on input {}".format(testfile_path), file=sys.stderr)
self.run(testfile_path, path, output_path, base)
def run_all(self):
# check that a compiled binary exists to run on the input files
argv = os.path.abspath(os.path.join(self.answer_dir, self.run_program))
if not (os.path.isfile(argv)):
logging.error("answer program missing: {}".format(argv))
raise ValueError("Missing {}".format(argv))
# check if input directory has subdirectories
testcase_subdirs = iocollect.getdirs(os.path.abspath(self.input_dir))
if len(testcase_subdirs) > 0:
for subdir in testcase_subdirs:
files = iocollect.getfiles(os.path.abspath(os.path.join(self.testcase_dir, subdir)))
self.run_path(subdir, files)
else:
files = iocollect.getfiles(os.path.abspath(self.input_dir))
self.run_path(None, files)
return True
if __name__ == '__main__':
#zipout_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
argparser = argparse.ArgumentParser()
argparser.add_argument("-r", "--run", dest="run_program", default='prefopt.py', help="run this program against testcases [default: prefopt.py]")
argparser.add_argument("-x", "--pythonbin", dest="python_bin", default='venv/bin/python3', help="run this binary of Python to run the program [default: python3]")
argparser.add_argument("-a", "--answerdir", dest="answer_dir", default='answer', help="answer directory [default: answer]")
argparser.add_argument("-i", "--inputdir", dest="input_dir", default=os.path.join('data', 'input'), help="testcases directory [default: data/input]")
argparser.add_argument("-e", "--ending", dest="file_suffix", default='.txt', help="suffix to use for testcases [default: .txt]")
argparser.add_argument("-o", "--output", dest="output_dir", default='output', help="Save the output from the testcases to this directory.")
argparser.add_argument("-z", "--zipfile", dest="zipfile", default='output', help="zip file with your output answers")
argparser.add_argument("-l", "--logfile", dest="logfile", default=None, help="log file for debugging")
argparser.add_argument("-d", "--device", dest="device", default='cpu', help="optionally provide a cuda device")
argparser.add_argument("-m", "--model", dest="model", default="Qwen/Qwen2.5-0.5B-Instruct", help="model to use with prefopt.py")
opts = argparser.parse_args()
if opts.logfile is not None:
logging.basicConfig(filename=opts.logfile, filemode='w', level=logging.INFO)
zo = ZipOutput(opts)
if zo.run_all():
outputs_zipfile = shutil.make_archive(opts.zipfile, 'zip', opts.output_dir)
print("{} created".format(outputs_zipfile), file=sys.stderr)
else:
logging.error("problem in creating output zip file")
sys.exit(1)