pip install python-telegram-bot==13.15
import os
import sqlite3
import uuid
import logging
from telegram import Update, File
from telegram.ext import Updater, CommandHandler, MessageHandler, Filters, CallbackContext
# === Config ===
BOT_TOKEN = 'YOUR_BOT_TOKEN_HERE'
FILES_DIR = "files"
os.makedirs(FILES_DIR, exist_ok=True)
# === Logger ===
logging.basicConfig(level=logging.INFO)
# === Database ===
conn = sqlite3.connect("vault.db", check_same_thread=False)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS files (
id TEXT PRIMARY KEY,
user_id INTEGER,
file_name TEXT,
file_path TEXT,
tags TEXT
)
''')
conn.commit()
# === Command Handlers ===
def start(update: Update, context: CallbackContext):
update.message.reply_text("📁 Welcome to File Vault Bot!\nSend me a file and add tags in the caption.")
def handle_file(update: Update, context: CallbackContext):
file = update.message.document or update.message.photo[-1] if update.message.photo else None
caption = update.message.caption or ""
tags = caption.strip() if caption else "untagged"
if not file:
update.message.reply_text("❌ Unsupported file type.")
return
file_id = str(uuid.uuid4())
file_name = file.file_name if hasattr(file, 'file_name') else f"{file_id}.jpg"
file_path = os.path.join(FILES_DIR, file_name)
telegram_file: File = context.bot.get_file(file.file_id)
telegram_file.download(file_path)
cursor.execute("INSERT INTO files VALUES (?, ?, ?, ?, ?)",
(file_id, update.message.from_user.id, file_name, file_path, tags))
conn.commit()
update.message.reply_text(f"✅ File saved with ID: {file_id} and tags: {tags}")
def get_file(update: Update, context: CallbackContext):
if not context.args:
update.message.reply_text("Usage: /get filename")
return
file_name = " ".join(context.args)
cursor.execute("SELECT file_path FROM files WHERE file_name=?", (file_name,))
result = cursor.fetchone()
if result:
update.message.reply_document(open(result[0], "rb"))
else:
update.message.reply_text("❌ File not found.")
def search_by_tag(update: Update, context: CallbackContext):
if not context.args:
update.message.reply_text("Usage: /search tag")
return
tag = " ".join(context.args)
cursor.execute("SELECT file_name, tags FROM files")
results = cursor.fetchall()
found = [name for name, tags in results if tag.lower() in tags.lower()]
if found:
update.message.reply_text("🔍 Matching files:\n" + "\n".join(found))
else:
update.message.reply_text("❌ No matching files.")
# === Main Bot ===
def main():
updater = Updater(BOT_TOKEN)
dp = updater.dispatcher
dp.add_handler(CommandHandler("start", start))
dp.add_handler(CommandHandler("get", get_file))
dp.add_handler(CommandHandler("search", search_by_tag))
dp.add_handler(MessageHandler(Filters.document | Filters.photo, handle_file))
updater.start_polling()
print("Bot started.")
updater.idle()
if __name__ == '__main__':
main()
No comments:
Post a Comment