""" asset_downloader.py — скачивает Roblox-ассеты с CDN и кеширует локально. Использование: from asset_downloader import AssetDownloader dl = AssetDownloader(db_conn, storage_root='/opt/roblox-assets') asset = dl.fetch_sync(rbx_asset_id=12345) print(asset.public_url) # 'https://assets.rublox.pro/roblox/raw/sha256.glb' Дедупликация: 1. По rbx_asset_id (первичный лукап в БД). 2. По SHA256 контента (если разные id указывают на тот же файл). Кеширование: raw → /opt/roblox-assets/raw//. converted (для mesh→glb, csg→glb) → /opt/roblox-assets/converted/.glb Источник: Roblox AssetDelivery API. - https://assetdelivery.roblox.com/v1/asset?id= → бинарь (с редиректом на CDN) - https://assetdelivery.roblox.com/v2/assetId/ → JSON c metadata При первой скачке сохраняем в БД, потом — мгновенный возврат public_url. """ import os import hashlib import logging import re import time from dataclasses import dataclass from typing import Optional, Dict import requests import psycopg2 from psycopg2.extras import RealDictCursor from asset_proxy import get_proxy_config, get_http_proxies, PendingDownload logger = logging.getLogger(__name__) # Roblox CDN endpoints (используются в direct mode; см. asset_proxy.py) ASSETDELIVERY_V1 = 'https://assetdelivery.roblox.com/v1/asset' ASSETDELIVERY_V2 = 'https://assetdelivery.roblox.com/v2/assetId' # Сопоставление content-type → расширение CONTENT_TYPE_MAP = { 'image/png': ('texture', '.png'), 'image/jpeg': ('texture', '.jpg'), 'image/jpg': ('texture', '.jpg'), 'image/webp': ('texture', '.webp'), 'image/bmp': ('texture', '.bmp'), 'image/x-targa': ('texture', '.tga'), 'audio/mpeg': ('sound', '.mp3'), 'audio/mp3': ('sound', '.mp3'), 'audio/ogg': ('sound', '.ogg'), 'audio/wav': ('sound', '.wav'), 'audio/x-wav': ('sound', '.wav'), 'application/octet-stream': ('mesh', '.mesh'), # Roblox mesh обычно это } # Базовые public URL'ы (для последующего конверта в Cloudflare) PUBLIC_BASE = 'https://assets.rublox.pro/roblox' @dataclass class AssetRecord: rbx_asset_id: int sha256_raw: str asset_kind: str content_type: str raw_size_bytes: int raw_path: str public_url: str converted_path: Optional[str] = None converted_sha256: Optional[str] = None cached: bool = False # True если был лукап в БД, False если только что скачан class AssetDownloader: """ Скачивает Roblox-ассеты с дедупликацией. Thread-safe (через PG-транзакции). """ def __init__( self, db_dsn: str, storage_root: str = '/opt/roblox-assets', public_base: str = PUBLIC_BASE, request_timeout: int = 30, max_retries: int = 3, user_agent: str = 'Roblox/WinInet', # притворяемся Roblox-клиентом ): self.db_dsn = db_dsn self.storage_root = storage_root self.public_base = public_base.rstrip('/') self.request_timeout = request_timeout self.max_retries = max_retries self.session = requests.Session() self.session.headers.update({ 'User-Agent': user_agent, 'Accept': '*/*', }) # Если есть .ROBLOSECURITY cookie — авторизуемся. # Без неё большинство ассетов отдаёт 401. # Cookie получается из браузера: F12 → Application → Cookies → roblox.com → .ROBLOSECURITY. roblosecurity = os.environ.get('ROBLOX_SECURITY_COOKIE', '').strip() if roblosecurity: self.session.cookies.set( '.ROBLOSECURITY', roblosecurity, domain='.roblox.com', path='/', ) logger.info('AssetDownloader: .ROBLOSECURITY cookie loaded (auth enabled)') else: logger.warning('AssetDownloader: no .ROBLOSECURITY cookie — most assets will return 401') # Создаём корневые папки for sub in ('raw', 'converted', 'failed'): os.makedirs(os.path.join(storage_root, sub), exist_ok=True) # ─── публичный API ─── def fetch_sync(self, rbx_asset_id: int) -> AssetRecord: """Скачать (или взять из кеша) один ассет. Бросает исключение при провале. Если ROBLOX_PROXY_MODE=disabled — бросает PendingDownload (но запись в БД создаётся со status='pending', чтобы потом можно было скачать batch'ем). """ cached = self._lookup(rbx_asset_id) if cached: self._bump_last_used(rbx_asset_id) cached.cached = True return cached # Не было в кеше — скачиваем proxy_cfg = get_proxy_config() if proxy_cfg.mode == 'disabled': self._insert_pending(rbx_asset_id) raise PendingDownload(rbx_asset_id) raw_bytes, content_type = self._download_raw(rbx_asset_id, proxy_cfg) sha256 = hashlib.sha256(raw_bytes).hexdigest() # Проверим: может уже есть в БД с другим rbx_asset_id но тем же sha256 existing_by_sha = self._lookup_by_sha256(sha256) if existing_by_sha: # Записываем алиас: новый rbx_asset_id → существующая запись # Просто вставим новую запись с теми же путями new_record = self._insert_alias(rbx_asset_id, existing_by_sha) new_record.cached = True return new_record # Это новый файл — сохраняем asset_kind, ext = self._classify(content_type, raw_bytes) raw_path = self._save_raw(sha256, ext, raw_bytes) public_url = f'{self.public_base}/raw/{sha256[:2]}/{sha256}{ext}' record = AssetRecord( rbx_asset_id=rbx_asset_id, sha256_raw=sha256, asset_kind=asset_kind, content_type=content_type, raw_size_bytes=len(raw_bytes), raw_path=raw_path, public_url=public_url, cached=False, ) self._insert(record) return record # ─── PG helpers ─── def _connect(self): return psycopg2.connect(self.db_dsn) def _lookup(self, rbx_asset_id: int) -> Optional[AssetRecord]: with self._connect() as conn: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute( "SELECT rbx_asset_id, sha256_raw, asset_kind, content_type, " "raw_size_bytes, raw_path, converted_path, converted_sha256, public_url " "FROM roblox_assets WHERE rbx_asset_id = %s", (rbx_asset_id,), ) row = cur.fetchone() if not row: return None return AssetRecord( rbx_asset_id=row['rbx_asset_id'], sha256_raw=row['sha256_raw'], asset_kind=row['asset_kind'], content_type=row['content_type'], raw_size_bytes=row['raw_size_bytes'], raw_path=row['raw_path'], public_url=row['public_url'], converted_path=row['converted_path'], converted_sha256=row['converted_sha256'], ) def _lookup_by_sha256(self, sha256: str) -> Optional[AssetRecord]: with self._connect() as conn: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute( "SELECT rbx_asset_id, sha256_raw, asset_kind, content_type, " "raw_size_bytes, raw_path, converted_path, converted_sha256, public_url " "FROM roblox_assets WHERE sha256_raw = %s LIMIT 1", (sha256,), ) row = cur.fetchone() if not row: return None return AssetRecord(**{k: row[k] for k in row.keys() if k != 'cached'}) def _bump_last_used(self, rbx_asset_id: int) -> None: with self._connect() as conn: with conn.cursor() as cur: cur.execute( "UPDATE roblox_assets SET last_used_at = NOW() WHERE rbx_asset_id = %s", (rbx_asset_id,), ) conn.commit() def _insert(self, r: AssetRecord) -> None: with self._connect() as conn: with conn.cursor() as cur: cur.execute( "INSERT INTO roblox_assets " "(rbx_asset_id, sha256_raw, asset_kind, content_type, raw_size_bytes, " " raw_path, public_url, converted_path, converted_sha256) " "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) " "ON CONFLICT (rbx_asset_id) DO NOTHING", (r.rbx_asset_id, r.sha256_raw, r.asset_kind, r.content_type, r.raw_size_bytes, r.raw_path, r.public_url, r.converted_path, r.converted_sha256), ) conn.commit() def _insert_pending(self, rbx_asset_id: int) -> None: """Создаёт plceholder-запись для ассета который не скачали (proxy disabled). Поля sha256, content_type, raw_path, public_url заполняются заглушками. asset_kind='pending'. Когда proxy будет настроен, batch-скрипт обновит запись. """ with self._connect() as conn: with conn.cursor() as cur: cur.execute( "INSERT INTO roblox_assets " "(rbx_asset_id, sha256_raw, asset_kind, content_type, raw_size_bytes, " " raw_path, public_url, error_msg) " "VALUES (%s, %s, 'pending', 'application/octet-stream', 0, '', '', " "'proxy disabled — asset not downloaded yet') " "ON CONFLICT (rbx_asset_id) DO NOTHING", (rbx_asset_id, '0' * 64), ) conn.commit() def _insert_alias(self, new_id: int, existing: AssetRecord) -> AssetRecord: """Сохраняет новый rbx_asset_id указывая на тот же файл.""" alias = AssetRecord( rbx_asset_id=new_id, sha256_raw=existing.sha256_raw, asset_kind=existing.asset_kind, content_type=existing.content_type, raw_size_bytes=existing.raw_size_bytes, raw_path=existing.raw_path, public_url=existing.public_url, converted_path=existing.converted_path, converted_sha256=existing.converted_sha256, ) self._insert(alias) return alias # ─── HTTP скачка ─── def _download_raw(self, rbx_asset_id: int, proxy_cfg) -> tuple: """Скачивает с Roblox CDN (или через CF Worker, или через HTTP proxy). Возвращает (raw_bytes, content_type). """ url = proxy_cfg.build_url(rbx_asset_id) headers = {**self.session.headers, **proxy_cfg.headers} # Для режима http_proxy — передаём proxies в requests proxies = get_http_proxies() if proxy_cfg.mode == 'http_proxy' else None last_exc = None for attempt in range(self.max_retries): try: resp = self.session.get(url, timeout=self.request_timeout, allow_redirects=True, headers=headers, proxies=proxies) if resp.status_code == 404: raise RuntimeError(f"asset {rbx_asset_id}: 404 Not Found (asset deleted or private)") if resp.status_code == 403: raise RuntimeError(f"asset {rbx_asset_id}: 403 Forbidden (private or moderation)") if resp.status_code == 401: raise RuntimeError(f"asset {rbx_asset_id}: 401 Unauthorized (requires Roblox auth)") if resp.status_code != 200: raise RuntimeError(f"asset {rbx_asset_id}: HTTP {resp.status_code}") content_type = resp.headers.get('Content-Type', 'application/octet-stream').split(';')[0].strip() return resp.content, content_type except (requests.RequestException, RuntimeError) as e: last_exc = e # 4xx — нет смысла retry if isinstance(e, RuntimeError) and ('404' in str(e) or '403' in str(e) or '401' in str(e)): raise wait = 2 ** attempt logger.warning(f"asset {rbx_asset_id} attempt {attempt+1} failed: {e}; retry in {wait}s") time.sleep(wait) raise RuntimeError(f"asset {rbx_asset_id}: max retries exceeded: {last_exc}") def _classify(self, content_type: str, raw_bytes: bytes) -> tuple: """Возвращает (asset_kind, extension).""" if content_type in CONTENT_TYPE_MAP: return CONTENT_TYPE_MAP[content_type] # Эвристика по magic bytes if raw_bytes.startswith(b'\x89PNG\r\n\x1a\n'): return ('texture', '.png') if raw_bytes[:3] == b'\xff\xd8\xff': return ('texture', '.jpg') if raw_bytes[:4] == b'RIFF' and raw_bytes[8:12] == b'WAVE': return ('sound', '.wav') if raw_bytes[:3] == b'ID3' or raw_bytes[:2] == b'\xff\xfb': return ('sound', '.mp3') if raw_bytes[:4] == b'OggS': return ('sound', '.ogg') # Roblox mesh: начинается с ASCII "version 1.00\n" или "version 2.00\n" или # бинарь начинающийся с magic. if raw_bytes[:8].startswith(b'version '): return ('mesh', '.mesh') # CSG: начинается с magic "CSGPHS" if raw_bytes[:6] == b'CSGPHS': return ('csg', '.csg') # Animation: KeyframeSequence, raw — обычно XML или binary с magic. if raw_bytes[:5] == b' str: subdir = sha256[:2] dir_path = os.path.join(self.storage_root, 'raw', subdir) os.makedirs(dir_path, exist_ok=True) file_path = os.path.join(dir_path, f'{sha256}{ext}') if not os.path.exists(file_path): # на всякий — atomic write через tmp tmp = file_path + '.tmp' with open(tmp, 'wb') as f: f.write(data) os.rename(tmp, file_path) return file_path # ─── CLI для тестов ─── if __name__ == '__main__': import sys import argparse parser = argparse.ArgumentParser() parser.add_argument('rbx_asset_id', type=int, help='Roblox asset ID') parser.add_argument('--db-dsn', default=os.environ.get('PG_DSN', ''), help='Postgres DSN, например "host=192.168.1.117 user=min password=... dbname=storys_db"') parser.add_argument('--storage', default='/opt/roblox-assets') args = parser.parse_args() if not args.db_dsn: print("error: provide --db-dsn or PG_DSN env var") sys.exit(1) logging.basicConfig(level=logging.INFO) dl = AssetDownloader(db_dsn=args.db_dsn, storage_root=args.storage) rec = dl.fetch_sync(args.rbx_asset_id) print(f" rbx_asset_id: {rec.rbx_asset_id}") print(f" sha256: {rec.sha256_raw}") print(f" kind: {rec.asset_kind}") print(f" content-type: {rec.content_type}") print(f" size: {rec.raw_size_bytes} bytes") print(f" raw_path: {rec.raw_path}") print(f" public_url: {rec.public_url}") print(f" cached: {rec.cached}")