Enhanced TikTok Video Downloader with Stop, Pause, and Resume Functionality
Introduction
In today's digital landscape, TikTok has emerged as one of the most popular social media platforms, with users constantly looking for ways to save and share their favorite videos. This article explores the development of a full-featured TikTok Video Downloader web application built with Python (Flask) and JavaScript.
Application Overview
Our TikTok Downloader offers these key features:
Batch downloading of multiple videos at once
Download controls including pause, resume, and stop functionality
Custom save locations with the "Save To..." feature
Cookie support for accessing restricted content
Real-time progress tracking with a progress bar
Comprehensive status updates
Python Code (Flask Backend)
from flask import Flask, render_template, request, jsonify, send_from_directory
import requests
import re
import json
import os
import time
from urllib.parse import unquote
from werkzeug.utils import secure_filename
import threading
from queue import Queue
# Flask application object; every @app.route handler below registers on it.
app = Flask(__name__)
# NOTE(review): UPLOAD_FOLDER is not read anywhere in the visible code — confirm it is still needed.
app.config['UPLOAD_FOLDER'] = 'downloads'
app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024 # 50MB max upload
# Configuration
# JSON settings file read by load_config() and written by save_config().
CONFIG_FILE = "config.json"
# Save directory used when the settings file has no "save_directory" entry.
DEFAULT_SAVE_DIR = "downloads"
# URLs waiting to be downloaded: filled by start_download(), drained by process_queue_web().
download_queue = Queue()
# Progress counters reported by the /status route.
completed_count = 0
total_video_count = 0
# Global variables for controlling the download thread
# Both flags are set by the route handlers and polled by process_queue_web().
stop_flag = False
pause_flag = False
# Path of the file currently being written by process_queue_web(), or None before any download.
current_downloading = None
def read_cookies(filename="cookie.txt"):
    """Load cookies from a plain-text file of ``name=value`` lines.

    Blank lines and lines starting with ``#`` are ignored, as are lines that
    have no ``=`` or an empty name. Only the first ``=`` separates the name
    from the value, so values may themselves contain ``=``.

    Args:
        filename: Path of the cookie file to read.

    Returns:
        A dict mapping cookie names to values (possibly empty), or ``None``
        when the file cannot be opened or decoded.
    """
    cookies = {}
    try:
        # utf-8-sig: decode deterministically regardless of the platform
        # locale and drop a leading BOM, which would otherwise become part
        # of the first cookie name.
        with open(filename, 'r', encoding='utf-8-sig') as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith('#'):
                    continue
                name, separator, value = line.partition('=')
                name = name.strip()
                # A cookie without a name is meaningless to send; skip it.
                if separator and name:
                    cookies[name] = value.strip()
    except (OSError, UnicodeDecodeError) as e:
        print(f"❌ Cookie error: {e}")
        return None
    return cookies
def load_config():
    """Return the persisted settings, falling back to defaults.

    Reads CONFIG_FILE as JSON. A missing file yields the default settings
    silently; an unreadable, malformed, or non-object file also yields the
    defaults (after printing the reason) instead of raising, because this
    function runs at import time and inside several request handlers.

    Returns:
        A dict of settings; the default is
        ``{"save_directory": DEFAULT_SAVE_DIR}``.
    """
    defaults = {"save_directory": DEFAULT_SAVE_DIR}
    try:
        with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
            loaded = json.load(f)
    except FileNotFoundError:
        return defaults
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"Config error, using defaults: {e}")
        return defaults
    # Callers use dict methods (.get / item assignment) on the result.
    if not isinstance(loaded, dict):
        return defaults
    return loaded
def save_config(config):
    """Serialize *config* as JSON into CONFIG_FILE, replacing its contents."""
    with open(CONFIG_FILE, 'w') as handle:
        json.dump(obj=config, fp=handle)
