102 lines
3.2 KiB
Python
102 lines
3.2 KiB
Python
#!/usr/bin/env python3
|
|
"""Extract every Nth frame as PNG; OCR with SVG column/row bounds; save one CSV per frame."""
|
|
import csv
|
|
import os
|
|
import sys
|
|
|
|
import cv2
|
|
import pytesseract
|
|
|
|
from assign_cells import assign_cell, merge_cell_words
|
|
from svg_columns import column_bounds, parse_column_rects, row_bounds
|
|
|
|
|
|
def extract_frames_every_n(video_path, out_dir, n=10):
    """Save every ``n``-th frame of a video as ``<frame_index>.png`` in ``out_dir``.

    Frames are counted from 0 in the order they are read, so frames
    0, n, 2n, ... are written. ``out_dir`` is created if it does not exist.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        out_dir: Directory that receives the PNG files.
        n: Sampling step; must be >= 1.

    Returns:
        A list of ``(frame_index, png_path)`` tuples, in frame order, for the
        frames whose PNG was actually written.

    Raises:
        ValueError: If ``n`` is smaller than 1.
        RuntimeError: If the video cannot be opened.
    """
    # Validate before opening anything: n == 0 would otherwise fail with a
    # ZeroDivisionError in the modulo below, after the capture is open.
    if n < 1:
        raise ValueError(f"n must be >= 1, got {n!r}")

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise RuntimeError(f"Cannot open video: {video_path}")
        os.makedirs(out_dir, exist_ok=True)

        saved = []
        frame_num = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_num % n == 0:
                path = os.path.join(out_dir, f"{frame_num}.png")
                # imwrite signals failure through its boolean result; only
                # report files that really exist on disk.
                if cv2.imwrite(path, frame):
                    saved.append((frame_num, path))
            frame_num += 1
        return saved
    finally:
        # Release the capture on every exit path, including errors.
        cap.release()
|
|
|
|
|
|
def ocr_with_positions(image, lang="por"):
    """Run Tesseract on ``image`` and return the non-blank detections with their boxes.

    Args:
        image: Image passed unchanged to ``pytesseract.image_to_data``.
        lang: Tesseract language code.

    Returns:
        A list of dicts, in the order Tesseract reported them, with keys
        ``"text"`` (whitespace-stripped), ``"x"``, ``"y"``, ``"w"`` and ``"h"``
        taken from the ``left``, ``top``, ``width`` and ``height`` columns.
        Entries whose text is empty after stripping are omitted.
    """
    result = pytesseract.image_to_data(
        image,
        lang=lang,
        output_type=pytesseract.Output.DICT,
    )
    # Output key -> column of the Tesseract result it is read from.
    geometry = (("x", "left"), ("y", "top"), ("w", "width"), ("h", "height"))

    words = []
    for idx, raw in enumerate(result["text"]):
        cleaned = (raw or "").strip()
        if cleaned:
            entry = {"text": cleaned}
            for key, column in geometry:
                entry[key] = result[column][idx]
            words.append(entry)
    return words
|
|
|
|
|
|
def build_table_from_boxes(boxes, col_bounds, row_bounds, num_cols=6, num_rows=28):
    """Arrange OCR words into a ``num_rows`` x ``num_cols`` grid of strings.

    Each box is located by its centre point, which ``assign_cell`` maps to a
    (column, row) index pair. Boxes for which either index is ``None`` are
    dropped, and so are cells that fall outside the grid. The words collected
    for a cell are combined with ``merge_cell_words``.

    Args:
        boxes: Iterable of dicts with ``"text"``, ``"x"``, ``"y"``, ``"w"``, ``"h"``.
        col_bounds: Column bounds forwarded to ``assign_cell``.
        row_bounds: Row bounds forwarded to ``assign_cell``.
        num_cols: Number of columns in the output grid.
        num_rows: Number of rows in the output grid.

    Returns:
        A list of ``num_rows`` independent lists, each holding ``num_cols``
        strings; cells without words are ``""``.
    """
    grouped = {}
    for box in boxes:
        centre_x = box["x"] + box["w"] / 2
        centre_y = box["y"] + box["h"] / 2
        col_idx, row_idx = assign_cell(centre_x, centre_y, col_bounds, row_bounds)
        if col_idx is None or row_idx is None:
            continue
        cell = (row_idx, col_idx)
        bucket = grouped.get(cell)
        if bucket is None:
            bucket = grouped[cell] = []
        bucket.append(box["text"])

    # Build each row separately so rows never share one list object.
    grid = []
    for _ in range(num_rows):
        grid.append([""] * num_cols)

    for cell, words in grouped.items():
        row_idx, col_idx = cell
        inside = 0 <= row_idx < num_rows and 0 <= col_idx < num_cols
        if not inside:
            continue
        grid[row_idx][col_idx] = merge_cell_words(words, col_idx)
    return grid
