aboutsummaryrefslogtreecommitdiff
path: root/bulk-audio/bulk-audio.py
diff options
context:
space:
mode:
authorlonkaars <loek@pipeframe.xyz>2023-10-08 14:15:43 +0200
committerlonkaars <loek@pipeframe.xyz>2023-10-08 14:15:43 +0200
commit7e0166543e946a7e7a553169a49420e769a53e14 (patch)
treeddc8cd5ead92b79c6bdb0852c417510fbe40caa0 /bulk-audio/bulk-audio.py
parent855483401c04f4e741733736de7958b8d88db849 (diff)
add options to bulk audio script
Diffstat (limited to 'bulk-audio/bulk-audio.py')
-rwxr-xr-xbulk-audio/bulk-audio.py93
1 files changed, 72 insertions, 21 deletions
diff --git a/bulk-audio/bulk-audio.py b/bulk-audio/bulk-audio.py
index 9d34cdc..1027b3e 100755
--- a/bulk-audio/bulk-audio.py
+++ b/bulk-audio/bulk-audio.py
@@ -1,13 +1,22 @@
#!/bin/python3
-
import sys
import subprocess
import hashlib
import os
import re
+import io
import argparse
from math import floor, log10
+from time import sleep
+
+real_stdout = sys.stdout
+class TrashFileIO(object):
+ def write(self, x): pass
+trash_out = TrashFileIO()
+
+sys.stdout = trash_out
import aqt
+sys.stdout = real_stdout
# this function only works for refold-tools sentence mining card template
pattern = re.compile("^([^[、 【]+)[^【]*(【(.+)】)?")
@@ -30,17 +39,34 @@ def parse_args(argv):
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.usage = f"{argv[0]} [options] [anki options]"
- parser.add_argument("-n", "--note-type", help="note type to add audio to", default="Sentence mining")
+ parser.add_argument("-t", "--note-type", help="note type to add audio to", default="Sentence mining")
parser.add_argument("-a", "--audio-field", help="field name to modify with audio", default="Audio")
parser.add_argument("-f", "--filename-prefix", help="download filename prefix", default="refold-tools-")
+ parser.add_argument("-s", "--source-list", help="set source list (see `./get -h`)", default=None)
+ parser.add_argument("-O", "--force-override", help="force override audio field, even if it is not empty", action='store_true')
+ parser.add_argument("-C", "--clear-audio", help="CLEARS ALL AUDIO FIELDS REGARDLESS OF VALUE", action='store_true')
+ parser.add_argument("-n", "--noaudio", help="only modify notes that have \"noaudio\" as AUDIO_FIELD value", action='store_true')
+ parser.add_argument("-i", "--note-id", help="select specific note (specify multiple times to select multiple notes)", action='append', nargs=1, default=[])
+ parser.add_argument("-d", "--dry-run", help="print only, do not edit anything", action='store_true')
return parser.parse_known_args(argv[1:])
def main():
options, args = parse_args(sys.argv)
args.insert(0, sys.argv[0]) # restore first index of argv (QT crashes if argv[] is empty)
+ if options.clear_audio:
+ print("Safety delay of 3 seconds (are you sure you want to clear ALL audio fields?)...")
+ print("Press Ctrl+C to cancel")
+ sleep(3)
+
# forward remaining CLI parameters to Anki
+ sys.stdout = trash_out
app = aqt._run(args, False)
+ sys.stdout = real_stdout
+ if aqt.mw == None:
+ print("Please close any open Anki windows before running this script!")
+ exit(1)
+
# load last open profile if no profile was specified on command line (option parsed by Anki)
if not aqt.mw.pm.name:
aqt.mw.pm.load(aqt.mw.pm.last_loaded_profile_name())
@@ -50,24 +76,38 @@ def main():
model = col.models.by_name(options.note_type)
note_ids = col.models.nids(model)
+ # filter list if note ids were specified
+ if len(options.note_id) > 0:
+ filtered_note_ids = [int(arg[0]) for arg in options.note_id]
+ note_ids = [nid for nid in note_ids if nid in filtered_note_ids]
+
+ # filter only "noaudio" cards
+ if options.noaudio:
+ note_ids = [nid for nid in note_ids if col.get_note(nid)[options.audio_field] == "noaudio"]
+
+ if len(note_ids) == 0:
+ print("-- No notes found! (check your filters or note type?) --")
+ exit(1)
+
edited_notes = 0
+ note_index_format = ("{:0" + str(floor(log10(len(note_ids))) + 1) + "d}/{:d}")
for note_index, note_id in enumerate(note_ids):
note = col.get_note(note_id)
- note_index_format = ("{:0" + str(floor(log10(len(note_ids))) + 1) + "d}/{:d}").format(note_index + 1, len(note_ids))
- print(f"[nid:{note_id}] ({note_index_format}) ", end="")
+ print(f"[nid:{note_id}] ({note_index_format.format(note_index + 1, len(note_ids))}) ", end="")
- # bulk clear audio field (dev only)
- # note[options.audio_field] = ""
- # note.flush()
- # print(f"cleared \"{options.audio_field}\" field!")
- # continue
+ if options.clear_audio:
+ if not options.dry_run:
+ note[options.audio_field] = ""
+ note.flush()
+ print(f"cleared \"{options.audio_field}\" field!")
+ continue
# autosave deck every 20 cards
if note_index % 20 == 0: col.save()
# skip any notes that already have audio
- if len(note[options.audio_field]) > 0:
+ if not options.force_override and len(note[options.audio_field]) > 0:
print("skipped -> audio field not empty")
continue
@@ -79,10 +119,12 @@ def main():
print(f"{kanji} ({kana}) ", end="")
# attempt to download audio
- exit_code, data = get(kanji, kana)
+ exit_code, data = get(kanji, kana, options.source_list)
if exit_code != 0:
- note[options.audio_field] = "noaudio"
- note.flush()
+ if not options.dry_run:
+ note[options.audio_field] = "noaudio"
+ note.flush()
+ edited_notes += 1
print("skipped -> no recording available, marked as 'noaudio'")
continue
@@ -90,14 +132,16 @@ def main():
digest = hashlib.sha1(data).hexdigest()
filename = f"{options.filename_prefix}{digest}.mp3"
output_path = os.path.join(media_dir, filename)
- with open(output_path, "wb+") as f:
- f.write(data)
- f.close()
+ if not options.dry_run:
+ with open(output_path, "wb+") as f:
+ f.write(data)
+ f.close()
# set audio field to audio filename
audio_str = f"[sound:{filename}]"
- note[options.audio_field] = audio_str
- note.flush()
+ if not options.dry_run:
+ note[options.audio_field] = audio_str
+ note.flush()
print(f"written audio as {audio_str}")
edited_notes += 1
@@ -107,12 +151,19 @@ def main():
print("-- Done: no edits --")
else:
print(f"-- Done: succesfully edited {edited_notes} note{'' if edited_notes == 1 else 's'} --")
- print("TODO: circumvent below error message (anki python api problems, notes were edited succesfully though):")
+ # circumvent "Exception ignored in atexit callbackException ignored in sys.unraisablehook"
+ sys.stdout = trash_out
# run ./get to get audio data from stdout
# returns (exit_code, stdout_data)
-def get(kanji, kana):
- p = subprocess.run(["./get", kanji, kana], capture_output=True)
+def get(kanji, kana, source_list):
+ args = ["./get"]
+ if source_list != None:
+ args.append("-s")
+ args.append(source_list)
+ args.append(kanji)
+ args.append(kana)
+ p = subprocess.run(args, capture_output=True)
if p.returncode != 0:
return (1, None)
return (0, p.stdout)