Compare commits

..

No commits in common. "master" and "b62acac" have entirely different histories.

16 changed files with 376 additions and 1762 deletions

View file

@ -1,80 +0,0 @@
name: Release
on:
push:
branches:
- master
jobs:
check_for_changed_version:
runs-on: 'ubuntu-latest'
# Declare outputs for next jobs
outputs:
version_changed: ${{ steps.check_file_changed.outputs.version_changed }}
steps:
- uses: actions/checkout@v2
with:
# Checkout as many commits as needed for the diff
fetch-depth: 2
- id: check_file_changed
run: |
# Diff HEAD with the previous commit
if git diff HEAD^ HEAD pyproject.toml | grep -q "+version =";
then
GOTIME="True"
else
GOTIME="False"
fi
echo "::notice title=GOTIME::$GOTIME"
# Set the output named "version_changed"
echo "version_changed=$GOTIME" >> $GITHUB_OUTPUT
build:
runs-on: ubuntu-latest
# this should start working with https://github.com/go-gitea/gitea/pull/24230
# needs: [ check_for_changed_version ]
# if: needs.check_for_changed_version.outputs.version_changed == 'True'
permissions:
contents: write
steps:
- uses: https://github.com/actions/checkout@v2
- uses: https://github.com/actions/setup-python@v4
with:
python-version: '3.11.x'
- name: Install Env
# this should be all we need because shiv will download the deps itself
run: |
pip install --upgrade pip
pip install shiv
pip install poetry
- name: Add VERSION env property
run: |
echo "VERSION=v$(poetry version | python -c 'import sys;print(sys.stdin.read().split()[1])')" >> $GITHUB_ENV
echo ${{ env.VERSION }}
- name: Build the sucker
run: |
sed -i -e "s/?????/${{ env.VERSION }}/g" src/__init__.py
make build
- name: Create release!
run: |
JSON_DATA=$(
printf '%s' \
'{'\
'"tag_name":"${{ env.VERSION }}",'\
'"name":"${{ env.VERSION }}",'\
'"body":"RELEASE THE KRAKEN"'\
'}' \
)
echo """release_id=$(\
curl -X POST \
-s https://git.joekaufeld.com/api/v1/repos/${GITHUB_REPOSITORY%/*}/${{ github.event.repository.name }}/releases \
-H "Authorization: token ${{ secrets.PAT }}" \
-H 'Content-Type: application/json' \
-d "$JSON_DATA" \
| python3 -c "import sys, json; print(json.load(sys.stdin)['id'])"\
)""" >> $GITHUB_ENV
- name: Upload assets!
run: |
curl https://git.joekaufeld.com/api/v1/repos/${GITHUB_REPOSITORY%/*}/${{ github.event.repository.name }}/releases/${{ env.release_id }}/assets \
-H "Authorization: token ${{ secrets.PAT }}" \
-F attachment=@utils

37
.github/workflows/build_and_release.yml vendored Normal file
View file

@ -0,0 +1,37 @@
name: Release
on:
push:
branches:
- master
jobs:
build:
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v4
with:
python-version: '3.10.x'
- uses: snok/install-poetry@v1
with:
virtualenvs-create: true
- name: Install Dependencies
run: poetry install
# https://stackoverflow.com/a/64195658
- name: Add SHORT_SHA env property with commit short sha
run: echo "SHORT_SHA=`echo ${GITHUB_SHA} | cut -c1-7`" >> $GITHUB_ENV
- name: Inject version into src.__version__
run: echo "__version__ = '${{ env.SHORT_SHA }}'" > src/__init__.py
- name: build the sucker
run: make build
- uses: ncipollo/release-action@v1
with:
artifacts: "utils"
body: "It's releasin' time"
generateReleaseNotes: true
tag: ${{ env.SHORT_SHA }}
commit: master
token: ${{ secrets.PAT }}

1
.gitignore vendored
View file

@ -129,4 +129,3 @@ dmypy.json
.pyre/
setup.py
utils
praw.ini

View file

