Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/CODEOWNERS.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* @venasolutions/insights-team
36 changes: 36 additions & 0 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Pull Request Template

## For Vena Developers

Please include a link to the Jira ticket here

## Description

Please include a summary of the change and which issue is fixed. Also include relevant motivation and context. List any dependencies that are required for this change.

Fixes # (issue)

## Type of Change

Please delete options that are not relevant.

- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] Documentation update
- [ ] Code style update (formatting, renaming)
- [ ] Refactoring (no functional changes, no API changes)
- [ ] Other (please describe):

## Checklist

- [ ] My code follows the style guidelines of this project
- [ ] I have performed a self-review of my own code
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] Any dependent changes have been merged and published in downstream modules

## Additional Notes

Please add any additional information or context about the pull request here.
30 changes: 0 additions & 30 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,35 +37,6 @@ vena_etl = VenaETL(
)
```

### Getting Models and Processes

#### Get Models
```python
# Get a DataFrame of all models
models_df = vena_etl.get_models()
print(f"Found {len(models_df)} models")
print(models_df[['id', 'name', 'desc']])
```

#### Get Processes
```python
# Get a DataFrame of all processes
processes_df = vena_etl.get_processes()
print(f"Found {len(processes_df)} processes")
print(processes_df[['id', 'name', 'status', 'processFolderId']])
```

#### Get Job History
```python
# Get job history with default offset (0)
jobs = vena_etl.job_history()
print(f"Retrieved {len(jobs.get('jobs', []))} jobs")

