Skip Navigation

Search

Generic Typing with Iterable or Collection: Should you use it?

I often find myself defining function args with list[SomeClass] type and think "do I really care that it's a list? No, tuple or Generator is fine, too". I then tend to use Iterable[SomeClass] or Collection[SomeClass]. But when it comes to str, I really don't like that solution, because if you have this function:

```python
def foo(bar: Collection[str]) -> None:
    pass
```

Then calling foo("hello") is fine, too, because "hello" is itself a collection of strings (its characters, each a string of length 1). That call would not be accepted if I had just used list[str] in the first place. What would you do in a situation like this?

12

...

The instance where I was using it changed its rules to prevent bots from posting in it, and I didn't care enough to search for another instance.

https://lemm.ee/c/issue_tracker?dataType=Post&page=1&sort=Active

config_template.py

```py
LEMMY_INSTANCE_URL = ""
LEMMY_COMMUNITY_NAME = ""
LEMMY_USERNAME = ""
LEMMY_PASSWORD = ""
GITHUB_API_BASE = "https://api.github.com"
GITHUB_URL_BASE = "https://github.com"
REPOSITORIES = ["LemmyNet/lemmy", "LemmyNet/lemmy-ui"]
DB_FILE = "lemmy_github.db"
DELAY = 1
MAX_BACKOFF_TIME = 300
PERSONAL_ACCESS_TOKEN = ""
```

