Git

Inflating YouTube view counts with a view bot (git youtube view bot)

sailorCat 2023. 5. 9. 06:24

 

 

I found some interesting code on GitHub.

It is a YouTube view bot that inflates view counts, and it has the most stars among bots of this kind.

I looked through the code, downloaded it, and tried running it.

 

 

https://github.com/MShawon/YouTube-Viewer

 


 

 

It runs as youtube_viewer.exe.

You write the addresses whose view counts you want to raise into urls.txt and search.txt.

killdrive.bat runs once everything is finished and kills every Chrome process that is still running; a rough Python equivalent is sketched below.
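I did not paste killdrive.bat itself, but judging by the windows_kill_drivers() helper in youtube_viewer.py further down, its job is simply to force-kill any Chrome and chromedriver processes left over after a run. A minimal Python sketch of that cleanup (my assumption about what the .bat file does, not its actual contents):

# Rough equivalent of killdrive.bat on Windows: force-kill leftover
# Chrome/chromedriver processes. The real cleanup in youtube_viewer.py
# filters by command line through WMI instead of killing by image name.
import subprocess

for image in ('chrome.exe', 'chromedriver.exe'):
    subprocess.run(['taskkill', '/F', '/IM', image, '/T'],
                   stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)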

 

Since I have no usable proxies of my own, I saved a free proxy list as proxy_free.txt (one proxy per line; the accepted line formats are sketched below).
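From load_proxy() and proxy_check() in the script below, each line is either ip:port, ip:port|type to pin the protocol, or ip:port:user:pass for authenticated proxies, which load_proxy() rewrites into user:pass@ip:port. A small sketch of that normalization (the addresses are made up):

# Example proxy_free.txt entries (made-up addresses) and how load_proxy() treats them.
lines = [
    '203.0.113.10:8080',                 # plain proxy; protocol is probed later
    '203.0.113.11:1080|socks5',          # proxy with an explicit type after '|'
    '203.0.113.12:3128:myuser:mypass',   # authenticated proxy, four colon-separated tokens
]

for line in lines:
    if line.count(':') == 3:             # same rule as load_proxy()
        ip, port, user, password = line.split(':')
        line = f'{user}:{password}@{ip}:{port}'
    print(line)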

Because they are free proxies, they get rejected at the proxy_check stage.

Still, it was enough to see how the tool runs.

I will not explain the above in any more detail, since someone might actually use it.

 

 

"""
MIT License
Copyright (c) 2021-2023 MShawon
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import os
import shutil
import sys
from concurrent.futures import ThreadPoolExecutor, wait
from glob import glob
from time import sleep

import requests
from fake_headers import Headers

os.system("")


class bcolors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'


print(bcolors.OKGREEN + """
  ____                                    
 |  _ \ _ __ _____  ___   _               
 | |_) | '__/ _ \ \/ / | | |              
 |  __/| | | (_) >  <| |_| |              
 |_|   |_|_ \___/_/\_\\__, |_             
      / ___| |__   ___|___/| | _____ _ __ 
     | |   | '_ \ / _ \/ __| |/ / _ \ '__|
     | |___| | | |  __/ (__|   <  __/ |   
      \____|_| |_|\___|\___|_|\_\___|_|   
                                          
""" + bcolors.ENDC)

print(bcolors.OKCYAN + """
[ GitHub : https://github.com/MShawon/YouTube-Viewer ]
""" + bcolors.ENDC)


checked = {}
cancel_all = False


def backup():
    try:
        shutil.copy('GoodProxy.txt', 'ProxyBackup.txt')
        print(bcolors.WARNING +
              'GoodProxy.txt backed up in ProxyBackup.txt' + bcolors.ENDC)
    except Exception:
        pass

    print('', file=open('GoodProxy.txt', 'w'))


def clean_exe_temp(folder):
    try:
        temp_name = sys._MEIPASS.split('\\')[-1]
    except Exception:
        temp_name = None

    for f in glob(os.path.join('temp', folder, '*')):
        if temp_name not in f:
            shutil.rmtree(f, ignore_errors=True)


def load_proxy():
    proxies = []

    filename = input(bcolors.OKBLUE +
                     'Enter your proxy file name: ' + bcolors.ENDC)

    if not os.path.isfile(filename) and filename[-4:] != '.txt':
        filename = f'{filename}.txt'

    try:
        with open(filename, encoding="utf-8") as fh:
            loaded = [x.strip() for x in fh if x.strip() != '']
    except Exception as e:
        print(bcolors.FAIL + str(e) + bcolors.ENDC)
        input('')
        sys.exit()

    for lines in loaded:
        if lines.count(':') == 3:
            split = lines.split(':')
            lines = f'{split[2]}:{split[-1]}@{split[0]}:{split[1]}'
        proxies.append(lines)

    return proxies


def main_checker(proxy_type, proxy, position):
    if cancel_all:
        raise KeyboardInterrupt

    checked[position] = None

    try:
        proxy_dict = {
            "http": f"{proxy_type}://{proxy}",
            "https": f"{proxy_type}://{proxy}",
        }

        header = Headers(
            headers=False
        ).generate()
        agent = header['User-Agent']

        headers = {
            'User-Agent': f'{agent}',
        }

        response = requests.get(
            'https://www.youtube.com/', headers=headers, proxies=proxy_dict, timeout=30)
        status = response.status_code

        if status != 200:
            raise Exception(status)

        print(bcolors.OKBLUE + f"Worker {position+1} | " + bcolors.OKGREEN +
              f'{proxy} | GOOD | Type : {proxy_type} | Response : {status}' + bcolors.ENDC)

        print(f'{proxy}|{proxy_type}', file=open('GoodProxy.txt', 'a'))

    except Exception as e:
        try:
            e = int(e.args[0])
        except Exception:
            e = ''
        print(bcolors.OKBLUE + f"Worker {position+1} | " + bcolors.FAIL +
              f'{proxy} | {proxy_type} | BAD | {e}' + bcolors.ENDC)
        checked[position] = proxy_type


def proxy_check(position):
    sleep(2)
    proxy = proxy_list[position]

    if '|' in proxy:
        splitted = proxy.split('|')
        main_checker(splitted[-1], splitted[0], position)
    else:
        main_checker('http', proxy, position)
        if checked[position] == 'http':
            main_checker('socks4', proxy, position)
        if checked[position] == 'socks4':
            main_checker('socks5', proxy, position)


def main():
    global cancel_all

    cancel_all = False
    pool_number = [i for i in range(total_proxies)]

    with ThreadPoolExecutor(max_workers=threads) as executor:
        futures = [executor.submit(proxy_check, position)
                   for position in pool_number]
        done, not_done = wait(futures, timeout=0)
        try:
            while not_done:
                freshly_done, not_done = wait(not_done, timeout=5)
                done |= freshly_done
        except KeyboardInterrupt:
            print(bcolors.WARNING +
                  'Hold on!!! Allow me a moment to finish the running threads' + bcolors.ENDC)
            cancel_all = True
            for future in not_done:
                _ = future.cancel()
            _ = wait(not_done, timeout=None)
            raise KeyboardInterrupt
        except IndexError:
            print(bcolors.WARNING + 'Number of proxies are less than threads. Provide more proxies or less threads. ' + bcolors.ENDC)


if __name__ == '__main__':

    clean_exe_temp(folder='proxy_check')
    backup()

    try:
        threads = int(
            input(bcolors.OKBLUE+'Threads (recommended = 100): ' + bcolors.ENDC))
    except Exception:
        threads = 100

    proxy_list = load_proxy()
    # removing empty & duplicate proxies
    proxy_list = list(set(filter(None, proxy_list)))

    total_proxies = len(proxy_list)
    print(bcolors.OKCYAN +
          f'Total unique proxies : {total_proxies}' + bcolors.ENDC)

    try:
        main()
    except KeyboardInterrupt:
        sys.exit()

 

The view bot's proxy_check.py works by loading the proxy IP list saved in a txt file one entry at a time and parsing each entry's tokens.

 

When it runs, it first cleans the temp folder and backs up the previous GoodProxy.txt before starting work.

It then loads the proxies, parses each entry's tokens, and builds the list.

Finally it counts the unique proxies and runs main().

 

In main(), the proxies are checked asynchronously across a thread pool.

Each proxy is tested for usability and the result is reported to the user; a stripped-down sketch of that check follows.
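Reduced to its essentials, main_checker() plus main() boil down to this pattern (a simplified sketch, not a drop-in replacement):

# Simplified sketch of the threaded proxy check: request youtube.com through
# each proxy and keep the ones that answer with HTTP 200.
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

def check(proxy, proxy_type='http'):
    proxies = {'http': f'{proxy_type}://{proxy}', 'https': f'{proxy_type}://{proxy}'}
    try:
        ok = requests.get('https://www.youtube.com/',
                          proxies=proxies, timeout=30).status_code == 200
    except requests.RequestException:
        ok = False
    return proxy, ok

proxy_list = ['203.0.113.10:8080']   # made-up example entry
with ThreadPoolExecutor(max_workers=100) as executor:
    futures = [executor.submit(check, p) for p in proxy_list]
    for future in as_completed(futures):
        proxy, ok = future.result()
        print(proxy, 'GOOD' if ok else 'BAD')

The real script polls with wait() in a loop instead of as_completed() so that it can catch Ctrl+C and cancel the remaining futures.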

 

"""
MIT License
Copyright (c) 2021-2023 MShawon
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import io
import json
import logging
import re
import textwrap
from concurrent.futures import ThreadPoolExecutor, wait
from time import gmtime, sleep, strftime, time

import psutil
from fake_headers import Headers, browsers
from faker import Faker
from requests.exceptions import RequestException
from tabulate import tabulate
from undetected_chromedriver.patcher import Patcher

from youtubeviewer import website
from youtubeviewer.basics import *
from youtubeviewer.config import create_config
from youtubeviewer.database import *
from youtubeviewer.download_driver import *
from youtubeviewer.load_files import *
from youtubeviewer.proxies import *

log = logging.getLogger('werkzeug')
log.disabled = True

SCRIPT_VERSION = '1.8.0'

print(bcolors.OKGREEN + """
Yb  dP  dP"Yb  88   88 888888 88   88 88""Yb 888888
 YbdP  dP   Yb 88   88   88   88   88 88__dP 88__
  8P   Yb   dP Y8   8P   88   Y8   8P 88""Yb 88""
 dP     YbodP  `YbodP'   88   `YbodP' 88oodP 888888
                        Yb    dP 88 888888 Yb        dP 888888 88""Yb
                         Yb  dP  88 88__    Yb  db  dP  88__   88__dP
                          YbdP   88 88""     YbdPYbdP   88""   88"Yb
                           YP    88 888888    YP  YP    888888 88  Yb
""" + bcolors.ENDC)

print(bcolors.OKCYAN + """
           [ GitHub : https://github.com/MShawon/YouTube-Viewer ]
""" + bcolors.ENDC)

print(bcolors.WARNING + f"""
+{'-'*26} Version: {SCRIPT_VERSION} {'-'*26}+
""" + bcolors.ENDC)

proxy = None
status = None
start_time = None
cancel_all = False

urls = []
queries = []
suggested = []

hash_urls = None
hash_queries = None
hash_config = None

driver_dict = {}
duration_dict = {}
checked = {}
summary = {}
video_statistics = {}
view = []
bad_proxies = []
used_proxies = []
temp_folders = []
console = []

threads = 0
views = 100

fake = Faker()
cwd = os.getcwd()
patched_drivers = os.path.join(cwd, 'patched_drivers')
config_path = os.path.join(cwd, 'config.json')
driver_identifier = os.path.join(cwd, 'patched_drivers', 'chromedriver')

DATABASE = os.path.join(cwd, 'database.db')
DATABASE_BACKUP = os.path.join(cwd, 'database_backup.db')

animation = ["⢿", "⣻", "⣽", "⣾", "⣷", "⣯", "⣟", "⡿"]
headers_1 = ['Worker', 'Video Title', 'Watch / Actual Duration']
headers_2 = ['Index', 'Video Title', 'Views']

width = 0
viewports = ['2560,1440', '1920,1080', '1440,900',
             '1536,864', '1366,768', '1280,1024', '1024,768']

referers = ['https://search.yahoo.com/', 'https://duckduckgo.com/', 'https://www.google.com/',
            'https://www.bing.com/', 'https://t.co/', '']

referers = choices(referers, k=len(referers)*3)

website.console = console
website.database = DATABASE


def monkey_patch_exe(self):
    linect = 0
    replacement = self.gen_random_cdc()
    replacement = f"  var key = '${replacement.decode()}_';\n".encode()
    with io.open(self.executable_path, "r+b") as fh:
        for line in iter(lambda: fh.readline(), b""):
            if b"var key = " in line:
                fh.seek(-len(line), 1)
                fh.write(replacement)
                linect += 1
        return linect


Patcher.patch_exe = monkey_patch_exe


def timestamp():
    global date_fmt
    date_fmt = datetime.now().strftime("%d-%b-%Y %H:%M:%S")
    return bcolors.OKGREEN + f'[{date_fmt}] | ' + bcolors.OKCYAN + f'{cpu_usage} | '


def clean_exe_temp(folder):
    temp_name = None
    if hasattr(sys, '_MEIPASS'):
        temp_name = sys._MEIPASS.split('\\')[-1]
    else:
        if sys.version_info.minor < 7 or sys.version_info.minor > 11:
            print(
                f'Your current python version is not compatible : {sys.version}')
            print(f'Install Python version between 3.7.x to 3.11.x to run this script')
            input("")
            sys.exit()

    for f in glob(os.path.join('temp', folder, '*')):
        if temp_name not in f:
            shutil.rmtree(f, ignore_errors=True)


def update_chrome_version():
    link = 'https://gist.githubusercontent.com/MShawon/29e185038f22e6ac5eac822a1e422e9d/raw/versions.txt'

    output = requests.get(link, timeout=60).text
    chrome_versions = output.split('\n')

    browsers.chrome_ver = chrome_versions


def check_update():
    api_url = 'https://api.github.com/repos/MShawon/YouTube-Viewer/releases/latest'
    try:
        response = requests.get(api_url, timeout=30)

        RELEASE_VERSION = response.json()['tag_name']

        if RELEASE_VERSION > SCRIPT_VERSION:
            print(bcolors.OKCYAN + '#'*100 + bcolors.ENDC)
            print(bcolors.OKCYAN + 'Update Available!!! ' +
                  f'YouTube Viewer version {SCRIPT_VERSION} needs to update to {RELEASE_VERSION} version.' + bcolors.ENDC)

            try:
                notes = response.json()['body'].split(
                    'SHA256')[0].split('\r\n')
                for note in notes:
                    if note:
                        print(bcolors.HEADER + note + bcolors.ENDC)
            except Exception:
                pass
            print(bcolors.OKCYAN + '#'*100 + '\n' + bcolors.ENDC)
    except Exception:
        pass


def create_html(text_dict):
    if len(console) > 250:
        console.pop()

    date = f'<span style="color:#23d18b"> [{date_fmt}] | </span>'
    cpu = f'<span style="color:#29b2d3"> {cpu_usage} | </span>'
    str_fmt = ''.join(
        [f'<span style="color:{key}"> {value} </span>' for key, value in text_dict.items()])
    html = date + cpu + str_fmt

    console.insert(0, html)


def detect_file_change():
    global hash_urls, hash_queries, urls, queries

    if hash_urls != get_hash("urls.txt"):
        hash_urls = get_hash("urls.txt")
        urls = load_url()
        suggested.clear()

    if hash_queries != get_hash("search.txt"):
        hash_queries = get_hash("search.txt")
        queries = load_search()
        suggested.clear()


def direct_or_search(position):
    keyword = None
    video_title = None
    if position % 2:
        try:
            method = 1
            url = choice(urls)
            if 'music.youtube.com' in url:
                youtube = 'Music'
            else:
                youtube = 'Video'
        except IndexError:
            raise Exception("Your urls.txt is empty!")

    else:
        try:
            method = 2
            query = choice(queries)
            keyword = query[0]
            video_title = query[1]
            url = "https://www.youtube.com"
            youtube = 'Video'
        except IndexError:
            try:
                youtube = 'Music'
                url = choice(urls)
                if 'music.youtube.com' not in url:
                    raise Exception
            except Exception:
                raise Exception("Your search.txt is empty!")

    return url, method, youtube, keyword, video_title


def features(driver):
    if bandwidth:
        save_bandwidth(driver)

    bypass_popup(driver)

    bypass_other_popup(driver)

    play_video(driver)

    change_playback_speed(driver, playback_speed)


def update_view_count(position):
    view.append(position)
    view_count = len(view)
    print(timestamp() + bcolors.OKCYAN +
          f'Worker {position} | View added : {view_count}' + bcolors.ENDC)

    create_html({"#29b2d3": f'Worker {position} | View added : {view_count}'})

    if database:
        try:
            update_database(
                database=DATABASE, threads=max_threads)
        except Exception:
            pass


def set_referer(position, url, method, driver):
    referer = choice(referers)
    if referer:
        if method == 2 and 't.co/' in referer:
            driver.get(url)
        else:
            if 'search.yahoo.com' in referer:
                driver.get('https://duckduckgo.com/')
                driver.execute_script(
                    "window.history.pushState('page2', 'Title', arguments[0]);", referer)
            else:
                driver.get(referer)

            driver.execute_script(
                "window.location.href = '{}';".format(url))

        print(timestamp() + bcolors.OKBLUE +
              f"Worker {position} | Referer used : {referer}" + bcolors.ENDC)

        create_html(
            {"#3b8eea": f"Worker {position} | Referer used : {referer}"})

    else:
        driver.get(url)


def youtube_normal(method, keyword, video_title, driver, output):
    if method == 2:
        msg = search_video(driver, keyword, video_title)
        if msg == 'failed':
            raise Exception(
                f"Can't find this [{video_title}] video with this keyword [{keyword}]")

    skip_initial_ad(driver, output, duration_dict)

    try:
        WebDriverWait(driver, 10).until(EC.visibility_of_element_located(
            (By.ID, 'movie_player')))
    except WebDriverException:
        raise Exception(
            "Slow internet speed or Stuck at reCAPTCHA! Can't load YouTube...")

    features(driver)

    try:
        view_stat = WebDriverWait(driver, 30).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, '#count span'))).text
        if not view_stat:
            raise WebDriverException
    except WebDriverException:
        view_stat = driver.find_element(
            By.XPATH, '//*[@id="info"]/span[1]').text

    return view_stat


def youtube_music(driver):
    if 'coming-soon' in driver.title or 'not available' in driver.title:
        raise Exception(
            "YouTube Music is not available in your area!")
    try:
        WebDriverWait(driver, 10).until(EC.visibility_of_element_located(
            (By.XPATH, '//*[@id="player-page"]')))
    except WebDriverException:
        raise Exception(
            "Slow internet speed or Stuck at reCAPTCHA! Can't load YouTube...")

    bypass_popup(driver)

    play_music(driver)

    output = driver.find_element(
        By.XPATH, '//ytmusic-player-bar//yt-formatted-string').text
    view_stat = 'music'

    return view_stat, output


def spoof_timezone_geolocation(proxy_type, proxy, driver):
    try:
        proxy_dict = {
            "http": f"{proxy_type}://{proxy}",
                    "https": f"{proxy_type}://{proxy}",
        }
        resp = requests.get(
            "http://ip-api.com/json", proxies=proxy_dict, timeout=30)

        if resp.status_code == 200:
            location = resp.json()
            tz_params = {'timezoneId': location['timezone']}
            latlng_params = {
                "latitude": location['lat'],
                "longitude": location['lon'],
                "accuracy": randint(20, 100)
            }
            info = f"ip-api.com | Lat : {location['lat']} | Lon : {location['lon']} | TZ: {location['timezone']}"
        else:
            raise RequestException

    except RequestException:
        location = fake.location_on_land()
        tz_params = {'timezoneId': location[-1]}
        latlng_params = {
            "latitude": location[0],
            "longitude": location[1],
            "accuracy": randint(20, 100)
        }
        info = f"Random | Lat : {location[0]} | Lon : {location[1]} | TZ: {location[-1]}"

    try:
        driver.execute_cdp_cmd('Emulation.setTimezoneOverride', tz_params)

        driver.execute_cdp_cmd(
            "Emulation.setGeolocationOverride", latlng_params)

    except WebDriverException:
        pass

    return info


def control_player(driver, output, position, proxy, youtube, collect_id=True):
    current_url = driver.current_url

    video_len = duration_dict.get(output, 0)
    for _ in range(90):
        if video_len != 0:
            duration_dict[output] = video_len
            break

        video_len = driver.execute_script(
            "return document.getElementById('movie_player').getDuration()")
        sleep(1)

    if video_len == 0:
        raise Exception('Video player is not loading...')

    actual_duration = strftime(
        "%Hh:%Mm:%Ss", gmtime(video_len)).lstrip("0h:0m:")
    video_len = video_len*uniform(minimum, maximum)
    duration = strftime("%Hh:%Mm:%Ss", gmtime(video_len)).lstrip("0h:0m:")

    if len(output) == 11:
        output = driver.title[:-10]

    summary[position] = [position, output, f'{duration} / {actual_duration}']
    website.summary_table = tabulate(
        summary.values(), headers=headers_1, numalign='center', stralign='center', tablefmt="html")

    print(timestamp() + bcolors.OKBLUE + f"Worker {position} | " + bcolors.OKGREEN +
          f"{proxy} --> {youtube} Found : {output} | Watch Duration : {duration} " + bcolors.ENDC)

    create_html({"#3b8eea": f"Worker {position} | ",
                 "#23d18b": f"{proxy.split('@')[-1]} --> {youtube} Found : {output} | Watch Duration : {duration} "})

    if youtube == 'Video' and collect_id:
        try:
            video_id = re.search(
                r"(?:v=|\/)([0-9A-Za-z_-]{11}).*", current_url).group(1)
            if video_id not in suggested and output in driver.title:
                suggested.append(video_id)
        except Exception:
            pass

    try:
        current_channel = driver.find_element(
            By.CSS_SELECTOR, '#upload-info a').text
    except WebDriverException:
        current_channel = 'Unknown'

    error = 0
    loop = int(video_len/4)
    for _ in range(loop):
        sleep(5)
        current_time = driver.execute_script(
            "return document.getElementById('movie_player').getCurrentTime()")

        if youtube == 'Video':
            play_video(driver)
            random_command(driver)
        elif youtube == 'Music':
            play_music(driver)

        current_state = driver.execute_script(
            "return document.getElementById('movie_player').getPlayerState()")
        if current_state in [-1, 3]:
            error += 1
        else:
            error = 0

        if error == 10:
            error_msg = f'Taking too long to play the video | Reason : buffering'
            if current_state == -1:
                error_msg = f"Failed to play the video | Possible Reason : {proxy.split('@')[-1]} not working anymore"
            raise Exception(error_msg)

        elif current_time > video_len or driver.current_url != current_url:
            break

    summary.pop(position, None)
    website.summary_table = tabulate(
        summary.values(), headers=headers_1, numalign='center', stralign='center', tablefmt="html")

    output = textwrap.fill(text=output, width=75, break_on_hyphens=False)
    video_statistics[output] = video_statistics.get(output, 0) + 1
    website.html_table = tabulate(video_statistics.items(), headers=headers_2,
                                  showindex=True, numalign='center', stralign='center', tablefmt="html")

    return current_url, current_channel


def youtube_live(proxy, position, driver, output):
    error = 0
    while True:
        try:
            view_stat = driver.find_element(
                By.CSS_SELECTOR, '#count span').text
            if not view_stat:
                raise WebDriverException
        except WebDriverException:
            view_stat = driver.find_element(
                By.XPATH, '//*[@id="info"]/span[1]').text
        if 'watching' in view_stat:
            print(timestamp() + bcolors.OKBLUE + f"Worker {position} | " + bcolors.OKGREEN +
                  f"{proxy} | {output} | " + bcolors.OKCYAN + f"{view_stat} " + bcolors.ENDC)

            create_html({"#3b8eea": f"Worker {position} | ",
                         "#23d18b": f"{proxy.split('@')[-1]} | {output} | ", "#29b2d3": f"{view_stat} "})
        else:
            error += 1

        play_video(driver)

        random_command(driver)

        if error == 5:
            break
        sleep(60)

    update_view_count(position)


def music_and_video(proxy, position, youtube, driver, output, view_stat):
    rand_choice = 1
    if len(suggested) > 1 and view_stat != 'music':
        rand_choice = randint(1, 3)

    for i in range(rand_choice):
        if i == 0:
            current_url, current_channel = control_player(
                driver, output, position, proxy, youtube, collect_id=True)

            update_view_count(position)

        else:
            print(timestamp() + bcolors.OKBLUE +
                  f"Worker {position} | Suggested video loop : {i}" + bcolors.ENDC)

            create_html(
                {"#3b8eea": f"Worker {position} | Suggested video loop : {i}"})

            try:
                output = play_next_video(driver, suggested)
            except WebDriverException as e:
                raise Exception(
                    f"Error suggested | {type(e).__name__} | {e.args[0] if e.args else ''}")

            print(timestamp() + bcolors.OKBLUE +
                  f"Worker {position} | Found next suggested video : [{output}]" + bcolors.ENDC)

            create_html(
                {"#3b8eea": f"Worker {position} | Found next suggested video : [{output}]"})

            skip_initial_ad(driver, output, duration_dict)

            features(driver)

            current_url, current_channel = control_player(
                driver, output, position, proxy, youtube, collect_id=False)

            update_view_count(position)

    return current_url, current_channel


def channel_or_endscreen(proxy, position, youtube, driver, view_stat, current_url, current_channel):
    option = 1
    if view_stat != 'music' and driver.current_url == current_url:
        option = choices([1, 2, 3], cum_weights=(0.5, 0.75, 1.00), k=1)[0]

        if option == 2:
            try:
                output, log, option = play_from_channel(
                    driver, current_channel)
            except WebDriverException as e:
                raise Exception(
                    f"Error channel | {type(e).__name__} | {e.args[0] if e.args else ''}")

            print(timestamp() + bcolors.OKBLUE +
                  f"Worker {position} | {log}" + bcolors.ENDC)

            create_html({"#3b8eea": f"Worker {position} | {log}"})

        elif option == 3:
            try:
                output = play_end_screen_video(driver)
            except WebDriverException as e:
                raise Exception(
                    f"Error end screen | {type(e).__name__} | {e.args[0] if e.args else ''}")

            print(timestamp() + bcolors.OKBLUE +
                  f"Worker {position} | Video played from end screen : [{output}]" + bcolors.ENDC)

            create_html(
                {"#3b8eea": f"Worker {position} | Video played from end screen : [{output}]"})

        if option in [2, 3]:
            skip_initial_ad(driver, output, duration_dict)

            features(driver)

            current_url, current_channel = control_player(
                driver, output, position, proxy, youtube, collect_id=False)

        if option in [2, 3, 4]:
            update_view_count(position)


def windows_kill_drivers():
    for process in constructor.Win32_Process(["CommandLine", "ProcessId"]):
        try:
            if 'UserAgentClientHint' in process.CommandLine:
                print(f'Killing PID : {process.ProcessId}', end="\r")
                subprocess.Popen(['taskkill', '/F', '/PID', f'{process.ProcessId}'],
                                 stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL)
        except Exception:
            pass
    print('\n')


def quit_driver(driver, data_dir):
    if driver and driver in driver_dict:
        driver.quit()
        if data_dir in temp_folders:
            temp_folders.remove(data_dir)

    proxy_folder = driver_dict.pop(driver, None)
    if proxy_folder:
        shutil.rmtree(proxy_folder, ignore_errors=True)

    status = 400
    return status


def main_viewer(proxy_type, proxy, position):
    global width, viewports
    driver = None
    data_dir = None

    if cancel_all:
        raise KeyboardInterrupt

    try:
        detect_file_change()

        checked[position] = None

        header = Headers(
            browser="chrome",
            os=osname,
            headers=False
        ).generate()
        agent = header['User-Agent']

        url, method, youtube, keyword, video_title = direct_or_search(position)

        if category == 'r' and proxy_api:
            for _ in range(20):
                proxy = choice(proxies_from_api)
                if proxy not in used_proxies:
                    break
            used_proxies.append(proxy)

        status = check_proxy(category, agent, proxy, proxy_type)

        if status != 200:
            raise RequestException(status)

        try:
            print(timestamp() + bcolors.OKBLUE + f"Worker {position} | " + bcolors.OKGREEN +
                  f"{proxy} | {proxy_type.upper()} | Good Proxy | Opening a new driver..." + bcolors.ENDC)

            create_html({"#3b8eea": f"Worker {position} | ",
                        "#23d18b": f"{proxy.split('@')[-1]} | {proxy_type.upper()} | Good Proxy | Opening a new driver..."})

            while proxy in bad_proxies:
                bad_proxies.remove(proxy)
                sleep(1)

            patched_driver = os.path.join(
                patched_drivers, f'chromedriver_{position%threads}{exe_name}')

            try:
                Patcher(executable_path=patched_driver).patch_exe()
            except Exception:
                pass

            proxy_folder = os.path.join(
                cwd, 'extension', f'proxy_auth_{position}')

            factor = int(threads/(0.1*threads + 1))
            sleep_time = int((str(position)[-1])) * factor
            sleep(sleep_time)
            if cancel_all:
                raise KeyboardInterrupt

            driver = get_driver(background, viewports, agent, auth_required,
                                patched_driver, proxy, proxy_type, proxy_folder)

            driver_dict[driver] = proxy_folder

            data_dir = driver.capabilities['chrome']['userDataDir']
            temp_folders.append(data_dir)

            sleep(2)

            info = spoof_timezone_geolocation(proxy_type, proxy, driver)

            isdetected = driver.execute_script('return navigator.webdriver')

            print(timestamp() + bcolors.OKBLUE + f"Worker {position} | " + bcolors.OKGREEN +
                  f"{proxy} | {proxy_type.upper()} | " + bcolors.OKCYAN + f"{info} | Detected? : {isdetected}" + bcolors.ENDC)

            create_html({"#3b8eea": f"Worker {position} | ",
                        "#23d18b": f"{proxy.split('@')[-1]} | {proxy_type.upper()} | ", "#29b2d3": f"{info} | Detected? : {isdetected}"})

            if width == 0:
                width = driver.execute_script('return screen.width')
                height = driver.execute_script('return screen.height')
                print(f'Display resolution : {width}x{height}')
                viewports = [i for i in viewports if int(i[:4]) <= width]

            set_referer(position, url, method, driver)

            if 'consent' in driver.current_url:
                print(timestamp() + bcolors.OKBLUE +
                      f"Worker {position} | Bypassing consent..." + bcolors.ENDC)

                create_html(
                    {"#3b8eea": f"Worker {position} | Bypassing consent..."})

                bypass_consent(driver)

            if video_title:
                output = video_title
            else:
                output = driver.title[:-10]

            if youtube == 'Video':
                view_stat = youtube_normal(
                    method, keyword, video_title, driver, output)
            else:
                view_stat, output = youtube_music(driver)

            if 'watching' in view_stat:
                youtube_live(proxy, position, driver, output)

            else:
                current_url, current_channel = music_and_video(
                    proxy, position, youtube, driver, output, view_stat)

            channel_or_endscreen(proxy, position, youtube,
                                 driver, view_stat, current_url, current_channel)

            if randint(1, 2) == 1:
                try:
                    driver.find_element(By.ID, 'movie_player').send_keys('k')
                except WebDriverException:
                    pass

            status = quit_driver(driver=driver, data_dir=data_dir)

        except Exception as e:
            status = quit_driver(driver=driver, data_dir=data_dir)

            print(timestamp() + bcolors.FAIL +
                  f"Worker {position} | Line : {e.__traceback__.tb_lineno} | {type(e).__name__} | {e.args[0] if e.args else ''}" + bcolors.ENDC)

            create_html(
                {"#f14c4c": f"Worker {position} | Line : {e.__traceback__.tb_lineno} | {type(e).__name__} | {e.args[0] if e.args else ''}"})

    except RequestException:
        print(timestamp() + bcolors.OKBLUE + f"Worker {position} | " +
              bcolors.FAIL + f"{proxy} | {proxy_type.upper()} | Bad proxy " + bcolors.ENDC)

        create_html({"#3b8eea": f"Worker {position} | ",
                     "#f14c4c": f"{proxy.split('@')[-1]} | {proxy_type.upper()} | Bad proxy "})

        checked[position] = proxy_type
        bad_proxies.append(proxy)

    except Exception as e:
        print(timestamp() + bcolors.FAIL +
              f"Worker {position} | Line : {e.__traceback__.tb_lineno} | {type(e).__name__} | {e.args[0] if e.args else ''}" + bcolors.ENDC)

        create_html(
            {"#f14c4c": f"Worker {position} | Line : {e.__traceback__.tb_lineno} | {type(e).__name__} | {e.args[0] if e.args else ''}"})


def get_proxy_list():
    if filename:
        if category == 'r':
            factor = max_threads if max_threads > 1000 else 1000
            proxy_list = [filename] * factor
        else:
            if proxy_api:
                proxy_list = scrape_api(filename)
            else:
                proxy_list = load_proxy(filename)

    else:
        proxy_list = gather_proxy()

    return proxy_list


def stop_server(immediate=False):
    if not immediate:
        print('Allowing a maximum of 15 minutes to finish all the running drivers...')
        for _ in range(180):
            sleep(5)
            if 'state=running' not in str(futures[1:-1]):
                break

    if api:
        for _ in range(10):
            response = requests.post(f'http://127.0.0.1:{port}/shutdown')
            if response.status_code == 200:
                print('Server shut down successfully!')
                break
            else:
                print(f'Server shut down error : {response.status_code}')
                sleep(3)


def clean_exit():
    print(timestamp() + bcolors.WARNING +
          'Cleaning up processes...' + bcolors.ENDC)
    create_html({"#f3f342": "Cleaning up processes..."})

    if osname == 'win':
        driver_dict.clear()
        windows_kill_drivers()
    else:
        for driver in list(driver_dict):
            quit_driver(driver=driver, data_dir=None)

    for folder in temp_folders:
        shutil.rmtree(folder, ignore_errors=True)


def cancel_pending_task(not_done):
    global cancel_all

    cancel_all = True
    for future in not_done:
        _ = future.cancel()

    clean_exit()

    stop_server(immediate=True)
    _ = wait(not_done, timeout=None)

    clean_exit()


def view_video(position):
    if position == 0:
        if api:
            website.start_server(host=host, port=port)

    elif position == total_proxies - 1:
        stop_server(immediate=False)
        clean_exit()

    else:
        sleep(2)
        proxy = proxy_list[position]

        if proxy_type:
            main_viewer(proxy_type, proxy, position)
        elif '|' in proxy:
            splitted = proxy.split('|')
            main_viewer(splitted[-1], splitted[0], position)
        else:
            main_viewer('http', proxy, position)
            if checked[position] == 'http':
                main_viewer('socks4', proxy, position)
            if checked[position] == 'socks4':
                main_viewer('socks5', proxy, position)


def main():
    global cancel_all, proxy_list, total_proxies, proxies_from_api, threads, hash_config, futures, cpu_usage

    cancel_all = False
    start_time = time()
    hash_config = get_hash(config_path)

    proxy_list = get_proxy_list()
    if category != 'r':
        print(bcolors.OKCYAN +
              f'Total proxies : {len(proxy_list)}' + bcolors.ENDC)

    proxy_list = [x for x in proxy_list if x not in bad_proxies]
    if len(proxy_list) == 0:
        bad_proxies.clear()
        proxy_list = get_proxy_list()
    if proxy_list[0] != 'dummy':
        proxy_list.insert(0, 'dummy')
    if proxy_list[-1] != 'dummy':
        proxy_list.append('dummy')
    total_proxies = len(proxy_list)

    if category == 'r' and proxy_api:
        proxies_from_api = scrape_api(link=filename)

    threads = randint(min_threads, max_threads)
    if api:
        threads += 1

    loop = 0
    pool_number = list(range(total_proxies))

    with ThreadPoolExecutor(max_workers=threads) as executor:
        futures = [executor.submit(view_video, position)
                   for position in pool_number]

        done, not_done = wait(futures, timeout=0)
        try:
            while not_done:
                freshly_done, not_done = wait(not_done, timeout=1)
                done |= freshly_done

                loop += 1
                for _ in range(70):
                    cpu = str(psutil.cpu_percent(0.2))
                    cpu_usage = cpu + '%' + ' ' * \
                        (5-len(cpu)) if cpu != '0.0' else cpu_usage

                if loop % 40 == 0:
                    print(tabulate(video_statistics.items(),
                          headers=headers_2, showindex=True, tablefmt="pretty"))

                if category == 'r' and proxy_api:
                    proxies_from_api = scrape_api(link=filename)

                if len(view) >= views:
                    print(timestamp() + bcolors.WARNING +
                          f'Amount of views added : {views} | Stopping program...' + bcolors.ENDC)
                    create_html(
                        {"#f3f342": f'Amount of views added : {views} | Stopping program...'})

                    cancel_pending_task(not_done=not_done)
                    break

                elif hash_config != get_hash(config_path):
                    hash_config = get_hash(config_path)
                    print(timestamp() + bcolors.WARNING +
                          'Modified config.json will be in effect soon...' + bcolors.ENDC)
                    create_html(
                        {"#f3f342": 'Modified config.json will be in effect soon...'})

                    cancel_pending_task(not_done=not_done)
                    break

                elif refresh != 0 and category != 'r':

                    if (time() - start_time) > refresh*60:
                        start_time = time()

                        proxy_list_new = get_proxy_list()
                        proxy_list_new = [
                            x for x in proxy_list_new if x not in bad_proxies]

                        proxy_list_old = [
                            x for x in proxy_list[1:-1] if x not in bad_proxies]

                        if sorted(proxy_list_new) != sorted(proxy_list_old):
                            print(timestamp() + bcolors.WARNING +
                                  f'Refresh {refresh} minute triggered. Proxies will be reloaded soon...' + bcolors.ENDC)
                            create_html(
                                {"#f3f342": f'Refresh {refresh} minute triggered. Proxies will be reloaded soon...'})

                            cancel_pending_task(not_done=not_done)
                            break

        except KeyboardInterrupt:
            print(timestamp() + bcolors.WARNING +
                  'Hold on!!! Allow me a moment to close all the running drivers.' + bcolors.ENDC)
            create_html(
                {"#f3f342": 'Hold on!!! Allow me a moment to close all the running drivers.'})

            cancel_pending_task(not_done=not_done)
            raise KeyboardInterrupt


if __name__ == '__main__':

    clean_exe_temp(folder='youtube_viewer')
    date_fmt = datetime.now().strftime("%d-%b-%Y %H:%M:%S")
    cpu_usage = str(psutil.cpu_percent(1))
    update_chrome_version()
    check_update()
    osname, exe_name = download_driver(patched_drivers=patched_drivers)
    create_database(database=DATABASE, database_backup=DATABASE_BACKUP)

    if osname == 'win':
        import wmi
        constructor = wmi.WMI()

    urls = load_url()
    queries = load_search()

    if os.path.isfile(config_path):
        with open(config_path, 'r', encoding='utf-8-sig') as openfile:
            config = json.load(openfile)

        if len(config) == 11:
            print(json.dumps(config, indent=4))
            print(bcolors.OKCYAN + 'Config file exists! Program will start automatically after 20 seconds...' + bcolors.ENDC)
            print(bcolors.FAIL + 'If you want to create a new config file PRESS CTRL+C within 20 seconds!' + bcolors.ENDC)
            start = time() + 20
            try:
                i = 0
                while i < 96:
                    print(bcolors.OKBLUE + f"{start - time():.0f} seconds remaining " +
                          animation[i % len(animation)] + bcolors.ENDC, end="\r")
                    i += 1
                    sleep(0.2)
                print('\n')
            except KeyboardInterrupt:
                create_config(config_path=config_path)
        else:
            print(bcolors.FAIL + 'Previous config file is not compatible with the latest script! Create a new one...' + bcolors.ENDC)
            create_config(config_path=config_path)
    else:
        create_config(config_path=config_path)

    hash_urls = get_hash("urls.txt")
    hash_queries = get_hash("search.txt")
    hash_config = get_hash(config_path)

    while len(view) < views:
        try:
            with open(config_path, 'r', encoding='utf-8-sig') as openfile:
                config = json.load(openfile)

            if cancel_all:
                print(json.dumps(config, indent=4))
            api = config["http_api"]["enabled"]
            host = config["http_api"]["host"]
            port = config["http_api"]["port"]
            database = config["database"]
            views = config["views"]
            minimum = config["minimum"] / 100
            maximum = config["maximum"] / 100
            category = config["proxy"]["category"]
            proxy_type = config["proxy"]["proxy_type"]
            filename = config["proxy"]["filename"]
            auth_required = config["proxy"]["authentication"]
            proxy_api = config["proxy"]["proxy_api"]
            refresh = config["proxy"]["refresh"]
            background = config["background"]
            bandwidth = config["bandwidth"]
            playback_speed = config["playback_speed"]
            max_threads = config["max_threads"]
            min_threads = config["min_threads"]

            if minimum >= maximum:
                minimum = maximum - 5

            if min_threads >= max_threads:
                max_threads = min_threads

            if auth_required and background:
                print(bcolors.FAIL +
                      "Premium proxy needs extension to work. Chrome doesn't support extension in Headless mode." + bcolors.ENDC)
                input(bcolors.WARNING +
                      f"Either use proxy without username & password or disable headless mode " + bcolors.ENDC)
                sys.exit()

            copy_drivers(cwd=cwd, patched_drivers=patched_drivers,
                         exe=exe_name, total=max_threads)

            main()
        except KeyboardInterrupt:
            sys.exit()

 

Looking at the code briefly:

It clears the temp folder and logs the time and CPU usage with a timestamp.
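The timestamp prefix on every log line is just datetime plus psutil, roughly like this (a sketch of what timestamp() produces, without the ANSI colors):

from datetime import datetime
import psutil

# Same format strings used at the bottom of youtube_viewer.py.
date_fmt = datetime.now().strftime("%d-%b-%Y %H:%M:%S")
cpu_usage = f'{psutil.cpu_percent(interval=1)}%'
print(f'[{date_fmt}] | {cpu_usage} | Worker 1 | ...')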

 

Six patched Chrome drivers were installed in the patched_drivers folder in my run.

Each of these drivers gets one proxy address assigned to it; the timestamp keeps track of the time, and once a view job finishes, the next address is assigned (see the worker sketch below).
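In other words, each worker index picks one of the patched drivers via position % threads (the chromedriver_{position % threads} naming in main_viewer()) and one proxy from the list. A rough sketch of that pairing, assuming the six drivers from my run and made-up proxies:

# Rough sketch of how workers, patched drivers and proxies are paired.
threads = 6
proxy_list = [f'203.0.113.{i}:8080' for i in range(1, 13)]   # made-up proxies

for position, proxy in enumerate(proxy_list):
    patched_driver = f'patched_drivers/chromedriver_{position % threads}'
    print(f'Worker {position} -> {patched_driver} via {proxy}')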

The minimum watch-time setting has to come out to at least 30 seconds for a view to count, and you have to set it with the length of any pre-roll ad in mind.

So if a 15-second ad plays first, you should aim for a minimum of 45 seconds.
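Note that in config.json the minimum and maximum values are percentages of the video length (control_player() multiplies the duration by uniform(minimum, maximum) after both are divided by 100), so the 45-second floor has to be converted into a percentage per video. A small worked example under that reading; the 30-second view threshold is the rule of thumb above, not something the script enforces:

# Pick the config "minimum" (a percentage of video length) so the watch time
# stays above 45 s (30 s view threshold + a 15 s pre-roll ad).
video_len = 180                 # a 3-minute video, for example
target_seconds = 30 + 15        # desired floor on watch time

minimum_percent = 100 * target_seconds / video_len
print(f'minimum >= {minimum_percent:.0f}%')                 # -> minimum >= 25%
print(f'watch time at 25%: {video_len * 25 / 100:.0f} s')   # -> 45 s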

 

Assuming 1 GB of proxy data, watching at 480p gives you roughly 3 to 4 hours of viewing.
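That estimate is consistent with 480p playback using roughly 250 to 350 MB per hour, which is my own assumption rather than a number from the script:

# Back-of-the-envelope check of the "1 GB lasts 3-4 hours at 480p" estimate.
data_budget_mb = 1024
for mb_per_hour in (250, 350):   # assumed 480p data rate range
    print(f'{mb_per_hour} MB/h -> {data_budget_mb / mb_per_hour:.1f} hours')
# 250 MB/h -> 4.1 hours, 350 MB/h -> 2.9 hours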

 

 
