Compare commits

..

No commits in common. "master" and "1234" have entirely different histories.
master ... 1234

8 changed files with 332 additions and 1585 deletions

View File

@ -1,80 +0,0 @@
name: Release
on:
push:
branches:
- master
jobs:
check_for_changed_version:
runs-on: 'ubuntu-latest'
# Declare outputs for next jobs
outputs:
version_changed: ${{ steps.check_file_changed.outputs.version_changed }}
steps:
- uses: actions/checkout@v2
with:
# Checkout as many commits as needed for the diff
fetch-depth: 2
- id: check_file_changed
run: |
# Diff HEAD with the previous commit
if git diff HEAD^ HEAD pyproject.toml | grep -q "+version =";
then
GOTIME="True"
else
GOTIME="False"
fi
echo "::notice title=GOTIME::$GOTIME"
# Set the output named "version_changed"
echo "version_changed=$GOTIME" >> $GITHUB_OUTPUT
build:
runs-on: ubuntu-latest
# this should start working with https://github.com/go-gitea/gitea/pull/24230
# needs: [ check_for_changed_version ]
# if: needs.check_for_changed_version.outputs.version_changed == 'True'
permissions:
contents: write
steps:
- uses: https://github.com/actions/checkout@v2
- uses: https://github.com/actions/setup-python@v4
with:
python-version: '3.11.x'
- name: Install Env
# this should be all we need because shiv will download the deps itself
run: |
pip install --upgrade pip
pip install shiv
pip install poetry
- name: Add VERSION env property
run: |
echo "VERSION=v$(poetry version | python -c 'import sys;print(sys.stdin.read().split()[1])')" >> $GITHUB_ENV
echo ${{ env.VERSION }}
- name: Build the sucker
run: |
sed -i -e "s/?????/${{ env.VERSION }}/g" src/__init__.py
make build
- name: Create release!
run: |
JSON_DATA=$(
printf '%s' \
'{'\
'"tag_name":"${{ env.VERSION }}",'\
'"name":"${{ env.VERSION }}",'\
'"body":"RELEASE THE KRAKEN"'\
'}' \
)
echo """release_id=$(\
curl -X POST \
-s https://git.joekaufeld.com/api/v1/repos/${GITHUB_REPOSITORY%/*}/${{ github.event.repository.name }}/releases \
-H "Authorization: token ${{ secrets.PAT }}" \
-H 'Content-Type: application/json' \
-d "$JSON_DATA" \
| python3 -c "import sys, json; print(json.load(sys.stdin)['id'])"\
)""" >> $GITHUB_ENV
- name: Upload assets!
run: |
curl https://git.joekaufeld.com/api/v1/repos/${GITHUB_REPOSITORY%/*}/${{ github.event.repository.name }}/releases/${{ env.release_id }}/assets \
-H "Authorization: token ${{ secrets.PAT }}" \
-F attachment=@utils

39
.github/workflows/build_and_release.yml vendored Normal file
View File

@ -0,0 +1,39 @@
name: Release
on:
push:
branches:
- master
jobs:
build:
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- uses: https://github.com/actions/checkout@v2
- name: lsb_release -i -r -s
run: lsb_release -i -r -s
- uses: https://github.com/actions/setup-python@v4
with:
python-version: '3.11.x'
- name: Install Env
# this should be all we need because shiv will download the deps itself
run: |
pip install --upgrade pip
pip install shiv
pip install poetry
- name: Add CURRENT_TIME env property
run: echo "CURRENT_TIME_VERSION=v$(date '+%s')" >> $GITHUB_ENV
- name: Build the sucker
run: |
sed -i -e "s/?????/${{ env.CURRENT_TIME_VERSION }}/g" src/__init__.py
make build
- uses: https://github.com/ncipollo/release-action@v1
with:
artifacts: "utils"
body: "It's releasin' time"
generateReleaseNotes: false
tag: ${{ env.CURRENT_TIME_VERSION }}
commit: master
token: ${{ secrets.PAT }}

3
.gitignore vendored
View File

@ -128,5 +128,4 @@ dmypy.json
# Pyre type checker # Pyre type checker
.pyre/ .pyre/
setup.py setup.py
utils utils
praw.ini

