[HN Gopher] OpenTelemetry at Scale: Using Kafka to handle bursty...
       ___________________________________________________________________
        
       OpenTelemetry at Scale: Using Kafka to handle bursty traffic
        
       Author : pranay01
       Score  : 138 points
       Date   : 2023-10-22 18:58 UTC (14 hours ago)
        
 (HTM) web link (signoz.io)
 (TXT) w3m dump (signoz.io)
        
       | francoismassot wrote:
        | I've heard several times that Kafka gets put in front of
        | Elasticsearch clusters to handle traffic bursts. You can also use
        | Redpanda, Pulsar, NATS, and other distributed queues.
       | 
        | One thing that is also very interesting with Kafka is that you
        | can achieve exactly-once semantics without too much effort: keep
        | track of the partition positions in your own database and
        | carefully acknowledge them only when you are sure the data is
        | safely stored in your db. That's what we did with our engine
        | Quickwit; so far it's the most efficient way to index data into
        | it.
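        | 
        | A minimal sketch of that offset-in-your-own-DB pattern
        | (illustrative only, assuming confluent-kafka and a SQLite store
        | with a kafka_offsets table keyed by (topic, partition); not
        | Quickwit's actual code):
        | 
        |     from confluent_kafka import Consumer
        |     import sqlite3
        | 
        |     db = sqlite3.connect("telemetry.db")
        |     consumer = Consumer({
        |         "bootstrap.servers": "localhost:9092",
        |         "group.id": "indexer",
        |         "enable.auto.commit": False,  # positions live in our DB
        |     })
        |     consumer.subscribe(["otel-spans"])
        | 
        |     while True:
        |         msg = consumer.poll(1.0)
        |         if msg is None or msg.error():
        |             continue
        |         # One transaction stores the payload and the next
        |         # position together, so they can never disagree.
        |         with db:
        |             db.execute(
        |                 "INSERT INTO spans (payload) VALUES (?)",
        |                 (msg.value(),))
        |             db.execute(
        |                 "INSERT OR REPLACE INTO kafka_offsets "
        |                 "(topic, partition, next_offset) "
        |                 "VALUES (?, ?, ?)",
        |                 (msg.topic(), msg.partition(),
        |                  msg.offset() + 1))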
       | 
       | One obvious drawback with Kafka is that it's one more piece to
       | maintain... and it's not a small one.
        
         | pranay01 wrote:
         | Have you done/seen any benchmarks between Redpanda/NATS and
         | Kafka for this use case?
         | 
          | Some folks in the SigNoz community have also suggested NATS
          | for this, but I haven't dug deep into the benchmarks/features
          | yet.
        
           | francoismassot wrote:
           | Unfortunately no :/
        
         | radicality wrote:
          | Exactly-once semantics of what, specifically? Or do you mean
          | at-least-once?
        
           | francoismassot wrote:
            | Exactly-once semantics between Kafka and the observability
            | engine.
        
         | richieartoul wrote:
          | You have to do a bit more than that if you want exactly-once
          | end-to-end (i.e. if Kafka itself can contain duplicates). One
          | of my former colleagues did a good write-up on how Husky does
          | it:
         | https://www.datadoghq.com/blog/engineering/husky-deep-dive/
        
           | francoismassot wrote:
            | Yeah, I was only talking about exactly-once semantics
            | between Kafka and Quickwit.
        
         | viraptor wrote:
          | > exactly-once semantics without too much effort: keep track
          | of the partition positions in your own database and carefully
          | acknowledge them only when you are sure the data is safely
          | stored in your db
         | 
         | That's not really "exactly once". What happens when your system
         | dies after it made sure the data is safely stored in the db and
         | before ack-ing?
        
           | Svenskunganka wrote:
            | Depending on how you use the database, it is. If you write
            | the data as well as the offset to the DB in the same
            | transaction, you can then seek to the offset stored in the
            | DB after application restart and continue from there.
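            | 
            | A rough sketch of that restart path (illustrative names,
            | assuming confluent-kafka and an offsets table like the one
            | sketched above):
            | 
            |     from confluent_kafka import Consumer, TopicPartition
            | 
            |     def resume(consumer: Consumer, db) -> None:
            |         # The DB, not Kafka's committed offsets, is the
            |         # source of truth, so data and position agree.
            |         rows = db.execute(
            |             "SELECT topic, partition, next_offset "
            |             "FROM kafka_offsets").fetchall()
            |         consumer.assign([TopicPartition(t, p, o)
            |                          for t, p, o in rows])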
        
             | viraptor wrote:
             | > after application restart and continue from there.
             | 
             | What if the application doesn't restart before the queue
             | decides the message was lost and resends?
        
               | hashhar wrote:
                | In Kafka the "queue" is dumb: it doesn't lose messages
                | (it's an append-only durable log), nor does it resend
                | anything unless the consumer requests it.
        
             | mirekrusin wrote:
              | You should drop the "(...) and carefully acknowledge them
              | only when you are sure the data is safely stored in your
              | db (...)" part then, because it means it's not necessary;
              | you don't rely on it.
              | 
              | At-least-once semantics + local deduplication gives
              | exactly-once semantics.
              | 
              | In this case you're optimising local deduplication with a
              | strictly monotonic index.
              | 
              | One downside is that you leak internals of the other
              | system (partitions).
              | 
              | The other is that it implies serialised processing - you
              | can't process anything in parallel, as you have a single
              | index threshold that defines what has and what has not yet
              | been processed.
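              | 
              | As a sketch of that framing (hypothetical names), the
              | deduplication step collapses to a per-partition offset
              | check:
              | 
              |     # at-least-once delivery + this check gives an
              |     # exactly-once effect; high_water[p] is the last
              |     # offset already applied for partition p, loaded
              |     # from the local database.
              |     def already_processed(high_water, msg):
              |         return msg.offset() <= high_water.get(
              |             msg.partition(), -1)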
        
               | Svenskunganka wrote:
                | I'm not the one who wrote the original comment, so I
                | can't modify it. But one should still commit offsets,
                | because that is the happy path; DB transaction
                | successful? Commit the offset. If the latter fails due
                | to e.g. an application crash and you seek at startup to
                | the partition offset stored in the DB + 1, you get
                | exactly-once semantics. There are some more details,
                | e.g. you'd have to do the same during a consumer group
                | rebalance, and topic configuration also plays a role:
                | for example, whether the topic is compacted or not,
                | whether you write tombstones, and what its retention
                | policy is.
                | 
                | edit: You added some more to your comment after I posted
                | this one, so I'll try to cover those as well:
               | 
                | > One downside is that you leak internals of the other
                | system (partitions).
               | 
               | Yeah, sure.
               | 
                | > The other is that it implies serialised processing -
                | you can't process anything in parallel, as you have a
                | single index threshold that defines what has and what
                | has not yet been processed.
               | 
                | It doesn't imply serialised processing. It depends on
                | the use case: if every record in a topic has to be
                | processed serially, you can't parallelize, full stop;
                | the number of partitions equals 1. But if each record
                | can be processed individually, you get parallelism equal
                | to the number of partitions the topic has configured.
                | You also achieve parallelism in the same way if only
                | some records in a topic need to be processed serially:
                | use the same key for the records that need serial
                | processing and they will end up in the same partition.
                | For example, when recording the coordinates of a plane,
                | each plane can be processed in parallel, but an
                | individual plane's coordinates need to be processed
                | serially - just use the plane's unique identifier as the
                | key, and the coordinates for the same plane will be
                | appended to the log of the same partition.
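                | 
                | A minimal sketch of the keyed-producer side of that
                | plane example (illustrative only, assuming
                | confluent-kafka):
                | 
                |     from confluent_kafka import Producer
                | 
                |     producer = Producer(
                |         {"bootstrap.servers": "localhost:9092"})
                | 
                |     def record_position(plane_id, lat, lon):
                |         # Same key => same partition => per-plane
                |         # ordering; different planes still spread
                |         # across partitions and can be consumed in
                |         # parallel.
                |         producer.produce(
                |             "plane-coordinates",
                |             key=plane_id,
                |             value=f"{lat},{lon}".encode())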
        
               | mirekrusin wrote:
                | Yes, it's a good option, but it requires serialised
                | processing within each partition, which may or may not
                | be desirable.
                | 
                | If exactly-once semantics are needed and processing
                | should be parallel, other methods have to be used.
        
               | francoismassot wrote:
                | Good point: first, you're right, we do the ack on Kafka
                | but it's not necessary. Second, this is not what I
                | wanted to stress... and I should not have used the verb
                | "acknowledge". What we do is upload the data to S3, then
                | commit partitions + positions in what we call the
                | metastore. I can't edit my comment, unfortunately.
               | 
               | > One downside is that you leak internals of other system
               | (partitions).
               | 
                | True, but we generalized the concept of partitions to
                | other data sources; it's pretty convenient for
                | distributing indexing tasks.
        
               | fulmicoton wrote:
                | > The other is that it implies serialised processing -
                | you can't process anything in parallel, as you have a
                | single index threshold that defines what has and what
                | has not yet been processed.
                | 
                | Fortunately, Kafka is partitioned: you can work in
                | parallel across partitions.
                | 
                | Also, you can pipeline your processing. If you are
                | running your data through operations (A, B, C), then
                | (C on batch N) can run at the same time as (B on batch
                | N+1) and (A on batch N+2).
                | 
                | We do both at Quickwit.
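                | 
                | A toy sketch of that pipelining (not Quickwit's code;
                | the stage names are made up): each stage runs in its own
                | thread with a queue in between, so C works on batch N
                | while B handles batch N+1 and A handles batch N+2.
                | 
                |     import queue, threading, time
                | 
                |     def op_a(batch):   # stage A, e.g. parse
                |         return batch
                | 
                |     def op_b(batch):   # stage B, e.g. index
                |         return batch
                | 
                |     def op_c(batch):   # stage C, e.g. upload
                |         print("done", batch)
                | 
                |     def stage(fn, inbox, outbox=None):
                |         while True:
                |             out = fn(inbox.get())
                |             if outbox is not None:
                |                 outbox.put(out)
                | 
                |     q_in, q_ab, q_bc = [queue.Queue(maxsize=1)
                |                         for _ in range(3)]
                |     for fn, i, o in [(op_a, q_in, q_ab),
                |                      (op_b, q_ab, q_bc),
                |                      (op_c, q_bc, None)]:
                |         threading.Thread(target=stage, args=(fn, i, o),
                |                          daemon=True).start()
                | 
                |     for n in range(5):          # batches flow through
                |         q_in.put(f"batch-{n}")  # the stages concurrently
                |     time.sleep(1)  # let the toy pipeline drain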
        
         | foota wrote:
          | Isn't exactly-once delivery the kind of problem, like the CAP
          | theorem, where it's not possible?
          | 
          | You can make the downstream idempotent w.r.t. what the queue
          | is delivering, but the queue might still redeliver things.
        
         | ankitnayan wrote:
         | https://www.confluent.io/blog/exactly-once-semantics-are-pos...
        
       | bushbaba wrote:
        | Seems like overkill, no? OTel collectors are fairly cheap; why
        | add expensive Kafka into the mix? If you need to buffer, why not
        | just dump to S3 or a similar data store as temporary storage?
        
         | francoismassot wrote:
         | I really like this idea. And there is an OTEL exporter to AWS
         | S3, still in alpha but I'm gonna test it soon:
         | https://github.com/open-telemetry/opentelemetry-collector-co...
        
         | prpl wrote:
          | Why not both: dump to S3 and write pointers to Kafka for
          | portable event-based ingestion (since everybody does messages
          | a bit differently)?
        
           | bushbaba wrote:
            | No need, as S3 objects are your dead-letter queue, and the
            | system should be designed to cope with multiple writes of
            | the same event anyway.
            | 
            | The point is to only use S3 etc. in the event of system
            | instability, not as a primary data transfer means.
        
         | lmm wrote:
         | > If you need to buffer why not just dump to s3 or similar data
         | store as a temporary storage array.
         | 
         | At that point it's very easy to sleepwalk into implementing
         | your own database on top of s3, which is very hard to get good
         | semantics out of - e.g. it offers essentially no ordering
         | guarantees, and forget atomicity. For telemetry you might well
         | be ok with fuzzy data, but if you want exact traces every time
         | then Kafka could make sense.
        
           | dikei wrote:
            | Yeah, and to use S3 efficiently you also need to batch your
            | messages into large blobs of at least tens of MB, which
            | further complicates the matter, especially if you don't want
            | to lose those message buffers.
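            | 
            | A rough sketch of what that batching can look like
            | (illustrative only, assuming boto3 and a hypothetical
            | "otel-buffer" bucket; real code would also need to protect
            | the in-memory buffer against crashes):
            | 
            |     import gzip, uuid, boto3
            | 
            |     s3 = boto3.client("s3")
            |     buffer, buffered_bytes = [], 0
            | 
            |     def add(message: bytes):
            |         global buffered_bytes
            |         buffer.append(message)
            |         buffered_bytes += len(message)
            |         if buffered_bytes >= 32 * 1024 * 1024:  # ~32 MB
            |             flush()
            | 
            |     def flush():
            |         global buffer, buffered_bytes
            |         body = gzip.compress(b"\n".join(buffer))
            |         key = f"blobs/{uuid.uuid4()}.gz"
            |         s3.put_object(Bucket="otel-buffer",
            |                       Key=key, Body=body)
            |         buffer, buffered_bytes = [], 0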
        
             | bushbaba wrote:
              | That's only if your OTel collector is being overwhelmed.
              | In such cases you have a lot of backlogged data that can't
              | be ingested, so you dead-letter-queue it to S3 to free up
              | buffers.
              | 
              | The approach here is to only send data to S3 as a last
              | resort.
        
           | ankitnayan wrote:
            | It's hard to see S3 working as a buffer. Every datastore can
            | cover almost any storage use case (buffer/queue/db) when the
            | scale is low, but those purpose-built systems were designed
            | to work at scale.
        
         | richieartoul wrote:
         | (WarpStream founder)
         | 
         | This is more or less exactly what WarpStream is:
         | https://www.warpstream.com/blog/minimizing-s3-api-costs-with...
         | 
         | Kafka API, S3 costs and ease of use
        
       | daurnimator wrote:
        | I expect it would be far cheaper to scale up Tempo/Loki than it
        | would be to even run an idle Kafka cluster. This feels like
        | spending thousands of dollars to save tens of dollars.
        
         | neetle wrote:
         | Tempo can still buckle under huge bursts of traffic, and you
         | don't need the retention to be in the hours
        
         | pranay01 wrote:
          | Querying in Tempo/Loki doesn't seem to scale particularly
          | well, and Loki has known issues with high-cardinality data,
          | so...
        
         | ankitnayan wrote:
          | When handling surges on the order of 10x, it's much more
          | difficult to scale the different components of Loki than to
          | write the data to Kafka/Redpanda first and consume it at a
          | consistent rate.
        
       | blinded wrote:
        | This arch is how the big players do it at scale (i.e. Datadog,
        | New Relic - the second it passes their edge, it lands in a Kafka
        | cluster). Also, OTel components lack rate limiting (1), meaning
        | it's super easy to overload your backend storage (S3).
        | 
        | Grafana has some posts on how they softened the S3 blow with
        | memcached (2, 3).
       | 
       | 1. https://github.com/open-telemetry/opentelemetry-collector-
       | co... 2. https://grafana.com/docs/loki/latest/operations/caching/
       | 3. https://grafana.com/blog/2023/08/23/how-we-scaled-grafana-
       | cl...
       | 
        | I know the post is about telemetry data and my comments on
        | Grafana are about logs, but the arch bits still apply.
        
         | ankitnayan wrote:
          | Caching is to improve read performance, whereas Kafka is used
          | to handle ingest volume. I couldn't see how the linked Grafana
          | articles relate.
        
         | wardb wrote:
         | Grafana Labs employee here => On the linked articles: I'm not
          | aware of any caching being used in the write-to-S3 part of the
          | pipeline, other than some time-based/volume-based buffering at
          | the ingester microservices before writing the chunks of data
          | to object storage.
         | 
         | The linked Loki caching docs/articles are for optimising the
         | read access patterns of S3/object storage, not for writes.
        
       | chris_armstrong wrote:
       | A similar idea [^1] has cropped up in the serverless
       | OpenTelemetry world to collate OpenTelemetry spans in a Kinesis
       | stream before forwarding them to a third-party service for
       | analysis, obviating the need for a separate collector, reducing
       | forwarding latency and removing the cold-start overhead of the
       | AWS Distribution for OpenTelemetry Lambda Layer.
       | 
       | [^1] https://x.com/donkersgood/status/1662074303456636929?s=20
        
       | Joel_Mckay wrote:
        | If you have distributed concurrent data streams that exhibit
        | coherent temporal events, then at some point you pretty much
        | have to implement a queuing balancer.
        | 
        | One simply trades latency for capacity and eventual coherent
        | data locality.
        | 
        | It's almost an arbitrary detail whether you use Kafka, RabbitMQ,
        | or Erlang channels. If you can add smart client
        | application-layer predictive load-balancing, then it is possible
        | to cut burst traffic loads by an order of magnitude or two.
        | Cost-optimized dynamic host scaling does not solve every
        | problem.
       | 
       | Good luck out there =)
        
       | nicognaw wrote:
        | SigNoz is too good at SEO.
        | 
        | In the early days, when I looked up OTel and observability
        | stuff, I always saw SigNoz articles on the first screen.
        
       ___________________________________________________________________
       (page generated 2023-10-23 09:00 UTC)