diff options
Diffstat (limited to 'libs/libcurl/src/cf-h2-proxy.c')
-rw-r--r-- | libs/libcurl/src/cf-h2-proxy.c | 531 |
1 files changed, 329 insertions, 202 deletions
diff --git a/libs/libcurl/src/cf-h2-proxy.c b/libs/libcurl/src/cf-h2-proxy.c index b504504e89..5248660fc8 100644 --- a/libs/libcurl/src/cf-h2-proxy.c +++ b/libs/libcurl/src/cf-h2-proxy.c @@ -44,26 +44,25 @@ #include "curl_memory.h"
#include "memdebug.h"
-#define H2_NW_CHUNK_SIZE (128*1024)
-#define H2_NW_RECV_CHUNKS 1
-#define H2_NW_SEND_CHUNKS 1
+#define H2_CHUNK_SIZE (16*1024)
-#define HTTP2_HUGE_WINDOW_SIZE (32 * 1024 * 1024) /* 32 MB */
+#define PROXY_HTTP2_HUGE_WINDOW_SIZE (100 * 1024 * 1024)
+#define H2_TUNNEL_WINDOW_SIZE (10 * 1024 * 1024)
+
+#define PROXY_H2_NW_RECV_CHUNKS (H2_TUNNEL_WINDOW_SIZE / H2_CHUNK_SIZE)
+#define PROXY_H2_NW_SEND_CHUNKS 1
+
+#define H2_TUNNEL_RECV_CHUNKS (H2_TUNNEL_WINDOW_SIZE / H2_CHUNK_SIZE)
+#define H2_TUNNEL_SEND_CHUNKS ((128 * 1024) / H2_CHUNK_SIZE)
-#define H2_TUNNEL_WINDOW_SIZE (1024 * 1024)
-#define H2_TUNNEL_CHUNK_SIZE (32 * 1024)
-#define H2_TUNNEL_RECV_CHUNKS \
- (H2_TUNNEL_WINDOW_SIZE / H2_TUNNEL_CHUNK_SIZE)
-#define H2_TUNNEL_SEND_CHUNKS \
- (H2_TUNNEL_WINDOW_SIZE / H2_TUNNEL_CHUNK_SIZE)
typedef enum {
- TUNNEL_INIT, /* init/default/no tunnel state */
- TUNNEL_CONNECT, /* CONNECT request is being send */
- TUNNEL_RESPONSE, /* CONNECT response received completely */
- TUNNEL_ESTABLISHED,
- TUNNEL_FAILED
-} tunnel_state;
+ H2_TUNNEL_INIT, /* init/default/no tunnel state */
+ H2_TUNNEL_CONNECT, /* CONNECT request is being send */
+ H2_TUNNEL_RESPONSE, /* CONNECT response received completely */
+ H2_TUNNEL_ESTABLISHED,
+ H2_TUNNEL_FAILED
+} h2_tunnel_state;
struct tunnel_stream {
struct http_resp *resp;
@@ -72,10 +71,11 @@ struct tunnel_stream { char *authority;
int32_t stream_id;
uint32_t error;
- tunnel_state state;
- bool has_final_response;
- bool closed;
- bool reset;
+ size_t upload_blocked_len;
+ h2_tunnel_state state;
+ BIT(has_final_response);
+ BIT(closed);
+ BIT(reset);
};
static CURLcode tunnel_stream_init(struct Curl_cfilter *cf,
@@ -85,11 +85,11 @@ static CURLcode tunnel_stream_init(struct Curl_cfilter *cf, int port;
bool ipv6_ip = cf->conn->bits.ipv6_ip;
- ts->state = TUNNEL_INIT;
+ ts->state = H2_TUNNEL_INIT;
ts->stream_id = -1;
- Curl_bufq_init2(&ts->recvbuf, H2_TUNNEL_CHUNK_SIZE, H2_TUNNEL_RECV_CHUNKS,
+ Curl_bufq_init2(&ts->recvbuf, H2_CHUNK_SIZE, H2_TUNNEL_RECV_CHUNKS,
BUFQ_OPT_SOFT_LIMIT);
- Curl_bufq_init(&ts->sendbuf, H2_TUNNEL_CHUNK_SIZE, H2_TUNNEL_SEND_CHUNKS);
+ Curl_bufq_init(&ts->sendbuf, H2_CHUNK_SIZE, H2_TUNNEL_SEND_CHUNKS);
if(cf->conn->bits.conn_to_host)
hostname = cf->conn->conn_to_host.name;
@@ -123,13 +123,13 @@ static void tunnel_stream_clear(struct tunnel_stream *ts) Curl_bufq_free(&ts->sendbuf);
Curl_safefree(ts->authority);
memset(ts, 0, sizeof(*ts));
- ts->state = TUNNEL_INIT;
+ ts->state = H2_TUNNEL_INIT;
}
-static void tunnel_go_state(struct Curl_cfilter *cf,
- struct tunnel_stream *ts,
- tunnel_state new_state,
- struct Curl_easy *data)
+static void h2_tunnel_go_state(struct Curl_cfilter *cf,
+ struct tunnel_stream *ts,
+ h2_tunnel_state new_state,
+ struct Curl_easy *data)
{
(void)cf;
@@ -137,7 +137,7 @@ static void tunnel_go_state(struct Curl_cfilter *cf, return;
/* leaving this one */
switch(ts->state) {
- case TUNNEL_CONNECT:
+ case H2_TUNNEL_CONNECT:
data->req.ignorebody = FALSE;
break;
default:
@@ -145,29 +145,29 @@ static void tunnel_go_state(struct Curl_cfilter *cf, }
/* entering this one */
switch(new_state) {
- case TUNNEL_INIT:
+ case H2_TUNNEL_INIT:
DEBUGF(LOG_CF(data, cf, "new tunnel state 'init'"));
tunnel_stream_clear(ts);
break;
- case TUNNEL_CONNECT:
+ case H2_TUNNEL_CONNECT:
DEBUGF(LOG_CF(data, cf, "new tunnel state 'connect'"));
- ts->state = TUNNEL_CONNECT;
+ ts->state = H2_TUNNEL_CONNECT;
break;
- case TUNNEL_RESPONSE:
+ case H2_TUNNEL_RESPONSE:
DEBUGF(LOG_CF(data, cf, "new tunnel state 'response'"));
- ts->state = TUNNEL_RESPONSE;
+ ts->state = H2_TUNNEL_RESPONSE;
break;
- case TUNNEL_ESTABLISHED:
+ case H2_TUNNEL_ESTABLISHED:
DEBUGF(LOG_CF(data, cf, "new tunnel state 'established'"));
infof(data, "CONNECT phase completed");
data->state.authproxy.done = TRUE;
data->state.authproxy.multipass = FALSE;
/* FALLTHROUGH */
- case TUNNEL_FAILED:
- if(new_state == TUNNEL_FAILED)
+ case H2_TUNNEL_FAILED:
+ if(new_state == H2_TUNNEL_FAILED)
DEBUGF(LOG_CF(data, cf, "new tunnel state 'failed'"));
ts->state = new_state;
/* If a proxy-authorization header was used for the proxy, then we should
@@ -191,9 +191,11 @@ struct cf_h2_proxy_ctx { int32_t last_stream_id;
BIT(conn_closed);
BIT(goaway);
+ BIT(nw_out_blocked);
};
/* How to access `call_data` from a cf_h2 filter */
+#undef CF_CTX_CALL_DATA
#define CF_CTX_CALL_DATA(cf) \
((struct cf_h2_proxy_ctx *)(cf)->ctx)->call_data
@@ -219,35 +221,54 @@ static void cf_h2_proxy_ctx_free(struct cf_h2_proxy_ctx *ctx) }
}
-static ssize_t nw_in_reader(void *reader_ctx,
- unsigned char *buf, size_t buflen,
- CURLcode *err)
+static void drain_tunnel(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct tunnel_stream *tunnel)
+{
+ unsigned char bits;
+
+ (void)cf;
+ bits = CURL_CSELECT_IN;
+ if(!tunnel->closed && !tunnel->reset && tunnel->upload_blocked_len)
+ bits |= CURL_CSELECT_OUT;
+ if(data->state.dselect_bits != bits) {
+ DEBUGF(LOG_CF(data, cf, "[h2sid=%d] DRAIN dselect_bits=%x",
+ tunnel->stream_id, bits));
+ data->state.dselect_bits = bits;
+ Curl_expire(data, 0, EXPIRE_RUN_NOW);
+ }
+}
+
+static ssize_t proxy_nw_in_reader(void *reader_ctx,
+ unsigned char *buf, size_t buflen,
+ CURLcode *err)
{
struct Curl_cfilter *cf = reader_ctx;
struct Curl_easy *data = CF_DATA_CURRENT(cf);
ssize_t nread;
nread = Curl_conn_cf_recv(cf->next, data, (char *)buf, buflen, err);
- DEBUGF(LOG_CF(data, cf, "nw_in recv(len=%zu) -> %zd, %d",
+ DEBUGF(LOG_CF(data, cf, "nw_in_reader(len=%zu) -> %zd, %d",
buflen, nread, *err));
return nread;
}
-static ssize_t nw_out_writer(void *writer_ctx,
- const unsigned char *buf, size_t buflen,
- CURLcode *err)
+static ssize_t proxy_h2_nw_out_writer(void *writer_ctx,
+ const unsigned char *buf, size_t buflen,
+ CURLcode *err)
{
struct Curl_cfilter *cf = writer_ctx;
struct Curl_easy *data = CF_DATA_CURRENT(cf);
ssize_t nwritten;
nwritten = Curl_conn_cf_send(cf->next, data, (const char *)buf, buflen, err);
- DEBUGF(LOG_CF(data, cf, "nw_out send(len=%zu) -> %zd", buflen, nwritten));
+ DEBUGF(LOG_CF(data, cf, "nw_out_writer(len=%zu) -> %zd, %d",
+ buflen, nwritten, *err));
return nwritten;
}
-static int h2_client_new(struct Curl_cfilter *cf,
- nghttp2_session_callbacks *cbs)
+static int proxy_h2_client_new(struct Curl_cfilter *cf,
+ nghttp2_session_callbacks *cbs)
{
struct cf_h2_proxy_ctx *ctx = cf->ctx;
nghttp2_option *o;
@@ -271,15 +292,18 @@ static int h2_client_new(struct Curl_cfilter *cf, static ssize_t on_session_send(nghttp2_session *h2,
const uint8_t *buf, size_t blen,
int flags, void *userp);
-static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame,
- void *userp);
-static int on_stream_close(nghttp2_session *session, int32_t stream_id,
- uint32_t error_code, void *userp);
-static int on_header(nghttp2_session *session, const nghttp2_frame *frame,
- const uint8_t *name, size_t namelen,
- const uint8_t *value, size_t valuelen,
- uint8_t flags,
- void *userp);
+static int proxy_h2_on_frame_recv(nghttp2_session *session,
+ const nghttp2_frame *frame,
+ void *userp);
+static int proxy_h2_on_stream_close(nghttp2_session *session,
+ int32_t stream_id,
+ uint32_t error_code, void *userp);
+static int proxy_h2_on_header(nghttp2_session *session,
+ const nghttp2_frame *frame,
+ const uint8_t *name, size_t namelen,
+ const uint8_t *value, size_t valuelen,
+ uint8_t flags,
+ void *userp);
static int tunnel_recv_callback(nghttp2_session *session, uint8_t flags,
int32_t stream_id,
const uint8_t *mem, size_t len, void *userp);
@@ -298,8 +322,8 @@ static CURLcode cf_h2_proxy_ctx_init(struct Curl_cfilter *cf, DEBUGASSERT(!ctx->h2);
memset(&ctx->tunnel, 0, sizeof(ctx->tunnel));
- Curl_bufq_init(&ctx->inbufq, H2_NW_CHUNK_SIZE, H2_NW_RECV_CHUNKS);
- Curl_bufq_init(&ctx->outbufq, H2_NW_CHUNK_SIZE, H2_NW_SEND_CHUNKS);
+ Curl_bufq_init(&ctx->inbufq, H2_CHUNK_SIZE, PROXY_H2_NW_RECV_CHUNKS);
+ Curl_bufq_init(&ctx->outbufq, H2_CHUNK_SIZE, PROXY_H2_NW_SEND_CHUNKS);
if(tunnel_stream_init(cf, &ctx->tunnel))
goto out;
@@ -311,14 +335,16 @@ static CURLcode cf_h2_proxy_ctx_init(struct Curl_cfilter *cf, }
nghttp2_session_callbacks_set_send_callback(cbs, on_session_send);
- nghttp2_session_callbacks_set_on_frame_recv_callback(cbs, on_frame_recv);
+ nghttp2_session_callbacks_set_on_frame_recv_callback(
+ cbs, proxy_h2_on_frame_recv);
nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
cbs, tunnel_recv_callback);
- nghttp2_session_callbacks_set_on_stream_close_callback(cbs, on_stream_close);
- nghttp2_session_callbacks_set_on_header_callback(cbs, on_header);
+ nghttp2_session_callbacks_set_on_stream_close_callback(
+ cbs, proxy_h2_on_stream_close);
+ nghttp2_session_callbacks_set_on_header_callback(cbs, proxy_h2_on_header);
/* The nghttp2 session is not yet setup, do it */
- rc = h2_client_new(cf, cbs);
+ rc = proxy_h2_client_new(cf, cbs);
if(rc) {
failf(data, "Couldn't initialize nghttp2");
goto out;
@@ -343,7 +369,7 @@ static CURLcode cf_h2_proxy_ctx_init(struct Curl_cfilter *cf, }
rc = nghttp2_session_set_local_window_size(ctx->h2, NGHTTP2_FLAG_NONE, 0,
- HTTP2_HUGE_WINDOW_SIZE);
+ PROXY_HTTP2_HUGE_WINDOW_SIZE);
if(rc) {
failf(data, "nghttp2_session_set_local_window_size() failed: %s(%d)",
nghttp2_strerror(rc), rc);
@@ -362,27 +388,35 @@ out: return result;
}
-static CURLcode nw_out_flush(struct Curl_cfilter *cf,
- struct Curl_easy *data)
+static int should_close_session(struct cf_h2_proxy_ctx *ctx)
+{
+ return !nghttp2_session_want_read(ctx->h2) &&
+ !nghttp2_session_want_write(ctx->h2);
+}
+
+static CURLcode proxy_h2_nw_out_flush(struct Curl_cfilter *cf,
+ struct Curl_easy *data)
{
struct cf_h2_proxy_ctx *ctx = cf->ctx;
- size_t buflen = Curl_bufq_len(&ctx->outbufq);
ssize_t nwritten;
CURLcode result;
(void)data;
- if(!buflen)
+ if(Curl_bufq_is_empty(&ctx->outbufq))
return CURLE_OK;
- DEBUGF(LOG_CF(data, cf, "h2 conn flush %zu bytes", buflen));
- nwritten = Curl_bufq_pass(&ctx->outbufq, nw_out_writer, cf, &result);
+ nwritten = Curl_bufq_pass(&ctx->outbufq, proxy_h2_nw_out_writer, cf,
+ &result);
if(nwritten < 0) {
+ if(result == CURLE_AGAIN) {
+ DEBUGF(LOG_CF(data, cf, "flush nw send buffer(%zu) -> EAGAIN",
+ Curl_bufq_len(&ctx->outbufq)));
+ ctx->nw_out_blocked = 1;
+ }
return result;
}
- if((size_t)nwritten < buflen) {
- return CURLE_AGAIN;
- }
- return CURLE_OK;
+ DEBUGF(LOG_CF(data, cf, "nw send buffer flushed"));
+ return Curl_bufq_is_empty(&ctx->outbufq)? CURLE_OK: CURLE_AGAIN;
}
/*
@@ -390,9 +424,9 @@ static CURLcode nw_out_flush(struct Curl_cfilter *cf, * This function returns 0 if it succeeds, or -1 and error code will
* be assigned to *err.
*/
-static int h2_process_pending_input(struct Curl_cfilter *cf,
- struct Curl_easy *data,
- CURLcode *err)
+static int proxy_h2_process_pending_input(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ CURLcode *err)
{
struct cf_h2_proxy_ctx *ctx = cf->ctx;
const unsigned char *buf;
@@ -422,19 +456,11 @@ static int h2_process_pending_input(struct Curl_cfilter *cf, }
}
- if(nghttp2_session_check_request_allowed(ctx->h2) == 0) {
- /* No more requests are allowed in the current session, so
- the connection may not be reused. This is set when a
- GOAWAY frame has been received or when the limit of stream
- identifiers has been reached. */
- connclose(cf->conn, "http/2: No new requests allowed");
- }
-
return 0;
}
-static CURLcode h2_progress_ingress(struct Curl_cfilter *cf,
- struct Curl_easy *data)
+static CURLcode proxy_h2_progress_ingress(struct Curl_cfilter *cf,
+ struct Curl_easy *data)
{
struct cf_h2_proxy_ctx *ctx = cf->ctx;
CURLcode result = CURLE_OK;
@@ -442,9 +468,9 @@ static CURLcode h2_progress_ingress(struct Curl_cfilter *cf, /* Process network input buffer fist */
if(!Curl_bufq_is_empty(&ctx->inbufq)) {
- DEBUGF(LOG_CF(data, cf, "Process %zd bytes in connection buffer",
+ DEBUGF(LOG_CF(data, cf, "Process %zu bytes in connection buffer",
Curl_bufq_len(&ctx->inbufq)));
- if(h2_process_pending_input(cf, data, &result) < 0)
+ if(proxy_h2_process_pending_input(cf, data, &result) < 0)
return result;
}
@@ -455,8 +481,8 @@ static CURLcode h2_progress_ingress(struct Curl_cfilter *cf, Curl_bufq_is_empty(&ctx->inbufq) && /* and we consumed our input */
!Curl_bufq_is_full(&ctx->tunnel.recvbuf)) {
- nread = Curl_bufq_slurp(&ctx->inbufq, nw_in_reader, cf, &result);
- DEBUGF(LOG_CF(data, cf, "read %zd bytes nw data -> %zd, %d",
+ nread = Curl_bufq_slurp(&ctx->inbufq, proxy_nw_in_reader, cf, &result);
+ DEBUGF(LOG_CF(data, cf, "read %zu bytes nw data -> %zd, %d",
Curl_bufq_len(&ctx->inbufq), nread, result));
if(nread < 0) {
if(result != CURLE_AGAIN) {
@@ -470,7 +496,7 @@ static CURLcode h2_progress_ingress(struct Curl_cfilter *cf, break;
}
- if(h2_process_pending_input(cf, data, &result))
+ if(proxy_h2_process_pending_input(cf, data, &result))
return result;
}
@@ -481,25 +507,22 @@ static CURLcode h2_progress_ingress(struct Curl_cfilter *cf, return CURLE_OK;
}
-/*
- * Check if there's been an update in the priority /
- * dependency settings and if so it submits a PRIORITY frame with the updated
- * info.
- * Flush any out data pending in the network buffer.
- */
-static CURLcode h2_progress_egress(struct Curl_cfilter *cf,
- struct Curl_easy *data)
+static CURLcode proxy_h2_progress_egress(struct Curl_cfilter *cf,
+ struct Curl_easy *data)
{
struct cf_h2_proxy_ctx *ctx = cf->ctx;
int rv = 0;
- rv = nghttp2_session_send(ctx->h2);
+ ctx->nw_out_blocked = 0;
+ while(!rv && !ctx->nw_out_blocked && nghttp2_session_want_write(ctx->h2))
+ rv = nghttp2_session_send(ctx->h2);
+
if(nghttp2_is_fatal(rv)) {
DEBUGF(LOG_CF(data, cf, "nghttp2_session_send error (%s)%d",
nghttp2_strerror(rv), rv));
return CURLE_SEND_ERROR;
}
- return nw_out_flush(cf, data);
+ return proxy_h2_nw_out_flush(cf, data);
}
static ssize_t on_session_send(nghttp2_session *h2,
@@ -517,7 +540,7 @@ static ssize_t on_session_send(nghttp2_session *h2, DEBUGASSERT(data);
nwritten = Curl_bufq_write_pass(&ctx->outbufq, buf, blen,
- nw_out_writer, cf, &result);
+ proxy_h2_nw_out_writer, cf, &result);
if(nwritten < 0) {
if(result == CURLE_AGAIN) {
return NGHTTP2_ERR_WOULDBLOCK;
@@ -532,8 +555,9 @@ static ssize_t on_session_send(nghttp2_session *h2, return nwritten;
}
-static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame,
- void *userp)
+static int proxy_h2_on_frame_recv(nghttp2_session *session,
+ const nghttp2_frame *frame,
+ void *userp)
{
struct Curl_cfilter *cf = userp;
struct cf_h2_proxy_ctx *ctx = cf->ctx;
@@ -616,11 +640,12 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame, return 0;
}
-static int on_header(nghttp2_session *session, const nghttp2_frame *frame,
- const uint8_t *name, size_t namelen,
- const uint8_t *value, size_t valuelen,
- uint8_t flags,
- void *userp)
+static int proxy_h2_on_header(nghttp2_session *session,
+ const nghttp2_frame *frame,
+ const uint8_t *name, size_t namelen,
+ const uint8_t *value, size_t valuelen,
+ uint8_t flags,
+ void *userp)
{
struct Curl_cfilter *cf = userp;
struct cf_h2_proxy_ctx *ctx = cf->ctx;
@@ -752,8 +777,9 @@ static int tunnel_recv_callback(nghttp2_session *session, uint8_t flags, return 0;
}
-static int on_stream_close(nghttp2_session *session, int32_t stream_id,
- uint32_t error_code, void *userp)
+static int proxy_h2_on_stream_close(nghttp2_session *session,
+ int32_t stream_id,
+ uint32_t error_code, void *userp)
{
struct Curl_cfilter *cf = userp;
struct cf_h2_proxy_ctx *ctx = cf->ctx;
@@ -765,7 +791,7 @@ static int on_stream_close(nghttp2_session *session, int32_t stream_id, if(stream_id != ctx->tunnel.stream_id)
return 0;
- DEBUGF(LOG_CF(data, cf, "[h2sid=%u] on_stream_close, %s (err %d)",
+ DEBUGF(LOG_CF(data, cf, "[h2sid=%u] proxy_h2_on_stream_close, %s (err %d)",
stream_id, nghttp2_http2_strerror(error_code), error_code));
ctx->tunnel.closed = TRUE;
ctx->tunnel.error = error_code;
@@ -773,15 +799,15 @@ static int on_stream_close(nghttp2_session *session, int32_t stream_id, return 0;
}
-static CURLcode h2_submit(int32_t *pstream_id,
- struct Curl_cfilter *cf,
- struct Curl_easy *data,
- nghttp2_session *h2,
- struct httpreq *req,
- const nghttp2_priority_spec *pri_spec,
- void *stream_user_data,
- nghttp2_data_source_read_callback read_callback,
- void *read_ctx)
+static CURLcode proxy_h2_submit(int32_t *pstream_id,
+ struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ nghttp2_session *h2,
+ struct httpreq *req,
+ const nghttp2_priority_spec *pri_spec,
+ void *stream_user_data,
+ nghttp2_data_source_read_callback read_callback,
+ void *read_ctx)
{
struct dynhds h2_headers;
nghttp2_nv *nva = NULL;
@@ -881,8 +907,8 @@ static CURLcode submit_CONNECT(struct Curl_cfilter *cf, if(result)
goto out;
- result = h2_submit(&ts->stream_id, cf, data, ctx->h2, req,
- NULL, ts, tunnel_send_callback, cf);
+ result = proxy_h2_submit(&ts->stream_id, cf, data, ctx->h2, req,
+ NULL, ts, tunnel_send_callback, cf);
if(result) {
DEBUGF(LOG_CF(data, cf, "send: nghttp2_submit_request error (%s)%u",
nghttp2_strerror(ts->stream_id), ts->stream_id));
@@ -907,7 +933,7 @@ static CURLcode inspect_response(struct Curl_cfilter *cf, DEBUGASSERT(ts->resp);
if(ts->resp->status/100 == 2) {
infof(data, "CONNECT tunnel established, response %d", ts->resp->status);
- tunnel_go_state(cf, ts, TUNNEL_ESTABLISHED, data);
+ h2_tunnel_go_state(cf, ts, H2_TUNNEL_ESTABLISHED, data);
return CURLE_OK;
}
@@ -928,7 +954,7 @@ static CURLcode inspect_response(struct Curl_cfilter *cf, if(data->req.newurl) {
/* Inidicator that we should try again */
Curl_safefree(data->req.newurl);
- tunnel_go_state(cf, ts, TUNNEL_INIT, data);
+ h2_tunnel_go_state(cf, ts, H2_TUNNEL_INIT, data);
return CURLE_OK;
}
}
@@ -937,9 +963,9 @@ static CURLcode inspect_response(struct Curl_cfilter *cf, return CURLE_RECV_ERROR;
}
-static CURLcode CONNECT(struct Curl_cfilter *cf,
- struct Curl_easy *data,
- struct tunnel_stream *ts)
+static CURLcode H2_CONNECT(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct tunnel_stream *ts)
{
struct cf_h2_proxy_ctx *ctx = cf->ctx;
CURLcode result = CURLE_OK;
@@ -948,27 +974,27 @@ static CURLcode CONNECT(struct Curl_cfilter *cf, DEBUGASSERT(ts->authority);
do {
switch(ts->state) {
- case TUNNEL_INIT:
+ case H2_TUNNEL_INIT:
/* Prepare the CONNECT request and make a first attempt to send. */
DEBUGF(LOG_CF(data, cf, "CONNECT start for %s", ts->authority));
result = submit_CONNECT(cf, data, ts);
if(result)
goto out;
- tunnel_go_state(cf, ts, TUNNEL_CONNECT, data);
+ h2_tunnel_go_state(cf, ts, H2_TUNNEL_CONNECT, data);
/* FALLTHROUGH */
- case TUNNEL_CONNECT:
+ case H2_TUNNEL_CONNECT:
/* see that the request is completely sent */
- result = h2_progress_ingress(cf, data);
+ result = proxy_h2_progress_ingress(cf, data);
if(!result)
- result = h2_progress_egress(cf, data);
- if(result) {
- tunnel_go_state(cf, ts, TUNNEL_FAILED, data);
+ result = proxy_h2_progress_egress(cf, data);
+ if(result && result != CURLE_AGAIN) {
+ h2_tunnel_go_state(cf, ts, H2_TUNNEL_FAILED, data);
break;
}
if(ts->has_final_response) {
- tunnel_go_state(cf, ts, TUNNEL_RESPONSE, data);
+ h2_tunnel_go_state(cf, ts, H2_TUNNEL_RESPONSE, data);
}
else {
result = CURLE_OK;
@@ -976,28 +1002,28 @@ static CURLcode CONNECT(struct Curl_cfilter *cf, }
/* FALLTHROUGH */
- case TUNNEL_RESPONSE:
+ case H2_TUNNEL_RESPONSE:
DEBUGASSERT(ts->has_final_response);
result = inspect_response(cf, data, ts);
if(result)
goto out;
break;
- case TUNNEL_ESTABLISHED:
+ case H2_TUNNEL_ESTABLISHED:
return CURLE_OK;
- case TUNNEL_FAILED:
+ case H2_TUNNEL_FAILED:
return CURLE_RECV_ERROR;
default:
break;
}
- } while(ts->state == TUNNEL_INIT);
+ } while(ts->state == H2_TUNNEL_INIT);
out:
if(result || ctx->tunnel.closed)
- tunnel_go_state(cf, ts, TUNNEL_FAILED, data);
+ h2_tunnel_go_state(cf, ts, H2_TUNNEL_FAILED, data);
return result;
}
@@ -1043,10 +1069,10 @@ static CURLcode cf_h2_proxy_connect(struct Curl_cfilter *cf, /* for the secondary socket (FTP), use the "connect to host"
* but ignore the "connect to port" (use the secondary port)
*/
- result = CONNECT(cf, data, ts);
+ result = H2_CONNECT(cf, data, ts);
out:
- *done = (result == CURLE_OK) && (ts->state == TUNNEL_ESTABLISHED);
+ *done = (result == CURLE_OK) && (ts->state == H2_TUNNEL_ESTABLISHED);
cf->connected = *done;
CF_DATA_RESTORE(cf, save);
return result;
@@ -1082,7 +1108,7 @@ static bool cf_h2_proxy_data_pending(struct Curl_cfilter *cf, {
struct cf_h2_proxy_ctx *ctx = cf->ctx;
if((ctx && !Curl_bufq_is_empty(&ctx->inbufq)) ||
- (ctx && ctx->tunnel.state == TUNNEL_ESTABLISHED &&
+ (ctx && ctx->tunnel.state == H2_TUNNEL_ESTABLISHED &&
!Curl_bufq_is_empty(&ctx->tunnel.recvbuf)))
return TRUE;
return cf->next? cf->next->cft->has_data_pending(cf->next, data) : FALSE;
@@ -1188,14 +1214,14 @@ static ssize_t cf_h2_proxy_recv(struct Curl_cfilter *cf, struct cf_call_data save;
CURLcode result;
- if(ctx->tunnel.state != TUNNEL_ESTABLISHED) {
+ if(ctx->tunnel.state != H2_TUNNEL_ESTABLISHED) {
*err = CURLE_RECV_ERROR;
return -1;
}
CF_DATA_SAVE(save, cf, data);
if(Curl_bufq_is_empty(&ctx->tunnel.recvbuf)) {
- *err = h2_progress_ingress(cf, data);
+ *err = proxy_h2_progress_ingress(cf, data);
if(*err)
goto out;
}
@@ -1208,13 +1234,19 @@ static ssize_t cf_h2_proxy_recv(struct Curl_cfilter *cf, nghttp2_session_consume(ctx->h2, ctx->tunnel.stream_id, (size_t)nread);
}
- result = h2_progress_egress(cf, data);
- if(result) {
+ result = proxy_h2_progress_egress(cf, data);
+ if(result && result != CURLE_AGAIN) {
*err = result;
nread = -1;
}
out:
+ if(!Curl_bufq_is_empty(&ctx->tunnel.recvbuf) &&
+ (nread >= 0 || *err == CURLE_AGAIN)) {
+ /* data pending and no fatal error to report. Need to trigger
+ * draining to avoid stalling when no socket events happen. */
+ drain_tunnel(cf, data, &ctx->tunnel);
+ }
DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_recv(len=%zu) -> %zd %d",
ctx->tunnel.stream_id, len, nread, *err));
CF_DATA_RESTORE(cf, save);
@@ -1223,93 +1255,188 @@ out: static ssize_t cf_h2_proxy_send(struct Curl_cfilter *cf,
struct Curl_easy *data,
- const void *mem, size_t len, CURLcode *err)
+ const void *buf, size_t len, CURLcode *err)
{
struct cf_h2_proxy_ctx *ctx = cf->ctx;
struct cf_call_data save;
- ssize_t nwritten = -1;
- const unsigned char *buf = mem;
- size_t start_len = len;
int rv;
+ ssize_t nwritten;
+ CURLcode result;
+ int blocked = 0;
- if(ctx->tunnel.state != TUNNEL_ESTABLISHED) {
+ if(ctx->tunnel.state != H2_TUNNEL_ESTABLISHED) {
*err = CURLE_SEND_ERROR;
return -1;
}
CF_DATA_SAVE(save, cf, data);
- while(len) {
+ if(ctx->tunnel.closed) {
+ nwritten = -1;
+ *err = CURLE_SEND_ERROR;
+ goto out;
+ }
+ else if(ctx->tunnel.upload_blocked_len) {
+ /* the data in `buf` has alread been submitted or added to the
+ * buffers, but have been EAGAINed on the last invocation. */
+ DEBUGASSERT(len >= ctx->tunnel.upload_blocked_len);
+ if(len < ctx->tunnel.upload_blocked_len) {
+ /* Did we get called again with a smaller `len`? This should not
+ * happend. We are not prepared to handle that. */
+ failf(data, "HTTP/2 proxy, send again with decreased length");
+ *err = CURLE_HTTP2;
+ nwritten = -1;
+ goto out;
+ }
+ nwritten = (ssize_t)ctx->tunnel.upload_blocked_len;
+ ctx->tunnel.upload_blocked_len = 0;
+ }
+ else {
nwritten = Curl_bufq_write(&ctx->tunnel.sendbuf, buf, len, err);
- if(nwritten <= 0) {
- if(*err && *err != CURLE_AGAIN) {
- DEBUGF(LOG_CF(data, cf, "error adding data to tunnel sendbuf: %d",
- *err));
- nwritten = -1;
+ if(nwritten < 0) {
+ if(*err != CURLE_AGAIN)
goto out;
- }
- /* blocked */
nwritten = 0;
}
- else {
- DEBUGASSERT((size_t)nwritten <= len);
- buf += (size_t)nwritten;
- len -= (size_t)nwritten;
- }
+ }
- /* resume the tunnel stream and let the h2 session send, which
- * triggers reading from tunnel.sendbuf */
+ if(!Curl_bufq_is_empty(&ctx->tunnel.sendbuf)) {
+ /* req body data is buffered, resume the potentially suspended stream */
rv = nghttp2_session_resume_data(ctx->h2, ctx->tunnel.stream_id);
if(nghttp2_is_fatal(rv)) {
*err = CURLE_SEND_ERROR;
nwritten = -1;
goto out;
}
- *err = h2_progress_egress(cf, data);
- if(*err) {
- nwritten = -1;
- goto out;
- }
-
- if(!nwritten && Curl_bufq_is_full(&ctx->tunnel.sendbuf)) {
- size_t rwin;
- /* we could not add to the buffer and after session processing,
- * it is still full. */
- rwin = nghttp2_session_get_stream_remote_window_size(
- ctx->h2, ctx->tunnel.stream_id);
- DEBUGF(LOG_CF(data, cf, "cf_send: tunnel win %u/%zu",
- nghttp2_session_get_remote_window_size(ctx->h2), rwin));
- if(rwin == 0) {
- /* We cannot upload more as the stream's remote window size
- * is 0. We need to receive WIN_UPDATEs before we can continue.
- */
- data->req.keepon |= KEEP_SEND_HOLD;
- DEBUGF(LOG_CF(data, cf, "pausing send as remote flow "
- "window is exhausted"));
- }
- break;
- }
}
- nwritten = start_len - len;
- if(nwritten > 0) {
- *err = CURLE_OK;
+ /* Call the nghttp2 send loop and flush to write ALL buffered data,
+ * headers and/or request body completely out to the network */
+ result = proxy_h2_progress_egress(cf, data);
+ if(result == CURLE_AGAIN) {
+ blocked = 1;
}
- else if(ctx->tunnel.closed) {
+ else if(result) {
+ *err = result;
nwritten = -1;
- *err = CURLE_SEND_ERROR;
+ goto out;
}
- else {
- nwritten = -1;
+ else if(!Curl_bufq_is_empty(&ctx->tunnel.sendbuf)) {
+ /* although we wrote everything that nghttp2 wants to send now,
+ * there is data left in our stream send buffer unwritten. This may
+ * be due to the stream's HTTP/2 flow window being exhausted. */
+ blocked = 1;
+ }
+
+ if(blocked) {
+ /* Unable to send all data, due to connection blocked or H2 window
+ * exhaustion. Data is left in our stream buffer, or nghttp2's internal
+ * frame buffer or our network out buffer. */
+ size_t rwin = nghttp2_session_get_stream_remote_window_size(
+ ctx->h2, ctx->tunnel.stream_id);
+ if(rwin == 0) {
+ /* H2 flow window exhaustion.
+ * FIXME: there is no way to HOLD all transfers that use this
+ * proxy connection AND to UNHOLD all of them again when the
+ * window increases.
+ * We *could* iterate over all data on this conn maybe? */
+ DEBUGF(LOG_CF(data, cf, "[h2sid=%d] remote flow "
+ "window is exhausted", ctx->tunnel.stream_id));
+ }
+
+ /* Whatever the cause, we need to return CURL_EAGAIN for this call.
+ * We have unwritten state that needs us being invoked again and EAGAIN
+ * is the only way to ensure that. */
+ ctx->tunnel.upload_blocked_len = nwritten;
+ DEBUGF(LOG_CF(data, cf, "[h2sid=%d] cf_send(len=%zu) BLOCK: win %u/%zu "
+ "blocked_len=%zu",
+ ctx->tunnel.stream_id, len,
+ nghttp2_session_get_remote_window_size(ctx->h2), rwin,
+ nwritten));
*err = CURLE_AGAIN;
+ nwritten = -1;
+ goto out;
+ }
+ else if(should_close_session(ctx)) {
+ /* nghttp2 thinks this session is done. If the stream has not been
+ * closed, this is an error state for out transfer */
+ if(ctx->tunnel.closed) {
+ *err = CURLE_SEND_ERROR;
+ nwritten = -1;
+ }
+ else {
+ DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
+ *err = CURLE_HTTP2;
+ nwritten = -1;
+ }
}
out:
- DEBUGF(LOG_CF(data, cf, "cf_send(len=%zu) -> %zd, %d ",
- start_len, nwritten, *err));
+ DEBUGF(LOG_CF(data, cf, "[h2sid=%d] cf_send(len=%zu) -> %zd, %d, "
+ "h2 windows %d-%d (stream-conn), "
+ "buffers %zu-%zu (stream-conn)",
+ ctx->tunnel.stream_id, len, nwritten, *err,
+ nghttp2_session_get_stream_remote_window_size(
+ ctx->h2, ctx->tunnel.stream_id),
+ nghttp2_session_get_remote_window_size(ctx->h2),
+ Curl_bufq_len(&ctx->tunnel.sendbuf),
+ Curl_bufq_len(&ctx->outbufq)));
CF_DATA_RESTORE(cf, save);
return nwritten;
}
+static bool proxy_h2_connisalive(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ bool *input_pending)
+{
+ struct cf_h2_proxy_ctx *ctx = cf->ctx;
+ bool alive = TRUE;
+
+ *input_pending = FALSE;
+ if(!cf->next || !cf->next->cft->is_alive(cf->next, data, input_pending))
+ return FALSE;
+
+ if(*input_pending) {
+ /* This happens before we've sent off a request and the connection is
+ not in use by any other transfer, there shouldn't be any data here,
+ only "protocol frames" */
+ CURLcode result;
+ ssize_t nread = -1;
+
+ *input_pending = FALSE;
+ nread = Curl_bufq_slurp(&ctx->inbufq, proxy_nw_in_reader, cf, &result);
+ if(nread != -1) {
+ if(proxy_h2_process_pending_input(cf, data, &result) < 0)
+ /* immediate error, considered dead */
+ alive = FALSE;
+ else {
+ alive = !should_close_session(ctx);
+ }
+ }
+ else if(result != CURLE_AGAIN) {
+ /* the read failed so let's say this is dead anyway */
+ alive = FALSE;
+ }
+ }
+
+ return alive;
+}
+
+static bool cf_h2_proxy_is_alive(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ bool *input_pending)
+{
+ struct cf_h2_proxy_ctx *ctx = cf->ctx;
+ CURLcode result;
+ struct cf_call_data save;
+
+ CF_DATA_SAVE(save, cf, data);
+ result = (ctx && ctx->h2 && proxy_h2_connisalive(cf, data, input_pending));
+ DEBUGF(LOG_CF(data, cf, "conn alive -> %d, input_pending=%d",
+ result, *input_pending));
+ CF_DATA_RESTORE(cf, save);
+ return result;
+}
+
struct Curl_cftype Curl_cft_h2_proxy = {
"H2-PROXY",
CF_TYPE_IP_CONNECT,
@@ -1323,7 +1450,7 @@ struct Curl_cftype Curl_cft_h2_proxy = { cf_h2_proxy_send,
cf_h2_proxy_recv,
Curl_cf_def_cntrl,
- Curl_cf_def_conn_is_alive,
+ cf_h2_proxy_is_alive,
Curl_cf_def_conn_keep_alive,
Curl_cf_def_query,
};
|