Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

instagram: better normalising/error handling #325

Merged
merged 2 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 49 additions & 34 deletions my/instagram/android.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from datetime import datetime
import json
from pathlib import Path
import sqlite3
from typing import Iterator, Sequence, Optional, Dict, Union

from more_itertools import unique_everseen
Expand All @@ -22,6 +23,7 @@
assert_never,
)
from my.core.cachew import mcachew
from my.core.error import echain
from my.core.sqlite import sqlite_connect_immutable, select

from my.config import instagram as user_config
Expand Down Expand Up @@ -132,6 +134,48 @@ def _parse_message(j: Json) -> Optional[_Message]:
)


def _process_db(db: sqlite3.Connection) -> Iterator[Res[Union[User, _Message]]]:
    """
    Yield all entities found in a single database connection.

    Emitted in this order:
    - one User per row of the 'session' table (filled in from config, with placeholder fallbacks)
    - the Users listed in every thread's 'thread_info' json ('inviter' if present, then 'recipients')
    - messages, queried with ORDER BY timestamp

    A message that raises while being parsed is yielded as the exception object rather than raised;
    messages for which _parse_message returns None are skipped.
    """
    # TODO ugh. seems like no way to extract username?
    # sometimes messages (e.g. media_share) contain it in message field
    # but generally it's not present. ugh
    for (own_uid,) in select(('user_id',), 'FROM session', db=db):
        # NOTE(review): username also falls back on config.full_name -- looks like a copy-paste slip, confirm against config
        yield User(
            id=str(own_uid),
            full_name=config.full_name or 'USERS_OWN_FULL_NAME',
            username=config.full_name or 'USERS_OWN_USERNAME',
        )

    for (thread_blob,) in select(('thread_info',), 'FROM threads', db=db):
        thread = json.loads(thread_blob)
        # todo in principle should leave the thread attached to the message?
        # since thread is a group of users?

        # inviter usually contains our own user,
        # but sometimes it's missing (e.g. in broadcast channels)
        participants = [thread['inviter']] if 'inviter' in thread else []
        participants += thread['recipients']

        for person in participants:
            # id disappeared and seems that pk_id is in use now (around december 2022)
            uid = person.get('id')
            if not uid:
                uid = person.get('pk_id')
            assert uid is not None
            yield User(
                id=str(uid),  # for some reason it's int in the db
                full_name=person['full_name'],
                username=person['username'],
            )

    for (message_blob,) in select(('message',), 'FROM messages ORDER BY timestamp', db=db):
        # eh, seems to contain everything in json?
        payload = json.loads(message_blob)
        try:
            parsed = _parse_message(payload)
            if parsed is None:
                continue
            yield parsed
        except Exception as err:
            # defensive: hand the error to the consumer instead of aborting the whole db
            yield err


def _entities() -> Iterator[Res[Union[User, _Message]]]:
# NOTE: definitely need to merge multiple, app seems to recycle old messages
# TODO: hmm hard to guarantee timestamp ordering when we use synthetic input data...
Expand All @@ -140,40 +184,11 @@ def _entities() -> Iterator[Res[Union[User, _Message]]]:
for f in dbs:
logger.info(f'{f} : processing...')
with sqlite_connect_immutable(f) as db:
# TODO ugh. seems like no way to extract username?
# sometimes messages (e.g. media_share) contain it in message field
# but generally it's not present. ugh
for (self_uid,) in select(('user_id',), 'FROM session', db=db):
yield User(
id=str(self_uid),
full_name=config.full_name or 'USERS_OWN_FULL_NAME',
username=config.full_name or 'USERS_OWN_USERNAME',
)

for (thread_json,) in select(('thread_info',), 'FROM threads', db=db):
j = json.loads(thread_json)
# todo in principle should leave the thread attached to the message?
# since thread is a group of users?
# inviter usually contains our own user
for r in [j['inviter'], *j['recipients']]:
# id disappeared and seems that pk_id is in use now (around december 2022)
uid = r.get('id') or r.get('pk_id')
assert uid is not None
yield User(
id=str(uid), # for some reason it's int in the db
full_name=r['full_name'],
username=r['username'],
)

for (msg_json,) in select(('message',), 'FROM messages ORDER BY timestamp', db=db):
# eh, seems to contain everything in json?
j = json.loads(msg_json)
try:
m = _parse_message(j)
if m is not None:
yield m
except Exception as e:
yield e
try:
yield from _process_db(db=db)
except Exception as e:
# todo use error policy here
yield echain(RuntimeError(f'While processing {f}'), cause=e)


