Compare commits

..

No commits in common. "master" and "1234" have entirely different histories.
master ... 1234

8 changed files with 332 additions and 1585 deletions

View File

@ -1,80 +0,0 @@
name: Release
on:
push:
branches:
- master
jobs:
check_for_changed_version:
runs-on: 'ubuntu-latest'
# Declare outputs for next jobs
outputs:
version_changed: ${{ steps.check_file_changed.outputs.version_changed }}
steps:
- uses: actions/checkout@v2
with:
# Checkout as many commits as needed for the diff
fetch-depth: 2
- id: check_file_changed
run: |
# Diff HEAD with the previous commit
if git diff HEAD^ HEAD pyproject.toml | grep -q "+version =";
then
GOTIME="True"
else
GOTIME="False"
fi
echo "::notice title=GOTIME::$GOTIME"
# Set the output named "version_changed"
echo "version_changed=$GOTIME" >> $GITHUB_OUTPUT
build:
runs-on: ubuntu-latest
# this should start working with https://github.com/go-gitea/gitea/pull/24230
# needs: [ check_for_changed_version ]
# if: needs.check_for_changed_version.outputs.version_changed == 'True'
permissions:
contents: write
steps:
- uses: https://github.com/actions/checkout@v2
- uses: https://github.com/actions/setup-python@v4
with:
python-version: '3.11.x'
- name: Install Env
# this should be all we need because shiv will download the deps itself
run: |
pip install --upgrade pip
pip install shiv
pip install poetry
- name: Add VERSION env property
run: |
echo "VERSION=v$(poetry version | python -c 'import sys;print(sys.stdin.read().split()[1])')" >> $GITHUB_ENV
echo ${{ env.VERSION }}
- name: Build the sucker
run: |
sed -i -e "s/?????/${{ env.VERSION }}/g" src/__init__.py
make build
- name: Create release!
run: |
JSON_DATA=$(
printf '%s' \
'{'\
'"tag_name":"${{ env.VERSION }}",'\
'"name":"${{ env.VERSION }}",'\
'"body":"RELEASE THE KRAKEN"'\
'}' \
)
echo """release_id=$(\
curl -X POST \
-s https://git.joekaufeld.com/api/v1/repos/${GITHUB_REPOSITORY%/*}/${{ github.event.repository.name }}/releases \
-H "Authorization: token ${{ secrets.PAT }}" \
-H 'Content-Type: application/json' \
-d "$JSON_DATA" \
| python3 -c "import sys, json; print(json.load(sys.stdin)['id'])"\
)""" >> $GITHUB_ENV
- name: Upload assets!
run: |
curl https://git.joekaufeld.com/api/v1/repos/${GITHUB_REPOSITORY%/*}/${{ github.event.repository.name }}/releases/${{ env.release_id }}/assets \
-H "Authorization: token ${{ secrets.PAT }}" \
-F attachment=@utils

39
.github/workflows/build_and_release.yml vendored Normal file
View File

@ -0,0 +1,39 @@
name: Release
on:
push:
branches:
- master
jobs:
build:
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- uses: https://github.com/actions/checkout@v2
- name: lsb_release -i -r -s
run: lsb_release -i -r -s
- uses: https://github.com/actions/setup-python@v4
with:
python-version: '3.11.x'
- name: Install Env
# this should be all we need because shiv will download the deps itself
run: |
pip install --upgrade pip
pip install shiv
pip install poetry
- name: Add CURRENT_TIME env property
run: echo "CURRENT_TIME_VERSION=v$(date '+%s')" >> $GITHUB_ENV
- name: Build the sucker
run: |
sed -i -e "s/?????/${{ env.CURRENT_TIME_VERSION }}/g" src/__init__.py
make build
- uses: https://github.com/ncipollo/release-action@v1
with:
artifacts: "utils"
body: "It's releasin' time"
generateReleaseNotes: false
tag: ${{ env.CURRENT_TIME_VERSION }}
commit: master
token: ${{ secrets.PAT }}

1
.gitignore vendored
View File

@ -129,4 +129,3 @@ dmypy.json
.pyre/
setup.py
utils
praw.ini

1475
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
[tool.poetry]
name = "src"
version = "0.3.0"
version = "0.1.0"
description = ""
authors = ["Joe Kaufeld <opensource@joekaufeld.com>"]
@ -10,8 +10,6 @@ shiv = "^1.0.1"
httpx = "^0.23.0"
click = "^8.1.3"
rich = "^12.5.1"
markdownify = "^0.11.6"
praw = "^7.7.0"
[tool.poetry.dev-dependencies]
poetry = "^1.1.14"

View File

