# Blog Pages
#
# Telegram Bot File Vault
#
# Requires: pip install python-telegram-bot==13.15


import os
import sqlite3
import uuid
import logging
from telegram import Update, File
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext

# === Config ===
# The token is taken from the BOT_TOKEN environment variable when it is set,
# so the real secret does not have to live in the source file; the original
# placeholder is kept as the fallback value.
BOT_TOKEN = os.environ.get("BOT_TOKEN", 'YOUR_BOT_TOKEN_HERE')
# Directory (relative to the working directory) where uploads are stored.
FILES_DIR = "files"
os.makedirs(FILES_DIR, exist_ok=True)

# === Logger ===
logging.basicConfig(level=logging.INFO)

# === Database ===
# check_same_thread=False allows this connection to be used from threads other
# than the one that created it; presumably needed because the handlers run off
# the main thread -- confirm.
conn = sqlite3.connect("vault.db", check_same_thread=False)
cursor = conn.cursor()
# One row per stored upload:
#   id        - generated identifier for the upload
#   user_id   - numeric id of the sender
#   file_name - name the file is looked up by
#   file_path - location of the saved copy on disk
#   tags      - free-text tag string taken from the caption
cursor.execute('''
CREATE TABLE IF NOT EXISTS files (
    id TEXT PRIMARY KEY,
    user_id INTEGER,
    file_name TEXT,
    file_path TEXT,
    tags TEXT
)
''')
conn.commit()

# === Command Handlers ===

def start(update: Update, context: CallbackContext):
    """Answer the /start command with the welcome and usage message."""
    welcome_text = "📁 Welcome to File Vault Bot!\nSend me a file and add tags in the caption."
    incoming = update.message
    incoming.reply_text(welcome_text)

def handle_file(update: Update, context: CallbackContext):
    """Save an uploaded document or photo to disk and record it in the database.

    The caption (stripped of surrounding whitespace) is stored as the tag
    string; a missing or blank caption is recorded as "untagged". Each upload
    is written to its own ``FILES_DIR/<id>/`` sub-directory so that two
    uploads sharing a file name never overwrite each other.

    Args:
        update: Incoming update; ``update.message`` carries the upload.
        context: Handler context; ``context.bot`` is used to fetch the file.
    """
    message = update.message
    document = message.document
    # For photos, use the last entry of the photo list, as the original did.
    photo = message.photo[-1] if message.photo else None
    # Evaluated separately on purpose: `doc or photo[-1] if photo else None`
    # parses as `(doc or photo[-1]) if photo else None`, which rejected every
    # document that arrived without a photo.
    file = document or photo
    tags = (message.caption or "").strip() or "untagged"

    if not file:
        message.reply_text("❌ Unsupported file type.")
        return

    file_id = str(uuid.uuid4())

    # Photos have no file_name attribute and a document's file_name may be
    # None. Keep only the last path component so a crafted name such as
    # "../../x" cannot point outside the storage directory.
    raw_name = getattr(file, 'file_name', None) or ""
    file_name = os.path.basename(raw_name.replace("\\", "/"))
    if file_name in ("", ".", ".."):
        # Photos keep the original ".jpg" fallback; an unnamed document gets
        # the bare id because its real extension is unknown.
        file_name = file_id if document else f"{file_id}.jpg"

    target_dir = os.path.join(FILES_DIR, file_id)
    os.makedirs(target_dir, exist_ok=True)
    file_path = os.path.join(target_dir, file_name)

    telegram_file: File = context.bot.get_file(file.file_id)
    telegram_file.download(file_path)

    cursor.execute(
        "INSERT INTO files (id, user_id, file_name, file_path, tags) "
        "VALUES (?, ?, ?, ?, ?)",
        (file_id, message.from_user.id, file_name, file_path, tags),
    )
    conn.commit()

    message.reply_text(f"✅ File saved with ID: {file_id} and tags: {tags}")

def get_file(update: Update, context: CallbackContext):
    """Send back the stored file whose name matches the ``/get`` arguments.

    The arguments are joined with single spaces to form the file name and the
    first matching database row is used. Replies with a usage hint when no
    argument is given and with "File not found" when there is no matching row
    or the stored copy is missing from disk.

    Args:
        update: Incoming update; replies go to ``update.message``.
        context: Handler context; ``context.args`` holds the command arguments.
    """
    if not context.args:
        update.message.reply_text("Usage: /get filename")
        return

    file_name = " ".join(context.args)
    # NOTE(review): the lookup is by file name only and is not restricted to
    # the requesting user's user_id -- confirm that this is intended.
    cursor.execute("SELECT file_path FROM files WHERE file_name=?", (file_name,))
    result = cursor.fetchone()

    if not result:
        update.message.reply_text("❌ File not found.")
        return

    stored_path = result[0]
    try:
        stored_file = open(stored_path, "rb")
    except FileNotFoundError:
        # The row exists but the file was removed from disk: tell the user
        # instead of letting the handler raise.
        logging.warning("Stored file is missing on disk: %s", stored_path)
        update.message.reply_text("❌ File not found.")
        return

    # Close the handle once the document has been sent; the original leaked it.
    with stored_file:
        update.message.reply_document(stored_file)

def search_by_tag(update: Update, context: CallbackContext):
    """List the names of stored files whose tag string contains the query.

    The ``/search`` arguments are joined with single spaces and matched
    case-insensitively as a substring of each row's tags. Replies with a usage
    hint when no argument is given.
    """
    message = update.message
    if not context.args:
        message.reply_text("Usage: /search tag")
        return

    needle = " ".join(context.args).lower()
    cursor.execute("SELECT file_name, tags FROM files")

    matches = []
    for stored_name, stored_tags in cursor.fetchall():
        if needle in stored_tags.lower():
            matches.append(stored_name)

    if not matches:
        message.reply_text("❌ No matching files.")
        return

    listing = "\n".join(matches)
    message.reply_text("🔍 Matching files:\n" + listing)

# === Main Bot ===

def main():
    """Create the updater, register every handler, then start polling."""
    updater = Updater(BOT_TOKEN)
    dispatcher = updater.dispatcher

    # Command name -> callback, registered in the same order as before.
    command_table = (
        ("start", start),
        ("get", get_file),
        ("search", search_by_tag),
    )
    for command_name, callback in command_table:
        dispatcher.add_handler(CommandHandler(command_name, callback))

    # Documents and photos both go to the upload handler.
    upload_filter = Filters.document | Filters.photo
    dispatcher.add_handler(MessageHandler(upload_filter, handle_file))

    updater.start_polling()
    print("Bot started.")
    updater.idle()


if __name__ == "__main__":
    main()

# No comments:
#
# Post a Comment