$ ~ echo "Hello World" | kafkacat -b localhost:29092 -t hello-topic
% Auto-selecting Producer mode (use -P or -C to override)
$ ~ kafkacat -b localhost:29092 -t hello-topic
% Auto-selecting Consumer mode (use -P or -C to override)
Hello World
% Reached end of topic hello-topic [0] at offset 1
$ ~ echo "123:Jane Smith" | kafkacat -b localhost:29092 -t customers -K:
% Auto-selecting Producer mode (use -P or -C to override)
$ ~ kafkacat -b localhost:29092 -t customers -K:
% Auto-selecting Consumer mode (use -P or -C to override)
123:Jane Smith
% Reached end of topic customers [0] at offset 1
$ ~ kafkacat -b localhost:29092 -t customers
% Auto-selecting Consumer mode (use -P or -C to override)
Jane Smith
% Reached end of topic customers [0] at offset 1
$ ~ kafkacat -C -b localhost:29092 -t movies -s value=avro -r http://localhost:8081
------------------------------------------------------
{"id": 294, "title": "Die Hard", "release_year": 1988}
{"id": 354, "title": "Tree of Life", "release_year": 2011}
{"id": 782, "title": "A Walk in the Clouds", "release_year": 1995}
{"id": 128, "title": "The Big Lebowski", "release_year": 1998}
{"id": 780, "title": "Super Mario Bros.", "release_year": 1993}
$ ~ curl -X POST \
-H "Content-Type: application/vnd.kafka.avro.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data @newMovieData.json "http://localhost:8082/topics/movies"
------------------------------------------------------------------------------
{"offsets":[{"partition":0,"offset":5,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":3}
$ ~ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "movie_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \
http://localhost:8082/consumers/movie_consumers
------------------------------------------------------------------------------
{"instance_id":"movie_consumer_instance","base_uri":"http://localhost:8082/consumers/movie_consumers/instances/movie_consumer_instance"}
$ ~ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["movies"]}' \ http://localhost:8082/consumers/movie_consumers/instances/movie_consumer_instance/subscription
$ ~ curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \ http://localhost:8082/consumers/movie_consumers/instances/movie_consumer_instance/records
----------------------------------------------------------------------
[{"topic":"movies","key":null,"value":{"id": 294, "title": "Die Hard", "release_year": 1988},"partition":0,"offset":0},
{"topic":"movies","key":null,"value":{"id": 354, "title": "Tree of Life", "release_year": 2011},"partition":0,"offset":1},{"topic":"movies","key":null,"value":{"id": 782, "title": "A Walk in the Clouds", "release_year": 1995},"partition":0,"offset":2},{"topic":"movies","key":null,"value":{"id": 128, "title": "The Big Lebowski", "release_year": 1998},"partition":0,"offset":3},{"topic":"movies","key":null,"value":{"id": 780, "title": "Super Mario Bros.", "release_year": 1993},"partition":0,"offset":4},{"topic":"movies","key":null,"value":{"id":101,"title":"Chariots of Fire","release_year":1981},"partition":0,"offset":5}]
$ ~ curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \ http://localhost:8082/consumers/movie_consumers/instances/movie_consumer_instance
$ ~ curl "http://localhost:8082/brokers"
$ ~ curl "http://localhost:8082/topics"
$ ~ curl "http://localhost:8082/topics/movies"
$ ~ curl "http://localhost:8082/topics/movies/partitions"
$ ~ curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
http://localhost:8082/consumers/movie_consumers/instances/movie_consumer_instance/records | jq
[
{
"topic": "movies",
"key": null,
"value": {
"id": 294,
"title": "Die Hard",
"release_year": 1988
},
"partition": 0,
"offset": 0
},
{
"topic": "movies",
"key": null,
"value": {
"id": 354,
"title": "Tree of Life",
"release_year": 2011
},
"partition": 0,
"offset": 1
},
{
"topic": "movies",
"key": null,
"value": {
"id": 782,
"title": "A Walk in the Clouds",
"release_year": 1995
},
"partition": 0,
"offset": 2
},
{
"topic": "movies",
"key": null,
"value": {
"id": 128,
"title": "The Big Lebowski",
"release_year": 1998
},
"partition": 0,
"offset": 3
},
{
"topic": "movies",
"key": null,
"value": {
"id": 780,
"title": "Super Mario Bros.",
"release_year": 1993
},
"partition": 0,
"offset": 4
},
{
"topic": "movies",
"key": null,
"value": {
"id": 101,
"title": "Chariots of Fire",
"release_year": 1981
},
"partition": 0,
"offset": 5
}
]
$ ~ curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
http://localhost:8082/consumers/movie_consumers/instances/movie_consumer_instance/records | jq \
| jq '.[] | {title: .value.title, year: .value.release_year}'
{
"title": "Die Hard",
"year": 1988
}
{
"title": "Tree of Life",
"year": 2011
}
{
"title": "A Walk in the Clouds",
"year": 1995
}
{
"title": "The Big Lebowski",
"year": 1998
}
{
"title": "Super Mario Bros.",
"year": 1993
}
{
"title": "Chariots of Fire",
"year": 1981
}
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [movies])
--> KSTREAM-MAP-0000000001
Processor: KSTREAM-MAP-0000000001 (stores: [])
--> KSTREAM-SINK-0000000002
<-- KSTREAM-SOURCE-0000000000
Sink: KSTREAM-SINK-0000000002 (topic: rekeyed-movies)
<-- KSTREAM-MAP-0000000001 Sub-topology: 1 Source: KSTREAM-SOURCE-0000000010 (topics: [KSTREAM-MAP-0000000007-repartition]) --> KSTREAM-JOIN-0000000011
Processor: KSTREAM-JOIN-0000000011 (stores: [rekeyed-movies-STATE-STORE-0000000003])
--> KSTREAM-SINK-0000000012
<-- KSTREAM-SOURCE-0000000010 Source: KSTREAM-SOURCE-0000000004 (topics: [rekeyed-movies]) --> KTABLE-SOURCE-0000000005
Sink: KSTREAM-SINK-0000000012 (topic: rated-movies)
<-- KSTREAM-JOIN-0000000011 Processor: KTABLE-SOURCE-0000000005 (stores: [rekeyed-movies-STATE-STORE-0000000003]) --> none
<-- KSTREAM-SOURCE-0000000004 Sub-topology: 2 Source: KSTREAM-SOURCE-0000000006 (topics: [ratings]) --> KSTREAM-MAP-0000000007
Processor: KSTREAM-MAP-0000000007 (stores: [])
--> KSTREAM-FILTER-0000000009
<-- KSTREAM-SOURCE-0000000006 Processor: KSTREAM-FILTER-0000000009 (stores: []) --> KSTREAM-SINK-0000000008
<-- KSTREAM-MAP-0000000007
Sink: KSTREAM-SINK-0000000008 (topic: KSTREAM-MAP-0000000007-repartition)
<-- KSTREAM-FILTER-0000000009
ksql> SHOW QUERIES;
Query ID | Query Type | Status | Sink Name | Sink Kafka Topic | Query String
--------------------------------------------------------------------------------------------------------------------------
CSAS_SHIPPED_ORDERS_0 | PERSISTENT | RUNNING:1 | SHIPPED_ORDERS | SHIPPED_ORDERS | CREATE STREAM SHIPPED_ORDERS WITH
...
ksql> EXPLAIN CSAS_SHIPPED_ORDERS_0;