|
|
|
|
|
|
def write_csv(path, table):
    """Write ``table`` (an iterable of rows) to ``path`` as UTF-8 CSV.

    The file is opened with ``newline=""`` so the csv module controls line
    endings itself. An existing file at ``path`` is overwritten.
    """
    with open(path, mode="w", encoding="utf-8", newline="") as handle:
        writer = csv.writer(handle)
        for row in table:
            writer.writerow(row)
|
|
|
|
|
|
def run(out_dir, video_path, svg_path, every_n=10, num_rows=28):
    """Extract frames from a video, OCR each one and write one CSV per frame.

    Column bounds are derived from the rectangles parsed out of ``svg_path``.
    The ``"y"`` and ``"h"`` of the first rectangle are passed to ``row_bounds``
    together with ``num_rows`` to obtain the row bounds. For every extracted
    frame ``<frame_index>.png`` a sibling ``<frame_index>.csv`` is written to
    ``out_dir``.

    Args:
        out_dir: Directory receiving both the PNG frames and the CSV files.
        video_path: Path of the input video.
        svg_path: Path of the SVG template describing the table columns.
        every_n: Sampling step forwarded to ``extract_frames_every_n``.
        num_rows: Number of table rows.

    Returns:
        The number of frames returned by ``extract_frames_every_n``. Frames
        whose PNG could not be read back are skipped for OCR but still counted.

    Raises:
        ValueError: If no column rectangles were found in the SVG.
    """
    cols = parse_column_rects(svg_path)
    if not cols:
        # Fail with a clear message instead of an IndexError on cols[0].
        raise ValueError(f"No column rectangles found in SVG: {svg_path}")

    col_bounds = column_bounds(cols)
    first_col = cols[0]
    row_b = row_bounds(num_rows, first_col["y"], first_col["h"])
    num_cols = len(col_bounds)

    frames = extract_frames_every_n(video_path, out_dir, every_n)
    for frame_num, png_path in frames:
        img = cv2.imread(png_path)
        if img is None:
            # Unreadable PNG: skip this frame rather than abort the whole run.
            continue
        # cv2.imread yields BGR channel order, while pytesseract assumes RGB
        # for array input, so convert before running OCR.
        rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        boxes = ocr_with_positions(rgb, lang="por")
        table = build_table_from_boxes(boxes, col_bounds, row_b, num_cols, num_rows)
        csv_path = os.path.join(out_dir, f"{frame_num}.csv")
        write_csv(csv_path, table)
    return len(frames)
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Positional arguments, all optional:
    ``[out_dir] [video_path] [svg_path] [every_n]``. Missing arguments fall
    back to ``frames``, ``video-data.webm``, ``template.svg`` and ``10``.
    """
    argv = sys.argv

    def positional(index, fallback):
        # Use the argument when it was supplied, otherwise the default.
        return argv[index] if index < len(argv) else fallback

    out_dir = positional(1, "frames")
    video_path = positional(2, "video-data.webm")
    svg_path = positional(3, "template.svg")
    every_n = int(positional(4, 10))

    n = run(out_dir, video_path, svg_path, every_n=every_n)
    print(f"Extracted {n} frames and CSVs to {out_dir}/")
|
|
|
|
|
|
# Run the command-line entry point only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    main()
|