Vald Upsert APIs

Overview

The Upsert service is responsible for updating existing vectors in the vald-agent, or inserting new vectors into the vald-agent if the vector does not exist.

service Upsert {

  rpc Upsert(payload.v1.Upsert.Request)
      returns (payload.v1.Object.Location) {}

  rpc StreamUpsert(stream payload.v1.Upsert.Request)
      returns (stream payload.v1.Object.StreamLocation) {}

  rpc MultiUpsert(payload.v1.Upsert.MultiRequest)
      returns (payload.v1.Object.Locations) {}
}

Upsert RPC

Upsert RPC is the method to update a single existing vector or add a single new vector.

Input

  • the schema of payload.v1.Upsert.Request

    message Upsert {
      message Request {
        Object.Vector vector = 1 [ (validate.rules).repeated .min_items = 2 ];
        Config config = 2;
      }
    
      message Config {
        bool skip_strict_exist_check = 1;
        Filter.Config filters = 2;
        int64 timestamp = 3;
      }
    }
    
    message Object {
        message Vector {
            string id = 1 [ (validate.rules).string.min_len = 1 ];
            repeated float vector = 2 [ (validate.rules).repeated .min_items = 2 ];
        }
    }
    
    • Upsert.Request

      field  | type          | label | required | desc.
      vector | Object.Vector |       | *        | the information of the vector
      config | Config        |       | *        | the configuration of the upsert request

    • Upsert.Config

      field                   | type          | label | required | desc.
      skip_strict_exist_check | bool          |       |          | check whether the same vector is already inserted. The ID should be unique if the value is true.
      timestamp               | int64         |       |          | the timestamp of the vector updated/inserted. If it is not set, the current time will be used.
      filters                 | Filter.Config |       |          | configuration for filter

    • Object.Vector

      field  | type   | label                  | required | desc.
      id     | string |                        | *        | the ID of a vector. The ID should consist of 1 or more characters.
      vector | float  | repeated(Array[float]) | *        | the vector data. Its dimension is between 2 and 65,536.
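
The constraints on Object.Vector listed above (a non-empty ID and a dimension between 2 and 65,536) can be checked on the client side before a request is sent. The following is a minimal sketch only: ObjectVector and validation_errors are hypothetical local names, not part of the Vald client library, and the server remains the authority on what is valid.

```python
from dataclasses import dataclass
from typing import List

MIN_DIM = 2        # min_items rule on Object.Vector.vector
MAX_DIM = 65_536   # upper bound stated in the field table


@dataclass
class ObjectVector:
    """Hypothetical local stand-in for payload.v1.Object.Vector."""
    id: str
    vector: List[float]


def validation_errors(v: ObjectVector) -> List[str]:
    """Return the documented constraints that `v` violates (empty if none)."""
    errors = []
    if len(v.id) < 1:
        errors.append("id must consist of 1 or more characters")
    if not MIN_DIM <= len(v.vector) <= MAX_DIM:
        errors.append(f"vector dimension must be between {MIN_DIM} and {MAX_DIM}")
    return errors
```

A request whose vector fails these checks would presumably be rejected with INVALID_ARGUMENT, so checking locally mainly saves a round trip.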

Output

  • the schema of payload.v1.Object.Location

    message Object {
        message Location {
          string name = 1;
          string uuid = 2;
          repeated string ips = 3;
        }
    }
    
    • Object.Location

      field | type   | label                   | desc.
      name  | string |                         | the name of the vald-agent pod where the request vector is updated/inserted.
      uuid  | string |                         | the ID of the updated/inserted vector. It is the same as the ID of the Object.Vector.
      ips   | string | repeated(Array[string]) | the IP list of vald-agent pods where the request vector is updated/inserted.

Status Code

code | desc.
0    | OK
3    | INVALID_ARGUMENT
6    | ALREADY_EXISTS
13   | INTERNAL
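
The numeric values in this table are standard gRPC status codes. As a small illustration, a client could turn a numeric code from a response into its name like this. UpsertStatus and describe are hypothetical helper names, and the enum covers only the codes listed in this document.

```python
from enum import IntEnum


class UpsertStatus(IntEnum):
    """Status codes listed for the Upsert RPCs in this document."""
    OK = 0
    INVALID_ARGUMENT = 3
    ALREADY_EXISTS = 6
    INTERNAL = 13


def describe(code: int) -> str:
    """Return the name of a listed code, or a marker for unlisted ones."""
    try:
        return UpsertStatus(code).name
    except ValueError:
        return f"UNLISTED({code})"
```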

StreamUpsert RPC

StreamUpsert RPC is the method to update multiple existing vectors or add multiple new vectors using bidirectional streaming RPC.
With bidirectional streaming RPC, upsert requests and responses can be exchanged in any order between client and server. Each Upsert request and response is independent. It is the recommended method for upserting a large number of vectors.

Input

  • the schema of the payload.v1.Upsert.Request stream

    message Upsert {
        message Request {
            Object.Vector vector = 1 [ (validate.rules).repeated .min_items = 2 ];
            Config config = 2;
        }
        message Config {
            bool skip_strict_exist_check = 1;
            Filter.Config filters = 2;
            int64 timestamp = 3;
        }
    }
    
    message Object {
        message Vector {
            string id = 1 [ (validate.rules).string.min_len = 1 ];
            repeated float vector = 2 [ (validate.rules).repeated .min_items = 2 ];
        }
    }
    
    • Upsert.Request

      field  | type          | label | required | desc.
      vector | Object.Vector |       | *        | the information of the vector
      config | Config        |       | *        | the configuration of the upsert request

    • Upsert.Config

      field                   | type          | label | required | desc.
      skip_strict_exist_check | bool          |       |          | check whether the same vector is already inserted. The ID should be unique if the value is true.
      timestamp               | int64         |       |          | the timestamp of the vector updated/inserted. If it is not set, the current time will be used.
      filters                 | Filter.Config |       |          | configuration for filter

    • Object.Vector

      field  | type   | label                  | required | desc.
      id     | string |                        | *        | the ID of the vector. The ID should consist of 1 or more characters.
      vector | float  | repeated(Array[float]) | *        | the vector data. Its dimension is between 2 and 65,536.

Output

  • the schema of payload.v1.Object.StreamLocation

    message Object {
        message StreamLocation {
          oneof payload {
              Location location = 1;
              google.rpc.Status status = 2;
          }
        }
    
        message Location {
          string name = 1;
          string uuid = 2;
          repeated string ips = 3;
        }
    }
    
    • Object.StreamLocation

      field    | type              | label | desc.
      location | Object.Location   |       | the information of Object.Location data.
      status   | google.rpc.Status |       | the status of google RPC.

    • Object.Location

      field | type   | label                   | desc.
      name  | string |                         | the name of the vald-agent pod where the request vector is updated/inserted.
      uuid  | string |                         | the ID of the updated/inserted vector. It is the same as the ID of the Object.Vector.
      ips   | string | repeated(Array[string]) | the IP list of vald-agent pods where the request vector is updated/inserted.

    • google.rpc.Status

      field   | type                | label                | desc.
      code    | int32               |                      | status code (the code list is in the next section)
      message | string              |                      | error message
      details | google.protobuf.Any | repeated(Array[any]) | the list of detailed error messages
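
Because each StreamLocation carries either a location or a status (the oneof payload above), a client typically inspects which one is set for every response it receives. The sketch below models that with plain dataclasses. Location, Status, StreamLocation and split_responses are hypothetical local stand-ins, not the generated gRPC classes, and the sample values in the usage note are made up for illustration.

```python
from dataclasses import dataclass, field
from typing import Iterable, List, Optional, Tuple


@dataclass
class Location:
    name: str
    uuid: str
    ips: List[str] = field(default_factory=list)


@dataclass
class Status:
    code: int
    message: str = ""


@dataclass
class StreamLocation:
    # Models the `oneof payload`: at most one of the two fields is set.
    location: Optional[Location] = None
    status: Optional[Status] = None


def split_responses(
    responses: Iterable[StreamLocation],
) -> Tuple[List[Location], List[Status]]:
    """Separate successful locations from error statuses."""
    succeeded: List[Location] = []
    failed: List[Status] = []
    for r in responses:
        if r.location is not None:
            succeeded.append(r.location)
        elif r.status is not None:
            failed.append(r.status)
    return succeeded, failed
```

For example, passing one response with a location and one with Status(code=6) yields one entry in each list. Since requests and responses are independent, one failed item does not prevent the others from succeeding.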

Status Code

code | desc.
0    | OK
3    | INVALID_ARGUMENT
6    | ALREADY_EXISTS
13   | INTERNAL

MultiUpsert RPC

MultiUpsert RPC is the method to update multiple existing vectors or add multiple new vectors in one request.

gRPC has a message size limitation.
Please be careful that the size of the request does not exceed the limit.
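
One way to stay under the limit is to split a large set of vectors into several MultiUpsert requests based on an estimated size. The following is a rough sketch under stated assumptions: the 4 MiB limit is the common gRPC default for received messages and may differ in your deployment, the per-request overhead of 32 bytes is a guess, and estimated_size and batches are hypothetical helper names. The estimate counts 4 bytes per float element plus the UTF-8 length of the ID.

```python
from typing import Iterable, Iterator, List, Tuple

Item = Tuple[str, List[float]]  # (id, vector)

ASSUMED_LIMIT_BYTES = 4 * 1024 * 1024  # assumption: check your deployment's limit
ASSUMED_OVERHEAD_BYTES = 32            # assumption: tags, lengths, config per request


def estimated_size(item: Item) -> int:
    """Rough size of one Upsert.Request: ID bytes + 4 bytes per float + overhead."""
    vec_id, vector = item
    return len(vec_id.encode("utf-8")) + 4 * len(vector) + ASSUMED_OVERHEAD_BYTES


def batches(items: Iterable[Item], limit: int = ASSUMED_LIMIT_BYTES) -> Iterator[List[Item]]:
    """Yield lists of items whose estimated total size stays within `limit`.

    An item that alone exceeds `limit` is still yielded, as a batch of one.
    """
    batch: List[Item] = []
    size = 0
    for item in items:
        item_size = estimated_size(item)
        if batch and size + item_size > limit:
            yield batch
            batch, size = [], 0
        batch.append(item)
        size += item_size
    if batch:
        yield batch
```

Each yielded batch would then be sent as one Upsert.MultiRequest. Leaving some headroom below the actual limit is advisable because the size here is only an estimate.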

Input

  • the schema of payload.v1.Upsert.MultiRequest

    message Upsert {
        message MultiRequest { repeated Request requests = 1; }
    
        message Request {
            Object.Vector vector = 1 [ (validate.rules).repeated .min_items = 2 ];
            Config config = 2;
        }
    
        message Config {
            bool skip_strict_exist_check = 1;
            Filter.Config filters = 2;
            int64 timestamp = 3;
        }
    }
    
    message Object {
        message Vector {
            string id = 1 [ (validate.rules).string.min_len = 1 ];
            repeated float vector = 2 [ (validate.rules).repeated .min_items = 2 ];
        }
    }
    
    • Upsert.MultiRequest

      field    | type           | label                           | required | desc.
      requests | Upsert.Request | repeated(Array[Upsert.Request]) | *        | the request list

    • Upsert.Request

      field  | type          | label | required | desc.
      vector | Object.Vector |       | *        | the information of the vector
      config | Config        |       | *        | the configuration of the upsert request

    • Upsert.Config

      field                   | type          | label | required | desc.
      skip_strict_exist_check | bool          |       |          | check whether the same vector is already updated/inserted. The ID should be unique if the value is true.
      timestamp               | int64         |       |          | the timestamp of the vector updated/inserted. If it is not set, the current time will be used.
      filters                 | Filter.Config |       |          | configuration for filter

    • Object.Vector

      field  | type   | label                  | required | desc.
      id     | string |                        | *        | the ID of a vector. The ID should consist of 1 or more characters.
      vector | float  | repeated(Array[float]) | *        | the vector data. Its dimension is between 2 and 65,536.

Output

  • the schema of payload.v1.Object.Locations

    message Object {
        message Locations { repeated Location locations = 1; }
    
        message Location {
          string name = 1;
          string uuid = 2;
          repeated string ips = 3;
        }
    }
    
    • Object.Locations

      field     | type            | label                            | desc.
      locations | Object.Location | repeated(Array[Object.Location]) | the list of Object.Location

    • Object.Location

      field | type   | label                   | desc.
      name  | string |                         | the name of the vald-agent pod where the request vector is updated/inserted.
      uuid  | string |                         | the ID of the updated/inserted vector. It is the same as the ID of the Object.Vector.
      ips   | string | repeated(Array[string]) | the IP list of vald-agent pods where the request vector is updated/inserted.

Status Code

code | desc.
0    | OK
3    | INVALID_ARGUMENT
6    | ALREADY_EXISTS
13   | INTERNAL