@ -1,5 +1,5 @@
setup:
python src/poetry2setup.py > setup.py
.venv/bin/python src/poetry2setup.py > setup.py
build: setup shiv
@ -7,4 +7,4 @@ clean:
rm setup.py
shiv:
shiv -c utils -o utils .
.venv/bin/shiv -c utils -o utils .

1484
poetry.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,6 +1,6 @@
[tool.poetry]
name = "src"
version = "0.3.0"
version = "0.1.0"
description = ""
authors = ["Joe Kaufeld <opensource@joekaufeld.com>"]
@ -8,10 +8,6 @@ authors = ["Joe Kaufeld <opensource@joekaufeld.com>"]
python = "^3.10"
shiv = "^1.0.1"
httpx = "^0.23.0"
click = "^8.1.3"
rich = "^12.5.1"
markdownify = "^0.11.6"
praw = "^7.7.0"
[tool.poetry.dev-dependencies]
poetry = "^1.1.14"

View file

@ -1 +0,0 @@
__version__ = "?????" # will be replaced during build by CI

View file

@ -1,30 +0,0 @@
_BANNER1 = r"""
. o8o oooo o8o . o8o
.o8 `"' `888 `"' .o8 `"'
oooo oooo .o888oo oooo 888 oooo .o888oo oooo .ooooo. .oooo.o
`888 `888 888 `888 888 `888 888 `888 d88' `88b d88( "8
888 888 888 888 888 888 888 888 888ooo888 `"Y88b.
888 888 888 . 888 888 888 888 . 888 888 .o o. )88b
`V88V"V8P' "888" o888o o888o o888o "888" o888o `Y8bod8P' 8""888P'
"""
_BANNER2 = r"""
__ .__.__ .__ __ .__
__ ___/ |_|__| | |__|/ |_|__| ____ ______
| | \ __\ | | | \ __\ |/ __ \ / ___/
| | /| | | | |_| || | | \ ___/ \___ \
|____/ |__| |__|____/__||__| |__|\___ >____ >
\/ \/
"""
_BANNER3 = r"""
__ __ _______ ___ ___ ___ _______ ___ _______ _______
| | | || || | | | | | | || | | || |
| | | ||_ _|| | | | | | |_ _|| | | ___|| _____|
| |_| | | | | | | | | | | | | | | |___ | |_____
| | | | | | | |___ | | | | | | | ___||_____ |
| | | | | | | || | | | | | | |___ _____| |
|_______| |___| |___| |_______||___| |___| |___| |_______||_______|
"""
BANNERS: list[str] = [_BANNER1, _BANNER2, _BANNER3]

View file

