From d6b1b56fa47008f0cf5c511bc9664ca2c3483a6b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 13 Apr 2026 14:49:47 +0000 Subject: [PATCH 1/2] Initial plan From 6eb98769ed990c9019ec7d87c34352f82092cbf5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 13 Apr 2026 15:09:23 +0000 Subject: [PATCH 2/2] Fix optical flow video export: replace cv2.VideoWriter with FFmpeg subprocess Agent-Logs-Url: https://github.com/fourMs/MGT-python/sessions/f77c804c-89c5-45c5-aba8-7ce1cc97c650 Co-authored-by: alexarje <114316+alexarje@users.noreply.github.com> --- musicalgestures/_flow.py | 30 +++++++++------ tests/test_flow.py | 82 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 12 deletions(-) create mode 100644 tests/test_flow.py diff --git a/musicalgestures/_flow.py b/musicalgestures/_flow.py index fbc61b8..7cca289 100644 --- a/musicalgestures/_flow.py +++ b/musicalgestures/_flow.py @@ -7,7 +7,7 @@ from scipy.stats import entropy import musicalgestures -from musicalgestures._utils import MgFigure, extract_wav, embed_audio_in_video, MgProgressbar, convert_to_avi, generate_outfilename +from musicalgestures._utils import MgFigure, extract_wav, embed_audio_in_video, MgProgressbar, convert_to_avi, generate_outfilename, ffmpeg_cmd class Flow: @@ -111,8 +111,10 @@ def dense( if not overwrite: target_name = generate_outfilename(target_name) - fourcc = cv2.VideoWriter_fourcc(*'MJPG') - out = cv2.VideoWriter(target_name, fourcc, fps, (width, height)) + cmd = ['ffmpeg', '-y', '-s', '{}x{}'.format(width, height), + '-r', str(fps), '-f', 'rawvideo', '-pix_fmt', 'bgr24', '-vcodec', 'rawvideo', + '-i', '-', '-vcodec', 'mjpeg', '-q:v', '3', target_name] + out = ffmpeg_cmd(cmd, total_time=length, pipe='write') ret, frame1 = vidcap.read() prev_frame = cv2.cvtColor(cv2.resize(frame1, size), cv2.COLOR_BGR2GRAY) @@ -154,14 +156,14 @@ def dense( if skip_empty: if np.sum(rgb) > 0: - out.write(rgb.astype(np.uint8)) + out.stdin.write(rgb.astype(np.uint8)) else: if ii == 0: - out.write(rgb.astype(np.uint8)) + out.stdin.write(rgb.astype(np.uint8)) else: - out.write(prev_rgb.astype(np.uint8)) + out.stdin.write(prev_rgb.astype(np.uint8)) else: - out.write(rgb.astype(np.uint8)) + out.stdin.write(rgb.astype(np.uint8)) if skip_empty: if np.sum(rgb) > 0 or ii == 0: @@ -232,7 +234,8 @@ def dense( return mgf else: - out.release() + out.stdin.close() + out.wait() destination_video = target_name if self.has_audio: @@ -321,7 +324,6 @@ def sparse( vidcap = cv2.VideoCapture(filename) ret, frame = vidcap.read() - fourcc = cv2.VideoWriter_fourcc(*'MJPG') fps = int(vidcap.get(cv2.CAP_PROP_FPS)) width = int(vidcap.get(cv2.CAP_PROP_FRAME_WIDTH)) @@ -336,7 +338,10 @@ def sparse( if not overwrite: target_name = generate_outfilename(target_name) - out = cv2.VideoWriter(target_name, fourcc, fps, (width, height)) + cmd = ['ffmpeg', '-y', '-s', '{}x{}'.format(width, height), + '-r', str(fps), '-f', 'rawvideo', '-pix_fmt', 'bgr24', '-vcodec', 'rawvideo', + '-i', '-', '-vcodec', 'mjpeg', '-q:v', '3', target_name] + out = ffmpeg_cmd(cmd, total_time=length, pipe='write') # params for ShiTomasi corner detection feature_params = dict(maxCorners=corner_max_corners, @@ -390,7 +395,7 @@ def sparse( img = cv2.add(frame, mask) - out.write(img.astype(np.uint8)) + out.stdin.write(img.astype(np.uint8)) # Now update the previous frame and previous points old_gray = frame_gray.copy() @@ -403,7 +408,8 @@ def sparse( pb.progress(ii) ii += 1 - out.release() + out.stdin.close() + out.wait() destination_video = target_name diff --git a/tests/test_flow.py b/tests/test_flow.py new file mode 100644 index 0000000..e7d692d --- /dev/null +++ b/tests/test_flow.py @@ -0,0 +1,82 @@ +import musicalgestures +import os +import pytest + + +@pytest.fixture(scope="class") +def testvideo_avi(tmp_path_factory): + target_name = str(tmp_path_factory.mktemp("data")).replace("\\", "/") + "/testvideo.avi" + testvideo_avi = musicalgestures._utils.extract_subclip( + musicalgestures.examples.dance, 5, 6, target_name=target_name) + return testvideo_avi + + +@pytest.fixture(scope="class") +def testvideo_mp4(tmp_path_factory): + target_name = str(tmp_path_factory.mktemp("data")).replace( + "\\", "/") + "/testvideo.avi" + testvideo_avi = musicalgestures._utils.extract_subclip( + musicalgestures.examples.dance, 5, 6, target_name=target_name) + testvideo_mp4 = musicalgestures._utils.convert_to_mp4(testvideo_avi) + os.remove(testvideo_avi) + return testvideo_mp4 + + +class Test_flow_dense: + def test_normal_case(self, testvideo_avi): + mg = musicalgestures.MgVideo(testvideo_avi) + result = mg.flow.dense() + assert type(result) == musicalgestures.MgVideo + assert os.path.isfile(result.filename) == True + + def test_overwrite(self, testvideo_avi): + mg = musicalgestures.MgVideo(testvideo_avi) + result = mg.flow.dense(overwrite=True) + assert type(result) == musicalgestures.MgVideo + assert os.path.isfile(result.filename) == True + + def test_not_avi(self, testvideo_mp4): + mg = musicalgestures.MgVideo(testvideo_mp4) + result = mg.flow.dense() + assert type(result) == musicalgestures.MgVideo + assert os.path.isfile(result.filename) == True + + def test_skip_empty(self, testvideo_avi): + mg = musicalgestures.MgVideo(testvideo_avi) + result = mg.flow.dense(skip_empty=True) + assert type(result) == musicalgestures.MgVideo + assert os.path.isfile(result.filename) == True + + def test_with_target_name(self, testvideo_avi): + target_name = os.path.dirname(testvideo_avi) + "/result_dense.avi" + mg = musicalgestures.MgVideo(testvideo_avi) + result = mg.flow.dense(target_name=target_name, overwrite=True) + assert type(result) == musicalgestures.MgVideo + assert os.path.isfile(result.filename) == True + + +class Test_flow_sparse: + def test_normal_case(self, testvideo_avi): + mg = musicalgestures.MgVideo(testvideo_avi) + result = mg.flow.sparse() + assert type(result) == musicalgestures.MgVideo + assert os.path.isfile(result.filename) == True + + def test_overwrite(self, testvideo_avi): + mg = musicalgestures.MgVideo(testvideo_avi) + result = mg.flow.sparse(overwrite=True) + assert type(result) == musicalgestures.MgVideo + assert os.path.isfile(result.filename) == True + + def test_not_avi(self, testvideo_mp4): + mg = musicalgestures.MgVideo(testvideo_mp4) + result = mg.flow.sparse() + assert type(result) == musicalgestures.MgVideo + assert os.path.isfile(result.filename) == True + + def test_with_target_name(self, testvideo_avi): + target_name = os.path.dirname(testvideo_avi) + "/result_sparse.avi" + mg = musicalgestures.MgVideo(testvideo_avi) + result = mg.flow.sparse(target_name=target_name, overwrite=True) + assert type(result) == musicalgestures.MgVideo + assert os.path.isfile(result.filename) == True