diff --git a/.gitignore b/.gitignore index f5b9c7a1..9c34e72c 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,4 @@ venv/ # Uploaded images webservice/uploaded +tmp/ diff --git a/colorization/pipeline.py b/colorization/pipeline.py index e48136d0..99e41da1 100644 --- a/colorization/pipeline.py +++ b/colorization/pipeline.py @@ -1,6 +1,10 @@ from .colorize_process import preprocess, inference, postprocess import os import cv2 +import tempfile +from .videodata import video2frames, \ + frames2video, split_audio_from_video, merge_audio_and_video +from moviepy.editor import VideoFileClip import shutil # error codes @@ -93,11 +97,68 @@ def colorize_video(video_path_input, video_path_output): video_path_output : str the path of the (colorized) video after processing """ - # TODO: load video located at - # TODO: split video into images - # TODO: call on each image - # TODO: combine to video - # TODO: save at + # check if the input path is valid + if not os.path.isfile(video_path_input): + print("input path is not a file.") + return FAILED - shutil.copyfile(video_path_input, video_path_output) + # split video into images + my_tmp_path = os.path.join(os.path.abspath( + os.path.dirname(__file__)), '../tmp') + + # hack to fix problem when the tmp folder is not found + # I suspect this is due to the colorization folder beeing + # used through a symlink in the webservice + if my_tmp_path.endswith('webservice/colorization/../tmp'): + my_tmp_path = my_tmp_path.replace('webservice/colorization/../tmp', 'tmp') + + print(my_tmp_path) + + tmpdir = tempfile.mkdtemp(suffix="_split_and_merge", + prefix="tp_images_and_audio_", dir=my_tmp_path) + image_output_folder_path = tmpdir + video_intermediate_path = os.path.join(tmpdir, 'merged_images.webm') + audio_path = os.path.join(tmpdir, 'split_audio.ogg') + ret = video2frames(video_path_input, image_output_folder_path) + if ret != SUCCESS: + print("split video into images failed") + shutil.rmtree(tmpdir) + return FAILED + # call colorize_image on each image + images = os.listdir(image_output_folder_path) + for i in range(len(images)): + image_path = os.path.join(image_output_folder_path, images[i]) + ret = colorize_image(image_path, image_path) + if ret != SUCCESS: + print("colorize video failed") + shutil.rmtree(tmpdir) + return FAILED + + # combine to video and save at + if VideoFileClip(video_path_input).audio is None: + ret = frames2video(image_output_folder_path, video_path_output) + if ret != SUCCESS: + print("merge images back to video failed") + shutil.rmtree(tmpdir) + return FAILED + else: + ret = frames2video(image_output_folder_path, video_intermediate_path) + if ret != SUCCESS: + print("merge images back to video failed") + shutil.rmtree(tmpdir) + return FAILED + ret = split_audio_from_video(video_path_input, audio_path) + if ret != SUCCESS: + print("split audio from original video failed") + shutil.rmtree(tmpdir) + return FAILED + ret = merge_audio_and_video(video_intermediate_path, + audio_path, video_path_output) + if ret != SUCCESS: + print("merge audio back to colorized video failed") + shutil.rmtree(tmpdir) + return FAILED + + # cleanup and return success code + shutil.rmtree(tmpdir) return SUCCESS diff --git a/colorization/test_data/audio_for_video.ogg b/colorization/test_data/audio_for_video.ogg new file mode 100644 index 00000000..d9bec4f0 Binary files /dev/null and b/colorization/test_data/audio_for_video.ogg differ diff --git a/colorization/tests.py b/colorization/tests.py index d532afa7..4a42e4a5 100644 --- a/colorization/tests.py +++ b/colorization/tests.py @@ -5,7 +5,7 @@ import shutil import colorization.videodata as videodata from colorization.colorize_process import preprocess, inference, postprocess -from colorization.pipeline import colorize_image +from colorization.pipeline import colorize_image, colorize_video FAILED = 1 SUCCESS = 0 @@ -94,22 +94,32 @@ class SplitAndMergeTestsForVideo(unittest.TestCase): def setUp(self): # init path variables + self.image_output_folder_path = os.path.join(os.path.abspath( + os.path.dirname(__file__)), 'test_data/split_frames') self.video_input_path = os.path.join(os.path.abspath( os.path.dirname(__file__)), 'test_data/greyscaleVideo.mp4') + self.video_input_path_with_audio = os.path.join(os.path.abspath( + os.path.dirname(__file__)), 'test_data/test_video_with_voice.mp4') + self.video_output_path = os.path.join(os.path.abspath( + os.path.dirname(__file__)), 'test_data/merged_video.webm') self.audio_output_path = os.path.join(os.path.abspath( - os.path.dirname(__file__)), 'test_data/audio_from_video.mp3') + os.path.dirname(__file__)), 'test_data/audio_from_video.ogg') self.audio_input_path = os.path.join(os.path.abspath( - os.path.dirname(__file__)), 'test_data/audio_for_video.mp3') + os.path.dirname(__file__)), 'test_data/audio_for_video.ogg') self.video_with_audio_output_path = os.path.join(os.path.abspath( - os.path.dirname(__file__)), 'test_data/merged_video_with_audio.mp4') + os.path.dirname(__file__)), 'test_data/merged_video_with_audio.webm') def tearDown(self): # cleanup files, that were created in this class print('tear down called') + if os.path.isfile(self.video_output_path): + os.remove(self.video_output_path) if os.path.isfile(self.audio_output_path): os.remove(self.audio_output_path) if os.path.isfile(self.video_with_audio_output_path): os.remove(self.video_with_audio_output_path) + if os.path.isdir(self.image_output_folder_path): + shutil.rmtree(self.image_output_folder_path) def test_step_video2frames_frames2video(self): """ @@ -119,41 +129,34 @@ def test_step_video2frames_frames2video(self): # current path cwd = os.path.abspath(os.path.dirname(__file__)) # split_frames path - image_output_folder_path = os.path.join(cwd, 'test_data/split_frames') # create split_frames path folder - os.mkdir(image_output_folder_path) + os.mkdir(self.image_output_folder_path) # split the video - ret = videodata.video2frames(self.video_input_path, image_output_folder_path) + ret = videodata.video2frames(self.video_input_path, + self.image_output_folder_path) self.assertEqual(ret, SUCCESS) # Test2: for wrong path (as a picture) video_input_path2 = os.path.join(cwd, 'test_data/dog.jpg') # split the video ret = videodata.video2frames(video_input_path2, - image_output_folder_path) + self.image_output_folder_path) self.assertEqual(ret, FAILED) # Test3: test to merge the frames - # output video path - video_output_path = os.path.join(cwd, 'test_data/merged_video') - # create the output video folder - os.mkdir(video_output_path) # merge the video - ret = videodata.frames2video(image_output_folder_path, - video_output_path) + ret = videodata.frames2video(self.image_output_folder_path, + self.video_output_path) self.assertEqual(ret, SUCCESS) - # destroy the frames folder and video folder after test - shutil.rmtree(video_output_path) + self.assertTrue(os.path.isfile(self.video_output_path)) def test_step_split_audio_from_video(self): """ Unit-Test to test the split_audio_from_video function """ - my_video_input_path = os.path.join(os.path.abspath( - os.path.dirname(__file__)), 'test_data/test_video_with_voice.mp4') - self.assertTrue(os.path.isfile(my_video_input_path)) + self.assertTrue(os.path.isfile(self.video_input_path_with_audio)) ret = videodata.split_audio_from_video( - my_video_input_path, self.audio_output_path) + self.video_input_path_with_audio, self.audio_output_path) self.assertEqual(ret, SUCCESS) self.assertTrue(os.path.isfile(self.audio_output_path)) @@ -161,10 +164,10 @@ def test_step_merge_audio_and_video(self): """ Unit-Test to test the merge_audio_and_video function """ - self.assertTrue(os.path.isfile(self.video_input_path)) + self.assertTrue(os.path.isfile(self.video_input_path_with_audio)) self.assertTrue(os.path.isfile(self.audio_input_path)) ret = videodata.merge_audio_and_video( - self.video_input_path, self.audio_input_path, + self.video_input_path_with_audio, self.audio_input_path, self.video_with_audio_output_path) self.assertEqual(ret, SUCCESS) self.assertTrue(os.path.isfile(self.video_with_audio_output_path)) @@ -184,6 +187,17 @@ def setUp(self): os.path.dirname(__file__)), 'test_data/lena_colorized.png') self.fake_input_image_path = os.path.join(os.path.abspath( os.path.dirname(__file__)), '../../Data/notexist.png') + self.input_video_path = os.path.join(os.path.abspath( + os.path.dirname(__file__)), 'test_data/greyscaleVideo.mp4') + self.output_video_path = os.path.join(os.path.abspath( + os.path.dirname(__file__)), 'test_data/colorized_video.webm') + + self.input_video_path_with_audio = \ + os.path.join(os.path.abspath( + os.path.dirname(__file__)), 'test_data/test_video_with_voice.mp4') + self.output_video_path_with_audio = os.path.join(os.path.abspath( + os.path.dirname(__file__)), + 'test_data/colorized_video_with_audio.webm') def tearDown(self): # cleanup files, that were created in the @@ -191,6 +205,10 @@ def tearDown(self): print('tear down called') if os.path.isfile(self.output_image_path): os.remove(self.output_image_path) + if os.path.isfile(self.output_video_path): + os.remove(self.output_video_path) + if os.path.isfile(self.output_video_path_with_audio): + os.remove(self.output_video_path_with_audio) def test_complete_colorize_image(self): """ @@ -229,3 +247,17 @@ def test_colorize_twice(self): ret = colorize_image(self.input_image_path, self.output_image_path) self.assertEqual(ret, SUCCESS) + + def test_colorize_video(self): + """ + Functional test to test the colorization of two images in single session + """ + + ret = colorize_video(self.input_video_path, self.output_video_path) + self.assertEqual(ret, SUCCESS) + self.assertTrue(os.path.isfile(self.output_video_path)) + + ret = colorize_video(self.input_video_path_with_audio, + self.output_video_path_with_audio) + self.assertEqual(ret, SUCCESS) + self.assertTrue(os.path.isfile(self.output_video_path_with_audio)) diff --git a/colorization/videodata.py b/colorization/videodata.py index 48a7e964..8a91d5a8 100644 --- a/colorization/videodata.py +++ b/colorization/videodata.py @@ -1,12 +1,12 @@ import cv2 import os -import numpy -import shutil from moviepy.editor import AudioFileClip from moviepy.editor import VideoFileClip +from moviepy.video.io import ImageSequenceClip SUCCESS = 0 FAILED = 1 +FPS = 60 def video2frames(video_input_path, image_output_folder_path): @@ -21,26 +21,21 @@ def video2frames(video_input_path, image_output_folder_path): video = cv2.VideoCapture(video_input_path) type = os.path.splitext(video_input_path)[-1] if (video.isOpened() is False) or (not (type == '.mp4')): - print("Error opening video") + print("Input path is not a video") return FAILED - FPS = 60 # frames per second - video.set(cv2.CAP_PROP_FPS, FPS) + global FPS + FPS = int(video.get(cv2.CAP_PROP_FPS)) currentFrame = 0 while (video.isOpened()): ret, frame = video.read() if ret is True: folder_name = os.path.join(image_output_folder_path, str(currentFrame) + '.png') - # print('Creating...' + folder_name) - # cv2.imshow('Frame', frame) cv2.imwrite(folder_name, frame) currentFrame += 1 - # if cv2.waitKey(25) & 0xFF == ord('q'): - # break else: break video.release() - # cv2.destroyAllWindows() return SUCCESS @@ -54,27 +49,11 @@ def frames2video(image_input_folder_path, video_output_path): 1 for FAILED. """ - mat = cv2.imread(os.path.join(image_input_folder_path + '/0.png'), - cv2.IMREAD_COLOR) - # print(os.path.join(image_input_folder_path + '/0.png')) - if numpy.any(mat) is None: - return FAILED - size = mat.shape[:2] - FPS = 60 - fourcc = cv2.VideoWriter_fourcc('M', 'J', 'P', 'G') - video = cv2.VideoWriter(os.path.join(video_output_path, "out01.avi"), - fourcc, FPS, (size[1], size[0])) files = os.listdir(image_input_folder_path) - length = len(files) - for i in range(0, length): - index = str(i) - item = image_input_folder_path + '/' + index + '.png' - # print(item) - img = cv2.imread(item) - video.write(img) - video.release() - # after merged video delate the image_input_folder_path folder - shutil.rmtree(image_input_folder_path) + frames_path = [image_input_folder_path+'/'+str(i)+'.png' for i in range(len(files))] + clip = ImageSequenceClip.ImageSequenceClip(frames_path, fps=FPS) + clip.write_videofile(video_output_path, codec='libvpx', bitrate="50000k") + clip.close() return SUCCESS @@ -91,10 +70,12 @@ def split_audio_from_video(video_input_path, audio_output_path): print("invalid video path") return FAILED my_audio_clip = AudioFileClip(video_input_path) - my_audio_clip.write_audiofile(audio_output_path) + my_audio_clip.write_audiofile(audio_output_path, codec='libvorbis') if not os.path.isfile(audio_output_path): print("invalid output path") + my_audio_clip.close() return FAILED + my_audio_clip.close() return SUCCESS @@ -117,8 +98,12 @@ def merge_audio_and_video(video_input_path, audio_input_path, video_output_path) my_video_clip = VideoFileClip(video_input_path) my_audio_clip = AudioFileClip(audio_input_path) video = my_video_clip.set_audio(my_audio_clip) - video.write_videofile(video_output_path) + video.write_videofile(video_output_path, codec='libvpx') if not os.path.isfile(video_output_path): print("invalid output path") + my_audio_clip.close() + my_video_clip.close() return FAILED + my_audio_clip.close() + my_video_clip.close() return SUCCESS diff --git a/tmp/.keep b/tmp/.keep new file mode 100644 index 00000000..e69de29b diff --git a/webservice/app.py b/webservice/app.py index d5a2671e..f36fc511 100644 --- a/webservice/app.py +++ b/webservice/app.py @@ -136,7 +136,12 @@ def result(id): origin_url = url_for("uploaded_file", fpath=id, filename=f) extension = get_extension(f) - color_name = get_name(f) + "_color." + extension + + if extension.lower() in ALLOWED_EXTENSIONS['video']: + color_name = get_name(f) + "_color.webm" + else: + color_name = get_name(f) + "_color." + extension + colorized_url = url_for("uploaded_file", fpath=id, filename=color_name) # check whether the colorization process is finished @@ -198,14 +203,21 @@ def colorize(id): if get_name(f) == id: extension = get_extension(f) finpath = os.path.join(folder, f) - coloredfile = get_name(f) + "_color." + extension + + if extension.lower() in ALLOWED_EXTENSIONS['video']: + coloredfile = get_name(f) + "_color.webm" + else: + coloredfile = get_name(f) + "_color." + extension + foutpath = os.path.join(folder, coloredfile) if not os.path.exists(foutpath): # colorize_image if extension.lower() in ALLOWED_EXTENSIONS['pic']: + print(f'Starting colorization of {id}') res = pipeline.colorize_image(finpath, foutpath) elif extension.lower() in ALLOWED_EXTENSIONS['video']: + print(f'Starting colorization of {id}') res = pipeline.colorize_video(finpath, foutpath) else: return jsonify(msg="Unsupported file format"), 400