Skip to content

Commit

Permalink
Activity => Object: add script to migrate existing Activitys to Objects
Browse files Browse the repository at this point in the history
#286

Temporarily adds back the Activity model definition; it will be removed again later.
  • Loading branch information
snarfed committed Feb 5, 2023
1 parent 0826512 commit 2c3a693
Show file tree
Hide file tree
Showing 2 changed files with 217 additions and 3 deletions.
55 changes: 52 additions & 3 deletions models.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,55 @@ def verify(self):
return self


class Activity(StringIdModel):
    """Legacy record of one relayed interaction (reply, like, repost, ...).

    The key name has the form 'SOURCE_URL TARGET_URL', for example
    'http://a/reply http://orig/post'. Entities are stored under the
    datastore kind 'Response' (see :meth:`_get_kind`).
    """
    STATUSES = ('new', 'complete', 'error', 'ignored')
    PROTOCOLS = ('activitypub', 'ostatus')
    DIRECTIONS = ('out', 'in')

    # Domains of the Bridgy Fed users this activity is to or from.
    domain = ndb.StringProperty(repeated=True)
    status = ndb.StringProperty(choices=STATUSES, default='new')
    protocol = ndb.StringProperty(choices=PROTOCOLS)
    direction = ndb.StringProperty(choices=DIRECTIONS)

    # Raw payloads. Usually at most one of the source_* fields is populated.
    source_mf2 = ndb.TextProperty()  # JSON
    source_as2 = ndb.TextProperty()  # JSON
    source_atom = ndb.TextProperty()
    target_as2 = ndb.TextProperty()  # JSON

    # Timestamps maintained by ndb: set on first put / refreshed on every put.
    created = ndb.DateTimeProperty(auto_now_add=True)
    updated = ndb.DateTimeProperty(auto_now=True)

    @classmethod
    def _get_kind(cls):
        """Returns the datastore kind name used for these entities."""
        return 'Response'

    def source(self):
        """Returns the first whitespace-separated part of the key name."""
        key_parts = self.key.id().split()
        return key_parts[0]

    def target(self):
        """Returns the second whitespace-separated part of the key name."""
        key_parts = self.key.id().split()
        return key_parts[1]

    def to_as1(self):
        """Returns this activity as an ActivityStreams 1 dict, if available.

        The stored payloads are tried in order: microformats2, then AS2, then
        Atom. Returns None when none of them is populated.
        """
        if self.source_mf2:
            parsed = json_loads(self.source_mf2)
            items = parsed.get('items')
            # A full mf2 document wraps its entries in 'items'; use the first.
            entry = items[0] if items else parsed
            return microformats2.json_to_object(entry)

        if self.source_as2:
            as2_obj = json_loads(self.source_as2)
            return as2.to_as1(as2_obj)

        if self.source_atom:
            return atom.atom_to_activity(self.source_atom)

        return None


