Compare commits
20 Commits
v168032163
...
master
Author | SHA1 | Date | |
---|---|---|---|
8ab616e5fd | |||
1749a8659a | |||
5592c0e7cd | |||
b8a482f86c | |||
d3595cf6b0 | |||
05d2f5829d | |||
4fd6848372 | |||
ff3319a2fc | |||
4568dd0e4c | |||
fc2e10f9ad | |||
52e563c4dd | |||
9d6a3c58d9 | |||
ce6d4d3ab8 | |||
d87c813ddf | |||
bd645ee02d | |||
a9eb6da4bd | |||
a05332ba66 | |||
9703aba83b | |||
ec6d5fd2a2 | |||
c9dc7fb270 |
@ -6,8 +6,34 @@ on:
|
||||
- master
|
||||
|
||||
jobs:
|
||||
check_for_changed_version:
|
||||
runs-on: 'ubuntu-latest'
|
||||
# Declare outputs for next jobs
|
||||
outputs:
|
||||
version_changed: ${{ steps.check_file_changed.outputs.version_changed }}
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
with:
|
||||
# Checkout as many commits as needed for the diff
|
||||
fetch-depth: 2
|
||||
- id: check_file_changed
|
||||
run: |
|
||||
# Diff HEAD with the previous commit
|
||||
if git diff HEAD^ HEAD pyproject.toml | grep -q "+version =";
|
||||
then
|
||||
GOTIME="True"
|
||||
else
|
||||
GOTIME="False"
|
||||
fi
|
||||
echo "::notice title=GOTIME::$GOTIME"
|
||||
# Set the output named "version_changed"
|
||||
echo "version_changed=$GOTIME" >> $GITHUB_OUTPUT
|
||||
|
||||
build:
|
||||
runs-on: ubuntu-latest
|
||||
# this should start working with https://github.com/go-gitea/gitea/pull/24230
|
||||
# needs: [ check_for_changed_version ]
|
||||
# if: needs.check_for_changed_version.outputs.version_changed == 'True'
|
||||
permissions:
|
||||
contents: write
|
||||
steps:
|
||||
@ -21,19 +47,21 @@ jobs:
|
||||
pip install --upgrade pip
|
||||
pip install shiv
|
||||
pip install poetry
|
||||
- name: Add CURRENT_TIME env property
|
||||
run: echo "CURRENT_TIME_VERSION=v$(date '+%s')" >> $GITHUB_ENV
|
||||
- name: Add VERSION env property
|
||||
run: |
|
||||
echo "VERSION=v$(poetry version | python -c 'import sys;print(sys.stdin.read().split()[1])')" >> $GITHUB_ENV
|
||||
echo ${{ env.VERSION }}
|
||||
- name: Build the sucker
|
||||
run: |
|
||||
sed -i -e "s/?????/${{ env.CURRENT_TIME_VERSION }}/g" src/__init__.py
|
||||
sed -i -e "s/?????/${{ env.VERSION }}/g" src/__init__.py
|
||||
make build
|
||||
- name: Create release!
|
||||
run: |
|
||||
JSON_DATA=$(
|
||||
printf '%s' \
|
||||
'{'\
|
||||
'"tag_name":"${{ env.CURRENT_TIME_VERSION }}",'\
|
||||
'"name":"${{ env.CURRENT_TIME_VERSION }}",'\
|
||||
'"tag_name":"${{ env.VERSION }}",'\
|
||||
'"name":"${{ env.VERSION }}",'\
|
||||
'"body":"RELEASE THE KRAKEN"'\
|
||||
'}' \
|
||||
)
|
3
.gitignore
vendored
3
.gitignore
vendored
@ -128,4 +128,5 @@ dmypy.json
|
||||
# Pyre type checker
|
||||
.pyre/
|
||||
setup.py
|
||||
utils
|
||||
utils
|
||||
praw.ini
|
||||
|
1475
poetry.lock
generated
1475
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = "src"
|
||||
version = "0.1.0"
|
||||
version = "0.3.0"
|
||||
description = ""
|
||||
authors = ["Joe Kaufeld <opensource@joekaufeld.com>"]
|
||||
|
||||
@ -10,6 +10,8 @@ shiv = "^1.0.1"
|
||||
httpx = "^0.23.0"
|
||||
click = "^8.1.3"
|
||||
rich = "^12.5.1"
|
||||
markdownify = "^0.11.6"
|
||||
praw = "^7.7.0"
|
||||
|
||||
[tool.poetry.dev-dependencies]
|
||||
poetry = "^1.1.14"
|
||||
@ -20,4 +22,4 @@ requires = ["poetry-core>=1.0.0"]
|
||||
build-backend = "poetry.core.masonry.api"
|
||||
|
||||
[tool.poetry.plugins."console_scripts"]
|
||||
"utils" = "src.cli:main"
|
||||
"utils" = "src.cli:main"
|
||||
|
117
src/cli.py
117
src/cli.py
@ -6,13 +6,15 @@ import uuid
|
||||
|
||||
import click
|
||||
import httpx
|
||||
from praw import Reddit
|
||||
from rich import pretty
|
||||
from rich.status import Status
|
||||
from rich.traceback import install
|
||||
from shiv.bootstrap import current_zipfile
|
||||
|
||||
import src
|
||||
from src.helpers import flip_char
|
||||
from src.helpers import flip_char, load_config, write_config
|
||||
from src.joplin import process_joplin_posts, get_folders, BASE_URL
|
||||
from src.art import BANNERS
|
||||
|
||||
|
||||
@ -38,7 +40,7 @@ def main(ctx, update):
|
||||
pretty.install() # type: ignore
|
||||
install() # traceback handler
|
||||
|
||||
code.interact(banner, None)
|
||||
code.interact(local=globals(), banner=banner)
|
||||
sys.exit()
|
||||
|
||||
|
||||
@ -48,6 +50,21 @@ def uuid4():
|
||||
click.echo(uuid.uuid4())
|
||||
|
||||
|
||||
@main.command()
|
||||
def joplin():
|
||||
"""Search Joplin for notes titled 'convert' and process them."""
|
||||
process_joplin_posts()
|
||||
|
||||
|
||||
@main.command()
|
||||
def objectid():
|
||||
"""Generate a random ObjectID."""
|
||||
new_id = ""
|
||||
for _ in range(24):
|
||||
new_id += random.choice(string.ascii_lowercase[:6] + string.digits)
|
||||
click.echo(new_id)
|
||||
|
||||
|
||||
@main.command()
|
||||
@click.argument("dice")
|
||||
def roll(dice: str):
|
||||
@ -99,6 +116,98 @@ def beautify(words: list[str]):
|
||||
click.echo("".join(new_beautiful_string))
|
||||
|
||||
|
||||
@main.command()
|
||||
def get_saved_from_reddit() -> None:
|
||||
"""Get saved posts from reddit."""
|
||||
|
||||
def _process(item):
|
||||
fullname = item.fullname
|
||||
if fullname.startswith("t3"):
|
||||
# we got a post
|
||||
title = f"{item.subreddit.display_name} - {item.title}"
|
||||
body = item.selftext if item.selftext else item.url
|
||||
elif fullname.startswith("t1"):
|
||||
# comment time
|
||||
try:
|
||||
author_name = item.author.name
|
||||
except AttributeError:
|
||||
author_name = "[deleted]"
|
||||
title = (
|
||||
f"{item.submission.subreddit.display_name} - {item.submission.title}"
|
||||
)
|
||||
body = f"Comment from {author_name}:\n\n{item.body}"
|
||||
else:
|
||||
click.echo(f"Not sure how to process https://reddit.com{item.permalink}")
|
||||
return
|
||||
|
||||
if item.over_18:
|
||||
title = "🔴 " + title
|
||||
body = "https://reddit.com" + item.permalink + "\n\n" + body
|
||||
|
||||
joplin = BASE_URL + cfg["JOPLIN_PORT"]
|
||||
notes_url = joplin + f"/notes/?token={cfg.get('JOPLIN_TOKEN')}"
|
||||
click.echo(f"Processing {title}...")
|
||||
httpx.post(
|
||||
notes_url,
|
||||
json={
|
||||
"title": title,
|
||||
"body": body,
|
||||
"parent_id": cfg.get("joplin_saved_posts_folder_id"),
|
||||
},
|
||||
)
|
||||
item.unsave()
|
||||
|
||||
cfg = load_config()
|
||||
# because 2fa is enabled, we have to get the code first
|
||||
twofa_code = click.prompt("What's the current 2fa code?", type=str)
|
||||
if not cfg.get("REDDIT_CLIENT_ID"):
|
||||
cfg["REDDIT_CLIENT_ID"] = ""
|
||||
if not cfg.get("REDDIT_CLIENT_SECRET"):
|
||||
cfg["REDDIT_CLIENT_SECRET"] = ""
|
||||
if not cfg.get("REDDIT_PASSWORD"):
|
||||
cfg["REDDIT_PASSWORD"] = ""
|
||||
if not cfg.get("REDDIT_USERNAME"):
|
||||
cfg["REDDIT_USERNAME"] = ""
|
||||
if not cfg.get("joplin_saved_posts_folder_id"):
|
||||
cfg["joplin_saved_posts_folder_id"] = ""
|
||||
|
||||
write_config(cfg)
|
||||
|
||||
username = cfg.get("REDDIT_USERNAME", "")
|
||||
|
||||
r = Reddit(
|
||||
client_id=cfg.get("REDDIT_CLIENT_ID", ""),
|
||||
client_secret=cfg.get("REDDIT_CLIENT_SECRET", ""),
|
||||
username=username,
|
||||
password=f"{cfg.get('REDDIT_PASSWORD', '')}:{twofa_code}",
|
||||
user_agent=f"getter of saved posts, u/{username}",
|
||||
)
|
||||
|
||||
try:
|
||||
r.user.me()
|
||||
except Exception as e:
|
||||
click.echo(f"Cannot connect to reddit: {e}")
|
||||
return
|
||||
|
||||
if not cfg.get("joplin_saved_posts_folder_id"):
|
||||
folders = get_folders()
|
||||
click.echo("Pick the folder that I should write your saved posts to:")
|
||||
for count, option in enumerate(folders["items"]):
|
||||
click.echo(f"{count} - {option['title']}")
|
||||
folder_position = click.prompt("Number of folder?", type=int)
|
||||
folder_name = folders["items"][folder_position]["title"]
|
||||
folder_id = folders["items"][folder_position]["id"]
|
||||
|
||||
click.echo(f"Got it, will write to {folder_name}, ID {folder_id}.")
|
||||
cfg["joplin_saved_posts_folder_id"] = folder_id
|
||||
write_config(cfg)
|
||||
|
||||
for _ in range(10):
|
||||
click.echo("Getting new posts...")
|
||||
for item in r.user.me().saved(limit=None):
|
||||
_process(item)
|
||||
|
||||
|
||||
def update_from_gitea():
|
||||
"""Get the newest release from Gitea and install it."""
|
||||
status = Status("Checking for new release...")
|
||||
@ -119,9 +228,7 @@ def update_from_gitea():
|
||||
release_data = response.json()
|
||||
if release_data["tag_name"] == src.__version__:
|
||||
status.stop()
|
||||
click.echo(
|
||||
"Server version is the same as current version;" " nothing to update."
|
||||
)
|
||||
click.echo("Server version is the same as current version; nothing to update.")
|
||||
return
|
||||
status.update("Updating...")
|
||||
|
||||
|
@ -1,5 +1,10 @@
|
||||
from pathlib import Path
|
||||
import json
|
||||
import os
|
||||
import string
|
||||
|
||||
import click
|
||||
|
||||
|
||||
def flip_char(char: str):
|
||||
if char.lower() not in string.ascii_letters:
|
||||
@ -9,3 +14,33 @@ def flip_char(char: str):
|
||||
return string.ascii_uppercase[string.ascii_lowercase.find(char)]
|
||||
else:
|
||||
return string.ascii_lowercase[string.ascii_uppercase.find(char)]
|
||||
|
||||
|
||||
base_folder = Path(Path.home() / ".config")
|
||||
config_file = base_folder / "utils_config.json"
|
||||
|
||||
|
||||
def load_config() -> dict:
|
||||
"""Try to load the computer-specific utils JSON file."""
|
||||
if not base_folder.exists():
|
||||
os.mkdir(base_folder)
|
||||
|
||||
if not config_file.exists():
|
||||
with open(config_file, "w") as f:
|
||||
f.write(json.dumps({}, indent=2))
|
||||
return {}
|
||||
|
||||
try:
|
||||
with open(config_file, "r") as f:
|
||||
return json.load(f)
|
||||
except json.JSONDecodeError:
|
||||
click.echo(
|
||||
f"Cannot load config -- potentially corrupt file. Check {config_file:s}"
|
||||
)
|
||||
return {}
|
||||
|
||||
|
||||
def write_config(config_obj) -> None:
|
||||
load_config() # make sure everything exists
|
||||
with open(config_file, "w") as f:
|
||||
f.write(json.dumps(config_obj, indent=2))
|
||||
|
124
src/joplin.py
Normal file
124
src/joplin.py
Normal file
@ -0,0 +1,124 @@
|
||||
import click
|
||||
import httpx
|
||||
import time
|
||||
|
||||
import bs4
|
||||
from markdownify import markdownify as md
|
||||
|
||||
from src.helpers import load_config, write_config
|
||||
|
||||
|
||||
BASE_URL = "http://localhost:"
|
||||
|
||||
|
||||
def doublecheck_port_number(port: int):
|
||||
try:
|
||||
if (
|
||||
httpx.get(f"http://localhost:{port}/ping").content.decode()
|
||||
== "JoplinClipperServer"
|
||||
):
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def auth_with_joplin():
|
||||
config = load_config()
|
||||
if not config.get("JOPLIN_PORT"):
|
||||
while True:
|
||||
config["JOPLIN_PORT"] = click.prompt(
|
||||
"What's the port number of the Joplin Web Clipper service?",
|
||||
type=int,
|
||||
default=41184,
|
||||
)
|
||||
config["JOPLIN_PORT"] = str(config["JOPLIN_PORT"])
|
||||
if doublecheck_port_number(config["JOPLIN_PORT"]):
|
||||
break
|
||||
else:
|
||||
click.echo("Something's not right there. Is the service enabled?")
|
||||
|
||||
# this triggers the dialog in Joplin to accept the auth connection.
|
||||
auth_token = (
|
||||
httpx.post(BASE_URL + config["JOPLIN_PORT"] + "/auth").json().get("auth_token")
|
||||
)
|
||||
click.echo("Check Joplin to allow this connection.")
|
||||
|
||||
while True:
|
||||
check_resp = httpx.get(
|
||||
BASE_URL + config["JOPLIN_PORT"] + f"/auth/check?auth_token={auth_token}"
|
||||
).json()
|
||||
if check_resp.get("status") == "waiting":
|
||||
time.sleep(0.5)
|
||||
else:
|
||||
config["JOPLIN_TOKEN"] = check_resp.get("token")
|
||||
break
|
||||
|
||||
write_config(config)
|
||||
|
||||
|
||||
def get_folders():
|
||||
c = load_config()
|
||||
first_call_success = False
|
||||
try:
|
||||
resp = httpx.get(
|
||||
BASE_URL
|
||||
+ c.get("JOPLIN_PORT", "")
|
||||
+ f"/folders?token={c.get('JOPLIN_TOKEN')}"
|
||||
)
|
||||
if resp.status_code == 403:
|
||||
click.echo("got 403")
|
||||
auth_with_joplin()
|
||||
else:
|
||||
resp = resp.json()
|
||||
first_call_success = True
|
||||
except httpx._exceptions.ConnectError:
|
||||
auth_with_joplin()
|
||||
|
||||
if not first_call_success:
|
||||
c = load_config()
|
||||
resp = httpx.get(
|
||||
BASE_URL
|
||||
+ c.get("JOPLIN_PORT", "")
|
||||
+ f"/folders?token={c.get('JOPLIN_TOKEN')}"
|
||||
).json()
|
||||
return resp
|
||||
|
||||
|
||||
def process_joplin_posts():
|
||||
# this will handle the initial auth flow if we aren't already authed
|
||||
get_folders()
|
||||
c = load_config()
|
||||
joplin = BASE_URL + c["JOPLIN_PORT"]
|
||||
resp = httpx.get(
|
||||
joplin
|
||||
+ f"/search?query=convert&fields=id,parent_id&token={c.get('JOPLIN_TOKEN')}"
|
||||
)
|
||||
resp = resp.json()
|
||||
if not resp.get("items"):
|
||||
click.echo("No notes found with title 'convert'.")
|
||||
return
|
||||
else:
|
||||
items = resp.get("items")
|
||||
|
||||
for blob in items:
|
||||
url = (
|
||||
httpx.get(
|
||||
joplin
|
||||
+ f"/notes/{blob['id']}?fields=body&token={c.get('JOPLIN_TOKEN')}"
|
||||
)
|
||||
.json()["body"]
|
||||
.strip()
|
||||
)
|
||||
click.echo(f"Processing {url}...")
|
||||
site = httpx.get(url, follow_redirects=True)
|
||||
soup = bs4.BeautifulSoup(site.content, features="html5lib")
|
||||
# clean up that schizz
|
||||
title = soup.title.text
|
||||
[t.extract() for t in soup(["script", "head", "style"])]
|
||||
body = md(str(soup))
|
||||
httpx.put(
|
||||
joplin + f"/notes/{blob['id']}?token={c.get('JOPLIN_TOKEN')}",
|
||||
json={"title": title, "body": body},
|
||||
)
|
||||
|
||||
click.echo(resp)
|
Loading…
Reference in New Issue
Block a user