@mcachew(depends_on=inputs)
Expand Down
43 changes: 32 additions & 11 deletions my/instagram/gdpr.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from pathlib import Path
from typing import Iterator, Sequence, Dict, Union

from more_itertools import bucket
from more_itertools import bucket, unique_everseen

from my.core import (
get_files,
Expand Down Expand Up @@ -69,7 +69,20 @@ def _decode(s: str) -> str:


def _entities() -> Iterator[Res[Union[User, _Message]]]:
last = max(inputs())
# it's worth processing all previous export -- sometimes instagram removes some metadata from newer ones
# NOTE: here there are basically two options
# - process inputs as is (from oldest to newest)
# this would be more stable wrt newer exports (e.g. existing thread ids won't change)
# the downside is that newer exports seem to have better thread ids, so might be preferable to use them
# - process inputs reversed (from newest to oldest)
# the upside is that thread ids/usernames might be better
# the downside is that if for example the user renames, thread ids will change _a lot_, might be undesirable..
# (from newest to oldest)
for path in inputs():
yield from _entitites_from_path(path)


def _entitites_from_path(path: Path) -> Iterator[Res[Union[User, _Message]]]:
# TODO make sure it works both with plain directory
# ideally get_files should return the right thing, and we won't have to force ZipPath/match_structure here
# e.g. possible options are:
Expand All @@ -84,10 +97,10 @@ def _entities() -> Iterator[Res[Union[User, _Message]]]:
# whereas here I don't need it..
# so for now will just implement this adhoc thing and think about properly fixing later

personal_info = last / 'personal_information'
personal_info = path / 'personal_information'
if not personal_info.exists():
# old path, used up to somewhere between feb-aug 2022
personal_info = last / 'account_information'
personal_info = path / 'account_information'

j = json.loads((personal_info / 'personal_information.json').read_text())
[profile] = j['profile_user']
Expand All @@ -104,8 +117,8 @@ def _entities() -> Iterator[Res[Union[User, _Message]]]:
)
yield self_user

files = list(last.rglob('messages/inbox/*/message_*.json'))
assert len(files) > 0, last
files = list(path.rglob('messages/inbox/*/message_*.json'))
assert len(files) > 0, path

buckets = bucket(files, key=lambda p: p.parts[-2])
file_map = {k: list(buckets[k]) for k in buckets}
Expand All @@ -126,7 +139,7 @@ def _entities() -> Iterator[Res[Union[User, _Message]]]:
# so I feel like there is just no guaranteed way to correlate :(
other_id = fname[-id_len:]
# NOTE: no match in android db?
other_username = fname[:-id_len - 1]
other_username = fname[: -id_len - 1]
other_full_name = _decode(j['title'])
yield User(
id=other_id,
Expand All @@ -135,7 +148,7 @@ def _entities() -> Iterator[Res[Union[User, _Message]]]:
)

# todo "thread_type": "Regular" ?
for jm in j['messages']:
for jm in reversed(j['messages']): # in json, they are in reverse order for some reason
try:
content = None
if 'content' in jm:
Expand All @@ -144,7 +157,15 @@ def _entities() -> Iterator[Res[Union[User, _Message]]]:
# ugh. for some reason these contain an extra space and that messes up message merging..
content = content.strip()
else:
share = jm.get('share')
if (share := jm.get('share')) is not None:
if (share_link := share.get('link')) is not None:
# somewhere around 20231007, instagram removed these from gdpr links and they show up a lot in various diffs
share_link = share_link.replace('feed_type=reshare_chaining&', '')
share_link = share_link.replace('?feed_type=reshare_chaining', '')
share['link'] = share_link
if (share_text := share.get('share_text')) is not None:
share['share_text'] = _decode(share_text)

photos = jm.get('photos')
videos = jm.get('videos')
cc = share or photos or videos
Expand All @@ -166,7 +187,7 @@ def _entities() -> Iterator[Res[Union[User, _Message]]]:
created=datetime.fromtimestamp(timestamp_ms / 1000),
text=content,
user_id=user_id,
thread_id=fname, # meh.. but no better way?
thread_id=fname, # meh.. but no better way?
)
except Exception as e:
yield e
Expand All @@ -175,7 +196,7 @@ def _entities() -> Iterator[Res[Union[User, _Message]]]:
# TODO basically copy pasted from android.py... hmm
def messages() -> Iterator[Res[Message]]:
id2user: Dict[str, User] = {}
for x in _entities():
for x in unique_everseen(_entities()):
if isinstance(x, Exception):
yield x
continue
Expand Down