csv-extractor/extract_frames_and_tables.py
2026-02-04 21:11:16 -03:00

102 lines
3.2 KiB
Python

#!/usr/bin/env python3
"""Extract every Nth frame as PNG; OCR with SVG column/row bounds; save one CSV per frame."""
import csv
import os
import sys
import cv2
import pytesseract
from assign_cells import assign_cell, merge_cell_words
from svg_columns import column_bounds, parse_column_rects, row_bounds
def extract_frames_every_n(video_path, out_dir, n=10):
    """Save every ``n``-th frame of a video as ``<frame_num>.png`` in ``out_dir``.

    Frames are numbered from 0 in the order they are read, so frames
    0, n, 2n, ... are the ones written.

    Args:
        video_path: Path of the video handed to ``cv2.VideoCapture``.
        out_dir: Directory that receives the PNG files; created if missing.
        n: Sampling interval in frames; must be at least 1.

    Returns:
        A list of ``(frame_num, path)`` tuples, one per PNG that was
        written successfully, in frame order.

    Raises:
        ValueError: If ``n`` is less than 1.
        RuntimeError: If the video cannot be opened.
    """
    # Reject a bad interval before touching the video: n == 0 would
    # otherwise surface as a ZeroDivisionError on the first frame.
    if n < 1:
        raise ValueError(f"n must be >= 1, got {n}")

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise RuntimeError(f"Cannot open video: {video_path}")
        os.makedirs(out_dir, exist_ok=True)

        saved = []
        frame_num = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_num % n == 0:
                path = os.path.join(out_dir, f"{frame_num}.png")
                # imwrite reports failure through its return value; only
                # record files that were actually written.
                if cv2.imwrite(path, frame):
                    saved.append((frame_num, path))
            frame_num += 1
        return saved
    finally:
        # Release the capture even if reading or writing raised.
        cap.release()
def ocr_with_positions(image, lang="por"):
    """Run Tesseract on ``image`` and return its non-blank text boxes.

    Args:
        image: Image passed straight to ``pytesseract.image_to_data``.
        lang: Tesseract language code (defaults to ``"por"``).

    Returns:
        A list of dicts, one per entry whose text is non-empty after
        stripping whitespace. Each dict has ``text`` (the stripped text)
        plus ``x``, ``y``, ``w`` and ``h`` copied from the ``left``,
        ``top``, ``width`` and ``height`` fields of the OCR output.
    """
    data = pytesseract.image_to_data(
        image,
        lang=lang,
        output_type=pytesseract.Output.DICT,
    )
    boxes = []
    for idx, raw_text in enumerate(data["text"]):
        # Entries may be empty or None; normalise before testing.
        text = (raw_text or "").strip()
        if text:
            boxes.append(
                {
                    "text": text,
                    "x": data["left"][idx],
                    "y": data["top"][idx],
                    "w": data["width"][idx],
                    "h": data["height"][idx],
                }
            )
    return boxes
def build_table_from_boxes(boxes, col_bounds, row_bounds, num_cols=6, num_rows=28):
    """Place OCR boxes into a ``num_rows`` x ``num_cols`` grid of strings.

    Each box is located by its centre point, which ``assign_cell`` maps
    to a ``(column, row)`` pair using the supplied bounds. Boxes for
    which either index is ``None`` are dropped. The texts that land in
    the same cell are combined by ``merge_cell_words``, which also
    receives the column index.

    Args:
        boxes: Iterable of dicts with ``x``, ``y``, ``w``, ``h``, ``text``.
        col_bounds: Column bounds forwarded to ``assign_cell``.
        row_bounds: Row bounds forwarded to ``assign_cell``.
        num_cols: Number of columns in the returned table.
        num_rows: Number of rows in the returned table.

    Returns:
        A list of ``num_rows`` lists, each holding ``num_cols`` strings;
        cells that received no text are ``""``.
    """
    grouped = {}
    for box in boxes:
        center_x = box["x"] + box["w"] / 2
        center_y = box["y"] + box["h"] / 2
        col, row = assign_cell(center_x, center_y, col_bounds, row_bounds)
        if col is None or row is None:
            continue
        cell = (row, col)
        if cell not in grouped:
            grouped[cell] = []
        grouped[cell].append(box["text"])

    grid = [[""] * num_cols for _ in range(num_rows)]
    for (row, col), words in grouped.items():
        # Ignore cells that fall outside the requested table size.
        if not (0 <= row < num_rows and 0 <= col < num_cols):
            continue
        grid[row][col] = merge_cell_words(words, col)
    return grid
def write_csv(path, table):
    """Write ``table`` (an iterable of rows) to ``path`` as UTF-8 CSV.

    The file is opened with ``newline=""`` so the ``csv`` module controls
    line endings itself. An existing file at ``path`` is overwritten.
    """
    with open(path, mode="w", encoding="utf-8", newline="") as handle:
        writer = csv.writer(handle)
        for row in table:
            writer.writerow(row)
def run(out_dir, video_path, svg_path, every_n=10, num_rows=28):
    """Extract frames from a video and write one OCR'd CSV per frame.

    Column rectangles are parsed from the SVG at ``svg_path``. The table's
    vertical extent is taken from the first rectangle's ``y`` and ``h``
    and divided into ``num_rows`` rows by ``row_bounds``. Every
    ``every_n``-th frame is saved as a PNG in ``out_dir``, read back,
    OCR'd, and written as ``<frame_num>.csv`` in the same directory.

    Args:
        out_dir: Directory receiving both the PNG and CSV files.
        video_path: Path of the input video.
        svg_path: Path of the SVG describing the column rectangles.
        every_n: Frame sampling interval.
        num_rows: Number of table rows.

    Returns:
        The number of frames returned by ``extract_frames_every_n``.
        Frames whose PNG cannot be read back get no CSV but are still
        counted.
    """
    cols = parse_column_rects(svg_path)
    col_bounds = column_bounds(cols)
    first_col = cols[0]
    row_b = row_bounds(num_rows, first_col["y"], first_col["h"])
    num_cols = len(col_bounds)

    frames = extract_frames_every_n(video_path, out_dir, every_n)
    for frame_num, png_path in frames:
        img = cv2.imread(png_path)
        if img is not None:
            words = ocr_with_positions(img, lang="por")
            table = build_table_from_boxes(
                words, col_bounds, row_b, num_cols, num_rows
            )
            write_csv(os.path.join(out_dir, f"{frame_num}.csv"), table)
    return len(frames)
def main():
    """Command-line entry point.

    Positional arguments, all optional and taken in this order:
    ``out_dir`` (default ``frames``), ``video_path`` (default
    ``video-data.webm``), ``svg_path`` (default ``template.svg``) and
    ``every_n`` (default 10). Arguments beyond the fourth are ignored.
    """
    defaults = ("frames", "video-data.webm", "template.svg", "10")
    given = sys.argv[1:5]
    # Fill whichever trailing positions were not supplied with defaults.
    out_dir, video_path, svg_path, every_n_arg = (*given, *defaults[len(given):])
    every_n = int(every_n_arg)
    n = run(out_dir, video_path, svg_path, every_n=every_n)
    print(f"Extracted {n} frames and CSVs to {out_dir}/")


if __name__ == "__main__":
    main()