chore: make deps

This commit is contained in:
2025-11-11 14:18:57 +01:00
parent db7c4042d0
commit 45af67d22d
590 changed files with 22837 additions and 16387 deletions

View File

@ -27,6 +27,7 @@ import (
// - If the resulting value is zero or out of range, use a default.
type http2Config struct {
MaxConcurrentStreams uint32
StrictMaxConcurrentRequests bool
MaxDecoderHeaderTableSize uint32
MaxEncoderHeaderTableSize uint32
MaxReadFrameSize uint32
@ -64,12 +65,13 @@ func configFromServer(h1 *http.Server, h2 *Server) http2Config {
// (the net/http Transport).
func configFromTransport(h2 *Transport) http2Config {
conf := http2Config{
MaxEncoderHeaderTableSize: h2.MaxEncoderHeaderTableSize,
MaxDecoderHeaderTableSize: h2.MaxDecoderHeaderTableSize,
MaxReadFrameSize: h2.MaxReadFrameSize,
SendPingTimeout: h2.ReadIdleTimeout,
PingTimeout: h2.PingTimeout,
WriteByteTimeout: h2.WriteByteTimeout,
StrictMaxConcurrentRequests: h2.StrictMaxConcurrentStreams,
MaxEncoderHeaderTableSize: h2.MaxEncoderHeaderTableSize,
MaxDecoderHeaderTableSize: h2.MaxDecoderHeaderTableSize,
MaxReadFrameSize: h2.MaxReadFrameSize,
SendPingTimeout: h2.ReadIdleTimeout,
PingTimeout: h2.PingTimeout,
WriteByteTimeout: h2.WriteByteTimeout,
}
// Unlike most config fields, where out-of-range values revert to the default,
@ -128,6 +130,9 @@ func fillNetHTTPConfig(conf *http2Config, h2 *http.HTTP2Config) {
if h2.MaxConcurrentStreams != 0 {
conf.MaxConcurrentStreams = uint32(h2.MaxConcurrentStreams)
}
if http2ConfigStrictMaxConcurrentRequests(h2) {
conf.StrictMaxConcurrentRequests = true
}
if h2.MaxEncoderHeaderTableSize != 0 {
conf.MaxEncoderHeaderTableSize = uint32(h2.MaxEncoderHeaderTableSize)
}

15
vendor/golang.org/x/net/http2/config_go125.go generated vendored Normal file
View File

@ -0,0 +1,15 @@
// Copyright 2025 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build !go1.26
package http2
import (
"net/http"
)
func http2ConfigStrictMaxConcurrentRequests(h2 *http.HTTP2Config) bool {
return false
}

15
vendor/golang.org/x/net/http2/config_go126.go generated vendored Normal file
View File

@ -0,0 +1,15 @@
// Copyright 2025 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.26
package http2
import (
"net/http"
)
func http2ConfigStrictMaxConcurrentRequests(h2 *http.HTTP2Config) bool {
return h2.StrictMaxConcurrentRequests
}

View File

@ -347,7 +347,7 @@ func (fr *Framer) maxHeaderListSize() uint32 {
func (f *Framer) startWrite(ftype FrameType, flags Flags, streamID uint32) {
// Write the FrameHeader.
f.wbuf = append(f.wbuf[:0],
0, // 3 bytes of length, filled in in endWrite
0, // 3 bytes of length, filled in endWrite
0,
0,
byte(ftype),
@ -1152,6 +1152,15 @@ type PriorityFrame struct {
PriorityParam
}
var defaultRFC9218Priority = PriorityParam{
incremental: 0,
urgency: 3,
}
// Note that HTTP/2 has had two different prioritization schemes, and
// PriorityParam struct below is a superset of both schemes. The exported
// symbols are from RFC 7540 and the non-exported ones are from RFC 9218.
// PriorityParam are the stream prioritzation parameters.
type PriorityParam struct {
// StreamDep is a 31-bit stream identifier for the
@ -1167,6 +1176,20 @@ type PriorityParam struct {
// the spec, "Add one to the value to obtain a weight between
// 1 and 256."
Weight uint8
// "The urgency (u) parameter value is Integer (see Section 3.3.1 of
// [STRUCTURED-FIELDS]), between 0 and 7 inclusive, in descending order of
// priority. The default is 3."
urgency uint8
// "The incremental (i) parameter value is Boolean (see Section 3.3.6 of
// [STRUCTURED-FIELDS]). It indicates if an HTTP response can be processed
// incrementally, i.e., provide some meaningful output as chunks of the
// response arrive."
//
// We use uint8 (i.e. 0 is false, 1 is true) instead of bool so we can
// avoid unnecessary type conversions and because either type takes 1 byte.
incremental uint8
}
func (p PriorityParam) IsZero() bool {

View File

@ -34,7 +34,6 @@ var (
VerboseLogs bool
logFrameWrites bool
logFrameReads bool
inTests bool
// Enabling extended CONNECT by causes browsers to attempt to use
// WebSockets-over-HTTP/2. This results in problems when the server's websocket

View File

@ -181,6 +181,10 @@ type Server struct {
type serverInternalState struct {
mu sync.Mutex
activeConns map[*serverConn]struct{}
// Pool of error channels. This is per-Server rather than global
// because channels can't be reused across synctest bubbles.
errChanPool sync.Pool
}
func (s *serverInternalState) registerConn(sc *serverConn) {
@ -212,6 +216,27 @@ func (s *serverInternalState) startGracefulShutdown() {
s.mu.Unlock()
}
// Global error channel pool used for uninitialized Servers.
// We use a per-Server pool when possible to avoid using channels across synctest bubbles.
var errChanPool = sync.Pool{
New: func() any { return make(chan error, 1) },
}
func (s *serverInternalState) getErrChan() chan error {
if s == nil {
return errChanPool.Get().(chan error) // Server used without calling ConfigureServer
}
return s.errChanPool.Get().(chan error)
}
func (s *serverInternalState) putErrChan(ch chan error) {
if s == nil {
errChanPool.Put(ch) // Server used without calling ConfigureServer
return
}
s.errChanPool.Put(ch)
}
// ConfigureServer adds HTTP/2 support to a net/http Server.
//
// The configuration conf may be nil.
@ -224,7 +249,10 @@ func ConfigureServer(s *http.Server, conf *Server) error {
if conf == nil {
conf = new(Server)
}
conf.state = &serverInternalState{activeConns: make(map[*serverConn]struct{})}
conf.state = &serverInternalState{
activeConns: make(map[*serverConn]struct{}),
errChanPool: sync.Pool{New: func() any { return make(chan error, 1) }},
}
if h1, h2 := s, conf; h2.IdleTimeout == 0 {
if h1.IdleTimeout != 0 {
h2.IdleTimeout = h1.IdleTimeout
@ -1124,25 +1152,6 @@ func (sc *serverConn) readPreface() error {
}
}
var errChanPool = sync.Pool{
New: func() interface{} { return make(chan error, 1) },
}
func getErrChan() chan error {
if inTests {
// Channels cannot be reused across synctest tests.
return make(chan error, 1)
} else {
return errChanPool.Get().(chan error)
}
}
func putErrChan(ch chan error) {
if !inTests {
errChanPool.Put(ch)
}
}
var writeDataPool = sync.Pool{
New: func() interface{} { return new(writeData) },
}
@ -1150,7 +1159,7 @@ var writeDataPool = sync.Pool{
// writeDataFromHandler writes DATA response frames from a handler on
// the given stream.
func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStream bool) error {
ch := getErrChan()
ch := sc.srv.state.getErrChan()
writeArg := writeDataPool.Get().(*writeData)
*writeArg = writeData{stream.id, data, endStream}
err := sc.writeFrameFromHandler(FrameWriteRequest{
@ -1182,7 +1191,7 @@ func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStrea
return errStreamClosed
}
}
putErrChan(ch)
sc.srv.state.putErrChan(ch)
if frameWriteDone {
writeDataPool.Put(writeArg)
}
@ -2436,7 +2445,7 @@ func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) erro
// waiting for this frame to be written, so an http.Flush mid-handler
// writes out the correct value of keys, before a handler later potentially
// mutates it.
errc = getErrChan()
errc = sc.srv.state.getErrChan()
}
if err := sc.writeFrameFromHandler(FrameWriteRequest{
write: headerData,
@ -2448,7 +2457,7 @@ func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) erro
if errc != nil {
select {
case err := <-errc:
putErrChan(errc)
sc.srv.state.putErrChan(errc)
return err
case <-sc.doneServing:
return errClientDisconnected
@ -3129,7 +3138,7 @@ func (w *responseWriter) Push(target string, opts *http.PushOptions) error {
method: opts.Method,
url: u,
header: cloneHeader(opts.Header),
done: getErrChan(),
done: sc.srv.state.getErrChan(),
}
select {
@ -3146,7 +3155,7 @@ func (w *responseWriter) Push(target string, opts *http.PushOptions) error {
case <-st.cw:
return errStreamClosed
case err := <-msg.done:
putErrChan(msg.done)
sc.srv.state.putErrChan(msg.done)
return err
}
}

View File

@ -355,6 +355,7 @@ type ClientConn struct {
readIdleTimeout time.Duration
pingTimeout time.Duration
extendedConnectAllowed bool
strictMaxConcurrentStreams bool
// rstStreamPingsBlocked works around an unfortunate gRPC behavior.
// gRPC strictly limits the number of PING frames that it will receive.
@ -784,7 +785,8 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
initialWindowSize: 65535, // spec default
initialStreamRecvWindowSize: conf.MaxUploadBufferPerStream,
maxConcurrentStreams: initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings.
peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
strictMaxConcurrentStreams: conf.StrictMaxConcurrentRequests,
peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
streams: make(map[uint32]*clientStream),
singleUse: singleUse,
seenSettingsChan: make(chan struct{}),
@ -1018,7 +1020,7 @@ func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
return
}
var maxConcurrentOkay bool
if cc.t.StrictMaxConcurrentStreams {
if cc.strictMaxConcurrentStreams {
// We'll tell the caller we can take a new request to
// prevent the caller from dialing a new TCP
// connection, but then we'll block later before

View File

@ -42,6 +42,8 @@ type OpenStreamOptions struct {
// PusherID is zero if the stream was initiated by the client. Otherwise,
// PusherID names the stream that pushed the newly opened stream.
PusherID uint32
// priority is used to set the priority of the newly opened stream.
priority PriorityParam
}
// FrameWriteRequest is a request to write a frame.

View File

@ -11,7 +11,7 @@ import (
)
// RFC 7540, Section 5.3.5: the default weight is 16.
const priorityDefaultWeight = 15 // 16 = 15 + 1
const priorityDefaultWeightRFC7540 = 15 // 16 = 15 + 1
// PriorityWriteSchedulerConfig configures a priorityWriteScheduler.
type PriorityWriteSchedulerConfig struct {
@ -66,8 +66,8 @@ func NewPriorityWriteScheduler(cfg *PriorityWriteSchedulerConfig) WriteScheduler
}
}
ws := &priorityWriteScheduler{
nodes: make(map[uint32]*priorityNode),
ws := &priorityWriteSchedulerRFC7540{
nodes: make(map[uint32]*priorityNodeRFC7540),
maxClosedNodesInTree: cfg.MaxClosedNodesInTree,
maxIdleNodesInTree: cfg.MaxIdleNodesInTree,
enableWriteThrottle: cfg.ThrottleOutOfOrderWrites,
@ -81,32 +81,32 @@ func NewPriorityWriteScheduler(cfg *PriorityWriteSchedulerConfig) WriteScheduler
return ws
}
type priorityNodeState int
type priorityNodeStateRFC7540 int
const (
priorityNodeOpen priorityNodeState = iota
priorityNodeClosed
priorityNodeIdle
priorityNodeOpenRFC7540 priorityNodeStateRFC7540 = iota
priorityNodeClosedRFC7540
priorityNodeIdleRFC7540
)
// priorityNode is a node in an HTTP/2 priority tree.
// priorityNodeRFC7540 is a node in an HTTP/2 priority tree.
// Each node is associated with a single stream ID.
// See RFC 7540, Section 5.3.
type priorityNode struct {
q writeQueue // queue of pending frames to write
id uint32 // id of the stream, or 0 for the root of the tree
weight uint8 // the actual weight is weight+1, so the value is in [1,256]
state priorityNodeState // open | closed | idle
bytes int64 // number of bytes written by this node, or 0 if closed
subtreeBytes int64 // sum(node.bytes) of all nodes in this subtree
type priorityNodeRFC7540 struct {
q writeQueue // queue of pending frames to write
id uint32 // id of the stream, or 0 for the root of the tree
weight uint8 // the actual weight is weight+1, so the value is in [1,256]
state priorityNodeStateRFC7540 // open | closed | idle
bytes int64 // number of bytes written by this node, or 0 if closed
subtreeBytes int64 // sum(node.bytes) of all nodes in this subtree
// These links form the priority tree.
parent *priorityNode
kids *priorityNode // start of the kids list
prev, next *priorityNode // doubly-linked list of siblings
parent *priorityNodeRFC7540
kids *priorityNodeRFC7540 // start of the kids list
prev, next *priorityNodeRFC7540 // doubly-linked list of siblings
}
func (n *priorityNode) setParent(parent *priorityNode) {
func (n *priorityNodeRFC7540) setParent(parent *priorityNodeRFC7540) {
if n == parent {
panic("setParent to self")
}
@ -141,7 +141,7 @@ func (n *priorityNode) setParent(parent *priorityNode) {
}
}
func (n *priorityNode) addBytes(b int64) {
func (n *priorityNodeRFC7540) addBytes(b int64) {
n.bytes += b
for ; n != nil; n = n.parent {
n.subtreeBytes += b
@ -154,7 +154,7 @@ func (n *priorityNode) addBytes(b int64) {
//
// f(n, openParent) takes two arguments: the node to visit, n, and a bool that is true
// if any ancestor p of n is still open (ignoring the root node).
func (n *priorityNode) walkReadyInOrder(openParent bool, tmp *[]*priorityNode, f func(*priorityNode, bool) bool) bool {
func (n *priorityNodeRFC7540) walkReadyInOrder(openParent bool, tmp *[]*priorityNodeRFC7540, f func(*priorityNodeRFC7540, bool) bool) bool {
if !n.q.empty() && f(n, openParent) {
return true
}
@ -165,7 +165,7 @@ func (n *priorityNode) walkReadyInOrder(openParent bool, tmp *[]*priorityNode, f
// Don't consider the root "open" when updating openParent since
// we can't send data frames on the root stream (only control frames).
if n.id != 0 {
openParent = openParent || (n.state == priorityNodeOpen)
openParent = openParent || (n.state == priorityNodeOpenRFC7540)
}
// Common case: only one kid or all kids have the same weight.
@ -195,7 +195,7 @@ func (n *priorityNode) walkReadyInOrder(openParent bool, tmp *[]*priorityNode, f
*tmp = append(*tmp, n.kids)
n.kids.setParent(nil)
}
sort.Sort(sortPriorityNodeSiblings(*tmp))
sort.Sort(sortPriorityNodeSiblingsRFC7540(*tmp))
for i := len(*tmp) - 1; i >= 0; i-- {
(*tmp)[i].setParent(n) // setParent inserts at the head of n.kids
}
@ -207,11 +207,11 @@ func (n *priorityNode) walkReadyInOrder(openParent bool, tmp *[]*priorityNode, f
return false
}
type sortPriorityNodeSiblings []*priorityNode
type sortPriorityNodeSiblingsRFC7540 []*priorityNodeRFC7540
func (z sortPriorityNodeSiblings) Len() int { return len(z) }
func (z sortPriorityNodeSiblings) Swap(i, k int) { z[i], z[k] = z[k], z[i] }
func (z sortPriorityNodeSiblings) Less(i, k int) bool {
func (z sortPriorityNodeSiblingsRFC7540) Len() int { return len(z) }
func (z sortPriorityNodeSiblingsRFC7540) Swap(i, k int) { z[i], z[k] = z[k], z[i] }
func (z sortPriorityNodeSiblingsRFC7540) Less(i, k int) bool {
// Prefer the subtree that has sent fewer bytes relative to its weight.
// See sections 5.3.2 and 5.3.4.
wi, bi := float64(z[i].weight+1), float64(z[i].subtreeBytes)
@ -225,13 +225,13 @@ func (z sortPriorityNodeSiblings) Less(i, k int) bool {
return bi/bk <= wi/wk
}
type priorityWriteScheduler struct {
type priorityWriteSchedulerRFC7540 struct {
// root is the root of the priority tree, where root.id = 0.
// The root queues control frames that are not associated with any stream.
root priorityNode
root priorityNodeRFC7540
// nodes maps stream ids to priority tree nodes.
nodes map[uint32]*priorityNode
nodes map[uint32]*priorityNodeRFC7540
// maxID is the maximum stream id in nodes.
maxID uint32
@ -239,7 +239,7 @@ type priorityWriteScheduler struct {
// lists of nodes that have been closed or are idle, but are kept in
// the tree for improved prioritization. When the lengths exceed either
// maxClosedNodesInTree or maxIdleNodesInTree, old nodes are discarded.
closedNodes, idleNodes []*priorityNode
closedNodes, idleNodes []*priorityNodeRFC7540
// From the config.
maxClosedNodesInTree int
@ -248,19 +248,19 @@ type priorityWriteScheduler struct {
enableWriteThrottle bool
// tmp is scratch space for priorityNode.walkReadyInOrder to reduce allocations.
tmp []*priorityNode
tmp []*priorityNodeRFC7540
// pool of empty queues for reuse.
queuePool writeQueuePool
}
func (ws *priorityWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) {
func (ws *priorityWriteSchedulerRFC7540) OpenStream(streamID uint32, options OpenStreamOptions) {
// The stream may be currently idle but cannot be opened or closed.
if curr := ws.nodes[streamID]; curr != nil {
if curr.state != priorityNodeIdle {
if curr.state != priorityNodeIdleRFC7540 {
panic(fmt.Sprintf("stream %d already opened", streamID))
}
curr.state = priorityNodeOpen
curr.state = priorityNodeOpenRFC7540
return
}
@ -272,11 +272,11 @@ func (ws *priorityWriteScheduler) OpenStream(streamID uint32, options OpenStream
if parent == nil {
parent = &ws.root
}
n := &priorityNode{
n := &priorityNodeRFC7540{
q: *ws.queuePool.get(),
id: streamID,
weight: priorityDefaultWeight,
state: priorityNodeOpen,
weight: priorityDefaultWeightRFC7540,
state: priorityNodeOpenRFC7540,
}
n.setParent(parent)
ws.nodes[streamID] = n
@ -285,19 +285,19 @@ func (ws *priorityWriteScheduler) OpenStream(streamID uint32, options OpenStream
}
}
func (ws *priorityWriteScheduler) CloseStream(streamID uint32) {
func (ws *priorityWriteSchedulerRFC7540) CloseStream(streamID uint32) {
if streamID == 0 {
panic("violation of WriteScheduler interface: cannot close stream 0")
}
if ws.nodes[streamID] == nil {
panic(fmt.Sprintf("violation of WriteScheduler interface: unknown stream %d", streamID))
}
if ws.nodes[streamID].state != priorityNodeOpen {
if ws.nodes[streamID].state != priorityNodeOpenRFC7540 {
panic(fmt.Sprintf("violation of WriteScheduler interface: stream %d already closed", streamID))
}
n := ws.nodes[streamID]
n.state = priorityNodeClosed
n.state = priorityNodeClosedRFC7540
n.addBytes(-n.bytes)
q := n.q
@ -310,7 +310,7 @@ func (ws *priorityWriteScheduler) CloseStream(streamID uint32) {
}
}
func (ws *priorityWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {
func (ws *priorityWriteSchedulerRFC7540) AdjustStream(streamID uint32, priority PriorityParam) {
if streamID == 0 {
panic("adjustPriority on root")
}
@ -324,11 +324,11 @@ func (ws *priorityWriteScheduler) AdjustStream(streamID uint32, priority Priorit
return
}
ws.maxID = streamID
n = &priorityNode{
n = &priorityNodeRFC7540{
q: *ws.queuePool.get(),
id: streamID,
weight: priorityDefaultWeight,
state: priorityNodeIdle,
weight: priorityDefaultWeightRFC7540,
state: priorityNodeIdleRFC7540,
}
n.setParent(&ws.root)
ws.nodes[streamID] = n
@ -340,7 +340,7 @@ func (ws *priorityWriteScheduler) AdjustStream(streamID uint32, priority Priorit
parent := ws.nodes[priority.StreamDep]
if parent == nil {
n.setParent(&ws.root)
n.weight = priorityDefaultWeight
n.weight = priorityDefaultWeightRFC7540
return
}
@ -381,8 +381,8 @@ func (ws *priorityWriteScheduler) AdjustStream(streamID uint32, priority Priorit
n.weight = priority.Weight
}
func (ws *priorityWriteScheduler) Push(wr FrameWriteRequest) {
var n *priorityNode
func (ws *priorityWriteSchedulerRFC7540) Push(wr FrameWriteRequest) {
var n *priorityNodeRFC7540
if wr.isControl() {
n = &ws.root
} else {
@ -401,8 +401,8 @@ func (ws *priorityWriteScheduler) Push(wr FrameWriteRequest) {
n.q.push(wr)
}
func (ws *priorityWriteScheduler) Pop() (wr FrameWriteRequest, ok bool) {
ws.root.walkReadyInOrder(false, &ws.tmp, func(n *priorityNode, openParent bool) bool {
func (ws *priorityWriteSchedulerRFC7540) Pop() (wr FrameWriteRequest, ok bool) {
ws.root.walkReadyInOrder(false, &ws.tmp, func(n *priorityNodeRFC7540, openParent bool) bool {
limit := int32(math.MaxInt32)
if openParent {
limit = ws.writeThrottleLimit
@ -428,7 +428,7 @@ func (ws *priorityWriteScheduler) Pop() (wr FrameWriteRequest, ok bool) {
return wr, ok
}
func (ws *priorityWriteScheduler) addClosedOrIdleNode(list *[]*priorityNode, maxSize int, n *priorityNode) {
func (ws *priorityWriteSchedulerRFC7540) addClosedOrIdleNode(list *[]*priorityNodeRFC7540, maxSize int, n *priorityNodeRFC7540) {
if maxSize == 0 {
return
}
@ -442,7 +442,7 @@ func (ws *priorityWriteScheduler) addClosedOrIdleNode(list *[]*priorityNode, max
*list = append(*list, n)
}
func (ws *priorityWriteScheduler) removeNode(n *priorityNode) {
func (ws *priorityWriteSchedulerRFC7540) removeNode(n *priorityNodeRFC7540) {
for n.kids != nil {
n.kids.setParent(n.parent)
}

View File

@ -0,0 +1,209 @@
// Copyright 2025 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package http2
import (
"fmt"
"math"
)
type streamMetadata struct {
location *writeQueue
priority PriorityParam
}
type priorityWriteSchedulerRFC9218 struct {
// control contains control frames (SETTINGS, PING, etc.).
control writeQueue
// heads contain the head of a circular list of streams.
// We put these heads within a nested array that represents urgency and
// incremental, as defined in
// https://www.rfc-editor.org/rfc/rfc9218.html#name-priority-parameters.
// 8 represents u=0 up to u=7, and 2 represents i=false and i=true.
heads [8][2]*writeQueue
// streams contains a mapping between each stream ID and their metadata, so
// we can quickly locate them when needing to, for example, adjust their
// priority.
streams map[uint32]streamMetadata
// queuePool are empty queues for reuse.
queuePool writeQueuePool
// prioritizeIncremental is used to determine whether we should prioritize
// incremental streams or not, when urgency is the same in a given Pop()
// call.
prioritizeIncremental bool
}
func newPriorityWriteSchedulerRFC9128() WriteScheduler {
ws := &priorityWriteSchedulerRFC9218{
streams: make(map[uint32]streamMetadata),
}
return ws
}
func (ws *priorityWriteSchedulerRFC9218) OpenStream(streamID uint32, opt OpenStreamOptions) {
if ws.streams[streamID].location != nil {
panic(fmt.Errorf("stream %d already opened", streamID))
}
q := ws.queuePool.get()
ws.streams[streamID] = streamMetadata{
location: q,
priority: opt.priority,
}
u, i := opt.priority.urgency, opt.priority.incremental
if ws.heads[u][i] == nil {
ws.heads[u][i] = q
q.next = q
q.prev = q
} else {
// Queues are stored in a ring.
// Insert the new stream before ws.head, putting it at the end of the list.
q.prev = ws.heads[u][i].prev
q.next = ws.heads[u][i]
q.prev.next = q
q.next.prev = q
}
}
func (ws *priorityWriteSchedulerRFC9218) CloseStream(streamID uint32) {
metadata := ws.streams[streamID]
q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental
if q == nil {
return
}
if q.next == q {
// This was the only open stream.
ws.heads[u][i] = nil
} else {
q.prev.next = q.next
q.next.prev = q.prev
if ws.heads[u][i] == q {
ws.heads[u][i] = q.next
}
}
delete(ws.streams, streamID)
ws.queuePool.put(q)
}
func (ws *priorityWriteSchedulerRFC9218) AdjustStream(streamID uint32, priority PriorityParam) {
metadata := ws.streams[streamID]
q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental
if q == nil {
return
}
// Remove stream from current location.
if q.next == q {
// This was the only open stream.
ws.heads[u][i] = nil
} else {
q.prev.next = q.next
q.next.prev = q.prev
if ws.heads[u][i] == q {
ws.heads[u][i] = q.next
}
}
// Insert stream to the new queue.
u, i = priority.urgency, priority.incremental
if ws.heads[u][i] == nil {
ws.heads[u][i] = q
q.next = q
q.prev = q
} else {
// Queues are stored in a ring.
// Insert the new stream before ws.head, putting it at the end of the list.
q.prev = ws.heads[u][i].prev
q.next = ws.heads[u][i]
q.prev.next = q
q.next.prev = q
}
// Update the metadata.
ws.streams[streamID] = streamMetadata{
location: q,
priority: priority,
}
}
func (ws *priorityWriteSchedulerRFC9218) Push(wr FrameWriteRequest) {
if wr.isControl() {
ws.control.push(wr)
return
}
q := ws.streams[wr.StreamID()].location
if q == nil {
// This is a closed stream.
// wr should not be a HEADERS or DATA frame.
// We push the request onto the control queue.
if wr.DataSize() > 0 {
panic("add DATA on non-open stream")
}
ws.control.push(wr)
return
}
q.push(wr)
}
func (ws *priorityWriteSchedulerRFC9218) Pop() (FrameWriteRequest, bool) {
// Control and RST_STREAM frames first.
if !ws.control.empty() {
return ws.control.shift(), true
}
// On the next Pop(), we want to prioritize incremental if we prioritized
// non-incremental request of the same urgency this time. Vice-versa.
// i.e. when there are incremental and non-incremental requests at the same
// priority, we give 50% of our bandwidth to the incremental ones in
// aggregate and 50% to the first non-incremental one (since
// non-incremental streams do not use round-robin writes).
ws.prioritizeIncremental = !ws.prioritizeIncremental
// Always prioritize lowest u (i.e. highest urgency level).
for u := range ws.heads {
for i := range ws.heads[u] {
// When we want to prioritize incremental, we try to pop i=true
// first before i=false when u is the same.
if ws.prioritizeIncremental {
i = (i + 1) % 2
}
q := ws.heads[u][i]
if q == nil {
continue
}
for {
if wr, ok := q.consume(math.MaxInt32); ok {
if i == 1 {
// For incremental streams, we update head to q.next so
// we can round-robin between multiple streams that can
// immediately benefit from partial writes.
ws.heads[u][i] = q.next
} else {
// For non-incremental streams, we try to finish one to
// completion rather than doing round-robin. However,
// we update head here so that if q.consume() is !ok
// (e.g. the stream has no more frame to consume), head
// is updated to the next q that has frames to consume
// on future iterations. This way, we do not prioritize
// writing to unavailable stream on next Pop() calls,
// preventing head-of-line blocking.
ws.heads[u][i] = q
}
return wr, true
}
q = q.next
if q == ws.heads[u][i] {
break
}
}
}
}
return FrameWriteRequest{}, false
}

View File

@ -25,7 +25,7 @@ type roundRobinWriteScheduler struct {
}
// newRoundRobinWriteScheduler constructs a new write scheduler.
// The round robin scheduler priorizes control frames
// The round robin scheduler prioritizes control frames
// like SETTINGS and PING over DATA frames.
// When there are no control frames to send, it performs a round-robin
// selection from the ready streams.

View File

@ -51,7 +51,7 @@ type EncodeHeadersParam struct {
DefaultUserAgent string
}
// EncodeHeadersParam is the result of EncodeHeaders.
// EncodeHeadersResult is the result of EncodeHeaders.
type EncodeHeadersResult struct {
HasBody bool
HasTrailers bool
@ -399,7 +399,7 @@ type ServerRequestResult struct {
// If the request should be rejected, this is a short string suitable for passing
// to the http2 package's CountError function.
// It might be a bit odd to return errors this way rather than returing an error,
// It might be a bit odd to return errors this way rather than returning an error,
// but this ensures we don't forget to include a CountError reason.
InvalidReason string
}

View File

@ -297,7 +297,7 @@ func (up *UsernamePassword) Authenticate(ctx context.Context, rw io.ReadWriter,
b = append(b, up.Username...)
b = append(b, byte(len(up.Password)))
b = append(b, up.Password...)
// TODO(mikio): handle IO deadlines and cancelation if
// TODO(mikio): handle IO deadlines and cancellation if
// necessary
if _, err := rw.Write(b); err != nil {
return err