New Compressor: Support metadata other than Name for trained compression #7017

Open
coxley opened this issue Mar 4, 2024 · 8 comments
Labels
Area: RPC Features Includes Compression, Encoding, Attributes/Metadata, Interceptors. P2 Type: Feature New features or improvements in behavior

Comments

@coxley
Contributor

coxley commented Mar 4, 2024

According to a comment on #7003, the grpc-go maintainers would like feedback on the API before eventually marking it as stable. So here are some of my current thoughts. :)

Use case(s) - what problem will this feature solve?

The only "metadata" supported for gRPC compressors today is the name. This is sent in the encoding header (grpc-encoding) between the client and server.

This works for most use cases, but it breaks down when the compressor needs extra context about the request being compressed, specifically for dictionary compression with a format like zstd. Decompression is a bit easier, assuming your dictionary IDs are unique.

For example, we train dictionaries by "category" and "namespace". A "category" relates to a common type of data (e.g., a pubsub topic), whereas all of the instances of that data in a given "namespace" relate closely to each other and are what dictionaries are trained on.

The gRPC client knows what "category" and "namespace" it's issuing a request for, but the Compressor only has access to an io.Writer.

If the Compressor had extra context, it could decide which dictionary to compress with. Alternatively, the decision on which to use could be made before invoking the call and passed explicitly. Unless I'm missing something, neither is possible today.

I can only think of two workarounds at the moment, neither of which feels great:

  • Register a unique compressor name for every "category+namespace", but that feels unwieldy — there are hundreds of namespaces and new ones pop up.
  • Commit great sins by instead using ForceCodec at the client side. We can provide it a unique object that uses a common encoding name, but knows how to select the appropriate dictionary for compression. The server won't have access to that context, but we can make dictionary IDs globally unique so it won't matter. (they're in the zstd header)
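
To make the first workaround concrete, here is a minimal stdlib-only sketch. The Compressor interface mirrors the shape of grpc-go's encoding.Compressor; the registry map, dictCompressor type, and compressorName helper are hypothetical stand-ins, and compress/flate preset dictionaries stand in for zstd, which is not in the standard library.

```go
package main

import (
	"bytes"
	"compress/flate"
	"fmt"
	"io"
)

// Compressor mirrors the shape of grpc-go's encoding.Compressor. Nothing in
// it carries per-call context; the name is the only thing that varies.
type Compressor interface {
	Compress(w io.Writer) (io.WriteCloser, error)
	Decompress(r io.Reader) (io.Reader, error)
	Name() string
}

// dictCompressor (hypothetical) binds exactly one dictionary to one name.
type dictCompressor struct {
	name string
	dict []byte
}

func (c *dictCompressor) Compress(w io.Writer) (io.WriteCloser, error) {
	fw, err := flate.NewWriterDict(w, flate.DefaultCompression, c.dict)
	if err != nil {
		return nil, err
	}
	return fw, nil
}

func (c *dictCompressor) Decompress(r io.Reader) (io.Reader, error) {
	return flate.NewReaderDict(r, c.dict), nil
}

func (c *dictCompressor) Name() string { return c.name }

// registry stands in for the global table behind encoding.RegisterCompressor.
var registry = map[string]Compressor{}

// compressorName (hypothetical) derives one name per category+namespace.
func compressorName(category, namespace string) string {
	return "dict-" + category + "-" + namespace
}

func register(category, namespace string, dict []byte) {
	name := compressorName(category, namespace)
	registry[name] = &dictCompressor{name: name, dict: dict}
}

// roundTrip compresses and decompresses payload with the named compressor.
func roundTrip(name string, payload []byte) ([]byte, error) {
	c, ok := registry[name]
	if !ok {
		return nil, fmt.Errorf("no compressor registered as %q", name)
	}
	var buf bytes.Buffer
	w, err := c.Compress(&buf)
	if err != nil {
		return nil, err
	}
	if _, err := w.Write(payload); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	r, err := c.Decompress(&buf)
	if err != nil {
		return nil, err
	}
	return io.ReadAll(r)
}

func main() {
	// Every new namespace needs another registration, which is what makes
	// this approach unwieldy with hundreds of namespaces.
	register("pubsub", "orders", []byte(`{"topic":"orders","id":`))
	register("pubsub", "users", []byte(`{"topic":"users","id":`))

	out, err := roundTrip(compressorName("pubsub", "orders"), []byte(`{"topic":"orders","id":1}`))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d compressors registered; round trip: %s\n", len(registry), out)
}
```

The sketch shows why the name-per-scope approach scales poorly: the set of names has to be known on both sides ahead of time, and a namespace that appears later has no compressor until something registers it.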

Proposed Solution

I assume you folks would know the best way to fit this in organically. My current thoughts are:

  • A new CallOption to pass metadata down to the compressor:
    • CompressionContext
    • CompressionMetadata
    • CompressionDictID
    • ???
  • Giving the Compressor access to *metadata.MD

The latter is more flexible and makes fewer assumptions. CompressionDictID would only work for us because we are fine with 4-byte globally distinct IDs, but that is likely a deal breaker for other workloads. Setting custom metadata to control how this is resolved would let both the client and the server make the right call.

@coxley coxley added the Type: Feature New features or improvements in behavior label Mar 4, 2024
@coxley
Contributor Author

coxley commented Mar 4, 2024

Here's an example of how I will probably work around this until there's a better way. Feel free to steer me in another direction :)

Protobuf

syntax = "proto3";

package zstd;
option go_package = "repro/zstd";

// Compressed is a wrapper for zstd encoded data in lieu of better ways of sharing metadata
message Compressed {
  string category = 1;
  string namespace = 2;
  bytes data = 3;
}

Codec

// codec is a dictionary-aware compression implementation
//
// Given a category and namespace, it knows how to find the correct dictionary
// to compress with. For decompression, it reads the dictionary ID from the
// zstd payload.
//
// This could later be augmented to have decompression IDs category:namespace
// aware vs. assuming they're globally scoped.
type codec struct {
	Category  string
	Namespace string
}

func (c *codec) Marshal(v any) ([]byte, error) {
	return marshal(c.Category, c.Namespace, v)
}

func (c *codec) Unmarshal(data []byte, v any) error {
	return unmarshal(data, v)
}

func (c *codec) Name() string {
	return "zstd-wrapper"
}

// marshal and compress 'v', wrap in [zstd.Compressed], and return the result
// of marshalling that
//
// Skip wrapping if 'v' is already a [zstd.Compressed]
func marshal(category, namespace string, v any) ([]byte, error) {
	switch vv := v.(type) {
	case *zstd.Compressed:
		return proto.Marshal(vv)
	case proto.Message:
		data, err := proto.Marshal(vv)
		if err != nil {
			return nil, err
		}
		container := &zstd.Compressed{
			Category:  category,
			Namespace: namespace,
			// TODO: Imagine there was compression happening before this
			Data: data,
		}

		fmt.Println("using the codec for compression")
		return proto.Marshal(container)
	default:
		return nil, fmt.Errorf("unsure what to do for type: %T", v)
	}
}

// unmarshal data into a [zstd.Compressed], then unmarshal its data field into 'v'
func unmarshal(data []byte, v any) error {
	switch vv := v.(type) {
	case *zstd.Compressed:
		return proto.Unmarshal(data, vv)
	case proto.Message:
		var container zstd.Compressed
		if err := proto.Unmarshal(data, &container); err != nil {
			return err
		}
		fmt.Println("using the codec for decompression")
		// TODO: Imagine there was decompression happening before this
		return proto.Unmarshal(container.Data, vv)
	default:
		return fmt.Errorf("unsure what to do for type: %T", v)
	}
}
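
The two TODOs above leave out the compression step itself. As a rough sketch of what could go there, the following picks a dictionary by category and namespace when compressing and by a 4-byte ID prefix when decompressing, so the receiving side needs no request context. Everything here is an assumption for illustration: the store and dictionary types are hypothetical, compress/flate preset dictionaries stand in for zstd, and the explicit ID prefix stands in for the dictionary ID that a real zstd frame carries in its own header.

```go
package main

import (
	"bytes"
	"compress/flate"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// dictionary (hypothetical) pairs trained bytes with a globally unique ID.
type dictionary struct {
	ID   uint32
	Data []byte
}

// store (hypothetical) indexes dictionaries by scope for compression and by
// ID for decompression.
type store struct {
	byScope map[string]*dictionary // key: "category/namespace"
	byID    map[uint32]*dictionary
}

func newStore() *store {
	return &store{byScope: map[string]*dictionary{}, byID: map[uint32]*dictionary{}}
}

func (s *store) add(category, namespace string, d *dictionary) {
	s.byScope[category+"/"+namespace] = d
	s.byID[d.ID] = d
}

// compressFor selects the dictionary from the request scope and prefixes the
// output with the dictionary ID (little-endian, 4 bytes).
func (s *store) compressFor(category, namespace string, payload []byte) ([]byte, error) {
	d, ok := s.byScope[category+"/"+namespace]
	if !ok {
		return nil, fmt.Errorf("no dictionary for %s/%s", category, namespace)
	}
	var buf bytes.Buffer
	var hdr [4]byte
	binary.LittleEndian.PutUint32(hdr[:], d.ID)
	buf.Write(hdr[:])

	w, err := flate.NewWriterDict(&buf, flate.DefaultCompression, d.Data)
	if err != nil {
		return nil, err
	}
	if _, err := w.Write(payload); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decompress needs only the payload: the ID prefix names the dictionary.
func (s *store) decompress(data []byte) ([]byte, error) {
	if len(data) < 4 {
		return nil, errors.New("payload too short to hold a dictionary ID")
	}
	id := binary.LittleEndian.Uint32(data[:4])
	d, ok := s.byID[id]
	if !ok {
		return nil, fmt.Errorf("unknown dictionary ID %d", id)
	}
	r := flate.NewReaderDict(bytes.NewReader(data[4:]), d.Data)
	defer r.Close()
	return io.ReadAll(r)
}

func main() {
	s := newStore()
	s.add("test", "foo", &dictionary{ID: 7, Data: []byte(`{"name":"`)})

	packed, err := s.compressFor("test", "foo", []byte(`{"name":"coxley"}`))
	if err != nil {
		panic(err)
	}
	plain, err := s.decompress(packed)
	if err != nil {
		panic(err)
	}
	fmt.Printf("round trip: %s\n", plain)
}
```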

Usage

package main

import (
	"context"
	"fmt"
	"log"
	"net"

	zstd "repro/gen"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/encoding"
	pb "google.golang.org/grpc/examples/helloworld/helloworld"
	"google.golang.org/protobuf/proto"
)

const addr = "localhost:50051"

type server struct {
	pb.UnimplementedGreeterServer
}

func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}

func main() {
	// Register default codec without category/namespace for decompression
	encoding.RegisterCodec(&codec{})

	lis, err := net.Listen("tcp", addr)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	s := grpc.NewServer()
	pb.RegisterGreeterServer(s, &server{})

	go func() {
		if err := s.Serve(lis); err != nil {
			log.Fatalf("failed to serve: %v", err)
		}
	}()

	// Now connect from client
	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	c := pb.NewGreeterClient(conn)
	r, err := c.SayHello(
		context.Background(),
		&pb.HelloRequest{Name: "coxley"},
		// TODO: This would ideally be cached and retrieved via an accessor somewhere
		grpc.ForceCodec(&codec{Category: "test", Namespace: "foo"}),
	)
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	fmt.Printf("Result: %v\n", r)
}

@coxley coxley changed the title New Compressor: Support metadata other than Name New Compressor: Support metadata other than Name for trained compression Mar 5, 2024
@dfawley
Member

dfawley commented Mar 5, 2024

Another option here would be to pass UseCompressor and SetSendCompressor an optional any argument that gets passed to the compressor as an extra parameter to Compress. This would not be a backward compatible change (it could be if we made it a variadic parameter, but that's a bit ugly from an API documentation POV). With this approach, Decompress can't really be parameterized, so the compressor would need to encode whatever the decompressor needs in its header.

If we wanted to pass the metadata around, then we could pass the metadata to both sides (outgoing metadata to the compressor, and incoming metadata to the decompressor). This then has the unfortunate effect of requiring the metadata to be polluted in order to parameterize the compressor.

Instead of passing metadata, we could pass the context directly. This would enable parameterization via the context instead of the metadata. I'm not sure we want to encourage blocking on this path, though, and passing a context might do that.

@ejona86
Member

ejona86 commented Mar 5, 2024

For reference, shared dictionary compression examples in HTTP:
https://chromium.googlesource.com/chromium/src/+/53.0.2744.1/net/sdch/README.md (vcdiff, the first example of this)
https://learn.microsoft.com/en-us/deployedge/learnmore-zsdch-compression (zstd)
https://chromestatus.com/feature/5124977788977152 (originally just brotli)

Those last two are related, and both link to a shared draft RFC.

@coxley
Contributor Author

coxley commented Mar 6, 2024

And with interesting timing, this recent one: https://developer.chrome.com/blog/shared-dictionary-compression

@dfawley
Member

dfawley commented Mar 6, 2024

From a quick reading, it looks like none of these approaches requires decompression to have access to anything besides the compressed data -- is that correct? So we don't have any known use case that requires passing the incoming metadata to the decompressor?

@coxley
Contributor Author

coxley commented Mar 6, 2024

@dfawley I think that could be true. Some cases may need to have multiple decompressor "names" for ID space scoping if that's the direction we go, though. (eg: zstd-groupA, zstd-groupB)

At least in zstd, the dictionary ID is limited to 4 bytes. I'm not sure how other companies "scope" their dictionary registries, but it's at least a consideration. Maybe not end of the world.
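
As a sketch of how a decompressor can recover that ID from the payload alone, the function below reads the Dictionary_ID field of a standard zstd frame header. The layout follows my reading of the zstd frame format (RFC 8878): a 4-byte magic number, a Frame_Header_Descriptor byte, an optional Window_Descriptor byte, then a 0, 1, 2, or 4-byte little-endian Dictionary_ID. The dictionaryID name is hypothetical, and skippable frames are not handled.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// zstdMagic is the magic number of a standard zstd frame (little-endian on
// the wire, so the bytes are 28 B5 2F FD).
const zstdMagic = 0xFD2FB528

// dictionaryID returns the dictionary ID stored in a zstd frame header.
// present is false when the frame does not record a dictionary ID.
func dictionaryID(frame []byte) (id uint32, present bool, err error) {
	if len(frame) < 5 {
		return 0, false, errors.New("frame too short for a zstd header")
	}
	if binary.LittleEndian.Uint32(frame[:4]) != zstdMagic {
		return 0, false, errors.New("not a standard zstd frame")
	}
	desc := frame[4]
	pos := 5

	// Bit 5 is Single_Segment_flag; the Window_Descriptor byte is present
	// only when that flag is clear.
	if desc&0x20 == 0 {
		pos++
	}

	// Bits 1-0 are Dictionary_ID_flag: 0, 1, 2, 3 mean 0, 1, 2, 4 bytes.
	var n int
	switch desc & 0x03 {
	case 0:
		return 0, false, nil
	case 1:
		n = 1
	case 2:
		n = 2
	case 3:
		n = 4
	}
	if len(frame) < pos+n {
		return 0, false, errors.New("frame truncated before dictionary ID")
	}
	for i := 0; i < n; i++ {
		id |= uint32(frame[pos+i]) << uint(8*i)
	}
	return id, true, nil
}

func main() {
	// Hand-built header: magic, descriptor 0x03 (4-byte dictionary ID),
	// a window descriptor byte, then the ID 42 in little-endian order.
	frame := []byte{0x28, 0xB5, 0x2F, 0xFD, 0x03, 0x00, 0x2A, 0x00, 0x00, 0x00}
	id, present, err := dictionaryID(frame)
	fmt.Println(id, present, err)
}
```

Because the field tops out at 4 bytes, any scoping beyond a single global ID space has to come from somewhere else, such as separate compressor names per group.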

@dfawley
Member

dfawley commented Mar 22, 2024

My proposed change, then, would be:

package grpc

func UseCompressor(name string, compressorOptions ...any) CallOption {}

func SetSendCompressor(ctx context.Context, name string, compressorOptions ...any) error {}

package encoding

type Compressor interface {
	Compress(/*see below*/, ...any) /*see below*/
	...
}

Essentially this simply adds "...any" to all the places where we set and invoke the compressor.
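
A rough stdlib-only sketch of how a compressor might consume those options follows. It is not the proposed grpc-go API: the Compress signature is left open above, so the io.Writer form here is an assumption, and Scope, scopedCompressor, callOption, send, and receive are hypothetical names. compress/flate preset dictionaries stand in for zstd.

```go
package main

import (
	"bytes"
	"compress/flate"
	"fmt"
	"io"
)

// Scope is a hypothetical option value a client could pass per call.
type Scope struct{ Category, Namespace string }

// Compressor is a hypothetical variant of encoding.Compressor whose Compress
// accepts per-call options. The io.Writer signature is an assumption.
type Compressor interface {
	Compress(w io.Writer, opts ...any) (io.WriteCloser, error)
	Name() string
}

type scopedCompressor struct {
	dicts map[Scope][]byte
}

func (c *scopedCompressor) Name() string { return "dict-flate" }

func (c *scopedCompressor) Compress(w io.Writer, opts ...any) (io.WriteCloser, error) {
	var dict []byte
	for _, o := range opts {
		s, ok := o.(Scope)
		if !ok {
			continue // ignore option types this compressor does not understand
		}
		d, found := c.dicts[s]
		if !found {
			return nil, fmt.Errorf("no dictionary for %s/%s", s.Category, s.Namespace)
		}
		dict = d
	}
	if dict == nil {
		// No scope given: fall back to compression without a dictionary.
		return flate.NewWriter(w, flate.DefaultCompression)
	}
	fw, err := flate.NewWriterDict(w, flate.DefaultCompression, dict)
	if err != nil {
		return nil, err
	}
	return fw, nil
}

// callOption stands in for what UseCompressor(name, opts...) might carry.
type callOption struct {
	name string
	opts []any
}

func UseCompressor(name string, compressorOptions ...any) callOption {
	return callOption{name: name, opts: compressorOptions}
}

// send plays the role of the transport invoking the compressor for one call.
func send(c Compressor, opt callOption, payload []byte) ([]byte, error) {
	var buf bytes.Buffer
	w, err := c.Compress(&buf, opt.opts...)
	if err != nil {
		return nil, err
	}
	if _, err := w.Write(payload); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// receive decompresses with a dictionary the receiver already resolved; in
// practice the payload header would have to identify it.
func receive(data, dict []byte) ([]byte, error) {
	r := flate.NewReaderDict(bytes.NewReader(data), dict)
	defer r.Close()
	return io.ReadAll(r)
}

func main() {
	dict := []byte(`{"name":"`)
	c := &scopedCompressor{dicts: map[Scope][]byte{{Category: "test", Namespace: "foo"}: dict}}

	opt := UseCompressor(c.Name(), Scope{Category: "test", Namespace: "foo"})
	packed, err := send(c, opt, []byte(`{"name":"coxley"}`))
	if err != nil {
		panic(err)
	}
	plain, err := receive(packed, dict)
	fmt.Printf("%s %v\n", plain, err)
}
```

Untyped options keep the interface small, at the cost of a type switch or assertion in every compressor and no compile-time check that a caller passed something the compressor understands.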

Regarding the Compressor interface, we are planning some changes to the encoding package to support scatter/gather and memory re-use. Those will be covered in #6619 and I would propose incorporating these changes into the work there.

How does all of that sound?

@coxley
Contributor Author

coxley commented Apr 4, 2024

@dfawley Sorry for the late reply; my GitHub notification organization is abysmal.

I think that sounds good as long as it does for you!

@purnesh42H purnesh42H added the Area: RPC Features Includes Compression, Encoding, Attributes/Metadata, Interceptors. label Sep 7, 2024
5 participants