Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 28 additions & 73 deletions pkg/component/generic/http/v0/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ import (
"errors"
"fmt"
"mime"
"net"
"net/http"
"net/url"
"os"
"slices"
"strings"
"sync"

Expand All @@ -20,7 +16,6 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"

"github.com/instill-ai/pipeline-backend/config"
"github.com/instill-ai/pipeline-backend/pkg/component/base"
"github.com/instill-ai/pipeline-backend/pkg/component/internal/util/httpclient"
"github.com/instill-ai/pipeline-backend/pkg/data"
Expand Down Expand Up @@ -63,6 +58,7 @@ var (

type component struct {
base.Component
urlValidator URLValidator
}

type execution struct {
Expand All @@ -72,21 +68,13 @@ type execution struct {
execute func(context.Context, *base.Job) error
}

// isTestEnvironment detects if we're running in a test environment
func isTestEnvironment() bool {
// Check if we're running under go test
for _, arg := range os.Args {
if strings.Contains(arg, "test") {
return true
}
}
// Also check for test environment variable
return os.Getenv("GO_TESTING") == "true"
}

// Init creates a component instance for production use
func Init(bc base.Component) *component {
once.Do(func() {
comp = &component{Component: bc}
comp = &component{
Component: bc,
urlValidator: NewURLValidator(),
}
err := comp.LoadDefinition(definitionYAML, setupYAML, tasksYAML, nil, nil)
if err != nil {
panic(err)
Expand All @@ -95,6 +83,21 @@ func Init(bc base.Component) *component {
return comp
}

// InitForTest creates a component instance for testing with configurable validation
// whitelist: URLs to allow (nil/empty = allow all external URLs)
// allowLocalhost: whether to allow localhost/127.x.x.x URLs
func InitForTest(bc base.Component, whitelist []string, allowLocalhost bool) *component {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'd put this away from the production code

c := &component{
Component: bc,
urlValidator: NewTestURLValidator(whitelist, allowLocalhost),
}
err := c.LoadDefinition(definitionYAML, setupYAML, tasksYAML, nil, nil)
if err != nil {
panic(err)
}
return c
}

func (c *component) CreateExecution(x base.ComponentExecution) (base.IExecution, error) {

// We may have different url in batch.
Expand Down Expand Up @@ -159,60 +162,12 @@ func (e *execution) Execute(ctx context.Context, jobs []*base.Job) error {
return base.ConcurrentExecutor(ctx, jobs, e.execute)
}

// validateURL checks the component's input is a valid URL. This component only
// validateInput checks the component's input is a valid URL. In production mode, this component only
// accepts requests to *publicly available* endpoints. Any call to the internal
// network will produce an error.
func (e *execution) validateURL(endpointURL string) error {
// Skip validation in test environment
if isTestEnvironment() {
return nil
}
parsedURL, err := url.Parse(endpointURL)
if err != nil {
return errorsx.AddMessage(
fmt.Errorf("parsing endpoint URL: %w", err),
"Couldn't parse the endpoint URL as a valid URI reference",
)
}

host := parsedURL.Hostname()
if host == "" {
err := fmt.Errorf("missing hostname")
return errorsx.AddMessage(err, "Endpoint URL must have a hostname")
}

var whitelistedHosts = []string{
// Pipeline's public port is exposed to call pipelines from pipelines.
// When a `pipeline` component is implemented, this won't be necessary.
fmt.Sprintf("%s:%d", config.Config.Server.InstanceID, config.Config.Server.PublicPort),
// Model's public port is exposed until the model component allows
// triggering models in the custom mode.
fmt.Sprintf("%s:%d", config.Config.ModelBackend.Host, config.Config.ModelBackend.PublicPort),
}
// Certain pipelines used by artifact-backend need to trigger pipelines and
// models via this component.
// TODO jvallesm: Remove this after INS-8119 is completed.
if slices.Contains(whitelistedHosts, parsedURL.Host) {
return nil
}

// Get IP addresses for the host
ips, err := net.LookupIP(host)
if err != nil {
return fmt.Errorf("looking up IP: %w", err)
}

// Check if any resolved IP is in private ranges
for _, ip := range ips {
if ip.IsPrivate() || ip.IsLoopback() {
return errorsx.AddMessage(
fmt.Errorf("endpoint URL resolves to private/internal IP address"),
"URL must point to a publicly available endpoint (no private/internal addresses)",
)
}
}

return nil
// network will produce an error. In test mode, behavior is controlled by whitelist and localhost settings.
func (e *execution) validateInput(input *httpInput) error {
comp := e.Component.(*component)
return comp.urlValidator.ValidateInput(input)
}

func (e *execution) executeHTTP(ctx context.Context, job *base.Job) error {
Expand All @@ -221,8 +176,8 @@ func (e *execution) executeHTTP(ctx context.Context, job *base.Job) error {
return fmt.Errorf("reading input data: %w", err)
}

if err := e.validateURL(in.EndpointURL); err != nil {
return fmt.Errorf("validating URL: %w", err)
if err := e.validateInput(&in); err != nil {
return fmt.Errorf("validating input: %w", err)
}

// An API error is a valid output in this component.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"io"
"net/http"
"net/http/httptest"
"os"
"strings"
"testing"

qt "github.com/frankban/quicktest"
"google.golang.org/protobuf/types/known/structpb"

qt "github.com/frankban/quicktest"

"github.com/instill-ai/pipeline-backend/pkg/component/base"
"github.com/instill-ai/pipeline-backend/pkg/component/internal/mock"
"github.com/instill-ai/pipeline-backend/pkg/data"
Expand Down Expand Up @@ -228,10 +228,6 @@ func TestComponent(t *testing.T) {
c := qt.New(t)
c.Parallel()

// Set test environment to bypass URL validation
os.Setenv("GO_TESTING", "true")
defer os.Unsetenv("GO_TESTING")

// respEquals returns a checker for equality between the received response
// and the expected one.
respEquals := func(want httpOutput) func(*qt.C, httpOutput) {
Expand Down Expand Up @@ -524,7 +520,8 @@ func TestComponent(t *testing.T) {
actualInput := tc.input
actualInput.EndpointURL = strings.Replace(actualInput.EndpointURL, "PLACEHOLDER_URL", server.URL, 1)

component := Init(base.Component{})
// Use InitForTest with localhost enabled to create component that allows localhost URLs
component := InitForTest(base.Component{}, nil, true)
c.Assert(component, qt.IsNotNil)

execution, err := component.CreateExecution(base.ComponentExecution{
Expand Down
136 changes: 136 additions & 0 deletions pkg/component/generic/http/v0/validator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package http

import (
"fmt"
"net"
"net/url"
"slices"
"strings"

"github.com/instill-ai/pipeline-backend/config"
errorsx "github.com/instill-ai/x/errors"
)

// URLValidator defines the interface for validating HTTP input
type URLValidator interface {
ValidateInput(input *httpInput) error
}

// urlValidator provides common validation logic
type urlValidator struct {
whitelistedEndpoints []string
allowLocalhost bool
allowPrivateIPs bool
}

// NewURLValidator creates a validator for production use
func NewURLValidator() URLValidator {
return &urlValidator{allowPrivateIPs: false}
}

// NewTestURLValidator creates a validator for testing
func NewTestURLValidator(whitelistedEndpoints []string, allowLocalhost bool) URLValidator {
return &urlValidator{
whitelistedEndpoints: whitelistedEndpoints,
allowLocalhost: allowLocalhost,
allowPrivateIPs: true, // Test mode allows external URLs by default
}
}
Comment on lines +31 to +38
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we define this in the non-production code (*_test.go files)?


// ValidateInput implements the consolidated validation logic
func (v *urlValidator) ValidateInput(input *httpInput) error {
if input == nil {
return fmt.Errorf("input cannot be nil")
}

endpointURL := input.EndpointURL
if endpointURL == "" {
return errorsx.AddMessage(
fmt.Errorf("endpoint URL is required"),
"Endpoint URL must be provided",
)
}

parsedURL, err := url.Parse(endpointURL)
if err != nil {
return errorsx.AddMessage(
fmt.Errorf("parsing endpoint URL: %w", err),
"Couldn't parse the endpoint URL as a valid URI reference",
)
}

host := parsedURL.Hostname()
if host == "" {
return errorsx.AddMessage(
fmt.Errorf("missing hostname"),
"Endpoint URL must have a hostname",
)
}

// Check explicit whitelist first
for _, allowedEndpoint := range v.whitelistedEndpoints {
if strings.HasPrefix(endpointURL, allowedEndpoint) {
return nil
}
}

// Production-specific whitelisted hosts
if !v.allowPrivateIPs {
prodWhitelist := []string{
// Pipeline's public port is exposed to call pipelines from pipelines.
// When a `pipeline` component is implemented, this won't be necessary.
fmt.Sprintf("%s:%d", config.Config.Server.InstanceID, config.Config.Server.PublicPort),
// Model's public port is exposed until the model component allows
// triggering models in the custom mode.
fmt.Sprintf("%s:%d", config.Config.ModelBackend.Host, config.Config.ModelBackend.PublicPort),
}
// Certain pipelines used by artifact-backend need to trigger pipelines and
// models via this component.
// TODO jvallesm: Remove this after INS-8119 is completed.
if slices.Contains(prodWhitelist, parsedURL.Host) {
return nil
}
}
Comment on lines +78 to +93
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: this validator gives us the possibility to move the the model and pipeline whitelisting away from the CE repo, as it is used only in production.
I'd make the whitelist parameter a constructor param for the validator and then in the program's main.go I'd read the whitelisted hosts from the config, create the URL validator with them and pass it to the component store.

However, I'm fine with having this here out of simplicity and fairness with CE users (they might also want to trigger pipelines or models from other pipelines)


// Check localhost allowance
if v.allowLocalhost && (host == "localhost" || host == "127.0.0.1" || strings.HasPrefix(host, "127.")) {
return nil
}

// Production mode: check if IP is private and block private IPs
if !v.allowPrivateIPs {
ips, err := net.LookupIP(host)
if err != nil {
return fmt.Errorf("looking up IP: %w", err)
}

for _, ip := range ips {
if ip.IsPrivate() || ip.IsLoopback() {
return errorsx.AddMessage(
fmt.Errorf("endpoint URL resolves to private/internal IP address"),
"URL must point to a publicly available endpoint (no private/internal addresses)",
)
}
}
// Production mode: allow public IPs
return nil
}

// Test mode: apply whitelist and localhost restrictions
if len(v.whitelistedEndpoints) > 0 {
return errorsx.AddMessage(
fmt.Errorf("endpoint URL not in test whitelist"),
fmt.Sprintf("URL %s is not in the whitelisted endpoints for testing", endpointURL),
)
}

if !v.allowLocalhost {
return errorsx.AddMessage(
fmt.Errorf("endpoint URL not allowed"),
"No endpoints are whitelisted for testing and localhost access is disabled",
)
}

// Test mode: allow if localhost is enabled
return nil
}
Loading
Loading