@ -1,248 +1,30 @@
import code
import random
import string
import argparse
import importlib
import sys
import uuid
import click
import httpx
from praw import Reddit
from rich import pretty
from rich.status import Status
from rich.traceback import install
from shiv.bootstrap import current_zipfile
import src
from src.helpers import flip_char, load_config, write_config
from src.joplin import process_joplin_posts, get_folders, BASE_URL
from src.art import BANNERS
@click.group(
context_settings=dict(help_option_names=["-h", "--help", "--halp"]),
invoke_without_command=True,
)
@click.pass_context
@click.version_option(version=src.__version__, prog_name="utils")
@click.option(
"--update",
is_flag=True,
help="Check Gitea for a new version and auto-update.",
)
def main(ctx, update):
"""Launch a utility or drop into a command line REPL if no command is given."""
if update:
update_from_gitea()
sys.exit()
elif ctx.invoked_subcommand is None:
banner = random.choice(BANNERS)
pretty.install() # type: ignore
install() # traceback handler
code.interact(local=globals(), banner=banner)
sys.exit()
@main.command()
def uuid4():
"""Generate a random UUID4."""
click.echo(uuid.uuid4())
@main.command()
def joplin():
"""Search Joplin for notes titled 'convert' and process them."""
process_joplin_posts()
@main.command()
def objectid():
"""Generate a random ObjectID."""
new_id = ""
for _ in range(24):
new_id += random.choice(string.ascii_lowercase[:6] + string.digits)
click.echo(new_id)
@main.command()
@click.argument("dice")
def roll(dice: str):
"""Roll some dice. Format: utils roll 3d8"""
if "d" not in dice:
click.echo("Missing part of the call. Example: 1d10")
return
if len(dice.split("d")) != 2:
click.echo("Error parsing dice. Example: 2d6")
return
num, sides = dice.split("d")
try:
num = int(num)
sides = int(sides)
except ValueError:
click.echo("Need numbers for the dice. Example: 30d4")
return
if num < 1 or sides < 1:
click.echo("Dude. Example: 2d8")
return
values: list[int] = []
for die in range(num):
values.append(random.randint(1, sides))
if num == 1:
click.echo(f"You rolled a {values[0]}.")
else:
click.echo(f"You rolled: {'+'.join([str(item) for item in values])}")
click.echo(f"Total: {sum(values)}")
@main.command()
@click.argument("words", nargs=-1)
def beautify(words: list[str]):
BANNER = """
. o8o oooo o8o . o8o
.o8 `"' `888 `"' .o8 `"'
oooo oooo .o888oo oooo 888 oooo .o888oo oooo .ooooo. .oooo.o
`888 `888 888 `888 888 `888 888 `888 d88' `88b d88( "8
888 888 888 888 888 888 888 888 888ooo888 `"Y88b.
888 888 888 . 888 888 888 888 . 888 888 .o o. )88b
`V88V"V8P' "888" o888o o888o o888o "888" o888o `Y8bod8P' 8""888P'
"""
MAkE YoUr mEsSaGe bEaUtIfUl!!!1!!
WORDS is either a single string surrounded by double quotes or multiple bare words,
e.g. `utils beautify "one two three"` or `utils beautify one two three`.
"""
message = " ".join(words)
new_beautiful_string = []
for num, letter in enumerate(message):
if num % 2:
letter = flip_char(letter)
new_beautiful_string.append(letter)
click.echo("".join(new_beautiful_string))
@main.command()
def get_saved_from_reddit() -> None:
"""Get saved posts from reddit."""
def _process(item):
fullname = item.fullname
if fullname.startswith("t3"):
# we got a post
title = f"{item.subreddit.display_name} - {item.title}"
body = item.selftext if item.selftext else item.url
elif fullname.startswith("t1"):
# comment time
try:
author_name = item.author.name
except AttributeError:
author_name = "[deleted]"
title = (
f"{item.submission.subreddit.display_name} - {item.submission.title}"
def main():
parser = argparse.ArgumentParser(
description='Run a specific script by name. If no name is provided, start a REPL.'
)
body = f"Comment from {author_name}:\n\n{item.body}"
parser.add_argument('user_input', type=str, nargs='*',
help='The name of the script that you want to run plus any arguments for that script.')
args = parser.parse_args()
user_input = args.user_input
if len(user_input) == 0:
code.interact(banner=BANNER, local=locals())
else:
click.echo(f"Not sure how to process https://reddit.com{item.permalink}")
return
if item.over_18:
title = "🔴 " + title
body = "https://reddit.com" + item.permalink + "\n\n" + body
joplin = BASE_URL + cfg["JOPLIN_PORT"]
notes_url = joplin + f"/notes/?token={cfg.get('JOPLIN_TOKEN')}"
click.echo(f"Processing {title}...")
httpx.post(
notes_url,
json={
"title": title,
"body": body,
"parent_id": cfg.get("joplin_saved_posts_folder_id"),
},
)
item.unsave()
cfg = load_config()
# because 2fa is enabled, we have to get the code first
twofa_code = click.prompt("What's the current 2fa code?", type=str)
if not cfg.get("REDDIT_CLIENT_ID"):
cfg["REDDIT_CLIENT_ID"] = ""
if not cfg.get("REDDIT_CLIENT_SECRET"):
cfg["REDDIT_CLIENT_SECRET"] = ""
if not cfg.get("REDDIT_PASSWORD"):
cfg["REDDIT_PASSWORD"] = ""
if not cfg.get("REDDIT_USERNAME"):
cfg["REDDIT_USERNAME"] = ""
if not cfg.get("joplin_saved_posts_folder_id"):
cfg["joplin_saved_posts_folder_id"] = ""
write_config(cfg)
username = cfg.get("REDDIT_USERNAME", "")
r = Reddit(
client_id=cfg.get("REDDIT_CLIENT_ID", ""),
client_secret=cfg.get("REDDIT_CLIENT_SECRET", ""),
username=username,
password=f"{cfg.get('REDDIT_PASSWORD', '')}:{twofa_code}",
user_agent=f"getter of saved posts, u/{username}",
)
try:
r.user.me()
except Exception as e:
click.echo(f"Cannot connect to reddit: {e}")
return
if not cfg.get("joplin_saved_posts_folder_id"):
folders = get_folders()
click.echo("Pick the folder that I should write your saved posts to:")
for count, option in enumerate(folders["items"]):
click.echo(f"{count} - {option['title']}")
folder_position = click.prompt("Number of folder?", type=int)
folder_name = folders["items"][folder_position]["title"]
folder_id = folders["items"][folder_position]["id"]
click.echo(f"Got it, will write to {folder_name}, ID {folder_id}.")
cfg["joplin_saved_posts_folder_id"] = folder_id
write_config(cfg)
for _ in range(10):
click.echo("Getting new posts...")
for item in r.user.me().saved(limit=None):
_process(item)
def update_from_gitea():
"""Get the newest release from Gitea and install it."""
status = Status("Checking for new release...")
status.start()
response = httpx.get(
"https://git.joekaufeld.com/api/v1/repos/jkaufeld/utils/releases/latest"
)
if response.status_code != 200:
status.stop()
click.echo(
f"Something went wrong when talking to Gitea; got a"
f" {response.status_code} with the following content:\n"
f"{response.content}"
)
return
status.update("Checking for new release...")
release_data = response.json()
if release_data["tag_name"] == src.__version__:
status.stop()
click.echo("Server version is the same as current version; nothing to update.")
return
status.update("Updating...")
url = release_data["assets"][0]["browser_download_url"]
with current_zipfile() as archive:
with open(archive.filename, "wb") as f, httpx.stream(
"GET", url, follow_redirects=True
) as r:
for line in r.iter_bytes():
f.write(line)
status.stop()
click.echo(f"Updated to {release_data['tag_name']}! 🎉")
if __name__ == "__main__":
main()
command_name = user_input.pop(0)
module = importlib.import_module(f"src.commands.{command_name}")
sys.exit(module.main(user_input))

0
src/commands/__init__.py Normal file
View file

22
src/commands/beautify.py Normal file
View file

@ -0,0 +1,22 @@
import string
def flip_char(char: str):
if char in string.ascii_lowercase:
return string.ascii_uppercase[string.ascii_lowercase.find(char)]
else:
return string.ascii_lowercase[string.ascii_uppercase.find(char)]
def main(args: list[str]):
message = " ".join(args)
new_beautiful_string = []
for num, letter in enumerate(message):
if letter in string.ascii_letters:
if num % 2:
new_beautiful_string.append(flip_char(letter))
continue
new_beautiful_string.append(letter)
print("".join(new_beautiful_string))

22
src/commands/update.py Normal file
View file

@ -0,0 +1,22 @@
import httpx
import json
import sys
import zipfile
from datetime import datetime
def main(args):
self_name = sys.argv[0].strip(".").strip("/") # todo: is this resilient?
data = json.loads(zipfile.ZipFile(self_name).read("environment.json"))
build_time = datetime.fromisoformat(data['built_at'])
release_data = httpx.get(
'https://api.github.com/repos/itsthejoker/utils/releases/latest'
)
if release_data.status_code != 200:
print(
f"Something went wrong when talking to github; got a"
f" {release_data.status_code} with the following content:\n"
f"{release_data.content}"
)
sys.exit()

6
src/commands/uuid4.py Normal file
View file

@ -0,0 +1,6 @@
"""Generate and print a random UUID4."""
import uuid
def main(args: list[str]):
print(uuid.uuid4())

View file

@ -1,46 +0,0 @@
from pathlib import Path
import json
import os
import string
import click
def flip_char(char: str):
if char.lower() not in string.ascii_letters:
return char
if char in string.ascii_lowercase:
return string.ascii_uppercase[string.ascii_lowercase.find(char)]
else:
return string.ascii_lowercase[string.ascii_uppercase.find(char)]
base_folder = Path(Path.home() / ".config")
config_file = base_folder / "utils_config.json"
def load_config() -> dict:
"""Try to load the computer-specific utils JSON file."""
if not base_folder.exists():
os.mkdir(base_folder)
if not config_file.exists():
with open(config_file, "w") as f:
f.write(json.dumps({}, indent=2))
return {}
try:
with open(config_file, "r") as f:
return json.load(f)
except json.JSONDecodeError:
click.echo(
f"Cannot load config -- potentially corrupt file. Check {config_file:s}"
)
return {}
def write_config(config_obj) -> None:
load_config() # make sure everything exists
with open(config_file, "w") as f:
f.write(json.dumps(config_obj, indent=2))

View file

@ -1,124 +0,0 @@
import click
import httpx
import time
import bs4
from markdownify import markdownify as md
from src.helpers import load_config, write_config
BASE_URL = "http://localhost:"
def doublecheck_port_number(port: int):
try:
if (
httpx.get(f"http://localhost:{port}/ping").content.decode()
== "JoplinClipperServer"
):
return True
except Exception:
return False
def auth_with_joplin():
config = load_config()
if not config.get("JOPLIN_PORT"):
while True:
config["JOPLIN_PORT"] = click.prompt(
"What's the port number of the Joplin Web Clipper service?",
type=int,
default=41184,
)
config["JOPLIN_PORT"] = str(config["JOPLIN_PORT"])
if doublecheck_port_number(config["JOPLIN_PORT"]):
break
else:
click.echo("Something's not right there. Is the service enabled?")
# this triggers the dialog in Joplin to accept the auth connection.
auth_token = (
httpx.post(BASE_URL + config["JOPLIN_PORT"] + "/auth").json().get("auth_token")
)
click.echo("Check Joplin to allow this connection.")
while True:
check_resp = httpx.get(
BASE_URL + config["JOPLIN_PORT"] + f"/auth/check?auth_token={auth_token}"
).json()
if check_resp.get("status") == "waiting":
time.sleep(0.5)
else:
config["JOPLIN_TOKEN"] = check_resp.get("token")
break
write_config(config)
def get_folders():
c = load_config()
first_call_success = False
try:
resp = httpx.get(
BASE_URL
+ c.get("JOPLIN_PORT", "")
+ f"/folders?token={c.get('JOPLIN_TOKEN')}"
)
if resp.status_code == 403:
click.echo("got 403")
auth_with_joplin()
else:
resp = resp.json()
first_call_success = True
except httpx._exceptions.ConnectError:
auth_with_joplin()
if not first_call_success:
c = load_config()
resp = httpx.get(
BASE_URL
+ c.get("JOPLIN_PORT", "")
+ f"/folders?token={c.get('JOPLIN_TOKEN')}"
).json()
return resp
def process_joplin_posts():
# this will handle the initial auth flow if we aren't already authed
get_folders()
c = load_config()
joplin = BASE_URL + c["JOPLIN_PORT"]
resp = httpx.get(
joplin
+ f"/search?query=convert&fields=id,parent_id&token={c.get('JOPLIN_TOKEN')}"
)
resp = resp.json()
if not resp.get("items"):
click.echo("No notes found with title 'convert'.")
return
else:
items = resp.get("items")
for blob in items:
url = (
httpx.get(
joplin
+ f"/notes/{blob['id']}?fields=body&token={c.get('JOPLIN_TOKEN')}"
)
.json()["body"]
.strip()
)
click.echo(f"Processing {url}...")
site = httpx.get(url, follow_redirects=True)
soup = bs4.BeautifulSoup(site.content, features="html5lib")
# clean up that schizz
title = soup.title.text
[t.extract() for t in soup(["script", "head", "style"])]
body = md(str(soup))
httpx.put(
joplin + f"/notes/{blob['id']}?token={c.get('JOPLIN_TOKEN')}",
json={"title": title, "body": body},
)
click.echo(resp)

View file

@ -1,5 +1,6 @@
from pathlib import Path
from __future__ import print_function
from poetry.core.utils._compat import Path
from poetry.core.factory import Factory
from poetry.core.masonry.builders.sdist import SdistBuilder