Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
298 changes: 298 additions & 0 deletions GramAddict/core/handle_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from GramAddict.core.resources import ClassName
from GramAddict.core.storage import FollowingStatus
from GramAddict.core.utils import (
append_to_file,
get_value,
inspect_current_view,
random_choice,
Expand Down Expand Up @@ -88,6 +89,57 @@ def interact(
scraped=scraped,
)

def scrape(
storage,
is_follow_limit_reached,
username,
interaction,
device,
session_state,
current_job,
target,
on_interaction,
):
# can_follow = False
# if is_follow_limit_reached is not None:
# can_follow = not is_follow_limit_reached() and storage.get_following_status(
# username
# ) in [FollowingStatus.NONE, FollowingStatus.NOT_IN_LIST]

(
interaction_succeed,
followed,
requested,
scraped,
pm_sent,
number_of_liked,
number_of_watched,
number_of_comments,
) = interaction(device, username=username, can_follow=False)

add_interacted_user = partial(
storage.add_interacted_user,
session_id=session_state.id,
job_name=current_job,
target=target,
)

add_interacted_user(
username,
followed=False,
is_requested=False,
scraped=True,
liked=0,
watched=0,
commented=0,
pm_sent=False,
)
return on_interaction(
succeed=True,
followed=False,
scraped=True,
)


def handle_blogger(
self,
Expand Down Expand Up @@ -138,6 +190,55 @@ def handle_blogger(
):
return

def scrape_blogger(
self,
device,
session_state,
blogger,
current_job,
storage,
profile_filter,
on_interaction,
interaction,
is_follow_limit_reached,
):
if not nav_to_blogger(device, blogger, session_state.my_username):
return
can_interact = False
if storage.is_user_in_blacklist(blogger):
logger.info(f"@{blogger} is in blacklist. Skip.")
else:
interacted, interacted_when = storage.check_user_was_interacted(blogger)
if interacted:
can_reinteract = storage.can_be_reinteract(
interacted_when, get_value(self.args.can_reinteract_after, None, 0)
)
logger.info(
f"@{blogger}: already interacted on {interacted_when:%Y/%m/%d %H:%M:%S}. {'Interacting again now' if can_reinteract else 'Skip'}."
)
if can_reinteract:
can_interact = True
else:
can_interact = True

if can_interact:
logger.info(
f"@{blogger}: scrape",
extra={"color": f"{Fore.YELLOW}"},
)
if not scrape(
storage=storage,
is_follow_limit_reached=is_follow_limit_reached,
username=blogger,
interaction=interaction,
device=device,
session_state=session_state,
current_job=current_job,
target=blogger,
on_interaction=on_interaction,
):
return


def handle_blogger_from_file(
self,
Expand Down Expand Up @@ -689,6 +790,37 @@ def handle_followers(
)


def scrape_followers(
self,
device,
session_state,
username,
current_job,
storage,
on_interaction,
interaction,
is_follow_limit_reached,
scroll_end_detector,
):
is_myself = username == session_state.my_username
if not nav_to_blogger(device, username, current_job):
return

iterate_and_scrape_over_followers(
self,
device,
interaction,
is_follow_limit_reached,
storage,
on_interaction,
is_myself,
scroll_end_detector,
session_state,
current_job,
username,
)


def iterate_over_followers(
self,
device,
Expand Down Expand Up @@ -865,3 +997,169 @@ def scrolled_to_top():
extra={"color": f"{Fore.GREEN}"},
)
return

def iterate_and_scrape_over_followers(
self,
device,
interaction,
is_follow_limit_reached,
storage,
on_interaction,
is_myself,
scroll_end_detector,
session_state,
current_job,
target,
):
device.find(
resourceId=self.ResourceID.FOLLOW_LIST_CONTAINER,
className=ClassName.LINEAR_LAYOUT,
).wait(Timeout.LONG)

def scrolled_to_top():
row_search = device.find(
resourceId=self.ResourceID.ROW_SEARCH_EDIT_TEXT,
className=ClassName.EDIT_TEXT,
)
return row_search.exists()

while True:
logger.info("Iterate over visible followers.")
screen_iterated_followers = []
screen_skipped_followers_count = 0
scroll_end_detector.notify_new_page()
user_list = device.find(
resourceIdMatches=self.ResourceID.USER_LIST_CONTAINER,
)
row_height, n_users = inspect_current_view(user_list)
try:

for item in user_list:
cur_row_height = item.get_height()
if cur_row_height < row_height:
continue
user_info_view = item.child(index=1)
user_name_view = user_info_view.child(index=0).child()
if not user_name_view.exists():
logger.info(
"Next item not found: probably reached end of the screen.",
extra={"color": f"{Fore.GREEN}"},
)
break

username = user_name_view.get_text()
screen_iterated_followers.append(username)
scroll_end_detector.notify_username_iterated(username)

can_interact = False
if storage.is_user_in_blacklist(username):
logger.info(f"@{username} is in blacklist. Skip.")
else:
interacted, interacted_when = storage.check_user_was_interacted_with_target(
username, target
)
if interacted:
can_reinteract = storage.can_be_reinteract(
interacted_when,
get_value(self.args.can_reinteract_after, None, 0),
)
logger.info(
f"@{username}: already interacted with @{target} on {interacted_when:%Y/%m/%d %H:%M:%S}. {'Interacting again now' if can_reinteract else 'Skip'}."
)
if can_reinteract:
can_interact = True
else:
screen_skipped_followers_count += 1
else:
can_interact = True

if can_interact:
if not session_state.check_limit(limit_type=self.session_state.Limit.TOTAL, output=True):
logger.info(
f"@{username}: interact", extra={"color": f"{Fore.YELLOW}"}
)
# Scrape the username to file
append_to_file(f"{target}_followers", username)
storage.add_interacted_user(username, session_state.id, target=target)
session_state.add_interaction(source=username, succeed=True, followed=False, scraped=True)
else:
return

except IndexError:
logger.info(
"Cannot get next item: probably reached end of the screen.",
extra={"color": f"{Fore.GREEN}"},
)

if is_myself and scrolled_to_top():
logger.info("Scrolled to top, finish.", extra={"color": f"{Fore.GREEN}"})
return
elif len(screen_iterated_followers) > 0:
load_more_button = device.find(
resourceId=self.ResourceID.ROW_LOAD_MORE_BUTTON
)
load_more_button_exists = load_more_button.exists()

if scroll_end_detector.is_the_end():
return

need_swipe = screen_skipped_followers_count == len(
screen_iterated_followers
)
list_view = device.find(
resourceId=self.ResourceID.LIST, className=ClassName.LIST_VIEW
)
if not list_view.exists():
logger.error(
"Cannot find the list of followers. Trying to press back again."
)
device.back()
list_view = device.find(
resourceId=self.ResourceID.LIST,
className=ClassName.LIST_VIEW,
)

if is_myself:
logger.info("Need to scroll now", extra={"color": f"{Fore.GREEN}"})
list_view.scroll(Direction.UP)
else:
pressed_retry = False
if load_more_button_exists:
retry_button = load_more_button.child(
className=ClassName.IMAGE_VIEW,
descriptionMatches=case_insensitive_re("Retry"),
)
if retry_button.exists():
random_sleep()
"""It exist but can disappear without pressing on it"""
if retry_button.exists():
logger.info('Press "Load" button and wait few seconds.')
retry_button.click_retry()
random_sleep(5, 10, modulable=False)
pressed_retry = True

if need_swipe and not pressed_retry:
scroll_end_detector.notify_skipped_all()
if scroll_end_detector.is_skipped_limit_reached():
return
if scroll_end_detector.is_fling_limit_reached():
logger.info(
"Limit of all followers skipped reached, let's fling.",
extra={"color": f"{Fore.GREEN}"},
)
list_view.fling(Direction.DOWN)
else:
logger.info(
"All followers skipped, let's scroll.",
extra={"color": f"{Fore.GREEN}"},
)
list_view.scroll(Direction.DOWN)
else:
logger.info("Need to scroll now", extra={"color": f"{Fore.GREEN}"})
list_view.scroll(Direction.DOWN)
else:
logger.info(
"No followers were iterated, finish.",
extra={"color": f"{Fore.GREEN}"},
)
return
Loading