class Target(ndb.Model):
"""Delivery destinations. ActivityPub inboxes, webmention targets, etc.
Expand Down Expand Up @@ -257,7 +306,7 @@ class Object(StringIdModel):

# domains of the Bridgy Fed users this activity is to or from
domains = ndb.StringProperty(repeated=True)
status = ndb.StringProperty(choices=STATUSES, default='new')
status = ndb.StringProperty(choices=STATUSES)
source_protocol = ndb.StringProperty(choices=PROTOCOLS)
labels = ndb.StringProperty(repeated=True, choices=LABELS)

Expand All @@ -277,8 +326,8 @@ class Object(StringIdModel):
undelivered = ndb.StructuredProperty(Target, repeated=True)
failed = ndb.StructuredProperty(Target, repeated=True)

created = ndb.DateTimeProperty(auto_now_add=True)
updated = ndb.DateTimeProperty(auto_now=True)
created = ndb.DateTimeProperty()#auto_now_add=True)
updated = ndb.DateTimeProperty()#auto_now=True)

def proxy_url(self):
"""Returns the Bridgy Fed proxy URL to render this post as HTML."""
Expand Down
165 changes: 165 additions & 0 deletions scripts/migrate_activity_to_object.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
"""Convert all stored Activity entities to Objects.
https://github.com/snarfed/bridgy-fed/issues/286
Run with:
source local/bin/activate.csh
env PYTHONPATH=. GOOGLE_APPLICATION_CREDENTIALS=service_account_creds.json \
python scripts/migrate_activity_to_object.py
"""
from datetime import datetime
import json
import sys

import dateutil.parser
from google.cloud import ndb
from granary import as1
from oauth_dropins.webutil import appengine_config, util

import common
from models import Activity, Object, Target


# Checkpoint state shared with run() and the driver at the bottom of the file.
# `seen` maps object id to the `updated` timestamp of the copy we last stored;
# its special '_latest_updated' entry holds the newest Activity timestamp.
seen = {}
latest_updated = datetime(1900, 1, 1)

# Resume from a previous run's checkpoint when there is one. A missing file
# just means this is the first run, so the empty defaults above are kept
# instead of crashing before any work is done.
try:
    with open('seen.json', encoding='utf-8') as f:
        seen = {k: datetime.fromisoformat(v) for k, v in json.load(f).items()}
except FileNotFoundError:
    pass

# Tolerate a checkpoint that lacks the marker entry.
latest_updated = seen.get('_latest_updated', latest_updated)


def _parse_timestamp(value):
    """Parses a timestamp string into a datetime; falsy values pass through."""
    return dateutil.parser.parse(value) if value else value


def _store_inner_objects(obj, obj_as1):
    """Stores the embedded actor and object of obj_as1 as their own Objects.

    An inner object is only written when it has an id and is either new to
    `seen` or newer than the copy recorded there.
    """
    for field in 'actor', 'object':
        inner = obj_as1.get(field)
        if not (isinstance(inner, dict) and inner.get('id')):
            continue

        inner_id = inner['id']
        updated = _parse_timestamp(inner.get('updated'))
        published = _parse_timestamp(inner.get('published'))

        inner_obj = Object(
            id=inner_id,
            source_protocol=obj.source_protocol,
            as1=json.dumps(inner),
            type=as1.object_type(inner),
            # Fall back to the outer object's timestamps; tzinfo is dropped so
            # the values can be compared with the naive datetimes in `seen`.
            created=(published or updated or obj.created
                     ).replace(tzinfo=None),
            updated=(updated or published or obj.updated
                     ).replace(tzinfo=None),
        )
        if inner_id not in seen or inner_obj.updated > seen[inner_id]:
            print(f' Storing inner {field} {inner_id}')
            inner_obj.put()
            seen[inner_id] = inner_obj.updated


def _finish_object(obj, obj_as1, num_activities):
    """Derives the final status of a collected Object and stores it."""
    print(f'{num_activities} total', flush=True)
    # NOTE(review): this unconditionally replaces any status assigned while
    # collecting (e.g. for ostatus activities) -- confirm that is intended.
    obj.status = ('in progress' if obj.undelivered
                  else 'failed' if obj.failed
                  else 'complete' if obj.delivered
                  else 'new')
    print(' Storing object', flush=True)
    obj.put()
    seen[obj.key.id()] = obj.updated

    _store_inner_objects(obj, obj_as1)


def _new_object(activity, obj_id, obj_as1):
    """Builds the (not yet stored) Object for the first Activity of a source."""
    obj_type = as1.object_type(obj_as1)

    labels = []
    if obj_as1.get('objectType') == 'activity':
        labels.append('activity')
    if activity.direction == 'out':
        labels.append('user')
    elif activity.domain:
        if obj_type in ('like', 'share', 'follow'):
            labels.append('notification')
        elif obj_type in ('note', 'article', 'post'):
            labels.append('feed')

    return Object(
        id=obj_id,
        domains=activity.domain,
        source_protocol=('ui' if activity.source() == 'UI'
                         else 'webmention' if activity.direction == 'out'
                         else activity.protocol),
        labels=labels,
        as1=json.dumps(obj_as1),
        as2=activity.source_as2,
        mf2=activity.source_mf2,
        type=obj_type,
        object_ids=as1.get_ids(obj_as1, 'object'),
        delivered=[],
        undelivered=[],
        failed=[],
        created=activity.created,
        updated=activity.updated,
    )


def run():
    """Migrates stored Activity entities to Objects.

    Activities are read in key order, so all Activities that share a source
    URL are adjacent. Each such run is merged into a single Object whose
    delivered/failed/undelivered targets come from the individual Activities.
    When a command line argument is given, iteration starts at that key.

    Updates the module-level `seen` dict and `latest_updated` timestamp.
    """
    global latest_updated

    query = Activity.query().order(Activity.key)
    if len(sys.argv) > 1:
        print(f'Starting at {sys.argv[1]}')
        query = query.filter(Activity.key >= ndb.Key(Activity, sys.argv[1]))
    else:
        print('Starting at the beginning')

    current_id = obj = obj_as1 = None
    num_activities = 0

    for a in query:
        if a.source() != current_id:
            # Source changed: the Object collected so far is complete.
            if obj is not None:
                _finish_object(obj, obj_as1, num_activities)

            current_id = a.source()
            if current_id == 'UI':
                # For 'UI' sources, take the id from the stored AS2 payload.
                current_id = json.loads(a.source_as2)['id']

            # Start a new Object.
            num_activities = 0
            print(f'Collecting {current_id} ..', end='', flush=True)
            assert util.is_web(current_id)

            obj_as1 = a.to_as1()
            obj = _new_object(a, current_id, obj_as1)

        # Add this Activity to the current Object.
        if a.protocol == 'ostatus':
            # only 26 'complete' ostatus Activitys, all with different source URLs
            obj.status = 'ignored' if a.status == 'error' else a.status
        elif a.status != 'ignored':
            dest = (obj.delivered if a.status == 'complete'
                    else obj.failed if a.status == 'error'
                    else obj.undelivered)
            dest.append(Target(uri=a.target(), protocol='activitypub'))

        # The Object spans the earliest creation and latest update of its parts.
        if a.created < obj.created:
            obj.created = a.created
        if a.updated > obj.updated:
            obj.updated = a.updated

        if a.updated > latest_updated:
            latest_updated = a.updated

        num_activities += 1
        print('.', end='', flush=True)

    # The loop only stores an Object once it meets the next source, so the
    # final one must be stored here or it would be silently dropped.
    if obj is not None:
        _finish_object(obj, obj_as1, num_activities)


# Script driver: run the migration inside an ndb context and always write the
# checkpoint afterwards, whether run() finished or raised.
with appengine_config.ndb_client.context():
    try:
        run()
    finally:
        print(f'\n\nLatest updated: {latest_updated}', flush=True)
        if seen:
            seen['_latest_updated'] = latest_updated
            # datetimes are not JSON serializable; store them as ISO 8601 text,
            # which is what the loader at the top of the file parses back.
            checkpoint = {
                key: stamp.isoformat() for key, stamp in seen.items()
            }
            with open('seen.json', 'w') as f:
                json.dump(checkpoint, f, indent=2)

0 comments on commit 2c3a693

Please sign in to comment.