# Get next page of jobs
next_jobs = vena_etl.job_history(offset=100)
print(f"Retrieved {len(next_jobs.get('jobs', []))} more jobs")
```

### Importing Data

#### Using DataFrame (start_with_data)
Expand Down Expand Up @@ -191,7 +162,6 @@ The package includes comprehensive error handling for:
- Data validation errors
- Job submission errors
- Job cancellation errors
- Model and process retrieval errors

## License

Expand Down
2 changes: 1 addition & 1 deletion vepi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@

from .vena_etl import VenaETL

__version__ = "0.1.2"
__version__ = "0.1.3"
__all__ = ["VenaETL"]
142 changes: 3 additions & 139 deletions vepi/vena_etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,11 @@ def __init__(self, hub: str, api_user: str, api_key: str, template_id: str, mode

# API URLs
self.base_url = f'https://{hub}.vena.io/api/public/v1'
self.closed_url = f'https://{hub}.vena.io/api/'
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was reading through the entire file. Maybe you and I can go over it and add comments to it for future changes. For example, the `hub` constructor argument could be an enum rather than a string.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I'll make a separate PR for general clean-up stuff

self.start_with_data_url = f'{self.base_url}/etl/templates/{template_id}/startWithData'
self.start_with_file_url = f'{self.base_url}/etl/templates/{template_id}/startWithFile'
self.create_job_url = f'{self.base_url}/etl/templates/{template_id}/jobs'
self.job_status_url = f'{self.base_url}/etl/jobs' # Base URL for job operations
self.intersections_url = f'{self.base_url}/models/{model_id}/intersections' if model_id else None
self.models_url = f'{self.closed_url}/models'
self.processes_url = f'{self.closed_url}/processes'

# Headers for requests
self.headers = {
Expand Down Expand Up @@ -369,12 +366,12 @@ def import_dataframe(self, df: pd.DataFrame) -> None:
self.start_with_data(df)
print("Data Import Script Finished")

def export_data(self, page_size: int = 50000) -> Optional[pd.DataFrame]:
def export_data(self, page_size: int = 100000) -> Optional[pd.DataFrame]:
"""
Export intersections data from the Vena model with pagination support.

Args:
page_size (int): Number of records to fetch per page (default: 50000)
page_size (int): Number of records to fetch per page (default: 100000)

Returns:
Optional[pd.DataFrame]: DataFrame containing all intersections data, or None if there was an error
Expand Down Expand Up @@ -713,137 +710,4 @@ def cancel_job(self, job_id: str) -> Dict[str, Any]:
auth=(self.api_user, self.api_key)
)
response.raise_for_status()
return response.json()

def models(self) -> Dict[str, Any]:
    """
    Fetch the raw models payload from the models endpoint.

    Issues an authenticated GET against ``self.models_url`` and returns the
    decoded JSON body unchanged.

    Returns:
        The decoded JSON response.
        NOTE(review): get_models iterates over this value as a sequence of
        model records, so the payload looks like a JSON array rather than a
        single object - confirm and adjust the annotation if so.

    Raises:
        requests.exceptions.HTTPError: If the response status is 4xx/5xx.
    """
    credentials = (self.api_user, self.api_key)
    reply = requests.get(self.models_url, headers=self.headers, auth=credentials)
    # Surface HTTP failures to the caller instead of decoding an error body.
    reply.raise_for_status()
    return reply.json()

def get_models(self) -> Optional[pd.DataFrame]:
    """
    Get a DataFrame of models with id, name, and description.

    Only the 'id', 'name' and 'desc' fields of each model record are kept.
    A field that is absent from a record is filled with an empty string so
    that a single incomplete record does not discard the whole result, and
    an empty payload still yields a DataFrame carrying the three columns.

    Returns:
        Optional[pd.DataFrame]: DataFrame containing model information with columns:
            - id: Model ID
            - name: Model name
            - desc: Model description
        None if the models could not be retrieved or converted (the error
        is reported on stderr).
    """
    columns = ['id', 'name', 'desc']
    try:
        # Get models data
        models_data = self.models()

        # Extract only the required fields, tolerating missing keys
        models_list = [
            {column: model.get(column, '') for column in columns}
            for model in models_data
        ]

        # Passing columns explicitly keeps the schema stable for empty input
        return pd.DataFrame(models_list, columns=columns)

    except Exception as e:
        # Best-effort API: report the failure and signal it with None
        print(f"Error creating models DataFrame: {e}", file=sys.stderr)
        return None

def processes(self):
    """
    Fetch the raw processes payload from the processes endpoint.

    Issues an authenticated GET against ``self.processes_url`` and returns
    the decoded JSON body unchanged.

    Returns:
        The decoded JSON response.

    Raises:
        requests.exceptions.HTTPError: If the response status is 4xx/5xx.
    """
    credentials = (self.api_user, self.api_key)
    reply = requests.get(self.processes_url, headers=self.headers, auth=credentials)
    # Surface HTTP failures to the caller instead of decoding an error body.
    reply.raise_for_status()
    return reply.json()

def get_processes(self):
    """
    Build a DataFrame describing the available processes.

    Each record returned by ``self.processes()`` is reduced to five fields;
    records that cannot be read are reported and skipped.

    Returns:
        pd.DataFrame: One row per readable process, with columns:
            - id: Process ID
            - name: Process name
            - status: Process status
            - processFolderId: Parent process folder ID
            - allModels: List of associated models
        None when no data was received, no record could be read, or an
        unexpected error occurred.
    """
    text_fields = ('id', 'name', 'status', 'processFolderId')
    try:
        raw = self.processes()
        if not raw:
            print("No processes data received")
            return None

        rows = []
        for entry in raw:
            try:
                # Missing text fields default to '', missing model list to []
                row = {field: entry.get(field, '') for field in text_fields}
                row['allModels'] = entry.get('allModels', [])
            except Exception as e:
                # A malformed record is skipped rather than failing the batch
                print(f"Error processing process: {e}")
            else:
                rows.append(row)

        if rows:
            frame = pd.DataFrame(rows)
            print(f"Successfully created DataFrame with {len(frame)} processes")
            return frame

        print("No valid processes found in the data")
        return None

    except Exception as e:
        print(f"Error creating processes DataFrame: {e}", file=sys.stderr)
        return None

def job_history(self, offset: int = 0) -> Optional[Dict[str, Any]]:
    """
    Get job history from the ETL v2 API.

    Requests one page of up to 100 jobs, ordered by id descending, from the
    closed API.

    Args:
        offset (int): Number of records to skip (default: 0)

    Returns:
        Optional[Dict[str, Any]]: The decoded JSON response, or None if the
        request failed (the error is reported on stderr). Only the 'jobs'
        key is read here; the payload is expected to contain:
            - jobs: List of job records
            - total: Total number of jobs
            - offset: Current offset
            - limit: Number of records per page
        NOTE(review): the key list above is taken from the previous
        docstring - confirm against the API.
    """
    # closed_url may already end with '/', so strip it to avoid building
    # '.../api//etl/v2/jobs'.
    url = f"{self.closed_url.rstrip('/')}/etl/v2/jobs"

    # Fixed paging/sorting parameters with a configurable offset; letting
    # requests encode them avoids hand-assembling the query string.
    query = {
        'offset': offset,
        'requested': 100,
        'orderBy': 'id',
        'orderDirection': 'desc',
    }

    try:
        # Make the API request
        response = requests.get(
            url,
            params=query,
            headers=self.headers,
            auth=(self.api_user, self.api_key)
        )
        response.raise_for_status()

        # Parse and return the response
        data = response.json()
        print(f"Retrieved {len(data.get('jobs', []))} jobs from history")
        return data

    except requests.exceptions.RequestException as e:
        print(f"Error retrieving job history: {e}", file=sys.stderr)
        failed_response = getattr(e, 'response', None)
        if failed_response is not None:
            try:
                error_data = failed_response.json()
                print(f"Error details: {error_data}", file=sys.stderr)
            except ValueError:
                # Body was not valid JSON; fall back to the raw text. A bare
                # except here would also swallow KeyboardInterrupt/SystemExit.
                print(f"Error response: {failed_response.text}", file=sys.stderr)
        return None


return response.json()