@ -6,15 +6,12 @@ import uuid
import click
import httpx
from praw import Reddit
from rich import pretty
from rich.status import Status
from rich.traceback import install
from shiv.bootstrap import current_zipfile
import src
from src.helpers import flip_char, load_config, write_config
from src.joplin import process_joplin_posts, get_folders, BASE_URL
from src.helpers import flip_char
from src.art import BANNERS
@ -24,23 +21,15 @@ from src.art import BANNERS
)
@click.pass_context
@click.version_option(version=src.__version__, prog_name="utils")
@click.option(
"--update",
is_flag=True,
help="Check Gitea for a new version and auto-update.",
)
def main(ctx, update):
def main(ctx):
"""Launch a utility or drop into a command line REPL if no command is given."""
if update:
update_from_gitea()
sys.exit()
elif ctx.invoked_subcommand is None:
if ctx.invoked_subcommand is None:
banner = random.choice(BANNERS)
pretty.install() # type: ignore
install() # traceback handler
code.interact(local=globals(), banner=banner)
code.interact(banner, None)
sys.exit()
@ -50,21 +39,6 @@ def uuid4():
click.echo(uuid.uuid4())
@main.command()
def joplin():
"""Search Joplin for notes titled 'convert' and process them."""
process_joplin_posts()
@main.command()
def objectid():
"""Generate a random ObjectID."""
new_id = ""
for _ in range(24):
new_id += random.choice(string.ascii_lowercase[:6] + string.digits)
click.echo(new_id)
@main.command()
@click.argument("dice")
def roll(dice: str):
@ -109,139 +83,39 @@ def beautify(words: list[str]):
new_beautiful_string = []
for num, letter in enumerate(message):
if num % 2:
letter = flip_char(letter)
new_beautiful_string.append(letter)
if letter in string.ascii_letters:
if num % 2:
letter = flip_char(letter)
new_beautiful_string.append(letter)
click.echo("".join(new_beautiful_string))
@main.command()
def get_saved_from_reddit() -> None:
"""Get saved posts from reddit."""
def _process(item):
fullname = item.fullname
if fullname.startswith("t3"):
# we got a post
title = f"{item.subreddit.display_name} - {item.title}"
body = item.selftext if item.selftext else item.url
elif fullname.startswith("t1"):
# comment time
try:
author_name = item.author.name
except AttributeError:
author_name = "[deleted]"
title = (
f"{item.submission.subreddit.display_name} - {item.submission.title}"
)
body = f"Comment from {author_name}:\n\n{item.body}"
else:
click.echo(f"Not sure how to process https://reddit.com{item.permalink}")
return
if item.over_18:
title = "🔴 " + title
body = "https://reddit.com" + item.permalink + "\n\n" + body
joplin = BASE_URL + cfg["JOPLIN_PORT"]
notes_url = joplin + f"/notes/?token={cfg.get('JOPLIN_TOKEN')}"
click.echo(f"Processing {title}...")
httpx.post(
notes_url,
json={
"title": title,
"body": body,
"parent_id": cfg.get("joplin_saved_posts_folder_id"),
},
)
item.unsave()
cfg = load_config()
# because 2fa is enabled, we have to get the code first
twofa_code = click.prompt("What's the current 2fa code?", type=str)
if not cfg.get("REDDIT_CLIENT_ID"):
cfg["REDDIT_CLIENT_ID"] = ""
if not cfg.get("REDDIT_CLIENT_SECRET"):
cfg["REDDIT_CLIENT_SECRET"] = ""
if not cfg.get("REDDIT_PASSWORD"):
cfg["REDDIT_PASSWORD"] = ""
if not cfg.get("REDDIT_USERNAME"):
cfg["REDDIT_USERNAME"] = ""
if not cfg.get("joplin_saved_posts_folder_id"):
cfg["joplin_saved_posts_folder_id"] = ""
write_config(cfg)
username = cfg.get("REDDIT_USERNAME", "")
r = Reddit(
client_id=cfg.get("REDDIT_CLIENT_ID", ""),
client_secret=cfg.get("REDDIT_CLIENT_SECRET", ""),
username=username,
password=f"{cfg.get('REDDIT_PASSWORD', '')}:{twofa_code}",
user_agent=f"getter of saved posts, u/{username}",
)
try:
r.user.me()
except Exception as e:
click.echo(f"Cannot connect to reddit: {e}")
return
if not cfg.get("joplin_saved_posts_folder_id"):
folders = get_folders()
click.echo("Pick the folder that I should write your saved posts to:")
for count, option in enumerate(folders["items"]):
click.echo(f"{count} - {option['title']}")
folder_position = click.prompt("Number of folder?", type=int)
folder_name = folders["items"][folder_position]["title"]
folder_id = folders["items"][folder_position]["id"]
click.echo(f"Got it, will write to {folder_name}, ID {folder_id}.")
cfg["joplin_saved_posts_folder_id"] = folder_id
write_config(cfg)
for _ in range(10):
click.echo("Getting new posts...")
for item in r.user.me().saved(limit=None):
_process(item)
def update_from_gitea():
def update():
"""Get the newest release from Gitea and install it."""
status = Status("Checking for new release...")
status.start()
response = httpx.get(
# "https://api.github.com/repos/itsthejoker/utils/releases/latest"
"https://git.joekaufeld.com/api/v1/repos/jkaufeld/utils/releases/latest"
)
if response.status_code != 200:
status.stop()
click.echo(
f"Something went wrong when talking to Gitea; got a"
f" {response.status_code} with the following content:\n"
f"{response.content}"
)
return
status.update("Checking for new release...")
release_data = response.json()
if release_data["tag_name"] == src.__version__:
status.stop()
if release_data["name"] == src.__version__:
click.echo("Server version is the same as current version; nothing to update.")
return
status.update("Updating...")
url = release_data["assets"][0]["browser_download_url"]
with current_zipfile() as archive:
with open(archive.filename, "wb") as f, httpx.stream(
"GET", url, follow_redirects=True
) as r:
with open(archive.filename, "wb") as f, httpx.stream("GET", url, follow_redirects=True) as r:
for line in r.iter_bytes():
f.write(line)
status.stop()
click.echo(f"Updated to {release_data['tag_name']}! 🎉")
click.echo(f"Updated to {release_data['name']}! 🎉")
if __name__ == "__main__":

