"""Query GitHub for external (non-bot) PRs merged into pytorch/pytorch and
upload per-period contribution counts to S3."""

from __future__ import annotations

import argparse
import datetime
import json
import os
import time
import urllib.parse
from typing import Any, Callable, cast, Dict, List
from urllib.error import HTTPError
from urllib.request import Request, urlopen

from tools.stats.upload_stats_lib import upload_to_s3


# bot accounts whose PRs should not be counted as external contributions
FILTER_OUT_USERS = {
    "pytorchmergebot",
    "facebook-github-bot",
    "pytorch-bot[bot]",
    "pytorchbot",
    "pytorchupdatebot",
    "dependabot[bot]",
}


def _fetch_url(
    url: str,
    headers: dict[str, str],
    data: dict[str, Any] | None = None,
    method: str | None = None,
    reader: Callable[[Any], Any] = lambda x: x.read(),
) -> Any:
    # authenticate against the GitHub API when a token is available
    token = os.environ.get("GITHUB_TOKEN")
    if token is not None and url.startswith("https://api.github.com/"):
        headers["Authorization"] = f"token {token}"
    data_ = json.dumps(data).encode() if data is not None else None
    try:
        with urlopen(Request(url, headers=headers, data=data_, method=method)) as conn:
            return reader(conn)
    except HTTPError as err:
        print(err.reason)
        print(err.headers)
        if err.code == 403 and all(
            key in err.headers for key in ["X-RateLimit-Limit", "X-RateLimit-Used"]
        ):
            print(
                f"Rate limit exceeded: {err.headers['X-RateLimit-Used']}/{err.headers['X-RateLimit-Limit']}"
            )
        raise


def fetch_json(
    url: str,
    params: dict[str, Any] | None = None,
    data: dict[str, Any] | None = None,
) -> list[dict[str, Any]]:
    headers = {"Accept": "application/vnd.github.v3+json"}
    if params is not None and len(params) > 0:
        url += "?" + "&".join(
            f"{name}={urllib.parse.quote(str(val))}" for name, val in params.items()
        )
    return cast(
        List[Dict[str, Any]],
        _fetch_url(url, headers=headers, data=data, reader=json.load),
    )


def get_external_pr_data(
    start_date: datetime.date, end_date: datetime.date, period_length: int = 1
) -> list[dict[str, Any]]:
    """Count merged external ("open source") PRs per period between start_date
    and end_date.  Returns one record per period with cumulative totals."""
    pr_info = []
    period_begin_date = start_date

    # counts are cumulative across periods: each emitted record reports the
    # running totals up to and including that period
    pr_count = 0
    users: set[str] = set()
    while period_begin_date < end_date:
        period_end_date = period_begin_date + datetime.timedelta(days=period_length - 1)
        page = 1
        # the search API paginates results; keep fetching until a page comes back empty
        items: list[dict[str, Any]] = []
        while len(items) > 0 or page == 1:
            response = cast(
                Dict[str, Any],
                fetch_json(
                    "https://api.github.com/search/issues",
                    params={
                        "q": "repo:pytorch/pytorch is:pr is:closed "
                        'label:"open source" label:Merged -label:Reverted '
                        f"closed:{period_begin_date}..{period_end_date}",
                        "per_page": "100",
                        "page": str(page),
                    },
                ),
            )
            items = response["items"]
            for item in items:
                u = item["user"]["login"]
                if u not in FILTER_OUT_USERS:
                    pr_count += 1
                    users.add(u)
            page += 1

        pr_info.append(
            {
                "date": str(period_begin_date),
                "pr_count": pr_count,
                "user_count": len(users),
                "users": list(users),
            }
        )
        period_begin_date = period_end_date + datetime.timedelta(days=1)
    return pr_info


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Upload external contribution stats to s3"
    )
    parser.add_argument(
        "--startDate",
        type=datetime.date.fromisoformat,
        required=True,
        help="the first date to upload data for, in ISO date format (e.g. YYYY-MM-DD).",
    )
    parser.add_argument(
        "--length",
        type=int,
        required=False,
        help="the number of days to upload data for. Default is 1.",
        default=1,
    )
    parser.add_argument(
        "--period-length",
        type=int,
        required=False,
        help="the number of days to group data for. Default is 1.",
        default=1,
    )
    args = parser.parse_args()
    for i in range(args.length):
        startdate = args.startDate + datetime.timedelta(days=i)
        data = get_external_pr_data(
            startdate,
            startdate + datetime.timedelta(days=args.period_length),
            period_length=args.period_length,
        )
        for pr_info in data:
            # sometimes the "users" field does not get added, so verify it is
            # present and well-formed before uploading
            assert "users" in pr_info
            assert isinstance(pr_info["users"], list)
        print(f"uploading the following data: \n {data}")
        upload_to_s3(
            bucket_name="torchci-contribution-data",
            key=f"external_contribution_counts/{startdate}",
            docs=data,
        )
        # sleep between iterations to get around GitHub search API rate limiting
        time.sleep(10)