1475
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "src" name = "src"
version = "0.3.0" version = "0.1.0"
description = "" description = ""
authors = ["Joe Kaufeld <opensource@joekaufeld.com>"] authors = ["Joe Kaufeld <opensource@joekaufeld.com>"]
@ -10,8 +10,6 @@ shiv = "^1.0.1"
httpx = "^0.23.0" httpx = "^0.23.0"
click = "^8.1.3" click = "^8.1.3"
rich = "^12.5.1" rich = "^12.5.1"
markdownify = "^0.11.6"
praw = "^7.7.0"
[tool.poetry.dev-dependencies] [tool.poetry.dev-dependencies]
poetry = "^1.1.14" poetry = "^1.1.14"
@ -22,4 +20,4 @@ requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api" build-backend = "poetry.core.masonry.api"
[tool.poetry.plugins."console_scripts"] [tool.poetry.plugins."console_scripts"]
"utils" = "src.cli:main" "utils" = "src.cli:main"

View File

@ -6,15 +6,12 @@ import uuid
import click import click
import httpx import httpx
from praw import Reddit
from rich import pretty from rich import pretty
from rich.status import Status
from rich.traceback import install from rich.traceback import install
from shiv.bootstrap import current_zipfile from shiv.bootstrap import current_zipfile
import src import src
from src.helpers import flip_char, load_config, write_config from src.helpers import flip_char
from src.joplin import process_joplin_posts, get_folders, BASE_URL
from src.art import BANNERS from src.art import BANNERS
@ -24,23 +21,15 @@ from src.art import BANNERS
) )
@click.pass_context @click.pass_context
@click.version_option(version=src.__version__, prog_name="utils") @click.version_option(version=src.__version__, prog_name="utils")
@click.option( def main(ctx):
"--update",
is_flag=True,
help="Check Gitea for a new version and auto-update.",
)
def main(ctx, update):
"""Launch a utility or drop into a command line REPL if no command is given.""" """Launch a utility or drop into a command line REPL if no command is given."""
if update: if ctx.invoked_subcommand is None:
update_from_gitea()
sys.exit()
elif ctx.invoked_subcommand is None:
banner = random.choice(BANNERS) banner = random.choice(BANNERS)
pretty.install() # type: ignore pretty.install() # type: ignore
install() # traceback handler install() # traceback handler
code.interact(local=globals(), banner=banner) code.interact(banner, None)
sys.exit() sys.exit()
@ -50,21 +39,6 @@ def uuid4():
click.echo(uuid.uuid4()) click.echo(uuid.uuid4())
@main.command()
def joplin():
"""Search Joplin for notes titled 'convert' and process them."""
process_joplin_posts()
@main.command()
def objectid():
"""Generate a random ObjectID."""
new_id = ""
for _ in range(24):
new_id += random.choice(string.ascii_lowercase[:6] + string.digits)
click.echo(new_id)
@main.command() @main.command()
@click.argument("dice") @click.argument("dice")
def roll(dice: str): def roll(dice: str):
@ -109,139 +83,39 @@ def beautify(words: list[str]):
new_beautiful_string = [] new_beautiful_string = []
for num, letter in enumerate(message): for num, letter in enumerate(message):
if num % 2: if letter in string.ascii_letters:
letter = flip_char(letter) if num % 2:
new_beautiful_string.append(letter) letter = flip_char(letter)
new_beautiful_string.append(letter)
click.echo("".join(new_beautiful_string)) click.echo("".join(new_beautiful_string))
@main.command() @main.command()
def get_saved_from_reddit() -> None: def update():
"""Get saved posts from reddit."""
def _process(item):
fullname = item.fullname
if fullname.startswith("t3"):
# we got a post
title = f"{item.subreddit.display_name} - {item.title}"
body = item.selftext if item.selftext else item.url
elif fullname.startswith("t1"):
# comment time
try:
author_name = item.author.name
except AttributeError:
author_name = "[deleted]"
title = (
f"{item.submission.subreddit.display_name} - {item.submission.title}"
)
body = f"Comment from {author_name}:\n\n{item.body}"
else:
click.echo(f"Not sure how to process https://reddit.com{item.permalink}")
return
if item.over_18:
title = "🔴 " + title
body = "https://reddit.com" + item.permalink + "\n\n" + body
joplin = BASE_URL + cfg["JOPLIN_PORT"]
notes_url = joplin + f"/notes/?token={cfg.get('JOPLIN_TOKEN')}"
click.echo(f"Processing {title}...")
httpx.post(
notes_url,
json={
"title": title,
"body": body,
"parent_id": cfg.get("joplin_saved_posts_folder_id"),
},
)
item.unsave()
cfg = load_config()
# because 2fa is enabled, we have to get the code first
twofa_code = click.prompt("What's the current 2fa code?", type=str)
if not cfg.get("REDDIT_CLIENT_ID"):
cfg["REDDIT_CLIENT_ID"] = ""
if not cfg.get("REDDIT_CLIENT_SECRET"):
cfg["REDDIT_CLIENT_SECRET"] = ""
if not cfg.get("REDDIT_PASSWORD"):
cfg["REDDIT_PASSWORD"] = ""
if not cfg.get("REDDIT_USERNAME"):
cfg["REDDIT_USERNAME"] = ""
if not cfg.get("joplin_saved_posts_folder_id"):
cfg["joplin_saved_posts_folder_id"] = ""
write_config(cfg)
username = cfg.get("REDDIT_USERNAME", "")
r = Reddit(
client_id=cfg.get("REDDIT_CLIENT_ID", ""),
client_secret=cfg.get("REDDIT_CLIENT_SECRET", ""),
username=username,
password=f"{cfg.get('REDDIT_PASSWORD', '')}:{twofa_code}",
user_agent=f"getter of saved posts, u/{username}",
)
try:
r.user.me()
except Exception as e:
click.echo(f"Cannot connect to reddit: {e}")
return
if not cfg.get("joplin_saved_posts_folder_id"):
folders = get_folders()
click.echo("Pick the folder that I should write your saved posts to:")
for count, option in enumerate(folders["items"]):
click.echo(f"{count} - {option['title']}")
folder_position = click.prompt("Number of folder?", type=int)
folder_name = folders["items"][folder_position]["title"]
folder_id = folders["items"][folder_position]["id"]
click.echo(f"Got it, will write to {folder_name}, ID {folder_id}.")
cfg["joplin_saved_posts_folder_id"] = folder_id
write_config(cfg)
for _ in range(10):
click.echo("Getting new posts...")
for item in r.user.me().saved(limit=None):
_process(item)
def update_from_gitea():
"""Get the newest release from Gitea and install it.""" """Get the newest release from Gitea and install it."""
status = Status("Checking for new release...")
status.start()
response = httpx.get( response = httpx.get(
# "https://api.github.com/repos/itsthejoker/utils/releases/latest"
"https://git.joekaufeld.com/api/v1/repos/jkaufeld/utils/releases/latest" "https://git.joekaufeld.com/api/v1/repos/jkaufeld/utils/releases/latest"
) )
if response.status_code != 200: if response.status_code != 200:
status.stop()
click.echo( click.echo(
f"Something went wrong when talking to Gitea; got a" f"Something went wrong when talking to Gitea; got a"
f" {response.status_code} with the following content:\n" f" {response.status_code} with the following content:\n"
f"{response.content}" f"{response.content}"
) )
return return
status.update("Checking for new release...")
release_data = response.json() release_data = response.json()
if release_data["tag_name"] == src.__version__: if release_data["name"] == src.__version__:
status.stop()
click.echo("Server version is the same as current version; nothing to update.") click.echo("Server version is the same as current version; nothing to update.")
return return
status.update("Updating...")
url = release_data["assets"][0]["browser_download_url"] url = release_data["assets"][0]["browser_download_url"]
with current_zipfile() as archive: with current_zipfile() as archive:
with open(archive.filename, "wb") as f, httpx.stream( with open(archive.filename, "wb") as f, httpx.stream("GET", url, follow_redirects=True) as r:
"GET", url, follow_redirects=True
) as r:
for line in r.iter_bytes(): for line in r.iter_bytes():
f.write(line) f.write(line)
click.echo(f"Updated to {release_data['name']}! 🎉")
status.stop()
click.echo(f"Updated to {release_data['tag_name']}! 🎉")
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -1,46 +1,8 @@
from pathlib import Path
import json
import os
import string import string
import click
def flip_char(char: str): def flip_char(char: str):
if char.lower() not in string.ascii_letters:
return char
if char in string.ascii_lowercase: if char in string.ascii_lowercase:
return string.ascii_uppercase[string.ascii_lowercase.find(char)] return string.ascii_uppercase[string.ascii_lowercase.find(char)]
else: else:
return string.ascii_lowercase[string.ascii_uppercase.find(char)] return string.ascii_lowercase[string.ascii_uppercase.find(char)]
base_folder = Path(Path.home() / ".config")
config_file = base_folder / "utils_config.json"
def load_config() -> dict:
"""Try to load the computer-specific utils JSON file."""
if not base_folder.exists():
os.mkdir(base_folder)
if not config_file.exists():
with open(config_file, "w") as f:
f.write(json.dumps({}, indent=2))
return {}
try:
with open(config_file, "r") as f:
return json.load(f)
except json.JSONDecodeError:
click.echo(
f"Cannot load config -- potentially corrupt file. Check {config_file:s}"
)
return {}
def write_config(config_obj) -> None:
load_config() # make sure everything exists
with open(config_file, "w") as f:
f.write(json.dumps(config_obj, indent=2))

