/*
 * Copyright (c) 2011, Google Inc.
 */
#include "cache.h"
#include "bulk-checkin.h"
#include "repository.h"
#include "csum-file.h"
#include "pack.h"
#include "strbuf.h"
#include "packfile.h"
#include "object-store.h"
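
/*
 * While the bulk check-in machinery is "plugged", every streamed object
 * goes into a single temporary packfile; this struct tracks the state of
 * that in-progress pack.
 */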

static struct bulk_checkin_state {
	unsigned plugged:1;

	char *pack_tmp_name;
	struct hashfile *f;
	off_t offset;
	struct pack_idx_option pack_idx_opts;

	struct pack_idx_entry **written;
	uint32_t alloc_written;
	uint32_t nr_written;
} state;

static void finish_bulk_checkin(struct bulk_checkin_state *state)
{
	struct object_id oid;
	struct strbuf packname = STRBUF_INIT;
	int i;

	if (!state->f)
		return;

	if (state->nr_written == 0) {
		close(state->f->fd);
		unlink(state->pack_tmp_name);
		goto clear_exit;
	} else if (state->nr_written == 1) {
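		/*
		 * The pack header written by prepare_to_stream() already
		 * claims exactly one object, so no fixup is needed.
		 */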
		finalize_hashfile(state->f, oid.hash, CSUM_HASH_IN_STREAM | CSUM_FSYNC | CSUM_CLOSE);
	} else {
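		/*
		 * Rewrite the object count in the pack header and recompute
		 * the trailing pack checksum for the final contents.
		 */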
		int fd = finalize_hashfile(state->f, oid.hash, 0);
		fixup_pack_header_footer(fd, oid.hash, state->pack_tmp_name,
					 state->nr_written, oid.hash,
					 state->offset);
		close(fd);
	}

	strbuf_addf(&packname, "%s/pack/pack-", get_object_directory());
	finish_tmp_packfile(&packname, state->pack_tmp_name,
			    state->written, state->nr_written,
			    &state->pack_idx_opts, oid.hash);
	for (i = 0; i < state->nr_written; i++)
		free(state->written[i]);

clear_exit:
	free(state->written);
	memset(state, 0, sizeof(*state));

	strbuf_release(&packname);
	/* Make objects we just wrote available to ourselves */
	reprepare_packed_git(the_repository);
}

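/*
 * Check whether the object is already available, either in the object
 * store or among the objects written to the current pack so far.
 */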
static int already_written(struct bulk_checkin_state *state, struct object_id *oid)
{
	int i;

	/* The object may already exist in the repository */
	if (has_object_file(oid))
		return 1;

	/* Might want to keep the list sorted */
	for (i = 0; i < state->nr_written; i++)
		if (oideq(&state->written[i]->oid, oid))
			return 1;

	/* This is a new object we need to keep */
	return 0;
}

/*
 * Read the contents from fd for size bytes, streaming it to the
 * packfile in state while updating the hash in ctx. Signal a failure
 * by returning a negative value when the resulting pack would exceed
 * the pack size limit and this is not the first object in the pack,
 * so that the caller can discard what we wrote from the current pack
 * by truncating it and opening a new one. The caller will then call
 * us again after rewinding the input fd.
 *
 * The already_hashed_to pointer is kept untouched by the caller to
 * make sure we do not hash the same byte when we are called
 * again. This way, the caller does not have to checkpoint its hash
 * status before calling us just in case we ask it to call us again
 * with a new pack.
 */
static int stream_to_pack(struct bulk_checkin_state *state,
			  git_hash_ctx *ctx, off_t *already_hashed_to,
			  int fd, size_t size, enum object_type type,
			  const char *path, unsigned flags)
{
	git_zstream s;
	unsigned char obuf[16384];
	unsigned hdrlen;
	int status = Z_OK;
	int write_object = (flags & HASH_WRITE_OBJECT);
	off_t offset = 0;

	git_deflate_init(&s, pack_compression_level);

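	/* Each pack entry begins with a type-and-size header in pack format. */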
	hdrlen = encode_in_pack_object_header(obuf, sizeof(obuf), type, size);
	s.next_out = obuf + hdrlen;
	s.avail_out = sizeof(obuf) - hdrlen;

	while (status != Z_STREAM_END) {
		unsigned char ibuf[16384];

		if (size && !s.avail_in) {
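			/*
			 * Refill the input buffer; hash only the bytes we
			 * have not already hashed on a previous attempt
			 * (tracked via *already_hashed_to).
			 */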
			ssize_t rsize = size < sizeof(ibuf) ? size : sizeof(ibuf);
			ssize_t read_result = read_in_full(fd, ibuf, rsize);
			if (read_result < 0)
				die_errno("failed to read from '%s'", path);
			if (read_result != rsize)
				die("failed to read %d bytes from '%s'",
				    (int)rsize, path);
			offset += rsize;
			if (*already_hashed_to < offset) {
				size_t hsize = offset - *already_hashed_to;
				if (rsize < hsize)
					hsize = rsize;
				if (hsize)
					the_hash_algo->update_fn(ctx, ibuf, hsize);
				*already_hashed_to = offset;
			}
			s.next_in = ibuf;
			s.avail_in = rsize;
			size -= rsize;
		}

		status = git_deflate(&s, size ? 0 : Z_FINISH);

		if (!s.avail_out || status == Z_STREAM_END) {
			if (write_object) {
				size_t written = s.next_out - obuf;

				/* would we bust the size limit? */
				if (state->nr_written &&
				    pack_size_limit_cfg &&
				    pack_size_limit_cfg < state->offset + written) {
					git_deflate_abort(&s);
					return -1;
				}

				hashwrite(state->f, obuf, written);
				state->offset += written;
			}
			s.next_out = obuf;
			s.avail_out = sizeof(obuf);
		}

		switch (status) {
		case Z_OK:
		case Z_BUF_ERROR:
		case Z_STREAM_END:
			continue;
		default:
			die("unexpected deflate failure: %d", status);
		}
	}
	git_deflate_end(&s);
	return 0;
}

/* Lazily create backing packfile for the state */
static void prepare_to_stream(struct bulk_checkin_state *state,
			      unsigned flags)
{
	if (!(flags & HASH_WRITE_OBJECT) || state->f)
		return;

	state->f = create_tmp_packfile(&state->pack_tmp_name);
	reset_pack_idx_option(&state->pack_idx_opts);

	/* Pretend we are going to write only one object */
	state->offset = write_pack_header(state->f, 1);
	if (!state->offset)
		die_errno("unable to write pack header");
}

static int deflate_to_pack(struct bulk_checkin_state *state,
			   struct object_id *result_oid,
			   int fd, size_t size,
			   enum object_type type, const char *path,
			   unsigned flags)
{
	off_t seekback, already_hashed_to;
	git_hash_ctx ctx;
	unsigned char obuf[16384];
	unsigned header_len;
	struct hashfile_checkpoint checkpoint;
	struct pack_idx_entry *idx = NULL;

	seekback = lseek(fd, 0, SEEK_CUR);
	if (seekback == (off_t) -1)
		return error("cannot find the current offset");

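	/*
	 * The object name is the hash of the usual "<type> <size>" header
	 * (including its terminating NUL) followed by the contents, even
	 * though the data itself is stored in pack format.
	 */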
	header_len = xsnprintf((char *)obuf, sizeof(obuf), "%s %" PRIuMAX,
			       type_name(type), (uintmax_t)size) + 1;
	the_hash_algo->init_fn(&ctx);
	the_hash_algo->update_fn(&ctx, obuf, header_len);

	/* Note: idx is non-NULL when we are writing */
	if ((flags & HASH_WRITE_OBJECT) != 0)
		idx = xcalloc(1, sizeof(*idx));

	already_hashed_to = 0;

	while (1) {
		prepare_to_stream(state, flags);
		if (idx) {
			hashfile_checkpoint(state->f, &checkpoint);
			idx->offset = state->offset;
			crc32_begin(state->f);
		}
		if (!stream_to_pack(state, &ctx, &already_hashed_to,
				    fd, size, type, path, flags))
			break;
		/*
		 * Writing this object to the current pack will make
		 * it too big; we need to truncate it, start a new
		 * pack, and write into it.
		 */
		if (!idx)
			BUG("should not happen");
		hashfile_truncate(state->f, &checkpoint);
		state->offset = checkpoint.offset;
		finish_bulk_checkin(state);
		if (lseek(fd, seekback, SEEK_SET) == (off_t) -1)
			return error("cannot seek back");
	}
	the_hash_algo->final_fn(result_oid->hash, &ctx);
	if (!idx)
		return 0;

	idx->crc32 = crc32_end(state->f);
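	/*
	 * If we already have this object, discard the bytes just streamed
	 * by truncating the pack back to the checkpoint.
	 */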
	if (already_written(state, result_oid)) {
		hashfile_truncate(state->f, &checkpoint);
		state->offset = checkpoint.offset;
		free(idx);
	} else {
		oidcpy(&idx->oid, result_oid);
		ALLOC_GROW(state->written,
			   state->nr_written + 1,
			   state->alloc_written);
		state->written[state->nr_written++] = idx;
	}
	return 0;
}

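/*
 * Stream size bytes of the given type from fd into the bulk-checkin
 * packfile and report the resulting object name in oid. Unless the
 * machinery has been plugged, the pack is finalized right away.
 *
 * A typical caller might look like this (a sketch, not lifted from any
 * particular builtin):
 *
 *	plug_bulk_checkin();
 *	for (...each large file...)
 *		index_bulk_checkin(&oid, fd, size, OBJ_BLOB, path,
 *				   HASH_WRITE_OBJECT);
 *	unplug_bulk_checkin();
 */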
int index_bulk_checkin(struct object_id *oid,
		       int fd, size_t size, enum object_type type,
		       const char *path, unsigned flags)
{
	int status = deflate_to_pack(&state, oid, fd, size, type,
				     path, flags);
	if (!state.plugged)
		finish_bulk_checkin(&state);
	return status;
}

void plug_bulk_checkin(void)
{
	state.plugged = 1;
}

void unplug_bulk_checkin(void)
{
	state.plugged = 0;
	if (state.f)
		finish_bulk_checkin(&state);
}