github_lemmy_issue_reposter.py ```py import backoff import datetime import logging import requests import schedule import sqlite3 import time

from config import * from pythorhead import Lemmy from typing import Any, Dict, Generator, List, Optional, Tuple, Callable, TypeVar

# Generic return type used in the annotations of the ``handle_errors`` decorator.
T = TypeVar('T')

# Verbose alternative log format, kept for reference only. It used to be a bare
# string expression statement, which is evaluated and thrown away at import time.
# "[%(levelname)s]:%(asctime)s:%(name)s [%(filename)s:%(lineno)s - %(funcName)s()] %(message)s"

# Active format: message text only. Records go to "debug.log" (opened with
# mode="w", so the file is truncated on every start) and to a stream handler.
FORMAT = "%(message)s"
logging.basicConfig(
    level=logging.INFO,
    format=FORMAT,
    handlers=[logging.FileHandler("debug.log", mode="w"), logging.StreamHandler()],
)

def on_giveup(details: Dict[str, int]) -> None:
    """Report that a retried call has been abandoned.

    Passed as the ``on_giveup`` argument of the ``backoff.on_exception``
    decorators in this file.

    Args:
        details: Mapping whose ``"tries"`` entry is written to the log.
    """
    logging.error(
        f"Failed to fetch issues after {details['tries']} attempts",
        exc_info=True,
    )

def handle_errors(message: Optional[str] = None) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """Build a decorator that logs any exception of the wrapped call and re-raises it.

    Args:
        message: Optional context text placed in front of the log entry.

    Returns:
        A decorator. The decorated function returns what the original returns
        and propagates the original exception after it has been logged.
    """
    # Function-scope import: keeps this block independent of the file-level imports.
    import functools

    def decorator(function: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(function)  # preserve __name__ / __doc__ of the wrapped function
        def wrapper(*args: Any, **kwargs: Any) -> T:
            try:
                return function(*args, **kwargs)
            except Exception as e:
                # Functions have no ``name`` attribute; reading it here raised
                # AttributeError and replaced the error that should be reported.
                if message:
                    logging.exception(f"{message} - Error in {function.__name__}:\n{e}")
                else:
                    logging.exception(f"Error in {function.__name__}:\n{e}")
                raise

        return wrapper

    return decorator

class GitHubIssue:
    """A GitHub issue reduced to the fields needed to mirror it as a Lemmy post."""

    def __init__(self, issue_dict: dict[str, Any], github_repo: str) -> None:
        """Populate the issue from one GitHub API issue object.

        Args:
            issue_dict: Issue object; the keys ``html_url``, ``state``,
                ``title``, ``number``, ``body``, ``user`` (with ``login`` and
                ``html_url``) and ``updated_at`` are read.
            github_repo: Repository name. A value containing ``"lemmy-ui"``
                selects the ``[UI]`` tag, anything else ``[BE]``.

        Raises:
            Exception: Whatever the field extraction raised (for example
                ``KeyError`` or ``ValueError``), after it has been logged.
        """
        try:
            self.url = issue_dict["html_url"]
            logging.info(f"Creating issue {self.url}")
            self.state = issue_dict["state"]
            self.state_fmt = "[Closed]" if issue_dict["state"] == "closed" else ""
            self.repo_abbr = "[UI]" if "lemmy-ui" in github_repo else "[BE]"
            self.title = f"{self.state_fmt}{self.repo_abbr} {issue_dict['title']} #{issue_dict['number']}"
            self.title = self.title[:200]  # title is cut to 200 characters
            self.body = issue_dict["body"]
            if self.body is not None:
                self.body = self.body[:30000]  # body is cut to 30000 characters
            self.user = issue_dict["user"]["login"]
            self.user_url = issue_dict["user"]["html_url"]
            self.updated_at = datetime.datetime.strptime(issue_dict["updated_at"], '%Y-%m-%dT%H:%M:%SZ')
        except Exception as e:
            # Attributes after the failing line were never assigned. Reading them
            # directly raised AttributeError inside this handler and hid the
            # real error, so fall back to None for anything that is missing.
            def seen(attribute: str) -> Any:
                return getattr(self, attribute, None)

            log_message: str = (
                f"Formatted issue:\n"
                f" - Repo: {github_repo}\n"
                f" - Issue State: {seen('state')}\n"
                f" - Repo Abbreviation: {seen('repo_abbr')}\n"
                f" - Title: {seen('title')}\n"
                f" - URL: {seen('url')}\n"
                f" - User: {seen('user')}\n"
                f" - User URL: {seen('user_url')}\n"
                f" - Updated At: {seen('updated_at')}\n"
            )
            logging.exception(log_message)
            logging.exception(e)
            # A partially initialised issue cannot be posted; surface the cause.
            raise

    @property
    def formatted_body(self) -> Optional[str]:
        """Return the body as a Markdown quote with an attribution line.

        Returns ``None`` when the issue has no body. If formatting fails, the
        error is logged and the unformatted body is returned.
        """
        formatted_body: Optional[str] = self.body
        try:
            if self.body is not None:
                formatted_body = self.body.replace("\n", "\n> ")
                formatted_body = f"> {formatted_body}\n> \n> Originally posted by {self.user} in #{self.number}"
        except Exception as e:
            logging.exception(f"Error formatting body for {self.url}\n{e}")
        return formatted_body

    @property
    def number(self) -> int:
        """Return the issue number, taken from the last path segment of ``url``."""
        return int(self.url.split("/")[-1])

class GitHubComment:
    """A GitHub issue comment reduced to the fields needed to mirror it on Lemmy."""

    def __init__(self, comment_dict: dict[str, Any], issue_number: int) -> None:
        """Populate the comment from one GitHub API comment object.

        Args:
            comment_dict: Comment object; the keys ``id``, ``body``, ``user``
                (with ``login`` and ``html_url``) and ``html_url`` are read.
            issue_number: Number of the issue the comment belongs to.
        """
        self.id = comment_dict["id"]
        self.body = comment_dict["body"]
        self.user = comment_dict["user"]["login"]
        self.user_url = comment_dict["user"]["html_url"]
        self.url = comment_dict["html_url"]
        self.issue_number = issue_number

    @property
    def formatted_comment(self) -> str:
        """Return the comment body as a Markdown quote with an attribution line."""
        # A missing body is quoted as an empty line rather than raising on None.
        body_text: str = self.body or ""
        quoted: str = body_text.replace("\n", "\n> ")
        return f"> {quoted}\n> \n> Originally posted by {self.user} in #{self.issue_number}"

@handle_errors("Error initializing database")
def initialize_database() -> sqlite3.Connection:
    """Open ``DB_FILE`` and create the ``posts``, ``comments`` and ``last_updated`` tables if missing.

    Returns:
        The open connection; closing it is left to the caller.
    """
    logging.info("Initializing database")
    conn: sqlite3.Connection = sqlite3.connect(DB_FILE)
    cursor: sqlite3.Cursor = conn.cursor()
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS posts (
            issue_number INTEGER PRIMARY KEY,
            lemmy_post_id INTEGER NOT NULL UNIQUE,
            issue_title TEXT,
            issue_body TEXT,
            updated_at TIMESTAMP DEFAULT NULL
        )
        """
    )
    # The comma after ``comment_body TEXT`` was missing. SQLite accepts
    # multi-word type names, so ``updated_at TIMESTAMP`` was read as part of the
    # type of ``comment_body`` and no ``updated_at`` column was created.
    # NOTE: IF NOT EXISTS leaves a table created by an earlier run unchanged.
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS comments (
            github_comment_id INTEGER PRIMARY KEY,
            lemmy_comment_id INTEGER NOT NULL UNIQUE,
            comment_user TEXT,
            comment_body TEXT,
            updated_at TIMESTAMP DEFAULT NULL
        )
        """
    )
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS last_updated (
            id INTEGER PRIMARY KEY,
            last_updated_time TIMESTAMP
        );
        """
    )
    conn.commit()
    return conn

def get_last_updated_time() -> str:
    """Return the stored timestamp of the previous run.

    Returns:
        The ``last_updated_time`` value of the row with ``id = 1``. When that
        row does not exist yet (or holds NULL), ``"1970-01-01T00:00:00Z"`` is
        returned so that a first run asks for everything.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT last_updated_time FROM last_updated WHERE id = 1")
        row = cursor.fetchone()
    finally:
        # Close the connection even when the query fails.
        conn.close()

    # ``fetchone()`` returns None on a fresh database; indexing it raised
    # TypeError before the first timestamp could ever be written.
    if row is None or row[0] is None:
        return "1970-01-01T00:00:00Z"
    last_updated_time: str = row[0]
    return last_updated_time

def update_last_updated_time() -> None:
    """Store the current UTC time in row ``id = 1`` of ``last_updated``.

    The row is updated in place; when the update touches no row, it is
    inserted instead.
    """
    db = sqlite3.connect(DB_FILE)
    cur = db.cursor()
    params = (datetime.datetime.utcnow().isoformat(),)

    cur.execute("UPDATE last_updated SET last_updated_time = ? WHERE id = 1", params)
    row_missing = cur.rowcount == 0
    if row_missing:
        cur.execute("INSERT INTO last_updated (id, last_updated_time) VALUES (1, ?)", params)

    db.commit()
    db.close()
    logging.info("Updated last updated time")

def update_post_time(post_id: int, updated_at: datetime.datetime) -> None:
    """Write ``updated_at`` into the ``posts`` row whose ``lemmy_post_id`` is ``post_id``.

    Args:
        post_id: Lemmy post id that selects the row.
        updated_at: Timestamp, stored as ``YYYY-MM-DD HH:MM:SS`` text.
    """
    stamp = updated_at.strftime('%Y-%m-%d %H:%M:%S')
    statement = "UPDATE posts SET updated_at = ? WHERE lemmy_post_id = ?"

    db: sqlite3.Connection = sqlite3.connect(DB_FILE)
    db.cursor().execute(statement, (stamp, post_id))
    db.commit()
    db.close()

def check_updated_at(issue_number: int) -> Optional[Tuple[int, str, str, Optional[str]]]:
    """Look up the stored post for a GitHub issue.

    Args:
        issue_number: Issue number used as the lookup key in ``posts``.

    Returns:
        ``(lemmy_post_id, issue_title, issue_body, updated_at)`` for the
        matching row, or ``None`` when no row exists.
    """
    logging.info(f"Checking last post update for {issue_number}")
    query = "SELECT lemmy_post_id, issue_title, issue_body, updated_at FROM posts WHERE issue_number = ?"

    db: sqlite3.Connection = sqlite3.connect(DB_FILE)
    cur: sqlite3.Cursor = db.cursor()
    cur.execute(query, (issue_number,))
    row = cur.fetchone()
    db.close()

    if row is not None:
        logging.info(f"Found post for {issue_number}")
        return row
    logging.info(f"No post found for {issue_number}")
    return None

@handle_errors("Error initializing Lemmy instance")
def initialize_lemmy_instance() -> Lemmy:
    """Create a ``Lemmy`` client for ``LEMMY_INSTANCE_URL`` and log in.

    Returns:
        The client after ``log_in`` has been called with the configured
        user name and password.
    """
    logging.info("Initializing Lemmy instance")
    client = Lemmy(LEMMY_INSTANCE_URL)
    logging.info(f"Initialized Lemmy instance in {LEMMY_INSTANCE_URL}")

    client.log_in(LEMMY_USERNAME, LEMMY_PASSWORD)
    logging.info(f"Logged in to Lemmy instance with user {LEMMY_USERNAME}")

    return client

@backoff.on_exception(
    backoff.expo,
    (requests.exceptions.RequestException, TypeError),
    max_time=MAX_BACKOFF_TIME,
    on_giveup=on_giveup,
)
def fetch_github_data(url: str) -> List[Dict[str, Any]]:
    """GET ``url`` from the GitHub API and return the decoded JSON body.

    Consecutive requests are spaced at least ``DELAY`` seconds apart. The
    decorator retries on request errors with exponential backoff.

    Args:
        url: Full request URL.

    Returns:
        The parsed JSON response.

    Raises:
        requests.exceptions.RequestException: On transport errors, timeouts
            and HTTP error statuses, once the retries are exhausted.
    """
    global LAST_REQUEST_TIME
    try:
        headers = {
            "Accept": "application/vnd.github+json",
            "X-GitHub-Api-Version": "2022-11-28",
        }
        # The config template ships an empty token; do not send "Bearer "
        # with nothing after it.
        if PERSONAL_ACCESS_TOKEN:
            headers["Authorization"] = f"Bearer {PERSONAL_ACCESS_TOKEN}"

        # LAST_REQUEST_TIME has no visible assignment before the first call,
        # so reading it directly raised NameError. Treat "never" as 0.0.
        previous_request_time = globals().get("LAST_REQUEST_TIME", 0.0)
        time_elapsed = time.time() - previous_request_time
        required_delay = max(0, DELAY - time_elapsed)
        time.sleep(required_delay)

        # A timeout keeps a stalled connection from blocking the run forever;
        # the resulting Timeout is a RequestException and is retried.
        response = requests.get(url, headers=headers, timeout=30)
        LAST_REQUEST_TIME = time.time()

        # Error responses carry a JSON object such as {"message": ...}.
        # Returning that as data made callers iterate over its keys, and a
        # never-empty error page keeps the pagination loop running.
        response.raise_for_status()

        logging.info(f"Fetched data from {url}")
        res: List[Dict[str, Any]] = response.json()
        return res
    except requests.exceptions.RequestException as e:
        logging.exception(f"Error fetching data from {url}\n{e}")
        raise

def check_existing_post(issue_number: str) -> Optional[int]:
    """Return the Lemmy post id stored for ``issue_number``.

    Args:
        issue_number: Issue number used as the lookup key in ``posts``.

    Returns:
        The ``lemmy_post_id`` of the matching row, or ``None`` when there is
        no such row.
    """
    conn: sqlite3.Connection = sqlite3.connect(DB_FILE)
    try:
        cursor: sqlite3.Cursor = conn.cursor()
        SQL = "SELECT lemmy_post_id FROM posts WHERE issue_number=?"
        cursor.execute(SQL, (issue_number,))
        post_id: Optional[tuple[int]] = cursor.fetchone()
    finally:
        # The connection was never closed before; release it on every path.
        conn.close()

    if post_id:
        return post_id[0]
    return None

def insert_post_to_db(issue: GitHubIssue, lemmy_post_id: Optional[int]) -> None:
    """Insert a row for a newly created Lemmy post into ``posts``.

    Args:
        issue: Source issue; its number, title, formatted body and update
            time are stored.
        lemmy_post_id: Id of the Lemmy post created for the issue.

    Raises:
        sqlite3.Error: When the insert fails; the error is logged first.
    """
    try:
        conn: sqlite3.Connection = sqlite3.connect(DB_FILE)
        try:
            cursor: sqlite3.Cursor = conn.cursor()
            SQL = "INSERT INTO posts (issue_number, lemmy_post_id, issue_title, issue_body, updated_at) VALUES (?, ?, ?, ?, ?)"
            # Store the timestamp in the same explicit text format that
            # update_post_time writes and has_enough_time_passed parses,
            # rather than relying on sqlite3's implicit datetime adapter.
            updated_at_text = issue.updated_at.strftime('%Y-%m-%d %H:%M:%S')
            cursor.execute(SQL, (issue.number, lemmy_post_id, issue.title, issue.formatted_body, updated_at_text))
            conn.commit()
        finally:
            # The connection was never closed before; release it on every path.
            conn.close()
        logging.info(f"Inserted new Lemmy post {lemmy_post_id} into the database")
    except sqlite3.Error as e:
        logging.exception(f"Error inserting post into the database for issue {issue.title} with url {issue.url}\n{e}")
        raise

def insert_comment_to_database(cursor: sqlite3.Cursor, github_comment_id: int, lemmy_comment_id: int, comment: GitHubComment) -> None:
    """Record a mirrored comment in the ``comments`` table.

    Any error raised by the insert is logged and not propagated. The
    statement runs on the caller's cursor; nothing is committed here.

    Args:
        cursor: Cursor on which the insert is executed.
        github_comment_id: Id of the comment on GitHub.
        lemmy_comment_id: Id of the comment created on Lemmy.
        comment: Comment whose user and formatted text are stored.
    """
    SQL = "INSERT INTO comments (github_comment_id, lemmy_comment_id, comment_user, comment_body) VALUES (?, ?, ?, ?)"
    try:
        row = (github_comment_id, lemmy_comment_id, comment.user, comment.formatted_comment,)
        cursor.execute(SQL, row)
        logging.info(f"Inserted comment {github_comment_id} into the database")
    except Exception as e:
        logging.exception(f"Error encountered while inserting comment {github_comment_id} to database\n{e}")

@backoff.on_exception(
    backoff.expo,
    (requests.exceptions.RequestException, TypeError),
    max_time=MAX_BACKOFF_TIME,
    on_giveup=on_giveup,
)
def create_lemmy_post(lemmy: Any, community_id: int, issue: GitHubIssue) -> Optional[int]:
    """Create a Lemmy post for ``issue`` and return the new post id.

    Args:
        lemmy: Client whose ``post.create`` is called.
        community_id: Target community.
        issue: Issue supplying title, URL and (unformatted) body.

    Returns:
        The ``id`` found under ``post_view`` / ``post`` in the response.
    """
    response = lemmy.post.create(community_id, issue.title, url=issue.url, body=issue.body)
    lemmy_post_id: Optional[int] = response["post_view"]["post"]["id"]

    lemmy_url = f"{LEMMY_INSTANCE_URL}/post/{lemmy_post_id}"
    logging.info(f"Posted issue {lemmy_url}")
    return lemmy_post_id

@backoff.on_exception(
    backoff.expo,
    (requests.exceptions.RequestException, TypeError),
    max_time=MAX_BACKOFF_TIME,
    on_giveup=on_giveup,
)
def create_lemmy_comment(lemmy: Any, post_id: Optional[int], comment: GitHubComment) -> Optional[int]:
    """Create a Lemmy comment under ``post_id`` and return its id.

    Args:
        lemmy: Client whose ``comment.create`` is called.
        post_id: Lemmy post to comment on; a falsy value skips the call.
        comment: Comment whose formatted text is posted.

    Returns:
        The ``id`` found under ``comment_view`` / ``comment`` in the
        response, or ``None`` when ``post_id`` is falsy.
    """
    logging.info(f"Creating new Lemmy comment in {LEMMY_INSTANCE_URL}/post/{post_id}")

    if post_id:
        response = lemmy.comment.create(post_id, comment.formatted_comment)
        lemmy_comment_id: int = response["comment_view"]["comment"]["id"]
        logging.info(f"Successfully created Lemmy comment {LEMMY_INSTANCE_URL}/comment/{lemmy_comment_id}")
        return lemmy_comment_id

    logging.warning("Post ID is empty. Skipping comment creation")
    return None

def get_total_issues(github_repo: str) -> int:
    """Return the ``open_issues_count`` reported for a repository.

    Args:
        github_repo: Repository in ``owner/name`` form.

    Returns:
        The value of ``open_issues_count`` in the repository object.
    """
    # Use the configured API base like fetch_issues and process_comments do,
    # instead of a second hard-coded copy of the host name.
    url: str = f"{GITHUB_API_BASE}/repos/{github_repo}"
    # This endpoint is indexed by key below, so the payload is a JSON object
    # even though fetch_github_data is annotated as returning a list.
    data: Any = fetch_github_data(url)
    total_issues: int = data["open_issues_count"]
    return total_issues

def fetch_issues(github_repo: str, last_updated_time: str) -> Generator[Dict[str, Any], None, None]:
    """Yield every issue of a repository updated since ``last_updated_time``.

    Pages of 100 issues are requested one after another, starting at page 1,
    until a page comes back empty.

    Args:
        github_repo: Repository in ``owner/name`` form.
        last_updated_time: Value placed in the ``since`` query parameter.

    Yields:
        One issue object per item of each fetched page.
    """
    per_page = 100
    issues_url = (f"{GITHUB_API_BASE}/repos/{github_repo}/issues?state=all&since={last_updated_time}&per_page={per_page}")

    page = 1
    batch: List[Dict[str, Any]] = fetch_github_data(f"{issues_url}&page={page}")
    while batch:
        yield from batch
        page += 1
        batch = fetch_github_data(f"{issues_url}&page={page}")

@backoff.on_exception(
    backoff.expo,
    (requests.exceptions.RequestException, TypeError),
    max_time=MAX_BACKOFF_TIME,
    on_giveup=on_giveup,
)
def edit_lemmy_post(lemmy: Any, lemmy_post_id: int, issue: GitHubIssue) -> None:
    """Overwrite name, URL and body of an existing Lemmy post with the issue's values.

    Args:
        lemmy: Client whose ``post.edit`` is called.
        lemmy_post_id: Post to edit.
        issue: Issue supplying the new title, URL and (unformatted) body.
    """
    changes = {"name": issue.title, "url": issue.url, "body": issue.body}
    lemmy.post.edit(lemmy_post_id, **changes)

def process_issues(lemmy: Any, community_id: int, github_repo: str) -> None:
    """Mirror every issue of ``github_repo`` updated since the stored timestamp.

    The stored timestamp is read first and then replaced with the current
    time before any issue is fetched.

    NOTE(review): the timestamp lives in one row (``id = 1``) with no
    per-repository key, and ``main`` calls this once per entry of
    ``REPOSITORIES``. A later repository therefore appears to read the time
    just written for the previous one - confirm whether that is intended.

    Args:
        lemmy: Client passed through to ``process_issue``.
        community_id: Target community passed through to ``process_issue``.
        github_repo: Repository in ``owner/name`` form.
    """
    since = get_last_updated_time()
    update_last_updated_time()

    pending = fetch_issues(github_repo, since)
    for issue_dict in pending:
        process_issue(lemmy, community_id, github_repo, issue_dict)

# process_issue: mirror one GitHub issue object to Lemmy.
#
# Builds a GitHubIssue from issue_dict and looks up an existing post with
# check_updated_at(issue.number).
#   - No stored post (None): create_new_lemmy_post is called.
#   - Stored post: the row is unpacked into lemmy_post_id, existing_title,
#     existing_body and updated_at. When updated_at is None or
#     has_enough_time_passed(updated_at, issue.updated_at) is true,
#     update_issue_if_needed is called; process_comments and update_post_time
#     follow.
# NOTE(review): the original indentation is lost in this listing, so it cannot
# be told whether process_comments / update_post_time sit inside that condition
# or run for every stored post - confirm against the real file before editing.
def process_issue(lemmy: Any, community_id: int, github_repo: str, issue_dict: dict[str, Any]) -> None: issue: GitHubIssue = GitHubIssue(issue_dict, github_repo) res: Optional[Tuple[int, str, str, Optional[str]]] = check_updated_at(issue.number)

if res is None: create_new_lemmy_post(lemmy, community_id, github_repo, issue) else: lemmy_post_id, existing_title, existing_body, updated_at = res if updated_at is None or has_enough_time_passed(updated_at, issue.updated_at): update_issue_if_needed(lemmy, lemmy_post_id, existing_title, existing_body, issue) process_comments(lemmy, lemmy_post_id, github_repo, issue) update_post_time(lemmy_post_id, issue.updated_at)

def has_enough_time_passed(old_updated_at_str: str, new_updated_at: datetime.datetime) -> bool:
    """Tell whether the new timestamp is at least two hours after the stored one.

    Args:
        old_updated_at_str: Stored timestamp as ``YYYY-MM-DD HH:MM:SS`` text.
        new_updated_at: Timestamp to compare against it.

    Returns:
        ``True`` when ``new_updated_at`` minus the stored time is two hours
        or more.
    """
    minimum_gap = datetime.timedelta(hours=2)
    previous = datetime.datetime.strptime(old_updated_at_str, '%Y-%m-%d %H:%M:%S')
    return (new_updated_at - previous) >= minimum_gap

def update_issue_if_needed(lemmy: Any, lemmy_post_id: int, existing_title: str, existing_body: str, issue: GitHubIssue) -> None:
    """Edit the Lemmy post when its stored title or body differs from the issue.

    Args:
        lemmy: Client passed through to ``edit_lemmy_post``.
        lemmy_post_id: Post to edit.
        existing_title: Title stored for the post.
        existing_body: Body stored for the post.
        issue: Current state of the issue.
    """
    title_unchanged = existing_title == issue.title
    if title_unchanged and existing_body == issue.formatted_body:
        return
    edit_lemmy_post(lemmy, lemmy_post_id, issue)

def create_new_lemmy_post(lemmy: Any, community_id: int, github_repo: str, issue: GitHubIssue) -> None:
    """Post an issue to Lemmy, record the post, then mirror its comments.

    Args:
        lemmy: Client used for posting.
        community_id: Target community.
        github_repo: Repository in ``owner/name`` form.
        issue: Issue to mirror.
    """
    lemmy_post_id: Optional[int] = post_issue_to_lemmy(lemmy, community_id, issue)

    # post_issue_to_lemmy returns None when posting failed. Inserting that id
    # violates the NOT NULL constraint on posts.lemmy_post_id, and the raised
    # error stopped the remaining issues of the repository from being handled.
    if lemmy_post_id is None:
        logging.warning(f"No Lemmy post was created for {issue.url}. Skipping database insert and comments")
        return

    insert_post_to_db(issue, lemmy_post_id)
    process_comments(lemmy, lemmy_post_id, github_repo, issue)

def post_issue_to_lemmy(lemmy: Any, community_id: int, issue: GitHubIssue) -> Optional[int]:
    """Try to create the Lemmy post for ``issue``.

    Args:
        lemmy: Client passed through to ``create_lemmy_post``.
        community_id: Target community.
        issue: Issue to post.

    Returns:
        The new post id, or ``None`` when creation raised; the error is
        logged and not propagated.
    """
    try:
        logging.info(f"Start posting issue {issue.title} to community {community_id}")
        return create_lemmy_post(lemmy, community_id, issue)
    except Exception as e:
        logging.exception(f"Error posting issue {issue.title} to community {community_id}\n{e}")
    return None

def process_comments(lemmy: Any, post_id: Optional[int], github_repo: str, issue: GitHubIssue) -> None:
    """Fetch the comments of ``issue`` and hand each one to ``process_comment``.

    Items of the fetched payload that are plain strings are skipped with a
    warning. Any exception is logged and not propagated.

    Args:
        lemmy: Client passed through to ``process_comment``.
        post_id: Lemmy post the comments belong to.
        github_repo: Repository in ``owner/name`` form.
        issue: Issue whose comments are fetched.
    """
    try:
        logging.info(f"Posting comments from issue #{issue.number} to Lemmy post {LEMMY_INSTANCE_URL}/post/{post_id}")
        comments_url: str = f"{GITHUB_API_BASE}/repos/{github_repo}/issues/{issue.number}/comments"
        fetched = fetch_github_data(comments_url)
        for comment_data in fetched:
            if not isinstance(comment_data, str):
                process_comment(lemmy, github_repo, comment_data, post_id, issue.number)
            else:
                logging.warning(f"Skipping comment {comment_data}")
    except Exception as e:
        logging.exception(f"Error posting comments to lemmy post {post_id}\n{e}")

def process_comment(lemmy: Any, github_repo: str, comment_data: Dict[str, Any], post_id: Optional[int], issue_number: int) -> None:
    """Mirror one GitHub comment to Lemmy unless it is already recorded.

    Args:
        lemmy: Client used for posting.
        github_repo: Repository in ``owner/name`` form.
        comment_data: GitHub API comment object.
        post_id: Lemmy post the comment belongs to.
        issue_number: Number of the issue the comment belongs to.
    """
    conn: sqlite3.Connection = sqlite3.connect(DB_FILE)
    try:
        cursor: sqlite3.Cursor = conn.cursor()
        comment = GitHubComment(comment_data, issue_number)

        existing_comment_id: Optional[int] = get_existing_comment_id(cursor, comment.id)
        if existing_comment_id:
            logging.info(f"Skipping existing comment with GitHub comment ID: {comment.id}")
            return

        post_comment_to_lemmy(cursor, lemmy, github_repo, comment, post_id, issue_number)
        conn.commit()
    finally:
        # One connection is opened per comment and was never closed, neither
        # on the early return nor after the commit.
        conn.close()

def post_comment_to_lemmy(cursor: sqlite3.Cursor, lemmy: Any, github_repo: str, comment: GitHubComment, post_id: Optional[int], issue_number: int) -> None:
    """Create the Lemmy comment for ``comment`` and record it in the database.

    Args:
        cursor: Cursor on which the record is inserted.
        lemmy: Client used for posting.
        github_repo: Repository in ``owner/name`` form.
        comment: Comment to mirror.
        post_id: Lemmy post the comment belongs to.
        issue_number: Number of the issue the comment belongs to.
    """
    lemmy_post_url = f"{LEMMY_INSTANCE_URL}/post/{post_id}"
    comment_url = f"{GITHUB_URL_BASE}/{github_repo}/issues/{issue_number}#issuecomment-{comment.id}"
    logging.info(f"Posting comment {comment.url} to Lemmy post {lemmy_post_url}")
    lemmy_comment_id: Optional[int] = create_lemmy_comment(lemmy, post_id, comment)

    if not lemmy_comment_id:
        # No exception is active here, so logging.exception would append a
        # meaningless "NoneType: None" traceback; log a plain error instead.
        logging.error(f"Error creating Lemmy comment {lemmy_comment_id} to {lemmy_post_url} from Github comment {comment.url}")
        return

    logging.info(f"Posted comment {comment_url} to Lemmy post {lemmy_post_url}")
    insert_comment_to_database(cursor, comment.id, lemmy_comment_id, comment)

def get_existing_comment_id(cursor: sqlite3.Cursor, github_comment_id: int) -> Optional[int]:
    """Return the Lemmy comment id recorded for a GitHub comment.

    Args:
        cursor: Cursor used for the lookup in ``comments``.
        github_comment_id: Id of the comment on GitHub.

    Returns:
        The stored ``lemmy_comment_id``, or ``None`` when no row matches.
    """
    logging.info(f"Checking if comment with GitHub comment ID: {github_comment_id} exists")
    cursor.execute("SELECT lemmy_comment_id FROM comments WHERE github_comment_id=?", (github_comment_id,))
    row = cursor.fetchone()

    if row is None:
        logging.info(f"No existing comment found with GitHub comment ID: {github_comment_id}")
        return None

    logging.info(f"Found existing comment with GitHub comment ID: {github_comment_id}")
    return row[0]

def fetch_issue_data(github_repo: str) -> List[Tuple[str, Optional[int]]]:
    """Return ``(issue_url, lemmy_post_id)`` pairs stored for a repository.

    NOTE(review): the query reads an ``issue_url`` column, but the ``posts``
    table created by ``initialize_database`` has no such column, so this
    looks like it raises ``sqlite3.OperationalError`` against that schema.
    The SQL is left untouched - confirm the intended schema.

    Args:
        github_repo: Repository in ``owner/name`` form; used to build the
            ``LIKE`` pattern for the issue URLs.

    Returns:
        All matching rows.
    """
    logging.info("Fetching the GitHub issue number and Lemmy post ID for all issues")
    conn: sqlite3.Connection = sqlite3.connect(DB_FILE)
    try:
        cursor: sqlite3.Cursor = conn.cursor()
        SQL = "SELECT issue_url, lemmy_post_id FROM posts WHERE issue_url LIKE ?"
        issues_url = f"https://github.com/{github_repo}/issues/%"
        issue_data = cursor.execute(SQL, (issues_url,)).fetchall()
    finally:
        # The connection was never closed before; release it on every path.
        conn.close()
    logging.info(f"Fetched {len(issue_data)} issues")
    return issue_data

def process_repo(lemmy: Any, community_id: int, github_repo: str) -> None:
    """Process one repository; any exception is logged and not propagated.

    Args:
        lemmy: Client passed through to ``process_issues``.
        community_id: Target community.
        github_repo: Repository in ``owner/name`` form.
    """
    try:
        logging.info(
            f"Processing repository {github_repo}"
        )
        process_issues(
            lemmy,
            community_id,
            github_repo,
        )
    except Exception as e:
        logging.exception(
            f"Error occurred while processing repository {github_repo}\n{e}"
        )

def main() -> None:
    """Run one full pass: prepare the database, log in, process every repository."""
    logging.info("Running main function")

    # initialize_database returns an open connection that is only needed to
    # create the tables. It used to be dropped unclosed on every scheduled
    # run; the helpers below open their own connections.
    initialize_database().close()

    lemmy = initialize_lemmy_instance()
    community_id = lemmy.discover_community(LEMMY_COMMUNITY_NAME)

    for github_repo in REPOSITORIES:
        process_repo(lemmy, community_id, github_repo)

def run_periodically() -> None:
    """Register ``main`` as an hourly job and poll the scheduler forever.

    Pending jobs are checked once per iteration, followed by a 60 second
    sleep. Exceptions raised while running pending jobs are logged and the
    loop continues.
    """
    logging.info("Starting periodic run")
    hourly = schedule.every(1).hours
    hourly.do(main)

    poll_seconds = 60
    while True:
        try:
            schedule.run_pending()
        except Exception as e:
            logging.exception(f"Error occurred during scheduling\n{e}")
        time.sleep(poll_seconds)

# Script entry point. The guard must compare ``__name__`` with "__main__";
# the bare ``name == "main"`` raises NameError because ``name`` is undefined.
if __name__ == "__main__":
    try:
        logging.info("Starting script")
        main()  # one immediate pass
        run_periodically()  # then hourly, forever
    except Exception as e:
        logging.exception(f"Error occurred during script execution\n{e}")

requirements.txt pythorhead==0.12.3 schedule==1.2.0 backoff==2.2.1 feedparser==6.0.10

1

Has using 'thing = list[str]()' instead of 'thing: list[str] = []' any downsides?

I have seen some people prefer to create a list of strings by using thing = list[str]() instead of thing: list[str] = []. I think it looks kinda weird, but maybe that's just because I have never seen that syntax before. Does that have any downsides?

It is also possible to use this for dicts: thing = dict[str, SomeClass](). Looks equally weird to me. Is that widely used? Would you use it? Would you point it out in a code review?

12

Optimizing Script to Find Fast Instances

Last month, I developed a script because lemmy.ml had become too slow. Unfortunately, I have the same problem again, but this time there are too many instances to evaluate, causing the script to take an excessively long time to complete. I'm seeking advice on how to enhance the script to simultaneously ping multiple instances. Are there any alternative scripts available that might provide a more efficient solution?

```sh
git clone https://github.com/LemmyNet/lemmy-stats-crawler
cd lemmy-stats-crawler
cargo run -- --json > stats.json
```

```python #!/usr/bin/env python3 import json import time import requests import requests.exceptions

