This commit is contained in:
2
vendor/google.golang.org/grpc/internal/transport/client_stream.go
generated
vendored
2
vendor/google.golang.org/grpc/internal/transport/client_stream.go
generated
vendored
@ -59,7 +59,7 @@ func (s *ClientStream) Read(n int) (mem.BufferSlice, error) {
|
||||
return b, err
|
||||
}
|
||||
|
||||
// Close closes the stream and popagates err to any readers.
|
||||
// Close closes the stream and propagates err to any readers.
|
||||
func (s *ClientStream) Close(err error) {
|
||||
var (
|
||||
rst bool
|
||||
|
68
vendor/google.golang.org/grpc/internal/transport/controlbuf.go
generated
vendored
68
vendor/google.golang.org/grpc/internal/transport/controlbuf.go
generated
vendored
@ -40,6 +40,13 @@ var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
|
||||
e.SetMaxDynamicTableSizeLimit(v)
|
||||
}
|
||||
|
||||
// itemNodePool is used to reduce heap allocations.
|
||||
var itemNodePool = sync.Pool{
|
||||
New: func() any {
|
||||
return &itemNode{}
|
||||
},
|
||||
}
|
||||
|
||||
type itemNode struct {
|
||||
it any
|
||||
next *itemNode
|
||||
@ -51,7 +58,9 @@ type itemList struct {
|
||||
}
|
||||
|
||||
func (il *itemList) enqueue(i any) {
|
||||
n := &itemNode{it: i}
|
||||
n := itemNodePool.Get().(*itemNode)
|
||||
n.next = nil
|
||||
n.it = i
|
||||
if il.tail == nil {
|
||||
il.head, il.tail = n, n
|
||||
return
|
||||
@ -71,7 +80,9 @@ func (il *itemList) dequeue() any {
|
||||
return nil
|
||||
}
|
||||
i := il.head.it
|
||||
temp := il.head
|
||||
il.head = il.head.next
|
||||
itemNodePool.Put(temp)
|
||||
if il.head == nil {
|
||||
il.tail = nil
|
||||
}
|
||||
@ -146,10 +157,11 @@ type earlyAbortStream struct {
|
||||
func (*earlyAbortStream) isTransportResponseFrame() bool { return false }
|
||||
|
||||
type dataFrame struct {
|
||||
streamID uint32
|
||||
endStream bool
|
||||
h []byte
|
||||
reader mem.Reader
|
||||
streamID uint32
|
||||
endStream bool
|
||||
h []byte
|
||||
data mem.BufferSlice
|
||||
processing bool
|
||||
// onEachWrite is called every time
|
||||
// a part of data is written out.
|
||||
onEachWrite func()
|
||||
@ -234,6 +246,7 @@ type outStream struct {
|
||||
itl *itemList
|
||||
bytesOutStanding int
|
||||
wq *writeQuota
|
||||
reader mem.Reader
|
||||
|
||||
next *outStream
|
||||
prev *outStream
|
||||
@ -461,7 +474,9 @@ func (c *controlBuffer) finish() {
|
||||
v.onOrphaned(ErrConnClosing)
|
||||
}
|
||||
case *dataFrame:
|
||||
_ = v.reader.Close()
|
||||
if !v.processing {
|
||||
v.data.Free()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -650,10 +665,11 @@ func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
|
||||
|
||||
func (l *loopyWriter) registerStreamHandler(h *registerStream) {
|
||||
str := &outStream{
|
||||
id: h.streamID,
|
||||
state: empty,
|
||||
itl: &itemList{},
|
||||
wq: h.wq,
|
||||
id: h.streamID,
|
||||
state: empty,
|
||||
itl: &itemList{},
|
||||
wq: h.wq,
|
||||
reader: mem.BufferSlice{}.Reader(),
|
||||
}
|
||||
l.estdStreams[h.streamID] = str
|
||||
}
|
||||
@ -685,10 +701,11 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error {
|
||||
}
|
||||
// Case 2: Client wants to originate stream.
|
||||
str := &outStream{
|
||||
id: h.streamID,
|
||||
state: empty,
|
||||
itl: &itemList{},
|
||||
wq: h.wq,
|
||||
id: h.streamID,
|
||||
state: empty,
|
||||
itl: &itemList{},
|
||||
wq: h.wq,
|
||||
reader: mem.BufferSlice{}.Reader(),
|
||||
}
|
||||
return l.originateStream(str, h)
|
||||
}
|
||||
@ -790,10 +807,13 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
|
||||
// a RST_STREAM before stream initialization thus the stream might
|
||||
// not be established yet.
|
||||
delete(l.estdStreams, c.streamID)
|
||||
str.reader.Close()
|
||||
str.deleteSelf()
|
||||
for head := str.itl.dequeueAll(); head != nil; head = head.next {
|
||||
if df, ok := head.it.(*dataFrame); ok {
|
||||
_ = df.reader.Close()
|
||||
if !df.processing {
|
||||
df.data.Free()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -928,7 +948,13 @@ func (l *loopyWriter) processData() (bool, error) {
|
||||
if str == nil {
|
||||
return true, nil
|
||||
}
|
||||
reader := str.reader
|
||||
dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
|
||||
if !dataItem.processing {
|
||||
dataItem.processing = true
|
||||
str.reader.Reset(dataItem.data)
|
||||
dataItem.data.Free()
|
||||
}
|
||||
// A data item is represented by a dataFrame, since it later translates into
|
||||
// multiple HTTP2 data frames.
|
||||
// Every dataFrame has two buffers; h that keeps grpc-message header and data
|
||||
@ -936,13 +962,13 @@ func (l *loopyWriter) processData() (bool, error) {
|
||||
// from data is copied to h to make as big as the maximum possible HTTP2 frame
|
||||
// size.
|
||||
|
||||
if len(dataItem.h) == 0 && dataItem.reader.Remaining() == 0 { // Empty data frame
|
||||
if len(dataItem.h) == 0 && reader.Remaining() == 0 { // Empty data frame
|
||||
// Client sends out empty data frame with endStream = true
|
||||
if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
|
||||
return false, err
|
||||
}
|
||||
str.itl.dequeue() // remove the empty data item from stream
|
||||
_ = dataItem.reader.Close()
|
||||
_ = reader.Close()
|
||||
if str.itl.isEmpty() {
|
||||
str.state = empty
|
||||
} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
|
||||
@ -971,8 +997,8 @@ func (l *loopyWriter) processData() (bool, error) {
|
||||
}
|
||||
// Compute how much of the header and data we can send within quota and max frame length
|
||||
hSize := min(maxSize, len(dataItem.h))
|
||||
dSize := min(maxSize-hSize, dataItem.reader.Remaining())
|
||||
remainingBytes := len(dataItem.h) + dataItem.reader.Remaining() - hSize - dSize
|
||||
dSize := min(maxSize-hSize, reader.Remaining())
|
||||
remainingBytes := len(dataItem.h) + reader.Remaining() - hSize - dSize
|
||||
size := hSize + dSize
|
||||
|
||||
var buf *[]byte
|
||||
@ -993,7 +1019,7 @@ func (l *loopyWriter) processData() (bool, error) {
|
||||
defer pool.Put(buf)
|
||||
|
||||
copy((*buf)[:hSize], dataItem.h)
|
||||
_, _ = dataItem.reader.Read((*buf)[hSize:])
|
||||
_, _ = reader.Read((*buf)[hSize:])
|
||||
}
|
||||
|
||||
// Now that outgoing flow controls are checked we can replenish str's write quota
|
||||
@ -1014,7 +1040,7 @@ func (l *loopyWriter) processData() (bool, error) {
|
||||
dataItem.h = dataItem.h[hSize:]
|
||||
|
||||
if remainingBytes == 0 { // All the data from that message was written out.
|
||||
_ = dataItem.reader.Close()
|
||||
_ = reader.Close()
|
||||
str.itl.dequeue()
|
||||
}
|
||||
if str.itl.isEmpty() {
|
||||
|
52
vendor/google.golang.org/grpc/internal/transport/http2_client.go
generated
vendored
52
vendor/google.golang.org/grpc/internal/transport/http2_client.go
generated
vendored
@ -176,7 +176,7 @@ func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error
|
||||
return fn(ctx, address)
|
||||
}
|
||||
if !ok {
|
||||
networkType, address = parseDialTarget(address)
|
||||
networkType, address = ParseDialTarget(address)
|
||||
}
|
||||
if opts, present := proxyattributes.Get(addr); present {
|
||||
return proxyDial(ctx, addr, grpcUA, opts)
|
||||
@ -309,11 +309,9 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
scheme = "https"
|
||||
}
|
||||
}
|
||||
dynamicWindow := true
|
||||
icwz := int32(initialWindowSize)
|
||||
if opts.InitialConnWindowSize >= defaultWindowSize {
|
||||
icwz = opts.InitialConnWindowSize
|
||||
dynamicWindow = false
|
||||
}
|
||||
writeBufSize := opts.WriteBufferSize
|
||||
readBufSize := opts.ReadBufferSize
|
||||
@ -381,9 +379,8 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
t.controlBuf = newControlBuffer(t.ctxDone)
|
||||
if opts.InitialWindowSize >= defaultWindowSize {
|
||||
t.initialWindowSize = opts.InitialWindowSize
|
||||
dynamicWindow = false
|
||||
}
|
||||
if dynamicWindow {
|
||||
if !opts.StaticWindowSize {
|
||||
t.bdpEst = &bdpEstimator{
|
||||
bdp: initialWindowSize,
|
||||
updateFlowControl: t.updateFlowControl,
|
||||
@ -545,7 +542,7 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
|
||||
Method: callHdr.Method,
|
||||
AuthInfo: t.authInfo,
|
||||
}
|
||||
ctxWithRequestInfo := icredentials.NewRequestInfoContext(ctx, ri)
|
||||
ctxWithRequestInfo := credentials.NewContextWithRequestInfo(ctx, ri)
|
||||
authData, err := t.getTrAuthData(ctxWithRequestInfo, aud)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -592,6 +589,9 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
|
||||
// Send out timeout regardless its value. The server can detect timeout context by itself.
|
||||
// TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
|
||||
timeout := time.Until(dl)
|
||||
if timeout <= 0 {
|
||||
return nil, status.Error(codes.DeadlineExceeded, context.DeadlineExceeded.Error())
|
||||
}
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: grpcutil.EncodeDuration(timeout)})
|
||||
}
|
||||
for k, v := range authData {
|
||||
@ -749,6 +749,25 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientS
|
||||
callHdr = &newCallHdr
|
||||
}
|
||||
|
||||
// The authority specified via the `CallAuthority` CallOption takes the
|
||||
// highest precedence when determining the `:authority` header. It overrides
|
||||
// any value present in the Host field of CallHdr. Before applying this
|
||||
// override, the authority string is validated. If the credentials do not
|
||||
// implement the AuthorityValidator interface, or if validation fails, the
|
||||
// RPC is failed with a status code of `UNAVAILABLE`.
|
||||
if callHdr.Authority != "" {
|
||||
auth, ok := t.authInfo.(credentials.AuthorityValidator)
|
||||
if !ok {
|
||||
return nil, &NewStreamError{Err: status.Errorf(codes.Unavailable, "credentials type %q does not implement the AuthorityValidator interface, but authority override specified with CallAuthority call option", t.authInfo.AuthType())}
|
||||
}
|
||||
if err := auth.ValidateAuthority(callHdr.Authority); err != nil {
|
||||
return nil, &NewStreamError{Err: status.Errorf(codes.Unavailable, "failed to validate authority %q : %v", callHdr.Authority, err)}
|
||||
}
|
||||
newCallHdr := *callHdr
|
||||
newCallHdr.Host = callHdr.Authority
|
||||
callHdr = &newCallHdr
|
||||
}
|
||||
|
||||
headerFields, err := t.createHeaderFields(ctx, callHdr)
|
||||
if err != nil {
|
||||
return nil, &NewStreamError{Err: err, AllowTransparentRetry: false}
|
||||
@ -1069,32 +1088,29 @@ func (t *http2Client) GracefulClose() {
|
||||
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
|
||||
// should proceed only if Write returns nil.
|
||||
func (t *http2Client) write(s *ClientStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) error {
|
||||
reader := data.Reader()
|
||||
|
||||
if opts.Last {
|
||||
// If it's the last message, update stream state.
|
||||
if !s.compareAndSwapState(streamActive, streamWriteDone) {
|
||||
_ = reader.Close()
|
||||
return errStreamDone
|
||||
}
|
||||
} else if s.getState() != streamActive {
|
||||
_ = reader.Close()
|
||||
return errStreamDone
|
||||
}
|
||||
df := &dataFrame{
|
||||
streamID: s.id,
|
||||
endStream: opts.Last,
|
||||
h: hdr,
|
||||
reader: reader,
|
||||
data: data,
|
||||
}
|
||||
if hdr != nil || df.reader.Remaining() != 0 { // If it's not an empty data frame, check quota.
|
||||
if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil {
|
||||
_ = reader.Close()
|
||||
dataLen := data.Len()
|
||||
if hdr != nil || dataLen != 0 { // If it's not an empty data frame, check quota.
|
||||
if err := s.wq.get(int32(len(hdr) + dataLen)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
data.Ref()
|
||||
if err := t.controlBuf.put(df); err != nil {
|
||||
_ = reader.Close()
|
||||
data.Free()
|
||||
return err
|
||||
}
|
||||
t.incrMsgSent()
|
||||
@ -1242,7 +1258,8 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
|
||||
statusCode = codes.DeadlineExceeded
|
||||
}
|
||||
}
|
||||
t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
|
||||
st := status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode)
|
||||
t.closeStream(s, st.Err(), false, http2.ErrCodeNo, st, nil, false)
|
||||
}
|
||||
|
||||
func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
|
||||
@ -1390,8 +1407,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) error {
|
||||
// the caller.
|
||||
func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
|
||||
t.goAwayReason = GoAwayNoReason
|
||||
switch f.ErrCode {
|
||||
case http2.ErrCodeEnhanceYourCalm:
|
||||
if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
|
||||
if string(f.DebugData()) == "too_many_pings" {
|
||||
t.goAwayReason = GoAwayTooManyPings
|
||||
}
|
||||
|
72
vendor/google.golang.org/grpc/internal/transport/http2_server.go
generated
vendored
72
vendor/google.golang.org/grpc/internal/transport/http2_server.go
generated
vendored
@ -35,9 +35,11 @@ import (
|
||||
|
||||
"golang.org/x/net/http2"
|
||||
"golang.org/x/net/http2/hpack"
|
||||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/internal/grpclog"
|
||||
"google.golang.org/grpc/internal/grpcutil"
|
||||
"google.golang.org/grpc/internal/pretty"
|
||||
istatus "google.golang.org/grpc/internal/status"
|
||||
"google.golang.org/grpc/internal/syscall"
|
||||
"google.golang.org/grpc/mem"
|
||||
"google.golang.org/protobuf/proto"
|
||||
@ -130,6 +132,10 @@ type http2Server struct {
|
||||
maxStreamID uint32 // max stream ID ever seen
|
||||
|
||||
logger *grpclog.PrefixLogger
|
||||
// setResetPingStrikes is stored as a closure instead of making this a
|
||||
// method on http2Server to avoid a heap allocation when converting a method
|
||||
// to a closure for passing to frames objects.
|
||||
setResetPingStrikes func()
|
||||
}
|
||||
|
||||
// NewServerTransport creates a http2 transport with conn and configuration
|
||||
@ -174,16 +180,13 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
|
||||
Val: config.MaxStreams,
|
||||
})
|
||||
}
|
||||
dynamicWindow := true
|
||||
iwz := int32(initialWindowSize)
|
||||
if config.InitialWindowSize >= defaultWindowSize {
|
||||
iwz = config.InitialWindowSize
|
||||
dynamicWindow = false
|
||||
}
|
||||
icwz := int32(initialWindowSize)
|
||||
if config.InitialConnWindowSize >= defaultWindowSize {
|
||||
icwz = config.InitialConnWindowSize
|
||||
dynamicWindow = false
|
||||
}
|
||||
if iwz != defaultWindowSize {
|
||||
isettings = append(isettings, http2.Setting{
|
||||
@ -264,6 +267,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
|
||||
initialWindowSize: iwz,
|
||||
bufferPool: config.BufferPool,
|
||||
}
|
||||
t.setResetPingStrikes = func() {
|
||||
atomic.StoreUint32(&t.resetPingStrikes, 1)
|
||||
}
|
||||
var czSecurity credentials.ChannelzSecurityValue
|
||||
if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok {
|
||||
czSecurity = au.GetSecurityValue()
|
||||
@ -283,7 +289,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
|
||||
t.logger = prefixLoggerForServerTransport(t)
|
||||
|
||||
t.controlBuf = newControlBuffer(t.done)
|
||||
if dynamicWindow {
|
||||
if !config.StaticWindowSize {
|
||||
t.bdpEst = &bdpEstimator{
|
||||
bdp: initialWindowSize,
|
||||
updateFlowControl: t.updateFlowControl,
|
||||
@ -594,10 +600,41 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
if s.ctx.Err() != nil {
|
||||
t.mu.Unlock()
|
||||
// Early abort in case the timeout was zero or so low it already fired.
|
||||
t.controlBuf.put(&earlyAbortStream{
|
||||
httpStatus: http.StatusOK,
|
||||
streamID: s.id,
|
||||
contentSubtype: s.contentSubtype,
|
||||
status: status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()),
|
||||
rst: !frame.StreamEnded(),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
t.activeStreams[streamID] = s
|
||||
if len(t.activeStreams) == 1 {
|
||||
t.idle = time.Time{}
|
||||
}
|
||||
|
||||
// Start a timer to close the stream on reaching the deadline.
|
||||
if timeoutSet {
|
||||
// We need to wait for s.cancel to be updated before calling
|
||||
// t.closeStream to avoid data races.
|
||||
cancelUpdated := make(chan struct{})
|
||||
timer := internal.TimeAfterFunc(timeout, func() {
|
||||
<-cancelUpdated
|
||||
t.closeStream(s, true, http2.ErrCodeCancel, false)
|
||||
})
|
||||
oldCancel := s.cancel
|
||||
s.cancel = func() {
|
||||
oldCancel()
|
||||
timer.Stop()
|
||||
}
|
||||
close(cancelUpdated)
|
||||
}
|
||||
t.mu.Unlock()
|
||||
if channelz.IsOn() {
|
||||
t.channelz.SocketMetrics.StreamsStarted.Add(1)
|
||||
@ -998,10 +1035,6 @@ func (t *http2Server) writeHeader(s *ServerStream, md metadata.MD) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *http2Server) setResetPingStrikes() {
|
||||
atomic.StoreUint32(&t.resetPingStrikes, 1)
|
||||
}
|
||||
|
||||
func (t *http2Server) writeHeaderLocked(s *ServerStream) error {
|
||||
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
|
||||
// first and create a slice of that exact size.
|
||||
@ -1038,7 +1071,7 @@ func (t *http2Server) writeHeaderLocked(s *ServerStream) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// WriteStatus sends stream status to the client and terminates the stream.
|
||||
// writeStatus sends stream status to the client and terminates the stream.
|
||||
// There is no further I/O operations being able to perform on this stream.
|
||||
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
|
||||
// OK is adopted.
|
||||
@ -1066,7 +1099,7 @@ func (t *http2Server) writeStatus(s *ServerStream, st *status.Status) error {
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
|
||||
|
||||
if p := st.Proto(); p != nil && len(p.Details) > 0 {
|
||||
if p := istatus.RawStatusProto(st); len(p.GetDetails()) > 0 {
|
||||
// Do not use the user's grpc-status-details-bin (if present) if we are
|
||||
// even attempting to set our own.
|
||||
delete(s.trailer, grpcStatusDetailsBinHeader)
|
||||
@ -1114,17 +1147,13 @@ func (t *http2Server) writeStatus(s *ServerStream, st *status.Status) error {
|
||||
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
|
||||
// is returns if it fails (e.g., framing error, transport error).
|
||||
func (t *http2Server) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ *WriteOptions) error {
|
||||
reader := data.Reader()
|
||||
|
||||
if !s.isHeaderSent() { // Headers haven't been written yet.
|
||||
if err := t.writeHeader(s, nil); err != nil {
|
||||
_ = reader.Close()
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// Writing headers checks for this condition.
|
||||
if s.getState() == streamDone {
|
||||
_ = reader.Close()
|
||||
return t.streamContextErr(s)
|
||||
}
|
||||
}
|
||||
@ -1132,15 +1161,16 @@ func (t *http2Server) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _
|
||||
df := &dataFrame{
|
||||
streamID: s.id,
|
||||
h: hdr,
|
||||
reader: reader,
|
||||
data: data,
|
||||
onEachWrite: t.setResetPingStrikes,
|
||||
}
|
||||
if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil {
|
||||
_ = reader.Close()
|
||||
dataLen := data.Len()
|
||||
if err := s.wq.get(int32(len(hdr) + dataLen)); err != nil {
|
||||
return t.streamContextErr(s)
|
||||
}
|
||||
data.Ref()
|
||||
if err := t.controlBuf.put(df); err != nil {
|
||||
_ = reader.Close()
|
||||
data.Free()
|
||||
return err
|
||||
}
|
||||
t.incrMsgSent()
|
||||
@ -1274,7 +1304,6 @@ func (t *http2Server) Close(err error) {
|
||||
|
||||
// deleteStream deletes the stream s from transport's active streams.
|
||||
func (t *http2Server) deleteStream(s *ServerStream, eosReceived bool) {
|
||||
|
||||
t.mu.Lock()
|
||||
if _, ok := t.activeStreams[s.id]; ok {
|
||||
delete(t.activeStreams, s.id)
|
||||
@ -1324,7 +1353,10 @@ func (t *http2Server) closeStream(s *ServerStream, rst bool, rstCode http2.ErrCo
|
||||
// called to interrupt the potential blocking on other goroutines.
|
||||
s.cancel()
|
||||
|
||||
s.swapState(streamDone)
|
||||
oldState := s.swapState(streamDone)
|
||||
if oldState == streamDone {
|
||||
return
|
||||
}
|
||||
t.deleteStream(s, eosReceived)
|
||||
|
||||
t.controlBuf.put(&cleanupStream{
|
||||
|
8
vendor/google.golang.org/grpc/internal/transport/http_util.go
generated
vendored
8
vendor/google.golang.org/grpc/internal/transport/http_util.go
generated
vendored
@ -196,11 +196,11 @@ func decodeTimeout(s string) (time.Duration, error) {
|
||||
if !ok {
|
||||
return 0, fmt.Errorf("transport: timeout unit is not recognized: %q", s)
|
||||
}
|
||||
t, err := strconv.ParseInt(s[:size-1], 10, 64)
|
||||
t, err := strconv.ParseUint(s[:size-1], 10, 64)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
const maxHours = math.MaxInt64 / int64(time.Hour)
|
||||
const maxHours = math.MaxInt64 / uint64(time.Hour)
|
||||
if d == time.Hour && t > maxHours {
|
||||
// This timeout would overflow math.MaxInt64; clamp it.
|
||||
return time.Duration(math.MaxInt64), nil
|
||||
@ -439,8 +439,8 @@ func getWriteBufferPool(size int) *sync.Pool {
|
||||
return pool
|
||||
}
|
||||
|
||||
// parseDialTarget returns the network and address to pass to dialer.
|
||||
func parseDialTarget(target string) (string, string) {
|
||||
// ParseDialTarget returns the network and address to pass to dialer.
|
||||
func ParseDialTarget(target string) (string, string) {
|
||||
net := "tcp"
|
||||
m1 := strings.Index(target, ":")
|
||||
m2 := strings.Index(target, ":/")
|
||||
|
6
vendor/google.golang.org/grpc/internal/transport/server_stream.go
generated
vendored
6
vendor/google.golang.org/grpc/internal/transport/server_stream.go
generated
vendored
@ -35,8 +35,10 @@ type ServerStream struct {
|
||||
*Stream // Embed for common stream functionality.
|
||||
|
||||
st internalServerTransport
|
||||
ctxDone <-chan struct{} // closed at the end of stream. Cache of ctx.Done() (for performance)
|
||||
cancel context.CancelFunc // invoked at the end of stream to cancel ctx.
|
||||
ctxDone <-chan struct{} // closed at the end of stream. Cache of ctx.Done() (for performance)
|
||||
// cancel is invoked at the end of stream to cancel ctx. It also stops the
|
||||
// timer for monitoring the rpc deadline if configured.
|
||||
cancel func()
|
||||
|
||||
// Holds compressor names passed in grpc-accept-encoding metadata from the
|
||||
// client.
|
||||
|
8
vendor/google.golang.org/grpc/internal/transport/transport.go
generated
vendored
8
vendor/google.golang.org/grpc/internal/transport/transport.go
generated
vendored
@ -466,6 +466,7 @@ type ServerConfig struct {
|
||||
MaxHeaderListSize *uint32
|
||||
HeaderTableSize *uint32
|
||||
BufferPool mem.BufferPool
|
||||
StaticWindowSize bool
|
||||
}
|
||||
|
||||
// ConnectOptions covers all relevant options for communicating with the server.
|
||||
@ -504,6 +505,8 @@ type ConnectOptions struct {
|
||||
MaxHeaderListSize *uint32
|
||||
// The mem.BufferPool to use when reading/writing to the wire.
|
||||
BufferPool mem.BufferPool
|
||||
// StaticWindowSize controls whether dynamic window sizing is enabled.
|
||||
StaticWindowSize bool
|
||||
}
|
||||
|
||||
// WriteOptions provides additional hints and information for message
|
||||
@ -540,6 +543,11 @@ type CallHdr struct {
|
||||
PreviousAttempts int // value of grpc-previous-rpc-attempts header to set
|
||||
|
||||
DoneFunc func() // called when the stream is finished
|
||||
|
||||
// Authority is used to explicitly override the `:authority` header. If set,
|
||||
// this value takes precedence over the Host field and will be used as the
|
||||
// value for the `:authority` header.
|
||||
Authority string
|
||||
}
|
||||
|
||||
// ClientTransport is the common interface for all gRPC client-side transport
|
||||
|
Reference in New Issue
Block a user