forked from toolshed/abra
.gitea
cli
cmd
pkg
scripts
tests
vendor
coopcloud.tech
dario.cat
git.coopcloud.tech
github.com
go.opentelemetry.io
golang.org
x
crypto
exp
net
context
http
http2
hpack
.gitignore
ascii.go
ciphers.go
client_conn_pool.go
config.go
config_go124.go
config_pre_go124.go
databuffer.go
errors.go
flow.go
frame.go
gotrack.go
headermap.go
http2.go
pipe.go
server.go
timer.go
transport.go
unencrypted.go
write.go
writesched.go
writesched_priority.go
writesched_random.go
writesched_roundrobin.go
idna
internal
proxy
trace
LICENSE
PATENTS
sync
sys
term
text
time
google.golang.org
gopkg.in
gotest.tools
modules.txt
.dockerignore
.drone.yml
.envrc.sample
.gitignore
.goreleaser.yml
AUTHORS.md
Dockerfile
LICENSE
Makefile
README.md
go.mod
go.sum
renovate.json
120 lines
2.7 KiB
Go
120 lines
2.7 KiB
Go
// Copyright 2023 The Go Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package http2
|
|
|
|
import (
|
|
"fmt"
|
|
"math"
|
|
)
|
|
|
|
type roundRobinWriteScheduler struct {
|
|
// control contains control frames (SETTINGS, PING, etc.).
|
|
control writeQueue
|
|
|
|
// streams maps stream ID to a queue.
|
|
streams map[uint32]*writeQueue
|
|
|
|
// stream queues are stored in a circular linked list.
|
|
// head is the next stream to write, or nil if there are no streams open.
|
|
head *writeQueue
|
|
|
|
// pool of empty queues for reuse.
|
|
queuePool writeQueuePool
|
|
}
|
|
|
|
// newRoundRobinWriteScheduler constructs a new write scheduler.
|
|
// The round robin scheduler priorizes control frames
|
|
// like SETTINGS and PING over DATA frames.
|
|
// When there are no control frames to send, it performs a round-robin
|
|
// selection from the ready streams.
|
|
func newRoundRobinWriteScheduler() WriteScheduler {
|
|
ws := &roundRobinWriteScheduler{
|
|
streams: make(map[uint32]*writeQueue),
|
|
}
|
|
return ws
|
|
}
|
|
|
|
func (ws *roundRobinWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) {
|
|
if ws.streams[streamID] != nil {
|
|
panic(fmt.Errorf("stream %d already opened", streamID))
|
|
}
|
|
q := ws.queuePool.get()
|
|
ws.streams[streamID] = q
|
|
if ws.head == nil {
|
|
ws.head = q
|
|
q.next = q
|
|
q.prev = q
|
|
} else {
|
|
// Queues are stored in a ring.
|
|
// Insert the new stream before ws.head, putting it at the end of the list.
|
|
q.prev = ws.head.prev
|
|
q.next = ws.head
|
|
q.prev.next = q
|
|
q.next.prev = q
|
|
}
|
|
}
|
|
|
|
func (ws *roundRobinWriteScheduler) CloseStream(streamID uint32) {
|
|
q := ws.streams[streamID]
|
|
if q == nil {
|
|
return
|
|
}
|
|
if q.next == q {
|
|
// This was the only open stream.
|
|
ws.head = nil
|
|
} else {
|
|
q.prev.next = q.next
|
|
q.next.prev = q.prev
|
|
if ws.head == q {
|
|
ws.head = q.next
|
|
}
|
|
}
|
|
delete(ws.streams, streamID)
|
|
ws.queuePool.put(q)
|
|
}
|
|
|
|
func (ws *roundRobinWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {}
|
|
|
|
func (ws *roundRobinWriteScheduler) Push(wr FrameWriteRequest) {
|
|
if wr.isControl() {
|
|
ws.control.push(wr)
|
|
return
|
|
}
|
|
q := ws.streams[wr.StreamID()]
|
|
if q == nil {
|
|
// This is a closed stream.
|
|
// wr should not be a HEADERS or DATA frame.
|
|
// We push the request onto the control queue.
|
|
if wr.DataSize() > 0 {
|
|
panic("add DATA on non-open stream")
|
|
}
|
|
ws.control.push(wr)
|
|
return
|
|
}
|
|
q.push(wr)
|
|
}
|
|
|
|
func (ws *roundRobinWriteScheduler) Pop() (FrameWriteRequest, bool) {
|
|
// Control and RST_STREAM frames first.
|
|
if !ws.control.empty() {
|
|
return ws.control.shift(), true
|
|
}
|
|
if ws.head == nil {
|
|
return FrameWriteRequest{}, false
|
|
}
|
|
q := ws.head
|
|
for {
|
|
if wr, ok := q.consume(math.MaxInt32); ok {
|
|
ws.head = q.next
|
|
return wr, true
|
|
}
|
|
q = q.next
|
|
if q == ws.head {
|
|
break
|
|
}
|
|
}
|
|
return FrameWriteRequest{}, false
|
|
}
|