View File

@ -1,124 +0,0 @@
import click
import httpx
import time
import bs4
from markdownify import markdownify as md
from src.helpers import load_config, write_config
BASE_URL = "http://localhost:"
def doublecheck_port_number(port: int):
try:
if (
httpx.get(f"http://localhost:{port}/ping").content.decode()
== "JoplinClipperServer"
):
return True
except Exception:
return False
def auth_with_joplin():
config = load_config()
if not config.get("JOPLIN_PORT"):
while True:
config["JOPLIN_PORT"] = click.prompt(
"What's the port number of the Joplin Web Clipper service?",
type=int,
default=41184,
)
config["JOPLIN_PORT"] = str(config["JOPLIN_PORT"])
if doublecheck_port_number(config["JOPLIN_PORT"]):
break
else:
click.echo("Something's not right there. Is the service enabled?")
# this triggers the dialog in Joplin to accept the auth connection.
auth_token = (
httpx.post(BASE_URL + config["JOPLIN_PORT"] + "/auth").json().get("auth_token")
)
click.echo("Check Joplin to allow this connection.")
while True:
check_resp = httpx.get(
BASE_URL + config["JOPLIN_PORT"] + f"/auth/check?auth_token={auth_token}"
).json()
if check_resp.get("status") == "waiting":
time.sleep(0.5)
else:
config["JOPLIN_TOKEN"] = check_resp.get("token")
break
write_config(config)
def get_folders():
c = load_config()
first_call_success = False
try:
resp = httpx.get(
BASE_URL
+ c.get("JOPLIN_PORT", "")
+ f"/folders?token={c.get('JOPLIN_TOKEN')}"
)
if resp.status_code == 403:
click.echo("got 403")
auth_with_joplin()
else:
resp = resp.json()
first_call_success = True
except httpx._exceptions.ConnectError:
auth_with_joplin()
if not first_call_success:
c = load_config()
resp = httpx.get(
BASE_URL
+ c.get("JOPLIN_PORT", "")
+ f"/folders?token={c.get('JOPLIN_TOKEN')}"
).json()
return resp
def process_joplin_posts():
# this will handle the initial auth flow if we aren't already authed
get_folders()
c = load_config()
joplin = BASE_URL + c["JOPLIN_PORT"]
resp = httpx.get(
joplin
+ f"/search?query=convert&fields=id,parent_id&token={c.get('JOPLIN_TOKEN')}"
)
resp = resp.json()
if not resp.get("items"):
click.echo("No notes found with title 'convert'.")
return
else:
items = resp.get("items")
for blob in items:
url = (
httpx.get(
joplin
+ f"/notes/{blob['id']}?fields=body&token={c.get('JOPLIN_TOKEN')}"
)
.json()["body"]
.strip()
)
click.echo(f"Processing {url}...")
site = httpx.get(url, follow_redirects=True)
soup = bs4.BeautifulSoup(site.content, features="html5lib")
# clean up that schizz
title = soup.title.text
[t.extract() for t in soup(["script", "head", "style"])]
body = md(str(soup))
httpx.put(
joplin + f"/notes/{blob['id']}?token={c.get('JOPLIN_TOKEN')}",
json={"title": title, "body": body},
)
click.echo(resp)