from typing import List, Dict

# Seconds slept between two measurement rounds (for 10 minutes use 10 * 60).
TIME_BETWEEN_REQUESTS = 5
# Total measuring time in seconds (for 8 hours use 8 * 60 * 60).
# This assignment sat behind the previous line's comment and was never run.
TIME_TOTAL = 60

def get_latency(domain):
    """Measure how long one GET request to ``domain`` takes.

    Args:
        domain: Host name or URL; ``https://`` is prepended when no scheme
            is present.

    Returns:
        Elapsed seconds as a float, or ``float("inf")`` when the request
        fails (timeout after 3 seconds, connection error, or any other
        ``requests`` error).
    """
    if not domain.startswith(("http://", "https://")):
        domain = "https://" + domain

    # perf_counter is monotonic, so the measurement is not affected by
    # adjustments of the system clock.
    start = time.perf_counter()
    try:
        requests.get(domain, timeout=3)
    except requests.exceptions.RequestException:
        # Only Timeout was handled before; a single unreachable or
        # misconfigured instance aborted the whole measurement run.
        return float("inf")
    return time.perf_counter() - start

def measure_latencies(domains, duration):
    """Run measurement rounds over ``domains`` until ``duration`` seconds have passed.

    Each round is followed by a sleep of ``TIME_BETWEEN_REQUESTS`` seconds.

    Args:
        domains: Domains to measure in every round.
        duration: Total time budget in seconds.

    Returns:
        Dict mapping each domain to the list of its measured latencies.
    """
    collected = {}
    deadline = time.time() + duration
    while not (time.time() >= deadline):
        collected = measure_latencies_for_domains(domains, collected)
        time.sleep(TIME_BETWEEN_REQUESTS)
    return collected

