From 1fceee8316ba2a30cded3f42f958be83f7bf4c94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20L=C3=A9caille?= Date: Mon, 25 Feb 2019 15:30:36 +0100 Subject: [PATCH] MINOR: http_fetch: add "req.ungrpc" sample fetch for gRPC. This patch implements "req.ungrpc" sample fetch method to decode and parse a gRPC request. It takes only one argument: a protocol buffers field number to identify the protocol buffers message number to be looked up. This argument is a sort of path in dotted notation to the terminal field number to be retrieved. ex: req.ungrpc(1.2.3.4) This sample fetch catch the data in raw mode, without interpreting them. Some protocol buffers specific converters may be used to convert the data to the correct type. --- include/proto/protocol_buffers.h | 185 ++++++++++++++++++++++++ src/http_fetch.c | 241 +++++++++++++++++++++++++++++++ 2 files changed, 426 insertions(+) create mode 100644 include/proto/protocol_buffers.h diff --git a/include/proto/protocol_buffers.h b/include/proto/protocol_buffers.h new file mode 100644 index 000000000..d210a7250 --- /dev/null +++ b/include/proto/protocol_buffers.h @@ -0,0 +1,185 @@ +/* + * include/proto/protocol_buffers.h + * This file contains functions and macros declarations for protocol buffers decoding. + * + * Copyright 2012 Willy Tarreau + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, version 2.1 + * exclusively. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef _PROTO_PROTOCOL_BUFFERS_H +#define _PROTO_PROTOCOL_BUFFERS_H + +#include + +#define PBUF_TYPE_VARINT 0 +#define PBUF_TYPE_64BIT 1 +#define PBUF_TYPE_LENGTH_DELIMITED 2 +#define PBUF_TYPE_START_GROUP 3 +#define PBUF_TYPE_STOP_GROUP 4 +#define PBUF_TYPE_32BIT 5 + +#define PBUF_VARINT_DONT_STOP_BIT 7 +#define PBUF_VARINT_DONT_STOP_BITMASK (1 << PBUF_VARINT_DONT_STOP_BIT) +#define PBUF_VARINT_DATA_BITMASK ~PBUF_VARINT_DONT_STOP_BITMASK + +/* + * Decode a protocol buffers varint located in a buffer at address with + * as length. The decoded value is stored at . + * Returns 1 if succeeded, 0 if not. + */ +static inline int +protobuf_varint(uint64_t *val, unsigned char *pos, size_t len) +{ + unsigned int shift; + + *val = 0; + shift = 0; + + while (len > 0) { + int stop = !(*pos & PBUF_VARINT_DONT_STOP_BITMASK); + + *val |= ((uint64_t)(*pos & PBUF_VARINT_DATA_BITMASK)) << shift; + + ++pos; + --len; + + if (stop) + break; + else if (!len) + return 0; + + shift += 7; + /* The maximum length in bytes of a 64-bit encoded value is 10. */ + if (shift > 70) + return 0; + } + + return 1; +} + +/* + * Decode a protocol buffers varint located in a buffer at offset address with + * as length address. Update and consequently. Decrease <*len> + * by the number of decoded bytes. The decoded value is stored at . + * Returns 1 if succeeded, 0 if not. + */ +static inline int +protobuf_decode_varint(uint64_t *val, unsigned char **pos, size_t *len) +{ + unsigned int shift; + + *val = 0; + shift = 0; + + while (*len > 0) { + int stop = !(**pos & PBUF_VARINT_DONT_STOP_BITMASK); + + *val |= ((uint64_t)**pos & PBUF_VARINT_DATA_BITMASK) << shift; + + ++*pos; + --*len; + + if (stop) + break; + else if (!*len) + return 0; + + shift += 7; + /* The maximum length in bytes of a 64-bit encoded value is 10. */ + if (shift > 70) + return 0; + } + + return 1; +} + +/* + * Skip a protocol buffer varint found at as position address with + * as available length address. Update <*pos> to make it point to the next + * available byte. Decrease <*len> by the number of skipped bytes. + * Returns 1 if succeeded, 0 if not. + */ +static inline int +protobuf_skip_varint(unsigned char **pos, size_t *len) +{ + unsigned int shift; + + shift = 0; + + while (*len > 0) { + int stop = !(**pos & PBUF_VARINT_DONT_STOP_BITMASK); + + ++*pos; + --*len; + + if (stop) + break; + else if (!*len) + return 0; + + shift += 7; + /* The maximum length in bytes of a 64-bit encoded value is 10. */ + if (shift > 70) + return 0; + } + + return 1; +} + +/* + * If succeeded, return the length of a prococol buffers varint found at as + * position address, with as address of the available bytes at <*pos>. + * Update <*pos> to make it point to the next available byte. Decrease <*len> + * by the number of bytes used to encode this varint. + * Return -1 if failed. + */ +static inline int +protobuf_varint_getlen(unsigned char **pos, size_t *len) +{ + unsigned char *spos; + unsigned int shift; + + shift = 0; + spos = *pos; + + while (*len > 0) { + int stop = !(**pos & PBUF_VARINT_DONT_STOP_BITMASK); + + ++*pos; + --*len; + + if (stop) + break; + else if (!*len) + return -1; + + shift += 7; + /* The maximum length in bytes of a 64-bit encoded value is 10. */ + if (shift > 70) + return -1; + } + + return *pos - spos; +} + +#endif /* _PROTO_PROTOCOL_BUFFERS_H */ + +/* + * Local variables: + * c-indent-level: 8 + * c-basic-offset: 8 + * End: + */ diff --git a/src/http_fetch.c b/src/http_fetch.c index 51f2ef13f..8f88646ea 100644 --- a/src/http_fetch.c +++ b/src/http_fetch.c @@ -39,6 +39,7 @@ #include #include #include +#include #include #include @@ -1516,6 +1517,245 @@ static int smp_fetch_hdr_val(const struct arg *args, struct sample *smp, const c return ret; } +static inline struct buffer * +smp_fetch_body_buf(const struct arg *args, struct sample *smp) +{ + struct buffer *buf; + + if (IS_HTX_SMP(smp) || (smp->px->mode == PR_MODE_TCP)) { + /* HTX version */ + struct htx *htx = smp_prefetch_htx(smp, args); + int32_t pos; + + if (!htx) + return NULL; + + buf = get_trash_chunk(); + for (pos = htx_get_head(htx); pos != -1; pos = htx_get_next(htx, pos)) { + struct htx_blk *blk = htx_get_blk(htx, pos); + enum htx_blk_type type = htx_get_blk_type(blk); + + if (type == HTX_BLK_EOM || type == HTX_BLK_EOD) + break; + if (type == HTX_BLK_DATA) { + if (!htx_data_to_h1(htx_get_blk_value(htx, blk), buf, 0)) + return NULL; + } + } + } + else { + /* LEGACY version */ + struct http_msg *msg; + unsigned long len; + unsigned long block1; + char *body; + + if (smp_prefetch_http(smp->px, smp->strm, smp->opt, args, smp, 1) <= 0) + return NULL; + + if ((smp->opt & SMP_OPT_DIR) == SMP_OPT_DIR_REQ) + msg = &smp->strm->txn->req; + else + msg = &smp->strm->txn->rsp; + + len = http_body_bytes(msg); + body = c_ptr(msg->chn, -http_data_rewind(msg)); + + block1 = len; + if (block1 > b_wrap(&msg->chn->buf) - body) + block1 = b_wrap(&msg->chn->buf) - body; + + buf = get_trash_chunk(); + if (block1 == len) { + /* buffer is not wrapped (or empty) */ + memcpy(buf->area, body, len); + } + else { + /* buffer is wrapped, we need to defragment it */ + memcpy(buf->area, body, block1); + memcpy(buf->area + block1, b_orig(&msg->chn->buf), + len - block1); + } + buf->data = len; + } + + return buf; +} + +#define GRPC_MSG_COMPRESS_FLAG_SZ 1 /* 1 byte */ +#define GRPC_MSG_LENGTH_SZ 4 /* 4 bytes */ +#define GRPC_MSG_HEADER_SZ (GRPC_MSG_COMPRESS_FLAG_SZ + GRPC_MSG_LENGTH_SZ) + +/* + * Fetch a gRPC field value. Takes a mandatory argument: the field identifier + * (dotted notation) internally represented as an array of unsigned integers + * and its size. + * Return 1 if the field was found, 0 if not. + */ +static int smp_fetch_req_ungrpc(const struct arg *args, struct sample *smp, const char *kw, void *private) +{ + struct buffer *body; + unsigned char *pos; + size_t grpc_left; + unsigned int *fid; + size_t fid_sz; + + if (!smp->strm) + return 0; + + fid = args[0].data.fid.ids; + fid_sz = args[0].data.fid.sz; + + body = smp_fetch_body_buf(args, smp); + if (!body) + return 0; + + pos = (unsigned char *)body->area; + /* Remaining bytes in the body to be parsed. */ + grpc_left = body->data; + + while (grpc_left > GRPC_MSG_COMPRESS_FLAG_SZ + GRPC_MSG_LENGTH_SZ) { + int next_field, found; + size_t grpc_msg_len, left; + unsigned int wire_type, field_number; + uint64_t key, elen; + + grpc_msg_len = left = ntohl(*(uint32_t *)(pos + GRPC_MSG_COMPRESS_FLAG_SZ)); + + pos += GRPC_MSG_HEADER_SZ; + grpc_left -= GRPC_MSG_HEADER_SZ; + + if (grpc_left < left) + return 0; + + found = 1; + /* Length of the length-delimited messages if any. */ + elen = 0; + + /* Message decoding: there may be serveral key+value protobuf pairs by + * gRPC message. + */ + next_field = 0; + while (next_field < fid_sz) { + uint64_t sleft; + + if ((ssize_t)left <= 0) + return 0; + + /* Remaining bytes saving. */ + sleft = left; + + /* Key decoding */ + if (!protobuf_decode_varint(&key, &pos, &left)) + return 0; + + wire_type = key & 0x7; + field_number = key >> 3; + found = field_number == fid[next_field]; + + if (found && field_number != fid[next_field]) + found = 0; + + switch (wire_type) { + case PBUF_TYPE_VARINT: + { + if (!found) { + protobuf_skip_varint(&pos, &left); + } else if (next_field == fid_sz - 1) { + int varint_len; + unsigned char *spos = pos; + + varint_len = protobuf_varint_getlen(&pos, &left); + if (varint_len == -1) + return 0; + + smp->data.type = SMP_T_BIN; + smp->data.u.str.area = (char *)spos; + smp->data.u.str.data = varint_len; + smp->flags = SMP_F_VOL_TEST; + return 1; + } + break; + } + + case PBUF_TYPE_64BIT: + { + if (!found) { + pos += sizeof(uint64_t); + left -= sizeof(uint64_t); + } else if (next_field == fid_sz - 1) { + smp->data.type = SMP_T_BIN; + smp->data.u.str.area = (char *)pos; + smp->data.u.str.data = sizeof(uint64_t); + smp->flags = SMP_F_VOL_TEST; + return 1; + } + break; + } + + case PBUF_TYPE_LENGTH_DELIMITED: + { + /* Decode the length of this length-delimited field. */ + if (!protobuf_decode_varint(&elen, &pos, &left)) + return 0; + + if (elen > left) + return 0; + + /* The size of the current field is computed from here do skip + * the bytes to encode the previous lenght.* + */ + sleft = left; + if (!found) { + /* Skip the current length-delimited field. */ + pos += elen; + left -= elen; + break; + } else if (next_field == fid_sz - 1) { + smp->data.type = SMP_T_BIN; + smp->data.u.str.area = (char *)pos; + smp->data.u.str.data = elen; + smp->flags = SMP_F_VOL_TEST; + return 1; + } + break; + } + + case PBUF_TYPE_32BIT: + { + if (!found) { + pos += sizeof(uint32_t); + left -= sizeof(uint32_t); + } else if (next_field == fid_sz - 1) { + smp->data.type = SMP_T_BIN; + smp->data.u.str.area = (char *)pos; + smp->data.u.str.data = sizeof(uint32_t); + smp->flags = SMP_F_VOL_TEST; + return 1; + } + break; + } + + default: + return 0; + } + + if ((ssize_t)(elen) > 0) + elen -= sleft - left; + + if (found) { + next_field++; + } + else if ((ssize_t)elen <= 0) { + next_field = 0; + } + } + grpc_left -= grpc_msg_len; + } + + return 0; +} + /* Fetch an HTTP header's IP value. takes a mandatory argument of type string * and an optional one of type int to designate a specific occurrence. * It returns an IPv4 or IPv6 address. @@ -2882,6 +3122,7 @@ static struct sample_fetch_kw_list sample_fetch_keywords = {ILH, { { "req.hdr_ip", smp_fetch_hdr_ip, ARG2(0,STR,SINT), val_hdr, SMP_T_IPV4, SMP_USE_HRQHV }, { "req.hdr_names", smp_fetch_hdr_names, ARG1(0,STR), NULL, SMP_T_STR, SMP_USE_HRQHV }, { "req.hdr_val", smp_fetch_hdr_val, ARG2(0,STR,SINT), val_hdr, SMP_T_SINT, SMP_USE_HRQHV }, + { "req.ungrpc", smp_fetch_req_ungrpc, ARG1(1, PBUF_FNUM), NULL, SMP_T_BIN, SMP_USE_HRQHV }, /* explicit req.{cook,hdr} are used to force the fetch direction to be response-only */ { "res.cook", smp_fetch_cookie, ARG1(0,STR), NULL, SMP_T_STR, SMP_USE_HRSHV },