Source code for oi_cli2.cli.contest

#!/usr/bin/env python3
import asyncio
from enum import Enum
import json
import logging
import os
import subprocess
import sys
import threading
import time
from requests.exceptions import ReadTimeout, ConnectTimeout
from typing import Callable, List
import click
from rich.live import Live
from rich.table import Table
from rich.style import Style
from rich.console import Console
from oi_cli2.cli.adaptor.ojman import OJManager
from oi_cli2.cli.constant import APP_NAME
from oi_cli2.core.DI import DI_ACCMAN, DI_CFG, DI_TEMPMAN

from oi_cli2.model.BaseOj import BaseOj
from oi_cli2.model.FolderState import FolderState
from oi_cli2.model.ProblemMeta import E_STATUS, ProblemMeta
from oi_cli2.model.TestCase import TestCase

from oi_cli2.utils.FileUtil import FileUtil
from oi_cli2.utils.Provider2 import Provider2
from oi_cli2.utils.account import AccountManager
from oi_cli2.utils.configFolder import ConfigFolder

# file_util can be any thing , everything is file
from oi_cli2.utils.template import TemplateManager

console = Console(color_system='256', style=None)
logger = logging.getLogger(APP_NAME+'.contest')

[docs]class VisitStatus(Enum): BEFORE = 1 SUCCESS = 2 FAILED = 3 TIMEOUT = 3
visitedTextMap = { VisitStatus.BEFORE: "Fetching", VisitStatus.SUCCESS: "[green]OK", VisitStatus.FAILED: "[red]Failed", VisitStatus.TIMEOUT: "[red]Timeout", }
[docs]def all_visit(result) -> bool: for v in result: if v[1] == VisitStatus.BEFORE: return False return True
[docs]def generate_table(result) -> Table: """Generate new table.""" table = Table() table.add_column("Problem", width=10) table.add_column("Fetched", width=10) table.add_column("Parse Status") for row in result: table.add_row( row[0], visitedTextMap[row[1]], "[green]OK" if row[2] else "Unknown", ) return table
# TODO support by problem id
[docs]async def create_problem(data, pm: ProblemMeta, contest_id: str, template, oj: BaseOj, updater:Callable[[],None]): config_folder: ConfigFolder = Provider2().get(DI_CFG) # TODO remove retry, should provided by sesssion timeOutRetry = 3 while timeOutRetry >= 0: try: file_util = FileUtil problem_id = contest_id + pm.id result = await oj.async_problem(pm) data[1] = VisitStatus.SUCCESS updater() test_cases: List[TestCase] = result.test_cases directory = config_folder.get_file_path(os.path.join('dist', type(oj).__name__, contest_id, pm.id)) for i in range(len(test_cases)): file_util.write(config_folder.get_file_path(os.path.join(directory, f'in.{i}')), test_cases[i].in_data) file_util.write(config_folder.get_file_path(os.path.join(directory, f'out.{i}')), test_cases[i].out_data) # if code file exist not cover code if not os.path.exists(config_folder.get_file_path(os.path.join(directory, os.path.basename(template.path)))): file_util.copy(config_folder.get_file_path(template.path), config_folder.get_file_path(os.path.join(directory, os.path.basename(template.path)))) # TODO 生成state.json ( 提供 自定义字段) STATE_FILE = 'state.json' # TODO provide more info, like single test and # generate state.json folder_state = FolderState(oj=type(oj).__name__, cid=contest_id, pid=pm.id, sid=problem_id, problem_url=pm.url, template_alias=template.alias, up_lang=template.uplang) # TODO get data from analyzer logger.debug(f'create folder_state {folder_state}') with open(config_folder.get_file_path(os.path.join(directory, STATE_FILE)), "w") as statejson: json.dump(folder_state.__dict__, statejson) statejson.close() data[2] = True updater() return True except (ReadTimeout, ConnectTimeout) as e: logger.info(f'Http Timeout[{type(e).__name__}]: {e.request.url}') if timeOutRetry == 0: data[1] = VisitStatus.TIMEOUT updater() return False timeOutRetry -= 1 time.sleep(0.5) except Exception as e: logger.exception(type(e).__name__) logger.exception(e) data[1] = VisitStatus.FAILED updater() return False return False
[docs]async def createDir(oj: BaseOj, contest_id: str, problems: List[ProblemMeta]): config_folder: ConfigFolder = Provider2().get(DI_CFG) template_manager: TemplateManager = Provider2().get(DI_TEMPMAN) template = template_manager.get_platform_default(type(oj).__name__) if template is None: logger.error(type(oj).__name__ + ' has no default template, run `oi config template` first') return None async def sync_oj(): # use thread logger.info("sync oj") # ID, Fetched, success result = [] for v in problems: result.append([v.id, VisitStatus.BEFORE, False]) def between_callback(*args): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(create_problem(*args)) loop.close() with Live(generate_table(result), auto_refresh=False) as live: tasks = [] for i in range(len(problems)): task = threading.Thread(target=between_callback, args=(result[i], problems[i], contest_id, template, oj, lambda:live.update(generate_table(result),refresh=True))) tasks.append(task) task.start() for t in tasks: # wait all task finished t.join() return config_folder.get_file_path(os.path.join('dist', type(oj).__name__, contest_id)) async def async_oj(): # use async/await logger.info("async oj") # ID, Fetched, success result = [] for v in problems: result.append([v.id, VisitStatus.BEFORE, False]) with Live(generate_table(result), auto_refresh=False) as live: tasks = [] for i in range(len(problems)): tasks.append(create_problem(result[i], problems[i], contest_id, template, oj,lambda:live.update(generate_table(result),refresh=True))) await asyncio.gather(*tasks) # wait all task finished live.update(generate_table(result), refresh=True) return config_folder.get_file_path(os.path.join('dist', type(oj).__name__, contest_id)) if oj.__class__.__name__ == 'AtCoder': return await sync_oj() elif oj.__class__.__name__ == 'Codeforces': return await async_oj() else: raise Exception(f'Un specific oj[{oj.__class__.__name__}] sync/async type')
@click.group() def contest(): """Contest relative(fetch,list,standing,detail)""" @contest.command() @click.argument('platform') @click.argument('contestid') def fetch(platform, contestid) -> None: asyncio.run(async_fetch(platform, contestid))
[docs]async def async_fetch(platform, contestid) -> None: """Fetch a contest(problems and testcases) PLATFORM e.g. AtCoder, Codeforces CONTESTID The id in the url, e.g. Codeforces(1122),AtCoder(abc230) """ am: AccountManager = Provider2().get(DI_ACCMAN) try: oj: BaseOj = OJManager.createOj(platform=platform, account=am.get_default_account(platform=platform), provider=Provider2()) except Exception as e: logger.exception(e) raise e await oj.init() try: problems = (await oj.async_get_contest_meta(contestid)).problems except (ReadTimeout, ConnectTimeout) as e: logger.error(f'Http Timeout[{type(e).__name__}]: {e.request.url}') return except Exception as e: logger.exception(e) return try: logger.debug(f"{contestid},{problems}") directory = await createDir(oj=oj, contest_id=contestid, problems=problems) console.print(f"[green bold] {directory} ") except Exception as e: logger.error(e) await oj.deinit()
@contest.command(name='list') @click.argument('platform') def list_command(platform: str): """list passed and recent contest PLATFORM e.g. AtCoder, Codeforces """ am: AccountManager = Provider2().get(DI_ACCMAN) try: oj: BaseOj = OJManager.createOj(platform=platform, account=am.get_default_account(platform=platform), provider=Provider2()) except Exception as e: logger.exception(e) raise e oj.print_contest_list() @contest.command() @click.argument('platform') @click.argument('contestid') def detail(platform, contestid) -> None: """Display problem set and your submit status of a specific contest" PLATFORM e.g. AtCoder, Codeforces CONTESTID The id in the url, e.g. Codeforces(1122),AtCoder(abc230) """ am: AccountManager = Provider2().get(DI_ACCMAN) try: oj: BaseOj = OJManager.createOj(platform=platform, account=am.get_default_account(platform=platform), provider=Provider2()) except Exception as e: logger.exception(e) raise e cm = oj.get_contest_meta(contestid) table = Table(title=f"Contest {cm.url}") table.add_column("ID", style="cyan", no_wrap=False) table.add_column("Name") table.add_column("Time") table.add_column("Memory") table.add_column("Passed") table.add_column("Url") for o in cm.problems: style = Style() if o.status == E_STATUS.AC: style = Style(bgcolor="dark_green") elif o.status == E_STATUS.ERROR: style = Style(bgcolor="dark_red") table.add_row(o.id, o.name, str(o.time_limit_msec / 1000) + "s", str(o.memory_limit_kb / 1000) + "mb", o.passed, o.url, style=style) console = Console() console.print(table) @contest.command() @click.argument('platform') @click.argument('contestid') def standing(platform, contestid) -> None: """Display your friends standing of a specific contest" PLATFORM e.g. AtCoder, Codeforces CONTESTID The id in the url, e.g. Codeforces(1122),AtCoder(abc230) """ am: AccountManager = Provider2().get(DI_ACCMAN) try: oj: BaseOj = OJManager.createOj(platform=platform, account=am.get_default_account(platform=platform), provider=Provider2()) except Exception as e: logger.exception(e) raise e oj.print_friends_standing(cid=contestid) @contest.command(name='open') @click.argument('platform') @click.argument('contestid') def open_browser(platform, contestid) -> None: def open_url_with_default_browser(url: str) -> None: if sys.platform == 'win32': os.startfile(url) elif sys.platform == 'darwin': subprocess.Popen(['open', url]) else: try: subprocess.Popen(['xdg-open', url]) except OSError: print("Please open a browser on: " + url) am: AccountManager = Provider2().get(DI_ACCMAN) try: oj: BaseOj = OJManager.createOj(platform=platform, account=am.get_default_account(platform=platform), provider=Provider2()) except Exception as e: logger.exception(e) raise e open_url_with_default_browser(oj.cid2url(contestid)) @contest.command() @click.argument('platform') @click.argument('contestid') def reg(platform, contestid) -> None: am: AccountManager = Provider2().get(DI_ACCMAN) try: oj: BaseOj = OJManager.createOj(platform=platform, account=am.get_default_account(platform=platform), provider=Provider2()) except Exception as e: logger.exception(e) raise e oj.reg_contest(contestid)