def measure_latencies_for_domains(domains, latencies, max_workers=32):
    """Measure every domain once and append the results to ``latencies``.

    The requests run concurrently in a thread pool, so one round takes about
    as long as the slowest batch of requests rather than the sum of all of
    them.

    Args:
        domains: Domains to measure.
        latencies: Dict mapping domain to a list of latencies; extended in
            place.
        max_workers: Upper bound on concurrent requests.

    Returns:
        The ``latencies`` dict.
    """
    # Function-scope import: keeps this block independent of the file-level imports.
    from concurrent.futures import ThreadPoolExecutor

    domains = list(domains)
    if not domains:
        return latencies

    worker_count = max(1, min(max_workers, len(domains)))
    with ThreadPoolExecutor(max_workers=worker_count) as pool:
        # map() yields results in input order, so entries are added in the
        # same order as the sequential loop did.
        for domain, latency in zip(domains, pool.map(get_latency, domains)):
            latencies = add_latency_to_domain(domain, latency, latencies)
    return latencies

def add_latency_to_domain(domain, latency, latencies):
    """Append ``latency`` to the list kept for ``domain``, creating the list if needed.

    Args:
        domain: Key in ``latencies``.
        latency: Value to append.
        latencies: Dict mapping domain to a list of latencies; modified in place.

    Returns:
        The same ``latencies`` dict.
    """
    samples = latencies.setdefault(domain, [])
    samples.append(latency)
    return latencies

