Skip to content

Commit

Permalink
Features: Secure upload endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
1yam committed Aug 21, 2023
1 parent b25c310 commit fa1e5cf
Showing 1 changed file with 124 additions and 5 deletions.
129 changes: 124 additions & 5 deletions src/aleph/web/controllers/storage.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,49 @@
import asyncio
import base64
import datetime as dt
import functools
import logging
from hashlib import sha256
from io import StringIO
from typing import Union, Tuple, Dict, Optional

import aio_pika
from eth_account import Account
from eth_account.messages import encode_defunct

from aleph.chains.common import get_verification_buffer
from aleph.jobs.process_pending_messages import PendingMessageProcessor

from aiohttp import web
from aiohttp.web_request import FileField
from aleph_message.models import ItemType
from multidict import MultiDictProxy

from aleph.chains.chain_service import ChainService, LOGGER
from aleph.chains.nuls import NulsConnector
from aleph.db.accessors.balances import get_total_balance
from aleph.db.accessors.files import count_file_pins, get_file
from aleph.db.accessors.messages import get_message_status, message_exists
from aleph.db.connection import make_session_factory
from aleph.db.models import PendingMessageDb
from aleph.exceptions import AlephStorageException, UnknownHashError
from aleph.services.p2p import init_p2p_client
from aleph.services.storage.engine import StorageEngine
from aleph.services.storage.fileystem_engine import FileSystemStorageEngine
from aleph.storage import StorageService
from aleph.toolkit import json
from aleph.toolkit.timestamp import timestamp_to_datetime
from aleph.types.db_session import DbSessionFactory, DbSession
from aleph.utils import run_in_executor, item_type_from_hash
from aleph.web.controllers.app_state_getters import (
get_session_factory_from_request,
get_storage_service_from_request,
get_storage_service_from_request, get_mq_channel_from_request, get_config_from_request, get_mq_conn_from_request,
)
from aleph.web.controllers.utils import multidict_proxy_to_io
from aleph.schemas.pending_messages import BasePendingMessage

logger = logging.getLogger(__name__)

from aleph.schemas.pending_messages import parse_message

MAX_FILE_SIZE = 100 * 1024 * 1024

Expand Down Expand Up @@ -56,20 +84,111 @@ async def add_storage_json_controller(request: web.Request):
return web.json_response(output)


async def storage_add_file(request: web.Request):
async def verify_signature(message: BasePendingMessage) -> bool:
"""Verifies a signature of a message, return True if verified, false if not"""
verification = get_verification_buffer(message)

message_hash = await run_in_executor(
None, functools.partial(encode_defunct, text=verification.decode("utf-8"))
)

verified = False
try:
# we assume the signature is a valid string
address = await run_in_executor(
None,
functools.partial(
Account.recover_message, message_hash, signature=message.signature
),
)
if address == message.sender:
verified = True
else:
return False

except Exception as e:
verified = False
return verified


async def get_message_content(post_data: MultiDictProxy[Union[str, bytes, FileField]]) -> Tuple[dict, int]:
message_bytearray = post_data.get("message", b"")
value = post_data.get("size") or 0
if not message_bytearray:
return {}, int(value) # Empty dictionary if no message content

message_string = message_bytearray.decode("utf-8")
message_dict = json.loads(message_string)
message_dict["time"] = float(message_dict["time"])

return message_dict, int(value)


async def init_mq_con(config):
return await aio_pika.connect_robust(
host=config.p2p.mq_host.value, port=config.rabbitmq.port.value, login=config.rabbitmq.username.value,
password=config.rabbitmq.password.value
)


async def verify_and_handle_request(pending_message_db, file_io, message, size):
content = file_io.read(size)
item_content = json.loads(message["item_content"])
actual_item_hash = sha256(content).hexdigest()
c_item_hash = item_content["item_hash"]

is_signature = await verify_signature(message=pending_message_db)

if not is_signature:
output = {"status": "Forbidden"}
return web.json_response(output, status=403)
elif actual_item_hash != c_item_hash:
output = {"status": "Unprocessable Content"}
return web.json_response(output, status=422)
elif len(content) > 25_000 and not message:
output = {"status": "Unauthorized"}
return web.json_response(output, status=401)
else:
return None


async def storage_add_file_with_message(request: web.Request):
storage_service = get_storage_service_from_request(request)
session_factory = get_session_factory_from_request(request)
config = get_config_from_request(request)
mq_con = init_mq_con(config)

# No need to pin it here anymore.
# TODO: find a way to specify linked ipfs hashes in posts/aggr.
post = await request.post()
file_io = multidict_proxy_to_io(post)
message, size = await get_message_content(post)
pending_message_db = PendingMessageDb.from_message_dict(message_dict=message, reception_time=dt.datetime.now(),
fetched=True)
is_valid_message = await verify_and_handle_request(pending_message_db, file_io, message, size)
if is_valid_message is not None:
return is_valid_message

with session_factory() as session:
file_hash = await storage_service.add_file(
session=session, fileobject=file_io, engine=ItemType.storage
)
session.add(pending_message_db)
session.commit()
output = {"status": "success", "hash": file_hash}
return web.json_response(output)


async def storage_add_file(request: web.Request):
post = await request.post()
if post.get("message", b"") is not None and post.get("size") is not None:
return await storage_add_file_with_message(request)

storage_service = get_storage_service_from_request(request)
session_factory = get_session_factory_from_request(request)
file_io = multidict_proxy_to_io(post)
with session_factory() as session:
file_hash = await storage_service.add_file(
session=session, fileobject=file_io, engine=ItemType.storage
)
output = {"status": "success", "hash": file_hash}
return web.json_response(output)

Expand Down

0 comments on commit fa1e5cf

Please sign in to comment.