|
|
本帖最后由 host0108 于 2019-9-28 15:43 编辑
"""Register Cloudflare WARP accounts in bulk and export WireGuard configs.

Accounts are registered concurrently from gevent greenlets and persisted in a
local SQLite database (``warp.db``) through peewee.  The click command group
at the bottom exposes account creation, listing and config export.
"""
import gevent.monkey

# Must run before requests is imported so that the sockets and subprocesses
# it ends up using are the gevent-cooperative versions.
gevent.monkey.patch_all()

import datetime
import json
import random
import string
import sys
import traceback

import click
import gevent
import peewee
import requests
from gevent.subprocess import PIPE, Popen


def _call(cmd):
    """Run *cmd* through the shell and return its raw stdout (bytes).

    stderr is captured and discarded and the exit status is not inspected,
    so a failing command simply yields empty output.
    """
    child = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)
    return child.communicate()[0]


class Wireguard(object):
    """Thin helpers around the ``wg`` command-line tool."""

    @staticmethod
    def gen_key():
        """Generate a WireGuard key pair with ``wg genkey`` / ``wg pubkey``.

        Returns:
            A ``(private_key, public_key)`` tuple of stripped strings.

        Raises:
            RuntimeError: if ``wg`` produced no output for either key.
        """
        private_key = _call("wg genkey").decode('utf-8').strip()
        if not private_key:
            raise RuntimeError("'wg genkey' returned no key; is wireguard installed?")
        public_key = _call(
            "echo '{}' | wg pubkey".format(private_key)
        ).decode('utf-8').strip()
        if not public_key:
            raise RuntimeError("'wg pubkey' returned no key")
        return private_key, public_key

    @staticmethod
    def gen_config(private_key, public_key, address, endpoint):
        """Render a WireGuard client configuration as text.

        Args:
            private_key: value for the ``[Interface]`` ``PrivateKey`` entry.
            public_key: value for the ``[Peer]`` ``PublicKey`` entry.
            address: value for the ``[Interface]`` ``Address`` entry.
            endpoint: value for the ``[Peer]`` ``Endpoint`` entry.

        Returns:
            The configuration text, routing all IPv4 traffic through the peer.
        """
        template = "\n".join([
            "[Interface]",
            "PrivateKey = {0}",
            "Address = {1}",
            "DNS = 1.1.1.1",
            "[Peer]",
            "PublicKey = {2}",
            "Endpoint = {3}",
            "AllowedIPs = 0.0.0.0/0",
            "",
        ])
        return template.format(private_key, address, public_key, endpoint)


class WarpCli(object):
    """Minimal client for the Cloudflare WARP registration API."""

    def __init__(self):
        # One HTTP session per client so connections are reused across calls.
        self._session = requests.session()

    @staticmethod
    def _gen_string(n):
        """Return a random string of *n* ASCII letters and digits."""
        letters = string.ascii_letters + string.digits
        return ''.join(random.choice(letters) for _ in range(n))

    def reg(self, public_key, referrer=None):
        """Register a new account for *public_key*.

        Args:
            public_key: WireGuard public key to register.
            referrer: optional client id sent as the ``referrer`` field;
                omitted from the request when falsy.

        Returns:
            The decoded JSON response body.
        """
        url = 'https://api.cloudflareclient.com/v0a745/reg'
        install_id = self._gen_string(11)
        # timespec="milliseconds" always emits exactly three fractional
        # digits; slicing isoformat() instead breaks when microsecond == 0.
        tos = datetime.datetime.now().isoformat(timespec="milliseconds") + "-07:00"
        data = {
            "key": public_key,
            "install_id": install_id,
            "fcm_token": "{}:APA91b{}".format(install_id, self._gen_string(134)),
            "warp_enabled": True,
            "tos": tos,
            "type": "Android",
            "locale": "en_US",
        }
        if referrer:
            data['referrer'] = referrer
        headers = {
            'Content-Type': 'application/json; charset=UTF-8',
            'Connection': 'Keep-Alive',
            'User-Agent': 'okhttp/3.12.1',
        }
        r = self._session.post(url, json=data, headers=headers, timeout=5)
        return r.json()

    @staticmethod
    def get_info(client_id, token):
        """Fetch the stored registration for *client_id* using its bearer *token*.

        Returns:
            The decoded JSON response body.
        """
        url = 'https://api.cloudflareclient.com/v0i1909221500/reg/{}'.format(client_id)
        headers = {
            "Accept": "*/*",
            "Authorization": "Bearer {}".format(token),
            "Accept-Encoding": "gzip",
            "Accept-Language": "Language",
            "User-Agent": "1.1.1.1/1909221500.1 CFNetwork/978.0.7 Darwin/18.7.0",
        }
        # Same timeout as reg() so a stalled connection cannot hang forever.
        r = requests.get(url, headers=headers, timeout=5)
        return r.json()


