"""Mastodon bot that reposts entries from configured RSS feeds.

Settings (feed list, credentials, paths) come from ``settings`` by default
and may be overridden at runtime with ``--settingsfile``.
"""

import argparse
import importlib
import time
from datetime import datetime, timedelta, timezone
from os.path import exists

import feedparser
from mastodon import Mastodon

import f_run_once
import f_sigint  # imported for its side effect: installs a SIGINT handler
from settings import *


def main():
    """Fetch every configured feed once and post its recent, unseen entries.

    Uses the globals from ``settings`` (``feeds``, ``log_path``, ``app_path``,
    ``user_path``, ``api_url``, ``bot_name``, ``account_name``,
    ``account_password``). Already-posted entries are tracked per feed in
    ``{log_path}sent_{j}.data`` so reruns never duplicate a post.
    """
    # First run only: register the app and create a user access token.
    if not exists(app_path):
        Mastodon.create_app(
            bot_name,
            api_base_url=api_url,
            to_file=app_path,
        )
    if not exists(user_path):
        mastodon = Mastodon(
            client_id=app_path,
            api_base_url=api_url,
        )
        mastodon.log_in(
            account_name,
            account_password,
            to_file=user_path,
        )
    api = Mastodon(
        access_token=user_path,
        api_base_url=api_url,
    )

    for j, url in enumerate(feeds):
        save = False
        feed = feedparser.parse(url)

        # One log file per feed remembers which "title+link" lines were sent.
        datafile = f"{log_path}sent_{j}.data"
        if not exists(datafile):
            open(datafile, 'w').close()
        with open(datafile, 'r') as f:
            send_data = [line.strip() for line in f]

        # The first successful post of a run becomes the thread root
        # (public); every later entry is posted as an unlisted reply to it.
        reply_id = None
        visibility = 'public'

        for item in feed['items']:
            title = item['title'].encode('latin-1', 'ignore').decode('latin-1')
            line = f"{title}{item['link']}"

            # Feeds publish dates as RFC 822 or ISO 8601; if neither parses,
            # fall back to "now" so the entry is still posted once.
            # (Bug fix: the original referenced pubdate before assignment
            # here, raising NameError whenever both parses failed.)
            try:
                pubdate = datetime.strptime(
                    item['published'], '%a, %d %b %Y %H:%M:%S %z')
            except ValueError:
                try:
                    pubdate = datetime.fromisoformat(item['published'])
                except ValueError:
                    pubdate = datetime.now(timezone.utc)

            # Only post entries from the last 5 days that we haven't sent yet.
            if pubdate > datetime.now(pubdate.tzinfo) - timedelta(days=5):
                if line not in send_data:
                    try:
                        result = api.status_post(
                            (title + "\n\n Source: " + item['link']),
                            reply_id,    # in_reply_to_id
                            None,        # media_ids
                            False,       # sensitive
                            visibility,  # visibility ('direct', 'private', 'unlisted', 'public')
                            None,        # spoiler_text
                            None,        # language
                            line,        # idempotency_key
                            None,        # content_type
                            None,        # scheduled_at
                            None,        # poll
                            None,        # quote_id
                        )
                    except Exception as exception:
                        # Best effort: log the failure and keep going so one
                        # bad entry doesn't abort the whole feed.
                        result = False
                        print("err: (" + type(exception).__name__ + ") "
                              + title + item['link'])
                    if result and result.id:
                        if not reply_id:
                            reply_id = result.id
                            visibility = 'unlisted'
                        send_data.append(line)
                        save = True

        if save:
            save = False
            # Cap the sent-log at the 1000 most recent lines.
            while len(send_data) > 1000:
                send_data.pop(0)
            with open(datafile, 'w+') as f:
                for lines in send_data:
                    f.write('%s\n' % lines)


def startup():
    """Parse CLI options, apply an optional settings override, and run the bot.

    ``--settingsfile MODULE``  load configuration from MODULE instead of settings
    ``--daemon``               repeat forever with a delay between runs
    ``--delay SECONDS``        seconds between runs (implies daemon; default 3600)
    """
    parser = argparse.ArgumentParser(
        description="Mastodon robot posting RSS-Feeds")
    parser.add_argument('--settingsfile', dest='settingsfile', action='store',
                        help='load another feedslist')
    parser.add_argument('--daemon', dest='daemon', action='store_true',
                        help='run in daemon mode and repeat after a delay')
    parser.add_argument('--delay', dest='daemon_delay', action='store',
                        type=int,
                        help='number of seconds to wait for next run default=3600')
    args = parser.parse_args()

    # Abort if another instance of the bot is already running.
    f_run_once.run_once()

    run_as_daemon = False
    if args.settingsfile:
        # Replace the defaults imported from settings with the given module.
        global feeds
        global log_path
        global data_path
        global app_path
        global user_path
        global api_url  # bug fix: was missing, so --settingsfile never changed the API URL
        global bot_name
        global account_name
        global account_password
        feedslist = importlib.import_module(args.settingsfile)
        feeds = feedslist.feeds
        log_path = feedslist.log_path
        data_path = feedslist.data_path
        app_path = feedslist.app_path
        user_path = feedslist.user_path
        api_url = feedslist.api_url
        bot_name = feedslist.bot_name
        account_name = feedslist.account_name
        account_password = feedslist.account_password

    if args.daemon or args.daemon_delay:
        run_as_daemon = True

    if run_as_daemon:
        daemon_delay = 3600
        if args.daemon_delay:
            daemon_delay = args.daemon_delay
        if daemon_delay == 0 or not isinstance(daemon_delay, int):
            daemon_delay = 3600
        while True:
            main()
            time.sleep(daemon_delay)
    else:
        main()


if __name__ == "__main__":
    startup()