def average_latencies(latencies):
    """Compute the mean latency of every domain.

    Args:
        latencies: Dict mapping domain to a non-empty list of latencies.

    Returns:
        List of ``(domain, mean)`` tuples in the dict's iteration order.
    """
    return [
        (domain, sum(samples) / len(samples))
        for domain, samples in latencies.items()
    ]

def sort_latencies(averages):
    """Return the ``(domain, mean)`` pairs ordered by ascending mean latency.

    Args:
        averages: Iterable of ``(domain, mean)`` tuples.

    Returns:
        A new list; pairs with equal means keep their relative order.
    """
    def mean_of(pair):
        return pair[1]

    ranked = list(averages)
    ranked.sort(key=mean_of)
    return ranked

def get_latency_report(domains, duration):
    """Measure ``domains`` for ``duration`` seconds and rank them by mean latency.

    Args:
        domains: Domains to measure.
        duration: Total measuring time in seconds.

    Returns:
        List of ``(domain, mean)`` tuples, lowest mean first.
    """
    return sort_latencies(average_latencies(measure_latencies(domains, duration)))

def get_instances(data: Dict) -> List[Dict]:
    """Return the entries of ``data["instance_details"]`` as a new list.

    Args:
        data: Mapping with an ``instance_details`` iterable.

    Returns:
        A list holding the same entries in the same order.
    """
    return list(data["instance_details"])

def get_domains(instances: List[Dict]) -> List[str]:
    """Collect the ``domain`` value of every instance entry.

    Args:
        instances: Instance entries, each with a ``domain`` key.

    Returns:
        The domains in input order.
    """
    domains: List[str] = []
    for entry in instances:
        domains.append(entry["domain"])
    return domains

def load_json_data(filepath: str) -> Dict:
    """Read and parse the JSON file at ``filepath``.

    Args:
        filepath: Path of the JSON file.

    Returns:
        The decoded JSON content.
    """
    # Decode as UTF-8 explicitly; without ``encoding`` the platform's locale
    # encoding is used, which can mis-decode non-ASCII text in the file.
    with open(filepath, encoding="utf-8") as json_data:
        return json.load(json_data)

def main():
    """Load ``stats.json``, measure all listed domains and print them fastest first."""
    domains = get_domains(get_instances(load_json_data('stats.json')))
    for domain, avg_latency in get_latency_report(domains, TIME_TOTAL):
        print(f"{domain}: {avg_latency:.2f} seconds")

# Script entry point. The guard must compare ``__name__`` with "__main__";
# the bare ``name == "main"`` raises NameError because ``name`` is undefined.
if __name__ == "__main__":
    main()

6