Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ShareDataFrame class #241

Merged
merged 18 commits into from
Jun 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions .github/workflows/test-server-all.yml
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,49 @@ jobs:
run: docker-compose ${{ env.COMPOSE_FILES_OPT }} run medium-libc pytest src/tests/test_${{ matrix.test_name }}.py -s -v -log-cli-level=DEBUG
working-directory: ./scripts

medium_test_libclient_new:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
test_name:
[
correl,
get_elapsed_time,
job_error_info,
join,
mean,
parallel,
send_share,
string,
sum,
token,
variance,
get_computation_result
]
steps:
- name: Checkout
uses: actions/checkout@v2
with:
fetch-depth: 0
token: ${{ secrets.CI_REPOSITORY_ACCESS_TOKEN }}
submodules: true

- name: Update .bazelrc for using remote cache
run: |
eval "echo \"$(cat ./.github/workflows/.bazelrc_for_ci)\"" >> ./packages/server/computation_container/.bazelrc
env:
BUILDBUDDY_API_KEY: ${{ secrets.BUILDBUDDY_API_KEY }}

- name: Log into registry
run: echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io -u ${{ github.repository_owner }} --password-stdin

- uses: ./.github/actions/prepare_libclient_test

- name: Run container test
run: docker-compose ${{ env.COMPOSE_FILES_OPT }} run medium-libc pytest src/tests_new/test_${{ matrix.test_name }}.py -s -v -log-cli-level=DEBUG
working-directory: ./scripts

medium_test_container_up:
runs-on: ubuntu-latest
steps:
Expand Down
2 changes: 2 additions & 0 deletions packages/client/libclient-py/quickmpc/qmpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class QMPC:
def __post_init__(self, endpoints: List[str],
retry_num: int, retry_wait_time: int):
logger.info(f"[QuickMPC server IP]={endpoints}")
object.__setattr__(self, "_QMPC__qmpc_request", QMPCRequest(
endpoints, retry_num, retry_wait_time))
KotaTakahashi9320 marked this conversation as resolved.
Show resolved Hide resolved
object.__setattr__(self, "_QMPC__qmpc_request", QMPCRequest(
endpoints, retry_num, retry_wait_time))
object.__setattr__(self, "_QMPC__party_size", len(endpoints))
Expand Down
135 changes: 135 additions & 0 deletions packages/client/libclient-py/quickmpc/qmpc_new.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
from dataclasses import dataclass, field, InitVar
from typing import List, Union

import pandas as pd

from .qmpc_request import QMPCRequest
from .request.qmpc_request_interface import QMPCRequestInterface
from .restore import restore
from .share_data_frame import ShareDataFrame
from .utils.overload_tools import Dim1, methoddispatch
from .utils.parse_csv import to_float


@dataclass(frozen=True)
class QMPC:
"""QuickMPCのdataのやり取りをするクラス

Attributes
----------
arg: Union[List[str], QMPCRequestInterface]]
parties: List[str]
serverのIP
qmpc_request: QMPCRequestMock
qmpc serverに対してrequestを送るinterface
"""

arg: InitVar[Union[List[str], QMPCRequestInterface]]

__qmpc_request: QMPCRequestInterface = field(init=False)

@methoddispatch()
def __post_init__(self, *args, **kwargs):
raise NotImplementedError("QMPCClientにサポートしていない値が与えられた.")

@__post_init__.register(Dim1)
def __post_init__default(self, parties: List[str]):
object.__setattr__(self, "_QMPC__qmpc_request",
QMPCRequest(parties))

@__post_init__.register(QMPCRequest)
def __post_init__original(self, qmpc_request: QMPCRequest):
# mydispatchを改造してinterfaceでオーバーロードさせる
object.__setattr__(self, "_QMPC__qmpc_request", qmpc_request)

def read_csv(self, *args, index_col: str, **kwargs) -> pd.DataFrame:
"""csvからテーブルデータを読み込む.

テーブル結合処理に用いる列がどの列かを`index_col`で指定する必要がある.
`index_col`以外の引数は全てpandasのread_csvと同じ.

Parameters
----------
filepath_or_buffer: str, path object or file-like object
らしい
index_col: str
ID列としたいカラム名

Returns
----------
pd.DataFrame
読み込んだテーブルデータ
"""
df = pd.read_csv(*args, **kwargs)
# ID列を数値化
df[index_col] = df[index_col].map(lambda x: to_float(x))
# join時にQMPCのCC側でID列でsortできる様に、座圧を行いindexに設定しておく
df["original_index"] = df.index
df = df.sort_values(by=index_col)
df = df.reset_index(drop=True)
df = df.sort_values(by="original_index")
df = df.drop('original_index', axis=1)
df.set_index(index_col)
return df

def send_to(self, df: pd.DataFrame) -> ShareDataFrame:
"""QuickMPCサーバにデータを送信する.

Parameters
----------
df: df.DataFrame
送信するデータ

Returns
----------
ShareDataFrame
QuickMPC形式のDataframe
"""
# send_shareできる形式に変換
res = self.__qmpc_request.send_share(df, piece_size=1_000_000)
return ShareDataFrame(res.data_id, self.__qmpc_request)

def load_from(self, data_id: str) -> ShareDataFrame:
"""既に送信してあるデータを参照する.

Parameters
----------
data_id: str
既に送信してあるデータのID

Returns
----------
ShareDataFrame
QuickMPC形式のDataframe
"""
return ShareDataFrame(data_id, self.__qmpc_request)

# TODO: job_uuidとparty_sizeは指定しなくても良いようにしたい
def restore(self, job_uuid: str, filepath: str, party_size: int) \
-> ShareDataFrame:
"""既に送信してあるデータを参照する.

Parameters
----------
job_uuid: str
データのID
filepath: str
dataが保存してあるディレクトリ

Returns
----------
ShareDataFrame
QuickMPC形式のDataframe
"""
# TODO: get_computation_resultと同じ処理なのでうまくまとめる
res = restore(job_uuid, filepath, party_size)
if res is None:
return pd.DataFrame()
if type(res) == dict:
if res["schema"] is None or res["table"] is None:
return pd.DataFrame()
schema = [s.name for s in res["schema"]]
df = pd.DataFrame(res["table"],
columns=schema)
return df
return pd.DataFrame(res)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import List
from typing import List, Optional

import pandas as pd

Expand Down Expand Up @@ -41,11 +41,12 @@ def meshcode(self, data_ids: List[str], columns: List[int]) \
-> ExecuteResponse: ...

@abstractmethod
def join(self, data_ids: List[str]) \
def join(self, data_ids: List[str], *, debug_mode: bool) \
-> ExecuteResponse: ...

@abstractmethod
def get_computation_result(self, job_uuid: str, output_path: str) \
def get_computation_result(self, job_uuid: str,
output_path: Optional[str]) \
-> GetResultResponse: ...

@abstractmethod
Expand Down
Loading