db = peewee.SqliteDatabase("warp.db")


class WarpAccount(peewee.Model):
    """One registered account, as persisted in ``warp.db``."""

    id = peewee.PrimaryKeyField()
    client_id = peewee.CharField(unique=True)
    private_key = peewee.CharField(unique=True)
    public_key = peewee.CharField()
    token = peewee.CharField()
    # Counter: incremented each time this account is used as a referrer.
    referrer = peewee.IntegerField()
    # Full registration response, stored as a JSON string.
    info = peewee.CharField()

    class Meta:
        database = db


WarpAccount.create_table()


@click.group()
def cli():
    """Manage locally stored WARP accounts."""


@cli.command()
@click.option('--client_id', required=True)
def account_config(client_id):
    """Print the WireGuard configuration for the account CLIENT_ID."""
    accounts = WarpAccount.select().where(WarpAccount.client_id == client_id)
    for account in accounts:
        data = json.loads(account.info)
        peer = data['config']['peers'][0]
        config = Wireguard.gen_config(
            account.private_key,
            peer['public_key'],
            data['config']['interface']['addresses']['v4'],
            peer['endpoint']['host'],
        )
        print(config)


@cli.command()
@click.option('--client_id', required=True)
def account_info(client_id):
    """Pretty-print the stored registration JSON for the account CLIENT_ID."""
    accounts = WarpAccount.select().where(WarpAccount.client_id == client_id)
    for account in accounts:
        print(json.dumps(json.loads(account.info), indent=4, sort_keys=True))


@cli.command()
@click.option('--limit', default=10)
def list_account(limit):
    """Print the client ids of up to LIMIT stored accounts."""
    for account in WarpAccount.select().limit(limit):
        print(account.client_id)


# State shared by the worker greenlets spawned in create_account().
do_count = 0
do_referrer = None
mjj_ids = []


@cli.command()
@click.option('--count', default=1)
@click.option('--referrer', default="")
@click.option('--thread', default=2)
@click.option('--mjj_mode', default=0)
def create_account(count, referrer, thread, mjj_mode):
    """Make COUNT registration attempts spread over THREAD greenlets.

    In normal mode each successful registration is stored in the database,
    optionally crediting REFERRER.  In mjj mode nothing is stored: every
    registration credits a randomly chosen stored account and bumps that
    account's referrer counter.
    """
    global do_count
    global do_referrer
    do_count = count
    do_referrer = referrer

    def loop_reg(idx):
        """Worker: keep registering until the shared budget is used up."""
        global do_count
        w = WarpCli()
        while True:
            # Test *before* decrementing.  An equality test against -1 is
            # only ever true for one worker; the others would step past it
            # and never stop.
            if do_count <= 0:
                break
            do_count -= 1
            try:
                if mjj_mode:
                    referrer = random.choice(mjj_ids)
                else:
                    referrer = do_referrer
                private_key, public_key = Wireguard.gen_key()
                r = w.reg(public_key, referrer)
                if mjj_mode:
                    q = WarpAccount.update(
                        referrer=WarpAccount.referrer + 1
                    ).where(WarpAccount.client_id == referrer)
                    q.execute()
                    print('tid: {} add referrer for {}'.format(idx, referrer))
                else:
                    WarpAccount.create(
                        client_id=r['id'],
                        private_key=private_key,
                        public_key=public_key,
                        token=r['token'],
                        info=json.dumps(r),
                        referrer=0,
                    )
                    print('tid: {} reg success {}'.format(idx, r['id']))
            except KeyboardInterrupt:
                sys.exit(0)
            except Exception:
                # A failed attempt still counts against the budget; report it
                # and back off briefly before the next one.
                traceback.print_exc()
                gevent.sleep(1)

    if mjj_mode:
        mjj_ids.extend(account.client_id for account in WarpAccount.select())
        if not mjj_ids:
            # Nothing to credit: bail out instead of letting every worker
            # fail on random.choice([]).
            print('please create account try again')
            return

    threads = [gevent.spawn(loop_reg, idx) for idx in range(thread)]
    gevent.joinall(threads)


if __name__ == '__main__':
    cli()
多协程自动注册,数据库持久化,然后你就有了一堆账户,还有神奇的mjj模式,可以干啥自己研究。
支持生成wireguard配置。
部分代码参考了 https://www.52.ht/thread-590354-1-1.html 大佬的实现;改动之处在于生成合法的公钥、注册真实账户,否则拿不到配置。
add-apt-repository ppa:wireguard/wireguard
apt-get update
apt-get install wireguard
apt install python3-dev python3-pip
pip3 install gevent requests click peewee
# use a referrer to register 100 accounts
python3 warp.py create-account --count 100 --thread 2 --referrer xxxxxxxxxxxxxxxxx
python3 warp.py list-account
python3 warp.py account-config --client_id xxxxxxxxxxxxxxxxx
thread 太高会被ban!
|
|