首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >bigquery时间戳字段的Protobuf类型

bigquery时间戳字段的Protobuf类型
EN

Stack Overflow用户
提问于 2021-12-26 21:15:53
回答 1查看 984关注 0票数 2

我正在使用新的存储API将数据从Golang流到bigquery。bigquery表的架构包含一个时间戳字段,如下所示:

代码语言:javascript
复制
bq mk -t mydataset.mytable name:string,lastseen:timestamp

另外,我定义了这样一个协议缓冲区:

代码语言:javascript
复制
message Row {
    string Name = 1;
    google.protobuf.Timestamp LastSeen = 3;
}

但是,当我将这些数据提交给BigQuery时,会出现以下错误:

代码语言:javascript
复制
rpc error: code = InvalidArgument desc = The proto field mismatched with BigQuery field at tutorial_Row.LastSeen, the proto field type message, BigQuery field type TIMESTAMP

google.protobuf.Timestamp原型似乎与bigquery中的时间戳类型不相对应。这是有意义的,因为bigquery文档说时间戳包含时区,但是google.protobuf.Timestamp不包含时区。但是我应该使用哪个协议缓冲区呢?

我使用的代码来自这个储存库,如下所示:

代码语言:javascript
复制
import (
    "context"
    "fmt"
    "log"

    storage "cloud.google.com/go/bigquery/storage/apiv1beta2"
    "cloud.google.com/go/bigquery/storage/managedwriter/adapt"
    storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
    "google.golang.org/protobuf/proto"
    timestamppb "google.golang.org/protobuf/types/known/timestamppb"
)

const (
    project = "myproject"
    dataset = "mydataset"
    table   = "mytable2"
)

func main() {
    ctx := context.Background()

    // the data we will stream to bigquery
    var rows = []*Row{
        {Name: "John Doe", Age: 104, LastSeen: timestamppb.Now()},
        {Name: "Jane Doe", Age: 69, LastSeen: timestamppb.Now()},
        {Name: "Adam Smith", Age: 33, LastSeen: timestamppb.Now()},
    }

    // create the bigquery client
    client, err := storage.NewBigQueryWriteClient(ctx)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // create the write stream
    // a COMMITTED write stream inserts data immediately into bigquery
    resp, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
        Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, dataset, table),
        WriteStream: &storagepb.WriteStream{
            Type: storagepb.WriteStream_COMMITTED,
        },
    })
    if err != nil {
        log.Fatal("CreateWriteStream: ", err)
    }

    // get the stream by calling AppendRows
    stream, err := client.AppendRows(ctx)
    if err != nil {
        log.Fatal("AppendRows: ", err)
    }

    // get the protobuf descriptor for our row type
    var row Row
    descriptor, err := adapt.NormalizeDescriptor(row.ProtoReflect().Descriptor())
    if err != nil {
        log.Fatal("NormalizeDescriptor: ", err)
    }

    // serialize the rows
    var opts proto.MarshalOptions
    var data [][]byte
    for _, row := range rows {
        buf, err := opts.Marshal(row)
        if err != nil {
            log.Fatal("protobuf.Marshal: ", err)
        }
        data = append(data, buf)
    }

    // send the rows to bigquery
    err = stream.Send(&storagepb.AppendRowsRequest{
        WriteStream: resp.Name,
        Rows: &storagepb.AppendRowsRequest_ProtoRows{
            ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
                // protocol buffer schema
                WriterSchema: &storagepb.ProtoSchema{
                    ProtoDescriptor: descriptor,
                },
                // protocol buffer data
                Rows: &storagepb.ProtoRows{
                    SerializedRows: data, // serialized protocol buffer data
                },
            },
        },
    })
    if err != nil {
        log.Fatal("AppendRows.Send: ", err)
    }

    // get the response, which will tell us whether it worked
    _, err = stream.Recv()
    if err != nil {
        log.Fatal("AppendRows.Recv: ", err)
    }

    log.Println("done")
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-12-27 01:58:36

是的,后端不能正确解码原时间戳消息。

最快的解析答案:发送int64类型,填充为划时代的微秒。

转换

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70489919

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档