View File

@ -1,46 +1,8 @@
from pathlib import Path
import json
import os
import string
import click
def flip_char(char: str):
if char.lower() not in string.ascii_letters:
return char
if char in string.ascii_lowercase:
return string.ascii_uppercase[string.ascii_lowercase.find(char)]
else:
return string.ascii_lowercase[string.ascii_uppercase.find(char)]
base_folder = Path(Path.home() / ".config")
config_file = base_folder / "utils_config.json"
def load_config() -> dict:
"""Try to load the computer-specific utils JSON file."""
if not base_folder.exists():
os.mkdir(base_folder)
if not config_file.exists():
with open(config_file, "w") as f:
f.write(json.dumps({}, indent=2))
return {}
try:
with open(config_file, "r") as f:
return json.load(f)
except json.JSONDecodeError:
click.echo(
f"Cannot load config -- potentially corrupt file. Check {config_file:s}"
)
return {}
def write_config(config_obj) -> None:
load_config() # make sure everything exists
with open(config_file, "w") as f:
f.write(json.dumps(config_obj, indent=2))

View File

@ -1,124 +0,0 @@
import click
import httpx
import time
import bs4
from markdownify import markdownify as md
from src.helpers import load_config, write_config
BASE_URL = "http://localhost:"
def doublecheck_port_number(port: int):
try:
if (
httpx.get(f"http://localhost:{port}/ping").content.decode()
== "JoplinClipperServer"
):
return True
except Exception:
return False
def auth_with_joplin():
config = load_config()
if not config.get("JOPLIN_PORT"):
while True:
config["JOPLIN_PORT"] = click.prompt(
"What's the port number of the Joplin Web Clipper service?",
type=int,
default=41184,
)
config["JOPLIN_PORT"] = str(config["JOPLIN_PORT"])
if doublecheck_port_number(config["JOPLIN_PORT"]):
break
else:
click.echo("Something's not right there. Is the service enabled?")
# this triggers the dialog in Joplin to accept the auth connection.
auth_token = (
httpx.post(BASE_URL + config["JOPLIN_PORT"] + "/auth").json().get("auth_token")
)
click.echo("Check Joplin to allow this connection.")
while True:
check_resp = httpx.get(
BASE_URL + config["JOPLIN_PORT"] + f"/auth/check?auth_token={auth_token}"
).json()
if check_resp.get("status") == "waiting":
time.sleep(0.5)
else:
config["JOPLIN_TOKEN"] = check_resp.get("token")
break
write_config(config)
def get_folders():
c = load_config()
first_call_success = False
try:
resp = httpx.get(
BASE_URL
+ c.get("JOPLIN_PORT", "")
+ f"/folders?token={c.get('JOPLIN_TOKEN')}"
)
if resp.status_code == 403:
click.echo("got 403")
auth_with_joplin()
else:
resp = resp.json()
first_call_success = True
except httpx._exceptions.ConnectError:
auth_with_joplin()
if not first_call_success:
c = load_config()
resp = httpx.get(
BASE_URL
+ c.get("JOPLIN_PORT", "")
+ f"/folders?token={c.get('JOPLIN_TOKEN')}"
).json()
return resp
def process_joplin_posts():
# this will handle the initial auth flow if we aren't already authed
get_folders()
c = load_config()
joplin = BASE_URL + c["JOPLIN_PORT"]
resp = httpx.get(
joplin
+ f"/search?query=convert&fields=id,parent_id&token={c.get('JOPLIN_TOKEN')}"
)
resp = resp.json()
if not resp.get("items"):
click.echo("No notes found with title 'convert'.")
return
else:
items = resp.get("items")
for blob in items:
url = (
httpx.get(
joplin
+ f"/notes/{blob['id']}?fields=body&token={c.get('JOPLIN_TOKEN')}"
)
.json()["body"]
.strip()
)
click.echo(f"Processing {url}...")
site = httpx.get(url, follow_redirects=True)
soup = bs4.BeautifulSoup(site.content, features="html5lib")
# clean up that schizz
title = soup.title.text
[t.extract() for t in soup(["script", "head", "style"])]
body = md(str(soup))
httpx.put(
joplin + f"/notes/{blob['id']}?token={c.get('JOPLIN_TOKEN')}",
json={"title": title, "body": body},
)
click.echo(resp)