import asyncio
import logging
import os
from typing import Any, Tuple, AsyncIterator
from requests.exceptions import ReadTimeout, ConnectTimeout
from oi_cli2.cli.constant import APP_NAME, CIPHER_KEY, GREEN, DEFAULT
from oi_cli2.model.BaseOj import BaseOj
from oi_cli2.model.ParseProblemResult import ParsedProblemResult
from oi_cli2.model.LangKV import LangKV
from oi_cli2.model.Account import Account
from oi_cli2.model.ProblemMeta import ContestMeta, ProblemMeta, E_STATUS
from oi_cli2.model.Result import SubmissionResult
from oi_cli2.model.TestCase import TestCase
# from oi_cli2.utils.async2sync import iter_over_async
from oi_cli2.utils.enc import AESCipher
from codeforces_core.account import async_login, async_fetch_logged_in
from codeforces_core.contest_list import async_contest_list
from codeforces_core.contest_register import async_register, RegisterResultMsg
from codeforces_core.contest_standing import async_friends_standing
from codeforces_core.contest_meta import async_contest_meta, ProblemMeta as InnerProblemMeta
from codeforces_core.interfaces.AioHttpHelper import AioHttpHelperInterface
from codeforces_core.language import async_language
from codeforces_core.problem import async_problem
from codeforces_core.submit import async_submit, transform_submission, async_fetch_submission_page, SubmissionWSResult, SubmissionPageResult
from codeforces_core.url import pid2url, pid2split, problem_url_parse
from codeforces_core.websocket import create_contest_ws_task_yield
[docs]class Codeforces(BaseOj):
def __init__(self, http_util: AioHttpHelperInterface, logger: logging.Logger, account: Account) -> None:
super().__init__()
assert (account is not None)
self._base_url = 'https://codeforces.com/'
self.logger: logging.Logger = logger
self.account: Account = account
self.http = http_util
self.api_sub_logger: logging.Logger = logging.getLogger(f'{APP_NAME}.yxr-cf-core')
[docs] async def init(self) -> None:
await self.http.open_session()
[docs] async def deinit(self) -> None:
await self.http.close_session()
[docs] def pid2url(self, problem_id: str):
return self._base_url[:-1] + pid2url(problem_id)
[docs] def pid2file_path(self, problem_id: str):
contest_id, problem_key = pid2split(problem_id)
return os.path.join(contest_id, problem_key)
[docs] def problem_by_id(self, problem_id: str) -> ParsedProblemResult:
return self.async_2_sync_session_wrap(lambda: self.async_problem_by_id(problem_id))
[docs] async def async_problem_by_id(self, problem_id: str) -> ParsedProblemResult:
contest_id, problem_key = pid2split(problem_id)
self.logger.debug(f'{problem_id} => {contest_id}, {problem_key}')
result = await async_problem(http=self.http, contest_id=contest_id, level=problem_key)
return ParsedProblemResult(
status=ParsedProblemResult.Status.NOTVIS, # TODO for show progress
title=result.title,
test_cases=list(map(lambda x: TestCase(in_data=x.in_data, out_data=x.out_data), result.test_cases)),
id=problem_id,
oj=Codeforces.__name__,
description=result.description,
time_limit=result.time_limit,
mem_limit=result.mem_limit,
url=self.pid2url(problem_id),
html=result.html,
file_path=self.pid2file_path(problem_id))
[docs] def problem(self, problem: ProblemMeta) -> ParsedProblemResult:
return self.problem_by_id(problem.contest_id + problem.id)
[docs] async def async_problem(self, problem: ProblemMeta) -> ParsedProblemResult:
return await self.async_problem_by_id(problem.contest_id + problem.id)
[docs] def login_website(self, force=False) -> bool: # return successful
return self.async_2_sync_session_wrap(lambda: self.async_login_website(force=force))
# Force: true/false login whatever login before
# TODO 逻辑还是有点问题,要实现支持
# - 未登录 => 登录
# - 已登录 => 不操作
# 强制:
# - 未登录 => 登录
# - 已登录 => 取消cookies等 强制登录
[docs] async def async_login_website(self, force=False) -> bool: # return successful
if not force:
# try using cookies
self.logger.info(f"{GREEN}Checking Log in {DEFAULT}")
try:
if await self.async_is_login():
self.logger.info(f"{GREEN}{self.account.account} is Logged in {Codeforces.__name__}{DEFAULT}")
return True
except (ReadTimeout, ConnectTimeout) as e:
self.logger.error(f'Http Timeout[{type(e).__name__}]: {e.request.url}')
except Exception as e:
self.logger.exception(e)
try:
self.logger.debug(f"{GREEN}{self.account.account} Logining {Codeforces.__name__}{DEFAULT}")
return (await async_login(http=self.http,
handle=self.account.account,
password=AESCipher(CIPHER_KEY).decrypt(self.account.password))).success
except (ReadTimeout, ConnectTimeout) as e:
self.logger.error(f'Http Timeout[{type(e).__name__}]: {e.request.url}')
except Exception as e:
self.logger.exception(e)
return False
def _is_login(self) -> bool:
return self.async_2_sync_session_wrap(lambda: self.async_is_login())
[docs] async def async_is_login(self) -> bool:
ok, html_data = await async_fetch_logged_in(self.http)
return ok
[docs] def reg_contest(self, contest_id: str) -> bool:
return self.async_2_sync_session_wrap(lambda: self.async_reg_contest(contest_id))
[docs] async def async_reg_contest(self, contest_id: str) -> bool:
if not await self.async_login_website():
raise Exception('Login Failed')
result = await async_register(http=self.http, contest_id=contest_id)
return result.msg == RegisterResultMsg.AlreadyRegistered or result.msg == RegisterResultMsg.HaveBeenRegistered
[docs] def submit_code(self, problem_url: str, language_id: str, code_path: str) -> bool:
# https://codeforces.com/contest/1740/problem/G
contest_id, problem_key = problem_url_parse(problem_url)
sid = contest_id + problem_key
return self.async_2_sync_session_wrap(lambda: self.async_submit_code(sid, language_id, code_path))
# TODO move sid out as just Syntactic sugar
[docs] async def async_submit_code(self, sid: str, language_id: str, code_path: str) -> bool:
if not await self.async_login_website():
raise Exception('Login Failed')
contest_id, problem_key = pid2split(sid)
self.logger.debug(f'{contest_id},{problem_key}')
submit_id, resp = await async_submit(http=self.http,
contest_id=contest_id,
level=problem_key,
file_path=code_path,
lang_id=language_id,
logger=self.api_sub_logger)
self.logger.debug(f'submit_id = {submit_id}')
return bool(submit_id)
[docs] async def async_get_result_yield(self, problem_url: str, time_gap: float = 2) -> AsyncIterator[SubmissionResult]:
contest_id, problem_key = problem_url_parse(problem_url)
# TODO move more parse inside codeforces-core ? cf是出错中断形式,状态+数量
def page_result_transform(res: SubmissionPageResult) -> SubmissionResult:
self.logger.debug('page res:' + str(res))
cur_status = SubmissionResult.Status.PENDING
if res.verdict.startswith('Running'):
cur_status = SubmissionResult.Status.RUNNING
elif res.verdict.startswith('In queue'):
cur_status = SubmissionResult.Status.RUNNING
elif res.verdict.startswith('Accepted'):
cur_status = SubmissionResult.Status.AC
elif res.verdict.startswith('Pretests passed'):
cur_status = SubmissionResult.Status.AC
elif res.verdict.startswith('Wrong answer'):
cur_status = SubmissionResult.Status.WA
elif res.verdict.startswith('Time limit exceeded'):
cur_status = SubmissionResult.Status.TLE
elif res.verdict.startswith('Runtime error'): # Runtime error on pretest 2
cur_status = SubmissionResult.Status.RE
else:
self.logger.error('NOT HANDLE PAGE:' + str(res.verdict))
if res.url.startswith('/'):
res.url = self._base_url[:-1] + res.url
return SubmissionResult(id=res.id,
cur_status=cur_status,
time_note=res.time_ms + ' ms',
mem_note=str(int(res.mem_bytes) / 1000) + ' kb',
url=res.url,
msg_txt=res.verdict)
# TODO move more parse inside codeforces-core ?
def ws_result_transform(res: SubmissionWSResult) -> SubmissionResult:
cur_status = SubmissionResult.Status.PENDING
if res.msg == 'TESTING':
cur_status = SubmissionResult.Status.RUNNING
elif res.msg == 'OK':
cur_status = SubmissionResult.Status.AC
elif res.msg == 'WRONG_ANSWER':
cur_status = SubmissionResult.Status.WA
else:
self.logger.error('NOT HANDLE WS:' + str(res.msg))
msg_txt = str(res.testcases)
if cur_status in [SubmissionResult.Status.AC, SubmissionResult.Status.WA]:
msg_txt = f'{res.passed}/{res.testcases}'
return SubmissionResult(
id=str(res.submit_id),
cur_status=cur_status,
time_note=str(res.ms) + ' ms',
mem_note=str(int(res.mem) / 1000) + ' kb',
url=f'{self._base_url}contest/{res.contest_id}/submission/{res.submit_id}',
msg_txt=msg_txt,
)
# TODO visit page without ws first
results = await async_fetch_submission_page(http=self.http, problem_url=problem_url, logger=self.api_sub_logger)
fix_submit_id = ''
if len(results) > 0:
result = page_result_transform(results[0])
fix_submit_id = result.id
self.logger.debug(f"fix submit_id = {fix_submit_id}")
yield result
if result.cur_status not in [SubmissionResult.Status.PENDING, SubmissionResult.Status.RUNNING]:
return
self.logger.debug('after page result, enter ws result')
# return (end watch?, transform result)
def custom_handler(result: Any) -> Tuple[bool, SubmissionWSResult]:
parsed_data = transform_submission(result)
if fix_submit_id and fix_submit_id != parsed_data.contest_id: # submit id not match, dont end watch ws
return False, parsed_data
if parsed_data.msg != 'TESTING':
return True, parsed_data
return False, parsed_data
# TODO add timeout for ws
# TODO 可能有别人的? pc/cc?
async for wsresult in create_contest_ws_task_yield(http=self.http,
contest_id=contest_id,
ws_handler=custom_handler,
logger=self.api_sub_logger):
self.logger.debug('ws res:' + str(wsresult))
if fix_submit_id and str(wsresult.submit_id) != fix_submit_id:
self.logger.debug(f'[skip]fixed id not match! continue, fixed[{fix_submit_id}], ws[{wsresult.submit_id}]]')
continue
data = ws_result_transform(wsresult)
yield data
if data.cur_status not in [SubmissionResult.Status.PENDING, SubmissionResult.Status.RUNNING]:
return
results = await async_fetch_submission_page(http=self.http, problem_url=problem_url)
assert len(results) > 0
yield page_result_transform(results[0])
[docs] def get_language(self) -> LangKV:
return self.async_2_sync_session_wrap(lambda: self.async_get_language())
[docs] async def async_get_language(self) -> LangKV:
await self.async_login_website()
res = await async_language(self.http)
ret: LangKV = {}
for item in res:
ret[item.value] = item.text
return ret
[docs] @staticmethod
def support_contest() -> bool:
return True
[docs] def print_contest_list(self) -> bool:
return self.async_2_sync_session_wrap(lambda: self.async_print_contest_list())
[docs] async def async_print_contest_list(self) -> bool:
await self.async_login_website()
result = await async_contest_list(http=self.http)
from .contestList import printData
printData(result)
return True
[docs] def async_2_sync_session_wrap(self, fn):
async def task():
await self.http.open_session()
result = await fn()
await self.http.close_session()
return result
return asyncio.run(task())
[docs] def print_friends_standing(self, cid: str) -> None:
return self.async_2_sync_session_wrap(lambda: self.async_print_friends_standing(cid))
[docs] async def async_print_friends_standing(self, cid: str) -> None:
result = await async_friends_standing(http=self.http, contest_id=cid)
from .standing import printData
printData(result, title=f"Friends standing {result.url}", handle=self.account.account)
[docs] def cid2url(self, cid: str) -> str:
return f'{self._base_url}contests/{cid}'