dev
This commit is contained in:
5
.gitignore
vendored
5
.gitignore
vendored
@@ -1,3 +1,4 @@
|
|||||||
/venv/
|
|
||||||
.venv/
|
|
||||||
.env
|
.env
|
||||||
|
venv
|
||||||
|
downloads
|
||||||
|
urls.txt
|
||||||
114
app.py
114
app.py
@@ -8,8 +8,7 @@ from telegram.ext import Application
|
|||||||
|
|
||||||
async def download_single_pack(bot: Bot, session: aiohttp.ClientSession, sticker_url: str, allowed_types: list):
|
async def download_single_pack(bot: Bot, session: aiohttp.ClientSession, sticker_url: str, allowed_types: list):
|
||||||
"""
|
"""
|
||||||
Asynchronously downloads specific file types from a sticker pack into its own folder.
|
Asynchronously downloads sticker files based on user-selected types or an 'auto' mode.
|
||||||
Includes improved logging for skipped files.
|
|
||||||
"""
|
"""
|
||||||
master_download_dir = "downloads"
|
master_download_dir = "downloads"
|
||||||
try:
|
try:
|
||||||
@@ -26,53 +25,75 @@ async def download_single_pack(bot: Bot, session: aiohttp.ClientSession, sticker
|
|||||||
|
|
||||||
sticker_set = await bot.get_sticker_set(pack_name)
|
sticker_set = await bot.get_sticker_set(pack_name)
|
||||||
total_stickers = len(sticker_set.stickers)
|
total_stickers = len(sticker_set.stickers)
|
||||||
print(f"Found {total_stickers} stickers. Filtering for types: {', '.join(allowed_types)}")
|
|
||||||
|
mode_text = "auto" if "auto" in allowed_types else ', '.join(allowed_types)
|
||||||
|
print(f"Found {total_stickers} stickers. Mode: {mode_text}")
|
||||||
|
|
||||||
download_count = 0
|
download_count = 0
|
||||||
skipped_count = 0
|
skipped_count = 0
|
||||||
for i, sticker in enumerate(sticker_set.stickers):
|
for i, sticker in enumerate(sticker_set.stickers):
|
||||||
was_skipped = True # Ga ervan uit dat de sticker wordt overgeslagen, tenzij we iets downloaden
|
|
||||||
|
|
||||||
# 1. Check voor primaire sticker (webp, tgs, webm)
|
# GEWIJZIGD: Hoofdlogica gesplitst voor 'auto' en specifieke modi
|
||||||
primary_extension = ".webp"
|
if 'auto' in allowed_types:
|
||||||
if sticker.is_animated:
|
# --- AUTO MODE LOGICA ---
|
||||||
primary_extension = ".tgs"
|
file_to_download = None
|
||||||
elif sticker.is_video:
|
file_name = None
|
||||||
primary_extension = ".webm"
|
|
||||||
|
|
||||||
primary_type = primary_extension.strip('.')
|
if sticker.is_video:
|
||||||
if primary_type in allowed_types:
|
# Voor video-stickers, pak de originele .webm
|
||||||
was_skipped = False
|
|
||||||
file_to_download = await bot.get_file(sticker.file_id)
|
file_to_download = await bot.get_file(sticker.file_id)
|
||||||
file_name = f"{i+1:03d}{primary_extension}"
|
file_name = f"{i+1:03d}.webm"
|
||||||
file_path = os.path.join(pack_path, file_name)
|
elif sticker.is_animated:
|
||||||
|
# Voor geanimeerde stickers (.tgs), pak de .png thumbnail
|
||||||
|
if sticker.thumbnail:
|
||||||
|
file_to_download = await bot.get_file(sticker.thumbnail.file_id)
|
||||||
|
file_name = f"{i+1:03d}_thumb.png"
|
||||||
|
else:
|
||||||
|
# Voor statische stickers, pak de originele .webp
|
||||||
|
file_to_download = await bot.get_file(sticker.file_id)
|
||||||
|
file_name = f"{i+1:03d}.webp"
|
||||||
|
|
||||||
|
if file_to_download and file_name:
|
||||||
|
file_path = os.path.join(pack_path, file_name)
|
||||||
async with session.get(file_to_download.file_path) as response:
|
async with session.get(file_to_download.file_path) as response:
|
||||||
if response.status == 200:
|
if response.status == 200:
|
||||||
content = await response.read()
|
content = await response.read()
|
||||||
with open(file_path, 'wb') as f:
|
with open(file_path, 'wb') as f: f.write(content)
|
||||||
f.write(content)
|
|
||||||
download_count += 1
|
download_count += 1
|
||||||
|
else:
|
||||||
|
skipped_count += 1
|
||||||
|
|
||||||
|
else:
|
||||||
|
# --- SPECIFIEKE TYPES MODE LOGICA (DE OUDE CODE) ---
|
||||||
|
was_downloaded_this_run = False
|
||||||
|
|
||||||
|
# 1. Check voor primaire sticker (webp, tgs, webm)
|
||||||
|
primary_extension = ".webp"
|
||||||
|
if sticker.is_animated: primary_extension = ".tgs"
|
||||||
|
elif sticker.is_video: primary_extension = ".webm"
|
||||||
|
|
||||||
|
if primary_extension.strip('.') in allowed_types:
|
||||||
|
file = await bot.get_file(sticker.file_id)
|
||||||
|
file_path = os.path.join(pack_path, f"{i+1:03d}{primary_extension}")
|
||||||
|
async with session.get(file.file_path) as response:
|
||||||
|
if response.status == 200:
|
||||||
|
with open(file_path, 'wb') as f: f.write(await response.read())
|
||||||
|
was_downloaded_this_run = True
|
||||||
|
|
||||||
# 2. Check voor PNG thumbnail
|
# 2. Check voor PNG thumbnail
|
||||||
if 'png' in allowed_types and sticker.thumbnail:
|
if 'png' in allowed_types and sticker.thumbnail:
|
||||||
was_skipped = False
|
thumb = await bot.get_file(sticker.thumbnail.file_id)
|
||||||
thumb_to_download = await bot.get_file(sticker.thumbnail.file_id)
|
thumb_path = os.path.join(pack_path, f"{i+1:03d}_thumb.png")
|
||||||
thumb_name = f"{i+1:03d}_thumb.png"
|
async with session.get(thumb.file_path) as response:
|
||||||
thumb_path = os.path.join(pack_path, thumb_name)
|
|
||||||
|
|
||||||
async with session.get(thumb_to_download.file_path) as response:
|
|
||||||
if response.status == 200:
|
if response.status == 200:
|
||||||
content = await response.read()
|
with open(thumb_path, 'wb') as f: f.write(await response.read())
|
||||||
with open(thumb_path, 'wb') as f:
|
was_downloaded_this_run = True
|
||||||
f.write(content)
|
|
||||||
download_count += 1
|
|
||||||
|
|
||||||
# NIEUW: Verbeterde logging voor overgeslagen bestanden
|
if was_downloaded_this_run:
|
||||||
if was_skipped:
|
download_count += 1
|
||||||
|
else:
|
||||||
skipped_count += 1
|
skipped_count += 1
|
||||||
|
|
||||||
# Update de status op de console
|
|
||||||
print(f"\rProcessing... (sticker {i+1}/{total_stickers}, downloaded: {download_count}, skipped: {skipped_count})", end="", flush=True)
|
print(f"\rProcessing... (sticker {i+1}/{total_stickers}, downloaded: {download_count}, skipped: {skipped_count})", end="", flush=True)
|
||||||
|
|
||||||
print(f"\n✅ Download complete for pack: {pack_name}. Total files downloaded: {download_count}")
|
print(f"\n✅ Download complete for pack: {pack_name}. Total files downloaded: {download_count}")
|
||||||
@@ -85,9 +106,7 @@ async def download_single_pack(bot: Bot, session: aiohttp.ClientSession, sticker
|
|||||||
print(f"\nAn unexpected error occurred with pack '{pack_name}': {e}")
|
print(f"\nAn unexpected error occurred with pack '{pack_name}': {e}")
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
"""
|
""" Main function to run the downloader. """
|
||||||
Main asynchronous function to run the sticker downloader script.
|
|
||||||
"""
|
|
||||||
print("--- Telegram Sticker Pack Downloader ---")
|
print("--- Telegram Sticker Pack Downloader ---")
|
||||||
|
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
@@ -109,11 +128,14 @@ async def main():
|
|||||||
print(f"Error initializing bot. Is je token in .env correct? Details: {e}")
|
print(f"Error initializing bot. Is je token in .env correct? Details: {e}")
|
||||||
return
|
return
|
||||||
|
|
||||||
print("\nAvailable file types: webp (static), tgs (animated), webm (video), png (thumbnail)")
|
# GEWIJZIGD: 'auto' toegevoegd als optie
|
||||||
type_input = input("Enter desired file types (comma-separated), or type 'all': ").strip().lower()
|
print("\nAvailable file types: webp, tgs, webm, png")
|
||||||
|
type_input = input("Enter desired file types (comma-separated), 'all', or 'auto': ").strip().lower()
|
||||||
|
|
||||||
# GEWIJZIGD: Correcte logica voor 'all' of lege input
|
if type_input == 'auto':
|
||||||
if not type_input or type_input == 'all':
|
allowed_types = ['auto']
|
||||||
|
print("--> Auto mode selected. Choosing best format per sticker.")
|
||||||
|
elif not type_input or type_input == 'all':
|
||||||
allowed_types = ['webp', 'tgs', 'webm', 'png']
|
allowed_types = ['webp', 'tgs', 'webm', 'png']
|
||||||
print("--> Downloading all available types.")
|
print("--> Downloading all available types.")
|
||||||
else:
|
else:
|
||||||
@@ -128,31 +150,21 @@ async def main():
|
|||||||
|
|
||||||
if choice == '1':
|
if choice == '1':
|
||||||
sticker_url = input("Enter the Sticker Pack URL: ").strip()
|
sticker_url = input("Enter the Sticker Pack URL: ").strip()
|
||||||
if not sticker_url:
|
if sticker_url:
|
||||||
print("Error: Sticker URL cannot be empty.")
|
|
||||||
else:
|
|
||||||
await download_single_pack(bot, session, sticker_url, allowed_types)
|
await download_single_pack(bot, session, sticker_url, allowed_types)
|
||||||
|
|
||||||
elif choice == '2':
|
elif choice == '2':
|
||||||
try:
|
try:
|
||||||
with open('urls.txt', 'r') as f:
|
with open('urls.txt', 'r') as f: urls = [line.strip() for line in f if line.strip()]
|
||||||
urls = [line.strip() for line in f if line.strip()]
|
if not urls: print("'urls.txt' is empty."); return
|
||||||
|
|
||||||
if not urls:
|
|
||||||
print("'urls.txt' is empty.")
|
|
||||||
return
|
|
||||||
|
|
||||||
print(f"\nFound {len(urls)} URLs. Starting batch download...")
|
print(f"\nFound {len(urls)} URLs. Starting batch download...")
|
||||||
for url in urls:
|
for url in urls:
|
||||||
await download_single_pack(bot, session, url, allowed_types)
|
await download_single_pack(bot, session, url, allowed_types)
|
||||||
|
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
print("\nError: 'urls.txt' not found.")
|
print("\nError: 'urls.txt' not found.")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"An error occurred while reading the file: {e}")
|
print(f"An error occurred while reading the file: {e}")
|
||||||
|
|
||||||
else:
|
else:
|
||||||
print("Invalid choice. Please run the script again and enter 1 or 2.")
|
print("Invalid choice.")
|
||||||
|
|
||||||
print("\n--- Script finished ---")
|
print("\n--- Script finished ---")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user