# Load the persisted settings once at import time. save_directory is the
# module-level value that set_save_directory() and process_queue_web()
# later overwrite.
config = load_config()
save_directory = config.get("save_directory", DEFAULT_SAVE_DIR)
# Translation table that deletes the characters Windows forbids in file
# names plus ASCII control characters (newlines and tabs, which can appear
# in free-text captions, are not valid in file names either).
_FILENAME_DELETE_TABLE = {ord(ch): None for ch in '<>:"/\\|?*'}
_FILENAME_DELETE_TABLE.update({code: None for code in range(32)})
_FILENAME_DELETE_TABLE[127] = None


def clean_filename(filename):
    """Return *filename* with characters unsafe for file names removed.

    Deletes ``< > : " / \\ | ? *`` and ASCII control characters in a single
    pass, then strips leading and trailing whitespace.

    Args:
        filename: The raw text to turn into a file-name fragment.

    Returns:
        The cleaned string; it may be empty if nothing usable remains.
    """
    return filename.translate(_FILENAME_DELETE_TABLE).strip()
def _decode_scraped_url(raw):
    """Decode JSON-style escapes in a URL captured from inline page script."""
    try:
        # Treat the capture as the inside of a JSON string literal so every
        # escape (slashes, ampersands, ...) is decoded, not just one of them.
        return json.loads(f'"{raw}"')
    except ValueError:
        return raw.replace('\\u002F', '/')


def _video_from_api(video_id, headers, cookies, timeout):
    """Return the first ``aweme_list`` entry from the feed API, or None."""
    api_url = f"https://api16-normal-c-useast1a.tiktokv.com/aweme/v1/feed/?aweme_id={video_id}"
    try:
        response = requests.get(api_url, headers=headers, cookies=cookies, timeout=timeout)
        if response.status_code != 200:
            return None
        data = response.json()
    except (requests.RequestException, ValueError):
        # A failing API must not prevent the HTML fallbacks from running.
        return None
    if isinstance(data, dict) and data.get('aweme_list'):
        return data['aweme_list'][0]
    return None


def _video_from_sigi_state(html):
    """Extract video data from the ``SIGI_STATE`` script tag, or None."""
    match = re.search(r'<script id="SIGI_STATE" type="application/json">(.*?)</script>', html)
    if not match:
        return None
    items = json.loads(match.group(1)).get("ItemModule", {})
    if not items:
        return None
    video_data = next(iter(items.values()))
    if "video" not in video_data or "playAddr" not in video_data["video"]:
        return None
    play_addr = video_data["video"]["playAddr"]
    # Normalise both observed shapes to {"url_list": [...]}.
    if isinstance(play_addr, str):
        play_addr = {"url_list": [play_addr]}
    elif isinstance(play_addr, dict) and "url_list" not in play_addr:
        play_addr["url_list"] = [play_addr.get("url", "")]
    return {
        "video": {"play_addr": play_addr},
        "author": {"unique_id": video_data.get("author", "tiktok_user")},
        "desc": video_data.get("desc", "No Title")
    }


def _video_from_universal_data(html):
    """Extract video data from ``__UNIVERSAL_DATA_FOR_REHYDRATION__``, or None."""
    match = re.search(r'<script id="__UNIVERSAL_DATA_FOR_REHYDRATION__" type="application/json">(.*?)</script>', html)
    if not match:
        return None
    data = json.loads(match.group(1))
    video_info = data.get("__DEFAULT_SCOPE__", {}).get("webapp.video-detail", {}).get("itemInfo", {}).get("itemStruct", {})
    if not video_info:
        return None
    return {
        "video": {"play_addr": {"url_list": [video_info.get("video", {}).get("playAddr", "")]}},
        "author": {"unique_id": video_info.get("author", {}).get("uniqueId", "tiktok_user")},
        "desc": video_info.get("desc", "No Title")
    }


def _video_from_play_addr_regex(html):
    """Last resort: pull the first ``"playAddr"`` URL straight out of the HTML."""
    match = re.search(r'"playAddr":"(https:[^"]+)"', html)
    if not match:
        return None
    return {
        "video": {"play_addr": {"url_list": [_decode_scraped_url(match.group(1))]}},
        "author": {"unique_id": "tiktok_user"},
        "desc": "No Title"
    }


def get_video_data(video_url, cookies, timeout=15):
    """Resolve a TikTok video URL to a metadata dict.

    Strategies are tried in order and the first one that yields data wins:
    the feed API (only when a numeric id can be parsed from the URL), the
    ``SIGI_STATE`` page script, the ``__UNIVERSAL_DATA_FOR_REHYDRATION__``
    page script, and finally a raw ``"playAddr"`` regex. A strategy that
    fails on unexpected data is skipped rather than aborting the lookup.

    Args:
        video_url: Page URL; the id is read from ``/video/<digits>`` or its
            percent-encoded form.
        cookies: Cookie mapping passed to ``requests`` (may be None).
        timeout: Seconds to wait on each HTTP request before giving up.

    Returns:
        For the API path, the first ``aweme_list`` entry as returned by the
        server. For the page-scraping paths, a dict with
        ``video.play_addr``, ``author.unique_id`` and ``desc``.

    Raises:
        Exception: ``"Failed to get video data: ..."`` when the page cannot
            be fetched or no strategy produces data; the original error is
            attached as ``__cause__``.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Referer": "https://www.tiktok.com/",
    }
    try:
        id_match = re.search(r'/video/(\d+)', video_url) or re.search(r'%2Fvideo%2F(\d+)', video_url)
        if id_match:
            api_result = _video_from_api(id_match.group(1), headers, cookies, timeout)
            if api_result:
                return api_result
        response = requests.get(video_url, headers=headers, cookies=cookies, timeout=timeout)
        html = response.text
        for extract in (_video_from_sigi_state, _video_from_universal_data, _video_from_play_addr_regex):
            try:
                result = extract(html)
            except (ValueError, KeyError, IndexError, TypeError, AttributeError):
                # Malformed or differently shaped page data: try the next strategy.
                continue
            if result:
                return result
        raise Exception("Could not extract video data")
    except Exception as e:
        raise Exception(f"Failed to get video data: {str(e)}") from e
@app.route('/')
def index():
    """Render the ``index.html`` template for the root URL."""
    page = render_template('index.html')
    return page
@app.route('/download', methods=['POST'])
def start_download():
    """Queue the submitted URLs and start a background download worker.

    Reads the ``urls`` form field (one URL per line), ignoring blank lines.
    On success the previous queue contents are replaced, the progress
    counters and control flags are reset, and a daemon thread running
    ``process_queue_web`` is started.

    Returns:
        JSON ``{'message': 'Download started'}``, or a 400 response with an
        ``error`` message when no URL was supplied.
    """
    global total_video_count, completed_count, stop_flag, pause_flag
    urls = [line.strip() for line in request.form.get('urls', '').splitlines() if line.strip()]
    if not urls:
        return jsonify({'error': 'Please enter at least one URL.'}), 400
    # Reset state only after validation, so a rejected request cannot
    # un-pause or un-stop a download that is already running.
    stop_flag = False
    pause_flag = False
    # Start each batch from zero; otherwise /status reports the completed
    # count of earlier batches against the new total.
    completed_count = 0
    total_video_count = len(urls)
    # Clear queue and add new URLs. put() takes the queue's own lock, so it
    # must run outside the ``with download_queue.mutex`` block.
    with download_queue.mutex:
        download_queue.queue.clear()
    for url in urls:
        download_queue.put(url)
    # NOTE(review): a worker from an earlier request may still be running;
    # it will share this queue with the new thread.
    threading.Thread(target=process_queue_web, daemon=True).start()
    return jsonify({'message': 'Download started'})
@app.route('/stop', methods=['POST'])
def stop_download():
    """Raise the stop flag polled by the download worker and acknowledge."""
    global stop_flag
    stop_flag = True
    reply = {'message': 'Stopping download...'}
    return jsonify(reply)
@app.route('/pause', methods=['POST'])
def pause_download():
    """Raise the pause flag polled by the download worker and acknowledge."""
    global pause_flag
    pause_flag = True
    reply = {'message': 'Download paused'}
    return jsonify(reply)
@app.route('/resume', methods=['POST'])
def resume_download():
    """Lower the pause flag so the download worker continues, and acknowledge."""
    global pause_flag
    pause_flag = False
    reply = {'message': 'Download resumed'}
    return jsonify(reply)
@app.route('/clear', methods=['POST'])
def clear_all():
    """Stop any running download, empty the queue and reset all counters.

    Returns:
        JSON ``{'message': 'Cleared all'}``.
    """
    global stop_flag, completed_count, total_video_count, pause_flag
    stop_flag = True
    # Empty the queue while the stop flag is still raised, so a worker can
    # never dequeue another URL in the gap between the flag being lowered
    # and the queue being cleared.
    with download_queue.mutex:
        download_queue.queue.clear()
    # Give the worker time to notice the flag.
    # NOTE(review): this is a fixed grace period, not a join; a worker that
    # is blocked for longer than this will not observe the stop request.
    time.sleep(1)
    # Reset counters and flags
    stop_flag = False
    pause_flag = False
    completed_count = 0
    total_video_count = 0
    return jsonify({'message': 'Cleared all'})
@app.route('/get_save_directory', methods=['GET'])
def get_save_directory():
    """Report the save directory stored in the config file (or the default)."""
    current_dir = load_config().get('save_directory', DEFAULT_SAVE_DIR)
    return jsonify({'directory': current_dir})
@app.route('/set_save_directory', methods=['POST'])
def set_save_directory():
    """Change the directory downloads are saved to and persist the choice.

    Expects a JSON body ``{"directory": "<path>"}``. The directory is
    created first; only if that succeeds is the setting written to the
    config file and the module-level ``save_directory`` updated.

    Returns:
        JSON with ``message`` and ``directory`` on success, or a 400
        response with an ``error`` message when the body has no usable
        ``directory`` or the directory cannot be created.
    """
    global save_directory
    # silent=True yields None instead of raising on a missing or malformed
    # JSON body, so the request is answered with the 400 below.
    payload = request.get_json(silent=True)
    directory = payload.get('directory') if isinstance(payload, dict) else None
    if isinstance(directory, str):
        directory = directory.strip()
    else:
        directory = None
    if not directory:
        return jsonify({'error': 'No directory provided'}), 400
    # Create the directory before persisting, so a path that cannot be
    # created never ends up in the config file.
    try:
        os.makedirs(directory, exist_ok=True)
    except OSError as e:
        return jsonify({'error': f'Could not create directory: {e}'}), 400
    settings = load_config()
    settings['save_directory'] = directory
    save_config(settings)
    save_directory = directory
    return jsonify({'message': 'Directory updated successfully', 'directory': directory})
@app.route('/status')
def get_status():
    """Return the current progress counters and control flags as JSON."""
    snapshot = {
        'completed': completed_count,
        'total': total_video_count,
        'paused': pause_flag,
        'stopped': stop_flag,
    }
    return jsonify(snapshot)
@app.route('/downloads/<path:filename>')
def download_file(filename):
    """Send *filename* from the save directory named in the config file."""
    base_dir = load_config().get('save_directory', DEFAULT_SAVE_DIR)
    return send_from_directory(base_dir, filename)
def _stream_video_to_file(download_url, filepath, headers):
    """Stream *download_url* into *filepath*, honouring pause and stop.

    Data is written to a sibling ``.part`` file that is renamed onto
    *filepath* only after the whole body has been received, so an
    interrupted or failed transfer never leaves a truncated file that a
    later run would mistake for a finished download.

    Returns:
        True when the file was fully downloaded, False when the transfer
        was abandoned because the stop flag was raised.

    Raises:
        Exception: when the server does not answer with status 200, or on
            any network / file-system error.
    """
    global current_downloading
    part_path = filepath + ".part"
    current_downloading = filepath
    completed = False
    try:
        with requests.get(download_url, stream=True, headers=headers, timeout=30) as response:
            if response.status_code != 200:
                raise Exception(f"Server returned status code {response.status_code}")
            received_all = False
            with open(part_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=64 * 1024):
                    # Idle while paused, but keep watching for a stop request.
                    while pause_flag and not stop_flag:
                        time.sleep(0.2)
                    if stop_flag:
                        break
                    if chunk:
                        f.write(chunk)
                else:
                    # Loop ended without a stop request: body is complete.
                    received_all = True
        if received_all:
            os.replace(part_path, filepath)
            completed = True
        return completed
    finally:
        current_downloading = None
        if not completed and os.path.exists(part_path):
            os.remove(part_path)


def process_queue_web():
    """Worker: download every URL in ``download_queue`` until empty or stopped.

    For each URL the video metadata is resolved with ``get_video_data``,
    the file is saved as ``<save_directory>/<author>/<title>_<id>.mp4``
    (or ``<id>.mp4`` without a usable title), and ``completed_count`` is
    incremented for every file that is finished or already present.
    Errors for one URL are printed and do not stop the remaining URLs.
    """
    from queue import Empty

    global completed_count, total_video_count, save_directory
    # Cookies are optional: a missing or empty cookie file must not
    # silently cancel the whole batch.
    cookies = read_cookies() or {}
    total_video_count = download_queue.qsize()
    save_directory = load_config().get('save_directory', DEFAULT_SAVE_DIR)
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Referer": "https://www.tiktok.com/"
    }
    while not stop_flag:
        # Non-blocking get: if another consumer empties the queue after an
        # empty() check, a blocking get() would hang this thread forever.
        try:
            url = download_queue.get_nowait()
        except Empty:
            break
        try:
            video_data = get_video_data(url, cookies)
            download_url = None
            play_addr = video_data.get("video", {}).get("play_addr")
            if isinstance(play_addr, dict):
                url_list = play_addr.get("url_list")
                download_url = url_list[0] if url_list else None
            elif isinstance(play_addr, str):
                download_url = play_addr
            if not download_url:
                raise Exception("Could not find video download URL")
            # The author name becomes a directory name, so strip path
            # separators and refuse names that would escape save_directory.
            username = clean_filename(str(video_data["author"]["unique_id"]))
            if username in ("", ".", ".."):
                username = "tiktok_user"
            video_title = clean_filename(video_data.get("desc") or "No Title")
            id_match = re.search(r'/video/(\d+)', url) or re.search(r'%2Fvideo%2F(\d+)', url)
            # NOTE(review): URLs without a parsable id all map to "unknown"
            # and can therefore collide on the same file name.
            video_id = id_match.group(1) if id_match else "unknown"
            user_folder = os.path.join(save_directory, username)
            os.makedirs(user_folder, exist_ok=True)
            if video_title and video_title != "No Title":
                filename = f"{video_title[:50]}_{video_id}.mp4"
            else:
                filename = f"{video_id}.mp4"
            filepath = os.path.join(user_folder, filename)
            if os.path.exists(filepath):
                # Already downloaded earlier: count it and move on.
                completed_count += 1
                continue
            if _stream_video_to_file(download_url, filepath, headers):
                completed_count += 1
        except Exception as e:
            print(f"Error downloading {url}: {str(e)}")
if __name__ == '__main__':
    # exist_ok=True replaces the check-then-create sequence, which can fail
    # if the directory appears between the check and the creation.
    os.makedirs(DEFAULT_SAVE_DIR, exist_ok=True)
    # NOTE(review): debug=True enables Flask's interactive debugger and
    # reloader; keep it for local development only and never expose this
    # server to an untrusted network with it switched on.
    app.run(debug=True)
No comments